Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/quickwit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ version: 0.8
# metastore_uri: s3://your-bucket/indexes
# metastore_uri: postgres://username:password@host:port/db
#
# Optional PostgreSQL read replica URI used by metastore nodes for gRPC requests
# carrying the `qw-use-read-replica: true` metadata. Defaults to unset.
#
# metastore_read_replica_uri: postgres://username:password@read-replica-host:port/db
#
# When using a file-backed metastore, the state of the metastore will be cached forever.
# If you are indexing and searching from different processes, it is possible to periodically
# refresh the state of the metastore on the searcher using the `polling_interval` hashtag.
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/node-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ A commented example is available here: [quickwit.yaml](https://github.com/quickw
| `peer_seeds` | List of IP addresses or hostnames used to bootstrap the cluster and discover the complete set of nodes. This list may contain the current node address and does not need to be exhaustive. If the list of peer seeds contains a host name, Quickwit will resolve it by querying the DNS every minute. On kubernetes for instance, it is a good practise to set it to a [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). | `QW_PEER_SEEDS` | |
| `data_dir` | Path to directory where data (tmp data, splits kept for caching purpose) is persisted. This is mostly used in indexing. | `QW_DATA_DIR` | `./qwdata` |
| `metastore_uri` | Metastore URI. Can be a local directory or `s3://my-bucket/indexes` or `postgres://username:password@localhost:5432/metastore`. [Learn more about the metastore configuration](metastore-config.md). | `QW_METASTORE_URI` | `{data_dir}/indexes` |
| `metastore_read_replica_uri` | Optional PostgreSQL read replica URI used by metastore nodes when a gRPC request carries the `qw-use-read-replica: true` metadata. If unset, those requests use `metastore_uri`. | `QW_METASTORE_READ_REPLICA_URI` | |
| `default_index_root_uri` | Default index root URI that defines the location where index data (splits) is stored. The index URI is built following the scheme: `{default_index_root_uri}/{index-id}` | `QW_DEFAULT_INDEX_ROOT_URI` | `{data_dir}/indexes` |
| environment variable only | Log level of Quickwit. Can be a direct log level, or a comma separated list of `module_name=level` | `RUST_LOG` | `info` |

Expand Down
25 changes: 23 additions & 2 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use predicates::str;
use quickwit_cli::ClientArgs;
use quickwit_cli::service::RunCliCommand;
use quickwit_common::net::find_available_tcp_port;
use quickwit_common::test_utils::wait_for_server_ready;
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, MetastoreResolver};
Expand Down Expand Up @@ -174,7 +175,7 @@ impl TestEnv {
_ = server_handle => {}
}
});
wait_for_server_ready(([127, 0, 0, 1], self.rest_listen_port).into()).await?;
wait_for_quickwit_ready(self.rest_listen_port).await?;
Ok(())
}

Expand All @@ -186,6 +187,26 @@ impl TestEnv {
}
}

async fn wait_for_quickwit_ready(rest_listen_port: u16) -> anyhow::Result<()> {
let ready_url = format!("http://127.0.0.1:{rest_listen_port}/health/readyz");
wait_until_predicate(
|| async {
let Ok(response) = reqwest::get(&ready_url).await else {
return false;
};
if !response.status().is_success() {
return false;
}
response.json::<bool>().await.unwrap_or(false)
},
Duration::from_secs(10),
Duration::from_millis(50),
)
.await
.with_context(|| format!("quickwit server did not become ready at `{ready_url}`"))?;
Ok(())
}

pub enum TestStorageType {
S3,
LocalFileSystem,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) async fn cluster_grpc_client(

ClusterServiceClient::tower()
.stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())
.build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None)
.build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None, [])
}

pub fn cluster_grpc_server(
Expand Down
16 changes: 14 additions & 2 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions quickwit/quickwit-codegen/example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ mod tests {
"127.0.0.1:6666".parse().unwrap(),
Endpoint::from_static("http://127.0.0.1:6666").connect_lazy(),
);
let grpc_client = HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None);
let grpc_client =
HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None, []);

assert_eq!(
grpc_client
Expand Down Expand Up @@ -342,7 +343,7 @@ mod tests {
// The connectivity check fails if there is no client behind the channel.
let (balance_channel, _): (BalanceChannel<SocketAddr>, _) = BalanceChannel::new();
let grpc_client =
HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None);
HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None, []);
assert_eq!(
grpc_client
.check_connectivity()
Expand Down Expand Up @@ -441,6 +442,7 @@ mod tests {
channel,
MAX_GRPC_MESSAGE_SIZE,
Some(CompressionEncoding::Zstd),
[],
);

assert_eq!(
Expand Down Expand Up @@ -788,7 +790,7 @@ mod tests {
"127.0.0.1:7777".parse().unwrap(),
Endpoint::from_static("http://127.0.0.1:7777").connect_lazy(),
);
HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None);
HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None, []);
}

#[tokio::test]
Expand Down Expand Up @@ -876,7 +878,7 @@ mod tests {
.timeout(Duration::from_millis(100))
.connect_lazy();
let max_message_size = ByteSize::mib(1);
let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None);
let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None, []);

let error = grpc_client
.hello(HelloRequest {
Expand Down Expand Up @@ -955,7 +957,7 @@ mod tests {
// this test hangs forever if we comment out the TimeoutLayer, which
// shows that a request without explicit timeout might hang forever
.stack_layer(TimeoutLayer::new(Duration::from_secs(3)))
.build_from_balance_channel(balance_channel, ByteSize::mib(1), None);
.build_from_balance_channel(balance_channel, ByteSize::mib(1), None, []);

let response_fut = async move {
grpc_client
Expand Down
45 changes: 37 additions & 8 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ impl CodegenContext {
}
}

fn is_metastore_service(context: &CodegenContext) -> bool {
context.package_name == "quickwit.metastore" && context.service_name == "MetastoreService"
}

fn generate_all(
service: &Service,
result_type_path: &str,
Expand Down Expand Up @@ -637,6 +641,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
} else {
TokenStream::new()
};
let metastore_read_replica_client_methods = if is_metastore_service(context) {
quote! {
pub fn as_grpc_service_with_read_replica(
&self,
read_replica: Option<Self>,
max_message_size: bytesize::ByteSize,
) -> crate::grpc_read_replica::ReadReplicaGrpcService<#grpc_server_package_name::#grpc_server_name<#grpc_server_adapter_name>> {
let primary = self.as_grpc_service(max_message_size);
let read_replica = read_replica.map(|client| client.as_grpc_service(max_message_size));
crate::grpc_read_replica::ReadReplicaGrpcService::new(primary, read_replica)
}
}
} else {
TokenStream::new()
};

quote! {
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -671,15 +690,21 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
.max_encoding_message_size(max_message_size.0 as usize)
}

#metastore_read_replica_client_methods

pub fn from_channel(
addr: std::net::SocketAddr,
channel: tonic::transport::Channel,
max_message_size: bytesize::ByteSize,
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
) -> Self
{
let (_, connection_keys_watcher) = tokio::sync::watch::channel(std::collections::HashSet::from_iter([addr]));
let mut client = #grpc_client_package_name::#grpc_client_name::new(channel)
let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor(
channel,
quickwit_common::tower::GrpcInterceptors::new(interceptors),
)
.max_decoding_message_size(max_message_size.0 as usize)
.max_encoding_message_size(max_message_size.0 as usize);
if let Some(compression_encoding) = compression_encoding_opt {
Expand All @@ -695,10 +720,14 @@ fn generate_client(context: &CodegenContext) -> TokenStream {
balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>,
max_message_size: bytesize::ByteSize,
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
) -> #client_name
{
let connection_keys_watcher = balance_channel.connection_keys_watcher();
let mut client = #grpc_client_package_name::#grpc_client_name::new(balance_channel)
let mut client = #grpc_client_package_name::#grpc_client_name::with_interceptor(
balance_channel,
quickwit_common::tower::GrpcInterceptors::new(interceptors),
)
.max_decoding_message_size(max_message_size.0 as usize)
.max_encoding_message_size(max_message_size.0 as usize);
if let Some(compression_encoding) = compression_encoding_opt {
Expand Down Expand Up @@ -1024,7 +1053,6 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {

svc_attribute_idents.push(svc_attribute_name);
}

quote! {
impl #tower_layer_stack_name {
pub fn stack_layer<L>(mut self, layer: L) -> Self
Expand All @@ -1051,9 +1079,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
channel: tonic::transport::Channel,
max_message_size: bytesize::ByteSize,
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
) -> #client_name
{
let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt);
let client = #client_name::from_channel(addr, channel, max_message_size, compression_encoding_opt, interceptors);
let inner_client = client.inner;
self.build_from_inner_client(inner_client)
}
Expand All @@ -1063,9 +1092,10 @@ fn generate_layer_stack_impl(context: &CodegenContext) -> TokenStream {
balance_channel: quickwit_common::tower::BalanceChannel<std::net::SocketAddr>,
max_message_size: bytesize::ByteSize,
compression_encoding_opt: Option<tonic::codec::CompressionEncoding>,
interceptors: impl IntoIterator<Item = quickwit_common::tower::GrpcInterceptor>,
) -> #client_name
{
let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt);
let client = #client_name::from_balance_channel(balance_channel, max_message_size, compression_encoding_opt, interceptors);
let inner_client = client.inner;
self.build_from_inner_client(inner_client)
}
Expand Down Expand Up @@ -1279,7 +1309,7 @@ fn generate_grpc_client_adapter(context: &CodegenContext) -> TokenStream {
pub fn new(instance: T, connection_addrs_rx: tokio::sync::watch::Receiver<std::collections::HashSet<std::net::SocketAddr>>) -> Self {
Self {
inner: instance,
connection_addrs_rx
connection_addrs_rx,
}
}
}
Expand Down Expand Up @@ -1435,8 +1465,7 @@ fn generate_grpc_server_adapter_methods(context: &CodegenContext) -> TokenStream
let span = #span_macro;
let _ = <tracing::Span as tracing_opentelemetry::OpenTelemetrySpanExt>::set_parent(&span, parent_context);
let fut = async move {
self.inner
.0
self.inner.0
.#method_name(request)
.await
.map(#into_response_type)
Expand Down
Loading
Loading