diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/AzureBlobStorageUploadStrategy.cs b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/AzureBlobStorageUploadStrategy.cs index 42b62e54..cb65e129 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/AzureBlobStorageUploadStrategy.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/AzureBlobStorageUploadStrategy.cs @@ -21,7 +21,7 @@ public async Task UploadAsync(FileUploaderSlice slice, FileS try { var options = new BlobClientOptions(); - options.Retry.NetworkTimeout = TimeSpan.FromMinutes(1); + options.Retry.NetworkTimeout = TimeSpan.FromMinutes(10); slice.MemoryStream.Position = 0; var blob = new BlobClient(new Uri(storageLocation.Url), options); diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategy.cs b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategy.cs index a22df878..ea6e4573 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategy.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategy.cs @@ -29,7 +29,7 @@ public async Task UploadAsync(FileUploaderSlice slice, FileS slice.MemoryStream.Position = 0; using var httpClient = _httpClientFactory.CreateClient(); - httpClient.Timeout = TimeSpan.FromMinutes(1); + httpClient.Timeout = Timeout.InfiniteTimeSpan; httpClient.DefaultRequestHeaders.ExpectContinue = false; // Build ReadOnlyMemory without copying when possible; fallback to ToArray otherwise diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/UploadFailureClassifier.cs b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/UploadFailureClassifier.cs index 37418add..a6caf226 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Strategies/UploadFailureClassifier.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Strategies/UploadFailureClassifier.cs @@ -1,4 +1,7 @@ using System; +using System.IO; +using System.Net.Http; +using System.Net.Sockets; using System.Threading; using ByteSync.Common.Business.Communications.Transfers; @@ -12,12 +15,44 @@ public static UploadFileResponse Classify(Exception exception, CancellationToken { return UploadFileResponse.ClientCancellation(exception); } - + if (exception is OperationCanceledException) { return UploadFileResponse.ClientTimeout(exception); } + if (IsClientNetworkError(exception)) + { + return UploadFileResponse.ClientNetworkError(exception); + } + return UploadFileResponse.Failure(500, exception); } + + private static bool IsClientNetworkError(Exception exception) + { + if (exception is not HttpRequestException and not IOException and not SocketException) + { + return false; + } + + var current = exception; + while (current != null) + { + if (current is SocketException socketException) + { + return socketException.SocketErrorCode is SocketError.ConnectionReset + or SocketError.ConnectionAborted + or SocketError.TimedOut + or SocketError.NetworkDown + or SocketError.NetworkUnreachable + or SocketError.HostDown + or SocketError.HostUnreachable; + } + + current = current.InnerException; + } + + return false; + } } diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs index e910950f..f3e258de 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs @@ -14,7 +14,7 @@ public class AdaptiveUploadController : IAdaptiveUploadController private const int MAX_CHUNK_SIZE_BYTES = 16 * 1024 * 1024; // 16 MB private const int MIN_PARALLELISM = 2; private const int MAX_PARALLELISM = 4; - private const int CLIENT_TIMEOUTS_BEFORE_DOWNSCALE = 2; + private const int CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE = 2; private const double MULTIPLIER_2_X = 2.0; private const double MULTIPLIER_1_75_X = 1.75; @@ -37,7 +37,7 @@ public class AdaptiveUploadController : IAdaptiveUploadController private readonly Queue _recentBytes; private int _successesInWindow; private int _windowSize; - private int _consecutiveClientTimeouts; + private int _consecutiveClientNetworkIssues; private readonly ILogger _logger; private readonly object _syncRoot = new(); @@ -90,17 +90,17 @@ public void RecordUploadResult(UploadResult uploadResult) { if (uploadResult.FailureKind == UploadFailureKind.ClientCancellation) { - _consecutiveClientTimeouts = 0; + _consecutiveClientNetworkIssues = 0; return; } - if (uploadResult.FailureKind == UploadFailureKind.ClientTimeout) + if (uploadResult.FailureKind is UploadFailureKind.ClientTimeout or UploadFailureKind.ClientNetworkError) { - HandleClientTimeout(uploadResult.FileId); + HandleClientNetworkIssue(uploadResult.FileId, uploadResult.FailureKind); return; } - _consecutiveClientTimeouts = 0; + _consecutiveClientNetworkIssues = 0; EnqueueSample(uploadResult.Elapsed, uploadResult.IsSuccess, uploadResult.ActualBytes); @@ -162,26 +162,27 @@ private void EnqueueSample(TimeSpan elapsed, bool isSuccess, long actualBytes) } } - private void HandleClientTimeout(string? fileId) + private void HandleClientNetworkIssue(string? fileId, UploadFailureKind failureKind) { - _consecutiveClientTimeouts += 1; - if (_consecutiveClientTimeouts < CLIENT_TIMEOUTS_BEFORE_DOWNSCALE) + _consecutiveClientNetworkIssues += 1; + if (_consecutiveClientNetworkIssues < CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE) { _logger.LogDebug( - "Adaptive: file {FileId} client timeout {TimeoutCount}/{Threshold}. Waiting before downscale", + "Adaptive: file {FileId} client network issue {FailureKind} {IssueCount}/{Threshold}. Waiting before downscale", fileId ?? "-", - _consecutiveClientTimeouts, - CLIENT_TIMEOUTS_BEFORE_DOWNSCALE); + failureKind, + _consecutiveClientNetworkIssues, + CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE); return; } _logger.LogInformation( - "Adaptive: file {FileId} client timeout threshold reached ({TimeoutCount}). Downscaling upload settings", + "Adaptive: file {FileId} client network issue threshold reached ({IssueCount}). Downscaling upload settings", fileId ?? "-", - _consecutiveClientTimeouts); - _consecutiveClientTimeouts = 0; - Downscale(fileId, "client timeouts"); + _consecutiveClientNetworkIssues); + _consecutiveClientNetworkIssues = 0; + Downscale(fileId, "client network issues"); } private bool HandleBandwidthReset(bool isSuccess, int? statusCode) @@ -395,7 +396,7 @@ private void ResetState() _currentChunkSizeBytes = Math.Clamp(INITIAL_CHUNK_SIZE_BYTES, MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES); _currentParallelism = MIN_PARALLELISM; _windowSize = _currentParallelism; - _consecutiveClientTimeouts = 0; + _consecutiveClientNetworkIssues = 0; } ResetWindow(); diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileSlicer.cs b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileSlicer.cs index 954274ca..5497e495 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileSlicer.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileSlicer.cs @@ -63,7 +63,10 @@ public async Task SliceAndEncryptAsync(SharedFileDefinition sharedFileDefinition _semaphoreSlim.Release(); } - await _availableSlices.Writer.WriteAsync(fileUploaderSlice); + if (!await TryWriteSliceAsync(fileUploaderSlice)) + { + return; + } } else { @@ -126,7 +129,10 @@ public async Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDe _semaphoreSlim.Release(); } - await _availableSlices.Writer.WriteAsync(fileUploaderSlice); + if (!await TryWriteSliceAsync(fileUploaderSlice)) + { + return; + } } else { @@ -152,4 +158,20 @@ public async Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDe _availableSlices.Writer.TryComplete(ex); } } + + private async Task TryWriteSliceAsync(FileUploaderSlice fileUploaderSlice) + { + try + { + await _availableSlices.Writer.WriteAsync(fileUploaderSlice); + + return true; + } + catch (ChannelClosedException) when (_exceptionOccurred.WaitOne(0)) + { + await fileUploaderSlice.MemoryStream.DisposeAsync(); + + return false; + } + } } diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadCoordinator.cs b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadCoordinator.cs index 1756701f..52196b65 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadCoordinator.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadCoordinator.cs @@ -12,7 +12,7 @@ public class FileUploadCoordinator : IFileUploadCoordinator private readonly ManualResetEvent _uploadingIsFinished; private readonly ManualResetEvent _exceptionOccurred; private readonly ILogger _logger; - private const int CHANNEL_CAPACITY = 8; + private const int CHANNEL_CAPACITY = 2; public FileUploadCoordinator(ILogger logger) { diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs index 07483dbd..62f3b5ad 100644 --- a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs +++ b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs @@ -28,10 +28,7 @@ public class FileUploadWorker : IFileUploadWorker private CancellationTokenSource CancellationTokenSource { get; } private static int _workerTaskCounter; - - private const int AttemptTimeoutFloorSeconds = 60; - private const int AttemptTimeoutCeilingSeconds = 120; - private const int SecondsPerMegabyteHeuristic = 3; + private int _remainingSlicesDrained; public FileUploadWorker( IPolicyFactory policyFactory, @@ -86,66 +83,84 @@ public FileUploadWorker( public async Task UploadAvailableSlicesAdaptiveAsync(Channel availableSlices, UploadProgressState progressState) { var workerId = Interlocked.Increment(ref _workerTaskCounter); - while (await availableSlices.Reader.WaitToReadAsync()) + try { - if (!availableSlices.Reader.TryRead(out var slice)) - { - continue; - } - - try + while (await availableSlices.Reader.WaitToReadAsync(CancellationTokenSource.Token)) { - var sliceStart = Stopwatch.StartNew(); - - await IncrementConcurrentAsync(progressState); - var policy = _policyFactory.BuildFileUploadPolicy(); - var attempt = 0; - - var response = await policy.ExecuteAsync(async () => + if (_exceptionOccurred.WaitOne(0)) { - attempt++; - - return await ExecuteUploadAttemptAsync(slice, workerId, attempt, CancellationTokenSource.Token); - }); - - EnsureSuccessOrThrow(response); - - var fileName = _sharedFileDefinition.GetFileName(slice.PartNumber); - var assertSw = Stopwatch.StartNew(); - _logger.LogDebug("UploadAvailableSlice: worker {WorkerId} start asserting slice {Number} for {FileName}", - workerId, slice.PartNumber, fileName); - - var transferParameters = new TransferParameters + return; + } + + if (!availableSlices.Reader.TryRead(out var slice)) { - SessionId = _sharedFileDefinition.SessionId, - SharedFileDefinition = _sharedFileDefinition, - PartNumber = slice.PartNumber, - PartSizeInBytes = slice.MemoryStream.Length - }; - - await AssertSliceUploadedAsync(policy, transferParameters, workerId, slice.PartNumber, fileName, assertSw); - assertSw.Stop(); - _logger.LogDebug( - "UploadAvailableSlice: worker {WorkerId} finished asserting slice {Number} for {FileName} in {ElapsedMs} ms", - workerId, slice.PartNumber, fileName, assertSw.ElapsedMilliseconds); - - // Success path bookkeeping - await UpdateProgressOnSuccessAsync(progressState, slice, sliceStart); - } - catch (Exception ex) - { - await HandleUploadExceptionAsync(progressState, ex, workerId); - - return; - } - finally - { - DisposeSlice(slice); - await DecrementConcurrentAsync(progressState); - - // No final release here: attempts handled slot release per attempt + continue; + } + + try + { + var sliceStart = Stopwatch.StartNew(); + + await IncrementConcurrentAsync(progressState); + var policy = _policyFactory.BuildFileUploadPolicy(); + var attempt = 0; + + var response = await policy.ExecuteAsync(async () => + { + attempt++; + + return await ExecuteUploadAttemptAsync(slice, workerId, attempt, CancellationTokenSource.Token); + }); + + if (!response.IsSuccess && CancellationTokenSource.IsCancellationRequested && _exceptionOccurred.WaitOne(0)) + { + return; + } + + EnsureSuccessOrThrow(response); + + var fileName = _sharedFileDefinition.GetFileName(slice.PartNumber); + var assertSw = Stopwatch.StartNew(); + _logger.LogDebug("UploadAvailableSlice: worker {WorkerId} start asserting slice {Number} for {FileName}", + workerId, slice.PartNumber, fileName); + + var transferParameters = new TransferParameters + { + SessionId = _sharedFileDefinition.SessionId, + SharedFileDefinition = _sharedFileDefinition, + PartNumber = slice.PartNumber, + PartSizeInBytes = slice.MemoryStream.Length + }; + + await AssertSliceUploadedAsync(policy, transferParameters, workerId, slice.PartNumber, fileName, assertSw); + assertSw.Stop(); + _logger.LogDebug( + "UploadAvailableSlice: worker {WorkerId} finished asserting slice {Number} for {FileName} in {ElapsedMs} ms", + workerId, slice.PartNumber, fileName, assertSw.ElapsedMilliseconds); + + await UpdateProgressOnSuccessAsync(progressState, slice, sliceStart); + } + catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested && _exceptionOccurred.WaitOne(0)) + { + return; + } + catch (Exception ex) + { + await HandleUploadExceptionAsync(progressState, availableSlices, ex, workerId); + + return; + } + finally + { + DisposeSlice(slice); + await DecrementConcurrentAsync(progressState); + } } } + catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested) + { + return; + } await CompleteIfFinishedAsync(progressState); } @@ -154,13 +169,23 @@ private async Task ExecuteUploadAttemptAsync(FileUploaderSli CancellationToken globalToken) { var attemptStart = DateTime.UtcNow; - var timeoutSec = ComputeAttemptTimeoutSeconds(slice); + var currentChunkSizeBytes = _adaptiveUploadController.CurrentChunkSizeBytes; + var timeoutSec = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + slice.MemoryStream.Length, + attempt, + currentChunkSizeBytes); using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(globalToken); attemptCts.CancelAfter(TimeSpan.FromSeconds(timeoutSec)); var beforeWait = _uploadSlots.CurrentCount; - _logger.LogDebug("UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available})", - workerId, beforeWait); + _logger.LogDebug( + "UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available}), attempt {Attempt}, timeout {TimeoutSec}s, slice {SliceKb} KB, currentChunk {CurrentChunkKb} KB", + workerId, + beforeWait, + attempt, + timeoutSec, + Math.Round(slice.MemoryStream.Length / 1024d), + Math.Round(currentChunkSizeBytes / 1024d)); var acquired = false; try @@ -269,19 +294,6 @@ private async Task ExecuteUploadAttemptAsync(FileUploaderSli } } - private static int ComputeAttemptTimeoutSeconds(FileUploaderSlice slice) - { - return ComputeAttemptTimeoutSeconds(slice.MemoryStream.Length); - } - - private static int ComputeAttemptTimeoutSeconds(long sliceLengthBytes) - { - var sizeMb = Math.Max(1, (int)Math.Ceiling(sliceLengthBytes / (1024d * 1024d))); - var timeoutSec = Math.Clamp(SecondsPerMegabyteHeuristic * sizeMb, AttemptTimeoutFloorSeconds, AttemptTimeoutCeilingSeconds); - - return timeoutSec; - } - private static UploadFailureKind RefineFailureKind(UploadFailureKind kind, CancellationTokenSource attemptCts, CancellationToken globalToken) { @@ -374,7 +386,7 @@ private static void EnsureSuccessOrThrow(UploadFileResponse? response) { if (response == null || !response.IsSuccess) { - throw new Exception( + throw new InvalidOperationException( $"UploadAvailableSlice: upload attempt failed. Status: {response?.StatusCode}, Error: {response?.ErrorMessage}"); } } @@ -399,7 +411,11 @@ private async Task UpdateProgressOnSuccessAsync(UploadProgressState progressStat } } - private async Task HandleUploadExceptionAsync(UploadProgressState progressState, Exception ex, int workerId) + private async Task HandleUploadExceptionAsync( + UploadProgressState progressState, + Channel availableSlices, + Exception ex, + int workerId) { _logger.LogError(ex, "UploadAvailableSlice: worker {WorkerId} error", workerId); @@ -414,6 +430,22 @@ private async Task HandleUploadExceptionAsync(UploadProgressState progressState, } _exceptionOccurred.Set(); + availableSlices.Writer.TryComplete(ex); + await CancellationTokenSource.CancelAsync(); + DrainRemainingSlices(availableSlices.Reader); + } + + private void DrainRemainingSlices(ChannelReader reader) + { + if (Interlocked.Exchange(ref _remainingSlicesDrained, 1) == 1) + { + return; + } + + while (reader.TryRead(out var slice)) + { + DisposeSlice(slice); + } } private void DisposeSlice(FileUploaderSlice slice) @@ -477,7 +509,7 @@ private async Task DoUpload(FileUploaderSlice slice, int wor { var fileName = _sharedFileDefinition.GetFileName(slice.PartNumber); _logger.LogError(ex, - "Error while uploading slice {Number} for {FileName} (worker {WorkerId}), sharedFileDefinitionId:{sharedFileDefinitionId} ", + "Error while uploading slice {Number} for {FileName} (worker {WorkerId}), SharedFileDefinitionId:{SharedFileDefinitionId} ", slice.PartNumber, fileName, workerId, _sharedFileDefinition.Id); throw; diff --git a/src/ByteSync.Client/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicy.cs b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicy.cs new file mode 100644 index 00000000..a1376e81 --- /dev/null +++ b/src/ByteSync.Client/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicy.cs @@ -0,0 +1,57 @@ +using System; + +namespace ByteSync.Services.Communications.Transfers.Uploading; + +public static class UploadAttemptTimeoutPolicy +{ + private const int AttemptTimeoutFloorSeconds = 60; + private const int AttemptTimeoutCeilingSeconds = 180; + private const int SecondsPerMegabyteHeuristic = 3; + private const int RetryGrowthSeconds = 15; + private const int StaleChunkPenaltySeconds = 5; + + public static int ComputeTimeoutSeconds(long sliceLengthBytes, int attempt, int currentChunkSizeBytes) + { + var timeoutSec = (long)ComputeBaseTimeoutSeconds(sliceLengthBytes); + if (attempt <= 1) + { + return (int)timeoutSec; + } + + var staleChunkPenalty = ComputeStaleChunkPenaltySeconds(sliceLengthBytes, currentChunkSizeBytes); + timeoutSec += (long)(attempt - 1) * RetryGrowthSeconds + staleChunkPenalty; + + return (int)Math.Clamp(timeoutSec, AttemptTimeoutFloorSeconds, AttemptTimeoutCeilingSeconds); + } + + private static int ComputeBaseTimeoutSeconds(long sliceLengthBytes) + { + var sizeMb = Math.Max(1d, Math.Ceiling(sliceLengthBytes / (1024d * 1024d))); + var ceilingSizeMb = Math.Ceiling(AttemptTimeoutCeilingSeconds / (double)SecondsPerMegabyteHeuristic); + if (sizeMb >= ceilingSizeMb) + { + return AttemptTimeoutCeilingSeconds; + } + + return Math.Clamp( + SecondsPerMegabyteHeuristic * (int)sizeMb, + AttemptTimeoutFloorSeconds, + AttemptTimeoutCeilingSeconds); + } + + private static long ComputeStaleChunkPenaltySeconds(long sliceLengthBytes, int currentChunkSizeBytes) + { + if (currentChunkSizeBytes <= 0 || sliceLengthBytes <= currentChunkSizeBytes) + { + return 0; + } + + var chunkRatio = Math.Ceiling(sliceLengthBytes / (double)currentChunkSizeBytes); + if (chunkRatio >= AttemptTimeoutCeilingSeconds / (double)StaleChunkPenaltySeconds + 1) + { + return AttemptTimeoutCeilingSeconds; + } + + return (long)(chunkRatio - 1) * StaleChunkPenaltySeconds; + } +} diff --git a/src/ByteSync.Common/Business/Communications/Transfers/UploadFailureKind.cs b/src/ByteSync.Common/Business/Communications/Transfers/UploadFailureKind.cs index 11db98f1..28235ecd 100644 --- a/src/ByteSync.Common/Business/Communications/Transfers/UploadFailureKind.cs +++ b/src/ByteSync.Common/Business/Communications/Transfers/UploadFailureKind.cs @@ -6,4 +6,5 @@ public enum UploadFailureKind ServerError = 1, ClientCancellation = 2, ClientTimeout = 3, + ClientNetworkError = 4, } diff --git a/src/ByteSync.Common/Business/Communications/Transfers/UploadFileResponse.cs b/src/ByteSync.Common/Business/Communications/Transfers/UploadFileResponse.cs index c50b526f..ae10d220 100644 --- a/src/ByteSync.Common/Business/Communications/Transfers/UploadFileResponse.cs +++ b/src/ByteSync.Common/Business/Communications/Transfers/UploadFileResponse.cs @@ -66,4 +66,16 @@ public static UploadFileResponse ClientTimeout(Exception exception) FailureKind = UploadFailureKind.ClientTimeout, }; } + + public static UploadFileResponse ClientNetworkError(Exception exception) + { + return new UploadFileResponse + { + IsSuccess = false, + StatusCode = 0, + ErrorMessage = exception.Message, + Exception = exception, + FailureKind = UploadFailureKind.ClientNetworkError, + }; + } } diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategyTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategyTests.cs index e177d193..24da5001 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategyTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategyTests.cs @@ -26,15 +26,28 @@ private static FileUploaderSlice CreateSlice(int partNumber = 1, int sizeBytes = private static FileStorageLocation CreateLocation() => new(UploadUrl, StorageProvider.CloudflareR2); private static (CloudflareR2UploadStrategy strategy, Mock handler) CreateStrategy() + { + var (strategy, handler, _) = CreateStrategyWithCreatedClients(); + return (strategy, handler); + } + + private static (CloudflareR2UploadStrategy strategy, Mock handler, List createdClients) + CreateStrategyWithCreatedClients() { var handler = new Mock(MockBehavior.Strict); + var createdClients = new List(); var factory = new Mock(); factory .Setup(f => f.CreateClient(It.IsAny())) - .Returns(() => new HttpClient(handler.Object, disposeHandler: false)); + .Returns(() => + { + var httpClient = new HttpClient(handler.Object, disposeHandler: false); + createdClients.Add(httpClient); + return httpClient; + }); var strategy = new CloudflareR2UploadStrategy(NullLogger.Instance, factory.Object); - return (strategy, handler); + return (strategy, handler, createdClients); } private static void SetupHandler(Mock handler, HttpResponseMessage response) @@ -70,6 +83,18 @@ public async Task UploadAsync_On2xx_ShouldReturnSuccess() response.FailureKind.Should().Be(UploadFailureKind.None); } + [Test] + public async Task UploadAsync_ShouldLetCallerTokenControlAttemptTimeout() + { + var (strategy, handler, createdClients) = CreateStrategyWithCreatedClients(); + SetupHandler(handler, new HttpResponseMessage(HttpStatusCode.OK)); + + await strategy.UploadAsync(CreateSlice(), CreateLocation(), CancellationToken.None); + + createdClients.Should().ContainSingle(); + createdClients[0].Timeout.Should().Be(Timeout.InfiniteTimeSpan); + } + [Test] public async Task UploadAsync_On500_ShouldReturnServerFailure_WithRealStatusCode() { diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/UploadFailureClassifierTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/UploadFailureClassifierTests.cs index 0c8f8a9a..0c6a8acf 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/UploadFailureClassifierTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/UploadFailureClassifierTests.cs @@ -2,6 +2,8 @@ using ByteSync.Services.Communications.Transfers.Strategies; using FluentAssertions; using NUnit.Framework; +using System.Net.Http; +using System.Net.Sockets; namespace ByteSync.Client.UnitTests.Services.Communications.Transfers.Strategies; @@ -80,16 +82,46 @@ public void Classify_GenericException_ShouldReturnServerError500() } [Test] - public void Classify_HttpRequestException_ShouldReturnServerError500() + public void Classify_HttpRequestExceptionWithoutSocketFailure_ShouldReturnServerError500() { using var cts = new CancellationTokenSource(); var ex = new HttpRequestException("network issue"); - + var response = UploadFailureClassifier.Classify(ex, cts.Token); - + response.IsSuccess.Should().BeFalse(); response.StatusCode.Should().Be(500); response.FailureKind.Should().Be(UploadFailureKind.ServerError); response.Exception.Should().BeSameAs(ex); } + + [Test] + public void Classify_HttpRequestExceptionWithConnectionReset_ShouldReturnClientNetworkError() + { + using var cts = new CancellationTokenSource(); + var socketException = new SocketException((int)SocketError.ConnectionReset); + var ioException = new IOException("transport closed", socketException); + var ex = new HttpRequestException("copy failed", ioException); + + var response = UploadFailureClassifier.Classify(ex, cts.Token); + + response.IsSuccess.Should().BeFalse(); + response.StatusCode.Should().Be(0); + response.FailureKind.Should().Be(UploadFailureKind.ClientNetworkError); + response.Exception.Should().BeSameAs(ex); + } + + [Test] + public void Classify_DirectSocketExceptionWithConnectionReset_ShouldReturnClientNetworkError() + { + using var cts = new CancellationTokenSource(); + var ex = new SocketException((int)SocketError.ConnectionReset); + + var response = UploadFailureClassifier.Classify(ex, cts.Token); + + response.IsSuccess.Should().BeFalse(); + response.StatusCode.Should().Be(0); + response.FailureKind.Should().Be(UploadFailureKind.ClientNetworkError); + response.Exception.Should().BeSameAs(ex); + } } diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/AdaptiveUploadControllerTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/AdaptiveUploadControllerTests.cs index 4b00588c..1ddc2442 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/AdaptiveUploadControllerTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/AdaptiveUploadControllerTests.cs @@ -199,7 +199,22 @@ public void ClientTimeouts_DownscaleBelowInitialChunkSize_WhenAtMinParallelism() _controller.CurrentChunkSizeBytes.Should().BeLessThan(500 * 1024); _controller.CurrentChunkSizeBytes.Should().Be(375 * 1024); } - + + [Test] + public void ClientNetworkErrors_DownscaleBelowInitialChunkSize_WhenAtMinParallelism() + { + // Arrange + _controller.CurrentParallelism.Should().Be(2); + _controller.CurrentChunkSizeBytes.Should().Be(500 * 1024); + + // Act + FeedClientNetworkErrors(_controller, 2); + + // Assert + _controller.CurrentParallelism.Should().Be(2); + _controller.CurrentChunkSizeBytes.Should().Be(375 * 1024); + } + [Test] public void ClientTimeouts_ReduceParallelismFirst_WhenAboveMinParallelism() { @@ -280,7 +295,21 @@ private static void FeedClientTimeouts(AdaptiveUploadController controller, int failureKind: UploadFailureKind.ClientTimeout); } } - + + private static void FeedClientNetworkErrors(AdaptiveUploadController controller, int count) + { + for (var i = 0; i < count; i++) + { + RecordUploadResult( + controller, + TimeSpan.FromSeconds(90), + isSuccess: false, + partNumber: i + 1, + statusCode: 0, + failureKind: UploadFailureKind.ClientNetworkError); + } + } + private static void RecordUploadResult( AdaptiveUploadController controller, TimeSpan elapsed, diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileSlicerTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileSlicerTests.cs index 52ebab6e..6de5bbf1 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileSlicerTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileSlicerTests.cs @@ -168,7 +168,57 @@ public async Task SliceAndEncryptAsync_WithNoSlices_ShouldCompleteNormally() // Channel should be completed _availableSlices.Reader.Completion.IsCompleted.Should().BeTrue(); } - + + [Test] + public async Task SliceAndEncryptAsync_WhenChannelClosesAfterSliceCreated_ShouldDisposeSlice() + { + // Arrange + var stream = new MemoryStream(new byte[128]); + var slice = new FileUploaderSlice(1, stream); + + _mockSlicerEncrypter.Setup(x => x.SliceAndEncrypt()) + .Callback(() => + { + _exceptionOccurred.Set(); + _availableSlices.Writer.TryComplete(); + }) + .ReturnsAsync(slice); + + // Act + await _fileSlicer.SliceAndEncryptAsync(_sharedFileDefinition, _progressState); + + // Assert + _progressState.TotalCreatedSlices.Should().Be(1); + var readDisposedStream = () => _ = stream.Length; + readDisposedStream.Should().Throw(); + } + + [Test] + public async Task SliceAndEncryptAdaptiveAsync_WhenChannelClosesAfterSliceCreated_ShouldDisposeSlice() + { + // Arrange + var stream = new MemoryStream(new byte[128]); + var slice = new FileUploaderSlice(1, stream); + + _mockAdaptiveController.Setup(x => x.CurrentChunkSizeBytes).Returns(64 * 1024); + _mockAdaptiveController.Setup(x => x.GetNextChunkSizeBytes()).Returns(64 * 1024); + _mockSlicerEncrypter.Setup(x => x.SliceAndEncrypt()) + .Callback(() => + { + _exceptionOccurred.Set(); + _availableSlices.Writer.TryComplete(); + }) + .ReturnsAsync(slice); + + // Act + await _fileSlicer.SliceAndEncryptAdaptiveAsync(_sharedFileDefinition, _progressState); + + // Assert + _progressState.TotalCreatedSlices.Should().Be(1); + var readDisposedStream = () => _ = stream.Length; + readDisposedStream.Should().Throw(); + } + [Test] public async Task SliceAndEncryptAsync_WithMultipleSlices_ShouldProcessAllSlices() { diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadCoordinatorTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadCoordinatorTests.cs index 01fe5bec..c9ad5e40 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadCoordinatorTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadCoordinatorTests.cs @@ -177,6 +177,28 @@ public void AvailableSlices_ShouldAllowWritingAndReading() action.Should().NotThrowAsync(); } + [Test] + public async Task AvailableSlices_ShouldApplyTightBackpressure() + { + // Arrange + var first = new FileUploaderSlice(1, new MemoryStream()); + var second = new FileUploaderSlice(2, new MemoryStream()); + var third = new FileUploaderSlice(3, new MemoryStream()); + + // Act + await _coordinator.AvailableSlices.Writer.WriteAsync(first); + await _coordinator.AvailableSlices.Writer.WriteAsync(second); + var thirdWrite = _coordinator.AvailableSlices.Writer.WriteAsync(third).AsTask(); + + // Assert + thirdWrite.IsCompleted.Should().BeFalse(); + + var readSlice = await _coordinator.AvailableSlices.Reader.ReadAsync(); + readSlice.Should().Be(first); + + await thirdWrite.WaitAsync(TimeSpan.FromSeconds(1)); + } + [Test] public void MultipleSetExceptionCalls_ShouldNotThrow() { diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadWorkerTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadWorkerTests.cs index 4f2fa3e4..a0a676a0 100644 --- a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadWorkerTests.cs +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadWorkerTests.cs @@ -61,6 +61,8 @@ public void SetUp() _availableSlices = Channel.CreateBounded(8); _progressState = new UploadProgressState(); _mockAdaptiveController = new Mock(); + _mockAdaptiveController.Setup(x => x.CurrentChunkSizeBytes).Returns(500 * 1024); + _mockAdaptiveController.Setup(x => x.CurrentParallelism).Returns(2); _fileUploadWorker = new FileUploadWorker( _mockPolicyFactory.Object, @@ -341,4 +343,149 @@ public async Task UploadAvailableSlicesAdaptiveAsync_OnStrategyServerFailure_Sho result.FileId == _sharedFileDefinition.Id && result.FailureKind == UploadFailureKind.ServerError)), Times.AtLeastOnce); } -} \ No newline at end of file + + [Test] + public async Task UploadAvailableSlicesAdaptiveAsync_WhenClientTimeoutIsRetried_ShouldKeepFailureKindAndEventuallySucceed() + { + // Arrange + var slice = new FileUploaderSlice(1, new MemoryStream(new byte[625 * 1024])); + var mockUploadStrategy = new Mock(); + var mockUploadLocation = new FileStorageLocation("https://test.example.com/upload", StorageProvider.CloudflareR2); + var attempt = 0; + + _policy = Policy + .HandleResult(x => !x.IsSuccess) + .RetryAsync(1, onRetry: (_, _, _) => { }); + _mockPolicyFactory.Setup(x => x.BuildFileUploadPolicy()).Returns(_policy); + _mockAdaptiveController.Setup(x => x.CurrentChunkSizeBytes).Returns(64 * 1024); + + mockUploadStrategy.Setup(x => + x.UploadAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(() => + { + attempt++; + + return attempt == 1 + ? UploadFileResponse.ClientTimeout(new TaskCanceledException("attempt timed out")) + : UploadFileResponse.Success(200); + }); + + _mockStrategies.Setup(x => x[StorageProvider.CloudflareR2]).Returns(mockUploadStrategy.Object); + _mockFileTransferApiClient.Setup(x => x.GetUploadFileStorageLocation(It.IsAny())) + .ReturnsAsync(mockUploadLocation); + _mockFileTransferApiClient.Setup(x => x.AssertFilePartIsUploaded(It.IsAny())) + .Returns(Task.CompletedTask); + _progressState.TotalCreatedSlices = 1; + + await _availableSlices.Writer.WriteAsync(slice); + _availableSlices.Writer.Complete(); + + // Act + await _fileUploadWorker.UploadAvailableSlicesAdaptiveAsync(_availableSlices, _progressState); + + // Assert + attempt.Should().Be(2); + _uploadingIsFinished.WaitOne(1000).Should().BeTrue(); + _mockAdaptiveController.Verify(x => x.RecordUploadResult(It.Is(result => + !result.IsSuccess && + result.FailureKind == UploadFailureKind.ClientTimeout)), Times.Once); + _mockAdaptiveController.Verify(x => x.RecordUploadResult(It.Is(result => + result.IsSuccess && + result.FailureKind == UploadFailureKind.None)), Times.Once); + } + + [Test] + public async Task UploadAvailableSlicesAdaptiveAsync_WhenOneWorkerFails_ShouldCancelOtherWorkers() + { + // Arrange + var firstSlice = new FileUploaderSlice(1, new MemoryStream(new byte[64 * 1024])); + var secondSlice = new FileUploaderSlice(2, new MemoryStream(new byte[64 * 1024])); + var mockUploadStrategy = new Mock(); + var mockUploadLocation = new FileStorageLocation("https://test.example.com/upload", StorageProvider.CloudflareR2); + using var bothUploadsStarted = new CountdownEvent(2); + using var firstUploadCanFail = new ManualResetEventSlim(false); + using var secondUploadCanceled = new ManualResetEventSlim(false); + + mockUploadStrategy.Setup(x => + x.UploadAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (slice, _, cancellationToken) => + { + bothUploadsStarted.Signal(); + + if (slice.PartNumber == 1) + { + firstUploadCanFail.Wait(TimeSpan.FromSeconds(5), CancellationToken.None); + + return UploadFileResponse.Failure(500, "server failure"); + } + + firstUploadCanFail.Set(); + + try + { + await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); + + return UploadFileResponse.Success(200); + } + catch (OperationCanceledException ex) + { + secondUploadCanceled.Set(); + + return UploadFileResponse.ClientCancellation(ex); + } + }); + + _mockStrategies.Setup(x => x[StorageProvider.CloudflareR2]).Returns(mockUploadStrategy.Object); + _mockFileTransferApiClient.Setup(x => x.GetUploadFileStorageLocation(It.IsAny())) + .ReturnsAsync(mockUploadLocation); + _progressState.TotalCreatedSlices = 2; + + var firstWorker = _fileUploadWorker.UploadAvailableSlicesAdaptiveAsync(_availableSlices, _progressState); + var secondWorker = _fileUploadWorker.UploadAvailableSlicesAdaptiveAsync(_availableSlices, _progressState); + + await _availableSlices.Writer.WriteAsync(firstSlice); + await _availableSlices.Writer.WriteAsync(secondSlice); + _availableSlices.Writer.Complete(); + + // Act + bothUploadsStarted.Wait(TimeSpan.FromSeconds(2)).Should().BeTrue(); + var allWorkers = Task.WhenAll(firstWorker, secondWorker); + var completed = await Task.WhenAny(allWorkers, Task.Delay(TimeSpan.FromSeconds(3))); + + // Assert + completed.Should().Be(allWorkers); + secondUploadCanceled.Wait(TimeSpan.FromSeconds(1)).Should().BeTrue(); + _exceptionOccurred.WaitOne(0).Should().BeTrue(); + _progressState.Exceptions.Should().HaveCount(1); + } + + [Test] + public async Task UploadAvailableSlicesAdaptiveAsync_WhenWorkerFails_ShouldDisposeQueuedSlices() + { + // Arrange + var activeSlice = new FileUploaderSlice(1, new MemoryStream(new byte[64 * 1024])); + var queuedStream = new MemoryStream(new byte[64 * 1024]); + var queuedSlice = new FileUploaderSlice(2, queuedStream); + var mockUploadStrategy = new Mock(); + var mockUploadLocation = new FileStorageLocation("https://test.example.com/upload", StorageProvider.CloudflareR2); + + mockUploadStrategy.Setup(x => + x.UploadAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(UploadFileResponse.Failure(500, "server failure")); + + _mockStrategies.Setup(x => x[StorageProvider.CloudflareR2]).Returns(mockUploadStrategy.Object); + _mockFileTransferApiClient.Setup(x => x.GetUploadFileStorageLocation(It.IsAny())) + .ReturnsAsync(mockUploadLocation); + + await _availableSlices.Writer.WriteAsync(activeSlice); + await _availableSlices.Writer.WriteAsync(queuedSlice); + _availableSlices.Writer.Complete(); + + // Act + await _fileUploadWorker.UploadAvailableSlicesAdaptiveAsync(_availableSlices, _progressState); + + // Assert + var readQueuedStream = () => _ = queuedStream.Length; + readQueuedStream.Should().Throw(); + } +} diff --git a/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicyTests.cs b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicyTests.cs new file mode 100644 index 00000000..decd47d9 --- /dev/null +++ b/tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicyTests.cs @@ -0,0 +1,100 @@ +using ByteSync.Services.Communications.Transfers.Uploading; +using FluentAssertions; +using NUnit.Framework; + +namespace ByteSync.Client.UnitTests.Services.Communications.Transfers.Uploading; + +[TestFixture] +public class UploadAttemptTimeoutPolicyTests +{ + [Test] + public void ComputeTimeoutSeconds_FirstAttempt_ShouldUseFloorForSmallSlices() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + 500 * 1024, + attempt: 1, + currentChunkSizeBytes: 500 * 1024); + + // Assert + timeout.Should().Be(60); + } + + [Test] + public void ComputeTimeoutSeconds_RetryForStaleLargeSlice_ShouldIncreaseBudget() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + 625 * 1024, + attempt: 2, + currentChunkSizeBytes: 64 * 1024); + + // Assert + timeout.Should().Be(120); + } + + [Test] + public void ComputeTimeoutSeconds_RetryForCurrentChunkSizedSlice_ShouldGrowGradually() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + 64 * 1024, + attempt: 2, + currentChunkSizeBytes: 64 * 1024); + + // Assert + timeout.Should().Be(75); + } + + [Test] + public void ComputeTimeoutSeconds_ShouldNotExceedCeiling() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + 16 * 1024 * 1024, + attempt: 10, + currentChunkSizeBytes: 64 * 1024); + + // Assert + timeout.Should().Be(180); + } + + [Test] + public void ComputeTimeoutSeconds_ForLargeStaleSliceAtLowBandwidth_ShouldAllowMoreThanTwoMinutes() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + 1969 * 1024, + attempt: 6, + currentChunkSizeBytes: 500 * 1024); + + // Assert + timeout.Should().Be(150); + } + + [Test] + public void ComputeTimeoutSeconds_WithHugeSlice_ShouldNotOverflow() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + long.MaxValue, + attempt: 1, + currentChunkSizeBytes: 64 * 1024); + + // Assert + timeout.Should().Be(180); + } + + [Test] + public void ComputeTimeoutSeconds_WithHugeStaleRatio_ShouldNotOverflow() + { + // Act + var timeout = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds( + long.MaxValue, + attempt: 2, + currentChunkSizeBytes: 1); + + // Assert + timeout.Should().Be(180); + } +}