Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
try
{
var options = new BlobClientOptions();
options.Retry.NetworkTimeout = TimeSpan.FromMinutes(1);
options.Retry.NetworkTimeout = TimeSpan.FromMinutes(10);

slice.MemoryStream.Position = 0;
var blob = new BlobClient(new Uri(storageLocation.Url), options);
var response = await blob.UploadAsync(slice.MemoryStream, cancellationToken);

_logger.LogDebug("UploadAvailableSlice: slice {PartNumber} is uploaded", slice.PartNumber);

Check warning on line 30 in src/ByteSync.Client/Services/Communications/Transfers/Strategies/AzureBlobStorageUploadStrategy.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoNSnELRW8fhTUaz&open=AZ3McoNSnELRW8fhTUaz&pullRequest=292

var rawResponse = response.GetRawResponse();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
slice.MemoryStream.Position = 0;

using var httpClient = _httpClientFactory.CreateClient();
httpClient.Timeout = TimeSpan.FromMinutes(1);
httpClient.Timeout = Timeout.InfiniteTimeSpan;
httpClient.DefaultRequestHeaders.ExpectContinue = false;

// Build ReadOnlyMemory without copying when possible; fallback to ToArray otherwise
Expand All @@ -54,8 +54,8 @@
};
request.Headers.ExpectContinue = false;

_logger.LogDebug("R2 PUT start: part {Part} sizeKB {SizeKb} host {Host}",
slice.PartNumber, Math.Round(rom.Length / 1024d), new Uri(storageLocation.Url).Host);

Check warning on line 58 in src/ByteSync.Client/Services/Communications/Transfers/Strategies/CloudflareR2UploadStrategy.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoM9nELRW8fhTUay&open=AZ3McoM9nELRW8fhTUay&pullRequest=292

using var response = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
using System.IO;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;
using ByteSync.Common.Business.Communications.Transfers;

Expand All @@ -12,12 +15,44 @@ public static UploadFileResponse Classify(Exception exception, CancellationToken
{
return UploadFileResponse.ClientCancellation(exception);
}

if (exception is OperationCanceledException)
{
return UploadFileResponse.ClientTimeout(exception);
}

if (IsClientNetworkError(exception))
{
return UploadFileResponse.ClientNetworkError(exception);
}

return UploadFileResponse.Failure(500, exception);
}

private static bool IsClientNetworkError(Exception exception)
{
if (exception is not HttpRequestException and not IOException and not SocketException)
{
return false;
}

var current = exception;
while (current != null)
{
if (current is SocketException socketException)
{
return socketException.SocketErrorCode is SocketError.ConnectionReset
or SocketError.ConnectionAborted
or SocketError.TimedOut
or SocketError.NetworkDown
or SocketError.NetworkUnreachable
or SocketError.HostDown
or SocketError.HostUnreachable;
}

current = current.InnerException;
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
private const int MAX_CHUNK_SIZE_BYTES = 16 * 1024 * 1024; // 16 MB
private const int MIN_PARALLELISM = 2;
private const int MAX_PARALLELISM = 4;
private const int CLIENT_TIMEOUTS_BEFORE_DOWNSCALE = 2;
private const int CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE = 2;

private const double MULTIPLIER_2_X = 2.0;
private const double MULTIPLIER_1_75_X = 1.75;
Expand All @@ -37,7 +37,7 @@
private readonly Queue<long> _recentBytes;
private int _successesInWindow;
private int _windowSize;
private int _consecutiveClientTimeouts;
private int _consecutiveClientNetworkIssues;
private readonly ILogger<AdaptiveUploadController> _logger;
private readonly object _syncRoot = new();

Expand Down Expand Up @@ -90,17 +90,17 @@
{
if (uploadResult.FailureKind == UploadFailureKind.ClientCancellation)
{
_consecutiveClientTimeouts = 0;
_consecutiveClientNetworkIssues = 0;
return;
}

if (uploadResult.FailureKind == UploadFailureKind.ClientTimeout)
if (uploadResult.FailureKind is UploadFailureKind.ClientTimeout or UploadFailureKind.ClientNetworkError)
{
HandleClientTimeout(uploadResult.FileId);
HandleClientNetworkIssue(uploadResult.FileId, uploadResult.FailureKind);
return;
}

_consecutiveClientTimeouts = 0;
_consecutiveClientNetworkIssues = 0;

EnqueueSample(uploadResult.Elapsed, uploadResult.IsSuccess, uploadResult.ActualBytes);

Expand All @@ -116,13 +116,13 @@

var maxElapsed = GetMaxElapsedInWindow();

_logger.LogDebug(
"Adaptive: file {FileId} maxElapsed={MaxElapsedMs} ms, window={Window}, parallelism={Parallelism}, chunkSize={ChunkKb} KB",
uploadResult.FileId ?? "-",
maxElapsed.TotalMilliseconds,
_windowSize,
_currentParallelism,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 125 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMBnELRW8fhTUaq&open=AZ3McoMBnELRW8fhTUaq&pullRequest=292

if (TryHandleDownscale(maxElapsed, uploadResult.FileId))
{
Expand Down Expand Up @@ -162,26 +162,27 @@
}
}

private void HandleClientTimeout(string? fileId)
private void HandleClientNetworkIssue(string? fileId, UploadFailureKind failureKind)
{
_consecutiveClientTimeouts += 1;
if (_consecutiveClientTimeouts < CLIENT_TIMEOUTS_BEFORE_DOWNSCALE)
_consecutiveClientNetworkIssues += 1;
if (_consecutiveClientNetworkIssues < CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE)
{
_logger.LogDebug(
"Adaptive: file {FileId} client timeout {TimeoutCount}/{Threshold}. Waiting before downscale",
"Adaptive: file {FileId} client network issue {FailureKind} {IssueCount}/{Threshold}. Waiting before downscale",
fileId ?? "-",
_consecutiveClientTimeouts,
CLIENT_TIMEOUTS_BEFORE_DOWNSCALE);
failureKind,
_consecutiveClientNetworkIssues,
CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE);

Check warning on line 175 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUar&open=AZ3McoMCnELRW8fhTUar&pullRequest=292

return;
}

_logger.LogInformation(
"Adaptive: file {FileId} client timeout threshold reached ({TimeoutCount}). Downscaling upload settings",
"Adaptive: file {FileId} client network issue threshold reached ({IssueCount}). Downscaling upload settings",
fileId ?? "-",
_consecutiveClientTimeouts);
_consecutiveClientTimeouts = 0;
Downscale(fileId, "client timeouts");
_consecutiveClientNetworkIssues);

Check warning on line 183 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUas&open=AZ3McoMCnELRW8fhTUas&pullRequest=292
_consecutiveClientNetworkIssues = 0;
Downscale(fileId, "client network issues");
}

private bool HandleBandwidthReset(bool isSuccess, int? statusCode)
Expand Down Expand Up @@ -231,13 +232,13 @@
{
if (_currentParallelism > MIN_PARALLELISM)
{
_logger.LogInformation(
"Adaptive: file {FileId} Downscale ({Reason}). Reducing parallelism {Prev} -> {Next}. Resetting window (window before {WindowBefore})",
fileId ?? "-",
reason,
_currentParallelism,
_currentParallelism - 1,
_windowSize);

Check warning on line 241 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUat&open=AZ3McoMCnELRW8fhTUat&pullRequest=292
_currentParallelism -= 1;
_windowSize = _currentParallelism;
ResetWindow();
Expand All @@ -249,11 +250,11 @@
if (reduced != _currentChunkSizeBytes)
{
_currentChunkSizeBytes = reduced;
_logger.LogInformation(
"Adaptive: file {FileId} Downscale ({Reason}). New chunkSize={ChunkKb} KB",
fileId ?? "-",
reason,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 257 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUau&open=AZ3McoMCnELRW8fhTUau&pullRequest=292
}

ResetWindow();
Expand Down Expand Up @@ -306,12 +307,12 @@
var increased = (int)Math.Round(_currentChunkSizeBytes * multiplier);
_currentChunkSizeBytes = Math.Clamp(increased, MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES);

_logger.LogInformation(
"Adaptive: file {FileId} Upscale. maxElapsed={MaxElapsedMs} ms <= {ThresholdMs} ms. New chunkSize={ChunkKb} KB",
fileId ?? "-",
maxElapsedEligible.TotalMilliseconds,
_upscaleThreshold.TotalMilliseconds,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 315 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUax&open=AZ3McoMCnELRW8fhTUax&pullRequest=292

UpdateParallelismOnUpscale(fileId);
_currentParallelism = Math.Min(_currentParallelism, MAX_PARALLELISM);
Expand Down Expand Up @@ -349,8 +350,8 @@
_currentParallelism = Math.Max(_currentParallelism, 4);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: file {FileId} Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=8MB",
fileId ?? "-", prev, _currentParallelism);

Check warning on line 354 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUav&open=AZ3McoMCnELRW8fhTUav&pullRequest=292
}
}
else if (_currentChunkSizeBytes >= FOUR_MB)
Expand All @@ -359,8 +360,8 @@
_currentParallelism = Math.Max(_currentParallelism, 3);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: file {FileId} Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=4MB",
fileId ?? "-", prev, _currentParallelism);

Check warning on line 364 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3McoMCnELRW8fhTUaw&open=AZ3McoMCnELRW8fhTUaw&pullRequest=292
}
}
}
Expand Down Expand Up @@ -395,7 +396,7 @@
_currentChunkSizeBytes = Math.Clamp(INITIAL_CHUNK_SIZE_BYTES, MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES);
_currentParallelism = MIN_PARALLELISM;
_windowSize = _currentParallelism;
_consecutiveClientTimeouts = 0;
_consecutiveClientNetworkIssues = 0;
}

ResetWindow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public async Task SliceAndEncryptAsync(SharedFileDefinition sharedFileDefinition
_semaphoreSlim.Release();
}

await _availableSlices.Writer.WriteAsync(fileUploaderSlice);
if (!await TryWriteSliceAsync(fileUploaderSlice))
{
return;
}
}
else
{
Expand Down Expand Up @@ -126,7 +129,10 @@ public async Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDe
_semaphoreSlim.Release();
}

await _availableSlices.Writer.WriteAsync(fileUploaderSlice);
if (!await TryWriteSliceAsync(fileUploaderSlice))
{
return;
}
}
else
{
Expand All @@ -152,4 +158,20 @@ public async Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDe
_availableSlices.Writer.TryComplete(ex);
}
}

private async Task<bool> TryWriteSliceAsync(FileUploaderSlice fileUploaderSlice)
{
try
{
await _availableSlices.Writer.WriteAsync(fileUploaderSlice);

return true;
}
catch (ChannelClosedException) when (_exceptionOccurred.WaitOne(0))
{
await fileUploaderSlice.MemoryStream.DisposeAsync();

return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class FileUploadCoordinator : IFileUploadCoordinator
private readonly ManualResetEvent _uploadingIsFinished;
private readonly ManualResetEvent _exceptionOccurred;
private readonly ILogger<FileUploadCoordinator> _logger;
private const int CHANNEL_CAPACITY = 8;
private const int CHANNEL_CAPACITY = 2;

public FileUploadCoordinator(ILogger<FileUploadCoordinator> logger)
{
Expand Down
Loading
Loading