Skip to content

Commit d562e33

Browse files
committed
Support remote write v2 by converting request
Signed-off-by: SungJin1212 <[email protected]>
1 parent a2055f0 commit d562e33

18 files changed

+1359
-115
lines changed

.github/workflows/test-build-deploy.yml

+1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ jobs:
162162
- integration_querier
163163
- integration_ruler
164164
- integration_query_fuzz
165+
- integration_remote_write_v2
165166
steps:
166167
- name: Upgrade golang
167168
uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ run:
5050
- integration_querier
5151
- integration_ruler
5252
- integration_query_fuzz
53+
- integration_remote_write_v2

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
66
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
77
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
8+
* [FEATURE] Support Prometheus remote write 2.0. #6330
89
* [FEATURE] Ingester: Add a `-ingester.enable-ooo-native-histograms` flag to enable out-of-order native histogram ingestion per tenant. It only takes effect when `-blocks-storage.tsdb.enable-native-histograms=true` and `-ingester.out-of-order-time-window` > 0. It is applied after the restart if it is changed at runtime through the runtime config. #6626
910
* [ENHANCEMENT] Alertmanager: Add new limits `-alertmanager.max-silences-count` and `-alertmanager.max-silences-size-bytes` for limiting silences per tenant. #6605
1011
* [ENHANCEMENT] Update prometheus version to v3.1.0. #6583

docs/configuration/config-file-reference.md

+5
Original file line numberDiff line numberDiff line change
@@ -2688,6 +2688,11 @@ ha_tracker:
26882688
# CLI flag: -distributor.sign-write-requests
26892689
[sign_write_requests: <boolean> | default = false]
26902690
2691+
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
2692+
# request.
2693+
# CLI flag: -distributor.remote-write2-enabled
2694+
[remote_write2_enabled: <boolean> | default = false]
2695+
26912696
ring:
26922697
kvstore:
26932698
# Backend storage to use for the ring. Supported values are: consul, etcd,

docs/configuration/v1-guarantees.md

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ Currently experimental features are:
5959
- Distributor:
6060
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
6161
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
62+
- Accept Prometheus remote write 2.0 request (`-distributor.remote-write2-enabled=true`)
6263
- Tenant Deletion in Purger, for blocks storage.
6364
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
6465
- Blocks storage bucket index

integration/e2e/util.go

+76
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com./prometheus/prometheus/model/histogram"
2020
"github.com./prometheus/prometheus/model/labels"
2121
"github.com./prometheus/prometheus/prompb"
22+
writev2 "github.com./prometheus/prometheus/prompb/io/prometheus/write/v2"
2223
"github.com./prometheus/prometheus/storage"
2324
"github.com./prometheus/prometheus/tsdb"
2425
"github.com./prometheus/prometheus/tsdb/tsdbutil"
@@ -336,3 +337,78 @@ func CreateBlock(
336337

337338
return id, nil
338339
}
340+
341+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
342+
tsMillis := TimeToMilliseconds(ts)
343+
344+
st := writev2.NewSymbolTable()
345+
346+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
347+
for _, lbl := range additionalLabels {
348+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
349+
}
350+
351+
var (
352+
h *histogram.Histogram
353+
fh *histogram.FloatHistogram
354+
ph writev2.Histogram
355+
)
356+
if floatHistogram {
357+
fh = tsdbutil.GenerateTestFloatHistogram(int64(i))
358+
ph = writev2.FromFloatHistogram(tsMillis, fh)
359+
} else {
360+
h = tsdbutil.GenerateTestHistogram(int64(i))
361+
ph = writev2.FromIntHistogram(tsMillis, h)
362+
}
363+
364+
// Generate the series
365+
series = append(series, writev2.TimeSeries{
366+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
367+
Histograms: []writev2.Histogram{ph},
368+
})
369+
370+
symbols = st.Symbols()
371+
372+
return
373+
}
374+
375+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
376+
tsMillis := TimeToMilliseconds(ts)
377+
value := rand.Float64()
378+
379+
st := writev2.NewSymbolTable()
380+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
381+
382+
for _, label := range additionalLabels {
383+
lbs = append(lbs, labels.Label{
384+
Name: label.Name,
385+
Value: label.Value,
386+
})
387+
}
388+
series = append(series, writev2.TimeSeries{
389+
// Generate the series
390+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
391+
Samples: []writev2.Sample{
392+
{Value: value, Timestamp: tsMillis},
393+
},
394+
Metadata: writev2.Metadata{
395+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
396+
},
397+
})
398+
symbols = st.Symbols()
399+
400+
// Generate the expected vector when querying it
401+
metric := model.Metric{}
402+
metric[labels.MetricName] = model.LabelValue(name)
403+
for _, lbl := range additionalLabels {
404+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
405+
}
406+
407+
vector = append(vector, &model.Sample{
408+
Metric: metric,
409+
Value: model.SampleValue(value),
410+
Timestamp: model.Time(tsMillis),
411+
})
412+
413+
return
414+
}

integration/e2ecortex/client.go

+34
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com./prometheus/prometheus/model/labels"
2525
"github.com./prometheus/prometheus/model/rulefmt"
2626
"github.com./prometheus/prometheus/prompb"
27+
writev2 "github.com./prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com./prometheus/prometheus/storage"
2829
"github.com./prometheus/prometheus/storage/remote"
2930
yaml "gopkg.in/yaml.v3"
@@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
147148
return res, nil
148149
}
149150

151+
// PushV2 the input timeseries to the remote endpoint
152+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
153+
// Create write request
154+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create HTTP request
160+
compressed := snappy.Encode(nil, data)
161+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
req.Header.Add("Content-Encoding", "snappy")
167+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
168+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
169+
req.Header.Set("X-Scope-OrgID", c.orgID)
170+
171+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
172+
defer cancel()
173+
174+
// Execute HTTP request
175+
res, err := c.httpClient.Do(req.WithContext(ctx))
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
defer res.Body.Close()
181+
return res, nil
182+
}
183+
150184
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
151185
var metricName string
152186
attributes := make(map[string]any)

0 commit comments

Comments
 (0)