Skip to content

Commit

Permalink
refactor(admin): set dirty field only when the MR is mutated (#481)
Browse files Browse the repository at this point in the history
### What this PR does

This PR makes the mrmanager avoid unnecessary fetching cluster metadata by checking the dirty field only when the MR is mutated successfully.
  • Loading branch information
ijsong authored Jul 28, 2023
2 parents 53a8f19 + 083739b commit e5c843b
Showing 1 changed file with 53 additions and 27 deletions.
80 changes: 53 additions & 27 deletions internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe
return meta, err
}

func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) error {
func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -182,18 +184,21 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterStorageNode(ctx, storageNodeMeta); err != nil {
err = cli.RegisterStorageNode(ctx, storageNodeMeta)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error {
func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -202,18 +207,21 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterStorageNode(ctx, storageNodeID); err != nil {
err = cli.UnregisterStorageNode(ctx, storageNodeID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error {
func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -222,18 +230,21 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID)
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterTopic(ctx, topicID); err != nil {
err = cli.RegisterTopic(ctx, topicID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error {
func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -242,18 +253,21 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterTopic(ctx, topicID); err != nil {
err = cli.UnregisterTopic(ctx, topicID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -262,17 +276,20 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterLogStream(ctx, logStreamDesc); err != nil {
err = cli.RegisterLogStream(ctx, logStreamDesc)
if err != nil {
_ = cli.Close()
return err
}
return nil
}

func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error {
func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -281,17 +298,20 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterLogStream(ctx, logStreamID); err != nil {
err = cli.UnregisterLogStream(ctx, logStreamID)
if err != nil {
_ = cli.Close()
return err
}
return err
}

func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -300,7 +320,8 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UpdateLogStream(ctx, logStreamDesc); err != nil {
err = cli.UpdateLogStream(ctx, logStreamDesc)
if err != nil {
_ = cli.Close()
return err
}
Expand All @@ -311,7 +332,9 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog
func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -327,10 +350,12 @@ func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (
return lastCommittedGLSN, err
}

func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error {
func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -339,16 +364,17 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID)
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.Unseal(ctx, logStreamID); err != nil {
err = cli.Unseal(ctx, logStreamID)
if err != nil {
_ = cli.Close()
return err
}
return err
}

func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) {
mrm.mu.Lock()
defer mrm.mu.Unlock()
mrm.mu.RLock()
defer mrm.mu.RUnlock()

cli, err := mrm.mc()
if err != nil {
Expand Down

0 comments on commit e5c843b

Please sign in to comment.