Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.fluss.client;

import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.FlussAdmin;
import org.apache.fluss.client.lookup.LookupClient;
Expand All @@ -33,7 +34,9 @@
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.S3FileSystemConfigUtils;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
Expand All @@ -45,13 +48,16 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.apache.fluss.client.utils.MetadataUtils.getOneAvailableTabletServerNode;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;

/** A connection to Fluss cluster, and holds the client session resources. */
public final class FlussConnection implements Connection {
private static final String CLIENT_FS_PREFIX = CLIENT_PREFIX + "fs.";

private final Configuration conf;
private final RpcClient rpcClient;
private final MetadataUpdater metadataUpdater;
Expand All @@ -72,10 +78,7 @@ public final class FlussConnection implements Connection {
this.conf = conf;
// init Filesystem with configuration from FlussConnection,
// only pass options with 'client.fs.' prefix
FileSystem.initialize(
Configuration.fromMap(
extractAndRemovePrefix(new HashMap<>(conf.toMap()), CLIENT_PREFIX + "fs.")),
null);
FileSystem.initialize(getClientFileSystemConfiguration(conf), null);
// for client metrics.
setupClientMetricsConfiguration();
String clientId = conf.getString(ConfigOptions.CLIENT_ID);
Expand All @@ -88,6 +91,26 @@ public final class FlussConnection implements Connection {
this.writerClient = null;
}

@VisibleForTesting
static Configuration getClientFileSystemConfiguration(Configuration conf) {
Map<String, String> clientFsOptions =
extractAndRemovePrefix(new HashMap<>(conf.toMap()), CLIENT_FS_PREFIX);
validateClientFileSystemConfiguration(clientFsOptions);
return Configuration.fromMap(clientFsOptions);
}

private static void validateClientFileSystemConfiguration(Map<String, String> clientFsOptions) {
for (String key : clientFsOptions.keySet()) {
if (S3FileSystemConfigUtils.isCredentialConfigKey(key)) {
throw new IllegalConfigurationException(
"Client-side S3 credential configuration '%s' is not supported. "
+ "Configure S3 credentials on Fluss servers and let Fluss clients "
+ "use server-issued temporary filesystem tokens.",
CLIENT_FS_PREFIX + key);
}
}
}

@Override
public Configuration getConfiguration() {
return conf;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client;

import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.IllegalConfigurationException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link FlussConnection}. */
class FlussConnectionTest {

@Test
void testAllowNonCredentialClientFileSystemOptions() {
Configuration conf = new Configuration();
conf.setString("client.fs.s3a.endpoint", "http://localhost:9000");
conf.setString("client.fs.s3a.region", "us-east-1");
conf.setString("client.fs.s3a.path-style-access", "true");
conf.setString("client.fs.oss.endpoint", "http://oss-local");
conf.setString("client.test.key", "client-value");

Map<String, String> clientFsOptions =
FlussConnection.getClientFileSystemConfiguration(conf).toMap();

assertThat(clientFsOptions)
.containsEntry("s3a.endpoint", "http://localhost:9000")
.containsEntry("s3a.region", "us-east-1")
.containsEntry("s3a.path-style-access", "true")
.containsEntry("oss.endpoint", "http://oss-local")
.doesNotContainKey("client.test.key");
}

@ParameterizedTest
@ValueSource(
strings = {
"client.fs.s3a.aws.credentials.provider",
"client.fs.s3a.access.key",
"client.fs.s3a.secret.key",
"client.fs.s3a.access-key",
"client.fs.s3a.secret-key",
"client.fs.s3a.session.token",
"client.fs.s3a.assumed.role.arn",
"client.fs.s3a.assumed.role.sts.endpoint",
"client.fs.s3.aws.credentials.provider",
"client.fs.s3.access.key",
"client.fs.s3.secret.key",
"client.fs.s3.secret-key",
"client.fs.fs.s3a.aws.credentials.provider",
"client.fs.fs.s3a.access.key",
"client.fs.fs.s3a.secret-key",
"client.fs.fs.s3a.assumed.role.arn"
})
void testRejectCredentialBearingS3ClientFileSystemOptions(String key) {
Configuration conf = new Configuration();
conf.setString(key, "test-value");

assertThatThrownBy(() -> FlussConnection.getClientFileSystemConfiguration(conf))
.isInstanceOf(IllegalConfigurationException.class)
.hasMessageContaining(key)
.hasMessageContaining("Client-side S3 credential configuration")
.hasMessageContaining("Fluss servers");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs;

import org.apache.fluss.annotation.Internal;

import javax.annotation.Nullable;

/** Utilities for converting and classifying S3 filesystem configuration keys. */
@Internal
public final class S3FileSystemConfigUtils {

public static final String ACCESS_KEY = "fs.s3a.access.key";
public static final String ACCESS_KEY_ALIAS = "fs.s3a.access-key";
public static final String SECRET_KEY = "fs.s3a.secret.key";
public static final String SECRET_KEY_ALIAS = "fs.s3a.secret-key";
public static final String SESSION_TOKEN = "fs.s3a.session.token";
public static final String AWS_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider";
public static final String ROLE_ARN = "fs.s3a.assumed.role.arn";
public static final String STS_ENDPOINT = "fs.s3a.assumed.role.sts.endpoint";
public static final String ENDPOINT = "fs.s3a.endpoint";
public static final String REGION = "fs.s3a.region";
public static final String PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
public static final String PATH_STYLE_ACCESS_ALIAS = "fs.s3a.path-style-access";

private static final String HADOOP_CONFIG_PREFIX = "fs.s3a.";
private static final String ASSUMED_ROLE_PREFIX = "fs.s3a.assumed.role.";
private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", HADOOP_CONFIG_PREFIX};

/**
* Converts a Fluss S3 configuration key to the corresponding Hadoop S3A configuration key.
*
* <p>Supported Fluss prefixes are {@code s3.}, {@code s3a.}, and {@code fs.s3a.}. Unknown
* prefixes return {@code null}.
*/
@Nullable
public static String toHadoopConfigKey(String key) {
for (String prefix : FLUSS_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
return HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
}
}
return null;
}

/**
* Returns whether the given Fluss S3 configuration key carries client-side S3 credentials or
* credential provider settings.
*/
public static boolean isCredentialConfigKey(String key) {
String hadoopConfigKey = toHadoopConfigKey(key);
if (hadoopConfigKey == null) {
return false;
}

return hadoopConfigKey.equals(ACCESS_KEY)
|| hadoopConfigKey.equals(ACCESS_KEY_ALIAS)
|| hadoopConfigKey.equals(SECRET_KEY)
|| hadoopConfigKey.equals(SECRET_KEY_ALIAS)
|| hadoopConfigKey.equals(SESSION_TOKEN)
|| hadoopConfigKey.equals(AWS_CREDENTIALS_PROVIDER)
|| hadoopConfigKey.startsWith(ASSUMED_ROLE_PREFIX);
}

private S3FileSystemConfigUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.fs;

import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link S3FileSystemConfigUtils}. */
class S3FileSystemConfigUtilsTest {

@Test
void testConvertFlussConfigKeyToHadoopConfigKey() {
assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("s3.endpoint"))
.isEqualTo(S3FileSystemConfigUtils.ENDPOINT);
assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("s3a.region"))
.isEqualTo(S3FileSystemConfigUtils.REGION);
assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("fs.s3a.path-style-access"))
.isEqualTo(S3FileSystemConfigUtils.PATH_STYLE_ACCESS_ALIAS);
assertThat(S3FileSystemConfigUtils.toHadoopConfigKey("oss.endpoint")).isNull();
}

@Test
void testDetectCredentialConfigKey() {
assertThat(
Arrays.asList(
"s3a.aws.credentials.provider",
"s3a.access.key",
"s3a.secret.key",
"s3a.access-key",
"s3a.secret-key",
"s3a.session.token",
"s3a.assumed.role.arn",
"s3a.assumed.role.sts.endpoint",
"s3.aws.credentials.provider",
"s3.access.key",
"s3.secret.key",
"s3.secret-key",
"fs.s3a.aws.credentials.provider",
"fs.s3a.access.key",
"fs.s3a.secret-key",
"fs.s3a.assumed.role.arn"))
.allMatch(S3FileSystemConfigUtils::isCredentialConfigKey);
}

@Test
void testAllowNonCredentialConfigKey() {
assertThat(
Arrays.asList(
"s3a.endpoint",
"s3a.region",
"s3a.path-style-access",
"s3a.path.style.access",
"oss.access.key"))
.noneMatch(S3FileSystemConfigUtils::isCredentialConfigKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FileSystemPlugin;
import org.apache.fluss.fs.S3FileSystemConfigUtils;
import org.apache.fluss.fs.s3.token.S3ADelegationTokenReceiver;
import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver;

Expand All @@ -40,19 +41,10 @@ public class S3FileSystemPlugin implements FileSystemPlugin {

private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemPlugin.class);

private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", "fs.s3a."};

private static final String HADOOP_CONFIG_PREFIX = "fs.s3a.";

private static final String ACCESS_KEY_ID = "fs.s3a.access.key";
private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key";

private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";

private static final String[][] MIRRORED_CONFIG_KEYS = {
{"fs.s3a.access-key", "fs.s3a.access.key"},
{"fs.s3a.secret-key", "fs.s3a.secret.key"},
{"fs.s3a.path-style-access", "fs.s3a.path.style.access"}
{S3FileSystemConfigUtils.ACCESS_KEY_ALIAS, S3FileSystemConfigUtils.ACCESS_KEY},
{S3FileSystemConfigUtils.SECRET_KEY_ALIAS, S3FileSystemConfigUtils.SECRET_KEY},
{S3FileSystemConfigUtils.PATH_STYLE_ACCESS_ALIAS, S3FileSystemConfigUtils.PATH_STYLE_ACCESS}
};

@Override
Expand Down Expand Up @@ -85,17 +77,17 @@ org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussC
}

for (String key : flussConfig.keySet()) {
for (String prefix : FLUSS_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
String newValue =
flussConfig.getString(
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
conf.set(newKey, newValue);

LOG.debug(
"Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
}
String hadoopConfigKey = S3FileSystemConfigUtils.toHadoopConfigKey(key);
if (hadoopConfigKey != null) {
String newValue =
flussConfig.getString(
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
conf.set(hadoopConfigKey, newValue);

LOG.debug(
"Adding Fluss config entry for {} as {} to Hadoop config",
key,
hadoopConfigKey);
}
}
return conf;
Expand Down Expand Up @@ -131,9 +123,9 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon

private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
boolean hasStaticKeys =
hadoopConfig.get(ACCESS_KEY_ID) != null
&& hadoopConfig.get(ACCESS_KEY_SECRET) != null;
boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null;
hadoopConfig.get(S3FileSystemConfigUtils.ACCESS_KEY) != null
&& hadoopConfig.get(S3FileSystemConfigUtils.SECRET_KEY) != null;
boolean hasRoleArn = hadoopConfig.get(S3FileSystemConfigUtils.ROLE_ARN) != null;

if (hasStaticKeys || hasRoleArn) {
LOG.info(
Expand Down
Loading
Loading