From 2147eb81d5c981851fc239c5975ad816857ea5ae Mon Sep 17 00:00:00 2001 From: chenhui Date: Tue, 31 May 2022 15:40:59 +0800 Subject: [PATCH] Client may submit wrong offset when network instability --- consumer/consumer.go | 28 ++++++++++++---- consumer/consumer_test.go | 24 ++++++++----- consumer/mock_offset_store.go | 63 ++++++++++++++++++++++++----------- consumer/offset_store.go | 29 +++++++++++----- consumer/offset_store_test.go | 24 ++++++------- consumer/pull_consumer.go | 6 ++-- consumer/push_consumer.go | 17 +++++++--- 7 files changed, 130 insertions(+), 61 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index ed843cf1..6e53eafa 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -28,6 +28,7 @@ import ( "time" "github.com/apache/rocketmq-client-go/v2/errors" + jsoniter "github.com/json-iterator/go" "github.com/tidwall/gjson" @@ -699,8 +700,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv continue } dc.storage.remove(&mq) - nextOffset := dc.computePullFromWhere(&mq) - if nextOffset >= 0 { + nextOffset, err := dc.computePullFromWhereWithException(&mq) + + if nextOffset >= 0 && err == nil { _, exist := dc.processQueueTable.Load(mq) if exist { rlog.Debug("do defaultConsumer, mq already exist", map[string]interface{}{ @@ -741,12 +743,23 @@ func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQu return true } +// Deprecated: Use computePullFromWhereWithException instead. func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int64 { + result, _ := dc.computePullFromWhereWithException(mq) + return result +} + +func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) { if dc.cType == _PullConsume { - return 0 + return 0, nil } - var result = int64(-1) - lastOffset := dc.storage.read(mq, _ReadFromStore) + result := int64(-1) + lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore) + if err != nil { + // 这里 lastOffset = -1 + return lastOffset, err + } + if lastOffset >= 0 { result = lastOffset } else { @@ -803,7 +816,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int6 default: } } - return result + return result, nil } func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData, @@ -950,7 +963,8 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er } func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 { - return dc.storage.read(mq, _ReadMemoryThenStore) + result, _ := dc.storage.readWithException(mq, _ReadMemoryThenStore) + return result } // SearchOffsetByTimestamp with specific queueId and topic diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 12ccd185..e4412902 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -26,6 +26,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" + "github.com/apache/rocketmq-client-go/v2/errors" "github.com/apache/rocketmq-client-go/v2/internal" "github.com/apache/rocketmq-client-go/v2/internal/remote" "github.com/apache/rocketmq-client-go/v2/primitive" @@ -116,13 +117,20 @@ func TestComputePullFromWhere(t *testing.T) { rmqCli.SetNameSrv(namesrvCli) Convey("get effective offset", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10)) - res := dc.computePullFromWhere(mq) + offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(10), nil) + res, _ := dc.computePullFromWhereWithException(mq) assert.Equal(t, int64(10), res) }) + Convey("get offset error", func() { + offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), errors.ErrRequestTimeout) + + _, err := dc.computePullFromWhereWithException(mq) + assert.Equal(t, err, errors.ErrRequestTimeout) + }) + Convey("ConsumeFromLastOffset for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) + offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil) dc.option.FromWhere = ConsumeFromLastOffset broker := "a" @@ -135,20 +143,20 @@ func TestComputePullFromWhere(t *testing.T) { }, }, nil) - res := dc.computePullFromWhere(mq) + res, _ := dc.computePullFromWhereWithException(mq) assert.Equal(t, int64(20), res) }) Convey("ConsumeFromFirstOffset for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) + offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil) dc.option.FromWhere = ConsumeFromFirstOffset - res := dc.computePullFromWhere(mq) + res, _ := dc.computePullFromWhereWithException(mq) assert.Equal(t, int64(0), res) }) Convey("ConsumeFromTimestamp for normal topic", func() { - offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1)) + offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil) dc.option.FromWhere = ConsumeFromTimestamp dc.option.ConsumeTimestamp = "20060102150405" @@ -163,7 +171,7 @@ func TestComputePullFromWhere(t *testing.T) { }, }, nil) - res := dc.computePullFromWhere(mq) + res, _ := dc.computePullFromWhereWithException(mq) assert.Equal(t, int64(30), res) }) diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go index bac1cdb8..145ec3d4 100644 --- a/consumer/mock_offset_store.go +++ b/consumer/mock_offset_store.go @@ -28,67 +28,90 @@ import ( gomock "github.com/golang/mock/gomock" ) -// MockOffsetStore is a mock of OffsetStore interface +// MockOffsetStore is a mock of OffsetStore interface. type MockOffsetStore struct { ctrl *gomock.Controller recorder *MockOffsetStoreMockRecorder } -// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore +// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore. type MockOffsetStoreMockRecorder struct { mock *MockOffsetStore } -// NewMockOffsetStore creates a new mock instance +// NewMockOffsetStore creates a new mock instance. func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore { mock := &MockOffsetStore{ctrl: ctrl} mock.recorder = &MockOffsetStoreMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder { return m.recorder } -// persist mocks base method +// persist mocks base method. func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) { + m.ctrl.T.Helper() m.ctrl.Call(m, "persist", mqs) } -// persist indicates an expected call of persist +// persist indicates an expected call of persist. func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs) } -// remove mocks base method -func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) { - m.ctrl.Call(m, "remove", mq) -} - -// remove indicates an expected call of remove -func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq) -} - -// read mocks base method +// read mocks base method. func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "read", mq, t) ret0, _ := ret[0].(int64) return ret0 } -// read indicates an expected call of read +// read indicates an expected call of read. func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t) } -// update mocks base method +// readWithException mocks base method. +func (m *MockOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "readWithException", mq, t) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// readWithException indicates an expected call of readWithException. +func (mr *MockOffsetStoreMockRecorder) readWithException(mq, t interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readWithException", reflect.TypeOf((*MockOffsetStore)(nil).readWithException), mq, t) +} + +// remove mocks base method. +func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "remove", mq) +} + +// remove indicates an expected call of remove. +func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq) +} + +// update mocks base method. func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { + m.ctrl.T.Helper() m.ctrl.Call(m, "update", mq, offset, increaseOnly) } -// update indicates an expected call of update +// update indicates an expected call of update. func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly) } diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 5ecfd146..86ecd187 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -57,6 +57,7 @@ type OffsetStore interface { persist(mqs []*primitive.MessageQueue) remove(mq *primitive.MessageQueue) read(mq *primitive.MessageQueue, t readType) int64 + readWithException(mq *primitive.MessageQueue, t readType) (int64, error) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) } @@ -156,21 +157,27 @@ func (local *localFileOffsetStore) load() { } } +// Deprecated: Use readWithException instead. func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { + result, _ := local.readWithException(mq, t) + return result +} + +func (local *localFileOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) { switch t { case _ReadFromMemory, _ReadMemoryThenStore: off := readFromMemory(local.OffsetTable, mq) if off >= 0 || (off == -1 && t == _ReadFromMemory) { - return off + return off, nil } fallthrough case _ReadFromStore: local.load() - return readFromMemory(local.OffsetTable, mq) + return readFromMemory(local.OffsetTable, mq), nil default: } - return -1 + return -1, nil } func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { @@ -284,18 +291,24 @@ func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) { }) } +// Deprecated: Use readWithException instead. func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 { + result, _ := r.readWithException(mq, t) + return result +} + +func (r *remoteBrokerOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) { r.mutex.RLock() switch t { case _ReadFromMemory, _ReadMemoryThenStore: off, exist := r.OffsetTable[*mq] if exist { r.mutex.RUnlock() - return off + return off, nil } if t == _ReadFromMemory { r.mutex.RUnlock() - return -1 + return -1, nil } fallthrough case _ReadFromStore: @@ -307,7 +320,7 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i rlog.LogKeyUnderlayError: err, }) r.mutex.RUnlock() - return -1 + return -1, err } rlog.Warning("fetch offset of mq from broker success", map[string]interface{}{ rlog.LogKeyConsumerGroup: r.group, @@ -316,11 +329,11 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i }) r.mutex.RUnlock() r.update(mq, off, true) - return off + return off, nil default: } - return -1 + return -1, nil } func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) { diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go index cfa0eaac..d833f36d 100644 --- a/consumer/offset_store_test.go +++ b/consumer/offset_store_test.go @@ -98,7 +98,7 @@ func TestLocalFileOffsetStore(t *testing.T) { } for _, value := range cases { localStore.update(value.queue, value.setOffset, false) - offset := localStore.read(value.queue, _ReadFromMemory) + offset, _ := localStore.readWithException(value.queue, _ReadFromMemory) So(offset, ShouldEqual, value.expectedOffset) } }) @@ -119,7 +119,7 @@ func TestLocalFileOffsetStore(t *testing.T) { } for _, value := range cases { localStore.update(value.queue, value.setOffset, true) - offset := localStore.read(value.queue, _ReadFromMemory) + offset, _ := localStore.readWithException(value.queue, _ReadFromMemory) So(offset, ShouldEqual, value.expectedOffset) } }) @@ -127,16 +127,16 @@ func TestLocalFileOffsetStore(t *testing.T) { Convey("test persist", func() { localStore.update(mq, 1, false) - offset := localStore.read(mq, _ReadFromMemory) + offset, _ := localStore.readWithException(mq, _ReadFromMemory) So(offset, ShouldEqual, 1) queues := []*primitive.MessageQueue{mq} localStore.persist(queues) - offset = localStore.read(mq, _ReadFromStore) + offset, _ = localStore.readWithException(mq, _ReadFromStore) So(offset, ShouldEqual, 1) localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq)) - offset = localStore.read(mq, _ReadMemoryThenStore) + offset, _ = localStore.readWithException(mq, _ReadMemoryThenStore) So(offset, ShouldEqual, 1) }) }) @@ -178,7 +178,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) { } for _, value := range cases { remoteStore.update(value.queue, value.setOffset, false) - offset := remoteStore.read(value.queue, _ReadFromMemory) + offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory) So(offset, ShouldEqual, value.expectedOffset) } }) @@ -199,7 +199,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) { } for _, value := range cases { remoteStore.update(value.queue, value.setOffset, true) - offset := remoteStore.read(value.queue, _ReadFromMemory) + offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory) So(offset, ShouldEqual, value.expectedOffset) } }) @@ -219,24 +219,24 @@ func TestRemoteBrokerOffsetStore(t *testing.T) { rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2) remoteStore.persist(queues) - offset := remoteStore.read(mq, _ReadFromStore) + offset, _ := remoteStore.readWithException(mq, _ReadFromStore) So(offset, ShouldEqual, 1) remoteStore.remove(mq) - offset = remoteStore.read(mq, _ReadFromMemory) + offset, _ = remoteStore.readWithException(mq, _ReadFromMemory) So(offset, ShouldEqual, -1) - offset = remoteStore.read(mq, _ReadMemoryThenStore) + offset, _ = remoteStore.readWithException(mq, _ReadMemoryThenStore) So(offset, ShouldEqual, 1) }) Convey("test remove", func() { remoteStore.update(mq, 1, false) - offset := remoteStore.read(mq, _ReadFromMemory) + offset, _ := remoteStore.readWithException(mq, _ReadFromMemory) So(offset, ShouldEqual, 1) remoteStore.remove(mq) - offset = remoteStore.read(mq, _ReadFromMemory) + offset, _ = remoteStore.readWithException(mq, _ReadFromMemory) So(offset, ShouldEqual, -1) }) }) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index ff6f7d7f..4699601b 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -20,10 +20,11 @@ package consumer import ( "context" "fmt" - errors2 "github.com/apache/rocketmq-client-go/v2/errors" "sync" "sync/atomic" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" + "github.com/pkg/errors" "github.com/apache/rocketmq-client-go/v2/internal" @@ -219,7 +220,8 @@ func (c *defaultPullConsumer) makeSureStateOK() error { } func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 { - return c.computePullFromWhere(queue) + result, _ := c.computePullFromWhereWithException(queue) + return result } // PullFrom pull messages of queue from the offset to offset + numbers diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 801f412e..4ad5ee36 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -20,7 +20,6 @@ package consumer import ( "context" "fmt" - errors2 "github.com/apache/rocketmq-client-go/v2/errors" "math" "runtime/pprof" "strconv" @@ -29,6 +28,8 @@ import ( "sync/atomic" "time" + errors2 "github.com/apache/rocketmq-client-go/v2/errors" + "github.com/pkg/errors" "github.com/apache/rocketmq-client-go/v2/internal" @@ -375,7 +376,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRun mq := key.(primitive.MessageQueue) pq := value.(*processQueue) pInfo := pq.currentInfo() - pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore) + pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore) info.MQTable[mq] = pInfo return true }) @@ -644,7 +645,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { } else { if pq.IsLock() { if !request.lockedFirst { - offset := pc.computePullFromWhere(request.mq) + offset, err := pc.computePullFromWhereWithException(request.mq) + if err != nil { + rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err.Error(), + }) + sleepTime = _PullDelayTimeWhenError + goto NEXT + } + brokerBusy := offset < request.nextOffset rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{ rlog.LogKeyPullRequest: request.String(), @@ -684,7 +693,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) { ) if pc.model == Clustering { - commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory) + commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory) if commitOffsetValue > 0 { commitOffsetEnable = true }