Skip to content

Commit

Permalink
cherry-pick: pump/: Add syn-log option (#529)
Browse files Browse the repository at this point in the history
* pump: Extract function and add unit tests (#503)

* pump/: Add syn-log option (#509)
  • Loading branch information
july2993 authored Apr 10, 2019
1 parent b03249e commit b8353ed
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 56 deletions.
5 changes: 4 additions & 1 deletion cmd/pump/pump.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pd-urls = "http://127.0.0.1:2379"
# Path of file that contains X509 key in PEM format for connection with cluster components.
# ssl-key = "/path/to/drainer-key.pem"
#
# [storage]
# Set to `true` (default) for best reliability, which prevents data loss when there is a power failure.
# sync-log = true
#
# we suggest using the default config of the embedded LSM DB now, do not change it useless you know what you are doing
# [storage.kv]
Expand All @@ -36,7 +39,7 @@ pd-urls = "http://127.0.0.1:2379"
# compaction-L0-trigger = 8
# compaction-table-size = 67108864
# compaction-total-size = 536870912
# compaction-total-size-multiplier = 8
# compaction-total-size-multiplier = 8.0
# write-buffer = 67108864
# write-L0-pause-trigger = 24
# write-L0-slowdown-trigger = 17
2 changes: 1 addition & 1 deletion pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Config struct {
configFile string
printVersion bool
tls *tls.Config
Storage *storage.Config `toml:"storage" json:"storage"`
Storage storage.Config `toml:"storage" json:"storage"`
}

// NewConfig return an instance of configuration
Expand Down
5 changes: 4 additions & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func NewServer(cfg *Config) (*Server, error) {
return nil, errors.Trace(err)
}

options := storage.DefaultOptions().WithStorage(cfg.Storage)
options := storage.DefaultOptions()
options = options.WithKVConfig(cfg.Storage.KV)
options = options.WithSync(cfg.Storage.GetSyncLog())

storage, err := storage.NewAppendWithResolver(cfg.DataDir, options, tiStore, lockResolver)
if err != nil {
return nil, errors.Trace(err)
Expand Down
112 changes: 62 additions & 50 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ func NewAppend(dir string, options *Options) (append *Append, err error) {
}

// NewAppendWithResolver returns a instance of Append
// if tiStore and tiLockResolver is not nil, we will try to query tikv to know weather a txn is committed
// if tiStore and tiLockResolver is not nil, we will try to query tikv to know whether a txn is committed
func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiLockResolver *tikv.LockResolver) (append *Append, err error) {
if options == nil {
options = DefaultOptions()
}

kvDir := path.Join(dir, "kv")
log.Infof("options: %+v", options)

valueDir := path.Join(dir, "value")
err = os.MkdirAll(valueDir, 0755)
if err != nil {
Expand All @@ -113,28 +114,8 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
return nil, errors.Trace(err)
}

var opt opt.Options
cf := options.Storage
if cf == nil {
cf = defaultStorageConfig
} else {
setDefaultStorageConfig(cf)
}

log.Infof("storage config: %+v", cf)

opt.BlockCacheCapacity = cf.BlockCacheCapacity
opt.BlockRestartInterval = cf.BlockRestartInterval
opt.BlockSize = cf.BlockSize
opt.CompactionL0Trigger = cf.CompactionL0Trigger
opt.CompactionTableSize = cf.CompactionTableSize
opt.CompactionTotalSize = cf.CompactionTotalSize
opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier
opt.WriteBuffer = cf.WriteBuffer
opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger
opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger

metadata, err := leveldb.OpenFile(kvDir, &opt)
kvDir := path.Join(dir, "kv")
metadata, err := openMetadataDB(kvDir, options.KVConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -229,7 +210,6 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL

append.wg.Add(1)
go append.updateStatus()

return
}

Expand Down Expand Up @@ -1014,7 +994,17 @@ func getStorageSize(dir string) (size storageSize, err error) {

// Config holds the configuration of storage
type Config struct {
KVConfig `toml:"kv" json:"kv"`
SyncLog *bool `toml:"sync-log" json:"sync-log"`
KV *KVConfig `toml:"kv" json:"kv"`
}

// GetSyncLog return sync-log config option
func (c *Config) GetSyncLog() bool {
if c.SyncLog == nil {
return true
}

return *c.SyncLog
}

// KVConfig if the configuration of goleveldb
Expand All @@ -1031,50 +1021,72 @@ type KVConfig struct {
WriteL0SlowdownTrigger int `toml:"write-L0-slowdown-trigger" json:"write-L0-slowdown-trigger"`
}

var defaultStorageConfig = &Config{
KVConfig: KVConfig{
BlockCacheCapacity: 8388608,
BlockRestartInterval: 16,
BlockSize: 4096,
CompactionL0Trigger: 8,
CompactionTableSize: 67108864,
CompactionTotalSize: 536870912,
CompactionTotalSizeMultiplier: 8,
WriteBuffer: 67108864,
WriteL0PauseTrigger: 24,
WriteL0SlowdownTrigger: 17,
},
var defaultStorageKVConfig = &KVConfig{
BlockCacheCapacity: 8388608,
BlockRestartInterval: 16,
BlockSize: 4096,
CompactionL0Trigger: 8,
CompactionTableSize: 67108864,
CompactionTotalSize: 536870912,
CompactionTotalSizeMultiplier: 8,
WriteBuffer: 67108864,
WriteL0PauseTrigger: 24,
WriteL0SlowdownTrigger: 17,
}

func setDefaultStorageConfig(cf *Config) {
func setDefaultStorageConfig(cf *KVConfig) {
if cf.BlockCacheCapacity <= 0 {
cf.BlockCacheCapacity = defaultStorageConfig.BlockCacheCapacity
cf.BlockCacheCapacity = defaultStorageKVConfig.BlockCacheCapacity
}
if cf.BlockRestartInterval <= 0 {
cf.BlockRestartInterval = defaultStorageConfig.BlockRestartInterval
cf.BlockRestartInterval = defaultStorageKVConfig.BlockRestartInterval
}
if cf.BlockSize <= 0 {
cf.BlockSize = defaultStorageConfig.BlockSize
cf.BlockSize = defaultStorageKVConfig.BlockSize
}
if cf.CompactionL0Trigger <= 0 {
cf.CompactionL0Trigger = defaultStorageConfig.CompactionL0Trigger
cf.CompactionL0Trigger = defaultStorageKVConfig.CompactionL0Trigger
}
if cf.CompactionTableSize <= 0 {
cf.CompactionTableSize = defaultStorageConfig.CompactionTableSize
cf.CompactionTableSize = defaultStorageKVConfig.CompactionTableSize
}
if cf.CompactionTotalSize <= 0 {
cf.CompactionTotalSize = defaultStorageConfig.CompactionTotalSize
cf.CompactionTotalSize = defaultStorageKVConfig.CompactionTotalSize
}
if cf.CompactionTotalSizeMultiplier <= 0 {
cf.CompactionTotalSizeMultiplier = defaultStorageConfig.CompactionTotalSizeMultiplier
cf.CompactionTotalSizeMultiplier = defaultStorageKVConfig.CompactionTotalSizeMultiplier
}
if cf.WriteBuffer <= 0 {
cf.WriteBuffer = defaultStorageConfig.WriteBuffer
cf.WriteBuffer = defaultStorageKVConfig.WriteBuffer
}
if cf.WriteL0PauseTrigger <= 0 {
cf.WriteL0PauseTrigger = defaultStorageConfig.WriteL0PauseTrigger
cf.WriteL0PauseTrigger = defaultStorageKVConfig.WriteL0PauseTrigger
}
if cf.WriteL0SlowdownTrigger <= 0 {
cf.WriteL0SlowdownTrigger = defaultStorageConfig.WriteL0SlowdownTrigger
cf.WriteL0SlowdownTrigger = defaultStorageKVConfig.WriteL0SlowdownTrigger
}
}

func openMetadataDB(kvDir string, cf *KVConfig) (*leveldb.DB, error) {
if cf == nil {
cf = defaultStorageKVConfig
} else {
setDefaultStorageConfig(cf)
}

log.Infof("Storage config: %+v", cf)

var opt opt.Options
opt.BlockCacheCapacity = cf.BlockCacheCapacity
opt.BlockRestartInterval = cf.BlockRestartInterval
opt.BlockSize = cf.BlockSize
opt.CompactionL0Trigger = cf.CompactionL0Trigger
opt.CompactionTableSize = cf.CompactionTableSize
opt.CompactionTotalSize = cf.CompactionTotalSize
opt.CompactionTotalSizeMultiplier = cf.CompactionTotalSizeMultiplier
opt.WriteBuffer = cf.WriteBuffer
opt.WriteL0PauseTrigger = cf.WriteL0PauseTrigger
opt.WriteL0SlowdownTrigger = cf.WriteL0SlowdownTrigger

return leveldb.OpenFile(kvDir, &opt)
}
27 changes: 27 additions & 0 deletions pump/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,30 @@ func (as *AppendSuit) TestResolve(c *check.C) {
// TODO test the case we query tikv to know weather a txn a commit
// is there a fake or mock kv.Storage and tikv.LockResolver to easy the test?
}

type OpenDBSuit struct {
dir string
}

var _ = check.Suite(&OpenDBSuit{})

func (s *OpenDBSuit) SetUpTest(c *check.C) {
s.dir = c.MkDir()
}

func (s *OpenDBSuit) TestWhenConfigIsNotProvided(c *check.C) {
_, err := openMetadataDB(s.dir, nil)
c.Assert(err, check.IsNil)
}

func (s *OpenDBSuit) TestProvidedConfigValsNotOverwritten(c *check.C) {
cf := KVConfig{
BlockRestartInterval: 32,
WriteL0PauseTrigger: 12,
}
_, err := openMetadataDB(s.dir, &cf)
c.Assert(err, check.IsNil)
c.Assert(cf.BlockRestartInterval, check.Equals, 32)
c.Assert(cf.WriteL0PauseTrigger, check.Equals, 12)
c.Assert(cf.BlockCacheCapacity, check.Equals, defaultStorageKVConfig.BlockCacheCapacity)
}
6 changes: 3 additions & 3 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Options struct {
ValueLogFileSize int64
Sync bool

Storage *Config
KVConfig *KVConfig
}

// DefaultOptions return the default options
Expand All @@ -41,8 +41,8 @@ func DefaultOptions() *Options {
}

// WithStorage set the Config
func (o *Options) WithStorage(storage *Config) *Options {
o.Storage = storage
func (o *Options) WithKVConfig(kvConfig *KVConfig) *Options {
o.KVConfig = kvConfig
return o
}

Expand Down

0 comments on commit b8353ed

Please sign in to comment.