from agno.workflow import Workflow, OnError
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.db.sqlite import SqliteDb
import random
def unreliable_api_call(step_input: StepInput) -> StepOutput:
if random.random() < 0.7: # 70% failure rate
raise Exception("API call failed: Connection timeout")
return StepOutput(content="API call succeeded")
def process_data(step_input: StepInput) -> StepOutput:
return StepOutput(content=f"Processed: {step_input.previous_step_content}")
workflow = Workflow(
name="api_workflow",
db=SqliteDb(db_file="workflow.db"),
steps=[
Step(
name="fetch_data",
executor=unreliable_api_call,
on_error=OnError.pause,
),
Step(name="process", executor=process_data),
],
)
run_output = workflow.run("Fetch and process")
while run_output.is_paused:
for req in run_output.steps_with_errors:
print(f"Step '{req.step_name}' failed")
print(f"Error: {req.error_message}")
print(f"Retry count: {req.retry_count}")
choice = input("Retry or skip? (r/s): ").lower()
if choice == "r":
req.retry()
else:
req.skip()
run_output = workflow.continue_run(run_output)
print(run_output.content)