sink: add before field for avro protocol#5154

Open
wk989898 wants to merge 2 commits into pingcap:master from wk989898:avro-before

Conversation

@wk989898
Collaborator

What problem does this PR solve?

Issue Number: close #5153

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

wk989898 added 2 commits May 29, 2026 09:00
Signed-off-by: wk989898 <nhsmwk@gmail.com>
Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot Bot added the release-note label (Denotes a PR that will be considered when it comes time to generate release notes.) May 29, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 29, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 48d0415e-5b2b-422f-aa5c-f7db5fe3768b

📥 Commits

Reviewing files that changed from the base of the PR and between 99f4859 and 67596ee.

📒 Files selected for processing (11)
  • api/v2/model.go
  • api/v2/model_test.go
  • pkg/config/sink.go
  • pkg/sink/codec/avro/arvo.go
  • pkg/sink/codec/avro/avro_test.go
  • pkg/sink/codec/avro/decoder.go
  • pkg/sink/codec/avro/helper.go
  • pkg/sink/codec/common/config.go
  • pkg/sink/codec/common/config_test.go
  • tests/integration_tests/avro_basic/data/data.sql
  • tests/integration_tests/avro_basic/run.sh

Walkthrough

This PR adds optional before-value inclusion to TiCDC's Avro codec. When AvroIncludeBeforeValue is enabled, insert/update/delete events include pre-row data in a ticdcBefore union field. The setting is exposed as a configuration flag throughout the API, internal config, and codec layers, with encoder/decoder implementations and comprehensive test coverage.
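
To make the "union field" idea concrete, here is a minimal sketch of how a value-record schema could gain a nullable before field. It is not the encoder from this PR: `valueSchema`, the `_before` record-name suffix, and the choice of `_ticdc_before` as the field name are assumptions for illustration (the discussion mentions both `ticdcBefore` and `_ticdc_before`).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// beforeFieldName is an assumed field name, used here for illustration only.
const beforeFieldName = "_ticdc_before"

// valueSchema builds a hypothetical Avro value-record schema. When
// includeBefore is true, it appends a ["null", record] union field that
// can carry the pre-row image; otherwise the schema is left unchanged.
func valueSchema(table string, columns []map[string]any, includeBefore bool) map[string]any {
	fields := make([]any, 0, len(columns)+1)
	for _, c := range columns {
		fields = append(fields, c)
	}
	if includeBefore {
		// A separate record type for the before image, reusing the columns.
		before := map[string]any{
			"type":   "record",
			"name":   table + "_before",
			"fields": columns,
		}
		fields = append(fields, map[string]any{
			"name":    beforeFieldName,
			"type":    []any{"null", before},
			"default": nil,
		})
	}
	return map[string]any{"type": "record", "name": table, "fields": fields}
}

func main() {
	cols := []map[string]any{
		{"name": "id", "type": "long"},
		{"name": "v", "type": "string"},
	}
	out, err := json.MarshalIndent(valueSchema("t", cols, true), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Putting `"null"` first in the union lets the field default to null, so inserts (which have no previous row) can simply omit the before image.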

Changes

Avro Before-Value Support

| Layer / File(s) | Summary |
| --- | --- |
| Configuration contract and parameter wiring: api/v2/model.go, api/v2/model_test.go, pkg/config/sink.go, pkg/sink/codec/common/config.go, pkg/sink/codec/common/config_test.go | AvroIncludeBeforeValue field added to API and internal config models with v2↔internal conversion; common codec config wires query parameter parsing, CLI/TOML support, and config merging so the flag can be set via Kafka URI or SinkConfig. |
| Avro encoder before-value implementation: pkg/sink/codec/avro/arvo.go | encodeValue now selects PreRows for delete events when before-value is enabled and generates a separate before-record schema; ticdcBefore union field conditionally added to the Avro record; checksum handling updated to use Checksum.Previous for deletes and includes new helpers for before-record encoding. |
| Avro decoder before-value assembly: pkg/sink/codec/avro/decoder.go | NextDMLEvent distinguishes legacy key-only deletes from value-carrying deletes via hasValue flag; assembleEvent refactored to conditionally derive commitTs and require before data for value-carrying deletes; checksum extraction differentiates delete (Previous) from insert/update (Current); extension-field detection generalized via isAvroExtensionField helper. |
| Helper constants and operation codes: pkg/sink/codec/avro/helper.go | Delete operation code "d" and internal field constant _ticdc_before introduced; getOperation extended to recognize delete events via IsDelete flag. |
| Unit tests for configuration and encoding/decoding: pkg/sink/codec/avro/avro_test.go, pkg/sink/codec/common/config_test.go | Config tests validate query parameter and SinkConfig parsing; TestAvroEncodeDeleteChecksum validates checksum extraction for deletes; TestAvroEncodeIncludeBeforeValue table-driven test covers insert/update/delete round-trip with before-value field presence and checksum validation. |
| Integration test data and configuration: tests/integration_tests/avro_basic/data/data.sql, tests/integration_tests/avro_basic/run.sh | New test table with insert/update/delete sequence added to SQL data; Kafka sink URI updated with avro-include-before-value=true flag for integration test coverage. |
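
As a rough illustration of the "set via Kafka URI" path, the sketch below reads the `avro-include-before-value` query parameter from a sink URI using only the Go standard library. The helper `includeBeforeValue`, the example host and topic, and the default of false when the flag is absent are assumptions; the PR's own config code may parse and merge the value differently.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// includeBeforeValue is a hypothetical helper that reports whether the sink
// URI enables avro-include-before-value. An absent flag is treated as false
// (an assumed default), and an unparseable value is returned as an error.
func includeBeforeValue(sinkURI string) (bool, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return false, err
	}
	raw := u.Query().Get("avro-include-before-value")
	if raw == "" {
		return false, nil
	}
	return strconv.ParseBool(raw)
}

func main() {
	// Example URI; the host and topic are placeholders.
	uri := "kafka://127.0.0.1:9092/topic?protocol=avro&avro-include-before-value=true"
	enabled, err := includeBeforeValue(uri)
	fmt.Println(enabled, err)
}
```

Returning the parse error instead of silently falling back makes a misspelled value such as `=ture` visible at changefeed creation time rather than at decode time.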

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A rabbit hopped through codec schemes,
Encoding before-values, it seems,
Delete events now remember their past,
With ticdcBefore unions so vast,
ticdc whispers sweet change at last! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 17.39% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The pull request title clearly summarizes the main change: adding a before field for the Avro protocol in the sink component. |
| Description check | ✅ Passed | The pull request description follows the template structure with Issue Number and multiple test types checked, but lacks detailed implementation explanation and release notes. |
| Linked Issues check | ✅ Passed | The code changes implement support for before-value in the Avro protocol by adding configuration options, updating encoders/decoders, and adding comprehensive tests covering the feature. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to implementing Avro before-value support: configuration handling, encoder/decoder updates, schema generation, and integration tests. No unrelated changes detected. |

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.12.2)

Command failed


@ti-chi-bot ti-chi-bot Bot added the size/XXL label (Denotes a PR that changes 1000+ lines, ignoring generated files.) May 29, 2026
@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot

ti-chi-bot Bot commented May 29, 2026

@wk989898: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| pull-error-log-review | 67596ee | link | true | /test pull-error-log-review |

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces the AvroIncludeBeforeValue configuration option, allowing Avro-encoded update and delete events to include their pre-row ('before') values under the _ticdc_before field. The changes span across configuration definitions, the Avro encoder/decoder implementations, and integration tests. Feedback on the changes highlights two key issues in the decoder: first, a suggestion to dynamically check the _tidb_op field in the decoded map to determine delete events rather than relying on the decoder's static configuration; second, a potential out-of-bounds panic when splitting the schema namespace if it does not contain a dot.

Comment on lines +147 to +149
if d.config.AvroIncludeBeforeValue {
	isDelete = valueMap[tidbOp] == deleteOperation
}

high

Instead of relying on the decoder's local configuration d.config.AvroIncludeBeforeValue to determine if the event is a delete, it is much more robust to dynamically check for the presence of the _tidb_op field in the decoded valueMap. This prevents decoding failures or mismatches if the decoder's configuration does not perfectly align with the producer's configuration.

Suggested change
- if d.config.AvroIncludeBeforeValue {
- 	isDelete = valueMap[tidbOp] == deleteOperation
- }
+ if op, ok := valueMap[tidbOp].(string); ok {
+ 	isDelete = op == deleteOperation
+ }
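
For reference, a self-contained sketch of the dynamic check, runnable outside the decoder. The constant values `_tidb_op` and `d` are taken from this thread; `isDeleteEvent` and the plain `map[string]any` input are illustrative assumptions, not the decoder's real types.

```go
package main

import "fmt"

const (
	tidbOp          = "_tidb_op" // operation field name mentioned in this review
	deleteOperation = "d"        // delete operation code introduced by this PR
)

// isDeleteEvent is a hypothetical helper. It decides from the decoded value
// alone, so it does not depend on the decoder's own configuration. A missing
// or non-string _tidb_op is treated as "not a delete".
func isDeleteEvent(valueMap map[string]any) bool {
	op, ok := valueMap[tidbOp].(string)
	return ok && op == deleteOperation
}

func main() {
	fmt.Println(isDeleteEvent(map[string]any{tidbOp: "d"}))
	fmt.Println(isDeleteEvent(map[string]any{tidbOp: "u"}))
	fmt.Println(isDeleteEvent(map[string]any{}))
}
```

The comma-ok type assertion also guards against a value that was decoded as a non-string type, which the direct `==` comparison would silently treat as a mismatch.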

Comment on lines +247 to +248
namespace := schema["namespace"].(string)
schemaName := strings.Split(namespace, ".")[1]

medium

If the namespace string does not contain a dot (e.g., if the schema or keyspace is empty), strings.Split(namespace, ".")[1] will panic with an out-of-bounds index. It is safer to check the length of the split parts before accessing the index.

namespace := schema["namespace"].(string)
parts := strings.Split(namespace, ".")
var schemaName string
if len(parts) > 1 {
	schemaName = parts[1]
} else {
	schemaName = namespace
}
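
A runnable variant of the same guard, which additionally uses the comma-ok form so that a missing or non-string `namespace` entry returns an error instead of panicking. `schemaNameFrom` and the sample namespaces are hypothetical; the fallback to the whole namespace follows the snippet above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// schemaNameFrom is a hypothetical helper that extracts the second
// dot-separated segment of the schema namespace. It falls back to the whole
// namespace when there is no dot, and returns an error when the namespace
// entry is missing or is not a string.
func schemaNameFrom(schema map[string]any) (string, error) {
	namespace, ok := schema["namespace"].(string)
	if !ok {
		return "", errors.New("schema has no string namespace")
	}
	parts := strings.Split(namespace, ".")
	if len(parts) > 1 {
		return parts[1], nil
	}
	return namespace, nil
}

func main() {
	// Sample namespaces are placeholders, not values taken from the PR.
	for _, ns := range []string{"prefix.test", "test", ""} {
		name, err := schemaNameFrom(map[string]any{"namespace": ns})
		fmt.Printf("%q -> %q, %v\n", ns, name, err)
	}
}
```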

Development

Successfully merging this pull request may close these issues.

TiCDC Avro Protocol Supports "Before" State

1 participant