from agno.workflow import Workflow
from agno.workflow.step import Step
from agno.workflow.steps import Steps
from agno.workflow.types import HumanReview, StepInput, StepOutput
from agno.db.sqlite import SqliteDb
def validate_data(step_input: StepInput) -> StepOutput:
    """Stub validation stage: report a fixed "schema verified" status.

    Args:
        step_input: Step input; accepted for the executor signature but not
            read by this stub.

    Returns:
        A ``StepOutput`` whose ``content`` is a constant status string.
    """
    status_message = "Validation: Schema verified"
    return StepOutput(content=status_message)
def transform_data(step_input: StepInput) -> StepOutput:
    """Stub transform stage: report a fixed "data normalized" status.

    Args:
        step_input: Step input; accepted for the executor signature but not
            read by this stub.

    Returns:
        A ``StepOutput`` whose ``content`` is a constant status string.
    """
    status_message = "Transform: Data normalized"
    return StepOutput(content=status_message)
def enrich_data(step_input: StepInput) -> StepOutput:
    """Stub enrichment stage: report a fixed "external data merged" status.

    Args:
        step_input: Step input; accepted for the executor signature but not
            read by this stub.

    Returns:
        A ``StepOutput`` whose ``content`` is a constant status string.
    """
    status_message = "Enrichment: External data merged"
    return StepOutput(content=status_message)
# Assemble the "data_pipeline" workflow piece by piece. The pieces are built in
# the same order the original single expression evaluated them.
# NOTE(review): `collect_data` and `generate_report` are not defined in the
# visible part of this file -- confirm they are defined or imported elsewhere.
pipeline_db = SqliteDb(db_file="workflow.db")

collect_step = Step(name="collect", executor=collect_data)

# The three processing stages that are grouped under a single review gate.
processing_stages = [
    Step(name="validate", executor=validate_data),
    Step(name="transform", executor=transform_data),
    Step(name="enrich", executor=enrich_data),
]

# Review settings attached to the whole group; presumably this makes the run
# pause for confirmation before the group executes -- verify against agno docs.
processing_review = HumanReview(
    requires_confirmation=True,
    confirmation_message="Run advanced processing pipeline?",
)

advanced_processing = Steps(
    name="advanced_processing",
    steps=processing_stages,
    human_review=processing_review,
)

report_step = Step(name="report", executor=generate_report)

workflow = Workflow(
    name="data_pipeline",
    db=pipeline_db,
    steps=[collect_step, advanced_processing, report_step],
)
# Start the workflow with the initial input.
run_output = workflow.run("Process data")

# If the run reports that it is paused, ask the operator about every step that
# is waiting for confirmation, then resume the run with those decisions.
if run_output.is_paused:
    for req in run_output.steps_requiring_confirmation:
        print(f"Pipeline: {req.step_name}")
        print(f"Message: {req.confirmation_message}")

        answer = input("Run pipeline? (y/n): ").lower()
        if answer != "y":
            # Anything other than "y" (case-insensitive) counts as a rejection.
            req.reject()
            print("Skipping pipeline")
            continue

        req.confirm()
        print("Executing pipeline")

    # Resume once, after every pending request has been answered.
    run_output = workflow.continue_run(run_output)

# Show the content of the final (or never-paused) run output.
print(run_output.content)