Skip to content

Commit 521ece7

Browse files
committed
Support remote write v2 by converting request
Signed-off-by: SungJin1212 <[email protected]>
1 parent b48f93b commit 521ece7

18 files changed

+1365
-115
lines changed

.github/workflows/test-build-deploy.yml

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ jobs:
162162
- integration_querier
163163
- integration_ruler
164164
- integration_query_fuzz
165+
- integration_remote_write_v2
165166
steps:
166167
- name: Upgrade golang
167168
uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
6+
* [FEATURE] Support Prometheus remote write 2.0. #6330
67
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
78

89
## 1.19.0 in progress

docs/configuration/config-file-reference.md

+5
Original file line numberDiff line numberDiff line change
@@ -2669,6 +2669,11 @@ ha_tracker:
26692669
# CLI flag: -distributor.sign-write-requests
26702670
[sign_write_requests: <boolean> | default = false]
26712671
2672+
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
2673+
# request.
2674+
# CLI flag: -distributor.remote-write2-enabled
2675+
[remote_write2_enabled: <boolean> | default = false]
2676+
26722677
ring:
26732678
kvstore:
26742679
# Backend storage to use for the ring. Supported values are: consul, etcd,

docs/configuration/v1-guarantees.md

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ Currently experimental features are:
5959
- Distributor:
6060
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
6161
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
62+
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
6263
- Tenant Deletion in Purger, for blocks storage.
6364
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
6465
- Blocks storage bucket index

integration/e2e/util.go

+76
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com./prometheus/prometheus/model/histogram"
1919
"github.com./prometheus/prometheus/model/labels"
2020
"github.com./prometheus/prometheus/prompb"
21+
writev2 "github.com./prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com./prometheus/prometheus/storage"
2223
"github.com./prometheus/prometheus/tsdb"
2324
"github.com./prometheus/prometheus/tsdb/tsdbutil"
@@ -334,3 +335,78 @@ func CreateBlock(
334335

335336
return id, nil
336337
}
338+
339+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
340+
tsMillis := TimeToMilliseconds(ts)
341+
342+
st := writev2.NewSymbolTable()
343+
344+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
345+
for _, lbl := range additionalLabels {
346+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
347+
}
348+
349+
var (
350+
h *histogram.Histogram
351+
fh *histogram.FloatHistogram
352+
ph writev2.Histogram
353+
)
354+
if floatHistogram {
355+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
356+
ph = writev2.FromFloatHistogram(tsMillis, fh)
357+
} else {
358+
h = tsdbutil.GenerateTestHistogram(int(i))
359+
ph = writev2.FromIntHistogram(tsMillis, h)
360+
}
361+
362+
// Generate the series
363+
series = append(series, writev2.TimeSeries{
364+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
365+
Histograms: []writev2.Histogram{ph},
366+
})
367+
368+
symbols = st.Symbols()
369+
370+
return
371+
}
372+
373+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
374+
tsMillis := TimeToMilliseconds(ts)
375+
value := rand.Float64()
376+
377+
st := writev2.NewSymbolTable()
378+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
379+
380+
for _, label := range additionalLabels {
381+
lbs = append(lbs, labels.Label{
382+
Name: label.Name,
383+
Value: label.Value,
384+
})
385+
}
386+
series = append(series, writev2.TimeSeries{
387+
// Generate the series
388+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
389+
Samples: []writev2.Sample{
390+
{Value: value, Timestamp: tsMillis},
391+
},
392+
Metadata: writev2.Metadata{
393+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
394+
},
395+
})
396+
symbols = st.Symbols()
397+
398+
// Generate the expected vector when querying it
399+
metric := model.Metric{}
400+
metric[labels.MetricName] = model.LabelValue(name)
401+
for _, lbl := range additionalLabels {
402+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
403+
}
404+
405+
vector = append(vector, &model.Sample{
406+
Metric: metric,
407+
Value: model.SampleValue(value),
408+
Timestamp: model.Time(tsMillis),
409+
})
410+
411+
return
412+
}

integration/e2ecortex/client.go

+40
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com./prometheus/prometheus/model/labels"
2525
"github.com./prometheus/prometheus/model/rulefmt"
2626
"github.com./prometheus/prometheus/prompb"
27+
writev2 "github.com./prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com./prometheus/prometheus/storage"
2829
"github.com./prometheus/prometheus/storage/remote"
2930
yaml "gopkg.in/yaml.v3"
@@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
147148
return res, nil
148149
}
149150

151+
// PushV2 the input timeseries to the remote endpoint
152+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
153+
// Create write request
154+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create HTTP request
160+
compressed := snappy.Encode(nil, data)
161+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
req.Header.Add("Content-Encoding", "snappy")
167+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
168+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
169+
req.Header.Set("X-Scope-OrgID", c.orgID)
170+
171+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
172+
defer cancel()
173+
174+
// Execute HTTP request
175+
res, err := c.httpClient.Do(req.WithContext(ctx))
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
defer res.Body.Close()
181+
return res, nil
182+
}
183+
150184
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
151185
var metricName string
152186
attributes := make(map[string]any)
@@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
356390
return value, err
357391
}
358392

393+
// Metadata runs a metadata query
394+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
395+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
396+
return metadata, err
397+
}
398+
359399
// QueryExemplars runs an exemplars query
360400
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
361401
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

0 commit comments

Comments
 (0)