
[FLINK-39748][postgres] Fix snapshot timestamp drift for historical TIMESTAMP/DATE columns #4412

Open
JNSimba wants to merge 2 commits into apache:master from JNSimba:fix-pg-snapshot-timestamp-calendar-drift

@JNSimba JNSimba commented May 25, 2026

This closes FLINK-39748.

The Postgres CDC snapshot path reads column values via a bare rs.getObject(i + 1) in PostgresScanFetchTask. For TIMESTAMP / TIMESTAMPTZ / DATE columns, the PG JDBC driver constructs the returned java.sql.Timestamp / java.sql.Date through GregorianCalendar (default Julian/Gregorian cutover at 1582-10-15) using the JVM default time zone. This makes pre-cutover dates drift by N days (e.g. 0001-01-01 by 2 days), and also adds an LMT delta on JVMs whose default zone has an LMT segment (e.g. Asia/Shanghai is +08:05:43 until 1901, vs +08:00:00 after).
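The two effects described above can be reproduced without a database. The following is a minimal sketch, not code from this patch: the class and method names are hypothetical, and it only compares the hybrid `GregorianCalendar` against the proleptic `java.time` calendar, then looks up the Asia/Shanghai offsets on either side of 1901 (the exact LMT value depends on the JVM's bundled tzdata).

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.GregorianCalendar;
import java.util.TimeZone;

// Hypothetical demo class; not part of the connector.
final class CalendarDriftDemo {

    /** Epoch millis of midnight UTC for a date interpreted by the hybrid Julian/Gregorian calendar. */
    static long hybridEpochMillis(int year, int month, int day) {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();                              // reset all fields, including milliseconds
        cal.set(year, month - 1, day, 0, 0, 0);   // Calendar months are zero-based
        return cal.getTimeInMillis();
    }

    /** Epoch millis of midnight UTC for the same date in the proleptic Gregorian (ISO) calendar. */
    static long prolepticEpochMillis(int year, int month, int day) {
        return LocalDate.of(year, month, day)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    /** Proleptic minus hybrid, in whole days. */
    static long driftDays(int year, int month, int day) {
        long diff = prolepticEpochMillis(year, month, day) - hybridEpochMillis(year, month, day);
        return Duration.ofMillis(diff).toDays();
    }

    /** UTC offset that the given zone applies to a local date-time. */
    static ZoneOffset offsetAt(String zone, LocalDateTime local) {
        return ZoneId.of(zone).getRules().getOffset(local);
    }

    public static void main(String[] args) {
        System.out.println("0001-01-01 drift (days): " + driftDays(1, 1, 1));
        System.out.println("1582-10-04 drift (days): " + driftDays(1582, 10, 4));
        System.out.println("1582-10-15 drift (days): " + driftDays(1582, 10, 15));
        System.out.println("Shanghai offset 1900-12-31: "
                + offsetAt("Asia/Shanghai", LocalDateTime.of(1900, 12, 31, 0, 0)));
        System.out.println("Shanghai offset 1901-01-02: "
                + offsetAt("Asia/Shanghai", LocalDateTime.of(1901, 1, 2, 0, 0)));
    }
}
```

Dates on or after the cutover show no calendar drift, which is why the problem only surfaces for historical values.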

The Postgres logical decoding (streaming) path does not pass through GregorianCalendar, so the same row produces different Debezium records on snapshot vs streaming, breaking idempotent UPSERT semantics for downstream sinks.

This patch:

  1. Routes the snapshot path through PostgresConnection.getColumnValue, which already does per-type dispatch for MONEY / BIT / NUMERIC / TIME / TIMETZ, by replacing the bare rs.getObject(i + 1) in PostgresScanFetchTask.createDataEventsForTable with jdbcConnection.getColumnValue(rs, i + 1, column, table, databaseSchema). This mirrors how Debezium's own RelationalSnapshotChangeEventSource reads rows.

  2. Extends the switch in PostgresConnection.getColumnValue with three new cases for PgOid.TIMESTAMP / TIMESTAMPTZ / DATE, reading the columns as java.time.LocalDateTime / OffsetDateTime / LocalDate via rs.getObject(columnIndex, ...class). This bypasses GregorianCalendar. PG +/-infinity sentinels are preserved as Timestamp(Long.MAX/MIN_VALUE) to keep the existing downstream contract.
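For readers who want the shape of step 2 without opening the diff, here is a rough sketch of the idea. It is not the code in this patch: `TemporalColumnReader` and its methods are hypothetical names, and the only behaviour taken from the description is reading via `ResultSet.getObject(int, Class)` with `java.time` types and mapping the driver's MAX/MIN values back to `Timestamp(Long.MAX_VALUE / MIN_VALUE)`.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

// Hypothetical helper illustrating the approach; the real change lives in
// PostgresConnection.getColumnValue.
final class TemporalColumnReader {

    /** Maps the LocalDateTime infinity sentinels back to the legacy Timestamp sentinels. */
    static Object normalizeTimestamp(LocalDateTime value) {
        if (value == null) {
            return null;
        }
        if (LocalDateTime.MAX.equals(value)) {
            return new Timestamp(Long.MAX_VALUE);   // +infinity
        }
        if (LocalDateTime.MIN.equals(value)) {
            return new Timestamp(Long.MIN_VALUE);   // -infinity
        }
        return value;                               // ordinary value, no calendar conversion
    }

    /** Same mapping for TIMESTAMPTZ values read as OffsetDateTime. */
    static Object normalizeTimestampTz(OffsetDateTime value) {
        if (value == null) {
            return null;
        }
        if (OffsetDateTime.MAX.equals(value)) {
            return new Timestamp(Long.MAX_VALUE);
        }
        if (OffsetDateTime.MIN.equals(value)) {
            return new Timestamp(Long.MIN_VALUE);
        }
        return value;
    }

    /** Reads a TIMESTAMP column as LocalDateTime instead of java.sql.Timestamp. */
    static Object readTimestamp(ResultSet rs, int columnIndex) throws SQLException {
        return normalizeTimestamp(rs.getObject(columnIndex, LocalDateTime.class));
    }

    /** Reads a TIMESTAMPTZ column as OffsetDateTime. */
    static Object readTimestampTz(ResultSet rs, int columnIndex) throws SQLException {
        return normalizeTimestampTz(rs.getObject(columnIndex, OffsetDateTime.class));
    }

    /** Reads a DATE column as LocalDate; passed through unchanged in this sketch. */
    static Object readDate(ResultSet rs, int columnIndex) throws SQLException {
        return rs.getObject(columnIndex, LocalDate.class);
    }
}
```

Keeping the sentinel mapping in small pure functions makes the infinity contract testable without a live connection.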

A regression test in PostgresScanFetchTaskTest snapshots boundary dates (0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31, 1901-01-02, and a microsecond-precision value) for TIMESTAMP and DATE columns and asserts the produced Debezium record values match the proleptic-UTC expectation.
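As an illustration of what a "proleptic-UTC expectation" looks like numerically, the sketch below derives epoch-based values for two of the boundary dates. It assumes a Debezium-style encoding in which DATE is days since 1970-01-01 and TIMESTAMP(6) is microseconds since the epoch; the description does not say which encoding the test asserts against, so treat both the encoding and the helper names as assumptions.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper; the encoding (epoch days / epoch micros) is an assumption.
final class ProlepticExpectations {

    /** Days since 1970-01-01 in the proleptic Gregorian calendar. */
    static int epochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    /** Microseconds since 1970-01-01T00:00:00Z, treating the local value as UTC. */
    static long epochMicros(LocalDateTime timestamp) {
        Instant instant = timestamp.toInstant(ZoneOffset.UTC);
        long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
        return Math.addExact(wholeSeconds, instant.getNano() / 1_000);
    }

    public static void main(String[] args) {
        System.out.println("0001-01-01 -> " + epochDays(LocalDate.of(1, 1, 1)) + " days");
        System.out.println("1900-12-31 23:59:59.123456 -> "
                + epochMicros(LocalDateTime.of(1900, 12, 31, 23, 59, 59, 123_456_000))
                + " micros");
    }
}
```

Because neither helper touches the JVM default time zone or `GregorianCalendar`, the results are the same on any host.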


Copilot AI left a comment

Pull request overview

Fixes a snapshot vs. streaming drift in the Postgres CDC connector for historical TIMESTAMP / TIMESTAMPTZ / DATE values. The snapshot path previously used a bare rs.getObject(i + 1), which routes through GregorianCalendar (Julian/Gregorian cutover + JVM-default LMT segments) and produced different values than the logical-replication streaming path, breaking idempotent UPSERTs. The fix routes snapshot column reads through PostgresConnection.getColumnValue and extends it with java.time-based extraction for temporal types, while preserving the existing +/-infinity → Timestamp(Long.MAX/MIN_VALUE) contract.

Changes:

  • PostgresScanFetchTask.createDataEventsForTable now calls jdbcConnection.getColumnValue(rs, i + 1, col, table, databaseSchema) instead of rs.getObject(i + 1), mirroring Debezium's RelationalSnapshotChangeEventSource and other connectors (Oracle/DB2/SQL Server).
  • PostgresConnection.getColumnValue adds cases for PgOid.TIMESTAMP / TIMESTAMPTZ / DATE that read via LocalDateTime / OffsetDateTime / LocalDate, with LocalDateTime.MAX/MIN and OffsetDateTime.MAX/MIN mapped back to Timestamp(Long.MAX/MIN_VALUE) for infinity.
  • New historical_dates.sql fixture and PostgresScanFetchTaskTest.testHistoricalDatesInSnapshotScan regression test covering boundary dates (0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31 23:59:59.123456, 1901-01-02) on TIMESTAMP(6), TIMESTAMP(6) WITH TIME ZONE, and DATE columns.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| .../postgres/source/fetch/PostgresScanFetchTask.java | Switches snapshot row population from rs.getObject to the connection-level getColumnValue dispatch. |
| .../io/debezium/connector/postgresql/connection/PostgresConnection.java | Adds TIMESTAMP / TIMESTAMPTZ / DATE cases using java.time types, preserving PG infinity semantics. |
| .../test/.../PostgresScanFetchTaskTest.java | Adds testHistoricalDatesInSnapshotScan and a raw SourceRecord helper to verify Debezium values for boundary dates. |
| .../test/resources/ddl/historical_dates.sql | New DDL fixture seeding boundary dates around the Julian/Gregorian cutover and the Asia/Shanghai LMT switch. |


@liaoxin01 liaoxin01 left a comment

LGTM
