diff --git a/internal/app/triggermessageprocessor.go b/internal/app/triggermessageprocessor.go index 184f77f68..84415e43e 100644 --- a/internal/app/triggermessageprocessor.go +++ b/internal/app/triggermessageprocessor.go @@ -113,12 +113,19 @@ func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionCon lc.Debugf("trigger sending message to pipeline %s (%s)", p.Id, envelope.CorrelationID) - if msgErr := mp.bnd.ProcessMessage(ctx.Clone().(*appfunction.Context), envelope, p); msgErr != nil { + childCtx, ok := ctx.Clone().(*appfunction.Context) + + if !ok { + errCollector(fmt.Errorf("context received was not *appfunction.Context (%T)", childCtx)) + return + } + + if msgErr := mp.bnd.ProcessMessage(childCtx, envelope, p); msgErr != nil { lc.Errorf("message error in pipeline %s (%s): %s", p.Id, envelope.CorrelationID, msgErr.Err.Error()) errCollector(msgErr.Err) } else { if responseHandler != nil { - if outputErr := responseHandler(ctx, p); outputErr != nil { + if outputErr := responseHandler(childCtx, p); outputErr != nil { lc.Errorf("failed to process output for message '%s' on pipeline %s: %s", ctx.CorrelationID(), p.Id, outputErr.Error()) errCollector(outputErr) return diff --git a/internal/app/triggermessageprocessor_test.go b/internal/app/triggermessageprocessor_test.go index 6ed08979a..2e883f32a 100644 --- a/internal/app/triggermessageprocessor_test.go +++ b/internal/app/triggermessageprocessor_test.go @@ -140,7 +140,8 @@ func Test_triggerMessageProcessor_MessageReceived(t *testing.T) { if !tt.nilRh { rh = func(ctx interfaces.AppFunctionContext, pipeline *interfaces.FunctionPipeline) error { - assert.Equal(t, tt.args.ctx, ctx) + assert.NotEqual(t, tt.args.ctx, ctx) + assert.Equal(t, tt.args.ctx.CorrelationID(), ctx.CorrelationID()) //hmm return nil } }