diff --git a/src/query/api/v1/handler/namespace/ready_test.go b/src/query/api/v1/handler/namespace/ready_test.go index ed871bb6b3..e3c316d56a 100644 --- a/src/query/api/v1/handler/namespace/ready_test.go +++ b/src/query/api/v1/handler/namespace/ready_test.go @@ -235,7 +235,7 @@ func (t *testClusters) Close() error { panic("implement me") } -func (t *testClusters) UnaggregatedClusterNamespace() m3.ClusterNamespace { +func (t *testClusters) UnaggregatedClusterNamespace() (m3.ClusterNamespace, bool) { panic("implement me") } diff --git a/src/query/storage/m3/cluster.go b/src/query/storage/m3/cluster.go index 28d6ff3dc8..0309af584c 100644 --- a/src/query/storage/m3/cluster.go +++ b/src/query/storage/m3/cluster.go @@ -55,8 +55,8 @@ type Clusters interface { NonReadyClusterNamespaces() ClusterNamespaces // UnaggregatedClusterNamespace returns the valid unaggregated - // cluster namespace. - UnaggregatedClusterNamespace() ClusterNamespace + // cluster namespace. If the namespace is not yet initialized, returns false. + UnaggregatedClusterNamespace() (ClusterNamespace, bool) // AggregatedClusterNamespace returns an aggregated cluster namespace // at a specific retention and resolution. @@ -258,8 +258,8 @@ func (c *clusters) NonReadyClusterNamespaces() ClusterNamespaces { return nil } -func (c *clusters) UnaggregatedClusterNamespace() ClusterNamespace { - return c.unaggregatedNamespace +func (c *clusters) UnaggregatedClusterNamespace() (ClusterNamespace, bool) { + return c.unaggregatedNamespace, true } func (c *clusters) AggregatedClusterNamespace( diff --git a/src/query/storage/m3/cluster_resolver.go b/src/query/storage/m3/cluster_resolver.go index 4b4b22839e..09eecac9aa 100644 --- a/src/query/storage/m3/cluster_resolver.go +++ b/src/query/storage/m3/cluster_resolver.go @@ -89,8 +89,12 @@ func resolveClusterNamespacesForQuery( // First check if the unaggregated cluster can fully satisfy the query range. // If so, return it and shortcircuit, as unaggregated will necessarily have // every metric. - unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, - clusters.UnaggregatedClusterNamespace(), opts) + ns, initialized := clusters.UnaggregatedClusterNamespace() + if !initialized { + return consolidators.NamespaceInvalid, nil, errUnaggregatedNamespaceUninitialized + } + + unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, ns, opts) if unaggregated.satisfies == fullySatisfiesRange { return consolidators.NamespaceCoversAllQueryRange, ClusterNamespaces{unaggregated.clusterNamespace}, @@ -329,7 +333,13 @@ func resolveClusterNamespacesForQueryWithRestrictQueryOptions( switch restrict.MetricsType { case storagemetadata.UnaggregatedMetricsType: - return result(clusters.UnaggregatedClusterNamespace(), nil) + ns, ok := clusters.UnaggregatedClusterNamespace() + if !ok { + return result(nil, + fmt.Errorf("could not find unaggregated namespace for storage policy: %v", + restrict.StoragePolicy.String())) + } + return result(ns, nil) case storagemetadata.AggregatedMetricsType: ns, ok := clusters.AggregatedClusterNamespace(RetentionResolution{ Retention: restrict.StoragePolicy.Retention().Duration(), diff --git a/src/query/storage/m3/cluster_test.go b/src/query/storage/m3/cluster_test.go index b538ec0dc9..66480f89d7 100644 --- a/src/query/storage/m3/cluster_test.go +++ b/src/query/storage/m3/cluster_test.go @@ -107,7 +107,8 @@ func TestNewClustersFromConfig(t *testing.T) { require.NoError(t, err) // Resolve expected clusters and check attributes - unaggregatedNs := clusters.UnaggregatedClusterNamespace() + unaggregatedNs, initialized := clusters.UnaggregatedClusterNamespace() + assert.True(t, initialized) assert.Equal(t, "unaggregated", unaggregatedNs.NamespaceID().String()) assert.Equal(t, storagemetadata.Attributes{ MetricsType: storagemetadata.UnaggregatedMetricsType, @@ -209,7 +210,7 @@ func (n *noopCluster) NonReadyClusterNamespaces() ClusterNamespaces { panic("implement me") } -func (n *noopCluster) UnaggregatedClusterNamespace() ClusterNamespace { +func (n *noopCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) { panic("implement me") } diff --git a/src/query/storage/m3/dynamic_cluster.go b/src/query/storage/m3/dynamic_cluster.go index b68ccf18e1..9a0343c871 100644 --- a/src/query/storage/m3/dynamic_cluster.go +++ b/src/query/storage/m3/dynamic_cluster.go @@ -418,12 +418,12 @@ func (d *dynamicCluster) NonReadyClusterNamespaces() ClusterNamespaces { return nonReadyNamespaces } -func (d *dynamicCluster) UnaggregatedClusterNamespace() ClusterNamespace { +func (d *dynamicCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) { d.RLock() - unaggregatedNamespaces := d.unaggregatedNamespace + unaggregatedNamespace := d.unaggregatedNamespace d.RUnlock() - return unaggregatedNamespaces + return unaggregatedNamespace, (unaggregatedNamespace != nil) } func (d *dynamicCluster) AggregatedClusterNamespace(attrs RetentionResolution) (ClusterNamespace, bool) { diff --git a/src/query/storage/m3/dynamic_cluster_test.go b/src/query/storage/m3/dynamic_cluster_test.go index e5f8d9a584..1401d2e00b 100644 --- a/src/query/storage/m3/dynamic_cluster_test.go +++ b/src/query/storage/m3/dynamic_cluster_test.go @@ -75,6 +75,46 @@ func newNamespaceOptions() namespace.Options { return namespace.NewOptions().SetStagingState(state) } +func TestDynamicClustersUninitialized(t *testing.T) { + t.Parallel() + + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockSession := client.NewMockSession(ctrl) + + // setup dynamic cluster without any namespaces + mapCh := make(nsMapCh, 10) + nsInitializer := newFakeNsInitializer(t, ctrl, mapCh, false) + + cfg := DynamicClusterNamespaceConfiguration{ + session: mockSession, + nsInitializer: nsInitializer, + } + + opts := newTestOptions(cfg) + + clusters, err := NewDynamicClusters(opts) + require.NoError(t, err) + + //nolint:errcheck + defer clusters.Close() + + // Aggregated namespaces should not exist + _, ok := clusters.AggregatedClusterNamespace(RetentionResolution{ + Retention: 48 * time.Hour, + Resolution: 1 * time.Minute, + }) + require.False(t, ok) + + // Unaggregated namespaces should not be initialized + _, ok = clusters.UnaggregatedClusterNamespace() + require.False(t, ok) + + // Cluster namespaces should be empty + require.Len(t, clusters.ClusterNamespaces(), 0) +} + func TestDynamicClustersInitialization(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -424,7 +464,10 @@ func assertClusterNamespace(clusters Clusters, expectedID ident.ID, expectedOpts return false } } else { - ns = clusters.UnaggregatedClusterNamespace() + ns, ok = clusters.UnaggregatedClusterNamespace() + if !ok { + return false + } } return assert.ObjectsAreEqual(expectedID.String(), ns.NamespaceID().String()) && assert.ObjectsAreEqual(expectedOpts, ns.Options()) diff --git a/src/query/storage/m3/storage.go b/src/query/storage/m3/storage.go index d91edb3926..7ac8401732 100644 --- a/src/query/storage/m3/storage.go +++ b/src/query/storage/m3/storage.go @@ -54,7 +54,9 @@ const ( var ( errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" + " aggregated and unaggregated namespace lookup disabled") - errNoNamespacesConfigured = goerrors.New("no namespaces configured") + errNoNamespacesConfigured = goerrors.New("no namespaces configured") + errUnaggregatedNamespaceUninitialized = goerrors.New( + "unaggregated namespace is not yet initialized") ) type m3storage struct { @@ -691,18 +693,21 @@ func (s *m3storage) writeSingle( var ( namespace ClusterNamespace err error + exists bool ) attributes := query.Attributes() switch attributes.MetricsType { case storagemetadata.UnaggregatedMetricsType: - namespace = s.clusters.UnaggregatedClusterNamespace() + namespace, exists = s.clusters.UnaggregatedClusterNamespace() + if !exists { + err = errUnaggregatedNamespaceUninitialized + } case storagemetadata.AggregatedMetricsType: attrs := RetentionResolution{ Retention: attributes.Retention, Resolution: attributes.Resolution, } - var exists bool namespace, exists = s.clusters.AggregatedClusterNamespace(attrs) if !exists { err = fmt.Errorf("no configured cluster namespace for: retention=%s,"+ diff --git a/src/query/storage/m3/storage_test.go b/src/query/storage/m3/storage_test.go index 4d2a9938eb..5c2d13a325 100644 --- a/src/query/storage/m3/storage_test.go +++ b/src/query/storage/m3/storage_test.go @@ -238,6 +238,26 @@ func TestLocalWriteAggregatedNoClusterNamespaceError(t *testing.T) { fmt.Sprintf("unexpected error string: %v", err.Error())) } +func TestLocalWriteUnaggregatedNamespaceUninitializedError(t *testing.T) { + t.Parallel() + + ctrl := xtest.NewController(t) + defer ctrl.Finish() + // We setup an empty dynamic cluster, which will by default + // have an uninitialized unaggregated namespace. + store := newTestStorage(t, &dynamicCluster{}) + + opts := newWriteQuery(t).Options() + + writeQuery, err := storage.NewWriteQuery(opts) + require.NoError(t, err) + + err = store.Write(context.TODO(), writeQuery) + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), "unaggregated namespace is not yet initialized"), + fmt.Sprintf("unexpected error string: %v", err.Error())) +} + func TestLocalWriteAggregatedInvalidMetricsTypeError(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish()