Skip to content

Commit

Permalink
cli: disable automatic compactions for compact command & enable paral…
Browse files Browse the repository at this point in the history
…lelism

When running `debug compact`, disable automatic compactions to prevent
excessive retrying of the manual compaction, and to prevent conflicts
in the chosen key ranges to compact in parallel.

Also, enable concurrent manual compactions in the call to `db.Compact`.

Release note: None

Release justification: Plumbing for pebble change which was merged before
the stability period.
  • Loading branch information
mufeez-amjad committed Mar 2, 2022
1 parent 12c968f commit 836ab71
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/cliccl/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func runEncryptionStatus(cmd *cobra.Command, args []string) error {

dir := args[0]

db, err := cli.OpenExistingStore(dir, stopper, true /* readOnly */)
db, err := cli.OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down
38 changes: 24 additions & 14 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ func parsePositiveDuration(arg string) (time.Duration, error) {

// OpenEngineOptions tunes the behavior of OpenEngine.
type OpenEngineOptions struct {
ReadOnly bool
MustExist bool
ReadOnly bool
MustExist bool
DisableAutomaticCompactions bool
}

func (opts OpenEngineOptions) configOptions() []storage.ConfigOption {
Expand All @@ -143,13 +144,22 @@ func (opts OpenEngineOptions) configOptions() []storage.ConfigOption {
if opts.MustExist {
cfgOpts = append(cfgOpts, storage.MustExist)
}
if opts.DisableAutomaticCompactions {
cfgOpts = append(cfgOpts, storage.DisableAutomaticCompactions)
}
return cfgOpts
}

// OpenExistingStore opens the Pebble engine rooted at 'dir'.
// If 'readOnly' is true, opens the store in read-only mode.
func OpenExistingStore(dir string, stopper *stop.Stopper, readOnly bool) (storage.Engine, error) {
return OpenEngine(dir, stopper, OpenEngineOptions{ReadOnly: readOnly, MustExist: true})
// OpenExistingStore opens the Pebble engine rooted at 'dir'. If 'readOnly' is
// true, opens the store in read-only mode. If 'disableAutomaticCompactions' is
// true, disables automatic/background compactions (only used for manual
// compactions).
func OpenExistingStore(
dir string, stopper *stop.Stopper, readOnly, disableAutomaticCompactions bool,
) (storage.Engine, error) {
return OpenEngine(dir, stopper, OpenEngineOptions{
ReadOnly: readOnly, MustExist: true, DisableAutomaticCompactions: disableAutomaticCompactions,
})
}

// OpenEngine opens the engine at 'dir'. Depending on the supplied options,
Expand Down Expand Up @@ -237,7 +247,7 @@ func runDebugKeys(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -410,7 +420,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -500,7 +510,7 @@ func runDebugRangeDescriptors(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -628,7 +638,7 @@ func runDebugRaftLog(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -698,7 +708,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
}
}

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -796,7 +806,7 @@ func runDebugCompact(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, false /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, false /* readOnly */, true /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -1064,7 +1074,7 @@ func runDebugUnsafeRemoveDeadReplicas(cmd *cobra.Command, args []string) error {
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(args[0], stopper, false /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -1426,7 +1436,7 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
ctx := context.Background()
defer stopper.Stop(ctx)

db, err := OpenExistingStore(args[0], stopper, true /* readOnly */)
db, err := OpenExistingStore(args[0], stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_check_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func checkStoreRangeStats(
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

eng, err := OpenExistingStore(dir, stopper, true /* readOnly */)
eng, err := OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func checkStoreRaftState(
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

db, err := OpenExistingStore(dir, stopper, true /* readOnly */)
db, err := OpenExistingStore(dir, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/debug_recover_loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func runDebugDeadReplicaCollect(cmd *cobra.Command, args []string) error {

var stores []storage.Engine
for _, storeSpec := range debugRecoverCollectInfoOpts.Stores.Specs {
db, err := OpenExistingStore(storeSpec.Path, stopper, true /* readOnly */)
db, err := OpenExistingStore(storeSpec.Path, stopper, true /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return errors.Wrapf(err, "failed to open store at path %q, ensure that store path is "+
"correct and that it is not used by another process", storeSpec.Path)
Expand Down Expand Up @@ -457,7 +457,7 @@ func runDebugExecuteRecoverPlan(cmd *cobra.Command, args []string) error {
var localNodeID roachpb.NodeID
batches := make(map[roachpb.StoreID]storage.Batch)
for _, storeSpec := range debugRecoverExecuteOpts.Stores.Specs {
store, err := OpenExistingStore(storeSpec.Path, stopper, false /* readOnly */)
store, err := OpenExistingStore(storeSpec.Path, stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return errors.Wrapf(err, "failed to open store at path %q. ensure that store path is "+
"correct and that it is not used by another process", storeSpec.Path)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cli/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestOpenExistingStore(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("dir=%s", test.dir), func(t *testing.T) {
_, err := OpenExistingStore(test.dir, stopper, false /* readOnly */)
_, err := OpenExistingStore(test.dir, stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
if !testutils.IsError(err, test.expErr) {
t.Errorf("wanted %s but got %v", test.expErr, err)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestOpenReadOnlyStore(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("readOnly=%t", test.readOnly), func(t *testing.T) {
db, err := OpenExistingStore(storePath, stopper, test.readOnly)
db, err := OpenExistingStore(storePath, stopper, test.readOnly, false /* disableAutomaticCompactions */)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestRemoveDeadReplicas(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

db, err := OpenExistingStore(storePaths[idx], stopper, false /* readOnly */)
db, err := OpenExistingStore(storePaths[idx], stopper, false /* readOnly */, false /* disableAutomaticCompactions */)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ var MustExist ConfigOption = func(cfg *engineConfig) error {
return nil
}

// DisableAutomaticCompactions configures an engine to be opened with disabled
// automatic compactions. Used primarily for debugCompactCmd.
var DisableAutomaticCompactions ConfigOption = func(cfg *engineConfig) error {
cfg.Opts.DisableAutomaticCompactions = true
return nil
}

// ForTesting configures the engine for use in testing. It may randomize some
// config options to improve test coverage.
var ForTesting ConfigOption = func(cfg *engineConfig) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,14 +1482,14 @@ func (p *Pebble) ApproximateDiskBytes(from, to roachpb.Key) (uint64, error) {

// Compact implements the Engine interface.
func (p *Pebble) Compact() error {
return p.db.Compact(nil, EncodeMVCCKey(MVCCKeyMax), false /* parallel */)
return p.db.Compact(nil, EncodeMVCCKey(MVCCKeyMax), true /* parallel */)
}

// CompactRange implements the Engine interface.
func (p *Pebble) CompactRange(start, end roachpb.Key) error {
bufStart := EncodeMVCCKey(MVCCKey{start, hlc.Timestamp{}})
bufEnd := EncodeMVCCKey(MVCCKey{end, hlc.Timestamp{}})
return p.db.Compact(bufStart, bufEnd, false /* parallel */)
return p.db.Compact(bufStart, bufEnd, true /* parallel */)
}

// InMem returns true if the receiver is an in-memory engine and false
Expand Down

0 comments on commit 836ab71

Please sign in to comment.