From 51e5aa641a8fb81638262765fe03a7f541750d6f Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 10 Feb 2024 22:14:41 +0100 Subject: [PATCH] cleanup code & move task scheduler into its own package --- .../{test => scheduler}/scheduler.go | 12 ++++---- .../tasks/check_clients_are_healthy/task.go | 2 +- .../check_consensus_attestation_stats/task.go | 8 ++--- .../check_consensus_block_proposals/task.go | 6 ++-- .../tasks/check_consensus_finality/task.go | 4 +-- .../tasks/check_consensus_forks/task.go | 4 +-- .../check_consensus_proposer_duty/task.go | 6 ++-- .../tasks/check_consensus_reorgs/task.go | 6 ++-- .../tasks/check_consensus_slot_range/task.go | 4 +-- .../tasks/check_consensus_sync_status/task.go | 2 +- .../check_consensus_validator_status/task.go | 6 ++-- .../tasks/check_execution_sync_status/task.go | 2 +- .../tasks/generate_blob_transactions/task.go | 10 +++---- .../tasks/generate_bls_changes/task.go | 6 ++-- .../tasks/generate_child_wallet/task.go | 4 +-- .../tasks/generate_deposits/task.go | 8 ++--- .../tasks/generate_eoa_transactions/task.go | 10 +++---- pkg/coordinator/tasks/generate_exits/task.go | 6 ++-- .../tasks/generate_slashings/task.go | 10 +++---- .../tasks/generate_transaction/task.go | 6 ++-- pkg/coordinator/test/test.go | 17 ++++++----- pkg/coordinator/types/coordinator.go | 1 - .../types/{taskctx.go => scheduler.go} | 30 +++++-------------- pkg/coordinator/types/task.go | 22 ++++++++++++++ 24 files changed, 100 insertions(+), 92 deletions(-) rename pkg/coordinator/{test => scheduler}/scheduler.go (97%) rename pkg/coordinator/types/{taskctx.go => scheduler.go} (53%) diff --git a/pkg/coordinator/test/scheduler.go b/pkg/coordinator/scheduler/scheduler.go similarity index 97% rename from pkg/coordinator/test/scheduler.go rename to pkg/coordinator/scheduler/scheduler.go index 34ac439..166380c 100644 --- a/pkg/coordinator/test/scheduler.go +++ b/pkg/coordinator/scheduler/scheduler.go @@ -1,4 +1,4 @@ -package test +package scheduler import ( "context" @@ -16,7 +16,7 @@ import ( ) type TaskScheduler struct { - coordinator types.Coordinator + services types.TaskServices logger logrus.FieldLogger rootVars types.Variables taskCount uint64 @@ -47,7 +47,7 @@ type taskExecutionState struct { resultMutex sync.RWMutex } -func NewTaskScheduler(log logrus.FieldLogger, coordinator types.Coordinator, variables types.Variables) *TaskScheduler { +func NewTaskScheduler(log logrus.FieldLogger, services types.TaskServices, variables types.Variables) *TaskScheduler { return &TaskScheduler{ logger: log, rootVars: variables, @@ -55,7 +55,7 @@ func NewTaskScheduler(log logrus.FieldLogger, coordinator types.Coordinator, var rootTasks: make([]types.Task, 0), allTasks: make([]types.Task, 0), taskStateMap: make(map[types.Task]*taskExecutionState), - coordinator: coordinator, + services: services, } } @@ -67,8 +67,8 @@ func (ts *TaskScheduler) GetTaskCount() int { return len(ts.allTasks) } -func (ts *TaskScheduler) GetCoordinator() types.Coordinator { - return ts.coordinator +func (ts *TaskScheduler) GetServices() types.TaskServices { + return ts.services } func (ts *TaskScheduler) ParseTaskOptions(rawtask *helper.RawMessage) (*types.TaskOptions, error) { diff --git a/pkg/coordinator/tasks/check_clients_are_healthy/task.go b/pkg/coordinator/tasks/check_clients_are_healthy/task.go index 43dd2f1..a4442d3 100644 --- a/pkg/coordinator/tasks/check_clients_are_healthy/task.go +++ b/pkg/coordinator/tasks/check_clients_are_healthy/task.go @@ -106,7 +106,7 @@ func (t *Task) processCheck() { totalClientCount := 0 failedClients := []string{} - for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { + for _, client := range t.ctx.Scheduler.GetServices().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { totalClientCount++ checkResult := t.processClientCheck(client) diff --git a/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go b/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go index c6d271e..5ef261b 100644 --- a/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go +++ b/pkg/coordinator/tasks/check_consensus_attestation_stats/task.go @@ -101,7 +101,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10) defer blockSubscription.Unsubscribe() @@ -152,7 +152,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) processBlock(ctx context.Context, block *consensus.Block) { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() specs := consensusPool.GetBlockCache().GetSpecs() blockBody := block.AwaitBlock(ctx, 500*time.Millisecond) @@ -226,7 +226,7 @@ func (t *Task) processBlock(ctx context.Context, block *consensus.Block) { } func (t *Task) runAttestationStatsCheck(ctx context.Context, epoch uint64) { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() canonicalFork := consensusPool.GetCanonicalFork(1) epochVotes := t.aggregateEpochVotes(ctx, epoch) @@ -351,7 +351,7 @@ func (t *Task) newEpochVotes(base *epochVotes) *epochVotes { func (t *Task) aggregateEpochVotes(ctx context.Context, epoch uint64) []*epochVotes { t1 := time.Now() - consensusBlockCache := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetBlockCache() + consensusBlockCache := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetBlockCache() specs := consensusBlockCache.GetSpecs() firstSlot := epoch * specs.SlotsPerEpoch diff --git a/pkg/coordinator/tasks/check_consensus_block_proposals/task.go b/pkg/coordinator/tasks/check_consensus_block_proposals/task.go index af43794..bfb9600 100644 --- a/pkg/coordinator/tasks/check_consensus_block_proposals/task.go +++ b/pkg/coordinator/tasks/check_consensus_block_proposals/task.go @@ -94,7 +94,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10) defer blockSubscription.Unsubscribe() @@ -138,7 +138,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadValidatorSet(ctx context.Context) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) validatorSet, err := client.GetRPCClient().GetStateValidators(ctx, "head") if err != nil { @@ -251,7 +251,7 @@ func (t *Task) checkBlockValidatorName(block *consensus.Block, blockData *spec.V return false } - validatorName := t.ctx.Scheduler.GetCoordinator().ValidatorNames().GetValidatorName(uint64(proposerIndex)) + validatorName := t.ctx.Scheduler.GetServices().ValidatorNames().GetValidatorName(uint64(proposerIndex)) matched, err := regexp.MatchString(t.config.ValidatorNamePattern, validatorName) if err != nil { diff --git a/pkg/coordinator/tasks/check_consensus_finality/task.go b/pkg/coordinator/tasks/check_consensus_finality/task.go index 8522a43..f724fdc 100644 --- a/pkg/coordinator/tasks/check_consensus_finality/task.go +++ b/pkg/coordinator/tasks/check_consensus_finality/task.go @@ -87,7 +87,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() wallclockSubscription := consensusPool.GetBlockCache().SubscribeWallclockEpochEvent(10) defer wallclockSubscription.Unsubscribe() @@ -119,7 +119,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) runFinalityCheck() bool { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() _, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() if err != nil { diff --git a/pkg/coordinator/tasks/check_consensus_forks/task.go b/pkg/coordinator/tasks/check_consensus_forks/task.go index c8c813a..9e4b7af 100644 --- a/pkg/coordinator/tasks/check_consensus_forks/task.go +++ b/pkg/coordinator/tasks/check_consensus_forks/task.go @@ -86,7 +86,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10) defer blockSubscription.Unsubscribe() @@ -109,7 +109,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) runCheck() types.TaskResult { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() headForks := consensusPool.GetHeadForks(int64(t.config.MaxForkDistance)) if len(headForks)-1 > int(t.config.MaxForkCount) { diff --git a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go index 754987c..d94e269 100644 --- a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go +++ b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go @@ -89,7 +89,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() wallclockEpochSubscription := consensusPool.GetBlockCache().SubscribeWallclockEpochEvent(10) defer wallclockEpochSubscription.Unsubscribe() @@ -128,7 +128,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadEpochDuties(ctx context.Context, epoch uint64) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) proposerDuties, err := client.GetRPCClient().GetProposerDuties(ctx, epoch) if err != nil { @@ -169,7 +169,7 @@ func (t *Task) runProposerDutyCheck(slot uint64) bool { } if t.config.ValidatorNamePattern != "" { - validatorName := t.ctx.Scheduler.GetCoordinator().ValidatorNames().GetValidatorName(uint64(duty.ValidatorIndex)) + validatorName := t.ctx.Scheduler.GetServices().ValidatorNames().GetValidatorName(uint64(duty.ValidatorIndex)) matched, err := regexp.MatchString(t.config.ValidatorNamePattern, validatorName) if err != nil { diff --git a/pkg/coordinator/tasks/check_consensus_reorgs/task.go b/pkg/coordinator/tasks/check_consensus_reorgs/task.go index b585b28..b1c424b 100644 --- a/pkg/coordinator/tasks/check_consensus_reorgs/task.go +++ b/pkg/coordinator/tasks/check_consensus_reorgs/task.go @@ -90,7 +90,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() blockSubscription := consensusPool.GetBlockCache().SubscribeBlockEvent(10) defer blockSubscription.Unsubscribe() @@ -134,7 +134,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) runCheck() types.TaskResult { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() _, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() if err != nil { @@ -158,7 +158,7 @@ func (t *Task) runCheck() types.TaskResult { } func (t *Task) processChainReorg(oldHead, newHead *consensus.Block) { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() parentHead := oldHead newHeadDistance := uint64(0) oldHeadDistance := uint64(0) diff --git a/pkg/coordinator/tasks/check_consensus_slot_range/task.go b/pkg/coordinator/tasks/check_consensus_slot_range/task.go index 3f65957..48bdfd3 100644 --- a/pkg/coordinator/tasks/check_consensus_slot_range/task.go +++ b/pkg/coordinator/tasks/check_consensus_slot_range/task.go @@ -87,7 +87,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() wallclockSubscription := consensusPool.GetBlockCache().SubscribeWallclockSlotEvent(10) defer wallclockSubscription.Unsubscribe() @@ -114,7 +114,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) runRangeCheck() (checkResult, isLower bool) { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() currentSlot, currentEpoch, err := consensusPool.GetBlockCache().GetWallclock().Now() if err != nil { diff --git a/pkg/coordinator/tasks/check_consensus_sync_status/task.go b/pkg/coordinator/tasks/check_consensus_sync_status/task.go index d7f056c..d916a4c 100644 --- a/pkg/coordinator/tasks/check_consensus_sync_status/task.go +++ b/pkg/coordinator/tasks/check_consensus_sync_status/task.go @@ -105,7 +105,7 @@ func (t *Task) processCheck(ctx context.Context) { allResultsPass := true failedClients := []string{} - for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { + for _, client := range t.ctx.Scheduler.GetServices().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { var checkResult bool checkLogger := t.logger.WithField("client", client.Config.Name) diff --git a/pkg/coordinator/tasks/check_consensus_validator_status/task.go b/pkg/coordinator/tasks/check_consensus_validator_status/task.go index 7aab89b..621219f 100644 --- a/pkg/coordinator/tasks/check_consensus_validator_status/task.go +++ b/pkg/coordinator/tasks/check_consensus_validator_status/task.go @@ -91,7 +91,7 @@ func (t *Task) LoadConfig() error { } func (t *Task) Execute(ctx context.Context) error { - consensusPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + consensusPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() wallclockEpochSubscription := consensusPool.GetBlockCache().SubscribeWallclockEpochEvent(10) defer wallclockEpochSubscription.Unsubscribe() @@ -121,7 +121,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadValidatorSet(ctx context.Context) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) validatorSet, err := client.GetRPCClient().GetStateValidators(ctx, "head") if err != nil { @@ -157,7 +157,7 @@ func (t *Task) runValidatorStatusCheck() bool { } if t.config.ValidatorNamePattern != "" { - validatorName := t.ctx.Scheduler.GetCoordinator().ValidatorNames().GetValidatorName(uint64(validator.Index)) + validatorName := t.ctx.Scheduler.GetServices().ValidatorNames().GetValidatorName(uint64(validator.Index)) matched, err := regexp.MatchString(t.config.ValidatorNamePattern, validatorName) if err != nil { diff --git a/pkg/coordinator/tasks/check_execution_sync_status/task.go b/pkg/coordinator/tasks/check_execution_sync_status/task.go index 8ab9d8d..9b742b0 100644 --- a/pkg/coordinator/tasks/check_execution_sync_status/task.go +++ b/pkg/coordinator/tasks/check_execution_sync_status/task.go @@ -105,7 +105,7 @@ func (t *Task) processCheck(ctx context.Context) { allResultsPass := true failedClients := []string{} - for _, client := range t.ctx.Scheduler.GetCoordinator().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { + for _, client := range t.ctx.Scheduler.GetServices().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { var checkResult bool checkLogger := t.logger.WithField("client", client.Config.Name) diff --git a/pkg/coordinator/tasks/generate_blob_transactions/task.go b/pkg/coordinator/tasks/generate_blob_transactions/task.go index 81e2f43..de15794 100644 --- a/pkg/coordinator/tasks/generate_blob_transactions/task.go +++ b/pkg/coordinator/tasks/generate_blob_transactions/task.go @@ -102,12 +102,12 @@ func (t *Task) LoadConfig() error { } if config.ChildWallets == 0 { - t.wallet, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(privKey) + t.wallet, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletByPrivkey(privKey) if err != nil { return fmt.Errorf("cannot initialize wallet: %w", err) } } else { - t.walletPool, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletPoolByPrivkey(privKey, config.ChildWallets, config.WalletSeed) + t.walletPool, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletPoolByPrivkey(privKey, config.ChildWallets, config.WalletSeed) if err != nil { return fmt.Errorf("cannot initialize wallet pool: %w", err) } @@ -162,7 +162,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *execution.Subscription[*execution.Block] if t.config.LimitPerBlock > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -297,7 +297,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c } txObj := ðtypes.BlobTx{ - ChainID: uint256.MustFromBig(t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().GetChainID()), + ChainID: uint256.MustFromBig(t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID()), Nonce: nonce, BlobFeeCap: uint256.MustFromBig(t.config.BlobFeeCap), GasTipCap: uint256.MustFromBig(t.config.TipCap), @@ -318,7 +318,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c var clients []*execution.Client - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { clients = clientPool.GetExecutionPool().GetReadyEndpoints() diff --git a/pkg/coordinator/tasks/generate_bls_changes/task.go b/pkg/coordinator/tasks/generate_bls_changes/task.go index cf44865..489481c 100644 --- a/pkg/coordinator/tasks/generate_bls_changes/task.go +++ b/pkg/coordinator/tasks/generate_bls_changes/task.go @@ -122,7 +122,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *consensus.Subscription[*consensus.Block] if t.config.LimitPerSlot > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -173,7 +173,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v1.Validator, error) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) validators, err := client.GetRPCClient().GetStateValidators(ctx, "head") if err != nil { @@ -184,7 +184,7 @@ func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v } func (t *Task) generateBlsChange(ctx context.Context, accountIdx uint64, validators map[phase0.ValidatorIndex]*v1.Validator) error { - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() validatorKeyPath := fmt.Sprintf("m/12381/3600/%d/0/0", accountIdx) validatorPrivkey, err := util.PrivateKeyFromSeedAndPath(t.withdrSeed, validatorKeyPath) diff --git a/pkg/coordinator/tasks/generate_child_wallet/task.go b/pkg/coordinator/tasks/generate_child_wallet/task.go index b1c2392..abff457 100644 --- a/pkg/coordinator/tasks/generate_child_wallet/task.go +++ b/pkg/coordinator/tasks/generate_child_wallet/task.go @@ -89,7 +89,7 @@ func (t *Task) LoadConfig() error { return err } - t.wallet, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(privKey) + t.wallet, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletByPrivkey(privKey) if err != nil { return fmt.Errorf("cannot initialize wallet: %w", err) } @@ -112,7 +112,7 @@ func (t *Task) Execute(ctx context.Context) error { walletSeed = t.randStringBytes(20) } - walletPool, err := t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletPoolByPrivkey(t.wallet.GetPrivateKey(), 1, walletSeed) + walletPool, err := t.ctx.Scheduler.GetServices().WalletManager().GetWalletPoolByPrivkey(t.wallet.GetPrivateKey(), 1, walletSeed) if err != nil { return err } diff --git a/pkg/coordinator/tasks/generate_deposits/task.go b/pkg/coordinator/tasks/generate_deposits/task.go index 6643975..1a19ccc 100644 --- a/pkg/coordinator/tasks/generate_deposits/task.go +++ b/pkg/coordinator/tasks/generate_deposits/task.go @@ -136,7 +136,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *consensus.Subscription[*consensus.Block] if t.config.LimitPerSlot > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -285,7 +285,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v1.Validator, error) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) validators, err := client.GetRPCClient().GetStateValidators(ctx, "head") if err != nil { @@ -296,7 +296,7 @@ func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v } func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, validators map[phase0.ValidatorIndex]*v1.Validator, onConfirm func(tx *ethtypes.Transaction, receipt *ethtypes.Receipt)) (*common.BLSPubkey, *ethtypes.Transaction, error) { - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() validatorKeyPath := fmt.Sprintf("m/12381/3600/%d/0/0", accountIdx) withdrAccPath := fmt.Sprintf("m/12381/3600/%d/0", accountIdx) @@ -375,7 +375,7 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, validator return nil, nil, fmt.Errorf("cannot create bound instance of DepositContract: %w", err) } - wallet, err := t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(t.walletPrivKey) + wallet, err := t.ctx.Scheduler.GetServices().WalletManager().GetWalletByPrivkey(t.walletPrivKey) if err != nil { return nil, nil, fmt.Errorf("cannot initialize wallet: %w", err) } diff --git a/pkg/coordinator/tasks/generate_eoa_transactions/task.go b/pkg/coordinator/tasks/generate_eoa_transactions/task.go index 3aacfef..193deb7 100644 --- a/pkg/coordinator/tasks/generate_eoa_transactions/task.go +++ b/pkg/coordinator/tasks/generate_eoa_transactions/task.go @@ -101,12 +101,12 @@ func (t *Task) LoadConfig() error { } if config.ChildWallets == 0 { - t.wallet, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(privKey) + t.wallet, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletByPrivkey(privKey) if err != nil { return fmt.Errorf("cannot initialize wallet: %w", err) } } else { - t.walletPool, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletPoolByPrivkey(privKey, config.ChildWallets, config.WalletSeed) + t.walletPool, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletPoolByPrivkey(privKey, config.ChildWallets, config.WalletSeed) if err != nil { return fmt.Errorf("cannot initialize wallet pool: %w", err) } @@ -161,7 +161,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *execution.Subscription[*execution.Block] if t.config.LimitPerBlock > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -299,7 +299,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c } } else { txObj = ðtypes.DynamicFeeTx{ - ChainID: t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), + ChainID: t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), Nonce: nonce, GasTipCap: t.config.TipCap, GasFeeCap: t.config.FeeCap, @@ -318,7 +318,7 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c var clients []*execution.Client - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { clients = clientPool.GetExecutionPool().GetReadyEndpoints() diff --git a/pkg/coordinator/tasks/generate_exits/task.go b/pkg/coordinator/tasks/generate_exits/task.go index 1ff535c..4400951 100644 --- a/pkg/coordinator/tasks/generate_exits/task.go +++ b/pkg/coordinator/tasks/generate_exits/task.go @@ -114,7 +114,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *consensus.Subscription[*consensus.Block] if t.config.LimitPerSlot > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -165,7 +165,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (*phase0.Fork, map[phase0.ValidatorIndex]*v1.Validator, error) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) fork, err := client.GetRPCClient().GetForkState(ctx, "head") if err != nil { @@ -211,7 +211,7 @@ func (t *Task) generateVoluntaryExit(ctx context.Context, accountIdx uint64, for // select client var client *consensus.Client - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) } else { diff --git a/pkg/coordinator/tasks/generate_slashings/task.go b/pkg/coordinator/tasks/generate_slashings/task.go index b1c4121..2d26628 100644 --- a/pkg/coordinator/tasks/generate_slashings/task.go +++ b/pkg/coordinator/tasks/generate_slashings/task.go @@ -116,7 +116,7 @@ func (t *Task) Execute(ctx context.Context) error { var subscription *consensus.Subscription[*consensus.Block] if t.config.LimitPerSlot > 0 { - subscription = t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) + subscription = t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetBlockCache().SubscribeBlockEvent(10) defer subscription.Unsubscribe() } @@ -167,7 +167,7 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v1.Validator, *phase0.Fork, error) { - client := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) validators, err := client.GetRPCClient().GetStateValidators(ctx, "head") if err != nil { @@ -183,7 +183,7 @@ func (t *Task) loadChainState(ctx context.Context) (map[phase0.ValidatorIndex]*v } func (t *Task) generateSlashing(ctx context.Context, accountIdx uint64, validators map[phase0.ValidatorIndex]*v1.Validator, forkState *phase0.Fork) error { - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() validatorKeyPath := fmt.Sprintf("m/12381/3600/%d/0/0", accountIdx) validatorPrivkey, err := util.PrivateKeyFromSeedAndPath(t.withdrSeed, validatorKeyPath) @@ -278,7 +278,7 @@ func (t *Task) generateSurroundAttesterSlashing(validatorIndex uint64, validator // different target, different source // source1 < source 2 // target 1 > target 2 - clPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + clPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() slot, epoch, _ := clPool.GetBlockCache().GetWallclock().Now() if epoch.Number() < 4 { @@ -374,7 +374,7 @@ func (t *Task) generateSurroundAttesterSlashing(validatorIndex uint64, validator } func (t *Task) generateProposerSlashing(validatorIndex uint64, validatorKey *e2types.BLSPrivateKey, forkState *phase0.Fork) (*phase0.ProposerSlashing, error) { - clPool := t.ctx.Scheduler.GetCoordinator().ClientPool().GetConsensusPool() + clPool := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool() genesis := clPool.GetBlockCache().GetGenesis() slot, _, _ := clPool.GetBlockCache().GetWallclock().Now() diff --git a/pkg/coordinator/tasks/generate_transaction/task.go b/pkg/coordinator/tasks/generate_transaction/task.go index 383110d..b110831 100644 --- a/pkg/coordinator/tasks/generate_transaction/task.go +++ b/pkg/coordinator/tasks/generate_transaction/task.go @@ -102,7 +102,7 @@ func (t *Task) LoadConfig() error { return err } - t.wallet, err = t.ctx.Scheduler.GetCoordinator().WalletManager().GetWalletByPrivkey(privKey) + t.wallet, err = t.ctx.Scheduler.GetServices().WalletManager().GetWalletByPrivkey(privKey) if err != nil { return fmt.Errorf("cannot initialize wallet: %w", err) } @@ -141,7 +141,7 @@ func (t *Task) Execute(ctx context.Context) error { var clients []*execution.Client - clientPool := t.ctx.Scheduler.GetCoordinator().ClientPool() + clientPool := t.ctx.Scheduler.GetServices().ClientPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { clients = clientPool.GetExecutionPool().GetReadyEndpoints() @@ -330,7 +330,7 @@ func (t *Task) generateTransaction(ctx context.Context) (*ethtypes.Transaction, } default: txObj = ðtypes.DynamicFeeTx{ - ChainID: t.ctx.Scheduler.GetCoordinator().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), + ChainID: t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), Nonce: nonce, GasTipCap: t.config.TipCap, GasFeeCap: t.config.FeeCap, diff --git a/pkg/coordinator/test/test.go b/pkg/coordinator/test/test.go index 93be609..28e8278 100644 --- a/pkg/coordinator/test/test.go +++ b/pkg/coordinator/test/test.go @@ -5,14 +5,15 @@ import ( "fmt" "time" + "github.com/ethpandaops/assertoor/pkg/coordinator/scheduler" "github.com/ethpandaops/assertoor/pkg/coordinator/types" "github.com/sirupsen/logrus" ) type Test struct { name string - taskScheduler *TaskScheduler - log logrus.FieldLogger + taskScheduler *scheduler.TaskScheduler + logger logrus.FieldLogger config *Config status types.TestStatus @@ -24,7 +25,7 @@ type Test struct { func CreateTest(coordinator types.Coordinator, config *Config, variables types.Variables) (types.Test, error) { test := &Test{ name: config.Name, - log: coordinator.Logger().WithField("component", "test").WithField("test", config.Name), + logger: coordinator.Logger().WithField("component", "test").WithField("test", config.Name), config: config, status: types.TestStatusPending, } @@ -44,7 +45,7 @@ func CreateTest(coordinator types.Coordinator, config *Config, variables types.V testVars.CopyVars(variables, config.ConfigVars) // parse tasks - test.taskScheduler = NewTaskScheduler(test.log, coordinator, testVars) + test.taskScheduler = scheduler.NewTaskScheduler(test.logger, coordinator, testVars) for i := range config.Tasks { taskOptions, err := test.taskScheduler.ParseTaskOptions(&config.Tasks[i]) if err != nil { @@ -94,7 +95,7 @@ func (t *Test) Status() types.TestStatus { } func (t *Test) Logger() logrus.FieldLogger { - return t.log + return t.logger } func (t *Test) Validate() error { @@ -128,17 +129,17 @@ func (t *Test) Run(ctx context.Context) error { }() // run test tasks - t.log.WithField("timeout", t.timeout.String()).Info("starting test") + t.logger.WithField("timeout", t.timeout.String()).Info("starting test") err := t.taskScheduler.RunTasks(ctx, t.timeout) if err != nil { - t.log.Info("test failed!") + t.logger.Info("test failed!") t.status = types.TestStatusFailure return err } - t.log.Info("test completed!") + t.logger.Info("test completed!") t.status = types.TestStatusSuccess return nil diff --git a/pkg/coordinator/types/coordinator.go b/pkg/coordinator/types/coordinator.go index 1c14fd4..01998cb 100644 --- a/pkg/coordinator/types/coordinator.go +++ b/pkg/coordinator/types/coordinator.go @@ -14,6 +14,5 @@ type Coordinator interface { ClientPool() *clients.ClientPool WalletManager() *wallet.Manager ValidatorNames() *names.ValidatorNames - NewVariables(parentScope Variables) Variables GetTests() []Test } diff --git a/pkg/coordinator/types/taskctx.go b/pkg/coordinator/types/scheduler.go similarity index 53% rename from pkg/coordinator/types/taskctx.go rename to pkg/coordinator/types/scheduler.go index ee3f90c..cbf39e2 100644 --- a/pkg/coordinator/types/taskctx.go +++ b/pkg/coordinator/types/scheduler.go @@ -2,14 +2,15 @@ package types import ( "context" - "time" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients" "github.com/ethpandaops/assertoor/pkg/coordinator/helper" - "github.com/ethpandaops/assertoor/pkg/coordinator/logger" + "github.com/ethpandaops/assertoor/pkg/coordinator/names" + "github.com/ethpandaops/assertoor/pkg/coordinator/wallet" ) type TaskScheduler interface { - GetCoordinator() Coordinator + GetServices() TaskServices ParseTaskOptions(rawtask *helper.RawMessage) (*TaskOptions, error) ExecuteTask(ctx context.Context, task Task, taskWatchFn func(ctx context.Context, cancelFn context.CancelFunc, task Task)) error WatchTaskPass(ctx context.Context, cancelFn context.CancelFunc, task Task) @@ -22,23 +23,8 @@ type TaskScheduler interface { GetTaskResultUpdateChan(task Task, oldResult TaskResult) <-chan bool } -type TaskStatus struct { - Index uint64 - ParentIndex uint64 - IsStarted bool - IsRunning bool - StartTime time.Time - StopTime time.Time - Result TaskResult - Error error - Logger *logger.LogScope -} - -type TaskContext struct { - Scheduler TaskScheduler - Index uint64 - Vars Variables - Logger *logger.LogScope - NewTask func(options *TaskOptions, variables Variables) (Task, error) - SetResult func(result TaskResult) +type TaskServices interface { + ClientPool() *clients.ClientPool + WalletManager() *wallet.Manager + ValidatorNames() *names.ValidatorNames } diff --git a/pkg/coordinator/types/task.go b/pkg/coordinator/types/task.go index 22de6b0..fe7c9c0 100644 --- a/pkg/coordinator/types/task.go +++ b/pkg/coordinator/types/task.go @@ -6,6 +6,7 @@ import ( "github.com/ethpandaops/assertoor/pkg/coordinator/helper" "github.com/ethpandaops/assertoor/pkg/coordinator/human-duration" + "github.com/ethpandaops/assertoor/pkg/coordinator/logger" "github.com/sirupsen/logrus" ) @@ -49,3 +50,24 @@ type Task interface { LoadConfig() error Execute(ctx context.Context) error } + +type TaskStatus struct { + Index uint64 + ParentIndex uint64 + IsStarted bool + IsRunning bool + StartTime time.Time + StopTime time.Time + Result TaskResult + Error error + Logger *logger.LogScope +} + +type TaskContext struct { + Scheduler TaskScheduler + Index uint64 + Vars Variables + Logger *logger.LogScope + NewTask func(options *TaskOptions, variables Variables) (Task, error) + SetResult func(result TaskResult) +}