**Wrong:**

```python
def long_task(data):
    # Long running work
    pass

thread = threading.Thread(target=long_task, args=(data,))
thread.start()
```
**Wrong:**

```python
# DBOS state from previous test!
result = another_workflow()
```

**Correct (reset fixture):**
**Wrong:**

```python
# For Postgres, use transactions instead of steps
engine.execute("INSERT INTO table VALUES (?)", data)
```

**Correct (using transaction):**
**Wrong:**

```python
def fetch_data():
    # Manual retry logic is error-prone
    for attempt in range(3):
        try:
            return requests.get("https://api.example.com").json()
        except Exception:
            if attempt == 2:
                raise
```
**Wrong:**

```python
@DBOS.workflow()
def my_workflow():
    # External API call directly in workflow - not checkpointed!
    response = requests.get("https://api.example.com/data")
    return response.json()
```

**Correct (external call in step):**
**Wrong:**

```python
def ask_llm(prompt):
    # May hit rate limits if too many calls
    return openai.chat.completions.create(...)
```

**Correct (with rate limit):**
**Wrong:**

```python
# All tasks treated equally - urgent tasks may wait
for task in tasks:
    queue.enqueue(process_task, task)
```

**Correct (with priority):**
**Wrong:**

```python
# One user blocks all other users!
queue.enqueue(process_task, task)
```

**Correct (per-user limits with partitioning):**
**Wrong:**

```python
# Every worker processes both queues
if __name__ == "__main__":
    DBOS(config=config)
    DBOS.launch()
```

**Correct (workers listen to specific queues):**
**Wrong:**

```python
# Multiple requests = multiple workflows for same user!
queue.enqueue(process_workflow, user_id)
```

**Correct (deduplicated by user):**
**Wrong:**

```python
def process_task(task):
    # Uses lots of memory
    pass
```

**Correct (worker concurrency):**
**Wrong:**

```python
# Starting many workflows without control
for task in tasks:
    DBOS.start_workflow(process_task, task)  # Could overwhelm resources
```

**Correct (using queue):**