diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 9118ae20..d14f0a55 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -267,6 +267,28 @@ func (pc *pushConsumer) IsUnitMode() bool { return pc.unitMode } +func (pc *pushConsumer) GetcType() string { + return string(pc.cType) +} + +func (pc *pushConsumer) GetModel() string { + return pc.model.String() +} + +func (pc *pushConsumer) GetWhere() string { + switch pc.fromWhere { + case ConsumeFromLastOffset: + return "CONSUME_FROM_LAST_OFFSET" + case ConsumeFromFirstOffset: + return "CONSUME_FROM_FIRST_OFFSET" + case ConsumeFromTimestamp: + return "CONSUME_FROM_TIMESTAMP" + default: + return "UNKOWN" + } + +} + func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo { info := internal.NewConsumerRunningInfo() diff --git a/internal/client.go b/internal/client.go index 253b96e9..954cc575 100644 --- a/internal/client.go +++ b/internal/client.go @@ -84,6 +84,9 @@ type InnerConsumer interface { Rebalance() IsUnitMode() bool GetConsumerRunningInfo() *ConsumerRunningInfo + GetcType() string + GetModel() string + GetWhere() string } func DefaultClientOptions() ClientOptions { @@ -454,9 +457,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() { consumer := value.(InnerConsumer) cData := consumerData{ GroupName: key.(string), - CType: "CONSUME_PASSIVELY", - MessageModel: "CLUSTERING", - Where: "CONSUME_FROM_FIRST_OFFSET", + CType: consumeType(consumer.GetcType()), + MessageModel: strings.ToUpper(consumer.GetModel()), + Where: consumer.GetWhere(), UnitMode: consumer.IsUnitMode(), SubscriptionDatas: consumer.SubscriptionDataList(), }