Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Consider uninitialized case for UnaggregatedClusterNamespace with dynamic clusters #2957

Merged
merged 5 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/query/storage/m3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Clusters interface {
NonReadyClusterNamespaces() ClusterNamespaces

// UnaggregatedClusterNamespace returns the valid unaggregated
// cluster namespace.
UnaggregatedClusterNamespace() ClusterNamespace
// cluster namespace. If the namespace is not yet initialized, returns false.
UnaggregatedClusterNamespace() (ClusterNamespace, bool)

// AggregatedClusterNamespace returns an aggregated cluster namespace
// at a specific retention and resolution.
Expand Down Expand Up @@ -258,8 +258,8 @@ func (c *clusters) NonReadyClusterNamespaces() ClusterNamespaces {
return nil
}

func (c *clusters) UnaggregatedClusterNamespace() ClusterNamespace {
return c.unaggregatedNamespace
func (c *clusters) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
return c.unaggregatedNamespace, true
}

func (c *clusters) AggregatedClusterNamespace(
Expand Down
16 changes: 13 additions & 3 deletions src/query/storage/m3/cluster_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ func resolveClusterNamespacesForQuery(
// First check if the unaggregated cluster can fully satisfy the query range.
// If so, return it and shortcircuit, as unaggregated will necessarily have
// every metric.
unaggregated := resolveUnaggregatedNamespaceForQuery(now, start,
clusters.UnaggregatedClusterNamespace(), opts)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
return consolidators.NamespaceInvalid, nil, errUnaggregatedNamespaceUninitialized
}

unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, ns, opts)
if unaggregated.satisfies == fullySatisfiesRange {
return consolidators.NamespaceCoversAllQueryRange,
ClusterNamespaces{unaggregated.clusterNamespace},
Expand Down Expand Up @@ -329,7 +333,13 @@ func resolveClusterNamespacesForQueryWithRestrictQueryOptions(

switch restrict.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
return result(clusters.UnaggregatedClusterNamespace(), nil)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
return result(nil,
fmt.Errorf("could not find unaggregated namespace for storage policy: %v",
restrict.StoragePolicy.String()))
}
return result(ns, nil)
case storagemetadata.AggregatedMetricsType:
ns, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: restrict.StoragePolicy.Retention().Duration(),
Expand Down
5 changes: 3 additions & 2 deletions src/query/storage/m3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestNewClustersFromConfig(t *testing.T) {
require.NoError(t, err)

// Resolve expected clusters and check attributes
unaggregatedNs := clusters.UnaggregatedClusterNamespace()
unaggregatedNs, exists := clusters.UnaggregatedClusterNamespace()
assert.True(t, exists)
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, "unaggregated", unaggregatedNs.NamespaceID().String())
assert.Equal(t, storagemetadata.Attributes{
MetricsType: storagemetadata.UnaggregatedMetricsType,
Expand Down Expand Up @@ -209,7 +210,7 @@ func (n *noopCluster) NonReadyClusterNamespaces() ClusterNamespaces {
panic("implement me")
}

func (n *noopCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (n *noopCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions src/query/storage/m3/dynamic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ func (d *dynamicCluster) NonReadyClusterNamespaces() ClusterNamespaces {
return nonReadyNamespaces
}

func (d *dynamicCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (d *dynamicCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
d.RLock()
unaggregatedNamespaces := d.unaggregatedNamespace
unaggregatedNamespace := d.unaggregatedNamespace
d.RUnlock()

return unaggregatedNamespaces
return unaggregatedNamespace, (unaggregatedNamespace != nil)
}

func (d *dynamicCluster) AggregatedClusterNamespace(attrs RetentionResolution) (ClusterNamespace, bool) {
Expand Down
41 changes: 39 additions & 2 deletions src/query/storage/m3/dynamic_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ func newNamespaceOptions() namespace.Options {
return namespace.NewOptions().SetStagingState(state)
}

func TestDynamicClustersUninitialized(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

mockSession := client.NewMockSession(ctrl)

// setup dynamic cluster without any namespaces
mapCh := make(nsMapCh, 10)
nsInitializer := newFakeNsInitializer(t, ctrl, mapCh, false)

cfg := DynamicClusterNamespaceConfiguration{
session: mockSession,
nsInitializer: nsInitializer,
}

opts := newTestOptions(cfg)

clusters, err := NewDynamicClusters(opts)
require.NoError(t, err)

defer clusters.Close()

// Aggregated namespaces should not exist
_, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: 48 * time.Hour,
Resolution: 1 * time.Minute,
})
require.False(t, ok)

// Unaggregated namespaces hould not exist
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
_, ok = clusters.UnaggregatedClusterNamespace()
require.False(t, ok)

// Cluster namespaces should be empty
require.Len(t, clusters.ClusterNamespaces(), 0)
}

func TestDynamicClustersInitialization(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -423,8 +460,8 @@ func assertClusterNamespace(clusters Clusters, expectedID ident.ID, expectedOpts
}); !ok {
return false
}
} else {
ns = clusters.UnaggregatedClusterNamespace()
} else if ns, ok = clusters.UnaggregatedClusterNamespace(); !ok {
Copy link
Collaborator

@arnikola arnikola Nov 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have this as a separate if? Otherwise ns almost seems like it's getting set as a side effect

Suggested change
} else if ns, ok = clusters.UnaggregatedClusterNamespace(); !ok {
}
ns, initialized = clusters.UnaggregatedClusterNamespace()
if !initialized {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be in an else, will do

return false
}
return assert.ObjectsAreEqual(expectedID.String(), ns.NamespaceID().String()) &&
assert.ObjectsAreEqual(expectedOpts, ns.Options())
Expand Down
10 changes: 7 additions & 3 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const (
var (
errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" +
" aggregated and unaggregated namespace lookup disabled")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errUnaggregatedNamespaceUninitialized = goerrors.New("unaggregated namespace is not yet initialized")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; unused

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, will use below

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(also used in cluster_resolver.go)

)

type m3storage struct {
Expand Down Expand Up @@ -691,18 +692,21 @@ func (s *m3storage) writeSingle(
var (
namespace ClusterNamespace
err error
exists bool
)

attributes := query.Attributes()
switch attributes.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
namespace = s.clusters.UnaggregatedClusterNamespace()
namespace, exists = s.clusters.UnaggregatedClusterNamespace()
if !exists {
err = fmt.Errorf("unaggregated namespace not yet initialized")
}
case storagemetadata.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s,"+
Expand Down
17 changes: 17 additions & 0 deletions src/query/storage/m3/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,23 @@ func TestLocalWriteAggregatedNoClusterNamespaceError(t *testing.T) {
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteUnaggregatedNamespaceUninitializedError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
// We setup an empty dynamic cluster, which will by default have an uninitialized unaggregated namespace.
store := newTestStorage(t, &dynamicCluster{})

opts := newWriteQuery(t).Options()

writeQuery, err := storage.NewWriteQuery(opts)
require.NoError(t, err)

err = store.Write(context.TODO(), writeQuery)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "unaggregated namespace not yet initialized"),
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteAggregatedInvalidMetricsTypeError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down