Skip to content

Commit b0c7c06

Browse files
committed
Inital Commit - Skeleton
Signed-off-by: alanprot <[email protected]>
1 parent 0e85ae0 commit b0c7c06

File tree

8 files changed

+504
-11
lines changed

8 files changed

+504
-11
lines changed

Diff for: docs/configuration/config-file-reference.md

+98
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,96 @@ api:
152152
# The compactor_config configures the compactor for the blocks storage.
153153
[compactor: <compactor_config>]
154154

155+
parquet_converter:
156+
# Comma separated list of tenants that can be converted. If specified, only
157+
# these tenants will be converted, otherwise all tenants can be converted.
158+
# CLI flag: -parquet-converter.enabled-tenants
159+
[enabled_tenants: <string> | default = ""]
160+
161+
# Comma separated list of tenants that cannot converted.
162+
# CLI flag: -parquet-converter.disabled-tenants
163+
[disabled_tenants: <string> | default = ""]
164+
165+
ring:
166+
kvstore:
167+
# Backend storage to use for the ring. Supported values are: consul, etcd,
168+
# inmemory, memberlist, multi.
169+
# CLI flag: -parquet-converter.ring.store
170+
[store: <string> | default = "consul"]
171+
172+
# The prefix for the keys in the store. Should end with a /.
173+
# CLI flag: -parquet-converter.ring.prefix
174+
[prefix: <string> | default = "collectors/"]
175+
176+
dynamodb:
177+
# Region to access dynamodb.
178+
# CLI flag: -parquet-converter.ring.dynamodb.region
179+
[region: <string> | default = ""]
180+
181+
# Table name to use on dynamodb.
182+
# CLI flag: -parquet-converter.ring.dynamodb.table-name
183+
[table_name: <string> | default = ""]
184+
185+
# Time to expire items on dynamodb.
186+
# CLI flag: -parquet-converter.ring.dynamodb.ttl-time
187+
[ttl: <duration> | default = 0s]
188+
189+
# Time to refresh local ring with information on dynamodb.
190+
# CLI flag: -parquet-converter.ring.dynamodb.puller-sync-time
191+
[puller_sync_time: <duration> | default = 1m]
192+
193+
# Maximum number of retries for DDB KV CAS.
194+
# CLI flag: -parquet-converter.ring.dynamodb.max-cas-retries
195+
[max_cas_retries: <int> | default = 10]
196+
197+
# Timeout of dynamoDbClient requests. Default is 2m.
198+
# CLI flag: -parquet-converter.ring.dynamodb.timeout
199+
[timeout: <duration> | default = 2m]
200+
201+
# The consul_config configures the consul client.
202+
# The CLI flags prefix for this block config is: parquet-converter.ring
203+
[consul: <consul_config>]
204+
205+
# The etcd_config configures the etcd client.
206+
# The CLI flags prefix for this block config is: parquet-converter.ring
207+
[etcd: <etcd_config>]
208+
209+
multi:
210+
# Primary backend storage used by multi-client.
211+
# CLI flag: -parquet-converter.ring.multi.primary
212+
[primary: <string> | default = ""]
213+
214+
# Secondary backend storage used by multi-client.
215+
# CLI flag: -parquet-converter.ring.multi.secondary
216+
[secondary: <string> | default = ""]
217+
218+
# Mirror writes to secondary store.
219+
# CLI flag: -parquet-converter.ring.multi.mirror-enabled
220+
[mirror_enabled: <boolean> | default = false]
221+
222+
# Timeout for storing value to secondary store.
223+
# CLI flag: -parquet-converter.ring.multi.mirror-timeout
224+
[mirror_timeout: <duration> | default = 2s]
225+
226+
# Period at which to heartbeat to the ring. 0 = disabled.
227+
# CLI flag: -parquet-converter.ring.heartbeat-period
228+
[heartbeat_period: <duration> | default = 5s]
229+
230+
# The heartbeat timeout after which parquet-converter are considered
231+
# unhealthy within the ring. 0 = never (timeout disabled).
232+
# CLI flag: -parquet-converter.ring.heartbeat-timeout
233+
[heartbeat_timeout: <duration> | default = 1m]
234+
235+
# Time since last heartbeat before parquet-converter will be removed from
236+
# ring. 0 to disable
237+
# CLI flag: -parquet-converter.auto-forget-delay
238+
[auto_forget_delay: <duration> | default = 2m]
239+
240+
# File path where tokens are stored. If empty, tokens are not stored at
241+
# shutdown and restored at startup.
242+
# CLI flag: -parquet-converter.ring.tokens-file-path
243+
[tokens_file_path: <string> | default = ""]
244+
155245
# The store_gateway_config configures the store-gateway service used by the
156246
# blocks storage.
157247
[store_gateway: <store_gateway_config>]
@@ -2499,6 +2589,7 @@ The `consul_config` configures the consul client. The supported CLI flags `<pref
24992589
- `compactor.ring`
25002590
- `distributor.ha-tracker`
25012591
- `distributor.ring`
2592+
- `parquet-converter.ring`
25022593
- `ruler.ring`
25032594
- `store-gateway.sharding-ring`
25042595

@@ -2815,6 +2906,7 @@ The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>`
28152906
- `compactor.ring`
28162907
- `distributor.ha-tracker`
28172908
- `distributor.ring`
2909+
- `parquet-converter.ring`
28182910
- `ruler.ring`
28192911
- `store-gateway.sharding-ring`
28202912

@@ -3711,6 +3803,12 @@ query_rejection:
37113803
# CLI flag: -compactor.partition-series-count
37123804
[compactor_partition_series_count: <int> | default = 0]
37133805

3806+
# The default tenant's shard size when the shuffle-sharding strategy is used by
3807+
# the parquet converter. When this setting is specified in the per-tenant
3808+
# overrides, a value of 0 disables shuffle sharding for the tenant.
3809+
# CLI flag: -parquet-converter.tenant-shard-size
3810+
[parquet_converter_tenant_shard_size: <int> | default = 0]
3811+
37143812
# S3 server-side encryption type. Required to enable server-side encryption
37153813
# overrides for a specific tenant. If not set, the default S3 client settings
37163814
# are used.

Diff for: pkg/compactor/compactor_ring.go

-3
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ type RingConfig struct {
3939
ListenPort int `yaml:"-"`
4040

4141
WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`
42-
43-
ObservePeriod time.Duration `yaml:"-"`
4442
}
4543

4644
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -100,7 +98,6 @@ func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig {
10098
lc.InfNames = cfg.InstanceInterfaceNames
10199
lc.UnregisterOnShutdown = cfg.UnregisterOnShutdown
102100
lc.HeartbeatPeriod = cfg.HeartbeatPeriod
103-
lc.ObservePeriod = cfg.ObservePeriod
104101
lc.JoinAfter = 0
105102
lc.MinReadyDuration = 0
106103
lc.FinalSleep = 0

Diff for: pkg/cortex/cortex.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"flag"
77
"fmt"
8+
"github.com./cortexproject/cortex/pkg/parquetconverter"
89
"net/http"
910
"os"
1011
"reflect"
@@ -113,6 +114,7 @@ type Config struct {
113114
QueryRange queryrange.Config `yaml:"query_range"`
114115
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
115116
Compactor compactor.Config `yaml:"compactor"`
117+
ParquetConverter parquetconverter.Config `yaml:"parquet_converter"`
116118
StoreGateway storegateway.Config `yaml:"store_gateway"`
117119
TenantFederation tenantfederation.Config `yaml:"tenant_federation"`
118120

@@ -165,6 +167,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
165167
c.QueryRange.RegisterFlags(f)
166168
c.BlocksStorage.RegisterFlags(f)
167169
c.Compactor.RegisterFlags(f)
170+
c.ParquetConverter.RegisterFlags(f)
168171
c.StoreGateway.RegisterFlags(f)
169172
c.TenantFederation.RegisterFlags(f)
170173

@@ -334,14 +337,15 @@ type Cortex struct {
334337
QueryFrontendTripperware tripperware.Tripperware
335338
ResourceMonitor *resource.Monitor
336339

337-
Ruler *ruler.Ruler
338-
RulerStorage rulestore.RuleStore
339-
ConfigAPI *configAPI.API
340-
ConfigDB db.DB
341-
Alertmanager *alertmanager.MultitenantAlertmanager
342-
Compactor *compactor.Compactor
343-
StoreGateway *storegateway.StoreGateway
344-
MemberlistKV *memberlist.KVInitService
340+
Ruler *ruler.Ruler
341+
RulerStorage rulestore.RuleStore
342+
ConfigAPI *configAPI.API
343+
ConfigDB db.DB
344+
Alertmanager *alertmanager.MultitenantAlertmanager
345+
Compactor *compactor.Compactor
346+
Parquetconverter *parquetconverter.Converter
347+
StoreGateway *storegateway.StoreGateway
348+
MemberlistKV *memberlist.KVInitService
345349

346350
// Queryables that the querier should use to query the long
347351
// term storage. It depends on the storage engine used.

Diff for: pkg/cortex/modules.go

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"github.com./cortexproject/cortex/pkg/parquetconverter"
78
"log/slog"
89
"net/http"
910
"runtime"
@@ -83,6 +84,7 @@ const (
8384
Configs string = "configs"
8485
AlertManager string = "alertmanager"
8586
Compactor string = "compactor"
87+
ParquetConverter string = "parquet-converter"
8688
StoreGateway string = "store-gateway"
8789
MemberlistKV string = "memberlist-kv"
8890
TenantDeletion string = "tenant-deletion"
@@ -692,6 +694,12 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
692694
return t.Alertmanager, nil
693695
}
694696

697+
func (t *Cortex) initParquetConverter() (serv services.Service, err error) {
698+
t.Cfg.ParquetConverter.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
699+
t.Parquetconverter = parquetconverter.NewConverter(t.Cfg.ParquetConverter, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
700+
return t.Parquetconverter, nil
701+
}
702+
695703
func (t *Cortex) initCompactor() (serv services.Service, err error) {
696704
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
697705
ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor
@@ -822,6 +830,7 @@ func (t *Cortex) setupModuleManager() error {
822830
mm.RegisterModule(Configs, t.initConfig)
823831
mm.RegisterModule(AlertManager, t.initAlertManager)
824832
mm.RegisterModule(Compactor, t.initCompactor)
833+
mm.RegisterModule(ParquetConverter, t.initParquetConverter)
825834
mm.RegisterModule(StoreGateway, t.initStoreGateway)
826835
mm.RegisterModule(TenantDeletion, t.initTenantDeletionAPI, modules.UserInvisibleModule)
827836
mm.RegisterModule(Purger, nil)
@@ -853,6 +862,7 @@ func (t *Cortex) setupModuleManager() error {
853862
Configs: {API},
854863
AlertManager: {API, MemberlistKV, Overrides},
855864
Compactor: {API, MemberlistKV, Overrides},
865+
ParquetConverter: {API, MemberlistKV, Overrides},
856866
StoreGateway: {API, Overrides, MemberlistKV, ResourceMonitor},
857867
TenantDeletion: {API, Overrides},
858868
Purger: {TenantDeletion},

0 commit comments

Comments
 (0)