From 867a2365a5885ef39c0ce18e51edb0406f6a9a7a Mon Sep 17 00:00:00 2001 From: Maxim Date: Wed, 15 Apr 2026 14:36:21 -0700 Subject: [PATCH] feat(cubestore): support AWS Web Identity Token File in S3RemoteFs (#10687) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When both CUBESTORE_AWS_WEB_IDENTITY_TOKEN_FILE and CUBESTORE_AWS_ROLE_ARN are set, S3RemoteFs obtains its credentials via STS AssumeRoleWithWebIdentity — reading the JWT from the token file and exchanging it for temporary session credentials. Otherwise it keeps using CUBESTORE_AWS_ACCESS_KEY_ID / CUBESTORE_AWS_SECRET_ACCESS_KEY (or the credential chain fallback when those are unset). The refresh loop now polls the token file mtime every 30 seconds in web identity mode (vs 3-hour default for static credentials). Credentials are only re-exchanged when the file actually changes, keeping STS calls minimal. Co-authored-by: Claude Opus 4.6 (1M context) --- rust/cubestore/cubestore/src/remotefs/s3.rs | 121 +++++++++++++++----- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 2dc5f943b4bb3..7e3ef76ae6fc1 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -29,6 +29,10 @@ pub struct S3RemoteFs { bucket: arc_swap::ArcSwap, sub_path: Option, delete_mut: Mutex<()>, + /// When set, the refresh loop watches this file for changes and calls + /// STS AssumeRoleWithWebIdentity with the JWT inside it. + web_identity_token_file: Option, + web_identity_role_arn: Option, } impl fmt::Debug for S3RemoteFs { @@ -50,29 +54,40 @@ impl S3RemoteFs { bucket_name: String, sub_path: Option, ) -> Result, CubeError> { - // Incorrect naming for ENV variables... 
let access_key = env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok(); let secret_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok(); + let token_file = env::var("CUBESTORE_AWS_WEB_IDENTITY_TOKEN_FILE").ok(); + let role_arn = env::var("CUBESTORE_AWS_ROLE_ARN").ok(); + + let credentials = if let (Some(ref tf), Some(ref arn)) = (&token_file, &role_arn) { + // Web identity mode: read JWT from file and exchange via STS. + let jwt = std::fs::read_to_string(tf).map_err(|e| { + CubeError::internal(format!( + "Failed to read web identity token file '{}': {}", + tf, e + )) + })?; + info!( + "Using web identity token file for S3 credentials (role={})", + arn + ); + Credentials::from_sts(arn, "cubestore", &jwt).map_err(|e| { + CubeError::internal(format!("STS AssumeRoleWithWebIdentity failed: {}", e)) + })? + } else { + // Static credentials mode (or credential chain fallback). + Credentials::new( + access_key.as_deref(), + secret_key.as_deref(), + None, + None, + None, + ) + .map_err(|e| CubeError::internal(format!("Failed to create S3 credentials: {}", e)))? 
+ }; - let credentials = Credentials::new( - access_key.as_deref(), - secret_key.as_deref(), - None, - None, - None, - ) - .map_err(|err| { - CubeError::internal(format!( - "Failed to create S3 credentials: {}", - err.to_string() - )) - })?; - let region = region.parse::().map_err(|err| { - CubeError::internal(format!( - "Failed to parse Region '{}': {}", - region, - err.to_string() - )) + let region = region.parse::().map_err(|e| { + CubeError::internal(format!("Failed to parse Region '{}': {}", region, e)) })?; let bucket = Bucket::new(&bucket_name, region.clone(), credentials)?; let fs = Arc::new(Self { @@ -80,6 +95,8 @@ impl S3RemoteFs { bucket: arc_swap::ArcSwap::new(Arc::new(bucket)), sub_path, delete_mut: Mutex::new(()), + web_identity_token_file: token_file, + web_identity_role_arn: role_arn, }); spawn_creds_refresh_loop(access_key, secret_key, bucket_name, region, &fs); @@ -94,15 +111,36 @@ fn spawn_creds_refresh_loop( region: Region, fs: &Arc, ) { - // Refresh credentials. TODO: use expiration time. - let refresh_every = refresh_interval_from_env(); + let token_file = fs.web_identity_token_file.clone(); + let role_arn = fs.web_identity_role_arn.clone(); + let is_web_identity = token_file.is_some() && role_arn.is_some(); + + // Web identity STS credentials expire in ~1 hour, so poll the token file + // every 30s by default. Static credentials use 3-hour default. + // CUBESTORE_AWS_CREDS_REFRESH_EVERY_MINS overrides both. 
+ let refresh_every = { + let configured = refresh_interval_from_env(); + if is_web_identity && configured == Duration::from_secs(60 * 180) { + Duration::from_secs(30) + } else { + configured + } + }; + if refresh_every.as_secs() == 0 { return; } let fs = Arc::downgrade(fs); + let mut last_modified = token_file + .as_ref() + .and_then(|f| std::fs::metadata(f).ok()?.modified().ok()); + std::thread::spawn(move || { - log::debug!("Started S3 credentials refresh loop"); + log::debug!( + "Started S3 credentials refresh loop (web_identity={})", + is_web_identity + ); loop { std::thread::sleep(refresh_every); let fs = match fs.upgrade() { @@ -112,13 +150,36 @@ fn spawn_creds_refresh_loop( } Some(fs) => fs, }; - let c = match Credentials::new( - access_key.as_deref(), - secret_key.as_deref(), - None, - None, - None, - ) { + + // In web identity mode, only refresh when the token file changed. + if let (Some(ref file), Some(_)) = (&token_file, &role_arn) { + let current_modified = std::fs::metadata(file).ok().and_then(|m| m.modified().ok()); + if current_modified == last_modified { + continue; + } + last_modified = current_modified; + info!("Web identity token file changed, refreshing S3 credentials"); + } + + let c = if let (Some(ref file), Some(ref arn)) = (&token_file, &role_arn) { + match std::fs::read_to_string(file) { + Ok(jwt) => Credentials::from_sts(arn, "cubestore", &jwt), + Err(e) => { + log::error!("Failed to read web identity token file: {}", e); + continue; + } + } + } else { + Credentials::new( + access_key.as_deref(), + secret_key.as_deref(), + None, + None, + None, + ) + }; + + let c = match c { Ok(c) => c, Err(e) => { log::error!("Failed to refresh S3 credentials: {}", e);