Skip to content

Commit

Permalink
feat(admin): speed up fetching cluster metadata (#480)
Browse files Browse the repository at this point in the history
### What this PR does

This PR improves `internal/admin/mrmanager.(*mrManager).ClusterMetadata` performance:

- Fail-fast: If the given context is canceled, it returns a context error without acquiring the mutex.
- Fast-path: If the cluster metadata does not need to update, it returns the existing thing without the exclusive lock.
- Slow-path: It is similar to the previous implementation but shares the updated cluster metadata using a singleflight.

This PR makes the `internal/admin/mrmanager.(*mrManager).ClusterMetadata` more concurrent and removes unnecessary RPCs. It also avoids RPC calls within exclusive mutex.
  • Loading branch information
ijsong authored Jul 28, 2023
2 parents fc4ed81 + 3e46f62 commit 53a8f19
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions internal/admin/mrmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/sync/singleflight"

"github.com/kakao/varlog/pkg/mrc"
"github.com/kakao/varlog/pkg/mrc/mrconnector"
Expand Down Expand Up @@ -88,6 +89,7 @@ type mrManager struct {
mu sync.RWMutex
connector mrconnector.Connector

sfg singleflight.Group
dirty bool
updated time.Time
meta *varlogpb.MetadataDescriptor
Expand Down Expand Up @@ -401,19 +403,37 @@ func (mrm *mrManager) RemovePeer(ctx context.Context, nodeID types.NodeID) error
}

func (mrm *mrManager) ClusterMetadata(ctx context.Context) (*varlogpb.MetadataDescriptor, error) {
mrm.mu.Lock()
defer mrm.mu.Unlock()
// fail-fast
if err := ctx.Err(); err != nil {
return nil, ctx.Err()
}

// fast path
mrm.mu.RLock()
if !mrm.dirty && time.Since(mrm.updated) <= ReloadInterval {
meta := mrm.meta
mrm.mu.RUnlock()
return meta, nil
}
mrm.mu.RUnlock()

if mrm.dirty || time.Since(mrm.updated) > ReloadInterval {
// slow path
md, err, _ := mrm.sfg.Do("cluster_metadata", func() (interface{}, error) {
meta, err := mrm.clusterMetadata(ctx)
if err != nil {
return nil, fmt.Errorf("cluster metadata: %w", err)
}
mrm.mu.Lock()
mrm.meta = meta
mrm.dirty = false
mrm.updated = time.Now()
mrm.mu.Unlock()
return meta, nil
})
if err != nil {
return nil, err
}
return mrm.meta, nil
return md.(*varlogpb.MetadataDescriptor), nil
}

func (mrm *mrManager) StorageNode(ctx context.Context, storageNodeID types.StorageNodeID) (*varlogpb.StorageNodeDescriptor, error) {
Expand Down

0 comments on commit 53a8f19

Please sign in to comment.