diff --git a/Cargo.lock b/Cargo.lock index 24b4193b..ba239f5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,6 +353,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dtoa" version = "1.0.10" @@ -422,6 +442,7 @@ dependencies = [ "fact-ebpf", "glob", "globset", + "governor", "http-body-util", "hyper", "hyper-tls", @@ -542,6 +563,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -549,9 +576,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "r-efi", + "wasip2", + "wasm-bindgen", ] [[package]] @@ -586,6 +629,29 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "governor" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9efcab3c1958580ff1f25a2a41be1668f7603d849bb63af523b208a3cc1223b8" +dependencies = [ + "cfg-if", + "dashmap", + "futures-sink", + "futures-timer", + "futures-util", + "getrandom 0.3.4", + "hashbrown 0.16.1", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand", + "smallvec", + "spinning_top", + "web-time", +] + [[package]] name = "h2" version = "0.4.11" @@ -605,6 +671,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.4" @@ -622,6 +694,8 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" dependencies = [ + "allocator-api2", + "equivalent", "foldhash 0.2.0", ] @@ -941,6 +1015,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "object" version = "0.36.7" @@ -1101,6 +1181,15 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.35" @@ -1217,6 +1306,21 @@ dependencies = [ "pulldown-cmark", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.45" @@ -1232,6 +1336,44 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.17" @@ -1286,7 +1428,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1419,6 +1561,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "strsim" version = "0.11.1" @@ -1449,10 +1600,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom", + "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1703,7 +1854,7 @@ version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" dependencies = [ - "getrandom", + "getrandom 0.4.1", "js-sys", "wasm-bindgen", ] @@ -1845,6 +1996,48 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" @@ -2032,6 +2225,26 @@ dependencies = [ "hashlink", ] +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.14" diff --git a/Cargo.toml b/Cargo.toml index 525e8040..85be5dc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ clap = { version = "4.5.41", features = ["derive", "env"] } env_logger = { version = "0.11.5", default-features = false, features = ["humantime"] } glob = "0.3.3" globset = "0.4.18" +governor = "0.10.4" http-body-util = "0.1.3" hyper = { version = "1.6.0", default-features = false } hyper-tls = "0.6.0" diff --git a/fact/Cargo.toml b/fact/Cargo.toml index a2b946f6..5a7d900c 100644 --- a/fact/Cargo.toml +++ b/fact/Cargo.toml @@ -12,6 +12,7 @@ clap = { workspace = true } env_logger = { workspace = true } glob = { workspace = true } globset = { workspace = true } +governor = { workspace = true } http-body-util = { workspace = true } hyper = { workspace = true } hyper-tls = { workspace = true } diff --git a/fact/src/config/mod.rs b/fact/src/config/mod.rs index 4dd16f97..aa7747a2 100644 --- a/fact/src/config/mod.rs +++ b/fact/src/config/mod.rs @@ -33,6 +33,7 @@ pub struct FactConfig { json: Option, hotreload: Option, scan_interval: Option, + rate_limit: Option, } impl FactConfig { @@ -94,6 +95,10 @@ impl FactConfig { if let Some(scan_interval) = from.scan_interval { self.scan_interval = Some(scan_interval); } + + if let Some(rate_limit) = from.rate_limit { + self.rate_limit = Some(rate_limit); + } } pub fn paths(&self) -> &[PathBuf] { @@ -116,6 +121,10 @@ impl FactConfig { self.scan_interval.unwrap_or(Duration::from_secs(30)) } + pub fn rate_limit(&self) -> u64 { + self.rate_limit.unwrap_or(0) + } + #[cfg(test)] pub fn set_paths(&mut self, paths: Vec) { self.paths = Some(paths); @@ -224,6 +233,16 @@ impl TryFrom> for FactConfig { bail!("scan_interval field has incorrect type: {v:?}"); } } + "rate_limit" => { + // rate_limit == 0 means unlimited (no throttling) + let Some(rate_limit) = v.as_i64() else { + bail!("rate_limit field has incorrect type: {v:?}"); + }; + if rate_limit < 0 { + bail!("invalid rate_limit: {rate_limit}"); + } + config.rate_limit = Some(rate_limit as u64); + } name => bail!("Invalid field '{name}' with value: {v:?}"), } } @@ -518,6 +537,15 @@ pub struct FactCli { /// Default value is 30 seconds #[arg(long, short, env = "FACT_SCAN_INTERVAL")] scan_interval: Option, + + /// Maximum number of file events to allow per second + /// + /// Events exceeding this rate will be dropped. A value of 0 + /// means unlimited (no throttling). + /// + /// Default value is 0 (unlimited) + #[arg(long, short = 'l', env = "FACT_RATE_LIMIT")] + rate_limit: Option, } impl FactCli { @@ -541,6 +569,7 @@ impl FactCli { json: resolve_bool_arg(self.json, self.no_json), hotreload: resolve_bool_arg(self.hotreload, self.no_hotreload), scan_interval: self.scan_interval.map(Duration::from_secs_f64), + rate_limit: self.rate_limit, } } } diff --git a/fact/src/config/reloader.rs b/fact/src/config/reloader.rs index e5a764ea..2ac14742 100644 --- a/fact/src/config/reloader.rs +++ b/fact/src/config/reloader.rs @@ -18,6 +18,7 @@ pub struct Reloader { paths: watch::Sender>, files: HashMap<&'static str, i64>, scan_interval: watch::Sender, + rate_limit: watch::Sender, trigger: Arc, } @@ -82,6 +83,12 @@ impl Reloader { self.scan_interval.subscribe() } + /// Subscribe to get notifications when rate_limit configuration + /// is changed. + pub fn rate_limit(&self) -> watch::Receiver { + self.rate_limit.subscribe() + } + /// Get a reference to the internal trigger for manual reloading of /// configuration. /// @@ -189,6 +196,17 @@ impl Reloader { } }); + self.rate_limit.send_if_modified(|old| { + let new = new.rate_limit(); + if *old != new { + debug!("Sending new rate limit configuration..."); + *old = new; + true + } else { + false + } + }); + if self.config.hotreload() != new.hotreload() { warn!("Changes to the hotreload field only take effect on startup"); } @@ -222,6 +240,7 @@ impl From for Reloader { let (grpc, _) = watch::channel(config.grpc.clone()); let (paths, _) = watch::channel(config.paths().to_vec()); let (scan_interval, _) = watch::channel(config.scan_interval()); + let (rate_limit, _) = watch::channel(config.rate_limit()); let trigger = Arc::new(Notify::new()); Reloader { @@ -230,6 +249,7 @@ impl From for Reloader { grpc, paths, scan_interval, + rate_limit, files, trigger, } diff --git a/fact/src/config/tests.rs b/fact/src/config/tests.rs index 571f879a..0079d672 100644 --- a/fact/src/config/tests.rs +++ b/fact/src/config/tests.rs @@ -236,6 +236,20 @@ fn parsing() { ..Default::default() }, ), + ( + "rate_limit: 0", + FactConfig { + rate_limit: Some(0), + ..Default::default() + }, + ), + ( + "rate_limit: 1000", + FactConfig { + rate_limit: Some(1000), + ..Default::default() + }, + ), ( r#" paths: @@ -274,6 +288,7 @@ fn parsing() { }, hotreload: Some(false), scan_interval: Some(Duration::from_secs(60)), + rate_limit: None, }, ), ]; @@ -1031,6 +1046,7 @@ fn update() { }, hotreload: Some(true), scan_interval: Some(Duration::from_secs(30)), + rate_limit: None, }, FactConfig { paths: Some(vec![PathBuf::from("/etc")]), @@ -1051,6 +1067,7 @@ fn update() { }, hotreload: Some(false), scan_interval: Some(Duration::from_secs(60)), + rate_limit: None, }, ), ]; diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 27c0d668..3312bcca 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -6,6 +6,7 @@ use host_info::{SystemInfo, get_distro, get_hostname}; use host_scanner::HostScanner; use log::{LevelFilter, debug, info, warn}; use metrics::exporter::Exporter; +use rate_limiter::RateLimiter; use tokio::{ signal::unix::{SignalKind, signal}, sync::{mpsc, watch}, @@ -20,6 +21,7 @@ mod host_scanner; mod metrics; mod output; mod pre_flight; +mod rate_limiter; use config::FactConfig; use pre_flight::pre_flight; @@ -92,14 +94,22 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { exporter.metrics.host_scanner.clone(), )?; - output::start( + let rate_limiter = RateLimiter::new( host_scanner.subscribe(), + reloader.rate_limit(), + running.subscribe(), + exporter.metrics.rate_limiter.clone(), + )?; + + output::start( + rate_limiter.subscribe(), running.subscribe(), exporter.metrics.output.clone(), reloader.grpc(), reloader.config().json(), )?; let mut host_scanner_handle = host_scanner.start(); + let mut rate_limiter_handle = rate_limiter.start(); endpoints::Server::new(exporter.clone(), reloader.endpoint(), running.subscribe()).start(); let mut bpf_handle = bpf.start(running.subscribe(), exporter.metrics.bpf_worker.clone()); reloader.start(running.subscribe()); @@ -129,6 +139,15 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { } break; } + res = rate_limiter_handle.borrow_mut() => { + match res { + Ok(res) => if let Err(e) = res { + warn!("Rate limiter worker errored out: {e:?}"); + } + Err(e) => warn!("Rate limiter task errored out: {e:?}"), + } + break; + } } } diff --git a/fact/src/metrics/mod.rs b/fact/src/metrics/mod.rs index 97213c83..3c5e7d96 100644 --- a/fact/src/metrics/mod.rs +++ b/fact/src/metrics/mod.rs @@ -137,6 +137,7 @@ impl OutputMetrics { pub struct Metrics { pub bpf_worker: EventCounter, + pub rate_limiter: EventCounter, pub output: OutputMetrics, pub host_scanner: HostScannerMetrics, } @@ -154,6 +155,13 @@ impl Metrics { ); bpf_worker.register(registry); + let rate_limiter = EventCounter::new( + "rate_limiter_events", + "Events processed by the rate limiter", + &[LabelValues::Added, LabelValues::Dropped], + ); + rate_limiter.register(registry); + let output_metrics = OutputMetrics::new(); output_metrics.register(registry); @@ -162,6 +170,7 @@ impl Metrics { Metrics { bpf_worker, + rate_limiter, output: output_metrics, host_scanner, } diff --git a/fact/src/rate_limiter.rs b/fact/src/rate_limiter.rs new file mode 100644 index 00000000..af914995 --- /dev/null +++ b/fact/src/rate_limiter.rs @@ -0,0 +1,106 @@ +use governor::{ + Quota, + clock::DefaultClock, + state::{InMemoryState, NotKeyed}, +}; +use std::num::NonZeroU32; +use std::sync::Arc; +use tokio::sync::{ + broadcast::{self, error::RecvError}, + watch, +}; +use tokio::task::JoinHandle; + +use crate::event::Event; +use crate::metrics::EventCounter; + +pub struct RateLimiter { + // the governor::RateLimiter handles the actual rate limiting. For now + // we use NotKeyed because we want to globally rate limit all events + // but in the future we could introduce a key to limit in more flexible ways + // (using a String; process name, container id, whatever) + limiter: Option>, + rx: broadcast::Receiver>, + tx: broadcast::Sender>, + rate_config: watch::Receiver, + running: watch::Receiver, + metrics: EventCounter, +} + +impl RateLimiter { + pub fn new( + rx: broadcast::Receiver>, + rate_config: watch::Receiver, + running: watch::Receiver, + metrics: EventCounter, + ) -> anyhow::Result { + let limiter = Self::build_limiter(*rate_config.borrow()); + let (tx, _) = broadcast::channel(100); + + Ok(RateLimiter { + limiter, + rx, + tx, + rate_config, + running, + metrics, + }) + } + + fn build_limiter( + rate: u64, + ) -> Option> { + if rate == 0 { + None + } else { + let rate = NonZeroU32::new(rate as u32).expect("rate > 0"); + Some(governor::RateLimiter::direct(Quota::per_second(rate))) + } + } + + fn reload_limiter(&mut self) -> anyhow::Result<()> { + let rate = *self.rate_config.borrow(); + self.limiter = Self::build_limiter(rate); + Ok(()) + } + + pub fn subscribe(&self) -> broadcast::Receiver> { + self.tx.subscribe() + } + + pub fn start(mut self) -> JoinHandle> { + tokio::spawn(async move { + loop { + tokio::select! { + event = self.rx.recv() => { + let event = match event { + Ok(e) => e, + Err(RecvError::Lagged(n)) => { + self.metrics.dropped_n(n); + continue; + } + Err(RecvError::Closed) => break, + }; + + if let Some(limiter) = &self.limiter && limiter.check().is_err() { + self.metrics.dropped(); + continue; + } + + self.metrics.added(); + let _ = self.tx.send(event); + }, + _ = self.rate_config.changed() => { + self.reload_limiter()?; + }, + _ = self.running.changed() => { + if !*self.running.borrow() { + break; + } + }, + } + } + Ok(()) + }) + } +} diff --git a/tests/test_rate_limit.py b/tests/test_rate_limit.py new file mode 100644 index 00000000..080606f7 --- /dev/null +++ b/tests/test_rate_limit.py @@ -0,0 +1,107 @@ +import os +import time +from time import sleep + +import pytest +import requests +import yaml + +from event import Event, EventType, Process + +@pytest.fixture +def rate_limited_config(fact, fact_config, monitored_dir): + """ + Configure rate limiting after fact has started, then hot-reload. + Sets rate_limit to 10 events/second. + """ + config, config_file = fact_config + config['rate_limit'] = 10 + with open(config_file, 'w') as f: + yaml.dump(config, f) + + fact.kill('SIGHUP') + sleep(0.1) + return config, config_file + +def test_rate_limit_drops_events(rate_limited_config, monitored_dir, server): + """ + Test that the rate limiter drops events when the rate limit is exceeded. + """ + config, _ = rate_limited_config + num_files = 100 + start_time = time.time() + + for i in range(num_files): + fut = os.path.join(monitored_dir, f'file_{i}.txt') + with open(fut, 'w') as f: + f.write(f'test {i}') + + elapsed = time.time() - start_time + print(f'Created {num_files} files in {elapsed:.3f} seconds') + + time.sleep(2) + + received_count = 0 + while not server.is_empty(): + server.get_next() + received_count += 1 + + print(f'Received {received_count} events out of {num_files}') + + assert received_count < num_files, \ + f'Expected rate limiting to drop some events, but received all {received_count}' + + metrics_response = requests.get(f'http://{config["endpoint"]["address"]}/metrics') + assert metrics_response.status_code == 200 + + metrics_text = metrics_response.text + assert 'rate_limiter_events' in metrics_text, 'rate_limiter_events metric not found' + + dropped_count = 0 + for line in metrics_text.split('\n'): + if 'rate_limiter_events' in line and 'label="Dropped"' in line: + parts = line.split() + if len(parts) >= 2: + dropped_count = int(parts[1]) + break + + assert dropped_count > 0, 'Expected rate limiter to report dropped events in metrics' + + total_accounted = received_count + dropped_count + + assert total_accounted == num_files, 'Expected rate limiter to see all events' + +def test_rate_limit_unlimited(monitored_dir, server, fact_config): + """ + Test that the default config (rate_limit=0) allows all events through. + """ + config, _ = fact_config + num_files = 20 + events = [] + process = Process.from_proc() + + for i in range(num_files): + fut = os.path.join(monitored_dir, f'file_{i}.txt') + with open(fut, 'w') as f: + f.write(f'test {i}') + + events.append( + Event(process=process, event_type=EventType.CREATION, file=fut, host_path=fut)) + + server.wait_events(events) + + metrics_response = requests.get(f'http://{config["endpoint"]["address"]}/metrics') + assert metrics_response.status_code == 200 + + metrics_text = metrics_response.text + + dropped_count = 0 + for line in metrics_text.split('\n'): + if 'rate_limiter_events' in line and 'label="Dropped"' in line: + parts = line.split() + if len(parts) >= 2: + dropped_count = int(parts[1]) + break + + assert dropped_count == 0, \ + f'Expected no dropped events with unlimited rate limiting, but got {dropped_count}'