Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup store/streaming/constructor.go (backport #14044) #14235

Merged
merged 3 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/spf13/cast"
)
Expand All @@ -26,7 +27,6 @@ type ServiceType int
const (
Unknown ServiceType = iota
File
// add more in the future
)

// Streaming option keys
Expand All @@ -46,6 +46,7 @@ func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File

default:
return Unknown
}
Expand All @@ -56,25 +57,30 @@ func (sst ServiceType) String() string {
switch sst {
case File:
return "file"

default:
return "unknown"
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to
// streaming.ServiceConstructors types.
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
// NewServiceConstructor returns the streaming.ServiceConstructor corresponding
// to the provided name.
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}

if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}

return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

Expand All @@ -97,7 +103,7 @@ func NewFileStreamingService(
fileDir = path.Join(homePath, fileDir)
}

// try to create output directory if not exists.
// try to create output directory if it does not exists
if _, err := os.Stat(fileDir); os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return nil, err
Expand All @@ -119,14 +125,19 @@ func LoadStreamingServices(
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)

// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))

for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey

// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
var exposeStoreKeys []types.StoreKey
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys

// if list contains '*', expose all store keys
if sdk.SliceContains(exposeKeyStrs, "*") {
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
Expand All @@ -139,45 +150,46 @@ func LoadStreamingServices(
}
}
}
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything

if len(exposeStoreKeys) == 0 {
continue
}
// get the constructor for this streamer name

constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}

// Generate the streaming service using the constructor, appOptions, and the
// StoreKeys we want to expose.
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}

// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)

// kick off the background streaming service loop
streamingService.Stream(wg)

// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}
// if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything
return activeStreamers, wg, nil
}

func exposeAll(list []string) bool {
for _, ele := range list {
if ele == "*" {
return true
}
}
return false
// If there are no active streamers, activeStreamers is empty (len == 0) and
// the waitGroup is not waiting on anything.
return activeStreamers, wg, nil
}
12 changes: 12 additions & 0 deletions types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ func CopyBytes(bz []byte) (ret []byte) {
copy(ret, bz)
return ret
}

// SliceContains implements a generic function for checking if a slice contains
// a certain value.
func SliceContains[T comparable](elements []T, v T) bool {
for _, s := range elements {
if v == s {
return true
}
}

return false
}