← All Examples

nightly-report

Cron-fired workflow that demonstrates the rest of the workflow engine in one example: a recurring schedule kicks off the parent, the parent uses ctx:side_effect to capture a non-deterministic report ID, scans for "anomalies", and starts a child workflow per anomaly to handle each in parallel-ish (each child runs independently against the same engine).

What it does

NightlyReport(input)              ← fired by cron
  ├─> side_effect "issue_report_id" → "rep-2026-04-16-XXXXX"
  ├─> activity "scan_anomalies"     → returns [a1, a2, a3]
  └─> for each anomaly:
        start_child_workflow "HandleAnomaly" {anomaly}
          └─> activity "remediate"  → {fixed = true, anomaly}
  return {report_id, anomalies_handled = 3}

side_effect makes the report ID stable across worker crashes — even if the worker dies after the report ID is generated but before the scan runs, the next worker re-replaying the workflow will see the same report ID from the event log instead of generating a new one.

Run

# Terminal 1
assay serve

# Terminal 2
cd examples/workflows/nightly-report
assay run worker.lua

Wire up the schedule (one-off; persists in the engine DB):

# Fires every 5 seconds for the demo. Replace with "0 0 0 * * *" for
# real nightly cadence (the cron crate wants 6/7 fields, with seconds).
curl -X POST http://localhost:8080/api/v1/schedules \
  -H 'Content-Type: application/json' \
  -d '{
    "namespace": "main",
    "name": "nightly-report",
    "workflow_type": "NightlyReport",
    "cron_expr": "*/5 * * * * *",
    "task_queue": "default",
    "input": {"region": "eu-west-1"}
  }'

Within ~15s (one scheduler tick) you'll see workflows appearing on the dashboard:

Pause the schedule when you've seen enough:

curl -X POST 'http://localhost:8080/api/v1/schedules/nightly-report/pause?namespace=main'

What to look for

Source

worker.lua

-- nightly-report — cron + side_effect + child workflows in one file.
-- Run: assay run worker.lua  (with `assay serve` running on :8080)

local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL") or "http://localhost:8080")

-- The parent workflow: kicked off by the cron schedule.
workflow.define("NightlyReport", function(ctx, input)
    -- side_effect lets us pull a fresh report ID without breaking
    -- determinism — the value is captured in the event log on the first
    -- replay and returned from cache thereafter, even if a worker crashes
    -- between this call and the next step.
    local report = ctx:side_effect("issue_report_id", function()
        return "rep-" .. tostring(os.time()) .. "-" .. tostring(math.random(10000, 99999))
    end)

    local scan = ctx:execute_activity("scan_anomalies", {
        region = input.region,
        report_id = report,
    })

    -- Spawn one HandleAnomaly child per anomaly. Each child workflow_id
    -- is deterministic (parent report id + anomaly index), so a replay
    -- finds the existing child instead of starting a new one.
    for i, anomaly in ipairs(scan.anomalies) do
        ctx:start_child_workflow("HandleAnomaly", {
            workflow_id = report .. "-anomaly-" .. tostring(i),
            input = { anomaly = anomaly, report_id = report },
        })
    end

    return {
        report_id = report,
        region = input.region,
        anomalies_handled = #scan.anomalies,
    }
end)

-- The child workflow: handles a single anomaly.
workflow.define("HandleAnomaly", function(ctx, input)
    local r = ctx:execute_activity("remediate", input)
    return { fixed = r.fixed, anomaly = input.anomaly }
end)

-- Activities — these would do real work (DB scans, alerts, etc.) in a
-- production setup. Here they're deterministic stand-ins.
workflow.activity("scan_anomalies", function(ctx, input)
    -- Pretend we found three anomalies in this region
    return {
        region = input.region,
        anomalies = {
            { id = "a-1", kind = "stale_lock" },
            { id = "a-2", kind = "missing_index" },
            { id = "a-3", kind = "orphaned_record" },
        },
    }
end)

workflow.activity("remediate", function(ctx, input)
    -- Pretend we fixed it
    return { fixed = true, anomaly = input.anomaly }
end)

log.info("nightly-report worker ready — POST a schedule to fire it")
workflow.listen({ queue = "default" })