Documentation Index
Fetch the complete documentation index at: https://upstash.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Some workflows require human approval or input before proceeding. When combined with Upstash Realtime, you can create interactive workflows that pause for user input and provide real-time feedback to your frontend during the entire process.
This guide shows you how to implement a human-in-the-loop workflow pattern with real-time updates using Upstash Workflow and Upstash Realtime.
How It Works
In a human-in-the-loop workflow:
- The workflow executes initial steps and emits progress events
- The workflow pauses at a specific point using
context.waitForEvent()
- A “waiting for input” event is emitted to notify the frontend
- The user makes a decision in the frontend (approve/reject)
- The frontend calls an API to notify the workflow using
client.notify()
- The workflow resumes with the user’s decision
- An “input resolved” event is emitted so the frontend can update its UI
- The workflow continues and completes based on the decision
Prerequisites
- An Upstash account with:
- A QStash project for workflows
- A Redis database for Realtime
- Next.js application set up
- Completed the basic real-time workflow setup
Event Types
For human-in-the-loop workflows, extend your schema in lib/realtime.ts with these additional event types:
const schema = {
workflow: {
runFinish: z.object({}),
stepFinish: z.object({
stepName: z.string(),
result: z.unknown().optional(),
}),
waitingForInput: z.object({
eventId: z.string(),
message: z.string(),
}),
inputResolved: z.object({
eventId: z.string(),
}),
},
};
The new event types are:
waitingForInput: Emitted when the workflow pauses and needs user input
inputResolved: Emitted when the user provides input, so the frontend knows to clear the waiting state
Create the Realtime Middleware
Create a custom middleware that will emit events to Realtime at lib/middleware.ts:
import { WorkflowMiddleware } from "@upstash/workflow";
import { realtime } from "./realtime";
export const realtimeMiddleware = new WorkflowMiddleware({
name: "realtime-events",
callbacks: {
beforeExecution: async ({ context, stepName }) => {
const channel = realtime.channel(context.workflowRunId);
// Detect wait-for-event steps and emit waitingForInput
if (stepName === "wait-for-approval") {
await channel.emit("workflow.waitingForInput", {
eventId: `approval-${context.workflowRunId}`,
message: `Waiting for approval`,
});
}
},
afterExecution: async ({ context, stepName, result }) => {
const channel = realtime.channel(context.workflowRunId);
// Emit inputResolved after wait-for-event steps complete
if (stepName === "wait-for-approval") {
await channel.emit("workflow.inputResolved", {
eventId: `approval-${context.workflowRunId}`,
});
}
// Emit stepFinish for all steps
await channel.emit("workflow.stepFinish", {
stepName,
result,
});
},
runCompleted: async ({ context }) => {
const channel = realtime.channel(context.workflowRunId);
await channel.emit("workflow.runFinish", {});
},
},
});
Key points:
- The middleware handles all realtime event emissions automatically
beforeExecution: Detects wait-for-event steps by checking the stepName and emits workflow.waitingForInput
afterExecution: Emits workflow.inputResolved for wait steps and workflow.stepFinish for all steps
runCompleted: Emits workflow.runFinish when the workflow finishes
- All emission logic is centralized in the middleware, keeping workflow code clean
Building the Workflow
1. Create the Workflow Endpoint
Create your workflow at app/api/workflow/human-in-loop/route.ts:
app/api/workflow/human-in-loop/route.ts
import { serve } from "@upstash/workflow/nextjs";
import { realtimeMiddleware } from "@/lib/middleware";
type WorkflowPayload = {
userId: string;
action: string;
};
export const { POST } = serve<WorkflowPayload>(
async (context) => {
const { userId, action } = context.requestPayload;
// Step 1: Initial Processing
await context.run("initial-processing", async () => {
// Your processing logic
return {
preprocessed: true,
userId,
action,
requiresApproval: true,
};
});
// Step 2: Wait for Human Approval
const eventId = `approval-${context.workflowRunId}`;
const { eventData, timeout } = await context.waitForEvent<{
approved: boolean;
}>("wait-for-approval", eventId, { timeout: "5m" });
// Handle timeout
if (timeout) {
return { success: false, reason: "timeout" };
}
const status = eventData.approved ? "approved" : "rejected";
// Step 3: Process based on approval
await context.run(`process-${status}`, async () => {
return {
status,
processedAt: Date.now(),
action,
userId,
};
});
// Step 4: Finalize (only if approved)
if (eventData.approved) {
// Additional steps...
}
return {
success: true,
approved: eventData.approved,
workflowRunId: context.workflowRunId,
};
},
{
middlewares: [realtimeMiddleware],
}
);
Key patterns:
- Middleware for all events: The
realtimeMiddleware automatically handles all realtime event emissions by detecting wait-for-event steps through the stepName
- Step name detection: The middleware checks if
stepName === "wait-for-approval" to know when to emit waitingForInput and inputResolved events
- Unique event IDs: Use a unique
eventId (like approval-${workflowRunId}) to identify which approval request this is
- Timeout handling: Always handle the timeout case when waiting for events
2. Create the Notify Endpoint
Create an endpoint at app/api/notify/route.ts to handle user input:
import { Client } from "@upstash/workflow";
import { NextRequest, NextResponse } from "next/server";
const workflowClient = new Client({
baseUrl: process.env.QSTASH_URL!,
token: process.env.QSTASH_TOKEN!,
});
export async function POST(request: NextRequest) {
const body = await request.json();
const { eventId, eventData } = body;
if (!eventId) {
return NextResponse.json(
{ success: false, error: "eventId is required" },
{ status: 400 }
);
}
// Notify the workflow
await workflowClient.notify({
eventId,
eventData,
});
return NextResponse.json({ success: true });
}
Preventing Race Conditions: If you need to trigger a workflow and immediately send it an event, you can use the workflowRunId parameter to enable lookback:await workflowClient.notify({
eventId,
eventData,
workflowRunId: "wfr_abc123", // Ensures notification is delivered even if sent before waitForEvent
});
Learn more in the notify documentation.
Building the Frontend
1. Extend the Custom Hook
Extend your hook from the basic example to handle waiting states:
"use client";
import { useRealtime } from "@/lib/realtime-client";
import { useState, useCallback } from "react";
interface WorkflowStep {
stepName: string;
result?: unknown;
}
interface WaitingState {
eventId: string;
message: string;
}
export function useWorkflowWithRealtime() {
const [workflowRunId, setWorkflowRunId] = useState<string | null>(null);
const [steps, setSteps] = useState<WorkflowStep[]>([]);
const [waitingState, setWaitingState] = useState<WaitingState | null>(null);
const [isTriggering, setIsTriggering] = useState(false);
const [isRunFinished, setIsRunFinished] = useState(false);
useRealtime({
enabled: !!workflowRunId,
channels: workflowRunId ? [workflowRunId] : [],
events: [
"workflow.stepFinish",
"workflow.runFinish",
"workflow.waitingForInput",
"workflow.inputResolved",
],
onData({ event, data }) {
if (event === "workflow.stepFinish") {
setSteps((prev) => [
...prev,
{
stepName: data.stepName,
result: data.result,
},
]);
} else if (event === "workflow.runFinish") {
setIsRunFinished(true);
} else if (event === "workflow.inputResolved") {
// Clear waiting state if it matches
setWaitingState((prev) =>
prev?.eventId === data.eventId ? null : prev
);
} else if (event === "workflow.waitingForInput") {
setWaitingState({
eventId: data.eventId,
message: data.message,
});
}
},
});
const trigger = useCallback(async () => {
setIsTriggering(true);
setSteps([]);
setWaitingState(null);
setIsRunFinished(false);
const response = await fetch("/api/trigger", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ workflowType: "human-in-loop" }),
});
const data = await response.json();
setWorkflowRunId(data.workflowRunId);
setIsTriggering(false);
}, []);
const continueWorkflow = useCallback(
async (data: { approved: boolean }) => {
if (!waitingState) {
throw new Error("No workflow waiting for input");
}
const response = await fetch("/api/notify", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
eventId: waitingState.eventId,
eventData: data,
}),
});
if (!response.ok) {
throw new Error("Failed to notify workflow");
}
// The waiting state will be cleared when we receive inputResolved event
},
[waitingState]
);
return {
trigger,
continueWorkflow,
isTriggering,
workflowRunId,
steps,
waitingState,
isRunFinished,
};
}
Key additions:
waitingState: Tracks when the workflow is waiting for input
continueWorkflow: Function to submit user decisions back to the workflow
- Multiple events subscription: Uses
events array to subscribe to multiple event types
- Input resolved handling: Clears the waiting state when the workflow receives the user’s input
2. Use the Hook with Approval UI
"use client";
import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime";
export default function WorkflowPage() {
const {
trigger,
isTriggering,
steps,
isRunFinished,
waitingState,
continueWorkflow,
} = useWorkflowWithRealtime();
return (
<div
style={{
maxWidth: "600px",
margin: "40px auto",
fontFamily: "Arial, sans-serif",
}}
>
<button onClick={trigger} disabled={isTriggering}>
{isTriggering ? "Starting..." : "Click to Trigger Workflow"}
</button>
{isRunFinished && (
<h3 style={{ marginTop: "20px" }}>✅ Workflow Finished!</h3>
)}
{/* Show workflow steps */}
<h3 style={{ marginTop: "20px" }}>Workflow Steps:</h3>
<div>
{steps.map((step, index) => (
<div key={index}>
<strong>{step.stepName}</strong>
{Boolean(step.result) && (
<span>: {JSON.stringify(step.result)}</span>
)}
</div>
))}
</div>
{/* Show approval UI when waiting for input */}
{waitingState && (
<div style={{ marginTop: "20px" }} className="approval-prompt">
<p>{waitingState.message}</p>
<p>
<button onClick={() => continueWorkflow({ approved: true })}>
Click to Approve
</button>
</p>
<p>
<button onClick={() => continueWorkflow({ approved: false })}>
Click to Reject
</button>
</p>
</div>
)}
</div>
);
}
How the Pattern Works
Timeline of Events
- Initial processing:
stepFinish event → Frontend shows completed step
- Waiting for approval:
waitingForInput event → Frontend shows approval buttons
- User clicks approve/reject: Frontend calls
/api/notify
- Workflow resumes:
inputResolved event → Frontend hides approval buttons
- Processing continues: More
stepFinish events as workflow continues
- Workflow completes:
runFinish event → Frontend shows “Workflow Finished!”
Benefits
- Real-time feedback: Users see exactly when their approval is needed
- No polling: Instant updates via Server-Sent Events
- Timeout handling: Workflows don’t hang indefinitely waiting for input
Full Example
For a complete working example with all steps, error handling, and full UI components, check out the Upstash Realtime example on GitHub.
Next Steps