Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Testing, Tooling] Expose integration app via gRPC/HTTP/WS #1017

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions testutil/e2e/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package e2e

import (
"context"
"encoding/json"
"net"
"net/http"
"sync"
"testing"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

coretypes "github.com/cometbft/cometbft/rpc/core/types"
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"

"github.com/pokt-network/poktroll/testutil/integration"
)

// E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing
type E2EApp struct {
*integration.App
grpcServer *grpc.Server
grpcListener net.Listener
wsServer *http.Server
wsListener net.Listener
httpServer *http.Server
httpListener net.Listener
wsUpgrader websocket.Upgrader
wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries
wsConnMutex sync.RWMutex
blockEventChan chan *coretypes.ResultEvent
}

// NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers
func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp {
t.Helper()

// Create the integration app
app := integration.NewCompleteIntegrationApp(t, opts...)

// Create listeners for gRPC, WebSocket, and HTTP
grpcListener, err := net.Listen("tcp", "localhost:42069")
require.NoError(t, err, "failed to create gRPC listener")

wsListener, err := net.Listen("tcp", "localhost:6969")
require.NoError(t, err, "failed to create WebSocket listener")

httpListener, err := net.Listen("tcp", "localhost:42070")
require.NoError(t, err, "failed to create HTTP listener")

e2eApp := &E2EApp{
App: app,
grpcListener: grpcListener,
wsListener: wsListener,
httpListener: httpListener,
wsConnections: make(map[*websocket.Conn]map[string]struct{}),
wsUpgrader: websocket.Upgrader{},
blockEventChan: make(chan *coretypes.ResultEvent, 1),
}

// Initialize and start gRPC server
e2eApp.grpcServer = newGRPCServer(e2eApp, t)
go func() {
if err := e2eApp.grpcServer.Serve(grpcListener); err != nil {
panic(err)
}
}()

// Initialize and start WebSocket server
e2eApp.wsServer = newWebSocketServer(e2eApp)
go func() {
if err := e2eApp.wsServer.Serve(wsListener); err != nil && err != http.ErrServerClosed {
panic(err)
}
}()

// Initialize and start HTTP server
mux := http.NewServeMux()
mux.HandleFunc("/", e2eApp.handleHTTP)
e2eApp.httpServer = &http.Server{Handler: mux}
go func() {
if err := e2eApp.httpServer.Serve(httpListener); err != nil && err != http.ErrServerClosed {
panic(err)
}
}()

// Start event handling
go e2eApp.handleBlockEvents(t)

return e2eApp
}

// Close gracefully shuts down the E2EApp and its servers
func (app *E2EApp) Close() error {
app.grpcServer.GracefulStop()
if err := app.wsServer.Close(); err != nil {
return err
}
if err := app.httpServer.Close(); err != nil {
return err
}
close(app.blockEventChan)
return nil
}

// GetClientConn returns a gRPC client connection to the E2EApp's gRPC server
func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(

Check failure on line 111 in testutil/e2e/app.go

View workflow job for this annotation

GitHub Actions / go-test

SA1019: grpc.DialContext is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
ctx,
app.grpcListener.Addr().String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
}

// GetWSEndpoint returns the WebSocket endpoint URL
func (app *E2EApp) GetWSEndpoint() string {
return "ws://" + app.wsListener.Addr().String() + "/websocket"
}

// handleHTTP handles incoming HTTP requests by responding with RPCResponse
func (app *E2EApp) handleHTTP(w http.ResponseWriter, r *http.Request) {
var req rpctypes.RPCRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// Process the request - for now just return a basic response
// TODO_IMPROVE: Implement proper CometBFT RPC endpoint handling
response := rpctypes.RPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Result: json.RawMessage(`{}`),
}

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)

Check failure on line 140 in testutil/e2e/app.go

View workflow job for this annotation

GitHub Actions / go-test

Error return value of `(*encoding/json.Encoder).Encode` is not checked (errcheck)
}
81 changes: 81 additions & 0 deletions testutil/e2e/app_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package e2e

import (
"testing"

"cosmossdk.io/depinject"
"cosmossdk.io/math"
sdkclient "github.com/cosmos/cosmos-sdk/client"
cosmostx "github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/crypto/hd"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
cosmostypes "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/app/volatile"
"github.com/pokt-network/poktroll/pkg/client/block"
"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/client/query"
"github.com/pokt-network/poktroll/pkg/client/tx"
txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types"
"github.com/pokt-network/poktroll/testutil/testclient"
gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types"
)

func TestNewE2EApp(t *testing.T) {
app := NewE2EApp(t)

blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070")
require.NoError(t, err)

deps := depinject.Supply(app.QueryHelper(), blockQueryClient)

sharedQueryClient, err := query.NewSharedQuerier(deps)
require.NoError(t, err)

sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx())
require.NoError(t, err)

t.Logf("shared params: %+v", sharedParams)

eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket")
deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient))
blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps)
require.NoError(t, err)

keyRing := keyring.NewInMemory(app.GetCodec())
// TODO: add the gateway2 key...
_, err = keyRing.NewAccount(
"gateway2",
"suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace",
"",
cosmostypes.FullFundraiserPath,
hd.Secp256k1,
)
require.NoError(t, err)

flagSet := testclient.NewLocalnetFlagSet(t)
clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing)

txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet)
require.NoError(t, err)

deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory))
txContext, err := tx.NewTxContext(deps)
require.NoError(t, err)

deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext))
txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2"))
require.NoError(t, err)

eitherErr := txClient.SignAndBroadcast(
app.GetSdkCtx(),
gatewaytypes.NewMsgStakeGateway(
"pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz",
cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000000)),
),
)
err, errCh := eitherErr.SyncOrAsyncError()
require.NoError(t, err)
require.NoError(t, <-errCh)
}
96 changes: 96 additions & 0 deletions testutil/e2e/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package e2e

import (
"context"
"fmt"
"strings"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"

"github.com/cosmos/cosmos-sdk/baseapp"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// newGRPCServer creates and configures a new gRPC server for the E2EApp
func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server {
grpcServer := grpc.NewServer()
reflection.Register(grpcServer)

forwarder := &grpcForwarderServer{
queryHelper: app.QueryHelper(),
app: app,
t: t,
}

grpcServer.RegisterService(&grpc.ServiceDesc{
ServiceName: "cosmos.Service",
HandlerType: (*interface{})(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{},
Metadata: "",
}, forwarder)

return grpcServer
}

// grpcForwarderServer implements a generic gRPC service that forwards all queries
// to the queryHelper and messages to the app
type grpcForwarderServer struct {
queryHelper *baseapp.QueryServiceTestHelper
app *E2EApp
t *testing.T
}

// Invoke implements the grpc.Server interface and forwards all requests appropriately
func (s *grpcForwarderServer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error {
// Determine if this is a query or message based on the method name
if isQuery(method) {
return s.queryHelper.Invoke(ctx, method, args, reply)
}

// If it's not a query, treat it as a message
msg, ok := args.(sdk.Msg)
if !ok {
return fmt.Errorf("expected sdk.Msg, got %T", args)
}

// Run the message through the app
msgRes, err := s.app.RunMsg(s.t, msg)
if err != nil {
return err
}

// Type assert the reply as a proto.Message
protoReply, ok := reply.(proto.Message)
if !ok {
return fmt.Errorf("expected proto.Message, got %T", reply)
}

// Type assert the response as a proto.Message
protoRes, ok := msgRes.(proto.Message)
if !ok {
return fmt.Errorf("expected proto.Message response, got %T", msgRes)
}

// Marshal the response to bytes
resBz, err := proto.Marshal(protoRes)
if err != nil {
return fmt.Errorf("failed to marshal response: %w", err)
}

// Unmarshal into the reply
return proto.Unmarshal(resBz, protoReply)
}

// NewStream implements the grpc.Server interface but is not used in this implementation
func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming is not supported")
}

// isQuery returns true if the method name indicates this is a query request
func isQuery(method string) bool {
return strings.Contains(method, ".Query/")
}
Loading
Loading