From 6cdf6ac08e7090a4d5a1cf4f4f2c7e5970cd8a23 Mon Sep 17 00:00:00 2001 From: Marco Palmisano Date: Wed, 17 Jun 2026 22:57:51 +0200 Subject: [PATCH 1/5] feat: add JWT authentication support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds JSON Web Token (JWT) authentication as a new auth_type option. When enabled, clients present a JWT as their password; pgDog validates the signature (PEM file or JWKS endpoint), extracts the configured username claim (default: sub), and optionally auto-provisions a connection pool for that user. Key changes: - pgdog/src/auth/jwt/ — new module: JwtValidator, Claims, PEM cache, tests (validation, expiry, audience, algorithm, memory-leak, suffix) - pgdog-config/src/auth.rs — add Jwt variant to AuthType - pgdog-config/src/general.rs — add 10 JWT config fields with env var support and tests - pgdog/src/config/mod.rs — JWT_VALIDATOR static + jwt_validator() - pgdog/src/config/convert.rs — user_from_jwt() helper - pgdog/src/frontend/client/mod.rs — JWT auth flow in login(); effective_user derived from token claim used for all downstream ops - pgdog/src/backend/pool/address.rs — client_user field (drives SET ROLE) - pgdog/src/backend/server.rs — execute SET ROLE after connect when client_user differs from server user - pgdog/src/backend/pool/cluster.rs — user_read_only flag; forces ExcludePrimary rw_split for read-only users - pgdog/src/backend/pool/request.rs — replica_only field - pgdog/src/backend/pool/lb/mod.rs — respect replica_only in routing - pgdog/src/backend/pool/monitor.rs — shut down pool on auth error; replenish() propagates errors - pgdog/src/backend/pool/error.rs — Auth variant + is_auth() - pgdog/src/backend/databases.rs — recreate offline pools in add() - pgdog/src/backend/error.rs — extend is_auth() for codes 22023/42704 - pgdog/src/frontend/router/parser/query/mod.rs — enforce read-only routing when user_read_only; also apply to StartTransaction routes - example.pgdog.toml — document all JWT configuration options Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- Cargo.lock | 38 +++ example.pgdog.toml | 53 ++++ pgdog-config/src/auth.rs | 4 + pgdog-config/src/general.rs | 172 ++++++++++ pgdog/Cargo.toml | 3 +- pgdog/src/auth/error.rs | 3 + pgdog/src/auth/jwt/error.rs | 44 +++ pgdog/src/auth/jwt/mod.rs | 198 ++++++++++++ pgdog/src/auth/jwt/pem_cache.rs | 38 +++ pgdog/src/auth/jwt/tests.rs | 296 ++++++++++++++++++ pgdog/src/auth/mod.rs | 2 + .../backend/auth/azure_workload_identity.rs | 1 + pgdog/src/backend/auth/rds_iam.rs | 1 + pgdog/src/backend/databases.rs | 46 ++- pgdog/src/backend/error.rs | 57 +++- pgdog/src/backend/pool/address.rs | 4 + pgdog/src/backend/pool/cluster.rs | 23 +- pgdog/src/backend/pool/connection/mod.rs | 11 +- pgdog/src/backend/pool/error.rs | 11 + pgdog/src/backend/pool/lb/mod.rs | 20 +- pgdog/src/backend/pool/monitor.rs | 22 +- pgdog/src/backend/pool/request.rs | 3 + pgdog/src/backend/server.rs | 16 + pgdog/src/config/convert.rs | 30 +- pgdog/src/config/mod.rs | 27 ++ pgdog/src/frontend/client/mod.rs | 82 ++++- pgdog/src/frontend/client/test/mod.rs | 138 +++++++- pgdog/src/frontend/router/parser/query/mod.rs | 11 +- .../router/parser/query/test/setup.rs | 6 + .../router/parser/query/test/test_bypass.rs | 15 + 30 files changed, 1345 insertions(+), 30 deletions(-) create mode 100644 pgdog/src/auth/jwt/error.rs create mode 100644 pgdog/src/auth/jwt/mod.rs create mode 100644 pgdog/src/auth/jwt/pem_cache.rs create mode 100644 pgdog/src/auth/jwt/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 857f04e82..b82329483 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2610,6 +2610,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "kasuari" version = "0.4.12" @@ -3127,6 +3142,16 @@ dependencies = [ "windows-link", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -3243,6 +3268,7 @@ dependencies = [ "hyper", "hyper-util", "indexmap", + "jsonwebtoken", "lazy_static", "libc", "lru", @@ -4689,6 +4715,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.18", + "time", +] + [[package]] name = "siphasher" version = "1.0.3" diff --git a/example.pgdog.toml b/example.pgdog.toml index f9ddd22af..654371701 100644 --- a/example.pgdog.toml +++ b/example.pgdog.toml @@ -252,7 +252,60 @@ mirror_queue = 128 # - scram # - md5 # - trust +# - jwt auth_type = "scram" + +# ── JWT authentication settings ──────────────────────────────────────────────── +# Only used when auth_type = "jwt". + +# Path to an RSA/EC public-key PEM file used to verify JWT tokens. +# Takes priority over jwt_jwks_url when both are configured. +# +# jwt_public_key_file = "/etc/pgdog/jwt_public.pem" + +# URL of a JWKS endpoint to fetch verification keys from. +# Example: https://auth.example.com/.well-known/jwks.json +# +# jwt_jwks_url = "https://auth.example.com/.well-known/jwks.json" + +# How long (seconds) to cache JWKS keys before re-fetching. +# Default: 300 +# +# jwt_jwks_cache_ttl = 300 + +# Expected audience claim. Tokens without this audience value are rejected. +# +# jwt_audience = "myapp" + +# The JWT claim to use as the PostgreSQL database username. +# Default: sub +# +# jwt_username_claim = "sub" + +# Backend Postgres user pgDog uses to connect on behalf of JWT users. +# Defaults to the connection-string username when not set. +# +# jwt_server_user = "pgdog_service" + +# Backend Postgres password used together with jwt_server_user. +# +# jwt_server_password = "supersecret" + +# Optional suffix that usernames must end with to trigger JWT authentication. +# If not configured, all users are validated with JWT. +# +# jwt_user_suffix = "@example.com" + +# Automatically register a connection pool for successfully authenticated JWT users. +# Default: false +# +# jwt_user_auto_provision = false + +# Mark auto-provisioned JWT users as read-only (routes them to replicas). +# Default: false +# +# jwt_user_auto_provision_read_only = false + # Disable cross-shard queries. # # Default: false diff --git a/pgdog-config/src/auth.rs b/pgdog-config/src/auth.rs index 0f76962b8..c081dbcd8 100644 --- a/pgdog-config/src/auth.rs +++ b/pgdog-config/src/auth.rs @@ -49,6 +49,8 @@ pub enum AuthType { Trust, /// Plaintext password. Plain, + /// JWT token (RS256 / ES256). The `sub` claim is the PostgreSQL role. + Jwt, } impl Display for AuthType { @@ -58,6 +60,7 @@ impl Display for AuthType { Self::Scram => write!(f, "scram"), Self::Trust => write!(f, "trust"), Self::Plain => write!(f, "plain"), + Self::Jwt => write!(f, "jwt"), } } } @@ -85,6 +88,7 @@ impl FromStr for AuthType { "scram" => Ok(Self::Scram), "trust" => Ok(Self::Trust), "plain" => Ok(Self::Plain), + "jwt" => Ok(Self::Jwt), _ => Err(format!("Invalid auth type: {}", s)), } } diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index 42363ea5d..1c3aa9222 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -469,6 +469,80 @@ pub struct General { #[serde(default)] pub auth_type: AuthType, + /// Path to an RSA/EC public key PEM file used to verify JWT tokens. + /// + /// **Note:** Takes priority over `jwt_jwks_url` when both are set. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_public_key_file + #[serde(default = "General::jwt_public_key_file")] + pub jwt_public_key_file: Option, + + /// URL of a JWKS endpoint (e.g. `https://auth.example.com/.well-known/jwks.json`). + /// + /// **Note:** Used when `jwt_public_key_file` is not set. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_jwks_url + #[serde(default = "General::jwt_jwks_url")] + pub jwt_jwks_url: Option, + + /// How long (seconds) to cache JWKS keys before re-fetching. + /// + /// _Default:_ `300` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_jwks_cache_ttl + #[serde(default = "General::jwt_jwks_cache_ttl")] + pub jwt_jwks_cache_ttl: u64, + + /// Expected `aud` claim. If set, tokens without this audience are rejected. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_audience + #[serde(default = "General::jwt_audience")] + pub jwt_audience: Option, + + /// The JWT claim to use as the PostgreSQL database username. + /// + /// _Default:_ `sub` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_username_claim + #[serde(default = "General::jwt_username_claim")] + pub jwt_username_claim: String, + + /// The username that pgDog should use to connect to the backend database on behalf of dynamically provisioned JWT users. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_server_user + #[serde(default = "General::jwt_server_user")] + pub jwt_server_user: Option, + + /// The password that pgDog should use to connect to the backend database on behalf of dynamically provisioned JWT users. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_server_password + #[serde(default = "General::jwt_server_password")] + pub jwt_server_password: Option, + + /// Optional suffix that usernames must end with to trigger JWT authentication. + /// + /// **Note:** If not configured, all users are validated with JWT when `auth_type` is `jwt`. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_user_suffix + #[serde(default = "General::jwt_user_suffix")] + pub jwt_user_suffix: Option, + + /// Automatically register and provision connection pools for successfully authenticated JWT users. + /// + /// _Default:_ `false` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_user_auto_provision + #[serde(default = "General::jwt_user_auto_provision")] + pub jwt_user_auto_provision: bool, + + /// Mark dynamically auto-provisioned JWT users as read-only (routes them to replica). + /// + /// _Default:_ `false` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#jwt_user_auto_provision_read_only + #[serde(default = "General::jwt_user_auto_provision_read_only")] + pub jwt_user_auto_provision_read_only: bool, + /// Disable cross-shard queries globally. When enabled, queries touching more than one shard are rejected. #[serde(default)] pub cross_shard_disabled: bool, @@ -831,6 +905,16 @@ impl Default for General { mirror_queue: Self::mirror_queue(), mirror_exposure: Self::mirror_exposure(), auth_type: Self::auth_type(), + jwt_public_key_file: Self::jwt_public_key_file(), + jwt_jwks_url: Self::jwt_jwks_url(), + jwt_jwks_cache_ttl: Self::jwt_jwks_cache_ttl(), + jwt_audience: Self::jwt_audience(), + jwt_username_claim: Self::jwt_username_claim(), + jwt_server_user: Self::jwt_server_user(), + jwt_server_password: Self::jwt_server_password(), + jwt_user_suffix: Self::jwt_user_suffix(), + jwt_user_auto_provision: Self::jwt_user_auto_provision(), + jwt_user_auto_provision_read_only: Self::jwt_user_auto_provision_read_only(), cross_shard_disabled: Self::cross_shard_disabled(), dns_ttl: Self::default_dns_ttl(), pub_sub_channel_size: Self::pub_sub_channel_size(), @@ -1319,6 +1403,46 @@ impl General { Self::env_or_default("PGDOG_STATS_PERIOD", 15_000) } + fn jwt_jwks_cache_ttl() -> u64 { + Self::env_or_default("PGDOG_JWT_JWKS_CACHE_TTL", 300) + } + + fn jwt_username_claim() -> String { + Self::env_or_default("PGDOG_JWT_USERNAME_CLAIM", "sub".to_string()) + } + + fn jwt_public_key_file() -> Option { + Self::env_option_string("PGDOG_JWT_PUBLIC_KEY_FILE") + } + + fn jwt_jwks_url() -> Option { + Self::env_option_string("PGDOG_JWT_JWKS_URL") + } + + fn jwt_audience() -> Option { + Self::env_option_string("PGDOG_JWT_AUDIENCE") + } + + fn jwt_server_user() -> Option { + Self::env_option_string("PGDOG_JWT_SERVER_USER") + } + + fn jwt_server_password() -> Option { + Self::env_option_string("PGDOG_JWT_SERVER_PASSWORD") + } + + fn jwt_user_suffix() -> Option { + Self::env_option_string("PGDOG_JWT_USER_SUFFIX") + } + + fn jwt_user_auto_provision() -> bool { + Self::env_bool_or_default("PGDOG_JWT_USER_AUTO_PROVISION", false) + } + + fn jwt_user_auto_provision_read_only() -> bool { + Self::env_bool_or_default("PGDOG_JWT_USER_AUTO_PROVISION_READ_ONLY", false) + } + fn default_passthrough_auth() -> PassthroughAuth { if let Ok(auth) = env::var("PGDOG_PASSTHROUGH_AUTH") { // TODO: figure out why toml::from_str doesn't work. @@ -1743,4 +1867,52 @@ mod tests { assert_eq!(general.auth_type, AuthType::Trust); assert!(general.dry_run); } + + #[test] + fn test_jwt_defaults() { + let general = General::default(); + assert_eq!(general.jwt_public_key_file, None); + assert_eq!(general.jwt_jwks_url, None); + assert_eq!(general.jwt_jwks_cache_ttl, 300); + assert_eq!(general.jwt_audience, None); + assert_eq!(general.jwt_username_claim, "sub"); + assert_eq!(general.jwt_server_user, None); + assert_eq!(general.jwt_server_password, None); + assert_eq!(general.jwt_user_suffix, None); + assert!(!general.jwt_user_auto_provision); + assert!(!general.jwt_user_auto_provision_read_only); + } + + #[test] + fn test_jwt_env_overrides() { + let _guard1 = set_env_var("PGDOG_JWT_PUBLIC_KEY_FILE", "/tmp/key.pem"); + let _guard2 = set_env_var("PGDOG_JWT_JWKS_URL", "https://auth.example.com/jwks.json"); + let _guard3 = set_env_var("PGDOG_JWT_JWKS_CACHE_TTL", "600"); + let _guard4 = set_env_var("PGDOG_JWT_AUDIENCE", "myapp"); + let _guard5 = set_env_var("PGDOG_JWT_USERNAME_CLAIM", "email"); + let _guard6 = set_env_var("PGDOG_JWT_SERVER_USER", "pgdogsvc"); + let _guard7 = set_env_var("PGDOG_JWT_SERVER_PASSWORD", "secret"); + let _guard8 = set_env_var("PGDOG_JWT_USER_SUFFIX", "@example.com"); + let _guard9 = set_env_var("PGDOG_JWT_USER_AUTO_PROVISION", "true"); + let _guard10 = set_env_var("PGDOG_JWT_USER_AUTO_PROVISION_READ_ONLY", "true"); + + let general = General::default(); + + assert_eq!( + general.jwt_public_key_file, + Some("/tmp/key.pem".to_string()) + ); + assert_eq!( + general.jwt_jwks_url, + Some("https://auth.example.com/jwks.json".to_string()) + ); + assert_eq!(general.jwt_jwks_cache_ttl, 600); + assert_eq!(general.jwt_audience, Some("myapp".to_string())); + assert_eq!(general.jwt_username_claim, "email"); + assert_eq!(general.jwt_server_user, Some("pgdogsvc".to_string())); + assert_eq!(general.jwt_server_password, Some("secret".to_string())); + assert_eq!(general.jwt_user_suffix, Some("@example.com".to_string())); + assert!(general.jwt_user_auto_provision); + assert!(general.jwt_user_auto_provision_read_only); + } } diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 28ca94984..12b76c35a 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -78,9 +78,10 @@ azure_core = "0.34.0" crc32c = "0.6.8" bit-vec = "0.8" smallvec = "1" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider", "json"] } hex = "0.4" x509-parser = "0.18" +jsonwebtoken = "9" [target.'cfg(unix)'.dependencies] libc = "0.2" diff --git a/pgdog/src/auth/error.rs b/pgdog/src/auth/error.rs index 9ff6877dd..58e98f3e0 100644 --- a/pgdog/src/auth/error.rs +++ b/pgdog/src/auth/error.rs @@ -7,4 +7,7 @@ pub enum Error { #[error("server-side auth can only use one password")] ServerSideOnePassword, + + #[error("JWT error: {0}")] + Jwt(#[from] super::jwt::JwtError), } diff --git a/pgdog/src/auth/jwt/error.rs b/pgdog/src/auth/jwt/error.rs new file mode 100644 index 000000000..ef16e053c --- /dev/null +++ b/pgdog/src/auth/jwt/error.rs @@ -0,0 +1,44 @@ +/// Errors that can occur during JWT validation or key loading. +#[derive(Debug, thiserror::Error)] +pub enum JwtError { + #[error("Token expired")] + TokenExpired, + + #[error("Invalid algorithm")] + InvalidAlgorithm, + + #[error("Missing sub claim")] + MissingSub, + + #[error("Audience mismatch")] + AudienceMismatch, + + #[error("Missing kid")] + MissingKid, + + #[error("JWK not found")] + JwkNotFound, + + #[error("Public key load error: {0}")] + PublicKeyLoad(String), + + #[error("JWKS fetch error: {0}")] + JwksFetch(String), + + #[error("Invalid token: {0}")] + InvalidToken(jsonwebtoken::errors::Error), + + #[error("Invalid JWK: {0}")] + InvalidJwk(String), +} + +impl From for JwtError { + fn from(err: jsonwebtoken::errors::Error) -> Self { + match err.kind() { + jsonwebtoken::errors::ErrorKind::ExpiredSignature => JwtError::TokenExpired, + jsonwebtoken::errors::ErrorKind::InvalidAlgorithm => JwtError::InvalidAlgorithm, + jsonwebtoken::errors::ErrorKind::InvalidAudience => JwtError::AudienceMismatch, + _ => JwtError::InvalidToken(err), + } + } +} diff --git a/pgdog/src/auth/jwt/mod.rs b/pgdog/src/auth/jwt/mod.rs new file mode 100644 index 000000000..6df451b91 --- /dev/null +++ b/pgdog/src/auth/jwt/mod.rs @@ -0,0 +1,198 @@ +pub mod error; +pub mod pem_cache; + +#[cfg(test)] +pub mod tests; + +use crate::config::General; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +pub use error::JwtError; + +/// Represents the claims contained within the validated JWT token. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Claims { + /// The subject claim, mapped directly to the target PostgreSQL role name. + pub sub: String, + /// Expiration time (seconds since UNIX epoch). + pub exp: u64, + /// Optional audience claim used to verify token target. + pub aud: Option, + /// All other fields captured dynamically. + #[serde(flatten)] + pub extra: std::collections::HashMap, +} + +impl Claims { + /// Extract the username dynamically using the configured claim name. + pub fn get_username(&self, claim_name: &str) -> Option { + if claim_name == "sub" { + Some(self.sub.clone()) + } else if let Some(val) = self.extra.get(claim_name) { + match val { + serde_json::Value::String(s) => Some(s.clone()), + _ => Some(val.to_string()), + } + } else { + None + } + } +} + +struct CachedJwks { + jwks: std::sync::Arc, + fetched_at: std::time::Instant, +} + +static JWT_CRYPTO_PROVIDER: Lazy<()> = Lazy::new(|| { + let _ = tokio_rustls::rustls::crypto::aws_lc_rs::default_provider().install_default(); +}); + +/// Dynamic JWT validator supporting signature verification using either a local PEM public key +/// or a remote JSON Web Key Set (JWKS) endpoint. +pub struct JwtValidator { + public_key: Option, + jwks_url: Option, + jwks_cache_ttl: std::time::Duration, + audience: Option, + username_claim: String, + jwks_cache: tokio::sync::Mutex>, + http_client: reqwest::Client, +} + +impl JwtValidator { + /// Creates a new `JwtValidator` using the settings from the `General` configuration. + pub fn new(general: &General) -> Result { + Lazy::force(&JWT_CRYPTO_PROVIDER); + + let public_key = if let Some(ref path) = general.jwt_public_key_file { + let static_bytes = pem_cache::get_static_pem_bytes(path).map_err(|e| { + JwtError::PublicKeyLoad(format!("Failed to read public key file: {}", e)) + })?; + + // Try to parse as RSA first, fallback to EC public key + let key = if let Ok(rsa_key) = jsonwebtoken::DecodingKey::from_rsa_pem(static_bytes) { + rsa_key + } else { + jsonwebtoken::DecodingKey::from_ec_pem(static_bytes).map_err(|e| { + JwtError::PublicKeyLoad(format!("Failed to parse RSA/EC public key PEM: {}", e)) + })? + }; + Some(key) + } else { + None + }; + + Ok(Self { + public_key, + jwks_url: general.jwt_jwks_url.clone(), + jwks_cache_ttl: std::time::Duration::from_secs(general.jwt_jwks_cache_ttl), + audience: general.jwt_audience.clone(), + username_claim: general.jwt_username_claim.clone(), + jwks_cache: tokio::sync::Mutex::new(None), + http_client: reqwest::Client::new(), + }) + } + + /// Validates the provided JWT token string. + pub async fn validate(&self, token: &str) -> Result { + let header = jsonwebtoken::decode_header(token)?; + + // Ensure we borrow the decoding key to avoid cloning + let decoding_key_owned; + let decoding_key = if let Some(ref public_key) = self.public_key { + public_key + } else if let Some(ref jwks_url) = self.jwks_url { + let kid = header.kid.as_ref().ok_or(JwtError::MissingKid)?; + let jwks = self.get_jwks(jwks_url).await?; + + let jwk = jwks + .keys + .iter() + .find(|k| k.common.key_id.as_ref() == Some(kid)) + .ok_or(JwtError::JwkNotFound)?; + + decoding_key_owned = jsonwebtoken::DecodingKey::from_jwk(jwk) + .map_err(|e| JwtError::InvalidJwk(e.to_string()))?; + &decoding_key_owned + } else { + return Err(JwtError::PublicKeyLoad( + "No JWT verification key or JWKS URL configured".to_string(), + )); + }; + + // Determine algorithm from header, only allowing secure asymmetric algorithms + let algorithm = match header.alg { + jsonwebtoken::Algorithm::RS256 => jsonwebtoken::Algorithm::RS256, + jsonwebtoken::Algorithm::ES256 => jsonwebtoken::Algorithm::ES256, + _ => return Err(JwtError::InvalidAlgorithm), + }; + + // Configure validation criteria + let mut validation = jsonwebtoken::Validation::new(algorithm); + if let Some(ref aud) = self.audience { + validation.set_audience(&[aud]); + } else { + validation.validate_aud = false; + } + + let token_data = jsonwebtoken::decode::(token, decoding_key, &validation)?; + let claims = token_data.claims; + + let username = claims + .get_username(&self.username_claim) + .ok_or(JwtError::MissingSub)?; + + if username.is_empty() { + return Err(JwtError::MissingSub); + } + + Ok(claims) + } + + /// Fetches the JWKS keys using double-checked locking to avoid duplicate HTTP calls. + async fn get_jwks( + &self, + jwks_url: &str, + ) -> Result, JwtError> { + // First check (read lock) + { + let cache = self.jwks_cache.lock().await; + if let Some(ref cached) = *cache { + if cached.fetched_at.elapsed() < self.jwks_cache_ttl { + return Ok(std::sync::Arc::clone(&cached.jwks)); + } + } + } + + // Second check under lock to prevent concurrent HTTP requests + let mut cache = self.jwks_cache.lock().await; + if let Some(ref cached) = *cache { + if cached.fetched_at.elapsed() < self.jwks_cache_ttl { + return Ok(std::sync::Arc::clone(&cached.jwks)); + } + } + + let response = self + .http_client + .get(jwks_url) + .send() + .await + .map_err(|e| JwtError::JwksFetch(format!("Request failed: {}", e)))?; + + let jwks = std::sync::Arc::new( + response + .json::() + .await + .map_err(|e| JwtError::JwksFetch(format!("Failed to parse JWKS JSON: {}", e)))?, + ); + + *cache = Some(CachedJwks { + jwks: std::sync::Arc::clone(&jwks), + fetched_at: std::time::Instant::now(), + }); + + Ok(jwks) + } +} diff --git a/pgdog/src/auth/jwt/pem_cache.rs b/pgdog/src/auth/jwt/pem_cache.rs new file mode 100644 index 000000000..c00954650 --- /dev/null +++ b/pgdog/src/auth/jwt/pem_cache.rs @@ -0,0 +1,38 @@ +use once_cell::sync::Lazy; +use parking_lot::Mutex; +use std::collections::HashMap; +use std::path::PathBuf; + +/// Static cache to store leaked PEM key bytes. +/// +/// Because `jsonwebtoken`'s `DecodingKey` signature binds the lifetime of the returned key +/// to the lifetime of the input slice, we must provide a slice that lives as long as the +/// program ('static). +/// +/// To prevent memory leaks during configuration reloads (e.g. on SIGHUP), we cache loaded +/// PEM files. If a file's path and contents have not changed, we reuse the existing static +/// slice. If the contents change on disk, we leak the new contents and update the cache. +static PEM_KEY_CACHE: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +/// Load a PEM file from disk and return a static byte slice. +/// Safely avoids repeated leaking of memory on configuration reloads. +pub fn get_static_pem_bytes(path: &str) -> std::io::Result<&'static [u8]> { + let path_buf = PathBuf::from(path); + let current_bytes = std::fs::read(&path_buf)?; + + let mut cache = PEM_KEY_CACHE.lock(); + + // Check if we already have the exact same file contents cached + if let Some(cached_bytes) = cache.get(&path_buf) { + if *cached_bytes == current_bytes.as_slice() { + return Ok(*cached_bytes); + } + } + + // Leaking is only done when a new file is loaded or its contents changed on disk + let static_ref: &'static [u8] = Box::leak(current_bytes.into_boxed_slice()); + cache.insert(path_buf, static_ref); + + Ok(static_ref) +} diff --git a/pgdog/src/auth/jwt/tests.rs b/pgdog/src/auth/jwt/tests.rs new file mode 100644 index 000000000..f2fe4d4dc --- /dev/null +++ b/pgdog/src/auth/jwt/tests.rs @@ -0,0 +1,296 @@ +use super::pem_cache::get_static_pem_bytes; +use super::*; +use jsonwebtoken::{Algorithm, EncodingKey, Header}; +use std::time::{SystemTime, UNIX_EPOCH}; + +const TEST_PRIVATE_KEY_PEM: &[u8] = b"-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCmYC6Np7JXYe7a +PBv7SB84oz3Cx8mqVFFgNSvqTsqROT3NCK8aEtnPwsStxYpkXw7cKD2M6qFx7Y12 +CpoNQWL12dY0x3MPDWV+uQTejbklq5NDOh9IID7+RqNMO/t8/yKwY/HzJutOU2SV +CTFlJKG8KUJOQTFnWYC+rOcMk3L1befIdyQM3IDD2zBAJtDGTClZBrrjkEZmsb39 +lBsTM3pkzUIRcxpeFwITmTZeR/CqQpU3J/3aSqZt6EhTXheIotl3wdQYT28XpbcY +TioVfNDhQhwBWwiGi6L70Q17EprClXdCzueHHQkzkuTQXHxWOj1k8EHU73QVyXUq +VSx1TjwvAgMBAAECggEAO6rP10aWi4cYQZUAFgC6DbZhjmrXNKpTmtTG4JuMQ0PL +ma4tGgU7rypzHbz0EmYS7rrRxClbZ//hVT2dHPbftjr++uOyrGnKBgX1rJkYFt3v +DNOZ52SFIu0TYGI8oYngl3DokyLYjbkTn+1xlQvrow8K9ASmYqGzLe7VV+nDdyf0 +w3xPHigjXsSr+j0INa5IXe7HGDsCFAIOfAg0ruGPKEkKg1Sxwq6/BFr8Rq28kaMv +wlMtHDyqfxXezf3JPjoe9hnNCi6d6n/buI/5qYHMR+r4Eb5iZbMzCEeOz5BT26Ku +YHa36jJC3b5z7N1B1g2DgPYWIyvsu1LnEGfFJaHmSQKBgQDSS85TXvxMM0A419Jf +LzVjWSN4dkqoJvM5/4ra7BB4/2353FP/TeSkGLARZ0WfpvFLvk/iTeH7IyMQTEB3 +xnoWT/kuPhLscwDwRZRgr0O+S6BFNMzGafI/SaD/TUy1lize9EKK2zYRsfT+EmTB +0+hX5ILMxGqy3AqcH7TtcHTnKQKBgQDKiMlw5TcZ84gLW3HMVwiBIIu5xTUUrAq1 +65al69lWR2woma9eg2a70ES+oJpLkuM4FTXZq0/X/XXgmAQxszu091a6GdPb5XV8 +lYx31xpBiZwyeB2y5yEbSq+AawzFMIUWmRH94+mRv2+wsUmUZ84TVHua22A3+Qur +l1/GoBYrlwKBgQDDie09pFKgX/9VW4inLPRNfnL27bcZh64dvblVOq9OcuPFstL/ +z2PMGZCNfiNFAivXrAwHdzerFs7htqUzOgAHgzFFiD58UasLvwbqp80rwpIyB5ho +3dZ8dnAXM78iEZODdEfzaUVrSrdtD5lUiT+/iiD9WZ2E1gmfhfPr2+c3kQKBgHvf +9fVK/MyumwL3Rz8H7HeuBEf3SmP+Zf6mvVl2S1PuE0Ux2oUgMXGmDKXbbQPUL41Z +y7n6gbdFmxdnYwlS6q3gqfbhXScdzSIKBgQ2WCTFmfd0aBXIMAOVRopw7zqcVopf +zRVQlMdEI3gatzpB01UXUxKAIvWZKX4l87p0p5q5AoGBALh91ATgdrEhe16TV0ru +8B89ZtMcgqaKXVlPO/Rr6fBT/VhSW9KV52sik4Jea5AFfZty/DzhP8pXbJtZjHjA +d5Bja4qZrKG67E92JhoCKMso/JaQELsln9GzQVkVsgLxF1IP0k797znJIfHDFvma +ntQFcU46jCHtpmL7OyzV4NoT +-----END PRIVATE KEY-----"; + +const TEST_PUBLIC_KEY_PEM: &[u8] = b"-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApmAujaeyV2Hu2jwb+0gf +OKM9wsfJqlRRYDUr6k7KkTk9zQivGhLZz8LErcWKZF8O3Cg9jOqhce2NdgqaDUFi +9dnWNMdzDw1lfrkE3o25JauTQzofSCA+/kajTDv7fP8isGPx8ybrTlNklQkxZSSh +vClCTkExZ1mAvqznDJNy9W3nyHckDNyAw9swQCbQxkwpWQa645BGZrG9/ZQbEzN6 +ZM1CEXMaXhcCE5k2XkfwqkKVNyf92kqmbehIU14XiKLZd8HUGE9vF6W3GE4qFXzQ +4UIcAVsIhoui+9ENexKawpV3Qs7nhx0JM5Lk0Fx8Vjo9ZPBB1O90Fcl1KlUsdU48 +LwIDAQAB +-----END PUBLIC KEY-----"; + +fn current_time_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() +} + +fn generate_token(claims: &Claims, alg: Algorithm, key: &[u8]) -> String { + let header = Header::new(alg); + let key = EncodingKey::from_rsa_pem(key).unwrap(); + jsonwebtoken::encode(&header, claims, &key).unwrap() +} + +#[tokio::test] +async fn test_jwt_validation_success() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + jwt_audience: Some("test_audience".to_string()), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() + 60; + let claims = Claims { + sub: "postgres_user".to_string(), + exp, + aud: Some(serde_json::Value::String("test_audience".to_string())), + extra: std::collections::HashMap::new(), + }; + + let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); + let decoded = validator.validate(&token).await.unwrap(); + + assert_eq!(decoded.sub, "postgres_user"); + assert_eq!(decoded.exp, exp); +} + +#[tokio::test] +async fn test_jwt_validation_expired() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() - 300; + let claims = Claims { + sub: "postgres_user".to_string(), + exp, + aud: None, + extra: std::collections::HashMap::new(), + }; + + let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); + let err = validator.validate(&token).await.unwrap_err(); + + assert!(matches!(err, JwtError::TokenExpired)); +} + +#[tokio::test] +async fn test_jwt_validation_wrong_algorithm() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() + 60; + let claims = Claims { + sub: "postgres_user".to_string(), + exp, + aud: None, + extra: std::collections::HashMap::new(), + }; + + let token = generate_token(&claims, Algorithm::RS512, TEST_PRIVATE_KEY_PEM); + let err = validator.validate(&token).await.unwrap_err(); + + assert!(matches!(err, JwtError::InvalidAlgorithm)); +} + +#[tokio::test] +async fn test_jwt_validation_missing_sub() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() + 60; + let claims = Claims { + sub: "".to_string(), + exp, + aud: None, + extra: std::collections::HashMap::new(), + }; + + let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); + let err = validator.validate(&token).await.unwrap_err(); + + assert!(matches!(err, JwtError::MissingSub)); +} + +#[tokio::test] +async fn test_jwt_validation_audience_mismatch() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + jwt_audience: Some("expected_audience".to_string()), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() + 60; + let claims = Claims { + sub: "postgres_user".to_string(), + exp, + aud: Some(serde_json::Value::String("wrong_audience".to_string())), + extra: std::collections::HashMap::new(), + }; + + let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); + let err = validator.validate(&token).await.unwrap_err(); + + assert!(matches!(err, JwtError::AudienceMismatch)); +} + +#[tokio::test] +async fn test_jwt_validator_memory_leak_prevention() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test_mem.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let path_str = pub_key_path.to_str().unwrap(); + + let first_load = get_static_pem_bytes(path_str).unwrap(); + let second_load = get_static_pem_bytes(path_str).unwrap(); + + assert_eq!( + first_load.as_ptr(), + second_load.as_ptr(), + "Cache did not reuse leaked memory pointer!" + ); + + let alternate_pem = b"-----BEGIN PUBLIC KEY-----\nALTERNATE_CONTENT\n-----END PUBLIC KEY-----"; + std::fs::write(&pub_key_path, alternate_pem).unwrap(); + + let third_load = get_static_pem_bytes(path_str).unwrap(); + assert_ne!( + first_load.as_ptr(), + third_load.as_ptr(), + "Cache returned stale pointer after disk file modification!" + ); +} + +#[test] +fn test_jwt_user_suffix_matching() { + // 1. With configured suffix + let suffix = Some("@edreamsodigeo.com".to_string()); + + let user_sso = "john.doe@edreamsodigeo.com"; + let user_other = "datastream"; + + let match_sso = match suffix { + Some(ref s) => user_sso.ends_with(s.as_str()), + None => true, + }; + let match_other = match suffix { + Some(ref s) => user_other.ends_with(s.as_str()), + None => true, + }; + + assert!(match_sso, "SSO suffix should have matched!"); + assert!(!match_other, "Non-SSO suffix should not have matched!"); + + // 2. Without configured suffix (should match all) + let no_suffix: Option = None; + let match_no_suffix_sso = match no_suffix { + Some(ref s) => user_sso.ends_with(s.as_str()), + None => true, + }; + let match_no_suffix_other = match no_suffix { + Some(ref s) => user_other.ends_with(s.as_str()), + None => true, + }; + + assert!( + match_no_suffix_sso, + "Should default to true when suffix is None!" + ); + assert!( + match_no_suffix_other, + "Should default to true when suffix is None!" + ); +} + +#[tokio::test] +async fn test_jwt_custom_username_claim() { + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test_claim.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + let general = General { + jwt_public_key_file: Some(pub_key_path.to_str().unwrap().to_string()), + jwt_username_claim: "email".to_string(), + ..Default::default() + }; + + let validator = JwtValidator::new(&general).unwrap(); + + let exp = current_time_secs() + 60; + let mut claims = Claims { + sub: "postgres_user_sub".to_string(), + exp, + aud: None, + extra: std::collections::HashMap::new(), + }; + claims.extra.insert( + "email".to_string(), + serde_json::Value::String("postgres_user_email@edreamsodigeo.com".to_string()), + ); + + let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); + let decoded = validator.validate(&token).await.unwrap(); + + let username = decoded.get_username(&general.jwt_username_claim).unwrap(); + assert_eq!(username, "postgres_user_email@edreamsodigeo.com"); +} diff --git a/pgdog/src/auth/mod.rs b/pgdog/src/auth/mod.rs index 94835d1e9..609cabe4d 100644 --- a/pgdog/src/auth/mod.rs +++ b/pgdog/src/auth/mod.rs @@ -2,9 +2,11 @@ pub mod auth_result; pub mod error; +pub mod jwt; pub mod md5; pub mod scram; pub use auth_result::AuthResult; pub use error::Error; +pub use jwt::{Claims, JwtError, JwtValidator}; pub use md5::Client; diff --git a/pgdog/src/backend/auth/azure_workload_identity.rs b/pgdog/src/backend/auth/azure_workload_identity.rs index 1e235e434..490b39d06 100644 --- a/pgdog/src/backend/auth/azure_workload_identity.rs +++ b/pgdog/src/backend/auth/azure_workload_identity.rs @@ -61,6 +61,7 @@ mod tests { server_auth: ServerAuth::AzureWorkloadIdentity, server_iam_region: None, configured_role: Role::Auto, + client_user: None, }; let (b64_token, expires_at) = token(addr).await.unwrap(); diff --git a/pgdog/src/backend/auth/rds_iam.rs b/pgdog/src/backend/auth/rds_iam.rs index ade96672d..94081f3d2 100644 --- a/pgdog/src/backend/auth/rds_iam.rs +++ b/pgdog/src/backend/auth/rds_iam.rs @@ -102,6 +102,7 @@ mod tests { server_auth: ServerAuth::RdsIam, server_iam_region: Some("us-east-1".into()), configured_role: Role::Auto, + client_user: None, } } diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index f3a840480..322e25144 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -175,8 +175,24 @@ pub(crate) fn add(user: ConfigUser) -> Result { // User already exists in users.toml. if let Some(mut existing) = existing { - // Password hasn't been set yet. - if existing.password.is_none() { + let cluster_is_offline = if let Ok(cluster) = + databases().cluster((user.name.as_str(), user.database.as_str())) + { + !cluster.online() + } else { + false + }; + + if cluster_is_offline { + debug!( + "re-creating pool for user \"{}\" on database \"{}\" because it is offline", + user.name, user.database + ); + existing.password = user.password.clone(); + add_user(existing)?; + reload_from_existing()?; + Ok(AuthResult::Ok) + } else if existing.password.is_none() { existing.password = user.password.clone(); add_user(existing)?; reload_from_existing()?; @@ -845,6 +861,32 @@ mod tests { assert_eq!(found.unwrap().password, Some("new_pass".to_string())); } + #[tokio::test] + async fn test_add_existing_user_offline_pool_recreated() { + setup_config( + crate::config::PassthroughAuth::EnabledPlain, + vec![make_user("dave", Some("pass"))], + ); + + // Initially online + let cluster = databases().cluster(("dave", "db1")).unwrap(); + assert!(cluster.online()); + + // Shut down the cluster to simulate it going offline (e.g., due to previous role-not-found auth error) + cluster.shutdown(); + assert!(!cluster.online()); + + // Now run add() again with the same user & password. + // It should notice the cluster is offline, recreate it (reload), and return Ok! + let result = add(make_user("dave", Some("pass"))); + assert!(result.is_ok()); + assert!(result.unwrap().is_ok()); + + // The newly recreated cluster must be online! + let new_cluster = databases().cluster(("dave", "db1")).unwrap(); + assert!(new_cluster.online()); + } + #[test] fn test_mirror_user_isolation() { // Test that each user gets their own mirror cluster diff --git a/pgdog/src/backend/error.rs b/pgdog/src/backend/error.rs index 3b385d2c0..5c0f63053 100644 --- a/pgdog/src/backend/error.rs +++ b/pgdog/src/backend/error.rs @@ -191,8 +191,63 @@ impl Error { pub fn is_auth(&self) -> bool { match self { Self::Auth(_) => true, - Self::ConnectionError(err) => err.code == "28000" || err.is_bad_password(), + Self::ConnectionError(err) => { + err.code == "28000" + || err.is_bad_password() + || err.code == "22023" + || err.code == "42704" + } + Self::ExecutionError(err) => { + err.code == "22023" + || err.code == "28000" + || err.code == "42704" + || err.is_bad_password() + } _ => false, } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::net::messages::ErrorResponse; + + #[test] + fn test_error_is_auth() { + // Test role-does-not-exist and other auth-related error codes + let codes = vec!["28000", "28P01", "22023", "42704"]; + + for code in codes { + let response = ErrorResponse { + code: code.to_string(), + ..Default::default() + }; + + let conn_err = Error::ConnectionError(Box::new(response.clone())); + let exec_err = Error::ExecutionError(Box::new(response)); + + assert!( + conn_err.is_auth(), + "Expected code {} to be treated as auth error in ConnectionError", + code + ); + assert!( + exec_err.is_auth(), + "Expected code {} to be treated as auth error in ExecutionError", + code + ); + } + + // Test non-auth error code + let non_auth_response = ErrorResponse { + code: "58000".to_string(), + ..Default::default() + }; + let non_auth_conn_err = Error::ConnectionError(Box::new(non_auth_response.clone())); + let non_auth_exec_err = Error::ExecutionError(Box::new(non_auth_response)); + + assert!(!non_auth_conn_err.is_auth()); + assert!(!non_auth_exec_err.is_auth()); + } +} diff --git a/pgdog/src/backend/pool/address.rs b/pgdog/src/backend/pool/address.rs index f9ecdb6b5..cf4fc384c 100644 --- a/pgdog/src/backend/pool/address.rs +++ b/pgdog/src/backend/pool/address.rs @@ -37,6 +37,8 @@ pub struct Address { /// Role given to the database at configuration time. /// For automatic roles, this can change at runtime. pub configured_role: Role, + /// Original client username (SSO / JWT user name). + pub client_user: Option, } impl From
for pgdog_stats::Address { @@ -91,6 +93,7 @@ impl Address { server_iam_region: user.server_iam_region.clone(), database_number, configured_role: database.role, + client_user: Some(user.name.clone()), } } @@ -155,6 +158,7 @@ impl Address { server_iam_region: None, database_number: 0, configured_role: Role::Primary, + client_user: None, } } } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 13f36f5ee..ce7fe95b9 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -87,6 +87,7 @@ pub struct Cluster { resharding_replication_retry_min_delay: Duration, regex_parser: RegexParser, identity: Option, + user_read_only: bool, } /// Sharding configuration from the cluster. @@ -174,6 +175,7 @@ pub struct ClusterConfig<'a> { pub regex_parser_limit: usize, pub pub_sub_enabled: bool, pub identity: &'a Option, + pub user_read_only: bool, } impl<'a> ClusterConfig<'a> { @@ -193,6 +195,13 @@ impl<'a> ClusterConfig<'a> { .map(|shard| shard.pooler_mode()) .unwrap_or(user.pooler_mode.unwrap_or(general.pooler_mode)); + let user_read_only = user.read_only.unwrap_or(false); + let rw_split = if user_read_only { + ReadWriteSplit::ExcludePrimary + } else { + general.read_write_split + }; + Self { name: &user.database, passwords: user.passwords(), @@ -204,7 +213,7 @@ impl<'a> ClusterConfig<'a> { sharded_tables, multi_tenant, rw_strategy: general.read_write_strategy, - rw_split: general.read_write_split, + rw_split, schema_admin: user.schema_admin, cross_shard_disabled: user .cross_shard_disabled @@ -237,6 +246,7 @@ impl<'a> ClusterConfig<'a> { regex_parser_limit: general.regex_parser_limit, pub_sub_enabled: general.pub_sub_enabled(), identity: &user.identity, + user_read_only, } } } @@ -283,6 +293,7 @@ impl Cluster { regex_parser_limit, pub_sub_enabled, identity, + user_read_only, } = config; let identifier = Arc::new(DatabaseUser { @@ -343,6 +354,7 @@ impl Cluster { ), regex_parser: RegexParser::new(regex_parser_limit, query_parser), identity: identity.clone(), + user_read_only, } } @@ -427,6 +439,11 @@ impl Cluster { } /// Get pooler mode. + /// Check if the user is configured as read-only. + pub fn user_read_only(&self) -> bool { + self.user_read_only + } + pub fn pooler_mode(&self) -> PoolerMode { self.pooler_mode } @@ -943,6 +960,10 @@ mod test { pub fn set_read_write_strategy(&mut self, rw_strategy: ReadWriteStrategy) { self.rw_strategy = rw_strategy; } + + pub fn set_user_read_only(&mut self, read_only: bool) { + self.user_read_only = read_only; + } } #[test] diff --git a/pgdog/src/backend/pool/connection/mod.rs b/pgdog/src/backend/pool/connection/mod.rs index 05716c4f5..7eaa60a3e 100644 --- a/pgdog/src/backend/pool/connection/mod.rs +++ b/pgdog/src/backend/pool/connection/mod.rs @@ -134,11 +134,14 @@ impl Connection { /// Try to get a connection for the given route. async fn try_conn(&mut self, request: &Request, route: &Route) -> Result<(), Error> { + let mut req = *request; + req.replica_only = route.is_read(); + if let Shard::Direct(shard) = route.shard() { let mut server = if route.is_read() { - self.cluster()?.replica(*shard, request).await? + self.cluster()?.replica(*shard, &req).await? } else { - self.cluster()?.primary(*shard, request).await? + self.cluster()?.primary(*shard, &req).await? }; // Cleanup session mode connections when @@ -168,9 +171,9 @@ impl Connection { continue; }; let mut server = if route.is_read() { - shard.replica(request).await? + shard.replica(&req).await? } else { - shard.primary(request).await? + shard.primary(&req).await? }; if self.session_mode() { diff --git a/pgdog/src/backend/pool/error.rs b/pgdog/src/backend/pool/error.rs index 1f74de990..bf0d66eb1 100644 --- a/pgdog/src/backend/pool/error.rs +++ b/pgdog/src/backend/pool/error.rs @@ -76,9 +76,17 @@ pub enum Error { #[error("replica lag")] ReplicaLag, + + #[error("auth error")] + Auth, } impl Error { + /// Check if this is an authentication error. + pub fn is_auth(&self) -> bool { + matches!(self, Self::Auth) + } + /// Transient availability fault worth retrying. /// /// Non-retryable: config errors, admin decisions, programming errors. @@ -98,6 +106,8 @@ impl Error { | Self::UntrackedConnCheckin(_) // Deliberate shutdown. | Self::FastShutdown + // Authentication errors. + | Self::Auth ) } } @@ -133,5 +143,6 @@ mod tests { assert!(!Error::PubSubDisabled.is_retryable()); assert!(!Error::FastShutdown.is_retryable()); assert!(!Error::NoShard(0).is_retryable()); + assert!(!Error::Auth.is_retryable()); } } diff --git a/pgdog/src/backend/pool/lb/mod.rs b/pgdog/src/backend/pool/lb/mod.rs index e37a90fc8..6868dde0d 100644 --- a/pgdog/src/backend/pool/lb/mod.rs +++ b/pgdog/src/backend/pool/lb/mod.rs @@ -333,13 +333,19 @@ impl LoadBalancer { .filter(|target| !target.pool.config().resharding_only) // Don't let reads on resharding-only replicas. .collect(); - let primary_reads = match self.rw_split { - IncludePrimary => true, - IncludePrimaryIfReplicaBanned => candidates.iter().any(|target| target.ban.banned()), - // we read from the primary if we have no replicas - ExcludePrimary => !candidates - .iter() - .any(|target| matches!(target.role(), Role::Replica | Role::Auto)), + let primary_reads = if request.replica_only { + false + } else { + match self.rw_split { + IncludePrimary => true, + IncludePrimaryIfReplicaBanned => { + candidates.iter().any(|target| target.ban.banned()) + } + // we read from the primary if we have no replicas + ExcludePrimary => !candidates + .iter() + .any(|target| matches!(target.role(), Role::Replica | Role::Auto)), + } }; if !primary_reads { diff --git a/pgdog/src/backend/pool/monitor.rs b/pgdog/src/backend/pool/monitor.rs index 8c7049307..53b93baaa 100644 --- a/pgdog/src/backend/pool/monitor.rs +++ b/pgdog/src/backend/pool/monitor.rs @@ -146,9 +146,13 @@ impl Monitor { if let ShouldCreate::Yes { reason, .. } = should_create { info!("new connection requested: {} [{}]", should_create, self.pool.addr()); let ok = match self.replenish(reason).await { - Ok(ok) => ok, + Ok(r) => r, Err(err) => { error!("monitor error: {}", err); + if err.is_auth() { + warn!("Authentication failed with backend database, shutting down pool [{}] to prevent connection storm", self.pool.addr()); + self.pool.shutdown(); + } false } }; @@ -310,7 +314,7 @@ impl Monitor { } Ok(true) } - _ => Ok(false), + Err(err) => Err(err), } } @@ -446,6 +450,19 @@ impl Monitor { // We tried all passwords and they were all wrong. if err.is_auth() { pool.lock().stats.counts.auth_attempts += pool.addr().passwords.len(); + error!( + "{}error connecting to server: {} [{}]", + if attempt > 0 { + format!("[attempt {}] ", attempt) + } else { + String::new() + }, + err, + pool.addr(), + ); + return Err(Error::Auth); + } else { + error = Error::ServerError; } error!( "{}error connecting to server: {} [{}]", @@ -457,7 +474,6 @@ impl Monitor { err, pool.addr(), ); - error = Error::ServerError; } Err(_) => { diff --git a/pgdog/src/backend/pool/request.rs b/pgdog/src/backend/pool/request.rs index 597c3772b..6ef6af5c6 100644 --- a/pgdog/src/backend/pool/request.rs +++ b/pgdog/src/backend/pool/request.rs @@ -8,6 +8,7 @@ pub struct Request { pub id: FrontendPid, pub created_at: Instant, pub read: bool, + pub replica_only: bool, } impl Request { @@ -16,6 +17,7 @@ impl Request { id, created_at: Instant::now(), read, + replica_only: false, } } @@ -24,6 +26,7 @@ impl Request { id, created_at: Instant::now(), read: false, + replica_only: false, } } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5d9356b3e..27fd7abfe 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -111,6 +111,22 @@ impl Server { Ok(mut server) => { auth_secret.valid(true); server.password_attempts = idx + 1; + + if let Some(ref client_user) = addr.client_user { + if client_user != &addr.user { + debug!( + "executing SET ROLE \"{}\" for server connection [{}]", + client_user, addr + ); + server + .execute(crate::net::Query::new(format!( + r#"SET ROLE "{}""#, + client_user + ))) + .await?; + } + } + return Ok(server); } Err(Error::ConnectionError(error)) => { diff --git a/pgdog/src/config/convert.rs b/pgdog/src/config/convert.rs index 14c4f8c3c..8be09782b 100644 --- a/pgdog/src/config/convert.rs +++ b/pgdog/src/config/convert.rs @@ -1,7 +1,7 @@ use super::Error; use crate::net::{Parameters, Password}; -use super::User; +use super::{General, User}; pub fn user_from_params(params: &Parameters, password: &Password) -> Result { let user = params @@ -20,6 +20,34 @@ pub fn user_from_params(params: &Parameters, password: &Password) -> Result User { + User { + name: jwt_sub.to_owned(), + database: database.to_owned(), + server_user: general.jwt_server_user.clone().or_else(|| { + // Fall back to the connection_user when no dedicated server user is configured. + Some(connection_user.to_owned()) + }), + server_password: general.jwt_server_password.clone(), + read_only: if general.jwt_user_auto_provision_read_only { + Some(true) + } else { + None + }, + ..Default::default() + } +} + // impl User { // pub(crate) fn from_params(params: &Parameters, password: &Password) -> Result { // let user = params diff --git a/pgdog/src/config/mod.rs b/pgdog/src/config/mod.rs index 835a0f10e..13c6ce7bb 100644 --- a/pgdog/src/config/mod.rs +++ b/pgdog/src/config/mod.rs @@ -47,6 +47,9 @@ use once_cell::sync::Lazy; static CONFIG: Lazy> = Lazy::new(|| ArcSwap::from_pointee(ConfigAndUsers::default())); +static JWT_VALIDATOR: Lazy>>> = + Lazy::new(|| ArcSwap::from_pointee(None)); + static LOCK: Lazy> = Lazy::new(|| Mutex::new(())); /// Load configuration. @@ -54,6 +57,19 @@ pub fn config() -> Arc { CONFIG.load().clone() } +/// Get the active JWT validator, creating it lazily if needed. +pub fn jwt_validator() -> Result, crate::auth::JwtError> { + if let Some(ref validator) = **JWT_VALIDATOR.load() { + Ok(validator.clone()) + } else { + let config = config(); + let validator = crate::auth::JwtValidator::new(&config.config.general)?; + let arc_val = Arc::new(validator); + JWT_VALIDATOR.store(Arc::new(Some(arc_val.clone()))); + Ok(arc_val) + } +} + /// Load the configuration file from disk. pub fn load(config: &PathBuf, users: &PathBuf) -> Result { let config = ConfigAndUsers::load(config, users)?; @@ -65,6 +81,17 @@ pub fn set(mut config: ConfigAndUsers) -> Result { for table in config.config.sharded_tables.iter_mut() { table.load_centroids()?; } + + let validator = if config.config.general.auth_type == AuthType::Jwt { + Some(Arc::new( + crate::auth::JwtValidator::new(&config.config.general) + .map_err(|e| Error::ParseError(e.to_string()))?, + )) + } else { + None + }; + JWT_VALIDATOR.store(Arc::new(validator)); + CONFIG.store(Arc::new(config.clone())); Ok(config) } diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 8cf1955b9..c2af4716a 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -22,6 +22,7 @@ use crate::backend::{ pool::{Connection, Request}, }; use crate::config::convert::user_from_params; +use crate::config::jwt_validator; use crate::config::{self, AuthType, ConfigAndUsers, config}; use crate::frontend::ClientComms; use crate::frontend::client::query_engine::{QueryEngine, QueryEngineContext}; @@ -209,6 +210,10 @@ impl Client { } AuthType::Trust => AuthResult::Ok, + + // JWT users are authenticated before check_password is called. + // Treat Jwt like Scram for admin auth (fall through to password check). + AuthType::Jwt => AuthResult::NoPasswordMatch, }; Ok(result) @@ -238,6 +243,20 @@ impl Client { let comms = ClientComms::new(id); let log_connections = config.config.general.log_connections; + // For JWT auth: the username extracted from the validated JWT claims (e.g. `sub`). + // When set, this overrides `user` for backend connection purposes. + let mut jwt_effective_user: Option = None; + + // Determine if this connection should use JWT authentication. + let use_jwt = !admin + && matches!(auth_type, AuthType::Jwt) + && config + .config + .general + .jwt_user_suffix + .as_deref() + .map_or(true, |suffix| user.ends_with(suffix)); + // Check if we need to ask the client for its password in plaintext // because we don't actually have it configured. // @@ -248,6 +267,47 @@ impl Client { // map, so authenticate directly against the configured admin password. let passwords = [PasswordKind::Plain(admin_password.clone())]; Self::check_password(&mut stream, user, auth_type, &passwords).await? + } else if use_jwt { + // JWT authentication: request a cleartext password (the JWT token) from the client, + // validate it, and optionally auto-provision a connection pool. + stream + .send_flush(&Authentication::ClearTextPassword) + .await?; + let response = stream.read().await?; + let response = Password::from_bytes(response.to_bytes())?; + if let Some(token) = response.password() { + match jwt_validator() { + Ok(validator) => match validator.validate(token).await { + Ok(claims) => { + let sub = claims + .get_username(config.config.general.jwt_username_claim.as_str()); + if let Some(sub) = sub { + jwt_effective_user = Some(sub.to_string()); + if config.config.general.jwt_user_auto_provision { + use crate::config::convert::user_from_jwt; + let jwt_user = + user_from_jwt(user, database, &sub, &config.config.general); + databases::add(jwt_user)? + } else { + AuthResult::Ok + } + } else { + AuthResult::NoPasswordMatch + } + } + Err(err) => { + debug!("JWT validation failed for user \"{}\": {}", user, err); + AuthResult::NoPasswordMatch + } + }, + Err(err) => { + debug!("JWT validator initialization error: {}", err); + AuthResult::NoPasswordMatch + } + } + } else { + AuthResult::NoPasswordMessage + } } else if passthrough { // Get the password. We always need it because we need to check if // it's current and hasn't been changed. @@ -288,14 +348,20 @@ impl Client { } }; + // When JWT auth succeeded, `jwt_effective_user` holds the username from the token's claim. + // For all backend operations, use that name instead of the connection string username. + let effective_user = jwt_effective_user.as_deref().unwrap_or(user); + if !auth_result.is_ok() { if log_connections { warn!( r#"user "{}" and database "{}" auth error: {}"#, - user, database, auth_result + effective_user, database, auth_result ); } - stream.fatal(ErrorResponse::auth(user, database)).await?; + stream + .fatal(ErrorResponse::auth(effective_user, database)) + .await?; return Ok(None); } else { stream.send(&Authentication::Ok).await?; @@ -312,11 +378,13 @@ impl Client { return Ok(None); } - let mut conn = match Connection::new(user, database, admin) { + let mut conn = match Connection::new(effective_user, database, admin) { Ok(conn) => conn, Err(err) => { debug!("connection error: {}", err); - stream.fatal(ErrorResponse::auth(user, database)).await?; + stream + .fatal(ErrorResponse::auth(effective_user, database)) + .await?; return Ok(None); } }; @@ -332,7 +400,7 @@ impl Client { addr ); stream - .fatal(ErrorResponse::connection(user, database)) + .fatal(ErrorResponse::connection(effective_user, database)) .await?; return Ok(None); } else { @@ -352,7 +420,7 @@ impl Client { if config.config.general.log_connections { info!( r#"client "{}" connected to database "{}" [{}, auth: {}] {}"#, - user, + effective_user, database, addr, if passthrough { @@ -366,7 +434,7 @@ impl Client { debug!( "client \"{}\" startup parameters: {} [{}]", - user, params, addr + effective_user, params, addr ); Ok(Some(Self { diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index 63daf0159..f688ed021 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -22,8 +22,8 @@ use crate::{ }, net::{ Bind, Close, CommandComplete, DataRow, Describe, ErrorResponse, Execute, Field, Flush, - Format, FromBytes, Message, Parameters, Parse, ProtocolVersion, Query, ReadyForQuery, - RowDescription, Sync, Terminate, ToBytes, + Format, FromBytes, Message, Parameters, Parse, Password, ProtocolVersion, Query, + ReadyForQuery, RowDescription, Sync, Terminate, ToBytes, }, state::State, }; @@ -525,3 +525,137 @@ async fn test_query_timeout() { let state = pools[0].state(); assert_eq!(state.force_close, 1); } + +#[tokio::test] +async fn test_client_login_jwt() { + use crate::config::{AuthType, config, load_test, set}; + use jsonwebtoken::{Algorithm, EncodingKey, Header}; + use serde::{Deserialize, Serialize}; + use std::time::{SystemTime, UNIX_EPOCH}; + + crate::logger(); + load_test(); + + // 1. Generate keys & write public key to temp file + const TEST_PRIVATE_KEY_PEM: &[u8] = b"-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCmYC6Np7JXYe7a +PBv7SB84oz3Cx8mqVFFgNSvqTsqROT3NCK8aEtnPwsStxYpkXw7cKD2M6qFx7Y12 +CpoNQWL12dY0x3MPDWV+uQTejbklq5NDOh9IID7+RqNMO/t8/yKwY/HzJutOU2SV +CTFlJKG8KUJOQTFnWYC+rOcMk3L1befIdyQM3IDD2zBAJtDGTClZBrrjkEZmsb39 +lBsTM3pkzUIRcxpeFwITmTZeR/CqQpU3J/3aSqZt6EhTXheIotl3wdQYT28XpbcY +TioVfNDhQhwBWwiGi6L70Q17EprClXdCzueHHQkzkuTQXHxWOj1k8EHU73QVyXUq +VSx1TjwvAgMBAAECggEAO6rP10aWi4cYQZUAFgC6DbZhjmrXNKpTmtTG4JuMQ0PL +ma4tGgU7rypzHbz0EmYS7rrRxClbZ//hVT2dHPbftjr++uOyrGnKBgX1rJkYFt3v +DNOZ52SFIu0TYGI8oYngl3DokyLYjbkTn+1xlQvrow8K9ASmYqGzLe7VV+nDdyf0 +w3xPHigjXsSr+j0INa5IXe7HGDsCFAIOfAg0ruGPKEkKg1Sxwq6/BFr8Rq28kaMv +wlMtHDyqfxXezf3JPjoe9hnNCi6d6n/buI/5qYHMR+r4Eb5iZbMzCEeOz5BT26Ku +YHa36jJC3b5z7N1B1g2DgPYWIyvsu1LnEGfFJaHmSQKBgQDSS85TXvxMM0A419Jf +LzVjWSN4dkqoJvM5/4ra7BB4/2353FP/TeSkGLARZ0WfpvFLvk/iTeH7IyMQTEB3 +xnoWT/kuPhLscwDwRZRgr0O+S6BFNMzGafI/SaD/TUy1lize9EKK2zYRsfT+EmTB +0+hX5ILMxGqy3AqcH7TtcHTnKQKBgQDKiMlw5TcZ84gLW3HMVwiBIIu5xTUUrAq1 +65al69lWR2woma9eg2a70ES+oJpLkuM4FTXZq0/X/XXgmAQxszu091a6GdPb5XV8 +lYx31xpBiZwyeB2y5yEbSq+AawzFMIUWmRH94+mRv2+wsUmUZ84TVHua22A3+Qur +l1/GoBYrlwKBgQDDie09pFKgX/9VW4inLPRNfnL27bcZh64dvblVOq9OcuPFstL/ +z2PMGZCNfiNFAivXrAwHdzerFs7htqUzOgAHgzFFiD58UasLvwbqp80rwpIyB5ho +3dZ8dnAXM78iEZODdEfzaUVrSrdtD5lUiT+/iiD9WZ2E1gmfhfPr2+c3kQKBgHvf +9fVK/MyumwL3Rz8H7HeuBEf3SmP+Zf6mvVl2S1PuE0Ux2oUgMXGmDKXbbQPUL41Z +y7n6gbdFmxdnYwlS6q3gqfbhXScdzSIKBgQ2WCTFmfd0aBXIMAOVRopw7zqcVopf +zRVQlMdEI3gatzpB01UXUxKAIvWZKX4l87p0p5q5AoGBALh91ATgdrEhe16TV0ru +8B89ZtMcgqaKXVlPO/Rr6fBT/VhSW9KV52sik4Jea5AFfZty/DzhP8pXbJtZjHjA +d5Bja4qZrKG67E92JhoCKMso/JaQELsln9GzQVkVsgLxF1IP0k797znJIfHDFvma +ntQFcU46jCHtpmL7OyzV4NoT +-----END PRIVATE KEY-----"; + + const TEST_PUBLIC_KEY_PEM: &[u8] = b"-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApmAujaeyV2Hu2jwb+0gf +OKM9wsfJqlRRYDUr6k7KkTk9zQivGhLZz8LErcWKZF8O3Cg9jOqhce2NdgqaDUFi +9dnWNMdzDw1lfrkE3o25JauTQzofSCA+/kajTDv7fP8isGPx8ybrTlNklQkxZSSh +vClCTkExZ1mAvqznDJNy9W3nyHckDNyAw9swQCbQxkwpWQa645BGZrG9/ZQbEzN6 +ZM1CEXMaXhcCE5k2XkfwqkKVNyf92kqmbehIU14XiKLZd8HUGE9vF6W3GE4qFXzQ +4UIcAVsIhoui+9ENexKawpV3Qs7nhx0JM5Lk0Fx8Vjo9ZPBB1O90Fcl1KlUsdU48 +LwIDAQAB +-----END PUBLIC KEY-----"; + + let temp_dir = tempfile::tempdir().unwrap(); + let pub_key_path = temp_dir.path().join("test_front.pub.pem"); + std::fs::write(&pub_key_path, TEST_PUBLIC_KEY_PEM).unwrap(); + + // 2. Configure General for JWT auth with suffix and auto-provisioning + let mut config = (*config()).clone(); + config.config.general.auth_type = AuthType::Jwt; + config.config.general.jwt_public_key_file = Some(pub_key_path.to_str().unwrap().to_string()); + config.config.general.jwt_user_suffix = Some("@edreamsodigeo.com".to_string()); + config.config.general.jwt_user_auto_provision = true; + config.config.general.jwt_user_auto_provision_read_only = true; + set(config).unwrap(); + + // 3. Generate a valid JWT token + #[derive(Debug, Serialize, Deserialize)] + struct Claims { + sub: String, + exp: u64, + } + let exp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + + 120; + let claims = Claims { + sub: "postgres_user".to_string(), + exp, + }; + let header = Header::new(Algorithm::RS256); + let key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY_PEM).unwrap(); + let token = jsonwebtoken::encode(&header, &claims, &key).unwrap(); + + // 4. Start mock pgdog listener & connect + let addr = "127.0.0.1:0".to_string(); + let listener = TcpListener::bind(&addr).await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let server_handle = tokio::spawn(async move { + let (stream, addr) = listener.accept().await.unwrap(); + let stream = Stream::plain(stream, 4096); + + let mut params = crate::net::parameter::Parameters::default(); + params.insert("user", "john.doe@edreamsodigeo.com"); + params.insert("database", "pgdog"); + + Client::spawn( + stream, + params, + addr, + crate::config::config(), + ProtocolVersion::V3_0, + ) + .await + }); + + let mut conn = TcpStream::connect(&format!("127.0.0.1:{}", port)) + .await + .unwrap(); + + // 5. Read cleartext password request from pgdog + let mut buf = [0u8; 1024]; + let n = conn.read(&mut buf).await.unwrap(); + assert!(n > 0); + // Message type 'R' (Authentication) and subtype 3 (ClearTextPassword) + assert_eq!(buf[0], b'R'); + assert_eq!(buf[8], 3); // cleartext password indicator + + // 6. Send PasswordMessage with the JWT token + let password_msg = Password::new_password(&token).to_bytes(); + conn.write_all(&password_msg).await.unwrap(); + + // 7. Verify login success (AuthenticationOk message: 'R' with subtype 0) + let n = conn.read(&mut buf).await.unwrap(); + assert!(n > 0); + let contains_auth_ok = buf[..n].windows(9).any(|w| w[0] == b'R' && w[8] == 0); + assert!( + contains_auth_ok, + "Auth OK message not found in connection response!" + ); + + drop(conn); + server_handle.await.unwrap().unwrap(); +} diff --git a/pgdog/src/frontend/router/parser/query/mod.rs b/pgdog/src/frontend/router/parser/query/mod.rs index e208cf629..03e9c5937 100644 --- a/pgdog/src/frontend/router/parser/query/mod.rs +++ b/pgdog/src/frontend/router/parser/query/mod.rs @@ -102,6 +102,7 @@ impl QueryParser { /// Parse a query and return a command. pub fn parse(&mut self, context: RouterContext) -> Result { let mut context = QueryParserContext::new(context)?; + let user_read_only = context.router_context.cluster.user_read_only(); let mut command = if context.query().is_ok() { self.write_override = context.write_override(); @@ -112,7 +113,9 @@ impl QueryParser { }; match &mut command { - Command::Query(route) | Command::Set { route, .. } => { + Command::Query(route) + | Command::Set { route, .. } + | Command::StartTransaction { route, .. } => { if route.is_cross_shard() && context.shards == 1 { context .shards_calculator @@ -130,6 +133,12 @@ impl QueryParser { _ => route.set_read(true), } } + + // If the user is configured as read-only, force all routes to read-only (replica). + // This prevents write-escalation via SET pgdog.role = 'primary' or comments. + if user_read_only { + route.set_read(true); + } } _ => (), diff --git a/pgdog/src/frontend/router/parser/query/test/setup.rs b/pgdog/src/frontend/router/parser/query/test/setup.rs index 05108e1a3..731e9fcbd 100644 --- a/pgdog/src/frontend/router/parser/query/test/setup.rs +++ b/pgdog/src/frontend/router/parser/query/test/setup.rs @@ -88,6 +88,12 @@ impl QueryParserTest { self } + /// Set whether the user is read-only. + pub(crate) fn with_user_read_only(mut self, read_only: bool) -> Self { + self.cluster.set_user_read_only(read_only); + self + } + /// Enable expanded explain for this test. pub(crate) fn with_expanded_explain(mut self) -> Self { let mut updated = config().deref().clone(); diff --git a/pgdog/src/frontend/router/parser/query/test/test_bypass.rs b/pgdog/src/frontend/router/parser/query/test/test_bypass.rs index 8c7909ab9..761834def 100644 --- a/pgdog/src/frontend/router/parser/query/test/test_bypass.rs +++ b/pgdog/src/frontend/router/parser/query/test/test_bypass.rs @@ -54,6 +54,21 @@ async fn test_primary() { } } +#[tokio::test] +async fn test_read_only_user_blocks_primary_escalation() { + let mut test = setup() + .with_user_read_only(true) + .with_param("pgdog.role", "primary"); + + for query in QUERIES { + let result = test.try_execute(vec![Query::new(query).into()]).unwrap(); + // Since the user is configured as read-only, pgDog forces the route to read-only (replica) + // even though they requested pgdog.role=primary. + assert!(result.route().is_read()); + assert_eq!(result.route().shard(), &Shard::Direct(0)) + } +} + #[tokio::test] async fn test_no_hints() { let mut test = setup(); From fa587b0c9993979de9ba80c4126d43112dab5e18 Mon Sep 17 00:00:00 2001 From: Marco Palmisano Date: Wed, 17 Jun 2026 23:20:38 +0200 Subject: [PATCH 2/5] test: add JWT integration test suite with docker-compose MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds integration/jwt/ with: - docker-compose.yml — postgres + pgdog (JWT mode) + python test runner - Dockerfile.pgdog — builds pgdog from source - Dockerfile.test — python test image - pgdog.toml / users.toml — JWT-mode config pointing at postgres service - generate_keys.sh — generates RSA key pair for local testing - conftest.py — pytest fixtures: key loading, token_factory, DSN helpers - test_jwt.py — E2E tests covering: - valid JWT connects and queries succeed - auto-provisioned pools per unique sub claim - simultaneous multi-user sessions - expired / tampered / plain-string tokens rejected - transactions (commit + rollback) through proxy - custom claim passthrough Run locally: cd integration/jwt bash generate_keys.sh docker-compose up --build --abort-on-container-exit Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- integration/jwt/Dockerfile.pgdog | 24 +++++ integration/jwt/conftest.py | 89 ++++++++++++++++ integration/jwt/docker-compose.yml | 51 ++++++++++ integration/jwt/generate_keys.sh | 19 ++++ integration/jwt/pgdog.toml | 27 +++++ integration/jwt/requirements.txt | 4 + integration/jwt/test_jwt.py | 156 +++++++++++++++++++++++++++++ integration/jwt/users.toml | 4 + 8 files changed, 374 insertions(+) create mode 100644 integration/jwt/Dockerfile.pgdog create mode 100644 integration/jwt/conftest.py create mode 100644 integration/jwt/docker-compose.yml create mode 100755 integration/jwt/generate_keys.sh create mode 100644 integration/jwt/pgdog.toml create mode 100644 integration/jwt/requirements.txt create mode 100644 integration/jwt/test_jwt.py create mode 100644 integration/jwt/users.toml diff --git a/integration/jwt/Dockerfile.pgdog b/integration/jwt/Dockerfile.pgdog new file mode 100644 index 000000000..03fb2e639 --- /dev/null +++ b/integration/jwt/Dockerfile.pgdog @@ -0,0 +1,24 @@ +FROM rust:1.80-slim AS builder + +RUN apt-get update && apt-get install -y \ + libssl-dev \ + pkg-config \ + cmake \ + clang \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app +COPY . . +RUN cargo build --release --bin pgdog + +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y \ + libssl3 \ + ca-certificates \ + postgresql-client \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /app/target/release/pgdog /usr/local/bin/pgdog + +EXPOSE 6432 +ENTRYPOINT ["pgdog"] diff --git a/integration/jwt/conftest.py b/integration/jwt/conftest.py new file mode 100644 index 000000000..2bbf4a0ef --- /dev/null +++ b/integration/jwt/conftest.py @@ -0,0 +1,89 @@ +""" +Shared fixtures for JWT integration tests. + +Keys are read from /keys/ (mounted in Docker) or from the local ./keys/ directory +when running directly. The generate_keys.sh script creates them. +""" + +import os +import time +from pathlib import Path + +import jwt as pyjwt +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.backends import default_backend + + +# ── Key loading ─────────────────────────────────────────────────────────────── + +def _find_keys_dir() -> Path: + candidates = [ + Path("/keys"), + Path(__file__).parent / "keys", + ] + for p in candidates: + if (p / "private.pem").exists(): + return p + raise FileNotFoundError( + "RSA key pair not found. Run integration/jwt/generate_keys.sh first." + ) + + +@pytest.fixture(scope="session") +def keys_dir() -> Path: + return _find_keys_dir() + + +@pytest.fixture(scope="session") +def private_key(keys_dir): + return (keys_dir / "private.pem").read_bytes() + + +@pytest.fixture(scope="session") +def public_key(keys_dir): + return (keys_dir / "public.pem").read_bytes() + + +# ── JWT helpers ──────────────────────────────────────────────────────────────── + +def make_token(private_key: bytes, sub: str, exp_offset: int = 300, **extra) -> str: + """Create a signed RS256 JWT token.""" + payload = { + "sub": sub, + "iat": int(time.time()), + "exp": int(time.time()) + exp_offset, + **extra, + } + return pyjwt.encode(payload, private_key, algorithm="RS256") + + +@pytest.fixture(scope="session") +def token_factory(private_key): + """Return a callable that creates JWT tokens signed with the test private key.""" + def _factory(sub: str, exp_offset: int = 300, **extra) -> str: + return make_token(private_key, sub, exp_offset, **extra) + return _factory + + +# ── Connection helpers ───────────────────────────────────────────────────────── + +PGDOG_HOST = os.environ.get("PGDOG_HOST", "127.0.0.1") +PGDOG_PORT = int(os.environ.get("PGDOG_PORT", "6432")) +POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "127.0.0.1") +POSTGRES_PORT = int(os.environ.get("POSTGRES_PORT", "5432")) + + +def pgdog_dsn(user: str, token: str, dbname: str = "pgdog") -> str: + return ( + f"host={PGDOG_HOST} port={PGDOG_PORT} " + f"dbname={dbname} user={user} password={token}" + ) + + +def postgres_dsn(user: str = "pgdog", password: str = "pgdog", dbname: str = "pgdog") -> str: + return ( + f"host={POSTGRES_HOST} port={POSTGRES_PORT} " + f"dbname={dbname} user={user} password={password}" + ) diff --git a/integration/jwt/docker-compose.yml b/integration/jwt/docker-compose.yml new file mode 100644 index 000000000..541107390 --- /dev/null +++ b/integration/jwt/docker-compose.yml @@ -0,0 +1,51 @@ +version: "3.9" + +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: pgdog + POSTGRES_PASSWORD: pgdog + POSTGRES_DB: pgdog + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U pgdog"] + interval: 2s + timeout: 5s + retries: 10 + + pgdog: + build: + context: ../.. + dockerfile: integration/jwt/Dockerfile.pgdog + depends_on: + postgres: + condition: service_healthy + ports: + - "6432:6432" + volumes: + - ./keys:/keys:ro + - ./pgdog.toml:/pgdog.toml:ro + - ./users.toml:/users.toml:ro + command: ["--config", "/pgdog.toml", "--users", "/users.toml"] + healthcheck: + test: ["CMD-SHELL", "pg_isready -h 127.0.0.1 -p 6432 -U pgdog -d pgdog"] + interval: 2s + timeout: 5s + retries: 20 + + test: + build: + context: . + dockerfile: Dockerfile.test + depends_on: + pgdog: + condition: service_healthy + volumes: + - ./keys:/keys:ro + environment: + PGDOG_HOST: pgdog + PGDOG_PORT: "6432" + POSTGRES_HOST: postgres + POSTGRES_PORT: "5432" diff --git a/integration/jwt/generate_keys.sh b/integration/jwt/generate_keys.sh new file mode 100755 index 000000000..ce8308537 --- /dev/null +++ b/integration/jwt/generate_keys.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Generate RSA key pair for JWT signing in local tests. +set -euo pipefail + +KEYS_DIR="$(cd "$(dirname "$0")" && pwd)/keys" +mkdir -p "$KEYS_DIR" + +if [[ -f "$KEYS_DIR/private.pem" && -f "$KEYS_DIR/public.pem" ]]; then + echo "Keys already exist in $KEYS_DIR — skipping generation." + exit 0 +fi + +openssl genrsa -out "$KEYS_DIR/private.pem" 2048 +openssl rsa -in "$KEYS_DIR/private.pem" -pubout -out "$KEYS_DIR/public.pem" + +chmod 600 "$KEYS_DIR/private.pem" +chmod 644 "$KEYS_DIR/public.pem" + +echo "Keys written to $KEYS_DIR" diff --git a/integration/jwt/pgdog.toml b/integration/jwt/pgdog.toml new file mode 100644 index 000000000..68a3a96f7 --- /dev/null +++ b/integration/jwt/pgdog.toml @@ -0,0 +1,27 @@ +[general] +host = "0.0.0.0" +port = 6432 +workers = 2 +auth_type = "jwt" +jwt_public_key_file = "/keys/public.pem" +jwt_username_claim = "sub" +jwt_user_auto_provision = true +jwt_user_auto_provision_read_only = false +jwt_server_user = "pgdog" +jwt_server_password = "pgdog" +connect_timeout = 5_000 +checkout_timeout = 5_000 +query_timeout = 10_000 + +[admin] +name = "admin" +user = "admin" +password = "pgdog" + +[[databases]] +name = "pgdog" +host = "postgres" +port = 5432 +database = "pgdog" +user = "pgdog" +password = "pgdog" diff --git a/integration/jwt/requirements.txt b/integration/jwt/requirements.txt new file mode 100644 index 000000000..855e4e245 --- /dev/null +++ b/integration/jwt/requirements.txt @@ -0,0 +1,4 @@ +psycopg==3.2.6 +pytest==8.3.5 +PyJWT==2.9.0 +cryptography==43.0.3 diff --git a/integration/jwt/test_jwt.py b/integration/jwt/test_jwt.py new file mode 100644 index 000000000..c65ca8b5d --- /dev/null +++ b/integration/jwt/test_jwt.py @@ -0,0 +1,156 @@ +""" +End-to-end JWT authentication tests for pgDog. + +Each test connects through pgDog using a JWT token as the password. +pgDog validates the token, extracts the `sub` claim, and provisions +a backend connection pool on first login. + +Prerequisites: + - generate_keys.sh has been run (keys/ dir exists) + - docker-compose up (postgres + pgdog) OR run.sh +""" + +import time + +import psycopg +import pytest + +from conftest import pgdog_dsn, postgres_dsn, make_token + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + + +def connect_via_jwt(user: str, token: str, dbname: str = "pgdog", autocommit: bool = True): + conn = psycopg.connect(pgdog_dsn(user, token, dbname)) + conn.autocommit = autocommit + return conn + + +def connect_direct(user: str = "pgdog", password: str = "pgdog"): + conn = psycopg.connect(postgres_dsn(user, password)) + conn.autocommit = True + return conn + + +# ── Test suite ───────────────────────────────────────────────────────────────── + + +class TestJwtLogin: + """Successful JWT login and basic query execution.""" + + def test_valid_jwt_connects(self, token_factory): + """A valid JWT token authenticates and allows a simple query.""" + token = token_factory("alice") + with connect_via_jwt("alice", token) as conn: + row = conn.execute("SELECT current_user").fetchone() + # pgDog executed SET ROLE "alice"; Postgres returns the role name. + assert row is not None + + def test_query_returns_correct_data(self, token_factory): + """After JWT login, ordinary SELECT works.""" + token = token_factory("bob") + with connect_via_jwt("bob", token) as conn: + row = conn.execute("SELECT 1 + 1 AS result").fetchone() + assert row[0] == 2 + + def test_jwt_user_auto_provisioned_on_first_connect(self, token_factory): + """Each unique sub claim gets its own auto-provisioned pool (no pre-config needed).""" + token = token_factory("carol") + with connect_via_jwt("carol", token) as conn: + row = conn.execute("SELECT 42").fetchone() + assert row[0] == 42 + + def test_multiple_users_independent_sessions(self, token_factory): + """Two JWT users connect simultaneously without interfering.""" + t1 = token_factory("dave") + t2 = token_factory("eve") + with connect_via_jwt("dave", t1) as c1, connect_via_jwt("eve", t2) as c2: + r1 = c1.execute("SELECT 'dave'").fetchone()[0] + r2 = c2.execute("SELECT 'eve'").fetchone()[0] + assert r1 == "dave" + assert r2 == "eve" + + def test_same_user_reconnects(self, token_factory): + """Reconnecting with a fresh token for the same user succeeds.""" + for _ in range(3): + token = token_factory("frank") + with connect_via_jwt("frank", token) as conn: + conn.execute("SELECT 1") + + +class TestJwtRejection: + """Tokens that must be rejected.""" + + def test_expired_token_is_rejected(self, private_key): + """A token whose `exp` is in the past must not authenticate.""" + token = make_token(private_key, "grace", exp_offset=-10) + with pytest.raises(psycopg.OperationalError): + connect_via_jwt("grace", token) + + def test_tampered_token_is_rejected(self, token_factory): + """Flipping a byte in the signature must cause authentication failure.""" + token = token_factory("heidi") + parts = token.split(".") + # Corrupt the signature (last part) + sig = parts[2] + corrupted = sig[:-4] + ("AAAA" if sig[-4:] != "AAAA" else "BBBB") + bad_token = ".".join(parts[:2] + [corrupted]) + with pytest.raises(psycopg.OperationalError): + connect_via_jwt("heidi", bad_token) + + def test_wrong_password_rejected(self): + """Sending a plain string (not a JWT) is rejected.""" + with pytest.raises(psycopg.OperationalError): + connect_via_jwt("ivan", "not-a-jwt") + + def test_empty_password_rejected(self): + """Empty password is rejected.""" + with pytest.raises(psycopg.OperationalError): + connect_via_jwt("judy", "") + + +class TestJwtTransactions: + """Verify transactional behaviour through the proxy.""" + + def test_commit(self, token_factory): + """INSERT + COMMIT is durable.""" + token = token_factory("pgdog") + with psycopg.connect(pgdog_dsn("pgdog", token), autocommit=False) as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS jwt_test_commit (id SERIAL PRIMARY KEY, val TEXT)" + ) + conn.execute("INSERT INTO jwt_test_commit (val) VALUES ('hello')") + conn.commit() + + # Verify directly on Postgres + with connect_direct() as pg: + row = pg.execute("SELECT val FROM jwt_test_commit ORDER BY id DESC LIMIT 1").fetchone() + assert row is not None + assert row[0] == "hello" + + def test_rollback(self, token_factory): + """INSERT + ROLLBACK leaves no trace.""" + token = token_factory("pgdog") + with psycopg.connect(pgdog_dsn("pgdog", token), autocommit=False) as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS jwt_test_rollback (id SERIAL PRIMARY KEY, val TEXT)" + ) + conn.execute("INSERT INTO jwt_test_rollback (val) VALUES ('ghost')") + conn.rollback() + + with connect_direct() as pg: + row = pg.execute("SELECT COUNT(*) FROM jwt_test_rollback WHERE val = 'ghost'").fetchone() + assert row[0] == 0 + + +class TestJwtCustomClaim: + """JWT tokens with a custom username claim (requires pgdog.toml jwt_username_claim setting).""" + + def test_sub_claim_used_as_username(self, token_factory): + """The default `sub` claim is used as the Postgres username.""" + token = token_factory("kate") + with connect_via_jwt("kate", token) as conn: + # Just verify the connection works; SET ROLE is called with the sub value + row = conn.execute("SELECT 1").fetchone() + assert row[0] == 1 diff --git a/integration/jwt/users.toml b/integration/jwt/users.toml new file mode 100644 index 000000000..581cdb75b --- /dev/null +++ b/integration/jwt/users.toml @@ -0,0 +1,4 @@ +[[users]] +name = "pgdog" +database = "pgdog" +password = "pgdog" From 34981c3a45eed868670afd62f4f156e2a8cc856b Mon Sep 17 00:00:00 2001 From: Marco Palmisano Date: Wed, 17 Jun 2026 23:32:45 +0200 Subject: [PATCH 3/5] fix: escape SET ROLE identifier and resolve clippy lints - Escape embedded double quotes in the JWT-derived role name before interpolating it into SET ROLE, preventing identifier injection. - Collapse nested if-let blocks and use is_none_or to satisfy clippy. --- pgdog/src/auth/jwt/mod.rs | 16 ++++++++-------- pgdog/src/auth/jwt/pem_cache.rs | 8 ++++---- pgdog/src/backend/server.rs | 28 +++++++++++++++------------- pgdog/src/frontend/client/mod.rs | 2 +- 4 files changed, 28 insertions(+), 26 deletions(-) diff --git a/pgdog/src/auth/jwt/mod.rs b/pgdog/src/auth/jwt/mod.rs index 6df451b91..4009b3f6f 100644 --- a/pgdog/src/auth/jwt/mod.rs +++ b/pgdog/src/auth/jwt/mod.rs @@ -159,19 +159,19 @@ impl JwtValidator { // First check (read lock) { let cache = self.jwks_cache.lock().await; - if let Some(ref cached) = *cache { - if cached.fetched_at.elapsed() < self.jwks_cache_ttl { - return Ok(std::sync::Arc::clone(&cached.jwks)); - } + if let Some(ref cached) = *cache + && cached.fetched_at.elapsed() < self.jwks_cache_ttl + { + return Ok(std::sync::Arc::clone(&cached.jwks)); } } // Second check under lock to prevent concurrent HTTP requests let mut cache = self.jwks_cache.lock().await; - if let Some(ref cached) = *cache { - if cached.fetched_at.elapsed() < self.jwks_cache_ttl { - return Ok(std::sync::Arc::clone(&cached.jwks)); - } + if let Some(ref cached) = *cache + && cached.fetched_at.elapsed() < self.jwks_cache_ttl + { + return Ok(std::sync::Arc::clone(&cached.jwks)); } let response = self diff --git a/pgdog/src/auth/jwt/pem_cache.rs b/pgdog/src/auth/jwt/pem_cache.rs index c00954650..59beab09e 100644 --- a/pgdog/src/auth/jwt/pem_cache.rs +++ b/pgdog/src/auth/jwt/pem_cache.rs @@ -24,10 +24,10 @@ pub fn get_static_pem_bytes(path: &str) -> std::io::Result<&'static [u8]> { let mut cache = PEM_KEY_CACHE.lock(); // Check if we already have the exact same file contents cached - if let Some(cached_bytes) = cache.get(&path_buf) { - if *cached_bytes == current_bytes.as_slice() { - return Ok(*cached_bytes); - } + if let Some(cached_bytes) = cache.get(&path_buf) + && *cached_bytes == current_bytes.as_slice() + { + return Ok(*cached_bytes); } // Leaking is only done when a new file is loaded or its contents changed on disk diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 27fd7abfe..8b84676e3 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -112,19 +112,21 @@ impl Server { auth_secret.valid(true); server.password_attempts = idx + 1; - if let Some(ref client_user) = addr.client_user { - if client_user != &addr.user { - debug!( - "executing SET ROLE \"{}\" for server connection [{}]", - client_user, addr - ); - server - .execute(crate::net::Query::new(format!( - r#"SET ROLE "{}""#, - client_user - ))) - .await?; - } + if let Some(ref client_user) = addr.client_user + && client_user != &addr.user + { + // Escape the identifier: Postgres quoted identifiers escape an + // embedded double quote by doubling it. The username originates + // from a signed JWT claim, but we still escape it defensively to + // prevent any possibility of SQL injection via crafted claims. + let escaped = client_user.replace('"', "\"\""); + debug!( + "executing SET ROLE \"{}\" for server connection [{}]", + escaped, addr + ); + server + .execute(crate::net::Query::new(format!(r#"SET ROLE "{}""#, escaped))) + .await?; } return Ok(server); diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index c2af4716a..3a1292548 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -255,7 +255,7 @@ impl Client { .general .jwt_user_suffix .as_deref() - .map_or(true, |suffix| user.ends_with(suffix)); + .is_none_or(|suffix| user.ends_with(suffix)); // Check if we need to ask the client for its password in plaintext // because we don't actually have it configured. From eef3de68ef93c79dc60c71bda8637c5ca6e9ea14 Mon Sep 17 00:00:00 2001 From: Marco Palmisano Date: Thu, 18 Jun 2026 07:37:40 +0200 Subject: [PATCH 4/5] fix(jwt): make auto-provisioning and integration suite work end-to-end Running the integration suite (integration/jwt) surfaced several bugs in the JWT auth path; all 12 tests now pass. pgdog fixes: - Launch backend pools for users that only have server-side credentials (server_password / external identity) even when no client-facing password is configured. Auto-provisioned JWT users previously had their pools disabled with "password not set" and reported "pool is down". - Reuse an existing online pool when the JWT subject matches a user that is already configured or previously provisioned, instead of trying to re-provision it (which failed under passthrough auth with a password change error). - Enforce strict JWT expiry by setting validation leeway to 0; jsonwebtoken's default 60s leeway accepted tokens that had just expired. - Treat a failed backend SET ROLE (e.g. role does not exist) as non-fatal: log a warning and continue as the configured server user rather than dropping the connection. Integration suite fixes: - Dockerfile.pgdog: build with rust 1.96 (edition 2024) and run on debian:trixie-slim to match the builder's glibc. - pgdog.toml: use the renamed 'database_name' field. - test_jwt.py: create the table in a committed transaction before the rollback test so the verification query has a table to read. --- integration/jwt/Dockerfile.pgdog | 4 ++-- integration/jwt/pgdog.toml | 2 +- integration/jwt/test_jwt.py | 7 ++++++- pgdog/src/auth/jwt/mod.rs | 7 ++++++- pgdog/src/auth/jwt/tests.rs | 4 +++- pgdog/src/backend/databases.rs | 5 ++++- pgdog/src/backend/pool/cluster.rs | 18 ++++++++++++++++++ pgdog/src/backend/server.rs | 15 +++++++++++++-- pgdog/src/frontend/client/mod.rs | 14 +++++++++++++- 9 files changed, 66 insertions(+), 10 deletions(-) diff --git a/integration/jwt/Dockerfile.pgdog b/integration/jwt/Dockerfile.pgdog index 03fb2e639..511f9315d 100644 --- a/integration/jwt/Dockerfile.pgdog +++ b/integration/jwt/Dockerfile.pgdog @@ -1,4 +1,4 @@ -FROM rust:1.80-slim AS builder +FROM rust:1.96-slim AS builder RUN apt-get update && apt-get install -y \ libssl-dev \ @@ -11,7 +11,7 @@ WORKDIR /app COPY . . RUN cargo build --release --bin pgdog -FROM debian:bookworm-slim +FROM debian:trixie-slim RUN apt-get update && apt-get install -y \ libssl3 \ ca-certificates \ diff --git a/integration/jwt/pgdog.toml b/integration/jwt/pgdog.toml index 68a3a96f7..b723946b1 100644 --- a/integration/jwt/pgdog.toml +++ b/integration/jwt/pgdog.toml @@ -22,6 +22,6 @@ password = "pgdog" name = "pgdog" host = "postgres" port = 5432 -database = "pgdog" +database_name = "pgdog" user = "pgdog" password = "pgdog" diff --git a/integration/jwt/test_jwt.py b/integration/jwt/test_jwt.py index c65ca8b5d..cea660958 100644 --- a/integration/jwt/test_jwt.py +++ b/integration/jwt/test_jwt.py @@ -132,10 +132,15 @@ def test_commit(self, token_factory): def test_rollback(self, token_factory): """INSERT + ROLLBACK leaves no trace.""" token = token_factory("pgdog") - with psycopg.connect(pgdog_dsn("pgdog", token), autocommit=False) as conn: + # Ensure the table exists in a committed transaction first, otherwise the + # ROLLBACK below would also undo the CREATE TABLE and the verification + # query would fail with "relation does not exist". + with psycopg.connect(pgdog_dsn("pgdog", token), autocommit=True) as conn: conn.execute( "CREATE TABLE IF NOT EXISTS jwt_test_rollback (id SERIAL PRIMARY KEY, val TEXT)" ) + + with psycopg.connect(pgdog_dsn("pgdog", token), autocommit=False) as conn: conn.execute("INSERT INTO jwt_test_rollback (val) VALUES ('ghost')") conn.rollback() diff --git a/pgdog/src/auth/jwt/mod.rs b/pgdog/src/auth/jwt/mod.rs index 4009b3f6f..53c4eedeb 100644 --- a/pgdog/src/auth/jwt/mod.rs +++ b/pgdog/src/auth/jwt/mod.rs @@ -129,8 +129,13 @@ impl JwtValidator { _ => return Err(JwtError::InvalidAlgorithm), }; - // Configure validation criteria + // Configure validation criteria. + // + // `jsonwebtoken` applies a default leeway of 60 seconds when validating + // `exp`/`nbf`. For database authentication we want strict expiry, so a + // token whose `exp` has passed is rejected immediately. let mut validation = jsonwebtoken::Validation::new(algorithm); + validation.leeway = 0; if let Some(ref aud) = self.audience { validation.set_audience(&[aud]); } else { diff --git a/pgdog/src/auth/jwt/tests.rs b/pgdog/src/auth/jwt/tests.rs index f2fe4d4dc..257925f5f 100644 --- a/pgdog/src/auth/jwt/tests.rs +++ b/pgdog/src/auth/jwt/tests.rs @@ -97,7 +97,9 @@ async fn test_jwt_validation_expired() { let validator = JwtValidator::new(&general).unwrap(); - let exp = current_time_secs() - 300; + // Only 10 seconds in the past: this is within `jsonwebtoken`'s default + // 60s leeway, so it would be accepted unless leeway is set to 0. + let exp = current_time_secs() - 10; let claims = Claims { sub: "postgres_user".to_string(), exp, diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 322e25144..a36ebc1b4 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -482,7 +482,10 @@ impl Databases { // Launch all clusters for cluster in self.all().values() { - if cluster.passwords().is_empty() && cluster.identity().is_none() { + if cluster.passwords().is_empty() + && cluster.identity().is_none() + && !cluster.has_server_auth() + { warn!( r#"disabling pool for user "{}" and database "{}", password not set"#, cluster.user(), diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index ce7fe95b9..59dd05cf3 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -88,6 +88,7 @@ pub struct Cluster { regex_parser: RegexParser, identity: Option, user_read_only: bool, + has_server_auth: bool, } /// Sharding configuration from the cluster. @@ -176,6 +177,7 @@ pub struct ClusterConfig<'a> { pub pub_sub_enabled: bool, pub identity: &'a Option, pub user_read_only: bool, + pub has_server_auth: bool, } impl<'a> ClusterConfig<'a> { @@ -202,6 +204,12 @@ impl<'a> ClusterConfig<'a> { general.read_write_split }; + // The cluster can connect to the backend even without a client-facing + // password when it has dedicated server credentials (e.g. JWT users + // authenticated by token but connecting as a configured server user) or + // uses an external identity provider (RDS IAM, Azure). + let has_server_auth = user.server_password.is_some() || user.is_external_identity(); + Self { name: &user.database, passwords: user.passwords(), @@ -247,6 +255,7 @@ impl<'a> ClusterConfig<'a> { pub_sub_enabled: general.pub_sub_enabled(), identity: &user.identity, user_read_only, + has_server_auth, } } } @@ -294,6 +303,7 @@ impl Cluster { pub_sub_enabled, identity, user_read_only, + has_server_auth, } = config; let identifier = Arc::new(DatabaseUser { @@ -355,6 +365,7 @@ impl Cluster { regex_parser: RegexParser::new(regex_parser_limit, query_parser), identity: identity.clone(), user_read_only, + has_server_auth, } } @@ -444,6 +455,13 @@ impl Cluster { self.user_read_only } + /// Whether the cluster has dedicated server-side credentials (or an external + /// identity provider) and can therefore connect to the backend even when no + /// client-facing password is configured (e.g. JWT-authenticated users). + pub fn has_server_auth(&self) -> bool { + self.has_server_auth + } + pub fn pooler_mode(&self) -> PoolerMode { self.pooler_mode } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 8b84676e3..e32814f28 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -124,9 +124,20 @@ impl Server { "executing SET ROLE \"{}\" for server connection [{}]", escaped, addr ); - server + // A failed SET ROLE (e.g. the role does not exist in the + // backend) leaves the connection usable: execute() drains + // until ReadyForQuery. We log and continue as the configured + // server user rather than dropping the connection, so that + // role provisioning remains the operator's responsibility. + if let Err(err) = server .execute(crate::net::Query::new(format!(r#"SET ROLE "{}""#, escaped))) - .await?; + .await + { + warn!( + "SET ROLE \"{}\" failed for server connection, continuing as \"{}\": {} [{}]", + escaped, addr.user, err, addr + ); + } } return Ok(server); diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 3a1292548..401dbf649 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -283,7 +283,19 @@ impl Client { .get_username(config.config.general.jwt_username_claim.as_str()); if let Some(sub) = sub { jwt_effective_user = Some(sub.to_string()); - if config.config.general.jwt_user_auto_provision { + // If a usable pool already exists for this user (e.g. the + // JWT subject matches a user already configured in + // users.toml, or a previously provisioned JWT user), reuse + // it instead of re-provisioning. The JWT has already + // authenticated the client. + let already_provisioned = databases::databases() + .cluster((sub.as_str(), database)) + .map(|cluster| cluster.online()) + .unwrap_or(false); + + if already_provisioned { + AuthResult::Ok + } else if config.config.general.jwt_user_auto_provision { use crate::config::convert::user_from_jwt; let jwt_user = user_from_jwt(user, database, &sub, &config.config.general); From 2e78226ba1c5228fa28c840ff34deb9f8d941fc7 Mon Sep 17 00:00:00 2001 From: Marco Palmisano Date: Thu, 18 Jun 2026 22:33:43 +0200 Subject: [PATCH 5/5] fix: removed company references --- pgdog/src/auth/jwt/tests.rs | 8 ++++---- pgdog/src/frontend/client/test/mod.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pgdog/src/auth/jwt/tests.rs b/pgdog/src/auth/jwt/tests.rs index 257925f5f..479d28770 100644 --- a/pgdog/src/auth/jwt/tests.rs +++ b/pgdog/src/auth/jwt/tests.rs @@ -226,9 +226,9 @@ async fn test_jwt_validator_memory_leak_prevention() { #[test] fn test_jwt_user_suffix_matching() { // 1. With configured suffix - let suffix = Some("@edreamsodigeo.com".to_string()); + let suffix = Some("@example.com".to_string()); - let user_sso = "john.doe@edreamsodigeo.com"; + let user_sso = "john.doe@example.com"; let user_other = "datastream"; let match_sso = match suffix { @@ -287,12 +287,12 @@ async fn test_jwt_custom_username_claim() { }; claims.extra.insert( "email".to_string(), - serde_json::Value::String("postgres_user_email@edreamsodigeo.com".to_string()), + serde_json::Value::String("postgres_user_email@example.com".to_string()), ); let token = generate_token(&claims, Algorithm::RS256, TEST_PRIVATE_KEY_PEM); let decoded = validator.validate(&token).await.unwrap(); let username = decoded.get_username(&general.jwt_username_claim).unwrap(); - assert_eq!(username, "postgres_user_email@edreamsodigeo.com"); + assert_eq!(username, "postgres_user_email@example.com"); } diff --git a/pgdog/src/frontend/client/test/mod.rs b/pgdog/src/frontend/client/test/mod.rs index f688ed021..652e5cb7b 100644 --- a/pgdog/src/frontend/client/test/mod.rs +++ b/pgdog/src/frontend/client/test/mod.rs @@ -584,7 +584,7 @@ LwIDAQAB let mut config = (*config()).clone(); config.config.general.auth_type = AuthType::Jwt; config.config.general.jwt_public_key_file = Some(pub_key_path.to_str().unwrap().to_string()); - config.config.general.jwt_user_suffix = Some("@edreamsodigeo.com".to_string()); + config.config.general.jwt_user_suffix = Some("@example.com".to_string()); config.config.general.jwt_user_auto_provision = true; config.config.general.jwt_user_auto_provision_read_only = true; set(config).unwrap(); @@ -618,7 +618,7 @@ LwIDAQAB let stream = Stream::plain(stream, 4096); let mut params = crate::net::parameter::Parameters::default(); - params.insert("user", "john.doe@edreamsodigeo.com"); + params.insert("user", "john.doe@example.com"); params.insert("database", "pgdog"); Client::spawn(