From 3cf4cd3526f3992cc9665bea97a59c6362b5e49b Mon Sep 17 00:00:00 2001 From: Bryan White Date: Sat, 14 Dec 2024 12:51:35 +0100 Subject: [PATCH 1/8] wip --- testutil/e2e/app.go | 141 ++++++++++++++++++++++++++++ testutil/e2e/app_test.go | 81 ++++++++++++++++ testutil/e2e/grpc_server.go | 96 +++++++++++++++++++ testutil/e2e/ws_server.go | 181 ++++++++++++++++++++++++++++++++++++ 4 files changed, 499 insertions(+) create mode 100644 testutil/e2e/app.go create mode 100644 testutil/e2e/app_test.go create mode 100644 testutil/e2e/grpc_server.go create mode 100644 testutil/e2e/ws_server.go diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go new file mode 100644 index 000000000..102d25323 --- /dev/null +++ b/testutil/e2e/app.go @@ -0,0 +1,141 @@ +package e2e + +import ( + "context" + "encoding/json" + "net" + "net/http" + "sync" + "testing" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + + "github.com/pokt-network/poktroll/testutil/integration" +) + +// E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing +type E2EApp struct { + *integration.App + grpcServer *grpc.Server + grpcListener net.Listener + wsServer *http.Server + wsListener net.Listener + httpServer *http.Server + httpListener net.Listener + wsUpgrader websocket.Upgrader + wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries + wsConnMutex sync.RWMutex + blockEventChan chan *coretypes.ResultEvent +} + +// NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers +func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { + t.Helper() + + // Create the integration app + app := integration.NewCompleteIntegrationApp(t, opts...) + + // Create listeners for gRPC, WebSocket, and HTTP + grpcListener, err := net.Listen("tcp", "localhost:42069") + require.NoError(t, err, "failed to create gRPC listener") + + wsListener, err := net.Listen("tcp", "localhost:6969") + require.NoError(t, err, "failed to create WebSocket listener") + + httpListener, err := net.Listen("tcp", "localhost:42070") + require.NoError(t, err, "failed to create HTTP listener") + + e2eApp := &E2EApp{ + App: app, + grpcListener: grpcListener, + wsListener: wsListener, + httpListener: httpListener, + wsConnections: make(map[*websocket.Conn]map[string]struct{}), + wsUpgrader: websocket.Upgrader{}, + blockEventChan: make(chan *coretypes.ResultEvent, 1), + } + + // Initialize and start gRPC server + e2eApp.grpcServer = newGRPCServer(e2eApp, t) + go func() { + if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { + panic(err) + } + }() + + // Initialize and start WebSocket server + e2eApp.wsServer = newWebSocketServer(e2eApp) + go func() { + if err := e2eApp.wsServer.Serve(wsListener); err != nil && err != http.ErrServerClosed { + panic(err) + } + }() + + // Initialize and start HTTP server + mux := http.NewServeMux() + mux.HandleFunc("/", e2eApp.handleHTTP) + e2eApp.httpServer = &http.Server{Handler: mux} + go func() { + if err := e2eApp.httpServer.Serve(httpListener); err != nil && err != http.ErrServerClosed { + panic(err) + } + }() + + // Start event handling + go e2eApp.handleBlockEvents(t) + + return e2eApp +} + +// Close gracefully shuts down the E2EApp and its servers +func (app *E2EApp) Close() error { + app.grpcServer.GracefulStop() + if err := app.wsServer.Close(); err != nil { + return err + } + if err := app.httpServer.Close(); err != nil { + return err + } + close(app.blockEventChan) + return nil +} + +// GetClientConn returns a gRPC client connection to the E2EApp's gRPC server +func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext( + ctx, + app.grpcListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) +} + +// GetWSEndpoint returns the WebSocket endpoint URL +func (app *E2EApp) GetWSEndpoint() string { + return "ws://" + app.wsListener.Addr().String() + "/websocket" +} + +// handleHTTP handles incoming HTTP requests by responding with RPCResponse +func (app *E2EApp) handleHTTP(w http.ResponseWriter, r *http.Request) { + var req rpctypes.RPCRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Process the request - for now just return a basic response + // TODO_IMPROVE: Implement proper CometBFT RPC endpoint handling + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Result: json.RawMessage(`{}`), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go new file mode 100644 index 000000000..515bebf5b --- /dev/null +++ b/testutil/e2e/app_test.go @@ -0,0 +1,81 @@ +package e2e + +import ( + "testing" + + "cosmossdk.io/depinject" + "cosmossdk.io/math" + sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/app/volatile" + "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/query" + "github.com/pokt-network/poktroll/pkg/client/tx" + txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" + "github.com/pokt-network/poktroll/testutil/testclient" + gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" +) + +func TestNewE2EApp(t *testing.T) { + app := NewE2EApp(t) + + blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") + require.NoError(t, err) + + deps := depinject.Supply(app.QueryHelper(), blockQueryClient) + + sharedQueryClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) + + sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) + require.NoError(t, err) + + t.Logf("shared params: %+v", sharedParams) + + eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") + deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) + require.NoError(t, err) + + keyRing := keyring.NewInMemory(app.GetCodec()) + // TODO: add the gateway2 key... + _, err = keyRing.NewAccount( + "gateway2", + "suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + + flagSet := testclient.NewLocalnetFlagSet(t) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + txContext, err := tx.NewTxContext(deps) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) + txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) + require.NoError(t, err) + + eitherErr := txClient.SignAndBroadcast( + app.GetSdkCtx(), + gatewaytypes.NewMsgStakeGateway( + "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000000)), + ), + ) + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + require.NoError(t, <-errCh) +} diff --git a/testutil/e2e/grpc_server.go b/testutil/e2e/grpc_server.go new file mode 100644 index 000000000..e5bbfdd76 --- /dev/null +++ b/testutil/e2e/grpc_server.go @@ -0,0 +1,96 @@ +package e2e + +import ( + "context" + "fmt" + "strings" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "google.golang.org/protobuf/proto" + + "github.com/cosmos/cosmos-sdk/baseapp" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +// newGRPCServer creates and configures a new gRPC server for the E2EApp +func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { + grpcServer := grpc.NewServer() + reflection.Register(grpcServer) + + forwarder := &grpcForwarderServer{ + queryHelper: app.QueryHelper(), + app: app, + t: t, + } + + grpcServer.RegisterService(&grpc.ServiceDesc{ + ServiceName: "cosmos.Service", + HandlerType: (*interface{})(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "", + }, forwarder) + + return grpcServer +} + +// grpcForwarderServer implements a generic gRPC service that forwards all queries +// to the queryHelper and messages to the app +type grpcForwarderServer struct { + queryHelper *baseapp.QueryServiceTestHelper + app *E2EApp + t *testing.T +} + +// Invoke implements the grpc.Server interface and forwards all requests appropriately +func (s *grpcForwarderServer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { + // Determine if this is a query or message based on the method name + if isQuery(method) { + return s.queryHelper.Invoke(ctx, method, args, reply) + } + + // If it's not a query, treat it as a message + msg, ok := args.(sdk.Msg) + if !ok { + return fmt.Errorf("expected sdk.Msg, got %T", args) + } + + // Run the message through the app + msgRes, err := s.app.RunMsg(s.t, msg) + if err != nil { + return err + } + + // Type assert the reply as a proto.Message + protoReply, ok := reply.(proto.Message) + if !ok { + return fmt.Errorf("expected proto.Message, got %T", reply) + } + + // Type assert the response as a proto.Message + protoRes, ok := msgRes.(proto.Message) + if !ok { + return fmt.Errorf("expected proto.Message response, got %T", msgRes) + } + + // Marshal the response to bytes + resBz, err := proto.Marshal(protoRes) + if err != nil { + return fmt.Errorf("failed to marshal response: %w", err) + } + + // Unmarshal into the reply + return proto.Unmarshal(resBz, protoReply) +} + +// NewStream implements the grpc.Server interface but is not used in this implementation +func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, fmt.Errorf("streaming is not supported") +} + +// isQuery returns true if the method name indicates this is a query request +func isQuery(method string) bool { + return strings.Contains(method, ".Query/") +} diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go new file mode 100644 index 000000000..c21187a28 --- /dev/null +++ b/testutil/e2e/ws_server.go @@ -0,0 +1,181 @@ +package e2e + +import ( + "encoding/json" + "net/http" + "strings" + "testing" + + "github.com/gorilla/websocket" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx" +) + +// newWebSocketServer creates and configures a new WebSocket server for the E2EApp +func newWebSocketServer(app *E2EApp) *http.Server { + mux := http.NewServeMux() + mux.HandleFunc("/websocket", app.handleWebSocket) + return &http.Server{Handler: mux} +} + +// handleWebSocket handles incoming WebSocket connections and subscriptions +func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { + conn, err := app.wsUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + app.wsConnMutex.Lock() + app.wsConnections[conn] = make(map[string]struct{}) + app.wsConnMutex.Unlock() + + go app.handleWebSocketConnection(conn) +} + +// handleWebSocketConnection handles messages from a specific WebSocket connection +func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { + defer func() { + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + conn.Close() + }() + + for { + _, message, err := conn.ReadMessage() + if err != nil { + return + } + + var req rpctypes.RPCRequest + if err := json.Unmarshal(message, &req); err != nil { + continue + } + + // Handle subscribe/unsubscribe requests + if req.Method == "subscribe" { + var params struct { + Query string `json:"query"` + } + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + continue + } + + app.wsConnMutex.Lock() + app.wsConnections[conn][params.Query] = struct{}{} + app.wsConnMutex.Unlock() + + // Send subscription response + resp := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + // TODO_IN_THIS_COMMIT: generate a mock result... + Result: json.RawMessage("{}"), + } + if err := conn.WriteJSON(resp); err != nil { + return + } + } + } +} + +// handleBlockEvents coordinates block finalization with WebSocket event broadcasting +func (app *E2EApp) handleBlockEvents(t *testing.T) { + for event := range app.blockEventChan { + app.wsConnMutex.RLock() + for conn, queries := range app.wsConnections { + // Check if connection is subscribed to this event type + for query := range queries { + if eventMatchesQuery(event, query) { + // Marshal the event to JSON + eventJSON, err := json.Marshal(event) + if err != nil { + t.Logf("failed to marshal event: %v", err) + continue + } + + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: nil, // Events don't have an ID + Result: json.RawMessage(eventJSON), + } + + if err := conn.WriteJSON(response); err != nil { + app.wsConnMutex.RUnlock() + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + app.wsConnMutex.RLock() + continue + } + } + } + } + app.wsConnMutex.RUnlock() + } +} + +// TODO_IN_THIS_COMMIT: also wrap RunMsgs... +// TODO_IN_THIS_COMMIT: godoc... +// Override RunMsg to also emit transaction events via WebSocket +func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) { + msgRes, err := app.App.RunMsg(t, msg) + if err != nil { + return nil, err + } + + // Create and emit block event with transaction results + blockEvent := createBlockEvent(app.GetSdkCtx(), msgRes) + app.blockEventChan <- blockEvent + + return msgRes, nil +} + +// createBlockEvent creates a CometBFT-compatible event from transaction results +func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent { + // Convert SDK events to map[string][]string format that CometBFT expects + events := make(map[string][]string) + for _, event := range ctx.EventManager().Events() { + // Each event type becomes a key, and its attributes become the values + for _, attr := range event.Attributes { + if events[event.Type] == nil { + events[event.Type] = make([]string, 0) + } + events[event.Type] = append(events[event.Type], string(attr.Value)) + } + } + + return &coretypes.ResultEvent{ + Query: "tm.event='NewBlock'", + Data: map[string]interface{}{ + "height": ctx.BlockHeight(), + "hash": ctx.BlockHeader().LastBlockId.Hash, + "events": events, + // Add other relevant block and transaction data here as needed + }, + Events: events, + } +} + +//// createTxEvent creates a CometBFT-compatible event from transaction results +//func createTxEvent(tx *coretypes.ResultTx, index int) *coretypes.ResultEvent { +// return &coretypes.ResultEvent{ +// Query: "tm.event='Tx'", +// Data: map[string]interface{}{ +// "height": ctx.BlockHeight(), +// "hash": ctx.BlockHeader().LastBlockId.Hash, +// "events": events, +// // Add other relevant block and transaction data here as needed +// }, +// Events: events, +// } +//} + +// eventMatchesQuery checks if an event matches a subscription query +func eventMatchesQuery(event *coretypes.ResultEvent, query string) bool { + // Basic implementation - should be expanded to handle more complex queries + return strings.Contains(query, event.Query) +} From 3d3e3f4d8b7b94aa6a09b9e21ef5f6d40ee39667 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 9 Jan 2025 15:13:56 +0100 Subject: [PATCH 2/8] wip --- pkg/client/block/block_result.go | 8 + pkg/client/interface.go | 2 +- testutil/e2e/app.go | 67 ++--- testutil/e2e/app_test.go | 186 ++++++++++++-- testutil/e2e/grpc_server.go | 119 +++++++-- testutil/e2e/ws_server.go | 353 ++++++++++++++++++++++++-- testutil/integration/app.go | 8 + testutil/testclient/localnet.go | 11 +- testutil/testclient/testtx/context.go | 18 +- 9 files changed, 665 insertions(+), 107 deletions(-) diff --git a/pkg/client/block/block_result.go b/pkg/client/block/block_result.go index b95a74870..c41175aef 100644 --- a/pkg/client/block/block_result.go +++ b/pkg/client/block/block_result.go @@ -11,6 +11,14 @@ import ( type cometBlockResult coretypes.ResultBlock func (cbr *cometBlockResult) Height() int64 { + + //cdc := codec.NewProtoCodec(codectypes.NewInterfaceRegistry()) cbrPreJSON, err := cdc.MarshalJSON(cbr) + + //cbrJSON, err := json.MarshalIndent(cbrPreJSON, "", " ") + //if err != nil { + // panic(err) + //} + //fmt.Println(string(cbrJSON)) return cbr.Block.Header.Height } diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 365c24b74..e860d0de7 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -1,5 +1,5 @@ //go:generate mockgen -destination=../../testutil/mockclient/events_query_client_mock.go -package=mockclient . Dialer,Connection,EventsQueryClient -//go:generate mockgen -destination=../../testutil/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient +//go:generate mockgen -destination=../../testutil/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient,BlockQueryClient //go:generate mockgen -destination=../../testutil/mockclient/delegation_client_mock.go -package=mockclient . DelegationClient //go:generate mockgen -destination=../../testutil/mockclient/tx_client_mock.go -package=mockclient . TxContext,TxClient //go:generate mockgen -destination=../../testutil/mockclient/supplier_client_mock.go -package=mockclient . SupplierClient diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index 102d25323..2a3c32646 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -2,21 +2,25 @@ package e2e import ( "context" - "encoding/json" + "errors" + "fmt" "net" "net/http" "sync" "testing" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "github.com/cosmos/cosmos-sdk/types/module" "github.com/gorilla/websocket" + "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" coretypes "github.com/cometbft/cometbft/rpc/core/types" - rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" "github.com/pokt-network/poktroll/testutil/integration" + "github.com/pokt-network/poktroll/testutil/testclient" ) // E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing @@ -26,11 +30,9 @@ type E2EApp struct { grpcListener net.Listener wsServer *http.Server wsListener net.Listener - httpServer *http.Server - httpListener net.Listener wsUpgrader websocket.Upgrader - wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries wsConnMutex sync.RWMutex + wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries blockEventChan chan *coretypes.ResultEvent } @@ -38,8 +40,24 @@ type E2EApp struct { func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { t.Helper() + // Initialize and start gRPC server + creds := insecure.NewCredentials() + grpcServer := grpc.NewServer(grpc.Creds(creds)) + mux := runtime.NewServeMux() + // Create the integration app app := integration.NewCompleteIntegrationApp(t, opts...) + app.RegisterGRPCServer(grpcServer) + //app.RegisterGRPCServer(e2eApp.grpcServer) + + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + keyRing := keyring.NewInMemory(app.GetCodec()) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + for moduleName, mod := range app.GetModuleManager().Modules { + fmt.Printf(">>> %s\n", moduleName) + mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) + } // Create listeners for gRPC, WebSocket, and HTTP grpcListener, err := net.Listen("tcp", "localhost:42069") @@ -48,21 +66,16 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp wsListener, err := net.Listen("tcp", "localhost:6969") require.NoError(t, err, "failed to create WebSocket listener") - httpListener, err := net.Listen("tcp", "localhost:42070") - require.NoError(t, err, "failed to create HTTP listener") - e2eApp := &E2EApp{ App: app, grpcListener: grpcListener, + grpcServer: grpcServer, wsListener: wsListener, - httpListener: httpListener, wsConnections: make(map[*websocket.Conn]map[string]struct{}), wsUpgrader: websocket.Upgrader{}, blockEventChan: make(chan *coretypes.ResultEvent, 1), } - // Initialize and start gRPC server - e2eApp.grpcServer = newGRPCServer(e2eApp, t) go func() { if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { panic(err) @@ -72,17 +85,14 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp // Initialize and start WebSocket server e2eApp.wsServer = newWebSocketServer(e2eApp) go func() { - if err := e2eApp.wsServer.Serve(wsListener); err != nil && err != http.ErrServerClosed { + if err := e2eApp.wsServer.Serve(wsListener); err != nil && errors.Is(err, http.ErrServerClosed) { panic(err) } }() // Initialize and start HTTP server - mux := http.NewServeMux() - mux.HandleFunc("/", e2eApp.handleHTTP) - e2eApp.httpServer = &http.Server{Handler: mux} go func() { - if err := e2eApp.httpServer.Serve(httpListener); err != nil && err != http.ErrServerClosed { + if err := http.ListenAndServe("localhost:42070", mux); err != nil { panic(err) } }() @@ -99,10 +109,9 @@ func (app *E2EApp) Close() error { if err := app.wsServer.Close(); err != nil { return err } - if err := app.httpServer.Close(); err != nil { - return err - } + close(app.blockEventChan) + return nil } @@ -119,23 +128,3 @@ func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) func (app *E2EApp) GetWSEndpoint() string { return "ws://" + app.wsListener.Addr().String() + "/websocket" } - -// handleHTTP handles incoming HTTP requests by responding with RPCResponse -func (app *E2EApp) handleHTTP(w http.ResponseWriter, r *http.Request) { - var req rpctypes.RPCRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - // Process the request - for now just return a basic response - // TODO_IMPROVE: Implement proper CometBFT RPC endpoint handling - response := rpctypes.RPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Result: json.RawMessage(`{}`), - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) -} diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index 515bebf5b..a2157ec69 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -1,16 +1,26 @@ package e2e import ( + "context" + "encoding/hex" + "net/http" "testing" + "time" "cosmossdk.io/depinject" "cosmossdk.io/math" - sdkclient "github.com/cosmos/cosmos-sdk/client" + abci "github.com/cometbft/cometbft/abci/types" + cometrpctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/cometbft/cometbft/types" cosmostx "github.com/cosmos/cosmos-sdk/client/tx" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" cosmostypes "github.com/cosmos/cosmos-sdk/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/golang/mock/gomock" + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "github.com/pokt-network/poktroll/app/volatile" "github.com/pokt-network/poktroll/pkg/client/block" @@ -18,16 +28,126 @@ import ( "github.com/pokt-network/poktroll/pkg/client/query" "github.com/pokt-network/poktroll/pkg/client/tx" txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" + "github.com/pokt-network/poktroll/testutil/integration" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient" gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" ) +func TestGRPCServer2(t *testing.T) { + grpcServer := grpc.NewServer() + //gwKeeper := gatewaykeeper.NewKeeper() + //gwSvc := gatewaykeeper.NewMsgServerImpl(gwKeeper) + + app := integration.NewCompleteIntegrationApp(t) + app.RegisterGRPCServer(grpcServer) + + mux := runtime.NewServeMux() + err := http.ListenAndServe(":42070", mux) + require.NoError(t, err) + //gatewaytypes.RegisterMsgServer(grpcServer, gwSvc) + //gatewaytypes.RegisterMsgServer(app.MsgServiceRouter(), gwSvc) + + //gatewaytypes.RegisterQueryHandlerFromEndpoint() + + //reflectionService, err := services.NewReflectionService() + //require.NoError(t, err) + + //desc, err := reflectionService.FileDescriptors(nil, nil) + //require.NoError(t, err) + + //app := integration.NewCompleteIntegrationApp(t) + //grpcServer.RegisterService(desc, app.MsgServiceRouter()) +} + +func TestSanity(t *testing.T) { + app := integration.NewCompleteIntegrationApp(t) + + //app.Query(nil, &authtypes.QueryAccountRequest{ + // Address: "pokt1h04g6njyuv03dhd74a73pyzeadmd8dk7l9tsk8", + //}) + + //app.Query(nil, types2.RequestQuery{ + // Data: nil, + // Path: "", + // Height: 0, + // Prove: false, + //}) + + ctrl := gomock.NewController(t) + blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) + blockQueryClient.EXPECT(). + Block(gomock.Any(), gomock.Any()). + DoAndReturn( + func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { + blockResultMock := &cometrpctypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + Height: 1, + }, + }, + } + return blockResultMock, nil + }, + ).AnyTimes() + deps := depinject.Supply(app.QueryHelper(), blockQueryClient) + sharedClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) + + params, err := sharedClient.GetParams(app.GetSdkCtx()) + require.NoError(t, err) + + t.Logf("shared params: %+v", params) +} + func TestNewE2EApp(t *testing.T) { + initialHeight := int64(7553) + // TODO_IN_THIS_COMMIT: does this 👆 need to be reconciled with the internal height of app? + app := NewE2EApp(t) - blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") + keyRing := keyring.NewInMemory(app.GetCodec()) + rec, err := keyRing.NewAccount( + "gateway2", + "suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + + gateway2Addr, err := rec.GetAddress() + require.NoError(t, err) + + // TODO_IN_THIS_COMMOT: fund gateway2 account. + _, err = app.RunMsg(t, &banktypes.MsgSend{ + FromAddress: app.GetFaucetBech32(), + ToAddress: gateway2Addr.String(), + Amount: cosmostypes.NewCoins(cosmostypes.NewInt64Coin(volatile.DenomuPOKT, 100000000)), + }) require.NoError(t, err) + ctrl := gomock.NewController(t) + blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) + blockQueryClient.EXPECT(). + Block(gomock.Any(), gomock.Any()). + DoAndReturn( + func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { + //time.Sleep(time.Second * 100) + blockResultMock := &cometrpctypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + Height: initialHeight, + }, + }, + } + return blockResultMock, nil + }, + ).AnyTimes() + //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") + //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:26657") + //require.NoError(t, err) + deps := depinject.Supply(app.QueryHelper(), blockQueryClient) sharedQueryClient, err := query.NewSharedQuerier(deps) @@ -39,28 +159,22 @@ func TestNewE2EApp(t *testing.T) { t.Logf("shared params: %+v", sharedParams) eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") + //eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:26657/websocket") deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) require.NoError(t, err) - keyRing := keyring.NewInMemory(app.GetCodec()) - // TODO: add the gateway2 key... - _, err = keyRing.NewAccount( - "gateway2", - "suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", - "", - cosmostypes.FullFundraiserPath, - hd.Secp256k1, - ) - require.NoError(t, err) - - flagSet := testclient.NewLocalnetFlagSet(t) + // TODO_IN_THIS_COMMIT: NOT localnet flagset NOR context, should be + // configured to match the E2E app listeners. + flagSet := testclient.NewFlagSet(t, "127.0.0.1:42069") clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) require.NoError(t, err) deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + + //_, txContext := testtx.NewE2ETxContext(t, keyRing, flagSet) txContext, err := tx.NewTxContext(deps) require.NoError(t, err) @@ -68,14 +182,56 @@ func TestNewE2EApp(t *testing.T) { txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) require.NoError(t, err) + time.Sleep(time.Second * 1) + eitherErr := txClient.SignAndBroadcast( app.GetSdkCtx(), gatewaytypes.NewMsgStakeGateway( "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", - cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000000)), + cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000001)), ), ) + + // TODO_IN_THIS_COMMIT: signal to the WS server to send another block result event... + //app.NextBlock(t) + err, errCh := eitherErr.SyncOrAsyncError() require.NoError(t, err) require.NoError(t, <-errCh) } + +func TestGRPCServer(t *testing.T) { + app := NewE2EApp(t) + t.Cleanup(func() { + app.Close() + }) + + grpcConn, err := grpc.NewClient("tcp://127.0.0.1:42069", grpc.WithInsecure()) + require.NoError(t, err) + + dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") + require.NoError(t, err) + + req := &abci.RequestQuery{ + Data: dataHex, + Path: "/cosmos.auth.v1beta1.Query/Account", + Height: 0, + Prove: false, + } + res := &abci.ResponseQuery{} + + grpcConn.Invoke(context.Background(), "abci_query", req, res) + + //"method" : "abci_query", + //"params" : { + // "data" : "0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A", + // "height" : "0", + // "path" : "/cosmos.auth.v1beta1.Query/Account", + // "prove" : false + //} + + //"method" : "broadcast_tx_async", + //"params" : { + // "tx" : "CmsKZgohL3Bva3Ryb2xsLmdhdGV3YXkuTXNnU3Rha2VHYXRld2F5EkEKK3Bva3QxNXczZmhmeWMwbHR0djdyNTg1ZTJuY3BmNnQya2w5dWg4cnNueXoSEgoFdXBva3QSCTEwMDAwMDAwMRiGOxJYCk4KRgofL2Nvc21vcy5jcnlwdG8uc2VjcDI1NmsxLlB1YktleRIjCiEDZo2bY9XquUsFljtW/OKWVCDhYFf7NbidN4Y99VQ9438SBAoCCAESBhCqoYLJAhpAw5e7iJN5SpFit3fftxnZY7EDiFqupi7XEL3sUyeV0IBSQv2JZ7Cdu0dCG0yEVgj0xarkPi7dR10pNDL1gcUJxw==" + //} +} diff --git a/testutil/e2e/grpc_server.go b/testutil/e2e/grpc_server.go index e5bbfdd76..c01d4609e 100644 --- a/testutil/e2e/grpc_server.go +++ b/testutil/e2e/grpc_server.go @@ -6,32 +6,16 @@ import ( "strings" "testing" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - "google.golang.org/protobuf/proto" - "github.com/cosmos/cosmos-sdk/baseapp" sdk "github.com/cosmos/cosmos-sdk/types" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" ) // newGRPCServer creates and configures a new gRPC server for the E2EApp func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { grpcServer := grpc.NewServer() - reflection.Register(grpcServer) - - forwarder := &grpcForwarderServer{ - queryHelper: app.QueryHelper(), - app: app, - t: t, - } - - grpcServer.RegisterService(&grpc.ServiceDesc{ - ServiceName: "cosmos.Service", - HandlerType: (*interface{})(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{}, - Metadata: "", - }, forwarder) + app.RegisterGRPCServer(grpcServer) return grpcServer } @@ -94,3 +78,100 @@ func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDe func isQuery(method string) bool { return strings.Contains(method, ".Query/") } + +//func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { +// grpcServer := grpc.NewServer() +// reflection.Register(grpcServer) +// +// forwarder := &grpcForwarderServer{ +// app: app, +// t: t, +// queryHelper: app.QueryHelper(), +// msgRouter: app.MsgServiceRouter(), +// msgHandlers: map[string]interface{}{}, +// } +// +// // Forward all gRPC messages through our forwarder +// sd := &grpc.ServiceDesc{ +// ServiceName: "cosmos.Service", +// HandlerType: (*interface{})(nil), +// Methods: []grpc.MethodDesc{ +// { +// MethodName: "HandleMessage", +// Handler: forwarder.handleMessageGeneric, +// }, +// }, +// } +// grpcServer.RegisterService(sd, forwarder) +// +// return grpcServer +//} +// +//type grpcForwarderServer struct { +// app *E2EApp +// t *testing.T +// queryHelper *baseapp.QueryServiceTestHelper +// msgRouter *baseapp.MsgServiceRouter +// msgHandlers map[string]interface{} +//} +// +//func (s *grpcForwarderServer) handleMessageGeneric( +// srv interface{}, +// ctx context.Context, +// dec func(interface{}) error, +// interceptor grpc.UnaryServerInterceptor, +//) (interface{}, error) { +// msg, ok := srv.(sdk.Msg) +// if !ok { +// return nil, fmt.Errorf("invalid message type: %T", srv) +// } +// +// // Use the app's existing message handling infrastructure +// msgRes, err := s.app.RunMsg(s.t, msg) +// if err != nil { +// return nil, err +// } +// +// return msgRes, nil +//} + +//func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { +// grpcServer := grpc.NewServer() +// reflection.Register(grpcServer) +// +// // Register a service handler that forwards to MsgServiceRouter +// msgHandler := &grpcForwarderServer{app: app, t: t} +// serverServiceDesc := &grpc.ServiceDesc{ +// ServiceName: "cosmos.msg.v1.Msg", +// HandlerType: (*interface{})(nil), +// Methods: []grpc.MethodDesc{{ +// MethodName: "HandleMessage", +// Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { +// var msg sdk.Msg +// if err := dec(&msg); err != nil { +// return nil, err +// } +// return msgHandler.app.RunMsg(msgHandler.t, msg) +// }, +// }}, +// } +// grpcServer.RegisterService(serverServiceDesc, msgHandler) +// +// // Set up the gRPC-Gateway mux +// mux := runtime.NewServeMux() +// opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} +// +// // Register all your service handlers with the mux +// if err := gatewaytypes.RegisterMsgHandlerFromEndpoint(context.Background(), mux, app.grpcListener.Addr().String(), opts); err != nil { +// panic(err) +// } +// +// // Start HTTP server with the mux +// go func() { +// if err := http.ListenAndServe(":42070", mux); err != nil { +// panic(err) +// } +// }() +// +// return grpcServer +//} diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go index c21187a28..cb6b10ec4 100644 --- a/testutil/e2e/ws_server.go +++ b/testutil/e2e/ws_server.go @@ -6,12 +6,11 @@ import ( "strings" "testing" - "github.com/gorilla/websocket" - coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/tx" + "github.com/gorilla/websocket" ) // newWebSocketServer creates and configures a new WebSocket server for the E2EApp @@ -35,6 +34,293 @@ func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { go app.handleWebSocketConnection(conn) } +// TODO_IN_THIS_COMMIT: move +var mockBlockResultJSON = ` +{ + "query" : "tm.event='NewBlock'", + "data" : { + "type" : "tendermint/event/NewBlock", + "value" : { + "block" : { + "header" : { + "version" : { + "block" : "11" + }, + "chain_id" : "poktroll", + "height" : "7554", + "time" : "2025-01-03T14:55:39.944873259Z", + "last_block_id" : { + "hash" : "BC2C6FDDAC8A8A7CF79D4682FC3E76AAFCACB84974FC7C37577322D00D7C7525", + "parts" : { + "total" : 1, + "hash" : "478F75135E06F132E99770C9F6B3D276157532A97252B0D7EDF153E0C8143E80" + } + }, + "last_commit_hash" : "13D79604C6171FCCA01F5F9CA85F03973F4AF6D367E3FED27CAD4B8F87B5A01F", + "data_hash" : "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "validators_hash" : "100E8F1C31B34BBF84E5832CDE10B33EB1D8CA28E5761278A4347EA149CD49ED", + "next_validators_hash" : "100E8F1C31B34BBF84E5832CDE10B33EB1D8CA28E5761278A4347EA149CD49ED", + "consensus_hash" : "048091BC7DDC283F77BFBF91D73C44DA58C3DF8A9CBC867405D8B7F3DAADA22F", + "app_hash" : "BC5335B4EEF06E71C08FC051C89BB965F77B5364E55495E0487294C0AF92C251", + "last_results_hash" : "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "evidence_hash" : "E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855", + "proposer_address" : "294A784968AC4F506E9DF5E672C430B821F1362F" + }, + "data" : { + "txs" : [ ] + }, + "evidence" : { + "evidence" : [ ] + }, + "last_commit" : { + "height" : "7553", + "round" : 0, + "block_id" : { + "hash" : "BC2C6FDDAC8A8A7CF79D4682FC3E76AAFCACB84974FC7C37577322D00D7C7525", + "parts" : { + "total" : 1, + "hash" : "478F75135E06F132E99770C9F6B3D276157532A97252B0D7EDF153E0C8143E80" + } + }, + "signatures" : [ { + "block_id_flag" : 2, + "validator_address" : "294A784968AC4F506E9DF5E672C430B821F1362F", + "timestamp" : "2025-01-03T14:55:39.944873259Z", + "signature" : "Fur3Gg1nLlzpRuN4aBY90Xb/BZVBGl57zjbMkPscycAi8/cGBYub/EHUTQbxUZjchQ0h1hcQlDQNc2Eu1oiiBQ==" + } ] + } + }, + "block_id" : { + "hash" : "5F1522B51BCE44338C1ED0D4BBAC048CA149EA14D2488C2AC730FFE4F206FBC8", + "parts" : { + "total" : 1, + "hash" : "1DF66D12EF879200392F078033C7CE314AA57B1BE38084C20C526AAD42DF90EE" + } + }, + "result_finalize_block" : { + "events" : [ { + "type" : "coin_spent", + "attributes" : [ { + "key" : "spender", + "value" : "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "coin_received", + "attributes" : [ { + "key" : "receiver", + "value" : "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "transfer", + "attributes" : [ { + "key" : "recipient", + "value" : "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", + "index" : true + }, { + "key" : "sender", + "value" : "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "message", + "attributes" : [ { + "key" : "sender", + "value" : "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "mint", + "attributes" : [ { + "key" : "bonded_ratio", + "value" : "0.000000000000013043", + "index" : true + }, { + "key" : "inflation", + "value" : "0.000000000000000000", + "index" : true + }, { + "key" : "annual_provisions", + "value" : "0.000000000000000000", + "index" : true + }, { + "key" : "amount", + "value" : "0", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "coin_spent", + "attributes" : [ { + "key" : "spender", + "value" : "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "coin_received", + "attributes" : [ { + "key" : "receiver", + "value" : "pokt1jv65s3grqf6v6jl3dp4t6c9t9rk99cd86emg48", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "transfer", + "attributes" : [ { + "key" : "recipient", + "value" : "pokt1jv65s3grqf6v6jl3dp4t6c9t9rk99cd86emg48", + "index" : true + }, { + "key" : "sender", + "value" : "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", + "index" : true + }, { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "message", + "attributes" : [ { + "key" : "sender", + "value" : "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "commission", + "attributes" : [ { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "validator", + "value" : "poktvaloper18kk3aqe2pjz7x7993qp2pjt95ghurra9c5ef0t", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + }, { + "type" : "rewards", + "attributes" : [ { + "key" : "amount", + "value" : "", + "index" : true + }, { + "key" : "validator", + "value" : "poktvaloper18kk3aqe2pjz7x7993qp2pjt95ghurra9c5ef0t", + "index" : true + }, { + "key" : "mode", + "value" : "BeginBlock", + "index" : true + } ] + } ], + "validator_updates" : [ ], + "consensus_param_updates" : { + "block" : { + "max_bytes" : "22020096", + "max_gas" : "-1" + }, + "evidence" : { + "max_age_num_blocks" : "100000", + "max_age_duration" : "172800000000000", + "max_bytes" : "1048576" + }, + "validator" : { + "pub_key_types" : [ "ed25519" ] + }, + "version" : { }, + "abci" : { } + }, + "app_hash" : "kPhqTfc+m5pORs+A7Y/eGTM8w0gdZXFMGW64KLnjGdU=" + } + } + }, + "events" : { + "commission.amount" : [ "" ], + "commission.mode" : [ "BeginBlock" ], + "transfer.sender" : [ "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9" ], + "transfer.amount" : [ "", "" ], + "message.sender" : [ "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9" ], + "message.mode" : [ "BeginBlock", "BeginBlock" ], + "mint.bonded_ratio" : [ "0.000000000000013043" ], + "mint.mode" : [ "BeginBlock" ], + "rewards.validator" : [ "poktvaloper18kk3aqe2pjz7x7993qp2pjt95ghurra9c5ef0t" ], + "coin_spent.amount" : [ "", "" ], + "transfer.recipient" : [ "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", "pokt1jv65s3grqf6v6jl3dp4t6c9t9rk99cd86emg48" ], + "rewards.amount" : [ "" ], + "rewards.mode" : [ "BeginBlock" ], + "coin_spent.spender" : [ "pokt1m3h30wlvsf8llruxtpukdvsy0km2kum84hcvmc", "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9" ], + "coin_received.receiver" : [ "pokt17xpfvakm2amg962yls6f84z3kell8c5ldlu5h9", "pokt1jv65s3grqf6v6jl3dp4t6c9t9rk99cd86emg48" ], + "coin_received.mode" : [ "BeginBlock", "BeginBlock" ], + "transfer.mode" : [ "BeginBlock", "BeginBlock" ], + "mint.inflation" : [ "0.000000000000000000" ], + "mint.amount" : [ "0" ], + "coin_spent.mode" : [ "BeginBlock", "BeginBlock" ], + "coin_received.amount" : [ "", "" ], + "mint.annual_provisions" : [ "0.000000000000000000" ], + "commission.validator" : [ "poktvaloper18kk3aqe2pjz7x7993qp2pjt95ghurra9c5ef0t" ], + "tm.event" : [ "NewBlock" ] + } +}` + // handleWebSocketConnection handles messages from a specific WebSocket connection func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { defer func() { @@ -51,7 +337,7 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { } var req rpctypes.RPCRequest - if err := json.Unmarshal(message, &req); err != nil { + if err = json.Unmarshal(message, &req); err != nil { continue } @@ -60,7 +346,7 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { var params struct { Query string `json:"query"` } - if err := json.Unmarshal(req.Params, ¶ms); err != nil { + if err = json.Unmarshal(req.Params, ¶ms); err != nil { continue } @@ -73,11 +359,19 @@ func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { JSONRPC: "2.0", ID: req.ID, // TODO_IN_THIS_COMMIT: generate a mock result... + //Result: json.RawMessage(mockBlockResultJSON), + // DEV_NOTE: Query subscription responses are initially empty; data is sent as subsequent events occur. Result: json.RawMessage("{}"), } - if err := conn.WriteJSON(resp); err != nil { - return + + //time.Sleep(time.Second * 4) + + if err = conn.WriteJSON(resp); err != nil { + panic(err) } + //if err = conn.WriteJSON(resp); err != nil { + // return + //} } } } @@ -89,29 +383,32 @@ func (app *E2EApp) handleBlockEvents(t *testing.T) { for conn, queries := range app.wsConnections { // Check if connection is subscribed to this event type for query := range queries { - if eventMatchesQuery(event, query) { - // Marshal the event to JSON - eventJSON, err := json.Marshal(event) - if err != nil { - t.Logf("failed to marshal event: %v", err) - continue - } - - response := rpctypes.RPCResponse{ - JSONRPC: "2.0", - ID: nil, // Events don't have an ID - Result: json.RawMessage(eventJSON), - } - - if err := conn.WriteJSON(response); err != nil { - app.wsConnMutex.RUnlock() - app.wsConnMutex.Lock() - delete(app.wsConnections, conn) - app.wsConnMutex.Unlock() - app.wsConnMutex.RLock() - continue - } + _ = query + _ = event + //if eventMatchesQuery(event, query) { + // // Marshal the event to JSON + // eventJSON, err := json.Marshal(event) + // if err != nil { + // t.Logf("failed to marshal event: %v", err) + // continue + // } + + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: nil, // Events don't have an ID + // TODO_IN_THIS_COMMIT: make this dynamic! + Result: json.RawMessage(mockBlockResultJSON), + } + + if err := conn.WriteJSON(response); err != nil { + app.wsConnMutex.RUnlock() + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + app.wsConnMutex.RLock() + continue } + //} } } app.wsConnMutex.RUnlock() diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 715c0135b..f6d88b598 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -819,6 +819,9 @@ func (app *App) NextBlock(t *testing.T) { Time: app.sdkCtx.BlockTime(), // Randomize the proposer address for each block. ProposerAddress: sample.ConsAddress().Bytes(), + DecidedLastCommit: cmtabcitypes.CommitInfo{ + Votes: []cmtabcitypes.VoteInfo{{}}, + }, }) require.NoError(t, err) @@ -963,6 +966,11 @@ func (app *App) setupDefaultActorsState( app.NextBlock(t) } +// TODO_IN_THIS_COMMIT: godoc... +func (app *App) GetModuleManager() module.Manager { + return app.moduleManager +} + // fundAccount mints and sends amountUpokt tokens to the given recipientAddr. // // TODO_IMPROVE: Eliminate usage of and remove this function in favor of diff --git a/testutil/testclient/localnet.go b/testutil/testclient/localnet.go index 354a1dd28..72c05554e 100644 --- a/testutil/testclient/localnet.go +++ b/testutil/testclient/localnet.go @@ -100,10 +100,19 @@ func NewLocalnetClientCtx(t gocuke.TestingT, flagSet *pflag.FlagSet) *client.Con func NewLocalnetFlagSet(t gocuke.TestingT) *pflag.FlagSet { t.Helper() + return NewFlagSet(t, CometLocalTCPURL) +} + +// TODO_IN_THIS_COMMIT: godoc... +func NewFlagSet(t gocuke.TestingT, cometTCPURL string) *pflag.FlagSet { + t.Helper() + mockFlagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) // TODO_IMPROVE: It would be nice if the value could be set correctly based // on whether the test using it is running in tilt or not. - mockFlagSet.String(flags.FlagNode, CometLocalTCPURL, "use localnet poktrolld node") + mockFlagSet.Bool(flags.FlagGRPCInsecure, true, "use insecure grpc connection") + mockFlagSet.String(flags.FlagGRPC, cometTCPURL, "use localnet poktrolld node") + //mockFlagSet.String(flags.FlagNode, cometTCPURL, "use localnet poktrolld node") mockFlagSet.String(flags.FlagHome, "", "use localnet poktrolld node") mockFlagSet.String(flags.FlagKeyringBackend, "test", "use test keyring") mockFlagSet.String(flags.FlagChainID, app.Name, "use poktroll chain-id") diff --git a/testutil/testclient/testtx/context.go b/testutil/testclient/testtx/context.go index 449e88b8f..7fc9bbd2a 100644 --- a/testutil/testclient/testtx/context.go +++ b/testutil/testclient/testtx/context.go @@ -15,6 +15,7 @@ import ( cosmoskeyring "github.com/cosmos/cosmos-sdk/crypto/keyring" cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/golang/mock/gomock" + "github.com/spf13/pflag" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/client" @@ -245,10 +246,19 @@ func NewAnyTimesTxTxContext( ) (*mockclient.MockTxContext, client.TxContext) { t.Helper() - var ( - ctrl = gomock.NewController(t) - flagSet = testclient.NewLocalnetFlagSet(t) - ) + flagSet := testclient.NewLocalnetFlagSet(t) + return NewE2ETxContext(t, keyring, flagSet) +} + +// TODO_IN_THIS_COMMIT: godoc... +func NewE2ETxContext( + t *testing.T, + keyring cosmoskeyring.Keyring, + flagSet *pflag.FlagSet, +) (*mockclient.MockTxContext, client.TxContext) { + t.Helper() + + var ctrl = gomock.NewController(t) // intercept #GetAccountNumberSequence() call to mock response and prevent actual query accountRetrieverMock := mockclient.NewMockAccountRetriever(ctrl) From 9a5fc9d78704c8ed01d618fa29b50affcb35df3e Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Fri, 10 Jan 2025 16:51:22 +0100 Subject: [PATCH 3/8] fix: Working grpc server --- testutil/e2e/app.go | 2 +- testutil/e2e/app_test.go | 26 +++++++++++++++----------- testutil/integration/app.go | 11 +++++------ 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index 2a3c32646..6336c59fb 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -50,7 +50,7 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp app.RegisterGRPCServer(grpcServer) //app.RegisterGRPCServer(e2eApp.grpcServer) - flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42069") keyRing := keyring.NewInMemory(app.GetCodec()) clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index a2157ec69..99b8ef054 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -2,17 +2,16 @@ package e2e import ( "context" - "encoding/hex" "net/http" "testing" "time" "cosmossdk.io/depinject" "cosmossdk.io/math" - abci "github.com/cometbft/cometbft/abci/types" cometrpctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/types" cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" cosmostypes "github.com/cosmos/cosmos-sdk/types" @@ -21,6 +20,7 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/pokt-network/poktroll/app/volatile" "github.com/pokt-network/poktroll/pkg/client/block" @@ -206,21 +206,25 @@ func TestGRPCServer(t *testing.T) { app.Close() }) - grpcConn, err := grpc.NewClient("tcp://127.0.0.1:42069", grpc.WithInsecure()) + creds := insecure.NewCredentials() + grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) require.NoError(t, err) - dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") + //dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") require.NoError(t, err) - req := &abci.RequestQuery{ - Data: dataHex, - Path: "/cosmos.auth.v1beta1.Query/Account", - Height: 0, - Prove: false, + req := gatewaytypes.QueryGetGatewayRequest{ + Address: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", } - res := &abci.ResponseQuery{} - grpcConn.Invoke(context.Background(), "abci_query", req, res) + // convert the request to a proto message + anyReq, err := codectypes.NewAnyWithValue(&req) + require.NoError(t, err) + + res := new(gatewaytypes.QueryGetGatewayResponse) + + err = grpcConn.Invoke(context.Background(), "/poktroll.gateway.Query/Gateway", anyReq, res) + require.NoError(t, err) //"method" : "abci_query", //"params" : { diff --git a/testutil/integration/app.go b/testutil/integration/app.go index f6d88b598..aae7e6071 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -103,7 +103,7 @@ type App struct { txCfg client.TxConfig authority sdk.AccAddress moduleManager module.Manager - queryHelper *baseapp.QueryServiceTestHelper + queryHelper *baseapp.GRPCQueryRouter keyRing keyring.Keyring ringClient crypto.RingClient preGeneratedAccts *testkeyring.PreGeneratedAccountIterator @@ -139,7 +139,7 @@ func NewIntegrationApp( modules map[string]appmodule.AppModule, keys map[string]*storetypes.KVStoreKey, msgRouter *baseapp.MsgServiceRouter, - queryHelper *baseapp.QueryServiceTestHelper, + queryHelper *baseapp.GRPCQueryRouter, opts ...IntegrationAppOptionFn, ) *App { t.Helper() @@ -513,8 +513,8 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap ) // Prepare the message & query routers - msgRouter := baseapp.NewMsgServiceRouter() - queryHelper := baseapp.NewQueryServerTestHelper(sdkCtx, registry) + msgRouter := bApp.MsgServiceRouter() + queryHelper := bApp.GRPCQueryRouter() // Prepare the authz keeper and module authzKeeper := authzkeeper.NewKeeper( @@ -661,8 +661,7 @@ func (app *App) GetPreGeneratedAccounts() *testkeyring.PreGeneratedAccountIterat // QueryHelper returns the query helper used by the application that can be // used to submit queries to the application. -func (app *App) QueryHelper() *baseapp.QueryServiceTestHelper { - app.queryHelper.Ctx = *app.sdkCtx +func (app *App) QueryHelper() *baseapp.GRPCQueryRouter { return app.queryHelper } From 1714822a1646658e9333fec1645646762337fc6e Mon Sep 17 00:00:00 2001 From: Bryan White Date: Fri, 10 Jan 2025 17:45:03 +0100 Subject: [PATCH 4/8] wip --- testutil/e2e/app.go | 40 ++++++++++++++++++-------- testutil/e2e/app_test.go | 62 ++++++++++++++++++++++++++++++++++------ 2 files changed, 81 insertions(+), 21 deletions(-) diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index 6336c59fb..a6b753c4c 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -9,7 +9,6 @@ import ( "sync" "testing" - "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/cosmos-sdk/types/module" "github.com/gorilla/websocket" "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -20,7 +19,6 @@ import ( coretypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/pokt-network/poktroll/testutil/integration" - "github.com/pokt-network/poktroll/testutil/testclient" ) // E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing @@ -50,20 +48,32 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp app.RegisterGRPCServer(grpcServer) //app.RegisterGRPCServer(e2eApp.grpcServer) - flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42069") - keyRing := keyring.NewInMemory(app.GetCodec()) - clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + //flagSet := testclient.NewFlagSet(t, "127.0.0.1:42069") + //keyRing := keyring.NewInMemory(app.GetCodec()) + //clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + //authtx.RegisterTxService( + // //app.GRPCQueryRouter(), + // grpcServer, + // clientCtx, + // app.Simulate, + // app.GetRegistry(), + //) + //authtx.RegisterGRPCGatewayRoutes(clientCtx, mux) + + configurator := module.NewConfigurator(app.GetCodec(), app.MsgServiceRouter(), app.GRPCQueryRouter()) for moduleName, mod := range app.GetModuleManager().Modules { - fmt.Printf(">>> %s\n", moduleName) - mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) + fmt.Printf(">>> start: %s\n", moduleName) + //mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) + mod.(module.HasServices).RegisterServices(configurator) + fmt.Printf(">>> done: %s\n", moduleName) } // Create listeners for gRPC, WebSocket, and HTTP - grpcListener, err := net.Listen("tcp", "localhost:42069") + grpcListener, err := net.Listen("tcp", "127.0.0.1:42069") require.NoError(t, err, "failed to create gRPC listener") - wsListener, err := net.Listen("tcp", "localhost:6969") + wsListener, err := net.Listen("tcp", "127.0.0.1:6969") require.NoError(t, err, "failed to create WebSocket listener") e2eApp := &E2EApp{ @@ -78,7 +88,9 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp go func() { if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { - panic(err) + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } } }() @@ -86,14 +98,18 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp e2eApp.wsServer = newWebSocketServer(e2eApp) go func() { if err := e2eApp.wsServer.Serve(wsListener); err != nil && errors.Is(err, http.ErrServerClosed) { - panic(err) + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } } }() // Initialize and start HTTP server go func() { if err := http.ListenAndServe("localhost:42070", mux); err != nil { - panic(err) + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } } }() diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index 99b8ef054..e8ceadda0 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -2,16 +2,17 @@ package e2e import ( "context" + "encoding/hex" "net/http" "testing" "time" "cosmossdk.io/depinject" "cosmossdk.io/math" + abci "github.com/cometbft/cometbft/abci/types" cometrpctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/types" cosmostx "github.com/cosmos/cosmos-sdk/client/tx" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" cosmostypes "github.com/cosmos/cosmos-sdk/types" @@ -148,7 +149,10 @@ func TestNewE2EApp(t *testing.T) { //blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:26657") //require.NoError(t, err) - deps := depinject.Supply(app.QueryHelper(), blockQueryClient) + creds := insecure.NewCredentials() + grpcConn, err := grpc.NewClient("127.0.0.1:42069", grpc.WithTransportCredentials(creds)) + require.NoError(t, err) + deps := depinject.Supply(grpcConn, blockQueryClient) sharedQueryClient, err := query.NewSharedQuerier(deps) require.NoError(t, err) @@ -213,18 +217,58 @@ func TestGRPCServer(t *testing.T) { //dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") require.NoError(t, err) - req := gatewaytypes.QueryGetGatewayRequest{ - Address: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", - } + //req := gatewaytypes.QueryGetGatewayRequest{ + // Address: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + //} + res := &abci.ResponseQuery{} + + //grpcConn.Invoke(context.Background(), "abci_query", req, res) + + ctrl := gomock.NewController(t) + blockQueryClient := mockclient.NewMockBlockQueryClient(ctrl) + blockQueryClient.EXPECT(). + Block(gomock.Any(), gomock.Any()). + DoAndReturn( + func(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error) { + //time.Sleep(time.Second * 100) + blockResultMock := &cometrpctypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + Height: 1, + }, + }, + } + return blockResultMock, nil + }, + ).AnyTimes() + + deps := depinject.Supply(grpcConn, blockQueryClient) + + sharedQueryClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) - // convert the request to a proto message - anyReq, err := codectypes.NewAnyWithValue(&req) + //res := new(gatewaytypes.QueryGetGatewayResponse) + // + //err = grpcConn.Invoke(context.Background(), "/poktroll.gateway.Query/Gateway", anyReq, res) + dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") require.NoError(t, err) - res := new(gatewaytypes.QueryGetGatewayResponse) + req := &abci.RequestQuery{ + Data: dataHex, + Path: "/cosmos.auth.v1beta1.Query/Account", + Height: 0, + Prove: false, + } + + err = grpcConn.Invoke(context.Background(), "abci_query", req, res) + //err = grpcConn.Invoke(context.Background(), "abci_query", req, res) + require.NoError(t, err) - err = grpcConn.Invoke(context.Background(), "/poktroll.gateway.Query/Gateway", anyReq, res) require.NoError(t, err) + sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) + require.NoError(t, err) + + t.Logf("shared params: %+v", sharedParams) //"method" : "abci_query", //"params" : { From f9091a3422e8a9b9260b5e2fb5b677fe71e2578c Mon Sep 17 00:00:00 2001 From: Bryan White Date: Mon, 13 Jan 2025 12:21:19 +0100 Subject: [PATCH 5/8] wip --- testutil/e2e/app.go | 32 ++++++--- testutil/e2e/app_test.go | 61 ++++++++++++---- testutil/e2e/comet.go | 123 ++++++++++++++++++++++++++++++++ testutil/integration/app.go | 57 +++++++++------ testutil/integration/options.go | 10 +++ testutil/testclient/localnet.go | 6 +- 6 files changed, 237 insertions(+), 52 deletions(-) create mode 100644 testutil/e2e/comet.go diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index a6b753c4c..383f18986 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -3,12 +3,12 @@ package e2e import ( "context" "errors" - "fmt" "net" "net/http" "sync" "testing" + "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/cosmos-sdk/types/module" "github.com/gorilla/websocket" "github.com/grpc-ecosystem/grpc-gateway/runtime" @@ -19,6 +19,7 @@ import ( coretypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/pokt-network/poktroll/testutil/integration" + "github.com/pokt-network/poktroll/testutil/testclient" ) // E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing @@ -43,14 +44,27 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp grpcServer := grpc.NewServer(grpc.Creds(creds)) mux := runtime.NewServeMux() + abciQueryPattern, err := runtime.NewPattern( + 1, + []int{}, + []string{""}, + "", + ) + require.NoError(t, err) + + // Register the handler with the mux + mux.Handle(http.MethodPost, abciQueryPattern, handleABCIQuery) + // Create the integration app + opts = append(opts, integration.WithGRPCServer(grpcServer)) app := integration.NewCompleteIntegrationApp(t, opts...) app.RegisterGRPCServer(grpcServer) + //app.RegisterGRPCServer(app.MsgServiceRouter()) //app.RegisterGRPCServer(e2eApp.grpcServer) - //flagSet := testclient.NewFlagSet(t, "127.0.0.1:42069") - //keyRing := keyring.NewInMemory(app.GetCodec()) - //clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + keyRing := keyring.NewInMemory(app.GetCodec()) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) //authtx.RegisterTxService( // //app.GRPCQueryRouter(), @@ -61,12 +75,8 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp //) //authtx.RegisterGRPCGatewayRoutes(clientCtx, mux) - configurator := module.NewConfigurator(app.GetCodec(), app.MsgServiceRouter(), app.GRPCQueryRouter()) - for moduleName, mod := range app.GetModuleManager().Modules { - fmt.Printf(">>> start: %s\n", moduleName) - //mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) - mod.(module.HasServices).RegisterServices(configurator) - fmt.Printf(">>> done: %s\n", moduleName) + for _, mod := range app.GetModuleManager().Modules { + mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) } // Create listeners for gRPC, WebSocket, and HTTP @@ -106,7 +116,7 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp // Initialize and start HTTP server go func() { - if err := http.ListenAndServe("localhost:42070", mux); err != nil { + if err := http.ListenAndServe("127.0.0.1:42070", mux); err != nil { if !errors.Is(err, http.ErrServerClosed) { panic(err) } diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index e8ceadda0..518059c07 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -1,15 +1,15 @@ package e2e import ( + "bytes" "context" - "encoding/hex" + "io" "net/http" "testing" "time" "cosmossdk.io/depinject" "cosmossdk.io/math" - abci "github.com/cometbft/cometbft/abci/types" cometrpctypes "github.com/cometbft/cometbft/rpc/core/types" "github.com/cometbft/cometbft/types" cosmostx "github.com/cosmos/cosmos-sdk/client/tx" @@ -170,7 +170,7 @@ func TestNewE2EApp(t *testing.T) { // TODO_IN_THIS_COMMIT: NOT localnet flagset NOR context, should be // configured to match the E2E app listeners. - flagSet := testclient.NewFlagSet(t, "127.0.0.1:42069") + flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) @@ -220,7 +220,7 @@ func TestGRPCServer(t *testing.T) { //req := gatewaytypes.QueryGetGatewayRequest{ // Address: "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", //} - res := &abci.ResponseQuery{} + //res := &abci.ResponseQuery{} //grpcConn.Invoke(context.Background(), "abci_query", req, res) @@ -250,19 +250,19 @@ func TestGRPCServer(t *testing.T) { //res := new(gatewaytypes.QueryGetGatewayResponse) // //err = grpcConn.Invoke(context.Background(), "/poktroll.gateway.Query/Gateway", anyReq, res) - dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") - require.NoError(t, err) - - req := &abci.RequestQuery{ - Data: dataHex, - Path: "/cosmos.auth.v1beta1.Query/Account", - Height: 0, - Prove: false, - } + //dataHex, err := hex.DecodeString("0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A") + //require.NoError(t, err) - err = grpcConn.Invoke(context.Background(), "abci_query", req, res) + //req := &abci.RequestQuery{ + // Data: dataHex, + // Path: "/cosmos.auth.v1beta1.Query/Account", + // Height: 0, + // Prove: false, + //} + // //err = grpcConn.Invoke(context.Background(), "abci_query", req, res) - require.NoError(t, err) + ////err = grpcConn.Invoke(context.Background(), "abci_query", req, res) + //require.NoError(t, err) require.NoError(t, err) sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) @@ -283,3 +283,34 @@ func TestGRPCServer(t *testing.T) { // "tx" : "CmsKZgohL3Bva3Ryb2xsLmdhdGV3YXkuTXNnU3Rha2VHYXRld2F5EkEKK3Bva3QxNXczZmhmeWMwbHR0djdyNTg1ZTJuY3BmNnQya2w5dWg4cnNueXoSEgoFdXBva3QSCTEwMDAwMDAwMRiGOxJYCk4KRgofL2Nvc21vcy5jcnlwdG8uc2VjcDI1NmsxLlB1YktleRIjCiEDZo2bY9XquUsFljtW/OKWVCDhYFf7NbidN4Y99VQ9438SBAoCCAESBhCqoYLJAhpAw5e7iJN5SpFit3fftxnZY7EDiFqupi7XEL3sUyeV0IBSQv2JZ7Cdu0dCG0yEVgj0xarkPi7dR10pNDL1gcUJxw==" //} } + +func TestSanity3(t *testing.T) { + app := NewE2EApp(t) + t.Cleanup(func() { + app.Close() + }) + + time.Sleep(time.Second * 1) + + client := http.DefaultClient + res, err := client.Post( + "http://127.0.0.1:42070/", + "application/json", + bytes.NewBuffer([]byte(`{ + "jsonrpc":"2.0", + "id":"0", + "method":"abci_query", + "params":{"path":"/cosmos.auth.v1beta1.Query/Account", + "data":"0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A", + "prove":false, + "height":"0" + } + }`)), + ) + require.NoError(t, err) + + result, err := io.ReadAll(res.Body) + require.NoError(t, err) + + t.Logf("result: %s", result) +} diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go new file mode 100644 index 000000000..bb2fd4b7c --- /dev/null +++ b/testutil/e2e/comet.go @@ -0,0 +1,123 @@ +package e2e + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" +) + +//func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { +// // Only accept POST method +// if r.Method != http.MethodPost { +// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) +// return +// } +// +// // Read the request body +// body, err := io.ReadAll(r.Body) +// if err != nil { +// http.Error(w, "Error reading request body", http.StatusBadRequest) +// return +// } +// defer r.Body.Close() +// +// // Parse the JSON-RPC request +// var req comettypes.RPCRequest +// if err := json.Unmarshal(body, &req); err != nil { +// http.Error(w, "Invalid JSON request", http.StatusBadRequest) +// return +// } +// +// // Verify method +// if req.Method != "abci_query" { +// fmt.Printf(">>>> WRONG METHOD") +// +// // TODO_IN_THIS_COMMIT: consolidate with other error response logic... +// res := comettypes.RPCInvalidRequestError(req.ID, fmt.Errorf("Method %s not supported", req.Method)) +// json.NewEncoder(w).Encode(res) +// +// return +// } +// +// // Process the ABCI query +// // This is where you'd implement the actual ABCI query logic +// result := processABCIQuery(req.Params) +// +// // Send response +// response := comettypes.RPCResponse{ +// JSONRPC: "2.0", +// ID: req.ID, +// Result: result, +// } +// +// w.Header().Set("Content-Type", "application/json") +// json.NewEncoder(w).Encode(response) +//} + +// handleABCIQuery handles the actual ABCI query logic +func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { + fmt.Println(">>> handleABCIQuery called") + fmt.Printf("Method: %s, URL: %s\n", r.Method, r.URL.Path) + + // Read and log request body + body, err := io.ReadAll(r.Body) + if err != nil { + fmt.Printf("Error reading body: %v\n", err) + http.Error(w, "Error reading request body", http.StatusBadRequest) + return + } + defer r.Body.Close() + fmt.Printf("Request body: %s\n", string(body)) + + // Parse JSON-RPC request + var req rpctypes.RPCRequest + if err := json.Unmarshal(body, &req); err != nil { + fmt.Printf("Error unmarshaling request: %v\n", err) + http.Error(w, "Invalid JSON request", http.StatusBadRequest) + return + } + + fmt.Printf("RPC Method: %s\n", req.Method) + fmt.Printf("RPC ID: %v\n", req.ID) + fmt.Printf("RPC Params: %s\n", string(req.Params)) + + // Send response with nil result + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Result: nil, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + fmt.Println(">>> Response sent") +} + +// processABCIQuery handles the actual ABCI query logic +func processABCIQuery(params json.RawMessage) json.RawMessage { + // Implement your ABCI query processing logic here + // This would typically involve: + // 1. Decoding the hex data + // 2. Parsing the height + // 3. Making the actual ABCI query to your blockchain node + // 4. Processing the response + // 5. Returning the result in the expected format + + fmt.Println(">>>> ABCI_QUERY") + + // For now, returning a placeholder + //return map[string]interface{}{ + // "height": height, + // "result": map[string]interface{}{ + // // Add your actual query result structure here + // "code": 0, + // "log": "", + // "height": height, + // "value": "", // Base64-encoded response value + // }, + //} + return nil +} diff --git a/testutil/integration/app.go b/testutil/integration/app.go index aae7e6071..82b0f1ade 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -563,32 +563,43 @@ func NewCompleteIntegrationApp(t *testing.T, opts ...IntegrationAppOptionFn) *Ap opts..., ) + configurator := module.NewConfigurator(cdc, msgRouter, queryHelper) + //// TODO_IN_THIS_COMMIT: clean up... + //flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") + //keyRing := keyring.NewInMemory(cdc) + //clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + for _, mod := range integrationApp.GetModuleManager().Modules { + //mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) + mod.(module.HasServices).RegisterServices(configurator) + } + // Register the message servers - banktypes.RegisterMsgServer(msgRouter, bankkeeper.NewMsgServerImpl(bankKeeper)) - tokenomicstypes.RegisterMsgServer(msgRouter, tokenomicskeeper.NewMsgServerImpl(tokenomicsKeeper)) - servicetypes.RegisterMsgServer(msgRouter, servicekeeper.NewMsgServerImpl(serviceKeeper)) - sharedtypes.RegisterMsgServer(msgRouter, sharedkeeper.NewMsgServerImpl(sharedKeeper)) - gatewaytypes.RegisterMsgServer(msgRouter, gatewaykeeper.NewMsgServerImpl(gatewayKeeper)) - apptypes.RegisterMsgServer(msgRouter, appkeeper.NewMsgServerImpl(applicationKeeper)) - suppliertypes.RegisterMsgServer(msgRouter, supplierkeeper.NewMsgServerImpl(supplierKeeper)) - prooftypes.RegisterMsgServer(msgRouter, proofkeeper.NewMsgServerImpl(proofKeeper)) - authtypes.RegisterMsgServer(msgRouter, authkeeper.NewMsgServerImpl(accountKeeper)) - sessiontypes.RegisterMsgServer(msgRouter, sessionkeeper.NewMsgServerImpl(sessionKeeper)) - authz.RegisterMsgServer(msgRouter, authzKeeper) + //banktypes.RegisterMsgServer(msgRouter, bankkeeper.NewMsgServerImpl(bankKeeper)) + //tokenomicstypes.RegisterMsgServer(msgRouter, tokenomicskeeper.NewMsgServerImpl(tokenomicsKeeper)) + //servicetypes.RegisterMsgServer(msgRouter, servicekeeper.NewMsgServerImpl(serviceKeeper)) + //sharedtypes.RegisterMsgServer(msgRouter, sharedkeeper.NewMsgServerImpl(sharedKeeper)) + //gatewaytypes.RegisterMsgServer(msgRouter, gatewaykeeper.NewMsgServerImpl(gatewayKeeper)) + //apptypes.RegisterMsgServer(msgRouter, appkeeper.NewMsgServerImpl(applicationKeeper)) + //suppliertypes.RegisterMsgServer(msgRouter, supplierkeeper.NewMsgServerImpl(supplierKeeper)) + //prooftypes.RegisterMsgServer(msgRouter, proofkeeper.NewMsgServerImpl(proofKeeper)) + //authtypes.RegisterMsgServer(msgRouter, authkeeper.NewMsgServerImpl(accountKeeper)) + //sessiontypes.RegisterMsgServer(msgRouter, sessionkeeper.NewMsgServerImpl(sessionKeeper)) + //authz.RegisterMsgServer(msgRouter, authzKeeper) // Register query servers - banktypes.RegisterQueryServer(queryHelper, bankKeeper) - authz.RegisterQueryServer(queryHelper, authzKeeper) - tokenomicstypes.RegisterQueryServer(queryHelper, tokenomicsKeeper) - servicetypes.RegisterQueryServer(queryHelper, serviceKeeper) - sharedtypes.RegisterQueryServer(queryHelper, sharedKeeper) - gatewaytypes.RegisterQueryServer(queryHelper, gatewayKeeper) - apptypes.RegisterQueryServer(queryHelper, applicationKeeper) - suppliertypes.RegisterQueryServer(queryHelper, supplierKeeper) - prooftypes.RegisterQueryServer(queryHelper, proofKeeper) - // TODO_TECHDEBT: What is the query server for authtypes? - // authtypes.RegisterQueryServer(queryHelper, accountKeeper) - sessiontypes.RegisterQueryServer(queryHelper, sessionKeeper) + //banktypes.RegisterQueryServer(queryHelper, bankKeeper) + //authz.RegisterQueryServer(queryHelper, authzKeeper) + //tokenomicstypes.RegisterQueryServer(queryHelper, tokenomicsKeeper) + //servicetypes.RegisterQueryServer(queryHelper, serviceKeeper) + //sharedtypes.RegisterQueryServer(queryHelper, sharedKeeper) + //gatewaytypes.RegisterQueryServer(queryHelper, gatewayKeeper) + //apptypes.RegisterQueryServer(queryHelper, applicationKeeper) + //suppliertypes.RegisterQueryServer(queryHelper, supplierKeeper) + //prooftypes.RegisterQueryServer(queryHelper, proofKeeper) + //// TODO_TECHDEBT: What is the query server for authtypes? + //// authtypes.RegisterQueryServer(queryHelper, accountKeeper) + //sessiontypes.RegisterQueryServer(queryHelper, sessionKeeper) // Need to go to the next block to finalize the genesis and setup. // This has to be after the params are set, as the params are stored in the diff --git a/testutil/integration/options.go b/testutil/integration/options.go index 1695002cf..e273a14f5 100644 --- a/testutil/integration/options.go +++ b/testutil/integration/options.go @@ -5,6 +5,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" cosmostypes "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" + gogogrpc "github.com/cosmos/gogoproto/grpc" tlm "github.com/pokt-network/poktroll/x/tokenomics/token_logic_module" ) @@ -17,6 +18,8 @@ type IntegrationAppConfig struct { // InitChainer function. InitChainerModuleFns []InitChainerModuleFn TokenLogicModules []tlm.TokenLogicModule + + grpcServer gogogrpc.Server } // IntegrationAppOptionFn is a function that receives and has the opportunity to @@ -70,3 +73,10 @@ func WithTokenLogicModules(tokenLogicModules []tlm.TokenLogicModule) Integration config.TokenLogicModules = tokenLogicModules } } + +// TODO_IN_THIS_COMMIT: godoc... +func WithGRPCServer(grpcServer gogogrpc.Server) IntegrationAppOptionFn { + return func(config *IntegrationAppConfig) { + config.grpcServer = grpcServer + } +} diff --git a/testutil/testclient/localnet.go b/testutil/testclient/localnet.go index 72c05554e..1e6fdb00b 100644 --- a/testutil/testclient/localnet.go +++ b/testutil/testclient/localnet.go @@ -110,9 +110,9 @@ func NewFlagSet(t gocuke.TestingT, cometTCPURL string) *pflag.FlagSet { mockFlagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) // TODO_IMPROVE: It would be nice if the value could be set correctly based // on whether the test using it is running in tilt or not. - mockFlagSet.Bool(flags.FlagGRPCInsecure, true, "use insecure grpc connection") - mockFlagSet.String(flags.FlagGRPC, cometTCPURL, "use localnet poktrolld node") - //mockFlagSet.String(flags.FlagNode, cometTCPURL, "use localnet poktrolld node") + //mockFlagSet.Bool(flags.FlagGRPCInsecure, true, "use insecure grpc connection") + //mockFlagSet.String(flags.FlagGRPC, cometTCPURL, "use localnet poktrolld node") + mockFlagSet.String(flags.FlagNode, cometTCPURL, "use localnet poktrolld node") mockFlagSet.String(flags.FlagHome, "", "use localnet poktrolld node") mockFlagSet.String(flags.FlagKeyringBackend, "test", "use test keyring") mockFlagSet.String(flags.FlagChainID, app.Name, "use poktroll chain-id") From c76a958691ef61bfa9da959b445cf2bd00dbf221 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 14 Jan 2025 09:17:47 +0100 Subject: [PATCH 6/8] wip --- testutil/e2e/app.go | 13 ++- testutil/e2e/app_test.go | 5 + testutil/e2e/comet.go | 199 ++++++++++++++++++++++++++++++++------- 3 files changed, 180 insertions(+), 37 deletions(-) diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index 383f18986..fd7deb185 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -12,6 +12,7 @@ import ( "github.com/cosmos/cosmos-sdk/types/module" "github.com/gorilla/websocket" "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -46,15 +47,12 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp abciQueryPattern, err := runtime.NewPattern( 1, - []int{}, + []int{int(utilities.OpLitPush), int(utilities.OpNop)}, []string{""}, "", ) require.NoError(t, err) - // Register the handler with the mux - mux.Handle(http.MethodPost, abciQueryPattern, handleABCIQuery) - // Create the integration app opts = append(opts, integration.WithGRPCServer(grpcServer)) app := integration.NewCompleteIntegrationApp(t, opts...) @@ -66,6 +64,13 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp keyRing := keyring.NewInMemory(app.GetCodec()) clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + // Register the handler with the mux + //client, err := comethttp.New("tcp://127.0.0.1:42070", "/websocket") + client, err := grpc.NewClient("127.0.0.1:42069", grpc.WithInsecure()) + require.NoError(t, err) + + mux.Handle(http.MethodPost, abciQueryPattern, newHandleABCIQuery(t, app.GetCodec(), client)) + //authtx.RegisterTxService( // //app.GRPCQueryRouter(), // grpcServer, diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go index 518059c07..18f53c9b3 100644 --- a/testutil/e2e/app_test.go +++ b/testutil/e2e/app_test.go @@ -293,6 +293,11 @@ func TestSanity3(t *testing.T) { time.Sleep(time.Second * 1) client := http.DefaultClient + //res, err := client.Do(&http.Request{ + // Method: http.MethodPost, + // URL: &url.URL{Scheme: "http", Host: "127.0.0.1:42070", Path: ""}, + // Body: io.NopCloser(bytes.NewBuffer([]byte(`{"jsonrpc":"2.0","id":"0","method":"abci_query","params":{"path":"/cosmos.auth.v1beta1.Query/Account","data":"0A2B706F6B74313577336668667963306C747476377235383565326E6370663674326B6C3975683872736E797A","prove":false,"height":"0"}}`))), + //}) res, err := client.Post( "http://127.0.0.1:42070/", "application/json", diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index bb2fd4b7c..ef0f03d88 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -1,12 +1,24 @@ package e2e import ( + "bytes" + "context" + "encoding/hex" "encoding/json" "fmt" "io" "net/http" + "testing" + "github.com/cometbft/cometbft/abci/types" + cmtjson "github.com/cometbft/cometbft/libs/json" + coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + cosmoscodec "github.com/cosmos/cosmos-sdk/codec" + authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" + gogogrpc "github.com/cosmos/gogoproto/grpc" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/stretchr/testify/require" ) //func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { @@ -58,42 +70,163 @@ import ( //} // handleABCIQuery handles the actual ABCI query logic -func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { - fmt.Println(">>> handleABCIQuery called") - fmt.Printf("Method: %s, URL: %s\n", r.Method, r.URL.Path) - - // Read and log request body - body, err := io.ReadAll(r.Body) - if err != nil { - fmt.Printf("Error reading body: %v\n", err) - http.Error(w, "Error reading request body", http.StatusBadRequest) - return - } - defer r.Body.Close() - fmt.Printf("Request body: %s\n", string(body)) - - // Parse JSON-RPC request - var req rpctypes.RPCRequest - if err := json.Unmarshal(body, &req); err != nil { - fmt.Printf("Error unmarshaling request: %v\n", err) - http.Error(w, "Invalid JSON request", http.StatusBadRequest) - return - } +func newHandleABCIQuery(t *testing.T, cdc cosmoscodec.Codec, client gogogrpc.ClientConn) runtime.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { + ctx := context.Background() - fmt.Printf("RPC Method: %s\n", req.Method) - fmt.Printf("RPC ID: %v\n", req.ID) - fmt.Printf("RPC Params: %s\n", string(req.Params)) + fmt.Println(">>> handleABCIQuery called") + //fmt.Printf("Method: %s, URL: %s\n", r.Method, r.URL.Path) - // Send response with nil result - response := rpctypes.RPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Result: nil, - } + // Read and log request body + body, err := io.ReadAll(r.Body) + if err != nil { + //fmt.Printf("Error reading body: %v\n", err) + http.Error(w, "Error reading request body", http.StatusBadRequest) + return + } + defer r.Body.Close() + //fmt.Printf("Request body: %s\n", string(body)) + + // Parse JSON-RPC request + var req rpctypes.RPCRequest + if err := json.Unmarshal(body, &req); err != nil { + fmt.Printf("Error unmarshaling request: %v\n", err) + http.Error(w, "Invalid JSON request", http.StatusBadRequest) + return + } + + //fmt.Printf("RPC Method: %s\n", req.Method) + //fmt.Printf("RPC ID: %v\n", req.ID) + //fmt.Printf("RPC Params: %s\n", string(req.Params)) + + params := make(map[string]json.RawMessage) + err = json.Unmarshal(req.Params, ¶ms) + require.NoError(t, err) + + //t.Logf(">>> params: %+v", params) + + //abciRes, err := client.ABCIQueryWithOptions(ctx, path, data, opts) + //require.NoError(t, err) + + //var args, reply any + //var jsonRes json.RawMessage + switch req.Method { + // TODO_IN_THIS_COMMIT: extract... + case "abci_query": + // TODO_IN_THIS_COMMIT: add switch for different service/method handlers... + + pathRaw, hasPath := params["path"] + require.True(t, hasPath) + + //abciQueryReq := new(cmtservice.ABCIQueryRequest) + var path string + err = json.Unmarshal(pathRaw, &path) + require.NoError(t, err) + + dataRaw, hasData := params["data"] + require.True(t, hasData) + + data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) + require.NoError(t, err) - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) - fmt.Println(">>> Response sent") + queryReq := new(authtypes.QueryAccountRequest) + err = queryReq.Unmarshal(data) + require.NoError(t, err) + + var height int64 + heightRaw, hasHeight := params["height"] + if hasHeight { + err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height) + require.NoError(t, err) + } + + //proveRaw, hasProve := params["prove"] + //if hasProve { + // err = json.Unmarshal(proveRaw, &abciQueryReq.Prove) + // require.NoError(t, err) + //} + + //abciQueryRes := new(cmtservice.ABCIQueryResponse) + //err = client.Invoke(ctx, fmt.Sprintf("%s", abciQueryReq.Path), abciQueryReq, abciQueryRes) + queryRes := new(authtypes.QueryAccountResponse) + + err = client.Invoke(ctx, path, queryReq, queryRes) + require.NoError(t, err) + + //resData, err := cdc.MarshalJSON(queryRes) + resData, err := queryRes.Marshal() + require.NoError(t, err) + + abciQueryRes := coretypes.ResultABCIQuery{ + Response: types.ResponseQuery{ + //Code: 0, + //Log: "", + //Info: "", + //Index: 0, + //Key: nil, + Value: resData, + //ProofOps: nil, + Height: height, + //Codespace: "", + }, + } + //abciQueryRes := &cmtservice.ABCIQueryResponse{ + // //Code: 0, + // //Log: "", + // //Info: "", + // //Index: 0, + // //Key: nil, + // Value: resData, + // //ProofOps: nil, + // Height: height, + // //Codespace: "", + //} + ////jsonRes, err = json.Marshal(abciQueryRes) + ////jsonRes, err = cmtjson.Marshal(queryRes) + //require.NoError(t, err) + + w.Header().Set("Content-Type", "application/json") + //json.NewEncoder(w).Encode(abciQueryRes) + //err = json.NewEncoder(w).Encode(queryRes) + //jsonRes, err := cdc.MarshalJSON(queryRes) + //jsonRes, err := cmtjson.Marshal(queryRes) + + jsonRes, err := cmtjson.Marshal(abciQueryRes) + require.NoError(t, err) + + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Result: jsonRes, + } + + //_, err = w.Write(response) + err = json.NewEncoder(w).Encode(response) + //err = json.NewEncoder(w).Encode(abciQueryRes) + require.NoError(t, err) + fmt.Println(">>> Response sent") + case "broadcast_tx_sync": + fmt.Println(">>>> BROADCAST_TX_SYNC") + t.Fatalf("ERROR: unsupported method %s", req.Method) + case "broadcast_tx_async": + fmt.Println(">>>> BROADCAST_TX_ASYNC") + t.Fatalf("ERROR: unsupported method %s", req.Method) + default: + t.Fatalf("ERROR: unsupported method %s", req.Method) + } + + // Send response with nil result + //response := rpctypes.NewRPCSuccessResponse(req.ID, nil) + //response := rpctypes.RPCResponse{ + // JSONRPC: "2.0", + // ID: req.ID, + // Result: jsonRes, + //} + + //w.Header().Set("Content-Type", "application/json") + //json.NewEncoder(w).Encode(response) + //fmt.Println(">>> Response sent") + } } // processABCIQuery handles the actual ABCI query logic From 423067b1ce4d56a9ec7ded7308d8b16393304d28 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 14 Jan 2025 09:45:42 +0100 Subject: [PATCH 7/8] wip: some cleanup --- testutil/e2e/app.go | 16 +-- testutil/e2e/comet.go | 203 +++++++++++++++++------------------- testutil/e2e/grpc_server.go | 176 ------------------------------- 3 files changed, 99 insertions(+), 296 deletions(-) diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index fd7deb185..d1335768e 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -45,7 +45,7 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp grpcServer := grpc.NewServer(grpc.Creds(creds)) mux := runtime.NewServeMux() - abciQueryPattern, err := runtime.NewPattern( + rootPattern, err := runtime.NewPattern( 1, []int{int(utilities.OpLitPush), int(utilities.OpNop)}, []string{""}, @@ -57,28 +57,16 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp opts = append(opts, integration.WithGRPCServer(grpcServer)) app := integration.NewCompleteIntegrationApp(t, opts...) app.RegisterGRPCServer(grpcServer) - //app.RegisterGRPCServer(app.MsgServiceRouter()) - //app.RegisterGRPCServer(e2eApp.grpcServer) flagSet := testclient.NewFlagSet(t, "tcp://127.0.0.1:42070") keyRing := keyring.NewInMemory(app.GetCodec()) clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) // Register the handler with the mux - //client, err := comethttp.New("tcp://127.0.0.1:42070", "/websocket") client, err := grpc.NewClient("127.0.0.1:42069", grpc.WithInsecure()) require.NoError(t, err) - mux.Handle(http.MethodPost, abciQueryPattern, newHandleABCIQuery(t, app.GetCodec(), client)) - - //authtx.RegisterTxService( - // //app.GRPCQueryRouter(), - // grpcServer, - // clientCtx, - // app.Simulate, - // app.GetRegistry(), - //) - //authtx.RegisterGRPCGatewayRoutes(clientCtx, mux) + mux.Handle(http.MethodPost, rootPattern, newPostHandler(client)) for _, mod := range app.GetModuleManager().Modules { mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index ef0f03d88..b657becc6 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -8,17 +8,14 @@ import ( "fmt" "io" "net/http" - "testing" "github.com/cometbft/cometbft/abci/types" cmtjson "github.com/cometbft/cometbft/libs/json" coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" - cosmoscodec "github.com/cosmos/cosmos-sdk/codec" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" gogogrpc "github.com/cosmos/gogoproto/grpc" "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/stretchr/testify/require" ) //func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { @@ -69,163 +66,139 @@ import ( // json.NewEncoder(w).Encode(response) //} +const ( + authAccountQuery = "/cosmos.auth.v1beta1.Query/Account" +) + // handleABCIQuery handles the actual ABCI query logic -func newHandleABCIQuery(t *testing.T, cdc cosmoscodec.Codec, client gogogrpc.ClientConn) runtime.HandlerFunc { +func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { ctx := context.Background() - fmt.Println(">>> handleABCIQuery called") - //fmt.Printf("Method: %s, URL: %s\n", r.Method, r.URL.Path) - // Read and log request body body, err := io.ReadAll(r.Body) if err != nil { - //fmt.Printf("Error reading body: %v\n", err) http.Error(w, "Error reading request body", http.StatusBadRequest) return } defer r.Body.Close() - //fmt.Printf("Request body: %s\n", string(body)) // Parse JSON-RPC request var req rpctypes.RPCRequest - if err := json.Unmarshal(body, &req); err != nil { - fmt.Printf("Error unmarshaling request: %v\n", err) - http.Error(w, "Invalid JSON request", http.StatusBadRequest) + if err = json.Unmarshal(body, &req); err != nil { + writeErrorResponseFromErr(w, req, err) return } - //fmt.Printf("RPC Method: %s\n", req.Method) - //fmt.Printf("RPC ID: %v\n", req.ID) - //fmt.Printf("RPC Params: %s\n", string(req.Params)) - params := make(map[string]json.RawMessage) - err = json.Unmarshal(req.Params, ¶ms) - require.NoError(t, err) - - //t.Logf(">>> params: %+v", params) - - //abciRes, err := client.ABCIQueryWithOptions(ctx, path, data, opts) - //require.NoError(t, err) + if err = json.Unmarshal(req.Params, ¶ms); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } - //var args, reply any - //var jsonRes json.RawMessage + var response rpctypes.RPCResponse switch req.Method { // TODO_IN_THIS_COMMIT: extract... case "abci_query": - // TODO_IN_THIS_COMMIT: add switch for different service/method handlers... + var ( + resData []byte + height int64 + ) pathRaw, hasPath := params["path"] - require.True(t, hasPath) + if !hasPath { + writeErrorResponse(w, req, "missing path param", string(req.Params)) + return + } - //abciQueryReq := new(cmtservice.ABCIQueryRequest) var path string - err = json.Unmarshal(pathRaw, &path) - require.NoError(t, err) - - dataRaw, hasData := params["data"] - require.True(t, hasData) - - data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) - require.NoError(t, err) - - queryReq := new(authtypes.QueryAccountRequest) - err = queryReq.Unmarshal(data) - require.NoError(t, err) - - var height int64 - heightRaw, hasHeight := params["height"] - if hasHeight { - err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height) - require.NoError(t, err) + if err = json.Unmarshal(pathRaw, &path); err != nil { + writeErrorResponseFromErr(w, req, err) + return } - //proveRaw, hasProve := params["prove"] - //if hasProve { - // err = json.Unmarshal(proveRaw, &abciQueryReq.Prove) - // require.NoError(t, err) - //} - - //abciQueryRes := new(cmtservice.ABCIQueryResponse) - //err = client.Invoke(ctx, fmt.Sprintf("%s", abciQueryReq.Path), abciQueryReq, abciQueryRes) - queryRes := new(authtypes.QueryAccountResponse) - - err = client.Invoke(ctx, path, queryReq, queryRes) - require.NoError(t, err) - - //resData, err := cdc.MarshalJSON(queryRes) - resData, err := queryRes.Marshal() - require.NoError(t, err) + switch path { + case authAccountQuery: + //abciQueryReq := new(cmtservice.ABCIQueryRequest) + + dataRaw, hasData := params["data"] + if !hasData { + writeErrorResponse(w, req, "missing data param", string(req.Params)) + return + } + + data, err := hex.DecodeString(string(bytes.Trim(dataRaw, `"`))) + if err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + queryReq := new(authtypes.QueryAccountRequest) + if err = queryReq.Unmarshal(data); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + var height int64 + heightRaw, hasHeight := params["height"] + if hasHeight { + if err = json.Unmarshal(bytes.Trim(heightRaw, `"`), &height); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + } + + queryRes := new(authtypes.QueryAccountResponse) + + if err = client.Invoke(ctx, path, queryReq, queryRes); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + resData, err = queryRes.Marshal() + if err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + } abciQueryRes := coretypes.ResultABCIQuery{ Response: types.ResponseQuery{ //Code: 0, - //Log: "", - //Info: "", //Index: 0, //Key: nil, - Value: resData, - //ProofOps: nil, + Value: resData, Height: height, - //Codespace: "", }, } - //abciQueryRes := &cmtservice.ABCIQueryResponse{ - // //Code: 0, - // //Log: "", - // //Info: "", - // //Index: 0, - // //Key: nil, - // Value: resData, - // //ProofOps: nil, - // Height: height, - // //Codespace: "", - //} - ////jsonRes, err = json.Marshal(abciQueryRes) - ////jsonRes, err = cmtjson.Marshal(queryRes) - //require.NoError(t, err) w.Header().Set("Content-Type", "application/json") - //json.NewEncoder(w).Encode(abciQueryRes) - //err = json.NewEncoder(w).Encode(queryRes) - //jsonRes, err := cdc.MarshalJSON(queryRes) - //jsonRes, err := cmtjson.Marshal(queryRes) jsonRes, err := cmtjson.Marshal(abciQueryRes) - require.NoError(t, err) + if err != nil { + writeErrorResponseFromErr(w, req, err) + return + } - response := rpctypes.RPCResponse{ + response = rpctypes.RPCResponse{ JSONRPC: "2.0", ID: req.ID, Result: jsonRes, } - - //_, err = w.Write(response) - err = json.NewEncoder(w).Encode(response) - //err = json.NewEncoder(w).Encode(abciQueryRes) - require.NoError(t, err) fmt.Println(">>> Response sent") case "broadcast_tx_sync": fmt.Println(">>>> BROADCAST_TX_SYNC") - t.Fatalf("ERROR: unsupported method %s", req.Method) + response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) case "broadcast_tx_async": fmt.Println(">>>> BROADCAST_TX_ASYNC") - t.Fatalf("ERROR: unsupported method %s", req.Method) + response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) default: - t.Fatalf("ERROR: unsupported method %s", req.Method) + response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) } - // Send response with nil result - //response := rpctypes.NewRPCSuccessResponse(req.ID, nil) - //response := rpctypes.RPCResponse{ - // JSONRPC: "2.0", - // ID: req.ID, - // Result: jsonRes, - //} - - //w.Header().Set("Content-Type", "application/json") - //json.NewEncoder(w).Encode(response) - //fmt.Println(">>> Response sent") + if err = json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } } } @@ -254,3 +227,21 @@ func processABCIQuery(params json.RawMessage) json.RawMessage { //} return nil } + +// TODO_IN_THIS_COMMIT: godoc... +func writeErrorResponseFromErr(w http.ResponseWriter, req rpctypes.RPCRequest, err error) { + var errMsg string + if err != nil { + errMsg = err.Error() + } + writeErrorResponse(w, req, errMsg, "") +} + +// TODO_IN_THIS_COMMIT: godoc... +func writeErrorResponse(w http.ResponseWriter, req rpctypes.RPCRequest, msg, data string) { + errRes := rpctypes.NewRPCErrorResponse(req.ID, 500, msg, data) + if err := json.NewEncoder(w).Encode(errRes); err != nil { + // TODO_IN_THIS_COMMIT: log error + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} diff --git a/testutil/e2e/grpc_server.go b/testutil/e2e/grpc_server.go index c01d4609e..df8caf702 100644 --- a/testutil/e2e/grpc_server.go +++ b/testutil/e2e/grpc_server.go @@ -1,177 +1 @@ package e2e - -import ( - "context" - "fmt" - "strings" - "testing" - - "github.com/cosmos/cosmos-sdk/baseapp" - sdk "github.com/cosmos/cosmos-sdk/types" - "google.golang.org/grpc" - "google.golang.org/protobuf/proto" -) - -// newGRPCServer creates and configures a new gRPC server for the E2EApp -func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { - grpcServer := grpc.NewServer() - app.RegisterGRPCServer(grpcServer) - - return grpcServer -} - -// grpcForwarderServer implements a generic gRPC service that forwards all queries -// to the queryHelper and messages to the app -type grpcForwarderServer struct { - queryHelper *baseapp.QueryServiceTestHelper - app *E2EApp - t *testing.T -} - -// Invoke implements the grpc.Server interface and forwards all requests appropriately -func (s *grpcForwarderServer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { - // Determine if this is a query or message based on the method name - if isQuery(method) { - return s.queryHelper.Invoke(ctx, method, args, reply) - } - - // If it's not a query, treat it as a message - msg, ok := args.(sdk.Msg) - if !ok { - return fmt.Errorf("expected sdk.Msg, got %T", args) - } - - // Run the message through the app - msgRes, err := s.app.RunMsg(s.t, msg) - if err != nil { - return err - } - - // Type assert the reply as a proto.Message - protoReply, ok := reply.(proto.Message) - if !ok { - return fmt.Errorf("expected proto.Message, got %T", reply) - } - - // Type assert the response as a proto.Message - protoRes, ok := msgRes.(proto.Message) - if !ok { - return fmt.Errorf("expected proto.Message response, got %T", msgRes) - } - - // Marshal the response to bytes - resBz, err := proto.Marshal(protoRes) - if err != nil { - return fmt.Errorf("failed to marshal response: %w", err) - } - - // Unmarshal into the reply - return proto.Unmarshal(resBz, protoReply) -} - -// NewStream implements the grpc.Server interface but is not used in this implementation -func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { - return nil, fmt.Errorf("streaming is not supported") -} - -// isQuery returns true if the method name indicates this is a query request -func isQuery(method string) bool { - return strings.Contains(method, ".Query/") -} - -//func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { -// grpcServer := grpc.NewServer() -// reflection.Register(grpcServer) -// -// forwarder := &grpcForwarderServer{ -// app: app, -// t: t, -// queryHelper: app.QueryHelper(), -// msgRouter: app.MsgServiceRouter(), -// msgHandlers: map[string]interface{}{}, -// } -// -// // Forward all gRPC messages through our forwarder -// sd := &grpc.ServiceDesc{ -// ServiceName: "cosmos.Service", -// HandlerType: (*interface{})(nil), -// Methods: []grpc.MethodDesc{ -// { -// MethodName: "HandleMessage", -// Handler: forwarder.handleMessageGeneric, -// }, -// }, -// } -// grpcServer.RegisterService(sd, forwarder) -// -// return grpcServer -//} -// -//type grpcForwarderServer struct { -// app *E2EApp -// t *testing.T -// queryHelper *baseapp.QueryServiceTestHelper -// msgRouter *baseapp.MsgServiceRouter -// msgHandlers map[string]interface{} -//} -// -//func (s *grpcForwarderServer) handleMessageGeneric( -// srv interface{}, -// ctx context.Context, -// dec func(interface{}) error, -// interceptor grpc.UnaryServerInterceptor, -//) (interface{}, error) { -// msg, ok := srv.(sdk.Msg) -// if !ok { -// return nil, fmt.Errorf("invalid message type: %T", srv) -// } -// -// // Use the app's existing message handling infrastructure -// msgRes, err := s.app.RunMsg(s.t, msg) -// if err != nil { -// return nil, err -// } -// -// return msgRes, nil -//} - -//func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { -// grpcServer := grpc.NewServer() -// reflection.Register(grpcServer) -// -// // Register a service handler that forwards to MsgServiceRouter -// msgHandler := &grpcForwarderServer{app: app, t: t} -// serverServiceDesc := &grpc.ServiceDesc{ -// ServiceName: "cosmos.msg.v1.Msg", -// HandlerType: (*interface{})(nil), -// Methods: []grpc.MethodDesc{{ -// MethodName: "HandleMessage", -// Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { -// var msg sdk.Msg -// if err := dec(&msg); err != nil { -// return nil, err -// } -// return msgHandler.app.RunMsg(msgHandler.t, msg) -// }, -// }}, -// } -// grpcServer.RegisterService(serverServiceDesc, msgHandler) -// -// // Set up the gRPC-Gateway mux -// mux := runtime.NewServeMux() -// opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} -// -// // Register all your service handlers with the mux -// if err := gatewaytypes.RegisterMsgHandlerFromEndpoint(context.Background(), mux, app.grpcListener.Addr().String(), opts); err != nil { -// panic(err) -// } -// -// // Start HTTP server with the mux -// go func() { -// if err := http.ListenAndServe(":42070", mux); err != nil { -// panic(err) -// } -// }() -// -// return grpcServer -//} From 2b76ef108d96869d586cd943bb09b53adbb47641 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 14 Jan 2025 11:06:12 +0100 Subject: [PATCH 8/8] wip --- testutil/e2e/app.go | 4 +- testutil/e2e/comet.go | 161 +++++++++++++++--------------------- testutil/integration/app.go | 54 +++++++++--- 3 files changed, 108 insertions(+), 111 deletions(-) diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go index d1335768e..d843984d9 100644 --- a/testutil/e2e/app.go +++ b/testutil/e2e/app.go @@ -66,8 +66,6 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp client, err := grpc.NewClient("127.0.0.1:42069", grpc.WithInsecure()) require.NoError(t, err) - mux.Handle(http.MethodPost, rootPattern, newPostHandler(client)) - for _, mod := range app.GetModuleManager().Modules { mod.(module.AppModuleBasic).RegisterGRPCGatewayRoutes(clientCtx, mux) } @@ -89,6 +87,8 @@ func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp blockEventChan: make(chan *coretypes.ResultEvent, 1), } + mux.Handle(http.MethodPost, rootPattern, newPostHandler(client, e2eApp)) + go func() { if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { if !errors.Is(err, http.ErrServerClosed) { diff --git a/testutil/e2e/comet.go b/testutil/e2e/comet.go index b657becc6..5e607f039 100644 --- a/testutil/e2e/comet.go +++ b/testutil/e2e/comet.go @@ -10,70 +10,35 @@ import ( "net/http" "github.com/cometbft/cometbft/abci/types" - cmtjson "github.com/cometbft/cometbft/libs/json" coretypes "github.com/cometbft/cometbft/rpc/core/types" + coregrpc "github.com/cometbft/cometbft/rpc/grpc" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" gogogrpc "github.com/cosmos/gogoproto/grpc" "github.com/grpc-ecosystem/grpc-gateway/runtime" ) -//func handleABCIQuery(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { -// // Only accept POST method -// if r.Method != http.MethodPost { -// http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) -// return -// } -// -// // Read the request body -// body, err := io.ReadAll(r.Body) -// if err != nil { -// http.Error(w, "Error reading request body", http.StatusBadRequest) -// return -// } -// defer r.Body.Close() -// -// // Parse the JSON-RPC request -// var req comettypes.RPCRequest -// if err := json.Unmarshal(body, &req); err != nil { -// http.Error(w, "Invalid JSON request", http.StatusBadRequest) -// return -// } -// -// // Verify method -// if req.Method != "abci_query" { -// fmt.Printf(">>>> WRONG METHOD") -// -// // TODO_IN_THIS_COMMIT: consolidate with other error response logic... -// res := comettypes.RPCInvalidRequestError(req.ID, fmt.Errorf("Method %s not supported", req.Method)) -// json.NewEncoder(w).Encode(res) -// -// return -// } -// -// // Process the ABCI query -// // This is where you'd implement the actual ABCI query logic -// result := processABCIQuery(req.Params) -// -// // Send response -// response := comettypes.RPCResponse{ -// JSONRPC: "2.0", -// ID: req.ID, -// Result: result, -// } -// -// w.Header().Set("Content-Type", "application/json") -// json.NewEncoder(w).Encode(response) -//} +// TODO_IN_THIS_COMMIT: godoc... +type CometBFTMethod string + +// TODO_IN_THIS_COMMIT: godoc... +type ServiceMethodUri string const ( - authAccountQuery = "/cosmos.auth.v1beta1.Query/Account" + abciQueryMethod = CometBFTMethod("abci_query") + broadcastTxSyncMethod = CometBFTMethod("broadcast_tx_sync") + broadcastTxAsyncMethod = CometBFTMethod("broadcast_tx_async") + broadcastTxCommitMethod = CometBFTMethod("broadcast_tx_commit") + + authAccountQueryUri = ServiceMethodUri("/cosmos.auth.v1beta1.Query/Account") ) // handleABCIQuery handles the actual ABCI query logic -func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { +func newPostHandler(client gogogrpc.ClientConn, app *E2EApp) runtime.HandlerFunc { return func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { ctx := context.Background() + // DEV_NOTE: http.Error() automatically sets the Content-Type header to "text/plain". + w.Header().Set("Content-Type", "application/json") // Read and log request body body, err := io.ReadAll(r.Body) @@ -97,9 +62,9 @@ func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { } var response rpctypes.RPCResponse - switch req.Method { + switch CometBFTMethod(req.Method) { // TODO_IN_THIS_COMMIT: extract... - case "abci_query": + case abciQueryMethod: var ( resData []byte height int64 @@ -117,10 +82,8 @@ func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { return } - switch path { - case authAccountQuery: - //abciQueryReq := new(cmtservice.ABCIQueryRequest) - + switch ServiceMethodUri(path) { + case authAccountQueryUri: dataRaw, hasData := params["data"] if !hasData { writeErrorResponse(w, req, "missing data param", string(req.Params)) @@ -149,7 +112,6 @@ func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { } queryRes := new(authtypes.QueryAccountResponse) - if err = client.Invoke(ctx, path, queryReq, queryRes); err != nil { writeErrorResponseFromErr(w, req, err) return @@ -172,26 +134,59 @@ func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { }, } - w.Header().Set("Content-Type", "application/json") + response = rpctypes.NewRPCSuccessResponse(req.ID, abciQueryRes) + case broadcastTxSyncMethod, broadcastTxAsyncMethod, broadcastTxCommitMethod: + fmt.Println(">>>> BROADCAST_TX") + + var txBz []byte + txRaw, hasTx := params["tx"] + if !hasTx { + writeErrorResponse(w, req, "missing tx param", string(req.Params)) + return + } + if err = json.Unmarshal(txRaw, &txBz); err != nil { + writeErrorResponseFromErr(w, req, err) + return + } + + // TODO_CONSIDERATION: more correct implementation of the different + // broadcast_tx methods (i.e. sync, async, commit) is a matter of + // the sequence of running the tx and sending the JSON-RPC response. - jsonRes, err := cmtjson.Marshal(abciQueryRes) + _, finalizeBlockRes, err := app.RunTx(nil, txBz) if err != nil { writeErrorResponseFromErr(w, req, err) return } - response = rpctypes.RPCResponse{ - JSONRPC: "2.0", - ID: req.ID, - Result: jsonRes, + // DEV_NOTE: There SHOULD ALWAYS be exactly one tx result so long as + // we're finalizing one tx at a time (single tx blocks). + txRes := finalizeBlockRes.GetTxResults()[0] + + bcastTxRes := coregrpc.ResponseBroadcastTx{ + CheckTx: &types.ResponseCheckTx{ + Code: txRes.GetCode(), + Data: txRes.GetData(), + Log: txRes.GetLog(), + Info: txRes.GetInfo(), + GasWanted: txRes.GetGasWanted(), + GasUsed: txRes.GetGasUsed(), + Events: txRes.GetEvents(), + Codespace: txRes.GetCodespace(), + }, + TxResult: &types.ExecTxResult{ + Code: txRes.GetCode(), + Data: txRes.GetData(), + Log: txRes.GetLog(), + Info: txRes.GetInfo(), + GasWanted: txRes.GetGasWanted(), + GasUsed: txRes.GetGasUsed(), + Events: txRes.GetEvents(), + Codespace: txRes.GetCodespace(), + }, } - fmt.Println(">>> Response sent") - case "broadcast_tx_sync": - fmt.Println(">>>> BROADCAST_TX_SYNC") - response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) - case "broadcast_tx_async": - fmt.Println(">>>> BROADCAST_TX_ASYNC") - response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) + + response = rpctypes.NewRPCSuccessResponse(req.ID, bcastTxRes) default: response = rpctypes.NewRPCErrorResponse(req.ID, 500, "unsupported method", string(req.Params)) } @@ -202,32 +197,6 @@ func newPostHandler(client gogogrpc.ClientConn) runtime.HandlerFunc { } } -// processABCIQuery handles the actual ABCI query logic -func processABCIQuery(params json.RawMessage) json.RawMessage { - // Implement your ABCI query processing logic here - // This would typically involve: - // 1. Decoding the hex data - // 2. Parsing the height - // 3. Making the actual ABCI query to your blockchain node - // 4. Processing the response - // 5. Returning the result in the expected format - - fmt.Println(">>>> ABCI_QUERY") - - // For now, returning a placeholder - //return map[string]interface{}{ - // "height": height, - // "result": map[string]interface{}{ - // // Add your actual query result structure here - // "code": 0, - // "log": "", - // "height": height, - // "value": "", // Base64-encoded response value - // }, - //} - return nil -} - // TODO_IN_THIS_COMMIT: godoc... func writeErrorResponseFromErr(w http.ResponseWriter, req rpctypes.RPCRequest, err error) { var errMsg string diff --git a/testutil/integration/app.go b/testutil/integration/app.go index 82b0f1ade..84caf1544 100644 --- a/testutil/integration/app.go +++ b/testutil/integration/app.go @@ -705,7 +705,9 @@ func (app *App) GetFaucetBech32() string { // returned. In order to run a message, the application must have a handler for it. // These handlers are registered on the application message service router. func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgResponse, err error) { - t.Helper() + if t != nil { + t.Helper() + } // Commit the updated state after the message has been handled. var finalizeBlockRes *abci.ResponseFinalizeBlock @@ -738,6 +740,25 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo app.logger.Info("Running msg", "msg", msg.String()) } + txMsgResps, finalizeBlockRes, err = app.RunTx(t, txBz) + if err != nil { + // DEV_NOTE: Intentionally returning and not asserting nil error to improve reusability. + return nil, err + } + + return txMsgResps, nil +} + +// TODO_IN_THIS_COMMIT: godoc... +func (app *App) RunTx(t *testing.T, txBz []byte) ( + txMsgResps []tx.MsgResponse, + finalizeBlockRes *abci.ResponseFinalizeBlock, + err error, +) { + if t != nil { + t.Helper() + } + // Finalize the block with the transaction. finalizeBlockReq := &cmtabcitypes.RequestFinalizeBlock{ Height: app.LastBlockHeight() + 1, @@ -751,12 +772,14 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo finalizeBlockRes, err = app.FinalizeBlock(finalizeBlockReq) if err != nil { - return nil, fmt.Errorf("finalizing block: %w", err) + return nil, nil, fmt.Errorf("finalizing block: %w", err) } - // NB: We're batching the messages in a single transaction, so we expect - // a single transaction result. - require.Equal(t, 1, len(finalizeBlockRes.TxResults)) + if t != nil { + // NB: We're batching the messages in a single transaction, so we expect + // a single transaction result. + require.Equal(t, 1, len(finalizeBlockRes.TxResults)) + } // Collect the message responses. Accumulate errors related to message handling // failure. If any message fails, an error will be returned. @@ -769,24 +792,29 @@ func (app *App) RunMsgs(t *testing.T, msgs ...sdk.Msg) (txMsgResps []tx.MsgRespo } txMsgDataBz := txResult.GetData() - require.NotNil(t, txMsgDataBz) + if t != nil { + require.NotNil(t, txMsgDataBz) + } txMsgData := new(cosmostypes.TxMsgData) err = app.GetCodec().Unmarshal(txMsgDataBz, txMsgData) - require.NoError(t, err) + if t != nil { + require.NoError(t, err) + } var txMsgRes tx.MsgResponse err = app.GetCodec().UnpackAny(txMsgData.MsgResponses[0], &txMsgRes) - require.NoError(t, err) - require.NotNil(t, txMsgRes) + if t != nil { + require.NoError(t, err) + require.NotNil(t, txMsgRes) + } else { + return nil, nil, err + } txMsgResps = append(txMsgResps, txMsgRes) } - if txResultErrs != nil { - return nil, err - } - return txMsgResps, nil + return txMsgResps, finalizeBlockRes, nil } // NextBlocks calls NextBlock numBlocks times