Skip to content

Commit 69eb3fb

Browse files
committed
Add native histogram for tracking push requests size
Signed-off-by: SungJin1212 <[email protected]>
1 parent 81321dc commit 69eb3fb

File tree

11 files changed

+147
-28
lines changed

11 files changed

+147
-28
lines changed

Diff for: CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [FEATURE] Ruler: Add support for per-user external labels #6340
2121
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2222
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
23+
* [ENHANCEMENT] Distributor: Add a `cortex_distributor_push_requests_uncompressed_size_bytes` native histogram to track uncompressed push requests in bytes for tenant and format. #6384
2324
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
2425
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
2526
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333

Diff for: pkg/api/api.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
277277
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
278278
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
279279

280-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281-
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
281+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
282282

283283
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
284284
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
@@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
289289
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
290290

291291
// Legacy Routes
292-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
292+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
293293
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
294294
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
295295
}
@@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
322322
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
323323
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
324324
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
325-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
325+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
326326

327327
// Legacy Routes
328328
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
329329
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
330-
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
330+
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
331331
}
332332

333333
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

Diff for: pkg/distributor/distributor.go

+34-2
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ type Distributor struct {
125125
validateMetrics *validation.ValidateMetrics
126126

127127
asyncExecutor util.AsyncExecutor
128+
129+
// metrics passed to push handler
130+
PushHandlerMetrics *PushHandlerMetrics
128131
}
129132

130133
// Config contains the configuration required to
@@ -174,6 +177,32 @@ type Config struct {
174177
OTLPConfig OTLPConfig `yaml:"otlp"`
175178
}
176179

180+
type PushHandlerMetrics struct {
181+
pushRequestSizeBytes *prometheus.HistogramVec
182+
}
183+
184+
func NewPushHandlerMetrics(reg prometheus.Registerer) *PushHandlerMetrics {
185+
return &PushHandlerMetrics{
186+
pushRequestSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
187+
Name: "cortex_distributor_push_requests_uncompressed_size_bytes",
188+
Help: "Histogram of push request's uncompressed size in bytes",
189+
NativeHistogramBucketFactor: 1.1,
190+
NativeHistogramMinResetDuration: 1 * time.Hour,
191+
NativeHistogramMaxBucketNumber: 100,
192+
}, []string{"user", "format"}),
193+
}
194+
}
195+
196+
func (m *PushHandlerMetrics) ObservePushRequestSize(user, format string, size float64) {
197+
if m != nil {
198+
m.pushRequestSizeBytes.WithLabelValues(user, format).Observe(size)
199+
}
200+
}
201+
202+
func (m *PushHandlerMetrics) deleteUserMetrics(user string) {
203+
m.pushRequestSizeBytes.DeleteLabelValues(user)
204+
}
205+
177206
type InstanceLimits struct {
178207
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
179208
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
@@ -373,8 +402,9 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
373402
Help: "Unix timestamp of latest received sample per user.",
374403
}, []string{"user"}),
375404

376-
validateMetrics: validation.NewValidateMetrics(reg),
377-
asyncExecutor: util.NewNoOpExecutor(),
405+
validateMetrics: validation.NewValidateMetrics(reg),
406+
PushHandlerMetrics: NewPushHandlerMetrics(reg),
407+
asyncExecutor: util.NewNoOpExecutor(),
378408
}
379409

380410
if cfg.NumPushWorkers > 0 {
@@ -482,6 +512,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
482512
d.nonHASamples.DeleteLabelValues(userID)
483513
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)
484514

515+
d.PushHandlerMetrics.deleteUserMetrics(userID)
516+
485517
if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil {
486518
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
487519
}

Diff for: pkg/ingester/client/client_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ func TestMarshall(t *testing.T) {
4343
plentySize = 1024 * 1024
4444
)
4545
req := cortexpb.WriteRequest{}
46-
err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
46+
_, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
4747
require.Error(t, err)
48-
err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
48+
_, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
4949
require.NoError(t, err)
5050
require.Equal(t, numSeries, len(req.Timeseries))
5151
}

Diff for: pkg/querier/remote_read.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler {
2121
ctx := r.Context()
2222
var req client.ReadRequest
2323
logger := util_log.WithContext(r.Context(), logger)
24-
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
24+
if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
2525
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
2626
http.Error(w, err.Error(), http.StatusBadRequest)
2727
return

Diff for: pkg/util/http.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ const (
148148
)
149149

150150
// ParseProtoReader parses a compressed proto from an io.Reader.
151-
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
151+
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) (int, error) {
152152
sp := opentracing.SpanFromContext(ctx)
153153
if sp != nil {
154154
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
155155
}
156156
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
157157
if err != nil {
158-
return err
158+
return 0, err
159159
}
160160

161161
if sp != nil {
@@ -171,10 +171,10 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
171171
err = proto.NewBuffer(body).Unmarshal(req)
172172
}
173173
if err != nil {
174-
return err
174+
return 0, err
175175
}
176176

177-
return nil
177+
return len(body), nil
178178
}
179179

180180
func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {

Diff for: pkg/util/http_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func TestParseProtoReader(t *testing.T) {
193193
reader = bytesBuffered{Buffer: &buf}
194194
}
195195

196-
err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
196+
_, err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
197197
if tt.expectErr {
198198
assert.NotNil(t, err)
199199
return

Diff for: pkg/util/push/otlp.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
)
3535

3636
// OTLPHandler is a http.Handler which accepts OTLP metrics.
37-
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
37+
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, metrics *distributor.PushHandlerMetrics) http.Handler {
3838
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3939
ctx := r.Context()
4040
logger := util_log.WithContext(ctx, util_log.Logger)
@@ -51,7 +51,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
5151
return
5252
}
5353

54-
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
54+
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize, userID, metrics)
5555
if err != nil {
5656
level.Error(logger).Log("err", err.Error())
5757
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -99,7 +99,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
9999
})
100100
}
101101

102-
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
102+
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int, userID string, metrics *distributor.PushHandlerMetrics) (pmetricotlp.ExportRequest, error) {
103103
expectedSize := int(r.ContentLength)
104104
if expectedSize > maxSize {
105105
return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
@@ -124,7 +124,17 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
124124
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
125125
req := pmetricotlp.NewExportRequest()
126126
otlpReqProto := otlpProtoMessage{req: &req}
127-
return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
127+
128+
bodySize, err := util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
129+
if err != nil {
130+
return req, err
131+
}
132+
133+
if metrics != nil {
134+
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
135+
}
136+
137+
return req, nil
128138
}
129139
case jsonContentType:
130140
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
@@ -143,11 +153,15 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
143153
if expectedSize > 0 {
144154
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
145155
}
146-
_, err := buf.ReadFrom(reader)
156+
bodySize, err := buf.ReadFrom(reader)
147157
if err != nil {
148158
return req, err
149159
}
150160

161+
if metrics != nil {
162+
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
163+
}
164+
151165
return req, req.UnmarshalJSON(buf.Bytes())
152166
}
153167
default:

Diff for: pkg/util/push/otlp_test.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import (
77
"io"
88
"net/http"
99
"net/http/httptest"
10+
"strings"
1011
"testing"
1112
"time"
1213

1314
"github.com./go-kit/log"
15+
"github.com./prometheus/client_golang/prometheus"
16+
"github.com./prometheus/client_golang/prometheus/testutil"
1417
"github.com./prometheus/prometheus/prompb"
1518
"github.com./stretchr/testify/assert"
1619
"github.com./stretchr/testify/require"
@@ -305,7 +308,7 @@ func BenchmarkOTLPWriteHandler(b *testing.B) {
305308
mockPushFunc := func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
306309
return &cortexpb.WriteResponse{}, nil
307310
}
308-
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc)
311+
handler := OTLPHandler(10000, overrides, cfg, nil, mockPushFunc, nil)
309312

310313
b.Run("json with no compression", func(b *testing.B) {
311314
req, err := getOTLPHttpRequest(&exportRequest, jsonContentType, "")
@@ -384,32 +387,61 @@ func TestOTLPWriteHandler(t *testing.T) {
384387
expectedStatusCode int
385388
expectedErrMsg string
386389
encodingType string
390+
expectedMetrics string
387391
}{
388392
{
389393
description: "Test proto format write with no compression",
390394
maxRecvMsgSize: 10000,
391395
contentType: pbContentType,
392396
expectedStatusCode: http.StatusOK,
397+
expectedMetrics: `
398+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
399+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
400+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
401+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
402+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
403+
`,
393404
},
394405
{
395406
description: "Test proto format write with gzip",
396407
maxRecvMsgSize: 10000,
397408
contentType: pbContentType,
398409
expectedStatusCode: http.StatusOK,
399410
encodingType: "gzip",
411+
expectedMetrics: `
412+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
413+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
414+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
415+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
416+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
417+
`,
400418
},
401419
{
402420
description: "Test json format write with no compression",
403421
maxRecvMsgSize: 10000,
404422
contentType: jsonContentType,
405423
expectedStatusCode: http.StatusOK,
424+
expectedMetrics: `
425+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
426+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
427+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
428+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
429+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
430+
`,
406431
},
407432
{
408433
description: "Test json format write with gzip",
409434
maxRecvMsgSize: 10000,
410435
contentType: jsonContentType,
411436
expectedStatusCode: http.StatusOK,
412437
encodingType: "gzip",
438+
expectedMetrics: `
439+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
440+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
441+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
442+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
443+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
444+
`,
413445
},
414446
{
415447
description: "request too big than maxRecvMsgSize (proto) with no compression",
@@ -458,14 +490,19 @@ func TestOTLPWriteHandler(t *testing.T) {
458490
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
459491
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
460492
require.NoError(t, err)
461-
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push)
493+
reg := prometheus.NewRegistry()
494+
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, distributor.NewPushHandlerMetrics(reg))
462495

463496
recorder := httptest.NewRecorder()
464497
handler.ServeHTTP(recorder, req)
465498

466499
resp := recorder.Result()
467500
require.Equal(t, test.expectedStatusCode, resp.StatusCode)
468501

502+
if test.expectedMetrics != "" {
503+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_distributor_push_requests"))
504+
}
505+
469506
if test.expectedErrMsg != "" {
470507
b, err := io.ReadAll(resp.Body)
471508
require.NoError(t, err)

Diff for: pkg/util/push/push.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,22 @@ import (
99
"github.com./weaveworks/common/middleware"
1010

1111
"github.com./cortexproject/cortex/pkg/cortexpb"
12+
"github.com./cortexproject/cortex/pkg/distributor"
13+
"github.com./cortexproject/cortex/pkg/tenant"
1214
"github.com./cortexproject/cortex/pkg/util"
1315
"github.com./cortexproject/cortex/pkg/util/log"
1416
)
1517

18+
const (
19+
formatRemoteWrite1 = "prw1"
20+
formatOTLP = "otlp"
21+
)
22+
1623
// Func defines the type of the push. It is similar to http.HandlerFunc.
1724
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
1825

1926
// Handler is a http.Handler which accepts WriteRequests.
20-
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
27+
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushMetrics *distributor.PushHandlerMetrics) http.Handler {
2128
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2229
ctx := r.Context()
2330
logger := log.WithContext(ctx, log.Logger)
@@ -28,14 +35,24 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
2835
logger = log.WithSourceIPs(source, logger)
2936
}
3037
}
38+
39+
userID, err := tenant.TenantID(ctx)
40+
if err != nil {
41+
return
42+
}
43+
3144
var req cortexpb.PreallocWriteRequest
32-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
45+
bodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
3346
if err != nil {
3447
level.Error(logger).Log("err", err.Error())
3548
http.Error(w, err.Error(), http.StatusBadRequest)
3649
return
3750
}
3851

52+
if pushMetrics != nil {
53+
pushMetrics.ObservePushRequestSize(userID, formatRemoteWrite1, float64(bodySize))
54+
}
55+
3956
req.SkipLabelNameValidation = false
4057
if req.Source == 0 {
4158
req.Source = cortexpb.API

0 commit comments

Comments
 (0)