Skip to content

Commit

Permalink
refactor(admin): set dirty field only when the MR is mutated
Browse files Browse the repository at this point in the history
This PR makes the mrmanager avoid unnecessary fetching cluster metadata by checking the dirty field
only when the MR is mutated successfully.
  • Loading branch information
ijsong committed Jul 17, 2023
1 parent 417a3e8 commit 37fd27f
Showing 1 changed file with 53 additions and 27 deletions.
80 changes: 53 additions & 27 deletions internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ func (mrm *mrManager) clusterMetadata(ctx context.Context) (*varlogpb.MetadataDe
return meta, err
}

func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) error {
func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *varlogpb.StorageNodeDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -182,18 +184,21 @@ func (mrm *mrManager) RegisterStorageNode(ctx context.Context, storageNodeMeta *
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterStorageNode(ctx, storageNodeMeta); err != nil {
err = cli.RegisterStorageNode(ctx, storageNodeMeta)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) error {
func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -202,18 +207,21 @@ func (mrm *mrManager) UnregisterStorageNode(ctx context.Context, storageNodeID t
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterStorageNode(ctx, storageNodeID); err != nil {
err = cli.UnregisterStorageNode(ctx, storageNodeID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) error {
func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -222,18 +230,21 @@ func (mrm *mrManager) RegisterTopic(ctx context.Context, topicID types.TopicID)
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterTopic(ctx, topicID); err != nil {
err = cli.RegisterTopic(ctx, topicID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) error {
func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -242,18 +253,21 @@ func (mrm *mrManager) UnregisterTopic(ctx context.Context, topicID types.TopicID
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterTopic(ctx, topicID); err != nil {
err = cli.UnregisterTopic(ctx, topicID)
if err != nil {
_ = cli.Close()
return err
}

return err
}

func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -262,17 +276,20 @@ func (mrm *mrManager) RegisterLogStream(ctx context.Context, logStreamDesc *varl
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.RegisterLogStream(ctx, logStreamDesc); err != nil {
err = cli.RegisterLogStream(ctx, logStreamDesc)
if err != nil {
_ = cli.Close()
return err
}
return nil
}

func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) error {
func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -281,17 +298,20 @@ func (mrm *mrManager) UnregisterLogStream(ctx context.Context, logStreamID types
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UnregisterLogStream(ctx, logStreamID); err != nil {
err = cli.UnregisterLogStream(ctx, logStreamID)
if err != nil {
_ = cli.Close()
return err
}
return err
}

func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) error {
func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlogpb.LogStreamDescriptor) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -300,7 +320,8 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.UpdateLogStream(ctx, logStreamDesc); err != nil {
err = cli.UpdateLogStream(ctx, logStreamDesc)
if err != nil {
_ = cli.Close()
return err
}
Expand All @@ -311,7 +332,9 @@ func (mrm *mrManager) UpdateLogStream(ctx context.Context, logStreamDesc *varlog
func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (lastCommittedGLSN types.GLSN, err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -327,10 +350,12 @@ func (mrm *mrManager) Seal(ctx context.Context, logStreamID types.LogStreamID) (
return lastCommittedGLSN, err
}

func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) error {
func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID) (err error) {
mrm.mu.Lock()
defer func() {
mrm.dirty = true
if err == nil {
mrm.dirty = true
}
mrm.mu.Unlock()
}()

Expand All @@ -339,16 +364,17 @@ func (mrm *mrManager) Unseal(ctx context.Context, logStreamID types.LogStreamID)
return errors.WithMessage(err, "mrmanager: not accessible")
}

if err := cli.Unseal(ctx, logStreamID); err != nil {
err = cli.Unseal(ctx, logStreamID)
if err != nil {
_ = cli.Close()
return err
}
return err
}

func (mrm *mrManager) GetClusterInfo(ctx context.Context) (*mrpb.ClusterInfo, error) {
mrm.mu.Lock()
defer mrm.mu.Unlock()
mrm.mu.RLock()
defer mrm.mu.RUnlock()

cli, err := mrm.mc()
if err != nil {
Expand Down

0 comments on commit 37fd27f

Please sign in to comment.