From 1c6bcca5c5583b20370dfe01f67debcc4d150f93 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Fri, 16 Jun 2023 19:47:42 +0900 Subject: [PATCH] perf(admin): use singleflight to handle Admin's RPCs This PR makes the admin server handle RPCs by using singleflight. It helps the admin to reduce redundant computation, especially read operations. I am only altering read operations. However, it is possible to refactor the admin server to eliminate the use of a single big lock. --- internal/admin/admin.go | 108 ++++++++++++++++++++++++++++++++-- internal/admin/sfgkey/keys.go | 44 ++++++++++++++ 2 files changed, 148 insertions(+), 4 deletions(-) create mode 100644 internal/admin/sfgkey/keys.go diff --git a/internal/admin/admin.go b/internal/admin/admin.go index 2588ee739..fa8251ba9 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -15,11 +15,13 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/kakao/varlog/internal/admin/sfgkey" "github.com/kakao/varlog/internal/admin/snwatcher" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/netutil" @@ -43,6 +45,7 @@ type Admin struct { // single large lock mu sync.RWMutex + sfg singleflight.Group muLogStreamStatus [numLogStreamMutex]sync.Mutex snw *snwatcher.StorageNodeWatcher @@ -172,6 +175,7 @@ func (adm *Admin) Close() (err error) { func (adm *Admin) Metadata(ctx context.Context) (*varlogpb.MetadataDescriptor, error) { adm.mu.RLock() defer adm.mu.RUnlock() + return adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) } @@ -179,6 +183,16 @@ func (adm *Admin) getStorageNode(ctx context.Context, snid types.StorageNodeID) adm.mu.RLock() defer adm.mu.RUnlock() + snm, err, _ := adm.sfg.Do(sfgkey.GetStorageNodeKey(snid), func() (interface{}, error) { + return adm.getStorageNodeInternal(ctx, snid) + }) + if err != nil { + return nil, err + } + return snm.(*admpb.StorageNodeMetadata), nil +} + +func (adm *Admin) getStorageNodeInternal(ctx context.Context, snid types.StorageNodeID) (*admpb.StorageNodeMetadata, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "get storage node: %s", err.Error()) @@ -225,6 +239,16 @@ func (adm *Admin) listStorageNodes(ctx context.Context) ([]admpb.StorageNodeMeta adm.mu.RLock() defer adm.mu.RUnlock() + snms, err, _ := adm.sfg.Do(sfgkey.ListStorageNodesKey(), func() (interface{}, error) { + return adm.listStorageNodesInternal(ctx) + }) + if err != nil { + return nil, err + } + return snms.([]admpb.StorageNodeMetadata), nil +} + +func (adm *Admin) listStorageNodesInternal(ctx context.Context) ([]admpb.StorageNodeMetadata, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "list storage nodes: %s", err.Error()) @@ -358,6 +382,19 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN } func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) { + adm.mu.RLock() + defer adm.mu.RUnlock() + + td, err, _ := adm.sfg.Do(sfgkey.GetTopicKey(tpid), func() (interface{}, error) { + return adm.getTopicInternal(ctx, tpid) + }) + if err != nil { + return nil, err + } + return td.(*varlogpb.TopicDescriptor), nil +} + +func (adm *Admin) getTopicInternal(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "get topic: %s", err.Error()) @@ -370,9 +407,19 @@ func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.T } func (adm *Admin) listTopics(ctx context.Context) ([]varlogpb.TopicDescriptor, error) { - adm.mu.Lock() - defer adm.mu.Unlock() + adm.mu.RLock() + defer adm.mu.RUnlock() + tds, err, _ := adm.sfg.Do(sfgkey.ListTopicsKey(), func() (interface{}, error) { + return adm.listTopicsInternal(ctx) + }) + if err != nil { + return nil, err + } + return tds.([]varlogpb.TopicDescriptor), nil +} + +func (adm *Admin) listTopicsInternal(ctx context.Context) ([]varlogpb.TopicDescriptor, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "list topics: %s", err.Error()) @@ -426,6 +473,19 @@ func (adm *Admin) unregisterTopic(ctx context.Context, tpid types.TopicID) error } func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) { + adm.mu.RLock() + defer adm.mu.RUnlock() + + lsd, err, _ := adm.sfg.Do(sfgkey.GetLogStreamKey(tpid, lsid), func() (interface{}, error) { + return adm.getLogStreamInternal(ctx, tpid, lsid) + }) + if err != nil { + return nil, err + } + return lsd.(*varlogpb.LogStreamDescriptor), nil +} + +func (adm *Admin) getLogStreamInternal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "get log stream: %s", err.Error()) @@ -448,6 +508,19 @@ func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid typ } func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) { + adm.mu.RLock() + defer adm.mu.RUnlock() + + lsds, err, _ := adm.sfg.Do(sfgkey.ListLogStreamsKey(tpid), func() (interface{}, error) { + return adm.listLogStreamsInternal(ctx, tpid) + }) + if err != nil { + return nil, err + } + return lsds.([]varlogpb.LogStreamDescriptor), nil +} + +func (adm *Admin) listLogStreamsInternal(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil { return nil, status.Errorf(codes.Unavailable, "list log streams: %s", err.Error()) @@ -471,9 +544,34 @@ func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]var } func (adm *Admin) describeTopic(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) { - adm.mu.Lock() - defer adm.mu.Unlock() + adm.mu.RLock() + defer adm.mu.RUnlock() + type result struct { + td varlogpb.TopicDescriptor + lsds []varlogpb.LogStreamDescriptor + } + + iface, err, _ := adm.sfg.Do(sfgkey.DescribeTopicKey(tpid), func() (interface{}, error) { + td, lsds, err := adm.describeTopicInternal(ctx, tpid) + if err != nil { + return nil, err + } + return &result{ + td: td, + lsds: lsds, + }, nil + }) + if err != nil { + return + } + + res := iface.(*result) + return res.td, res.lsds, nil + +} + +func (adm *Admin) describeTopicInternal(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) { md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx) if err != nil || len(md.Topics) == 0 { return @@ -910,6 +1008,7 @@ func (adm *Admin) syncInternal(ctx context.Context, tpid types.TopicID, lsid typ func (adm *Admin) trim(ctx context.Context, tpid types.TopicID, lastGLSN types.GLSN) ([]admpb.TrimResult, error) { adm.mu.Lock() defer adm.mu.Unlock() + return adm.snmgr.Trim(ctx, tpid, lastGLSN) } @@ -952,6 +1051,7 @@ func (adm *Admin) listMetadataRepositoryNodes(ctx context.Context) ([]varlogpb.M func (adm *Admin) mrInfos(ctx context.Context) (*mrpb.ClusterInfo, error) { adm.mu.RLock() defer adm.mu.RUnlock() + return adm.mrmgr.GetClusterInfo(ctx) } diff --git a/internal/admin/sfgkey/keys.go b/internal/admin/sfgkey/keys.go new file mode 100644 index 000000000..b4077edbd --- /dev/null +++ b/internal/admin/sfgkey/keys.go @@ -0,0 +1,44 @@ +package sfgkey + +import ( + "github.com/kakao/varlog/pkg/types" +) + +const ( + delimiter = "_" + keyGetStorageNode = "getsn" + keyListStorageNodes = "listsns" + keyGetTopic = "gettp" + keyListTopics = "listtps" + keyGetLogStream = "getls" + keyListLogStreams = "listlss" + keyDescribeTopic = "desctp" +) + +func GetStorageNodeKey(snid types.StorageNodeID) string { + return keyGetStorageNode + delimiter + snid.String() +} + +func ListStorageNodesKey() string { + return keyListStorageNodes +} + +func GetTopicKey(tpid types.TopicID) string { + return keyGetTopic + delimiter + tpid.String() +} + +func ListTopicsKey() string { + return keyListTopics +} + +func GetLogStreamKey(tpid types.TopicID, lsid types.LogStreamID) string { + return keyGetLogStream + delimiter + tpid.String() + delimiter + lsid.String() +} + +func ListLogStreamsKey(tpid types.TopicID) string { + return keyListLogStreams + delimiter + tpid.String() +} + +func DescribeTopicKey(tpid types.TopicID) string { + return keyDescribeTopic + delimiter + tpid.String() +}