Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPL-264] Ensure a default timeout is provided for all outgoing requests #15644

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions core/capabilities/compute/compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,13 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq
headersReq[k] = v.String()
}

payloadBytes, err := json.Marshal(ghcapabilities.Request{
resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: req.Url,
Method: req.Method,
Headers: headersReq,
Body: req.Body,
TimeoutMs: req.TimeoutMs,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

resp, err := c.outgoingConnectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
}
Expand Down
27 changes: 24 additions & 3 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/pkg/errors"

Expand All @@ -17,6 +18,10 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

const (
defaultFetchTimeoutMs = 20_000
)

var _ connector.GatewayConnectorHandler = &OutgoingConnectorHandler{}

type OutgoingConnectorHandler struct {
Expand Down Expand Up @@ -51,8 +56,24 @@ func NewOutgoingConnectorHandler(gc connector.GatewayConnector, config ServiceCo
}

// HandleSingleNodeRequest sends a request to first available gateway node and blocks until response is received
// TODO: handle retries and timeouts
func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, payload []byte) (*api.Message, error) {
// TODO: handle retries
func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context, messageID string, req capabilities.Request) (*api.Message, error) {
// set default timeout if not provided for all outgoing requests
if req.TimeoutMs == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@agparadiso I know I didn't ask you to do this originally, but could we also create a subcontext with this timeout plus some margin added to it?

Atm we don't get a response from the gateway if we time out which causes some small bugs; if we add this, the context would fire on line 101 and we'd at least get a response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added 👍🏼

req.TimeoutMs = defaultFetchTimeoutMs
}

// Create a subcontext with the timeout plus some margin for the gateway to process the request
timeoutDuration := time.Duration(req.TimeoutMs) * time.Millisecond
margin := 100 * time.Millisecond
ctx, cancel := context.WithTimeout(ctx, timeoutDuration+margin)
defer cancel()

payload, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

ch := make(chan *api.Message, 1)
c.responseChsMu.Lock()
c.responseChs[messageID] = ch
Expand All @@ -75,7 +96,7 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
sort.Strings(gatewayIDs)

err := c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}
Expand Down
132 changes: 132 additions & 0 deletions core/capabilities/webapi/outgoing_connector_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package webapi

import (
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/logger"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

func TestHandleSingleNodeRequest(t *testing.T) {
t.Run("OK-timeout_is_not_specify_default_timeout_is_expected", func(t *testing.T) {
ctx := tests.Context(t)
log := logger.TestLogger(t)
connector := gcmocks.NewGatewayConnector(t)
var defaultConfig = ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}
connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the default timeout
req := ghcapabilities.Request{
URL: testURL,
TimeoutMs: defaultFetchTimeoutMs,
}
payload, err := json.Marshal(req)
require.NoError(t, err)

expectedBody := &api.MessageBody{
MessageId: msgID,
DonId: connector.DonID(),
Method: ghcapabilities.MethodComputeAction,
Payload: payload,
}

// expect the request body to contain the default timeout
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID))
}).Return(nil).Times(1)

_, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{
URL: testURL,
})
require.NoError(t, err)
})

t.Run("OK-timeout_is_specified", func(t *testing.T) {
ctx := tests.Context(t)
log := logger.TestLogger(t)
connector := gcmocks.NewGatewayConnector(t)
var defaultConfig = ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}
connectorHandler, err := NewOutgoingConnectorHandler(connector, defaultConfig, ghcapabilities.MethodComputeAction, log)
require.NoError(t, err)

msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the defined timeout
req := ghcapabilities.Request{
URL: testURL,
TimeoutMs: 40000,
}
payload, err := json.Marshal(req)
require.NoError(t, err)

expectedBody := &api.MessageBody{
MessageId: msgID,
DonId: connector.DonID(),
Method: ghcapabilities.MethodComputeAction,
Payload: payload,
}

// expect the request body to contain the defined timeout
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", expectedBody).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResponse(t, msgID))
}).Return(nil).Times(1)

_, err = connectorHandler.HandleSingleNodeRequest(ctx, msgID, ghcapabilities.Request{
URL: testURL,
TimeoutMs: 40000,
})
require.NoError(t, err)
})
}

func gatewayResponse(t *testing.T, msgID string) *api.Message {
headers := map[string]string{"Content-Type": "application/json"}
body := []byte("response body")
responsePayload, err := json.Marshal(ghcapabilities.Response{
StatusCode: 200,
Headers: headers,
Body: body,
ExecutionError: false,
})
require.NoError(t, err)
return &api.Message{
Body: api.MessageBody{
MessageId: msgID,
Method: ghcapabilities.MethodWebAPITarget,
Payload: responsePayload,
},
}
}
7 changes: 1 addition & 6 deletions core/capabilities/webapi/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,13 @@ func (c *Capability) Execute(ctx context.Context, req capabilities.CapabilityReq
return capabilities.CapabilityResponse{}, err
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}

// Default to SingleNode delivery mode
deliveryMode := defaultIfNil(workflowCfg.DeliveryMode, webapi.SingleNode)

switch deliveryMode {
case webapi.SingleNode:
// blocking call to handle single node request. waits for response from gateway
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
resp, err := c.connectorHandler.HandleSingleNodeRequest(ctx, messageID, payload)
if err != nil {
return capabilities.CapabilityResponse{}, err
}
Expand Down
18 changes: 4 additions & 14 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

const (
defaultFetchTimeoutMs = 20_000
)

type FetcherService struct {
services.StateMachine
lggr logger.Logger
Expand Down Expand Up @@ -88,17 +84,11 @@ func hash(url string) string {
}

func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error) {
payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
TimeoutMs: defaultFetchTimeoutMs,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, hash(url)}, "/")
resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
})
if err != nil {
return nil, err
}
Expand Down
Loading