filter_write_tag: in_emitter: Mitigate data loss on multiline #12029

Open
cosmo0920 wants to merge 4 commits into master from cosmo0920-mitigate-data-loss-on-multiline

cosmo0920 commented Jul 2, 2026

Contributor

Using the reproducer in the Details section, I confirmed that this change addresses the loss:

% bash /private/tmp/rewrite_tag_repro_local.sh
Binary:  bin/fluent-bit
Runs:    1   Cycles/run: 150   Emitter limit: 1M   Flush: 5s

===== run 1/1 =====
  follower : 150/150 delivered (100.00%)
  timeout  : 150/150 delivered (100.00%)
  RESULT: timeout_lost=0 follower_lost=0

========================================================
Details
#!/usr/bin/env bash
# Self-contained reproducer for the in_emitter FLB_EMITTER_BUSY (-2) silent
# record-loss bug in Fluent Bit's rewrite_tag filter.
#
# Bug: when a rewrite_tag rule re-emits a record into a shared in_emitter
# whose mem-buf-limit is already reached, in_emitter_add_record() returns
# FLB_EMITTER_BUSY (-2) WITHOUT buffering the record. rewrite_tag.c only
# checks `if (ret == -1)`, so the -2 falls through, the filter reports the
# record as emitted, and with keep_record=false (the trailing `false` on the
# Rule) the original is dropped from the stream. Net effect: permanent,
# silent loss of that record.
#
# Why a TIMEOUT-flushed multiline record specifically: #8473's back-pressure
# pauses the SOURCE tail when the emitter is over its limit, which stops
# read-coupled flushes. But a multiline TIMEOUT flush is driven by an idle
# TIMER, not by a read, so it fires while the tail is paused and reaches the
# still-paused emitter -> hits the -2 path. Read-coupled "follower" flushes
# (flushed because the next start line arrived) only happen while reading,
# i.e. while the emitter has room, so they are never lost.
#
# This local variant keeps the original reproducer workload, config, and tally
# logic, but runs a local Fluent Bit binary instead of a stock container image.
# The writer runs as a local POSIX sh process and both processes share a temp
# directory:
#   writer (sh)          -- floods a noise file to keep the shared emitter at
#                           its limit, and single-writes victim burst+idle
#                           pairs so each pair's trailing line is timeout-flushed.
#   fluent-bit (FB_BIN)  -- tails both files; both inputs re-emit through ONE
#                           rewrite_tag/in_emitter; victim -> stdout, noise -> null.
# The host then tallies victim delivery by flush class from the fb stdout.
#
# Usage:
#   bash repro.sh                         # 1 run, defaults
#   RUNS=5 bash repro.sh                  # 5 runs, report repro rate
#   CYCLES=200 bash repro.sh              # more cycles -> more events
#   FB_BIN=build/bin/fluent-bit bash repro.sh
#
# Exit status: 0 if no timeout-flushed loss in any run, 1 if loss occurred.
set -euo pipefail

FB_BIN="${FB_BIN:-bin/fluent-bit}"
RUNS="${RUNS:-1}"
CYCLES="${CYCLES:-150}"          # victim burst+idle pairs per run
IDLE="${IDLE:-1.2}"             # idle after the timeout line (must exceed 1s flush_timeout)
RECV_GAP="${RECV_GAP:-0.007}"   # gap between the follower line and the timeout line
DRAIN="${DRAIN:-8}"             # seconds after last line for the final flushes to land
FLUSH="${FLUSH:-5}"             # engine flush; re-emitted chunks sit this long between drains
EMITTER_LIMIT="${EMITTER_LIMIT:-1M}"  # shared in_emitter mem-buf-limit; lower = easier to pause
NOISE_SIZE="${NOISE_SIZE:-65536}"     # bytes per noise line
NOISE_BURST="${NOISE_BURST:-16}"      # noise lines per burst
NOISE_GAP="${NOISE_GAP:-0.3}"         # seconds between noise bursts

if [[ ! -x "$FB_BIN" ]]; then
    echo "ERROR: FB_BIN is not executable: $FB_BIN" >&2
    exit 2
fi

CFG_DIR="$(mktemp -d)"
if [[ "${KEEP_DIR:-0}" = "1" ]]; then
    trap 'echo "kept repro dir: '"$CFG_DIR"'"' EXIT
else
    trap 'rm -rf "$CFG_DIR"' EXIT
fi

# --- generated config: minimal, generic, no project-specific naming ---------
cat > "$CFG_DIR/parsers.conf" <<'PARSERS'
# Each victim line starts with a "[YYYY-MM-DD HH:MM:SS" preamble, so every
# line is a multiline start_state -> its own one-line record. The previous
# record flushes when the next start line arrives (follower) or, for the last
# line before an idle gap, when flush_timeout (1000ms) fires (timeout).
[MULTILINE_PARSER]
    name           repro
    type           regex
    flush_timeout  1000
    rule  "start_state"  "/^\[\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}/"  "cont"
    rule  "cont"         "/^(?!\[\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})/"  "cont"
PARSERS

cat > "$CFG_DIR/fluent-bit.conf" <<FLUENTBIT
[SERVICE]
    Flush         ${FLUSH}
    Daemon        Off
    Log_Level     info
    Parsers_File  ${CFG_DIR}/parsers.conf

# Victim: multiline tail. Single-writer burst+idle pairs make each pair's
# trailing line a lone timeout-flushed single-record chunk.
[INPUT]
    Name              tail
    Tag               in.victim
    Path              ${CFG_DIR}/victim.log
    DB                ${CFG_DIR}/victim.db
    Read_from_Head    true
    Buffer_Max_Size   1024k
    Skip_Long_Lines   On
    multiline.parser  repro
    multiline_flush   1

# Aggressor: plain high-volume tail. Its only job is to keep the shared
# in_emitter at its mem-buf-limit while the victim trickles. Buffer_Max_Size
# MUST exceed NOISE_SIZE (default 64KB); the tail default is 32KB, so without
# this the big noise lines hit Skip_Long_Lines and never pressure the emitter.
[INPUT]
    Name              tail
    Tag               in.noise
    Path              ${CFG_DIR}/noise.log
    DB                ${CFG_DIR}/noise.db
    Read_from_Head    true
    Buffer_Max_Size   1024k
    Skip_Long_Lines   On

[FILTER]
    Name    modify
    Alias   tag-victim
    Match   in.victim
    Add     kind  victim

[FILTER]
    Name    modify
    Alias   tag-noise
    Match   in.noise
    Add     kind  noise

# The stage under test: BOTH inputs re-emitted through ONE in_emitter.
# keep_record=false (trailing \`false\`) means the original is dropped once the
# filter believes the re-emit succeeded -- including when it actually got -2.
[FILTER]
    Name                   rewrite_tag
    Alias                  retag-shared-emitter
    Match                  in.*
    Rule                   \$kind ^(.+)\$ out.\$1 false
    Emitter_Name           shared_emitter
    Emitter_Mem_Buf_Limit  ${EMITTER_LIMIT}

# Victim -> stdout (the host tallies seq=/cls= here). Noise -> discard.
[OUTPUT]
    Name              stdout
    Match             out.victim
    Format            json_lines

[OUTPUT]
    Name    null
    Match   out.noise
FLUENTBIT

# Local writer (POSIX sh). It is intentionally close to the original
# in-container writer; DATA_DIR replaces the container's /data mount point.
cat > "$CFG_DIR/writer.sh" <<'WRITER'
#!/bin/sh
set -eu
V="$DATA_DIR/victim.log"
N="$DATA_DIR/noise.log"
L="$DATA_DIR/ledger.tsv"

noise_line=$(head -c "$NOISE_SIZE" /dev/zero | tr '\0' x)
(
  while true; do
    n=0
    while [ "$n" -lt "$NOISE_BURST" ]; do
      printf '%s\n' "$noise_line" >> "$N"
      n=$((n + 1))
    done
    sleep "$NOISE_GAP"
  done
) &
NOISE_PID=$!

i=1
while [ "$i" -le "$CYCLES" ]; do
  seq=$(printf '%06d' "$i")
  ts="[$(date '+%Y-%m-%d %H:%M:%S')]"
  printf '%s seq=%s cls=follower MSG ping received\n' "$ts" "$seq" >> "$V"
  printf '%s\tfollower\n' "$seq" >> "$L"
  sleep "$RECV_GAP"
  ts="[$(date '+%Y-%m-%d %H:%M:%S')]"
  printf '%s seq=%s cls=timeout MSG ping succeeded\n' "$ts" "$seq" >> "$V"
  printf '%s\ttimeout\n' "$seq" >> "$L"
  sleep "$IDLE"
  i=$((i + 1))
done

kill "$NOISE_PID" 2>/dev/null || true
wait "$NOISE_PID" 2>/dev/null || true
sleep "$DRAIN"
WRITER

# Host-side analyzer: tally victim delivery by flush class.
cat > "$CFG_DIR/tally.py" <<'TALLY'
import json, re, sys
ledger_path, stdout_path = sys.argv[1], sys.argv[2]
pat = re.compile(r"seq=(\d{6}) cls=(follower|timeout)")
emitted = {"follower": set(), "timeout": set()}
with open(ledger_path) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        seq, cls = line.split("\t")
        emitted[cls].add(seq)
delivered = {"follower": set(), "timeout": set()}
with open(stdout_path) as f:
    for line in f:
        line = line.strip()
        if not line:
            continue
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            m = pat.search(line)
            if m:
                delivered[m.group(2)].add(m.group(1))
            continue
        m = pat.search(rec.get("log", ""))
        if m:
            delivered[m.group(2)].add(m.group(1))
lost = {c: sorted(emitted[c] - delivered[c]) for c in emitted}
# Drop the final-cycle line: it can be cut off by process teardown, which is
# a harness artifact, not the emitter bug.
last = "%06d" % max((int(s) for c in emitted for s in emitted[c]), default=0)
for c in lost:
    lost[c] = [s for s in lost[c] if s != last]
for c in ("follower", "timeout"):
    e = len(emitted[c]); l = len(lost[c]); d = e - l
    rate = (d / e * 100) if e else 0.0
    print(f"  {c:9s}: {d}/{e} delivered ({rate:.2f}%)")
    if lost[c]:
        prev = " ".join(lost[c][:20])
        more = "" if len(lost[c]) <= 20 else f" ... (+{len(lost[c]) - 20} more)"
        print(f"             LOST: {prev}{more}")
t_lost = len(lost["timeout"]); f_lost = len(lost["follower"])
print(f"  RESULT: timeout_lost={t_lost} follower_lost={f_lost}")
sys.exit(1 if t_lost else 0)
TALLY

run_once() {
    run_idx="$1"
    fb_pid=""

    # Pre-create files before Fluent Bit starts. On macOS in particular, this
    # avoids a watcher timing difference from the container volume repro.
    : > "$CFG_DIR/victim.log"
    : > "$CFG_DIR/noise.log"
    : > "$CFG_DIR/ledger.tsv"

    # Start local Fluent Bit, tailing the files in the temp directory.
    "$FB_BIN" -c "$CFG_DIR/fluent-bit.conf" > "$CFG_DIR/fb.stdout.${run_idx}" 2>&1 &
    fb_pid=$!
    sleep 3   # let the tail watcher initialize before any write

    # Writer process: floods noise + single-writes victim pairs, then drains.
    DATA_DIR="$CFG_DIR" CYCLES="$CYCLES" IDLE="$IDLE" RECV_GAP="$RECV_GAP" \
        DRAIN="$DRAIN" NOISE_SIZE="$NOISE_SIZE" NOISE_BURST="$NOISE_BURST" \
        NOISE_GAP="$NOISE_GAP" sh "$CFG_DIR/writer.sh"

    # Keep per-run ledger names like the original script.
    mv "$CFG_DIR/ledger.tsv" "$CFG_DIR/ledger.tsv.${run_idx}"

    sleep "$FLUSH"
    kill -TERM "$fb_pid" 2>/dev/null || true
    wait "$fb_pid" 2>/dev/null || true
}

echo "Binary:  $FB_BIN"
echo "Runs:    $RUNS   Cycles/run: $CYCLES   Emitter limit: $EMITTER_LIMIT   Flush: ${FLUSH}s"
echo

runs_with_loss=0
for r in $(seq 1 "$RUNS"); do
    echo "===== run $r/$RUNS ====="
    run_once "$r"
    if python3 "$CFG_DIR/tally.py" "$CFG_DIR/ledger.tsv.${r}" "$CFG_DIR/fb.stdout.${r}"; then
        :
    else
        runs_with_loss=$((runs_with_loss + 1))
    fi
    echo
done

echo "========================================================"
echo "Reproduced timeout-flushed loss in ${runs_with_loss}/${RUNS} run(s)."
[ "$runs_with_loss" -gt 0 ] && exit 1 || exit 0

Closes #12026.
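
The follower/timeout distinction described in the reproducer's comments can be modeled in a few lines. This is only an illustrative sketch: `classify_flushes` and its arguments are hypothetical names, and it assumes every line is a start line (as with the reproducer's parser), so a record is flushed either by the arrival of the next line or by the flush timeout, whichever comes first.

```python
def classify_flushes(arrivals, flush_timeout=1.0):
    """Label each one-line record by what flushes it.

    arrivals: sorted arrival times (seconds) of start lines.
    A record is flushed by the next start line ("follower") if that line
    arrives within flush_timeout; otherwise the idle timer flushes it
    ("timeout"). The last record has no successor, so the timer flushes it.
    """
    classes = []
    for i, t in enumerate(arrivals):
        has_next = i + 1 < len(arrivals)
        if has_next and arrivals[i + 1] - t < flush_timeout:
            classes.append("follower")
        else:
            classes.append("timeout")
    return classes


# Two cycles with the reproducer's defaults: RECV_GAP=0.007s, IDLE=1.2s.
arrivals = [0.0, 0.007, 1.207, 1.214]
print(classify_flushes(arrivals))
# ['follower', 'timeout', 'follower', 'timeout']
```

Each cycle therefore yields one record of each class, which matches the 150/150 follower and 150/150 timeout tallies in the run output for 150 cycles.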


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • Bug Fixes
    • Improved handling of rewritten records when the emitter is busy or paused, so that original records are preserved whenever the emit call returns a negative (failure) result.
    • Refined paused buffering behavior so records are queued more consistently and only limited when the emitter’s buffered payload exceeds the configured limit.
    • Consolidated shutdown cleanup to better release queued chunks and buffered data.
  • Tests
    • Updated runtime pause/busy expectations and added a new regression test to confirm rewritten records remain correct and all expected output is emitted.

Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>

coderabbitai Bot commented Jul 2, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a6db1040-891a-47c5-a00d-0d5ae4a740e8

📥 Commits

Reviewing files that changed from the base of the PR and between 2ab49b4 and 0905395.

📒 Files selected for processing (2)
  • plugins/in_emitter/emitter.c
  • tests/runtime/filter_rewrite_tag.c

📝 Walkthrough

This PR fixes silent record loss when the shared in_emitter is paused. process_record now preserves the original record on any negative emit result. plugins/in_emitter/emitter.c adds queue byte accounting and revised pause handling, and runtime tests cover the busy-path behavior.
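
To make the failure mode concrete, here is a toy model of the filter/emitter interaction. It is a sketch under stated assumptions, not the Fluent Bit code: `ToyEmitter`, `run`, and the record-count capacity are hypothetical, and only the `-2` busy value and the `ret == -1` versus `ret < 0` checks come from this PR's description.

```python
FLB_EMITTER_BUSY = -2  # value quoted in the reproducer's comments


class ToyEmitter:
    """Hypothetical stand-in for in_emitter: refuses records once full."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffered = []

    def add_record(self, record):
        if len(self.buffered) >= self.capacity:
            return FLB_EMITTER_BUSY  # record is NOT buffered
        self.buffered.append(record)
        return 0


def process_record(emitter, record, keep_record, failure_check):
    """Return True if the original record stays in the stream."""
    ret = emitter.add_record(record)
    if failure_check(ret):
        return True  # emit failed: keep the original
    return keep_record  # emit believed successful: obey the rule


def run(failure_check):
    emitter = ToyEmitter(capacity=2)
    kept = [r for r in range(5)
            if process_record(emitter, r, keep_record=False,
                              failure_check=failure_check)]
    # Records that are neither buffered nor kept are silently lost.
    lost = [r for r in range(5)
            if r not in emitter.buffered and r not in kept]
    return kept, lost


print(run(lambda ret: ret == -1))  # old check: ([], [2, 3, 4])
print(run(lambda ret: ret < 0))    # broadened check: ([2, 3, 4], [])
```

With the old check, the three records refused by the full emitter are neither buffered nor kept; with the broadened check they remain in the original stream.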

Changes

Emitter Busy Handling and Record Preservation

| Layer | File(s) | Summary |
| --- | --- | --- |
| rewrite_tag error handling on emitter failure | plugins/filter_rewrite_tag/rewrite_tag.c | Broadens the failure check from ret == -1 to ret < 0 and sets *keep = FLB_TRUE on failure so the original record is preserved. |
| in_emitter pause, queue, and teardown handling | plugins/in_emitter/emitter.c | Adds pending_bytes tracking, enforces mem_buf_limit with queue-byte accounting, adjusts paused-path buffering for ring-buffer and non-ring-buffer cases, and routes chunk cleanup through em_chunk_destroy(ctx, echunk). |
| Runtime tests for busy emitter record preservation | tests/runtime/filter_rewrite_tag.c | Updates the paused-emitter test expectation and adds busy_emitter_keeps_original to validate full record preservation under the busy path. |
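
The queue-byte accounting summarized for plugins/in_emitter/emitter.c can be pictured with a small model. This is a hedged sketch, not the actual C implementation: `ByteAccountedQueue` and its methods are hypothetical, and the exact comparison (rejecting a push that would take the total past the limit) is an assumption; only the names `pending_bytes` and `mem_buf_limit` come from the summary.

```python
from collections import deque


class ByteAccountedQueue:
    """Hypothetical queue that tracks how many payload bytes it holds."""

    def __init__(self, mem_buf_limit):
        self.mem_buf_limit = mem_buf_limit
        self.pending_bytes = 0
        self.chunks = deque()

    def push(self, payload):
        # Assumption: refuse a payload that would exceed the limit.
        if self.pending_bytes + len(payload) > self.mem_buf_limit:
            return False
        self.chunks.append(payload)
        self.pending_bytes += len(payload)
        return True

    def pop(self):
        # Draining a chunk gives its bytes back to the budget.
        payload = self.chunks.popleft()
        self.pending_bytes -= len(payload)
        return payload

    def destroy(self):
        # Single teardown path: drop all chunks and reset the counter.
        self.chunks.clear()
        self.pending_bytes = 0


q = ByteAccountedQueue(mem_buf_limit=10)
print(q.push(b"12345"), q.push(b"123456"), q.pending_bytes)  # True False 5
q.pop()
print(q.push(b"123456"), q.pending_bytes)  # True 6
```

Keeping the counter updated in both `push` and `pop`, and resetting it in one cleanup routine, is what lets the limit check stay accurate after chunks are drained or destroyed.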

Estimated code review effort: 3 (Moderate) | ~25 minutes

Sequence Diagram(s)

sequenceDiagram
    participant RewriteTag as rewrite_tag filter
    participant Emitter as in_emitter
    participant Queue as emitter queue/chunks
    participant Output as output plugin

    RewriteTag->>Emitter: in_emitter_add_record(rewritten record)
    Emitter->>Queue: check mem_buf_limit using pending_bytes
    alt queue over limit
        Emitter-->>RewriteTag: return FLB_EMITTER_BUSY
        RewriteTag-->>RewriteTag: set keep = FLB_TRUE
    else paused, no ring buffer
        Emitter->>Queue: buffer record for later ingestion
        Emitter-->>RewriteTag: return queued
    else available
        Emitter->>Queue: enqueue record
        Emitter-->>RewriteTag: return success
    end
    Queue->>Output: flush buffered records
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title is concise and accurately points to the rewrite_tag/in_emitter data-loss fix for multiline records. |
| Linked Issues check | ✅ Passed | The changes explicitly handle busy emits, preserve the original record on failure, and add coverage for the loss case. |
| Out of Scope Changes check | ✅ Passed | The code changes stay focused on emitter back-pressure and rewrite_tag record preservation, with no clear unrelated additions. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2ab49b4318

Comment thread plugins/in_emitter/emitter.c

coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/filter_rewrite_tag/rewrite_tag.c (1)

403-419: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

ret < 0 check misses FLB_FILTER_NOTOUCH failure path from in_emitter_add_record.

in_emitter_add_record can return FLB_FILTER_NOTOUCH (value 2, a positive int) when it fails to allocate i_ref for tracking the source input (flb_malloc failure path). Since FLB_FILTER_NOTOUCH is positive, the updated if (ret < 0) check at Line 417 won't catch this failure — the record will be misclassified as successfully emitted (is_emitted == FLB_TRUE, *keep stays whatever rule->keep_record was), and if keep_record=false, the record is silently dropped despite the actual emission having failed. This defeats the exact "don't drop on emitter failure" guarantee this PR is trying to establish.

🐛 Proposed fix
-    if (ret < 0) {
+    if (ret < 0 || ret == FLB_FILTER_NOTOUCH) {
         *keep = FLB_TRUE;
         return FLB_FALSE;
     }

Alternatively, consider having in_emitter_add_record return a value consistent with its own 0/-1/FLB_EMITTER_BUSY conventions instead of reusing a flb_filter.h constant, to avoid mixing return-code namespaces.
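
The gap described in this comment can be checked mechanically. The sketch below is illustrative only: `missed_failures` is a hypothetical helper, and the set of failure codes (including that `in_emitter_add_record` can return `FLB_FILTER_NOTOUCH`) is taken from the review comment rather than verified against the source.

```python
FLB_EMITTER_BUSY = -2    # from the PR description
FLB_FILTER_NOTOUCH = 2   # value stated in the review comment

# Candidate failure predicates, keyed by how they would read in C.
checks = {
    "ret == -1": lambda ret: ret == -1,
    "ret < 0": lambda ret: ret < 0,
    "ret < 0 || ret == FLB_FILTER_NOTOUCH":
        lambda ret: ret < 0 or ret == FLB_FILTER_NOTOUCH,
}


def missed_failures(check, codes):
    """Return the failure codes that a predicate treats as success."""
    return [code for code in codes if not check(code)]


failure_codes = [-1, FLB_EMITTER_BUSY, FLB_FILTER_NOTOUCH]
for name, check in checks.items():
    print(f"{name:38s} misses {missed_failures(check, failure_codes)}")
```

Only the third predicate leaves no failure code unhandled, which is the reason the comment also suggests keeping the emitter's return values within a single convention.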

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@plugins/filter_rewrite_tag/rewrite_tag.c` around lines 403 - 419, The failure
handling in rewrite_tag.c around ingest_inline/in_emitter_add_record is missing
the FLB_FILTER_NOTOUCH path, so a positive failure return can still be treated
as success. Update the post-call handling in this block to explicitly recognize
the in_emitter_add_record return conventions used by rewrite_tag.c and
in_emitter_add_record, not just ret < 0, and ensure any non-success from record
emission forces the keep/drop-safe path. If possible, align
in_emitter_add_record’s return values with the emitter’s own status codes so
rewrite_tag.c can make a single consistent success/failure check.
🧹 Nitpick comments (2)
tests/runtime/filter_rewrite_tag.c (2)

344-360: 🩺 Stability & Availability | 🔵 Trivial | ⚡ Quick win

Fixed 1500ms sleep with an exact-match assertion may be timing-sensitive under heavy load.

Both tests now assert an exact count (heavy_loop == got) after a single fixed flb_time_msleep(1500), pushing 100,000 records through a mem-buf-limited emitter. Previously (per the AI summary) the assertion tolerated partial delivery; tightening it to an exact match makes the tests more sensitive to how quickly buffered/queued chunks are drained, which varies with CI load. Consider polling get_output_num() until it stabilizes instead of relying on a single fixed sleep, to reduce the risk of flakiness.

Also applies to: 407-412
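
The polling approach suggested here could look roughly like the following. This is a Python analogy rather than the C test code: `wait_for_count` and `fake_output_num` are hypothetical, and the timeout and interval values are arbitrary.

```python
import time


def wait_for_count(get_count, expected, timeout=5.0, interval=0.05):
    """Poll get_count() until it reaches expected or the timeout elapses.

    Returns the last observed count so the caller can assert on it.
    """
    deadline = time.monotonic() + timeout
    got = get_count()
    while got < expected and time.monotonic() < deadline:
        time.sleep(interval)
        got = get_count()
    return got


# Demo with a fake counter that grows by 40 per poll, capped at 100.
state = {"n": 0}


def fake_output_num():
    state["n"] = min(state["n"] + 40, 100)
    return state["n"]


print(wait_for_count(fake_output_num, expected=100))  # 100
```

The exact-match assertion stays the same; only the wait becomes adaptive, so a slow CI host gets more time while a fast one finishes early.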

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/runtime/filter_rewrite_tag.c` around lines 344 - 360, The exact-count
assertion after a single fixed sleep in the filter rewrite tag tests is
timing-sensitive and can flake under load. Update the test logic around
get_output_num(), flb_time_msleep(), and the heavy_loop check to wait until
output delivery stabilizes or the expected count is reached, rather than relying
on one 1500ms delay before asserting heavy_loop == got. Apply the same
stabilization approach in the other matching test block referenced by the review
so both cases avoid CI-load-dependent failures.

362-415: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Near-duplicate of flb_test_heavy_input_pause_emitter.

flb_test_busy_emitter_keeps_original (Lines 362-415) duplicates most of the setup/push/wait logic from flb_test_heavy_input_pause_emitter (Lines 294-343), differing mainly in the Match filter and final assertion. Consider extracting a shared helper (parameterized by Match pattern and expected count) to reduce duplication and ease future maintenance.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/runtime/filter_rewrite_tag.c` around lines 362 - 415,
`flb_test_busy_emitter_keeps_original` duplicates most of
`flb_test_heavy_input_pause_emitter`, including setup, push loop, sleep, and
output check. Refactor the shared test flow into a helper in
`filter_rewrite_tag.c` that takes the `Match` pattern and expected count as
parameters, then have both test functions call it with their specific values
while keeping only the differing assertion/setup in each wrapper.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@plugins/in_emitter/emitter.c`:
- Around line 166-174: The non-ring-buffer path in emitter.c does not enforce a
hard stop when flb_input_buf_paused(ctx->ins) is true, so the backlog can still
grow via ec->mp_sbuf. Update the emitter chunk handling around the ctx->msgs
branch to either reject new records immediately like the ring-buffer path or add
a separate backlog cap/limit before appending, using the existing
flb_input_buf_paused check and the emitter queue state symbols to keep this path
bounded.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dbeabad8-d41a-420d-9f95-9430ee216a51

📥 Commits

Reviewing files that changed from the base of the PR and between aac11fc and 2ab49b4.

📒 Files selected for processing (3)
  • plugins/filter_rewrite_tag/rewrite_tag.c
  • plugins/in_emitter/emitter.c
  • tests/runtime/filter_rewrite_tag.c

Comment thread plugins/in_emitter/emitter.c
cosmo0920 added 3 commits July 2, 2026 14:56
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
@patrick-stephens

Contributor

@cosmo0920 I think we need to backport this, right?

Development

Successfully merging this pull request may close these issues.

Silent record loss in rewrite_tag when the shared in_emitter is paused (FLB_EMITTER_BUSY / -2 not handled)

2 participants