From 23e35ea36d58a1ff8cd334d5218ab952483cffa4 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Wed, 20 Sep 2023 10:25:53 -0700 Subject: [PATCH] goroutine --- packages/relayer/processor/processor.go | 32 +++++++++++++------------ 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index f9e2c2d2646..0128e92bbca 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -289,25 +289,27 @@ func (p *Processor) eventLoop(ctx context.Context) { case <-ctx.Done(): return case msg := <-p.msgCh: - err := p.processMessage(ctx, msg) - - if err != nil { - slog.Error("err processing message", "err", err.Error()) - - if errors.Is(err, errUnprocessable) { - if err := p.queue.Ack(ctx, msg); err != nil { - slog.Error("Err acking message", "err", err.Error()) + go func(msg queue.Message) { + err := p.processMessage(ctx, msg) + + if err != nil { + slog.Error("err processing message", "err", err.Error()) + + if errors.Is(err, errUnprocessable) { + if err := p.queue.Ack(ctx, msg); err != nil { + slog.Error("Err acking message", "err", err.Error()) + } + } else { + if err := p.queue.Nack(ctx, msg); err != nil { + slog.Error("Err nacking message", "err", err.Error()) + } } } else { - if err := p.queue.Nack(ctx, msg); err != nil { - slog.Error("Err nacking message", "err", err.Error()) + if err := p.queue.Ack(ctx, msg); err != nil { + slog.Error("Err acking message", "err", err.Error()) } } - } else { - if err := p.queue.Ack(ctx, msg); err != nil { - slog.Error("Err acking message", "err", err.Error()) - } - } + }(msg) } } }