Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
* [BUGFIX] Packaging: Fix RPM and deb packages to install the binary to `/usr/bin`, install the systemd unit to the correct system path (`/usr/lib/systemd/system` for RPM, `/lib/systemd/system` for deb), and mark the sysconfig/default env file as a config file so it is not overwritten on upgrade. #7445
* [BUGFIX] Compactor: Handle not-found and access-denied errors from `Attributes()` in bucket index updater, preventing a stale cached `Get()` from causing the entire cleanup cycle to fail when `meta.json` has been deleted from object storage. #7454
* [BUGFIX] gRPC: Fix panic when `grpc_compression` is set to `snappy` on ingester client or store-gateway client configurations. #7459

## 1.21.0 2026-04-24
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/bucketindex/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo

// Get the meta.json attributes.
attrs, err := w.bkt.Attributes(ctx, metaFile)
if w.bkt.IsObjNotFoundErr(err) {
return nil, ErrBlockMetaNotFound
}
if w.bkt.IsAccessDeniedErr(err) {
return nil, errBlockMetaKeyAccessDeniedErr
}
if err != nil {
return nil, errors.Wrapf(err, "read meta file attributes: %v", metaFile)
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/storage/tsdb/bucketindex/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"os"
"path"
"strings"
"testing"
Expand Down Expand Up @@ -184,6 +185,39 @@ func TestUpdater_UpdateIndex_ShouldNotIncreaseOperationFailureMetricCustomerKey(
`), "thanos_objstore_bucket_operation_failures_total"))
}

func TestUpdater_UpdateIndex_ShouldSkipBlockWithMetaAttributesNotFound(t *testing.T) {
const userID = "user-1"

bkt, _ := testutil.PrepareFilesystemBucket(t)

ctx := context.Background()
logger := log.NewNopLogger()

// Mock some blocks in the storage.
bkt = BucketWithGlobalMarkers(bkt)
block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)

// Simulate a race condition where Get() on meta.json succeeds but Attributes() returns not-found.
// This can happen in object stores like S3 when meta.json is deleted between the Get and Attributes calls.
bkt = &testutil.MockBucketFailure{
Bucket: bkt,
AttributesFailures: map[string]error{
path.Join(userID, block2.ULID.String(), "meta.json"): os.ErrNotExist,
},
}

w := NewUpdater(bkt, userID, nil, logger)
idx, partials, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqual(t, idx, bkt, userID,
[]tsdb.BlockMeta{block1},
[]*metadata.DeletionMark{})

assert.Len(t, partials, 1)
assert.True(t, errors.Is(partials[block2.ULID], ErrBlockMetaNotFound))
}

func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) {
const userID = "user-1"

Expand Down
18 changes: 13 additions & 5 deletions pkg/util/testutil/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ func PrepareFilesystemBucket(t testing.TB) (objstore.InstrumentedBucket, string)
type MockBucketFailure struct {
objstore.Bucket

DeleteFailures []string
GetFailures map[string]error
UploadFailures map[string]error
DeleteFailures []string
GetFailures map[string]error
UploadFailures map[string]error
AttributesFailures map[string]error

UploadCalls atomic.Int32
GetCalls atomic.Int32
Expand Down Expand Up @@ -92,17 +93,24 @@ func (m *MockBucketFailure) Upload(ctx context.Context, name string, r io.Reader
return m.Bucket.Upload(ctx, name, r, opts...)
}

func (m *MockBucketFailure) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
if e, ok := m.AttributesFailures[name]; ok {
return objstore.ObjectAttributes{}, e
}
return m.Bucket.Attributes(ctx, name)
}

func (m *MockBucketFailure) WithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.Bucket {
if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok {
return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures}
return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures, AttributesFailures: m.AttributesFailures}
}

return m
}

func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
if ibkt, ok := m.Bucket.(objstore.InstrumentedBucket); ok {
return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures}
return &MockBucketFailure{Bucket: ibkt.WithExpectedErrs(expectedFunc), DeleteFailures: m.DeleteFailures, GetFailures: m.GetFailures, AttributesFailures: m.AttributesFailures}
}

return m
Expand Down
Loading