diff --git a/pubsub/common.go b/pubsub/common.go index a4fc141bb5..3f9e310591 100644 --- a/pubsub/common.go +++ b/pubsub/common.go @@ -11,6 +11,7 @@ import ( ) func ResultKeyFor(streamName, id string) string { return fmt.Sprintf("%s.%s", streamName, id) } +func ErrorKeyFor(streamName, id string) string { return fmt.Sprintf("%s.%s.error", streamName, id) } // CreateStream tries to create stream with given name, if it already exists // does not return an error. diff --git a/pubsub/consumer.go b/pubsub/consumer.go index 3f28749473..7344eb2479 100644 --- a/pubsub/consumer.go +++ b/pubsub/consumer.go @@ -36,6 +36,8 @@ var TestConsumerConfig = ConsumerConfig{ IdletimeToAutoclaim: 30 * time.Millisecond, } +var AlreadySetError = errors.New("redis key already set") + func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry") f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers") @@ -221,7 +223,10 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s resultKey := ResultKeyFor(c.StreamName(), messageID) log.Debug("consumer: setting result", "cid", c.id, "msgIdInStream", messageID, "resultKeyInRedis", resultKey) acquired, err := c.client.SetNX(ctx, resultKey, resp, c.cfg.ResponseEntryTimeout).Result() - if err != nil || !acquired { + if !acquired && err == nil { + err = AlreadySetError + } + if err != nil { return fmt.Errorf("setting result for message with message-id in stream: %v, error: %w", messageID, err) } log.Debug("consumer: xack", "cid", c.id, "messageId", messageID) @@ -233,3 +238,27 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s } return nil } + +func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID string, error string) error { + resp, err := json.Marshal(error) + if err != nil { + return fmt.Errorf("marshaling result: %w", err) + } + errorKey := ErrorKeyFor(c.StreamName(), messageID) + log.Debug("consumer: setting error", "cid", c.id, "msgIdInStream", messageID, "errorKeyInRedis", errorKey) + acquired, err := c.client.SetNX(ctx, errorKey, resp, c.cfg.ResponseEntryTimeout).Result() + if !acquired && err == nil { + err = AlreadySetError + } + if err != nil { + return fmt.Errorf("setting error for message with message-id in stream: %v, error: %w", messageID, err) + } + log.Debug("consumer: xack", "cid", c.id, "messageId", messageID) + if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil { + return fmt.Errorf("acking message: %v, error: %w", messageID, err) + } + if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil { + return fmt.Errorf("deleting message: %v, error: %w", messageID, err) + } + return nil +} diff --git a/pubsub/producer.go b/pubsub/producer.go index 5aaca77aa7..b7e85048c8 100644 --- a/pubsub/producer.go +++ b/pubsub/producer.go @@ -142,6 +142,24 @@ func (p *Producer[Request, Response]) checkResponses(ctx context.Context) time.D return 0 } checked++ + // First check if there is an error for this promise + errorKey := ErrorKeyFor(p.redisStream, id) + errorResponse, err := p.client.Get(ctx, errorKey).Result() + if err != nil && !errors.Is(err, redis.Nil) { + log.Error("Error reading error in redis", "key", errorKey, "error", err) + continue + } + // If there is an error, then delete the error key and return the error + // There will always be and error even if the response is present, its just that the error will be empty + p.client.Del(ctx, errorKey) + if errorResponse != "" { + promise.ProduceError(errors.New(errorResponse)) + log.Error("error getting response", "error", errorResponse) + errored++ + delete(p.promises, id) + continue + } + // If there is no error, then check if there is a response resultKey := ResultKeyFor(p.redisStream, id) res, err := p.client.Get(ctx, resultKey).Result() if err != nil { diff --git a/validator/valnode/redis/consumer.go b/validator/valnode/redis/consumer.go index 93b3eddd3f..53275a8030 100644 --- a/validator/valnode/redis/consumer.go +++ b/validator/valnode/redis/consumer.go @@ -163,10 +163,19 @@ func (s *ValidationServer) Start(ctx_in context.Context) { res, err := valRun.Await(ctx) if err != nil { log.Error("Error validating", "request value", work.req.Value, "error", err) + err := s.consumers[work.moduleRoot].SetError(ctx, work.req.ID, err.Error()) work.req.Ack() + if err != nil { + log.Error("Error setting error for request", "id", work.req.ID, "error", err) + } } else { log.Debug("done work", "thread", i, "workid", work.req.ID) err := s.consumers[work.moduleRoot].SetResult(ctx, work.req.ID, res) + if err != nil { + log.Error("Error setting result for request", "id", work.req.ID, "result", res, "error", err) + } + // Set an empty error even if the result is set successfully, as the error key is always checked first. + err = s.consumers[work.moduleRoot].SetError(ctx, work.req.ID, "") // Even in error we close ackNotifier as there's no retry mechanism here and closing it will alow other consumers to autoclaim work.req.Ack() if err != nil {