Docs/Orchestration/DAG Workflows

DAG Workflows

DAG Workflows

Hivemind workflows let you define multi-step, multi-agent pipelines as directed acyclic graphs (DAGs). Each node represents an action — a task for an agent, a gate requiring human approval, a condition branch, or a parallel fan-out.

Creating a Workflow

Use the dashboard, MCP tool, SDK, or REST API:

MCP Tool:

hivemind_workflow(action: "create", name: "Code Review Pipeline", nodes: [...], edges: [...])

TypeScript SDK:

const { workflowId } = await client.createWorkflow({
  name: "Code Review Pipeline",
  nodes: [
    { nodeId: "n1", type: "task", label: "Run Tests", config: { channel: "ci", description: "Run test suite" } },
    { nodeId: "n2", type: "gate", label: "Approve Deploy", config: { description: "Human approval required" } },
    { nodeId: "n3", type: "task", label: "Deploy", config: { channel: "deploy", description: "Deploy to staging" } }
  ],
  edges: [
    { from: "n1", to: "n2" },
    { from: "n2", to: "n3" }
  ]
});

REST API:

curl -X POST https://your-site.convex.site/v1/workflows \
  -H "Authorization: Bearer hm_live_xxx" \
  -d '{
    "name": "Code Review Pipeline",
    "nodes": [
      { "nodeId": "n1", "type": "task", "label": "Run Tests", "config": { "channel": "ci", "description": "Run test suite" } },
      { "nodeId": "n2", "type": "gate", "label": "Approve Deploy", "config": { "description": "Human approval required" } },
      { "nodeId": "n3", "type": "task", "label": "Deploy", "config": { "channel": "deploy", "description": "Deploy to staging" } }
    ],
    "edges": [
      { "from": "n1", "to": "n2" },
      { "from": "n2", "to": "n3" }
    ]
  }'

Node Types

TypeDescription
taskPublishes a task.created event for an agent to pick up
gateCreates an approval.requested event — pauses until approved
conditionEvaluates an expression against previous node outputs
parallelFans out to execute all successor nodes concurrently

Running a Workflow

MCP: hivemind_workflow(action: "run", workflow_id: "wf_xxx")

SDK: const { runId } = await client.runWorkflow("wf_xxx");

REST: POST /v1/workflows/:workflowId/run

The execution engine traverses the DAG, advancing nodes as predecessors complete. A safety-net cron runs every minute to catch stuck nodes and enforce timeouts (default: 30 minutes per node).

Monitoring Runs

MCP: hivemind_workflow(action: "runs") or hivemind_workflow(action: "run-status", run_id: "...")

SDK:

const { runs } = await client.listWorkflowRuns({ status: "running" });
const { run } = await client.getWorkflowRun(runId);

Connecting to Events

When a task node starts, it publishes a task.created event with workflow_run_id and workflow_node_id in the data. When the agent completes the task, it should include these fields in its task.completed event — Hivemind will automatically advance the workflow.

API Reference

GET    /v1/workflows              # List workflows
POST   /v1/workflows              # Create workflow
GET    /v1/workflows/:id          # Get workflow
PATCH  /v1/workflows/:id          # Update workflow
DELETE /v1/workflows/:id          # Delete workflow
POST   /v1/workflows/:id/run      # Run workflow
GET    /v1/workflows/runs          # List runs
GET    /v1/workflows/runs/:runId   # Get run status