diff --git a/config/quickwit.yaml b/config/quickwit.yaml index 035bd2fb241..039c2cd0e56 100644 --- a/config/quickwit.yaml +++ b/config/quickwit.yaml @@ -103,6 +103,13 @@ version: 0.8 # metastore_uri: s3://your-bucket/indexes # metastore_uri: postgres://username:password@host:port/db # +# Optional PostgreSQL read replica URI. Nodes started with the +# `metastore_read_replica` service connect to it over a read-only connection and +# serve stale-tolerant read-only metastore requests. Searchers use those nodes +# only when `searcher.use_metastore_read_replica` is enabled. Defaults to unset. +# +# metastore_read_replica_uri: postgres://username:password@read-replica-host:port/db +# # When using a file-backed metastore, the state of the metastore will be cached forever. # If you are indexing and searching from different processes, it is possible to periodically # refresh the state of the metastore on the searcher using the `polling_interval` hashtag. @@ -168,6 +175,11 @@ indexer: # https://quickwit.io/docs/configuration/node-config#searcher-configuration # # searcher: +# # If true, routes read-only metastore requests from searchers, including +# # DataFusion when enabled, to nodes running the `metastore_read_replica` +# # service. Searchers require at least one `metastore_read_replica` node at +# # startup and do not fall back to the primary metastore. +# use_metastore_read_replica: false # fast_field_cache_capacity: 1G # split_footer_cache_capacity: 500M # partial_request_cache_capacity: 64M diff --git a/docs/configuration/node-config.md b/docs/configuration/node-config.md index 849e8cb44c3..b6478ef34e0 100644 --- a/docs/configuration/node-config.md +++ b/docs/configuration/node-config.md @@ -22,7 +22,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `version` | Config file version. `0.7` is the only available value with a retro compatibility on `0.5` and `0.4`. | | | | `cluster_id` | Unique identifier of the cluster the node will be joining. Clusters sharing the same network should use distinct cluster IDs.| `QW_CLUSTER_ID` | `quickwit-default-cluster` | | `node_id` | Unique identifier of the node. It must be distinct from the node IDs of its cluster peers. Defaults to the instance's short hostname if not set. | `QW_NODE_ID` | short hostname | -| `enabled_services` | Enabled services (control_plane, indexer, janitor, metastore, searcher) | `QW_ENABLED_SERVICES` | all services | +| `enabled_services` | Enabled services (control_plane, indexer, janitor, metastore, metastore_read_replica, searcher) | `QW_ENABLED_SERVICES` | all services except metastore_read_replica | | `listen_address` | The IP address or hostname that Quickwit service binds to for starting REST and GRPC server and connecting this node to other nodes. By default, Quickwit binds itself to 127.0.0.1 (localhost). This default is not valid when trying to form a cluster. | `QW_LISTEN_ADDRESS` | `127.0.0.1` | | `advertise_address` | IP address advertised by the node, i.e. the IP address that peer nodes should use to connect to the node for RPCs. | `QW_ADVERTISE_ADDRESS` | `listen_address` | | `gossip_listen_port` | The port which to listen for the Gossip cluster membership service (UDP). | `QW_GOSSIP_LISTEN_PORT` | `rest.listen_port` | @@ -30,6 +30,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw | `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. If the list of peer seeds contains a host name, Quickwit will resolve it by querying the DNS every minute. On kubernetes for instance, it is a good practise to set it to a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). | `QW_PEER_SEEDS` | | | `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` | | `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` | +| `metastore_read_replica_uri` | Optional PostgreSQL read replica URI. Nodes running the `metastore_read_replica` service connect to it over a read-only connection and serve stale-tolerant read-only metastore requests. Searchers use those nodes only when `searcher.use_metastore_read_replica` is enabled. | `QW_METASTORE_READ_REPLICA_URI` | | | `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` | | environment variable only | Log level of Quickwit. Can be a direct log level, or a comma separated list of `module_name=level` | `RUST_LOG` | `info` | @@ -285,6 +286,7 @@ This section contains the configuration options for a Searcher. | `max_num_concurrent_split_searches` | Maximum number of concurrent split search requests running on a Searcher. | `100` | | `split_cache` | Searcher split cache configuration options defined in the section below. Cache disabled if unspecified. | | | `request_timeout_secs` | The time before a search request is cancelled. This should match the timeout of the stack calling into quickwit if there is one set. | `30` | +| `use_metastore_read_replica` | If true, routes read-only metastore requests from searchers, including DataFusion when enabled, to nodes running the `metastore_read_replica` service. Searchers require at least one `metastore_read_replica` node at startup and do not fall back to the primary metastore. | `false` | ### Searcher split cache configuration @@ -301,6 +303,7 @@ Example: ```yaml searcher: + use_metastore_read_replica: false fast_field_cache_capacity: 1G split_footer_cache_capacity: 500M partial_request_cache_capacity: 64M diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 1548273cb89..dad1d129694 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -18,6 +18,7 @@ ], "data_dir": "/opt/quickwit/data", "metastore_uri": "postgres://username:password@host:port/db", + "metastore_read_replica_uri": "postgres://username:replica-password@replica-host:port/db", "default_index_root_uri": "s3://quickwit-indexes", "rest": { "listen_port": 1111, @@ -64,6 +65,7 @@ "replication_factor": 2 }, "searcher": { + "use_metastore_read_replica": true, "aggregation_memory_limit": "1G", "aggregation_bucket_limit": 500000, "fast_field_cache_capacity": "10G", diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index 3c97620f185..3587774f718 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -11,6 +11,7 @@ grpc_listen_port = 3333 peer_seeds = [ "quickwit-searcher-0.local", "quickwit-searcher-1.local" ] data_dir = "/opt/quickwit/data" metastore_uri = "postgres://username:password@host:port/db" +metastore_read_replica_uri = "postgres://username:replica-password@replica-host:port/db" default_index_root_uri = "s3://quickwit-indexes" [rest] @@ -54,6 +55,7 @@ parquet_merge_use_streaming_engine = true replication_factor = 2 [searcher] +use_metastore_read_replica = true aggregation_memory_limit = "1G" aggregation_bucket_limit = 500_000 fast_field_cache_capacity = "10G" diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index 7d4551b714a..599a1781e56 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -15,6 +15,7 @@ peer_seeds: - quickwit-searcher-1.local data_dir: /opt/quickwit/data metastore_uri: postgres://username:password@host:port/db +metastore_read_replica_uri: postgres://username:replica-password@replica-host:port/db default_index_root_uri: s3://quickwit-indexes rest: @@ -58,6 +59,7 @@ ingest_api: replication_factor: 2 searcher: + use_metastore_read_replica: true aggregation_memory_limit: 1G aggregation_bucket_limit: 500000 fast_field_cache_capacity: 10G diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 1c60891cf58..cc36992a557 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -343,6 +343,9 @@ pub struct SearcherConfig { #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub storage_timeout_policy: Option, + /// Routes read-only metastore requests from searchers, including DataFusion when enabled, to + /// nodes running the `metastore_read_replica` service. + pub use_metastore_read_replica: bool, pub warmup_memory_budget: ByteSize, pub warmup_single_split_initial_allocation: ByteSize, /// Lambda configuration for serverless leaf search execution. @@ -566,6 +569,7 @@ impl Default for SearcherConfig { request_timeout_secs: Self::default_request_timeout_secs(), leaf_request_timeout_secs: Self::default_request_timeout_secs(), storage_timeout_policy: None, + use_metastore_read_replica: false, warmup_memory_budget: ByteSize::gb(100), warmup_single_split_initial_allocation: ByteSize::mb(300), lambda: None, @@ -812,6 +816,10 @@ pub struct NodeConfig { pub peer_seeds: Vec, pub data_dir_path: PathBuf, pub metastore_uri: Uri, + /// Optional PostgreSQL read replica URI. It is used as the connection URI by nodes running the + /// [`QuickwitService::MetastoreReadReplica`] role. + #[serde(skip_serializing_if = "Option::is_none")] + pub metastore_read_replica_uri: Option, pub default_index_root_uri: Uri, pub rest_config: RestConfig, #[serde(skip_serializing_if = "Option::is_none")] @@ -872,6 +880,9 @@ impl NodeConfig { pub fn redact(&mut self) { self.metastore_configs.redact(); self.metastore_uri.redact(); + if let Some(metastore_read_replica_uri) = &mut self.metastore_read_replica_uri { + metastore_read_replica_uri.redact(); + } self.storage_configs.redact(); } @@ -896,6 +907,26 @@ mod tests { use super::*; use crate::IndexerConfig; + #[test] + fn test_node_config_redact_metastore_uris() { + let mut config = NodeConfig::for_test(); + config.metastore_uri = Uri::for_test("postgresql://username:password@host:5432/db"); + config.metastore_read_replica_uri = Some(Uri::for_test( + "postgresql://replica-user:replica-password@replica-host:5432/db", + )); + + config.redact(); + + assert_eq!( + config.metastore_uri, + "postgresql://username:***redacted***@host:5432/db" + ); + assert_eq!( + config.metastore_read_replica_uri.unwrap(), + "postgresql://replica-user:***redacted***@replica-host:5432/db" + ); + } + #[test] fn test_indexer_config_serialization() { { diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 8b53cb6cf74..c3c1ce657a4 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -88,7 +88,7 @@ impl FromStr for List { fn default_enabled_services() -> ConfigValue { ConfigValue::with_default(List( - QuickwitService::supported_services() + QuickwitService::default_services() .into_iter() .map(|service| service.to_string()) .collect(), @@ -130,6 +130,10 @@ fn default_metastore_uri(data_dir_uri: &Uri) -> Uri { data_dir_uri.join("indexes#polling_interval=30s").expect("Failed to create default metastore URI. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.") } +fn default_metastore_read_replica_uri() -> ConfigValue { + ConfigValue::none() +} + // See comment above. fn default_index_root_uri(data_dir_uri: &Uri) -> Uri { data_dir_uri.join("indexes").expect("Failed to create default index root URI. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.") @@ -191,6 +195,8 @@ struct NodeConfigBuilder { #[serde(default = "default_data_dir_uri")] data_dir_uri: ConfigValue, metastore_uri: ConfigValue, + #[serde(default = "default_metastore_read_replica_uri")] + metastore_read_replica_uri: ConfigValue, default_index_root_uri: ConfigValue, #[serde(rename = "rest")] #[serde(default)] @@ -301,6 +307,9 @@ impl NodeConfigBuilder { .resolve_optional(env_vars)? .unwrap_or_else(|| default_metastore_uri(&data_dir_uri)); + let metastore_read_replica_uri = + self.metastore_read_replica_uri.resolve_optional(env_vars)?; + let default_index_root_uri = self .default_index_root_uri .resolve_optional(env_vars)? @@ -330,6 +339,7 @@ impl NodeConfigBuilder { peer_seeds: self.peer_seeds.resolve(env_vars)?.0, data_dir_path, metastore_uri, + metastore_read_replica_uri, default_index_root_uri, rest_config, health_config, @@ -357,10 +367,38 @@ fn validate(node_config: &NodeConfig) -> anyhow::Result<()> { if node_config.peer_seeds.is_empty() { warn!("peer seeds are empty"); } + validate_metastore_read_replica(node_config)?; validate_disk_usage(node_config); Ok(()) } +/// Validates the configuration of the [`QuickwitService::MetastoreReadReplica`] role. +/// +/// The [`QuickwitService::MetastoreReadReplica`] role serves the same gRPC service as a read-only +/// metastore, so it must run standalone and requires `metastore_read_replica_uri` to connect to. +fn validate_metastore_read_replica(node_config: &NodeConfig) -> anyhow::Result<()> { + let read_replica_enabled = + node_config.is_service_enabled(QuickwitService::MetastoreReadReplica); + if !read_replica_enabled { + return Ok(()); + } + if node_config.enabled_services.len() != 1 { + bail!( + "the `metastore_read_replica` service must run standalone and cannot be combined with \ + any other service" + ); + } + match &node_config.metastore_read_replica_uri { + Some(_) => Ok(()), + None => { + bail!( + "the `metastore_read_replica` service requires `metastore_read_replica_uri` to be \ + set" + ) + } + } +} + /// A list of all the known disk budgets /// /// External disk usage and unbounded disk usages, e.g the indexing workbench @@ -429,6 +467,7 @@ impl Default for NodeConfigBuilder { peer_seeds: ConfigValue::with_default(List::default()), data_dir_uri: default_data_dir_uri(), metastore_uri: ConfigValue::none(), + metastore_read_replica_uri: default_metastore_read_replica_uri(), default_index_root_uri: ConfigValue::none(), rest_config_builder: RestConfigBuilder::default(), health_config_builder: HealthConfigBuilder::default(), @@ -522,7 +561,7 @@ pub fn node_config_for_tests_from_ports( grpc_listen_port: u16, ) -> NodeConfig { let node_id = NodeId::from_str(&default_node_id().unwrap()); - let enabled_services = QuickwitService::supported_services(); + let enabled_services = QuickwitService::default_services(); let availability_zone = Some(String::from("az-1")); let listen_address = Host::default(); let rest_listen_addr = listen_address @@ -564,6 +603,7 @@ pub fn node_config_for_tests_from_ports( peer_seeds: Vec::new(), data_dir_path, metastore_uri, + metastore_read_replica_uri: None, default_index_root_uri, rest_config, health_config: None, @@ -663,6 +703,10 @@ mod tests { config.metastore_uri, "postgresql://username:password@host:port/db" ); + assert_eq!( + config.metastore_read_replica_uri.as_ref().unwrap().as_str(), + "postgresql://username:replica-password@replica-host:port/db" + ); assert_eq!(config.default_index_root_uri, "s3://quickwit-indexes"); let azure_storage_config = config.storage_configs.find_azure().unwrap(); @@ -742,6 +786,7 @@ mod tests { timeout_millis: 2_000, max_num_retries: 2 }), + use_metastore_read_replica: true, warmup_memory_budget: ByteSize::gb(100), warmup_single_split_initial_allocation: ByteSize::mb(300), lambda: Some(LambdaConfig { @@ -820,10 +865,7 @@ mod tests { assert_eq!(config.cluster_id, DEFAULT_CLUSTER_ID); assert_eq!(config.node_id.as_str(), get_short_hostname().unwrap()); assert_eq!(config.availability_zone, None); - assert_eq!( - config.enabled_services, - QuickwitService::supported_services() - ); + assert_eq!(config.enabled_services, QuickwitService::default_services()); assert_eq!( config.rest_config.listen_addr, SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 7280) @@ -848,6 +890,7 @@ mod tests { env::current_dir().unwrap().display() ) ); + assert!(config.metastore_read_replica_uri.is_none()); assert_eq!( config.default_index_root_uri, format!( @@ -1010,6 +1053,107 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn test_metastore_read_replica_role_accepts_postgres_uri() { + let config_yaml = r#" + version: 0.8 + node_id: test-node + enabled_services: + - metastore_read_replica + metastore_read_replica_uri: postgres://user:pass@host:5432/db + "#; + let config = load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Default::default(), + ) + .await + .unwrap(); + assert_eq!( + config.metastore_read_replica_uri.unwrap(), + "postgresql://user:pass@host:5432/db" + ); + } + + #[tokio::test] + async fn test_metastore_read_replica_uri_can_be_set_from_env() { + let config_yaml = r#" + version: 0.8 + node_id: test-node + enabled_services: + - metastore_read_replica + "#; + let mut env_vars = HashMap::new(); + env_vars.insert( + "QW_METASTORE_READ_REPLICA_URI".to_string(), + "postgres://user:pass@replica-host:5432/db".to_string(), + ); + let config = + load_node_config_with_env(ConfigFormat::Yaml, config_yaml.as_bytes(), &env_vars) + .await + .unwrap(); + assert_eq!( + config.metastore_read_replica_uri.unwrap(), + "postgresql://user:pass@replica-host:5432/db" + ); + } + + #[tokio::test] + async fn test_metastore_read_replica_role_without_uri_is_rejected() { + let config_yaml = r#" + version: 0.8 + node_id: test-node + enabled_services: + - metastore_read_replica + "#; + let error = load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Default::default(), + ) + .await + .unwrap_err(); + assert!( + error + .to_string() + .contains("requires `metastore_read_replica_uri`") + ); + } + + #[tokio::test] + async fn test_metastore_read_replica_role_must_run_standalone() { + for service in [ + QuickwitService::ControlPlane, + QuickwitService::Indexer, + QuickwitService::Searcher, + QuickwitService::Janitor, + QuickwitService::Metastore, + ] { + let config_yaml = format!( + r#" + version: 0.8 + node_id: test-node + enabled_services: + - metastore_read_replica + - {} + metastore_read_replica_uri: postgres://user:pass@host:5432/db + "#, + service.as_str() + ); + let error = load_node_config_with_env( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Default::default(), + ) + .await + .unwrap_err(); + assert!( + error.to_string().contains("must run standalone"), + "{service} should not be allowed with metastore_read_replica: {error}" + ); + } + } + #[tokio::test] async fn test_peer_socket_addrs() { { diff --git a/quickwit/quickwit-config/src/qw_env_vars.rs b/quickwit/quickwit-config/src/qw_env_vars.rs index 0a551ccaab9..c65a2fb6406 100644 --- a/quickwit/quickwit-config/src/qw_env_vars.rs +++ b/quickwit/quickwit-config/src/qw_env_vars.rs @@ -56,6 +56,7 @@ qw_env_vars!( QW_PEER_SEEDS, QW_DATA_DIR, QW_METASTORE_URI, + QW_METASTORE_READ_REPLICA_URI, QW_DEFAULT_INDEX_ROOT_URI ); diff --git a/quickwit/quickwit-config/src/service.rs b/quickwit/quickwit-config/src/service.rs index d323331b510..51505182189 100644 --- a/quickwit/quickwit-config/src/service.rs +++ b/quickwit/quickwit-config/src/service.rs @@ -29,6 +29,10 @@ pub enum QuickwitService { Searcher, Janitor, Metastore, + /// A read-only metastore node backed by a PostgreSQL read replica. It serves the same gRPC + /// metastore service as [`QuickwitService::Metastore`] but over a read-only connection, and is + /// discovered separately so that read-only callers (e.g. searchers) can route reads to it. + MetastoreReadReplica, } #[allow(clippy::from_over_into)] @@ -46,12 +50,25 @@ impl QuickwitService { QuickwitService::Searcher => "searcher", QuickwitService::Janitor => "janitor", QuickwitService::Metastore => "metastore", + QuickwitService::MetastoreReadReplica => "metastore_read_replica", } } + /// Returns every service the binary knows how to run. pub fn supported_services() -> HashSet { all::().collect() } + + /// Returns the services enabled on a node when none are explicitly configured. + /// + /// This is every supported service except [`QuickwitService::MetastoreReadReplica`], which is + /// opt-in: it requires a dedicated read replica URI and is meant to be deployed as its own set + /// of nodes, so it must never be enabled implicitly by an all-in-one node. + pub fn default_services() -> HashSet { + all::() + .filter(|service| *service != QuickwitService::MetastoreReadReplica) + .collect() + } } impl Display for QuickwitService { @@ -70,6 +87,9 @@ impl FromStr for QuickwitService { "searcher" => Ok(QuickwitService::Searcher), "janitor" => Ok(QuickwitService::Janitor), "metastore" => Ok(QuickwitService::Metastore), + "metastore-read-replica" | "metastore_read_replica" => { + Ok(QuickwitService::MetastoreReadReplica) + } _ => { bail!( "failed to parse service `{service_str}`. supported services are: `{}`", @@ -79,3 +99,39 @@ impl FromStr for QuickwitService { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_quickwit_service_str_round_trip() { + for service in QuickwitService::supported_services() { + let parsed = QuickwitService::from_str(service.as_str()).unwrap(); + assert_eq!(parsed, service); + } + } + + #[test] + fn test_quickwit_service_metastore_read_replica_aliases() { + assert_eq!( + QuickwitService::from_str("metastore_read_replica").unwrap(), + QuickwitService::MetastoreReadReplica + ); + assert_eq!( + QuickwitService::from_str("metastore-read-replica").unwrap(), + QuickwitService::MetastoreReadReplica + ); + } + + #[test] + fn test_default_services_excludes_metastore_read_replica() { + let default_services = QuickwitService::default_services(); + assert!(!default_services.contains(&QuickwitService::MetastoreReadReplica)); + assert!(default_services.contains(&QuickwitService::Metastore)); + + let supported_services = QuickwitService::supported_services(); + assert!(supported_services.contains(&QuickwitService::MetastoreReadReplica)); + assert_eq!(default_services.len() + 1, supported_services.len()); + } +} diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs index a8f96fec60b..87b14b2ed91 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/index_resolver.rs @@ -25,11 +25,11 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::error::Result as DFResult; use quickwit_common::uri::Uri; -use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; -use quickwit_parquet_engine::split::ParquetSplitKind; -use quickwit_proto::metastore::{ - IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, +use quickwit_metastore::{ + IndexMetadataResponseExt, ListIndexesMetadataResponseExt, MetastoreReadServiceClient, }; +use quickwit_parquet_engine::split::ParquetSplitKind; +use quickwit_proto::metastore::{IndexMetadataRequest, ListIndexesMetadataRequest}; use tracing::debug; use super::metastore_provider::MetastoreSplitProvider; @@ -59,11 +59,11 @@ pub trait MetricsIndexResolver: Send + Sync + std::fmt::Debug { /// storage lazily on first read. #[derive(Clone)] pub struct MetastoreIndexResolver { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, } impl MetastoreIndexResolver { - pub fn new(metastore: MetastoreServiceClient) -> Self { + pub fn new(metastore: MetastoreReadServiceClient) -> Self { Self { metastore } } } @@ -85,7 +85,6 @@ impl MetricsIndexResolver for MetastoreIndexResolver { let response = self .metastore - .clone() .index_metadata(IndexMetadataRequest::for_index_id(index_name.to_string())) .await .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; @@ -111,7 +110,6 @@ impl MetricsIndexResolver for MetastoreIndexResolver { async fn list_index_names(&self) -> DFResult> { let response = self .metastore - .clone() .list_indexes_metadata(ListIndexesMetadataRequest::all()) .await .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs index 4e0927554e4..0938e510fe0 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs @@ -18,9 +18,10 @@ use std::ops::Bound; use async_trait::async_trait; use datafusion::error::Result as DFResult; -use quickwit_metastore::{ListParquetSplitsQuery, list_parquet_splits_paginated}; +use quickwit_metastore::{ + ListParquetSplitsQuery, MetastoreReadServiceClient, list_parquet_splits_paginated, +}; use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata}; -use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::IndexUid; use tracing::{debug, instrument}; @@ -30,14 +31,14 @@ use super::table_provider::MetricsSplitProvider; /// `MetricsSplitProvider` backed by the Quickwit metastore RPC. #[derive(Debug, Clone)] pub struct MetastoreSplitProvider { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, index_uid: IndexUid, split_kind: ParquetSplitKind, } impl MetastoreSplitProvider { pub fn new( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, index_uid: IndexUid, split_kind: ParquetSplitKind, ) -> Self { @@ -64,10 +65,13 @@ impl MetricsSplitProvider for MetastoreSplitProvider { )] async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult> { let metastore_query = to_metastore_query(&self.index_uid, query); - let records = - list_parquet_splits_paginated(self.metastore.clone(), self.split_kind, metastore_query) - .await - .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; + let records = list_parquet_splits_paginated( + self.metastore.as_ref(), + self.split_kind, + metastore_query, + ) + .await + .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?; let splits: Vec = records.into_iter().map(|record| record.metadata).collect(); diff --git a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs index 0e50e203fb3..4f877cf2a6f 100644 --- a/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs +++ b/quickwit/quickwit-datafusion/src/sources/metrics/mod.rs @@ -44,8 +44,9 @@ use quickwit_common::{is_metrics_index, is_parquet_pipeline_index, is_sketches_i use quickwit_df_core::{ QuickwitRuntimePlugin, QuickwitRuntimeRegistration, QuickwitSubstraitConsumerExt, }; +use quickwit_metastore::MetastoreReadServiceClient; use quickwit_parquet_engine::split::ParquetSplitKind; -use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; +use quickwit_proto::metastore::MetastoreError; use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE}; use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver}; @@ -84,7 +85,7 @@ pub struct MetricsDataSource { impl MetricsDataSource { /// Create a production `MetricsDataSource` backed by the metastore. - pub fn new(metastore: MetastoreServiceClient) -> Self { + pub fn new(metastore: MetastoreReadServiceClient) -> Self { Self { index_resolver: Arc::new(MetastoreIndexResolver::new(metastore)), } diff --git a/quickwit/quickwit-datafusion/tests/distributed.rs b/quickwit/quickwit-datafusion/tests/distributed.rs index 7c8c498e35f..e5a84c352ba 100644 --- a/quickwit/quickwit-datafusion/tests/distributed.rs +++ b/quickwit/quickwit-datafusion/tests/distributed.rs @@ -69,7 +69,7 @@ async fn test_distributed_tasks_not_shuffles() { .await; } - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); // Pool starts empty; we populate it once the workers are bound. The resolver // reads pool keys lazily at query time, so this ordering is safe. diff --git a/quickwit/quickwit-datafusion/tests/metrics.rs b/quickwit/quickwit-datafusion/tests/metrics.rs index c36412305fc..0a0f5d743ea 100644 --- a/quickwit/quickwit-datafusion/tests/metrics.rs +++ b/quickwit/quickwit-datafusion/tests/metrics.rs @@ -45,7 +45,7 @@ async fn start_sandbox() -> TestSandbox { /// Build a `DataFusionSessionBuilder` wired to the sandbox's metastore + storage. fn session_builder(sandbox: &TestSandbox) -> DataFusionSessionBuilder { - let source = Arc::new(MetricsDataSource::new(sandbox.metastore.clone())); + let source = Arc::new(MetricsDataSource::new(Arc::new(sandbox.metastore.clone()))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-datafusion/tests/null_columns.rs b/quickwit/quickwit-datafusion/tests/null_columns.rs index ed8bfa41685..2cc5a59f5e4 100644 --- a/quickwit/quickwit-datafusion/tests/null_columns.rs +++ b/quickwit/quickwit-datafusion/tests/null_columns.rs @@ -106,7 +106,7 @@ async fn test_null_columns_for_missing_parquet_fields() { ); publish_split(&metastore, &index_uid, data_dir.path(), "wide", &batch_b).await; - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-datafusion/tests/sketches.rs b/quickwit/quickwit-datafusion/tests/sketches.rs index ea14fbb986b..43411e6e20c 100644 --- a/quickwit/quickwit-datafusion/tests/sketches.rs +++ b/quickwit/quickwit-datafusion/tests/sketches.rs @@ -36,7 +36,7 @@ async fn start_sandbox() -> TestSandbox { } fn session_builder(sandbox: &TestSandbox) -> DataFusionSessionBuilder { - let source = Arc::new(MetricsDataSource::new(sandbox.metastore.clone())); + let source = Arc::new(MetricsDataSource::new(Arc::new(sandbox.metastore.clone()))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new( sandbox.storage_resolver.clone(), diff --git a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs index 6517123c43c..5700b286f04 100644 --- a/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/parquet_garbage_collection.rs @@ -177,7 +177,7 @@ async fn list_parquet_splits( let kind = parquet_split_kind_for_index(index_uid); protect_future( progress_opt, - list_parquet_splits_paginated(metastore.clone(), kind, query), + list_parquet_splits_paginated(metastore, kind, query), ) .await .context("failed to list parquet splits") diff --git a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs index 6de3f5f5983..8ee01056e01 100644 --- a/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_merge_pipeline.rs @@ -78,7 +78,7 @@ async fn fetch_published_parquet_splits_paginated( }; let query = quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone()) .retain_immature(OffsetDateTime::now_utc()); - let records = list_parquet_splits_paginated(metastore, kind, query).await?; + let records = list_parquet_splits_paginated(&metastore, kind, query).await?; debug!( num_splits = records.len(), "fetched published parquet splits for merge planning" diff --git a/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs b/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs index 533c5960997..307202751e2 100644 --- a/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/metrics_distributed_tests.rs @@ -323,7 +323,7 @@ async fn test_null_columns_for_missing_parquet_fields() { publish_split(&metastore, &index_uid, data_dir.path(), "wide", &batch_b).await; let storage_resolver = quickwit_storage::StorageResolver::unconfigured(); - let source = Arc::new(MetricsDataSource::new(metastore)); + let source = Arc::new(MetricsDataSource::new(Arc::new(metastore))); let schema_source = Arc::clone(&source); let registry = Arc::new(QuickwitObjectStoreRegistry::new(storage_resolver)); let builder = DataFusionSessionBuilder::new() diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index bf06dd1b032..3eed6bb6a69 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -117,11 +117,7 @@ pub async fn run_execute_parquet_retention_policy( ParquetSplitKind::Metrics }; let expired_splits: Vec = ctx - .protect_future(list_parquet_splits_paginated( - metastore.clone(), - kind, - query, - )) + .protect_future(list_parquet_splits_paginated(&metastore, kind, query)) .await?; if expired_splits.is_empty() { diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 9a54edc57af..41597325e0e 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -42,6 +42,7 @@ pub use metastore::file_backed::FileBackedMetastore; pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_8, VersionedIndexMetadata}; #[cfg(feature = "postgres")] pub use metastore::postgres::PostgresqlMetastore; +pub use metastore::read_service::{MetastoreReadService, MetastoreReadServiceClient}; pub use metastore::{ AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt, @@ -52,7 +53,7 @@ pub use metastore::{ StageParquetSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, UpdateSourceRequestExt, file_backed, list_parquet_splits_page, list_parquet_splits_paginated, }; -pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; +pub use metastore_factory::{MetastoreFactory, MetastoreFactoryOptions, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; use quickwit_common::is_disjoint; use quickwit_doc_mapper::tag_pruning::TagFilterAst; diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs index 2c489eb4dd5..96823851221 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs @@ -26,7 +26,9 @@ use regex::Regex; use tokio::sync::Mutex; use tracing::debug; -use crate::{FileBackedMetastore, MetastoreFactory, MetastoreResolverError}; +use crate::{ + FileBackedMetastore, MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError, +}; /// A file-backed metastore factory. /// @@ -101,6 +103,7 @@ impl MetastoreFactory for FileBackedMetastoreFactory { &self, _metastore_config: &MetastoreConfig, uri: &Uri, + _options: MetastoreFactoryOptions, ) -> Result { let (uri_stripped, polling_interval_opt) = extract_polling_interval_from_uri(uri.as_str()); let uri = Uri::from_str(&uri_stripped).map_err(|_| { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index f94c254228f..49bc2a2395e 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod index_metadata; pub mod postgres; pub mod control_plane_metastore; +pub mod read_service; use std::cmp::Ordering; use std::ops::{Bound, RangeInclusive}; @@ -48,7 +49,7 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; -use crate::{Split, SplitMetadata, SplitState}; +use crate::{MetastoreReadService, Split, SplitMetadata, SplitState}; /// Query parameters for listing parquet splits (metrics or sketches). #[derive(Debug, Clone, Serialize, Deserialize)] @@ -972,7 +973,7 @@ pub struct ParquetSplitsPage { /// Lists one parquet splits page and advances `query.after_split_id`. pub async fn list_parquet_splits_page( - metastore: &MetastoreServiceClient, + metastore: &dyn MetastoreReadService, kind: ParquetSplitKind, query: &mut ListParquetSplitsQuery, ) -> MetastoreResult { @@ -1011,7 +1012,7 @@ pub async fn list_parquet_splits_page( /// `page_size`; `after_split_id` is used as the starting cursor when already /// set and is advanced internally after each full page. pub async fn list_parquet_splits_paginated( - metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, kind: ParquetSplitKind, mut query: ListParquetSplitsQuery, ) -> MetastoreResult> { @@ -1019,7 +1020,7 @@ pub async fn list_parquet_splits_paginated( let mut splits = Vec::new(); loop { - let mut page = list_parquet_splits_page(&metastore, kind, &mut query).await?; + let mut page = list_parquet_splits_page(metastore, kind, &mut query).await?; splits.append(&mut page.splits); if !page.has_next_page { break; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs index 97aded689b1..5f872a611fa 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs @@ -22,7 +22,9 @@ use quickwit_proto::metastore::MetastoreServiceClient; use tokio::sync::Mutex; use tracing::debug; -use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore}; +use crate::{ + MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError, PostgresqlMetastore, +}; #[derive(Clone, Default)] pub struct PostgresqlMetastoreFactory { @@ -31,30 +33,39 @@ pub struct PostgresqlMetastoreFactory { // In contrast to the file-backed metastore, we use a strong pointer here, so that the // `Metastore` doesn't get dropped. This is done in order to keep the underlying connection // pool to Postgres alive. - cache: Arc>>, + // + // The cache is keyed on the resolution options as well as the URI, so that the read-write and + // read-only clients for the same URI get their own connection pool. + cache: Arc>>, } impl PostgresqlMetastoreFactory { - async fn get_from_cache(&self, uri: &Uri) -> Option { + async fn get_from_cache( + &self, + uri: &Uri, + options: MetastoreFactoryOptions, + ) -> Option { let cache_lock = self.cache.lock().await; - cache_lock.get(uri).cloned() + cache_lock.get(&(uri.clone(), options)).cloned() } /// If there is a valid entry in the cache to begin with, we trash the new /// one and return the old one. /// /// This way we make sure that we keep only one instance associated - /// to the key `uri` outside of this struct. + /// to the cache key outside of this struct. async fn cache_metastore( &self, uri: Uri, + options: MetastoreFactoryOptions, metastore: MetastoreServiceClient, ) -> MetastoreServiceClient { let mut cache_lock = self.cache.lock().await; - if let Some(metastore) = cache_lock.get(&uri) { + let cache_key = (uri, options); + if let Some(metastore) = cache_lock.get(&cache_key) { return metastore.clone(); } - cache_lock.insert(uri, metastore.clone()); + cache_lock.insert(cache_key, metastore.clone()); metastore } } @@ -69,8 +80,9 @@ impl MetastoreFactory for PostgresqlMetastoreFactory { &self, metastore_config: &MetastoreConfig, uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result { - if let Some(metastore) = self.get_from_cache(uri).await { + if let Some(metastore) = self.get_from_cache(uri, options).await { debug!("using metastore from cache"); return Ok(metastore); } @@ -82,12 +94,15 @@ impl MetastoreFactory for PostgresqlMetastoreFactory { ); MetastoreResolverError::InvalidConfig(message) })?; - let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) - .await - .map(MetastoreServiceClient::new) - .map_err(MetastoreResolverError::Initialization)?; + let postgresql_metastore = if options.read_only { + PostgresqlMetastore::new_read_only(postgresql_metastore_config, uri).await + } else { + PostgresqlMetastore::new(postgresql_metastore_config, uri).await + } + .map(MetastoreServiceClient::new) + .map_err(MetastoreResolverError::Initialization)?; let unique_metastore_for_uri = self - .cache_metastore(uri.clone(), postgresql_metastore) + .cache_metastore(uri.clone(), options, postgresql_metastore) .await; Ok(unique_metastore_for_uri) } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 43bd995e1ba..ab84117653c 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -104,11 +104,66 @@ impl fmt::Debug for PostgresqlMetastore { } } +/// Connection/migration options for a [`PostgresqlMetastore`]. +#[derive(Clone, Copy, Debug)] +struct PostgresqlMetastoreOptions { + read_only: bool, + skip_migrations: bool, + skip_locking: bool, +} + +impl PostgresqlMetastoreOptions { + /// Default options, with the read-only and migration flags read from the environment. + fn from_env() -> Self { + Self { + read_only: get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false), + skip_migrations: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false), + skip_locking: get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false), + } + } + + /// Options for a read-only metastore: the connection is read-only and migrations are skipped, + /// since the read replica cannot be written to and is migrated by the primary. + fn read_only() -> Self { + Self { + read_only: true, + skip_migrations: true, + skip_locking: false, + } + } +} + impl PostgresqlMetastore { /// Creates a metastore given a database URI. pub async fn new( postgres_metastore_config: &PostgresMetastoreConfig, connection_uri: &Uri, + ) -> MetastoreResult { + Self::new_with_options( + postgres_metastore_config, + connection_uri, + PostgresqlMetastoreOptions::from_env(), + ) + .await + } + + /// Creates a read-only metastore given a database URI. + pub async fn new_read_only( + postgres_metastore_config: &PostgresMetastoreConfig, + connection_uri: &Uri, + ) -> MetastoreResult { + Self::new_with_options( + postgres_metastore_config, + connection_uri, + PostgresqlMetastoreOptions::read_only(), + ) + .await + } + + async fn new_with_options( + postgres_metastore_config: &PostgresMetastoreConfig, + connection_uri: &Uri, + options: PostgresqlMetastoreOptions, ) -> MetastoreResult { let min_connections = postgres_metastore_config.min_connections; let max_connections = postgres_metastore_config.max_connections.get(); @@ -122,10 +177,6 @@ impl PostgresqlMetastore { .max_connection_lifetime_opt() .expect("PostgreSQL metastore config should have been validated"); - let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); - let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); - let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); - let connection_pool = establish_connection( connection_uri, min_connections, @@ -133,11 +184,16 @@ impl PostgresqlMetastore { acquire_timeout, idle_timeout_opt, max_lifetime_opt, - read_only, + options.read_only, ) .await?; - run_migrations(&connection_pool, skip_migrations, skip_locking).await?; + run_migrations( + &connection_pool, + options.skip_migrations, + options.skip_locking, + ) + .await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/read_service.rs b/quickwit/quickwit-metastore/src/metastore/read_service.rs new file mode 100644 index 00000000000..838cef9dd0a --- /dev/null +++ b/quickwit/quickwit-metastore/src/metastore/read_service.rs @@ -0,0 +1,107 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::sync::Arc; + +use async_trait::async_trait; +use quickwit_proto::metastore::{ + IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataRequest, + ListIndexesMetadataResponse, ListMetricsSplitsRequest, ListMetricsSplitsResponse, + ListSketchSplitsRequest, ListSketchSplitsResponse, ListSplitsRequest, ListSplitsResponse, + MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, +}; + +/// Read-only subset of the metastore RPC surface that is safe to route to a read replica. +/// +/// Callers that only need stale-tolerant reads (e.g. searchers, the DataFusion analytics path) +/// should depend on this trait rather than on the full [`MetastoreService`], so that writes are +/// excluded at the type level. Add a method here only for RPCs that tolerate replication lag. +#[async_trait] +pub trait MetastoreReadService: fmt::Debug + Send + Sync + 'static { + /// Fetches index metadata. + async fn index_metadata( + &self, + request: IndexMetadataRequest, + ) -> MetastoreResult; + + /// Lists indexes metadata. + async fn list_indexes_metadata( + &self, + request: ListIndexesMetadataRequest, + ) -> MetastoreResult; + + /// Streams splits from indexes. + async fn list_splits( + &self, + request: ListSplitsRequest, + ) -> MetastoreResult>; + + /// Lists metrics parquet splits. + async fn list_metrics_splits( + &self, + request: ListMetricsSplitsRequest, + ) -> MetastoreResult; + + /// Lists sketch parquet splits. + async fn list_sketch_splits( + &self, + request: ListSketchSplitsRequest, + ) -> MetastoreResult; +} + +/// Cloneable read-only metastore handle. +/// +/// `Arc` derefs to `dyn MetastoreReadService`, so a +/// `&MetastoreReadServiceClient` coerces to the `&dyn MetastoreReadService` taken by the read-only +/// helpers — no newtype or blanket `impl ... for Arc` is needed. +pub type MetastoreReadServiceClient = Arc; + +#[async_trait] +impl MetastoreReadService for MetastoreServiceClient { + async fn index_metadata( + &self, + request: IndexMetadataRequest, + ) -> MetastoreResult { + MetastoreService::index_metadata(self, request).await + } + + async fn list_indexes_metadata( + &self, + request: ListIndexesMetadataRequest, + ) -> MetastoreResult { + MetastoreService::list_indexes_metadata(self, request).await + } + + async fn list_splits( + &self, + request: ListSplitsRequest, + ) -> MetastoreResult> { + MetastoreService::list_splits(self, request).await + } + + async fn list_metrics_splits( + &self, + request: ListMetricsSplitsRequest, + ) -> MetastoreResult { + MetastoreService::list_metrics_splits(self, request).await + } + + async fn list_sketch_splits( + &self, + request: ListSketchSplitsRequest, + ) -> MetastoreResult { + MetastoreService::list_sketch_splits(self, request).await + } +} diff --git a/quickwit/quickwit-metastore/src/metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore_factory.rs index 872b16d3071..89eb0334090 100644 --- a/quickwit/quickwit-metastore/src/metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore_factory.rs @@ -19,6 +19,14 @@ use quickwit_proto::metastore::MetastoreServiceClient; use crate::MetastoreResolverError; +/// Options controlling how a metastore client is resolved. +#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)] +pub struct MetastoreFactoryOptions { + /// Whether the resolved metastore client should use a read-only connection. Only supported by + /// the PostgreSQL backend. + pub read_only: bool, +} + /// A metastore factory builds a [`MetastoreServiceClient`] object for a target [`MetastoreBackend`] /// from a [`MetastoreConfig`] and a [`Uri`]. #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] @@ -27,11 +35,13 @@ pub trait MetastoreFactory: Send + Sync + 'static { /// Returns the metastore backend targeted by the factory. fn backend(&self) -> MetastoreBackend; - /// Returns the appropriate [`MetastoreServiceClient`] object for the `uri`. + /// Returns the appropriate [`MetastoreServiceClient`] object for the `uri` and resolution + /// `options`. async fn resolve( &self, metastore_config: &MetastoreConfig, uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result; } @@ -59,6 +69,7 @@ impl MetastoreFactory for UnsupportedMetastore { &self, _metastore_config: &MetastoreConfig, _uri: &Uri, + _options: MetastoreFactoryOptions, ) -> Result { Err(MetastoreResolverError::UnsupportedBackend( self.message.to_string(), diff --git a/quickwit/quickwit-metastore/src/metastore_resolver.rs b/quickwit/quickwit-metastore/src/metastore_resolver.rs index bc4d2cac6e7..e523f864d9f 100644 --- a/quickwit/quickwit-metastore/src/metastore_resolver.rs +++ b/quickwit/quickwit-metastore/src/metastore_resolver.rs @@ -25,7 +25,7 @@ use quickwit_storage::StorageResolver; use crate::metastore::file_backed::FileBackedMetastoreFactory; #[cfg(feature = "postgres")] use crate::metastore::postgres::PostgresqlMetastoreFactory; -use crate::{MetastoreFactory, MetastoreResolverError}; +use crate::{MetastoreFactory, MetastoreFactoryOptions, MetastoreResolverError}; type FactoryAndConfig = (Box, MetastoreConfig); @@ -53,6 +53,24 @@ impl MetastoreResolver { pub async fn resolve( &self, uri: &Uri, + ) -> Result { + self.resolve_inner(uri, MetastoreFactoryOptions::default()) + .await + } + + /// Resolves the given `uri` as a read-only metastore client. Only supported for PostgreSQL. + pub async fn resolve_read_only( + &self, + uri: &Uri, + ) -> Result { + self.resolve_inner(uri, MetastoreFactoryOptions { read_only: true }) + .await + } + + async fn resolve_inner( + &self, + uri: &Uri, + options: MetastoreFactoryOptions, ) -> Result { let backend = match uri.protocol() { Protocol::Azure => MetastoreBackend::File, @@ -67,13 +85,20 @@ impl MetastoreResolver { )); } }; + if options.read_only && backend != MetastoreBackend::PostgreSQL { + return Err(MetastoreResolverError::UnsupportedBackend( + "read-only metastore connections are only supported for PostgreSQL".to_string(), + )); + } let (metastore_factory, metastore_config) = self .per_backend_factories .get(&backend) .ok_or(MetastoreResolverError::UnsupportedBackend( "no metastore factory is registered for this backend".to_string(), ))?; - let metastore = metastore_factory.resolve(metastore_config, uri).await?; + let metastore = metastore_factory + .resolve(metastore_config, uri, options) + .await?; Ok(metastore) } @@ -184,6 +209,21 @@ mod tests { metastore_resolver.resolve(&metastore_uri).await.unwrap(); } + #[tokio::test] + async fn test_metastore_resolver_should_reject_read_only_file() { + let metastore_resolver = MetastoreResolver::unconfigured(); + let metastore_uri = Uri::from_str("ram:///metastore").unwrap(); + let error = metastore_resolver + .resolve_read_only(&metastore_uri) + .await + .unwrap_err(); + assert!(matches!( + error, + MetastoreResolverError::UnsupportedBackend(message) + if message == "read-only metastore connections are only supported for PostgreSQL" + )); + } + #[cfg(feature = "postgres")] #[tokio::test] async fn test_postgres_and_postgresql_protocol_accepted() { @@ -200,4 +240,31 @@ mod tests { metastore_resolver.resolve(&postgres_uri).await.unwrap(); } } + + #[cfg(feature = "postgres")] + #[tokio::test] + async fn test_metastore_resolver_resolve_read_only_postgres() { + use std::env; + + use quickwit_proto::metastore::{ListIndexesMetadataRequest, MetastoreService}; + + let metastore_resolver = MetastoreResolver::unconfigured(); + let test_database_url = env::var("QW_TEST_DATABASE_URL").unwrap_or_else(|_| { + "postgres://quickwit-dev:quickwit-dev@localhost/quickwit-metastore-dev".to_string() + }); + let postgres_uri = Uri::from_str(&test_database_url).unwrap(); + // The read replica skips migrations, so ensure the schema exists by resolving the + // read-write metastore first. + metastore_resolver.resolve(&postgres_uri).await.unwrap(); + + let read_only_metastore = metastore_resolver + .resolve_read_only(&postgres_uri) + .await + .unwrap(); + // A read-only connection must still serve read RPCs. + read_only_metastore + .list_indexes_metadata(ListIndexesMetadataRequest::all()) + .await + .unwrap(); + } } diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 8d2ad0f924d..2c445451c06 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -50,7 +50,7 @@ use quickwit_common::thread_pool::with_priority::ThreadPoolWithPriority; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; use quickwit_proto::metastore::{ - ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, MetastoreServiceClient, + ListIndexesMetadataRequest, ListSplitsRequest, MetastoreServiceClient, }; use tantivy::schema::NamedFieldDocument; @@ -65,7 +65,8 @@ use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_metastore::{ IndexMetadata, ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, - MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, + MetastoreReadService, MetastoreReadServiceClient, MetastoreServiceStreamSplitsExt, + SplitMetadata, SplitState, }; use quickwit_proto::search::{ LeafResourceStats, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets, @@ -182,7 +183,7 @@ fn extract_split_and_footer_offsets(split_metadata: &SplitMetadata) -> SplitIdAn /// Get all splits of given index ids pub async fn list_all_splits( index_uids: Vec, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { list_relevant_splits(index_uids, None, None, None, metastore).await } @@ -193,7 +194,7 @@ pub async fn list_relevant_splits( start_timestamp: Option, end_timestamp: Option, tags_filter_opt: Option, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { let Some(mut query) = ListSplitsQuery::try_from_index_uids(index_uids) else { return Ok(Vec::new()); @@ -222,7 +223,7 @@ pub async fn list_relevant_splits( /// Patterns follow the elastic search patterns. pub async fn resolve_index_patterns( index_id_patterns: &[String], - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result> { let list_indexes_metadata_request = if index_id_patterns.is_empty() { ListIndexesMetadataRequest::all() @@ -260,7 +261,7 @@ fn convert_document_to_json_string( /// Starts a search node, aka a `searcher`. pub async fn start_searcher_service( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, search_job_placer: SearchJobPlacer, searcher_context: Arc, @@ -282,6 +283,7 @@ pub async fn single_node_search( metastore: MetastoreServiceClient, storage_resolver: StorageResolver, ) -> crate::Result { + let metastore: MetastoreReadServiceClient = Arc::new(metastore); let socket_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280u16); let searcher_pool = SearcherPool::default(); let search_job_placer = SearchJobPlacer::new(searcher_pool.clone()); @@ -300,7 +302,7 @@ pub async fn single_node_search( root_search( &searcher_context, search_request, - metastore, + metastore.as_ref(), &cluster_client, ) .await diff --git a/quickwit/quickwit-search/src/list_fields/root.rs b/quickwit/quickwit-search/src/list_fields/root.rs index 66d5947d123..5afb47368c1 100644 --- a/quickwit/quickwit-search/src/list_fields/root.rs +++ b/quickwit/quickwit-search/src/list_fields/root.rs @@ -21,8 +21,7 @@ use quickwit_common::rate_limited_warn; use quickwit_common::uri::Uri; use quickwit_config::build_doc_mapper; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; -use quickwit_metastore::SplitMetadata; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_metastore::{MetastoreReadService, SplitMetadata}; use quickwit_proto::search::{ LeafListFieldsRequest, ListFieldsEntry, ListFieldsRequest, ListFieldsResponse, }; @@ -54,10 +53,10 @@ struct IndexMetasForLeafSearch { pub async fn root_list_fields( list_fields_req: ListFieldsRequest, cluster_client: &ClusterClient, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result { let indexes_metadata = - resolve_index_patterns(&list_fields_req.index_id_patterns[..], &mut metastore).await?; + resolve_index_patterns(&list_fields_req.index_id_patterns[..], metastore).await?; // The request contains a wildcard, but couldn't find any index. if indexes_metadata.is_empty() { @@ -115,7 +114,7 @@ pub async fn root_list_fields( start_timestamp, end_timestamp, tags_filter_opt, - &mut metastore, + metastore, ) .await?; diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index 0c575793f41..7112f32db9e 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -21,9 +21,11 @@ use futures::future::try_join_all; use itertools::{Either, Itertools}; use quickwit_common::pretty::PrettySample; use quickwit_config::build_doc_mapper; -use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata}; +use quickwit_metastore::{ + ListSplitsRequestExt, MetastoreReadService, MetastoreServiceStreamSplitsExt, SplitMetadata, +}; use quickwit_metrics::HistogramTimer; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; +use quickwit_proto::metastore::ListSplitsRequest; use quickwit_proto::search::{ LeafListTermsRequest, LeafListTermsResponse, ListTermsRequest, ListTermsResponse, SplitIdAndFooterOffsets, SplitSearchError, @@ -48,12 +50,12 @@ use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_inde #[instrument(skip(list_terms_request, cluster_client, metastore))] pub async fn root_list_terms( list_terms_request: &ListTermsRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, cluster_client: &ClusterClient, ) -> crate::Result { let start_instant = tokio::time::Instant::now(); let indexes_metadata = - resolve_index_patterns(&list_terms_request.index_id_patterns, &mut metastore).await?; + resolve_index_patterns(&list_terms_request.index_id_patterns, metastore).await?; // The request contains a wildcard, but couldn't find any index. if indexes_metadata.is_empty() { return Ok(ListTermsResponse { @@ -113,7 +115,6 @@ pub async fn root_list_terms( .collect(); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?; let split_metadatas: Vec = metastore - .clone() .list_splits(list_splits_request) .await? .collect_splits_metadata() diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 7624c7c8e30..ef79aa73e7b 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -27,10 +27,10 @@ use quickwit_common::uri::Uri; use quickwit_config::build_doc_mapper; use quickwit_doc_mapper::DYNAMIC_FIELD_NAME; use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; -use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt, SplitMetadata}; -use quickwit_proto::metastore::{ - ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, +use quickwit_metastore::{ + IndexMetadata, ListIndexesMetadataResponseExt, MetastoreReadService, SplitMetadata, }; +use quickwit_proto::metastore::ListIndexesMetadataRequest; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, Hit, LeafHit, LeafRequestRef, LeafResourceStats, LeafSearchRequest, LeafSearchResponse, PartialHit, RootResourceStats, SearchPlanResponse, @@ -1208,7 +1208,7 @@ pub fn ensure_all_indexes_found( } async fn refine_and_list_matches( - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, search_request: &mut SearchRequest, indexes_metadata: Vec, query_ast_resolved: QueryAst, @@ -1251,7 +1251,7 @@ async fn refine_and_list_matches( /// Fetches the list of splits and their metadata from the metastore async fn plan_splits_for_root_search( search_request: &mut SearchRequest, - metastore: &mut MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result<(Vec, IndexesMetasForLeafSearch)> { let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1295,14 +1295,14 @@ async fn plan_splits_for_root_search( pub async fn root_search( searcher_context: &SearcherContext, mut search_request: SearchRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, cluster_client: &ClusterClient, ) -> crate::Result { let start_instant = Instant::now(); let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture { start: start_instant, - tracked: plan_splits_for_root_search(&mut search_request, &mut metastore), + tracked: plan_splits_for_root_search(&mut search_request, metastore), is_success: None, step: RootSearchMetricsStep::Plan, } @@ -1364,7 +1364,7 @@ pub async fn root_search( /// Returns details on how a query would be executed pub async fn search_plan( mut search_request: SearchRequest, - mut metastore: MetastoreServiceClient, + metastore: &dyn MetastoreReadService, ) -> crate::Result { let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), @@ -1396,7 +1396,7 @@ pub async fn search_plan( let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?; let split_metadatas = refine_and_list_matches( - &mut metastore, + metastore, &mut search_request, indexes_metadata, request_metadata.query_ast_resolved.clone(), @@ -1902,7 +1902,8 @@ mod tests { use quickwit_indexing::MockSplitBuilder; use quickwit_metastore::{IndexMetadata, ListSplitsRequestExt, ListSplitsResponseExt}; use quickwit_proto::metastore::{ - ListIndexesMetadataResponse, ListSplitsResponse, MockMetastoreService, + ListIndexesMetadataResponse, ListSplitsResponse, MetastoreServiceClient, + MockMetastoreService, }; use quickwit_proto::search::{ ScrollRequest, SortByValue, SortOrder, SortValue, SplitSearchError, @@ -2758,7 +2759,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -2828,7 +2829,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -2920,7 +2921,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -3215,7 +3216,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3333,7 +3334,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -3465,7 +3466,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request.clone(), - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3647,7 +3648,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request.clone(), - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await?; @@ -3771,7 +3772,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - mock_metastore_client.clone(), + &mock_metastore_client, &cluster_client, ) .await @@ -3790,7 +3791,7 @@ mod tests { let search_error = root_search( &searcher_context, search_request, - mock_metastore_client, + &mock_metastore_client, &cluster_client, ) .await @@ -3915,7 +3916,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4055,7 +4056,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4136,7 +4137,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4203,7 +4204,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4293,7 +4294,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4375,7 +4376,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -4424,7 +4425,7 @@ mod tests { max_hits: 10, ..Default::default() }, - metastore.clone(), + &metastore, &cluster_client, ) .await @@ -4440,7 +4441,7 @@ mod tests { max_hits: 10, ..Default::default() }, - metastore, + &metastore, &cluster_client, ) .await @@ -4505,7 +4506,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await; @@ -4555,7 +4556,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - metastore.clone(), + &metastore, &cluster_client, ) .await; @@ -4575,7 +4576,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - metastore, + &metastore, &cluster_client, ) .await; @@ -4625,7 +4626,7 @@ mod tests { }); let search_response = search_plan( search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), ) .await .unwrap(); @@ -4712,7 +4713,7 @@ mod tests { ignore_missing_indexes: true, ..Default::default() }, - mock_metastore_service.clone(), + &mock_metastore_service, ) .await .unwrap(); @@ -4726,7 +4727,7 @@ mod tests { ignore_missing_indexes: false, ..Default::default() }, - mock_metastore_service.clone(), + &mock_metastore_service, ) .await .unwrap_err(); @@ -5093,7 +5094,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5359,7 +5360,7 @@ mod tests { let search_response = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5541,7 +5542,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5662,7 +5663,7 @@ mod tests { let search_response = root_search( &SearcherContext::for_test(), search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await @@ -5715,7 +5716,7 @@ mod tests { let search_error = root_search( &searcher_context, search_request, - MetastoreServiceClient::from_mock(mock_metastore), + &MetastoreServiceClient::from_mock(mock_metastore), &cluster_client, ) .await diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index db2b066ae83..456cb5fe8c7 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -20,7 +20,7 @@ use async_trait::async_trait; use quickwit_common::uri::Uri; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_metastore::MetastoreReadServiceClient; use quickwit_proto::search::{ FetchDocsRequest, FetchDocsResponse, GetKvRequest, Hit, LeafListFieldsRequest, LeafListTermsRequest, LeafListTermsResponse, LeafSearchRequest, LeafSearchResponse, @@ -47,7 +47,7 @@ use crate::{ClusterClient, SearchError, fetch_docs, root_search, search_plan}; #[derive(Clone)] /// The search service implementation. pub struct SearchServiceImpl { - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, @@ -141,7 +141,7 @@ pub trait SearchService: 'static + Send + Sync { impl SearchServiceImpl { /// Creates a new search service. pub fn new( - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, cluster_client: ClusterClient, searcher_context: Arc, @@ -170,7 +170,7 @@ impl SearchService for SearchServiceImpl { let search_result = root_search( &self.searcher_context, search_request, - self.metastore.clone(), + self.metastore.as_ref(), &self.cluster_client, ) .await?; @@ -233,7 +233,7 @@ impl SearchService for SearchServiceImpl { ) -> crate::Result { let search_result = root_list_terms( &list_terms_request, - self.metastore.clone(), + self.metastore.as_ref(), &self.cluster_client, ) .await?; @@ -293,7 +293,7 @@ impl SearchService for SearchServiceImpl { root_list_fields( list_fields_req, &self.cluster_client, - self.metastore.clone(), + self.metastore.as_ref(), ) .await } @@ -320,7 +320,7 @@ impl SearchService for SearchServiceImpl { &self, search_request: SearchRequest, ) -> crate::Result { - let search_plan = search_plan(search_request, self.metastore.clone()).await?; + let search_plan = search_plan(search_request, self.metastore.as_ref()).await?; Ok(search_plan) } diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index c8d851d06cb..2b65560e1a3 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -966,7 +966,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert!(selected_splits.is_empty()); @@ -978,7 +978,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert_eq!(selected_splits.len(), 2); @@ -990,7 +990,7 @@ async fn test_single_node_split_pruning_by_tags() -> anyhow::Result<()> { None, None, extract_tags_from_query(query_ast), - &mut test_sandbox.metastore(), + &test_sandbox.metastore(), ) .await?; assert_eq!(selected_splits.len(), 2); diff --git a/quickwit/quickwit-serve/src/datafusion_api/setup.rs b/quickwit/quickwit-serve/src/datafusion_api/setup.rs index aad5e45a57b..46871ec0dfd 100644 --- a/quickwit/quickwit-serve/src/datafusion_api/setup.rs +++ b/quickwit/quickwit-serve/src/datafusion_api/setup.rs @@ -39,7 +39,7 @@ use quickwit_datafusion::{ DataFusionService, DataFusionSessionBuilder, QuickwitObjectStoreRegistry, QuickwitWorkerResolver, build_worker, }; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_metastore::MetastoreReadServiceClient; use quickwit_search::{SearchServiceClient, SearcherPool, create_search_client_from_grpc_addr}; use quickwit_storage::StorageResolver; use tokio::time::timeout; @@ -66,7 +66,7 @@ use crate::QuickwitServices; pub(crate) fn build_datafusion_session_builder( node_config: &NodeConfig, cluster_change_stream: ClusterChangeStream, - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, ) -> anyhow::Result>> { if !node_config.is_service_enabled(QuickwitService::Searcher) { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 0e733081f6b..a5795de6b4e 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -192,8 +192,7 @@ async fn get_index_metadata( metastore: MetastoreServiceClient, ) -> Result { let index_metadata_request = IndexMetadataRequest::for_index_id(index_id); - let index_metadata = metastore - .index_metadata(index_metadata_request) + let index_metadata = MetastoreService::index_metadata(&metastore, index_metadata_request) .await? .deserialize_index_metadata()?; Ok(index_metadata) @@ -205,12 +204,12 @@ async fn get_index_metadata( pub(crate) async fn es_compat_index_mapping( index_id: String, params: IndexMappingQueryParams, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, search_service: Arc, ) -> Result { let indexes_metadata = if index_id.contains('*') || index_id.contains(',') { let patterns: Vec = index_id.split(',').map(|s| s.trim().to_string()).collect(); - resolve_index_patterns(&patterns, &mut metastore).await? + resolve_index_patterns(&patterns, &metastore).await? } else { vec![get_index_metadata(index_id.clone(), metastore).await?] }; @@ -736,9 +735,9 @@ async fn es_compat_stats( pub(crate) async fn es_compat_index_stats( index_id_patterns: Vec, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result { - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; // Index uid to index id mapping let index_uid_to_index_id: HashMap = indexes_metadata @@ -751,7 +750,7 @@ pub(crate) async fn es_compat_index_stats( .map(|index_metadata| index_metadata.index_uid) .collect_vec(); // calling into the search module is not necessary, but reuses established patterns - let splits_metadata = list_all_splits(index_uids, &mut metastore).await?; + let splits_metadata = list_all_splits(index_uids, &metastore).await?; let search_response_rest: ElasticsearchStatsResponse = convert_to_es_stats_response(index_uid_to_index_id, splits_metadata); @@ -769,10 +768,10 @@ pub(crate) async fn es_compat_cat_indices( pub(crate) async fn es_compat_index_cat_indices( index_id_patterns: Vec, query_params: CatIndexQueryParams, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result, ElasticsearchError> { query_params.validate()?; - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; let mut index_id_to_resp: HashMap = indexes_metadata .iter() .map(|metadata| (metadata.index_uid.to_owned(), metadata.clone().into())) @@ -785,7 +784,7 @@ pub(crate) async fn es_compat_index_cat_indices( .collect_vec(); // calling into the search module is not necessary, but reuses established patterns - list_all_splits(index_uids, &mut metastore).await? + list_all_splits(index_uids, &metastore).await? }; let search_response_rest: Vec = @@ -815,9 +814,9 @@ pub(crate) async fn es_compat_index_cat_indices( pub(crate) async fn es_compat_resolve_index( index_id_patterns: Vec, - mut metastore: MetastoreServiceClient, + metastore: MetastoreServiceClient, ) -> Result { - let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?; + let indexes_metadata = resolve_index_patterns(&index_id_patterns, &metastore).await?; let mut indices: Vec = indexes_metadata .into_iter() .map(|metadata| metadata.into()) diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index c0cac95a146..4c139303229 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -62,7 +62,8 @@ pub(crate) async fn start_grpc_server( let cluster_grpc_service = cluster_grpc_server(services.cluster.clone()); file_descriptor_sets.push(quickwit_proto::cluster::CLUSTER_PLANE_FILE_DESCRIPTOR_SET); - // Mount gRPC metastore service if `QuickwitService::Metastore` is enabled on node. + // Mount gRPC metastore service if this node serves either the primary metastore or a + // read-only metastore replica. let metastore_grpc_service = if let Some(metastore_server) = &services.metastore_server_opt { enabled_grpc_services.insert("metastore"); file_descriptor_sets.push(quickwit_proto::metastore::METASTORE_FILE_DESCRIPTOR_SET); diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 31840382d2d..e15e4023e20 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -92,7 +92,8 @@ use quickwit_ingest::{ use quickwit_jaeger::JaegerService; use quickwit_janitor::{JanitorService, start_janitor_service}; use quickwit_metastore::{ - ControlPlaneMetastore, ListIndexesMetadataResponseExt, MetastoreResolver, + ControlPlaneMetastore, ListIndexesMetadataResponseExt, MetastoreReadServiceClient, + MetastoreResolver, }; use quickwit_opentelemetry::otlp::{OtlpGrpcLogsService, OtlpGrpcTracesService}; use quickwit_proto::control_plane::ControlPlaneServiceClient; @@ -229,6 +230,88 @@ impl QuickwitServices { } } +/// Computes the max number of in-flight requests for a metastore gRPC server, based on whether the +/// backing connection is a database (PostgreSQL) and its configured connection pool size. +fn metastore_max_in_flight_requests(node_config: &NodeConfig, uri: &Uri) -> usize { + if uri.protocol().is_database() { + node_config + .metastore_configs + .find_postgres() + .map(|config| config.max_connections.get() * 2) + .unwrap_or_default() + .max(100) + } else { + 100 + } +} + +async fn build_metastore_client( + cluster: &Cluster, + service: QuickwitService, + max_message_size: ByteSize, +) -> anyhow::Result { + info!(%service, "connecting to metastore service"); + + let balance_channel = balance_channel_for_service(cluster, service).await; + + if !balance_channel + .wait_for(Duration::from_secs(300), |connections| { + !connections.is_empty() + }) + .await + { + bail!("could not find any `{service}` node in the cluster"); + } + Ok(MetastoreServiceClient::tower() + .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) + .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) + .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( + get_metastore_client_max_concurrency(), + )) + .build_from_balance_channel(balance_channel, max_message_size, None)) +} + +/// Builds the list of metastore clients whose connectivity gates this node's readiness. +/// +/// `metastore` is the node's main metastore client (`metastore_through_control_plane`). The store +/// it resolves to depends on the role: +/// - on a `metastore` node, the locally served read-write primary; +/// - on a `metastore_read_replica` node, the locally served read-only replica; +/// - on any other node, a remote client to the primary `metastore` pool. +/// +/// `metastore_read_replica_opt` is the separate remote read-replica client built only for +/// searchers that route their reads to a replica. +fn build_metastore_readiness_clients( + node_config: &NodeConfig, + metastore: MetastoreServiceClient, + metastore_read_replica_opt: Option, +) -> Vec { + let mut readiness_clients = Vec::with_capacity(2); + + // Every role relies on the main metastore client except a searcher routed to a read replica, + // which reads from `metastore_read_replica_opt` below instead. A read-replica node is *not* an + // exception: its `metastore` is the replica it serves locally (see above), so it is checked + // here too. + let should_check_main_metastore = + node_config + .enabled_services + .iter() + .any(|service| match service { + QuickwitService::Searcher => { + !node_config.searcher_config.use_metastore_read_replica + } + _ => true, + }); + if should_check_main_metastore { + readiness_clients.push(metastore); + } + if let Some(read_replica_metastore) = metastore_read_replica_opt { + readiness_clients.push(read_replica_metastore); + } + readiness_clients +} + async fn balance_channel_for_service( cluster: &Cluster, service: QuickwitService, @@ -351,8 +434,11 @@ async fn start_control_plane_if_needed( balance_channel_for_service(cluster, QuickwitService::ControlPlane).await; // If the node is a metastore, we skip this check in order to avoid a deadlock. + // A read-replica metastore node is skipped for the same reason: it only serves read-only + // metastore traffic and does not need the control plane. // If the node is a searcher, we skip this check because the searcher does not need to. if !node_config.is_service_enabled(QuickwitService::Metastore) + && !node_config.is_service_enabled(QuickwitService::MetastoreReadReplica) && node_config.enabled_services != HashSet::from([QuickwitService::Searcher]) { info!("connecting to control plane"); @@ -458,7 +544,10 @@ pub async fn serve_quickwit( let universe = Universe::new(); let grpc_config = node_config.grpc_config.clone(); - // Instantiate a metastore "server" if the `metastore` role is enabled on the node. + // Instantiate a metastore "server" if the `metastore` or `metastore_read_replica` role is + // enabled on the node. Both roles expose the same gRPC metastore service; they differ only in + // the connection that is resolved (read-write primary vs. read-only replica) and in whether the + // control-plane event layers apply. let metastore_server_opt: Option = if node_config.is_service_enabled(QuickwitService::Metastore) { let metastore: MetastoreServiceClient = metastore_resolver @@ -470,20 +559,13 @@ pub async fn serve_quickwit( node_config.metastore_uri ) })?; - let max_in_flight_requests = if node_config.metastore_uri.protocol().is_database() { - node_config - .metastore_configs - .find_postgres() - .map(|config| config.max_connections.get() * 2) - .unwrap_or_default() - .max(100) - } else { - 100 - }; // These layers apply to all the RPCs of the metastore. let shared_layer = ServiceBuilder::new() .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) - .layer(LoadShedLayer::new(max_in_flight_requests)) + .layer(LoadShedLayer::new(metastore_max_in_flight_requests( + &node_config, + &node_config.metastore_uri, + ))) .into_inner(); let broker_layer = EventListenerLayer::new(event_broker.clone()); let metastore = MetastoreServiceClient::tower() @@ -495,6 +577,30 @@ pub async fn serve_quickwit( .stack_toggle_source_layer(broker_layer) .build(metastore); Some(metastore) + } else if node_config.is_service_enabled(QuickwitService::MetastoreReadReplica) { + let read_replica_uri = node_config.metastore_read_replica_uri.as_ref().expect( + "`metastore_read_replica_uri` should be set when the `metastore_read_replica` \ + role is enabled (validated at config load)", + ); + let metastore: MetastoreServiceClient = metastore_resolver + .resolve_read_only(read_replica_uri) + .await + .with_context(|| { + format!("failed to resolve metastore read replica uri `{read_replica_uri}`") + })?; + // The replica is read-only, so the control-plane event layers (which only wrap write + // RPCs) do not apply here. + let shared_layer = ServiceBuilder::new() + .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) + .layer(LoadShedLayer::new(metastore_max_in_flight_requests( + &node_config, + read_replica_uri, + ))) + .into_inner(); + let metastore = MetastoreServiceClient::tower() + .stack_layer(shared_layer) + .build(metastore); + Some(metastore) } else { None }; @@ -503,27 +609,12 @@ pub async fn serve_quickwit( if let Some(metastore_server) = &metastore_server_opt { metastore_server.clone() } else { - info!("connecting to metastore"); - - let balance_channel = - balance_channel_for_service(&cluster, QuickwitService::Metastore).await; - - if !balance_channel - .wait_for(Duration::from_secs(300), |connections| { - !connections.is_empty() - }) - .await - { - bail!("could not find any metastore node in the cluster"); - } - MetastoreServiceClient::tower() - .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) - .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) - .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) - .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( - get_metastore_client_max_concurrency(), - )) - .build_from_balance_channel(balance_channel, grpc_config.max_message_size, None) + build_metastore_client( + &cluster, + QuickwitService::Metastore, + grpc_config.max_message_size, + ) + .await? }; // Instantiate a control plane server if the `control-plane` role is enabled on the node. // Otherwise, instantiate a control plane client. @@ -676,12 +767,42 @@ pub async fn serve_quickwit( )) }; + // Searchers and the DataFusion analytics path use the primary metastore by default. When + // `searcher.use_metastore_read_replica` is enabled, searcher nodes require + // `metastore_read_replica` nodes and do not fall back to the primary if none are available. + let use_metastore_read_replica_for_search = node_config + .is_service_enabled(QuickwitService::Searcher) + && node_config.searcher_config.use_metastore_read_replica; + let search_read_replica_metastore_opt = if use_metastore_read_replica_for_search { + Some( + build_metastore_client( + &cluster, + QuickwitService::MetastoreReadReplica, + grpc_config.max_message_size, + ) + .await?, + ) + } else { + None + }; + let search_metastore_client: MetastoreReadServiceClient = + if let Some(read_replica_metastore) = &search_read_replica_metastore_opt { + Arc::new(read_replica_metastore.clone()) + } else { + Arc::new(metastore_through_control_plane.clone()) + }; + let metastore_readiness_clients = build_metastore_readiness_clients( + &node_config, + metastore_through_control_plane.clone(), + search_read_replica_metastore_opt, + ); + let (search_job_placer, search_service, searcher_pool) = setup_searcher( &node_config, cluster.change_stream(), // search remains available without a control plane because not all // metastore RPCs are proxied - metastore_through_control_plane.clone(), + search_metastore_client.clone(), storage_resolver.clone(), searcher_context, ) @@ -698,7 +819,7 @@ pub async fn serve_quickwit( let datafusion_session_builder = datafusion_api::setup::build_datafusion_session_builder( &node_config, cluster.change_stream(), - metastore_through_control_plane.clone(), + search_metastore_client.clone(), storage_resolver.clone(), )?; // The search job placer owns a clone of this pool; the local binding is not @@ -881,7 +1002,7 @@ pub async fn serve_quickwit( spawn_named_task( node_readiness_reporting_task( cluster.clone(), - metastore_through_control_plane, + metastore_readiness_clients, ingester_opt.clone(), grpc_readiness_signal_rx, rest_readiness_signal_rx, @@ -1185,7 +1306,7 @@ fn build_ingester_service( async fn setup_searcher( node_config: &NodeConfig, cluster_change_stream: ClusterChangeStream, - metastore: MetastoreServiceClient, + metastore: MetastoreReadServiceClient, storage_resolver: StorageResolver, searcher_context: Arc, ) -> anyhow::Result<(SearchJobPlacer, Arc, SearcherPool)> { @@ -1428,10 +1549,32 @@ fn with_arg(arg: T) -> impl Filter bool { + if metastores.is_empty() { + warn!("no metastore configured for readiness checks"); + return false; + } + futures::future::join_all(metastores.iter().map(|metastore| async move { + match metastore.check_connectivity().await { + Ok(()) => { + debug!(metastore_endpoints=?metastore.endpoints(), "metastore service is available"); + true + } + Err(error) => { + warn!(metastore_endpoints=?metastore.endpoints(), error=?error, "metastore service is unavailable"); + false + } + } + })) + .await + .into_iter() + .all(|metastore_is_available| metastore_is_available) +} + /// Reports node readiness to chitchat cluster every 10 seconds (25 ms for tests). async fn node_readiness_reporting_task( cluster: Cluster, - metastore: MetastoreServiceClient, + metastores: Vec, ingester_opt: Option, grpc_readiness_signal_rx: oneshot::Receiver<()>, rest_readiness_signal_rx: oneshot::Receiver<()>, @@ -1462,16 +1605,7 @@ async fn node_readiness_reporting_task( loop { interval.tick().await; - let metastore_is_available = match metastore.check_connectivity().await { - Ok(()) => { - debug!(metastore_endpoints=?metastore.endpoints(), "metastore service is available"); - true - } - Err(error) => { - warn!(metastore_endpoints=?metastore.endpoints(), error=?error, "metastore service is unavailable"); - false - } - }; + let metastore_is_available = metastores_are_available(&metastores).await; let ingester_is_available = if let Some(ingester) = &ingester_opt { match try_get_ingester_status(ingester).await { Ok(status) => { @@ -1569,6 +1703,26 @@ mod tests { use super::*; + fn metastore_readiness_client( + readiness_rx: watch::Receiver, + uri: &'static str, + ) -> MetastoreServiceClient { + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_check_connectivity() + .returning(move || { + if *readiness_rx.borrow() { + Ok(()) + } else { + Err(anyhow::anyhow!("metastore `{uri}` not ready")) + } + }); + mock_metastore + .expect_endpoints() + .return_const(vec![Uri::for_test(uri)]); + MetastoreServiceClient::from_mock(mock_metastore) + } + #[tokio::test] async fn test_check_cluster_configuration() { let services = HashSet::from_iter([QuickwitService::Metastore]); @@ -1602,16 +1756,7 @@ mod tests { .await .unwrap(); let (metastore_readiness_tx, metastore_readiness_rx) = watch::channel(false); - let mut mock_metastore = MockMetastoreService::new(); - mock_metastore - .expect_check_connectivity() - .returning(move || { - if *metastore_readiness_rx.borrow() { - Ok(()) - } else { - Err(anyhow::anyhow!("Metastore not ready")) - } - }); + let mock_metastore = metastore_readiness_client(metastore_readiness_rx, "ram:///metastore"); let (ingester_status_tx, ingester_status_rx) = watch::channel(IngesterStatus::Initializing); let mut mock_ingester = MockIngesterService::new(); mock_ingester @@ -1653,7 +1798,7 @@ mod tests { tokio::spawn(node_readiness_reporting_task( cluster.clone(), - MetastoreServiceClient::from_mock(mock_metastore), + vec![mock_metastore], Some(mock_ingester), grpc_readiness_signal_rx, rest_readiness_signal_rx, @@ -1685,6 +1830,145 @@ mod tests { assert_eq!(response.status(), ServingStatus::NotServing.into()); } + #[tokio::test] + async fn test_readiness_requires_all_metastores() { + let transport = ChitchatTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + .await + .unwrap(); + let (primary_readiness_tx, primary_readiness_rx) = watch::channel(false); + let (replica_readiness_tx, replica_readiness_rx) = watch::channel(false); + let primary_metastore = + metastore_readiness_client(primary_readiness_rx, "ram:///primary-metastore"); + let replica_metastore = + metastore_readiness_client(replica_readiness_rx, "ram:///replica-metastore"); + let (grpc_readiness_trigger_tx, grpc_readiness_signal_rx) = oneshot::channel(); + let (rest_readiness_trigger_tx, rest_readiness_signal_rx) = oneshot::channel(); + let (health_reporter, _health_service) = health_reporter(); + + tokio::spawn(node_readiness_reporting_task( + cluster.clone(), + vec![primary_metastore, replica_metastore], + None::, + grpc_readiness_signal_rx, + rest_readiness_signal_rx, + health_reporter, + )); + grpc_readiness_trigger_tx.send(()).unwrap(); + rest_readiness_trigger_tx.send(()).unwrap(); + + primary_readiness_tx.send(true).unwrap(); + tokio::time::sleep(READINESS_REPORTING_INTERVAL * 3).await; + assert!(!cluster.is_self_node_ready().await); + + replica_readiness_tx.send(true).unwrap(); + assert_eventually!(cluster.is_self_node_ready().await); + + primary_readiness_tx.send(false).unwrap(); + assert_eventually!(!cluster.is_self_node_ready().await); + } + + #[test] + fn test_build_metastore_readiness_clients() { + const MAIN_ENDPOINT: &str = "ram:///main-metastore"; + const READ_REPLICA_ENDPOINT: &str = "ram:///read-replica-metastore"; + + fn mock_metastore(endpoint: &'static str) -> MetastoreServiceClient { + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_endpoints() + .return_const(vec![Uri::for_test(endpoint)]); + MetastoreServiceClient::from_mock(mock_metastore) + } + + struct TestCase { + comment: &'static str, + enabled_services: &'static [QuickwitService], + use_metastore_read_replica: bool, + with_read_replica_client: bool, + expected_endpoints: &'static [&'static str], + } + + let test_cases = [ + TestCase { + comment: "indexer depends on the main metastore", + enabled_services: &[QuickwitService::Indexer], + use_metastore_read_replica: false, + with_read_replica_client: false, + expected_endpoints: &[MAIN_ENDPOINT], + }, + TestCase { + comment: "searcher without a read replica depends on the main metastore", + enabled_services: &[QuickwitService::Searcher], + use_metastore_read_replica: false, + with_read_replica_client: false, + expected_endpoints: &[MAIN_ENDPOINT], + }, + TestCase { + comment: "searcher routed to a read replica skips the main metastore", + enabled_services: &[QuickwitService::Searcher], + use_metastore_read_replica: true, + with_read_replica_client: true, + expected_endpoints: &[READ_REPLICA_ENDPOINT], + }, + TestCase { + comment: "searcher routed to a read replica still checks the main metastore for \ + its other roles", + enabled_services: &[QuickwitService::Searcher, QuickwitService::Indexer], + use_metastore_read_replica: true, + with_read_replica_client: true, + expected_endpoints: &[MAIN_ENDPOINT, READ_REPLICA_ENDPOINT], + }, + TestCase { + comment: "standalone read replica node checks its locally served metastore", + enabled_services: &[QuickwitService::MetastoreReadReplica], + use_metastore_read_replica: false, + with_read_replica_client: false, + expected_endpoints: &[MAIN_ENDPOINT], + }, + TestCase { + comment: "metastore node checks its locally served metastore", + enabled_services: &[QuickwitService::Metastore], + use_metastore_read_replica: false, + with_read_replica_client: false, + expected_endpoints: &[MAIN_ENDPOINT], + }, + ]; + + for test_case in test_cases { + let mut node_config = NodeConfig::for_test(); + node_config.enabled_services = test_case.enabled_services.iter().copied().collect(); + node_config.searcher_config.use_metastore_read_replica = + test_case.use_metastore_read_replica; + let read_replica_client_opt = if test_case.with_read_replica_client { + Some(mock_metastore(READ_REPLICA_ENDPOINT)) + } else { + None + }; + + let readiness_clients = build_metastore_readiness_clients( + &node_config, + mock_metastore(MAIN_ENDPOINT), + read_replica_client_opt, + ); + + let actual_endpoints: Vec = readiness_clients + .iter() + .flat_map(|metastore| metastore.endpoints()) + .collect(); + let expected_endpoints: Vec = test_case + .expected_endpoints + .iter() + .map(|endpoint| Uri::for_test(endpoint)) + .collect(); + assert_eq!( + actual_endpoints, expected_endpoints, + "{}", + test_case.comment + ); + } + } + #[tokio::test] async fn test_setup_indexer_pool() { let universe = Universe::with_accelerated_time(); @@ -1806,7 +2090,7 @@ mod tests { let (search_job_placer, _searcher_service, _searcher_pool) = setup_searcher( &node_config, change_stream, - metastore, + Arc::new(metastore), storage_resolver, searcher_context, )