diff --git a/consumer/consumer.go b/consumer/consumer.go index 15100cd5..1511f017 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -435,7 +435,7 @@ func (dc *defaultConsumer) doBalance() { func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData { result := make([]*internal.SubscriptionData, 0) dc.subscriptionDataTable.Range(func(key, value interface{}) bool { - result = append(result, value.(*internal.SubscriptionData)) + result = append(result, value.(*internal.SubscriptionData).Clone()) return true }) return result diff --git a/internal/model.go b/internal/model.go index c248bb36..0fd7955b 100644 --- a/internal/model.go +++ b/internal/model.go @@ -62,6 +62,32 @@ type SubscriptionData struct { ExpType string `json:"expressionType"` } +func (sd *SubscriptionData) Clone() *SubscriptionData { + cloned := &SubscriptionData{ + ClassFilterMode: sd.ClassFilterMode, + Topic: sd.Topic, + SubString: sd.SubString, + SubVersion: sd.SubVersion, + ExpType: sd.ExpType, + } + + if sd.Tags.Items() != nil { + cloned.Tags = utils.NewSet() + for _, value := range sd.Tags.Items() { + cloned.Tags.Add(value) + } + } + + if sd.Codes.Items() != nil { + cloned.Codes = utils.NewSet() + for _, value := range sd.Codes.Items() { + cloned.Codes.Add(value) + } + } + + return cloned +} + type producerData struct { GroupName string `json:"groupName"` } diff --git a/internal/utils/set.go b/internal/utils/set.go index e90fb36d..ed9857b5 100644 --- a/internal/utils/set.go +++ b/internal/utils/set.go @@ -43,6 +43,10 @@ func NewSet() Set { } } +func (s *Set) Items() map[string]UniqueItem { + return s.items +} + func (s *Set) Add(v UniqueItem) { s.items[v.UniqueID()] = v } @@ -98,6 +102,6 @@ func (s *Set) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -func (s Set) UnmarshalJSON(data []byte) (err error) { +func (s *Set) UnmarshalJSON(data []byte) (err error) { return nil }