-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrigger_graph.py
More file actions
105 lines (83 loc) · 3.55 KB
/
trigger_graph.py
File metadata and controls
105 lines (83 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/env python3
"""
Batch Document Processing Graph Trigger
This script triggers the batch document processing workflow
with a real CSV file containing document paths and processing parameters using the Exosphere Python SDK.
"""
import asyncio
import csv
import os
import sys
from typing import Optional

from dotenv import load_dotenv
from exospherehost import StateManager
# Load environment variables at import time so that the os.getenv() lookups in
# this script (EXOSPHERE_*, GEMINI_API_KEY, DATABASE_URL) can see them.
# NOTE(review): presumably sourced from a local .env file via python-dotenv.
load_dotenv()
# Prompt used when the caller does not supply one. The text is sent verbatim to
# the graph via its store under the "prompt" key.
_DEFAULT_PROCESSING_PROMPT = """
Please extract the following information from each document:
1. Document title
2. Main content/summary
3. Key metadata (page count, word count, etc.)
4. Any important dates or numbers mentioned
Return the information in JSON format with the following structure:
{
"title": "Document title",
"content": "Main content or summary",
"metadata": {
"pages": number,
"word_count": number,
"dates": ["date1", "date2"],
"numbers": ["number1", "number2"]
}
}
"""


async def trigger_graph_execution(
    graph_name: str = "batch-process-docs",
    csv_file_path: str = "documents.csv",
    chunk_size: str = "5",
    processing_prompt: Optional[str] = None,
    gemini_api_key: Optional[str] = None,
    database_url: Optional[str] = None,
):
    """Trigger the batch document processing workflow via the Exosphere SDK.

    Connection settings are read from the environment:
    ``EXOSPHERE_NAMESPACE`` (default ``"batch-process-docs"``),
    ``EXOSPHERE_STATE_MANAGER_URI`` (default ``"http://localhost:8000"``) and
    ``EXOSPHERE_API_KEY`` (default ``""``).

    Args:
        graph_name: Name of the graph to trigger.
        csv_file_path: Path to the input CSV; must be an existing regular file.
        chunk_size: Chunk size forwarded to the graph store (kept as a string).
        processing_prompt: Prompt forwarded to the graph store. When empty or
            None, a built-in extraction prompt is used.
        gemini_api_key: Accepted for backward compatibility; not forwarded.
        database_url: Accepted for backward compatibility; not forwarded.

    Returns:
        The ``"run_id"`` entry of the trigger response, or None when
        ``csv_file_path`` is not an existing regular file.

    Raises:
        Any exception raised by ``StateManager.trigger`` propagates unchanged.
        Indexing ``"run_id"`` raises if the response lacks that entry.
    """
    # NOTE(review): gemini_api_key and database_url are not sent to the graph.
    # Presumably the worker nodes read them from their own environment; confirm
    # before wiring them through or removing them from the signature.

    # Connection configuration from environment variables.
    namespace = os.getenv("EXOSPHERE_NAMESPACE", "batch-process-docs")
    state_manager_uri = os.getenv("EXOSPHERE_STATE_MANAGER_URI", "http://localhost:8000")
    api_key = os.getenv("EXOSPHERE_API_KEY", "")

    # isfile (not exists) so a directory path is rejected here instead of being
    # handed to the graph as if it were a CSV file.
    if not os.path.isfile(csv_file_path):
        print(
            f"Error: CSV file '{csv_file_path}' does not exist or is not a regular file",
            file=sys.stderr,
        )
        return None

    if not processing_prompt:
        processing_prompt = _DEFAULT_PROCESSING_PROMPT

    state_manager = StateManager(
        namespace=namespace,
        state_manager_uri=state_manager_uri,
        key=api_key,
    )

    print("Triggering batch document processing workflow...")
    print(f"CSV file: {csv_file_path}")
    print(f"Chunk size: {chunk_size}")
    print("-" * 50)

    # All parameters travel in the graph's store, so no per-trigger inputs are passed.
    result = await state_manager.trigger(
        graph_name,
        store={
            "csv_file_path": csv_file_path,
            "chunk_size": chunk_size,
            "prompt": processing_prompt,
        },
    )
    return result["run_id"]
async def main():
    """Trigger the batch workflow with this script's fixed settings.

    Returns:
        Process exit status: 0 when a run ID was obtained, 1 otherwise.
    """
    graph_name = "parse-and-process-batch-docs1"
    csv_file_path = "documents.csv"
    chunk_size = "5"
    processing_prompt = "Extract title, content, and metadata from each document"
    # NOTE(review): the "{{...}}" fallbacks look like template placeholders left
    # for substitution; confirm they are never meant to reach a real service.
    gemini_api_key = os.getenv("GEMINI_API_KEY", "{{GEMINI_API_KEY}}")
    database_url = os.getenv("DATABASE_URL", "{{DATABASE_URL}}")

    run_id = await trigger_graph_execution(
        graph_name=graph_name,
        csv_file_path=csv_file_path,
        chunk_size=chunk_size,
        processing_prompt=processing_prompt,
        gemini_api_key=gemini_api_key,
        database_url=database_url,
    )
    if not run_id:
        print("Failed to trigger batch document processing workflow.", file=sys.stderr)
        return 1

    print(f"\n Run ID: {run_id}")
    print(f" Graph: {graph_name}")
    print("\nYou can monitor the execution on the Exosphere dashboard.")
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells/CI can detect a failed trigger.
    sys.exit(asyncio.run(main()))