Add minio storage for open source by guillaq · Pull Request #19 · WorkflowAI/WorkflowAI
Merged · 4 commits · Mar 31, 2025
8 changes: 6 additions & 2 deletions .env.sample
@@ -84,11 +84,15 @@ WORKFLOWAI_MONGO_MIGRATIONS_ON_START=true
JOBS_BROKER_URL=redis://localhost:6379/0

# ================
# Azurite
# S3 Storage

# A local minio instance is used for testing, see docker-compose.yml
WORKFLOWAI_STORAGE_CONNECTION_STRING=s3://minio:miniosecret@localhost:9000/workflowai-task-runs?secure=false

# If using Azurite
# Not a secret, see https://github.com/Azure/Azurite?tab=readme-ov-file#connection-strings
# This points to the docker container used for testing that replicates Azure Blob Storage
WORKFLOWAI_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
# WORKFLOWAI_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;

# ================
# Allowed origins
20 changes: 16 additions & 4 deletions README.md
@@ -16,7 +16,7 @@ cp .env.sample .env
docker-compose build
# [Optional] Start the dependencies in the background, this way we can shut down the app while
# keeping the dependencies running
docker-compose up -d clickhouse azurite redis mongo
docker-compose up -d clickhouse minio redis mongo
# Start the docker images
docker-compose up
# The WorkflowAI api is also a WorkflowAI user
@@ -33,6 +33,18 @@ docker-compose up
> latencies for development. Detailed setup for both the [client](./client/README.md) and [api](./api/README.md)
> are provided in their respective READMEs.

> For now, the frontend relies on public read access to the storage. The URLs are not discoverable, though, so this should be acceptable until we implement temporary leases for files.
> On MinIO, public read access can be granted with the following commands

```sh
# Run sh inside the running minio container
docker-compose exec minio sh
# Register an alias for the minio server (mc needs this before any other command)
mc alias set myminio http://minio:9000 minio miniosecret
# Allow anonymous download access on the bucket
mc anonymous set download myminio/workflowai-task-runs
```

### Structure

#### API
@@ -53,9 +65,9 @@ The [client](./client/README.md) is a NextJS app that serves as a frontend
and query duration.
- **Redis**: We use Redis as a message broker for TaskIQ. TaskIQ supports a number
of different message brokers.
- **Azure Blob Storage** is used to store files.

<!-- TODO: switch to S3 when -->
- **Minio** is used to store files, but any _S3 compatible storage_ will do. We also have a plugin for _Azure Blob Storage_.
The selected storage depends on the `WORKFLOWAI_STORAGE_CONNECTION_STRING` env variable: a value starting with
`s3://` selects the S3 storage (see the sketch below).
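
As an illustration, the `s3://` form packs the endpoint, credentials, bucket, and TLS flag into a single URL. Below is a minimal sketch of how such a string could be decomposed; the actual parsing lives inside `S3FileStorage`, whose internals are not part of this diff, so the field names here are assumptions:

```python
from urllib.parse import parse_qs, urlparse


def parse_s3_connection_string(connection_string: str) -> dict[str, str]:
    # e.g. s3://minio:miniosecret@localhost:9000/workflowai-task-runs?secure=false
    parsed = urlparse(connection_string)
    query = parse_qs(parsed.query)
    # secure defaults to true; secure=false switches the endpoint to plain http
    scheme = "http" if query.get("secure", ["true"])[0] == "false" else "https"
    return {
        # Assumes the host always carries an explicit port, as in the sample above
        "endpoint": f"{scheme}://{parsed.hostname}:{parsed.port}",
        "access_key": parsed.username or "",
        "secret_key": parsed.password or "",
        "bucket": parsed.path.lstrip("/"),
    }


print(parse_s3_connection_string(
    "s3://minio:miniosecret@localhost:9000/workflowai-task-runs?secure=false",
))
# {'endpoint': 'http://localhost:9000', 'access_key': 'minio',
#  'secret_key': 'miniosecret', 'bucket': 'workflowai-task-runs'}
```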

### Setting up providers

11 changes: 3 additions & 8 deletions api/api/dependencies/services.py
@@ -1,4 +1,3 @@
import os
from collections.abc import Callable
from typing import Annotated

@@ -19,6 +18,7 @@
TranscriptionStorageDep,
)
from api.dependencies.task_info import TaskTupleDep
from api.services import file_storage
from api.services.analytics import AnalyticsService, analytics_service
from api.services.api_keys import APIKeyService
from api.services.feedback_svc import FeedbackTokenGenerator
@@ -36,7 +36,7 @@
from api.services.versions import VersionsService
from core.deprecated.workflowai import WorkflowAI
from core.domain.users import UserIdentifier
from core.storage.azure.azure_blob_file_storage import AzureBlobFileStorage, FileStorage
from core.storage.file_storage import FileStorage


async def analytics_service_dependency(
@@ -57,12 +57,7 @@ async def analytics_service_dependency(


def file_storage_dependency() -> FileStorage:
# We return an empty blob storage if the env variables are not set
# Currently we instantiate the storage every time but it is rarely needed
return AzureBlobFileStorage(
os.getenv("WORKFLOWAI_STORAGE_CONNECTION_STRING", ""),
os.getenv("WORKFLOWAI_STORAGE_TASK_RUNS_CONTAINER", ""),
)
return file_storage.shared_file_storage


FileStorageDep = Annotated[FileStorage, Depends(file_storage_dependency)]
4 changes: 3 additions & 1 deletion api/api/jobs/common.py
@@ -95,7 +95,9 @@ def storage_dep(event: EventDep, event_router: EventRouterDep) -> BackendStorage


def file_storage_dep() -> FileStorage:
return storage_service.file_storage_for_tenant()
from api.services import file_storage as file_storage_service

return file_storage_service.shared_file_storage


FileStorageDep = Annotated[FileStorage, TaskiqDepends(file_storage_dep)]
16 changes: 13 additions & 3 deletions api/api/routers/upload.py
@@ -1,9 +1,11 @@
import mimetypes

from fastapi import APIRouter, UploadFile
from pydantic import BaseModel

from api.dependencies.security import TenantDep
from api.dependencies.services import FileStorageDep
from core.domain.fields.file import DomainUploadFile
from core.storage.file_storage import FileData

router = APIRouter(prefix="/upload")

@@ -12,6 +14,14 @@ class UploadFileResponse(BaseModel):
url: str


def _content_type(file: UploadFile) -> str | None:
if file.content_type:
return file.content_type
if not file.filename:
return None
return mimetypes.guess_type(file.filename)[0]


@router.post("/{task_id}", response_model=UploadFileResponse)
async def upload_file(
file_storage: FileStorageDep,
@@ -20,10 +30,10 @@ async def upload_file(
file: UploadFile,
) -> UploadFileResponse:
url = await file_storage.store_file(
DomainUploadFile(
FileData(
filename=file.filename or "",
contents=await file.read(),
content_type=file.content_type or "",
content_type=_content_type(file),
),
f"{tenant}/{task_id}/uploads",
)
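
For reference, a hedged sketch of calling this endpoint from a client. The `/upload/{task_id}` path and the `file` multipart field come from the router above; the base URL and the auth header are assumptions that depend on your deployment:

```python
import httpx

# Hypothetical local deployment; adjust the base URL and credentials
API_URL = "http://localhost:8000"

with open("example.png", "rb") as f:
    response = httpx.post(
        f"{API_URL}/upload/my-task-id",
        files={"file": ("example.png", f, "image/png")},
        headers={"Authorization": "Bearer <your-api-key>"},
    )
response.raise_for_status()
# UploadFileResponse carries the public URL of the stored file
print(response.json()["url"])
```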
40 changes: 40 additions & 0 deletions api/api/services/file_storage.py
@@ -0,0 +1,40 @@
import logging
import os
from typing import override

from core.storage.file_storage import FileData, FileStorage


def _default_file_storage() -> FileStorage:
# We return a noop file storage if the env variables are not set

connection_string = os.getenv("WORKFLOWAI_STORAGE_CONNECTION_STRING", "")
if connection_string.startswith("DefaultEndpointsProtocol"):
from core.storage.azure.azure_blob_file_storage import AzureBlobFileStorage

return AzureBlobFileStorage(
os.getenv("WORKFLOWAI_STORAGE_CONNECTION_STRING", ""),
os.getenv("WORKFLOWAI_STORAGE_TASK_RUNS_CONTAINER", "workflowai-task-runs"),
)

if connection_string.startswith("s3://"):
from core.storage.s3.s3_file_storage import S3FileStorage

return S3FileStorage(
connection_string,
os.getenv("WORKFLOWAI_STORAGE_TASK_RUNS_CONTAINER", ""),
)

logging.getLogger(__name__).warning(
"No file storage configured, using noop file storage. Set WORKFLOWAI_STORAGE_CONNECTION_STRING to use a real file storage.",
)

class NoopFileStorage(FileStorage):
@override
async def store_file(self, file: FileData, folder_path: str) -> str:
return ""

return NoopFileStorage()


shared_file_storage = _default_file_storage()
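
Since `shared_file_storage` is constructed at import time, the connection string has to be in the environment before `api.services.file_storage` is first imported. A quick sanity check, assuming the local MinIO setup from `.env.sample`:

```python
import os

# Must happen before the module is imported: the singleton is
# built once, at import time, from this variable
os.environ["WORKFLOWAI_STORAGE_CONNECTION_STRING"] = (
    "s3://minio:miniosecret@localhost:9000/workflowai-task-runs?secure=false"
)

from api.services import file_storage

# An s3:// connection string selects the S3 backend
print(type(file_storage.shared_file_storage).__name__)  # S3FileStorage
```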
9 changes: 7 additions & 2 deletions api/api/services/runs.py
@@ -36,6 +36,7 @@
from core.storage.abstract_storage import AbstractStorage
from core.storage.azure.azure_blob_file_storage import CouldNotStoreFileError, FileStorage
from core.storage.backend_storage import BackendStorage
from core.storage.file_storage import FileData
from core.utils.dicts import InvalidKeyPathError, delete_at_keypath, set_at_keypath
from core.utils.models.dumps import safe_dump_pydantic_model

@@ -226,12 +227,16 @@ async def _store_files(
files: list[FileWithKeyPath],
) -> list[FileWithKeyPath]:
for file in files:
if not file.data:
bts = file.content_bytes()
if not bts:
# Skipping: the only reason a file might not have data is if it's private
continue

try:
file.storage_url = await file_storage.store_file(file, folder_path=folder_path)
file.storage_url = await file_storage.store_file(
FileData(contents=bts, content_type=file.content_type),
folder_path=folder_path,
)
except CouldNotStoreFileError as e:
_logger.exception(
"Error storing file",
9 changes: 0 additions & 9 deletions api/api/services/storage.py
@@ -2,7 +2,6 @@
from base64 import b64decode

from core.domain.events import EventRouter
from core.storage.azure.azure_blob_file_storage import AzureBlobFileStorage, FileStorage
from core.storage.backend_storage import SystemBackendStorage
from core.storage.combined.combined_storage import CombinedStorage
from core.storage.mongo.mongo_storage import MongoStorage
@@ -41,11 +40,3 @@ def storage_for_tenant(

def system_storage(encryption: Encryption | None = None) -> SystemBackendStorage:
return storage_for_tenant("__system__", -1, no_op.event_router, encryption)


# TODO: add tenant param + prefix all file paths with the tenant
def file_storage_for_tenant() -> FileStorage:
return AzureBlobFileStorage(
os.environ["WORKFLOWAI_STORAGE_CONNECTION_STRING"],
os.environ.get("WORKFLOWAI_STORAGE_TASK_RUNS_CONTAINER", "workflowai-task-runs"),
)
4 changes: 2 additions & 2 deletions api/core/deprecated/workflowai.py
@@ -23,7 +23,7 @@
from core.runners.abstract_runner import AbstractRunner, CacheFetcher
from core.runners.workflowai.workflowai_runner import WorkflowAIRunner
from core.storage.abstract_storage import AbstractStorage
from core.storage.azure.azure_blob_file_storage import FileStorage
from core.storage.file_storage import FileStorage
from core.storage.noop_storage import NoopStorage
from core.utils.no_op import NoopFileStorage

@@ -89,7 +89,7 @@ def __init__(
self._run_service = run_service
self.storage: AbstractStorage = storage or NoopStorage()
self._logger = logging.getLogger(self.__class__.__name__)
self._file_storage = file_storage or NoopFileStorage()
self._file_storage: FileStorage = file_storage or NoopFileStorage()
self._cache_fetcher = cache_fetcher

@classmethod
30 changes: 5 additions & 25 deletions api/core/domain/fields/file.py
@@ -1,4 +1,3 @@
import hashlib
import logging
import mimetypes
from base64 import b64decode
@@ -93,38 +92,19 @@ def is_text(self) -> bool | None:
return None
return self.content_type in ["text/plain", "text/markdown", "text/csv", "text/json", "text/html"]

def get_content_hash(self) -> str:
if self.data:
return hashlib.sha256(self.data.encode()).hexdigest()
if self.url:
if self.url.startswith("data:"):
_, data = _parse_data_url(self.url[5:])
return hashlib.sha256(data.encode()).hexdigest()
return hashlib.sha256(self.url.encode()).hexdigest()
raise ValueError("No data or URL provided for file")

def get_extension(self) -> str:
if self.content_type:
return mimetypes.guess_extension(self.content_type) or ""
return ""

def content_bytes(self) -> bytes | None:
if self.data:
return b64decode(self.data)
return None


def _parse_data_url(data_url: str) -> tuple[str, str]:
splits = data_url.split(";base64,")
if len(splits) != 2:
raise ValueError("Invalid base64 data URL")
return splits[0], splits[1]


class DomainUploadFile(BaseModel):
filename: str
contents: bytes
content_type: str

def get_content_hash(self) -> str:
return hashlib.sha256(self.contents).hexdigest()

def get_extension(self) -> str:
if self.filename:
return "." + self.filename.split(".")[-1]
return ""
13 changes: 0 additions & 13 deletions api/core/domain/fields/file_test.py
@@ -1,5 +1,3 @@
import hashlib

from core.domain.fields.file import File


@@ -29,14 +27,3 @@ def test_validate_data_content_type(self):
def test_validate_data_content_type_none(self):
img = File(url="https://bla.com/file")
assert img.content_type is None

def test_get_content_hash(self):
img = File(url="https://bla.com/file.png")
assert img.url is not None
assert img.get_content_hash() == hashlib.sha256(img.url.encode()).hexdigest()

img = File(data="iVBORw0KGgoAAAANSUhEUgAAAAUA")
assert img.get_content_hash() == hashlib.sha256(b"iVBORw0KGgoAAAANSUhEUgAAAAUA").hexdigest()

img2 = File(url="data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAUA")
assert img2.get_content_hash() == hashlib.sha256(b"iVBORw0KGgoAAAANSUhEUgAAAAUA").hexdigest()
4 changes: 2 additions & 2 deletions api/core/providers/amazon_bedrock/amazon_bedrock_auth.py
@@ -1,6 +1,6 @@
from __future__ import annotations

import boto3 # pyright: ignore [reportMissingTypeStubs]
import boto3
import httpx
from botocore.auth import SigV4Auth # pyright: ignore [reportMissingTypeStubs]
from botocore.awsrequest import AWSRequest # pyright: ignore [reportMissingTypeStubs]
@@ -14,7 +14,7 @@ def _get_session(
aws_secret_key: str | None,
aws_session_token: str | None,
region: str | None,
) -> boto3.Session:
):
return boto3.Session(
region_name=region,
aws_access_key_id=aws_access_key,