Skip to content

Commit

Permalink
Divided the operations for restoration from delta snapshot in smaller…
Browse files Browse the repository at this point in the history
… chunks to commit.
  • Loading branch information
abdasgupta committed Nov 6, 2020
1 parent 232a10b commit ab36fbf
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
30 changes: 18 additions & 12 deletions pkg/snapshot/restorer/restorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *Restorer) Restore(ro RestoreOptions) error {
defer client.Close()

r.logger.Infof("Applying delta snapshots...")
return r.applyDeltaSnapshots(client, ro)
return r.applyDeltaSnapshots(client, &ro)
}

// restoreFromBaseSnapshot restore the etcd data directory from base snapshot.
Expand Down Expand Up @@ -345,13 +345,13 @@ func startEmbeddedEtcd(logger *logrus.Entry, ro RestoreOptions) (*embed.Etcd, er
}

// applyDeltaSnapshots fetches the events from delta snapshots in parallel and applies them to the embedded etcd sequentially.
func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOptions) error {
func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro *RestoreOptions) error {
snapList := ro.DeltaSnapList
numMaxFetchers := ro.Config.MaxFetchers

firstDeltaSnap := snapList[0]

if err := r.applyFirstDeltaSnapshot(client, *firstDeltaSnap); err != nil {
if err := r.applyFirstDeltaSnapshot(client, *firstDeltaSnap, ro); err != nil {
return err
}
if err := verifySnapshotRevision(client, snapList[0]); err != nil {
Expand All @@ -375,7 +375,7 @@ func (r *Restorer) applyDeltaSnapshots(client *clientv3.Client, ro RestoreOption
wg sync.WaitGroup
)

go r.applySnaps(client, remainingSnaps, applierInfoCh, errCh, stopCh, &wg)
go r.applySnaps(client, remainingSnaps, ro, applierInfoCh, errCh, stopCh, &wg)

for f := 0; f < numFetchers; f++ {
go r.fetchSnaps(f, fetcherInfoCh, applierInfoCh, snapLocationsCh, errCh, stopCh, &wg)
Expand Down Expand Up @@ -459,7 +459,7 @@ func (r *Restorer) fetchSnaps(fetcherIndex int, fetcherInfoCh <-chan fetcherInfo
}

// applySnaps applies delta snapshot events to the embedded etcd sequentially, in the right order of snapshots, regardless of the order in which they were fetched.
func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, applierInfoCh <-chan applierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) {
func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.SnapList, ro *RestoreOptions, applierInfoCh <-chan applierInfo, errCh chan<- error, stopCh <-chan bool, wg *sync.WaitGroup) {
defer wg.Done()
wg.Add(1)

Expand Down Expand Up @@ -509,7 +509,7 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.
return
}

if err := applyEventsAndVerify(client, events, remainingSnaps[currSnapIndex]); err != nil {
if err := applyEventsAndVerify(client, events, remainingSnaps[currSnapIndex], ro); err != nil {
errCh <- err
return
}
Expand All @@ -525,8 +525,8 @@ func (r *Restorer) applySnaps(client *clientv3.Client, remainingSnaps snapstore.
}

// applyEventsAndVerify applies events from one snapshot to the embedded etcd and verifies the correctness of the sequence of snapshot applied.
func applyEventsAndVerify(client *clientv3.Client, events []event, snap *snapstore.Snapshot) error {
if err := applyEventsToEtcd(client, events); err != nil {
func applyEventsAndVerify(client *clientv3.Client, events []event, snap *snapstore.Snapshot, ro *RestoreOptions) error {
if err := applyEventsToEtcd(client, events, ro); err != nil {
return fmt.Errorf("failed to apply events to etcd for delta snapshot %s : %v", snap.SnapName, err)
}

Expand All @@ -537,7 +537,7 @@ func applyEventsAndVerify(client *clientv3.Client, events []event, snap *snapsto
}

// applyFirstDeltaSnapshot applies the events from first delta snapshot to etcd.
func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot) error {
func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapstore.Snapshot, ro *RestoreOptions) error {
r.logger.Infof("Applying first delta snapshot %s", path.Join(snap.SnapDir, snap.SnapName))
events, err := getEventsFromDeltaSnapshot(r.store, snap)
if err != nil {
Expand All @@ -564,7 +564,7 @@ func (r *Restorer) applyFirstDeltaSnapshot(client *clientv3.Client, snap snapsto
}
}

return applyEventsToEtcd(client, events[newRevisionIndex:])
return applyEventsToEtcd(client, events[newRevisionIndex:], ro)
}

// getEventsFromDeltaSnapshot returns the events from delta snapshot from snap store.
Expand Down Expand Up @@ -635,7 +635,7 @@ func persistDeltaSnapshot(data []byte) (string, error) {
}

// applyEventsToEtcd performs operations in events sequentially.
func applyEventsToEtcd(client *clientv3.Client, events []event) error {
func applyEventsToEtcd(client *clientv3.Client, events []event, ro *RestoreOptions) error {
var (
lastRev int64
ops = []clientv3.Op{}
Expand All @@ -662,7 +662,13 @@ func applyEventsToEtcd(client *clientv3.Client, events []event) error {
return fmt.Errorf("Unexpected event type")
}
}
_, err := client.Txn(ctx).Then(ops...).Commit()
fmt.Print("The size of operations is: ", len(ops))
var err error
for len(ops)/int(ro.Config.MaxTxnOps) > 0 {
_, err = client.Txn(ctx).Then(ops...).Commit()
ops = ops[ro.Config.MaxTxnOps:]
}
_, err = client.Txn(ctx).Then(ops...).Commit()
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/snapshot/restorer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
defaultMaxFetchers = 6
defaultMaxCallSendMsgSize = 10 * 1024 * 1024 //10Mib
defaultMaxRequestBytes = 10 * 1024 * 1024 //10Mib
defaultMaxTxnOps = 10 * 1024
defaultMaxTxnOps = 128
defaultEmbeddedEtcdQuotaBytes = 8 * 1024 * 1024 * 1024 //8Gib
)

Expand Down

0 comments on commit ab36fbf

Please sign in to comment.