Skip to content

Commit

Permalink
[CAPPL-324] Fix panic when fetching the binary
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Dec 3, 2024
1 parent 52f7e36 commit ce1bdb8
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 63 deletions.
2 changes: 1 addition & 1 deletion core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *OutgoingConnectorHandler) Name() string {

func validMethod(method string) bool {
switch method {
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction:
case capabilities.MethodWebAPITarget, capabilities.MethodComputeAction, capabilities.MethodWorkflowSyncer:
return true
default:
return false
Expand Down
26 changes: 3 additions & 23 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand All @@ -50,8 +49,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/feeds"
"github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway"
capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
"github.com/smartcontractkit/chainlink/v2/core/services/headreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/keeper"
Expand Down Expand Up @@ -303,27 +300,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
return nil, fmt.Errorf("expected 1 key, got %d", len(keys))
}

connector := gatewayConnectorWrapper.GetGatewayConnector()
outgoingConnectorLggr := globalLogger.Named("WorkflowSyncer")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common2.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector,
webAPIConfig,
capabilities2.MethodWorkflowSyncer, outgoingConnectorLggr)
if err != nil {
return nil, fmt.Errorf("could not create outgoing connector handler: %w", err)
}
fetcher := syncer.NewFetcherService(globalLogger, gatewayConnectorWrapper)

eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger),
syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(globalLogger, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
Expand All @@ -338,7 +318,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
QueryCount: 100,
}, eventHandler, loader, workflowDonNotifier)

srvcs = append(srvcs, outgoingConnectorHandler, wfSyncer)
srvcs = append(srvcs, fetcher, wfSyncer)
}
}
} else {
Expand Down
137 changes: 137 additions & 0 deletions core/services/gateway/connector/mocks/gateway_connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 77 additions & 20 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,93 @@ import (
"net/http"
"strings"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

func NewFetcherFunc(
lggr logger.Logger,
och *webapi.OutgoingConnectorHandler) FetcherFunc {
return func(ctx context.Context, url string) ([]byte, error) {
payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}
type FetcherService struct {
services.StateMachine
lggr logger.Logger
och *webapi.OutgoingConnectorHandler
wrapper gatewayConnector
}

messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/")
resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
type gatewayConnector interface {
GetGatewayConnector() connector.GatewayConnector
}

func NewFetcherService(lggr logger.Logger, wrapper gatewayConnector) *FetcherService {
return &FetcherService{
lggr: lggr.Named("FetcherService"),
wrapper: wrapper,
}
}

func (s *FetcherService) Start(ctx context.Context) error {
return s.StartOnce("FetcherService", func() error {
connector := s.wrapper.GetGatewayConnector()

outgoingConnectorLggr := s.lggr.Named("WorkflowSyncer")

webAPIConfig := webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

lggr.Debugw("received gateway response", "resp", resp)
var payload ghcapabilities.Response
err = json.Unmarshal(resp.Body.Payload, &payload)
och, err := webapi.NewOutgoingConnectorHandler(connector,
webAPIConfig,
capabilities.MethodWorkflowSyncer, outgoingConnectorLggr)
if err != nil {
return nil, err
return fmt.Errorf("could not create outgoing connector handler: %w", err)
}

return payload.Body, nil
s.och = och
return och.Start(ctx)
})
}

func (s *FetcherService) Close() error {
return nil
}

func (s *FetcherService) HealthReport() map[string]error {
return map[string]error{s.Name(): nil}
}

func (s *FetcherService) Name() string {
return s.lggr.Name()
}

func (s *FetcherService) Fetch(ctx context.Context, url string) ([]byte, error) {
payloadBytes, err := json.Marshal(ghcapabilities.Request{
URL: url,
Method: http.MethodGet,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal fetch request: %w", err)
}

messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/")
resp, err := s.och.HandleSingleNodeRequest(ctx, messageID, payloadBytes)
if err != nil {
return nil, err
}

s.lggr.Debugw("received gateway response", "resp", resp)
var payload ghcapabilities.Response
err = json.Unmarshal(resp.Body.Payload, &payload)
if err != nil {
return nil, err
}

return payload.Body, nil
}
38 changes: 20 additions & 18 deletions core/services/workflows/syncer/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,48 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common"
)

func TestNewFetcherFunc(t *testing.T) {
type wrapper struct {
c connector.GatewayConnector
}

func (w *wrapper) GetGatewayConnector() connector.GatewayConnector {
return w.c
}

func TestNewFetcherService(t *testing.T) {
ctx := context.Background()
lggr := logger.TestLogger(t)

config := webapi.ServiceConfig{
RateLimiter: common.RateLimiterConfig{
GlobalRPS: 100.0,
GlobalBurst: 100,
PerSenderRPS: 100.0,
PerSenderBurst: 100,
},
}

connector := gcmocks.NewGatewayConnector(t)
och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr)
require.NoError(t, err)
wrapper := &wrapper{c: connector}

url := "http://example.com"

msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/")

t.Run("OK-valid_request", func(t *testing.T) {
connector.EXPECT().AddHandler([]string{capabilities.MethodWorkflowSyncer}, mock.Anything).Return(nil)

fetcher := NewFetcherService(lggr, wrapper)
fetcher.Start(ctx)
defer fetcher.Close()

gatewayResp := gatewayResponse(t, msgID)
connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) {
och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
fetcher.och.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Return(nil).Times(1)
connector.EXPECT().DonID().Return("don-id")
connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

fetcher := NewFetcherFunc(lggr, och)

payload, err := fetcher(ctx, url)
payload, err := fetcher.Fetch(ctx, url)
require.NoError(t, err)

expectedPayload := []byte("response body")
Expand Down
Loading

0 comments on commit ce1bdb8

Please sign in to comment.