Skip to content

Commit

Permalink
Merge pull request #284 from dkuryakin/main
Browse files Browse the repository at this point in the history
Make some worker & service consts configurable.
  • Loading branch information
latenssi authored May 27, 2022
2 parents 75caebc + dc391bd commit d3002fb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 2 deletions.
3 changes: 3 additions & 0 deletions chain_events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func NewListener(
opt(listener)
}

log.Debug(listener)

return listener
}

Expand Down Expand Up @@ -167,6 +169,7 @@ func (l *ListenerImpl) Start() Listener {
// Unable to connect to chain, pause system.
if l.systemService != nil {
entry.Warn("Unable to connect to chain, pausing system")
entry.Warn(err)
if err := l.systemService.Pause(); err != nil {
entry.
WithFields(log.Fields{"error": err}).
Expand Down
22 changes: 22 additions & 0 deletions configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@ type Config struct {

// Max transactions per second, rate at which the service can submit transactions to Flow
TransactionMaxSendRate int `env:"MAX_TPS" envDefault:"10"`

// maxJobErrorCount is the maximum number of times a Job can be tried to
// execute before considering it completely failed.
MaxJobErrorCount int `env:"MAX_JOB_ERROR_COUNT" envDefault:"10"`

// Poll DB for new schedulable jobs every 30s.
DBJobPollInterval time.Duration `env:"DB_JOB_POLL_INTERVAL" envDefault:"30s"`

// Grace time period before re-scheduling jobs that are in state INIT or
// ACCEPTED. These are jobs where the executor processing has been
// unexpectedly disrupted (such as bug, dead node, disconnected
// networking etc.).
AcceptedGracePeriod time.Duration `env:"ACCEPTED_GRACE_PERIOD" envDefault:"180s"`

// Grace time period before re-scheduling jobs that are up for immediate
// restart (such as NO_AVAILABLE_WORKERS or ERROR).
ReSchedulableGracePeriod time.Duration `env:"RESCHEDULABLE_GRACE_PERIOD" envDefault:"60s"`

// Sleep duration in case of service isHalted
PauseDuration time.Duration `env:"PAUSE_DURATION" envDefault:"60s"`

GrpcMaxCallRecvMsgSize int `env:"GRPC_MAX_CALL_RECV_MSG_SIZE" envDefault:"16777216"`
}

// Parse parses environment variables and flags to a valid Config.
Expand Down
3 changes: 3 additions & 0 deletions jobs/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func NewWorkerPool(db Store, capacity uint, workerCount uint, opts ...WorkerPool
// Register asynchronous job executor.
pool.RegisterExecutor(SendJobStatusJobType, pool.executeSendJobStatus)

pool.logger.Debug(pool)

return pool
}

Expand Down Expand Up @@ -320,6 +322,7 @@ func (wp *WorkerPoolImpl) startWorkers() {
if wallet_errors.IsChainConnectionError(err) {
if wp.systemService != nil {
entry.Warn("Unable to connect to chain, pausing system")
entry.Warn(err)
// Unable to connect to chain, pause system.
if err := wp.systemService.Pause(); err != nil {
entry.
Expand Down
15 changes: 13 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ func runServer(cfg *configs.Config) {

// Flow client
// TODO: WithInsecure()?
fc, err := client.New(cfg.AccessAPIHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
fc, err := client.New(
cfg.AccessAPIHost,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcMaxCallRecvMsgSize)),
)
if err != nil {
log.Fatal(err)
}
Expand All @@ -92,7 +96,10 @@ func runServer(cfg *configs.Config) {
}
defer gorm.Close(db)

systemService := system.NewService(system.NewGormStore(db))
systemService := system.NewService(
system.NewGormStore(db),
system.WithPauseDuration(cfg.PauseDuration),
)

// Create a worker pool
wp := jobs.NewWorkerPool(
Expand All @@ -101,6 +108,10 @@ func runServer(cfg *configs.Config) {
cfg.WorkerCount,
jobs.WithJobStatusWebhook(cfg.JobStatusWebhookUrl, cfg.JobStatusWebhookTimeout),
jobs.WithSystemService(systemService),
jobs.WithMaxJobErrorCount(cfg.MaxJobErrorCount),
jobs.WithDbJobPollInterval(cfg.DBJobPollInterval),
jobs.WithAcceptedGracePeriod(cfg.AcceptedGracePeriod),
jobs.WithReSchedulableGracePeriod(cfg.ReSchedulableGracePeriod),
)

defer func() {
Expand Down

0 comments on commit d3002fb

Please sign in to comment.