Skip to content

Commit

Permalink
config for snapshot worker count
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Apr 24, 2023
1 parent 228a702 commit 37f0c27
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 27 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,7 @@ type DBSyncConfig struct {
Enable bool `mapstructure:"db-sync-enable"`
SnapshotInterval int `mapstructure:"snapshot-interval"`
SnapshotDirectory string `mapstructure:"snapshot-directory"`
SnapshotWorkerCount int `mapstructure:"snapshot-worker-count"`
TimeoutInSeconds int `mapstructure:"timeout-in-seconds"`
NoFileSleepInSeconds int `mapstructure:"no-file-sleep-in-seconds"`
FileWorkerCount int `mapstructure:"file-worker-count"`
Expand All @@ -1278,6 +1279,7 @@ func DefaultDBSyncConfig() *DBSyncConfig {
Enable: false,
SnapshotInterval: 0,
SnapshotDirectory: "",
SnapshotWorkerCount: 16,
TimeoutInSeconds: 600,
NoFileSleepInSeconds: 5,
FileWorkerCount: 8,
Expand Down
1 change: 1 addition & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ restart-cooldown-seconds = {{ .SelfRemediation.RestartCooldownSeconds }}
db-sync-enable = "{{ .DBSync.Enable }}"
snapshot-interval = "{{ .DBSync.SnapshotInterval }}"
snapshot-directory = "{{ .DBSync.SnapshotDirectory }}"
snapshot-worker-count = "{{ .DBSync.SnapshotWorkerCount }}"
timeout-in-seconds = "{{ .DBSync.TimeoutInSeconds }}"
no-file-sleep-in-seconds = "{{ .DBSync.NoFileSleepInSeconds }}"
file-worker-count = "{{ .DBSync.FileWorkerCount }}"
Expand Down
47 changes: 20 additions & 27 deletions internal/dbsync/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@ package dbsync
import (
"crypto/md5"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path"
"sync"

"github.com/tendermint/tendermint/config"
dstypes "github.com/tendermint/tendermint/proto/tendermint/dbsync"
)

const WORKER_COUNT = 32

func Snapshot(height uint64, dbsyncConfig config.DBSyncConfig, baseConfig config.BaseConfig) error {
src := path.Join(baseConfig.DBDir(), ApplicationDBSubdirectory)
wasmSrc := path.Join(baseConfig.RootDir, WasmDirectory)
Expand All @@ -36,10 +34,10 @@ func Snapshot(height uint64, dbsyncConfig config.DBSyncConfig, baseConfig config
}
}

assignments := make([][]os.FileInfo, WORKER_COUNT)
assignments := make([][]os.FileInfo, dbsyncConfig.SnapshotWorkerCount)

for i, fd := range fds {
assignments[i%WORKER_COUNT] = append(assignments[i%WORKER_COUNT], fd)
assignments[i%dbsyncConfig.SnapshotWorkerCount] = append(assignments[i%dbsyncConfig.SnapshotWorkerCount], fd)
}

metadata := dstypes.MetadataResponse{
Expand All @@ -50,7 +48,7 @@ func Snapshot(height uint64, dbsyncConfig config.DBSyncConfig, baseConfig config
metadataMtx := &sync.Mutex{}

wg := sync.WaitGroup{}
for i := 0; i < WORKER_COUNT; i++ {
for i := 0; i < dbsyncConfig.SnapshotWorkerCount; i++ {
wg.Add(1)
assignment := assignments[i]
go func() {
Expand All @@ -64,25 +62,20 @@ func Snapshot(height uint64, dbsyncConfig config.DBSyncConfig, baseConfig config
dstfp = path.Join(dst, fd.Name())
}

// var srcfd *os.File
// var dstfd *os.File
// if srcfd, err = os.Open(srcfp); err != nil {
// panic(err)
// }

// if dstfd, err = os.Create(dstfp); err != nil {
// srcfd.Close()
// panic(err)
// }

// if _, err = io.Copy(dstfd, srcfd); err != nil {
// srcfd.Close()
// dstfd.Close()
// panic(err)
// }
cmd := exec.Command("cp", srcfp, dstfp)
_, err := cmd.Output()
if err != nil {
var srcfd *os.File
var dstfd *os.File
if srcfd, err = os.Open(srcfp); err != nil {
panic(err)
}

if dstfd, err = os.Create(dstfp); err != nil {
srcfd.Close()
panic(err)
}

if _, err = io.Copy(dstfd, srcfd); err != nil {
srcfd.Close()
dstfd.Close()
panic(err)
}

Expand All @@ -102,8 +95,8 @@ func Snapshot(height uint64, dbsyncConfig config.DBSyncConfig, baseConfig config
metadata.Md5Checksum = append(metadata.Md5Checksum, sum[:])

metadataMtx.Unlock()
// srcfd.Close()
// dstfd.Close()
srcfd.Close()
dstfd.Close()
}
wg.Done()
}()
Expand Down

0 comments on commit 37f0c27

Please sign in to comment.