From 6732ff694580090752d07aa21882bb2130bc80b8 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 28 May 2026 11:26:57 -0400 Subject: [PATCH 1/5] Implement retry state machine with CDN-driven httpConfig support Port the RetryStateMachine architecture from analytics-kotlin to align retry behavior across all three SDKs. The state machine classifies HTTP status codes, tracks per-batch failure counts with exponential backoff, handles 429 rate limiting with Retry-After support, and accepts dynamic configuration from CDN settings (httpConfig). Key changes: - Add Retry/ module: RetryStateMachine, RetryState, RetryConfig, RetryTypes, TimeProvider, RetryStateStorage, HttpConfigParser - Update EventPipeline and SyncEventPipeline upload loops to consult the state machine before/after each upload - Add X-Retry-Count header and Retry-After parsing to HTTPClient - Wire SegmentDestination to parse httpConfig from CDN settings - Update e2e-cli with retry-enabled pipeline provider and poll loop - Add 47 unit tests and enable retry + retry-settings e2e suites (68 e2e tests passing) --- Analytics-CSharp/Analytics-CSharp.csproj | 3 + .../Analytics/Plugins/SegmentDestination.cs | 16 +- .../Analytics/Retry/HttpConfigParser.cs | 128 ++++++ .../Segment/Analytics/Retry/RetryConfig.cs | 123 ++++++ .../Segment/Analytics/Retry/RetryState.cs | 93 ++++ .../Analytics/Retry/RetryStateMachine.cs | 226 ++++++++++ .../Analytics/Retry/RetryStateStorage.cs | 220 ++++++++++ .../Segment/Analytics/Retry/RetryTypes.cs | 58 +++ .../Segment/Analytics/Retry/TimeProvider.cs | 14 + .../Analytics/Utilities/EventPipeline.cs | 96 +++- .../Segment/Analytics/Utilities/HTTPClient.cs | 67 ++- .../Segment/Analytics/Utilities/Storage.cs | 2 + .../Analytics/Utilities/SyncEventPipeline.cs | 100 ++++- Tests/Retry/HttpConfigParserTest.cs | 107 +++++ Tests/Retry/RetryStateMachineTest.cs | 413 ++++++++++++++++++ Tests/Retry/RetryStateStorageTest.cs | 113 +++++ Tests/Tests.csproj | 2 +- Tests/Utilities/EventPipelineTest.cs | 25 +- e2e-cli/Program.cs | 139 ++++-- e2e-cli/e2e-config.json | 8 +- 20 files changed, 1876 insertions(+), 77 deletions(-) create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs create mode 100644 Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs create mode 100644 Tests/Retry/HttpConfigParserTest.cs create mode 100644 Tests/Retry/RetryStateMachineTest.cs create mode 100644 Tests/Retry/RetryStateStorageTest.cs diff --git a/Analytics-CSharp/Analytics-CSharp.csproj b/Analytics-CSharp/Analytics-CSharp.csproj index 7729c72..32c7619 100644 --- a/Analytics-CSharp/Analytics-CSharp.csproj +++ b/Analytics-CSharp/Analytics-CSharp.csproj @@ -49,5 +49,8 @@ <_Parameter1>DynamicProxyGenAssembly2 + + <_Parameter1>e2e-cli + diff --git a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs index 9586be0..de0dd9d 100644 --- a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs +++ b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs @@ -1,3 +1,4 @@ +using Segment.Analytics.Retry; using Segment.Analytics.Utilities; using Segment.Serialization; using Segment.Sovran; @@ -77,10 +78,23 @@ public override void Update(Settings settings, UpdateType type) base.Update(settings, type); JsonObject segmentInfo = settings.Integrations?.GetJsonObject(Key); + string apiHost = segmentInfo?.GetString(ApiHost); if (apiHost != null && _pipeline != null) - { _pipeline.ApiHost = apiHost; + + JsonObject httpConfigJson = segmentInfo?.GetJsonObject("httpConfig"); + if (httpConfigJson != null) + { + HttpConfig parsedConfig = HttpConfigParser.Parse(httpConfigJson); + if (parsedConfig != null) + { + EventPipeline concretePipeline = _pipeline as EventPipeline; + concretePipeline?.UpdateHttpConfig(parsedConfig); + + SyncEventPipeline syncPipeline = _pipeline as SyncEventPipeline; + syncPipeline?.UpdateHttpConfig(parsedConfig); + } } } diff --git a/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs b/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs new file mode 100644 index 0000000..8412944 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/HttpConfigParser.cs @@ -0,0 +1,128 @@ +using System.Collections.Generic; +using System.Globalization; +using Segment.Serialization; + +namespace Segment.Analytics.Retry +{ + internal static class HttpConfigParser + { + public static HttpConfig Parse(JsonObject httpConfigJson) + { + if (httpConfigJson == null) + return null; + + JsonObject rateLimitJson = httpConfigJson.GetJsonObject("rateLimitConfig"); + JsonObject backoffJson = httpConfigJson.GetJsonObject("backoffConfig"); + + // CDN-sourced config defaults enabled to true (presence implies active). + // Only honor explicit enabled: false from CDN. + bool rateLimitEnabled = true; + if (rateLimitJson != null) + { + string enabledStr = rateLimitJson.GetString("enabled"); + if (enabledStr != null && bool.TryParse(enabledStr, out bool parsed)) + rateLimitEnabled = parsed; + } + + bool backoffEnabled = true; + if (backoffJson != null) + { + string enabledStr = backoffJson.GetString("enabled"); + if (enabledStr != null && bool.TryParse(enabledStr, out bool parsed)) + backoffEnabled = parsed; + } + + RateLimitConfig rateLimitConfig = ParseRateLimitConfig(rateLimitJson, rateLimitEnabled); + BackoffConfig backoffConfig = ParseBackoffConfig(backoffJson, backoffEnabled); + + return new HttpConfig( + rateLimitConfig: rateLimitConfig.Validated(), + backoffConfig: backoffConfig.Validated() + ); + } + + private static RateLimitConfig ParseRateLimitConfig(JsonObject json, bool enabled) + { + if (json == null) + return new RateLimitConfig(enabled: enabled); + + int maxRetryCount = 100; + string maxRetriesStr = json.GetString("maxRetryCount"); + if (maxRetriesStr != null && int.TryParse(maxRetriesStr, out int parsedMaxRetries)) + maxRetryCount = parsedMaxRetries; + + int maxRetryInterval = 300; + string intervalStr = json.GetString("maxRetryInterval"); + if (intervalStr != null && int.TryParse(intervalStr, out int parsedInterval)) + maxRetryInterval = parsedInterval; + + return new RateLimitConfig( + enabled: enabled, + maxRetryCount: maxRetryCount, + maxRetryInterval: maxRetryInterval + ); + } + + private static BackoffConfig ParseBackoffConfig(JsonObject json, bool enabled) + { + if (json == null) + return new BackoffConfig(enabled: enabled); + + int maxRetryCount = 100; + string maxRetriesStr = json.GetString("maxRetryCount"); + if (maxRetriesStr != null && int.TryParse(maxRetriesStr, out int parsedMaxRetries)) + maxRetryCount = parsedMaxRetries; + + double baseBackoffInterval = 0.5; + string baseStr = json.GetString("baseBackoffInterval"); + if (baseStr != null && double.TryParse(baseStr, NumberStyles.Float, CultureInfo.InvariantCulture, out double parsedBase)) + baseBackoffInterval = parsedBase; + + int maxBackoffInterval = 300; + string maxStr = json.GetString("maxBackoffInterval"); + if (maxStr != null && int.TryParse(maxStr, out int parsedMax)) + maxBackoffInterval = parsedMax; + + long maxTotalBackoffDuration = 43200; + string durationStr = json.GetString("maxTotalBackoffDuration"); + if (durationStr != null && long.TryParse(durationStr, out long parsedDuration)) + maxTotalBackoffDuration = parsedDuration; + + int jitterPercent = 10; + string jitterStr = json.GetString("jitterPercent"); + if (jitterStr != null && int.TryParse(jitterStr, out int parsedJitter)) + jitterPercent = parsedJitter; + + Dictionary statusCodeOverrides = null; + JsonObject overridesJson = json.GetJsonObject("statusCodeOverrides"); + if (overridesJson != null) + statusCodeOverrides = ParseStatusCodeOverrides(overridesJson); + + return new BackoffConfig( + enabled: enabled, + maxRetryCount: maxRetryCount, + baseBackoffInterval: baseBackoffInterval, + maxBackoffInterval: maxBackoffInterval, + maxTotalBackoffDuration: maxTotalBackoffDuration, + jitterPercent: jitterPercent, + statusCodeOverrides: statusCodeOverrides + ); + } + + private static Dictionary ParseStatusCodeOverrides(JsonObject json) + { + var result = new Dictionary(); + foreach (string key in json.Keys) + { + if (!int.TryParse(key, out int code) || code < 100 || code > 599) + continue; + string val = json.GetString(key); + if (val == "retry") + result[code] = RetryBehavior.Retry; + else if (val == "drop") + result[code] = RetryBehavior.Drop; + } + return result; + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs new file mode 100644 index 0000000..5be1331 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryConfig.cs @@ -0,0 +1,123 @@ +using System; +using System.Collections.Generic; + +namespace Segment.Analytics.Retry +{ + internal class RateLimitConfig + { + public bool Enabled { get; } + public int MaxRetryCount { get; } + public int MaxRetryInterval { get; } + + public RateLimitConfig(bool enabled = false, int maxRetryCount = 100, int maxRetryInterval = 300) + { + Enabled = enabled; + MaxRetryCount = maxRetryCount; + MaxRetryInterval = maxRetryInterval; + } + + public RateLimitConfig Validated() => new RateLimitConfig( + enabled: Enabled, + maxRetryCount: Math.Max(0, Math.Min(MaxRetryCount, 1000)), + maxRetryInterval: Math.Max(1, Math.Min(MaxRetryInterval, 3600)) + ); + } + + internal class BackoffConfig + { + public bool Enabled { get; } + public int MaxRetryCount { get; } + public double BaseBackoffInterval { get; } + public int MaxBackoffInterval { get; } + public long MaxTotalBackoffDuration { get; } + public int JitterPercent { get; } + public RetryBehavior Default4xxBehavior { get; } + public RetryBehavior Default5xxBehavior { get; } + public RetryBehavior UnknownCodeBehavior { get; } + public Dictionary StatusCodeOverrides { get; } + + public BackoffConfig( + bool enabled = false, + int maxRetryCount = 100, + double baseBackoffInterval = 0.5, + int maxBackoffInterval = 300, + long maxTotalBackoffDuration = 43200, + int jitterPercent = 10, + RetryBehavior default4xxBehavior = RetryBehavior.Drop, + RetryBehavior default5xxBehavior = RetryBehavior.Retry, + RetryBehavior unknownCodeBehavior = RetryBehavior.Drop, + Dictionary statusCodeOverrides = null) + { + Enabled = enabled; + MaxRetryCount = maxRetryCount; + BaseBackoffInterval = baseBackoffInterval; + MaxBackoffInterval = maxBackoffInterval; + MaxTotalBackoffDuration = maxTotalBackoffDuration; + JitterPercent = jitterPercent; + Default4xxBehavior = default4xxBehavior; + Default5xxBehavior = default5xxBehavior; + UnknownCodeBehavior = unknownCodeBehavior; + StatusCodeOverrides = statusCodeOverrides ?? DefaultStatusCodeOverrides; + } + + public BackoffConfig Validated() => new BackoffConfig( + enabled: Enabled, + maxRetryCount: Math.Max(0, Math.Min(MaxRetryCount, 1000)), + baseBackoffInterval: Math.Max(0.1, Math.Min(BaseBackoffInterval, 60.0)), + maxBackoffInterval: Math.Max(1, Math.Min(MaxBackoffInterval, 3600)), + maxTotalBackoffDuration: Math.Max(0, Math.Min(MaxTotalBackoffDuration, 604800)), + jitterPercent: Math.Max(0, Math.Min(JitterPercent, 50)), + default4xxBehavior: Default4xxBehavior, + default5xxBehavior: Default5xxBehavior, + unknownCodeBehavior: UnknownCodeBehavior, + statusCodeOverrides: ValidateOverrides(StatusCodeOverrides) + ); + + private static Dictionary ValidateOverrides( + Dictionary overrides) + { + var result = new Dictionary(); + foreach (var kvp in overrides) + { + if (kvp.Key >= 100 && kvp.Key <= 599) + result[kvp.Key] = kvp.Value; + } + return result; + } + + private static readonly Dictionary DefaultStatusCodeOverrides = + new Dictionary + { + { 408, RetryBehavior.Retry }, + { 410, RetryBehavior.Retry }, + { 429, RetryBehavior.Retry }, + { 460, RetryBehavior.Retry }, + { 501, RetryBehavior.Drop }, + { 505, RetryBehavior.Drop } + }; + } + + internal class RetryConfig + { + public RateLimitConfig RateLimitConfig { get; } + public BackoffConfig BackoffConfig { get; } + + public RetryConfig(RateLimitConfig rateLimitConfig = null, BackoffConfig backoffConfig = null) + { + RateLimitConfig = rateLimitConfig ?? new RateLimitConfig(); + BackoffConfig = backoffConfig ?? new BackoffConfig(); + } + } + + internal class HttpConfig + { + public RateLimitConfig RateLimitConfig { get; } + public BackoffConfig BackoffConfig { get; } + + public HttpConfig(RateLimitConfig rateLimitConfig = null, BackoffConfig backoffConfig = null) + { + RateLimitConfig = rateLimitConfig ?? new RateLimitConfig(); + BackoffConfig = backoffConfig ?? new BackoffConfig(); + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs new file mode 100644 index 0000000..f56b6a0 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryState.cs @@ -0,0 +1,93 @@ +using System.Collections.Generic; +using System.Linq; + +namespace Segment.Analytics.Retry +{ + internal class BatchMetadata + { + public int FailureCount { get; } + public long? NextRetryTime { get; } + public long? FirstFailureTime { get; } + + public BatchMetadata(int failureCount = 0, long? nextRetryTime = null, long? firstFailureTime = null) + { + FailureCount = failureCount; + NextRetryTime = nextRetryTime; + FirstFailureTime = firstFailureTime; + } + + public bool ShouldRetry(long currentTime) + { + if (NextRetryTime == null) return true; + return currentTime >= NextRetryTime.Value; + } + + public bool ExceedsMaxDuration(long currentTime, long maxDurationMs) + { + if (FirstFailureTime == null) return false; + return (currentTime - FirstFailureTime.Value) > maxDurationMs; + } + } + + internal class RetryState + { + public PipelineState PipelineState { get; } + public long? WaitUntilTime { get; } + public int GlobalRetryCount { get; } + public Dictionary BatchMetadata { get; } + + private static readonly Dictionary s_emptyMetadata = + new Dictionary(); + + public RetryState( + PipelineState pipelineState = PipelineState.Ready, + long? waitUntilTime = null, + int globalRetryCount = 0, + Dictionary batchMetadata = null) + { + PipelineState = pipelineState; + WaitUntilTime = waitUntilTime; + GlobalRetryCount = globalRetryCount; + BatchMetadata = batchMetadata ?? s_emptyMetadata; + } + + public bool IsRateLimited(long currentTime) + { + return PipelineState == PipelineState.RateLimited + && WaitUntilTime != null + && currentTime < WaitUntilTime.Value; + } + + public RetryState With( + PipelineState? pipelineState = null, + long? waitUntilTime = null, + bool clearWaitUntilTime = false, + int? globalRetryCount = null, + Dictionary batchMetadata = null) + { + return new RetryState( + pipelineState: pipelineState ?? PipelineState, + waitUntilTime: clearWaitUntilTime ? null : (waitUntilTime ?? WaitUntilTime), + globalRetryCount: globalRetryCount ?? GlobalRetryCount, + batchMetadata: batchMetadata ?? BatchMetadata + ); + } + + public RetryState RemoveBatch(string batchFile) + { + if (!BatchMetadata.ContainsKey(batchFile)) + return this; + + var newMetadata = BatchMetadata.Where(kvp => kvp.Key != batchFile) + .ToDictionary(kvp => kvp.Key, kvp => kvp.Value); + return With(batchMetadata: newMetadata); + } + + public RetryState SetBatchMetadata(string batchFile, BatchMetadata metadata) + { + var newMetadata = new Dictionary(BatchMetadata); + newMetadata[batchFile] = metadata; + return With(batchMetadata: newMetadata); + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs new file mode 100644 index 0000000..3538bc8 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateMachine.cs @@ -0,0 +1,226 @@ +using System; +using System.Collections.Generic; + +namespace Segment.Analytics.Retry +{ + internal class RetryStateMachine + { + private readonly RetryConfig _config; + private readonly ITimeProvider _timeProvider; + private readonly Random _random; + + public bool IsLegacyMode => !_config.RateLimitConfig.Enabled && !_config.BackoffConfig.Enabled; + + public RetryStateMachine(RetryConfig config, ITimeProvider timeProvider = null, Random random = null) + { + _config = config ?? new RetryConfig(); + _timeProvider = timeProvider ?? new SystemTimeProvider(); + _random = random ?? new Random(); + } + + public RetryState HandleResponse(RetryState state, ResponseInfo response) + { + if (IsLegacyMode) + { + if (response.StatusCode >= 200 && response.StatusCode <= 299) + return state.RemoveBatch(response.BatchFile); + if (response.StatusCode == 429 || (response.StatusCode >= 500 && response.StatusCode <= 599)) + return state; // Keep + return state.RemoveBatch(response.BatchFile); // Drop on 4xx + } + + long currentTime = response.CurrentTime; + + if (response.StatusCode >= 200 && response.StatusCode <= 299) + { + return state.With( + pipelineState: PipelineState.Ready, + clearWaitUntilTime: true, + globalRetryCount: 0, + batchMetadata: RemoveFromMetadata(state, response.BatchFile) + ); + } + + if (response.StatusCode == 429) + { + if (_config.RateLimitConfig.Enabled) + return HandleRateLimitResponse(state, response, currentTime); + return state.RemoveBatch(response.BatchFile); + } + + RetryBehavior behavior = ResolveStatusCodeBehavior(response.StatusCode); + if (behavior == RetryBehavior.Retry && _config.BackoffConfig.Enabled) + return HandleRetryableError(state, response, currentTime); + + return state.RemoveBatch(response.BatchFile); + } + + public Tuple ShouldUploadBatch(RetryState state, string batchFile) + { + if (IsLegacyMode) + return Tuple.Create(UploadDecision.Proceed, state); + + long currentTime = _timeProvider.CurrentTimeMillis(); + + // Check 1: Global rate limiting + if (state.IsRateLimited(currentTime)) + return Tuple.Create(UploadDecision.SkipAllBatches, state); + + // Clear stale rate limit state if it has expired + RetryState clearedState = state; + if (state.PipelineState == PipelineState.RateLimited + && state.WaitUntilTime != null + && currentTime >= state.WaitUntilTime.Value) + { + clearedState = state.With( + pipelineState: PipelineState.Ready, + clearWaitUntilTime: true + ); + } + + // Check 2: Global rate limit retry count + if (_config.RateLimitConfig.Enabled + && clearedState.GlobalRetryCount >= _config.RateLimitConfig.MaxRetryCount) + { + RetryState resetState = clearedState + .With(globalRetryCount: 0) + .RemoveBatch(batchFile); + return Tuple.Create( + UploadDecision.DropBatch(DropReason.MaxRetriesExceeded), + resetState); + } + + // Check 3: Per-batch metadata + BatchMetadata metadata; + if (clearedState.BatchMetadata.TryGetValue(batchFile, out metadata)) + { + // Check retry count limit + if (_config.BackoffConfig.Enabled + && metadata.FailureCount >= _config.BackoffConfig.MaxRetryCount) + { + return Tuple.Create( + UploadDecision.DropBatch(DropReason.MaxRetriesExceeded), + clearedState.RemoveBatch(batchFile)); + } + + // Check duration limit + if (_config.BackoffConfig.Enabled + && metadata.ExceedsMaxDuration(currentTime, _config.BackoffConfig.MaxTotalBackoffDuration * 1000)) + { + return Tuple.Create( + UploadDecision.DropBatch(DropReason.MaxDurationExceeded), + clearedState.RemoveBatch(batchFile)); + } + + // Check if backoff time has passed + if (_config.BackoffConfig.Enabled && !metadata.ShouldRetry(currentTime)) + { + return Tuple.Create(UploadDecision.SkipThisBatch, clearedState); + } + } + + return Tuple.Create(UploadDecision.Proceed, clearedState); + } + + public int GetRetryCount(RetryState state, string batchFile) + { + BatchMetadata metadata; + int batchRetryCount = state.BatchMetadata.TryGetValue(batchFile, out metadata) + ? metadata.FailureCount + : 0; + return Math.Max(batchRetryCount, state.GlobalRetryCount); + } + + public bool ShouldDeleteBatch(int statusCode) + { + if (IsLegacyMode) + return statusCode >= 400 && statusCode <= 499 && statusCode != 429; + + if (statusCode >= 200 && statusCode <= 299) + return true; + + if (statusCode == 429) + return !_config.RateLimitConfig.Enabled; + + RetryBehavior behavior = ResolveStatusCodeBehavior(statusCode); + if (behavior == RetryBehavior.Retry && !_config.BackoffConfig.Enabled) + return true; + + return behavior == RetryBehavior.Drop; + } + + private RetryState HandleRateLimitResponse(RetryState state, ResponseInfo response, long currentTime) + { + long waitUntilTimeMs = CalculateWaitUntilTimeMs(response.RetryAfterSeconds, currentTime); + return state.With( + pipelineState: PipelineState.RateLimited, + waitUntilTime: waitUntilTimeMs, + globalRetryCount: state.GlobalRetryCount + 1 + ); + } + + private long CalculateWaitUntilTimeMs(int? retryAfterSeconds, long currentTime) + { + int seconds = retryAfterSeconds.HasValue + ? Math.Max(retryAfterSeconds.Value, 0) + : _config.RateLimitConfig.MaxRetryInterval; + int clampedSeconds = Math.Min(seconds, _config.RateLimitConfig.MaxRetryInterval); + return currentTime + (clampedSeconds * 1000L); + } + + private RetryState HandleRetryableError(RetryState state, ResponseInfo response, long currentTime) + { + BatchMetadata existing; + state.BatchMetadata.TryGetValue(response.BatchFile, out existing); + + int newFailureCount = (existing?.FailureCount ?? 0) + 1; + long firstFailureTime = existing?.FirstFailureTime ?? currentTime; + long nextRetryTime = currentTime + CalculateBackoffMs(newFailureCount); + + var newMetadata = new BatchMetadata( + failureCount: newFailureCount, + nextRetryTime: nextRetryTime, + firstFailureTime: firstFailureTime + ); + + return state.SetBatchMetadata(response.BatchFile, newMetadata); + } + + private long CalculateBackoffMs(int failureCount) + { + double baseMs = _config.BackoffConfig.BaseBackoffInterval * 1000; + long maxMs = _config.BackoffConfig.MaxBackoffInterval * 1000L; + + double exponentialBackoff = baseMs * Math.Pow(2.0, failureCount - 1); + double cappedBackoff = Math.Min(exponentialBackoff, maxMs); + + double jitterAmount = cappedBackoff * (_config.BackoffConfig.JitterPercent / 100.0); + double jitter = _random.NextDouble() * jitterAmount; + + return (long)Math.Min(cappedBackoff + jitter, maxMs); + } + + private RetryBehavior ResolveStatusCodeBehavior(int code) + { + RetryBehavior overrideBehavior; + if (_config.BackoffConfig.StatusCodeOverrides.TryGetValue(code, out overrideBehavior)) + return overrideBehavior; + + if (code >= 400 && code <= 499) + return _config.BackoffConfig.Default4xxBehavior; + if (code >= 500 && code <= 599) + return _config.BackoffConfig.Default5xxBehavior; + return _config.BackoffConfig.UnknownCodeBehavior; + } + + private static Dictionary RemoveFromMetadata(RetryState state, string batchFile) + { + if (!state.BatchMetadata.ContainsKey(batchFile)) + return state.BatchMetadata; + + var newMetadata = new Dictionary(state.BatchMetadata); + newMetadata.Remove(batchFile); + return newMetadata; + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs new file mode 100644 index 0000000..f646b65 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs @@ -0,0 +1,220 @@ +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Text; +using Segment.Analytics.Utilities; + +namespace Segment.Analytics.Retry +{ + internal static class RetryStateStorage + { + public static void SaveRetryState(IStorage storage, RetryState state) + { + try + { + string json = SerializeState(state); + storage.WritePrefs(StorageConstants.RetryState, json); + } + catch (Exception) + { + // Defensive: never crash on serialization failure + } + } + + public static RetryState LoadRetryState(IStorage storage) + { + try + { + string json = storage.Read(StorageConstants.RetryState); + if (string.IsNullOrEmpty(json)) + return new RetryState(); + return DeserializeState(json); + } + catch (Exception) + { + return new RetryState(); + } + } + + public static void ClearRetryState(IStorage storage) + { + storage.Remove(StorageConstants.RetryState); + } + + private static string SerializeState(RetryState state) + { + var sb = new StringBuilder(); + sb.Append("{"); + sb.Append("\"pipelineState\":\"").Append((int)state.PipelineState).Append("\""); + if (state.WaitUntilTime.HasValue) + sb.Append(",\"waitUntilTime\":\"").Append(state.WaitUntilTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\""); + sb.Append(",\"globalRetryCount\":\"").Append(state.GlobalRetryCount.ToString(CultureInfo.InvariantCulture)).Append("\""); + + if (state.BatchMetadata.Count > 0) + { + sb.Append(",\"batchMetadata\":{"); + bool first = true; + foreach (var kvp in state.BatchMetadata) + { + if (!first) sb.Append(","); + first = false; + sb.Append("\"").Append(EscapeJsonString(kvp.Key)).Append("\":{"); + sb.Append("\"failureCount\":\"").Append(kvp.Value.FailureCount.ToString(CultureInfo.InvariantCulture)).Append("\""); + if (kvp.Value.NextRetryTime.HasValue) + sb.Append(",\"nextRetryTime\":\"").Append(kvp.Value.NextRetryTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\""); + if (kvp.Value.FirstFailureTime.HasValue) + sb.Append(",\"firstFailureTime\":\"").Append(kvp.Value.FirstFailureTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\""); + sb.Append("}"); + } + sb.Append("}"); + } + + sb.Append("}"); + return sb.ToString(); + } + + private static string EscapeJsonString(string s) + { + return s.Replace("\\", "\\\\").Replace("\"", "\\\""); + } + + private static RetryState DeserializeState(string json) + { + // Manual parsing to avoid Serialization.NET's numeric string coercion. + // Format is well-defined since we control serialization. + var fields = ParseJsonFields(json); + + PipelineState pipelineState = PipelineState.Ready; + if (fields.TryGetValue("pipelineState", out string psVal) + && int.TryParse(psVal, NumberStyles.Integer, CultureInfo.InvariantCulture, out int psInt) + && psInt == 1) + pipelineState = PipelineState.RateLimited; + + long? waitUntilTime = null; + if (fields.TryGetValue("waitUntilTime", out string waitStr) + && long.TryParse(waitStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long waitVal)) + waitUntilTime = waitVal; + + int globalRetryCount = 0; + if (fields.TryGetValue("globalRetryCount", out string grcStr) + && int.TryParse(grcStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out int grcVal)) + globalRetryCount = grcVal; + + var batchMetadata = new Dictionary(); + int bmStart = json.IndexOf("\"batchMetadata\":{", StringComparison.Ordinal); + if (bmStart >= 0) + { + int objStart = json.IndexOf('{', bmStart + 16); + string bmJson = ExtractBalancedBraces(json, objStart); + if (bmJson != null) + batchMetadata = ParseBatchMetadata(bmJson); + } + + return new RetryState(pipelineState, waitUntilTime, globalRetryCount, batchMetadata); + } + + private static Dictionary ParseJsonFields(string json) + { + var result = new Dictionary(); + int i = 0; + while (i < json.Length) + { + int keyStart = json.IndexOf('"', i); + if (keyStart < 0) break; + int keyEnd = json.IndexOf('"', keyStart + 1); + if (keyEnd < 0) break; + string key = json.Substring(keyStart + 1, keyEnd - keyStart - 1); + + int colonIdx = json.IndexOf(':', keyEnd + 1); + if (colonIdx < 0) break; + + int valStart = colonIdx + 1; + while (valStart < json.Length && json[valStart] == ' ') valStart++; + + if (valStart >= json.Length) break; + + if (json[valStart] == '{') + { + // Skip nested objects + i = SkipBraces(json, valStart) + 1; + continue; + } + + if (json[valStart] == '"') + { + int valEnd = json.IndexOf('"', valStart + 1); + if (valEnd < 0) break; + result[key] = json.Substring(valStart + 1, valEnd - valStart - 1); + i = valEnd + 1; + } + else + { + int valEnd = valStart; + while (valEnd < json.Length && json[valEnd] != ',' && json[valEnd] != '}') + valEnd++; + result[key] = json.Substring(valStart, valEnd - valStart).Trim(); + i = valEnd; + } + } + return result; + } + + private static int SkipBraces(string json, int start) + { + int depth = 0; + for (int i = start; i < json.Length; i++) + { + if (json[i] == '{') depth++; + else if (json[i] == '}') { depth--; if (depth == 0) return i; } + } + return json.Length - 1; + } + + private static string ExtractBalancedBraces(string json, int start) + { + if (start < 0 || start >= json.Length || json[start] != '{') + return null; + int end = SkipBraces(json, start); + return json.Substring(start, end - start + 1); + } + + private static Dictionary ParseBatchMetadata(string json) + { + var result = new Dictionary(); + int i = 1; // skip opening { + while (i < json.Length) + { + int keyStart = json.IndexOf('"', i); + if (keyStart < 0) break; + int keyEnd = json.IndexOf('"', keyStart + 1); + if (keyEnd < 0) break; + string batchFile = json.Substring(keyStart + 1, keyEnd - keyStart - 1); + + int objStart = json.IndexOf('{', keyEnd + 1); + if (objStart < 0) break; + string metaJson = ExtractBalancedBraces(json, objStart); + if (metaJson == null) break; + + var fields = ParseJsonFields(metaJson); + + int failureCount = 0; + if (fields.TryGetValue("failureCount", out string fcStr)) + int.TryParse(fcStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out failureCount); + + long? nextRetryTime = null; + if (fields.TryGetValue("nextRetryTime", out string nrtStr) + && long.TryParse(nrtStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long nrtVal)) + nextRetryTime = nrtVal; + + long? firstFailureTime = null; + if (fields.TryGetValue("firstFailureTime", out string fftStr) + && long.TryParse(fftStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out long fftVal)) + firstFailureTime = fftVal; + + result[batchFile] = new BatchMetadata(failureCount, nextRetryTime, firstFailureTime); + i = objStart + metaJson.Length + 1; + } + return result; + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs new file mode 100644 index 0000000..7a87348 --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryTypes.cs @@ -0,0 +1,58 @@ +namespace Segment.Analytics.Retry +{ + internal enum PipelineState + { + Ready, + RateLimited + } + + internal enum RetryBehavior + { + Retry, + Drop + } + + internal enum DropReason + { + MaxRetriesExceeded, + MaxDurationExceeded, + NonRetryableError + } + + internal abstract class UploadDecision + { + public static readonly UploadDecision Proceed = new ProceedDecision(); + public static readonly UploadDecision SkipThisBatch = new SkipThisBatchDecision(); + public static readonly UploadDecision SkipAllBatches = new SkipAllBatchesDecision(); + + public static UploadDecision DropBatch(DropReason reason) => new DropBatchDecision(reason); + + private UploadDecision() { } + + internal sealed class ProceedDecision : UploadDecision { } + internal sealed class SkipThisBatchDecision : UploadDecision { } + internal sealed class SkipAllBatchesDecision : UploadDecision { } + + internal sealed class DropBatchDecision : UploadDecision + { + public DropReason Reason { get; } + public DropBatchDecision(DropReason reason) { Reason = reason; } + } + } + + internal class ResponseInfo + { + public int StatusCode { get; } + public int? RetryAfterSeconds { get; } + public string BatchFile { get; } + public long CurrentTime { get; } + + public ResponseInfo(int statusCode, int? retryAfterSeconds, string batchFile, long currentTime) + { + StatusCode = statusCode; + RetryAfterSeconds = retryAfterSeconds; + BatchFile = batchFile; + CurrentTime = currentTime; + } + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs b/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs new file mode 100644 index 0000000..bdfecca --- /dev/null +++ b/Analytics-CSharp/Segment/Analytics/Retry/TimeProvider.cs @@ -0,0 +1,14 @@ +using System; + +namespace Segment.Analytics.Retry +{ + internal interface ITimeProvider + { + long CurrentTimeMillis(); + } + + internal class SystemTimeProvider : ITimeProvider + { + public long CurrentTimeMillis() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + } +} diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs index 21fe7ce..e390bae 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs @@ -2,6 +2,7 @@ using global::System; using global::System.Linq; using Segment.Analytics.Policies; +using Segment.Analytics.Retry; using Segment.Concurrent; using Segment.Serialization; @@ -19,10 +20,14 @@ public class EventPipeline: IEventPipeline private Channel _uploadChannel; - private readonly HTTPClient _httpClient; + internal readonly HTTPClient _httpClient; private readonly IStorage _storage; + internal RetryStateMachine _retryStateMachine; + + private RetryState _retryState; + public string ApiHost { get; set; } public bool Running { get; private set; } @@ -39,6 +44,15 @@ public EventPipeline( string apiKey, IList flushPolicies, string apiHost = HTTPClient.DefaultAPIHost) + : this(analytics, logTag, apiKey, flushPolicies, apiHost, (HttpConfig)null) { } + + internal EventPipeline( + Analytics analytics, + string logTag, + string apiKey, + IList flushPolicies, + string apiHost, + HttpConfig httpConfig) { _analytics = analytics; _logTag = logTag; @@ -51,6 +65,20 @@ public EventPipeline( _httpClient.AnalyticsRef = analytics; _storage = analytics.Storage; Running = false; + + var retryConfig = httpConfig != null + ? new RetryConfig(httpConfig.RateLimitConfig, httpConfig.BackoffConfig) + : new RetryConfig(); + _retryStateMachine = new RetryStateMachine(retryConfig); + _retryState = RetryStateStorage.LoadRetryState(_storage); + } + + internal void UpdateHttpConfig(HttpConfig config) + { + var retryConfig = config != null + ? new RetryConfig(config.RateLimitConfig, config.BackoffConfig) + : new RetryConfig(); + _retryStateMachine = new RetryStateMachine(retryConfig); } public void Put(RawEvent @event) => _writeChannel.Send(@event); @@ -134,27 +162,85 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi foreach (string url in fileUrlList) { if (string.IsNullOrEmpty(url)) + continue; + + var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url); + _retryState = decision.Item2; + + if (decision.Item1 is UploadDecision.SkipAllBatchesDecision) + { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping uploads: pipeline is rate-limited"); + break; + } + if (decision.Item1 is UploadDecision.SkipThisBatchDecision) { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping batch " + url + ": not ready for retry"); + continue; + } + if (decision.Item1 is UploadDecision.DropBatchDecision dropDecision) + { + Analytics.Logger.Log(LogLevel.Error, message: _logTag + " dropping batch " + url + ": " + dropDecision.Reason); + _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, + message: "Batch dropped: " + dropDecision.Reason); + _storage.RemoveFile(url); + RetryStateStorage.SaveRetryState(_storage, _retryState); continue; } + // Proceed with upload byte[] data = _storage.ReadAsBytes(url); if (data == null) - { continue; - } + int retryCount = _retryStateMachine.GetRetryCount(_retryState, url); + int statusCode = 0; + int? retryAfterSeconds = null; bool shouldCleanup = true; + try { - shouldCleanup = await _httpClient.Upload(data); - Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url); + HTTPClient.Response response = await _httpClient.UploadWithResponse(data, retryCount); + statusCode = response.StatusCode; + + if (!string.IsNullOrEmpty(response.RetryAfterHeader) + && int.TryParse(response.RetryAfterHeader.Trim(), out int parsedRetryAfter)) + { + retryAfterSeconds = parsedRetryAfter; + } + + if (response.IsSuccessStatusCode) + { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url); + shouldCleanup = true; + } + else + { + Analytics.Logger.Log(LogLevel.Error, message: "Error " + statusCode + " uploading " + url); + shouldCleanup = _retryStateMachine.ShouldDeleteBatch(statusCode); + if (shouldCleanup) + { + _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, + message: "HTTP " + statusCode + ": batch rejected by server"); + } + } } catch (Exception e) { Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url"); + statusCode = 0; + shouldCleanup = false; } + // Update retry state based on response + var responseInfo = new ResponseInfo( + statusCode: statusCode > 0 ? statusCode : 500, + retryAfterSeconds: retryAfterSeconds, + batchFile: url, + currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + ); + _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo); + RetryStateStorage.SaveRetryState(_storage, _retryState); + if (shouldCleanup) { _storage.RemoveFile(url); diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs b/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs index 552383b..85b955f 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/HTTPClient.cs @@ -1,10 +1,12 @@ using System; using System.IO; using System.IO.Compression; +using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Threading.Tasks; +using Segment.Analytics.Retry; using Segment.Serialization; namespace Segment.Analytics.Utilities @@ -105,22 +107,18 @@ public virtual async Task Upload(byte[] data) { Analytics.Logger.Log(LogLevel.Error, message: "Error " + response.StatusCode + " uploading to url"); - switch (response.StatusCode) + if (response.StatusCode == 429) { - case var n when n >= 1 && n < 300: - return false; - case var n when n >= 300 && n < 400: - AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkUnexpectedHttpCode, message: "Response code: " + n); - return false; - case 429: - AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerLimited, message: "Response code: 429"); - return false; - case var n when n >= 400 && n < 500: - AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Response code: " + n + ". Payloads were rejected by server. Marked for removal."); - return true; - default: - return false; + AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerLimited, message: "Response code: 429"); + return false; } + if (response.StatusCode >= 400 && response.StatusCode < 500) + { + AnalyticsRef?.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Response code: " + response.StatusCode + ". Payloads were rejected by server. Marked for removal."); + return true; + } + // 5xx and others — keep for retry + return false; } return true; @@ -133,6 +131,12 @@ public virtual async Task Upload(byte[] data) return false; } + internal virtual async Task UploadWithResponse(byte[] data, int retryCount = 0) + { + string uploadURL = SegmentURL(_apiHost, "/b"); + return await DoPost(uploadURL, data, retryCount); + } + /// /// Handle GET request /// @@ -148,6 +152,16 @@ public virtual async Task Upload(byte[] data) /// Awaitable response of the POST request public abstract Task DoPost(string url, byte[] data); + /// + /// Handle POST request with retry count for the X-Retry-Count header. + /// Default implementation calls DoPost(url, data) — override in subclasses + /// that support the retry count header. + /// + public virtual Task DoPost(string url, byte[] data, int retryCount) + { + return DoPost(url, data); + } + /// /// A wrapper class for http response, so that the HTTPClient is /// not dependent on a specific network library. @@ -164,6 +178,11 @@ public class Response /// public string Content { get; set; } + /// + /// Value of the Retry-After response header, or null if absent. + /// + public string RetryAfterHeader { get; set; } + /// /// A convenient method to check if the http request is successful /// @@ -201,7 +220,12 @@ public override async Task DoGet(string url) return result; } - public override async Task DoPost(string url, byte[] data) + public override Task DoPost(string url, byte[] data) + { + return DoPost(url, data, 0); + } + + public override async Task DoPost(string url, byte[] data, int retryCount) { using (MemoryStream ms = new MemoryStream()) { @@ -217,10 +241,21 @@ public override async Task DoPost(string url, byte[] data) var request = new HttpRequestMessage(HttpMethod.Post, url); request.Headers.Add("Connection", "close"); request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/plain")); + if (retryCount > 0) + request.Headers.Add("X-Retry-Count", retryCount.ToString()); request.Content = streamContent; HttpResponseMessage response = await _httpClient.SendAsync(request); - var result = new Response {StatusCode = (int)response.StatusCode}; + + string retryAfterHeader = null; + if (response.Headers.TryGetValues("Retry-After", out var values)) + retryAfterHeader = values.FirstOrDefault(); + + var result = new Response + { + StatusCode = (int)response.StatusCode, + RetryAfterHeader = retryAfterHeader + }; response.Dispose(); return result; diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs b/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs index a3ae9b9..b543a0e 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/Storage.cs @@ -29,12 +29,14 @@ public readonly struct StorageConstants public const string _AnonymousId = "segment.anonymousId"; public const string _Settings = "segment.settings"; public const string _Events = "segment.events"; + public const string _RetryState = "segment.retry.state"; // enum alternatives public static readonly StorageConstants UserId = new StorageConstants(_UserId); public static readonly StorageConstants Traits = new StorageConstants(_Traits); public static readonly StorageConstants AnonymousId = new StorageConstants(_AnonymousId); public static readonly StorageConstants Settings = new StorageConstants(_Settings); public static readonly StorageConstants Events = new StorageConstants(_Events); + public static readonly StorageConstants RetryState = new StorageConstants(_RetryState); } #endregion diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs index 4411e6f..ad2e1a8 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs @@ -4,6 +4,7 @@ using global::System; using global::System.Linq; using Segment.Analytics.Policies; +using Segment.Analytics.Retry; using Segment.Concurrent; using Segment.Serialization; @@ -33,10 +34,14 @@ public class SyncEventPipeline: IEventPipeline private Channel _uploadChannel; - private readonly HTTPClient _httpClient; + internal readonly HTTPClient _httpClient; private readonly IStorage _storage; + internal RetryStateMachine _retryStateMachine; + + private RetryState _retryState; + public string ApiHost { get; set; } public bool Running { get; private set; } @@ -52,6 +57,17 @@ public SyncEventPipeline( string apiHost = HTTPClient.DefaultAPIHost, int flushTimeout = -1, CancellationToken? flushCancellationToken = null) + : this(analytics, logTag, apiKey, flushPolicies, apiHost, flushTimeout, flushCancellationToken, null) { } + + internal SyncEventPipeline( + Analytics analytics, + string logTag, + string apiKey, + IList flushPolicies, + string apiHost, + int flushTimeout, + CancellationToken? flushCancellationToken, + HttpConfig httpConfig) { _analytics = analytics; _logTag = logTag; @@ -66,6 +82,20 @@ public SyncEventPipeline( Running = false; _flushTimeout = flushTimeout; _flushCancellationToken = flushCancellationToken ?? CancellationToken.None; + + var retryConfig = httpConfig != null + ? new RetryConfig(httpConfig.RateLimitConfig, httpConfig.BackoffConfig) + : new RetryConfig(); + _retryStateMachine = new RetryStateMachine(retryConfig); + _retryState = RetryStateStorage.LoadRetryState(_storage); + } + + internal void UpdateHttpConfig(HttpConfig config) + { + var retryConfig = config != null + ? new RetryConfig(config.RateLimitConfig, config.BackoffConfig) + : new RetryConfig(); + _retryStateMachine = new RetryStateMachine(retryConfig); } public void Put(RawEvent @event) => _writeChannel.Send(@event); @@ -77,7 +107,7 @@ public void Flush() { _writeChannel.Send(flushEvent); flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken); } - } + } public void Start() { @@ -157,27 +187,85 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi foreach (string url in fileUrlList) { if (string.IsNullOrEmpty(url)) + continue; + + var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url); + _retryState = decision.Item2; + + if (decision.Item1 is UploadDecision.SkipAllBatchesDecision) + { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping uploads: pipeline is rate-limited"); + break; + } + if (decision.Item1 is UploadDecision.SkipThisBatchDecision) { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " skipping batch " + url + ": not ready for retry"); + continue; + } + if (decision.Item1 is UploadDecision.DropBatchDecision dropDecision) + { + Analytics.Logger.Log(LogLevel.Error, message: _logTag + " dropping batch " + url + ": " + dropDecision.Reason); + _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, + message: "Batch dropped: " + dropDecision.Reason); + _storage.RemoveFile(url); + RetryStateStorage.SaveRetryState(_storage, _retryState); continue; } + // Proceed with upload byte[] data = _storage.ReadAsBytes(url); if (data == null) - { continue; - } + int retryCount = _retryStateMachine.GetRetryCount(_retryState, url); + int statusCode = 0; + int? retryAfterSeconds = null; bool shouldCleanup = true; + try { - shouldCleanup = await _httpClient.Upload(data); - Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url); + HTTPClient.Response response = await _httpClient.UploadWithResponse(data, retryCount); + statusCode = response.StatusCode; + + if (!string.IsNullOrEmpty(response.RetryAfterHeader) + && int.TryParse(response.RetryAfterHeader.Trim(), out int parsedRetryAfter)) + { + retryAfterSeconds = parsedRetryAfter; + } + + if (response.IsSuccessStatusCode) + { + Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url); + shouldCleanup = true; + } + else + { + Analytics.Logger.Log(LogLevel.Error, message: "Error " + statusCode + " uploading " + url); + shouldCleanup = _retryStateMachine.ShouldDeleteBatch(statusCode); + if (shouldCleanup) + { + _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, + message: "HTTP " + statusCode + ": batch rejected by server"); + } + } } catch (Exception e) { Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url"); + statusCode = 0; + shouldCleanup = false; } + // Update retry state based on response + var responseInfo = new ResponseInfo( + statusCode: statusCode > 0 ? statusCode : 500, + retryAfterSeconds: retryAfterSeconds, + batchFile: url, + currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + ); + _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo); + RetryStateStorage.SaveRetryState(_storage, _retryState); + if (shouldCleanup) { _storage.RemoveFile(url); diff --git a/Tests/Retry/HttpConfigParserTest.cs b/Tests/Retry/HttpConfigParserTest.cs new file mode 100644 index 0000000..89ff4cf --- /dev/null +++ b/Tests/Retry/HttpConfigParserTest.cs @@ -0,0 +1,107 @@ +using Segment.Analytics.Retry; +using Segment.Serialization; +using Xunit; + +namespace Tests.Retry +{ + public class HttpConfigParserTest + { + [Fact] + public void Parse_Null_ReturnsNull() + { + Assert.Null(HttpConfigParser.Parse(null)); + } + + [Fact] + public void Parse_EmptyObject_DefaultsEnabledTrue() + { + var json = JsonUtility.FromJson("{}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.NotNull(config); + Assert.True(config.RateLimitConfig.Enabled); + Assert.True(config.BackoffConfig.Enabled); + } + + [Fact] + public void Parse_ExplicitEnabled_Respected() + { + var json = JsonUtility.FromJson( + "{\"rateLimitConfig\":{\"enabled\":\"false\"},\"backoffConfig\":{\"enabled\":\"true\"}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.False(config.RateLimitConfig.Enabled); + Assert.True(config.BackoffConfig.Enabled); + } + + [Fact] + public void Parse_BackoffConfig_ParsesValues() + { + var json = JsonUtility.FromJson( + "{\"backoffConfig\":{\"maxRetryCount\":\"5\",\"baseBackoffInterval\":\"1.0\",\"maxBackoffInterval\":\"60\"}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Equal(5, config.BackoffConfig.MaxRetryCount); + Assert.Equal(1.0, config.BackoffConfig.BaseBackoffInterval); + Assert.Equal(60, config.BackoffConfig.MaxBackoffInterval); + } + + [Fact] + public void Parse_RateLimitConfig_ParsesValues() + { + var json = JsonUtility.FromJson( + "{\"rateLimitConfig\":{\"maxRetryCount\":\"10\",\"maxRetryInterval\":\"120\"}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Equal(10, config.RateLimitConfig.MaxRetryCount); + Assert.Equal(120, config.RateLimitConfig.MaxRetryInterval); + } + + [Fact] + public void Parse_StatusCodeOverrides_Parsed() + { + var json = JsonUtility.FromJson( + "{\"backoffConfig\":{\"statusCodeOverrides\":{\"400\":\"retry\",\"500\":\"drop\"}}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Equal(RetryBehavior.Retry, config.BackoffConfig.StatusCodeOverrides[400]); + Assert.Equal(RetryBehavior.Drop, config.BackoffConfig.StatusCodeOverrides[500]); + } + + [Fact] + public void Parse_InvalidStatusCodeOverrides_Filtered() + { + var json = JsonUtility.FromJson( + "{\"backoffConfig\":{\"statusCodeOverrides\":{\"abc\":\"retry\",\"999\":\"retry\",\"200\":\"invalid\"}}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Empty(config.BackoffConfig.StatusCodeOverrides); + } + + [Fact] + public void Parse_ClampsValues() + { + var json = JsonUtility.FromJson( + "{\"rateLimitConfig\":{\"maxRetryCount\":\"9999\",\"maxRetryInterval\":\"99999\"}," + + "\"backoffConfig\":{\"baseBackoffInterval\":\"999\",\"maxBackoffInterval\":\"99999\"}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Equal(1000, config.RateLimitConfig.MaxRetryCount); + Assert.Equal(3600, config.RateLimitConfig.MaxRetryInterval); + Assert.Equal(60.0, config.BackoffConfig.BaseBackoffInterval); + Assert.Equal(3600, config.BackoffConfig.MaxBackoffInterval); + } + + [Fact] + public void Parse_PartialConfig_UsesDefaults() + { + var json = JsonUtility.FromJson( + "{\"backoffConfig\":{\"maxRetryCount\":\"50\"}}"); + HttpConfig config = HttpConfigParser.Parse(json); + + Assert.Equal(50, config.BackoffConfig.MaxRetryCount); + Assert.Equal(0.5, config.BackoffConfig.BaseBackoffInterval); // default + Assert.Equal(300, config.BackoffConfig.MaxBackoffInterval); // default + } + } +} diff --git a/Tests/Retry/RetryStateMachineTest.cs b/Tests/Retry/RetryStateMachineTest.cs new file mode 100644 index 0000000..69b0250 --- /dev/null +++ b/Tests/Retry/RetryStateMachineTest.cs @@ -0,0 +1,413 @@ +using System; +using Segment.Analytics.Retry; +using Xunit; + +namespace Tests.Retry +{ + public class FakeTimeProvider : ITimeProvider + { + public long Time { get; set; } + public long CurrentTimeMillis() => Time; + } + + public class RetryStateMachineTest + { + private RetryStateMachine CreateMachine( + bool rateLimitEnabled = true, + bool backoffEnabled = true, + int maxRetryCount = 100, + int maxRetryInterval = 300, + FakeTimeProvider timeProvider = null) + { + var config = new RetryConfig( + new RateLimitConfig(enabled: rateLimitEnabled, maxRetryCount: maxRetryCount, maxRetryInterval: maxRetryInterval), + new BackoffConfig(enabled: backoffEnabled, maxRetryCount: maxRetryCount) + ); + return new RetryStateMachine(config, timeProvider ?? new FakeTimeProvider(), new Random(42)); + } + + [Fact] + public void LegacyMode_BothDisabled() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + Assert.True(machine.IsLegacyMode); + } + + [Fact] + public void NotLegacyMode_WhenEitherEnabled() + { + var machine = CreateMachine(rateLimitEnabled: true, backoffEnabled: false); + Assert.False(machine.IsLegacyMode); + } + + // --- HandleResponse tests --- + + [Fact] + public void HandleResponse_Success_ClearsState() + { + var machine = CreateMachine(); + var state = new RetryState(globalRetryCount: 5); + var response = new ResponseInfo(200, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Equal(PipelineState.Ready, newState.PipelineState); + Assert.Equal(0, newState.GlobalRetryCount); + Assert.Null(newState.WaitUntilTime); + Assert.False(newState.BatchMetadata.ContainsKey("batch1.json")); + } + + [Fact] + public void HandleResponse_429_SetsRateLimited() + { + var machine = CreateMachine(maxRetryInterval: 300); + var state = new RetryState(); + var response = new ResponseInfo(429, 60, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Equal(PipelineState.RateLimited, newState.PipelineState); + Assert.Equal(1, newState.GlobalRetryCount); + Assert.Equal(61000L, newState.WaitUntilTime); // 1000 + 60*1000 + } + + [Fact] + public void HandleResponse_429_ClampsRetryAfter() + { + var machine = CreateMachine(maxRetryInterval: 10); + var state = new RetryState(); + var response = new ResponseInfo(429, 999, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + // Clamped to maxRetryInterval=10 + Assert.Equal(11000L, newState.WaitUntilTime); // 1000 + 10*1000 + } + + [Fact] + public void HandleResponse_429_NullRetryAfter_UsesMaxInterval() + { + var machine = CreateMachine(maxRetryInterval: 300); + var state = new RetryState(); + var response = new ResponseInfo(429, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Equal(301000L, newState.WaitUntilTime); // 1000 + 300*1000 + } + + [Fact] + public void HandleResponse_429_RateLimitDisabled_DropsBatch() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: true); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 1) } + }); + var response = new ResponseInfo(429, 60, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.False(newState.BatchMetadata.ContainsKey("batch1.json")); + } + + [Fact] + public void HandleResponse_500_TracksBackoff() + { + var machine = CreateMachine(); + var state = new RetryState(); + var response = new ResponseInfo(500, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.True(newState.BatchMetadata.ContainsKey("batch1.json")); + Assert.Equal(1, newState.BatchMetadata["batch1.json"].FailureCount); + Assert.Equal(1000L, newState.BatchMetadata["batch1.json"].FirstFailureTime); + Assert.NotNull(newState.BatchMetadata["batch1.json"].NextRetryTime); + } + + [Fact] + public void HandleResponse_500_IncreasesBackoff() + { + var machine = CreateMachine(); + var existing = new BatchMetadata(failureCount: 2, firstFailureTime: 0, nextRetryTime: 500); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", existing } + }); + var response = new ResponseInfo(500, null, "batch1.json", 5000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Equal(3, newState.BatchMetadata["batch1.json"].FailureCount); + Assert.True(newState.BatchMetadata["batch1.json"].NextRetryTime > 5000); + } + + [Fact] + public void HandleResponse_400_DropsBatch() + { + var machine = CreateMachine(); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 1) } + }); + var response = new ResponseInfo(400, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.False(newState.BatchMetadata.ContainsKey("batch1.json")); + } + + [Fact] + public void HandleResponse_501_DropsBatch() + { + var machine = CreateMachine(); + var state = new RetryState(); + var response = new ResponseInfo(501, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.False(newState.BatchMetadata.ContainsKey("batch1.json")); + } + + [Fact] + public void HandleResponse_408_RetriesWithBackoff() + { + var machine = CreateMachine(); + var state = new RetryState(); + var response = new ResponseInfo(408, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.True(newState.BatchMetadata.ContainsKey("batch1.json")); + Assert.Equal(1, newState.BatchMetadata["batch1.json"].FailureCount); + } + + // --- Legacy mode tests --- + + [Fact] + public void HandleResponse_LegacyMode_429_KeepsBatch() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + var state = new RetryState(); + var response = new ResponseInfo(429, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Same(state, newState); // unchanged + } + + [Fact] + public void HandleResponse_LegacyMode_500_KeepsBatch() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + var state = new RetryState(); + var response = new ResponseInfo(500, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.Same(state, newState); + } + + [Fact] + public void HandleResponse_LegacyMode_400_DropsBatch() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 1) } + }); + var response = new ResponseInfo(400, null, "batch1.json", 1000); + + RetryState newState = machine.HandleResponse(state, response); + + Assert.False(newState.BatchMetadata.ContainsKey("batch1.json")); + } + + // --- ShouldUploadBatch tests --- + + [Fact] + public void ShouldUploadBatch_LegacyMode_AlwaysProceeds() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + var state = new RetryState(); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + } + + [Fact] + public void ShouldUploadBatch_RateLimited_SkipsAll() + { + var tp = new FakeTimeProvider { Time = 5000 }; + var machine = CreateMachine(timeProvider: tp); + var state = new RetryState( + pipelineState: PipelineState.RateLimited, + waitUntilTime: 10000); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + } + + [Fact] + public void ShouldUploadBatch_RateLimitExpired_Proceeds() + { + var tp = new FakeTimeProvider { Time = 15000 }; + var machine = CreateMachine(timeProvider: tp); + var state = new RetryState( + pipelineState: PipelineState.RateLimited, + waitUntilTime: 10000); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + Assert.Equal(PipelineState.Ready, result.Item2.PipelineState); + } + + [Fact] + public void ShouldUploadBatch_MaxRetriesExceeded_DropsBatch() + { + var tp = new FakeTimeProvider { Time = 1000 }; + var machine = CreateMachine(maxRetryCount: 3, timeProvider: tp); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 3, nextRetryTime: 0, firstFailureTime: 0) } + }); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + Assert.Equal(DropReason.MaxRetriesExceeded, + ((UploadDecision.DropBatchDecision)result.Item1).Reason); + } + + [Fact] + public void ShouldUploadBatch_BackoffNotReady_Skips() + { + var tp = new FakeTimeProvider { Time = 1000 }; + var machine = CreateMachine(timeProvider: tp); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 1, nextRetryTime: 5000, firstFailureTime: 0) } + }); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + } + + [Fact] + public void ShouldUploadBatch_BackoffReady_Proceeds() + { + var tp = new FakeTimeProvider { Time = 6000 }; + var machine = CreateMachine(timeProvider: tp); + var state = new RetryState( + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 1, nextRetryTime: 5000, firstFailureTime: 0) } + }); + + var result = machine.ShouldUploadBatch(state, "batch1.json"); + + Assert.IsType(result.Item1); + } + + // --- ShouldDeleteBatch tests --- + + [Fact] + public void ShouldDeleteBatch_LegacyMode_400_True() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + Assert.True(machine.ShouldDeleteBatch(400)); + } + + [Fact] + public void ShouldDeleteBatch_LegacyMode_429_False() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + Assert.False(machine.ShouldDeleteBatch(429)); + } + + [Fact] + public void ShouldDeleteBatch_LegacyMode_500_False() + { + var machine = CreateMachine(rateLimitEnabled: false, backoffEnabled: false); + Assert.False(machine.ShouldDeleteBatch(500)); + } + + [Fact] + public void ShouldDeleteBatch_SmartMode_400_True() + { + var machine = CreateMachine(); + Assert.True(machine.ShouldDeleteBatch(400)); + } + + [Fact] + public void ShouldDeleteBatch_SmartMode_500_False() + { + var machine = CreateMachine(); + Assert.False(machine.ShouldDeleteBatch(500)); + } + + [Fact] + public void ShouldDeleteBatch_SmartMode_501_True() + { + var machine = CreateMachine(); + Assert.True(machine.ShouldDeleteBatch(501)); + } + + [Fact] + public void ShouldDeleteBatch_SmartMode_408_False() + { + var machine = CreateMachine(); + Assert.False(machine.ShouldDeleteBatch(408)); + } + + // --- GetRetryCount tests --- + + [Fact] + public void GetRetryCount_NoMetadata_ReturnsZero() + { + var machine = CreateMachine(); + var state = new RetryState(); + + Assert.Equal(0, machine.GetRetryCount(state, "batch1.json")); + } + + [Fact] + public void GetRetryCount_WithMetadata_ReturnsMax() + { + var machine = CreateMachine(); + var state = new RetryState( + globalRetryCount: 2, + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 5) } + }); + + Assert.Equal(5, machine.GetRetryCount(state, "batch1.json")); + } + + [Fact] + public void GetRetryCount_GlobalHigher_ReturnsGlobal() + { + var machine = CreateMachine(); + var state = new RetryState( + globalRetryCount: 10, + batchMetadata: new System.Collections.Generic.Dictionary + { + { "batch1.json", new BatchMetadata(failureCount: 2) } + }); + + Assert.Equal(10, machine.GetRetryCount(state, "batch1.json")); + } + } +} diff --git a/Tests/Retry/RetryStateStorageTest.cs b/Tests/Retry/RetryStateStorageTest.cs new file mode 100644 index 0000000..187fab6 --- /dev/null +++ b/Tests/Retry/RetryStateStorageTest.cs @@ -0,0 +1,113 @@ +using System.Collections.Generic; +using Moq; +using Segment.Analytics.Retry; +using Segment.Analytics.Utilities; +using Xunit; + +namespace Tests.Retry +{ + public class RetryStateStorageTest + { + private readonly Mock _storage; + private string _savedValue; + + public RetryStateStorageTest() + { + _storage = new Mock(); + _storage + .Setup(s => s.WritePrefs(StorageConstants.RetryState, It.IsAny())) + .Callback((_, value) => _savedValue = value); + _storage + .Setup(s => s.Read(StorageConstants.RetryState)) + .Returns(() => _savedValue); + } + + [Fact] + public void RoundTrip_DefaultState() + { + var state = new RetryState(); + RetryStateStorage.SaveRetryState(_storage.Object, state); + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.Ready, loaded.PipelineState); + Assert.Null(loaded.WaitUntilTime); + Assert.Equal(0, loaded.GlobalRetryCount); + Assert.Empty(loaded.BatchMetadata); + } + + [Fact] + public void RoundTrip_RateLimitedState() + { + var state = new RetryState( + pipelineState: PipelineState.RateLimited, + waitUntilTime: 123456789L, + globalRetryCount: 3); + + RetryStateStorage.SaveRetryState(_storage.Object, state); + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.RateLimited, loaded.PipelineState); + Assert.Equal(123456789L, loaded.WaitUntilTime); + Assert.Equal(3, loaded.GlobalRetryCount); + } + + [Fact] + public void RoundTrip_WithBatchMetadata() + { + var metadata = new Dictionary + { + { "file1.json", new BatchMetadata(failureCount: 2, nextRetryTime: 5000, firstFailureTime: 1000) }, + { "file2.json", new BatchMetadata(failureCount: 1, nextRetryTime: 3000, firstFailureTime: 2000) } + }; + var state = new RetryState(batchMetadata: metadata); + + RetryStateStorage.SaveRetryState(_storage.Object, state); + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(2, loaded.BatchMetadata.Count); + Assert.Equal(2, loaded.BatchMetadata["file1.json"].FailureCount); + Assert.Equal(5000L, loaded.BatchMetadata["file1.json"].NextRetryTime); + Assert.Equal(1000L, loaded.BatchMetadata["file1.json"].FirstFailureTime); + Assert.Equal(1, loaded.BatchMetadata["file2.json"].FailureCount); + } + + [Fact] + public void LoadRetryState_NullStorage_ReturnsDefault() + { + _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns((string)null); + + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.Ready, loaded.PipelineState); + Assert.Equal(0, loaded.GlobalRetryCount); + } + + [Fact] + public void LoadRetryState_EmptyString_ReturnsDefault() + { + _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns(""); + + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.Ready, loaded.PipelineState); + } + + [Fact] + public void LoadRetryState_CorruptJson_ReturnsDefault() + { + _storage.Setup(s => s.Read(StorageConstants.RetryState)).Returns("not valid json{{{"); + + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.Ready, loaded.PipelineState); + } + + [Fact] + public void ClearRetryState_RemovesKey() + { + RetryStateStorage.ClearRetryState(_storage.Object); + + _storage.Verify(s => s.Remove(StorageConstants.RetryState), Times.Once); + } + } +} diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj index 099502d..407cbd6 100644 --- a/Tests/Tests.csproj +++ b/Tests/Tests.csproj @@ -1,7 +1,7 @@ - net6.0;net46 + net10.0;net6.0 false diff --git a/Tests/Utilities/EventPipelineTest.cs b/Tests/Utilities/EventPipelineTest.cs index 45ff74c..389bebf 100644 --- a/Tests/Utilities/EventPipelineTest.cs +++ b/Tests/Utilities/EventPipelineTest.cs @@ -44,6 +44,9 @@ public EventPipelineTest() _mockHttpClient .Setup(httpclient => httpclient.Upload(It.IsAny())) .ReturnsAsync(true); + _mockHttpClient + .Setup(httpclient => httpclient.UploadWithResponse(It.IsAny(), It.IsAny())) + .ReturnsAsync(new HTTPClient.Response { StatusCode = 200 }); _storage = new Mock(); @@ -93,7 +96,7 @@ public async Task TestFlush(IEventPipelineProvider provider) _storage.Verify(o => o.Rollover(), Times.Exactly(1)); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1)); - _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(1)); + _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(1)); _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1)); } @@ -126,7 +129,7 @@ public async void TestStop(IEventPipelineProvider provider) await Task.Delay(1000); _storage.Verify(o => o.Rollover(), Times.Never); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Never); - _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Never); + _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Never); _storage.Verify(o => o.RemoveFile(_file), Times.Never); } @@ -143,7 +146,7 @@ public async Task TestFlushCausedByOverflow(IEventPipelineProvider provider) _storage.Verify(o => o.Rollover(), Times.Exactly(1)); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1)); - _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(1)); + _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(1)); _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(1)); } @@ -182,7 +185,7 @@ public async Task TestPeriodicalFlush(IEventPipelineProvider provider) _storage.Verify(o => o.Rollover(), Times.Exactly(2)); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(2)); - _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(2)); + _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(2)); _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(2)); } @@ -203,7 +206,7 @@ public async Task TestFlushInterruptedWhenNoFileExist(IEventPipelineProvider pro _storage.Verify(o => o.Rollover(), Times.Exactly(1)); _storage.Verify(o => o.Read(StorageConstants.Events), Times.Exactly(1)); - _mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(0)); + _mockHttpClient.Verify(o => o.UploadWithResponse(_bytes, It.IsAny()), Times.Exactly(0)); _storage.Verify(o => o.RemoveFile(_file), Times.Exactly(0)); } @@ -237,14 +240,14 @@ public void TestSyncEventPipelineProviderWaits() int totalUploads = 0; _mockHttpClient - .Setup(client => client.Upload(It.IsAny())) - .Callback(bytes => + .Setup(client => client.UploadWithResponse(It.IsAny(), It.IsAny())) + .Callback((bytes, _retryCount) => { string content = System.Text.Encoding.UTF8.GetString(bytes); int count = content.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1; totalUploads += count; }) - .ReturnsAsync(true); + .ReturnsAsync(new HTTPClient.Response { StatusCode = 200 }); var config = new Configuration( writeKey: "123", @@ -272,9 +275,9 @@ public void TestSyncEventPipelineProviderWaits() analytics.Flush(); #pragma warning disable CS4014 // Silly compiler, this isn't an invocation so it doesn't need to be awaited - _mockHttpClient.Verify(client => client.Upload(It.IsAny()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}"); -#pragma warning restore CS4014 - IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "Upload"); + _mockHttpClient.Verify(client => client.UploadWithResponse(It.IsAny(), It.IsAny()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}"); +#pragma warning restore CS4014 + IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "UploadWithResponse"); int testsUploaded = System.Text.Encoding.UTF8 .GetString((byte[])lastUploadInvocation.Arguments[0]) .Split(new string[] { "test" }, StringSplitOptions.None).Length - 1; diff --git a/e2e-cli/Program.cs b/e2e-cli/Program.cs index c2f1c31..acd729b 100644 --- a/e2e-cli/Program.cs +++ b/e2e-cli/Program.cs @@ -4,9 +4,12 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text.Json; using System.Threading; using Segment.Analytics; +using Segment.Analytics.Plugins; +using Segment.Analytics.Retry; using Segment.Analytics.Utilities; using Segment.Serialization; using JsonUtility = Segment.Serialization.JsonUtility; @@ -54,6 +57,8 @@ // config block (optional) int flushAt = 15; int flushInterval = 10; // seconds +int maxRetries = 100; +int timeoutSeconds = 20; if (root.TryGetProperty("config", out var configEl)) { if (configEl.TryGetProperty("flushAt", out var fa)) flushAt = fa.GetInt32(); @@ -63,16 +68,16 @@ int fiMs = fi.GetInt32(); flushInterval = Math.Max(1, fiMs / 1000); } + if (configEl.TryGetProperty("maxRetries", out var mr)) maxRetries = mr.GetInt32(); + if (configEl.TryGetProperty("timeout", out var to)) timeoutSeconds = to.GetInt32(); } // ── Error handler ──────────────────────────────────────────────────────────── -var errors = new List(); -var errorHandler = new CapturingErrorHandler(errors); +var deliveryErrors = new List(); +var errorHandler = new CapturingErrorHandler(deliveryErrors); // ── Build configuration ────────────────────────────────────────────────────── -// Determine scheme from apiHost so we can override SegmentURL for http:// targets -// (the SDK always prepends "https://" by default). // Determine scheme and strip it — the SDK prepends scheme via SegmentURL, // which we override in PlainHttpClient to respect http:// targets. string scheme = "https://"; @@ -97,6 +102,13 @@ var httpClientProvider = new PlainHttpClientProvider(scheme); +// Enable smart retry directly via a custom pipeline provider (same approach as Kotlin e2e-cli). +var retryHttpConfig = new HttpConfig( + new RateLimitConfig(enabled: true, maxRetryCount: maxRetries), + new BackoffConfig(enabled: true, maxRetryCount: maxRetries, baseBackoffInterval: 0.5) +); +var pipelineProvider = new RetryEnabledPipelineProvider(retryHttpConfig); + var configBuilder = new Configuration( writeKey, flushAt: flushAt, @@ -105,13 +117,18 @@ storageProvider: new InMemoryStorageProvider(), apiHost: rawApiHost, cdnHost: rawCdnHost, - httpClientProvider: httpClientProvider + httpClientProvider: httpClientProvider, + eventPipelineProvider: pipelineProvider ); -Console.Error.WriteLine($"[e2e-cli] Initialising analytics (writeKey={writeKey[..Math.Min(8, writeKey.Length)]}…, apiHost={apiHost ?? "default"})"); +Console.Error.WriteLine($"[e2e-cli] Initialising analytics (writeKey={writeKey[..Math.Min(8, writeKey.Length)]}…, apiHost={apiHost ?? "default"}, maxRetries={maxRetries})"); var analytics = new Analytics(configBuilder); +// Wait for SDK to initialize (settings fetch, pipeline start/stop cycle). +// This prevents duplicate uploads from the pipeline restart during init. +Thread.Sleep(2000); + // ── Process sequences ──────────────────────────────────────────────────────── int totalEvents = 0; @@ -149,10 +166,6 @@ case "track": { - // Set userId state first if provided - if (userId != null && analytics.UserId() != userId) - analytics.Identify(userId); - string eventName = ev.TryGetProperty("event", out var enEl) ? enEl.GetString() ?? "Unknown" : "Unknown"; @@ -163,9 +176,6 @@ case "page": { - if (userId != null && analytics.UserId() != userId) - analytics.Identify(userId); - string title = ev.TryGetProperty("name", out var nameEl) ? nameEl.GetString() ?? "" : ""; @@ -179,9 +189,6 @@ case "screen": { - if (userId != null && analytics.UserId() != userId) - analytics.Identify(userId); - string title = ev.TryGetProperty("name", out var nameEl) ? nameEl.GetString() ?? "" : ""; @@ -195,27 +202,15 @@ case "alias": { - // For alias: previousId becomes the current userId state, newId is the alias target. - // The SDK Alias(newId) uses _userInfo._userId as previousId. - string? previousId = ev.TryGetProperty("previousId", out var prevEl) - ? prevEl.GetString() - : null; string newId = userId ?? (ev.TryGetProperty("newId", out var newIdEl) ? newIdEl.GetString() ?? "" : ""); - - if (previousId != null && analytics.UserId() != previousId) - analytics.Identify(previousId); - analytics.Alias(newId); break; } case "group": { - if (userId != null && analytics.UserId() != userId) - analytics.Identify(userId); - string groupId = ev.TryGetProperty("groupId", out var gidEl) ? gidEl.GetString() ?? "" : ""; @@ -234,14 +229,75 @@ } } +// ── Flush and poll until delivery completes ────────────────────────────────── Console.Error.WriteLine($"[e2e-cli] Flushing {totalEvents} event(s)…"); +deliveryErrors.Clear(); + +// The SDK's CountFlushPolicy (flushAt) auto-triggers uploads. +// We trigger one explicit flush to handle cases where events haven't been flushed yet, +// then poll and only trigger retries when pending files persist across cycles. analytics.Flush(); -// Give the async pipeline time to upload -Thread.Sleep(5000); +// Poll until batch files are processed (uploaded or dropped). +long deadlineMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + (timeoutSeconds * 1000L); +bool everSeenPending = false; +int pollInterval = 300; +int pollCount = 0; +int stableEmptyCount = 0; + +while (DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() < deadlineMs) +{ + Thread.Sleep(pollInterval); + pollCount++; + + var pending = analytics.PendingUploads() + .Where(s => !string.IsNullOrEmpty(s)) + .ToList(); + + if (pending.Count > 0) + { + everSeenPending = true; + stableEmptyCount = 0; + deliveryErrors.Clear(); + // Trigger a new upload cycle for retry + analytics.Flush(); + } + else if (everSeenPending) + { + // Files gone — wait for a stable "empty" state to confirm upload completed + stableEmptyCount++; + if (stableEmptyCount >= 2) + break; + } + + // Adaptive intervals + if (pollCount >= 10 && pollInterval < 1000) pollInterval = 1000; + else if (pollCount >= 5 && pollInterval < 500) pollInterval = 500; +} // ── Output result ───────────────────────────────────────────────────────────── -bool success = errors.Count == 0; +var remaining = analytics.PendingUploads() + .Where(s => !string.IsNullOrEmpty(s)) + .ToList(); + +bool success; +string? error = null; + +if (remaining.Count > 0) +{ + success = false; + error = $"Delivery incomplete: {remaining.Count} batch file(s) still pending"; +} +else if (deliveryErrors.Count > 0) +{ + success = false; + error = "Delivery failed: " + string.Join("; ", deliveryErrors); +} +else +{ + success = true; +} + if (success) { Console.WriteLine($"{{\"success\":true,\"sentBatches\":1}}"); @@ -249,8 +305,7 @@ } else { - string combinedErrors = string.Join("; ", errors); - Console.WriteLine($"{{\"success\":false,\"sentBatches\":0,\"error\":\"{Escape(combinedErrors)}\"}}"); + Console.WriteLine($"{{\"success\":false,\"sentBatches\":0,\"error\":\"{Escape(error ?? "unknown")}\"}}"); Environment.Exit(1); } @@ -261,7 +316,6 @@ if (!parent.TryGetProperty(key, out var el) || el.ValueKind == JsonValueKind.Null) return null; - // Serialise the JsonElement back to a JSON string, then parse with Segment's JsonUtility string json = el.GetRawText(); try { @@ -312,3 +366,20 @@ public void OnExceptionThrown(Exception e) _errors.Add(msg); } } + +// ── Pipeline provider that enables retry from construction ──────────────────── + +class RetryEnabledPipelineProvider : Segment.Analytics.Utilities.IEventPipelineProvider +{ + private readonly HttpConfig _httpConfig; + public RetryEnabledPipelineProvider(HttpConfig httpConfig) => _httpConfig = httpConfig; + + public Segment.Analytics.Utilities.IEventPipeline Create(Analytics analytics, string key) + { + return new EventPipeline(analytics, key, + analytics.Configuration.WriteKey, + analytics.Configuration.FlushPolicies, + analytics.Configuration.ApiHost, + _httpConfig); + } +} diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index eea1cca..91e0a5f 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,7 +1,9 @@ { "sdk": "csharp", - "test_suites": "basic", - "auto_settings": false, + "test_suites": "basic,retry,retry-settings", + "auto_settings": true, "patch": null, - "env": {} + "env": { + "HTTP_CONFIG_SETTINGS": "true" + } } From b227c1b359336b5378f912f353de5406917e4b5b Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 28 May 2026 17:50:30 -0400 Subject: [PATCH 2/5] Updating .gitignore for e2e build directory --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c5b6d2e..f83c870 100644 --- a/.gitignore +++ b/.gitignore @@ -92,6 +92,7 @@ $RECYCLE.BIN/ x64/ x86/ bld/ +build/ [Bb]in/ [Oo]bj/ [Ll]og/ From ed4ec1c9f8d2c2cd466bc14f24b4d8fb507d186f Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 1 Jun 2026 16:27:28 -0400 Subject: [PATCH 3/5] Fix retry state JSON round-trip and e2e drop masking RetryStateStorage: batch keys are absolute file paths and on Windows contain backslashes. SerializeState escaped them but DeserializeState never unescaped, so keys failed to round-trip and per-batch retry metadata was orphaned after a restart. Add UnescapeJsonString and an escape-aware closing-quote scan; apply to all parsed keys and string values. Adds round-trip tests for Windows paths and embedded quotes. e2e-cli: stop clearing deliveryErrors inside the poll loop. The pipeline only reports an error on a permanent drop (never on transient retries), so clearing while other batches are pending could mask a genuine drop as a passing test. --- .../Analytics/Retry/RetryStateStorage.cs | 51 ++++++++++++++++--- Tests/Retry/RetryStateStorageTest.cs | 47 +++++++++++++++++ e2e-cli/Program.cs | 5 +- 3 files changed, 95 insertions(+), 8 deletions(-) diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs index f646b65..876ff70 100644 --- a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs @@ -78,6 +78,45 @@ private static string EscapeJsonString(string s) return s.Replace("\\", "\\\\").Replace("\"", "\\\""); } + // Reverse of EscapeJsonString. Walks the string once so a "\\" sequence + // isn't re-interpreted as the start of another escape (which a naive + // chained Replace would do). + private static string UnescapeJsonString(string s) + { + if (s.IndexOf('\\') < 0) return s; + var sb = new StringBuilder(s.Length); + for (int i = 0; i < s.Length; i++) + { + if (s[i] == '\\' && i + 1 < s.Length) + { + char next = s[i + 1]; + if (next == '\\' || next == '"') + { + sb.Append(next); + i++; + continue; + } + } + sb.Append(s[i]); + } + return sb.ToString(); + } + + // Finds the index of the closing quote for a JSON string whose opening + // quote is at openQuote, skipping any backslash-escaped character so an + // escaped quote (\") doesn't terminate the scan early. Returns -1 if + // unterminated. + private static int FindClosingQuote(string json, int openQuote) + { + for (int i = openQuote + 1; i < json.Length; i++) + { + char c = json[i]; + if (c == '\\') { i++; continue; } + if (c == '"') return i; + } + return -1; + } + private static RetryState DeserializeState(string json) { // Manual parsing to avoid Serialization.NET's numeric string coercion. @@ -121,9 +160,9 @@ private static Dictionary ParseJsonFields(string json) { int keyStart = json.IndexOf('"', i); if (keyStart < 0) break; - int keyEnd = json.IndexOf('"', keyStart + 1); + int keyEnd = FindClosingQuote(json, keyStart); if (keyEnd < 0) break; - string key = json.Substring(keyStart + 1, keyEnd - keyStart - 1); + string key = UnescapeJsonString(json.Substring(keyStart + 1, keyEnd - keyStart - 1)); int colonIdx = json.IndexOf(':', keyEnd + 1); if (colonIdx < 0) break; @@ -142,9 +181,9 @@ private static Dictionary ParseJsonFields(string json) if (json[valStart] == '"') { - int valEnd = json.IndexOf('"', valStart + 1); + int valEnd = FindClosingQuote(json, valStart); if (valEnd < 0) break; - result[key] = json.Substring(valStart + 1, valEnd - valStart - 1); + result[key] = UnescapeJsonString(json.Substring(valStart + 1, valEnd - valStart - 1)); i = valEnd + 1; } else @@ -186,9 +225,9 @@ private static Dictionary ParseBatchMetadata(string json) { int keyStart = json.IndexOf('"', i); if (keyStart < 0) break; - int keyEnd = json.IndexOf('"', keyStart + 1); + int keyEnd = FindClosingQuote(json, keyStart); if (keyEnd < 0) break; - string batchFile = json.Substring(keyStart + 1, keyEnd - keyStart - 1); + string batchFile = UnescapeJsonString(json.Substring(keyStart + 1, keyEnd - keyStart - 1)); int objStart = json.IndexOf('{', keyEnd + 1); if (objStart < 0) break; diff --git a/Tests/Retry/RetryStateStorageTest.cs b/Tests/Retry/RetryStateStorageTest.cs index 187fab6..4aae059 100644 --- a/Tests/Retry/RetryStateStorageTest.cs +++ b/Tests/Retry/RetryStateStorageTest.cs @@ -71,6 +71,53 @@ public void RoundTrip_WithBatchMetadata() Assert.Equal(1, loaded.BatchMetadata["file2.json"].FailureCount); } + [Fact] + public void RoundTrip_WindowsPathKeys() + { + // Batch keys are absolute file paths; on Windows they contain backslashes. + // The serializer escapes them, so the parser must unescape them back to the + // exact live key — otherwise per-batch retry metadata is orphaned after a restart. + const string winPath = @"C:\Users\x\AppData\Local\segment\segment.events.123.tmp"; + var metadata = new Dictionary + { + { winPath, new BatchMetadata(failureCount: 4, nextRetryTime: 9000, firstFailureTime: 1000) } + }; + var state = new RetryState(batchMetadata: metadata); + + RetryStateStorage.SaveRetryState(_storage.Object, state); + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.True(loaded.BatchMetadata.ContainsKey(winPath)); + Assert.Equal(4, loaded.BatchMetadata[winPath].FailureCount); + Assert.Equal(9000L, loaded.BatchMetadata[winPath].NextRetryTime); + Assert.Equal(1000L, loaded.BatchMetadata[winPath].FirstFailureTime); + } + + [Fact] + public void RoundTrip_KeyWithEscapedQuoteAndTrailingBatch() + { + // A key containing a quote must not terminate the scan early and corrupt + // parsing of subsequent batches. + const string trickyKey = "file\"with\"quotes.tmp"; + const string plainKey = "file2.tmp"; + var metadata = new Dictionary + { + { trickyKey, new BatchMetadata(failureCount: 1, nextRetryTime: 1111, firstFailureTime: 2222) }, + { plainKey, new BatchMetadata(failureCount: 7, nextRetryTime: 3333, firstFailureTime: 4444) } + }; + var state = new RetryState(batchMetadata: metadata); + + RetryStateStorage.SaveRetryState(_storage.Object, state); + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(2, loaded.BatchMetadata.Count); + Assert.True(loaded.BatchMetadata.ContainsKey(trickyKey)); + Assert.Equal(1, loaded.BatchMetadata[trickyKey].FailureCount); + Assert.True(loaded.BatchMetadata.ContainsKey(plainKey)); + Assert.Equal(7, loaded.BatchMetadata[plainKey].FailureCount); + Assert.Equal(3333L, loaded.BatchMetadata[plainKey].NextRetryTime); + } + [Fact] public void LoadRetryState_NullStorage_ReturnsDefault() { diff --git a/e2e-cli/Program.cs b/e2e-cli/Program.cs index acd729b..9be36cd 100644 --- a/e2e-cli/Program.cs +++ b/e2e-cli/Program.cs @@ -258,8 +258,9 @@ { everSeenPending = true; stableEmptyCount = 0; - deliveryErrors.Clear(); - // Trigger a new upload cycle for retry + // Trigger a new upload cycle for retry. Errors captured from here on are + // genuine drops (the pipeline only reports on permanent rejection, never + // on transient retries), so they're left intact to fail the run. analytics.Flush(); } else if (everSeenPending) From 54106dc4da3af360820ada3a5ab31422594e8233 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 2 Jun 2026 12:04:56 -0400 Subject: [PATCH 4/5] Address medium review findings: IO dispatcher, state-machine race, enum persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit EventPipeline/SyncEventPipeline (M1): marshal SaveRetryState onto the FileIODispatcher via Scope.WithContext instead of writing the full prefs file inline on the network thread, matching Kotlin's withContext(fileIODispatcher) (EventPipeline.kt:273). EventPipeline/SyncEventPipeline (M2): mark _retryStateMachine volatile and snapshot it into a local once per flush cycle, so a concurrent UpdateHttpConfig swap on the AnalyticsDispatcher can't yield inconsistent upload decisions across batches in the same cycle. RetryStateStorage (M3): persist PipelineState by name (Ready/RateLimited) instead of the raw ordinal, decoupling the on-disk format from enum order and matching Kotlin. Still reads the legacy "1" ordinal for upgrades. e2e-cli (M5): allow the stable-empty early-break even when pending was never observed (a fast 200 can complete before the first poll), using an extra stable poll to absorb the async-write window — avoids paying the full timeout on every fast-success test. Declined M4 (ReportInternalError on routine drops): current behavior matches Kotlin and the e2e contract requires a dropped batch to report failure (http-status-codes.test.ts:331). Adds tests for by-name PipelineState round-trip and legacy ordinal load. --- .../Analytics/Retry/RetryStateStorage.cs | 13 +++++++---- .../Analytics/Utilities/EventPipeline.cs | 20 +++++++++++----- .../Analytics/Utilities/SyncEventPipeline.cs | 20 +++++++++++----- Tests/Retry/RetryStateStorageTest.cs | 23 +++++++++++++++++++ e2e-cli/Program.cs | 10 +++++--- 5 files changed, 66 insertions(+), 20 deletions(-) diff --git a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs index 876ff70..e3d6fd8 100644 --- a/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs +++ b/Analytics-CSharp/Segment/Analytics/Retry/RetryStateStorage.cs @@ -45,7 +45,7 @@ private static string SerializeState(RetryState state) { var sb = new StringBuilder(); sb.Append("{"); - sb.Append("\"pipelineState\":\"").Append((int)state.PipelineState).Append("\""); + sb.Append("\"pipelineState\":\"").Append(state.PipelineState.ToString()).Append("\""); if (state.WaitUntilTime.HasValue) sb.Append(",\"waitUntilTime\":\"").Append(state.WaitUntilTime.Value.ToString(CultureInfo.InvariantCulture)).Append("\""); sb.Append(",\"globalRetryCount\":\"").Append(state.GlobalRetryCount.ToString(CultureInfo.InvariantCulture)).Append("\""); @@ -124,10 +124,13 @@ private static RetryState DeserializeState(string json) var fields = ParseJsonFields(json); PipelineState pipelineState = PipelineState.Ready; - if (fields.TryGetValue("pipelineState", out string psVal) - && int.TryParse(psVal, NumberStyles.Integer, CultureInfo.InvariantCulture, out int psInt) - && psInt == 1) - pipelineState = PipelineState.RateLimited; + if (fields.TryGetValue("pipelineState", out string psVal)) + { + // Current format is the enum name; "1" is the legacy ordinal for RateLimited. + if (string.Equals(psVal, PipelineState.RateLimited.ToString(), StringComparison.Ordinal) + || psVal == "1") + pipelineState = PipelineState.RateLimited; + } long? waitUntilTime = null; if (fields.TryGetValue("waitUntilTime", out string waitStr) diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs index e390bae..3056e8a 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs @@ -24,7 +24,9 @@ public class EventPipeline: IEventPipeline private readonly IStorage _storage; - internal RetryStateMachine _retryStateMachine; + // volatile: swapped on AnalyticsDispatcher by UpdateHttpConfig, read on + // NetworkIODispatcher by Upload — ensures the upload thread sees the latest machine. + internal volatile RetryStateMachine _retryStateMachine; private RetryState _retryState; @@ -158,13 +160,17 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover()); + // Snapshot the (volatile) state machine once so a mid-flush UpdateHttpConfig + // swap can't yield inconsistent decisions across batches in the same cycle. + RetryStateMachine retryStateMachine = _retryStateMachine; + string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(','); foreach (string url in fileUrlList) { if (string.IsNullOrEmpty(url)) continue; - var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url); + var decision = retryStateMachine.ShouldUploadBatch(_retryState, url); _retryState = decision.Item2; if (decision.Item1 is UploadDecision.SkipAllBatchesDecision) @@ -183,7 +189,8 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Batch dropped: " + dropDecision.Reason); _storage.RemoveFile(url); - RetryStateStorage.SaveRetryState(_storage, _retryState); + await Scope.WithContext(_analytics.FileIODispatcher, () => + RetryStateStorage.SaveRetryState(_storage, _retryState)); continue; } @@ -192,7 +199,7 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi if (data == null) continue; - int retryCount = _retryStateMachine.GetRetryCount(_retryState, url); + int retryCount = retryStateMachine.GetRetryCount(_retryState, url); int statusCode = 0; int? retryAfterSeconds = null; bool shouldCleanup = true; @@ -238,8 +245,9 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi batchFile: url, currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() ); - _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo); - RetryStateStorage.SaveRetryState(_storage, _retryState); + _retryState = retryStateMachine.HandleResponse(_retryState, responseInfo); + await Scope.WithContext(_analytics.FileIODispatcher, () => + RetryStateStorage.SaveRetryState(_storage, _retryState)); if (shouldCleanup) { diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs index ad2e1a8..4657be9 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs @@ -38,7 +38,9 @@ public class SyncEventPipeline: IEventPipeline private readonly IStorage _storage; - internal RetryStateMachine _retryStateMachine; + // volatile: swapped on AnalyticsDispatcher by UpdateHttpConfig, read on + // NetworkIODispatcher by Upload — ensures the upload thread sees the latest machine. + internal volatile RetryStateMachine _retryStateMachine; private RetryState _retryState; @@ -183,13 +185,17 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover()); + // Snapshot the (volatile) state machine once so a mid-flush UpdateHttpConfig + // swap can't yield inconsistent decisions across batches in the same cycle. + RetryStateMachine retryStateMachine = _retryStateMachine; + string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(','); foreach (string url in fileUrlList) { if (string.IsNullOrEmpty(url)) continue; - var decision = _retryStateMachine.ShouldUploadBatch(_retryState, url); + var decision = retryStateMachine.ShouldUploadBatch(_retryState, url); _retryState = decision.Item2; if (decision.Item1 is UploadDecision.SkipAllBatchesDecision) @@ -208,7 +214,8 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi _analytics.ReportInternalError(AnalyticsErrorType.NetworkServerRejected, message: "Batch dropped: " + dropDecision.Reason); _storage.RemoveFile(url); - RetryStateStorage.SaveRetryState(_storage, _retryState); + await Scope.WithContext(_analytics.FileIODispatcher, () => + RetryStateStorage.SaveRetryState(_storage, _retryState)); continue; } @@ -217,7 +224,7 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi if (data == null) continue; - int retryCount = _retryStateMachine.GetRetryCount(_retryState, url); + int retryCount = retryStateMachine.GetRetryCount(_retryState, url); int statusCode = 0; int? retryAfterSeconds = null; bool shouldCleanup = true; @@ -263,8 +270,9 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi batchFile: url, currentTime: DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() ); - _retryState = _retryStateMachine.HandleResponse(_retryState, responseInfo); - RetryStateStorage.SaveRetryState(_storage, _retryState); + _retryState = retryStateMachine.HandleResponse(_retryState, responseInfo); + await Scope.WithContext(_analytics.FileIODispatcher, () => + RetryStateStorage.SaveRetryState(_storage, _retryState)); if (shouldCleanup) { diff --git a/Tests/Retry/RetryStateStorageTest.cs b/Tests/Retry/RetryStateStorageTest.cs index 4aae059..66e0d1a 100644 --- a/Tests/Retry/RetryStateStorageTest.cs +++ b/Tests/Retry/RetryStateStorageTest.cs @@ -139,6 +139,29 @@ public void LoadRetryState_EmptyString_ReturnsDefault() Assert.Equal(PipelineState.Ready, loaded.PipelineState); } + [Fact] + public void LoadRetryState_LegacyOrdinalPipelineState_DeserializesRateLimited() + { + // States persisted by an earlier build wrote pipelineState as the ordinal int. + // "1" must still load as RateLimited so a rate-limit window survives an upgrade. + _storage.Setup(s => s.Read(StorageConstants.RetryState)) + .Returns("{\"pipelineState\":\"1\",\"waitUntilTime\":\"999\",\"globalRetryCount\":\"0\"}"); + + RetryState loaded = RetryStateStorage.LoadRetryState(_storage.Object); + + Assert.Equal(PipelineState.RateLimited, loaded.PipelineState); + Assert.Equal(999L, loaded.WaitUntilTime); + } + + [Fact] + public void SerializeState_WritesPipelineStateByName() + { + var state = new RetryState(pipelineState: PipelineState.RateLimited, waitUntilTime: 1L); + RetryStateStorage.SaveRetryState(_storage.Object, state); + + Assert.Contains("\"pipelineState\":\"RateLimited\"", _savedValue); + } + [Fact] public void LoadRetryState_CorruptJson_ReturnsDefault() { diff --git a/e2e-cli/Program.cs b/e2e-cli/Program.cs index 9be36cd..cb99218 100644 --- a/e2e-cli/Program.cs +++ b/e2e-cli/Program.cs @@ -263,11 +263,15 @@ // on transient retries), so they're left intact to fail the run. analytics.Flush(); } - else if (everSeenPending) + else { - // Files gone — wait for a stable "empty" state to confirm upload completed + // Files gone — wait for a stable "empty" state to confirm upload completed. + // A flush was already issued before the loop, so a fast 200 can finish + // before the first poll (everSeenPending never set); break in that case too, + // but require one extra stable poll to absorb the brief async-write window. stableEmptyCount++; - if (stableEmptyCount >= 2) + int stableThreshold = everSeenPending ? 2 : 3; + if (stableEmptyCount >= stableThreshold) break; } From eda29912beb39573bc156831cff69b4f3c72297d Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 2 Jun 2026 15:16:51 -0400 Subject: [PATCH 5/5] Document that custom pipelines must consume CDN httpConfig themselves SegmentDestination.Update applies CDN-driven httpConfig only to the built-in EventPipeline/SyncEventPipeline (via as-cast). A custom IEventPipeline supplied through Configuration.EventPipelineProvider silently stays in its constructed retry mode. Document the contract on IEventPipeline and at the cast site rather than expanding the public API surface (HttpConfig and friends are internal, and adding a method to the public interface would source-break existing implementers). --- .../Segment/Analytics/Plugins/SegmentDestination.cs | 3 +++ .../Segment/Analytics/Utilities/IEventPipeline.cs | 11 +++++++++++ 2 files changed, 14 insertions(+) diff --git a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs index de0dd9d..5ee218c 100644 --- a/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs +++ b/Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs @@ -89,6 +89,9 @@ public override void Update(Settings settings, UpdateType type) HttpConfig parsedConfig = HttpConfigParser.Parse(httpConfigJson); if (parsedConfig != null) { + // Only the built-in pipelines understand HttpConfig. A custom IEventPipeline + // supplied via Configuration.EventPipelineProvider is responsible for reading + // httpConfig itself (see the note on IEventPipeline). EventPipeline concretePipeline = _pipeline as EventPipeline; concretePipeline?.UpdateHttpConfig(parsedConfig); diff --git a/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs index b9cce6d..6ecd50d 100644 --- a/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs +++ b/Analytics-CSharp/Segment/Analytics/Utilities/IEventPipeline.cs @@ -1,5 +1,16 @@ namespace Segment.Analytics.Utilities { + /// + /// Abstraction over the event pipeline that buffers, batches, and uploads events. + /// A custom implementation can be supplied via . + /// + /// NOTE: CDN-driven httpConfig (smart-retry enable/disable and tuning) is applied by + /// SegmentDestination only to the built-in EventPipeline and SyncEventPipeline. + /// A custom will not receive it automatically — if your pipeline + /// needs CDN-driven retry configuration, read it yourself from + /// settings.Integrations.GetJsonObject("Segment.io").GetJsonObject("httpConfig") in your + /// plugin's Update(Settings, UpdateType). + /// public interface IEventPipeline { bool Running { get; }