From 9257eadf30302e3b7e68ffe55e3c60014344a3db Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 8 Nov 2024 12:16:09 -0800 Subject: [PATCH 1/6] add distributor per labelset ingestion rate metric Signed-off-by: Ben Ye --- pkg/distributor/distributor.go | 53 +++++++++++ pkg/distributor/distributor_test.go | 131 ++++++++++++++++++++++++++++ pkg/distributor/metrics.go | 105 ++++++++++++++++++++++ pkg/distributor/metrics_test.go | 122 ++++++++++++++++++++++++++ pkg/util/active_user.go | 19 ++++ pkg/util/active_user_test.go | 23 +++++ 6 files changed, 453 insertions(+) create mode 100644 pkg/distributor/metrics.go create mode 100644 pkg/distributor/metrics_test.go diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 05a57a6213..d297612ffe 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -64,6 +64,8 @@ const ( clearStaleIngesterMetricsInterval = time.Minute + labelSetMetricsTickInterval = 30 * time.Second + // mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and // it was based on empirical observation: See BenchmarkMergeSlicesParallel mergeSlicesParallelism = 8 @@ -107,6 +109,7 @@ type Distributor struct { // Metrics queryDuration *instrument.HistogramCollector receivedSamples *prometheus.CounterVec + receivedSamplesPerLabelSet *prometheus.CounterVec receivedExemplars *prometheus.CounterVec receivedMetadata *prometheus.CounterVec incomingSamples *prometheus.CounterVec @@ -125,6 +128,9 @@ type Distributor struct { validateMetrics *validation.ValidateMetrics asyncExecutor util.AsyncExecutor + + // Counter to track metrics per label set. + labelSetCounter *labelSetCounter } // Config contains the configuration required to @@ -290,6 +296,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), HATracker: haTracker, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), + labelSetCounter: newLabelSetCounter(), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -302,6 +309,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user", "type"}), + receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_received_samples_per_labelset_total", + Help: "The total number of received samples per label set, excluding rejected and deduped samples.", + }, []string{"user", "type", "labelset"}), receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_received_exemplars_total", @@ -449,6 +461,9 @@ func (d *Distributor) running(ctx context.Context) error { staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval) defer staleIngesterMetricTicker.Stop() + labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval) + defer labelSetMetricsTicker.Stop() + for { select { case <-ctx.Done(): @@ -460,6 +475,9 @@ func (d *Distributor) running(ctx context.Context) error { case <-staleIngesterMetricTicker.C: d.cleanStaleIngesterMetrics() + case <-labelSetMetricsTicker.C: + d.updateLabelSetMetrics() + case err := <-d.subservicesWatcher.Chan(): return errors.Wrap(err, 
"distributor subservice failed") } @@ -486,6 +504,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) { level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err) } + if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil { + level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err) + } + validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log) } @@ -777,6 +799,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co return &cortexpb.WriteResponse{}, firstPartialErr } +func (d *Distributor) updateLabelSetMetrics() { + activeUserSet := make(map[string]map[uint64]struct{}) + for _, user := range d.activeUsers.ActiveUsers() { + limits := d.limits.LimitsPerLabelSet(user) + activeUserSet[user] = make(map[uint64]struct{}, len(limits)) + for _, l := range limits { + activeUserSet[user][l.Hash] = struct{}{} + } + } + + d.labelSetCounter.updateMetrics(activeUserSet, d.receivedSamplesPerLabelSet) +} + func (d *Distributor) cleanStaleIngesterMetrics() { healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend) if err != nil { @@ -888,7 +923,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write validatedFloatSamples := 0 validatedHistogramSamples := 0 validatedExemplars := 0 + limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID) + labelSetCounters := make(map[uint64]*samplesLabelSetEntry) var firstPartialErr error latestSampleTimestampMs := int64(0) @@ -1005,12 +1042,28 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write continue } + for _, l := range validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels)) { + if c, exists := labelSetCounters[l.Hash]; exists { + c.floatSamples += int64(len(ts.Samples)) + c.histogramSamples += int64(len(ts.Histograms)) + } else { + labelSetCounters[l.Hash] = &samplesLabelSetEntry{ + floatSamples: int64(len(ts.Samples)), + histogramSamples: int64(len(ts.Histograms)), + labels: l.LabelSet, + } + } + } + seriesKeys = append(seriesKeys, key) validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedFloatSamples += len(ts.Samples) validatedHistogramSamples += len(ts.Histograms) validatedExemplars += len(ts.Exemplars) } + for h, counter := range labelSetCounters { + d.labelSetCounter.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples) + } return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2c95f82788..4becbcc89c 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -420,6 +420,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) { "cortex_distributor_metadata_in_total", "cortex_distributor_non_ha_samples_received_total", "cortex_distributor_latest_seen_sample_timestamp_seconds", + "cortex_distributor_received_samples_per_labelset_total", } allMetrics := append(removedMetrics, permanentMetrics...) 
@@ -438,6 +439,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) { d.nonHASamples.WithLabelValues("userA").Add(5) d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111) + d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeFloat, "{}").Add(5) + d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeHistogram, "{}").Add(10) h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend) ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr) @@ -473,6 +476,11 @@ func TestDistributor_MetricsCleanup(t *testing.T) { cortex_distributor_received_metadata_total{user="userA"} 5 cortex_distributor_received_metadata_total{user="userB"} 10 + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="userA"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="userA"} 10 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. # TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{type="float",user="userA"} 5 @@ -4081,6 +4089,129 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing } } +func TestDistributor_PushLabelSetMetrics(t *testing.T) { + t.Parallel() + inputSeries := []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "three"}, + }, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + {Hash: 0, LabelSet: labels.FromStrings("cluster", "one")}, + {Hash: 1, LabelSet: labels.FromStrings("cluster", "two")}, + {Hash: 2, LabelSet: labels.EmptyLabels()}, + } + + ds, _, regs, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + reg := regs[0] + + // Push the series to the distributor + id := "user" + req := mockWriteRequest(inputSeries, 1, 1, false) + ctx := user.InjectOrgID(context.Background(), id) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + ds[0].updateLabelSetMetrics() + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. 
+ # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Push more series. + inputSeries = []labels.Labels{ + { + {Name: "__name__", Value: "baz"}, + {Name: "cluster", Value: "two"}, + }, + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "four"}, + }, + } + // Write the same request twice for different users. + req = mockWriteRequest(inputSeries, 1, 1, false) + ctx2 := user.InjectOrgID(context.Background(), "user2") + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + req = mockWriteRequest(inputSeries, 1, 1, false) + _, err = ds[0].Push(ctx2, req) + require.NoError(t, err) + ds[0].updateLabelSetMetrics() + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user2"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Remove existing limits and add new limits + limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{ + {Hash: 3, LabelSet: labels.FromStrings("cluster", "three")}, + {Hash: 4, LabelSet: labels.FromStrings("cluster", "four")}, + {Hash: 2, LabelSet: labels.EmptyLabels()}, + } + ds[0].limits, err = validation.NewOverrides(limits, nil) + require.NoError(t, err) + ds[0].updateLabelSetMetrics() + // Old label set metrics are removed. New label set metrics will be added when + // new requests come in. 
+ require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 + `), "cortex_distributor_received_samples_per_labelset_total")) + + // Metrics from `user` got removed but `user2` metric should remain. + ds[0].cleanupInactiveUser(id) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 + `), "cortex_distributor_received_samples_per_labelset_total")) +} + func countMockIngestersCalls(ingesters []*mockIngester, name string) int { count := 0 for i := 0; i < len(ingesters); i++ { diff --git a/pkg/distributor/metrics.go b/pkg/distributor/metrics.go new file mode 100644 index 0000000000..df3c5ba49d --- /dev/null +++ b/pkg/distributor/metrics.go @@ -0,0 +1,105 @@ +package distributor + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + numMetricCounterShards = 128 +) + +type labelSetCounter struct { + shards []*labelSetCounterShard +} + +func newLabelSetCounter() *labelSetCounter { + shards := make([]*labelSetCounterShard, 0, numMetricCounterShards) + for i := 0; i < numMetricCounterShards; i++ { + shards = append(shards, &labelSetCounterShard{ + RWMutex: &sync.RWMutex{}, + valuesCounter: map[string]map[uint64]*samplesLabelSetEntry{}, + }) + } + return &labelSetCounter{shards: shards} +} + +type labelSetCounterShard struct { + *sync.RWMutex + valuesCounter map[string]map[uint64]*samplesLabelSetEntry +} + +type samplesLabelSetEntry struct { + floatSamples int64 + histogramSamples int64 + labels labels.Labels +} + +func (s *samplesLabelSetEntry) reset() { + s.floatSamples = 0 + s.histogramSamples = 0 +} + +func (m *labelSetCounter) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) { + s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricCounterShards] + s.Lock() + defer s.Unlock() + if userEntry, ok := s.valuesCounter[userId]; ok { + if e, ok2 := userEntry[hash]; ok2 { + e.floatSamples += floatSamples + e.histogramSamples += histogramSamples + } else { + userEntry[hash] = &samplesLabelSetEntry{ + floatSamples: floatSamples, + histogramSamples: histogramSamples, + labels: labelSet, + } + } + } else { + s.valuesCounter[userId] = map[uint64]*samplesLabelSetEntry{ + hash: { + floatSamples: floatSamples, + histogramSamples: histogramSamples, + labels: labelSet, + }, + } + } +} + +func (m 
*labelSetCounter) updateMetrics(userSet map[string]map[uint64]struct{}, receivedSamplesPerLabelSet *prometheus.CounterVec) { + for i := 0; i < numMetricCounterShards; i++ { + shard := m.shards[i] + shard.Lock() + + for user, userEntry := range shard.valuesCounter { + limits, ok := userSet[user] + if !ok { + // If user is removed, we will delete user metrics in cleanupInactiveUser loop + // so skip deleting metrics here. + delete(shard.valuesCounter, user) + continue + } + for h, entry := range userEntry { + labelSetStr := entry.labels.String() + // This limit no longer exists. + if _, ok := limits[h]; !ok { + delete(userEntry, h) + receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) + receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) + continue + } + receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeFloat, labelSetStr).Add(float64(entry.floatSamples)) + receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeHistogram, labelSetStr).Add(float64(entry.histogramSamples)) + // Reset entry counter to 0. Delete it only if it is removed from the limit. + entry.reset() + } + } + + shard.Unlock() + } +} diff --git a/pkg/distributor/metrics_test.go b/pkg/distributor/metrics_test.go new file mode 100644 index 0000000000..8b9ba64cb3 --- /dev/null +++ b/pkg/distributor/metrics_test.go @@ -0,0 +1,122 @@ +package distributor + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestLabelSetCounter(t *testing.T) { + counter := newLabelSetCounter() + + metricName := "cortex_distributor_received_samples_per_labelset_total" + reg := prometheus.NewPedanticRegistry() + dummyCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: metricName, + Help: "", + }, []string{"user", "type", "labelset"}) + reg.MustRegister(dummyCounter) + + userID := "1" + userID2 := "2" + userID3 := "3" + + counter.increaseSamplesLabelSet(userID, 0, labels.FromStrings("foo", "bar"), 10, 0) + counter.increaseSamplesLabelSet(userID, 1, labels.FromStrings("foo", "baz"), 0, 5) + counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) + counter.increaseSamplesLabelSet(userID2, 0, labels.FromStrings("foo", "bar"), 100, 5) + counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) + + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + } + counter.updateMetrics(userSet, dummyCounter) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 + 
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 20 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 20 + `), metricName)) + + // Increment metrics and add a new user. + counter.increaseSamplesLabelSet(userID, 3, labels.EmptyLabels(), 20, 20) + counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) + counter.increaseSamplesLabelSet(userID2, 4, labels.FromStrings("cluster", "us-west-2"), 10, 10) + counter.increaseSamplesLabelSet(userID3, 4, labels.FromStrings("cluster", "us-east-1"), 30, 30) + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, + userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, + } + counter.updateMetrics(userSet, dummyCounter) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 + `), metricName)) + + // Remove user 2. But metrics for user 2 not cleaned up as it is expected to be cleaned up + // in cleanupInactiveUser loop. It is expected to have 3 minutes delay in this case. 
+ delete(userSet, userID2) + counter.updateMetrics(userSet, dummyCounter) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="histogram",user="3"} 30 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 + cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 + `), metricName)) + + // Simulate existing limits removed for each user. + userSet = map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}}, + userID2: {}, + userID3: {}, + } + counter.updateMetrics(userSet, dummyCounter) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # TYPE cortex_distributor_received_samples_per_labelset_total counter + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 + cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 + `), metricName)) +} diff --git a/pkg/util/active_user.go b/pkg/util/active_user.go index f735dc2278..574fb2c8e1 100644 --- a/pkg/util/active_user.go +++ b/pkg/util/active_user.go @@ -94,6 +94,21 @@ func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string { return inactive } +func (m *ActiveUsers) ActiveUsers(deadline int64) []string { + m.mu.RLock() + defer m.mu.RUnlock() + m.mu.RLock() + active := make([]string, 0, len(m.timestamps)) + + for userID, ts := range m.timestamps { + if ts.Load() > deadline { + active = append(active, userID) + } + } + m.mu.RUnlock() + return active +} + // ActiveUsersCleanupService tracks active users, and periodically purges inactive ones while running. 
type ActiveUsersCleanupService struct { services.Service @@ -129,3 +144,7 @@ func (s *ActiveUsersCleanupService) iteration(_ context.Context) error { } return nil } + +func (s *ActiveUsersCleanupService) ActiveUsers() []string { + return s.activeUsers.ActiveUsers(time.Now().Add(-s.inactiveTimeout).UnixNano()) +} diff --git a/pkg/util/active_user_test.go b/pkg/util/active_user_test.go index f2e50866fc..4db9e7b0cd 100644 --- a/pkg/util/active_user_test.go +++ b/pkg/util/active_user_test.go @@ -3,6 +3,7 @@ package util import ( "fmt" "runtime" + "sort" "strconv" "sync" "testing" @@ -28,6 +29,28 @@ func TestActiveUser(t *testing.T) { require.Equal(t, []string{"test1"}, as.PurgeInactiveUsers(20)) } +func TestActiveUser_ActiveUsers(t *testing.T) { + as := NewActiveUsers() + as.UpdateUserTimestamp("test1", 5) + as.UpdateUserTimestamp("test2", 10) + as.UpdateUserTimestamp("test3", 15) + + users := as.ActiveUsers(0) + sort.Strings(users) + require.Equal(t, []string{"test1", "test2", "test3"}, users) + + users = as.ActiveUsers(5) + sort.Strings(users) + require.Equal(t, []string{"test2", "test3"}, users) + + users = as.ActiveUsers(10) + sort.Strings(users) + require.Equal(t, []string{"test3"}, users) + + users = as.ActiveUsers(15) + require.Equal(t, []string{}, users) +} + func TestActiveUserConcurrentUpdateAndPurge(t *testing.T) { count := 10 From 24a62aed4d6af0f5f8761d154474b37362a01366 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 18 Dec 2024 08:07:04 -0800 Subject: [PATCH 2/6] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf025c719d..7da08bf6ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ * [ENHANCEMENT] Distributor: Add a new `-distributor.num-push-workers` flag to use a goroutine worker pool when sending data from distributor to ingesters. #6406 * [ENHANCEMENT] Ingester: If a limit per label set entry doesn't have any label, use it as the default partition to catch all series that doesn't match any other label sets entries. #6435 * [ENHANCEMENT] Querier: Add new `cortex_querier_codec_response_size` metric to track the size of the encoded query responses from queriers. #6444 +* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_samples_per_labelset_total` metric to calculate ingestion rate per label set. #6443 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled. #6271 From 0cdf9e4e87c4bc4ec2fa8663ddba83bb06e8f637 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 18 Dec 2024 16:53:00 -0800 Subject: [PATCH 3/6] update metrics synchronously Signed-off-by: Ben Ye --- pkg/distributor/distributor.go | 11 ++-- pkg/distributor/distributor_test.go | 13 ----- pkg/distributor/metrics.go | 80 +++++++++++++---------------- pkg/distributor/metrics_test.go | 35 +++---------- 4 files changed, 49 insertions(+), 90 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d297612ffe..af370d818c 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -129,8 +129,8 @@ type Distributor struct { asyncExecutor util.AsyncExecutor - // Counter to track metrics per label set. - labelSetCounter *labelSetCounter + // Map to track label sets from user. 
+ labelSetTracker *labelSetTracker } // Config contains the configuration required to @@ -296,7 +296,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), HATracker: haTracker, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), - labelSetCounter: newLabelSetCounter(), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -389,6 +388,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove asyncExecutor: util.NewNoOpExecutor(), } + d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet) + if cfg.NumPushWorkers > 0 { util_log.WarnExperimentalUse("Distributor: using goroutine worker pool") d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg) @@ -809,7 +810,7 @@ func (d *Distributor) updateLabelSetMetrics() { } } - d.labelSetCounter.updateMetrics(activeUserSet, d.receivedSamplesPerLabelSet) + d.labelSetTracker.updateMetrics(activeUserSet) } func (d *Distributor) cleanStaleIngesterMetrics() { @@ -1062,7 +1063,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write validatedExemplars += len(ts.Exemplars) } for h, counter := range labelSetCounters { - d.labelSetCounter.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples) + d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples) } return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4becbcc89c..4ac187ffd7 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -4135,16 +4135,12 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) { _, err = ds[0].Push(ctx, req) require.NoError(t, err) - ds[0].updateLabelSetMetrics() require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. # TYPE cortex_distributor_received_samples_per_labelset_total counter cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 `), "cortex_distributor_received_samples_per_labelset_total")) // Push more series. 
@@ -4166,20 +4162,14 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) { req = mockWriteRequest(inputSeries, 1, 1, false) _, err = ds[0].Push(ctx2, req) require.NoError(t, err) - ds[0].updateLabelSetMetrics() require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. # TYPE cortex_distributor_received_samples_per_labelset_total counter cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2 cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0 - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user2"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 `), "cortex_distributor_received_samples_per_labelset_total")) // Remove existing limits and add new limits @@ -4198,8 +4188,6 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) { # TYPE cortex_distributor_received_samples_per_labelset_total counter cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 `), "cortex_distributor_received_samples_per_labelset_total")) // Metrics from `user` got removed but `user2` metric should remain. @@ -4208,7 +4196,6 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) { # HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples. 
# TYPE cortex_distributor_received_samples_per_labelset_total counter cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1 - cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0 `), "cortex_distributor_received_samples_per_labelset_total")) } diff --git a/pkg/distributor/metrics.go b/pkg/distributor/metrics.go index df3c5ba49d..786ab954c2 100644 --- a/pkg/distributor/metrics.go +++ b/pkg/distributor/metrics.go @@ -11,27 +11,29 @@ import ( ) const ( - numMetricCounterShards = 128 + numMetricShards = 128 ) -type labelSetCounter struct { +type labelSetTracker struct { + receivedSamplesPerLabelSet *prometheus.CounterVec + shards []*labelSetCounterShard } -func newLabelSetCounter() *labelSetCounter { - shards := make([]*labelSetCounterShard, 0, numMetricCounterShards) - for i := 0; i < numMetricCounterShards; i++ { +func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker { + shards := make([]*labelSetCounterShard, 0, numMetricShards) + for i := 0; i < numMetricShards; i++ { shards = append(shards, &labelSetCounterShard{ RWMutex: &sync.RWMutex{}, - valuesCounter: map[string]map[uint64]*samplesLabelSetEntry{}, + userLabelSets: map[string]map[uint64]labels.Labels{}, }) } - return &labelSetCounter{shards: shards} + return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet} } type labelSetCounterShard struct { *sync.RWMutex - valuesCounter map[string]map[uint64]*samplesLabelSetEntry + userLabelSets map[string]map[uint64]labels.Labels } type samplesLabelSetEntry struct { @@ -40,63 +42,51 @@ type samplesLabelSetEntry struct { labels labels.Labels } -func (s *samplesLabelSetEntry) reset() { - s.floatSamples = 0 - s.histogramSamples = 0 -} - -func (m *labelSetCounter) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) { - s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricCounterShards] +func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) { + s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards] s.Lock() - defer s.Unlock() - if userEntry, ok := s.valuesCounter[userId]; ok { - if e, ok2 := userEntry[hash]; ok2 { - e.floatSamples += floatSamples - e.histogramSamples += histogramSamples - } else { - userEntry[hash] = &samplesLabelSetEntry{ - floatSamples: floatSamples, - histogramSamples: histogramSamples, - labels: labelSet, - } + if userEntry, ok := s.userLabelSets[userId]; ok { + if _, ok2 := userEntry[hash]; !ok2 { + userEntry[hash] = labelSet } } else { - s.valuesCounter[userId] = map[uint64]*samplesLabelSetEntry{ - hash: { - floatSamples: floatSamples, - histogramSamples: histogramSamples, - labels: labelSet, - }, - } + s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet} + } + // Unlock before we update metrics. 
+ s.Unlock() + + labelSetStr := labelSet.String() + if floatSamples > 0 { + m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples)) + } + if histogramSamples > 0 { + m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples)) } } -func (m *labelSetCounter) updateMetrics(userSet map[string]map[uint64]struct{}, receivedSamplesPerLabelSet *prometheus.CounterVec) { - for i := 0; i < numMetricCounterShards; i++ { +// Clean up dangling user and label set from the tracker as well as metrics. +func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) { + for i := 0; i < numMetricShards; i++ { shard := m.shards[i] shard.Lock() - for user, userEntry := range shard.valuesCounter { + for user, userEntry := range shard.userLabelSets { limits, ok := userSet[user] if !ok { // If user is removed, we will delete user metrics in cleanupInactiveUser loop // so skip deleting metrics here. - delete(shard.valuesCounter, user) + delete(shard.userLabelSets, user) continue } - for h, entry := range userEntry { - labelSetStr := entry.labels.String() + for h, lbls := range userEntry { // This limit no longer exists. if _, ok := limits[h]; !ok { delete(userEntry, h) - receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) - receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) + labelSetStr := lbls.String() + m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr) + m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr) continue } - receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeFloat, labelSetStr).Add(float64(entry.floatSamples)) - receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeHistogram, labelSetStr).Add(float64(entry.histogramSamples)) - // Reset entry counter to 0. Delete it only if it is removed from the limit. 
- entry.reset() } } diff --git a/pkg/distributor/metrics_test.go b/pkg/distributor/metrics_test.go index 8b9ba64cb3..842e4fe6c3 100644 --- a/pkg/distributor/metrics_test.go +++ b/pkg/distributor/metrics_test.go @@ -11,14 +11,13 @@ import ( ) func TestLabelSetCounter(t *testing.T) { - counter := newLabelSetCounter() - metricName := "cortex_distributor_received_samples_per_labelset_total" reg := prometheus.NewPedanticRegistry() dummyCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: metricName, Help: "", }, []string{"user", "type", "labelset"}) + counter := newLabelSetTracker(dummyCounter) reg.MustRegister(dummyCounter) userID := "1" @@ -31,22 +30,12 @@ func TestLabelSetCounter(t *testing.T) { counter.increaseSamplesLabelSet(userID2, 0, labels.FromStrings("foo", "bar"), 100, 5) counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) - userSet := map[string]map[uint64]struct { - }{ - userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, - userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, - } - counter.updateMetrics(userSet, dummyCounter) - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # TYPE cortex_distributor_received_samples_per_labelset_total counter - cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 100 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 20 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 20 @@ -57,13 +46,6 @@ func TestLabelSetCounter(t *testing.T) { counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100) counter.increaseSamplesLabelSet(userID2, 4, labels.FromStrings("cluster", "us-west-2"), 10, 10) counter.increaseSamplesLabelSet(userID3, 4, labels.FromStrings("cluster", "us-east-1"), 30, 30) - userSet = map[string]map[uint64]struct { - }{ - userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, - userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, - userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, - } - counter.updateMetrics(userSet, dummyCounter) require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # TYPE cortex_distributor_received_samples_per_labelset_total counter @@ -73,9 +55,7 @@ func TestLabelSetCounter(t *testing.T) { cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 @@ -83,8 +63,12 @@ func TestLabelSetCounter(t *testing.T) { // Remove user 2. But metrics for user 2 not cleaned up as it is expected to be cleaned up // in cleanupInactiveUser loop. It is expected to have 3 minutes delay in this case. - delete(userSet, userID2) - counter.updateMetrics(userSet, dummyCounter) + userSet := map[string]map[uint64]struct { + }{ + userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}}, + userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}}, + } + counter.updateMetrics(userSet) require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # TYPE cortex_distributor_received_samples_per_labelset_total counter cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30 @@ -93,9 +77,7 @@ func TestLabelSetCounter(t *testing.T) { cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40 cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40 @@ -108,7 +90,7 @@ func TestLabelSetCounter(t *testing.T) { userID2: {}, userID3: {}, } - counter.updateMetrics(userSet, dummyCounter) + counter.updateMetrics(userSet) require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # TYPE cortex_distributor_received_samples_per_labelset_total counter @@ -116,7 +98,6 @@ func TestLabelSetCounter(t *testing.T) { cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100 - cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0 cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5 `), metricName)) } From 
a51c6434c7a0f3a74e8aa07558b25fbc13d87e05 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 18 Dec 2024 16:56:31 -0800
Subject: [PATCH 4/6] allocate label set counter only if there are matching limits for the series

Signed-off-by: Ben Ye
---
 pkg/distributor/distributor.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index af370d818c..1a48de56cb 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -926,7 +926,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
 	validatedExemplars := 0
 	limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)
-	labelSetCounters := make(map[uint64]*samplesLabelSetEntry)
+	var labelSetCounters map[uint64]*samplesLabelSetEntry
 	var firstPartialErr error
 	latestSampleTimestampMs := int64(0)
@@ -1043,7 +1043,11 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
 			continue
 		}
-		for _, l := range validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels)) {
+		matchedLabelSetLimits := validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels))
+		if len(matchedLabelSetLimits) > 0 {
+			labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits))
+		}
+		for _, l := range matchedLabelSetLimits {
 			if c, exists := labelSetCounters[l.Hash]; exists {
 				c.floatSamples += int64(len(ts.Samples))
 				c.histogramSamples += int64(len(ts.Histograms))

From e27729dc00f46f46c01179c204aa0a10cac54e3c Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 18 Dec 2024 16:58:42 -0800
Subject: [PATCH 5/6] fix counter initialization

Signed-off-by: Ben Ye
---
 pkg/distributor/distributor.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 1a48de56cb..eb25969978 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -926,8 +926,10 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
 	validatedExemplars := 0
 	limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)
-	var labelSetCounters map[uint64]*samplesLabelSetEntry
-	var firstPartialErr error
+	var (
+		labelSetCounters map[uint64]*samplesLabelSetEntry
+		firstPartialErr  error
+	)
 	latestSampleTimestampMs := int64(0)
 	defer func() {
@@ -1044,7 +1046,8 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
 		}
 		matchedLabelSetLimits := validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels))
-		if len(matchedLabelSetLimits) > 0 {
+		if len(matchedLabelSetLimits) > 0 && labelSetCounters == nil {
+			// TODO: use pool.
labelSetCounters = make(map[uint64]*samplesLabelSetEntry, len(matchedLabelSetLimits)) } for _, l := range matchedLabelSetLimits { From a24cc6e41dcfcbb27e258291c02c96886af58d01 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 18 Dec 2024 17:00:26 -0800 Subject: [PATCH 6/6] fix lock on active users Signed-off-by: Ben Ye --- pkg/util/active_user.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/util/active_user.go b/pkg/util/active_user.go index 574fb2c8e1..fb2165904a 100644 --- a/pkg/util/active_user.go +++ b/pkg/util/active_user.go @@ -95,17 +95,14 @@ func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string { } func (m *ActiveUsers) ActiveUsers(deadline int64) []string { + active := make([]string, 0, len(m.timestamps)) m.mu.RLock() defer m.mu.RUnlock() - m.mu.RLock() - active := make([]string, 0, len(m.timestamps)) - for userID, ts := range m.timestamps { if ts.Load() > deadline { active = append(active, userID) } } - m.mu.RUnlock() return active }