diff --git a/CHANGELOG.md b/CHANGELOG.md index 95d688766ce..59bab77a187 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152 * [FEATURE] Ingester: Add experimental active series queried metric. #7173 +* [FEATURE] StoreGateway: Add a flag `-blocks-storage.bucket-store.honor-projection-hints`. If enabled, Store Gateway in Parquet mode will honor projection hints and only materialize requested labels. #7206 * [FEATURE] Update prometheus Alertmanager version to v0.31.1 and add new integration to IncidentIO and Mattermost. #7092 #7267 * [FEATURE] Tenant Federation: Add experimental support for partial responses using the `-tenant-federation.allow-partial-data` flag. When enabled, failures from individual tenants during a federated query are treated as warnings, allowing results from successful tenants to be returned. #7232 * [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 5ceb537848d..9436273c23f 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1805,6 +1805,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. 
+ # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 22e21cec67f..5db64d46f39 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1880,6 +1880,12 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. + # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. # CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 8c2adf98027..3121f2e79a1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2503,6 +2503,12 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl [parquet_shard_cache_ttl: | default = 24h] + # [Experimental] If enabled, Store Gateway will honor projection hints and + # only materialize requested labels. It only takes effect when + # `-blocks-storage.bucket-store.bucket-store-type` is parquet. + # CLI flag: -blocks-storage.bucket-store.honor-projection-hints + [honor_projection_hints: | default = false] + tsdb: # Local directory to store TSDBs in the ingesters. 
# CLI flag: -blocks-storage.tsdb.dir diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce59..181cfa18dff 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -91,6 +91,8 @@ Currently experimental features are: - Store Gateway Zone Stable Shuffle Sharding - `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag - `zone_stable_shuffle_sharding` (boolean) field in config file +- Store Gateway HonorProjectionHints in Parquet Mode + - `-blocks-storage.bucket-store.honor-projection-hints` CLI flag - Basic Lifecycler (Storegateway, Alertmanager, Ruler) Final Sleep on shutdown, which tells the pod wait before shutdown, allowing a delay to propagate ring changes. - `-ruler.ring.final-sleep` (duration) CLI flag - `store-gateway.sharding-ring.final-sleep` (duration) CLI flag diff --git a/integration/parquet_store_gateway_test.go b/integration/parquet_store_gateway_test.go new file mode 100644 index 00000000000..8e65a2cb011 --- /dev/null +++ b/integration/parquet_store_gateway_test.go @@ -0,0 +1,214 @@ +//go:build integration_querier + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "slices" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetBucketStore_ProjectionHint(t *testing.T) { + s, err := 
e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + minio := e2edb.NewMinio(9000, bucketName) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) + + // Define configuration flags. + flags := BlocksStorageFlags() + flags = mergeFlags(flags, map[string]string{ + // Enable Thanos engine and projection optimization. + "-querier.thanos-engine": "true", + "-querier.optimizers": "projection", + + // enable honor-projection-hints querier and store gateway + "-querier.honor-projection-hints": "true", + "-blocks-storage.bucket-store.honor-projection-hints": "true", + // enable Store Gateway Parquet mode + "-blocks-storage.bucket-store.bucket-store-type": "parquet", + + // Set query-ingesters-within to 1h so queries older than 1h don't hit ingesters + "-limits.query-ingesters-within": "1h", + + // Configure Parquet Converter + "-parquet-converter.enabled": "true", + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-compactor.block-ranges": "1ms,12h", + // Enable cache + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.sync-interval": "1s", + + // Compactor + "-compactor.cleanup-interval": "1s", // to update bucket index quickly + }) + + // Store Gateway + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(storeGateway)) + + // Parquet Converter + parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(parquetConverter)) + + // Querier + 
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier)) + + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(compactor)) + + // Create block + now := time.Now() + // Time range: [Now - 24h] to [Now - 20h] + start := now.Add(-24 * time.Hour) + end := now.Add(-20 * time.Hour) + + ctx := context.Background() + + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + methods := []string{"GET", "POST", "PUT", "DELETE"} + + numSeries := 10 + numSamples := 100 + + lbls := make([]labels.Labels, 0, numSeries) + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.FromStrings( + labels.MetricName, "http_requests_total", + "job", "api-server", + "instance", fmt.Sprintf("instance-%d", i), + "status_code", statusCodes[i%len(statusCodes)], + "method", methods[i%len(methods)], + "path", fmt.Sprintf("/api/v1/endpoint%d", i%3), + "cluster", "test-cluster", + )) + } + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + + storage, err := e2ecortex.NewS3ClientForMinio(minio, bucketName) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + // Upload TSDB Block + require.NoError(t, block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)) + + // Wait until parquet converter convert 
block + require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics)) + + // Create client + c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} { + labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end) + if err != nil { + t.Logf("Series query failed: %v", err) + return false + } + return len(labelSets) > 0 + }) + + testCases := []struct { + name string + query string + expectedLabels []string // query result should contain these labels + }{ + { + name: "vector selector query", + query: `http_requests_total`, + expectedLabels: []string{ + "__name__", "job", "instance", "status_code", "method", "path", "cluster", + }, + }, + { + name: "simple_sum_by_job", + query: `sum by (job) (http_requests_total)`, + expectedLabels: []string{"job"}, + }, + { + name: "rate_with_aggregation", + query: `sum by (method) (rate(http_requests_total[5m]))`, + expectedLabels: []string{"method"}, + }, + { + name: "multiple_grouping_labels", + query: `sum by (job, status_code) (http_requests_total)`, + expectedLabels: []string{"job", "status_code"}, + }, + { + name: "aggregation without query", + query: `sum without (instance, method) (http_requests_total)`, + expectedLabels: []string{"job", "status_code", "path", "cluster"}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Testing: %s", tc.query) + + // Execute instant query + result, err := c.Query(tc.query, end) + require.NoError(t, err) + require.NotNil(t, result) + + // Verify we got results + vector, ok := result.(model.Vector) + require.True(t, ok, "result should be a vector") + require.NotEmpty(t, vector, "query should return results") + + for _, sample := range vector { + actualLabels := make(map[string]struct{}) + for label := range sample.Metric { 
+					actualLabels[string(label)] = struct{}{}
+				}
+
+				// Check that all expected labels are present
+				for _, expectedLabel := range tc.expectedLabels {
+					_, ok := actualLabels[expectedLabel]
+					require.True(t, ok,
+						"series should have %s label", expectedLabel)
+				}
+
+				// Check that no unexpected labels are present
+				for lbl := range actualLabels {
+					if !slices.Contains(tc.expectedLabels, lbl) {
+						require.Failf(t, "unexpected label", "series should not have unexpected label: %s", lbl)
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index fa6f7b1c938..165e53bc305 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -334,7 +334,8 @@ type BucketStoreConfig struct {
 	// Token bucket configs
 	TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
 	// Parquet shard cache config
-	ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
+	ParquetShardCache    parquetutil.CacheConfig `yaml:",inline"`
+	HonorProjectionHints bool                    `yaml:"honor_projection_hints"`
 }
 
 type TokenBucketBytesLimiterConfig struct {
@@ -397,6 +398,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
 	f.Float64Var(&cfg.TokenBucketBytesLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-bytes-limiter.touched-chunks-token-factor", 1, "Multiplication factor used for touched chunks token")
 	f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
 	cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
+	f.BoolVar(&cfg.HonorProjectionHints, "blocks-storage.bucket-store.honor-projection-hints", false, "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It only takes effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.")
 }
 
 // Validate the config.
diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go
index e6bded1e2f6..8eec27a278e 100644
--- a/pkg/storegateway/parquet_bucket_store.go
+++ b/pkg/storegateway/parquet_bucket_store.go
@@ -3,6 +3,7 @@ package storegateway
 import (
 	"context"
 	"fmt"
+	"slices"
 	"strings"
 
 	"github.com/go-kit/log"
@@ -38,8 +39,9 @@ type parquetBucketStore struct {
 
 	chunksDecoder *schema.PrometheusParquetChunksDecoder
 
-	matcherCache      storecache.MatchersCache
-	parquetShardCache parquetutil.CacheInterface[parquet_storage.ParquetShard]
+	matcherCache         storecache.MatchersCache
+	parquetShardCache    parquetutil.CacheInterface[parquet_storage.ParquetShard]
+	honorProjectionHints bool
 }
 
 func (p *parquetBucketStore) Close() error {
@@ -107,6 +109,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
 		return fmt.Errorf("failed to find parquet shards: %w", err)
 	}
 
+	storageHints := p.buildSelectHints(req.QueryHints, shards, req.MinTime, req.MaxTime)
 	seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards))
 	errGroup, ctx := errgroup.WithContext(srv.Context())
 	errGroup.SetLimit(p.concurrency)
@@ -116,7 +119,7 @@ func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storep
 			Id: shard.name,
 		})
 		errGroup.Go(func() error {
-			ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers)
+			ss, err := shard.Query(ctx, storageHints, req.SkipChunks, matchers)
 			seriesSet[i] = ss
 			return err
 		})
@@ -279,3 +282,51 @@ func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label
 		Hints: anyHints,
 	}, nil
 }
+
+// buildSelectHints converts a Series request into Prometheus select hints.
+// Start and End always come from minT and maxT. Projection fields are copied
+// from queryHints only when honorProjectionHints is set, and are cleared when
+// the projection is not an include projection or when any shard lacks the
+// series hash column. A surviving projection always contains the hash column.
+func (p *parquetBucketStore) buildSelectHints(queryHints *storepb.QueryHints, shards []*parquetBlock, minT, maxT int64) *prom_storage.SelectHints {
+	storageHints := &prom_storage.SelectHints{
+		Start: minT,
+		End:   maxT,
+	}
+
+	if p.honorProjectionHints && queryHints != nil {
+		storageHints.ProjectionInclude = queryHints.ProjectionInclude
+		// Clone the labels: the append below must not write into the backing array
+		// owned by the incoming request.
+		storageHints.ProjectionLabels = slices.Clone(queryHints.ProjectionLabels)
+
+		if storageHints.ProjectionInclude {
+			// Reset projection hints if not all parquet shards have the hash column.
+			if !allParquetBlocksHaveHashColumn(shards) {
+				storageHints.ProjectionInclude = false
+				storageHints.ProjectionLabels = nil
+			}
+		} else {
+			// Reset hints for non-include projections to force a full scan, matching querier behavior.
+			storageHints.ProjectionLabels = nil
+		}
+
+		if storageHints.ProjectionInclude && !slices.Contains(storageHints.ProjectionLabels, schema.SeriesHashColumn) {
+			storageHints.ProjectionLabels = append(storageHints.ProjectionLabels, schema.SeriesHashColumn)
+		}
+	}
+
+	return storageHints
+}
+
+// allParquetBlocksHaveHashColumn reports whether every block has the series
+// hash column; it returns true for an empty slice.
+func allParquetBlocksHaveHashColumn(blocks []*parquetBlock) bool {
+	// TODO(Sungjin1212): Change it to read marker version
+	for _, b := range blocks {
+		if !b.hasHashColumn() {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/storegateway/parquet_bucket_store_bench_test.go b/pkg/storegateway/parquet_bucket_store_bench_test.go
index 37b389104a4..c4c57ed04c3 100644
--- a/pkg/storegateway/parquet_bucket_store_bench_test.go
+++ b/pkg/storegateway/parquet_bucket_store_bench_test.go
@@ -40,6 +40,173 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
+// BenchmarkParquetBucketStore_ProjectionHints compares query performance
+// with and without projectionHints enabled.
+func BenchmarkParquetBucketStore_ProjectionHints(b *testing.B) {
+	seriesNum := []int{100, 1000, 10000}
+	samplePerSeries := 100
+
+	// projectionLabels is the subset of labels requested via projection hints.
+ // The test data contains: __name__, idx, job, instance + projectionLabels := []string{"job"} + + for _, numSeries := range seriesNum { + b.Run(fmt.Sprintf("series_%d", numSeries), func(b *testing.B) { + ctx := context.Background() + tmpDir := b.TempDir() + storageDir := filepath.Join(tmpDir, "storage") + dataDir := filepath.Join(tmpDir, "data") + userID := "user-1" + + storageCfg := cortex_tsdb.BlocksStorageConfig{ + UsersScanner: users.UsersScannerConfig{ + Strategy: users.UserScanStrategyList, + UpdateInterval: time.Second, + }, + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: storageDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + SyncDir: filepath.Join(tmpDir, "sync"), + BucketStoreType: "parquet", + BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery), + }, + } + + bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(b, err) + + blockID := prepareParquetBlock(b, ctx, storageCfg, bucketClient, dataDir, userID, numSeries, samplePerSeries) + + startGRPCServer := func(honorProjectionHints bool) (storepb.StoreClient, func()) { + cfg := storageCfg + cfg.BucketStore.HonorProjectionHints = honorProjectionHints + + reg := prometheus.NewPedanticRegistry() + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucketClient), defaultLimitsOverrides(nil), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(b, err) + + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(b, err) + + gRPCServer := grpc.NewServer( + grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor), + ) + storepb.RegisterStoreServer(gRPCServer, stores) + + go func() { + if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { + b.Error(err) + } + }() + + conn, err := grpc.NewClient(listener.Addr().String(), + 
grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)), + ) + require.NoError(b, err) + + return storepb.NewStoreClient(conn), func() { + _ = conn.Close() + gRPCServer.Stop() + } + } + + // Benchmark without projectionHints (full scan) + b.Run("without_projection_hints", func(b *testing.B) { + client, stop := startGRPCServer(false) + defer stop() + + b.ReportAllocs() + for b.Loop() { + benchmarkProjectionHints(b, client, userID, blockID, numSeries, nil) + } + }) + + // Benchmark with projectionHints enabled and projection labels specified + b.Run("with_projection_hints", func(b *testing.B) { + client, stop := startGRPCServer(true) + defer stop() + + b.ReportAllocs() + for b.Loop() { + benchmarkProjectionHints(b, client, userID, blockID, numSeries, projectionLabels) + } + }) + }) + } +} + +func benchmarkProjectionHints(b *testing.B, client storepb.StoreClient, userID, blockID string, expectedSeries int, projectionLabels []string) { + b.Helper() + + ctx := grpcMetadata.NewOutgoingContext(context.Background(), grpcMetadata.Pairs(cortex_tsdb.TenantIDExternalLabel, userID)) + ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, userID)) + require.NoError(b, err) + + hintMatchers := []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: block.BlockIDLabel, + Value: blockID, + }, + } + + dataMatchers := []storepb.LabelMatcher{ + { + Type: storepb.LabelMatcher_RE, + Name: "__name__", + Value: ".+", + }, + } + + hints := &hintspb.SeriesRequestHints{ + BlockMatchers: hintMatchers, + } + hintsAny, err := types.MarshalAny(hints) + require.NoError(b, err) + + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: dataMatchers, + ResponseBatchSize: 1000, + Hints: hintsAny, + } + + if len(projectionLabels) > 0 { + req.QueryHints = &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: projectionLabels, + } + } + + stream, err := 
client.Series(ctx, req) + require.NoError(b, err) + + got := 0 + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + + if series := resp.GetSeries(); series != nil { + got++ + } else if batch := resp.GetBatch(); batch != nil { + got += len(batch.Series) + } + } + + if got != expectedSeries { + b.Fatalf("expected %d series, got %d", expectedSeries, got) + } +} + func BenchmarkParquetBucketStore_SeriesBatch(b *testing.B) { seriesNum := []int{100, 1000, 10000, 100000} samplePerSeries := 100 diff --git a/pkg/storegateway/parquet_bucket_store_test.go b/pkg/storegateway/parquet_bucket_store_test.go new file mode 100644 index 00000000000..d7ccd654a1c --- /dev/null +++ b/pkg/storegateway/parquet_bucket_store_test.go @@ -0,0 +1,255 @@ +package storegateway + +import ( + "bytes" + "testing" + + "github.com/parquet-go/parquet-go" + "github.com/prometheus-community/parquet-common/schema" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type mockParquetFileView struct { + parquet_storage.ParquetFileView + file *parquet.File +} + +func (m *mockParquetFileView) Schema() *parquet.Schema { + return m.file.Schema() +} + +type mockShard struct { + parquet_storage.ParquetShard + fileView parquet_storage.ParquetFileView +} + +func (m *mockShard) LabelsFile() parquet_storage.ParquetFileView { + return m.fileView +} + +func createTestParquetBlock(t *testing.T, hasHash bool) *parquetBlock { + t.Helper() + + var buf bytes.Buffer + var err error + + if hasHash { + // v2 and higher blocks + type RowWithHash struct { + SeriesHash string `parquet:"s_series_hash"` + Label string `parquet:"l_job"` + } + + w := parquet.NewGenericWriter[RowWithHash](&buf) + _, err = w.Write([]RowWithHash{{SeriesHash: "hash1", Label: "node-1"}}) + require.NoError(t, err) + require.NoError(t, 
w.Close()) + } else { + // v1 block + type RowWithoutHash struct { + Label string `parquet:"l_job"` + } + + w := parquet.NewGenericWriter[RowWithoutHash](&buf) + _, err = w.Write([]RowWithoutHash{{Label: "node-1"}}) + require.NoError(t, err) + require.NoError(t, w.Close()) + } + + readBuf := bytes.NewReader(buf.Bytes()) + f, err := parquet.OpenFile(readBuf, readBuf.Size()) + require.NoError(t, err) + + return &parquetBlock{ + shard: &mockShard{ + fileView: &mockParquetFileView{file: f}, + }, + } +} + +func Test_AllParquetBlocksHaveHashColumn(t *testing.T) { + tests := []struct { + description string + setup func() []*parquetBlock + expected bool + }{ + { + description: "returns true when all blocks have hash column", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + } + }, + expected: true, + }, + { + description: "returns false when mixed block versions exist", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, false), + createTestParquetBlock(t, true), + } + }, + expected: false, + }, + { + description: "returns false when no blocks have hash column", + setup: func() []*parquetBlock { + return []*parquetBlock{ + createTestParquetBlock(t, false), + createTestParquetBlock(t, false), + } + }, + expected: false, + }, + { + description: "returns true for empty block list", + setup: func() []*parquetBlock { + return []*parquetBlock{} + }, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + blocks := tc.setup() + actual := allParquetBlocksHaveHashColumn(blocks) + require.Equal(t, tc.expected, actual) + }) + } +} + +func TestParquetBucketStore_buildSelectHints(t *testing.T) { + const ( + minT = 1000 + maxT = 2000 + ) + + tests := []struct { + description string + honorProjectionHints bool + queryHints *storepb.QueryHints + shards 
[]*parquetBlock + expectedHints *prom_storage.SelectHints + }{ + { + description: "honorProjectionHints=false, should ignore query hints", + honorProjectionHints: false, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: "honorProjectionHints=true, V2 blocks, should enable projection and append hash column", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + }, + { + description: "honorProjectionHints=true, V2 blocks, should not duplicate hash column if already present", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{"job", schema.SeriesHashColumn}, + }, + }, + { + description: "honorProjectionHints=true, Mixed V1/V2 blocks, should reset projection", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{"job"}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + createTestParquetBlock(t, false), // v1 + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: 
"honorProjectionHints=true, nil query hints, should return default hints", + honorProjectionHints: true, + queryHints: nil, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: false, + ProjectionLabels: nil, + }, + }, + { + description: "honorProjectionHints=true, Empty projection labels, should add hash column only", + honorProjectionHints: true, + queryHints: &storepb.QueryHints{ + ProjectionInclude: true, + ProjectionLabels: []string{}, + }, + shards: []*parquetBlock{ + createTestParquetBlock(t, true), + }, + expectedHints: &prom_storage.SelectHints{ + Start: minT, + End: maxT, + ProjectionInclude: true, + ProjectionLabels: []string{schema.SeriesHashColumn}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + store := &parquetBucketStore{ + honorProjectionHints: tc.honorProjectionHints, + } + shards := tc.shards + hints := store.buildSelectHints(tc.queryHints, shards, minT, maxT) + require.Equal(t, tc.expectedHints, hints) + }) + } +} diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index b51bf758ae5..6eae37927f9 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -255,13 +255,14 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) store := &parquetBucketStore{ - logger: userLogger, - bucket: userBucket, - limits: u.limits, - concurrency: 4, // TODO: make this configurable - chunksDecoder: u.chunksDecoder, - matcherCache: u.matcherCache, - parquetShardCache: u.parquetShardCache, + logger: userLogger, + bucket: userBucket, + limits: u.limits, + concurrency: 4, // TODO: make this configurable + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + parquetShardCache: u.parquetShardCache, + 
honorProjectionHints: u.cfg.BucketStore.HonorProjectionHints, } return store, nil @@ -310,7 +311,7 @@ func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, s if err != nil { return nil, err } - m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback, false) + m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback, p.honorProjectionHints) if err != nil { return nil, err } @@ -365,7 +366,7 @@ func (f *shardMatcherLabelsFilter) Close() { f.shardMatcher.Close() } -func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { +func (b *parquetBlock) Query(ctx context.Context, hints *prom_storage.SelectHints, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(b.concurrency) @@ -395,7 +396,7 @@ func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks b return nil } - seriesSetIter, err := b.m.Materialize(ctx, nil, rgi, mint, maxt, skipChunks, rr) + seriesSetIter, err := b.m.Materialize(ctx, hints, rgi, hints.Start, hints.End, skipChunks, rr) if err != nil { return err } @@ -532,6 +533,18 @@ func (b *parquetBlock) allLabelValues(ctx context.Context, name string, limit in return util.MergeUnsortedSlices(int(limit), results...), nil } +// hasHashColumn checks if the parquet block contains the schema.SeriesHashColumn column. +// This is used to determine if projection pushdown can be enabled. 
+func (b *parquetBlock) hasHashColumn() bool { + labelsFile := b.shard.LabelsFile() + if labelsFile == nil { + return false + } + + _, found := labelsFile.Schema().Lookup(schema.SeriesHashColumn) + return found +} + type byLabels []prom_storage.ChunkSeries func (b byLabels) Len() int { return len(b) } diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 84724f8d76e..247b3ec4765 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -1455,6 +1455,12 @@ "x-cli-flag": "blocks-storage.bucket-store.consistency-delay", "x-format": "duration" }, + "honor_projection_hints": { + "default": false, + "description": "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It only takes effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.", + "type": "boolean", + "x-cli-flag": "blocks-storage.bucket-store.honor-projection-hints" + }, "ignore_blocks_before": { "default": "0s", "description": "The blocks created before `now() - ignore_blocks_before` will not be synced. 0 to disable.",