
tools bucket replicate: add support for concurrent bucket replication #5280

Closed
wants to merge 8 commits

Conversation


@mwasilew2 mwasilew2 commented Apr 13, 2022

fixes: #4278

Signed-off-by: Michal Wasilewski [email protected]

  • I added CHANGELOG entry for this change.
  • Change is not relevant to the end user.

Changes

The current implementation of bucket replication is serial, i.e. there's a loop which iterates over blocks and synchronizes them between a source and a destination bucket, one block at a time. This PR switches to a pipeline-based approach which allows for concurrent replication.

The pipeline consists of three stages. The first stage is a single goroutine which sends pointers to blocks that need to be synchronized to an unbuffered channel. The second stage consists of multiple goroutines which receive block pointers from the upstream channel and replicate each received block. The level of concurrency of this stage can be controlled with a CLI option; the default concurrency level is 4, which is an arbitrary choice, and it should work even on single-threaded CPUs because goroutines are used. Any errors that happen in the second stage are sent to a channel which is read by the third stage, which aggregates them using a multierror object. Once all second-stage goroutines have finished, the function returns the multierror object, which also implements the error interface from the stdlib.
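For illustration, here is a minimal, self-contained sketch of such a three-stage pipeline; the Block type and replicateBlock function are placeholders for this sketch, not the actual Thanos types:

package replicate

import (
    "context"
    "sync"

    "github.com/hashicorp/go-multierror"
)

type Block struct{ ID string }

// replicateBlock stands in for the per-block replication between buckets.
func replicateBlock(ctx context.Context, b *Block) error { return nil }

func replicateAll(ctx context.Context, blocks []*Block, concurrency int) error {
    // Stage 1: a single goroutine feeds block pointers into an unbuffered channel.
    blocksChan := make(chan *Block)
    go func() {
        defer close(blocksChan)
        for _, b := range blocks {
            blocksChan <- b
        }
    }()

    // Stage 2: a fixed number of worker goroutines replicate blocks and report failures.
    errsChan := make(chan error)
    var wg sync.WaitGroup
    for i := 0; i < concurrency; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for b := range blocksChan {
                if err := replicateBlock(ctx, b); err != nil {
                    errsChan <- err
                }
            }
        }()
    }
    go func() {
        wg.Wait()
        close(errsChan)
    }()

    // Stage 3: aggregate all errors into a single multierror value.
    var merr *multierror.Error
    for err := range errsChan {
        merr = multierror.Append(merr, err)
    }
    return merr.ErrorOrNil()
}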

Verification

@mwasilew2
Author

writing down some notes:

  • replication can run in two modes:
    • singleRun (false by default, passed as a CLI argument)
    • "continuous", which runs on a non-configurable interval hard-coded to 1 minute
  • both modes use the same function call for starting the replication, but handle errors from that function differently:
    • singleRun propagates the error, effectively passing it to the "global" run group and thus terminating the execution
    • "continuous" doesn't propagate the error, but instead handles it (logs a message and increases some metrics)
  • what to do with partial uploads?
    • current state:
      • in case of a panic, there are some defer functions that log a problem
      • there is no retry mechanism
      • failures are not tracked, e.g. ids of failed blocks are not captured
    • don't terminate other goroutines in case of one of them returning an error, instead allow all of them to finish at their own pace and aggregate errors
  • how to handle life-cycle of goroutines used for concurrent upload?
    • the current implementation is serial; a failure to upload any of the blocks in the list results in head-of-line blocking, which could potentially be improved
    • cancellation is handled using context propagation, context is injected using a closure
    • oklog/run:
      • reusing the "global" run group doesn't seem like a good idea, because in the "continuous" mode we want to continue the execution no matter what error occurs
      • a local oklog/run.Group could be created
      • the new implementation should be able to capture errors from multiple goroutines without terminating the others (oklog/run terminates all goroutines in a group when the first error is received)
    • sync.Waitgroup:
      • doesn't propagate errors
      • it could be used with some additional error handling logic that writes to a thread-safe, "centralised" variable storing errors (for example a mutex-protected slice), but then the upstream function would not return a single error but a collection of errors; one possible workaround is a custom error type that returns an aggregated error message
    • errgroup:
      • captures only the first returned error (which might be ok)
      • could be extended with a "mutex protected slice"
    • hashicorp/go-multierror.Group:
      • is thread safe
      • hashicorp/go-multierror.Group.Wait() returns a single error object that implements the stdlib error interface and at the same time aggregates errors from all goroutines
  • how to aggregate errors?
    • hashicorp/go-multierror is already a dependency
  • how to bind concurrency?
    • thread pool:
      • over-engineering?
    • sync.Pool:
      • over-engineering? there won't be any memory heavy objects involved, it's just about scheduling
    • semaphore:
      • channel implementation
      • golang.org/x/sync/semaphore:
        • introduces a new dependency

Summary:

  • return a single error object and let it be handled by the functions upstream
  • use hashicorp/go-multierror.Group to manage goroutines and aggregate errors from them
  • use blocking on a channel to bound concurrency (sketched below)
  • try to keep this PR to a minimum, i.e. don't extend handling of partial uploads
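A minimal sketch of the approach summarised above (which was later replaced by the pipeline approach described in the PR body), assuming placeholder Block and replicateBlock types rather than the actual Thanos code; concurrency is bounded by blocking on a buffered channel, and errors are aggregated by a multierror.Group:

package replicate

import (
    "context"

    "github.com/hashicorp/go-multierror"
)

type Block struct{ ID string }

func replicateBlock(ctx context.Context, b *Block) error { return nil }

func replicateAllBounded(ctx context.Context, blocks []*Block, concurrency int) error {
    // The buffered channel acts as a semaphore: the send blocks once
    // `concurrency` replications are in flight.
    sem := make(chan struct{}, concurrency)

    var g multierror.Group
    for _, b := range blocks {
        b := b
        sem <- struct{}{}
        g.Go(func() error {
            defer func() { <-sem }()
            return replicateBlock(ctx, b)
        })
    }

    // Wait blocks until every goroutine started with Go has returned and
    // aggregates their errors; ErrorOrNil yields nil when nothing failed.
    return g.Wait().ErrorOrNil()
}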

@mwasilew2
Author

mwasilew2 commented Apr 13, 2022

ah, I just realized that multierror.Group.Wait() will return when the internal counter goes down to 0, and that might happen before the loop iterates over all blocks, so the implementation as it is in this commit won't work as desired; I still need to work on this

Signed-off-by: Michal Wasilewski <[email protected]>
@mwasilew2
Author

switched to a pipeline approach, but I still want to give this some more thought

@mwasilew2
Author

The bucket client methods seem to be thread-safe, at least for the few backends that I checked:

GCS

The methods of Client are safe for concurrent use by multiple goroutines.

src: https://pkg.go.dev/cloud.google.com/go/storage#hdr-Creating_a_Client

Minio

minio/minio-go#598

Filesystem

there are cases of race conditions when calling bucket methods, e.g. #5103, but it seems that as long as the filename is unique it should be fine: https://github.com/thanos-io/thanos/blob/main/pkg/objstore/filesystem/filesystem.go#L191

@mwasilew2
Author

mwasilew2 commented Apr 20, 2022

ideas for extending this further (potentially create issues):

  • handle upload errors (e.g. retries, backoff)
  • handle early termination (e.g. on SIGINT or SIGTERM; use select in the pipeline goroutines? see the sketch after this list)
  • add some more instrumentation in the replication function calls (e.g. a metric for the number of failed uploads)
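For the early-termination idea, a rough sketch of what a context-aware worker could look like, building on the earlier pipeline sketch (same package and placeholder Block/replicateBlock names, not the actual Thanos code); the select on ctx.Done() lets a signal handler cancel the context and stop the workers early:

// worker is a hypothetical stage-2 goroutine that honours cancellation.
func worker(ctx context.Context, blocksChan <-chan *Block, errsChan chan<- error) {
    for {
        select {
        case <-ctx.Done():
            // The context was cancelled (e.g. after SIGINT/SIGTERM): stop early and report why.
            errsChan <- ctx.Err()
            return
        case b, ok := <-blocksChan:
            if !ok {
                // The producer closed the channel: all blocks have been handled.
                return
            }
            if err := replicateBlock(ctx, b); err != nil {
                errsChan <- err
            }
        }
    }
}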

Signed-off-by: Michal Wasilewski <[email protected]>
Signed-off-by: Michal Wasilewski <[email protected]>
@mwasilew2
Author

@GiedriusS I think this is ready for a first review to confirm I'm heading in the right direction

@GiedriusS
Member

I'll take a look soon, thank you for your thorough work. A bit on vacation ATM.

Member

@bwplotka bwplotka left a comment


Thanks for this!

The implementation is great, simple and efficient; we need tests and just minor style nits fixed.

But there is a reason why this code is not concurrent. I tried to explain it in the flag help, which we could use to warn users. This is essentially unsafe, but for your use case it might work (e.g. if you don't have a compactor turned off on the target bucket). There is even a comment about this:

[screenshot of the referenced code comment]

There is a way to make it a bit safer: we can split the blocks into groups, as mentioned here, like we do in the compactor itself (unless that is already done here).

if err := rs.ensureBlockIsReplicated(ctx, b.BlockMeta.ULID); err != nil {
return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
// iterate over blocks and send them to a channel sequentially
Member


We use full sentences in comments (nit).

Suggested change
// iterate over blocks and send them to a channel sequentially
// Iterate over blocks and send them to a channel sequentially.

Author


yup, my bad, I'll fix it

I wonder why this was not picked up by the linter; the golangci-lint config includes godot:

- godot

Perhaps it's running with the defaults (where it checks only the declarations, not all comments)? Would it make sense to add a .godot.yaml with a config that makes it check all comments (and probably fix lots and lots of comments across the code base)?

return errors.Wrapf(err, "ensure block %v is replicated", b.BlockMeta.ULID.String())
}
// iterate over blocks and send them to a channel sequentially
blocksChan := func() <-chan *metadata.Meta {
Member


Why does it have to be in a closure (anonymous function)?

Author


No reason, good point, I'll simplify this.

return out
}()

// fan-out for concurrent replication
Member


Wrong comment format (full sentence please).

}
// iterate over blocks and send them to a channel sequentially
blocksChan := func() <-chan *metadata.Meta {
out := make(chan *metadata.Meta)
Member


Suggested change
out := make(chan *metadata.Meta)
blocksChan := make(chan *metadata.Meta)

@@ -210,6 +211,7 @@ func (tbc *bucketReplicateConfig) registerBucketReplicateFlag(cmd extkingpin.Fla
cmd.Flag("matcher", "Only blocks whose external labels exactly match this matcher will be replicated.").PlaceHolder("key=\"value\"").StringsVar(&tbc.matcherStrs)

cmd.Flag("single-run", "Run replication only one time, then exit.").Default("false").BoolVar(&tbc.singleRun)
cmd.Flag("concurrency-level", "Max number of go-routines to use for replication.").Default("4").IntVar(&tbc.concurrencyLvl)
Member


Unfortunately this is not as simple as that, so if you want to keep this implementation we have to add extra context and set the default to 1.

Suggested change
cmd.Flag("concurrency-level", "Max number of go-routines to use for replication.").Default("4").IntVar(&tbc.concurrencyLvl)
cmd.Flag("concurrency-level", "Number of go-routines to use for replication. WARNING: Value bigger than one enables, concurrent, non-sequential block replication. This means that the block from 4w ago can be uploaded after the block from 2h ago. In the default Thanos compactor, with a 30m consistency delay (https://thanos.io/tip/components/compact.md/#consistency-delay) if within 30 minutes after uploading those blocks you will have still some blocks to upload between those, the compactor might do compaction assuming a gap! This is solvable with vertical compaction, but it wastes extra computation and is not enabled by default. Use with care.").Default("1").IntVar(&tbc.concurrencyLvl)

Author


This is solvable with vertical compaction, but it wastes extra computation and is not enabled by default.

I see what you mean here: even if the problem can be solved by vertical compaction, the blocks should not be uploaded out of sequence without a good reason.

@SuperQ
Contributor

SuperQ commented Jun 2, 2022

I wonder if we should have a method of acquiring a lock on the compactor while these kinds of operations are in progress.

For example, the tool could make an RPC to the compactor, or write a top-of-bucket lockfile.

@bwplotka
Member

bwplotka commented Jun 2, 2022

I think better than a lock would be a compaction planning algorithm that just deals with those "out of order blocks" reasonably. We have vertical compaction, which we could enable by default, but we need more efficient planning and execution so that it doesn't just waste more time and resources. No need for a lock IMO (:

@bwplotka
Member

bwplotka commented Jun 2, 2022

Some entry point for this work: #4233

@mwasilew2
Author

@bwplotka

but for your use case it might work (e.g. if you don't have a compactor turned off on the target bucket)

There's no use case on my side; in our environments we don't run replication between buckets. I was simply looking for Thanos issues to work on in my spare time and looked through the "good first issues" list in the issue tracker ;)

But there is a reason why this code is not concurrent. I tried to explain this in flag help which we could use to warn users.

I suspected I might be missing something about the bigger picture here. Thanks for the detailed explanation!

There is a way to make it a bit safer: we can split the blocks into groups, as mentioned here, like we do in the compactor itself (unless that is already done here).

Some entry point for this work: #4233

Thank you so much for these pointers!

I wonder if having replication concurrency between compaction groups, but not within groups (basically what you suggested) would be sufficient here to address this problem. I'll need to give it some more thought, but any further feedback is welcome.
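One possible shape of that idea, as a hedged sketch only (Block.GroupKey, replicateBlock and the grouping key itself are illustrative assumptions, not the actual Thanos compaction-group logic): groups are replicated concurrently, while blocks within each group keep their sequential order.

package replicate

import (
    "context"

    "github.com/hashicorp/go-multierror"
)

type Block struct {
    ID       string
    GroupKey string // e.g. derived from external labels and resolution
}

func replicateBlock(ctx context.Context, b *Block) error { return nil }

func replicateByGroup(ctx context.Context, blocks []*Block, concurrency int) error {
    // Bucket blocks by their compaction group key.
    groups := map[string][]*Block{}
    for _, b := range blocks {
        groups[b.GroupKey] = append(groups[b.GroupKey], b)
    }

    // Bound how many groups are replicated at once.
    sem := make(chan struct{}, concurrency)
    var g multierror.Group
    for _, grp := range groups {
        grp := grp
        sem <- struct{}{}
        g.Go(func() error {
            defer func() { <-sem }()
            // Within a group, keep the sequential upload order.
            for _, b := range grp {
                if err := replicateBlock(ctx, b); err != nil {
                    return err
                }
            }
            return nil
        })
    }
    return g.Wait().ErrorOrNil()
}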

@stale

stale bot commented Aug 12, 2022

Is this still relevant? If so, what is blocking it? Is there anything you can do to help move it forward?

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@stale stale bot added the stale label Aug 12, 2022
@GiedriusS GiedriusS removed the stale label Aug 18, 2022
@stale

stale bot commented Oct 30, 2022

Hello 👋 Looks like there was no activity on this amazing PR for the last 30 days.
Do you mind updating us on the status? Is there anything we can help with? If you plan to still work on it, just comment on this PR or push a commit. Thanks! 🤗
If there will be no activity in the next week, this issue will be closed (we can always reopen a PR if you get back to this!). Alternatively, use remind command if you wish to be reminded at some point in future.

@stale stale bot added the stale label Oct 30, 2022
@stale

stale bot commented Nov 12, 2022

Closing for now as promised, let us know if you need this to be reopened! 🤗

@stale stale bot closed this Nov 12, 2022