diff --git a/cmd/pump/pump.toml b/cmd/pump/pump.toml index 927e9d7a8..65a906f0e 100644 --- a/cmd/pump/pump.toml +++ b/cmd/pump/pump.toml @@ -27,6 +27,9 @@ pd-urls = "http://127.0.0.1:2379" # Path of file that contains X509 key in PEM format for connection with cluster components. # ssl-key = "/path/to/drainer-key.pem" # +# [storage] +# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure. +# sync-log = true # # we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing # [storage.kv] @@ -36,7 +39,7 @@ pd-urls = "http://127.0.0.1:2379" # compaction-L0-trigger = 8 # compaction-table-size = 67108864 # compaction-total-size = 536870912 -# compaction-total-size-multiplier = 8 +# compaction-total-size-multiplier = 8.0 # write-buffer = 67108864 # write-L0-pause-trigger = 24 # write-L0-slowdown-trigger = 17 diff --git a/pump/config.go b/pump/config.go index ff29ed101..70b946d09 100644 --- a/pump/config.go +++ b/pump/config.go @@ -64,7 +64,7 @@ type Config struct { configFile string printVersion bool tls *tls.Config - Storage *storage.Config `toml:"storage" json:"storage"` + Storage storage.Config `toml:"storage" json:"storage"` } // NewConfig return an instance of configuration diff --git a/pump/server.go b/pump/server.go index 0d077010d..347fa488f 100644 --- a/pump/server.go +++ b/pump/server.go @@ -125,7 +125,10 @@ func NewServer(cfg *Config) (*Server, error) { return nil, errors.Trace(err) } - options := storage.DefaultOptions().WithStorage(cfg.Storage) + options := storage.DefaultOptions() + options = options.WithKVConfig(cfg.Storage.KV) + options = options.WithSync(cfg.Storage.GetSyncLog()) + storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver) if err != nil { return nil, errors.Trace(err) diff --git a/pump/storage/storage.go b/pump/storage/storage.go index bb0005393..911868266 100644 --- a/pump/storage/storage.go +++ b/pump/storage/storage.go @@ -94,13 +94,14 @@ func NewAppend(dir string, options *Options) (append *Append, err error) { } // NewAppendWithResolver returns a instance of Append -// if tiStore and tiLockResolver is not nil, we will try to query tikv to know weather a txn is committed +// if tiStore and tiLockResolver is not nil, we will try to query tikv to know whether a txn is committed func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiLockResolver *tikv.LockResolver) (append *Append, err error) { if options == nil { options = DefaultOptions() } - kvDir := path.Join(dir, "kv") + log.Infof("options: %+v", options) + valueDir := path.Join(dir, "value") err = os.MkdirAll(valueDir, 0755) if err != nil { @@ -113,28 +114,8 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL return nil, errors.Trace(err) } - var opt opt.Options - cf := options.Storage - if cf == nil { - cf = defaultStorageConfig - } else { - setDefaultStorageConfig(cf) - } - - log.Infof("storage config: %+v", cf) - - opt.BlockCacheCapacity = cf.BlockCacheCapacity - opt.BlockRestartInterval = cf.BlockRestartInterval - opt.BlockSize = cf.BlockSize - opt.CompactionL0Trigger = cf.CompactionL0Trigger - opt.CompactionTableSize = cf.CompactionTableSize - opt.CompactionTotalSize = cf.CompactionTotalSize - opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier - opt.WriteBuffer = cf.WriteBuffer - opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger - opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger - - metadata, err := leveldb.OpenFile(kvDir, &opt) + kvDir := path.Join(dir, "kv") + metadata, err := openMetadataDB(kvDir, options.KVConfig) if err != nil { return nil, errors.Trace(err) } @@ -229,7 +210,6 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL append.wg.Add(1) go append.updateStatus() - return } @@ -1014,7 +994,17 @@ func getStorageSize(dir string) (size storageSize, err error) { // Config holds the configuration of storage type Config struct { - KVConfig `toml:"kv" json:"kv"` + SyncLog *bool `toml:"sync-log" json:"sync-log"` + KV *KVConfig `toml:"kv" json:"kv"` +} + +// GetSyncLog return sync-log config option +func (c *Config) GetSyncLog() bool { + if c.SyncLog == nil { + return true + } + + return *c.SyncLog } // KVConfig if the configuration of goleveldb @@ -1031,50 +1021,72 @@ type KVConfig struct { WriteL0SlowdownTrigger int `toml:"write-L0-slowdown-trigger" json:"write-L0-slowdown-trigger"` } -var defaultStorageConfig = &Config{ - KVConfig: KVConfig{ - BlockCacheCapacity: 8388608, - BlockRestartInterval: 16, - BlockSize: 4096, - CompactionL0Trigger: 8, - CompactionTableSize: 67108864, - CompactionTotalSize: 536870912, - CompactionTotalSizeMultiplier: 8, - WriteBuffer: 67108864, - WriteL0PauseTrigger: 24, - WriteL0SlowdownTrigger: 17, - }, +var defaultStorageKVConfig = &KVConfig{ + BlockCacheCapacity: 8388608, + BlockRestartInterval: 16, + BlockSize: 4096, + CompactionL0Trigger: 8, + CompactionTableSize: 67108864, + CompactionTotalSize: 536870912, + CompactionTotalSizeMultiplier: 8, + WriteBuffer: 67108864, + WriteL0PauseTrigger: 24, + WriteL0SlowdownTrigger: 17, } -func setDefaultStorageConfig(cf *Config) { +func setDefaultStorageConfig(cf *KVConfig) { if cf.BlockCacheCapacity <= 0 { - cf.BlockCacheCapacity = defaultStorageConfig.BlockCacheCapacity + cf.BlockCacheCapacity = defaultStorageKVConfig.BlockCacheCapacity } if cf.BlockRestartInterval <= 0 { - cf.BlockRestartInterval = defaultStorageConfig.BlockRestartInterval + cf.BlockRestartInterval = defaultStorageKVConfig.BlockRestartInterval } if cf.BlockSize <= 0 { - cf.BlockSize = defaultStorageConfig.BlockSize + cf.BlockSize = defaultStorageKVConfig.BlockSize } if cf.CompactionL0Trigger <= 0 { - cf.CompactionL0Trigger = defaultStorageConfig.CompactionL0Trigger + cf.CompactionL0Trigger = defaultStorageKVConfig.CompactionL0Trigger } if cf.CompactionTableSize <= 0 { - cf.CompactionTableSize = defaultStorageConfig.CompactionTableSize + cf.CompactionTableSize = defaultStorageKVConfig.CompactionTableSize } if cf.CompactionTotalSize <= 0 { - cf.CompactionTotalSize = defaultStorageConfig.CompactionTotalSize + cf.CompactionTotalSize = defaultStorageKVConfig.CompactionTotalSize } if cf.CompactionTotalSizeMultiplier <= 0 { - cf.CompactionTotalSizeMultiplier = defaultStorageConfig.CompactionTotalSizeMultiplier + cf.CompactionTotalSizeMultiplier = defaultStorageKVConfig.CompactionTotalSizeMultiplier } if cf.WriteBuffer <= 0 { - cf.WriteBuffer = defaultStorageConfig.WriteBuffer + cf.WriteBuffer = defaultStorageKVConfig.WriteBuffer } if cf.WriteL0PauseTrigger <= 0 { - cf.WriteL0PauseTrigger = defaultStorageConfig.WriteL0PauseTrigger + cf.WriteL0PauseTrigger = defaultStorageKVConfig.WriteL0PauseTrigger } if cf.WriteL0SlowdownTrigger <= 0 { - cf.WriteL0SlowdownTrigger = defaultStorageConfig.WriteL0SlowdownTrigger + cf.WriteL0SlowdownTrigger = defaultStorageKVConfig.WriteL0SlowdownTrigger + } +} + +func openMetadataDB(kvDir string, cf *KVConfig) (*leveldb.DB, error) { + if cf == nil { + cf = defaultStorageKVConfig + } else { + setDefaultStorageConfig(cf) } + + log.Infof("Storage config: %+v", cf) + + var opt opt.Options + opt.BlockCacheCapacity = cf.BlockCacheCapacity + opt.BlockRestartInterval = cf.BlockRestartInterval + opt.BlockSize = cf.BlockSize + opt.CompactionL0Trigger = cf.CompactionL0Trigger + opt.CompactionTableSize = cf.CompactionTableSize + opt.CompactionTotalSize = cf.CompactionTotalSize + opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier + opt.WriteBuffer = cf.WriteBuffer + opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger + opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger + + return leveldb.OpenFile(kvDir, &opt) } diff --git a/pump/storage/storage_test.go b/pump/storage/storage_test.go index 4386b657c..faf19d194 100644 --- a/pump/storage/storage_test.go +++ b/pump/storage/storage_test.go @@ -305,3 +305,30 @@ func (as *AppendSuit) TestResolve(c *check.C) { // TODO test the case we query tikv to know weather a txn a commit // is there a fake or mock kv.Storage and tikv.LockResolver to easy the test? } + +type OpenDBSuit struct { + dir string +} + +var _ = check.Suite(&OpenDBSuit{}) + +func (s *OpenDBSuit) SetUpTest(c *check.C) { + s.dir = c.MkDir() +} + +func (s *OpenDBSuit) TestWhenConfigIsNotProvided(c *check.C) { + _, err := openMetadataDB(s.dir, nil) + c.Assert(err, check.IsNil) +} + +func (s *OpenDBSuit) TestProvidedConfigValsNotOverwritten(c *check.C) { + cf := KVConfig{ + BlockRestartInterval: 32, + WriteL0PauseTrigger: 12, + } + _, err := openMetadataDB(s.dir, &cf) + c.Assert(err, check.IsNil) + c.Assert(cf.BlockRestartInterval, check.Equals, 32) + c.Assert(cf.WriteL0PauseTrigger, check.Equals, 12) + c.Assert(cf.BlockCacheCapacity, check.Equals, defaultStorageKVConfig.BlockCacheCapacity) +} diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 7888df11d..75c8f88fa 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -29,7 +29,7 @@ type Options struct { ValueLogFileSize int64 Sync bool - Storage *Config + KVConfig *KVConfig } // DefaultOptions return the default options @@ -41,8 +41,8 @@ func DefaultOptions() *Options { } // WithStorage set the Config -func (o *Options) WithStorage(storage *Config) *Options { - o.Storage = storage +func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options { + o.KVConfig = kvConfig return o }