Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove dependency of postgres trigger for broadcaster #11109

Merged
merged 13 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 0 additions & 40 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/label"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)

Expand Down Expand Up @@ -123,8 +122,6 @@ type Broadcaster[
// when Start is called
autoSyncSequence bool

txInsertListener pg.Subscription
eventBroadcaster pg.EventBroadcaster
processUnstartedTxsImpl ProcessUnstartedTxs[ADDR]

ks txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
Expand All @@ -143,8 +140,6 @@ type Broadcaster[
initSync sync.Mutex
isStarted bool

parseAddr func(string) (ADDR, error)

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
Expand All @@ -166,13 +161,11 @@ func NewBroadcaster[
txConfig txmgrtypes.BroadcasterTransactionsConfig,
listenerConfig txmgrtypes.BroadcasterListenerConfig,
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
eventBroadcaster pg.EventBroadcaster,
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
logger logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
parseAddress func(string) (ADDR, error),
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
logger = logger.Named("Broadcaster")
Expand All @@ -187,11 +180,9 @@ func NewBroadcaster[
feeConfig: feeConfig,
txConfig: txConfig,
listenerConfig: listenerConfig,
eventBroadcaster: eventBroadcaster,
ks: keystore,
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
parseAddr: parseAddress,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
Expand All @@ -215,10 +206,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
return errors.New("Broadcaster is already started")
}
var err error
eb.txInsertListener, err = eb.eventBroadcaster.Subscribe(pg.ChannelInsertOnTx, "")
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "Broadcaster could not start")
}
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(eb.chainID)
if err != nil {
return errors.Wrap(err, "Broadcaster: failed to load EnabledAddressesForChain")
Expand All @@ -239,9 +226,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
go eb.monitorTxs(addr, triggerCh)
}

eb.wg.Add(1)
go eb.txInsertTriggerer()

eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap, err = eb.loadNextSequenceMap(eb.enabledAddresses)
Expand All @@ -266,9 +250,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) clos
if !eb.isStarted {
return errors.Wrap(utils.ErrAlreadyStopped, "Broadcaster is not started")
}
if eb.txInsertListener != nil {
eb.txInsertListener.Close()
}
close(eb.chStop)
eb.wg.Wait()
eb.isStarted = false
Expand Down Expand Up @@ -305,27 +286,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) txInsertTriggerer() {
defer eb.wg.Done()
for {
select {
case ev, ok := <-eb.txInsertListener.Events():
if !ok {
eb.logger.Debug("txInsertListener channel closed, exiting trigger loop")
return
}
addr, err := eb.parseAddr(ev.Payload)
if err != nil {
eb.logger.Errorw("failed to parse address in trigger", "err", err)
continue
}
eb.Trigger(addr)
case <-eb.chStop:
return
}
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(addresses []ADDR) (map[ADDR]SEQ, error) {
ctx, cancel := eb.chStop.NewCtx()
Expand Down
50 changes: 33 additions & 17 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/google/uuid"
pkgerrors "github.com/pkg/errors"
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved

"github.com/smartcontractkit/chainlink-relay/pkg/services"

Expand Down Expand Up @@ -166,14 +165,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
return b.StartOnce("Txm", func() error {
var ms services.MultiStart
if err := ms.Start(ctx, b.broadcaster); err != nil {
return pkgerrors.Wrap(err, "Txm: Broadcaster failed to start")
return fmt.Errorf("Txm: Broadcaster failed to start: %w", err)
}
if err := ms.Start(ctx, b.confirmer); err != nil {
return pkgerrors.Wrap(err, "Txm: Confirmer failed to start")
return fmt.Errorf("Txm: Confirmer failed to start: %w", err)
}

if err := ms.Start(ctx, b.txAttemptBuilder); err != nil {
return pkgerrors.Wrap(err, "Txm: Estimator failed to start")
return fmt.Errorf("Txm: Estimator failed to start: %w", err)
}

b.wg.Add(1)
Expand All @@ -190,7 +189,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx

if b.fwdMgr != nil {
if err := ms.Start(ctx, b.fwdMgr); err != nil {
return pkgerrors.Wrap(err, "Txm: ForwarderManager failed to start")
return fmt.Errorf("Txm: ForwarderManager failed to start: %w", err)
}
}

Expand Down Expand Up @@ -223,8 +222,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Reset(addr
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon(addr ADDR) (err error) {
ctx, cancel := utils.StopChan(b.chStop).NewCtx()
defer cancel()
err = b.txStore.Abandon(ctx, b.chainID, addr)
return pkgerrors.Wrapf(err, "abandon failed to update txes for key %s", addr.String())
if err = b.txStore.Abandon(ctx, b.chainID, addr); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a common go pattern to do the assignment and comparison this way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I typically am a fan of doing it this way to prevent accidental overshadowing of the error... i have witnessed bugs in the past where the error does not get fully handled and gets overwritten accidentally. scoping it this way reduces the chances of that happening slightly

return fmt.Errorf("abandon failed to update txes for key %s: %w", addr.String(), err)
}
return nil
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (merr error) {
Expand All @@ -241,14 +242,14 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (m
}
if b.fwdMgr != nil {
if err := b.fwdMgr.Close(); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to stop ForwarderManager"))
merr = errors.Join(merr, fmt.Errorf("Txm: failed to stop ForwarderManager: %w", err))
}
}

b.wg.Wait()

if err := b.txAttemptBuilder.Close(); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Txm: failed to close TxAttemptBuilder"))
merr = errors.Join(merr, fmt.Errorf("Txm: failed to close TxAttemptBuilder: %w", err))
}

return nil
Expand Down Expand Up @@ -444,7 +445,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
existingTx, err = b.txStore.FindTxWithIdempotencyKey(ctx, *txRequest.IdempotencyKey, b.chainID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return tx, pkgerrors.Wrap(err, "Failed to search for transaction with IdempotencyKey")
return tx, fmt.Errorf("Failed to search for transaction with IdempotencyKey: %w", err)
}
if existingTx != nil {
b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey)
Expand All @@ -470,31 +471,40 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
txRequest.ToAddress = txRequest.ForwarderAddress
txRequest.EncodedPayload = fwdPayload
} else {
b.logger.Errorf("Failed to use forwarder set upstream: %s", fwdErr.Error())
b.logger.Errorf("Failed to use forwarder set upstream: %w", fwdErr.Error())
}
}

err = b.txStore.CheckTxQueueCapacity(ctx, txRequest.FromAddress, b.txConfig.MaxQueued(), b.chainID)
if err != nil {
return tx, pkgerrors.Wrap(err, "Txm#CreateTransaction")
return tx, fmt.Errorf("Txm#CreateTransaction: %w", err)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID)
return
if err != nil {
return tx, err
}

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(txRequest.FromAddress)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

love to see how simple this now is!


return tx, nil
}

// Calls forwarderMgr to get a proper forwarder for a given EOA.
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForwarderForEOA(eoa ADDR) (forwarder ADDR, err error) {
if !b.txConfig.ForwardersEnabled() {
return forwarder, pkgerrors.Errorf("Forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true")
return forwarder, fmt.Errorf("forwarding is not enabled, to enable set Transactions.ForwardersEnabled =true")
}
forwarder, err = b.fwdMgr.ForwarderFor(eoa)
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(addr ADDR) error {
err := b.keyStore.CheckEnabled(addr, b.chainID)
return pkgerrors.Wrapf(err, "cannot send transaction from %s on chain ID %s", addr, b.chainID.String())
if err := b.keyStore.CheckEnabled(addr, b.chainID); err != nil {
return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err)
}
return nil
}

// SendNativeToken creates a transaction that transfers the given value of native tokens
Expand All @@ -511,7 +521,13 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
Strategy: NewSendEveryStrategy(),
}
etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
return etx, pkgerrors.Wrap(err, "SendNativeToken failed to insert tx")
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}

// Trigger the Broadcaster to check for new transaction
b.broadcaster.Trigger(from)
return etx, nil
}

type NullTxManager[
Expand Down
1 change: 0 additions & 1 deletion core/chains/evm/evm_txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func newEvmTxm(
lggr,
logPoller,
opts.KeyStore,
opts.EventBroadcaster,
estimator)
} else {
txm = opts.GenTxManager(chainID)
Expand Down
Loading
Loading