[Query-Frontend] Add dynamic query vertical sharding #6678
Conversation
-	expectedDurationFetchedBySelectors:     4 * day,
-	expectedDurationFetchedByLookbackDelta: 0 * day,
+	expectedDurationFetchedByRange:     40 * day,
+	expectedDurationFetchedBySelectors: 4 * day,
 },
Should we add a test about the edge case you described in the PR description?
I added separate test cases for dynamic splitting + vertical sharding scenarios. I also added an example in the PR description to explain the change better.
for candidateVerticalShardSize := 1; candidateVerticalShardSize <= maxVerticalShardSize; candidateVerticalShardSize++ {
	maxSplitsFromMaxShards := getMaxSplitsFromMaxQueryShards(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize)
	maxSplitsFromDurationFetched := getMaxSplitsFromDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta)

	// Use the more restrictive max splits limit
	var maxSplits int
	switch {
	case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
		maxSplits = min(maxSplitsFromMaxShards, maxSplitsFromDurationFetched)
	case dynamicSplitCfg.MaxShardsPerQuery > 0:
		maxSplits = maxSplitsFromMaxShards
	case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
		maxSplits = maxSplitsFromDurationFetched
	}

	candidateInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits)
	if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); candidateTotalShards > totalShards {
		interval = candidateInterval
		verticalShardSize = candidateVerticalShardSize
		totalShards = candidateTotalShards
	}
}
This is the main change introducing dynamic vertical sharding. We do the same calculations as before to determine the split interval, but now inside a loop that iterates over candidate vertical shard sizes from 1 up to the configured maximum. The best vertical shard size is the one expected to produce the largest total number of shards, which is checked by getExpectedTotalShards().
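For intuition, here is a minimal sketch of what a getExpectedTotalShards-style calculation could look like; this is an assumption inferred from the loop above, not the actual Cortex implementation:

	package intervalsketch

	import "time"

	// expectedTotalShardsSketch approximates the candidate comparison above:
	// the total shard count is the number of split intervals (rounded up)
	// multiplied by the candidate vertical shard size.
	func expectedTotalShardsSketch(startMs, endMs int64, interval time.Duration, verticalShardSize int) int {
		intervalMs := interval.Milliseconds()
		if intervalMs <= 0 || endMs <= startMs {
			return verticalShardSize
		}
		splits := (endMs - startMs + intervalMs - 1) / intervalMs // ceil division
		return int(splits) * verticalShardSize
	}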
// Set number of vertical shards to be used in shard_by middleware
if isShardable && maxVerticalShardSize > 1 {
	ctx = tripperware.InjectVerticalShardSizeToContext(ctx, verticalShardSize)
The chosen vertical shard size is injected into the request context here, and later read by the shard_by.go middleware.
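As a rough sketch of the context plumbing (only InjectVerticalShardSizeToContext appears in the snippet above; the typed key and the getter name are assumptions):

	package tripperwaresketch

	import "context"

	// verticalShardSizeKey is a hypothetical unexported context key type.
	type verticalShardSizeKey struct{}

	// InjectVerticalShardSizeToContext stores the chosen vertical shard size on the context.
	func InjectVerticalShardSizeToContext(ctx context.Context, size int) context.Context {
		return context.WithValue(ctx, verticalShardSizeKey{}, size)
	}

	// verticalShardSizeFromContext is the assumed counterpart a shard_by-style
	// middleware would call, falling back to the statically configured size.
	func verticalShardSizeFromContext(ctx context.Context, defaultSize int) int {
		if size, ok := ctx.Value(verticalShardSizeKey{}).(int); ok {
			return size
		}
		return defaultSize
	}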
// Return the fixed base duration fetched by the query regardless of the number of splits, and the duration that is fetched once for every split
func getDurationFetchedByQuerySplitting(expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) (fixedDurationFetched time.Duration, perSplitDurationFetched time.Duration) {
	// First analyze the query using original start-end time. Duration fetched by lookbackDelta here only reflects the start time of first split
	durationFetchedByRange, durationFetchedBySelectors, durationFetchedByLookbackDeltaFirstSplit := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta)

	fixedDurationFetched += durationFetchedByRange        // Duration fetched by the query range is constant regardless of how many splits the query has
	perSplitDurationFetched += durationFetchedBySelectors // Duration fetched by selectors is fetched once for every query split

	// Next analyze the query using the next split start time to find the duration fetched by lookbackDelta for splits other than first one
	nextIntervalStart := nextIntervalBoundary(queryStart, queryStep, baseInterval) + queryStep
	_, _, durationFetchedByLookbackDeltaOtherSplits := analyzeDurationFetchedByQueryExpr(expr, nextIntervalStart, queryEnd, baseInterval, lookbackDelta)

	// Handle different cases for lookbackDelta
	if durationFetchedByLookbackDeltaFirstSplit > 0 && durationFetchedByLookbackDeltaOtherSplits > 0 {
		// lookbackDelta is fetching additional duration for all splits
		perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
	} else if durationFetchedByLookbackDeltaOtherSplits > 0 {
		// lookbackDelta is fetching additional duration for all splits except first one
		perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
		fixedDurationFetched -= durationFetchedByLookbackDeltaOtherSplits
	} else if durationFetchedByLookbackDeltaFirstSplit > 0 {
		// lookbackDelta is fetching additional duration for first split only
		fixedDurationFetched += durationFetchedByLookbackDeltaFirstSplit
	}

	return fixedDurationFetched, perSplitDurationFetched
}
To simplify the calculation of the expected duration fetched by a query, we no longer calculate lookbackDelta separately from matrix selectors, which makes this function obsolete.
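For context, the decomposition this helper returned would bound the split count roughly as follows (this reading of the caller is an assumption): the total duration fetched grows linearly with the number of splits n, total(n) = fixedDurationFetched + n * perSplitDurationFetched, so a budget B from max_fetched_data_duration_per_query implies n <= (B - fixedDurationFetched) / perSplitDurationFetched.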
-	// Adjust start and end time based on matrix selectors and/or subquery selector and increment total duration fetched, this excludes lookbackDelta
-	start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, 0, n, path, evalRange)
-	startIntervalIndex := floorDiv(start, baseIntervalMillis)
-	endIntervalIndex := floorDiv(end, baseIntervalMillis)
-	totalDurationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1
-
-	// Increment duration fetched by lookbackDelta
-	startLookbackDelta := start - util.DurationMilliseconds(lookbackDelta)
-	startLookbackDeltaIntervalIndex := floorDiv(startLookbackDelta, baseIntervalMillis)
-	if evalRange == 0 && startLookbackDeltaIntervalIndex < startIntervalIndex {
-		durationFetchedByLookbackDeltaCount += int(startIntervalIndex - startLookbackDeltaIntervalIndex)
-	}
+	// Adjust start time based on matrix selectors and/or subquery selectors and calculate additional lookback duration fetched
+	start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, lookbackDelta, n, path, evalRange)
+	durationFetchedBySelectors := (end - start) - (queryEnd - queryStart)
+	durationFetchedBySelectorsCount += int(ceilDiv(durationFetchedBySelectors, baseIntervalMillis))
Also simplified because we no longer need to calculate the duration fetched by lookbackDelta separately; it is all included in durationFetchedBySelectors.
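A worked example of the new accounting, with hypothetical numbers (a plain instant selector, a 5m lookback delta, a 1-day base interval):

	package durationsketch

	import "time"

	// ceilDiv mirrors the helper used in the snippet above.
	func ceilDiv(a, b int64) int64 {
		return (a + b - 1) / b
	}

	// For a plain selector, GetTimeRangesForSelector extends the start time by
	// the lookback delta, so the extra 5m is rounded up and counted as one
	// additional base interval (1 day) of data fetched.
	func exampleExtraIntervals() int64 {
		day := int64(24 * time.Hour / time.Millisecond)
		queryStart, queryEnd := int64(0), 30*day
		lookbackDeltaMs := int64(5 * time.Minute / time.Millisecond)

		start, end := queryStart-lookbackDeltaMs, queryEnd

		durationFetchedBySelectors := (end - start) - (queryEnd - queryStart) // 5m in ms
		return ceilDiv(durationFetchedBySelectors, day)                       // 1
	}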
Nice, thank you
What this PR does:
Take the example query sum(rate(metric[1h])) by (pod) with a 100 day range and the following configuration:
max_shards_per_query: 100
max_fetched_data_duration_per_query: 6000h # 250 days
query_vertical_shard_size: 3

The query-frontend now evaluates each candidate:
Option one: 1 vertical shard (no vertical sharding)
Option two: 2 vertical shards
Option three: 3 vertical shards

Previously, we would always choose option three. After this change, we choose option one in this scenario: with max_shards_per_query: 100, three vertical shards leave room for at most 33 horizontal splits (99 total shards), while not sharding vertically allows the full 100 horizontal splits. See the sketch below.
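To make the trade-off concrete, a back-of-the-envelope sketch of the candidate loop under these settings; dividing max_shards_per_query evenly across vertical shards with floor division is an assumption about getMaxSplitsFromMaxQueryShards:

	package shardingsketch

	// chooseVerticalShardSize sketches why option one wins: a budget of 100
	// shards gives 100/1=100, 100/2=50, 100/3=33 splits, i.e. 100, 100, and 99
	// total shards; the strict '>' keeps the first (smallest) candidate on ties.
	func chooseVerticalShardSize() (bestVertical, bestTotal int) {
		const maxShardsPerQuery = 100
		for v := 1; v <= 3; v++ {
			splits := maxShardsPerQuery / v // assumed floor division
			total := splits * v
			if total > bestTotal {
				bestVertical, bestTotal = v, total
			}
		}
		return bestVertical, bestTotal // 1, 100
	}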
Checklist
- CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]