Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SendMsgs, delete Send, and introduce GetMsgResult #106

Merged
merged 15 commits into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 124 additions & 11 deletions chains/tendermint/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package tendermint

import (
"context"
"encoding/hex"
"fmt"
"os"
"path"
"strings"
"sync"
"time"

"cosmossdk.io/errors"
"github.com/avast/retry-go"
"github.com/cometbft/cometbft/libs/log"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
libclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
sdkCtx "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -160,22 +164,41 @@ func (c *Chain) Timestamp(height ibcexported.Height) (time.Time, error) {
}
}

func (c *Chain) AverageBlockTime() time.Duration {
return time.Duration(c.config.AverageBlockTimeMsec) * time.Millisecond
}

// RegisterMsgEventListener registers a given EventListener to the chain
func (c *Chain) RegisterMsgEventListener(listener core.MsgEventListener) {
c.msgEventListener = listener
}

func (c *Chain) sendMsgs(msgs []sdk.Msg) (*sdk.TxResponse, error) {
// broadcast tx
res, _, err := c.rawSendMsgs(msgs)
if err != nil {
return nil, err
} else if res.Code != 0 {
// CheckTx failed
return nil, fmt.Errorf("CheckTx failed: %v", errors.ABCIError(res.Codespace, res.Code, res.RawLog))
}

// wait for tx being committed
if resTx, err := c.waitForCommit(res.TxHash); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is GetMsgResult not called here to avoid unnecessary log parsing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

return nil, err
} else if resTx.TxResult.IsErr() {
// DeliverTx failed
return nil, fmt.Errorf("DeliverTx failed: %v", errors.ABCIError(res.Codespace, res.Code, res.RawLog))
}
if res.Code == 0 && c.msgEventListener != nil {

// call msgEventListener if needed
if c.msgEventListener != nil {
if err := c.msgEventListener.OnSentMsg(msgs); err != nil {
c.logger.Error("failed to OnSendMsg call", "msgs", msgs, "err", err)
return res, nil
}
}

return res, nil
}

Expand Down Expand Up @@ -236,6 +259,55 @@ func (c *Chain) rawSendMsgs(msgs []sdk.Msg) (*sdk.TxResponse, bool, error) {
return res, true, nil
}

func (c *Chain) waitForCommit(txHash string) (*coretypes.ResultTx, error) {
var resTx *coretypes.ResultTx

retryInterval := c.AverageBlockTime()
maxRetry := uint(c.config.MaxRetryForCommit)

if err := retry.Do(func() error {
var err error
var recoverable bool
resTx, recoverable, err = c.rawQueryTx(txHash)
if err != nil {
if recoverable {
return err
} else {
return retry.Unrecoverable(err)
}
}
return nil
}, retry.Attempts(maxRetry), retry.Delay(retryInterval), rtyErr); err != nil {
return resTx, fmt.Errorf("failed to make sure that tx is committed: %v", err)
}

return resTx, nil
}

// rawQueryTx returns a tx of which hash equals to `hexTxHash`.
// If the RPC is successful but the tx is not found, this returns nil with nil error.
func (c *Chain) rawQueryTx(hexTxHash string) (*coretypes.ResultTx, bool, error) {
ctx := c.CLIContext(0)

txHash, err := hex.DecodeString(hexTxHash)
if err != nil {
return nil, false, fmt.Errorf("failed to decode the hex string of tx hash: %v", err)
}

node, err := ctx.GetNode()
if err != nil {
return nil, false, fmt.Errorf("failed to get node: %v", err)
}

resTx, err := node.Tx(context.TODO(), txHash, false)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check if the node enables the transaction indexing like the following:
https://github.com/cosmos/relayer/blob/1bfe06cdec3bdd1e0ba64fafac2b6f39cf495e70/relayer/chains/cosmos/tx.go#L303

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no checks, I think we should write a comment about this assumption.

recoverable := !strings.Contains(err.Error(), "transaction indexing is disabled")
return nil, recoverable, fmt.Errorf("failed to retrieve tx: %v", err)
}

return resTx, false, nil
}

func prepareFactory(clientCtx sdkCtx.Context, txf tx.Factory) (tx.Factory, error) {
from := clientCtx.GetFromAddress()

Expand Down Expand Up @@ -323,25 +395,66 @@ func CalculateGas(
return simRes, uint64(txf.GasAdjustment() * float64(simRes.GasInfo.GasUsed)), nil
}

func (c *Chain) SendMsgs(msgs []sdk.Msg) ([]byte, error) {
func (c *Chain) SendMsgs(msgs []sdk.Msg) ([]core.MsgID, error) {
// Broadcast those bytes
res, err := c.sendMsgs(msgs)
if err != nil {
return nil, err
}
return []byte(res.Logs.String()), nil
var msgIDs []core.MsgID
for msgIndex := range msgs {
msgIDs = append(msgIDs, &MsgID{
txHash: res.TxHash,
msgIndex: uint32(msgIndex),
})
}
return msgIDs, nil
}

func (c *Chain) Send(msgs []sdk.Msg) bool {
res, err := c.sendMsgs(msgs)
if err != nil || res.Code != 0 {
c.LogFailedTx(res, err, msgs)
return false
func (c *Chain) GetMsgResult(id core.MsgID) (core.MsgResult, error) {
msgID, ok := id.(*MsgID)
if !ok {
return nil, fmt.Errorf("unexpected message id type: %T", id)
}

// find tx
resTx, err := c.waitForCommit(msgID.txHash)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Even if resTx.TxResult.IsErr() == true, GetMsgResult returns (msgResult, nil).
At that case, msgResult.Status returns (false, failureReason).

return nil, fmt.Errorf("failed to query tx: %v", err)
}

// check height of the delivered tx
version := clienttypes.ParseChainID(c.ChainID())
height := clienttypes.NewHeight(version, uint64(resTx.Height))

// check if the tx execution succeeded
if resTx.TxResult.IsErr() {
err := errors.ABCIError(resTx.TxResult.Codespace, resTx.TxResult.Code, resTx.TxResult.Log)
txFailureReason := err.Error()
return &MsgResult{
height: height,
txStatus: false,
txFailureReason: txFailureReason,
}, nil
}

// parse the log into ABCI logs
abciLogs, err := sdk.ParseABCILogs(resTx.TxResult.Log)
if err != nil {
return nil, fmt.Errorf("failed to parse ABCI logs: %v", err)
}

// parse the ABCI logs into core.MsgEventLog's
events, err := parseMsgEventLogs(abciLogs, msgID.msgIndex)
if err != nil {
return nil, fmt.Errorf("failed to parse msg event log: %v", err)
}
// NOTE: Add more data to this such as identifiers
c.LogSuccessTx(res, msgs)

return true
return &MsgResult{
height: height,
txStatus: true,
events: events,
}, nil
}

// ------------------------------- //
Expand Down
117 changes: 89 additions & 28 deletions chains/tendermint/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions chains/tendermint/log-chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
proto "github.com/cosmos/gogoproto/proto"
clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types"
ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported"
)

// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data
Expand Down Expand Up @@ -75,15 +73,6 @@ func (c *Chain) Print(toPrint proto.Message, text, indent bool) error {
return nil
}

// MustGetHeight takes the height inteface and returns the actual height
func MustGetHeight(h ibcexported.Height) uint64 {
height, ok := h.(clienttypes.Height)
if !ok {
panic("height is not an instance of height! wtf")
}
return height.GetRevisionHeight()
}

func getMsgAction(msgs []sdk.Msg) string {
var out string
for i, msg := range msgs {
Expand Down
Loading