diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 608c3204836..8dff8eb3c12 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -9468,6 +9468,7 @@ dependencies = [ "quickwit-metrics", "serde_json", "time", + "tokio", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 9f8662ccf7e..37bc842b88e 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -114,6 +114,17 @@ fn init_telemetry( Ok((telemetry_handle, env_filter_reload_fn)) } +fn shutdown_telemetry(telemetry_handle: quickwit_telemetry_exporters::TelemetryHandle) { + // Shut down on a plain OS thread: dropping OTel exporters that hold Tokio runtimes from a + // Tokio worker thread triggers "Cannot drop a runtime in a context where blocking is not + // allowed". + match std::thread::spawn(move || telemetry_handle.shutdown()).join() { + Ok(Ok(())) => {} + Ok(Err(error)) => eprintln!("warning: failed to shutdown telemetry cleanly: {error:#}"), + Err(_) => eprintln!("warning: telemetry shutdown thread panicked"), + } +} + async fn main_impl() -> anyhow::Result<()> { let (command, ansi_colors) = parse_cli_command(); @@ -148,7 +159,7 @@ async fn main_impl() -> anyhow::Result<()> { 0 }; - telemetry_handle.shutdown()?; + shutdown_telemetry(telemetry_handle); std::process::exit(return_code) } diff --git a/quickwit/quickwit-telemetry-exporters/Cargo.toml b/quickwit/quickwit-telemetry-exporters/Cargo.toml index 616ca25972b..3df17d8d8be 100644 --- a/quickwit/quickwit-telemetry-exporters/Cargo.toml +++ b/quickwit/quickwit-telemetry-exporters/Cargo.toml @@ -22,6 +22,7 @@ opentelemetry-otlp = { workspace = true, features = ["experimental-grpc-retry", opentelemetry_sdk = { workspace = true } serde_json = { workspace = true } time = { workspace = true, features = ["parsing"] } +tokio = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/quickwit/quickwit-telemetry-exporters/src/lib.rs b/quickwit/quickwit-telemetry-exporters/src/lib.rs index 3b8a93ead22..0af3b85c479 100644 --- a/quickwit/quickwit-telemetry-exporters/src/lib.rs +++ b/quickwit/quickwit-telemetry-exporters/src/lib.rs @@ -49,20 +49,32 @@ pub struct TelemetryHandle { impl TelemetryHandle { pub fn shutdown(self) -> anyhow::Result<()> { + let mut errors = Vec::new(); + if let Some(tracer_provider) = self.tracer_provider { - tracer_provider - .shutdown() - .context("failed to shutdown OpenTelemetry tracer provider")?; + if let Err(error) = tracer_provider.shutdown() { + errors.push(format!( + "failed to shutdown OpenTelemetry tracer provider: {error:#}" + )); + } } if let Some(logger_provider) = self.logger_provider { - logger_provider - .shutdown() - .context("failed to shutdown OpenTelemetry logger provider")?; + if let Err(error) = logger_provider.shutdown() { + errors.push(format!( + "failed to shutdown OpenTelemetry logger provider: {error:#}" + )); + } } if let Some(meter_provider) = self.meter_provider { - meter_provider - .shutdown() - .context("failed to shutdown OpenTelemetry meter provider")?; + if let Err(error) = meter_provider.shutdown() { + errors.push(format!( + "failed to shutdown OpenTelemetry meter provider: {error:#}" + )); + } + } + + if !errors.is_empty() { + anyhow::bail!("{}", errors.join("; ")); } Ok(()) } @@ -84,7 +96,9 @@ type ReloadLayer = tracing_subscriber::reload::Layer(ansi_colors: bool) -> impl Layer + Send + Sync + 'static -where S: Subscriber + for<'span> LookupSpan<'span> { +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ let event_format = logs::EventFormat::get_from_env(); let fmt_fields = event_format.format_fields(); tracing_subscriber::fmt::layer() diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs index 4f2949bb612..356daa17af4 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs @@ -19,6 +19,7 @@ use opentelemetry_otlp::{ use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; +use super::tokio_runtime_exporter::TokioRuntimeExporter; use crate::otlp::{OtlpExporterConfig, OtlpProtocol}; impl OtlpProtocol { @@ -49,6 +50,7 @@ pub(crate) fn init_logger_provider( ) -> anyhow::Result { let logs_protocol = otlp_config.logs_protocol()?; let log_exporter = logs_protocol.log_exporter()?; + let log_exporter = TokioRuntimeExporter::new(log_exporter)?; Ok(SdkLoggerProvider::builder() .with_resource(resource) .with_batch_exporter(log_exporter) diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs index d43eb3dceb0..1238fb027e9 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs @@ -20,6 +20,7 @@ use opentelemetry_otlp::{ }; use opentelemetry_sdk::metrics::SdkMeterProvider; +use super::tokio_runtime_exporter::TokioRuntimeExporter; use crate::otlp::{OtlpExporterConfig, OtlpProtocol, quickwit_resource}; impl OtlpProtocol { @@ -50,6 +51,7 @@ pub(crate) fn build_recorder( ) -> anyhow::Result<(OpenTelemetryRecorder, SdkMeterProvider)> { let metrics_protocol = otlp_config.metrics_protocol()?; let metric_exporter = metrics_protocol.metric_exporter()?; + let metric_exporter = TokioRuntimeExporter::new(metric_exporter)?; let metrics_provider = SdkMeterProvider::builder() .with_resource(quickwit_resource(service_version)) .with_periodic_exporter(metric_exporter) diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs index 43f99c6afed..06864efa238 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs @@ -15,6 +15,7 @@ mod config; pub(crate) mod logs; pub(crate) mod metrics; +mod tokio_runtime_exporter; pub(crate) mod traces; pub(crate) use config::{OtlpExporterConfig, OtlpProtocol, quickwit_resource}; diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/tokio_runtime_exporter.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/tokio_runtime_exporter.rs new file mode 100644 index 00000000000..4120c5abfec --- /dev/null +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/tokio_runtime_exporter.rs @@ -0,0 +1,117 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use opentelemetry::logs::Severity; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::metrics::Temporality; +use opentelemetry_sdk::metrics::data::ResourceMetrics; +use opentelemetry_sdk::metrics::exporter::PushMetricExporter; +use opentelemetry_sdk::trace::{SpanData, SpanExporter}; + +pub(super) struct TokioRuntimeExporter { + inner: E, + runtime: Arc, +} + +impl TokioRuntimeExporter { + pub(super) fn new(inner: E) -> anyhow::Result { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(1) + .thread_name("opentelemetry_exporter_runtime") + .build() + .context("failed to create OpenTelemetry exporter Tokio runtime")?; + Ok(Self { + inner, + runtime: Arc::new(runtime), + }) + } +} + +impl fmt::Debug for TokioRuntimeExporter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TokioRuntimeExporter") + .finish_non_exhaustive() + } +} + +impl SpanExporter for TokioRuntimeExporter +where + E: SpanExporter, +{ + async fn export(&self, batch: Vec) -> OTelSdkResult { + self.runtime.block_on(self.inner.export(batch)) + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.inner.shutdown_with_timeout(timeout) + } + + fn force_flush(&self) -> OTelSdkResult { + self.inner.force_flush() + } + + fn set_resource(&mut self, resource: &Resource) { + self.inner.set_resource(resource); + } +} + +impl LogExporter for TokioRuntimeExporter +where + E: LogExporter, +{ + async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + self.runtime.block_on(self.inner.export(batch)) + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.inner.shutdown_with_timeout(timeout) + } + + fn event_enabled(&self, level: Severity, target: &str, name: Option<&str>) -> bool { + self.inner.event_enabled(level, target, name) + } + + fn set_resource(&mut self, resource: &Resource) { + self.inner.set_resource(resource); + } +} + +impl PushMetricExporter for TokioRuntimeExporter +where + E: PushMetricExporter, +{ + async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult { + self.runtime.block_on(self.inner.export(metrics)) + } + + fn force_flush(&self) -> OTelSdkResult { + self.inner.force_flush() + } + + fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult { + self.inner.shutdown_with_timeout(timeout) + } + + fn temporality(&self) -> Temporality { + self.inner.temporality() + } +} diff --git a/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs b/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs index 67b5db32e18..560a151107c 100644 --- a/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs +++ b/quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs @@ -19,6 +19,7 @@ use opentelemetry_otlp::{ use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider}; use opentelemetry_sdk::{Resource, trace}; +use super::tokio_runtime_exporter::TokioRuntimeExporter; use crate::otlp::{OtlpExporterConfig, OtlpProtocol}; impl OtlpProtocol { @@ -49,6 +50,7 @@ pub(crate) fn init_tracer_provider( ) -> anyhow::Result { let traces_protocol = otlp_config.traces_protocol()?; let span_exporter = traces_protocol.span_exporter()?; + let span_exporter = TokioRuntimeExporter::new(span_exporter)?; let span_processor = trace::BatchSpanProcessor::builder(span_exporter) .with_batch_config( BatchConfigBuilder::default()