diff --git a/rust/cubestore/cubestore/src/remotefs/s3.rs b/rust/cubestore/cubestore/src/remotefs/s3.rs index 2dc5f943b4bb3..7e3ef76ae6fc1 100644 --- a/rust/cubestore/cubestore/src/remotefs/s3.rs +++ b/rust/cubestore/cubestore/src/remotefs/s3.rs @@ -29,6 +29,10 @@ pub struct S3RemoteFs { bucket: arc_swap::ArcSwap, sub_path: Option, delete_mut: Mutex<()>, + /// When set, the refresh loop watches this file for changes and calls + /// STS AssumeRoleWithWebIdentity with the JWT inside it. + web_identity_token_file: Option, + web_identity_role_arn: Option, } impl fmt::Debug for S3RemoteFs { @@ -50,29 +54,40 @@ impl S3RemoteFs { bucket_name: String, sub_path: Option, ) -> Result, CubeError> { - // Incorrect naming for ENV variables... let access_key = env::var("CUBESTORE_AWS_ACCESS_KEY_ID").ok(); let secret_key = env::var("CUBESTORE_AWS_SECRET_ACCESS_KEY").ok(); + let token_file = env::var("CUBESTORE_AWS_WEB_IDENTITY_TOKEN_FILE").ok(); + let role_arn = env::var("CUBESTORE_AWS_ROLE_ARN").ok(); + + let credentials = if let (Some(ref tf), Some(ref arn)) = (&token_file, &role_arn) { + // Web identity mode: read JWT from file and exchange via STS. + let jwt = std::fs::read_to_string(tf).map_err(|e| { + CubeError::internal(format!( + "Failed to read web identity token file '{}': {}", + tf, e + )) + })?; + info!( + "Using web identity token file for S3 credentials (role={})", + arn + ); + Credentials::from_sts(arn, "cubestore", &jwt).map_err(|e| { + CubeError::internal(format!("STS AssumeRoleWithWebIdentity failed: {}", e)) + })? + } else { + // Static credentials mode (or credential chain fallback). + Credentials::new( + access_key.as_deref(), + secret_key.as_deref(), + None, + None, + None, + ) + .map_err(|e| CubeError::internal(format!("Failed to create S3 credentials: {}", e)))? + }; - let credentials = Credentials::new( - access_key.as_deref(), - secret_key.as_deref(), - None, - None, - None, - ) - .map_err(|err| { - CubeError::internal(format!( - "Failed to create S3 credentials: {}", - err.to_string() - )) - })?; - let region = region.parse::().map_err(|err| { - CubeError::internal(format!( - "Failed to parse Region '{}': {}", - region, - err.to_string() - )) + let region = region.parse::().map_err(|e| { + CubeError::internal(format!("Failed to parse Region '{}': {}", region, e)) })?; let bucket = Bucket::new(&bucket_name, region.clone(), credentials)?; let fs = Arc::new(Self { @@ -80,6 +95,8 @@ impl S3RemoteFs { bucket: arc_swap::ArcSwap::new(Arc::new(bucket)), sub_path, delete_mut: Mutex::new(()), + web_identity_token_file: token_file, + web_identity_role_arn: role_arn, }); spawn_creds_refresh_loop(access_key, secret_key, bucket_name, region, &fs); @@ -94,15 +111,36 @@ fn spawn_creds_refresh_loop( region: Region, fs: &Arc, ) { - // Refresh credentials. TODO: use expiration time. - let refresh_every = refresh_interval_from_env(); + let token_file = fs.web_identity_token_file.clone(); + let role_arn = fs.web_identity_role_arn.clone(); + let is_web_identity = token_file.is_some() && role_arn.is_some(); + + // Web identity STS credentials expire in ~1 hour, so poll the token file + // every 30s by default. Static credentials use 3-hour default. + // CUBESTORE_AWS_CREDS_REFRESH_EVERY_MINS overrides both. + let refresh_every = { + let configured = refresh_interval_from_env(); + if is_web_identity && configured == Duration::from_secs(60 * 180) { + Duration::from_secs(30) + } else { + configured + } + }; + if refresh_every.as_secs() == 0 { return; } let fs = Arc::downgrade(fs); + let mut last_modified = token_file + .as_ref() + .and_then(|f| std::fs::metadata(f).ok()?.modified().ok()); + std::thread::spawn(move || { - log::debug!("Started S3 credentials refresh loop"); + log::debug!( + "Started S3 credentials refresh loop (web_identity={})", + is_web_identity + ); loop { std::thread::sleep(refresh_every); let fs = match fs.upgrade() { @@ -112,13 +150,36 @@ fn spawn_creds_refresh_loop( } Some(fs) => fs, }; - let c = match Credentials::new( - access_key.as_deref(), - secret_key.as_deref(), - None, - None, - None, - ) { + + // In web identity mode, only refresh when the token file changed. + if let (Some(ref file), Some(_)) = (&token_file, &role_arn) { + let current_modified = std::fs::metadata(file).ok().and_then(|m| m.modified().ok()); + if current_modified == last_modified { + continue; + } + last_modified = current_modified; + info!("Web identity token file changed, refreshing S3 credentials"); + } + + let c = if let (Some(ref file), Some(ref arn)) = (&token_file, &role_arn) { + match std::fs::read_to_string(file) { + Ok(jwt) => Credentials::from_sts(arn, "cubestore", &jwt), + Err(e) => { + log::error!("Failed to read web identity token file: {}", e); + continue; + } + } + } else { + Credentials::new( + access_key.as_deref(), + secret_key.as_deref(), + None, + None, + None, + ) + }; + + let c = match c { Ok(c) => c, Err(e) => { log::error!("Failed to refresh S3 credentials: {}", e);