diff --git a/chain_events/listener.go b/chain_events/listener.go index ff7b5d5a..30b44024 100644 --- a/chain_events/listener.go +++ b/chain_events/listener.go @@ -71,6 +71,8 @@ func NewListener( opt(listener) } + log.Debug(listener) + return listener } @@ -167,6 +169,7 @@ func (l *ListenerImpl) Start() Listener { // Unable to connect to chain, pause system. if l.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) if err := l.systemService.Pause(); err != nil { entry. WithFields(log.Fields{"error": err}). diff --git a/configs/configs.go b/configs/configs.go index 10ce450a..cab02950 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -130,6 +130,28 @@ type Config struct { // Max transactions per second, rate at which the service can submit transactions to Flow TransactionMaxSendRate int `env:"MAX_TPS" envDefault:"10"` + + // maxJobErrorCount is the maximum number of times a Job can be tried to + // execute before considering it completely failed. + MaxJobErrorCount int `env:"MAX_JOB_ERROR_COUNT" envDefault:"10"` + + // Poll DB for new schedulable jobs every 30s. + DBJobPollInterval time.Duration `env:"DB_JOB_POLL_INTERVAL" envDefault:"30s"` + + // Grace time period before re-scheduling jobs that are in state INIT or + // ACCEPTED. These are jobs where the executor processing has been + // unexpectedly disrupted (such as bug, dead node, disconnected + // networking etc.). + AcceptedGracePeriod time.Duration `env:"ACCEPTED_GRACE_PERIOD" envDefault:"180s"` + + // Grace time period before re-scheduling jobs that are up for immediate + // restart (such as NO_AVAILABLE_WORKERS or ERROR). + ReSchedulableGracePeriod time.Duration `env:"RESCHEDULABLE_GRACE_PERIOD" envDefault:"60s"` + + // Sleep duration in case of service isHalted + PauseDuration time.Duration `env:"PAUSE_DURATION" envDefault:"60s"` + + GrpcMaxCallRecvMsgSize int `env:"GRPC_MAX_CALL_RECV_MSG_SIZE" envDefault:"16777216"` } // Parse parses environment variables and flags to a valid Config. diff --git a/jobs/workerpool.go b/jobs/workerpool.go index 73776ee8..92d0e370 100644 --- a/jobs/workerpool.go +++ b/jobs/workerpool.go @@ -112,6 +112,8 @@ func NewWorkerPool(db Store, capacity uint, workerCount uint, opts ...WorkerPool // Register asynchronous job executor. pool.RegisterExecutor(SendJobStatusJobType, pool.executeSendJobStatus) + pool.logger.Debug(pool) + return pool } @@ -320,6 +322,7 @@ func (wp *WorkerPoolImpl) startWorkers() { if wallet_errors.IsChainConnectionError(err) { if wp.systemService != nil { entry.Warn("Unable to connect to chain, pausing system") + entry.Warn(err) // Unable to connect to chain, pause system. if err := wp.systemService.Pause(); err != nil { entry. diff --git a/main.go b/main.go index 24333157..123883ee 100644 --- a/main.go +++ b/main.go @@ -74,7 +74,11 @@ func runServer(cfg *configs.Config) { // Flow client // TODO: WithInsecure()? - fc, err := client.New(cfg.AccessAPIHost, grpc.WithTransportCredentials(insecure.NewCredentials())) + fc, err := client.New( + cfg.AccessAPIHost, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(cfg.GrpcMaxCallRecvMsgSize)), + ) if err != nil { log.Fatal(err) } @@ -92,7 +96,10 @@ func runServer(cfg *configs.Config) { } defer gorm.Close(db) - systemService := system.NewService(system.NewGormStore(db)) + systemService := system.NewService( + system.NewGormStore(db), + system.WithPauseDuration(cfg.PauseDuration), + ) // Create a worker pool wp := jobs.NewWorkerPool( @@ -101,6 +108,10 @@ func runServer(cfg *configs.Config) { cfg.WorkerCount, jobs.WithJobStatusWebhook(cfg.JobStatusWebhookUrl, cfg.JobStatusWebhookTimeout), jobs.WithSystemService(systemService), + jobs.WithMaxJobErrorCount(cfg.MaxJobErrorCount), + jobs.WithDbJobPollInterval(cfg.DBJobPollInterval), + jobs.WithAcceptedGracePeriod(cfg.AcceptedGracePeriod), + jobs.WithReSchedulableGracePeriod(cfg.ReSchedulableGracePeriod), ) defer func() {