Skip to content

feat(ingestion): Add InfluxDB 3 database connector#29682

Open
vinhhap wants to merge 6 commits into
open-metadata:mainfrom
vinhhap:feat/influxdb3-connector
Open

feat(ingestion): Add InfluxDB 3 database connector#29682
vinhhap wants to merge 6 commits into
open-metadata:mainfrom
vinhhap:feat/influxdb3-connector

Conversation

@vinhhap

@vinhhap vinhhap commented Jul 2, 2026

Copy link
Copy Markdown

Summary

New database connector for InfluxDB 3, the latest version of the most widely used time-series database. InfluxDB 3 uses SQL (via HTTP API /api/v3/query_sql) instead of Flux/InfluxQL.

Connector architecture

Since InfluxDB 3 has no SQLAlchemy dialect, this connector follows the same CommonNoSQLSource pattern as MongoDB and Cassandra — using the native HTTP SQL API for metadata introspection.

InfluxDB 3 concept OpenMetadata entity
InfluxDB instance Database
InfluxDB database Schema
InfluxDB table Table

Files

File Purpose
influxdbConnection.json JSON Schema — hostPort + token authentication
connection.py InfluxDBClient HTTP wrapper + InfluxDBConnection(BaseConnection)
metadata.py InfluxDBSource(CommonNoSQLSource) — metadata extraction
test_influxdb.py 18 unit tests across 5 classes
databaseService.json Registration: enum, javaEnum, connection $ref

Capabilities

  • Metadata ingestion — databases, tables, columns (via information_schema)
  • Test connection — health check (CheckAccess + GetDatabases steps)
  • Column type mapping (InfluxDB → OpenMetadata)
  • Database filtering via databaseName config option

Dependencies

  • requests (already a transitive dependency of openmetadata-ingestion)

Validation Checklist

  • JSON Schema validates, all $ref paths resolve
  • make generate succeeds (CI will verify)
  • mvn clean install -pl openmetadata-spec succeeds (CI will verify)
  • yarn parse-schema succeeds (CI will verify)
  • Connection creates client, test_connection passes (via test_connection_steps)
  • Source create() validates config type
  • ServiceSpec is discoverable by the framework
  • Unit tests pass with real assertions (18 tests, all pass)
  • No unbounded file reads without size checks
  • Client closed after use
  • make py_format produces no changes (All checks passed!)
  • mvn spotless:apply produces no changes (CI will verify)

Closes #29681

Greptile Summary

This PR adds a new InfluxDB 3 database connector following the CommonNoSQLSource pattern used by MongoDB and Cassandra, using InfluxDB's HTTP SQL API (/api/v3/query_sql) for metadata introspection since no SQLAlchemy dialect exists.

  • connection.py: Adds InfluxDBClient (thin HTTP wrapper) and InfluxDBConnection(BaseConnection). test_connection is correctly wired through test_connection_steps with automation_workflow and timeout_seconds forwarded properly.
  • metadata.py: Implements InfluxDBSource(CommonNoSQLSource) with column type mapping, schema/database listing, and table column extraction. A fetch_sample_rows method is defined on the client but is never invoked from the source — sample data ingestion described in the PR is not actually connected to the topology.
  • influxdbConnection.json / databaseService.json: Schema and registry entries are placed correctly in alphabetical order with required fields (hostPort, token) declared.

Confidence Score: 4/5

Safe to merge for metadata extraction; the claimed sample-data capability is not wired up and should be addressed before documenting it as a feature.

The connector correctly handles connection lifecycle, type mapping, schema filtering, and test-connection flow. The one gap is that fetch_sample_rows on InfluxDBClient has no caller in the ingestion topology — sample data will silently produce nothing despite being listed as a supported capability in the PR description.

connection.pyfetch_sample_rows is defined but never invoked from the source class or any topology step.

Important Files Changed

Filename Overview
ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Adds InfluxDBClient HTTP wrapper and InfluxDBConnection. test_connection is now correctly wired through test_connection_steps. fetch_sample_rows is defined but has no caller in the ingestion topology — claimed sample-data support is non-functional.
ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Implements InfluxDBSource via CommonNoSQLSource. Column type mapping and schema filtering look correct. Minor issues: double API call in warning path, no-op yield_table override, and TypeError instead of InvalidSourceException in create().
openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/influxdbConnection.json New JSON schema for InfluxDB connection with hostPort, token, and optional databaseName. Required fields declared correctly.
openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json InfluxDB enum, javaEnum entry, and connection $ref added in correct alphabetical position.
ingestion/tests/unit/topology/database/test_influxdb.py 15 unit tests covering type mapping, HTTP client mocking, column extraction, schema filtering, and connection class. Coverage is solid for the implemented functionality.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant UI as OpenMetadata UI
    participant Source as InfluxDBSource
    participant Conn as InfluxDBConnection
    participant Client as InfluxDBClient
    participant API as InfluxDB 3 HTTP API

    UI->>Conn: test_connection(metadata, automation_workflow)
    Conn->>Client: test_connection() GET /health
    API-->>Client: 200 OK
    Client-->>Conn: True
    Conn->>Client: "list_databases() GET /api/v3/query_sql?db=_internal"
    API-->>Client: "[{database_name: ...}]"
    Conn-->>UI: TestConnectionResult (via test_connection_steps)

    UI->>Source: run ingestion
    Source->>Client: list_databases()
    API-->>Client: [db1, db2]
    Source->>Client: list_tables(schema_name)
    API-->>Client: [table1, table2]
    Source->>Client: get_columns(schema_name, table_name)
    API-->>Client: "[{column_name, data_type, is_nullable}]"
    Source-->>UI: Metadata (Database → Schema → Table → Columns)
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant UI as OpenMetadata UI
    participant Source as InfluxDBSource
    participant Conn as InfluxDBConnection
    participant Client as InfluxDBClient
    participant API as InfluxDB 3 HTTP API

    UI->>Conn: test_connection(metadata, automation_workflow)
    Conn->>Client: test_connection() GET /health
    API-->>Client: 200 OK
    Client-->>Conn: True
    Conn->>Client: "list_databases() GET /api/v3/query_sql?db=_internal"
    API-->>Client: "[{database_name: ...}]"
    Conn-->>UI: TestConnectionResult (via test_connection_steps)

    UI->>Source: run ingestion
    Source->>Client: list_databases()
    API-->>Client: [db1, db2]
    Source->>Client: list_tables(schema_name)
    API-->>Client: [table1, table2]
    Source->>Client: get_columns(schema_name, table_name)
    API-->>Client: "[{column_name, data_type, is_nullable}]"
    Source-->>UI: Metadata (Database → Schema → Table → Columns)
Loading

Reviews (5): Last reviewed commit: "fix(review): pass py_format + use TypeEr..." | Re-trigger Greptile

- Add JSON Schema for InfluxDB 3 connection config (hostPort + token auth)
- Add InfluxDBClient HTTP wrapper for /api/v3/query_sql metadata introspection
- Add InfluxDBSource extending CommonNoSQLSource (non-SQLAlchemy pattern)
- Map InfluxDB 3 concepts: instance→Database, database→Schema, table→Table
- Port type mapping from existing custom connector (Influx→OpenMetadata types)
- Add 15 unit tests across 5 test classes (type mapping, client, source, schema)
- Register InfluxDB in databaseServiceType enum, javaEnums, and connection oneOf

Closes open-metadata#29681
@vinhhap vinhhap requested a review from a team as a code owner July 2, 2026 05:02
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

❌ PR checklist incomplete

This PR cannot be merged until the following are addressed on its linked issue:

The fields live on the linked issue in the Shipping project (open the issue → right sidebar → Projects). After you set them, re-run this check (or push a commit) — issue/project changes do not re-trigger it automatically.

Maintainers can bypass this check by adding the skip-pr-checks label.

@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
Comment on lines +91 to +104
def fetch_sample_rows(
self, database: str, table: str, limit: int = 50
) -> Tuple[List[str], List[List[Any]]]:
sql = (
f'SELECT * FROM "{table}" '
f"WHERE time >= now() - INTERVAL '24 hours' "
f"LIMIT {limit}"
)
data = self._query(database, sql)
if not data:
return [], []
columns = list(data[0].keys())
rows = [[row.get(col) for col in columns] for row in data]
return columns, rows

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: 24h window may yield no sample rows for stale tables

fetch_sample_rows hardcodes WHERE time >= now() - INTERVAL '24 hours'. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with LIMIT SAMPLE_SIZE (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

Was this helpful? React with 👍 / 👎

Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py Outdated
…ions

- Fix create() test config_dict to match WorkflowSource model (flat, not nested)
- Fix Column.name comparisons: column.name returns ColumnName object, use .root
- Fix reject-wrong-type test: MysqlConnection needs username+authType fields
- Remove redundant __init__ patch in reject test
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

@gitar-bot

gitar-bot Bot commented Jul 2, 2026

Copy link
Copy Markdown
Code Review 👍 Approved with suggestions 2 resolved / 5 findings

Introduces an InfluxDB 3 connector using the native SQL API, but requires fixes for a Pydantic validation error in test_connection and missing identifier escaping in fetch_sample_rows.

💡 Security: Table name not escaped in fetch_sample_rows SQL

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104 📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:82-89

In fetch_sample_rows, the table name is interpolated directly into the query: f'SELECT * FROM "{table}" ...' with no escaping. This is inconsistent with get_columns, which escapes single quotes (table.replace("'", "''")). A table name containing a double quote (or other special characters) would break the query or allow SQL injection into the query_sql endpoint.

Table names originate from information_schema so the risk is low, but the code should escape embedded double quotes for correctness and consistency, e.g. safe_table = table.replace('"', '""') and interpolate safe_table.

Escape embedded double quotes in the table identifier.
safe_table = table.replace('"', '""')
sql = (
    f'SELECT * FROM "{safe_table}" '
    f"WHERE time >= now() - INTERVAL '24 hours' "
    f"LIMIT {limit}"
)
💡 Edge Case: 24h window may yield no sample rows for stale tables

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

fetch_sample_rows hardcodes WHERE time >= now() - INTERVAL '24 hours'. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with LIMIT SAMPLE_SIZE (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

💡 Quality: Unused test_connection override in source metadata.py

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:215-218

InfluxDBSource.test_connection calls self.connection_obj.test_connection(), but the test-connection flow is driven by InfluxDBConnection.test_connection in connection.py (consistent with MongoDB/Cassandra, which only override it in connection.py). The base DatabaseServiceSource.test_connection uses test_connection_common. This source-level override appears to be dead code that duplicates health-check logic and may mask the standard behavior. Recommend removing it unless it is intentionally invoked somewhere in the ingestion path.

✅ 2 resolved
Bug: Sample data lookup may run before table is persisted

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:169-183
yield_table in metadata.py yields the CreateTableRequest to the sink via yield from super().yield_table(...) and then immediately calls _ingest_sample_data, which does self.metadata.get_by_name(entity=Table, fqn=table_fqn) and returns early (silently skipping sample data) when the entity is None.

The OpenMetadata ingestion workflow sinks yielded records through an asynchronous producer/consumer queue, so there is no guarantee the just-yielded table has been created on the server by the time get_by_name runs. When it hasn't, table_entity is None and sample data ingestion is silently skipped, so the advertised "sample data" capability will frequently be a no-op. This differs from the MongoDB/Cassandra pattern, which never calls get_by_name during yield_table.

Consider ingesting sample data through the standard mechanism (e.g. implementing sample-data support that runs after tables are persisted, such as in the profiler/sample-data stage) rather than reading the entity back inside yield_table.

Bug: databaseName used as both OM database name and schema filter

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:103-117
databaseName is consumed in two conflicting ways. get_database_names returns databaseName or DEFAULT_DATABASE as the OpenMetadata Database name (the "InfluxDB instance"), while get_schema_name_list also filters the InfluxDB databases (mapped to OM Schemas) down to the one equal to databaseName.

Per the documented mapping (InfluxDB instance → Database, InfluxDB database → Schema), databaseName is meant to limit which InfluxDB database/schema is ingested, not to rename the instance-level Database. As written, setting databaseName="iot_data" names the OM Database "iot_data" AND restricts schemas to "iot_data", producing a confusing iot_data.iot_data hierarchy. Consider keeping the OM Database name fixed (e.g. DEFAULT_DATABASE) and only using databaseName for schema filtering.

🤖 Prompt for agents
Code Review: Introduces an InfluxDB 3 connector using the native SQL API, but requires fixes for a Pydantic validation error in `test_connection` and missing identifier escaping in `fetch_sample_rows`.

1. 💡 Security: Table name not escaped in fetch_sample_rows SQL
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104, ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:82-89

   In `fetch_sample_rows`, the table name is interpolated directly into the query: `f'SELECT * FROM "{table}" ...'` with no escaping. This is inconsistent with `get_columns`, which escapes single quotes (`table.replace("'", "''")`). A table name containing a double quote (or other special characters) would break the query or allow SQL injection into the query_sql endpoint.
   
   Table names originate from `information_schema` so the risk is low, but the code should escape embedded double quotes for correctness and consistency, e.g. `safe_table = table.replace('"', '""')` and interpolate `safe_table`.

   Fix (Escape embedded double quotes in the table identifier.):
   safe_table = table.replace('"', '""')
   sql = (
       f'SELECT * FROM "{safe_table}" '
       f"WHERE time >= now() - INTERVAL '24 hours' "
       f"LIMIT {limit}"
   )

2. 💡 Edge Case: 24h window may yield no sample rows for stale tables
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

   `fetch_sample_rows` hardcodes `WHERE time >= now() - INTERVAL '24 hours'`. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with `LIMIT SAMPLE_SIZE` (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

3. 💡 Quality: Unused test_connection override in source metadata.py
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:215-218

   `InfluxDBSource.test_connection` calls `self.connection_obj.test_connection()`, but the test-connection flow is driven by `InfluxDBConnection.test_connection` in connection.py (consistent with MongoDB/Cassandra, which only override it in connection.py). The base `DatabaseServiceSource.test_connection` uses `test_connection_common`. This source-level override appears to be dead code that duplicates health-check logic and may mask the standard behavior. Recommend removing it unless it is intentionally invoked somewhere in the ingestion path.

Options

Display: compact → Showing less information.

Comment with these commands to change the behavior for this request:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

- Fix TestConnectionResult to use StatusType enum and steps=[] (P1)
- Escape double-quotes in fetch_sample_rows table name (P1)
- Remove unused import 'requests' in connection.py (P2)
- Remove unused imports in metadata.py (P2)
- Remove duplicate get_database_names override (already in base class)
- Remove dead test_connection override on source class
- Fix databaseName: use only for schema filtering, not OM database name
- Remove inline sample data ingestion from yield_table (race condition)
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

@gitar-bot

gitar-bot Bot commented Jul 2, 2026

Copy link
Copy Markdown
Code Review 👍 Approved with suggestions 4 resolved / 5 findings

Adds the InfluxDB 3 connector, addressing runtime validation failures in test connections and SQL malformation during sample data ingestion. Consider adjusting the 24-hour lookback window in fetch_sample_rows to ensure table previews succeed for stale data.

💡 Edge Case: 24h window may yield no sample rows for stale tables

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

fetch_sample_rows hardcodes WHERE time >= now() - INTERVAL '24 hours'. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with LIMIT SAMPLE_SIZE (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

✅ 4 resolved
Bug: Sample data lookup may run before table is persisted

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:169-183
yield_table in metadata.py yields the CreateTableRequest to the sink via yield from super().yield_table(...) and then immediately calls _ingest_sample_data, which does self.metadata.get_by_name(entity=Table, fqn=table_fqn) and returns early (silently skipping sample data) when the entity is None.

The OpenMetadata ingestion workflow sinks yielded records through an asynchronous producer/consumer queue, so there is no guarantee the just-yielded table has been created on the server by the time get_by_name runs. When it hasn't, table_entity is None and sample data ingestion is silently skipped, so the advertised "sample data" capability will frequently be a no-op. This differs from the MongoDB/Cassandra pattern, which never calls get_by_name during yield_table.

Consider ingesting sample data through the standard mechanism (e.g. implementing sample-data support that runs after tables are persisted, such as in the profiler/sample-data stage) rather than reading the entity back inside yield_table.

Bug: databaseName used as both OM database name and schema filter

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:103-117
databaseName is consumed in two conflicting ways. get_database_names returns databaseName or DEFAULT_DATABASE as the OpenMetadata Database name (the "InfluxDB instance"), while get_schema_name_list also filters the InfluxDB databases (mapped to OM Schemas) down to the one equal to databaseName.

Per the documented mapping (InfluxDB instance → Database, InfluxDB database → Schema), databaseName is meant to limit which InfluxDB database/schema is ingested, not to rename the instance-level Database. As written, setting databaseName="iot_data" names the OM Database "iot_data" AND restricts schemas to "iot_data", producing a confusing iot_data.iot_data hierarchy. Consider keeping the OM Database name fixed (e.g. DEFAULT_DATABASE) and only using databaseName for schema filtering.

Security: Table name not escaped in fetch_sample_rows SQL

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104 📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:82-89
In fetch_sample_rows, the table name is interpolated directly into the query: f'SELECT * FROM "{table}" ...' with no escaping. This is inconsistent with get_columns, which escapes single quotes (table.replace("'", "''")). A table name containing a double quote (or other special characters) would break the query or allow SQL injection into the query_sql endpoint.

Table names originate from information_schema so the risk is low, but the code should escape embedded double quotes for correctness and consistency, e.g. safe_table = table.replace('"', '""') and interpolate safe_table.

Quality: Unused test_connection override in source metadata.py

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:215-218
InfluxDBSource.test_connection calls self.connection_obj.test_connection(), but the test-connection flow is driven by InfluxDBConnection.test_connection in connection.py (consistent with MongoDB/Cassandra, which only override it in connection.py). The base DatabaseServiceSource.test_connection uses test_connection_common. This source-level override appears to be dead code that duplicates health-check logic and may mask the standard behavior. Recommend removing it unless it is intentionally invoked somewhere in the ingestion path.

🤖 Prompt for agents
Code Review: Adds the InfluxDB 3 connector, addressing runtime validation failures in test connections and SQL malformation during sample data ingestion. Consider adjusting the 24-hour lookback window in `fetch_sample_rows` to ensure table previews succeed for stale data.

1. 💡 Edge Case: 24h window may yield no sample rows for stale tables
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

   `fetch_sample_rows` hardcodes `WHERE time >= now() - INTERVAL '24 hours'`. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with `LIMIT SAMPLE_SIZE` (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

Options

Display: compact → Showing less information.

Comment with these commands to change the behavior for this request:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Comment thread ingestion/src/metadata/ingestion/source/database/influxdb/connection.py Outdated
… lifecycle

- Replace bare TestConnectionResult with test_connection_steps() call
- Define structured test_fn dict (CheckAccess + GetDatabases steps)
- Use self.client instead of creating/destroying a temporary client
- Fixes P1 bug where 'Test Connection' button hung in UI indefinitely
- Matches MongoDB/Cassandra NoSQL connector pattern
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

- Remove hard failure when list_databases() returns no databases
- A healthy but empty instance should pass connectivity check
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

@gitar-bot

gitar-bot Bot commented Jul 2, 2026

Copy link
Copy Markdown
Code Review 👍 Approved with suggestions 5 resolved / 6 findings

Adds a new InfluxDB 3 connector with comprehensive unit testing, resolving previous test connection failures and SQL injection risks. Update the dataLength assignment in metadata.py to ensure accurate string-type metadata is surfaced in the UI.

💡 Edge Case: 24h window may yield no sample rows for stale tables

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

fetch_sample_rows hardcodes WHERE time >= now() - INTERVAL '24 hours'. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with LIMIT SAMPLE_SIZE (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

✅ 5 resolved
Bug: Sample data lookup may run before table is persisted

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:169-183
yield_table in metadata.py yields the CreateTableRequest to the sink via yield from super().yield_table(...) and then immediately calls _ingest_sample_data, which does self.metadata.get_by_name(entity=Table, fqn=table_fqn) and returns early (silently skipping sample data) when the entity is None.

The OpenMetadata ingestion workflow sinks yielded records through an asynchronous producer/consumer queue, so there is no guarantee the just-yielded table has been created on the server by the time get_by_name runs. When it hasn't, table_entity is None and sample data ingestion is silently skipped, so the advertised "sample data" capability will frequently be a no-op. This differs from the MongoDB/Cassandra pattern, which never calls get_by_name during yield_table.

Consider ingesting sample data through the standard mechanism (e.g. implementing sample-data support that runs after tables are persisted, such as in the profiler/sample-data stage) rather than reading the entity back inside yield_table.

Bug: databaseName used as both OM database name and schema filter

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:103-117
databaseName is consumed in two conflicting ways. get_database_names returns databaseName or DEFAULT_DATABASE as the OpenMetadata Database name (the "InfluxDB instance"), while get_schema_name_list also filters the InfluxDB databases (mapped to OM Schemas) down to the one equal to databaseName.

Per the documented mapping (InfluxDB instance → Database, InfluxDB database → Schema), databaseName is meant to limit which InfluxDB database/schema is ingested, not to rename the instance-level Database. As written, setting databaseName="iot_data" names the OM Database "iot_data" AND restricts schemas to "iot_data", producing a confusing iot_data.iot_data hierarchy. Consider keeping the OM Database name fixed (e.g. DEFAULT_DATABASE) and only using databaseName for schema filtering.

Security: Table name not escaped in fetch_sample_rows SQL

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104 📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:82-89
In fetch_sample_rows, the table name is interpolated directly into the query: f'SELECT * FROM "{table}" ...' with no escaping. This is inconsistent with get_columns, which escapes single quotes (table.replace("'", "''")). A table name containing a double quote (or other special characters) would break the query or allow SQL injection into the query_sql endpoint.

Table names originate from information_schema so the risk is low, but the code should escape embedded double quotes for correctness and consistency, e.g. safe_table = table.replace('"', '""') and interpolate safe_table.

Quality: Unused test_connection override in source metadata.py

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:215-218
InfluxDBSource.test_connection calls self.connection_obj.test_connection(), but the test-connection flow is driven by InfluxDBConnection.test_connection in connection.py (consistent with MongoDB/Cassandra, which only override it in connection.py). The base DatabaseServiceSource.test_connection uses test_connection_common. This source-level override appears to be dead code that duplicates health-check logic and may mask the standard behavior. Recommend removing it unless it is intentionally invoked somewhere in the ingestion path.

Edge Case: Test connection fails on healthy but empty InfluxDB instance

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:141-145
In test_get_databases, the step raises ConnectionError("No InfluxDB databases found") when client.list_databases() returns an empty list. A freshly provisioned, healthy InfluxDB 3 instance with no user databases (or one where all databases are filtered out — note list_databases() already excludes _internal) would cause the "Test Connection" flow to report failure even though credentials and connectivity are valid. Consider treating an empty result as a successful (possibly warning) outcome rather than a hard failure, so users can validate connectivity before creating databases.

🤖 Prompt for agents
Code Review: Adds a new InfluxDB 3 connector with comprehensive unit testing, resolving previous test connection failures and SQL injection risks. Update the `dataLength` assignment in `metadata.py` to ensure accurate string-type metadata is surfaced in the UI.

1. 💡 Edge Case: 24h window may yield no sample rows for stale tables
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

   `fetch_sample_rows` hardcodes `WHERE time >= now() - INTERVAL '24 hours'`. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with `LIMIT SAMPLE_SIZE` (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

Options

Display: compact → Showing less information.

Comment with these commands to change the behavior for this request:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

- Replace ValueError with TypeError per ruff TRY004
- Remove unused connection_obj type annotation (F821)
- Ruff check: All checks passed
- Ruff format: all files conform to project style
@github-actions

github-actions Bot commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

@gitar-bot

gitar-bot Bot commented Jul 2, 2026

Copy link
Copy Markdown
Code Review 👍 Approved with suggestions 5 resolved / 6 findings

Adds a new InfluxDB 3 connector with comprehensive unit testing, resolving previous test connection failures and SQL injection risks. Update the dataLength assignment in metadata.py to ensure accurate string-type metadata is surfaced in the UI.

💡 Edge Case: 24h window may yield no sample rows for stale tables

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

fetch_sample_rows hardcodes WHERE time >= now() - INTERVAL '24 hours'. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with LIMIT SAMPLE_SIZE (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

✅ 5 resolved
Bug: Sample data lookup may run before table is persisted

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:169-183
yield_table in metadata.py yields the CreateTableRequest to the sink via yield from super().yield_table(...) and then immediately calls _ingest_sample_data, which does self.metadata.get_by_name(entity=Table, fqn=table_fqn) and returns early (silently skipping sample data) when the entity is None.

The OpenMetadata ingestion workflow sinks yielded records through an asynchronous producer/consumer queue, so there is no guarantee the just-yielded table has been created on the server by the time get_by_name runs. When it hasn't, table_entity is None and sample data ingestion is silently skipped, so the advertised "sample data" capability will frequently be a no-op. This differs from the MongoDB/Cassandra pattern, which never calls get_by_name during yield_table.

Consider ingesting sample data through the standard mechanism (e.g. implementing sample-data support that runs after tables are persisted, such as in the profiler/sample-data stage) rather than reading the entity back inside yield_table.

Bug: databaseName used as both OM database name and schema filter

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:103-117
databaseName is consumed in two conflicting ways. get_database_names returns databaseName or DEFAULT_DATABASE as the OpenMetadata Database name (the "InfluxDB instance"), while get_schema_name_list also filters the InfluxDB databases (mapped to OM Schemas) down to the one equal to databaseName.

Per the documented mapping (InfluxDB instance → Database, InfluxDB database → Schema), databaseName is meant to limit which InfluxDB database/schema is ingested, not to rename the instance-level Database. As written, setting databaseName="iot_data" names the OM Database "iot_data" AND restricts schemas to "iot_data", producing a confusing iot_data.iot_data hierarchy. Consider keeping the OM Database name fixed (e.g. DEFAULT_DATABASE) and only using databaseName for schema filtering.

Security: Table name not escaped in fetch_sample_rows SQL

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104 📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:82-89
In fetch_sample_rows, the table name is interpolated directly into the query: f'SELECT * FROM "{table}" ...' with no escaping. This is inconsistent with get_columns, which escapes single quotes (table.replace("'", "''")). A table name containing a double quote (or other special characters) would break the query or allow SQL injection into the query_sql endpoint.

Table names originate from information_schema so the risk is low, but the code should escape embedded double quotes for correctness and consistency, e.g. safe_table = table.replace('"', '""') and interpolate safe_table.

Quality: Unused test_connection override in source metadata.py

📄 ingestion/src/metadata/ingestion/source/database/influxdb/metadata.py:215-218
InfluxDBSource.test_connection calls self.connection_obj.test_connection(), but the test-connection flow is driven by InfluxDBConnection.test_connection in connection.py (consistent with MongoDB/Cassandra, which only override it in connection.py). The base DatabaseServiceSource.test_connection uses test_connection_common. This source-level override appears to be dead code that duplicates health-check logic and may mask the standard behavior. Recommend removing it unless it is intentionally invoked somewhere in the ingestion path.

Edge Case: Test connection fails on healthy but empty InfluxDB instance

📄 ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:141-145
In test_get_databases, the step raises ConnectionError("No InfluxDB databases found") when client.list_databases() returns an empty list. A freshly provisioned, healthy InfluxDB 3 instance with no user databases (or one where all databases are filtered out — note list_databases() already excludes _internal) would cause the "Test Connection" flow to report failure even though credentials and connectivity are valid. Consider treating an empty result as a successful (possibly warning) outcome rather than a hard failure, so users can validate connectivity before creating databases.

🤖 Prompt for agents
Code Review: Adds a new InfluxDB 3 connector with comprehensive unit testing, resolving previous test connection failures and SQL injection risks. Update the `dataLength` assignment in `metadata.py` to ensure accurate string-type metadata is surfaced in the UI.

1. 💡 Edge Case: 24h window may yield no sample rows for stale tables
   Files: ingestion/src/metadata/ingestion/source/database/influxdb/connection.py:91-104

   `fetch_sample_rows` hardcodes `WHERE time >= now() - INTERVAL '24 hours'`. Tables that have not received data in the last 24 hours will return no rows, so the sample-data preview will be empty even though the table has historical data. Combined with `LIMIT SAMPLE_SIZE` (1000), this fixed window is an implicit assumption that all tables are actively written. Consider dropping the time predicate (relying on LIMIT + ordering) or making the window configurable so preview data is available for lower-frequency measurements.

Options

Display: compact → Showing less information.

Comment with these commands to change the behavior for this request:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add InfluxDB 3 database connector

1 participant