Use pooling for writes to coordinator (#942)
* Use pooling for writes

* Duplicate the tag iterator per datapoint write
nikunjgit authored Sep 28, 2018
1 parent 8446dca commit 2ec02cb
Showing 7 changed files with 93 additions and 69 deletions.
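
At a high level, the change drops the per-datapoint execution.Request fan-out in m3/storage.go and instead submits each datapoint write to a shared xsync.PooledWorkerPool, duplicating the tag iterator per datapoint so concurrent writes do not share iterator state. A minimal, self-contained sketch of that pattern follows; the pool size, the stand-in write function, and the first-error handling are illustrative assumptions, not part of this commit:

package main

import (
	"fmt"
	"sync"

	xsync "github.com/m3db/m3x/sync"
)

// writeOne stands in for m3storage.writeSingle in this sketch.
func writeOne(value float64) error { return nil }

func main() {
	// Create and initialize a pooled worker pool, mirroring the
	// NewPooledWorkerPool / Init calls added in query_pools.go.
	workers, err := xsync.NewPooledWorkerPool(10, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	workers.Init()

	datapoints := []float64{1, 2, 3}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, dp := range datapoints {
		dp := dp // capture the loop variable, as the commit does with datapoint
		wg.Add(1)
		workers.Go(func() {
			defer wg.Done()
			if err := writeOne(dp); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		})
	}
	wg.Wait()
	fmt.Println("first write error:", firstErr)
}
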
5 changes: 4 additions & 1 deletion src/cmd/services/m3query/config/config.go
@@ -70,7 +70,10 @@ type Configuration struct {

// DecompressWorkerPoolSize is the size of the worker pool given to each
// fetch request.
DecompressWorkerPoolSize int `yaml:"workerPoolSize"`
DecompressWorkerPoolSize int `yaml:"decompressWorkerPoolSize"`

// WriteWorkerPoolSize is the size of the worker pool for write requests.
WriteWorkerPoolSize int `yaml:"writeWorkerPoolSize"`
}

// LocalConfiguration is the local embedded configuration if running
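
For reference, the renamed and new options above would be set in the coordinator's YAML configuration along these lines (the key names come from the yaml tags in the diff; the values and their placement at the top level of the query configuration are illustrative assumptions only):

decompressWorkerPoolSize: 4096
writeWorkerPoolSize: 4096
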
15 changes: 13 additions & 2 deletions src/query/pools/query_pools.go
@@ -53,7 +53,7 @@ func BuildWorkerPools(
cfg config.Configuration,
logger *zap.Logger,
scope tally.Scope,
) (pool.ObjectPool, instrument.Options) {
) (pool.ObjectPool, xsync.PooledWorkerPool, instrument.Options, error) {
workerPoolCount := cfg.DecompressWorkerPoolCount
if workerPoolCount == 0 {
workerPoolCount = defaultWorkerPoolCount
@@ -79,7 +79,18 @@ func BuildWorkerPools(
return workerPool
})

return objectPool, instrumentOptions
writePoolSize := cfg.WriteWorkerPoolSize
if writePoolSize == 0 {
writePoolSize = defaultWorkerPoolSize
}

writeWorkerPool, err := xsync.NewPooledWorkerPool(writePoolSize, xsync.NewPooledWorkerPoolOptions())
if err != nil {
return nil, nil, instrumentOptions, err
}

writeWorkerPool.Init()
return objectPool, writeWorkerPool, instrumentOptions, nil
}

type sessionPools struct {
17 changes: 12 additions & 5 deletions src/query/server/server.go
@@ -56,6 +56,7 @@ import (
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/pool"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/pkg/errors"
@@ -135,7 +136,10 @@ func Run(runOpts RunOptions) {
enabled bool
)

workerPool, instrumentOptions := pools.BuildWorkerPools(cfg, logger, scope)
workerPool, writeWorkerPool, instrumentOptions, err := pools.BuildWorkerPools(cfg, logger, scope)
if err != nil {
logger.Fatal("could not create worker pools", zap.Any("error", err))
}

// For grpc backend, we need to setup only the grpc client and a storage accompanying that client.
// For m3db backend, we need to make connections to the m3db cluster which generates a session and use the storage with the session.
@@ -157,6 +161,7 @@
cfg,
logger,
workerPool,
writeWorkerPool,
instrumentOptions,
)
if err != nil {
Expand Down Expand Up @@ -228,6 +233,7 @@ func newM3DBStorage(
cfg config.Configuration,
logger *zap.Logger,
workerPool pool.ObjectPool,
writeWorkerPool xsync.PooledWorkerPool,
instrumentOptions instrument.Options,
) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) {
var clusterClientCh <-chan clusterclient.Client
@@ -269,7 +275,7 @@
return nil, nil, nil, nil, err
}

fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool)
fanoutStorage, storageCleanup, err := newStorages(logger, clusters, cfg, poolWrapper, workerPool, writeWorkerPool)
if err != nil {
return nil, nil, nil, nil, errors.Wrap(err, "unable to set up storages")
}
@@ -400,8 +406,8 @@ func newDownsamplerAutoMappingRules(
func initClusters(
cfg config.Configuration,
dbClientCh <-chan client.Client,
logger *zap.Logger,
) (m3.Clusters, *pools.PoolWrapper, error) {
logger *zap.Logger,
) (m3.Clusters, *pools.PoolWrapper, error) {
var (
clusters m3.Clusters
poolWrapper *pools.PoolWrapper
@@ -462,10 +468,11 @@ func newStorages(
cfg config.Configuration,
poolWrapper *pools.PoolWrapper,
workerPool pool.ObjectPool,
writeWorkerPool xsync.PooledWorkerPool,
) (storage.Storage, cleanupFn, error) {
cleanup := func() error { return nil }

localStorage := m3.NewStorage(clusters, workerPool)
localStorage := m3.NewStorage(clusters, workerPool, writeWorkerPool)
stores := []storage.Storage{localStorage}
remoteEnabled := false
if cfg.RPC != nil && cfg.RPC.Enabled {
6 changes: 6 additions & 0 deletions src/query/storage/fanout/storage.go
@@ -129,7 +129,13 @@ func (s *fanoutStorage) FetchTags(ctx context.Context, query *storage.FetchQuery
}

func (s *fanoutStorage) Write(ctx context.Context, query *storage.WriteQuery) error {
// TODO: Consider removing this lookup on every write by maintaining different read/write lists
stores := filterStores(s.stores, s.writeFilter, query)
// short circuit writes
if len(stores) == 1 {
return stores[0].Write(ctx, query)
}

requests := make([]execution.Request, len(stores))
for idx, store := range stores {
requests[idx] = newWriteRequest(store, query)
107 changes: 48 additions & 59 deletions src/query/storage/m3/storage.go
@@ -33,24 +33,26 @@ import (
"github.com/m3db/m3/src/query/errors"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/util/execution"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/pool"
xtime "github.com/m3db/m3x/time"
xsync "github.com/m3db/m3x/sync"
)

var (
errNoLocalClustersFulfillsQuery = goerrors.New("no clusters can fulfill query")
)

type m3storage struct {
clusters Clusters
workerPool pool.ObjectPool
clusters Clusters
readWorkerPool pool.ObjectPool
writeWorkerPool xsync.PooledWorkerPool
}

// NewStorage creates a new local m3storage instance.
func NewStorage(clusters Clusters, workerPool pool.ObjectPool) Storage {
return &m3storage{clusters: clusters, workerPool: workerPool}
// TODO: Consider combining readWorkerPool and writeWorkerPool
func NewStorage(clusters Clusters, workerPool pool.ObjectPool, writeWorkerPool xsync.PooledWorkerPool) Storage {
return &m3storage{clusters: clusters, readWorkerPool: workerPool, writeWorkerPool: writeWorkerPool}
}

func (s *m3storage) Fetch(
@@ -64,7 +66,7 @@ func (s *m3storage) Fetch(
return nil, err
}

return storage.SeriesIteratorsToFetchResult(raw, s.workerPool)
return storage.SeriesIteratorsToFetchResult(raw, s.readWorkerPool)
}

func (s *m3storage) FetchBlocks(
@@ -259,20 +261,30 @@ func (s *m3storage) Write(
identID := ident.StringID(id)
// Set id to NoFinalize to avoid cloning it in write operations
identID.NoFinalize()
common := &writeRequestCommon{
store: s,
annotation: query.Annotation,
unit: query.Unit,
id: identID,
tagIterator: storage.TagsToIdentTagIterator(query.Tags),
attributes: query.Attributes,
}
tagIterator := storage.TagsToIdentTagIterator(query.Tags)

var (
wg sync.WaitGroup
multiErr syncMultiErrs
)

for _, datapoint := range query.Datapoints {
tagIter := tagIterator.Duplicate()
// capture var
datapoint := datapoint
wg.Add(1)
s.writeWorkerPool.Go(func() {
if err := s.writeSingle(ctx, query, datapoint, identID, tagIter); err != nil {
multiErr.add(err)
}

requests := make([]execution.Request, len(query.Datapoints))
for idx, datapoint := range query.Datapoints {
requests[idx] = newWriteRequest(common, datapoint.Timestamp, datapoint.Value)
tagIter.Close()
wg.Done()
})
}
return execution.ExecuteParallel(ctx, requests)

wg.Wait()
return multiErr.finalError()
}

func (s *m3storage) Type() storage.Type {
@@ -283,31 +295,35 @@ func (s *m3storage) Close() error {
return nil
}

func (w *writeRequest) Process(ctx context.Context) error {
common := w.writeRequestCommon
store := common.store
id := common.id

func (s *m3storage) writeSingle(
ctx context.Context,
query *storage.WriteQuery,
datapoint ts.Datapoint,
identID ident.ID,
iterator ident.TagIterator,
) error {
var (
namespace ClusterNamespace
err error
)
switch common.attributes.MetricsType {

attributes := query.Attributes
switch attributes.MetricsType {
case storage.UnaggregatedMetricsType:
namespace = store.clusters.UnaggregatedClusterNamespace()
namespace = s.clusters.UnaggregatedClusterNamespace()
case storage.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: common.attributes.Retention,
Resolution: common.attributes.Resolution,
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = store.clusters.AggregatedClusterNamespace(attrs)
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s, resolution=%s",
attrs.Retention.String(), attrs.Resolution.String())
}
default:
metricsType := common.attributes.MetricsType
metricsType := attributes.MetricsType
err = fmt.Errorf("invalid write request metrics type: %s (%d)",
metricsType.String(), uint(metricsType))
}
@@ -317,33 +333,6 @@ func (w *writeRequest) Process(ctx context.Context) error {

namespaceID := namespace.NamespaceID()
session := namespace.Session()
return session.WriteTagged(namespaceID, id, common.tagIterator,
w.timestamp, w.value, common.unit, common.annotation)
}

type writeRequestCommon struct {
store *m3storage
annotation []byte
unit xtime.Unit
id ident.ID
tagIterator ident.TagIterator
attributes storage.Attributes
}

type writeRequest struct {
writeRequestCommon *writeRequestCommon
timestamp time.Time
value float64
}

func newWriteRequest(
writeRequestCommon *writeRequestCommon,
timestamp time.Time,
value float64,
) execution.Request {
return &writeRequest{
writeRequestCommon: writeRequestCommon,
timestamp: timestamp,
value: value,
}
return session.WriteTagged(namespaceID, identID, iterator,
datapoint.Timestamp, datapoint.Value, query.Unit, query.Annotation)
}
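
The reworked Write path above collects errors from the concurrent writes through a syncMultiErrs value whose definition lives elsewhere in the package and is not part of this diff. A rough, assumed sketch of what such a mutex-guarded collector could look like, purely for illustration and not the repository's actual implementation:

package m3storagesketch

import (
	"fmt"
	"sync"
)

// syncMultiErrsSketch accumulates errors reported by concurrent write
// goroutines and exposes a single combined error once all writes finish.
type syncMultiErrsSketch struct {
	mu   sync.Mutex
	errs []error
}

func (e *syncMultiErrsSketch) add(err error) {
	e.mu.Lock()
	e.errs = append(e.errs, err)
	e.mu.Unlock()
}

func (e *syncMultiErrsSketch) finalError() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if len(e.errs) == 0 {
		return nil
	}
	return fmt.Errorf("%d write error(s), first: %v", len(e.errs), e.errs[0])
}
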
6 changes: 5 additions & 1 deletion src/query/storage/m3/storage_test.go
@@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"

"github.com/golang/mock/gomock"
@@ -78,7 +79,10 @@ func setup(
Resolution: time.Minute,
})
require.NoError(t, err)
storage := NewStorage(clusters, nil)
writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
writePool.Init()
storage := NewStorage(clusters, nil, writePool)
return storage, testSessions{
unaggregated1MonthRetention: unaggregated1MonthRetention,
aggregated1MonthRetention1MinuteResolution: aggregated1MonthRetention1MinuteResolution,
6 changes: 5 additions & 1 deletion src/query/test/m3/storage.go
@@ -28,6 +28,7 @@ import (
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/sync"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
@@ -54,6 +55,9 @@ func NewStorageAndSession(
Retention: TestRetention,
})
require.NoError(t, err)
storage := m3.NewStorage(clusters, nil)
writePool, err := sync.NewPooledWorkerPool(10, sync.NewPooledWorkerPoolOptions())
require.NoError(t, err)
writePool.Init()
storage := m3.NewStorage(clusters, nil, writePool)
return storage, session
}
