Skip to content

Commit

Permalink
Increase default timeout of remote Executable requests
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk committed Dec 10, 2024
1 parent 45898fc commit b89c452
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
3 changes: 2 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability
}

var (
defaultTargetRequestTimeout = time.Minute
// TODO: make this configurable
defaultTargetRequestTimeout = 10 * time.Minute
)

func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don registrysyncer.DON, state *registrysyncer.LocalRegistry, remoteWorkflowDONs []registrysyncer.DON) error {
Expand Down
10 changes: 8 additions & 2 deletions core/capabilities/remote/executable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var _ commoncap.ExecutableCapability = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}

const expiryCheckInterval = 30 * time.Second

func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration, lggr logger.Logger) *client {
return &client{
Expand Down Expand Up @@ -98,7 +100,11 @@ func (c *client) checkDispatcherReady() {
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
tickerInterval := expiryCheckInterval
if c.requestTimeout < tickerInterval {
tickerInterval = c.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {
select {
Expand All @@ -116,7 +122,7 @@ func (c *client) expireRequests() {

for messageID, req := range c.requestIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
req.Cancel(errors.New("request expired by executable client"))
delete(c.requestIDToCallerRequest, messageID)
}

Expand Down
8 changes: 6 additions & 2 deletions core/capabilities/remote/executable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (r *server) Start(ctx context.Context) error {
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(r.requestTimeout)
tickerInterval := expiryCheckInterval
if r.requestTimeout < tickerInterval {
tickerInterval = r.requestTimeout
}
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
r.lggr.Info("executable capability server started")
for {
Expand Down Expand Up @@ -118,7 +122,7 @@ func (r *server) expireRequests() {

for requestID, executeReq := range r.requestIDToRequest {
if executeReq.request.Expired() {
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired")
err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired by executable server")
if err != nil {
r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err)
}
Expand Down

0 comments on commit b89c452

Please sign in to comment.