Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 85 additions & 61 deletions api/src/main/java/io/minio/BaseS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.collect.ImmutableSet;
import io.minio.credentials.Credentials;
import io.minio.credentials.Provider;
import io.minio.errors.ErrorResponseException;
Expand Down Expand Up @@ -53,9 +52,8 @@
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
Expand All @@ -64,9 +62,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
Expand Down Expand Up @@ -100,17 +100,13 @@ public abstract class BaseS3Client implements AutoCloseable {
.build();

private static final String RETRY_HEAD = "RetryHead";
private static final String END_HTTP = "----------END-HTTP----------";
private static final String UPLOAD_ID = "uploadId";
private static final Set<String> TRACE_QUERY_PARAMS =
ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes");
private PrintWriter traceStream;
private volatile PrintWriter traceStream;
protected final Map<String, String> regionCache = new ConcurrentHashMap<>();
protected String userAgent = Utils.getDefaultUserAgent();

protected Http.BaseUrl baseUrl;
protected Provider provider;
protected OkHttpClient httpClient;
protected volatile OkHttpClient httpClient;
protected boolean closeHttpClient;

protected BaseS3Client(
Expand All @@ -137,6 +133,45 @@ public void close() {
}
}

private static int getStatusRetryInterceptorIndex(List<Interceptor> interceptors) {
return IntStream.range(0, interceptors.size())
.filter(i -> interceptors.get(i) instanceof Http.StatusRetryInterceptor)
.findFirst()
.orElse(-1);
}

/**
* Sets request retry parameters. Any null/invalid values disable retry.
*
* <pre>Example:{@code
* minioClient.setRetry(ImmutableSet.of(408, 504), 250, 3);
* }</pre>
*
* @param retryStatusCodes HTTP status codes to be retried.
* @param delayMs Delay between retries.
* @param maxRetries Maximum number of retry attempts.
*/
public synchronized void setRetry(
Set<Integer> retryStatusCodes, Long delayMs, Integer maxRetries) {
Interceptor interceptor =
new Http.StatusRetryInterceptor(
retryStatusCodes, delayMs == null ? 0 : delayMs, maxRetries == null ? 0 : maxRetries);

List<Interceptor> interceptors = this.httpClient.interceptors();
int i = getStatusRetryInterceptorIndex(interceptors);
OkHttpClient.Builder builder = this.httpClient.newBuilder();
if (i >= 0) {
builder.interceptors().clear();
for (int j = 0; j < interceptors.size(); j++) {
builder.addInterceptor(i == j ? interceptor : interceptors.get(j));
}
} else {
builder.addInterceptor(interceptor);
}

this.httpClient = builder.build();
Comment thread
allanrogerr marked this conversation as resolved.
}

/**
* Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values
* must be between 1 and Integer.MAX_VALUE when converted to milliseconds.
Expand Down Expand Up @@ -268,25 +303,48 @@ private String[] handleRedirectResponse(
return new String[] {code, message};
}

private OkHttpClient getHttpClient(PrintWriter traceStream, Http.S3Request s3request) {
if (traceStream == null) return this.httpClient;

OkHttpClient httpClient = this.httpClient;
List<Interceptor> interceptors = httpClient.interceptors();
Comment thread
allanrogerr marked this conversation as resolved.
int i = getStatusRetryInterceptorIndex(interceptors);
Http.StatusRetryInterceptor interceptor =
i < 0 ? null : (Http.StatusRetryInterceptor) interceptors.get(i);

OkHttpClient.Builder builder = httpClient.newBuilder();
if (interceptor == null) {
builder.addInterceptor(
new Http.StatusRetryInterceptor(interceptor, traceStream, s3request.object() == null));
} else {
builder.interceptors().clear();
for (int j = 0; j < interceptors.size(); j++) {
if (i == j) {
builder.addInterceptor(
new Http.StatusRetryInterceptor(
interceptor, traceStream, s3request.object() == null));
} else {
builder.addInterceptor(interceptors.get(j));
}
}
}

return builder.build();
}

/** Execute HTTP request asynchronously for given parameters. */
protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) {
Credentials credentials = (provider == null) ? null : provider.fetch();
Http.Request request = null;
PrintWriter traceStream = this.traceStream;
try {
request = s3request.toRequest(baseUrl, region, credentials);
} catch (MinioException e) {
return Utils.failedFuture(e);
}

OkHttpClient httpClient = getHttpClient(traceStream, s3request);
StringBuilder traceBuilder = new StringBuilder(request.httpTraces());
PrintWriter traceStream = this.traceStream;
if (traceStream != null) traceStream.print(request.httpTraces());

OkHttpClient httpClient = this.httpClient;
// FIXME: enable retry for all request.
// if (!s3request.retryFailure()) {
// httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
// }

okhttp3.Request httpRequest = request.httpRequest();
CompletableFuture<Response> completableFuture = newCompleteableFuture();
Expand All @@ -309,57 +367,23 @@ public void onResponse(Call call, final Response response) throws IOException {
}

private void onResponse(final Response response) throws IOException {
String trace =
String.format(
"%s %d %s%n%s",
response.protocol().toString().toUpperCase(Locale.US),
response.code(),
response.message(),
response.headers().toString());
if (!trace.endsWith("\n\n")) {
trace += trace.endsWith("\n") ? "\n" : "\n\n";
}
traceBuilder.append(trace);
if (traceStream != null) traceStream.print(trace);

String traces =
Http.getResponseTraces(
response,
s3request.method(),
s3request.queryParams(),
s3request.object() == null);
if (response.isSuccessful()) {
if (traceStream != null) {
// Trace response body only if the request is not
// GetObject/ListenBucketNotification
// S3 API.
Set<String> keys = s3request.queryParams().keySet();
if ((s3request.method() != Http.Method.GET
|| s3request.object() == null
|| !Collections.disjoint(keys, TRACE_QUERY_PARAMS))
&& !(keys.contains("events")
&& (keys.contains("prefix") || keys.contains("suffix")))) {
String responseBody = response.peekBody(1024 * 1024).string();
traceStream.print(responseBody);
if (!responseBody.endsWith("\n")) traceStream.println();
}
traceStream.println(END_HTTP);
}

completableFuture.complete(response);
return;
}

traceBuilder.append(traces);
String errorXml = null;
try (ResponseBody responseBody = response.body()) {
errorXml = responseBody.string();
}

if (!("".equals(errorXml) && s3request.method().equals(Http.Method.HEAD))) {
traceBuilder.append(errorXml);
if (traceStream != null) traceStream.print(errorXml);
if (!errorXml.endsWith("\n")) {
traceBuilder.append("\n");
if (traceStream != null) traceStream.println();
}
}
traceBuilder.append(END_HTTP).append("\n");
if (traceStream != null) traceStream.println(END_HTTP);

// Error out for Non-XML response from server for non-HEAD requests.
String contentType = response.headers().get(Http.Headers.CONTENT_TYPE);
if (!s3request.method().equals(Http.Method.HEAD)
Expand Down Expand Up @@ -635,7 +659,7 @@ protected void checkArgs(BaseArgs args) {
public CompletableFuture<AbortMultipartUploadResponse> abortMultipartUpload(
AbortMultipartUploadArgs args) {
checkArgs(args);
return executeDeleteAsync(args, null, new Http.QueryParameters(UPLOAD_ID, args.uploadId()))
return executeDeleteAsync(args, null, new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()))
.thenApply(
response -> {
try {
Expand Down Expand Up @@ -672,7 +696,7 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
return executePostAsync(
args,
args.ssec() == null ? null : args.ssec().headers(),
new Http.QueryParameters(UPLOAD_ID, args.uploadId()),
new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()),
body)
.thenApply(
response -> {
Expand Down Expand Up @@ -1191,7 +1215,7 @@ public CompletableFuture<ListObjectVersionsResponse> listObjectVersions(
public CompletableFuture<ListPartsResponse> listParts(ListPartsArgs args) {
Http.QueryParameters queryParams =
new Http.QueryParameters(
UPLOAD_ID,
Http.UPLOAD_ID,
args.uploadId(),
"max-parts",
(args.maxParts() != null) ? args.maxParts().toString() : "1000");
Expand Down
Loading
Loading