diff --git a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java index e5eaa41c3e..d7a1ef1031 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java @@ -17,6 +17,7 @@ package org.apache.fluss.client; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.admin.FlussAdmin; import org.apache.fluss.client.lookup.LookupClient; @@ -33,7 +34,9 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.IllegalConfigurationException; import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.S3FileSystemConfigUtils; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.GatewayClientProxy; @@ -45,6 +48,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX; @@ -52,6 +56,8 @@ /** A connection to Fluss cluster, and holds the client session resources. */ public final class FlussConnection implements Connection { + private static final String CLIENT_FS_PREFIX = CLIENT_PREFIX + "fs."; + private final Configuration conf; private final RpcClient rpcClient; private final MetadataUpdater metadataUpdater; @@ -72,10 +78,7 @@ public final class FlussConnection implements Connection { this.conf = conf; // init Filesystem with configuration from FlussConnection, // only pass options with 'client.fs.' prefix - FileSystem.initialize( - Configuration.fromMap( - extractAndRemovePrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")), - null); + FileSystem.initialize(getClientFileSystemConfiguration(conf), null); // for client metrics. setupClientMetricsConfiguration(); String clientId = conf.getString(ConfigOptions.CLIENT_ID); @@ -88,6 +91,26 @@ public final class FlussConnection implements Connection { this.writerClient = null; } + @VisibleForTesting + static Configuration getClientFileSystemConfiguration(Configuration conf) { + Map clientFsOptions = + extractAndRemovePrefix(new HashMap<>(conf.toMap()), CLIENT_FS_PREFIX); + validateClientFileSystemConfiguration(clientFsOptions); + return Configuration.fromMap(clientFsOptions); + } + + private static void validateClientFileSystemConfiguration(Map clientFsOptions) { + for (String key : clientFsOptions.keySet()) { + if (S3FileSystemConfigUtils.isCredentialConfigKey(key)) { + throw new IllegalConfigurationException( + "Client-side S3 credential configuration '%s' is not supported. " + + "Configure S3 credentials on Fluss servers and let Fluss clients " + + "use server-issued temporary filesystem tokens.", + CLIENT_FS_PREFIX + key); + } + } + } + @Override public Configuration getConfiguration() { return conf; diff --git a/fluss-client/src/test/java/org/apache/fluss/client/FlussConnectionTest.java b/fluss-client/src/test/java/org/apache/fluss/client/FlussConnectionTest.java new file mode 100644 index 0000000000..4a0d06f436 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/FlussConnectionTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.IllegalConfigurationException; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link FlussConnection}. */ +class FlussConnectionTest { + + @Test + void testAllowNonCredentialClientFileSystemOptions() { + Configuration conf = new Configuration(); + conf.setString("client.fs.s3a.endpoint", "http://localhost:9000"); + conf.setString("client.fs.s3a.region", "us-east-1"); + conf.setString("client.fs.s3a.path-style-access", "true"); + conf.setString("client.fs.oss.endpoint", "http://oss-local"); + conf.setString("client.test.key", "client-value"); + + Map clientFsOptions = + FlussConnection.getClientFileSystemConfiguration(conf).toMap(); + + assertThat(clientFsOptions) + .containsEntry("s3a.endpoint", "http://localhost:9000") + .containsEntry("s3a.region", "us-east-1") + .containsEntry("s3a.path-style-access", "true") + .containsEntry("oss.endpoint", "http://oss-local") + .doesNotContainKey("client.test.key"); + } + + @ParameterizedTest + @ValueSource( + strings = { + "client.fs.s3a.aws.credentials.provider", + "client.fs.s3a.access.key", + "client.fs.s3a.secret.key", + "client.fs.s3a.access-key", + "client.fs.s3a.secret-key", + "client.fs.s3a.session.token", + "client.fs.s3a.assumed.role.arn", + "client.fs.s3a.assumed.role.sts.endpoint", + "client.fs.s3.aws.credentials.provider", + "client.fs.s3.access.key", + "client.fs.s3.secret.key", + "client.fs.s3.secret-key", + "client.fs.fs.s3a.aws.credentials.provider", + "client.fs.fs.s3a.access.key", + "client.fs.fs.s3a.secret-key", + "client.fs.fs.s3a.assumed.role.arn" + }) + void testRejectCredentialBearingS3ClientFileSystemOptions(String key) { + Configuration conf = new Configuration(); + conf.setString(key, "test-value"); + + assertThatThrownBy(() -> FlussConnection.getClientFileSystemConfiguration(conf)) + .isInstanceOf(IllegalConfigurationException.class) + .hasMessageContaining(key) + .hasMessageContaining("Client-side S3 credential configuration") + .hasMessageContaining("Fluss servers"); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/fs/S3FileSystemConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/fs/S3FileSystemConfigUtils.java new file mode 100644 index 0000000000..26b42fb981 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/fs/S3FileSystemConfigUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs; + +import org.apache.fluss.annotation.Internal; + +import javax.annotation.Nullable; + +/** Utilities for converting and classifying S3 filesystem configuration keys. */ +@Internal +public final class S3FileSystemConfigUtils { + + public static final String ACCESS_KEY = "fs.s3a.access.key"; + public static final String ACCESS_KEY_ALIAS = "fs.s3a.access-key"; + public static final String SECRET_KEY = "fs.s3a.secret.key"; + public static final String SECRET_KEY_ALIAS = "fs.s3a.secret-key"; + public static final String SESSION_TOKEN = "fs.s3a.session.token"; + public static final String AWS_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider"; + public static final String ROLE_ARN = "fs.s3a.assumed.role.arn"; + public static final String STS_ENDPOINT = "fs.s3a.assumed.role.sts.endpoint"; + public static final String ENDPOINT = "fs.s3a.endpoint"; + public static final String REGION = "fs.s3a.region"; + public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access"; + public static final String PATH_STYLE_ACCESS_ALIAS = "fs.s3a.path-style-access"; + + private static final String HADOOP_CONFIG_PREFIX = "fs.s3a."; + private static final String ASSUMED_ROLE_PREFIX = "fs.s3a.assumed.role."; + private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", HADOOP_CONFIG_PREFIX}; + + /** + * Converts a Fluss S3 configuration key to the corresponding Hadoop S3A configuration key. + * + *

Supported Fluss prefixes are {@code s3.}, {@code s3a.}, and {@code fs.s3a.}. Unknown + * prefixes return {@code null}. + */ + @Nullable + public static String toHadoopConfigKey(String key) { + for (String prefix : FLUSS_CONFIG_PREFIXES) { + if (key.startsWith(prefix)) { + return HADOOP_CONFIG_PREFIX + key.substring(prefix.length()); + } + } + return null; + } + + /** + * Returns whether the given Fluss S3 configuration key carries client-side S3 credentials or + * credential provider settings. + */ + public static boolean isCredentialConfigKey(String key) { + String hadoopConfigKey = toHadoopConfigKey(key); + if (hadoopConfigKey == null) { + return false; + } + + return hadoopConfigKey.equals(ACCESS_KEY) + || hadoopConfigKey.equals(ACCESS_KEY_ALIAS) + || hadoopConfigKey.equals(SECRET_KEY) + || hadoopConfigKey.equals(SECRET_KEY_ALIAS) + || hadoopConfigKey.equals(SESSION_TOKEN) + || hadoopConfigKey.equals(AWS_CREDENTIALS_PROVIDER) + || hadoopConfigKey.startsWith(ASSUMED_ROLE_PREFIX); + } + + private S3FileSystemConfigUtils() {} +} diff --git a/fluss-common/src/test/java/org/apache/fluss/fs/S3FileSystemConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/fs/S3FileSystemConfigUtilsTest.java new file mode 100644 index 0000000000..dd3e481270 --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/fs/S3FileSystemConfigUtilsTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link S3FileSystemConfigUtils}. */ +class S3FileSystemConfigUtilsTest { + + @Test + void testConvertFlussConfigKeyToHadoopConfigKey() { + assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("s3.endpoint")) + .isEqualTo(S3FileSystemConfigUtils.ENDPOINT); + assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("s3a.region")) + .isEqualTo(S3FileSystemConfigUtils.REGION); + assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("fs.s3a.path-style-access")) + .isEqualTo(S3FileSystemConfigUtils.PATH_STYLE_ACCESS_ALIAS); + assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("oss.endpoint")).isNull(); + } + + @Test + void testDetectCredentialConfigKey() { + assertThat( + Arrays.asList( + "s3a.aws.credentials.provider", + "s3a.access.key", + "s3a.secret.key", + "s3a.access-key", + "s3a.secret-key", + "s3a.session.token", + "s3a.assumed.role.arn", + "s3a.assumed.role.sts.endpoint", + "s3.aws.credentials.provider", + "s3.access.key", + "s3.secret.key", + "s3.secret-key", + "fs.s3a.aws.credentials.provider", + "fs.s3a.access.key", + "fs.s3a.secret-key", + "fs.s3a.assumed.role.arn")) + .allMatch(S3FileSystemConfigUtils::isCredentialConfigKey); + } + + @Test + void testAllowNonCredentialConfigKey() { + assertThat( + Arrays.asList( + "s3a.endpoint", + "s3a.region", + "s3a.path-style-access", + "s3a.path.style.access", + "oss.access.key")) + .noneMatch(S3FileSystemConfigUtils::isCredentialConfigKey); + } +} diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java index 92ed4615b3..b423b025bd 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FileSystemPlugin; +import org.apache.fluss.fs.S3FileSystemConfigUtils; import org.apache.fluss.fs.s3.token.S3ADelegationTokenReceiver; import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver; @@ -40,19 +41,10 @@ public class S3FileSystemPlugin implements FileSystemPlugin { private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemPlugin.class); - private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", "fs.s3a."}; - - private static final String HADOOP_CONFIG_PREFIX = "fs.s3a."; - - private static final String ACCESS_KEY_ID = "fs.s3a.access.key"; - private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key"; - - private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn"; - private static final String[][] MIRRORED_CONFIG_KEYS = { - {"fs.s3a.access-key", "fs.s3a.access.key"}, - {"fs.s3a.secret-key", "fs.s3a.secret.key"}, - {"fs.s3a.path-style-access", "fs.s3a.path.style.access"} + {S3FileSystemConfigUtils.ACCESS_KEY_ALIAS, S3FileSystemConfigUtils.ACCESS_KEY}, + {S3FileSystemConfigUtils.SECRET_KEY_ALIAS, S3FileSystemConfigUtils.SECRET_KEY}, + {S3FileSystemConfigUtils.PATH_STYLE_ACCESS_ALIAS, S3FileSystemConfigUtils.PATH_STYLE_ACCESS} }; @Override @@ -85,17 +77,17 @@ org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussC } for (String key : flussConfig.keySet()) { - for (String prefix : FLUSS_CONFIG_PREFIXES) { - if (key.startsWith(prefix)) { - String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length()); - String newValue = - flussConfig.getString( - ConfigBuilder.key(key).stringType().noDefaultValue(), null); - conf.set(newKey, newValue); - - LOG.debug( - "Adding Fluss config entry for {} as {} to Hadoop config", key, newKey); - } + String hadoopConfigKey = S3FileSystemConfigUtils.toHadoopConfigKey(key); + if (hadoopConfigKey != null) { + String newValue = + flussConfig.getString( + ConfigBuilder.key(key).stringType().noDefaultValue(), null); + conf.set(hadoopConfigKey, newValue); + + LOG.debug( + "Adding Fluss config entry for {} as {} to Hadoop config", + key, + hadoopConfigKey); } } return conf; @@ -131,9 +123,9 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) { boolean hasStaticKeys = - hadoopConfig.get(ACCESS_KEY_ID) != null - && hadoopConfig.get(ACCESS_KEY_SECRET) != null; - boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null; + hadoopConfig.get(S3FileSystemConfigUtils.ACCESS_KEY) != null + && hadoopConfig.get(S3FileSystemConfigUtils.SECRET_KEY) != null; + boolean hasRoleArn = hadoopConfig.get(S3FileSystemConfigUtils.ROLE_ARN) != null; if (hasStaticKeys || hasRoleArn) { LOG.info( diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 2a5ee6212a..bd392407a0 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -17,6 +17,7 @@ package org.apache.fluss.fs.s3.token; +import org.apache.fluss.fs.S3FileSystemConfigUtils; import org.apache.fluss.fs.token.CredentialsJsonSerde; import org.apache.fluss.fs.token.ObtainedSecurityToken; @@ -47,15 +48,15 @@ public class S3DelegationTokenProvider { private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenProvider.class); - private static final String ACCESS_KEY_ID = "fs.s3a.access.key"; - private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key"; + private static final String ACCESS_KEY_ID = S3FileSystemConfigUtils.ACCESS_KEY; + private static final String ACCESS_KEY_SECRET = S3FileSystemConfigUtils.SECRET_KEY; - private static final String REGION_KEY = "fs.s3a.region"; - private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; - private static final String PATH_STYLE_ACCESS_KEY = "fs.s3a.path.style.access"; + private static final String REGION_KEY = S3FileSystemConfigUtils.REGION; + private static final String ENDPOINT_KEY = S3FileSystemConfigUtils.ENDPOINT; + private static final String PATH_STYLE_ACCESS_KEY = S3FileSystemConfigUtils.PATH_STYLE_ACCESS; - private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn"; - private static final String STS_ENDPOINT_KEY = "fs.s3a.assumed.role.sts.endpoint"; + private static final String ROLE_ARN_KEY = S3FileSystemConfigUtils.ROLE_ARN; + private static final String STS_ENDPOINT_KEY = S3FileSystemConfigUtils.STS_ENDPOINT; private final String scheme; private final String region; diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java index e391f92ed5..58e406f8a6 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java @@ -17,6 +17,7 @@ package org.apache.fluss.fs.s3.token; +import org.apache.fluss.fs.S3FileSystemConfigUtils; import org.apache.fluss.fs.token.Credentials; import org.apache.fluss.fs.token.CredentialsJsonSerde; import org.apache.fluss.fs.token.ObtainedSecurityToken; @@ -35,7 +36,8 @@ /** Security token receiver for S3 filesystem. */ public class S3DelegationTokenReceiver implements SecurityTokenReceiver { - public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider"; + public static final String PROVIDER_CONFIG_NAME = + S3FileSystemConfigUtils.AWS_CREDENTIALS_PROVIDER; private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class); diff --git a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java index 3b5a445a13..e49732732e 100644 --- a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java +++ b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java @@ -18,6 +18,7 @@ package org.apache.fluss.fs.s3; import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.S3FileSystemConfigUtils; import org.apache.fluss.fs.s3.token.DynamicTemporaryAWSCredentialsProvider; import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver; import org.apache.fluss.fs.token.Credentials; @@ -35,6 +36,26 @@ class S3FileSystemPluginTest { private static final String PROVIDER_CONFIG = "fs.s3a.aws.credentials.provider"; + @Test + void testConvertFlussS3ConfigPrefixesToHadoopConfig() { + Configuration flussConfig = new Configuration(); + flussConfig.setString("s3.endpoint", "http://localhost:9000"); + flussConfig.setString("s3a.region", "us-east-1"); + flussConfig.setString("fs.s3a.path-style-access", "true"); + flussConfig.setString("oss.endpoint", "http://oss-local"); + + S3FileSystemPlugin plugin = new S3FileSystemPlugin(); + org.apache.hadoop.conf.Configuration hadoopConfig = + plugin.getHadoopConfiguration(flussConfig); + + assertThat(hadoopConfig.get(S3FileSystemConfigUtils.ENDPOINT)) + .isEqualTo("http://localhost:9000"); + assertThat(hadoopConfig.get(S3FileSystemConfigUtils.REGION)).isEqualTo("us-east-1"); + assertThat(hadoopConfig.get(S3FileSystemConfigUtils.PATH_STYLE_ACCESS_ALIAS)) + .isEqualTo("true"); + assertThat(hadoopConfig.get("fs.s3a.oss.endpoint")).isNull(); + } + @Test void testServerModeWithStaticKeys() { Configuration flussConfig = new Configuration();