Skip to content

Commit

Permalink
consumer and producer support unit mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tuweizhong authored and twz915 committed Feb 1, 2023
1 parent 896a8a3 commit cb8da2a
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 9 deletions.
26 changes: 24 additions & 2 deletions consumer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ limitations under the License.
package consumer

import (
"github.com/apache/rocketmq-client-go/v2/hooks"
"strings"
"time"

"github.com/apache/rocketmq-client-go/v2/hooks"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
Expand Down Expand Up @@ -327,7 +328,28 @@ func WithNameServer(nameServers primitive.NamesrvAddr) Option {
// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *consumerOptions) {
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
if opts.UnitName != "" {
h.SetUnitName(opts.UnitName)
}
opts.Resolver = h
}
}

// WithUnitMode set the unit mode
func WithUnitMode(unitMode bool) Option {
return func(opts *consumerOptions) {
opts.UnitMode = unitMode
}
}

// WithUnitName set the name of specified unit
func WithUnitName(unitName string) Option {
return func(opts *consumerOptions) {
opts.UnitName = strings.TrimSpace(unitName)
if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
ns.SetUnitName(opts.UnitName)
}
}
}

Expand Down
71 changes: 71 additions & 0 deletions consumer/option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package consumer

import (
"fmt"
"reflect"
"strings"
"testing"
)

func getFieldString(obj interface{}, field string) string {
v := reflect.Indirect(reflect.ValueOf(obj))
return v.FieldByNameFunc(func(n string) bool {
return n == field
}).String()
}

func TestWithUnitMode(t *testing.T) {
opt := defaultPullConsumerOptions()
WithUnitMode(true)(&opt)
if !opt.UnitMode {
t.Errorf("consumer option WithUnitMode. want:true, got=%v", opt.UnitMode)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultPullConsumerOptions()
unitName := "unsh"
WithUnitName(unitName)(&opt)
if opt.UnitName != unitName {
t.Errorf("consumer option WithUnitName. want:%s, got=%s", unitName, opt.UnitName)
}
}

func TestWithNameServerDomain(t *testing.T) {
opt := defaultPullConsumerOptions()
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
WithNameServerDomain(nameServerAddr)(&opt)
domainStr := getFieldString(opt.Resolver, "domain")
if domainStr != nameServerAddr {
t.Errorf("consumer option WithUnitName. want:%s, got=%s", nameServerAddr, domainStr)
}
}

func TestWithNameServerDomainAndUnitName(t *testing.T) {
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
unitName := "unsh"
suffix := fmt.Sprintf("-%s?nofix=1", unitName)

// test with two different orders
t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
opt := defaultPullConsumerOptions()
WithNameServerDomain(nameServerAddr)(&opt)
WithUnitName(unitName)(&opt)

domainStr := getFieldString(opt.Resolver, "domain")
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
}
})

t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
opt := defaultPullConsumerOptions()
WithNameServerDomain(nameServerAddr)(&opt)
WithUnitName(unitName)(&opt)

domainStr := getFieldString(opt.Resolver, "domain")
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
}
})
}
18 changes: 14 additions & 4 deletions primitive/nsresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -111,6 +111,16 @@ func NewHttpResolver(instance string, domain ...string) *HttpResolver {
return h
}

func (h *HttpResolver) SetUnitName(unitName string) {
if unitName == "" {
return
}
if strings.Contains(h.domain, "?nofix=1") {
return
}
h.domain = fmt.Sprintf("%s-%s?nofix=1", h.domain, unitName)
}

func (h *HttpResolver) Resolve() []string {
addrs := h.get()
if len(addrs) > 0 {
Expand Down Expand Up @@ -152,14 +162,14 @@ func (h *HttpResolver) get() []string {
return nil
}

bodyStr := string(body)
bodyStr := strings.TrimSpace(string(body))
if bodyStr == "" {
return nil
}

h.saveSnapshot(body)
_ = h.saveSnapshot([]byte(bodyStr))

return strings.Split(string(body), ";")
return strings.Split(bodyStr, ";")
}

func (h *HttpResolver) saveSnapshot(body []byte) error {
Expand Down
42 changes: 40 additions & 2 deletions primitive/nsresolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,14 +18,15 @@ package primitive

import (
"fmt"
"github.com/apache/rocketmq-client-go/v2/rlog"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"testing"

"github.com/apache/rocketmq-client-go/v2/rlog"

. "github.com/smartystreets/goconvey/convey"
)

Expand Down Expand Up @@ -81,6 +82,43 @@ func TestHttpResolverWithGet(t *testing.T) {
})
}

func TestHttpResolverWithGetUnitName(t *testing.T) {
Convey("Test UpdateNameServerAddress Save Local Snapshot", t, func() {
srvs := []string{
"192.168.100.1",
"192.168.100.2",
"192.168.100.3",
"192.168.100.4",
"192.168.100.5",
}
http.HandleFunc("/nameserver/addrs3-unsh", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Query().Get("nofix") == "1" {
fmt.Fprintf(w, strings.Join(srvs, ";"))
}
fmt.Fprintf(w, "")
})
server := &http.Server{Addr: ":0", Handler: nil}
listener, _ := net.Listen("tcp", ":0")
go server.Serve(listener)

port := listener.Addr().(*net.TCPAddr).Port
nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs3", port)
rlog.Info("Temporary Nameserver", map[string]interface{}{
"domain": nameServerDommain,
})

resolver := NewHttpResolver("DEFAULT", nameServerDommain)
resolver.SetUnitName("unsh")
resolver.Resolve()

// check snapshot saved
filePath := resolver.getSnapshotFilePath("DEFAULT")
body := strings.Join(srvs, ";")
bs, _ := ioutil.ReadFile(filePath)
So(string(bs), ShouldEqual, body)
})
}

func TestHttpResolverWithSnapshotFile(t *testing.T) {
Convey("Test UpdateNameServerAddress Use Local Snapshot", t, func() {
srvs := []string{
Expand Down
24 changes: 23 additions & 1 deletion producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package producer

import (
"strings"
"time"

"github.com/apache/rocketmq-client-go/v2/internal"
Expand Down Expand Up @@ -144,7 +145,28 @@ func WithNameServer(nameServers primitive.NamesrvAddr) Option {
// WithNameServerDomain set NameServer domain
func WithNameServerDomain(nameServerUrl string) Option {
return func(opts *producerOptions) {
opts.Resolver = primitive.NewHttpResolver("DEFAULT", nameServerUrl)
h := primitive.NewHttpResolver("DEFAULT", nameServerUrl)
if opts.UnitName != "" {
h.SetUnitName(opts.UnitName)
}
opts.Resolver = h
}
}

// WithUnitMode set the unit mode
func WithUnitMode(unitMode bool) Option {
return func(opts *producerOptions) {
opts.UnitMode = unitMode
}
}

// WithUnitName set the name of specified unit
func WithUnitName(unitName string) Option {
return func(opts *producerOptions) {
opts.UnitName = strings.TrimSpace(unitName)
if ns, ok := opts.Resolver.(*primitive.HttpResolver); ok {
ns.SetUnitName(opts.UnitName)
}
}
}

Expand Down
71 changes: 71 additions & 0 deletions producer/option_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package producer

import (
"fmt"
"reflect"
"strings"
"testing"
)

func getFieldString(obj interface{}, field string) string {
v := reflect.Indirect(reflect.ValueOf(obj))
return v.FieldByNameFunc(func(n string) bool {
return n == field
}).String()
}

func TestWithUnitMode(t *testing.T) {
opt := defaultProducerOptions()
WithUnitMode(true)(&opt)
if !opt.UnitMode {
t.Errorf("consumer option WithUnitMode. want:true, got=%v", opt.UnitMode)
}
}

func TestWithUnitName(t *testing.T) {
opt := defaultProducerOptions()
unitName := "unsh"
WithUnitName(unitName)(&opt)
if opt.UnitName != unitName {
t.Errorf("consumer option WithUnitName. want:%s, got=%s", unitName, opt.UnitName)
}
}

func TestWithNameServerDomain(t *testing.T) {
opt := defaultProducerOptions()
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
WithNameServerDomain(nameServerAddr)(&opt)
domainStr := getFieldString(opt.Resolver, "domain")
if domainStr != nameServerAddr {
t.Errorf("consumer option WithUnitName. want:%s, got=%s", nameServerAddr, domainStr)
}
}

func TestWithNameServerDomainAndUnitName(t *testing.T) {
nameServerAddr := "http://127.0.0.1:8080/nameserver/addr"
unitName := "unsh"
suffix := fmt.Sprintf("-%s?nofix=1", unitName)

// test with two different orders
t.Run("WithNameServerDomain & WithUnitName", func(t *testing.T) {
opt := defaultProducerOptions()
WithNameServerDomain(nameServerAddr)(&opt)
WithUnitName(unitName)(&opt)

domainStr := getFieldString(opt.Resolver, "domain")
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
}
})

t.Run("WithUnitName & WithNameServerDomain", func(t *testing.T) {
opt := defaultProducerOptions()
WithNameServerDomain(nameServerAddr)(&opt)
WithUnitName(unitName)(&opt)

domainStr := getFieldString(opt.Resolver, "domain")
if !strings.Contains(domainStr, nameServerAddr) || !strings.Contains(domainStr, suffix) {
t.Errorf("consumer option should contains %s and %s", nameServerAddr, suffix)
}
})
}

0 comments on commit cb8da2a

Please sign in to comment.