Skip to content

Commit

Permalink
[FAB-5006] Disable endpoint
Browse files Browse the repository at this point in the history
If orderer endpoint return SERVICE_UNAVALIABLE status, to endpoint
become disabled for predefined period of time.

Change-Id: I69983739f99df53b1eb32676f5e7025a20102c78
Signed-off-by: Gennady Laventman <[email protected]>
  • Loading branch information
gennadylaventman committed Aug 8, 2017
1 parent ee12505 commit 8ffc237
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 159 deletions.
73 changes: 49 additions & 24 deletions core/comm/producer.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package comm
Expand All @@ -28,6 +18,8 @@ import (

var logger = flogging.MustGetLogger("ConnProducer")

var EndpointDisableInterval = time.Second * 10

// ConnectionFactory creates a connection to a certain endpoint
type ConnectionFactory func(endpoint string) (*grpc.ClientConn, error)

Expand All @@ -41,12 +33,15 @@ type ConnectionProducer interface {
// UpdateEndpoints updates the endpoints of the ConnectionProducer
// to be the given endpoints
UpdateEndpoints(endpoints []string)
// DisableEndpoint remove endpoint from endpoint for some time
DisableEndpoint(endpoint string)
}

type connProducer struct {
sync.RWMutex
endpoints []string
connect ConnectionFactory
sync.Mutex
endpoints []string
disabledEndpoints map[string]time.Time
connect ConnectionFactory
}

// NewConnectionProducer creates a new ConnectionProducer with given endpoints and connection factory.
Expand All @@ -55,26 +50,36 @@ func NewConnectionProducer(factory ConnectionFactory, endpoints []string) Connec
if len(endpoints) == 0 {
return nil
}
return &connProducer{endpoints: endpoints, connect: factory}
return &connProducer{endpoints: endpoints, connect: factory, disabledEndpoints: make(map[string]time.Time)}
}

// NewConnection creates a new connection.
// Returns the connection, the endpoint selected, nil on success.
// Returns nil, "", error on failure
func (cp *connProducer) NewConnection() (*grpc.ClientConn, string, error) {
cp.RLock()
defer cp.RUnlock()
cp.Lock()
defer cp.Unlock()

for endpoint, timeout := range cp.disabledEndpoints {
if time.Since(timeout) >= EndpointDisableInterval {
delete(cp.disabledEndpoints, endpoint)
}
}

endpoints := shuffle(cp.endpoints)
checkedEndpoints := make([]string, 0)
for _, endpoint := range endpoints {
conn, err := cp.connect(endpoint)
if err != nil {
logger.Error("Failed connecting to", endpoint, ", error:", err)
continue
if _, ok := cp.disabledEndpoints[endpoint]; !ok {
checkedEndpoints = append(checkedEndpoints, endpoint)
conn, err := cp.connect(endpoint)
if err != nil {
logger.Error("Failed connecting to", endpoint, ", error:", err)
continue
}
return conn, endpoint, nil
}
return conn, endpoint, nil
}
return nil, "", fmt.Errorf("Could not connect to any of the endpoints: %v", endpoints)
return nil, "", fmt.Errorf("Could not connect to any of the endpoints: %v", checkedEndpoints)
}

// UpdateEndpoints updates the endpoints of the ConnectionProducer
Expand All @@ -86,7 +91,27 @@ func (cp *connProducer) UpdateEndpoints(endpoints []string) {
}
cp.Lock()
defer cp.Unlock()

newDisabled := make(map[string]time.Time)
for i := range endpoints {
if startTime, ok := cp.disabledEndpoints[endpoints[i]]; ok {
newDisabled[endpoints[i]] = startTime
}
}
cp.endpoints = endpoints
cp.disabledEndpoints = newDisabled
}

func (cp *connProducer) DisableEndpoint(endpoint string) {
cp.Lock()
defer cp.Unlock()

for i := range cp.endpoints {
if cp.endpoints[i] == endpoint {
cp.disabledEndpoints[endpoint] = time.Now()
break
}
}
}

func shuffle(a []string) []string {
Expand Down
54 changes: 42 additions & 12 deletions core/comm/producer_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,15 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
Expand Down Expand Up @@ -109,3 +100,42 @@ func TestUpdateEndpoints(t *testing.T) {
conn, _, err = producer.NewConnection()
assert.Equal(t, "b", conn2Endpoint[fmt.Sprintf("%p", conn)])
}

func TestDisableEndpoint(t *testing.T) {
orgEndpointDisableInterval := EndpointDisableInterval
EndpointDisableInterval = time.Millisecond * 100
defer func() { EndpointDisableInterval = orgEndpointDisableInterval }()

conn2Endpoint := make(map[string]string)
connFactory := func(endpoint string) (*grpc.ClientConn, error) {
conn := &grpc.ClientConn{}
conn2Endpoint[fmt.Sprintf("%p", conn)] = endpoint
return conn, nil
}
// Create producer with single endpoint
producer := NewConnectionProducer(connFactory, []string{"a"})
conn, a, err := producer.NewConnection()
assert.NoError(t, err)
assert.Equal(t, "a", conn2Endpoint[fmt.Sprintf("%p", conn)])
assert.Equal(t, "a", a)
// Now disable endpoint for 100 milliseconds
producer.DisableEndpoint("a")
_, _, err = producer.NewConnection()
assert.Error(t, err, "Could not connect")
// Wait until disable expire and try to connect again
time.Sleep(time.Millisecond * 200)
conn, a, err = producer.NewConnection()
assert.NoError(t, err)
assert.Equal(t, "a", conn2Endpoint[fmt.Sprintf("%p", conn)])
assert.Equal(t, "a", a)
// Disable again
producer.DisableEndpoint("a")
// Update endpoints
producer.UpdateEndpoints([]string{"a", "b"})

conn, a, err = producer.NewConnection()
assert.NoError(t, err)
assert.Equal(t, "b", conn2Endpoint[fmt.Sprintf("%p", conn)])
assert.Equal(t, "b", a)

}
28 changes: 11 additions & 17 deletions core/deliverservice/blocksprovider/blocksprovider.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
/*
Copyright IBM Corp. 2017 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/

package blocksprovider
Expand Down Expand Up @@ -83,8 +73,8 @@ type streamClient interface {
// Close closes the stream and its underlying connection
Close()

// Disconnect disconnects from the remote node
Disconnect()
// Disconnect disconnects from the remote node and disable reconnect to current endpoint for predefined period of time
Disconnect(disableEndpoint bool)
}

// blocksProviderImpl the actual implementation for BlocksProvider interface
Expand All @@ -104,7 +94,7 @@ type blocksProviderImpl struct {

const wrongStatusThreshold = 10

var MaxRetryDelay = time.Second * 10
var maxRetryDelay = time.Second * 10

var logger *logging.Logger // package-level logger

Expand Down Expand Up @@ -152,13 +142,17 @@ func (b *blocksProviderImpl) DeliverBlocks() {
errorStatusCounter = 0
logger.Warningf("[%s] Got error %v", b.chainID, t)
}
maxDelay := float64(MaxRetryDelay)
maxDelay := float64(maxRetryDelay)
currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
if currDelay < maxDelay {
statusCounter++
}
b.client.Disconnect()
if t.Status == common.Status_BAD_REQUEST {
b.client.Disconnect(false)
} else {
b.client.Disconnect(true)
}
continue
case *orderer.DeliverResponse_Block:
errorStatusCounter = 0
Expand Down
Loading

0 comments on commit 8ffc237

Please sign in to comment.