test(connectors): add source liveness helper#3377
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #3377 +/- ##
=============================================
- Coverage 73.71% 46.18% -27.54%
Complexity 943 943
=============================================
Files 1223 1229 +6
Lines 115755 105623 -10132
Branches 92491 82359 -10132
=============================================
- Hits 85329 48777 -36552
- Misses 27556 54206 +26650
+ Partials 2870 2640 -230
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
remove this file; source_suite.rs code should be self-documenting
There was a problem hiding this comment.
I have removed source_suite.md; the helper now stands on the Rust code only.
|
overall change is OK, but what about the rest of connectors? you marked that #2892 will be closed, yet this issue it only touching the random source connector which is essentially a pure-test connector type. |
@hubcio I have changed this to |
hubcio
left a comment
There was a problem hiding this comment.
thanks for PR, I finally had some time to properly review it.
two blockers inline (the restart test can't actually fail, and poll_until_min_messages drops messages when min_messages is above one batch), then a note on direction.
on direction: #2892 is really about behaviour - progress-before-send, retry-on-fail, restart-resume, mark/delete-after-commit, replay window. a poll-until-non-empty helper proves liveness, which already works everywhere; it doesn't give a seam for any of those, and two structural things stop it growing into the suite as-is.
first, the random source can't host 4 of the 5 checks - no seed, the produced ProducedMessage goes out with envelope id: None (only a random, non-monotonic Uuid lives inside the json payload as Record.id), no consumer-visible cursor (just an in-process messages_produced counter that's already unit-tested in the crate), and no external system to mark/delete. so it can only ever be a liveness smoke. the sources that already have an observable, gated cursor are postgres and influxdb - those are the honest first wires for a real behaviour test, and the postgres source/restart tests from #2579 are the pattern to copy.
second, heads up that behaviours 1 and 2 aren't guaranteed by the SDK today: the plugin advances its cursor inside poll() before returning (random does messages_produced +=, postgres marks/deletes rows and bumps tracking offsets), and the send path is one-way - SendCallback returns (), and the runtime's failed-send branch only logs/metrics/sets-error, nothing re-offers the batch. so on a failed send the un-sent batch is dropped (at-most-once on the runtime->iggy hop), and #2892 item 2 can't pass against any source without an SDK/FFI change. this is the same contract question already open in #2940 (partial writes / replay-safe progress / deterministic write identity) - worth linking the suite scope to that and rescoping, rather than writing a test that asserts behaviour the code doesn't have. the one invariant the runtime does enforce is on-disk state only advances on send success (the state save sits entirely in the success arm), which is testable now.
suggested shape if you want this to become the real suite: a small SourceUnderTest trait the runner is generic over (config_toml, seed(n) with a monotonic stamped id, expected()), plus one accumulating poll_until_n that replaces the inline loops and unifies the retry budget - today that's two independent per-module copies of POLL_ATTEMPTS/POLL_INTERVAL_MS (postgres + elasticsearch) plus hardcoded literals in influxdb, so there's no single shared constant to point at yet. that's basically the existing postgres fixture made generic. with the connector roadmap (#2753) and new sources still landing (mongodb #3285, meilisearch #3404), a real per-source baseline pays off. random then stays as an explicit liveness smoke and the restart check lives on postgres.
|
Updated this to liveness-only: random restart is dropped, polling now accumulates/retries, and postgres/influxdb behavior coverage can follow with #2940 scope in mind. |
| use integration::harness::{TestHarness, seeds}; | ||
| use tokio::time::{sleep, timeout}; | ||
|
|
||
| pub(crate) struct SourceSuiteConfig { |
There was a problem hiding this comment.
SourceSuiteConfig carries 5 fields plus a Default impl, but the only caller goes through ::default() with min_messages=1 and never overrides anything, and poll_until_min_messages is pub(crate) with no other caller. that's parameterization with nothing to parameterize yet. non-blocking, but worth collapsing: inline the poll loop into assert_source_produces_messages with hardcoded constants (min 1, batch 100, 100ms retry, 5s timeout). bonus, it makes the multi-batch accumulate path unreachable by construction instead of unreachable by luck. when the real postgres/influxdb suite lands it'll want an observable-cursor abstraction, not this counter poller, so there's little to reuse here anyway.
| impl Default for SourceSuiteConfig { | ||
| fn default() -> Self { | ||
| Self { | ||
| consumer_name: "source_suite_consumer", |
There was a problem hiding this comment.
the source_suite naming (this file, the struct, the source_suite_consumer id, the source suite timed out panic text) over-promises - the actual behaviour suite was deferred to the postgres/influxdb follow-up, and this is a liveness smoke for the one pure-test source. renaming to something like random_source_liveness would make the name match the scope. follow-up, not blocking.
| }) | ||
| } | ||
|
|
||
| pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Vec<IggyMessage> { |
There was a problem hiding this comment.
-> Vec<IggyMessage> is unused - the sole caller (random_source_produces_messages) discards the result. make it -> () unless a content-checking caller is coming soon.
|
|
||
| pub(crate) async fn assert_source_produces_messages(harness: &TestHarness) -> Vec<IggyMessage> { | ||
| let messages = poll_until_min_messages(harness, &SourceSuiteConfig::default()).await; | ||
| assert!( |
There was a problem hiding this comment.
this assert!(!messages.is_empty()) can't fire - poll_until_min_messages only returns when len() >= min_messages (default 1) or panics on timeout, so a successful return already guarantees non-empty. drop it. (if you inline the loop here as suggested above it turns into a real guard, since the threshold constant would live alongside.)
|
I approved, above comments are just nits. feel free to fix them now or in future PRs. let me know :) |
Summary
Part of #2892.
Adds a small source connector liveness helper and wires the random source through it as smoke coverage only.
Changes:
connectors::source_suitefor bounded, accumulating source message polling.The broader behavior suite should start from sources with observable cursors/ids, such as postgres or influxdb, with #2940 scope kept separate for failed-send semantics.
Local validation
cargo fmt --all --checkgit diff --checkLocal blockers:
cargo test -p integration --no-runis blocked on Windows before reaching these tests by existing Unix-onlymessage_busimports and missingpkg-config/hwlocdiscovery.cargois not installed in WSL.