Documentation • Examples • GitHub • Discord
DBOS provides lightweight durable workflows built on top of Postgres. Instead of managing your own workflow orchestrator or task queue system, you can use DBOS to add durable workflows and queues to your program in just a few lines of code.
To get started, follow the quickstart to install this open-source library and connect it to a Postgres database. Then, annotate workflows and steps in your program to make it durable! That's all you need to do—DBOS is entirely contained in this open-source library, there's no additional infrastructure for you to configure or manage.
You should consider using DBOS if your application needs to reliably handle failures. For example, you might be building a payments service that must reliably process transactions even if servers crash mid-operation, or a long-running data pipeline that needs to resume seamlessly from checkpoints rather than restart from the beginning when interrupted.
Handling failures is costly and complicated, requiring complex state management and recovery logic as well as heavyweight tools like external orchestration services. DBOS makes it simpler: annotate your code to checkpoint it in Postgres and automatically recover from any failure. DBOS also provides powerful Postgres-backed primitives that make it easier to write and operate reliable code, including durable queues, notifications, scheduling, event processing, and programmatic workflow management.
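For example, here's a minimal sketch of durable scheduling, one of the primitives listed above (the schedule and function name are illustrative; DBOS passes the scheduled and actual start times to the workflow):

from dbos import DBOS

# Run this workflow once per hour, at the top of the hour (cron syntax).
@DBOS.scheduled("0 * * * *")
@DBOS.workflow()
def hourly_report(scheduled_time, actual_time):
    ...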
💾 Durable Workflows
DBOS workflows make your program durable by checkpointing its state in Postgres. If your program ever fails, when it restarts all your workflows will automatically resume from the last completed step.
You add durable workflows to your existing Python program by annotating ordinary functions as workflows and steps:
from dbos import DBOS

@DBOS.step()
def step_one():
    ...

@DBOS.step()
def step_two():
    ...

@DBOS.workflow()
def workflow():
    step_one()
    step_two()
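To run it, configure and launch DBOS before invoking the workflow. Here's a minimal sketch, assuming the config-and-launch setup from the quickstart (the app name and environment variable are illustrative):

import os

from dbos import DBOS, DBOSConfig

# Point DBOS at your Postgres database (env var name is illustrative).
config: DBOSConfig = {
    "name": "example-app",
    "database_url": os.environ["DBOS_DATABASE_URL"],
}
DBOS(config=config)
DBOS.launch()  # On launch, DBOS also resumes any interrupted workflows.

workflow()  # Each step is checkpointed in Postgres as it completes.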
Workflows are particularly useful for:
- Orchestrating business processes so they seamlessly recover from any failure.
- Building observable and fault-tolerant data pipelines.
- Operating an AI agent, or any application that relies on unreliable or non-deterministic APIs.
📒 Durable Queues
DBOS queues help you durably run tasks in the background. You can enqueue a task (which can be a single step or an entire workflow) from a durable workflow and one of your processes will pick it up for execution. DBOS manages the execution of your tasks: it guarantees that tasks complete, and that their callers get their results without needing to resubmit them, even if your application is interrupted.
Queues also provide flow control, so you can limit the concurrency of your tasks on a per-queue or per-process basis. You can also set timeouts for tasks, rate limit how often queued tasks are executed, deduplicate tasks, or prioritize tasks.
You can add queues to your workflows in just a couple lines of code. They don't require a separate queueing service or message broker—just Postgres.
from dbos import DBOS, Queue

queue = Queue("example_queue")

@DBOS.step()
def process_task(task):
    ...

@DBOS.workflow()
def process_tasks(tasks):
    task_handles = []
    # Enqueue each task so all tasks are processed concurrently.
    for task in tasks:
        handle = queue.enqueue(process_task, task)
        task_handles.append(handle)
    # Wait for each task to complete and return the results of all tasks.
    return [handle.get_result() for handle in task_handles]
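Here's a sketch of the flow control options described above (the queue names and limits are illustrative; per-task options like timeouts, deduplication, and priority are set when enqueueing):

from dbos import Queue

# Run at most 10 tasks from this queue concurrently across all processes,
# and at most 5 concurrently within any single process.
throttled_queue = Queue("throttled_queue", concurrency=10, worker_concurrency=5)

# Start at most 50 tasks from this queue per 30-second period.
rate_limited_queue = Queue("rate_limited_queue", limiter={"limit": 50, "period": 30})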
⚙️ Programmatic Workflow Management
Your workflows are stored as rows in a Postgres table, so you have full programmatic control over them. Write scripts to query workflow executions, batch pause or resume workflows, or even restart failed workflows from a specific step. Handle bugs or failures that affect thousands of workflows with power and flexibility.
from dbos import DBOSClient

# Create a DBOS client connected to your Postgres database.
client = DBOSClient(database_url)

# Find all workflows that errored between 3:00 and 5:00 AM UTC on 2025-04-22.
workflows = client.list_workflows(status="ERROR",
    start_time="2025-04-22T03:00:00Z", end_time="2025-04-22T05:00:00Z")
for workflow in workflows:
    # Check which workflows failed due to an outage in a service called from Step 2.
    steps = client.list_workflow_steps(workflow)
    if len(steps) >= 3 and isinstance(steps[2]["error"], ServiceOutage):
        # To recover from the outage, restart those workflows from Step 2.
        client.fork_workflow(workflow.workflow_id, 2)
🎫 Exactly-Once Event Processing
Use DBOS to build reliable webhooks, event listeners, or Kafka consumers by starting a workflow exactly once in response to an event. Acknowledge the event immediately while reliably processing it in the background.
For example:
from dbos import DBOS, SetWorkflowID

@DBOS.workflow()
def message_workflow(event):
    ...

# `Request` comes from your web framework (FastAPI, Flask, etc.).
def handle_message(request: Request) -> None:
    event_id = request.body["event_id"]
    # Use the event ID as an idempotency key to start the workflow exactly once.
    with SetWorkflowID(event_id):
        # Start the workflow in the background, then acknowledge the event.
        DBOS.start_workflow(message_workflow, request.body["event"])