Skip to content

Commit

Permalink
Merge pull request #1015 from cserwen/orderly_consume_github
Browse files Browse the repository at this point in the history
[ISSUE #994] Fix: unlock all queues when consumer shutdown in orderly model
  • Loading branch information
tiger lee authored Mar 20, 2023
2 parents 56d1a7d + 9bca88d commit 3b8b238
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
12 changes: 6 additions & 6 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,12 +631,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
}
} else {
response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
rlog.LogKeyBroker: addr,
rlog.LogKeyUnderlayError: err,
})
if response.Code != internal.ResSuccess {
// TODO error
if err != nil || response == nil || response.Code != internal.ResSuccess {
rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
rlog.LogKeyBroker: addr,
rlog.LogKeyUnderlayError: err,
"response": response,
})
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion consumer/push_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ func (pc *pushConsumer) Shutdown() error {
pc.option.TraceDispatcher.Close()
}
close(pc.done)

if pc.consumeOrderly && pc.model == Clustering {
pc.unlockAll(false)
}
pc.client.UnregisterConsumer(pc.consumerGroup)
err = pc.defaultConsumer.shutdown()
})
Expand Down

0 comments on commit 3b8b238

Please sign in to comment.