feat(snowflake): opt-in ACCESS_HISTORY lineage path (POC)#28149
ulixius9 wants to merge 4 commits into
Conversation
Adds an alternative Snowflake lineage path that reads precomputed table-to-table and column-to-column lineage directly from ACCOUNT_USAGE.ACCESS_HISTORY, bypassing client-side SQL parsing. Opt-in via `connectionOptions.useAccessHistory="true"`, with a runtime probe that silently demotes to the legacy parser path on Standard Edition or when the role lacks the IMPORTED PRIVILEGES grant. Zero behavior change for pipelines that do not set the flag.

The combined SQL groups table edges with MAX_BY for dedup and aggregates column pairs per (downstream, upstream) edge via ARRAY_AGG, so the client streams one row per directed edge with column lineage already attached: constant client memory regardless of catalog size, and a single round-trip to Snowflake.

COPY_HISTORY is also surfaced for external stage-to-table lineage, resolving the upstream Container by stage URL; internal Snowflake stages are skipped silently.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
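The grouped-aggregation shape described above can be sketched as a Python query constant. This is illustrative only, not the PR's actual SQL: the CTE name, flattened columns, and ordering key for `MAX_BY` are assumptions.

```python
# Illustrative sketch of the MAX_BY dedup + ARRAY_AGG aggregation shape:
# the client receives exactly one row per directed (downstream, upstream)
# edge, with the column pairs already rolled up server-side.
ACCESS_HISTORY_LINEAGE_SKETCH = """
WITH edges AS (
    SELECT
        w.value:"objectName"::string AS downstream_fqn,
        r.value:"objectName"::string AS upstream_fqn,
        ah.query_id,
        ah.query_start_time
    FROM snowflake.account_usage.access_history ah,
         LATERAL FLATTEN(input => ah.objects_modified) w,
         LATERAL FLATTEN(input => ah.direct_objects_accessed) r
)
SELECT
    downstream_fqn,
    upstream_fqn,
    MAX_BY(query_id, query_start_time)        AS representative_query_id,
    ARRAY_AGG(DISTINCT OBJECT_CONSTRUCT(...)) AS column_pairs
FROM edges
GROUP BY downstream_fqn, upstream_fqn
"""
```

The `OBJECT_CONSTRUCT(...)` argument list is elided here exactly as it is in the PR summary; the real query fills in the source/target column names.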
```python
@staticmethod
def _split_snowflake_fqn(snowflake_fqn: str) -> Optional[Tuple[str, str, str]]:  # noqa: UP006, UP045
    """
    Split a Snowflake `DB.SCHEMA.TABLE` FQN into its three parts.

    Returns None for malformed inputs (quoted names with embedded dots are
    not handled in the POC and are skipped silently).
    """
    if not snowflake_fqn or '"' in snowflake_fqn:
        return None
    parts = snowflake_fqn.split(".")
    if len(parts) != 3:
        return None
    return parts[0], parts[1], parts[2]
```
💡 Edge Case: Quoted Snowflake identifiers silently dropped from lineage
_split_snowflake_fqn (line 439) rejects any FQN containing a double-quote character. Snowflake routinely quotes identifiers that contain spaces, mixed case, or special characters (e.g., "My DB"."My Schema"."My Table"). ACCESS_HISTORY returns quoted identifiers for such objects, so their lineage edges are silently skipped.
While documented as a POC limitation, this could lead to significant lineage gaps for customers with mixed-case or special-character naming conventions, with no visibility into what was missed (only the aggregate skip count is logged).
Strip quotes and split correctly, or at minimum log a DEBUG message per skipped FQN so operators can assess impact:
```python
@staticmethod
def _split_snowflake_fqn(snowflake_fqn: str) -> Optional[Tuple[str, str, str]]:
    if not snowflake_fqn:
        return None
    parts = snowflake_fqn.split(".")
    # Quoted identifiers may contain dots; reassembling quoted segments is
    # out of scope for the POC. Handle the simple case of
    # "DB"."SCHEMA"."TABLE" by stripping the surrounding quotes.
    stripped = [p.strip('"') for p in parts]
    if len(stripped) != 3:
        logger.debug(f"Skipping FQN with unexpected part count: {snowflake_fqn}")
        return None
    return stripped[0], stripped[1], stripped[2]
```
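The suggested fix above still mis-splits quoted identifiers that contain embedded dots, since it splits before stripping quotes. A fuller splitter (a hypothetical helper, not part of the PR) would only split on dots that sit outside double quotes:

```python
from typing import List, Optional, Tuple


def split_quoted_fqn(fqn_str: str) -> Optional[Tuple[str, str, str]]:
    """Split DB.SCHEMA.TABLE, honoring double-quoted parts with embedded dots.

    Hypothetical helper, not in the PR: splits only on dots outside double
    quotes, strips the surrounding quotes, and unescapes doubled quotes ("")
    inside a quoted part to a single quote character.
    """
    parts: List[str] = []
    current: List[str] = []
    in_quotes = False
    i = 0
    while i < len(fqn_str):
        ch = fqn_str[i]
        if ch == '"':
            if in_quotes and i + 1 < len(fqn_str) and fqn_str[i + 1] == '"':
                current.append('"')  # escaped quote inside a quoted part
                i += 2
                continue
            in_quotes = not in_quotes
        elif ch == "." and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
        i += 1
    parts.append("".join(current))
    if in_quotes or len(parts) != 3 or not all(parts):
        return None  # unbalanced quotes or wrong arity
    return parts[0], parts[1], parts[2]
```

With this, `"My DB"."My.Schema"."My Table"` resolves to its three real parts instead of being dropped.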
```python
if not (db and schema and table and stage_location):
    return None

downstream_fqn = fqn._build(self.config.serviceName, db, schema, table)
```
💡 Quality: Using private fqn._build instead of public API
`fqn._build` is a private helper (underscore-prefixed) that simply joins components with `.`. The public `fqn.build` function provides entity-type-aware FQN construction and is the standard across the codebase. Using the private function bypasses any future validation or normalization added to the public API.
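The gap between the two can be shown with a self-contained toy (these two functions are illustrative stand-ins, not OpenMetadata's actual `fqn` module): a plain dot-join produces FQNs that cannot be split back unambiguously when a part contains a dot, which is exactly the kind of normalization an entity-aware public builder can layer in.

```python
def naive_join(*parts: str) -> str:
    """What a private join-only helper does: no validation, no quoting."""
    return ".".join(parts)


def build_table_fqn(service: str, db: str, schema: str, table: str) -> str:
    """Hypothetical entity-aware builder: quotes parts containing dots so
    the resulting FQN splits back into the original components."""
    def quote(part: str) -> str:
        return f'"{part}"' if "." in part else part
    return ".".join(quote(p) for p in (service, db, schema, table))
```

For a schema named `my.schema`, the naive join yields a five-part string while the quoting builder keeps the boundary recoverable.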
The combined ACCESS_HISTORY SQL now LEFT JOINs back to QUERY_HISTORY on the representative query_id (already selected via MAX_BY) and returns QUERY_TEXT alongside the edge. `_build_access_history_edge` populates LineageDetails.sqlQuery so the OpenMetadata lineage panel shows the SQL that produced the edge, matching the per-edge "SQL Query" surface in the Snowflake-native lineage view.

LineageDetails is now built via the constructor (sqlQuery, columnsLineage, source) rather than post-assignment, since Pydantic 2 skips coercion on attribute setters and the RootModel wrapper would not get applied.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
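The constructor-versus-setter distinction can be demonstrated with a stdlib analogy (a dataclass stands in for the Pydantic 2 model here; this is not the real `LineageDetails`, and Pydantic is not imported): coercion wired into construction never fires on a later attribute assignment, which is why the value must be passed to the constructor.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class LineageDetailsSketch:
    """Toy stand-in for a Pydantic 2 model: coercion runs in __post_init__,
    i.e. only at construction time."""
    sql_query: Union[str, bytes, None] = None

    def __post_init__(self) -> None:
        # Constructor-time coercion: bytes are decoded to str.
        if isinstance(self.sql_query, bytes):
            self.sql_query = self.sql_query.decode("utf-8")


# Passing the value to the constructor coerces it:
built = LineageDetailsSketch(sql_query=b"SELECT 1")

# Assigning after construction bypasses coercion entirely, mirroring
# Pydantic 2's default of not validating on attribute sets:
assigned = LineageDetailsSketch()
assigned.sql_query = b"SELECT 1"
```

In real Pydantic 2, the same applies unless `model_config = ConfigDict(validate_assignment=True)` is set on the model.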
Drops the "(POC path)" suffix from the dispatch log line and POC framing from docstrings now that the path is stable enough to ship as a permanent connector option. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… path
The combined SQL now exposes a `{filter_condition}` placeholder inside the
`access_history_filtered` CTE so users can scope which queries contribute
to lineage — same field the legacy QUERY_HISTORY parser path already
respects via `get_filters()`. Unqualified column names resolve against
QUERY_HISTORY (alias `qh`) in the CTE, so existing filterCondition values
like `query_type = 'COPY'` or `user_name = 'etl_user'` carry over without
edits.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
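The placeholder mechanics described in this commit can be sketched as follows. The template text and the rendering helper are illustrative, not the PR's actual `SNOWFLAKE_ACCESS_HISTORY_LINEAGE` query or `_build_filter_condition_clause` implementation.

```python
# Illustrative only: shows how a {filter_condition} placeholder inside the
# access_history_filtered CTE can carry a user-supplied filterCondition
# through to the source scan, with unqualified columns resolving against
# the QUERY_HISTORY alias qh.
QUERY_TEMPLATE = """
WITH access_history_filtered AS (
    SELECT ah.*
    FROM snowflake.account_usage.access_history ah
    JOIN snowflake.account_usage.query_history qh
      ON qh.query_id = ah.query_id
    WHERE 1 = 1 {filter_condition}
)
SELECT * FROM access_history_filtered
"""


def render_query(filter_condition: str = "") -> str:
    # An empty filterCondition leaves the WHERE clause as a no-op; a user
    # value like "query_type = 'COPY'" is AND-ed into the CTE.
    clause = f"AND {filter_condition}" if filter_condition else ""
    return QUERY_TEMPLATE.format(filter_condition=clause)
```

Note that in the real connector the value comes from user config, so the production path should sanitize or validate it rather than interpolate it blindly.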
Code Review 👍 Approved with suggestions (1 resolved / 3 findings)

Introduces an opt-in Snowflake ACCESS_HISTORY lineage path for improved ingestion performance, including enhanced SQL context and filter support. Please address the silent dropping of quoted identifiers in FQN splitting and switch to the public FQN API instead of internal methods.

💡 Edge Case: Quoted Snowflake identifiers silently dropped from lineage
📄 ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py:432-444
While documented as a POC limitation, this could lead to significant lineage gaps for customers with mixed-case or special-character naming conventions, with no visibility into what was missed (only the aggregate skip count is logged). Strip quotes and split correctly, or at minimum log a DEBUG message per skipped FQN so operators can assess impact.

💡 Quality: Using private fqn._build instead of public API
📄 ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py:376
📄 ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py:406

✅ 1 resolved
✅ Bug: Pop of useAccessHistory happens after engine creation — ineffective
Summary
- Reads precomputed lineage from `ACCOUNT_USAGE.ACCESS_HISTORY` instead of parsing every relevant query client-side. Targets large query windows where the current sqlglot/sqlfluff/sqlparse pipeline becomes a wall-clock bottleneck.
- Opt-in via `connectionOptions.useAccessHistory: "true"` on the Snowflake service connection. Default off: existing pipelines see zero change. A runtime probe demotes silently to the legacy parser path when ACCESS_HISTORY isn't readable (Standard Edition or missing `IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE`).
- `MAX_BY` dedup on table edges, `ARRAY_AGG(DISTINCT OBJECT_CONSTRUCT(...))` on column pairs per `(downstream, upstream)` edge, LEFT JOINed so the cursor streams one row per directed edge with column lineage already attached. Constant client memory regardless of edge count.
- External stage URLs (`s3://`, `azure://`, `gcs://`, `https://`) are resolved against ingested Container entities via `es_search_container_by_path`; internal Snowflake stages (`@~/`, `@%table/`, `@db.schema.stage/`) are skipped silently.
- Stored procedure lineage (`StoredProcedureLineageMixin`) is unchanged and continues to run regardless of the flag.
- The flag lives in `connectionOptions`, a `Map<String,String>` on the Snowflake connection, so there are no JSON schema or generated-model changes. If validated, a follow-up PR will promote the key to a first-class field.

How to enable
Customer pipeline YAML:
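The original YAML snippet is not present in this extraction; the fragment below is a sketch of where the flag would sit, assuming the standard OpenMetadata Snowflake service-connection shape (verify field placement against your own workflow YAML):

```yaml
source:
  type: snowflake
  serviceName: snowflake_prod        # illustrative service name
  serviceConnection:
    config:
      type: Snowflake
      connectionOptions:
        useAccessHistory: "true"     # opt-in; default off
```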
Files changed
Test plan
Notes
🤖 Generated with Claude Code
Summary by Gitar
- Joined `QUERY_TEXT` from `ACCOUNT_USAGE.QUERY_HISTORY` into the `ACCESS_HISTORY` lineage path to provide representative SQL context for edges.
- Extended `LineageDetails` to include `sqlQuery` and added corresponding unit tests to verify SQL text attachment.
- Added `_build_filter_condition_clause` to inject `sourceConfig.filterCondition` directly into the `ACCESS_HISTORY` SQL query.
- Updated the `SNOWFLAKE_ACCESS_HISTORY_LINEAGE` query to support filtering at the source CTE level.

This will update automatically on new commits.