From d05c4662f41a80ea235b45d40a1af4c68e502c78 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Mon, 10 Feb 2025 16:28:24 +0000 Subject: [PATCH] Add error details provider for Amazon S3 plugin --- pom.xml | 28 ++++++++- .../s3/common/AmazonErrorDetailsProvider.java | 62 +++++++++++++++++++ .../cdap/plugin/aws/s3/sink/S3BatchSink.java | 6 ++ .../plugin/aws/s3/source/S3BatchSource.java | 6 ++ 4 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/cdap/plugin/aws/s3/common/AmazonErrorDetailsProvider.java diff --git a/pom.xml b/pom.xml index 611d08d..0240608 100644 --- a/pom.xml +++ b/pom.xml @@ -278,6 +278,17 @@ httpclient ${httpclient.version} + + org.apache.hadoop + hadoop-aws + ${hadoop.version} + + + com.amazonaws + aws-java-sdk-bundle + + + com.amazonaws aws-java-sdk-s3 @@ -288,6 +299,11 @@ aws-java-sdk-sts ${aws.sdk.version} + + org.apache.commons + commons-lang3 + 3.12.0 + @@ -310,7 +326,11 @@ true - <_exportcontents>io.cdap.plugin.aws.s3.* + <_exportcontents> + io.cdap.plugin.aws.s3.*; + org.apache.hadoop.fs.s3a.*; + org.apache.hadoop.fs.s3native.*; + *;inline=false;scope=compile true lib @@ -492,7 +512,11 @@ true - <_exportcontents>io.cdap.plugin.aws.s3.* + <_exportcontents> + io.cdap.plugin.aws.s3.*; + org.apache.hadoop.fs.s3a.*; + org.apache.hadoop.fs.s3native.*; + *;inline=false;scope=compile true lib diff --git a/src/main/java/io/cdap/plugin/aws/s3/common/AmazonErrorDetailsProvider.java b/src/main/java/io/cdap/plugin/aws/s3/common/AmazonErrorDetailsProvider.java new file mode 100644 index 0000000..dd8d32a --- /dev/null +++ b/src/main/java/io/cdap/plugin/aws/s3/common/AmazonErrorDetailsProvider.java @@ -0,0 +1,62 @@ +/* + * Copyright © 2025 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.aws.s3.common; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.google.common.base.Throwables; +import io.cdap.cdap.api.exception.ErrorCategory; +import io.cdap.cdap.api.exception.ErrorCodeType; +import io.cdap.cdap.api.exception.ErrorType; +import io.cdap.cdap.api.exception.ErrorUtils; +import io.cdap.cdap.api.exception.ErrorUtils.ActionErrorPair; +import io.cdap.cdap.api.exception.ProgramFailureException; +import io.cdap.cdap.etl.api.exception.ErrorContext; +import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider; +import java.util.List; + +/** + * Error details provided for the Amazon S3 + **/ +public final class AmazonErrorDetailsProvider implements ErrorDetailsProvider { + + static final String S3_EXTERNAL_DOC = + "https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html"; + + @Override + public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) { + List causalChain = Throwables.getCausalChain(e); + for (Throwable t : causalChain) { + if (t instanceof ProgramFailureException) { + // if causal chain already has program failure exception, return null to avoid double wrap. + return null; + } + if (t instanceof AmazonS3Exception) { + AmazonS3Exception amazonServiceException = (AmazonS3Exception) t; + int statusCode = amazonServiceException.getStatusCode(); + ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode); + String errorReason = String.format("%s %s. %s. For more details, see %s", statusCode, + amazonServiceException.getErrorMessage(), pair.getCorrectiveAction(), S3_EXTERNAL_DOC); + return ErrorUtils.getProgramFailureException(new ErrorCategory( + ErrorCategory.ErrorCategoryEnum.PLUGIN, amazonServiceException.getErrorCode()), + errorReason, amazonServiceException.getMessage(), ErrorType.USER, true, + ErrorCodeType.HTTP, String.valueOf(statusCode), S3_EXTERNAL_DOC, t); + } + } + return null; + } +} diff --git a/src/main/java/io/cdap/plugin/aws/s3/sink/S3BatchSink.java b/src/main/java/io/cdap/plugin/aws/s3/sink/S3BatchSink.java index 169b15c..9a525f6 100644 --- a/src/main/java/io/cdap/plugin/aws/s3/sink/S3BatchSink.java +++ b/src/main/java/io/cdap/plugin/aws/s3/sink/S3BatchSink.java @@ -31,6 +31,7 @@ import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.plugin.aws.s3.common.AmazonErrorDetailsProvider; import io.cdap.plugin.aws.s3.common.S3ConnectorConfig; import io.cdap.plugin.aws.s3.common.S3Constants; import io.cdap.plugin.aws.s3.common.S3Path; @@ -87,6 +88,11 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) { return new LineageRecorder(context, asset); } + @Override + protected String getErrorDetailsProviderClassName() { + return AmazonErrorDetailsProvider.class.getName(); + } + @Override protected Map getFileSystemProperties(BatchSinkContext context) { // when context is null, it is configure time, by that time, it will always use s3n diff --git a/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java b/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java index 8e14f26..c14e1e2 100644 --- a/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java +++ b/src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java @@ -30,6 +30,7 @@ import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.connector.Connector; +import io.cdap.plugin.aws.s3.common.AmazonErrorDetailsProvider; import io.cdap.plugin.aws.s3.common.S3ConnectorConfig; import io.cdap.plugin.aws.s3.common.S3Constants; import io.cdap.plugin.aws.s3.common.S3EmptyInputFormat; @@ -89,6 +90,11 @@ public void prepareRun(BatchSourceContext context) throws Exception { super.prepareRun(context); } + @Override + protected String getErrorDetailsProviderClassName() { + return AmazonErrorDetailsProvider.class.getName(); + } + @Override protected LineageRecorder getLineageRecorder(BatchSourceContext context) { return new LineageRecorder(context, asset);