Skip to content

Commit

Permalink
feat(Dgraph): Add separate compression flag for z and wal dirs (#6401) (
Browse files Browse the repository at this point in the history
#6421)

* Add separate compression flag for z and wal dirs

* Address comments

* Address comments

(cherry picked from commit 601cc3b)
  • Loading branch information
vmrajas authored Sep 18, 2020
1 parent 1e81ff6 commit cf876b6
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 20 deletions.
30 changes: 21 additions & 9 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ they form a Raft group and provide synchronous replication.
"log directory. mmap consumes more RAM, but provides better performance. If you pass "+
"two values separated by a comma the first value will be used for the postings "+
"directory and the second for the w directory.")
flag.Int("badger.compression_level", 3,
"The compression level for Badger. A higher value uses more resources.")
flag.String("badger.compression_level", "3,0",
"Specifies the compression level for the postings and write-ahead log "+
"directory. A higher value uses more resources. The value of 0 disables "+
"compression. If you pass two values separated by a comma the first "+
"value will be used for the postings directory (p) and the second for "+
"the wal directory (w). If a single value is passed the value is used "+
"as compression level for both directories.")
enc.RegisterFlags(flag)

// Snapshot and Transactions.
Expand Down Expand Up @@ -603,14 +608,21 @@ func run() {
wstoreBlockCacheSize := (cachePercent[3] * (totalCache << 20)) / 100
wstoreIndexCacheSize := (cachePercent[4] * (totalCache << 20)) / 100

compressionLevelString := Alpha.Conf.GetString("badger.compression_level")
compressionLevels, err := x.GetCompressionLevels(compressionLevelString)
x.Check(err)
postingDirCompressionLevel := compressionLevels[0]
walDirCompressionLevel := compressionLevels[1]

opts := worker.Options{
BadgerCompressionLevel: Alpha.Conf.GetInt("badger.compression_level"),
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
PostingDirCompressionLevel: postingDirCompressionLevel,
WALDirCompressionLevel: walDirCompressionLevel,
PBlockCacheSize: pstoreBlockCacheSize,
PIndexCacheSize: pstoreIndexCacheSize,
WBlockCacheSize: wstoreBlockCacheSize,
WIndexCacheSize: wstoreIndexCacheSize,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Expand Down
8 changes: 6 additions & 2 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ type Options struct {
// BadgerWalVlog is the name of the mode used to load the badger value log for the w directory.
BadgerWalVlog string

// BadgerCompressionLevel is the ZSTD compression level used by badger. A
// WALDirCompressionLevel is the ZSTD compression level used by WAL directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
BadgerCompressionLevel int
WALDirCompressionLevel int
// PostingDirCompressionLevel is the ZSTD compression level used by Postings directory. A
// higher value means more CPU intensive compression and better compression
// ratio.
PostingDirCompressionLevel int
// WALDir is the path to the directory storing the write-ahead log.
WALDir string
// MutationsMode is the mode used to handle mutation requests.
Expand Down
27 changes: 18 additions & 9 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,34 @@ func setBadgerOptions(opt badger.Options, wal bool) badger.Options {
// saved by disabling it.
opt.DetectConflicts = false

glog.Infof("Setting Badger Compression Level: %d", Config.BadgerCompressionLevel)
// Default value of badgerCompressionLevel is 3 so compression will always
// be enabled, unless it is explicitly disabled by setting the value to 0.
if Config.BadgerCompressionLevel != 0 {
// By default, compression is disabled in badger.
opt.Compression = options.ZSTD
opt.ZSTDCompressionLevel = Config.BadgerCompressionLevel
}

var badgerTables string
var badgerVlog string
if wal {
// Settings for the write-ahead log.
badgerTables = Config.BadgerWalTables
badgerVlog = Config.BadgerWalVlog

glog.Infof("Setting WAL Dir Compression Level: %d", Config.WALDirCompressionLevel)
// Default value of WALDirCompressionLevel is 0 so compression will always
// be disabled, unless it is explicitly enabled by setting the value to greater than 0.
if Config.WALDirCompressionLevel != 0 {
// By default, compression is disabled in badger.
opt.Compression = options.ZSTD
opt.ZSTDCompressionLevel = Config.WALDirCompressionLevel
}
} else {
// Settings for the data directory.
badgerTables = Config.BadgerTables
badgerVlog = Config.BadgerVlog

glog.Infof("Setting Posting Dir Compression Level: %d", Config.PostingDirCompressionLevel)
// Default value of postingDirCompressionLevel is 3 so compression will always
// be enabled, unless it is explicitly disabled by setting the value to 0.
if Config.PostingDirCompressionLevel != 0 {
// By default, compression is disabled in badger.
opt.Compression = options.ZSTD
opt.ZSTDCompressionLevel = Config.PostingDirCompressionLevel
}
}

glog.Infof("Setting Badger table load option: %s", Config.BadgerTables)
Expand Down
35 changes: 35 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,3 +1074,38 @@ func GetCachePercentages(cpString string, numExpected int) ([]int64, error) {

return cachePercent, nil
}

// ParseCompressionLevel returns compression level(int) given the compression level(string)
func ParseCompressionLevel(compressionLevel string) (int, error) {
x, err := strconv.Atoi(compressionLevel)
if err != nil {
return 0, errors.Errorf("ERROR: unable to parse compression level(%s)", compressionLevel)
}
if x < 0 {
return 0, errors.Errorf("ERROR: compression level(%s) cannot be negative", compressionLevel)
}
return x, nil
}

// GetCompressionLevels returns the slice of compression levels given the "," (comma) separated
// compression levels(integers) string.
func GetCompressionLevels(compressionLevelsString string) ([]int, error) {
compressionLevels := strings.Split(compressionLevelsString, ",")
// Validity checks
if len(compressionLevels) != 1 && len(compressionLevels) != 2 {
return nil, errors.Errorf("ERROR: expected single integer or two comma separated integers")
}
var compressionLevelsInt []int
for _, cLevel := range compressionLevels {
x, err := ParseCompressionLevel(cLevel)
if err != nil {
return nil, err
}
compressionLevelsInt = append(compressionLevelsInt, x)
}
// Append the same compression level in case only one level was passed.
if len(compressionLevelsInt) == 1 {
compressionLevelsInt = append(compressionLevelsInt, compressionLevelsInt[0])
}
return compressionLevelsInt, nil
}

0 comments on commit cf876b6

Please sign in to comment.