From 9b0207f1bb25db4935056842c269ec2913c0aad0 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 21 Oct 2019 14:51:31 -0400 Subject: [PATCH 1/4] Fix namespace update log message when missing schema for non-protobuf namespace --- .../namespace/schema_registry_updater.go | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/src/dbnode/namespace/schema_registry_updater.go b/src/dbnode/namespace/schema_registry_updater.go index af6367d5f9..af13e15232 100644 --- a/src/dbnode/namespace/schema_registry_updater.go +++ b/src/dbnode/namespace/schema_registry_updater.go @@ -33,35 +33,53 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. schemaUpdates := newNamespaces.Metadatas() merr := xerrors.NewMultiError() for _, metadata := range schemaUpdates { - curSchemaID := "none" + var ( + curSchemaID = "none" + curSchemaNone = true + ) curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) + if err != nil { + merr = merr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) + continue + } + if curSchema != nil { + curSchemaNone = false curSchemaID = curSchema.DeployId() if len(curSchemaID) == 0 { - log.Warn("can not update namespace schema with empty deploy ID", zap.Stringer("namespace", metadata.ID()), + msg := "namespace schema update invalid with empty deploy ID" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.String("currentSchemaID", curSchemaID)) - merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema with empty deploy ID", metadata.ID().String())) + merr = merr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) continue } } + // Log schema update. latestSchema, found := metadata.Options().SchemaHistory().GetLatest() if !found { - log.Warn("can not update namespace schema to empty", zap.Stringer("namespace", metadata.ID()), - zap.String("currentSchema", curSchemaID)) - merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema to empty", metadata.ID().String())) + if !curSchemaNone { + // NB(r): Only interpret this as a warning/error if already had a schema + // otherwise this is just a namespace that is not using protobuf schemas. + msg := "namespace schema not found on update" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), + zap.String("currentSchema", curSchemaID)) + merr = merr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) + } continue } + log.Info("updating database namespace schema", zap.Stringer("namespace", metadata.ID()), zap.String("currentSchema", curSchemaID), zap.String("latestSchema", latestSchema.DeployId())) err = schemaReg.SetSchemaHistory(metadata.ID(), metadata.Options().SchemaHistory()) if err != nil { - log.Warn("failed to update latest schema for namespace", + msg := "namespace schema failed to update to latest schema" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.Error(err)) - merr = merr.Add(fmt.Errorf("failed to update latest schema for namespace %v, error: %v", - metadata.ID().String(), err)) + merr = merr.Add(fmt.Errorf("%s: namespace=%s, error=%v", + msg, metadata.ID().String(), err)) } } if merr.Empty() { From 4b0dc9f31a63d2e9c32ec468576455f2d76a5557 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 22 Oct 2019 05:42:09 -0400 Subject: [PATCH 2/4] Address feedback --- .../namespace/schema_registry_updater.go | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/dbnode/namespace/schema_registry_updater.go b/src/dbnode/namespace/schema_registry_updater.go index af13e15232..ee20633653 100644 --- a/src/dbnode/namespace/schema_registry_updater.go +++ b/src/dbnode/namespace/schema_registry_updater.go @@ -31,15 +31,15 @@ import ( // UpdateSchemaRegistry updates schema registry with namespace updates. func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap.Logger) error { schemaUpdates := newNamespaces.Metadatas() - merr := xerrors.NewMultiError() + multiErr := xerrors.NewMultiError() for _, metadata := range schemaUpdates { var ( - curSchemaID = "none" + curSchemaID = "none" curSchemaNone = true ) curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) if err != nil { - merr = merr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) + multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) continue } @@ -50,7 +50,7 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. msg := "namespace schema update invalid with empty deploy ID" log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.String("currentSchemaID", curSchemaID)) - merr = merr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) continue } } @@ -59,12 +59,12 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. latestSchema, found := metadata.Options().SchemaHistory().GetLatest() if !found { if !curSchemaNone { - // NB(r): Only interpret this as a warning/error if already had a schema + // NB(r): Only interpret this as a warning/error if already had a schema, // otherwise this is just a namespace that is not using protobuf schemas. msg := "namespace schema not found on update" log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.String("currentSchema", curSchemaID)) - merr = merr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) } continue } @@ -78,13 +78,12 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.Error(err)) - merr = merr.Add(fmt.Errorf("%s: namespace=%s, error=%v", + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s, error=%v", msg, metadata.ID().String(), err)) } } - if merr.Empty() { - return nil + if !multiErr.Empty() { + return multiErr } - return merr + return nil } - From f62eccf8c1c9b0364f666610d90d3310fb3a6c8d Mon Sep 17 00:00:00 2001 From: Bo Du Date: Fri, 25 Oct 2019 15:35:27 -0400 Subject: [PATCH 3/4] Schema not found on initial schema bootstrap is a valid error. --- src/dbnode/namespace/schema_registry_updater.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dbnode/namespace/schema_registry_updater.go b/src/dbnode/namespace/schema_registry_updater.go index ee20633653..472da93c96 100644 --- a/src/dbnode/namespace/schema_registry_updater.go +++ b/src/dbnode/namespace/schema_registry_updater.go @@ -22,6 +22,7 @@ package namespace import ( "fmt" + "strings" xerrors "github.com/m3db/m3/src/x/errors" @@ -38,7 +39,8 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. curSchemaNone = true ) curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) - if err != nil { + // NB(bodu): Schema history not found is a valid error as this occurs on initial bootstrap for the db. + if err != nil && !strings.Contains(err.Error(), "schema history is not found for") { multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) continue } From 4360c024b655b275497ea2dd33bc26b7ac745878 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Fri, 25 Oct 2019 16:32:50 -0400 Subject: [PATCH 4/4] Custom error type/check. --- src/dbnode/namespace/schema_registry.go | 14 +++++++++++++- src/dbnode/namespace/schema_registry_updater.go | 11 ++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/dbnode/namespace/schema_registry.go b/src/dbnode/namespace/schema_registry.go index bfc11ea03f..014f19d4d2 100644 --- a/src/dbnode/namespace/schema_registry.go +++ b/src/dbnode/namespace/schema_registry.go @@ -31,6 +31,18 @@ import ( "go.uber.org/zap" ) +func newSchemaHistoryNotFoundError(nsIDStr string) error { + return &schemaHistoryNotFoundError{nsIDStr} +} + +type schemaHistoryNotFoundError struct { + nsIDStr string +} + +func (s *schemaHistoryNotFoundError) Error() string { + return fmt.Sprintf("schema history is not found for %s", s.nsIDStr) +} + type schemaRegistry struct { sync.RWMutex @@ -125,7 +137,7 @@ func (sr *schemaRegistry) getSchemaHistory(nsIDStr string) (SchemaHistory, error history, ok := sr.registry[nsIDStr] if !ok { - return nil, fmt.Errorf("schema history is not found for %v", nsIDStr) + return nil, newSchemaHistoryNotFoundError(nsIDStr) } return history.Get().(SchemaHistory), nil } diff --git a/src/dbnode/namespace/schema_registry_updater.go b/src/dbnode/namespace/schema_registry_updater.go index 472da93c96..245103ca1a 100644 --- a/src/dbnode/namespace/schema_registry_updater.go +++ b/src/dbnode/namespace/schema_registry_updater.go @@ -22,7 +22,6 @@ package namespace import ( "fmt" - "strings" xerrors "github.com/m3db/m3/src/x/errors" @@ -39,10 +38,12 @@ func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap. curSchemaNone = true ) curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) - // NB(bodu): Schema history not found is a valid error as this occurs on initial bootstrap for the db. - if err != nil && !strings.Contains(err.Error(), "schema history is not found for") { - multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) - continue + if err != nil { + // NB(bodu): Schema history not found is a valid error as this occurs on initial bootstrap for the db. + if _, ok := err.(*schemaHistoryNotFoundError); !ok { + multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) + continue + } } if curSchema != nil {