Optimise the restoration process #57
As discussed last week, can we include some tests please? ;-)
Nice work!! I have some suggestions. Please check them out.
7126058 to c92de4c
Code changes are finished and ready for review. Will push the unit tests shortly.
Thanks for the PR, Shreyas. I have suggested some changes, PTAL. I haven't traced the channel handling much yet. I'll go through another iteration of review once you address the comments. Also, please rebase the branch onto master before continuing.
f8a7699 to 8c4de61
Will push unit tests shortly.
Tests look good too! If you can address the couple of questions, we can go to LGTM :-)
errCh <- fmt.Errorf("snap index mismatch for delta snapshot %d; expected snap index to be atleast %d", fetchedSnapIndex, nextSnapIndexToApply)
return
}
if fetchedSnapIndex == nextSnapIndexToApply {
What if fetchedSnapIndex > nextSnapIndexToApply?
It will just continue to the next iteration. The logic is: when fetchedSnapIndex == nextSnapIndexToApply, it applies that snap as well as any subsequent snaps that have already been fetched, until an unfetched snap is encountered, and then it moves to the next iteration. This way, all snaps are applied in the right order.
DeltaSnapList: deltaSnapList,
}
err = rstr.Restore(restoreOptions)
Expect(err).Should(HaveOccurred())
Might be good to validate the error message also.
Sure, will definitely look into it.
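For reference, one way to check the message rather than just the presence of an error is sketched below using only the standard library. `restore` and `checkRestoreError` are hypothetical stand-ins, and the error text is shortened from the one quoted earlier in this thread. In a Gomega suite the same check could presumably be written as `Expect(err).To(MatchError(ContainSubstring("snap index mismatch")))`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// restore is a hypothetical stand-in for rstr.Restore that always fails.
func restore() error {
	return errors.New("snap index mismatch for delta snapshot 3")
}

// checkRestoreError returns nil only if err is non-nil and its message
// contains wantSubstr; otherwise it describes what went wrong.
func checkRestoreError(err error, wantSubstr string) error {
	if err == nil {
		return errors.New("expected an error, got nil")
	}
	if !strings.Contains(err.Error(), wantSubstr) {
		return fmt.Errorf("error %q does not contain %q", err.Error(), wantSubstr)
	}
	return nil
}

func main() {
	// A nil result means the failure carried the expected message.
	fmt.Println(checkRestoreError(restore(), "snap index mismatch"))
}
```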
LGTM
e72ce5c to 2d1d7b7
Overall LGTM. Thanks for writing test cases as well. I have a concern regarding the applier logic. Could you please address it?
pkg/snapshot/restorer/restorer.go
Outdated
r.logger.Infof("Cleanup complete")
}

// fetch fetches delta snapshots as events and persists them onto disk.
s/fetch/fetchSnaps/
pkg/snapshot/restorer/restorer.go
Outdated
// fetch fetches delta snapshots as events and persists them onto disk.
func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo, applierInfoCh chan<- applierInfo, snapLocationsCh chan<- string, errCh chan<- error, stopCh chan bool, wg *sync.WaitGroup) {
defer func() {
Refactor:
defer wg.Done()
pkg/snapshot/restorer/restorer.go
Outdated
}
}

// apply applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched.
s/apply/applySnaps/
pkg/snapshot/restorer/restorer.go
Outdated
// apply applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched.
func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan applierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) {
defer func() {
See comment above.
pkg/snapshot/restorer/restorer.go
Outdated
wg.Add(1)

eventsList := make([][]event, len(remainingSnaps))
Do we really need eventsList? It kind of defeats the purpose of persisting the fetched delta snapshots locally, doesn't it? This way, at some stage we might hold the data from all delta snapshots in memory, which leads to unnecessary memory consumption and possible OOM issues. Please read the persisted snapshots into memory in order of index and apply them one at a time. If the snapshot for the current index has not been fetched yet, wait until it is.
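A rough sketch of the suggested approach, assuming each fetcher persists its snapshot to a file and then signals completion on a per-index channel. All names here (`applyFromDisk`, `snapPath`, `restoreDemo`, the `delta-N` file naming) are hypothetical and not taken from the PR. The point is that only one snapshot's contents are held in memory at a time.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// snapPath returns the (hypothetical) on-disk location of snapshot idx.
func snapPath(dir string, idx int) string {
	return filepath.Join(dir, fmt.Sprintf("delta-%d", idx))
}

// applyFromDisk applies persisted snapshots strictly in index order.
// fetched[i] receives the fetch result for snapshot i, so the loop blocks
// until the snapshot it needs next has actually been written to disk.
func applyFromDisk(dir string, fetched []chan error, apply func([]byte)) error {
	for idx := range fetched {
		if err := <-fetched[idx]; err != nil { // wait for snapshot idx
			return err
		}
		path := snapPath(dir, idx)
		data, err := os.ReadFile(path) // load only this one snapshot
		if err != nil {
			return err
		}
		apply(data)
		if err := os.Remove(path); err != nil { // free disk space once applied
			return err
		}
	}
	return nil
}

// restoreDemo starts n fake fetchers in reverse order and applies their output.
func restoreDemo(n int) (string, error) {
	dir, err := os.MkdirTemp("", "delta-snaps")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	fetched := make([]chan error, n)
	for i := range fetched {
		fetched[i] = make(chan error, 1) // buffered: fetchers never block
	}
	for i := n - 1; i >= 0; i-- {
		go func(i int) {
			fetched[i] <- os.WriteFile(snapPath(dir, i), []byte(fmt.Sprintf("e%d;", i)), 0o600)
		}(i)
	}

	var applied strings.Builder
	err = applyFromDisk(dir, fetched, func(b []byte) { applied.Write(b) })
	return applied.String(), err
}

func main() {
	out, err := restoreDemo(3)
	fmt.Println(out, err) // prints e0;e1;e2; <nil>
}
```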
What this PR does / why we need it:
This PR optimises the restoration process by parallelizing the fetching of delta snapshots.
Which issue(s) this PR fixes:
Fixes #41
Special notes for your reviewer:
Release note: