Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Consider uninitialized case for UnaggregatedClusterNamespace with dynamic clusters #2957

Merged
merged 5 commits into from
Nov 29, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/CAFxX/gcnotifier v0.0.0-20190112062741-224a280d589d // indirect
github.com/DataDog/datadog-go v3.7.1+incompatible // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/MichaelTJones/pcg v0.0.0-20180122055547-df440c6ed7ed
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
Expand All @@ -23,7 +22,6 @@ require (
github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb // indirect
github.com/davecgh/go-spew v1.1.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/fortytw2/leaktest v1.2.1-0.20180901000122-b433bbd6d743
github.com/fossas/fossa-cli v1.0.30
github.com/garethr/kubeval v0.0.0-20180821130434-c44f5193dc94
Expand All @@ -36,6 +34,7 @@ require (
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2
github.com/golang/snappy v0.0.1
github.com/golangci/golangci-lint v1.33.0 // indirect
github.com/google/go-cmp v0.5.2
github.com/google/go-jsonnet v0.16.0
github.com/google/uuid v1.1.2-0.20190416172445-c2e93f3ae59f // indirect
Expand All @@ -47,7 +46,6 @@ require (
github.com/influxdata/influxdb v1.7.7
github.com/jhump/protoreflect v1.6.1
github.com/json-iterator/go v1.1.9
github.com/kr/text v0.2.0 // indirect
github.com/leanovate/gopter v0.2.8
github.com/lib/pq v1.6.0 // indirect
github.com/lightstep/lightstep-tracer-go v0.18.1
Expand All @@ -66,9 +64,6 @@ require (
github.com/m3dbx/vellum v0.0.0-20201119082309-5b47f7a70f69
github.com/mauricelam/genny v0.0.0-20180903214747-eb2c5232c885
github.com/mjibson/esc v0.1.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.14.1 // indirect
github.com/onsi/gomega v1.10.2 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v0.1.1 // indirect
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
Expand All @@ -93,7 +88,6 @@ require (
github.com/satori/go.uuid v1.2.0
github.com/sergi/go-diff v1.1.0
github.com/shirou/gopsutil v2.20.5+incompatible // indirect
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/spf13/cast v1.3.1-0.20190531151931-f31dc0aaab5a // indirect
github.com/spf13/cobra v1.1.1
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down Expand Up @@ -124,7 +118,6 @@ require (
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634
golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752
google.golang.org/grpc v1.29.1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/go-ini/ini.v1 v1.57.0 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.7.0
Expand All @@ -134,7 +127,6 @@ require (
gopkg.in/vmihailenco/msgpack.v2 v2.8.3
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
honnef.co/go/tools v0.0.1-2020.1.6 // indirect
)

// branch 0.9.3-pool-read-binary-3
Expand Down
191 changes: 191 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/query/api/v1/handler/namespace/ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (t *testClusters) Close() error {
panic("implement me")
}

func (t *testClusters) UnaggregatedClusterNamespace() m3.ClusterNamespace {
func (t *testClusters) UnaggregatedClusterNamespace() (m3.ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
8 changes: 4 additions & 4 deletions src/query/storage/m3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Clusters interface {
NonReadyClusterNamespaces() ClusterNamespaces

// UnaggregatedClusterNamespace returns the valid unaggregated
// cluster namespace.
UnaggregatedClusterNamespace() ClusterNamespace
// cluster namespace. If the namespace is not yet initialized, returns false.
UnaggregatedClusterNamespace() (ClusterNamespace, bool)

// AggregatedClusterNamespace returns an aggregated cluster namespace
// at a specific retention and resolution.
Expand Down Expand Up @@ -258,8 +258,8 @@ func (c *clusters) NonReadyClusterNamespaces() ClusterNamespaces {
return nil
}

func (c *clusters) UnaggregatedClusterNamespace() ClusterNamespace {
return c.unaggregatedNamespace
func (c *clusters) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
return c.unaggregatedNamespace, true
}

func (c *clusters) AggregatedClusterNamespace(
Expand Down
16 changes: 13 additions & 3 deletions src/query/storage/m3/cluster_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ func resolveClusterNamespacesForQuery(
// First check if the unaggregated cluster can fully satisfy the query range.
// If so, return it and shortcircuit, as unaggregated will necessarily have
// every metric.
unaggregated := resolveUnaggregatedNamespaceForQuery(now, start,
clusters.UnaggregatedClusterNamespace(), opts)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
return consolidators.NamespaceInvalid, nil, errUnaggregatedNamespaceUninitialized
}

unaggregated := resolveUnaggregatedNamespaceForQuery(now, start, ns, opts)
if unaggregated.satisfies == fullySatisfiesRange {
return consolidators.NamespaceCoversAllQueryRange,
ClusterNamespaces{unaggregated.clusterNamespace},
Expand Down Expand Up @@ -329,7 +333,13 @@ func resolveClusterNamespacesForQueryWithRestrictQueryOptions(

switch restrict.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
return result(clusters.UnaggregatedClusterNamespace(), nil)
ns, ok := clusters.UnaggregatedClusterNamespace()
if !ok {
return result(nil,
fmt.Errorf("could not find unaggregated namespace for storage policy: %v",
restrict.StoragePolicy.String()))
}
return result(ns, nil)
case storagemetadata.AggregatedMetricsType:
ns, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: restrict.StoragePolicy.Retention().Duration(),
Expand Down
5 changes: 3 additions & 2 deletions src/query/storage/m3/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestNewClustersFromConfig(t *testing.T) {
require.NoError(t, err)

// Resolve expected clusters and check attributes
unaggregatedNs := clusters.UnaggregatedClusterNamespace()
unaggregatedNs, exists := clusters.UnaggregatedClusterNamespace()
assert.True(t, exists)
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, "unaggregated", unaggregatedNs.NamespaceID().String())
assert.Equal(t, storagemetadata.Attributes{
MetricsType: storagemetadata.UnaggregatedMetricsType,
Expand Down Expand Up @@ -209,7 +210,7 @@ func (n *noopCluster) NonReadyClusterNamespaces() ClusterNamespaces {
panic("implement me")
}

func (n *noopCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (n *noopCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions src/query/storage/m3/dynamic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,12 @@ func (d *dynamicCluster) NonReadyClusterNamespaces() ClusterNamespaces {
return nonReadyNamespaces
}

func (d *dynamicCluster) UnaggregatedClusterNamespace() ClusterNamespace {
func (d *dynamicCluster) UnaggregatedClusterNamespace() (ClusterNamespace, bool) {
d.RLock()
unaggregatedNamespaces := d.unaggregatedNamespace
unaggregatedNamespace := d.unaggregatedNamespace
d.RUnlock()

return unaggregatedNamespaces
return unaggregatedNamespace, (unaggregatedNamespace != nil)
}

func (d *dynamicCluster) AggregatedClusterNamespace(attrs RetentionResolution) (ClusterNamespace, bool) {
Expand Down
41 changes: 39 additions & 2 deletions src/query/storage/m3/dynamic_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,43 @@ func newNamespaceOptions() namespace.Options {
return namespace.NewOptions().SetStagingState(state)
}

func TestDynamicClustersUninitialized(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

mockSession := client.NewMockSession(ctrl)

// setup dynamic cluster without any namespaces
mapCh := make(nsMapCh, 10)
nsInitializer := newFakeNsInitializer(t, ctrl, mapCh, false)

cfg := DynamicClusterNamespaceConfiguration{
session: mockSession,
nsInitializer: nsInitializer,
}

opts := newTestOptions(cfg)

clusters, err := NewDynamicClusters(opts)
require.NoError(t, err)

defer clusters.Close()

// Aggregated namespaces should not exist
_, ok := clusters.AggregatedClusterNamespace(RetentionResolution{
Retention: 48 * time.Hour,
Resolution: 1 * time.Minute,
})
require.False(t, ok)

// Unaggregated namespaces hould not exist
wesleyk marked this conversation as resolved.
Show resolved Hide resolved
_, ok = clusters.UnaggregatedClusterNamespace()
require.False(t, ok)

// Cluster namespaces should be empty
require.Len(t, clusters.ClusterNamespaces(), 0)
}

func TestDynamicClustersInitialization(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -423,8 +460,8 @@ func assertClusterNamespace(clusters Clusters, expectedID ident.ID, expectedOpts
}); !ok {
return false
}
} else {
ns = clusters.UnaggregatedClusterNamespace()
} else if ns, ok = clusters.UnaggregatedClusterNamespace(); !ok {
Copy link
Collaborator

@arnikola arnikola Nov 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have this as a separate if? Otherwise ns almost seems like it's getting set as a side effect

Suggested change
} else if ns, ok = clusters.UnaggregatedClusterNamespace(); !ok {
}
ns, initialized = clusters.UnaggregatedClusterNamespace()
if !initialized {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be in an else, will do

return false
}
return assert.ObjectsAreEqual(expectedID.String(), ns.NamespaceID().String()) &&
assert.ObjectsAreEqual(expectedOpts, ns.Options())
Expand Down
10 changes: 7 additions & 3 deletions src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const (
var (
errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" +
" aggregated and unaggregated namespace lookup disabled")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errNoNamespacesConfigured = goerrors.New("no namespaces configured")
errUnaggregatedNamespaceUninitialized = goerrors.New("unaggregated namespace is not yet initialized")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; unused

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, will use below

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(also used in cluster_resolver.go)

)

type m3storage struct {
Expand Down Expand Up @@ -691,18 +692,21 @@ func (s *m3storage) writeSingle(
var (
namespace ClusterNamespace
err error
exists bool
)

attributes := query.Attributes()
switch attributes.MetricsType {
case storagemetadata.UnaggregatedMetricsType:
namespace = s.clusters.UnaggregatedClusterNamespace()
namespace, exists = s.clusters.UnaggregatedClusterNamespace()
if !exists {
err = fmt.Errorf("unaggregated namespace not yet initialized")
}
case storagemetadata.AggregatedMetricsType:
attrs := RetentionResolution{
Retention: attributes.Retention,
Resolution: attributes.Resolution,
}
var exists bool
namespace, exists = s.clusters.AggregatedClusterNamespace(attrs)
if !exists {
err = fmt.Errorf("no configured cluster namespace for: retention=%s,"+
Expand Down
17 changes: 17 additions & 0 deletions src/query/storage/m3/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,23 @@ func TestLocalWriteAggregatedNoClusterNamespaceError(t *testing.T) {
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteUnaggregatedNamespaceUninitializedError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
// We setup an empty dynamic cluster, which will by default have an uninitialized unaggregated namespace.
store := newTestStorage(t, &dynamicCluster{})

opts := newWriteQuery(t).Options()

writeQuery, err := storage.NewWriteQuery(opts)
require.NoError(t, err)

err = store.Write(context.TODO(), writeQuery)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), "unaggregated namespace not yet initialized"),
fmt.Sprintf("unexpected error string: %v", err.Error()))
}

func TestLocalWriteAggregatedInvalidMetricsTypeError(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()
Expand Down