Skip to content

Commit

Permalink
Merge pull request #716 from zouyx/feature/merge1.4
Browse files Browse the repository at this point in the history
Rft : merge 1.4
  • Loading branch information
gaoxinge authored Aug 16, 2020
2 parents e3dd46f + a007d32 commit 5a135e6
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 72 deletions.
9 changes: 6 additions & 3 deletions config/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

import (
"github.com/creasty/defaults"
"github.com/dubbogo/getty"
perrors "github.com/pkg/errors"
)

Expand All @@ -34,6 +33,10 @@ import (
"github.com/apache/dubbo-go/common/yaml"
)

const (
MaxWheelTimeSpan = 900e9 // 900s, 15 minute
)

/////////////////////////
// consumerConfig
/////////////////////////
Expand Down Expand Up @@ -107,9 +110,9 @@ func ConsumerInit(confConFile string) error {
if consumerConfig.RequestTimeout, err = time.ParseDuration(consumerConfig.Request_Timeout); err != nil {
return perrors.WithMessagef(err, "time.ParseDuration(Request_Timeout{%#v})", consumerConfig.Request_Timeout)
}
if consumerConfig.RequestTimeout >= time.Duration(getty.MaxWheelTimeSpan) {
if consumerConfig.RequestTimeout >= time.Duration(MaxWheelTimeSpan) {
return perrors.WithMessagef(err, "request_timeout %s should be less than %s",
consumerConfig.Request_Timeout, time.Duration(getty.MaxWheelTimeSpan))
consumerConfig.Request_Timeout, time.Duration(MaxWheelTimeSpan))
}
}
if consumerConfig.Connect_Timeout != "" {
Expand Down
36 changes: 13 additions & 23 deletions config_center/apollo/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,28 @@ import (
import (
gxset "github.com/dubbogo/gost/container/set"
perrors "github.com/pkg/errors"
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
agolloConstant "github.com/zouyx/agollo/v3/constant"
"github.com/zouyx/agollo/v3/env/config"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
cc "github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/config_center/parser"
"github.com/apache/dubbo-go/remoting"
)

const (
apolloProtocolPrefix = "http://"
apolloConfigFormat = "%s.%s"
apolloConfigFormat = "%s%s"
)

type apolloConfiguration struct {
url *common.URL

listeners sync.Map
appConf *agollo.AppConfig
appConf *config.AppConfig
parser parser.ConfigurationParser
}

Expand All @@ -60,39 +61,28 @@ func newApolloConfiguration(url *common.URL) (*apolloConfiguration, error) {

appId := url.GetParam(constant.CONFIG_APP_ID_KEY, "")
namespaces := getProperties(url.GetParam(constant.CONFIG_NAMESPACE_KEY, cc.DEFAULT_GROUP))
c.appConf = &agollo.AppConfig{
AppId: appId,
c.appConf = &config.AppConfig{
AppID: appId,
Cluster: configCluster,
NamespaceName: namespaces,
Ip: configAddr,
IP: configAddr,
}

agollo.InitCustomConfig(func() (*agollo.AppConfig, error) {
agollo.InitCustomConfig(func() (*config.AppConfig, error) {
return c.appConf, nil
})

return c, agollo.Start()
}

func getChangeType(change agollo.ConfigChangeType) remoting.EventType {
switch change {
case agollo.ADDED:
return remoting.EventTypeAdd
case agollo.DELETED:
return remoting.EventTypeDel
default:
return remoting.EventTypeUpdate
}
}

func (c *apolloConfiguration) AddListener(key string, listener cc.ConfigurationListener, opts ...cc.Option) {
k := &cc.Options{}
for _, opt := range opts {
opt(k)
}

key = k.Group + key
l, _ := c.listeners.LoadOrStore(key, NewApolloListener())
l, _ := c.listeners.LoadOrStore(key, newApolloListener())
l.(*apolloListener).AddListener(listener)
}

Expand All @@ -110,10 +100,10 @@ func (c *apolloConfiguration) RemoveListener(key string, listener cc.Configurati
}

func getProperties(namespace string) string {
return getNamespaceName(namespace, agollo.Properties)
return getNamespaceName(namespace, agolloConstant.Properties)
}

func getNamespaceName(namespace string, configFileFormat agollo.ConfigFileFormat) string {
func getNamespaceName(namespace string, configFileFormat agolloConstant.ConfigFileFormat) string {
return fmt.Sprintf(apolloConfigFormat, namespace, configFileFormat)
}

Expand Down Expand Up @@ -148,7 +138,7 @@ func (c *apolloConfiguration) GetProperties(key string, opts ...cc.Option) (stri
if config == nil {
return "", perrors.New(fmt.Sprintf("nothing in namespace:%s ", key))
}
return config.GetContent(agollo.Properties), nil
return config.GetContent(), nil
}

func (c *apolloConfiguration) getAddressWithProtocolPrefix(url *common.URL) string {
Expand Down
25 changes: 5 additions & 20 deletions config_center/apollo/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func initMockApollo(t *testing.T) *apolloConfiguration {
return configuration
}

func TestAddListener(t *testing.T) {
func TestListener(t *testing.T) {
listener := &apolloDataListener{}
listener.wg.Add(1)
listener.wg.Add(2)
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
Expand All @@ -215,28 +215,14 @@ func TestAddListener(t *testing.T) {
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
//test add
apollo.AddListener(mockNamespace, listener)
listener.wg.Wait()
assert.Equal(t, "registries.hangzhouzk.username", listener.event)
assert.Equal(t, "mockDubbog.properties", listener.event)
assert.Greater(t, listener.count, 0)
deleteMockJson(t)
}

func TestRemoveListener(t *testing.T) {
listener := &apolloDataListener{}
apollo := initMockApollo(t)
mockConfigRes = `{
"appId": "testApplication_yang",
"cluster": "default",
"namespaceName": "mockDubbog.properties",
"configurations": {
"registries.hangzhouzk.username": "11111"
},
"releaseKey": "20191104105242-0f13805d89f834a4"
}`
apollo.AddListener(mockNamespace, listener)
//test remove
apollo.RemoveListener(mockNamespace, listener)
assert.Equal(t, "", listener.event)
listenerCount := 0
apollo.listeners.Range(func(_, value interface{}) bool {
apolloListener := value.(*apolloListener)
Expand All @@ -247,7 +233,6 @@ func TestRemoveListener(t *testing.T) {
return true
})
assert.Equal(t, listenerCount, 0)
assert.Equal(t, listener.count, 0)
deleteMockJson(t)
}

Expand Down
38 changes: 26 additions & 12 deletions config_center/apollo/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,48 @@
package apollo

import (
"github.com/zouyx/agollo"
"github.com/zouyx/agollo/v3"
"github.com/zouyx/agollo/v3/storage"
"gopkg.in/yaml.v2"
)

import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config_center"
"github.com/apache/dubbo-go/remoting"
)

type apolloListener struct {
listeners map[config_center.ConfigurationListener]struct{}
}

// NewApolloListener creates a new apolloListener
func NewApolloListener() *apolloListener {
// nolint
func newApolloListener() *apolloListener {
return &apolloListener{
listeners: make(map[config_center.ConfigurationListener]struct{}, 0),
}
}

// OnChange process each listener
func (a *apolloListener) OnChange(changeEvent *agollo.ChangeEvent) {
for key, change := range changeEvent.Changes {
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: getChangeType(change.ChangeType),
Key: key,
Value: change.NewValue,
})
}
func (a *apolloListener) OnChange(changeEvent *storage.ChangeEvent) {

}

// OnNewestChange process each listener by all changes
func (a *apolloListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {
b, err := yaml.Marshal(changeEvent.Changes)
if err != nil {
logger.Errorf("apollo onNewestChange err %+v",
err)
return
}
content := string(b)
for listener := range a.listeners {
listener.Process(&config_center.ConfigChangeEvent{
ConfigType: remoting.EventTypeUpdate,
Key: changeEvent.Namespace,
Value: content,
})
}
}

Expand Down
5 changes: 4 additions & 1 deletion config_center/nacos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
}
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
//in dubbo ,every registry only connect one node ,so this is []string{r.Address}
newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
Expand All @@ -115,6 +116,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
container.NacosClient().SetClient(&configClient)

}

return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
Expand Down Expand Up @@ -167,8 +169,9 @@ func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url commo
"serverConfigs": svrConfList,
"clientConfig": nacosconst.ClientConfig{
TimeoutMs: uint64(int32(timeout / time.Millisecond)),
ListenInterval: uint64(int32(timeout / time.Millisecond)),
NotLoadCacheAtStart: true,
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, logDir),
LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
Username: url.GetParam(constant.NACOS_USERNAME, ""),
Expand Down
1 change: 1 addition & 0 deletions config_center/nacos/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (n *nacosDynamicConfiguration) addListener(key string, listener config_cent
})
if err != nil {
logger.Errorf("nacos : listen config fail, error:%v ", err)
return
}
newListener := make(map[config_center.ConfigurationListener]context.CancelFunc)
newListener[listener] = cancel
Expand Down
8 changes: 2 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
module github.com/apache/dubbo-go

require (
cloud.google.com/go v0.39.0 // indirect
github.com/Microsoft/go-winio v0.4.13 // indirect
github.com/NYTimes/gziphandler v1.1.1 // indirect
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/apache/dubbo-go-hessian2 v1.6.1
github.com/apache/dubbo-go-hessian2 v1.6.2
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/creasty/defaults v1.3.0
github.com/docker/go-connections v0.4.0 // indirect
Expand Down Expand Up @@ -47,14 +45,12 @@ require (
github.com/prometheus/client_golang v1.1.0
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/shirou/gopsutil v2.19.9+incompatible // indirect
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
github.com/zouyx/agollo v0.0.0-20191114083447-dde9fc9f35b8
github.com/zouyx/agollo/v3 v3.4.4
go.etcd.io/bbolt v1.3.4 // indirect
go.uber.org/atomic v1.6.0
go.uber.org/zap v1.15.0
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 // indirect
google.golang.org/grpc v1.23.0
gopkg.in/yaml.v2 v2.2.8
k8s.io/api v0.16.9
Expand Down
Loading

0 comments on commit 5a135e6

Please sign in to comment.