Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,18 @@ openssl-probe = "0.2"
opentelemetry = "0.32"
opentelemetry-appender-tracing = "0.32"
opentelemetry_sdk = { version = "0.32", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.32", features = ["grpc-tonic", "http-json"] }
# `default-features = false` drops the default `reqwest-blocking-client`:
# see https://github.com/quickwit-oss/quickwit/pull/6558 for more details.
opentelemetry-otlp = { version = "0.32", default-features = false, features = [
"grpc-tonic",
"http-json",
"http-proto",
"internal-logs",
"logs",
"metrics",
"reqwest-client",
"trace",
] }
ouroboros = "0.18"
parquet = { version = "58", default-features = false, features = ["arrow", "experimental", "snap", "variant_experimental", "zstd"] }
percent-encoding = "2.3"
Expand Down
4 changes: 1 addition & 3 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ async fn main_impl() -> anyhow::Result<()> {
} else {
0
};

telemetry_handle.shutdown()?;

telemetry_handle.shutdown().await?;
std::process::exit(return_code)
}

Expand Down
9 changes: 7 additions & 2 deletions quickwit/quickwit-telemetry-exporters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ license.workspace = true
[dependencies]
anyhow = { workspace = true }
metrics = { workspace = true }
metrics-opentelemetry = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-opentelemetry = { workspace = true }
metrics-util = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true }
opentelemetry-otlp = { workspace = true, features = ["experimental-grpc-retry", "experimental-http-retry"] }
Comment thread
guilload marked this conversation as resolved.
opentelemetry_sdk = { workspace = true }
opentelemetry_sdk = { workspace = true, features = [
"experimental_logs_batch_log_processor_with_async_runtime",
"experimental_metrics_periodicreader_with_async_runtime",
"experimental_trace_batch_span_processor_with_async_runtime",
] }
serde_json = { workspace = true }
time = { workspace = true, features = ["parsing"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
15 changes: 14 additions & 1 deletion quickwit/quickwit-telemetry-exporters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,20 @@ pub struct TelemetryHandle {
}

impl TelemetryHandle {
pub fn shutdown(self) -> anyhow::Result<()> {
/// Shuts down the OpenTelemetry providers, flushing any pending telemetry.
///
/// The providers are driven by processors spawned onto the Tokio runtime, so the underlying
/// `shutdown` blocks while it hands a flush message to those tasks and waits for the
/// acknowledgement. We run it on the blocking pool rather than inline so it never occupies a
/// runtime worker: the flush tasks can then make progress even when the runtime is configured
/// with a single worker.
pub async fn shutdown(self) -> anyhow::Result<()> {
tokio::task::spawn_blocking(move || self.shutdown_blocking())
.await
.context("failed to join telemetry shutdown task")?
}

fn shutdown_blocking(self) -> anyhow::Result<()> {
if let Some(tracer_provider) = self.tracer_provider {
tracer_provider
.shutdown()
Expand Down
13 changes: 8 additions & 5 deletions quickwit/quickwit-telemetry-exporters/src/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ pub const QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER_ENV_KEY: &str =
"QW_ENABLE_OPENTELEMETRY_OTLP_EXPORTER";

const OTEL_EXPORTER_OTLP_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL";
const OTEL_EXPORTER_OTLP_LOGS_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_LOGS_PROTOCOL";
const OTEL_EXPORTER_OTLP_METRICS_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_METRICS_PROTOCOL";
const OTEL_EXPORTER_OTLP_TRACES_PROTOCOL_ENV_KEY: &str = "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL";

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum OtlpProtocol {
Expand All @@ -39,17 +39,20 @@ impl FromStr for OtlpProtocol {

fn from_str(protocol_str: &str) -> anyhow::Result<Self> {
const OTLP_PROTOCOL_GRPC: &str = "grpc";
const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf";
const OTLP_PROTOCOL_HTTP_JSON: &str = "http/json";
const OTLP_PROTOCOL_HTTP_PROTO: &str = "http/proto";
const OTLP_PROTOCOL_HTTP_PROTOBUF: &str = "http/protobuf";

match protocol_str {
OTLP_PROTOCOL_GRPC => Ok(OtlpProtocol::Grpc),
OTLP_PROTOCOL_HTTP_PROTOBUF => Ok(OtlpProtocol::HttpProtobuf),
OTLP_PROTOCOL_HTTP_JSON => Ok(OtlpProtocol::HttpJson),
OTLP_PROTOCOL_HTTP_PROTO | OTLP_PROTOCOL_HTTP_PROTOBUF => {
Ok(OtlpProtocol::HttpProtobuf)
}
other => anyhow::bail!(
"unsupported OTLP protocol `{other}`, supported values are \
`{OTLP_PROTOCOL_GRPC}`, `{OTLP_PROTOCOL_HTTP_PROTOBUF}` and \
`{OTLP_PROTOCOL_HTTP_JSON}`"
`{OTLP_PROTOCOL_GRPC}`, `{OTLP_PROTOCOL_HTTP_JSON}`, and \
`{OTLP_PROTOCOL_HTTP_PROTO}`"
),
}
}
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use anyhow::Context;
use opentelemetry_otlp::{
LogExporter, Protocol as OtlpWireProtocol, WithExportConfig, WithHttpConfig, WithTonicConfig,
};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor;
use opentelemetry_sdk::{Resource, runtime};

use crate::otlp::{OtlpExporterConfig, OtlpProtocol};

Expand All @@ -43,14 +44,16 @@ impl OtlpProtocol {
}
}

/// Builds the OTLP logger provider.
pub(crate) fn init_logger_provider(
otlp_config: &OtlpExporterConfig,
resource: Resource,
) -> anyhow::Result<SdkLoggerProvider> {
let logs_protocol = otlp_config.logs_protocol()?;
let log_exporter = logs_protocol.log_exporter()?;
let log_processor = BatchLogProcessor::builder(log_exporter, runtime::Tokio).build();
Ok(SdkLoggerProvider::builder()
.with_resource(resource)
.with_batch_exporter(log_exporter)
.with_log_processor(log_processor)
.build())
}
6 changes: 5 additions & 1 deletion quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use opentelemetry_otlp::{
MetricExporter, Protocol as OtlpWireProtocol, WithExportConfig, WithHttpConfig, WithTonicConfig,
};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader;
use opentelemetry_sdk::runtime;

use crate::otlp::{OtlpExporterConfig, OtlpProtocol, quickwit_resource};

Expand All @@ -44,15 +46,17 @@ impl OtlpProtocol {
}
}

/// Builds the OTLP metrics recorder and its meter provider.
pub(crate) fn build_recorder(
service_version: &str,
otlp_config: &OtlpExporterConfig,
) -> anyhow::Result<(OpenTelemetryRecorder, SdkMeterProvider)> {
let metrics_protocol = otlp_config.metrics_protocol()?;
let metric_exporter = metrics_protocol.metric_exporter()?;
let metric_reader = PeriodicReader::builder(metric_exporter, runtime::Tokio).build();
Comment thread
guilload marked this conversation as resolved.
let metrics_provider = SdkMeterProvider::builder()
.with_resource(quickwit_resource(service_version))
.with_periodic_exporter(metric_exporter)
.with_reader(metric_reader)
.build();
let meter = metrics_provider.meter("quickwit");

Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use anyhow::Context;
use opentelemetry_otlp::{
Protocol as OtlpWireProtocol, SpanExporter, WithExportConfig, WithHttpConfig, WithTonicConfig,
};
use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor;
use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider};
use opentelemetry_sdk::{Resource, trace};
use opentelemetry_sdk::{Resource, runtime};

use crate::otlp::{OtlpExporterConfig, OtlpProtocol};

Expand All @@ -43,13 +44,14 @@ impl OtlpProtocol {
}
}

/// Builds the OTLP tracer provider.
pub(crate) fn init_tracer_provider(
otlp_config: &OtlpExporterConfig,
resource: Resource,
) -> anyhow::Result<SdkTracerProvider> {
let traces_protocol = otlp_config.traces_protocol()?;
let span_exporter = traces_protocol.span_exporter()?;
let span_processor = trace::BatchSpanProcessor::builder(span_exporter)
let span_processor = BatchSpanProcessor::builder(span_exporter, runtime::Tokio)
Comment thread
guilload marked this conversation as resolved.
.with_batch_config(
BatchConfigBuilder::default()
// Quickwit can generate a lot of spans, especially in debug mode, and the
Expand Down
Loading