getWorld

Access the World instance for low-level storage, queuing, and streaming operations.

Retrieves the World instance, which gives direct access to the workflow storage, queuing, and streaming backends. Through it you can manage workflow runs, steps, events, and hooks at a low level.

Use this function when you need direct access to the underlying workflow infrastructure, such as listing all runs, querying events, or implementing custom workflow management logic.

import { getWorld } from "workflow/runtime";

const world = getWorld();
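For short-lived scripts, the close() method described under Returns is what lets the process exit cleanly. The following is a minimal sketch of a wrapper that always closes the World, even when the work throws. The names withWorld and ClosableWorld are hypothetical and not part of the workflow package, and treating close() as optional is a defensive assumption.

```typescript
// Hypothetical structural type: only close() from the Returns section is
// needed here. The real World type has many more members. Marking close()
// as optional is an assumption made for safety, not a documented fact.
interface ClosableWorld {
  close?: () => Promise<void>;
}

// Runs `task` with the given world and always releases the world's
// resources afterwards, whether `task` resolves or throws.
async function withWorld<W extends ClosableWorld, T>(
  world: W,
  task: (world: W) => Promise<T>,
): Promise<T> {
  try {
    return await task(world);
  } finally {
    await world.close?.();
  }
}
```

In a CLI command this would be called as withWorld(getWorld(), async (world) => { ... }), so the World is not reused after close() has run.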

API Signature

Parameters

This function does not accept any parameters.

Returns

Returns a World object:

Name: start
Type: () => Promise<void>
Description: A function that will be called to start any background tasks needed by the World implementation. For example, in the case of a queue backed World, this would start the queue processing.

Name: close
Type: () => Promise<void>
Description: Release any resources held by the World implementation (connection pools, listeners, etc.). After calling close(), the World instance should not be used again. This is important for CLI commands and short-lived processes that need to exit cleanly without relying on process.exit().

Name: resolveLatestDeploymentId
Type: () => Promise<string>
Description: Resolve the most recent deployment ID for the current deployment's environment. Used when deploymentId: 'latest' is passed to start(). The implementation determines the latest deployment that shares the same environment (e.g., same "production" target or same git branch for "preview" deployments) as the current deployment. Not all World implementations support this; it is only implemented by world-vercel, where deployment routing is meaningful.

Name: getEncryptionKeyForRun
Type: { (run: { runId: string; deploymentId: string; workflowName: string; input: unknown; createdAt: Date; updatedAt: Date; status: "pending" | "running"; output: undefined; error: undefined; ... 4 more ...; startedAt?: Date | undefined; } | { ...; } | { ...; } | { ...; }): Promise<...>; (runId: string, context?: Record<...
Description: Retrieve the AES-256 encryption key for a specific workflow run. The returned key is a ready-to-use 32-byte AES-256 key. The World implementation handles all key retrieval and derivation internally (e.g., HKDF from a deployment key). The core encryption module uses this key directly for AES-GCM encrypt/decrypt operations. Two overloads:
  • getEncryptionKeyForRun(run): Preferred. Pass a WorkflowRun when the run entity already exists. The World reads any context it needs (e.g., deploymentId) directly from the run.
  • getEncryptionKeyForRun(runId, context?): Used only by start() when the run entity has not yet been created. The context parameter carries opaque world-specific data (e.g., { deploymentId } for world-vercel) that the World needs to resolve the correct key. When context is omitted, the World assumes the current deployment.
When not implemented, encryption is disabled and data is stored unencrypted.

Name: getDeploymentId
Type: () => Promise<string>

Name: queue
Type: (queueName: `__wkf_step_${string}` | `__wkf_workflow_${string}`, message: { runId: string; traceCarrier?: Record<string, string> | undefined; requestedAt?: Date | undefined; serverErrorRetryCount?: number | undefined; } | { ...; } | { ...; }, opts?: QueueOptions | undefined) => Promise<...>
Description: Enqueues a message to the specified queue.

Name: createQueueHandler
Type: (queueNamePrefix: "__wkf_step_" | "__wkf_workflow_", handler: (message: unknown, meta: { attempt: number; queueName: `__wkf_step_${string}` | `__wkf_workflow_${string}`; messageId: string & $brand<"MessageId">; requestId?: string; }) => Promise<...>) => (req: Request) => Promise<...>
Description: Creates an HTTP queue handler for processing messages from a specific queue.

Name: runs
Type: { get(id: string, params: GetWorkflowRunParams & { resolveData: "none"; }): Promise<WorkflowRunWithoutData>; get(id: string, params?: (GetWorkflowRunParams & { ...; }) | undefined): Promise<...>; get(id: string, params?: GetWorkflowRunParams | undefined): Promise<...>; list(params: ListWorkflowRunsParams & { ...; })...

Name: steps
Type: { get(runId: string | undefined, stepId: string, params: GetStepParams & { resolveData: "none"; }): Promise<StepWithoutData>; get(runId: string | undefined, stepId: string, params?: (GetStepParams & { ...; }) | undefined): Promise<...>; get(runId: string | undefined, stepId: string, params?: GetStepParams | undefine...

Name: events
Type: { create(runId: string | null, data: { eventType: "run_created"; eventData: { deploymentId: string; workflowName: string; input: unknown; executionContext?: Record<string, any> | undefined; }; correlationId?: string | undefined; specVersion?: number | undefined; }, params?: CreateEventParams | undefined): Promise<.....

Name: hooks
Type: { get(hookId: string, params?: GetHookParams | undefined): Promise<Hook>; getByToken(token: string, params?: GetHookParams | undefined): Promise<...>; list(params: ListHooksParams): Promise<...>; }

Name: writeToStream
Type: (name: string, runId: string, chunk: string | Uint8Array<ArrayBufferLike>) => Promise<void>

Name: writeToStreamMulti
Type: (name: string, runId: string, chunks: (string | Uint8Array<ArrayBufferLike>)[]) => Promise<void>
Description: Write multiple chunks to a stream in a single operation. This is an optional optimization for world implementations that can batch multiple writes efficiently (e.g., single HTTP request for world-vercel). If not implemented, the caller should fall back to sequential writeToStream() calls.

Name: closeStream
Type: (name: string, runId: string) => Promise<void>

Name: readFromStream
Type: (name: string, startIndex?: number | undefined) => Promise<ReadableStream<Uint8Array<ArrayBufferLike>>>
Description: Read from a stream starting at the given chunk index. Positive values skip that many chunks from the start (0-based). Negative values start that many chunks before the current end (e.g. -3 on a 10-chunk stream starts at chunk 7). Clamped to 0.

Name: listStreamsByRunId
Type: (runId: string) => Promise<string[]>

Name: getStreamChunks
Type: (name: string, runId: string, options?: GetChunksOptions | undefined) => Promise<StreamChunksResponse>
Description: Fetch stream chunks with cursor-based pagination. Unlike readFromStream (which returns a live ReadableStream that waits for new chunks in real-time), getStreamChunks returns a snapshot of currently available chunks in a standard paginated response.

Name: getStreamInfo
Type: (name: string, runId: string) => Promise<StreamInfoResponse>
Description: Retrieve lightweight metadata about a stream. Returns the tail index (index of the last known chunk, 0-based) and whether the stream is complete. This is useful for resolving a negative startIndex into an absolute position before connecting to a stream.
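Two of the stream behaviors described above can be sketched as small helpers: falling back to sequential writeToStream() calls when writeToStreamMulti is not implemented, and resolving a negative startIndex into an absolute chunk index from the tail index. This is an illustration under stated assumptions, not the library's code. StreamWriter, writeChunks, and resolveStartIndex are hypothetical names, and the tail index is passed in as a plain number because the field names of StreamInfoResponse are not shown here.

```typescript
type Chunk = string | Uint8Array;

// Hypothetical structural type covering only the two write methods listed
// above. writeToStreamMulti is optional, as its description states.
interface StreamWriter {
  writeToStream(name: string, runId: string, chunk: Chunk): Promise<void>;
  writeToStreamMulti?(name: string, runId: string, chunks: Chunk[]): Promise<void>;
}

// Writes several chunks: one batched call when the World supports it,
// otherwise sequential writes (awaited one by one to preserve order).
async function writeChunks(
  world: StreamWriter,
  name: string,
  runId: string,
  chunks: Chunk[],
): Promise<void> {
  if (world.writeToStreamMulti) {
    await world.writeToStreamMulti(name, runId, chunks);
    return;
  }
  for (const chunk of chunks) {
    await world.writeToStream(name, runId, chunk);
  }
}

// Resolves a possibly negative startIndex into an absolute chunk index.
// tailIndex is the 0-based index of the last known chunk. Using -1 for an
// empty stream is an assumption of this sketch.
function resolveStartIndex(startIndex: number, tailIndex: number): number {
  if (startIndex >= 0) return startIndex;
  const chunkCount = tailIndex + 1;
  // Negative values count back from the end and are clamped to 0.
  return Math.max(0, chunkCount + startIndex);
}
```

With a 10-chunk stream the tail index is 9, so resolveStartIndex(-3, 9) gives 7, matching the readFromStream description.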

Examples

List Workflow Runs

List all workflow runs with pagination:

import { getWorld } from "workflow/runtime";

export async function GET(req: Request) {
  const url = new URL(req.url);
  const cursor = url.searchParams.get("cursor") ?? undefined;

  try {
    const world = getWorld(); 
    const runs = await world.runs.list({
      pagination: { cursor },
    });

    return Response.json(runs);
  } catch (error) {
    return Response.json(
      { error: "Failed to list workflow runs" },
      { status: 500 }
    );
  }
}
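The handler above returns a single page. To walk every page, the cursor from each response can be fed back into the next call. The sketch below assumes each page has the shape { data, cursor } and that a missing or null cursor means there are no more pages; both are assumptions rather than documented behavior. RunPage, RunsLister, and listAllRuns are hypothetical names.

```typescript
// Hypothetical minimal shapes. The real types come from the workflow
// package; the { data, cursor } page shape is an assumption.
interface RunPage<T> {
  data: T[];
  cursor?: string | null;
}

interface RunsLister<T> {
  list(params: { pagination: { cursor?: string | undefined } }): Promise<RunPage<T>>;
}

// Collects items across pages by following the cursor. maxPages guards
// against an unbounded loop if the backend keeps returning cursors.
async function listAllRuns<T>(runs: RunsLister<T>, maxPages = 100): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined;
  for (let page = 0; page < maxPages; page++) {
    const result = await runs.list({ pagination: { cursor } });
    all.push(...result.data);
    if (!result.cursor) break; // assumed end-of-list signal
    cursor = result.cursor;
  }
  return all;
}
```

Under these assumptions, getWorld().runs could be passed as the runs argument. For large deployments, returning one page per request as in the handler above is usually the safer choice.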

Cancel a Workflow Run

Cancel a running workflow:

import { getWorld } from "workflow/runtime";

export async function POST(req: Request) {
  const { runId } = await req.json();

  if (!runId) {
    return Response.json({ error: "No runId provided" }, { status: 400 });
  }

  try {
    const world = getWorld(); 
    const run = await world.runs.cancel(runId); 

    return Response.json({ status: run.status });
  } catch (error) {
    return Response.json(
      { error: "Failed to cancel workflow run" },
      { status: 500 }
    );
  }
}

List Steps for a Run (Without Data)

List steps for a workflow run with resolveData: 'none' to efficiently get step metadata without fetching serialized input/output. Use parseStepName to extract user-friendly display names:

import { getWorld } from "workflow/runtime";
import { parseStepName } from "@workflow/utils/parse-name"; 

export async function GET(req: Request) {
  const url = new URL(req.url);
  const runId = url.searchParams.get("runId");

  if (!runId) {
    return Response.json({ error: "No runId provided" }, { status: 400 });
  }

  try {
    const world = getWorld(); 
    const steps = await world.steps.list({ 
      runId, 
      resolveData: "none", // Skip fetching input/output for performance
    }); 

    // Map steps to a progress view using parseStepName for display
    const progress = steps.data.map((step) => {
      const parsed = parseStepName(step.stepName); 
      return {
        stepId: step.stepId,
        // Use shortName for UI display (e.g., "fetchUserData")
        displayName: parsed?.shortName ?? step.stepName, 
        // Module info available for debugging
        module: parsed?.moduleSpecifier, 
        status: step.status,
        startedAt: step.startedAt,
        completedAt: step.completedAt,
      };
    });

    return Response.json({ progress, cursor: steps.cursor });
  } catch (error) {
    return Response.json(
      { error: "Failed to list steps" },
      { status: 500 }
    );
  }
}

Get Step with Hydrated Input/Output

Retrieve a step with its serialized data and hydrate it for display. This example shows how to decrypt and deserialize step input/output:

import { getWorld } from "workflow/runtime";
import { parseStepName } from "@workflow/utils/parse-name"; 
import { 
  hydrateResourceIO, 
  observabilityRevivers, 
} from "@workflow/core/serialization-format"; 

export async function GET(req: Request) {
  const url = new URL(req.url);
  const runId = url.searchParams.get("runId");
  const stepId = url.searchParams.get("stepId");

  if (!runId || !stepId) {
    return Response.json({ error: "runId and stepId required" }, { status: 400 });
  }

  try {
    const world = getWorld(); 
    // Fetch step with data (default resolveData behavior)
    const step = await world.steps.get(runId, stepId); 

    // Hydrate serialized input/output for display
    const hydrated = hydrateResourceIO(step, observabilityRevivers); 

    // Parse the stepName for user-friendly display
    const parsed = parseStepName(step.stepName);

    return Response.json({
      stepId: hydrated.stepId,
      displayName: parsed?.shortName ?? step.stepName, 
      module: parsed?.moduleSpecifier, 
      status: hydrated.status,
      attempt: hydrated.attempt,
      // Hydrated input/output ready for rendering
      input: hydrated.input, 
      output: hydrated.output, 
    });
  } catch (error) {
    return Response.json(
      { error: "Step not found" },
      { status: 404 }
    );
  }
}

The stepName field contains a machine-readable identifier like step//./src/workflows/order//processPayment. Use parseStepName() from @workflow/utils/parse-name to extract the shortName (e.g., "processPayment") and moduleSpecifier for display in your UI.
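To make the identifier layout concrete, here is an illustrative splitter for names of the form shown above. It is not the implementation of parseStepName() and should not replace it. It assumes a "kind//moduleSpecifier//shortName" layout inferred from the single example identifier, and splitStepName and SplitName are hypothetical names.

```typescript
// Illustrative only: NOT parseStepName from @workflow/utils/parse-name.
// Assumes the "kind//moduleSpecifier//shortName" layout seen in
// "step//./src/workflows/order//processPayment".
interface SplitName {
  kind: string;
  moduleSpecifier: string;
  shortName: string;
}

function splitStepName(stepName: string): SplitName | null {
  const first = stepName.indexOf("//");
  const last = stepName.lastIndexOf("//");
  // Two distinct "//" separators are required.
  if (first === -1 || first === last) return null;

  const kind = stepName.slice(0, first);
  const moduleSpecifier = stepName.slice(first + 2, last);
  const shortName = stepName.slice(last + 2);
  if (!kind || !moduleSpecifier || !shortName) return null;

  return { kind, moduleSpecifier, shortName };
}
```

For the example identifier, this yields the module specifier "./src/workflows/order" and the short name "processPayment". Names that do not match the assumed layout return null, which mirrors the parsed?.shortName ?? step.stepName fallback used in the handlers above.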

Related Functions

  • getRun() - Higher-level API for working with individual runs by ID.
  • start() - Start a new workflow run.