diff --git a/admin/admin.go b/admin/admin.go index 20e17942..487c8b44 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -33,8 +33,9 @@ import ( type Admin interface { CreateTopic(ctx context.Context, opts ...OptionCreate) error DeleteTopic(ctx context.Context, opts ...OptionDelete) error - //TODO - //TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error) + + GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error) + FetchAllTopicList(ctx context.Context) (*TopicList, error) //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) Close() error @@ -108,6 +109,51 @@ func NewAdmin(opts ...AdminOption) (*admin, error) { }, nil } +func (a *admin) GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error) { + cmd := remote.NewRemotingCommand(internal.ReqGetAllSubscriptionGroupConfig, nil, nil) + a.cli.RegisterACL() + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, timeoutMillis) + if err != nil { + rlog.Error("Get all group list error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } else { + rlog.Info("Get all group list success", map[string]interface{}{}) + } + var subscriptionGroupWrapper SubscriptionGroupWrapper + _, err = subscriptionGroupWrapper.Decode(response.Body, &subscriptionGroupWrapper) + if err != nil { + rlog.Error("Get all group list decode error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } + return &subscriptionGroupWrapper, nil +} + +func (a *admin) FetchAllTopicList(ctx context.Context) (*TopicList, error) { + cmd := remote.NewRemotingCommand(internal.ReqGetAllTopicListFromNameServer, nil, nil) + response, err := a.cli.InvokeSync(ctx, a.cli.GetNameSrv().AddrList()[0], cmd, 3*time.Second) + if err != nil { + rlog.Error("Fetch all topic list error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } else { + rlog.Info("Fetch all topic list success", map[string]interface{}{}) + } + var topicList TopicList + _, err = topicList.Decode(response.Body, &topicList) + if err != nil { + rlog.Error("Fetch all topic list decode error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } + return &topicList, nil +} + // CreateTopic create topic. // TODO: another implementation like sarama, without brokerAddr as input func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error { diff --git a/admin/response.go b/admin/response.go new file mode 100644 index 00000000..7a97236f --- /dev/null +++ b/admin/response.go @@ -0,0 +1,88 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package admin + +import "encoding/json" + +type RemotingSerializable struct { +} + +func (r *RemotingSerializable) Encode(obj interface{}) ([]byte, error) { + jsonStr := r.ToJson(obj, false) + if jsonStr != "" { + return []byte(jsonStr), nil + } + return nil, nil +} + +func (r *RemotingSerializable) ToJson(obj interface{}, prettyFormat bool) string { + if prettyFormat { + jsonBytes, err := json.MarshalIndent(obj, "", " ") + if err != nil { + return "" + } + return string(jsonBytes) + } else { + jsonBytes, err := json.Marshal(obj) + if err != nil { + return "" + } + return string(jsonBytes) + } +} +func (r *RemotingSerializable) Decode(data []byte, classOfT interface{}) (interface{}, error) { + jsonStr := string(data) + return r.FromJson(jsonStr, classOfT) +} + +func (r *RemotingSerializable) FromJson(jsonStr string, classOfT interface{}) (interface{}, error) { + err := json.Unmarshal([]byte(jsonStr), classOfT) + if err != nil { + return nil, err + } + return classOfT, nil +} + +type TopicList struct { + TopicList []string + BrokerAddr string + RemotingSerializable +} + +type SubscriptionGroupWrapper struct { + SubscriptionGroupTable map[string]SubscriptionGroupConfig + DataVersion DataVersion + RemotingSerializable +} + +type DataVersion struct { + Timestamp int64 + Counter int32 +} + +type SubscriptionGroupConfig struct { + GroupName string + ConsumeEnable bool + ConsumeFromMinEnable bool + ConsumeBroadcastEnable bool + RetryMaxTimes int + RetryQueueNums int + BrokerId int + WhichBrokerWhenConsumeSlowly int + NotifyConsumerIdsChangedEnable bool +} diff --git a/examples/admin/group/main.go b/examples/admin/group/main.go new file mode 100644 index 00000000..6aefe97e --- /dev/null +++ b/examples/admin/group/main.go @@ -0,0 +1,53 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "time" + + "github.com/apache/rocketmq-client-go/v2/admin" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func main() { + //clusterName := "DefaultCluster" + nameSrvAddr := []string{"127.0.0.1:9876"} + brokerAddr := "127.0.0.1:10911" + + testAdmin, err := admin.NewAdmin( + admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)), + admin.WithCredentials(primitive.Credentials{ + AccessKey: "RocketMQ", + SecretKey: "12345678", + }), + ) + + // group list + result, err := testAdmin.GetAllSubscriptionGroup(context.Background(), brokerAddr, 3*time.Second) + if err != nil { + fmt.Println("GetAllSubscriptionGroup error:", err.Error()) + } + fmt.Println(result.SubscriptionGroupTable) + + err = testAdmin.Close() + if err != nil { + fmt.Printf("Shutdown admin error: %s", err.Error()) + } +} diff --git a/examples/admin/topic/main.go b/examples/admin/topic/main.go index ef9a5367..c52f3c70 100644 --- a/examples/admin/topic/main.go +++ b/examples/admin/topic/main.go @@ -31,10 +31,20 @@ func main() { nameSrvAddr := []string{"127.0.0.1:9876"} brokerAddr := "127.0.0.1:10911" - testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr))) + testAdmin, err := admin.NewAdmin( + admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)), + admin.WithCredentials(primitive.Credentials{ + AccessKey: "RocketMQ", + SecretKey: "12345678", + }), + ) + + // topic list + result, err := testAdmin.FetchAllTopicList(context.Background()) if err != nil { - fmt.Println(err.Error()) + fmt.Println("FetchAllTopicList error:", err.Error()) } + fmt.Println(result.TopicList) //create topic err = testAdmin.CreateTopic( diff --git a/internal/client.go b/internal/client.go index 2ae85982..e73b7d3d 100644 --- a/internal/client.go +++ b/internal/client.go @@ -162,6 +162,7 @@ type RMQClient interface { UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) GetNameSrv() Namesrvs + RegisterACL() } var _ RMQClient = new(rmqClient) @@ -939,6 +940,12 @@ func (c *rmqClient) consumeMessageDirectly(msg *primitive.MessageExt, group stri return res } +func (c *rmqClient) RegisterACL() { + if !c.option.Credentials.IsEmpty() { + c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials)) + } +} + func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue { list := make([]*primitive.MessageQueue, 0) for idx := range data.QueueDataList { diff --git a/internal/mock_client.go b/internal/mock_client.go index c9750384..3c5b55d5 100644 --- a/internal/mock_client.go +++ b/internal/mock_client.go @@ -428,3 +428,8 @@ func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, ch func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data, changed interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data, changed) } + +// RegisterACL mocks base method +func (m *MockRMQClient) RegisterACL() { + m.ctrl.Call(m, "RegisterACL") +} diff --git a/internal/request.go b/internal/request.go index 5ff04e3d..9a590b8f 100644 --- a/internal/request.go +++ b/internal/request.go @@ -45,6 +45,7 @@ const ( ReqSendBatchMessage = int16(320) ReqCheckTransactionState = int16(39) ReqNotifyConsumerIdsChanged = int16(40) + ReqGetAllSubscriptionGroupConfig = int16(201) ReqGetAllTopicListFromNameServer = int16(206) ReqDeleteTopicInBroker = int16(215) ReqDeleteTopicInNameSrv = int16(216)