diff --git a/src/dbnode/namespace/schema_registry.go b/src/dbnode/namespace/schema_registry.go index bfc11ea03f..014f19d4d2 100644 --- a/src/dbnode/namespace/schema_registry.go +++ b/src/dbnode/namespace/schema_registry.go @@ -31,6 +31,18 @@ import ( "go.uber.org/zap" ) +func newSchemaHistoryNotFoundError(nsIDStr string) error { + return &schemaHistoryNotFoundError{nsIDStr} +} + +type schemaHistoryNotFoundError struct { + nsIDStr string +} + +func (s *schemaHistoryNotFoundError) Error() string { + return fmt.Sprintf("schema history is not found for %s", s.nsIDStr) +} + type schemaRegistry struct { sync.RWMutex @@ -125,7 +137,7 @@ func (sr *schemaRegistry) getSchemaHistory(nsIDStr string) (SchemaHistory, error history, ok := sr.registry[nsIDStr] if !ok { - return nil, fmt.Errorf("schema history is not found for %v", nsIDStr) + return nil, newSchemaHistoryNotFoundError(nsIDStr) } return history.Get().(SchemaHistory), nil } diff --git a/src/dbnode/namespace/schema_registry_updater.go b/src/dbnode/namespace/schema_registry_updater.go index af6367d5f9..245103ca1a 100644 --- a/src/dbnode/namespace/schema_registry_updater.go +++ b/src/dbnode/namespace/schema_registry_updater.go @@ -31,42 +31,62 @@ import ( // UpdateSchemaRegistry updates schema registry with namespace updates. func UpdateSchemaRegistry(newNamespaces Map, schemaReg SchemaRegistry, log *zap.Logger) error { schemaUpdates := newNamespaces.Metadatas() - merr := xerrors.NewMultiError() + multiErr := xerrors.NewMultiError() for _, metadata := range schemaUpdates { - curSchemaID := "none" + var ( + curSchemaID = "none" + curSchemaNone = true + ) curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) + if err != nil { + // NB(bodu): Schema history not found is a valid error as this occurs on initial bootstrap for the db. + if _, ok := err.(*schemaHistoryNotFoundError); !ok { + multiErr = multiErr.Add(fmt.Errorf("cannot get latest namespace schema: %v", err)) + continue + } + } + if curSchema != nil { + curSchemaNone = false curSchemaID = curSchema.DeployId() if len(curSchemaID) == 0 { - log.Warn("can not update namespace schema with empty deploy ID", zap.Stringer("namespace", metadata.ID()), + msg := "namespace schema update invalid with empty deploy ID" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.String("currentSchemaID", curSchemaID)) - merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema with empty deploy ID", metadata.ID().String())) + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) continue } } + // Log schema update. latestSchema, found := metadata.Options().SchemaHistory().GetLatest() if !found { - log.Warn("can not update namespace schema to empty", zap.Stringer("namespace", metadata.ID()), - zap.String("currentSchema", curSchemaID)) - merr = merr.Add(fmt.Errorf("can not update namespace(%v) schema to empty", metadata.ID().String())) + if !curSchemaNone { + // NB(r): Only interpret this as a warning/error if already had a schema, + // otherwise this is just a namespace that is not using protobuf schemas. + msg := "namespace schema not found on update" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), + zap.String("currentSchema", curSchemaID)) + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s", msg, metadata.ID().String())) + } continue } + log.Info("updating database namespace schema", zap.Stringer("namespace", metadata.ID()), zap.String("currentSchema", curSchemaID), zap.String("latestSchema", latestSchema.DeployId())) err = schemaReg.SetSchemaHistory(metadata.ID(), metadata.Options().SchemaHistory()) if err != nil { - log.Warn("failed to update latest schema for namespace", + msg := "namespace schema failed to update to latest schema" + log.Warn(msg, zap.Stringer("namespace", metadata.ID()), zap.Error(err)) - merr = merr.Add(fmt.Errorf("failed to update latest schema for namespace %v, error: %v", - metadata.ID().String(), err)) + multiErr = multiErr.Add(fmt.Errorf("%s: namespace=%s, error=%v", + msg, metadata.ID().String(), err)) } } - if merr.Empty() { - return nil + if !multiErr.Empty() { + return multiErr } - return merr + return nil } -