Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add save index operation log #2048

Merged
merged 5 commits into from
Jun 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions pkg/agent/core/ngt/service/ngt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,10 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
}
n.smu.Lock()
defer n.smu.Unlock()
log.Infof("save index operation started, the number of create index execution = %d", nocie)

eg.Go(safety.RecoverFunc(func() (err error) {
log.Debugf("start save operation for kvsdb, the number of kvsdb = %d", n.kvs.Len())
if n.kvs.Len() > 0 && path != "" {
m := make(map[string]uint32, n.Len())
mt := make(map[string]int64, n.Len())
Expand All @@ -1134,6 +1137,7 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
fs.ModePerm,
)
if err != nil {
log.Warnf("failed to create or open kvsdb file, err: %v", err)
return err
}
defer func() {
Expand All @@ -1147,10 +1151,12 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
gob.Register(map[string]uint32{})
err = gob.NewEncoder(f).Encode(&m)
if err != nil {
log.Warnf("failed to encode kvsdb data, err: %v", err)
return err
}
err = f.Sync()
if err != nil {
log.Warnf("failed to flush all kvsdb data to storage, err: %v", err)
return err
}
m = make(map[string]uint32)
Expand All @@ -1162,6 +1168,7 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
fs.ModePerm,
)
if err != nil {
log.Warnf("failed to create or open kvs timestamp file, err: %v", err)
return err
}
defer func() {
Expand All @@ -1175,21 +1182,25 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
gob.Register(map[string]int64{})
err = gob.NewEncoder(ft).Encode(&mt)
if err != nil {
log.Warnf("failed to encode kvs timestamp data, err: %v", err)
return err
}
err = ft.Sync()
if err != nil {
log.Warnf("failed to flush all kvsdb timestamp data to storage, err: %v", err)
return err
}
mt = make(map[string]int64)
}
log.Debug("save operation for kvsdb finished")
return nil
}))

eg.Go(safety.RecoverFunc(func() (err error) {
n.fmu.Lock()
fl := len(n.fmap)
n.fmu.Unlock()
log.Debugf("start save operation for invalid kvsdb, the number of invalid kvsdb = %d", fl)
if fl > 0 && path != "" {
var f *os.File
f, err = file.Open(
Expand All @@ -1198,6 +1209,7 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
fs.ModePerm,
)
if err != nil {
log.Warnf("failed to create or open invalid kvsdb file, err: %v", err)
return err
}
defer func() {
Expand All @@ -1213,25 +1225,35 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
err = gob.NewEncoder(f).Encode(&n.fmap)
n.fmu.Unlock()
if err != nil {
log.Warnf("failed to encode invalid kvsdb data, err: %v", err)
return err
}
err = f.Sync()
if err != nil {
log.Warnf("failed to flush all invalid kvsdb data to storage, err: %v", err)
return err
}
}
log.Debug("start save operation for invalid kvsdb finished")
return nil
}))

eg.Go(safety.RecoverFunc(func() error {
return n.core.SaveIndexWithPath(path)
log.Debug("start save operation for index")
if err := n.core.SaveIndexWithPath(path); err != nil {
log.Warnf("failed to save index with path, err: %v\tpath: %s", err, path)
return err
}
log.Debug("save operation for index finished")
return nil
}))

err = eg.Wait()
if err != nil {
return err
}

log.Debug("start save operation for metadata file")
err = metadata.Store(
file.Join(path, metadata.AgentMetadataFileName),
&metadata.Metadata{
Expand All @@ -1242,10 +1264,17 @@ func (n *ngt) saveIndex(ctx context.Context) (err error) {
},
)
if err != nil {
log.Warnf("failed to save metadata file, err: %v", err)
return err
}
log.Debug("save operation for metadata file finished")

return n.moveAndSwitchSavedData(ctx)
if err := n.moveAndSwitchSavedData(ctx); err != nil {
log.Warnf("failed to move and switch saved data for copy on write, err: %v", err)
return err
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
}
log.Info("save index operation finished")
return nil
}

func (n *ngt) CreateAndSaveIndex(ctx context.Context, poolSize uint32) (err error) {
Expand All @@ -1272,6 +1301,7 @@ func (n *ngt) moveAndSwitchSavedData(ctx context.Context) (err error) {
}
n.cowmu.Lock()
defer n.cowmu.Unlock()
log.Debug("start move and switch saved data operation for copy on write")
err = file.MoveDir(ctx, n.path, n.oldPath)
if err != nil {
log.Warnf("failed to backup backup data from %s to %s error: %v", n.path, n.oldPath, err)
Expand Down