Skip to content

Commit

Permalink
fix: ensure a default timeout is provided for all outgoing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Dec 12, 2024
1 parent 3e74cb9 commit 827b0d0
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 29 deletions.
7 changes: 1 addition & 6 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,13 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
headersReq[k] = v.String()
}

payloadBytes, err := json.Marshal(ghcapabilities.Request{
resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Body: req.Body,
TimeoutMs: req.TimeoutMs,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
}
Expand Down
20 changes: 17 additions & 3 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

const (
defaultFetchTimeoutMs = 20_000
)

var _ connector.GatewayConnectorHandler = &OutgoingConnectorHandler{}

type OutgoingConnectorHandler struct {
Expand Down Expand Up @@ -51,8 +55,18 @@ func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceCo
}

// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received
// TODO: handle retries and timeouts
func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) {
// TODO: handle retries
func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, req capabilities.Request) (*api.Message, error) {
// set default timeout if not provided for all outgoing requests
if req.TimeoutMs == 0 {
req.TimeoutMs = defaultFetchTimeoutMs
}

payload, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

ch := make(chan *api.Message, 1)
c.responseChsMu.Lock()
c.responseChs[messageID] = ch
Expand All @@ -75,7 +89,7 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
sort.Strings(gatewayIDs)

err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}
Expand Down
131 changes: 131 additions & 0 deletions core/capabilities/webapi/outgoing_connector_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package webapi

import (
"context"
"encoding/json"
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

func TestHandleSingleNodeRequest(t *testing.T) {
t.Run("OK-timeout_is_not_specify_default_timeout_is_expected", func(t *testing.T) {
ctx := tests.Context(t)
log := logger.TestLogger(t)
connector := gcmocks.NewGatewayConnector(t)
var defaultConfig = ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}
connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the default timeout
req := ghcapabilities.Request{
URL: testURL,
TimeoutMs: defaultFetchTimeoutMs,
}
payload, err := json.Marshal(req)
require.NoError(t, err)

expectedBody := &api.MessageBody{
MessageId: msgID,
DonId: connector.DonID(),
Method: ghcapabilities.MethodComputeAction,
Payload: payload,
}

// expect the request body to contain the default timeout
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID))
}).Return(nil).Times(1)

_, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{
URL: testURL,
})
require.NoError(t, err)
})

t.Run("OK-timeout_is_specified", func(t *testing.T) {
ctx := tests.Context(t)
log := logger.TestLogger(t)
connector := gcmocks.NewGatewayConnector(t)
var defaultConfig = ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}
connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the defined timeout
req := ghcapabilities.Request{
URL: testURL,
TimeoutMs: 40000,
}
payload, err := json.Marshal(req)
require.NoError(t, err)

expectedBody := &api.MessageBody{
MessageId: msgID,
DonId: connector.DonID(),
Method: ghcapabilities.MethodComputeAction,
Payload: payload,
}

// expect the request body to contain the defined timeout
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID))
}).Return(nil).Times(1)

_, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{
URL: testURL,
TimeoutMs: 40000,
})
require.NoError(t, err)
})
}

func gatewayResponse(t *testing.T, msgID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
Method: ghcapabilities.MethodWebAPITarget,
Payload: responsePayload,
},
}
}
7 changes: 1 addition & 6 deletions core/capabilities/webapi/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,13 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
return capabilities.CapabilityResponse{}, err
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// Default to SingleNode delivery mode
deliveryMode := defaultIfNil(workflowCfg.DeliveryMode, webapi.SingleNode)

switch deliveryMode {
case webapi.SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand Down
18 changes: 4 additions & 14 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

const (
defaultFetchTimeoutMs = 20_000
)

type FetcherService struct {
services.StateMachine
lggr logger.Logger
Expand Down Expand Up @@ -88,17 +84,11 @@ func hash(url string) string {
}

func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error) {
payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
TimeoutMs: defaultFetchTimeoutMs,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/")
resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 827b0d0

Please sign in to comment.