-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(client/v2): broadcast logic #22282
Changes from all commits
cb00e03
2025987
015eab4
39b5126
49598dd
8656f1f
b33e5a2
c13fe79
4bd1fbb
bee57fa
b0094f3
71e1c4a
dc9e1f4
ea3ffb8
cb472db
4ecd77f
00fe5a3
86b8d58
c840773
5b3db1e
0c3855d
f86767d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package broadcast | ||
|
||
import "context" | ||
|
||
// Broadcaster defines an interface for broadcasting transactions to the consensus engine. | ||
type Broadcaster interface { | ||
// Broadcast sends a transaction to the network and returns the result. | ||
// | ||
// It returns a byte slice containing the formatted result that will be | ||
// passed to the output writer, and an error if the broadcast failed. | ||
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) | ||
|
||
// Consensus returns the consensus engine identifier for this Broadcaster. | ||
Consensus() string | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,197 @@ | ||||||||||||||||||
package comet | ||||||||||||||||||
|
||||||||||||||||||
import ( | ||||||||||||||||||
"context" | ||||||||||||||||||
"encoding/json" | ||||||||||||||||||
"errors" | ||||||||||||||||||
"fmt" | ||||||||||||||||||
"strings" | ||||||||||||||||||
|
||||||||||||||||||
"github.com/cometbft/cometbft/mempool" | ||||||||||||||||||
rpcclient "github.com/cometbft/cometbft/rpc/client" | ||||||||||||||||||
rpchttp "github.com/cometbft/cometbft/rpc/client/http" | ||||||||||||||||||
coretypes "github.com/cometbft/cometbft/rpc/core/types" | ||||||||||||||||||
cmttypes "github.com/cometbft/cometbft/types" | ||||||||||||||||||
|
||||||||||||||||||
apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1" | ||||||||||||||||||
"cosmossdk.io/client/v2/broadcast" | ||||||||||||||||||
|
||||||||||||||||||
"github.com/cosmos/cosmos-sdk/codec" | ||||||||||||||||||
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
const ( | ||||||||||||||||||
// BroadcastSync defines a tx broadcasting mode where the client waits for | ||||||||||||||||||
// a CheckTx execution response only. | ||||||||||||||||||
BroadcastSync = "sync" | ||||||||||||||||||
// BroadcastAsync defines a tx broadcasting mode where the client returns | ||||||||||||||||||
// immediately. | ||||||||||||||||||
BroadcastAsync = "async" | ||||||||||||||||||
|
||||||||||||||||||
// cometBftConsensus is the identifier for the CometBFT consensus engine. | ||||||||||||||||||
cometBFTConsensus = "comet" | ||||||||||||||||||
) | ||||||||||||||||||
|
||||||||||||||||||
// CometRPC defines the interface of a CometBFT RPC client needed for | ||||||||||||||||||
// queries and transaction handling. | ||||||||||||||||||
type CometRPC interface { | ||||||||||||||||||
rpcclient.ABCIClient | ||||||||||||||||||
|
||||||||||||||||||
Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error) | ||||||||||||||||||
Status(context.Context) (*coretypes.ResultStatus, error) | ||||||||||||||||||
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error) | ||||||||||||||||||
BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) | ||||||||||||||||||
BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) | ||||||||||||||||||
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) | ||||||||||||||||||
Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) | ||||||||||||||||||
Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) | ||||||||||||||||||
TxSearch( | ||||||||||||||||||
ctx context.Context, | ||||||||||||||||||
query string, | ||||||||||||||||||
prove bool, | ||||||||||||||||||
page, perPage *int, | ||||||||||||||||||
orderBy string, | ||||||||||||||||||
) (*coretypes.ResultTxSearch, error) | ||||||||||||||||||
BlockSearch( | ||||||||||||||||||
ctx context.Context, | ||||||||||||||||||
query string, | ||||||||||||||||||
page, perPage *int, | ||||||||||||||||||
orderBy string, | ||||||||||||||||||
) (*coretypes.ResultBlockSearch, error) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
var _ broadcast.Broadcaster = &CometBFTBroadcaster{} | ||||||||||||||||||
|
||||||||||||||||||
// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine. | ||||||||||||||||||
type CometBFTBroadcaster struct { | ||||||||||||||||||
rpcClient CometRPC | ||||||||||||||||||
mode string | ||||||||||||||||||
cdc codec.JSONCodec | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// NewCometBFTBroadcaster creates a new CometBFTBroadcaster. | ||||||||||||||||||
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) { | ||||||||||||||||||
if cdc == nil { | ||||||||||||||||||
return nil, errors.New("codec can't be nil") | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if mode == "" { | ||||||||||||||||||
mode = BroadcastSync | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
rpcClient, err := rpchttp.New(rpcURL) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return &CometBFTBroadcaster{ | ||||||||||||||||||
rpcClient: rpcClient, | ||||||||||||||||||
mode: mode, | ||||||||||||||||||
cdc: cdc, | ||||||||||||||||||
}, nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Consensus returns the consensus engine name used by the broadcaster. | ||||||||||||||||||
// It always returns "comet" for CometBFTBroadcaster. | ||||||||||||||||||
func (c *CometBFTBroadcaster) Consensus() string { | ||||||||||||||||||
return cometBFTConsensus | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Broadcast sends a transaction to the network and returns the result. | ||||||||||||||||||
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed. | ||||||||||||||||||
func (c *CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) { | ||||||||||||||||||
if c.cdc == nil { | ||||||||||||||||||
return []byte{}, fmt.Errorf("JSON codec is not initialized") | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
var broadcastFunc func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error) | ||||||||||||||||||
switch c.mode { | ||||||||||||||||||
case BroadcastSync: | ||||||||||||||||||
broadcastFunc = c.rpcClient.BroadcastTxSync | ||||||||||||||||||
case BroadcastAsync: | ||||||||||||||||||
broadcastFunc = c.rpcClient.BroadcastTxAsync | ||||||||||||||||||
default: | ||||||||||||||||||
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
res, err := c.broadcast(ctx, txBytes, broadcastFunc) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return []byte{}, err | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return c.cdc.MarshalJSON(res) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// broadcast sends a transaction to the CometBFT network using the provided function. | ||||||||||||||||||
func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte, | ||||||||||||||||||
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error), | ||||||||||||||||||
) (*apiacbci.TxResponse, error) { | ||||||||||||||||||
bResult, err := fn(ctx, txBytes) | ||||||||||||||||||
if errRes := checkCometError(err, txBytes); errRes != nil { | ||||||||||||||||||
return errRes, nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
return newResponseFormatBroadcastTx(bResult), err | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse. | ||||||||||||||||||
// It extracts error information and constructs a TxResponse with the error details. | ||||||||||||||||||
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse { | ||||||||||||||||||
if err == nil { | ||||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
errStr := strings.ToLower(err.Error()) | ||||||||||||||||||
txHash := fmt.Sprintf("%X", tx.Hash()) | ||||||||||||||||||
|
||||||||||||||||||
switch { | ||||||||||||||||||
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())): | ||||||||||||||||||
return &apiacbci.TxResponse{ | ||||||||||||||||||
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(), | ||||||||||||||||||
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(), | ||||||||||||||||||
Txhash: txHash, | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
case strings.Contains(errStr, "mempool is full"): | ||||||||||||||||||
return &apiacbci.TxResponse{ | ||||||||||||||||||
Code: sdkerrors.ErrMempoolIsFull.ABCICode(), | ||||||||||||||||||
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(), | ||||||||||||||||||
Txhash: txHash, | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
case strings.Contains(errStr, "tx too large"): | ||||||||||||||||||
return &apiacbci.TxResponse{ | ||||||||||||||||||
Code: sdkerrors.ErrTxTooLarge.ABCICode(), | ||||||||||||||||||
Codespace: sdkerrors.ErrTxTooLarge.Codespace(), | ||||||||||||||||||
Txhash: txHash, | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
default: | ||||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft | ||||||||||||||||||
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse { | ||||||||||||||||||
if res == nil { | ||||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
parsedLogs, _ := parseABCILogs(res.Log) | ||||||||||||||||||
|
||||||||||||||||||
Comment on lines
+180
to
+181
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider handling log parsing errors. The unmarshal error from - parsedLogs, _ := parseABCILogs(res.Log)
+ parsedLogs, err := parseABCILogs(res.Log)
+ if err != nil {
+ // Either log the error or include it in the response
+ parsedLogs = []*apiacbci.ABCIMessageLog{{
+ Log: fmt.Sprintf("failed to parse logs: %v", err),
+ }}
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||
return &apiacbci.TxResponse{ | ||||||||||||||||||
Code: res.Code, | ||||||||||||||||||
Codespace: res.Codespace, | ||||||||||||||||||
Data: res.Data.String(), | ||||||||||||||||||
RawLog: res.Log, | ||||||||||||||||||
Logs: parsedLogs, | ||||||||||||||||||
Txhash: res.Hash.String(), | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of | ||||||||||||||||||
// ABCIMessageLog types. It returns an error upon JSON decoding failure. | ||||||||||||||||||
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) { | ||||||||||||||||||
err = json.Unmarshal([]byte(logs), &res) | ||||||||||||||||||
return res, err | ||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
package comet | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
|
||
"github.com/cometbft/cometbft/mempool" | ||
coretypes "github.com/cometbft/cometbft/rpc/core/types" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/mock/gomock" | ||
|
||
apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1" | ||
mockrpc "cosmossdk.io/client/v2/broadcast/comet/testutil" | ||
|
||
"github.com/cosmos/cosmos-sdk/codec" | ||
"github.com/cosmos/cosmos-sdk/codec/testutil" | ||
) | ||
|
||
var cdc = testutil.CodecOptions{}.NewCodec() | ||
|
||
func TestNewCometBftBroadcaster(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
cdc codec.JSONCodec | ||
mode string | ||
want *CometBFTBroadcaster | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "constructor", | ||
mode: BroadcastSync, | ||
cdc: cdc, | ||
want: &CometBFTBroadcaster{ | ||
mode: BroadcastSync, | ||
cdc: cdc, | ||
}, | ||
}, | ||
{ | ||
name: "nil codec", | ||
mode: BroadcastSync, | ||
cdc: nil, | ||
wantErr: true, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got, err := NewCometBFTBroadcaster("localhost:26657", tt.mode, tt.cdc) | ||
if tt.wantErr { | ||
require.Error(t, err) | ||
require.Nil(t, got) | ||
} else { | ||
require.Equal(t, got.mode, tt.want.mode) | ||
require.Equal(t, got.cdc, tt.want.cdc) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestCometBftBroadcaster_Broadcast(t *testing.T) { | ||
ctrl := gomock.NewController(t) | ||
cometMock := mockrpc.NewMockCometRPC(ctrl) | ||
c := CometBFTBroadcaster{ | ||
rpcClient: cometMock, | ||
mode: BroadcastSync, | ||
cdc: cdc, | ||
} | ||
tests := []struct { | ||
name string | ||
mode string | ||
setupMock func(*mockrpc.MockCometRPC) | ||
wantErr bool | ||
}{ | ||
{ | ||
name: "sync", | ||
mode: BroadcastSync, | ||
setupMock: func(m *mockrpc.MockCometRPC) { | ||
m.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{ | ||
Code: 0, | ||
Data: []byte{}, | ||
Log: "", | ||
Codespace: "", | ||
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "), | ||
}, nil) | ||
}, | ||
}, | ||
{ | ||
name: "async", | ||
mode: BroadcastAsync, | ||
setupMock: func(m *mockrpc.MockCometRPC) { | ||
m.EXPECT().BroadcastTxAsync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{ | ||
Code: 0, | ||
Data: []byte{}, | ||
Log: "", | ||
Codespace: "", | ||
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "), | ||
}, nil) | ||
}, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
c.mode = tt.mode | ||
tt.setupMock(cometMock) | ||
got, err := c.Broadcast(context.Background(), []byte{}) | ||
if tt.wantErr { | ||
require.Error(t, err) | ||
} else { | ||
require.NotNil(t, got) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func Test_checkCometError(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
err error | ||
want *apiacbci.TxResponse | ||
}{ | ||
{ | ||
name: "tx already in cache", | ||
err: errors.New("tx already exists in cache"), | ||
want: &apiacbci.TxResponse{ | ||
Code: 19, | ||
}, | ||
}, | ||
{ | ||
name: "mempool is full", | ||
err: mempool.ErrMempoolIsFull{}, | ||
want: &apiacbci.TxResponse{ | ||
Code: 20, | ||
}, | ||
}, | ||
{ | ||
name: "tx too large", | ||
err: mempool.ErrTxTooLarge{}, | ||
want: &apiacbci.TxResponse{ | ||
Code: 21, | ||
}, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got := checkCometError(tt.err, []byte{}) | ||
require.Equal(t, got.Code, tt.want.Code) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix inconsistent error handling in broadcast method.
The method returns both the formatted response and the error on line 134, but the error was already handled in the previous if statement. This could lead to returning a successful response with an error.
📝 Committable suggestion