Skip to content

Commit

Permalink
[3.1.2 Backport] CBG-3387: Handle nil buckets in db config (#6488)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrks authored Oct 5, 2023
1 parent f5d70ab commit 595a2aa
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 31 deletions.
16 changes: 16 additions & 0 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,3 +885,19 @@ func AssertTimestampGreaterThan(t *testing.T, e1, e2 int64, msgAndArgs ...interf
}
return assert.Greater(t, e1, e2, msgAndArgs...)
}

// MoveDocument moves the document from src to dst
// Note: does not handle xattr contents
func MoveDocument(t testing.TB, docID string, dst, src DataStore) {
var data interface{}

srcCAS, err := src.Get(docID, &data)
require.NoError(t, err)

ok, err := dst.Add(docID, 0, data)
require.NoError(t, err)
require.True(t, ok)

_, err = src.Remove(docID, srcCAS)
require.NoError(t, err)
}
45 changes: 32 additions & 13 deletions rest/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (h *handler) handleCreateDB() error {

validateOIDC := !h.getBoolQuery(paramDisableOIDCValidation)

if dbName != config.Name && config.Name != "" {
if config.Name != "" && dbName != config.Name {
return base.HTTPErrorf(http.StatusBadRequest, "When providing a name in the JSON body (%s), ensure it matches the name in the path (%s).", config.Name, dbName)
}

Expand All @@ -69,9 +69,12 @@ func (h *handler) handleCreateDB() error {
return err
}

bucket := dbName
if config.Bucket != nil {
bucket = *config.Bucket
bucket := config.GetBucketName()

// Before computing metadata ID or taking a copy of the "persistedDbConfig", stamp bucket if missing...
// Ensures all newly persisted configs have a bucket field set to detect bucket moves via backup/restore or XDCR
if config.Bucket == nil {
config.Bucket = &bucket
}

metadataID, metadataIDError := h.server.BootstrapContext.ComputeMetadataIDForDbConfig(h.ctx(), config)
Expand All @@ -95,7 +98,8 @@ func (h *handler) handleCreateDB() error {
loadedConfig := DatabaseConfig{
Version: version,
MetadataID: metadataID,
DbConfig: *config}
DbConfig: *config,
}

persistedConfig := DatabaseConfig{
Version: version,
Expand Down Expand Up @@ -173,22 +177,37 @@ func (h *handler) handleCreateDB() error {
return base.HTTPErrorf(http.StatusCreated, "created")
}

// getAuthScopeHandleCreateDB is used in the router to supply an auth scope for the admin api auth. Takes the JSON body
// from the payload, pulls out bucket and returns this as the auth scope.
func getAuthScopeHandleCreateDB(ctx context.Context, bodyJSON []byte) (string, error) {
var body struct {
// getAuthScopeHandleCreateDB determines the auth scope for a PUT /{db}/ request.
// This is a special case because we don't have a database initialized yet, so we need to infer the bucket from db config.
func getAuthScopeHandleCreateDB(ctx context.Context, h *handler) (bucketName string, err error) {

// grab a copy of the request body and restore body buffer for later handlers
bodyJSON, err := h.readBody()
if err != nil {
return "", base.HTTPErrorf(http.StatusInternalServerError, "Unable to read body: %v", err)
}
// mark the body as already read to avoid double-counting bytes for stats once it gets read again
h.requestBody = io.NopCloser(bytes.NewReader(bodyJSON))

var dbConfigBody struct {
Bucket string `json:"bucket"`
}
reader := bytes.NewReader(bodyJSON)
err := DecodeAndSanitiseConfig(ctx, reader, &body, false)
err = DecodeAndSanitiseConfig(ctx, reader, &dbConfigBody, false)
if err != nil {
return "", err
}
if body.Bucket == "" {
return "", nil

if dbConfigBody.Bucket == "" {
dbName := h.PathVar("newdb")
if dbName == "" {
return "", base.HTTPErrorf(http.StatusBadRequest, "bucket or db name not specified in request")
}
// imply bucket name from db name in path if not in body
return dbName, nil
}

return body.Bucket, nil
return dbConfigBody.Bucket, nil
}

// Take a DB online, first reload the DB config
Expand Down
15 changes: 13 additions & 2 deletions rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ func (dbConfig *DbConfig) setDatabaseCredentials(credentials base.CredentialsCon
// setup populates fields in the dbConfig
func (dbConfig *DbConfig) setup(ctx context.Context, dbName string, bootstrapConfig BootstrapConfig, dbCredentials, bucketCredentials *base.CredentialsConfig, forcePerBucketAuth bool) error {
dbConfig.Name = dbName

// use db name as bucket if absent from config (handling for old non-stamped configs)
if dbConfig.Bucket == nil {
dbConfig.Bucket = &dbConfig.Name
}
Expand Down Expand Up @@ -539,6 +541,15 @@ func readFromPath(ctx context.Context, path string, insecureSkipVerify bool) (rc
return rc, nil
}

// GetBucketName returns the bucket name associated with the database config.
func (dbConfig *DbConfig) GetBucketName() string {
if dbConfig.Bucket != nil {
return *dbConfig.Bucket
}
// db name as fallback in the case of a `nil` bucket.
return dbConfig.Name
}

func (dbConfig *DbConfig) AutoImportEnabled(ctx context.Context) (bool, error) {
if dbConfig.AutoImport == nil {
return base.DefaultAutoImport, nil
Expand Down Expand Up @@ -1526,7 +1537,7 @@ func (sc *ServerContext) _fetchDatabase(ctx context.Context, dbName string) (fou
// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != *cnf.Bucket {
if bucket != cnf.GetBucketName() {
sc._handleInvalidDatabaseConfig(ctx, bucket, cnf)
return true, fmt.Errorf("mismatch in persisted database bucket name %q vs the actual bucket name %q. Please correct db %q's config, groupID %q.", base.MD(cnf.Bucket), base.MD(bucket), base.MD(cnf.Name), base.MD(sc.Config.Bootstrap.ConfigGroupID))
}
Expand Down Expand Up @@ -1687,7 +1698,7 @@ func (sc *ServerContext) FetchConfigs(ctx context.Context, isInitialStartup bool
// We need to check for corruption in the database config (CC. CBG-3292). If the fetched config doesn't match the
// bucket name we got the config from we need to maker this db context as corrupt. Then remove the context and
// in memory representation on the server context.
if bucket != *cnf.Bucket {
if bucket != cnf.GetBucketName() {
sc.handleInvalidDatabaseConfig(ctx, bucket, *cnf)
continue
}
Expand Down
9 changes: 2 additions & 7 deletions rest/config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,7 @@ func (b *bootstrapContext) addDatabaseLogContext(ctx context.Context, config *Db
}

func (b *bootstrapContext) ComputeMetadataIDForDbConfig(ctx context.Context, config *DbConfig) (string, error) {
bucketName := config.Bucket
if bucketName == nil {
return "", fmt.Errorf("No bucket defined in dbConfig, cannot compute metadataID")
}
registry, err := b.getGatewayRegistry(ctx, *bucketName)
registry, err := b.getGatewayRegistry(ctx, config.GetBucketName())
if err != nil {
return "", err
}
Expand Down Expand Up @@ -740,8 +736,7 @@ func (b *bootstrapContext) computeMetadataID(ctx context.Context, registry *Gate
}
}

// If _default._default is already associated with a metadataID, return standard metadata ID
bucketName := *config.Bucket
bucketName := config.GetBucketName()
exists, err := b.Connection.KeyExists(ctx, bucketName, base.SGSyncInfo)
if err != nil {
base.WarnfCtx(ctx, "Error checking whether metadataID is already defined for default collection - using standard metadataID. Error: %v", err)
Expand Down
11 changes: 2 additions & 9 deletions rest/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type handler struct {
rqCtx context.Context
}

type authScopeFunc func(ctx context.Context, bodyJSON []byte) (string, error)
type authScopeFunc func(context.Context, *handler) (string, error)

type handlerPrivs int

Expand Down Expand Up @@ -461,14 +461,7 @@ func (h *handler) validateAndWriteHeaders(method handlerMethod, accessPermission
return base.HTTPErrorf(http.StatusInternalServerError, "")
}
if h.authScopeFunc != nil {
body, err := h.readBody()
if err != nil {
return base.HTTPErrorf(http.StatusInternalServerError, "Unable to read body: %v", err)
}
// The above readBody() will end up clearing the body which the later handler will require. Re-populate this
// for the later handler.
h.requestBody = io.NopCloser(bytes.NewReader(body))
authScope, err = h.authScopeFunc(h.ctx(), body)
authScope, err = h.authScopeFunc(h.ctx(), h)
if err != nil {
return base.HTTPErrorf(http.StatusInternalServerError, "Unable to read body: %v", err)
}
Expand Down
100 changes: 100 additions & 0 deletions rest/persistent_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,3 +1236,103 @@ func makeDbConfig(bucketName string, dbName string, scopesConfig ScopesConfig) D
}
return dbConfig
}

func TestPersistentConfigNoBucketField(t *testing.T) {
base.TestsRequireBootstrapConnection(t)
base.RequireNumTestBuckets(t, 2)

base.SetUpTestLogging(t, base.LevelTrace, base.KeyConfig)

b1 := base.GetTestBucket(t)
defer b1.Close(base.TestCtx(t))
b1Name := b1.GetName()

// at the end of the test we'll move config from b1 into b2 to test backup/restore-type migration
b2 := base.GetTestBucket(t)
defer b2.Close(base.TestCtx(t))
b2Name := b2.GetName()

rt := NewRestTester(t, &RestTesterConfig{
PersistentConfig: true,
CustomTestBucket: b1.NoCloseClone(),
})
defer rt.Close()

dbName := b1Name

dbConfig := rt.NewDbConfig()
// will infer from db name in handler and stamp into config (as of CBG-3353)
dbConfig.Bucket = nil
resp := rt.CreateDatabase(dbName, dbConfig)
RequireStatus(t, resp, http.StatusCreated)

// read back config in bucket to see if bucket field was stamped into the config
var databaseConfig DatabaseConfig
groupID := rt.ServerContext().Config.Bootstrap.ConfigGroupID
configDocID := PersistentConfigKey(base.TestCtx(t), groupID, b1Name)
_, err := rt.GetDatabase().MetadataStore.Get(configDocID, &databaseConfig)
require.NoError(t, err)
require.NotNil(t, databaseConfig.Bucket)
assert.Equal(t, b1Name, *databaseConfig.Bucket, "bucket field should be stamped into config")

// manually strip out bucket to test backwards compatibility (older configs don't always have this field set)
_, err = rt.GetDatabase().MetadataStore.Update(configDocID, 0, func(current []byte) (updated []byte, expiry *uint32, delete bool, err error) {
var d DatabaseConfig
require.NoError(t, base.JSONUnmarshal(current, &d))
d.Bucket = nil
newConfig, err := base.JSONMarshal(d)
return newConfig, nil, false, err
})
require.NoError(t, err)

count, err := rt.ServerContext().fetchAndLoadConfigs(base.TestCtx(t), false)
require.NoError(t, err)
assert.Equal(t, 1, count, "should have loaded 1 config")

_, err = rt.UpdatePersistedBucketName(&databaseConfig, &b2Name)
require.NoError(t, err)

dbBucketMismatch := base.SyncGatewayStats.GlobalStats.ConfigStat.DatabaseBucketMismatches.Value()

// expect config to fail to load due to bucket mismatch
count, err = rt.ServerContext().fetchAndLoadConfigs(base.TestCtx(t), false)
require.NoError(t, err)
assert.Equal(t, 0, count)
dbBucketMismatch, _ = base.WaitForStat(t, base.SyncGatewayStats.GlobalStats.ConfigStat.DatabaseBucketMismatches.Value, dbBucketMismatch+1)

// Move config docs from original bucket to b2 and force a fetch/load (simulate backup/restore or XDCR to different bucket)
base.MoveDocument(t, base.SGRegistryKey, b2.GetMetadataStore(), b1.GetMetadataStore())
base.MoveDocument(t, configDocID, b2.GetMetadataStore(), b1.GetMetadataStore())

// put the bucket for the config back to b1 so we can use the admin API to repair the config (like a real user would have to do)
_, err = b2.GetMetadataStore().Update(configDocID, 0, func(current []byte) (updated []byte, expiry *uint32, delete bool, err error) {
var d DatabaseConfig
require.NoError(t, base.JSONUnmarshal(current, &d))
d.Bucket = &b1Name
newConfig, err := base.JSONMarshal(d)
return newConfig, nil, false, err
})
require.NoError(t, err)

count, err = rt.ServerContext().fetchAndLoadConfigs(base.TestCtx(t), false)
require.NoError(t, err)
assert.Equal(t, 0, count)
dbBucketMismatch, _ = base.WaitForStat(t, base.SyncGatewayStats.GlobalStats.ConfigStat.DatabaseBucketMismatches.Value, dbBucketMismatch+1)

// repair config
dbConfig.Bucket = &b2Name

// /db/_config won't work because db isn't actually loaded
resp = rt.UpsertDbConfig(dbName, dbConfig)
RequireStatus(t, resp, http.StatusNotFound)

// PUT /db/ will work to repair config
resp = rt.CreateDatabase(dbName, dbConfig)
RequireStatus(t, resp, http.StatusCreated)

// do another fetch just to be sure that the config won't be unloaded again
count, err = rt.ServerContext().fetchAndLoadConfigs(base.TestCtx(t), false)
require.NoError(t, err)
assert.Equal(t, 0, count)
_, _ = base.WaitForStat(t, base.SyncGatewayStats.GlobalStats.ConfigStat.DatabaseBucketMismatches.Value, dbBucketMismatch+1)
}

0 comments on commit 595a2aa

Please sign in to comment.