Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ fn init_telemetry(
Ok((telemetry_handle, env_filter_reload_fn))
}

fn shutdown_telemetry(telemetry_handle: quickwit_telemetry_exporters::TelemetryHandle) {
// Shut down on a plain OS thread: dropping OTel exporters that hold Tokio runtimes from a
// Tokio worker thread triggers "Cannot drop a runtime in a context where blocking is not
// allowed".
match std::thread::spawn(move || telemetry_handle.shutdown()).join() {
Ok(Ok(())) => {}
Ok(Err(error)) => eprintln!("warning: failed to shutdown telemetry cleanly: {error:#}"),
Err(_) => eprintln!("warning: telemetry shutdown thread panicked"),
}
}

async fn main_impl() -> anyhow::Result<()> {
let (command, ansi_colors) = parse_cli_command();

Expand Down Expand Up @@ -148,7 +159,7 @@ async fn main_impl() -> anyhow::Result<()> {
0
};

telemetry_handle.shutdown()?;
shutdown_telemetry(telemetry_handle);

std::process::exit(return_code)
}
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-telemetry-exporters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ opentelemetry-otlp = { workspace = true, features = ["experimental-grpc-retry",
opentelemetry_sdk = { workspace = true }
serde_json = { workspace = true }
time = { workspace = true, features = ["parsing"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
34 changes: 24 additions & 10 deletions quickwit/quickwit-telemetry-exporters/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,32 @@ pub struct TelemetryHandle {

impl TelemetryHandle {
pub fn shutdown(self) -> anyhow::Result<()> {
let mut errors = Vec::new();

if let Some(tracer_provider) = self.tracer_provider {
tracer_provider
.shutdown()
.context("failed to shutdown OpenTelemetry tracer provider")?;
if let Err(error) = tracer_provider.shutdown() {
errors.push(format!(
"failed to shutdown OpenTelemetry tracer provider: {error:#}"
));
}
}
if let Some(logger_provider) = self.logger_provider {
logger_provider
.shutdown()
.context("failed to shutdown OpenTelemetry logger provider")?;
if let Err(error) = logger_provider.shutdown() {
errors.push(format!(
"failed to shutdown OpenTelemetry logger provider: {error:#}"
));
}
}
if let Some(meter_provider) = self.meter_provider {
meter_provider
.shutdown()
.context("failed to shutdown OpenTelemetry meter provider")?;
if let Err(error) = meter_provider.shutdown() {
errors.push(format!(
"failed to shutdown OpenTelemetry meter provider: {error:#}"
));
}
}

if !errors.is_empty() {
anyhow::bail!("{}", errors.join("; "));
}
Ok(())
}
Expand All @@ -84,7 +96,9 @@ type ReloadLayer = tracing_subscriber::reload::Layer<EnvFilter, tracing_subscrib

/// Returns the regular Quickwit logging layer.
pub fn logging_layer<S>(ansi_colors: bool) -> impl Layer<S> + Send + Sync + 'static
where S: Subscriber + for<'span> LookupSpan<'span> {
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let event_format = logs::EventFormat::get_from_env();
let fmt_fields = event_format.format_fields();
tracing_subscriber::fmt::layer()
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-telemetry-exporters/src/otlp/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use opentelemetry_otlp::{
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;

use super::tokio_runtime_exporter::TokioRuntimeExporter;
use crate::otlp::{OtlpExporterConfig, OtlpProtocol};

impl OtlpProtocol {
Expand Down Expand Up @@ -49,6 +50,7 @@ pub(crate) fn init_logger_provider(
) -> anyhow::Result<SdkLoggerProvider> {
let logs_protocol = otlp_config.logs_protocol()?;
let log_exporter = logs_protocol.log_exporter()?;
let log_exporter = TokioRuntimeExporter::new(log_exporter)?;
Ok(SdkLoggerProvider::builder()
.with_resource(resource)
.with_batch_exporter(log_exporter)
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-telemetry-exporters/src/otlp/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use opentelemetry_otlp::{
};
use opentelemetry_sdk::metrics::SdkMeterProvider;

use super::tokio_runtime_exporter::TokioRuntimeExporter;
use crate::otlp::{OtlpExporterConfig, OtlpProtocol, quickwit_resource};

impl OtlpProtocol {
Expand Down Expand Up @@ -50,6 +51,7 @@ pub(crate) fn build_recorder(
) -> anyhow::Result<(OpenTelemetryRecorder, SdkMeterProvider)> {
let metrics_protocol = otlp_config.metrics_protocol()?;
let metric_exporter = metrics_protocol.metric_exporter()?;
let metric_exporter = TokioRuntimeExporter::new(metric_exporter)?;
let metrics_provider = SdkMeterProvider::builder()
.with_resource(quickwit_resource(service_version))
.with_periodic_exporter(metric_exporter)
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-telemetry-exporters/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod config;
pub(crate) mod logs;
pub(crate) mod metrics;
mod tokio_runtime_exporter;
pub(crate) mod traces;

pub(crate) use config::{OtlpExporterConfig, OtlpProtocol, quickwit_resource};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use opentelemetry::logs::Severity;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
use opentelemetry_sdk::trace::{SpanData, SpanExporter};

pub(super) struct TokioRuntimeExporter<E> {
inner: E,
runtime: Arc<tokio::runtime::Runtime>,
}

impl<E> TokioRuntimeExporter<E> {
pub(super) fn new(inner: E) -> anyhow::Result<Self> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(1)
.thread_name("opentelemetry_exporter_runtime")
.build()
.context("failed to create OpenTelemetry exporter Tokio runtime")?;
Ok(Self {
inner,
runtime: Arc::new(runtime),
})
}
}

impl<E> fmt::Debug for TokioRuntimeExporter<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TokioRuntimeExporter")
.finish_non_exhaustive()
}
}

impl<E> SpanExporter for TokioRuntimeExporter<E>
where
E: SpanExporter,
{
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
self.runtime.block_on(self.inner.export(batch))
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}

fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

fn set_resource(&mut self, resource: &Resource) {
self.inner.set_resource(resource);
}
}

impl<E> LogExporter for TokioRuntimeExporter<E>
where
E: LogExporter,
{
async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
self.runtime.block_on(self.inner.export(batch))
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}

fn event_enabled(&self, level: Severity, target: &str, name: Option<&str>) -> bool {
self.inner.event_enabled(level, target, name)
}

fn set_resource(&mut self, resource: &Resource) {
self.inner.set_resource(resource);
}
}

impl<E> PushMetricExporter for TokioRuntimeExporter<E>
where
E: PushMetricExporter,
{
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
self.runtime.block_on(self.inner.export(metrics))
}

fn force_flush(&self) -> OTelSdkResult {
self.inner.force_flush()
}

fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
self.inner.shutdown_with_timeout(timeout)
}

fn temporality(&self) -> Temporality {
self.inner.temporality()
}
}
2 changes: 2 additions & 0 deletions quickwit/quickwit-telemetry-exporters/src/otlp/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use opentelemetry_otlp::{
use opentelemetry_sdk::trace::{BatchConfigBuilder, SdkTracerProvider};
use opentelemetry_sdk::{Resource, trace};

use super::tokio_runtime_exporter::TokioRuntimeExporter;
use crate::otlp::{OtlpExporterConfig, OtlpProtocol};

impl OtlpProtocol {
Expand Down Expand Up @@ -49,6 +50,7 @@ pub(crate) fn init_tracer_provider(
) -> anyhow::Result<SdkTracerProvider> {
let traces_protocol = otlp_config.traces_protocol()?;
let span_exporter = traces_protocol.span_exporter()?;
let span_exporter = TokioRuntimeExporter::new(span_exporter)?;
let span_processor = trace::BatchSpanProcessor::builder(span_exporter)
.with_batch_config(
BatchConfigBuilder::default()
Expand Down
Loading