Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
* [FEATURE] Parquet: Support sharded parquet file conversion and querying. #7610
* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
* [FEATURE] Ingester: Add experimental head-only queried series metric. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ parquet_converter:
# CLI flag: -parquet-converter.max-rows-per-row-group
[max_rows_per_row_group: <int> | default = 1000000]

# Maximum number of row groups per parquet shard. Each shard holds at most
# num-row-groups * max-rows-per-row-group series, so lowering this value
# splits a block into more parquet shards for better read parallelization. 0
# means unlimited (single shard).
# CLI flag: -parquet-converter.num-row-groups
[num_row_groups: <int> | default = 0]

# Enable disk-based write buffering to reduce memory consumption during
# parquet file generation.
# CLI flag: -parquet-converter.file-buffer-enabled
Expand Down
225 changes: 225 additions & 0 deletions integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,228 @@ func TestParquetProjectionPushdownFuzz(t *testing.T) {
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
}

// TestParquetMultiShardQuery uploads a single TSDB block, lets the
// parquet-converter split it into multiple parquet shards, and verifies that
// every series of the block stays queryable. The scenario is exercised through
// the querier's embedded parquet queryable and through the store-gateway
// parquet bucket store, the latter with and without ring replication.
func TestParquetMultiShardQuery(t *testing.T) {
	for name, tc := range map[string]struct {
		viaStoreGateway bool
		// replicationFactor is the store-gateway sharding ring replication factor.
		replicationFactor string
		// extraStoreGateways is the number of additional store-gateway replicas to
		// start alongside the one embedded in the single binary (target "all").
		extraStoreGateways int
	}{
		"querier parquet queryable":                           {viaStoreGateway: false, replicationFactor: "1", extraStoreGateways: 0},
		"store-gateway parquet bucket store":                  {viaStoreGateway: true, replicationFactor: "1", extraStoreGateways: 0},
		"store-gateway parquet bucket store with replication": {viaStoreGateway: true, replicationFactor: "2", extraStoreGateways: 1},
	} {
		t.Run(name, func(t *testing.T) {
			s, err := e2e.NewScenario(networkName)
			require.NoError(t, err)
			defer s.Close()

			consul := e2edb.NewConsulWithName("consul")
			require.NoError(t, s.StartAndWaitReady(consul))

			const (
				// 2 metrics * seriesPerMetric unique series. Sized together with the
				// converter flags below so the block is split into exactly 2 shards.
				seriesPerMetric    = 10
				totalSeries        = seriesPerMetric * 2 // 20
				maxRowsPerRowGroup = 10
				numRowGroups       = 1
				expectedShards     = 2 // ceil(20 / (1 * 10))
				seriesSize         = 10
			)

			baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
			flags := mergeFlags(
				baseFlags,
				map[string]string{
					"-target": "all,parquet-converter",
					"-blocks-storage.tsdb.block-ranges-period":                             "1m,24h",
					"-blocks-storage.tsdb.ship-interval":                                   "1s",
					"-blocks-storage.bucket-store.sync-interval":                           "1s",
					"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
					"-blocks-storage.bucket-store.bucket-index.idle-timeout":               "1s",
					"-blocks-storage.bucket-store.bucket-index.enabled":                    "true",
					// compactor
					"-compactor.cleanup-interval": "1s",
					// Ingester.
					"-ring.store":      "consul",
					"-consul.hostname": consul.NetworkHTTPEndpoint(),
					// Distributor.
					"-distributor.replication-factor": "1",
					// alert manager
					"-alertmanager.web.external-url": "http://localhost/alertmanager",
					// Don't query ingesters: the queried time range is older than this,
					// so all data is served exclusively from parquet blocks.
					"-limits.query-ingesters-within": "2h",
					// parquet-converter
					"-parquet-converter.ring.consul.hostname":   consul.NetworkHTTPEndpoint(),
					"-parquet-converter.conversion-interval":    "1s",
					"-parquet-converter.enabled":                "true",
					"-parquet-converter.num-row-groups":         strconv.Itoa(numRowGroups),
					"-parquet-converter.max-rows-per-row-group": strconv.Itoa(maxRowsPerRowGroup),
				},
			)

			if tc.viaStoreGateway {
				// Route reads through the store-gateway's parquet bucket store.
				flags = mergeFlags(flags, map[string]string{
					"-blocks-storage.bucket-store.bucket-store-type": "parquet",
					// Enable sharding so the querier discovers the store-gateway via the
					// ring and routes block queries to it.
					"-store-gateway.sharding-enabled":                 "true",
					"-store-gateway.sharding-ring.store":              "consul",
					"-store-gateway.sharding-ring.consul.hostname":    consul.NetworkHTTPEndpoint(),
					"-store-gateway.sharding-ring.replication-factor": tc.replicationFactor,
					// Disable the embedded parquet queryable so reads go to the store-gateway.
					"-querier.enable-parquet-queryable": "false",
				})
			} else {
				// Query directly via the querier's embedded parquet queryable.
				flags = mergeFlags(flags, map[string]string{
					"-store-gateway.sharding-enabled": "false",
					// Single-dash key, like every other flag here: flags are merged by
					// map key, so the spelling must match to override or be overridden.
					"-querier.store-gateway-addresses":                 "nonExistent", // Make sure we do not call Store gateways
					"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
					"-querier.enable-parquet-queryable":                "true",
				})
			}

			// make alert manager config dir
			require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

			ctx := context.Background()
			rnd := newFuzzRand(t)
			dir := filepath.Join(s.SharedDir(), "data")
			numSamples := 60
			scrapeInterval := time.Minute
			now := time.Now()
			// Keep the whole range older than -limits.query-ingesters-within (2h)
			// so queries are served exclusively from parquet blocks.
			start := now.Add(-time.Hour * 24)
			end := now.Add(-time.Hour * 3)

			// Generate unique series so the converter produces a deterministic series count.
			lbls := make([]labels.Labels, 0, totalSeries)
			for i := 0; i < seriesPerMetric; i++ {
				lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "instance", strconv.Itoa(i)))
				lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "instance", strconv.Itoa(i)))
			}

			id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), seriesSize)
			require.NoError(t, err)
			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
			require.NoError(t, s.StartAndWaitReady(minio))

			storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
			require.NoError(t, err)
			bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

			// Upload the block before starting cortex so the first compactor scan finds
			// the complete block and includes it in the bucket index immediately.
			err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
			require.NoError(t, err)

			cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
			require.NoError(t, s.StartAndWaitReady(cortex))

			// Start additional store-gateway replicas
			for i := 0; i < tc.extraStoreGateways; i++ {
				storeGateway := e2ecortex.NewStoreGateway(
					fmt.Sprintf("store-gateway-%d", i+1),
					e2ecortex.RingStoreConsul,
					consul.NetworkHTTPEndpoint(),
					// Override the target so this instance only runs the store-gateway
					mergeFlags(flags, map[string]string{"-target": "store-gateway"}),
					"",
				)
				require.NoError(t, s.StartAndWaitReady(storeGateway))
			}

			// Ensure all store-gateways (embedded + extra replicas) are ACTIVE
			if tc.extraStoreGateways > 0 {
				expectedStoreGateways := float64(1 + tc.extraStoreGateways)
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(expectedStoreGateways), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
					labels.MustNewMatcher(labels.MatchEqual, "name", "store-gateway"),
					labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
			}

			// Wait until the block is converted to parquet and the bucket index is updated.
			// The marker object name does not change between polls, so build it once.
			markerPath := fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String())
			cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
				found := false
				foundBucketIndex := false
				err := bkt.Iter(context.Background(), "", func(name string) error {
					if name == markerPath {
						found = true
					}
					if name == "bucket-index.json.gz" {
						foundBucketIndex = true
					}
					return nil
				}, objstore.WithRecursiveIter())
				require.NoError(t, err)
				return found && foundBucketIndex
			})

			// Verify the converter actually split the block into the expected number of shards.
			marker, err := cortex_parquet.ReadConverterMark(ctx, id, bkt, log.Logger)
			require.NoError(t, err)
			require.Equal(t, expectedShards, marker.Shards, "block should be split into multiple parquet shards")

			// Verify each shard's parquet files (labels + chunks) exist in object storage.
			for shardID := 0; shardID < expectedShards; shardID++ {
				labelsFile := fmt.Sprintf("%s/%d.labels.parquet", id.String(), shardID)
				chunksFile := fmt.Sprintf("%s/%d.chunks.parquet", id.String(), shardID)

				exists, err := bkt.Exists(ctx, labelsFile)
				require.NoError(t, err)
				require.True(t, exists, "labels parquet file should exist for shard %d", shardID)

				exists, err = bkt.Exists(ctx, chunksFile)
				require.NoError(t, err)
				require.True(t, exists, "chunks parquet file should exist for shard %d", shardID)
			}

			// Verify the block is registered in the bucket index as a parquet block with the expected shard count.
			cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
				idx, err := bucketindex.ReadIndex(ctx, storage.GetBucket(), "user-1", nil, log.Logger)
				if err != nil {
					return false
				}
				for _, b := range idx.Blocks {
					if b.ID == id && b.Parquet != nil && b.Parquet.Shards == expectedShards {
						return true
					}
				}
				return false
			})

			c, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
			require.NoError(t, err)

			// Wait until all series are queryable across both shards.
			cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
				labelSets, err := c.Series([]string{`{job="test"}`}, start, end)
				if err != nil {
					return false
				}
				return len(labelSets) == totalSeries
			})

			rangeRes, err := c.QueryRange(`test_series_a`, start, end, scrapeInterval)
			require.NoError(t, err)
			rangeMatrix, ok := rangeRes.(model.Matrix)
			require.True(t, ok)
			require.Len(t, rangeMatrix, seriesPerMetric)

			if tc.viaStoreGateway {
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_storegateway_instances_hit_per_query"}, e2e.WithMetricCount, e2e.SkipMissingMetrics))
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_querier_blocks_consistency_checks_total"}, e2e.SkipMissingMetrics))
			} else {
				require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
					labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
			}
		})
	}
}
4 changes: 4 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (c *Config) Validate(log log.Logger) error {
return errors.Wrap(err, "invalid tracing config")
}

if err := c.ParquetConverter.Validate(); err != nil {
return errors.Wrap(err, "invalid parquet-converter config")
}

return nil
}

Expand Down
47 changes: 32 additions & 15 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ const (

var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

// errInvalidNumRowGroups is returned by Config.Validate when NumRowGroups is
// negative.
var errInvalidNumRowGroups = errors.New("invalid -parquet-converter.num-row-groups: must be greater than or equal to 0")

type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
NumRowGroups int `yaml:"num_row_groups"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`

DataDir string `yaml:"data_dir"`
Expand Down Expand Up @@ -107,10 +110,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Local directory path for caching TSDB blocks during parquet conversion.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
f.IntVar(&cfg.NumRowGroups, "parquet-converter.num-row-groups", 0, "Maximum number of row groups per parquet shard. Each shard holds at most num-row-groups * max-rows-per-row-group series, so lowering this value splits a block into more parquet shards for better read parallelization. 0 means unlimited (single shard).")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
}

// Validate checks the parquet-converter configuration. It returns
// errInvalidNumRowGroups when NumRowGroups is negative and nil otherwise.
func (cfg *Config) Validate() error {
	if cfg.NumRowGroups >= 0 {
		return nil
	}

	return errInvalidNumRowGroups
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
bkt, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "parquet-converter", logger, registerer)
if err != nil {
Expand All @@ -126,22 +137,28 @@ func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockR
}

func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides, usersScanner users.Scanner) *Converter {
baseConverterOptions := []convert.ConvertOption{
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
}

if cfg.NumRowGroups > 0 {
baseConverterOptions = append(baseConverterOptions, convert.WithNumRowGroups(cfg.NumRowGroups))
}

c := &Converter{
cfg: cfg,
reg: registerer,
storageCfg: storageCfg,
logger: logger,
limits: limits,
usersScanner: usersScanner,
pool: chunkenc.NewPool(),
blockRanges: blockRanges,
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithColDuration(time.Hour * 8),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
},
cfg: cfg,
reg: registerer,
storageCfg: storageCfg,
logger: logger,
limits: limits,
usersScanner: usersScanner,
pool: chunkenc.NewPool(),
blockRanges: blockRanges,
fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil),
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: baseConverterOptions,
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
Expand Down
Loading
Loading