Add StreamThrottler for rate-limiting bursty values#53
Conversation
Leading-edge throttler that delivers the first value immediately and drops subsequent values until the time window expires. Useful for typing indicators, read receipts, and notification delivery.
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
WalkthroughIntroduces a new Stream throttling API via the Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Throttler as StreamThrottlerImpl
participant Callback
participant Coroutine as Coroutine Delay
Client->>Throttler: submit(value1)
Throttler->>Throttler: window inactive?
Throttler->>Callback: onValue(value1)
Callback->>Client: ✓ returns true
Throttler->>Coroutine: launch delay(windowMs)
Client->>Throttler: submit(value2)
Throttler->>Throttler: window active?
Throttler->>Client: ✗ returns false (dropped)
Coroutine->>Throttler: delay complete
Throttler->>Throttler: clear window
Client->>Throttler: submit(value3)
Throttler->>Throttler: window inactive?
Throttler->>Callback: onValue(value3)
Callback->>Client: ✓ returns true
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt (1)
86-90: Consider validatingwindowMsparameter.Negative or zero values for
windowMscould cause unexpected behavior (immediate window close or no-op delay). Arequire(windowMs > 0)check would make the API more robust.🛡️ Suggested validation
public fun <T> StreamThrottler( scope: CoroutineScope, logger: StreamLogger, windowMs: Long = 3_000L, -): StreamThrottler<T> = StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs) +): StreamThrottler<T> { + require(windowMs > 0) { "windowMs must be positive, was $windowMs" } + return StreamThrottlerImpl(scope = scope, logger = logger, windowMs = windowMs) +}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt` around lines 86 - 90, The factory function StreamThrottler(...) accepts windowMs but does not validate it; add a guard at the start of the factory (or inside StreamThrottlerImpl constructor) such as require(windowMs > 0) { "windowMs must be > 0" } (or throw IllegalArgumentException) to prevent zero/negative values causing undefined behavior; update the StreamThrottler factory that returns StreamThrottlerImpl (and/or StreamThrottlerImpl's constructor) to perform this check and surface a clear error message referencing the windowMs parameter.stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt (1)
214-237: Consider adding a test for the reset race condition.This test verifies that
reset()allows immediate delivery, but doesn't advance time past the original window boundary. A test that continues past t=1000 (the original window expiry) would reveal the race condition where the stale delay coroutine prematurely clears the new window.📝 Suggested additional test case
`@Test` fun `reset mid-window does not allow stale delay to close new window`() = runTest { val dispatcher = StandardTestDispatcher(testScheduler) val scope = CoroutineScope(SupervisorJob() + dispatcher) val delivered = mutableListOf<String>() val throttler = StreamThrottlerImpl<String>( scope = scope, logger = mockk(relaxed = true), windowMs = 1_000, ) throttler.onValue { delivered.add(it) } throttler.submit("first") // t=0, window until t=1000 testScheduler.runCurrent() advanceTimeBy(300) throttler.reset() throttler.submit("second") // t=300, new window until t=1300 testScheduler.runCurrent() // Advance past original window expiry but before new window expiry advanceTimeBy(800) // now at t=1100 testScheduler.runCurrent() // This should be dropped (new window active until t=1300) assertFalse(throttler.submit("should-be-dropped")) assertEquals(listOf("first", "second"), delivered) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt` around lines 214 - 237, The test reveals a race where a stale delay coroutine from the original window can clear the newly started window after reset; in StreamThrottlerImpl ensure reset() either cancels the previous window delay coroutine/job or uses a monotonic window token/epoch that the delay coroutine checks before clearing the window so only the matching epoch can clear it; update the implementations of reset(), the window scheduling logic (the coroutine started after submit()), and any window-clear path to cancel or validate the current window token so the stale delay cannot close a newly-reset window.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.kt`:
- Around line 44-57: The submit(...) logic can be racy with reset(): the delay
coroutine launched in submit() unconditionally calls windowActive.set(false)
when it completes, so a reset() or a new submit() can be undone by an earlier
pending coroutine; modify the implementation to tie the window-expiry to a
generation token or a cancellable Job so only the most recent window can clear
windowActive. Concretely, add a generation counter (AtomicInteger) or store the
delay coroutine's Job (e.g., activeDelayJob) when you scope.launch the delay,
and in reset() increment the generation or cancel the Job; in the delay
coroutine capture the current generation (or check Job.isActive) and only call
windowActive.set(false) if the generation matches (or the Job was not
cancelled). Update submit(), reset(), and the delay-launching code paths
(references: submit, reset, windowActive, windowMs, scope.launch, callbackRef)
accordingly.
---
Nitpick comments:
In
`@stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.kt`:
- Around line 86-90: The factory function StreamThrottler(...) accepts windowMs
but does not validate it; add a guard at the start of the factory (or inside
StreamThrottlerImpl constructor) such as require(windowMs > 0) { "windowMs must
be > 0" } (or throw IllegalArgumentException) to prevent zero/negative values
causing undefined behavior; update the StreamThrottler factory that returns
StreamThrottlerImpl (and/or StreamThrottlerImpl's constructor) to perform this
check and surface a clear error message referencing the windowMs parameter.
In
`@stream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt`:
- Around line 214-237: The test reveals a race where a stale delay coroutine
from the original window can clear the newly started window after reset; in
StreamThrottlerImpl ensure reset() either cancels the previous window delay
coroutine/job or uses a monotonic window token/epoch that the delay coroutine
checks before clearing the window so only the matching epoch can clear it;
update the implementations of reset(), the window scheduling logic (the
coroutine started after submit()), and any window-clear path to cancel or
validate the current window token so the stale delay cannot close a newly-reset
window.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: a1fb316b-e7fa-4b12-9970-cb36a30b301d
📒 Files selected for processing (3)
stream-android-core/src/main/java/io/getstream/android/core/api/processing/StreamThrottler.ktstream-android-core/src/main/java/io/getstream/android/core/internal/processing/StreamThrottlerImpl.ktstream-android-core/src/test/java/io/getstream/android/core/internal/processing/StreamThrottlerImplTest.kt
StreamThrottler for rate-limiting bursty values
Replace fixed leading-edge with StreamThrottlePolicy sealed interface supporting Leading, Trailing, and LeadingAndTrailing strategies. Rename mode→policy for consistency with StreamRetryPolicy.
Unify window-expiry tracking across all throttle modes using a shared windowJob reference. reset() now cancels the pending coroutine, preventing a stale delay from prematurely closing a new window started after reset. Adds regression test for the exact race scenario.
Validate windowMs > 0 in StreamThrottlePolicy factory methods. Add 6 edge case tests: invalid windowMs, callback exception recovery, trailing stale window race, post-trailing window restart, double reset, and submit after scope cancellation.
- Make data class constructors internal to force validation via factory methods, preventing windowMs <= 0 bypass - Fix reset() ordering: clear trailingValue before opening windowActive to prevent concurrent submit from losing its value - Fix LeadingAndTrailing KDoc: trailing fires when a newer value was submitted, not when it differs from the leading value
Prevents bypassing validation via copy() on StreamThrottlePolicy subtypes. Constructors remain internal, copy() now matches.
Cover StreamSingleFlightProcessor, StreamRetryProcessor, StreamSerialProcessingQueue, StreamBatcher, StreamDebouncer, and StreamThrottler factory functions (all three policy modes).
|
|
🚀 Available in v4.0.0 |



Goal
Add a configurable throttler primitive to core's processing toolkit. Rate-limits bursty value streams with pluggable strategies — needed for typing indicators, read receipts, and notification delivery across chat and video products.
Implementation
StreamThrottlePolicysealed interface with three strategies:Leading— first value delivered immediately, rest dropped until window expiresTrailing— last value delivered when window expiresLeadingAndTrailing— first value immediately + last value at window endStreamThrottler<T>interface inapi/processing/withonValue,submit,resetStreamThrottlerImpl<T>usingAtomicBooleanfor lock-free window gatingreset()to prevent stale coroutine races@ConsistentCopyVisibility+ internal constructors to force validation via factory methodswindowMsvalidated > 0Testing
pr: core