feat(llm): expand graph extraction service APIs#361
Conversation
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR introduces a new graph extraction/import service layer and expands the FastAPI surface area with job-based extraction and import endpoints, while also making extraction behavior more configurable (chunk handling, split types, and parallel chunk processing) and improving robustness around malformed LLM output and import result reporting.
Changes:
- Added
GraphExtractService/GraphImportServiceplus request/response model updates, including redaction and schema normalization. - Added async-style job endpoints (
/graph/extract/jobs/*) with an in-memory job store, plus new/graph/importand/graph/extract-and-importroutes. - Updated extraction and import flows/operators to support configurable split types, pre-split chunks, parallel chunk extraction, and structured import stats.
Reviewed changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| hugegraph-llm/src/tests/utils/test_graph_index_utils.py | Adds regression coverage for extract_graph() scheduler call shape. |
| hugegraph-llm/src/tests/operators/llm_op/test_property_graph_extract.py | Adds coverage for parent edgelabel schemas, parallel chunk ordering, serial fallback, and malformed JSON handling. |
| hugegraph-llm/src/tests/operators/llm_op/test_info_extract.py | Adds coverage for regex extraction with schema “shape” normalization. |
| hugegraph-llm/src/tests/operators/hugegraph_op/test_commit_to_hugegraph_load_into_graph.py | Updates expectations to “continue + report import_result” rather than raise on create failures. |
| hugegraph-llm/src/tests/operators/hugegraph_op/test_commit_to_hugegraph.py | Adds broader import behavior coverage including counts, id mapping, and normalized extraction inputs. |
| hugegraph-llm/src/tests/operators/document_op/test_chunk_split.py | Adds paragraph boundary behavior tests for short paragraphs. |
| hugegraph-llm/src/tests/nodes/test_request_graph_config.py | Tests request-scoped graph config propagation into nodes/operators. |
| hugegraph-llm/src/tests/nodes/test_extract_node.py | Ensures ExtractNode uses extract-LLM config and wires max-parallel-chunks. |
| hugegraph-llm/src/tests/nodes/test_base_node.py | Adds coverage that unexpected operator exceptions become error statuses. |
| hugegraph-llm/src/tests/models/llms/test_init_llm.py | Adds coverage for extract-LLM config fallback behavior across providers. |
| hugegraph-llm/src/tests/flows/test_graph_extract_flow.py | Tests split/content-type defaults and state reset semantics. |
| hugegraph-llm/src/tests/document/test_graph_extract_configurable_split.py | Extends flow post-deal expectations to include max_parallel_chunks in output. |
| hugegraph-llm/src/tests/api/test_graph_import_api.py | Adds coverage for import + extract-and-import endpoints, client_config behavior, and embedding updates. |
| hugegraph-llm/src/tests/api/test_graph_extract_jobs.py | Adds end-to-end coverage for job creation, execution, cancellation, expiry, and concurrency. |
| hugegraph-llm/src/tests/api/test_graph_extract_api.py | Refactors API tests around service layer, adds concurrency isolation and structured error expectations. |
| hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py | Adds request-scoped graph_config support to client creation. |
| hugegraph-llm/src/hugegraph_llm/state/ai_state.py | Extends workflow input/state with content_type, max_parallel_chunks, and graph_config. |
| hugegraph-llm/src/hugegraph_llm/services/graph_extract_service.py | Introduces extraction/import services, schema normalization, redaction, and flow JSON validation. |
| hugegraph-llm/src/hugegraph_llm/services/graph_extract_jobs.py | Adds an in-memory job store with TTL, queueing, worker threads, and status transitions. |
| hugegraph-llm/src/hugegraph_llm/services/init.py | Adds services package marker. |
| hugegraph-llm/src/hugegraph_llm/operators/llm_op/property_graph_extract.py | Adds parallel chunk extraction and explicit malformed-JSON failure path. |
| hugegraph-llm/src/hugegraph_llm/operators/llm_op/info_extract.py | Adds schema shape normalization helper for regex extraction. |
| hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py | Makes semantic index building respect request-scoped graph config. |
| hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py | Adds graph_config support while retaining full “connection unit” behavior. |
| hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/commit_to_hugegraph.py | Adds request-scoped graph config and structured import_result counts/errors; continues on create failures. |
| hugegraph-llm/src/hugegraph_llm/operators/document_op/chunk_split.py | Adds paragraph-boundary splitting that preserves explicit paragraph breaks. |
| hugegraph-llm/src/hugegraph_llm/nodes/llm_node/extract_info.py | Switches to extract-LLM configuration and wires max-parallel-chunks into operator. |
| hugegraph-llm/src/hugegraph_llm/nodes/index_node/build_semantic_index.py | Passes request-scoped graph config into semantic index operator. |
| hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/schema.py | Plumbs request graph_config into SchemaManager when not using full connection dict. |
| hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/fetch_graph_data.py | Uses request-scoped graph_config for HugeGraph client creation. |
| hugegraph-llm/src/hugegraph_llm/nodes/hugegraph_node/commit_to_hugegraph.py | Creates Commit2Graph with request-scoped graph_config. |
| hugegraph-llm/src/hugegraph_llm/nodes/document_node/chunk_split.py | Skips splitting when content is already chunks; sets context["chunks"] directly. |
| hugegraph-llm/src/hugegraph_llm/nodes/base_node.py | Broadens exception handling to convert unexpected operator exceptions into error statuses. |
| hugegraph-llm/src/hugegraph_llm/models/llms/init_llm.py | Adds extract-LLM fallback rules (e.g., reuse chat config when extract config missing). |
| hugegraph-llm/src/hugegraph_llm/flows/update_vid_embeddings.py | Adds graph_config plumbing into the flow input. |
| hugegraph-llm/src/hugegraph_llm/flows/import_graph_data.py | Adds graph_config plumbing into the import flow input. |
| hugegraph-llm/src/hugegraph_llm/flows/graph_extract.py | Adds content_type/max_parallel_chunks parameters and enriches post-deal output. |
| hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py | Adjusts route registration order with new graph endpoints. |
| hugegraph-llm/src/hugegraph_llm/config/llm_config.py | Adds global defaults/limits for graph-extract parallel chunk calls. |
| hugegraph-llm/src/hugegraph_llm/api/models/graph_extract_responses.py | Adds typed error/job/import response models. |
| hugegraph-llm/src/hugegraph_llm/api/models/graph_extract_requests.py | Redesigns request contract around content_type/content, adds parallelism validation and import request models. |
| hugegraph-llm/src/hugegraph_llm/api/graph_extract_api.py | Adds job endpoints, import endpoints, structured error semantics, and request validation wrapping. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try: | ||
| max_parallel_chunks = max(1, int(context.get("max_parallel_chunks") or self.max_parallel_chunks)) | ||
| except (TypeError, ValueError): | ||
| max_parallel_chunks = max(1, self.max_parallel_chunks) | ||
| chunk_count = len(chunks) | ||
| worker_count = min(max_parallel_chunks, chunk_count) | ||
| context["max_parallel_chunks"] = worker_count | ||
| if worker_count <= 1: | ||
| proceeded_chunks = [self.extract_property_graph_by_llm(schema, chunk) for chunk in chunks] | ||
| else: | ||
| with ThreadPoolExecutor(max_workers=worker_count) as executor: | ||
| proceeded_chunks = list( | ||
| executor.map(lambda chunk: self.extract_property_graph_by_llm(schema, chunk), chunks) | ||
| ) |
There was a problem hiding this comment.
Fixed in 7850be7. Empty chunks now return without LLM calls and keep max_parallel_chunks metadata positive, with regression coverage.
| with ThreadPoolExecutor(max_workers=worker_count) as executor: | ||
| proceeded_chunks = list( | ||
| executor.map(lambda chunk: self.extract_property_graph_by_llm(schema, chunk), chunks) | ||
| ) |
There was a problem hiding this comment.
Not changed in this pass. The current LLM wrappers used here do not maintain per-request mutable response buffers in PropertyGraphExtract; serializing extract_property_graph_by_llm behind one lock would effectively disable the chunk-level parallelism this API is adding. The API also bounds per-request parallelism by config and request. If a future provider wrapper proves non-thread-safe, the better fix would be provider-local isolation rather than a global lock in the extraction operator.
| @router.post("/graph/import", status_code=status.HTTP_200_OK) | ||
| def graph_import_api(req: GraphImportRequest) -> GraphImportResponse: |
There was a problem hiding this comment.
Fixed in 7850be7. Added response_model declarations for job, import, and extract-and-import endpoints, with route registration coverage.
| @router.post("/graph/extract-and-import", status_code=status.HTTP_200_OK) | ||
| def graph_extract_and_import_api(req: GraphExtractAndImportRequest) -> GraphExtractAndImportResponse: |
There was a problem hiding this comment.
Fixed in 7850be7. Added response_model declarations for job, import, and extract-and-import endpoints, with route registration coverage.
| if not vertices and not edges and not triples: | ||
| log.critical("(Loading) Both vertices and edges are empty. Please check the input data again.") | ||
| raise ValueError("Both vertices and edges input are empty.") |
There was a problem hiding this comment.
Fixed in 7850be7. Empty input messages now mention vertices, edges, and triples. Schema-free mode rejects vertices or edges, and schema mode rejects triples so mixed inputs are not silently dropped. Added regression coverage.
| if not vertices and not edges: | ||
| log.critical("(Loading) Both vertices and edges are empty. Please check the input data again.") | ||
| raise ValueError("Both vertices and edges input are empty.") |
There was a problem hiding this comment.
Fixed in 7850be7. Empty input messages now mention vertices, edges, and triples. Schema-free mode rejects vertices or edges, and schema mode rejects triples so mixed inputs are not silently dropped. Added regression coverage.
| except RequestValidationError as exc: | ||
| return JSONResponse( | ||
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | ||
| content={ | ||
| "detail": _error( | ||
| "GRAPH_EXTRACT_VALIDATION_ERROR", | ||
| str(exc), | ||
| "request", | ||
| ) | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Fixed in 7850be7. Validation responses now use sanitized loc, msg, and type summaries from exc.errors() and omit raw input values. Added coverage for password and URL not being echoed.
00dee19 to
e761776
Compare
|
Update pushed in 9cecb9b:
Local verification:
|
imbajin
left a comment
There was a problem hiding this comment.
I found one blocking import/schema issue and several API contract / coverage issues that should be addressed before merge. Also, this PR removes the non-blocking ty-check workflow (uv run ty check hugegraph-llm/src hugegraph-python-client/src) without an equivalent replacement; please keep or replace that type-check signal.
| "id": "person:Tom Hanks", | ||
| "label": "person", | ||
| "properties": {"name": "Tom Hanks", "age": 67}, | ||
| }, |
There was a problem hiding this comment.
CUSTOMIZE_STRING only by calling load_into_graph() directly, so it bypasses the real import flow that first calls init_schema_if_need(). That production path still creates every vertex label with usePrimaryKeyId().primaryKeys(...), while load_into_graph() later writes explicit ids for id_strategy == "CUSTOMIZE_STRING". An inline schema declaring custom string ids can therefore create a primary-key schema before the custom-id write path runs. Please make init_schema_if_need() branch on id_strategy and call useCustomizeStringId() for CUSTOMIZE_STRING, and add an end-to-end Commit2Graph.run() or import-flow test for this schema.
There was a problem hiding this comment.
Fixed in 10524d2. init_schema_if_need() now branches on id_strategy and uses useCustomizeStringId() for CUSTOMIZE_STRING vertex labels instead of creating a primary-key schema first. Added a Commit2Graph.run() regression test so the production init_schema_if_need() path is covered before load_into_graph().
| if not isinstance(vertices, list) or not isinstance(edges, list): | ||
| raise FlowOutputValidationError("property graph result must contain list vertices and edges") | ||
| for vertex in vertices: | ||
| if not isinstance(vertex, dict) or "label" not in vertex or "properties" not in vertex: |
There was a problem hiding this comment.
/graph/extract workflow-output contract is weaker than the import request contract. This only checks that each vertex/edge is a dict and has the required keys, but it does not verify that label/outV/outVLabel/inV/inVLabel are non-empty strings or that properties is an object. A scheduler result like {"vertices":[{"label":"person","properties":null}],"edges":[]} would be returned as a successful extract response even though /graph/import rejects the same property-graph payload. Please align this validator with GraphImportRequest.validate_data() and add a contract test for malformed workflow output.
There was a problem hiding this comment.
Fixed in 10524d2. GraphExtractService now validates workflow property-graph output against the import contract: vertex label must be a non-empty string, vertex properties must be an object, edge label/outV/outVLabel/inV/inVLabel must be non-empty strings, and edge properties must be an object. Added endpoint/service regression coverage where the scheduler returns properties=null and /graph/extract responds with GRAPH_EXTRACT_INVALID_FLOW_OUTPUT.
| return JSONResponse( | ||
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | ||
| content={ | ||
| "detail": _error( |
There was a problem hiding this comment.
GRAPH_EXTRACT_VALIDATION_ERROR. Invalid /graph/import bodies therefore return an extract-specific code/phase, making client error handling ambiguous. Please choose the validation error code/phase based on the route, or use separate handlers for extract and import routes, and cover invalid import payloads in endpoint tests.
There was a problem hiding this comment.
Fixed in 10524d2. GraphExtractAPIRoute now maps request-validation errors by route path. Invalid /graph/import bodies return GRAPH_IMPORT_VALIDATION_ERROR with phase=import instead of the extract-specific validation code. Added endpoint coverage that an invalid import payload is rejected before GraphImportService is called.
| # TODO: transform to Enum first (better in earlier step) | ||
| data_type = property_label["data_type"] | ||
| cardinality = property_label["cardinality"] | ||
| if not self._check_property_data_type(data_type, cardinality, value): |
There was a problem hiding this comment.
bool for BYTE/INT/LONG, but this path uses isinstance(value, int), so Python accepts True/False as integer values when named-schema or internal workflow paths bypass the request-side inline-schema check. Please share one validator between GraphImportRequest and Commit2Graph, and add coverage for INT=True plus the intended FLOAT/DOUBLE behavior for JSON integer values.
There was a problem hiding this comment.
Fixed in 10524d2. Property value validation is now shared in hugegraph_llm.utils.schema_property and used by both GraphImportRequest and Commit2Graph. Integer types exclude bool, and FLOAT/DOUBLE accept JSON numeric int/float values while still excluding bool. Added request and operator coverage for INT=True, DOUBLE=1, and the named/internal import path that bypasses request-side inline-schema validation.
| } | ||
|
|
||
|
|
||
| def _client(service=None, job_store=None, run_jobs_inline=True): |
There was a problem hiding this comment.
graph_extract_http_api(...) is run_jobs_inline=None, which submits work through jobs.submit_job() and daemon workers, but this helper defaults to inline execution and the tests only exercise inline mode or a deliberately non-running pending mode. Please add a route-level test that omits run_jobs_inline, creates a job, polls until it reaches a terminal state, and verifies result retrieval so the public async worker path is covered.
There was a problem hiding this comment.
Fixed in 10524d2. Added a route-level test for the production default run_jobs_inline=None path: it creates a job, lets the background worker execute it, polls until SUCCEEDED, then retrieves the result from /graph/extract/jobs/{job_id}/result.
|
Updated in Addressed the latest review items:
Workflow/type-check signal:
Local verification:
|
Summary
content_type/contentsupport for raw text and pre-split chunks, request-bounded chunk parallelism, metadata, and structured error responses.Relation to #351
This builds on the initial synchronous
POST /graph/extractendpoint from #351. The deprecatedtextsalias remains accepted: a string maps tocontent_type=text, and a list maps to pre-split chunks. Multi-document extraction remains caller-managed through multiple API requests instead of hidden batch semantics in the synchronous endpoint.Write API Safety
/graph/importand/graph/extract-and-importrequirewrite_to_graph=true.client_config.graphso the target graph is explicit in the request and response metadata.Job Endpoint Notes
/graph/extract/jobsuses an in-memory, process-local job store.Tests
uv run ruff format --check .uv run ruff check .uv run pytest hugegraph-llm/src/tests/api/test_graph_extract_api.py hugegraph-llm/src/tests/api/test_graph_import_api.py hugegraph-llm/src/tests/api/test_graph_extract_jobs.py -qSKIP_EXTERNAL_SERVICES=true uv run pytest hugegraph-llm/src/tests/api -v --tb=shortSKIP_EXTERNAL_SERVICES=true uv run pytest hugegraph-llm/src/tests/config/ hugegraph-llm/src/tests/document/ hugegraph-llm/src/tests/operators/ hugegraph-llm/src/tests/models/ hugegraph-llm/src/tests/indices/ hugegraph-llm/src/tests/test_utils.py -v --tb=shortReview