Skip to content

Commit

Permalink
[coordinator] Consider uninitialized case for UnaggregatedClusterName…
Browse files Browse the repository at this point in the history
…space with dynamic clusters (#2957)

* [coordinator] Consider uninitialized case for UnaggregatedClusterNamespace with dynamic clusters

* Fixup ready_test

* Apply suggestions from code review

Co-authored-by: arnikola <[email protected]>

* PR feedback

* Revert go mod changes

Co-authored-by: arnikola <[email protected]>
  • Loading branch information
wesleyk and arnikola authored Nov 29, 2020
1 parent 084fd20 commit 4bfe377
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/namespace/ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (t *testClusters) Close() error {
panic("implement me")
}

func (t *testClusters) UnaggregatedClusterNamespace() m3.ClusterNamespace {
func (t *testClusters) UnaggregatedClusterNamespace() (m3.ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
8 changes: 4 additions & 4 deletions src/query/storage/m3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Clusters interface {
NonReadyClusterNamespaces() ClusterNamespaces

// UnaggregatedClusterNamespace returns the valid unaggregated
// cluster namespace.
UnaggregatedClusterNamespace() ClusterNamespace
// cluster namespace. If the namespace is not yet initialized, returns false.
UnaggregatedClusterNamespace() (ClusterNamespace, bool)

// AggregatedClusterNamespace returns an aggregated cluster namespace
// at a specific retention and resolution.
Expand Down Expand Up @@ -258,8 +258,8 @@ func (c *clusters) NonReadyClusterNamespaces() ClusterNamespaces {
return nil
}

func (c *clusters) UnaggregatedClusterNamespace() ClusterNamespace {
return c.unaggregatedNamespace
func (c *clusters) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
return c.unaggregatedNamespace, true
}

func (c *clusters) AggregatedClusterNamespace(
Expand Down
16 changes: 13 additions & 3 deletions src/query/storage/m3/cluster_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ func resolveClusterNamespacesForQuery(
// First check if the unaggregated cluster can fully satisfy the query range.
// If so, return it and shortcircuit, as unaggregated will necessarily have
// every metric.
unaggregated := resolveUnaggregatedNamespaceForQuery(now, start,
clusters.UnaggregatedClusterNamespace(), opts)
ns, initialized := clusters.UnaggregatedClusterNamespace()
if !initialized {
return consolidators.NamespaceInvalid, nil, errUnaggregatedNamespaceUninitialized
}

unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, ns, opts)
if unaggregated.satisfies == fullySatisfiesRange {
return consolidators.NamespaceCoversAllQueryRange,
ClusterNamespaces{unaggregated.clusterNamespace},
Expand Down Expand Up @@ -329,7 +333,13 @@ func resolveClusterNamespacesForQueryWithRestrictQueryOptions(

switch restrict.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
return result(clusters.UnaggregatedClusterNamespace(), nil)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
return result(nil,
fmt.Errorf("could not find unaggregated namespace for storage policy: %v",
restrict.StoragePolicy.String()))
}
return result(ns, nil)
case storagemetadata.AggregatedMetricsType:
ns, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: restrict.StoragePolicy.Retention().Duration(),
Expand Down
5 changes: 3 additions & 2 deletions src/query/storage/m3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestNewClustersFromConfig(t *testing.T) {
require.NoError(t, err)

// Resolve expected clusters and check attributes
unaggregatedNs := clusters.UnaggregatedClusterNamespace()
unaggregatedNs, initialized := clusters.UnaggregatedClusterNamespace()
assert.True(t, initialized)
assert.Equal(t, "unaggregated", unaggregatedNs.NamespaceID().String())
assert.Equal(t, storagemetadata.Attributes{
MetricsType: storagemetadata.UnaggregatedMetricsType,
Expand Down Expand Up @@ -209,7 +210,7 @@ func (n *noopCluster) NonReadyClusterNamespaces() ClusterNamespaces {
panic("implement me")
}

func (n *noopCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (n *noopCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions src/query/storage/m3/dynamic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ func (d *dynamicCluster) NonReadyClusterNamespaces() ClusterNamespaces {
return nonReadyNamespaces
}

func (d *dynamicCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (d *dynamicCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
d.RLock()
unaggregatedNamespaces := d.unaggregatedNamespace
unaggregatedNamespace := d.unaggregatedNamespace
d.RUnlock()

return unaggregatedNamespaces
return unaggregatedNamespace, (unaggregatedNamespace != nil)
}

func (d *dynamicCluster) AggregatedClusterNamespace(attrs RetentionResolution) (ClusterNamespace, bool) {
Expand Down
45 changes: 44 additions & 1 deletion src/query/storage/m3/dynamic_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,46 @@ func newNamespaceOptions() namespace.Options {
return namespace.NewOptions().SetStagingState(state)
}

func TestDynamicClustersUninitialized(t *testing.T) {
t.Parallel()

ctrl := xtest.NewController(t)
defer ctrl.Finish()

mockSession := client.NewMockSession(ctrl)

// setup dynamic cluster without any namespaces
mapCh := make(nsMapCh, 10)
nsInitializer := newFakeNsInitializer(t, ctrl, mapCh, false)

cfg := DynamicClusterNamespaceConfiguration{
session: mockSession,
nsInitializer: nsInitializer,
}

opts := newTestOptions(cfg)

clusters, err := NewDynamicClusters(opts)
require.NoError(t, err)

//nolint:errcheck
defer clusters.Close()

// Aggregated namespaces should not exist
_, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: 48 * time.Hour,
Resolution: 1 * time.Minute,
})
require.False(t, ok)

// Unaggregated namespaces should not be initialized
_, ok = clusters.UnaggregatedClusterNamespace()
require.False(t, ok)

// Cluster namespaces should be empty
require.Len(t, clusters.ClusterNamespaces(), 0)
}

func TestDynamicClustersInitialization(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -424,7 +464,10 @@ func assertClusterNamespace(clusters Clusters, expectedID ident.ID, expectedOpts
return false
}
} else {
ns = clusters.UnaggregatedClusterNamespace()
ns, ok = clusters.UnaggregatedClusterNamespace()
if !ok {
return false
}
}
return assert.ObjectsAreEqual(expectedID.String(), ns.NamespaceID().String()) &&
assert.ObjectsAreEqual(expectedOpts, ns.Options())
Expand Down
11 changes: 8 additions & 3 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ const (
var (
errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" +
" aggregated and unaggregated namespace lookup disabled")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errUnaggregatedNamespaceUninitialized = goerrors.New(
"unaggregated namespace is not yet initialized")
)

type m3storage struct {
Expand Down Expand Up @@ -691,18 +693,21 @@ func (s *m3storage) writeSingle(
var (
namespace ClusterNamespace
err error
exists bool
)

attributes := query.Attributes()
switch attributes.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
namespace = s.clusters.UnaggregatedClusterNamespace()
namespace, exists = s.clusters.UnaggregatedClusterNamespace()
if !exists {
err = errUnaggregatedNamespaceUninitialized
}
case storagemetadata.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s,"+
Expand Down
20 changes: 20 additions & 0 deletions src/query/storage/m3/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ func TestLocalWriteAggregatedNoClusterNamespaceError(t *testing.T) {
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteUnaggregatedNamespaceUninitializedError(t *testing.T) {
t.Parallel()

ctrl := xtest.NewController(t)
defer ctrl.Finish()
// We setup an empty dynamic cluster, which will by default
// have an uninitialized unaggregated namespace.
store := newTestStorage(t, &dynamicCluster{})

opts := newWriteQuery(t).Options()

writeQuery, err := storage.NewWriteQuery(opts)
require.NoError(t, err)

err = store.Write(context.TODO(), writeQuery)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "unaggregated namespace is not yet initialized"),
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteAggregatedInvalidMetricsTypeError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down

0 comments on commit 4bfe377

Please sign in to comment.