Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact: Offline deduplication #1276

Closed
wants to merge 11 commits into from

Conversation

smalldirector
Copy link

Changes

As the discussion in #1014, implement a new feature in compactor which provides offline deduplication function for the data from different replicas.

The offline deduplication follow the same design with query deduplication. The user needs to specify replica label by config dedup.replica-label before enable it, and the function uses current query deduplication algorithm(penalty algorithm) to merge data points come from different replicas.

The offline deduplication function is based on bucket level in remote storage, so the user needs to ensure that all replica data write to same bucket.

Verification

  • Compared data quality by tsdb read API.
  • Compared the graph by Thanos query UI.

Below figures are the comparison for one sample metrics before dedup and after dedup. It defines one interval replica label _agg_replica_ to represent the merged data.

Before Dedup

After Dedup

  • Tested the dedup function online, and monitoring its metrics(especially memory usage).

For below figure, each of block size is around 3GB and we have two replicas for each block. As it is using streaming read/write way to operate block, so no OOM exception happens.

Metrics

@smalldirector
Copy link
Author

@bwplotka I submitted the PR for offline deduplication function, please help add reviewers and review the codes. Thanks.

@smalldirector smalldirector changed the title Compact: Offline deduplication [WIP]Compact: Offline deduplication Jul 1, 2019
@smalldirector smalldirector changed the title [WIP]Compact: Offline deduplication Compact: Offline deduplication Jul 15, 2019
@bwplotka
Copy link
Member

Thanks @smalldirector and we definitely need that (: Is there any way you can split this PR to smaller chunks in any way? It will will improve reviewing process majorily!

@smalldirector
Copy link
Author

@bwplotka Good question here. I was trying to split the PR before, however I didn't figure out one good way. For my understanding, we need to keep the function work in the PR before it is merged. However my PR only provide one function dedup.
Do you have any suggestion on how to break it?

@smalldirector
Copy link
Author

@bwplotka The dudep function support both of raw metrics and downsampling metrics, should I split it to two separate PRs(one is for raw metrics, another is for downsampling metrics)?

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! Thanks for this great work, looks solid. I think this might work from the first glance, but I would love to make this bit less complex, smaller in LOCs and less memory consuming before merging.

TL;DR: Reuse stuff! (:

You are right it is hard to split this - but I think we can really decrease those codelines majorily by reusing more code.

I proposed various option to reuse code that is there already, so we can have only one way of doing things e.g:

  • deduplicating (both offline and online) (iterators)
  • syncing
  • downloading block
  • specifying time ranges
  • specifying aggr type

It will reduce number of lines for this PR a lot IMO. Also we probably need to use iterators to stream the whole operation - otherwise it will take too much memory ): I know it is hard work to make this happen, but happy to help with that as well.

We could split then: Making sure Syncer is generic enough to be used by deduper and then rest - we could start with that.

return NewReplicas(s.labelName, result)
}

func (s *ReplicaSyncer) download(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a fan of adding yet another Download method. Same with Syncer. Can we reuse those?

We already have problem with having multiple Syncer so we have #1335

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will figure out one proper way to reuse the functions in the compact.Syncer.
From the #1335, I saw we would start to unify the syncer APIs.
I think if I can do this change after #1335, it will save some efforts.

}
}

func (s *ReplicaSyncer) Sync(ctx context.Context) (Replicas, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this for compactor. Can we reuse somehow?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with above.

postings index.Postings
}

func NewBlockReader(logger log.Logger, resolution int64, blockDir string) (*BlockReader, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need this? We already have tsdb.BlockReader

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When try to get the sorted postings for one block, it needs to call four functions to make it: OpenBlock(), Index(), Postings(), SortedPostings(). Here just use this BlockReader to provide a convenient way to access one block's postings and chunk reader.

// samples from non-downsample blocks
RawSample SampleType = iota
// samples from downsample blocks, map to downsample AggrType
CountSample
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just reuse AggrType? This will avoid those mappings and simplify code (:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let me figure out the way to handle raw sample with AggrType.

return chks, nil
}

type SampleReader struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. We deduplicate in Iterators interface currently... Wonder if we can reuse same algorithm as here: https://github.com/thanos-io/thanos/blob/master/pkg/query/iter.go#L375

Again - to reuse code to reduce overall complexity for understanding Thanos codebase (: and to gain benefits from improvements by changing just single place (:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this SampleReader doesn't do the deduplication logic. It just provides the function to read the samples under the specified time range.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And using DedupSeriesIterator to do the dedup logic:

merge := func(st SampleType) []*Sample {

So both of online and offline deduplication are using the same penalty algorithm.

"github.com/thanos-io/thanos/pkg/query"
)

type TimeWindow struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have tsdb.TimeRange{}

Can we reuse it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know tsdb has TimeRange . Will change to use it.


type BlockGroups []*BlockGroup

func NewBlockGroups(replicas Replicas) BlockGroups {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be doable in Iterator as well. I think it would more understandable if we would have just one method/algo for this - for both offline and online deduplication. Online deduplication is available here: https://github.com/thanos-io/thanos/blob/master/pkg/query/iter.go#L304

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BlockGroups provides the function to group all the blocks under the specified time range. It doesn't provide any deduplication here.

pkg/compact/dedup/dedup_test.go Show resolved Hide resolved
@smalldirector
Copy link
Author

@bwplotka Thanks for your 1st round review. Totally agree with you that we should leverage more on the existed codes. For the time range and aggr type, I made the code changes. For the sync and downloading block, as I mentioned above, the #1335 will do the refactor task on those function. If I do it after the #1335, it will save us lots of efforts. Currently the offline deduplication logic follows the same dedup algorithm with online deduplication, so no code changes need to be done here.

@bwplotka
Copy link
Member

bwplotka commented Aug 6, 2019

Cool @smalldirector , so is it ready for second review iteration? (:

@smalldirector
Copy link
Author

smalldirector commented Aug 6, 2019

@bwplotka Yes, please help do the next round review. For the meta syncing and block downloading, I would like to do it after #1335 is fixed.

@bwplotka
Copy link
Member

We need to get back to this finally (: cc @GiedriusS @povilasv @brancz

Sorry for delay.

@smalldirector smalldirector force-pushed the offline-dedup branch 2 times, most recently from 395dcce to 0031d8e Compare October 11, 2019 22:14
@smalldirector
Copy link
Author

Rebased the codes for the upcoming review.

f := func() error {
if isEnableDedup(enableDedup, dedupReplicaLabel) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isEnableDedup(enableDedup, dedupReplicaLabel) {
if enableDedup && len(dedupReplicaLabel) > 0 {

@@ -463,3 +479,7 @@ func generateIndexCacheFile(
}
return nil
}

func isEnableDedup(enableDedup bool, dedupReplicaLabel string) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider getting rid of this one line function, IMO no point in abstracting single line of code :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can remove this function.

@@ -665,7 +665,7 @@ type RetryError struct {
err error
}

func retry(err error) error {
func Retry(err error) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be exported method? If yes add comment, but I would keep it internal if possible

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this function is used in another package scope, I need to change it as public. I will add comment for this method.

"github.com/thanos-io/thanos/pkg/objstore"
)

type BucketDeduper struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are missing comments on exported structs / methods / functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will review all the codes and see whether I can change some exported structs / methods / functions to interval. If not, I will add comments for them. Thanks.

@Reamer
Copy link

Reamer commented Oct 18, 2019

Hi @smalldirector,

At first, thank you for developing this feature. I tested your feature on my test environment and I will keep this running. If I encounter issues, i'll let you known.

Used code

I've done a merge with thanos v0.8.1.

Before De-duplication

I used thanos bucket web with --label=prometheus_replica.
I have two prometheus statefulsets in a HA setup, created with prometheus-operator. One is called prometheus-k8s, the other prometheus-avm.
Pre_Dedup

During De-duplication

For both prometheus setups I noticed a lot of warn message of the kind find empty downsample chunk during de-duplication of resolution 3600000.
Full error message:

resolution=3600000 metric in prometheus-k8s job:
{"caller":"reader.go:324","labels":"{__name__=\"admission_quota_controller_work_duration\",endpoint=\"https\",instance=\"10.20.15.13:443\",job=\"apiserver\",namespace=\"default\",quantile=\"0.99\",service=\"kubernetes\"}","level":"warn","msg":"find empty downsample chunk","ref":887438,"ts":"2019-10-16T07:16:23.650581433Z","type":"counter"}

resolution=3600000 metric in prometheus-avm job:
{"caller":"reader.go:324","labels":"{__name__=\"http_request_duration_microseconds\",endpoint=\"http\",handler=\"flags\",instance=\"10.130.17.27:10902\",job=\"thanos-metrics\",namespace=\"thanos\",pod=\"thanos-querier-657d5f7d9-qvzmq\",quantile=\"0.5\",service=\"thanos-metrics\"}","level":"warn","msg":"find empty downsample chunk","ref":51486867,"ts":"2019-10-17T05:57:53.170766602Z","type":"counter"}

first_dedbug_3600000_with_many_errors

De-duplication of resolution 0 (Raw-files) and resolution 300000 was done without warning or error messages.

After De-duplication

After De-duplication was succeeded, the downsampling algorithm had starting to create new blocks for 300000 (A third block for 300000). I have no idea why that was happening. Maybe you or an other developer knows more. I interrupted this process and deleted all blocks of resolution 300000 and 3600000. After this I restarted compactor with downsampling again.

After downsampling overnight everything looks nearly perfect.
After_Dedup
After changing thanos bucket web config to --label=prometheus it looks perfect.
After_Dedup2

Proposal to change from user perspective

  • Checkout de-duplication algorithm for resolution 3600000.

@ivan-kiselev
Copy link

This would be just an awesome thing to have. Looking forward to it!

@smalldirector
Copy link
Author

Just came back from vacation. Thanks for review @povilasv, I will do the related code changes with comments. @Reamer Thanks for your testing, I will check your question.

@smalldirector
Copy link
Author

@Reamer The logic for throwing the warning log find empty downsample chunk is pretty simple. The function tries to read chunk data with given downsampling aggr type, if no such chunk found, it will throw such logs.
From your test, there is no counter downsampling data for given label set. Can you write one simple block reader to test your downsampling block?

shuaizhang and others added 10 commits December 14, 2019 16:58
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: shuaizhang <[email protected]>
Signed-off-by: Alan Zhang <[email protected]>
@smalldirector
Copy link
Author

@Reamer Merged the latest codes and fixed the issues.

Copy link
Member

@bwplotka bwplotka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for maintaining it!

I think it's a lot of code to maintain and support but there is a way to simplify this by reusing more from Prometheus.

Yes, we need this grouping code which you did which is amazing, but for deduping we might need to just:

  • Abstract in Prometheus chunks.MergeOverlappingChunks(chks) and allow injecting custom (chks []Meta) ([]Meta, error) { mergers. This requires a change in upstream which I think Prometheus maintainers will accept (:
  • Implement merger bases on our deduplication logic we have here.

What do you think? (: Would love to have this in Thanos finally ❤️

@smalldirector
Copy link
Author

smalldirector commented Dec 22, 2019

Thanks for the suggestion.
AFAIK, Thanos supports both of raw and downsampling data block, but there is no downsampling concept in Prometheus world. We may can use the function chunks.MergeOverlappingChunks(raw chks) provided by Prometheus to do the merge logic, however it seems to be impossible to support MergeOverlappingChunks(downsampling chks) in Prometheus.
I don't think we can use this one function to handle both of raw and downsampling cases.

@bwplotka
Copy link
Member

chunks.MergeOverlappingChunks(raw chks)

This should be fine as we hide downsampling chunks under common Prometheus chunk interface (:

@smalldirector
Copy link
Author

chunks.MergeOverlappingChunks(raw chks)

This should be fine as we hide downsampling chunks under common Prometheus chunk interface (:

Can anyone help make this change in Prometheus upstream?

@bwplotka
Copy link
Member

bwplotka commented Jan 9, 2020

Happy New Year!

Can anyone help make this change in Prometheus upstream?

In what sense help?

I can help, but it's as easy as proposing a change to injecting interface/function for chunks.MergeOverlappingChunks(raw chks) to Compactor (: Happy to look on that tomorrow.

@Reamer
Copy link

Reamer commented Jan 31, 2020

Hi,
I doesn't want, that this cool feature falls into sleep. @bwplotka Can we move forward here?

I get a merge conflict with v0.10.1. @smalldirector Can you please rebase your code to current master.

Thanks for your work

Reamer

@bwplotka
Copy link
Member

bwplotka commented Feb 6, 2020

@Reamer @smalldirector We looked in details and we believe we can do 90% of this PR by just reusing existing components (with small upstream changes). Cc @metalmatze

We will spend some time next week on this to show what we mean (: No work will be waste as grouping etc can be used from the code here!

@vjsamuel
Copy link

@bwplotka curious, would you just add into this PR and add move the feature forward or would this PR be merged in and modified? we would like to see this feature in place as we see great value add. @smalldirector thank you for continuing work on this.

@bwplotka
Copy link
Member

The feature is literally 100 lines of code if we would integrate all with TSDB code and reuse more, so I would say another PR. Are you ok with this @smalldirector ? Looks like Shuai is inactive as well recently.

Thank you for your hard work on this, I think we learned a lot with this implementation. Will close for now.

cc @metalmatze who is working on this, this week.

@bwplotka bwplotka closed this Feb 24, 2020
@d-ulyanov
Copy link
Contributor

waiting for this feature, @bwplotka is there another PR already?

@bwplotka
Copy link
Member

@metalmatze is working on the first iteration, I am preparing interface upstream, maybe as part of this: prometheus/prometheus#5882

@vjsamuel
Copy link

@bwplotka @metalmatze is there an update on this one?

@Reamer
Copy link

Reamer commented Mar 17, 2020

@bwplotka
Copy link
Member

bwplotka commented Mar 17, 2020

Yes, we are close to enable vertical compaction (which is a bit different than offline): #2250

Then for offline indeed it requires prometheus/prometheus#6990 which is cloooooooose to be done (:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants