From a6d900f63a0ea05e37315c200e858570f0a1ebfd Mon Sep 17 00:00:00 2001 From: xujianhai666 Date: Tue, 6 Aug 2019 23:06:42 +0800 Subject: [PATCH] add broadcast cluster --- cluster/cluster_impl/broadcast_cluster.go | 40 +++++++ .../cluster_impl/broadcast_cluster_invoker.go | 59 ++++++++++ .../broadcast_cluster_invoker_test.go | 109 ++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 cluster/cluster_impl/broadcast_cluster.go create mode 100644 cluster/cluster_impl/broadcast_cluster_invoker.go create mode 100644 cluster/cluster_impl/broadcast_cluster_invoker_test.go diff --git a/cluster/cluster_impl/broadcast_cluster.go b/cluster/cluster_impl/broadcast_cluster.go new file mode 100644 index 0000000000..50aae3cfab --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastCluster struct{} + +const broadcast = "broadcast" + +func init() { + extension.SetCluster(broadcast, NewBroadcastCluster) +} + +func NewBroadcastCluster() cluster.Cluster { + return &broadcastCluster{} +} + +func (cluster *broadcastCluster) Join(directory cluster.Directory) protocol.Invoker { + return newBroadcastClusterInvoker(directory) +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker.go b/cluster/cluster_impl/broadcast_cluster_invoker.go new file mode 100644 index 0000000000..238df0acfa --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker.go @@ -0,0 +1,59 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type broadcastClusterInvoker struct { + baseClusterInvoker +} + +func newBroadcastClusterInvoker(directory cluster.Directory) protocol.Invoker { + return &broadcastClusterInvoker{ + baseClusterInvoker: newBaseClusterInvoker(directory), + } +} + +func (invoker *broadcastClusterInvoker) Invoke(invocation protocol.Invocation) protocol.Result { + invokers := invoker.directory.List(invocation) + err := invoker.checkInvokers(invokers, invocation) + if err != nil { + return &protocol.RPCResult{Err: err} + } + err = invoker.checkWhetherDestroyed() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + var result protocol.Result + for _, ivk := range invokers { + result = ivk.Invoke(invocation) + if result.Error() != nil { + logger.Warnf("broadcast invoker invoke err: %v when use invoker: %v\n", result.Error(), ivk) + err = result.Error() + } + } + if err != nil { + return &protocol.RPCResult{Err: err} + } + return result +} diff --git a/cluster/cluster_impl/broadcast_cluster_invoker_test.go b/cluster/cluster_impl/broadcast_cluster_invoker_test.go new file mode 100644 index 0000000000..565684a8ae --- /dev/null +++ b/cluster/cluster_impl/broadcast_cluster_invoker_test.go @@ -0,0 +1,109 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster_impl + +import ( + "context" + "errors" + "testing" +) + +import ( + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/directory" + "github.com/apache/dubbo-go/cluster/loadbalance" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/mock" +) + +var ( + broadcastUrl, _ = common.NewURL(context.TODO(), "dubbo://192.168.1.1:20000/com.ikurento.user.UserProvider") +) + +func registerBroadcast(t *testing.T, mockInvokers ...*mock.MockInvoker) protocol.Invoker { + extension.SetLoadbalance("random", loadbalance.NewRandomLoadBalance) + + invokers := []protocol.Invoker{} + for i, ivk := range mockInvokers { + invokers = append(invokers, ivk) + if i == 0 { + ivk.EXPECT().GetUrl().Return(broadcastUrl) + } + } + staticDir := directory.NewStaticDirectory(invokers) + + broadcastCluster := NewBroadcastCluster() + clusterInvoker := broadcastCluster.Join(staticDir) + return clusterInvoker +} + +func Test_BroadcastInvokeSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + for i := 0; i < 3; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockResult, result) +} + +func Test_BroadcastInvokeFailed(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + invokers := make([]*mock.MockInvoker, 0) + + mockResult := &protocol.RPCResult{Rest: rest{tried: 0, success: true}} + mockFailedResult := &protocol.RPCResult{Err: errors.New("just failed")} + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockFailedResult) + } + for i := 0; i < 10; i++ { + invoker := mock.NewMockInvoker(ctrl) + invokers = append(invokers, invoker) + invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) + } + + clusterInvoker := registerBroadcast(t, invokers...) + + result := clusterInvoker.Invoke(&invocation.RPCInvocation{}) + assert.Equal(t, mockFailedResult.Err, result.Error()) +}