nightly-report
Cron-fired workflow that demonstrates the rest of the workflow engine in one example: a recurring
schedule kicks off the parent, the parent uses ctx:side_effect to capture a non-deterministic
report ID, scans for "anomalies", and starts a child workflow per anomaly to handle each in
parallel-ish (each child runs independently against the same engine).
What it does
NightlyReport(input) ← fired by cron
├─> side_effect "issue_report_id" → "rep-2026-04-16-XXXXX"
├─> activity "scan_anomalies" → returns [a1, a2, a3]
└─> for each anomaly:
start_child_workflow "HandleAnomaly" {anomaly}
└─> activity "remediate" → {fixed = true, anomaly}
return {report_id, anomalies_handled = 3}
side_effect makes the report ID stable across worker crashes — even if the worker dies after the
report ID is generated but before the scan runs, the next worker re-replaying the workflow will see
the same report ID from the event log instead of generating a new one.
Run
# Terminal 1
assay serve
# Terminal 2
cd examples/workflows/nightly-report
assay run worker.lua
Wire up the schedule (one-off; persists in the engine DB):
# Fires every 5 seconds for the demo. Replace with "0 0 0 * * *" for
# real nightly cadence (the cron crate wants 6/7 fields, with seconds).
curl -X POST http://localhost:8080/api/v1/schedules \
-H 'Content-Type: application/json' \
-d '{
"namespace": "main",
"name": "nightly-report",
"workflow_type": "NightlyReport",
"cron_expr": "*/5 * * * * *",
"task_queue": "default",
"input": {"region": "eu-west-1"}
}'
Within ~15s (one scheduler tick) you'll see workflows appearing on the dashboard:
- One
NightlyReportworkflow per fire - Three
HandleAnomalychild workflows perNightlyReport
Pause the schedule when you've seen enough:
curl -X POST 'http://localhost:8080/api/v1/schedules/nightly-report/pause?namespace=main'
What to look for
- In the event log of a
NightlyReportworkflow:WorkflowStarted,SideEffectRecorded,ActivityScheduled(scan),ActivityCompleted,ChildWorkflowStarted× 3,ChildWorkflowCompleted× 3,WorkflowCompleted. - In a child
HandleAnomaly:parent_idset, normal activity-driven flow. - In the workers view: one worker, registered for the
defaultqueue, polling both workflow tasks and activity tasks.
Source
worker.lua
-- nightly-report — cron + side_effect + child workflows in one file.
-- Run: assay run worker.lua (with `assay serve` running on :8080)
local workflow = require("assay.workflow")
workflow.connect(env.get("ASSAY_ENGINE_URL") or "http://localhost:8080")
-- The parent workflow: kicked off by the cron schedule.
workflow.define("NightlyReport", function(ctx, input)
-- side_effect lets us pull a fresh report ID without breaking
-- determinism — the value is captured in the event log on the first
-- replay and returned from cache thereafter, even if a worker crashes
-- between this call and the next step.
local report = ctx:side_effect("issue_report_id", function()
return "rep-" .. tostring(os.time()) .. "-" .. tostring(math.random(10000, 99999))
end)
local scan = ctx:execute_activity("scan_anomalies", {
region = input.region,
report_id = report,
})
-- Spawn one HandleAnomaly child per anomaly. Each child workflow_id
-- is deterministic (parent report id + anomaly index), so a replay
-- finds the existing child instead of starting a new one.
for i, anomaly in ipairs(scan.anomalies) do
ctx:start_child_workflow("HandleAnomaly", {
workflow_id = report .. "-anomaly-" .. tostring(i),
input = { anomaly = anomaly, report_id = report },
})
end
return {
report_id = report,
region = input.region,
anomalies_handled = #scan.anomalies,
}
end)
-- The child workflow: handles a single anomaly.
workflow.define("HandleAnomaly", function(ctx, input)
local r = ctx:execute_activity("remediate", input)
return { fixed = r.fixed, anomaly = input.anomaly }
end)
-- Activities — these would do real work (DB scans, alerts, etc.) in a
-- production setup. Here they're deterministic stand-ins.
workflow.activity("scan_anomalies", function(ctx, input)
-- Pretend we found three anomalies in this region
return {
region = input.region,
anomalies = {
{ id = "a-1", kind = "stale_lock" },
{ id = "a-2", kind = "missing_index" },
{ id = "a-3", kind = "orphaned_record" },
},
}
end)
workflow.activity("remediate", function(ctx, input)
-- Pretend we fixed it
return { fixed = true, anomaly = input.anomaly }
end)
log.info("nightly-report worker ready — POST a schedule to fire it")
workflow.listen({ queue = "default" })