Skip to content

Add proposal for parquet storage #6712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

yeya24
Copy link
Contributor

@yeya24 yeya24 commented Apr 21, 2025

What this PR does:

This PR adds a design proposal for Parquet Storage in Cortex

Which issue(s) this PR fixes:
Fixes #

Checklist

  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

@alanprot
Copy link
Member

Thanks @yeya24 This looks amazing!

There is some small differences on the data cols description here and what we did on prometheus-community/parquet-common#2 but overall looks very good.

Signed-off-by: Ben Ye <[email protected]>
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Apr 21, 2025
@SungJin1212
Copy link
Member

It seems promising, thanks!


### Data Format

Following the current desgin of Cortex, each Parquet file contains at most 1 day of data.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Following the current desgin of Cortex, each Parquet file contains at most 1 day of data.
Following the current design of Cortex, each Parquet file contains at most 1 day of data.

| `s_lbl_{labelName}` | Values for a given label name. Rows are sorted by metric name | ByteArray (string) | RLE_DICTIONARY/Zstd/No | Yes |
| `s_data_{n}` | Chunks columns (0 to data_cols_count). Each column contains data from `[n*duration, (n+1)*duration]` where duration is `24h/data_cols_count` | ByteArray (encoded chunks) | DeltaByteArray/Zstd/Yes | Yes |

data_cols_count_md will be a parquet file metadata and its value is usually 3 but it can be configurable to adjust for different usecases.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data_cols_count_md this the same as data_cols_count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I will update to data_cols_count

- Maintains row and row group order matching the Labels file
- Contains multiple chunk columns for time-series data. Each column covering a time range of chunks: 0-8h, 8h-16h, 16-24h.

#### Column Specifications
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this column specification originated by joining the two parquet files above?

maybe I'm missing something obvious, but would be nice to include the rationale for splitting in two files. rows are ordered in the same way both files, so I'm not sure why they are need to be split.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are definitely right. We experimented with a single file with both labels and chunks. The reason of splitting to 2 files is that labels and chunks have kind of different size and read pattern. We are able to configure parquet reader differently for read buffer so that we can read more efficiently.

There is a also POC from Cloudflare which uses 2 files so that they can choose to store those files differently. They can cache labels file inmemory and leave chunks file on object store because of size for more efficient index queries.

Overall, 2 files seem a more flexible approach. Maybe @alanprot can share more info.

Copy link

@MichaHoffmann MichaHoffmann Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 files are useful since the labels parquet file is tiny and can be stored on disk or memoized if wanted. This reduces requests to object storage for any label related lookups.


## Background

Since the introduction of Block Storage in Cortex, TSDB format and Store Gateway is the de-facto way to query long term data on object storage. However, it presents several significant chanllenges:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Since the introduction of Block Storage in Cortex, TSDB format and Store Gateway is the de-facto way to query long term data on object storage. However, it presents several significant chanllenges:
Since the introduction of Block Storage in Cortex, TSDB format and Store Gateway is the de-facto way to query long term data on object storage. However, it presents several significant challenges:

Copy link
Contributor

@justinjung04 justinjung04 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great, thank you!


It is similar to compactor, however, it only converts single block. The converted Parquet files will be stored in the same TSDB block folder so that the lifecycle of Parquet file will be managed together with the block.

Only certain blocks can be configured to convert to Parquet file and it will be block duration based, for example we only convert if block duration is >= 12h.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to asssume that the blocks converted to Parquet file will not be further compacted? If not, how do we manage to compact blocks with parquet files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this proposal parquet files won't be further compacted. As it requires a compactor which takes in multiple parquet files and outputs 1 parquet file. This is added as a non goal.

This proposal we only add a parquet converter which takes 1 TSDB block and outputs 1 parquet file.

Comment on lines 109 to 111
2. **Chunks Parquet File**
- Maintains row and row group order matching the Labels file
- Contains multiple chunk columns for time-series data. Each column covering a time range of chunks: 0-8h, 8h-16h, 16-24h.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify the benefit of chunks parquet file over the existing chunks file? The labels parquet file clearly has advantage of fetching all series for label matchers, but once we have the final list of series to fetch, I'm not sure how having chunks in parquet file will help with performance or memory utilization.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It enables fetching chunks for 8h ( by default, configurable ) in groups. This reduces requests to object storage if you dont need to fetch chunks for all 24h of the block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main benefit here is overfetch. We have seen quite bad overfetch on Store Gateway for chunks where fetched chunk bytes vs touched chunk bytes is 20:1.

There could be overfetch in Parquet as well but the smallest read unit in Parquet is page and the size is configurable. From our initial test the overfetch seems much better.

Parquet's compression and encoding is another nice addition as we able to see 30% of chunk size reduction.

Copy link
Contributor

@matej-g matej-g left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work 👍, I added couple questions


### Data Format

Following the current design of Cortex, each Parquet file contains at most 1 day of data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know nearly enough about the format, but I'm missing some more justification / explanation why 1 day, beyond current design.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Cortex, largest block we have is 1 day. It is configurable though.
I can remove this sentence. In short, Parquet file should have the same duration as of TSDB block as it is converted from TSDB blocks


2. **Chunks Parquet File**
- Maintains row and row group order matching the Labels file
- Contains multiple chunk columns for time-series data. Each column covering a time range of chunks: 0-8h, 8h-16h, 16-24h.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood from below conversation, this is an example of how the columns can be split. Maybe it would good to adjust this point to clarify it's an example of how this can be done (or default?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will add

| `s_hash` | Hash of all labels | INT64 | None/Zstd/Yes | No |
| `s_col_indexes` | Bitmap indicating which columns store the label set for this row (series) | ByteArray (bitmap) | DeltaByteArray/Zstd/Yes | Yes |
| `s_lbl_{labelName}` | Values for a given label name. Rows are sorted by metric name | ByteArray (string) | RLE_DICTIONARY/Zstd/No | Yes |
| `s_data_{n}` | Chunks columns (0 to data_cols_count). Each column contains data from `[n*duration, (n+1)*duration]` where duration is `24h/data_cols_count` | ByteArray (encoded chunks) | DeltaByteArray/Zstd/Yes | Yes |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a dumb question, but should the column count always result in columns being split by full hours (e.g. every 6 / 8 / 12 hours)? Are there any consequences if that's not so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will mention we re-encoded the chunks a bit at writer so that they fall into the configured column time ranges


## Open Questions

1. Should we use Parquet Gateway to replace Store Gateway
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a fully compatible API between Parquet Gateway and Store Gateway would make the migration easier as well no?

Copy link
Contributor Author

@yeya24 yeya24 Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is easier to just replace querier with parquet querier I believe as it is stateless.
But yeah migration is not part of the proposal. We probably need to create a migration guide later.


Similar to the existing `distributorQueryable` and `blockStorageQueryable`, Parquet queryable is a queryable implementation which allows Cortex to query parquet files and can be used in both Cortex Querier and Ruler.

If Parquet queryable is enabled, block storage queryable will be disabled and Cortex querier will not query Store Gateway anymore. `distributorQueryable` remains unchanged so it still queries Ingesters.
Copy link
Contributor

@danielblando danielblando Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have the parquet converter configured for only blocks >= 12h. Would blockStorageQueryable still be enabled when querying blocks < 12h?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing flags like query Ingester within and query store after will still be used so we can fallback to Ingesters.
We can also do some kind of fallback to store gateway as you said during migration phase. But those are implementation details.

Long term solution is for compactor to create and compact parquet files so same as what we have today.


If Parquet queryable is enabled, block storage queryable will be disabled and Cortex querier will not query Store Gateway anymore. `distributorQueryable` remains unchanged so it still queries Ingesters.

Parquet queryable uses bucket index to discovers parquet files in object storage. The bucket index is the same as the existing TSDB bucket index file, but using a different name `bucket-index-parquet.json.gz`. It is updated periodically by Cortex Compactor/Parquet Converter if parquet storage is enabled.
Copy link
Contributor

@danielblando danielblando Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we still query the 12h tsdb blocks while we dont have them in the parquet index?
For example if we have 8h blocks and we are compacting to 12h blocks, I assume after the new 12h tsdb blocks are created we convert it to parquet, but while this doesnt finish we only have them in the default index and the 8h blocks would be removed from default index by compactor, no?

Copy link
Contributor Author

@yeya24 yeya24 Apr 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot query them until the parquet file is created and added to bucket index. There is some tradeoff here. Users can configure any option that works for them

  • You can configure convert only 12h+ blocks. This has a longer delay in parquet file creation but less resource required for convertion. Users need to expect fallback to some other storage to query the data
  • You can configure converting 2h block. Maybe we can configure it to only convert blocks after deduplication so parquet files are available earlier but more compactors are required to do convert

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, makes more sense now. I think then parquet would need to do some kind of merge between both index for the 12h scenario.

@alanprot alanprot mentioned this pull request Apr 22, 2025
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/documentation lgtm This PR has been approved by a maintainer size/L
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants