Skip to content

fix(parquet/metadata): Fix sync.Pool memory leak and capacity destruction in GetColumnBloomFilter#864

Open
Patzifist wants to merge 1 commit into
apache:mainfrom
Patzifist:fix/863-bloom-filter-pool-pollution
Open

fix(parquet/metadata): Fix sync.Pool memory leak and capacity destruction in GetColumnBloomFilter#864
Patzifist wants to merge 1 commit into
apache:mainfrom
Patzifist:fix/863-bloom-filter-pool-pollution

Conversation

@Patzifist

Copy link
Copy Markdown

… GetColumnBloomFilter

Rationale for this change

Under high concurrency (65 parallel workers), GetColumnBloomFilter completely degrades sync.Pool. Calling ResizeNoShrink(0) drops the internal buffer capacity to 0, forcing a fresh heap allocation on every single iteration and churning 13 GB of garbage in 1.4 seconds.

What changes are included in this PR?

Fixes #863

  1. Fixed buffer recycling by using data.Reset(data.Buf()[:cap(data.Buf())]) instead of ResizeNoShrink(0) to preserve backing slice capacity.
  2. Isolated heavy Bloom filter allocations from small metadata headers to eliminate pool pollution.

Are these changes tested?

Yes, added a concurrent stress test suite in bloom_filter_leak_test.go

  • Original code (Bug): Allocated: 13024.81 MB (Pool failed completely).
  • Fixed code (sync.Pool): Allocated: 66.25 MB (100% memory reuse verified)

Are there any user-facing changes?

No. This is strictly an internal performance and memory footprint optimization.

@Patzifist Patzifist requested a review from zeroshade as a code owner June 25, 2026 12:58

@zeroshade zeroshade left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran some testing using your provided test harness configuration and it looks like your originally measured 13GB memory usage is an artifact of your test and not actually an issue in the code.

Your original test contained a make([]byte, 2048) inside the loop. Running that test on main produced ~2 MB/op, with a cumulative alloc of 13,016. The 13GB you measured is 65 * 100 * 2 MB, produced by the original test and not by this library.

If I run the test from this PR against main, I get the same reduction you did because the test in this PR hoists the slice allocation outside of the loop.

Another key point is that if BloomFilterOffset is 0, we trigger an early return and don't even go into the bloom-filter read/pool path at all (because we just return the raw bloom filter bytes as they existed). Your test uses BloomFilterOffset = 0 which means you're never actually exercising the pool path anyways in this test, proving that the original 13GB allocation was just the make which existed in the test and the reduction was solely due to hoisting it out of the loop in the test. Essentially, the allocation measurement test you're adding doesn't actually exercise any of the new code this PR adds.

If I create a proper test that uses a non-zero BloomFilterOffset against main, using a shared reader and sync.Pool recycling then I get about ~11 KB/op with a cumulative allocation of 72 MB. Even in the worst case scenario, removing the explicit returning to the pool and relying solely on the GC to recycle things, I still max out at only ~3GB, far from the 13GB you measured.

Can you try reproducing the memory allocation issue using:

  • A non-zero BloomFilterOffset so that we actually use the pool instead of just returning the raw bytes deserialized by thrift
  • Use the same test against both main and against your changes.

Given the analysis I've done and my own tests, I wasn't able to reproduce the original issue you reported in #863

That's not to say there isn't any issue here: because we rely on the GC finalizers to return the buffers, they don't get promptly returned under a burst and we end up over-allocating a bit with some churn (between 1.5 - 4 GB of churn vs ~75 MB if we promptly return the buffers to the pool). So there's definitely something we can fix here, but it's not a memory leak and capacity destruction as you originally thought

@@ -0,0 +1,162 @@
package metadata

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is missing the apache license header that is required.

Hasher() Hasher
CheckHash(hash uint64) bool
Size() int64
Close()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a breaking change that we shouldn't introduce and shouldn't be needed.

Comment on lines +59 to +62
var (
headerPool = make(chan []byte, 512)
filterPool = make(chan *memory.Buffer, 512)
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're replacing the per-reader, allocator-aware sync.Pool with package level globals. You're bypassing the user-configured allocator (which means that if users are using the mallocator i.e. CGO/non-go memory, unreleased buffers will leak native memory rather than Go memory that can be reclaimed).

Comment on lines +300 to +305
func (b *blockSplitBloomFilter) Close() {
if b.cancelCleanup != nil {
b.cancelCleanup()
b.cancelCleanup = nil
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't actually called anywhere in the code except for in your new test, so this PR doesn't actually fix anything

@zeroshade

Copy link
Copy Markdown
Member

Ultimately, there is a measurable thing to fix here regarding the latency and churn of memory and how promptly we return the buffer to the pool. But we should improve that without breaking the public BloomFilter api, bypassing the reader's allocator, or introducing new process-global pools instead of the per-reader pool.

For reference, here's the test file I used:

const (
	reproGoroutines = 65
	reproIters      = 100
	reproOps        = reproGoroutines * reproIters
	reproBitsetSize = 1 * 1024 * 1024
	reproScratch    = 2 * 1024 * 1024
	reproReadSize   = int32(4096)
)

func reproHeaderBytes(t testing.TB) []byte {
	t.Helper()
	hb, err := thrift.NewThriftSerializer().Write(context.Background(), &format.BloomFilterHeader{
		NumBytes:    int32(reproBitsetSize),
		Algorithm:   &defaultAlgorithm,
		Hash:        &defaultHashStrategy,
		Compression: &defaultCompression,
	})
	if err != nil {
		t.Fatalf("serialize bloom filter header: %v", err)
	}
	return hb
}

// reproFileData lays out [8 byte prefix][thrift header][bitset] so a non-zero
// BloomFilterOffset points at the header.
func reproFileData(t testing.TB) []byte {
	t.Helper()
	fd := append(make([]byte, 8), reproHeaderBytes(t)...)
	return append(fd, make([]byte, reproBitsetSize)...)
}

func reproMeta(t testing.TB, bloomFilterOffset int64) *RowGroupMetaData {
	t.Helper()
	length := reproReadSize
	cmd := format.ColumnMetaData{
		Type:              format.Type_BYTE_ARRAY,
		Encodings:         []format.Encoding{format.Encoding_PLAIN},
		PathInSchema:      []string{"test_col"},
		Codec:             format.CompressionCodec_UNCOMPRESSED,
		BloomFilterOffset: &bloomFilterOffset,
		BloomFilterLength: &length,
	}
	rg := format.RowGroup{Columns: []*format.ColumnChunk{{MetaData: &cmd}}, TotalByteSize: 100, NumRows: 100}
	node, err := schema.NewPrimitiveNode("test_col", parquet.Repetition(format.FieldRepetitionType_REQUIRED),
		parquet.Type(format.Type_BYTE_ARRAY), -1, -1)
	if err != nil {
		t.Fatalf("create primitive node: %v", err)
	}
	root, err := schema.NewGroupNode("schema", parquet.Repetition(format.FieldRepetitionType_REPEATED),
		schema.FieldList{node}, -1)
	if err != nil {
		t.Fatalf("create group node: %v", err)
	}
	return NewRowGroupMetaData(&rg, schema.NewSchema(root), nil, nil)
}

func reproReader(fileData []byte, meta *RowGroupMetaData, pool *sync.Pool) *RowGroupBloomFilterReader {
	return &RowGroupBloomFilterReader{
		input:          bytes.NewReader(fileData),
		sourceFileSize: int64(len(fileData)),
		rgMeta:         meta,
		bufferPool:     pool,
	}
}

func reproPool() *sync.Pool {
	return &sync.Pool{New: func() any {
		buf := memory.NewResizableBuffer(memory.NewGoAllocator())
		runtime.SetFinalizer(buf, func(o *memory.Buffer) { o.Release() })
		return buf
	}}
}

// reproRecycle returns the bloom filter's buffer to the pool immediately, doing
// exactly what the reader's runtime.AddCleanup callback does (ResizeNoShrink(0)
// then Put), but synchronously. The registered cleanup is cancelled first so the
// same buffer is not returned twice.
func reproRecycle(bf BloomFilter, pool *sync.Pool) {
	b, ok := bf.(*blockSplitBloomFilter)
	if !ok || b.data == nil {
		return
	}
	if b.cancelCleanup != nil {
		b.cancelCleanup()
	}
	b.data.ResizeNoShrink(0)
	pool.Put(b.data)
}

func reproMeasure(t *testing.T, name string, run func() (read, nilRet int64)) (int64, float64) {
	t.Helper()
	runtime.GC()
	var before runtime.MemStats
	runtime.ReadMemStats(&before)

	start := time.Now()
	read, nilRet := run()
	elapsed := time.Since(start)

	var after runtime.MemStats
	runtime.ReadMemStats(&after)
	churnMB := float64(after.TotalAlloc-before.TotalAlloc) / 1024 / 1024
	t.Logf("%s\n    ops=%d filters-read=%d nil-returns=%d cumulative-alloc(churn)=%.1f MB mallocs=%d elapsed=%v",
		name, reproOps, read, nilRet, churnMB, after.Mallocs-before.Mallocs, elapsed)
	return read, churnMB
}

func reproRunParallel(work func() BloomFilter) (read, nilRet int64) {
	var wg sync.WaitGroup
	gate := make(chan struct{})
	for range reproGoroutines {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-gate
			for range reproIters {
				if work() != nil {
					atomic.AddInt64(&read, 1)
				} else {
					atomic.AddInt64(&nilRet, 1)
				}
			}
		}()
	}
	close(gate)
	wg.Wait()
	return read, nilRet
}

// Scenario A: the original reproducer. BloomFilterOffset=0 and a fresh 2 MiB file
// buffer allocated per iteration. Allocates ~13 GB while never reading a filter.
func TestBloomFilterPoolRepro_A_OriginalHarness(t *testing.T) {
	header := reproHeaderBytes(t)
	meta := reproMeta(t, 0)

	read, _ := reproMeasure(t, "A original harness: offset=0, fresh 2MiB file per iteration", func() (int64, int64) {
		var read, nilRet int64
		var wg sync.WaitGroup
		gate := make(chan struct{})
		for range reproGoroutines {
			wg.Add(1)
			go func() {
				defer wg.Done()
				pool := reproPool()
				<-gate
				for range reproIters {
					scratch := make([]byte, reproScratch)
					copy(scratch, header)
					bf, err := reproReader(scratch, meta, pool).GetColumnBloomFilter(0)
					if err != nil || bf == nil {
						atomic.AddInt64(&nilRet, 1)
						continue
					}
					atomic.AddInt64(&read, 1)
					reproRecycle(bf, pool)
				}
			}()
		}
		close(gate)
		wg.Wait()
		return read, nilRet
	})

	if read != 0 {
		t.Fatalf("expected 0 filters read with offset=0, got %d", read)
	}
}

// Scenario B: corrected harness. Non-zero offset (read path runs), shared reader,
// buffer returned to the pool promptly. The 1 MiB bitset buffer is reused.
func TestBloomFilterPoolRepro_B_SharedReaderPromptReturn(t *testing.T) {
	fileData := reproFileData(t)
	meta := reproMeta(t, 8)
	pool := reproPool()
	rdr := reproReader(fileData, meta, pool)

	bf, err := rdr.GetColumnBloomFilter(0)
	if err != nil || bf == nil {
		t.Fatalf("sanity read failed: bf=%v err=%v", bf, err)
	}
	if bf.Size() != int64(reproBitsetSize) {
		t.Fatalf("unexpected filter size: got %d want %d", bf.Size(), reproBitsetSize)
	}
	reproRecycle(bf, pool)

	read, churnMB := reproMeasure(t, "B shared reader + prompt return: offset>0", func() (int64, int64) {
		return reproRunParallel(func() BloomFilter {
			bf, err := rdr.GetColumnBloomFilter(0)
			if err != nil || bf == nil {
				return nil
			}
			reproRecycle(bf, pool)
			return bf
		})
	})

	if read != reproOps {
		t.Fatalf("expected %d filters read, got %d", reproOps, read)
	}
	if churnMB > 500 {
		t.Fatalf("pool not reusing buffers: churn %.1f MB (1 MiB x %d ops would be ~%d MB without reuse)",
			churnMB, reproOps, reproOps)
	}
}

// Scenario C: corrected harness using the reader's real return path
// (runtime.AddCleanup). GC-timing dependent; reported only.
func TestBloomFilterPoolRepro_C_SharedReaderCleanupOnly(t *testing.T) {
	fileData := reproFileData(t)
	meta := reproMeta(t, 8)
	pool := reproPool()
	rdr := reproReader(fileData, meta, pool)

	reproMeasure(t, "C shared reader + cleanup-only return: offset>0 (reader's real path)", func() (int64, int64) {
		return reproRunParallel(func() BloomFilter {
			bf, err := rdr.GetColumnBloomFilter(0)
			if err != nil {
				return nil
			}
			return bf
		})
	})
}

// Scenario D1: a new reader is built every iteration (as in the original harness)
// but the file bytes are shared and the offset is non-zero. A shared pool still
// recycles, so recreating the reader is not what defeated reuse.
func TestBloomFilterPoolRepro_D1_FreshReaderSharedPool(t *testing.T) {
	fileData := reproFileData(t)
	meta := reproMeta(t, 8)
	pool := reproPool()

	read, churnMB := reproMeasure(t, "D1 fresh reader + shared pool: offset>0, file shared, prompt return", func() (int64, int64) {
		return reproRunParallel(func() BloomFilter {
			bf, err := reproReader(fileData, meta, pool).GetColumnBloomFilter(0)
			if err != nil || bf == nil {
				return nil
			}
			reproRecycle(bf, pool)
			return bf
		})
	})

	if read != reproOps {
		t.Fatalf("expected %d filters read, got %d", reproOps, read)
	}
	if churnMB > 500 {
		t.Fatalf("fresh reader defeated shared pool reuse: churn %.1f MB", churnMB)
	}
}

// Scenario D2: a new pool is built per read (i.e. a new file.Reader per single
// read). This is the only configuration where per-reader pooling cannot help.
func TestBloomFilterPoolRepro_D2_FreshReaderFreshPool(t *testing.T) {
	fileData := reproFileData(t)
	meta := reproMeta(t, 8)

	reproMeasure(t, "D2 fresh reader + fresh pool per read: offset>0, file shared", func() (int64, int64) {
		return reproRunParallel(func() BloomFilter {
			bf, err := reproReader(fileData, meta, reproPool()).GetColumnBloomFilter(0)
			if err != nil {
				return nil
			}
			return bf
		})
	})
}

// BenchmarkBloomFilterPoolReuse reports allocs/op for the read path with the
// buffer returned to the pool. A reused 1 MiB bitset does not show up per op.
func BenchmarkBloomFilterPoolReuse(b *testing.B) {
	fileData := reproFileData(b)
	meta := reproMeta(b, 8)
	pool := reproPool()
	rdr := reproReader(fileData, meta, pool)

	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			bf, err := rdr.GetColumnBloomFilter(0)
			if err != nil || bf == nil {
				continue
			}
			reproRecycle(bf, pool)
		}
	})
}

Running it locally myself with go test -bench I get the following results:

Test Case filters-read churn
A - original test (offset = 0) 0 13002.7 MB
B - shared reader + prompt return 6500 77.1 MB
C - cleanup-only (real path for main) 6500 3419.5 MB
D1 - fresh reader + shared pool 6500 39.0 MB
D2 - fresh reader + fresh pool 6500 6617.0 MB

Let's construct a good solution for getting more prompt returns to the pool without breaking the API or defeating the pool. Thoughts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Severe sync.Pool degradation and infinite heap allocations in GetColumnBloomFilter due to pool pollution and ResizeNoShrink(0)

2 participants