From 095efb70de7264e66995287ec317df6a86d9efd7 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 20 Apr 2026 15:47:45 -0700 Subject: [PATCH 1/6] implement resource based throttling on querier Signed-off-by: Essam Eldaly --- pkg/cortex/cortex.go | 2 +- pkg/cortex/modules.go | 6 +- pkg/querier/querier.go | 69 ++++++++++++++++++++-- pkg/querier/querier_test.go | 114 ++++++++++++++++++++++++++++++------ pkg/ruler/ruler_test.go | 2 +- 5 files changed, 167 insertions(+), 26 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 08dcd2d3fbb..36eb3779313 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -229,7 +229,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Distributor.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid distributor config") } - if err := c.Querier.Validate(); err != nil { + if err := c.Querier.Validate(c.ResourceMonitor.Resources); err != nil { return errors.Wrap(err, "invalid querier config") } if c.Querier.TimeoutClassificationEnabled && !c.Frontend.Handler.QueryStatsEnabled { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 805c2095429..6e6cfc523be 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -284,7 +284,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData) + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) // Use distributor as default MetadataQuerier t.MetadataQuerier = t.Distributor @@ -701,7 +701,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) } else { // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData) + queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) } managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) @@ -949,7 +949,7 @@ func (t *Cortex) setupModuleManager() error { Ingester: {IngesterService, OverridesConfig, API}, IngesterService: {OverridesConfig, RuntimeConfig, MemberlistKV, ResourceMonitor}, Flusher: {OverridesConfig, API}, - Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV}, + Queryable: {OverridesConfig, DistributorService, OverridesConfig, Ring, API, StoreQueryable, MemberlistKV, ResourceMonitor}, Querier: {TenantFederation}, StoreQueryable: {OverridesConfig, OverridesConfig, MemberlistKV, GrpcClientService}, QueryFrontendTripperware: {API, OverridesConfig}, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index c04d320268d..8cb23cf3112 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" + "github.com/cortexproject/cortex/pkg/configs" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/lazyquery" @@ -32,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/parquetutil" + "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -103,6 +105,9 @@ type Config struct { TimeoutClassificationEnabled bool `yaml:"timeout_classification_enabled"` TimeoutClassificationDeadline time.Duration `yaml:"timeout_classification_deadline"` TimeoutClassificationEvalThreshold time.Duration `yaml:"timeout_classification_eval_threshold"` + + // Query protection: resource-based rejection. + QueryProtection configs.QueryProtection `yaml:"query_protection"` } var ( @@ -161,10 +166,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.TimeoutClassificationEnabled, "querier.timeout-classification-enabled", false, "If true, classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing.") f.DurationVar(&cfg.TimeoutClassificationDeadline, "querier.timeout-classification-deadline", time.Minute+59*time.Second, "The total time before the querier proactively cancels a query for timeout classification. Set this a few seconds less than the querier timeout.") f.DurationVar(&cfg.TimeoutClassificationEvalThreshold, "querier.timeout-classification-eval-threshold", time.Minute+30*time.Second, "Eval time threshold above which a timeout is classified as user error (4XX).") + cfg.QueryProtection.RegisterFlagsWithPrefix(f, "querier.") } // Validate the config -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(monitoredResources flagext.StringSliceCSV) error { if cfg.ResponseCompression != "" && cfg.ResponseCompression != "gzip" && cfg.ResponseCompression != "snappy" && cfg.ResponseCompression != "zstd" { return errUnsupportedResponseCompression @@ -207,6 +213,10 @@ func (cfg *Config) Validate() error { return err } + if err := cfg.QueryProtection.Validate(monitoredResources); err != nil { + return err + } + return nil } @@ -223,9 +233,28 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { } // New builds a queryable and promql engine. -func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) + // Create resource-based limiter if resource monitor is available and thresholds are configured. + var resourceBasedLimiter *limiter.ResourceBasedLimiter + if resourceMonitor != nil { + resourceLimits := make(map[resource.Type]float64) + if cfg.QueryProtection.Rejection.Threshold.CPUUtilization > 0 { + resourceLimits[resource.CPU] = cfg.QueryProtection.Rejection.Threshold.CPUUtilization + } + if cfg.QueryProtection.Rejection.Threshold.HeapUtilization > 0 { + resourceLimits[resource.Heap] = cfg.QueryProtection.Rejection.Threshold.HeapUtilization + } + if len(resourceLimits) > 0 { + var err error + resourceBasedLimiter, err = limiter.NewResourceBasedLimiter(resourceMonitor, resourceLimits, reg, "querier") + if err != nil { + level.Error(logger).Log("msg", "failed to create resource based limiter for querier", "err", err) + } + } + } + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil) ns := make([]QueryableWithFilter, len(stores)) @@ -235,7 +264,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor limits: limits, } } - queryable := NewQueryable(distributorQueryable, ns, cfg, limits) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits, resourceBasedLimiter, logger) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -311,7 +340,7 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides, resourceBasedLimiter *limiter.ResourceBasedLimiter, logger log.Logger) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ now: time.Now(), @@ -324,6 +353,8 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, distributor: distributor, stores: stores, limiterHolder: &limiterHolder{}, + resourceBasedLimiter: resourceBasedLimiter, + logger: logger, } return q, nil @@ -339,6 +370,8 @@ type querier struct { distributor QueryableWithFilter stores []QueryableWithFilter limiterHolder *limiterHolder + resourceBasedLimiter *limiter.ResourceBasedLimiter + logger log.Logger ignoreMaxQueryLength bool } @@ -390,6 +423,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_st // Select implements storage.Querier interface. // The bool passed is ignored because the series is always sorted. func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return storage.ErrSeriesSet(err) + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return storage.EmptySeriesSet() @@ -490,6 +528,11 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select // LabelValues implements storage.Querier. func (q querier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return nil, nil, err + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return nil, nil, nil @@ -558,6 +601,11 @@ func (q querier) LabelValues(ctx context.Context, name string, hints *storage.La } func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + // Check resource utilization before processing the query. + if err := q.checkResourceUtilization(); err != nil { + return nil, nil, err + } + ctx, stats, userID, mint, maxt, metadataQuerier, queriers, err := q.setupFromCtx(ctx) if err == errEmptyTimeRange { return nil, nil, nil @@ -625,6 +673,19 @@ func (q querier) LabelNames(ctx context.Context, hints *storage.LabelHints, matc return strutil.MergeSlices(limit, sets...), warnings, nil } +func (q querier) checkResourceUtilization() error { + if q.resourceBasedLimiter == nil { + return nil + } + + if err := q.resourceBasedLimiter.AcceptNewRequest(); err != nil { + level.Warn(q.logger).Log("msg", "querier failed to accept request due to resource utilization", "err", err) + return limiter.ErrResourceLimitReached + } + + return nil +} + func (querier) Close() error { return nil } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c623aa5abf4..e8ee70e5581 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -40,6 +40,8 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/limiter" + "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -337,7 +339,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -528,7 +530,7 @@ func TestLimits(t *testing.T) { } overrides := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -584,7 +586,7 @@ func TestQuerier(t *testing.T) { overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) testRangeQuery(t, queryable, queryEngine, through, query, enc) }) } @@ -605,7 +607,7 @@ func TestQuerierMetric(t *testing.T) { queryables := []QueryableWithFilter{} r := prometheus.NewRegistry() reg := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, r) - New(cfg, overrides, distributor, queryables, reg, log.NewNopLogger(), nil) + New(cfg, overrides, distributor, queryables, reg, log.NewNopLogger(), nil, nil) assert.NoError(t, promutil.GatherAndCompare(r, strings.NewReader(` # HELP cortex_max_concurrent_queries The maximum number of concurrent queries. # TYPE cortex_max_concurrent_queries gauge @@ -688,7 +690,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { overrides := validation.NewOverrides(limits, nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -781,7 +783,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute) require.NoError(t, err) @@ -873,7 +875,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) queryEngine := promql.NewEngine(opts) ctx := user.InjectOrgID(context.Background(), "test") @@ -911,7 +913,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") now := time.Now() @@ -969,7 +971,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") @@ -1119,7 +1121,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor := &MockDistributor{} distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, testData.query, testData.queryStartTime, testData.queryEndTime, time.Minute) require.NoError(t, err) @@ -1147,7 +1149,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1188,7 +1190,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1216,7 +1218,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1243,7 +1245,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1580,7 +1582,7 @@ func TestShortTermQueryToLTS(t *testing.T) { limits.QueryStoreAfter = model.Duration(c.queryStoreAfter) overrides := validation.NewOverrides(limits, nil) - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil) + queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "0") query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) @@ -1754,7 +1756,7 @@ func TestConfig_Validate(t *testing.T) { flagext.DefaultValues(cfg) testData.setup(cfg) - assert.Equal(t, testData.expected, cfg.Validate()) + assert.Equal(t, testData.expected, cfg.Validate(nil)) }) } } @@ -1900,7 +1902,7 @@ func TestQuerier_ProjectionHints(t *testing.T) { wDistributorQueryable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: distributorQueryable} - queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides) + queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides, nil, log.NewNopLogger()) q, err := queryable.Querier(util.TimeToMillis(start), util.TimeToMillis(end)) require.NoError(t, err) @@ -1935,3 +1937,81 @@ func TestQuerier_ProjectionHints(t *testing.T) { }) } } + +func TestQuerier_ResourceBasedLimiter(t *testing.T) { + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.MaxQueryIntoFuture = 0 + + limits := DefaultLimitsConfig() + overrides := validation.NewOverrides(limits, nil) + + mockMonitor := &limiter.MockMonitor{ + CpuUtilization: 0.9, + HeapUtilization: 0.9, + } + + resourceLimits := map[resource.Type]float64{ + resource.CPU: 0.8, + resource.Heap: 0.8, + } + + resourceBasedLimiter, err := limiter.NewResourceBasedLimiter(mockMonitor, resourceLimits, nil, "querier") + require.NoError(t, err) + + chunkStore := &errDistributor{} + distributorQueryable := newDistributorQueryable(chunkStore, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) + + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, resourceBasedLimiter, log.NewNopLogger()) + + ctx := user.InjectOrgID(context.Background(), "test") + q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now())) + require.NoError(t, err) + + // Select should fail due to resource limit. + ss := q.Select(ctx, true, &storage.SelectHints{Start: util.TimeToMillis(time.Now().Add(-1 * time.Hour)), End: util.TimeToMillis(time.Now())}) + require.Error(t, ss.Err()) + require.ErrorIs(t, ss.Err(), limiter.ErrResourceLimitReached) + + // LabelValues should fail due to resource limit. + _, _, err = q.LabelValues(ctx, "__name__", nil) + require.Error(t, err) + require.ErrorIs(t, err, limiter.ErrResourceLimitReached) + + // LabelNames should fail due to resource limit. + _, _, err = q.LabelNames(ctx, nil) + require.Error(t, err) + require.ErrorIs(t, err, limiter.ErrResourceLimitReached) +} + +func TestQuerier_ResourceBasedLimiter_Nil(t *testing.T) { + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.MaxQueryIntoFuture = 0 + + limits := DefaultLimitsConfig() + overrides := validation.NewOverrides(limits, nil) + + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) + distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) + + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) + + // nil resourceBasedLimiter should not block queries. + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, nil, log.NewNopLogger()) + + ctx := user.InjectOrgID(context.Background(), "test") + q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now())) + require.NoError(t, err) + + ss := q.Select(ctx, true, &storage.SelectHints{Start: util.TimeToMillis(time.Now().Add(-1 * time.Hour)), End: util.TimeToMillis(time.Now())}) + require.NoError(t, ss.Err()) + + _, _, err = q.LabelValues(ctx, "__name__", nil) + require.NoError(t, err) + + _, _, err = q.LabelNames(ctx, nil) + require.NoError(t, err) +} diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index e5738945cb4..b9f30118e88 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -227,7 +227,7 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg querierTestConfig.Cfg.ActiveQueryTrackerDir = "" overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil) + q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) return func(mint, maxt int64) (storage.Querier, error) { return q.Querier(mint, maxt) } From 8ec1bde768bbe095d07a877f3a0d1ad370aba3b9 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 20 Apr 2026 16:05:51 -0700 Subject: [PATCH 2/6] make doc Signed-off-by: Essam Eldaly --- docs/blocks-storage/querier.md | 17 +++++++++++++ docs/configuration/config-file-reference.md | 17 +++++++++++++ schemas/cortex-config-schema.json | 27 +++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 5ceb537848d..f75c27c1e71 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -313,6 +313,23 @@ querier: # Eval time threshold above which a timeout is classified as user error (4XX). # CLI flag: -querier.timeout-classification-eval-threshold [timeout_classification_eval_threshold: | default = 1m30s] + + query_protection: + rejection: + threshold: + # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization + [heap_utilization: | default = 0] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2dc7d8bfe12..146170047c6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4989,6 +4989,23 @@ thanos_engine: # Eval time threshold above which a timeout is classified as user error (4XX). # CLI flag: -querier.timeout-classification-eval-threshold [timeout_classification_eval_threshold: | default = 1m30s] + +query_protection: + rejection: + threshold: + # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, between + # 0 and 1. monitored_resources config must include the resource type. 0 to + # disable. + # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # rejecting new query request (across all tenants) in percentage, between + # 0 and 1. monitored_resources config must include the resource type. 0 to + # disable. + # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization + [heap_utilization: | default = 0] ``` ### `query_frontend_config` diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index feb746bdb22..4a64ff0d2d1 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -6034,6 +6034,33 @@ "type": "boolean", "x-cli-flag": "querier.per-step-stats-enabled" }, + "query_protection": { + "properties": { + "rejection": { + "properties": { + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.rejection.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.rejection.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + } + }, + "type": "object" + }, "response_compression": { "default": "gzip", "description": "Use compression for metrics query API or instant and range query APIs. Supported compression 'gzip', 'snappy', 'zstd' and '' (disable compression)", From a46cd3c78d85c510e856918cf90c3967ce59c73b Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 20 Apr 2026 16:16:57 -0700 Subject: [PATCH 3/6] Update changelog Signed-off-by: Essam Eldaly --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66d2be50ef2..82746f134b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 +* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 From c94cab203d28e70ca4105c570e263d29d99d34ea Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Fri, 24 Apr 2026 14:08:46 -0700 Subject: [PATCH 4/6] fix doc ingester -> instance Signed-off-by: Essam Eldaly --- docs/blocks-storage/querier.md | 4 ++-- docs/configuration/config-file-reference.md | 12 ++++++------ pkg/configs/query_protection.go | 4 ++-- schemas/cortex-config-schema.json | 12 ++++++------ 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index f75c27c1e71..b92a6b34b38 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -317,14 +317,14 @@ querier: query_protection: rejection: threshold: - # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # EXPERIMENTAL: Max CPU utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, # between 0 and 1. monitored_resources config must include the resource # type. 0 to disable. # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization [cpu_utilization: | default = 0] - # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # EXPERIMENTAL: Max heap utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, # between 0 and 1. monitored_resources config must include the resource # type. 0 to disable. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 146170047c6..ff5deaee4d6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3859,14 +3859,14 @@ instance_limits: query_protection: rejection: threshold: - # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # EXPERIMENTAL: Max CPU utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. # CLI flag: -ingester.query-protection.rejection.threshold.cpu-utilization [cpu_utilization: | default = 0] - # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # EXPERIMENTAL: Max heap utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. @@ -4993,14 +4993,14 @@ thanos_engine: query_protection: rejection: threshold: - # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # EXPERIMENTAL: Max CPU utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. # CLI flag: -querier.query-protection.rejection.threshold.cpu-utilization [cpu_utilization: | default = 0] - # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # EXPERIMENTAL: Max heap utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. @@ -6761,14 +6761,14 @@ sharding_ring: query_protection: rejection: threshold: - # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # EXPERIMENTAL: Max CPU utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. # CLI flag: -store-gateway.query-protection.rejection.threshold.cpu-utilization [cpu_utilization: | default = 0] - # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # EXPERIMENTAL: Max heap utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, between # 0 and 1. monitored_resources config must include the resource type. 0 to # disable. diff --git a/pkg/configs/query_protection.go b/pkg/configs/query_protection.go index 756a9f3620b..7b48e9b2def 100644 --- a/pkg/configs/query_protection.go +++ b/pkg/configs/query_protection.go @@ -23,8 +23,8 @@ type threshold struct { } func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.Float64Var(&cfg.Rejection.Threshold.CPUUtilization, prefix+"query-protection.rejection.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") - f.Float64Var(&cfg.Rejection.Threshold.HeapUtilization, prefix+"query-protection.rejection.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + f.Float64Var(&cfg.Rejection.Threshold.CPUUtilization, prefix+"query-protection.rejection.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + f.Float64Var(&cfg.Rejection.Threshold.HeapUtilization, prefix+"query-protection.rejection.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") } func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) error { diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 4a64ff0d2d1..ef3aa180d5c 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4887,13 +4887,13 @@ "properties": { "cpu_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "ingester.query-protection.rejection.threshold.cpu-utilization" }, "heap_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "ingester.query-protection.rejection.threshold.heap-utilization" } @@ -6042,13 +6042,13 @@ "properties": { "cpu_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "querier.query-protection.rejection.threshold.cpu-utilization" }, "heap_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "querier.query-protection.rejection.threshold.heap-utilization" } @@ -8408,13 +8408,13 @@ "properties": { "cpu_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max CPU utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "store-gateway.query-protection.rejection.threshold.cpu-utilization" }, "heap_utilization": { "default": 0, - "description": "EXPERIMENTAL: Max heap utilization that this ingester can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", "type": "number", "x-cli-flag": "store-gateway.query-protection.rejection.threshold.heap-utilization" } From 512786934a7a3a6737cdd1d598ae1dc7da238197 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 27 Apr 2026 14:01:41 -0700 Subject: [PATCH 5/6] fix doc Signed-off-by: Essam Eldaly --- docs/blocks-storage/store-gateway.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 22e21cec67f..a90cf65880b 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -358,14 +358,14 @@ store_gateway: query_protection: rejection: threshold: - # EXPERIMENTAL: Max CPU utilization that this ingester can reach before + # EXPERIMENTAL: Max CPU utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, # between 0 and 1. monitored_resources config must include the resource # type. 0 to disable. # CLI flag: -store-gateway.query-protection.rejection.threshold.cpu-utilization [cpu_utilization: | default = 0] - # EXPERIMENTAL: Max heap utilization that this ingester can reach before + # EXPERIMENTAL: Max heap utilization that this instance can reach before # rejecting new query request (across all tenants) in percentage, # between 0 and 1. monitored_resources config must include the resource # type. 0 to disable. From 40d2bad85200ecd9f4793deadc5f81773bb494fe Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 27 Apr 2026 15:18:08 -0700 Subject: [PATCH 6/6] Add metric for queries rejected Signed-off-by: Essam Eldaly --- pkg/querier/querier.go | 62 +++++++++++++++++++++++-------------- pkg/querier/querier_test.go | 20 +++++++++--- 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8cb23cf3112..8c701283bc1 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -264,7 +264,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor limits: limits, } } - queryable := NewQueryable(distributorQueryable, ns, cfg, limits, resourceBasedLimiter, logger) + queryable := NewQueryable(distributorQueryable, ns, cfg, limits, resourceBasedLimiter, logger, reg) exemplarQueryable := newDistributorExemplarQueryable(distributor) lazyQueryable := storage.QueryableFunc(func(mint int64, maxt int64) (storage.Querier, error) { @@ -340,21 +340,31 @@ type limiterHolder struct { } // NewQueryable creates a new Queryable for cortex. -func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides, resourceBasedLimiter *limiter.ResourceBasedLimiter, logger log.Logger) storage.Queryable { +func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, cfg Config, limits *validation.Overrides, resourceBasedLimiter *limiter.ResourceBasedLimiter, logger log.Logger, reg prometheus.Registerer) storage.Queryable { + var rejectedRequestsCounter *prometheus.CounterVec + if resourceBasedLimiter != nil { + rejectedRequestsCounter = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "querier_rejected_requests_total", + Help: "Total number of queries rejected by resource based throttling.", + }, []string{"reason"}) + } + return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ - now: time.Now(), - mint: mint, - maxt: maxt, - limits: limits, - maxQueryIntoFuture: cfg.MaxQueryIntoFuture, - ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, - honorProjectionHints: cfg.HonorProjectionHints, - distributor: distributor, - stores: stores, - limiterHolder: &limiterHolder{}, - resourceBasedLimiter: resourceBasedLimiter, - logger: logger, + now: time.Now(), + mint: mint, + maxt: maxt, + limits: limits, + maxQueryIntoFuture: cfg.MaxQueryIntoFuture, + ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, + honorProjectionHints: cfg.HonorProjectionHints, + distributor: distributor, + stores: stores, + limiterHolder: &limiterHolder{}, + resourceBasedLimiter: resourceBasedLimiter, + rejectedRequestsCounter: rejectedRequestsCounter, + logger: logger, } return q, nil @@ -362,16 +372,17 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, } type querier struct { - now time.Time - mint, maxt int64 - limits *validation.Overrides - maxQueryIntoFuture time.Duration - honorProjectionHints bool - distributor QueryableWithFilter - stores []QueryableWithFilter - limiterHolder *limiterHolder - resourceBasedLimiter *limiter.ResourceBasedLimiter - logger log.Logger + now time.Time + mint, maxt int64 + limits *validation.Overrides + maxQueryIntoFuture time.Duration + honorProjectionHints bool + distributor QueryableWithFilter + stores []QueryableWithFilter + limiterHolder *limiterHolder + resourceBasedLimiter *limiter.ResourceBasedLimiter + rejectedRequestsCounter *prometheus.CounterVec + logger log.Logger ignoreMaxQueryLength bool } @@ -680,6 +691,9 @@ func (q querier) checkResourceUtilization() error { if err := q.resourceBasedLimiter.AcceptNewRequest(); err != nil { level.Warn(q.logger).Log("msg", "querier failed to accept request due to resource utilization", "err", err) + if q.rejectedRequestsCounter != nil { + q.rejectedRequestsCounter.WithLabelValues("resource_utilization").Inc() + } return limiter.ErrResourceLimitReached } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index e8ee70e5581..a072abc2221 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1,6 +1,7 @@ package querier import ( + "bytes" "context" "fmt" "strconv" @@ -339,7 +340,7 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { for _, queryable := range tc.storeQueriables { wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queryable}) } - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger(), nil) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -530,7 +531,7 @@ func TestLimits(t *testing.T) { } overrides := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit) - queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger()) + queryable := NewQueryable(wDistributorQueriable, wQueriables, cfg, overrides, nil, log.NewNopLogger(), nil) opts := promql.EngineOpts{ Logger: promslog.NewNopLogger(), MaxSamples: 1e6, @@ -1902,7 +1903,7 @@ func TestQuerier_ProjectionHints(t *testing.T) { wDistributorQueryable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: distributorQueryable} - queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides, nil, log.NewNopLogger()) + queryable := NewQueryable(wDistributorQueryable, []QueryableWithFilter{storeQueryable}, cfg, overrides, nil, log.NewNopLogger(), nil) q, err := queryable.Querier(util.TimeToMillis(start), util.TimeToMillis(end)) require.NoError(t, err) @@ -1962,7 +1963,8 @@ func TestQuerier_ResourceBasedLimiter(t *testing.T) { chunkStore := &errDistributor{} distributorQueryable := newDistributorQueryable(chunkStore, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) - queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, resourceBasedLimiter, log.NewNopLogger()) + reg := prometheus.NewPedanticRegistry() + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, resourceBasedLimiter, log.NewNopLogger(), reg) ctx := user.InjectOrgID(context.Background(), "test") q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now())) @@ -1982,6 +1984,14 @@ func TestQuerier_ResourceBasedLimiter(t *testing.T) { _, _, err = q.LabelNames(ctx, nil) require.Error(t, err) require.ErrorIs(t, err, limiter.ErrResourceLimitReached) + + // Verify the rejected requests metric was incremented for all 3 rejected queries. + err = promutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_querier_rejected_requests_total Total number of queries rejected by resource based throttling. + # TYPE cortex_querier_rejected_requests_total counter + cortex_querier_rejected_requests_total{reason="resource_utilization"} 3 + `), "cortex_querier_rejected_requests_total") + require.NoError(t, err) } func TestQuerier_ResourceBasedLimiter_Nil(t *testing.T) { @@ -2000,7 +2010,7 @@ func TestQuerier_ResourceBasedLimiter_Nil(t *testing.T) { distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, batch.NewChunkMergeIterator, nil, 1, overrides, nil) // nil resourceBasedLimiter should not block queries. - queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, nil, log.NewNopLogger()) + queryable := NewQueryable(distributorQueryable, nil, cfg, overrides, nil, log.NewNopLogger(), nil) ctx := user.InjectOrgID(context.Background(), "test") q, err := queryable.Querier(util.TimeToMillis(time.Now().Add(-1*time.Hour)), util.TimeToMillis(time.Now()))