Use workflows to model and run deliberative processes

An eight-factor AI application uses explicit workflow definitions to manage complex, multi-step processes. A workflow defines a sequence of steps, their dependencies, and how information flows between them. Workflows should be declarative, observable, and maintainable.

Workflows in an eight-factor app have clear characteristics:

  • If complex processes aren’t explicitly modeled, it’s not an eight-factor app – it becomes impossible to monitor, debug, or improve them
  • If workflows are too rigid, the application can’t adapt to unexpected situations or leverage model capabilities

A workflow definition includes:

  • Steps and their dependencies
  • Data flow between steps
  • Success/failure criteria
  • Recovery strategies
  • Observation points

Bad practice - implicit process flow:

def analyze_document(document):
    # Process buried in code, hard to modify or monitor
    summary = summarize_document(document)
    topics = extract_topics(summary)
    sentiment = analyze_sentiment(document)
    entities = extract_entities(document)
    return compile_results(summary, topics, sentiment, entities)

Good practice - explicit workflow definition:

class DocumentAnalysisWorkflow(Workflow):
    def define(self) -> WorkflowDefinition:
        return WorkflowDefinition(
            name="document_analysis",
            steps=[
                Step(
                    name="summarization",
                    handler=self.summarize,
                    retry_policy=RetryPolicy.exponential(max_attempts=3)
                ),
                Step(
                    name="topic_extraction",
                    handler=self.extract_topics,
                    depends_on=["summarization"]
                ),
                Step(
                    name="parallel_analysis",
                    handler=ParallelStep([
                        Step(name="sentiment", handler=self.analyze_sentiment),
                        Step(name="entities", handler=self.extract_entities)
                    ])
                ),
                Step(
                    name="compilation",
                    handler=self.compile_results,
                    depends_on=["topic_extraction", "parallel_analysis"]
                )
            ],
            on_failure=self.handle_failure
        )

    async def summarize(self, context: Context) -> StepResult:
        summary = await self.model.generate(
            self.prompts.get("summarization"),
            context.document
        )
        return StepResult(summary=summary)

    def handle_failure(self, error: WorkflowError) -> Recovery:
        if isinstance(error, ModelTemporaryError):
            return Recovery.retry()
        return Recovery.fail_workflow()

Workflows should be observable:

class WorkflowMonitor:
    def __init__(self):
        self.metrics = MetricsCollector()
        self.tracer = WorkflowTracer()
    
    async def observe_workflow(self, workflow: Workflow):
        trace = self.tracer.start_trace(workflow)
        
        try:
            result = await workflow.execute()
            self.metrics.record_success(workflow, trace)
            return result
        except WorkflowError as e:
            self.metrics.record_failure(workflow, trace, e)
            raise

class WorkflowTracer:
    def capture_step(self, step: Step, context: Context):
        return {
            "step": step.name,
            "input": self.sanitize(context),
            "timestamp": datetime.now(),
            "duration": step.duration,
            "tokens_used": step.token_usage
        }

This approach enables:

  • Clear process visualization
  • Flexible error handling
  • Progress monitoring
  • Cost tracking
  • Performance optimization

Workflows should be treated as first-class entities:

  • Workflow definitions should be version controlled
  • Execution traces should be stored
  • Performance should be monitored
  • Costs should be tracked
  • Outcomes should be measured

Applications may need dynamic workflows:

class DynamicWorkflow(Workflow):
    async def plan_next_steps(self, context: Context) -> List[Step]:
        planning_prompt = self.prompts.get("workflow_planning")
        plan = await self.model.generate(planning_prompt, context)
        return self.step_factory.create_steps(plan)
    
    async def execute(self) -> WorkflowResult:
        context = Context()
        
        while not self.is_complete(context):
            next_steps = await self.plan_next_steps(context)
            results = await self.execute_steps(next_steps, context)
            context.update(results)
            
        return self.create_result(context)

This pattern enables:

  • Adaptive process flows
  • Dynamic step generation
  • Context-aware execution
  • Flexible problem solving
  • Observable decision making

Workflow implementations should optimize for:

  • Clarity - explicit process definition
  • Observability - comprehensive monitoring
  • Resilience - robust error handling
  • Flexibility - adaptive execution
  • Efficiency - resource optimization

Each workflow should specify:

  • Clear success criteria
  • Resource constraints
  • Error handling strategies
  • Monitoring requirements
  • Performance targets