diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 316e4f00eea..2ba5daefaa6 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -318,18 +318,13 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq headersReq[k] = v.String() } - payloadBytes, err := json.Marshal(ghcapabilities.Request{ + resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{ URL: req.Url, Method: req.Method, Headers: headersReq, Body: req.Body, TimeoutMs: req.TimeoutMs, }) - if err != nil { - return nil, fmt.Errorf("failed to marshal fetch request: %w", err) - } - - resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes) if err != nil { return nil, err } diff --git a/core/capabilities/webapi/outgoing_connector_handler.go b/core/capabilities/webapi/outgoing_connector_handler.go index d18ee971d1a..a41d3dad2d5 100644 --- a/core/capabilities/webapi/outgoing_connector_handler.go +++ b/core/capabilities/webapi/outgoing_connector_handler.go @@ -17,6 +17,10 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" ) +const ( + defaultFetchTimeoutMs = 20_000 +) + var _ connector.GatewayConnectorHandler = &OutgoingConnectorHandler{} type OutgoingConnectorHandler struct { @@ -51,8 +55,18 @@ func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceCo } // HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received -// TODO: handle retries and timeouts -func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) { +// TODO: handle retries +func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, req capabilities.Request) (*api.Message, error) { + // set default timeout if not provided for all outgoing requests + if req.TimeoutMs == 0 { + req.TimeoutMs = defaultFetchTimeoutMs + } + + payload, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal fetch request: %w", err) + } + ch := make(chan *api.Message, 1) c.responseChsMu.Lock() c.responseChs[messageID] = ch @@ -75,7 +89,7 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, } sort.Strings(gatewayIDs) - err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body) + err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body) if err != nil { return nil, errors.Wrap(err, "failed to send request to gateway") } diff --git a/core/capabilities/webapi/outgoing_connector_handler_test.go b/core/capabilities/webapi/outgoing_connector_handler_test.go new file mode 100644 index 00000000000..46489ac07ef --- /dev/null +++ b/core/capabilities/webapi/outgoing_connector_handler_test.go @@ -0,0 +1,131 @@ +package webapi + +import ( + "context" + "encoding/json" + "testing" + + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" +) + +func TestHandleSingleNodeRequest(t *testing.T) { + t.Run("OK-timeout_is_not_specify_default_timeout_is_expected", func(t *testing.T) { + ctx := tests.Context(t) + log := logger.TestLogger(t) + connector := gcmocks.NewGatewayConnector(t) + var defaultConfig = ServiceConfig{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log) + require.NoError(t, err) + + msgID := "msgID" + testURL := "http://localhost:8080" + connector.EXPECT().DonID().Return("donID") + connector.EXPECT().GatewayIDs().Return([]string{"gateway1"}) + + // build the expected body with the default timeout + req := ghcapabilities.Request{ + URL: testURL, + TimeoutMs: defaultFetchTimeoutMs, + } + payload, err := json.Marshal(req) + require.NoError(t, err) + + expectedBody := &api.MessageBody{ + MessageId: msgID, + DonId: connector.DonID(), + Method: ghcapabilities.MethodComputeAction, + Payload: payload, + } + + // expect the request body to contain the default timeout + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID)) + }).Return(nil).Times(1) + + _, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{ + URL: testURL, + }) + require.NoError(t, err) + }) + + t.Run("OK-timeout_is_specified", func(t *testing.T) { + ctx := tests.Context(t) + log := logger.TestLogger(t) + connector := gcmocks.NewGatewayConnector(t) + var defaultConfig = ServiceConfig{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log) + require.NoError(t, err) + + msgID := "msgID" + testURL := "http://localhost:8080" + connector.EXPECT().DonID().Return("donID") + connector.EXPECT().GatewayIDs().Return([]string{"gateway1"}) + + // build the expected body with the defined timeout + req := ghcapabilities.Request{ + URL: testURL, + TimeoutMs: 40000, + } + payload, err := json.Marshal(req) + require.NoError(t, err) + + expectedBody := &api.MessageBody{ + MessageId: msgID, + DonId: connector.DonID(), + Method: ghcapabilities.MethodComputeAction, + Payload: payload, + } + + // expect the request body to contain the defined timeout + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID)) + }).Return(nil).Times(1) + + _, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{ + URL: testURL, + TimeoutMs: 40000, + }) + require.NoError(t, err) + }) +} + +func gatewayResponse(t *testing.T, msgID string) *api.Message { + headers := map[string]string{"Content-Type": "application/json"} + body := []byte("response body") + responsePayload, err := json.Marshal(ghcapabilities.Response{ + StatusCode: 200, + Headers: headers, + Body: body, + ExecutionError: false, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + Method: ghcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +} diff --git a/core/capabilities/webapi/target/target.go b/core/capabilities/webapi/target/target.go index b211e0fe837..4934ab382d5 100644 --- a/core/capabilities/webapi/target/target.go +++ b/core/capabilities/webapi/target/target.go @@ -135,18 +135,13 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq return capabilities.CapabilityResponse{}, err } - payloadBytes, err := json.Marshal(payload) - if err != nil { - return capabilities.CapabilityResponse{}, err - } - // Default to SingleNode delivery mode deliveryMode := defaultIfNil(workflowCfg.DeliveryMode, webapi.SingleNode) switch deliveryMode { case webapi.SingleNode: // blocking call to handle single node request. waits for response from gateway - resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes) + resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payload) if err != nil { return capabilities.CapabilityResponse{}, err } diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index fdd0134909d..6a80739bbfe 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -18,10 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" ) -const ( - defaultFetchTimeoutMs = 20_000 -) - type FetcherService struct { services.StateMachine lggr logger.Logger @@ -88,17 +84,11 @@ func hash(url string) string { } func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error) { - payloadBytes, err := json.Marshal(ghcapabilities.Request{ - URL: url, - Method: http.MethodGet, - TimeoutMs: defaultFetchTimeoutMs, - }) - if err != nil { - return nil, fmt.Errorf("failed to marshal fetch request: %w", err) - } - messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/") - resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) + resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{ + URL: url, + Method: http.MethodGet, + }) if err != nil { return nil, err }