Skip to content

Commit

Permalink
perf(admin): use singleflight to handle Admin's RPCs
Browse files Browse the repository at this point in the history
This PR makes the admin server handle RPCs by using singleflight. It helps the admin to reduce
redundant computation, especially read operations.

I am only altering read operations. However, it is possible to refactor the admin server to
eliminate the use of a single big lock.
  • Loading branch information
ijsong committed Jul 28, 2023
1 parent ebca873 commit c231888
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 4 deletions.
108 changes: 104 additions & 4 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/kakao/varlog/internal/admin/sfgkey"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/rpc/interceptors/logging"
"github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc"
Expand All @@ -43,6 +45,7 @@ type Admin struct {

// single large lock
mu sync.RWMutex
sfg singleflight.Group
muLogStreamStatus [numLogStreamMutex]sync.Mutex

snw *snwatcher.StorageNodeWatcher
Expand Down Expand Up @@ -168,13 +171,24 @@ func (adm *Admin) Close() (err error) {
func (adm *Admin) Metadata(ctx context.Context) (*varlogpb.MetadataDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

return adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
}

func (adm *Admin) getStorageNode(ctx context.Context, snid types.StorageNodeID) (*admpb.StorageNodeMetadata, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

snm, err, _ := adm.sfg.Do(sfgkey.GetStorageNodeKey(snid), func() (interface{}, error) {
return adm.getStorageNodeInternal(ctx, snid)
})
if err != nil {
return nil, err
}
return snm.(*admpb.StorageNodeMetadata), nil
}

func (adm *Admin) getStorageNodeInternal(ctx context.Context, snid types.StorageNodeID) (*admpb.StorageNodeMetadata, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get storage node: %s", err.Error())
Expand Down Expand Up @@ -221,6 +235,16 @@ func (adm *Admin) listStorageNodes(ctx context.Context) ([]admpb.StorageNodeMeta
adm.mu.RLock()
defer adm.mu.RUnlock()

snms, err, _ := adm.sfg.Do(sfgkey.ListStorageNodesKey(), func() (interface{}, error) {
return adm.listStorageNodesInternal(ctx)
})
if err != nil {
return nil, err
}
return snms.([]admpb.StorageNodeMetadata), nil
}

func (adm *Admin) listStorageNodesInternal(ctx context.Context) ([]admpb.StorageNodeMetadata, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list storage nodes: %s", err.Error())
Expand Down Expand Up @@ -354,6 +378,19 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN
}

func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

td, err, _ := adm.sfg.Do(sfgkey.GetTopicKey(tpid), func() (interface{}, error) {
return adm.getTopicInternal(ctx, tpid)
})
if err != nil {
return nil, err
}
return td.(*varlogpb.TopicDescriptor), nil
}

func (adm *Admin) getTopicInternal(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get topic: %s", err.Error())
Expand All @@ -366,9 +403,19 @@ func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.T
}

func (adm *Admin) listTopics(ctx context.Context) ([]varlogpb.TopicDescriptor, error) {
adm.mu.Lock()
defer adm.mu.Unlock()
adm.mu.RLock()
defer adm.mu.RUnlock()

tds, err, _ := adm.sfg.Do(sfgkey.ListTopicsKey(), func() (interface{}, error) {
return adm.listTopicsInternal(ctx)
})
if err != nil {
return nil, err
}
return tds.([]varlogpb.TopicDescriptor), nil
}

func (adm *Admin) listTopicsInternal(ctx context.Context) ([]varlogpb.TopicDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list topics: %s", err.Error())
Expand Down Expand Up @@ -422,6 +469,19 @@ func (adm *Admin) unregisterTopic(ctx context.Context, tpid types.TopicID) error
}

func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

lsd, err, _ := adm.sfg.Do(sfgkey.GetLogStreamKey(tpid, lsid), func() (interface{}, error) {
return adm.getLogStreamInternal(ctx, tpid, lsid)
})
if err != nil {
return nil, err
}
return lsd.(*varlogpb.LogStreamDescriptor), nil
}

func (adm *Admin) getLogStreamInternal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get log stream: %s", err.Error())
Expand All @@ -444,6 +504,19 @@ func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid typ
}

func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

lsds, err, _ := adm.sfg.Do(sfgkey.ListLogStreamsKey(tpid), func() (interface{}, error) {
return adm.listLogStreamsInternal(ctx, tpid)
})
if err != nil {
return nil, err
}
return lsds.([]varlogpb.LogStreamDescriptor), nil
}

func (adm *Admin) listLogStreamsInternal(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list log streams: %s", err.Error())
Expand All @@ -467,9 +540,34 @@ func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]var
}

func (adm *Admin) describeTopic(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) {
adm.mu.Lock()
defer adm.mu.Unlock()
adm.mu.RLock()
defer adm.mu.RUnlock()

type result struct {
td varlogpb.TopicDescriptor
lsds []varlogpb.LogStreamDescriptor
}

iface, err, _ := adm.sfg.Do(sfgkey.DescribeTopicKey(tpid), func() (interface{}, error) {
td, lsds, err := adm.describeTopicInternal(ctx, tpid)
if err != nil {
return nil, err
}
return &result{
td: td,
lsds: lsds,
}, nil
})
if err != nil {
return
}

res := iface.(*result)
return res.td, res.lsds, nil

}

func (adm *Admin) describeTopicInternal(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil || len(md.Topics) == 0 {
return
Expand Down Expand Up @@ -906,6 +1004,7 @@ func (adm *Admin) syncInternal(ctx context.Context, tpid types.TopicID, lsid typ
func (adm *Admin) trim(ctx context.Context, tpid types.TopicID, lastGLSN types.GLSN) ([]admpb.TrimResult, error) {
adm.mu.Lock()
defer adm.mu.Unlock()

return adm.snmgr.Trim(ctx, tpid, lastGLSN)
}

Expand Down Expand Up @@ -948,6 +1047,7 @@ func (adm *Admin) listMetadataRepositoryNodes(ctx context.Context) ([]varlogpb.M
func (adm *Admin) mrInfos(ctx context.Context) (*mrpb.ClusterInfo, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

return adm.mrmgr.GetClusterInfo(ctx)
}

Expand Down
44 changes: 44 additions & 0 deletions internal/admin/sfgkey/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sfgkey

import (
"github.com/kakao/varlog/pkg/types"
)

const (
delimiter = "_"
keyGetStorageNode = "getsn"
keyListStorageNodes = "listsns"
keyGetTopic = "gettp"
keyListTopics = "listtps"
keyGetLogStream = "getls"
keyListLogStreams = "listlss"
keyDescribeTopic = "desctp"
)

func GetStorageNodeKey(snid types.StorageNodeID) string {
return keyGetStorageNode + delimiter + snid.String()
}

func ListStorageNodesKey() string {
return keyListStorageNodes
}

func GetTopicKey(tpid types.TopicID) string {
return keyGetTopic + delimiter + tpid.String()
}

func ListTopicsKey() string {
return keyListTopics
}

func GetLogStreamKey(tpid types.TopicID, lsid types.LogStreamID) string {
return keyGetLogStream + delimiter + tpid.String() + delimiter + lsid.String()
}

func ListLogStreamsKey(tpid types.TopicID) string {
return keyListLogStreams + delimiter + tpid.String()
}

func DescribeTopicKey(tpid types.TopicID) string {
return keyDescribeTopic + delimiter + tpid.String()
}

0 comments on commit c231888

Please sign in to comment.