Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Go Substrate Client - Connection Failover Implementation #1022

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/010_build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
build-and-test:
runs-on: [self-hosted, tfchainrunner01]
container:
image: threefolddev/tfchain:4
image: threefolddev/tfchain:5
env:
DEBIAN_FRONTEND: noninteractive
PATH: /root/.cargo/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/go/bin
Expand Down
11 changes: 6 additions & 5 deletions .github/workflows/020_lint_and_test_go_client.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
cache: false
# cache-dependency-path: clients/tfchain-client-go/go.sum
id: go

- name: golangci-lint
uses: golangci/golangci-lint-action@v3.7.0
uses: golangci/golangci-lint-action@v6
with:
args: --timeout 3m --verbose
working-directory: clients/tfchain-client-go

- name: staticcheck
uses: dominikh/staticcheck-action@v1.3.0
uses: dominikh/staticcheck-action@v1
with:
version: "2022.1.3"
version: "latest"
install-go: false
working-directory: clients/tfchain-client-go
env:
GO111MODULE: on
Expand All @@ -44,4 +45,4 @@ jobs:
uses: Jerome1337/[email protected]
with:
gofmt-path: './clients/tfchain-client-go'
gofmt-flags: "-l -d"
gofmt-flags: "-l -d"
6 changes: 3 additions & 3 deletions .github/workflows/build_test.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM ubuntu:22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && \
apt install -y \
Expand All @@ -16,8 +16,8 @@ RUN apt update && \
zstd \
wget \
protobuf-compiler && \
wget https://go.dev/dl/go1.20.2.linux-amd64.tar.gz && \
tar -xvf go1.20.2.linux-amd64.tar.gz && \
wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz && \
tar -xvf go1.21.13.linux-amd64.tar.gz && \
mv go /usr/local && \
echo "GOPATH=/usr/local/go" >> ~/.bashrc && \
echo "PATH=\$PATH:\$GOPATH/bin" >> ~/.bashrc && \
Expand Down
35 changes: 0 additions & 35 deletions clients/tfchain-client-go/.github/workflows/lint.yml

This file was deleted.

24 changes: 0 additions & 24 deletions clients/tfchain-client-go/.github/workflows/test.yml

This file was deleted.

70 changes: 58 additions & 12 deletions clients/tfchain-client-go/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ErrUnknownVersion = fmt.Errorf("unknown version")
//ErrNotFound is returned if an object is not found
ErrNotFound = fmt.Errorf("object not found")
//ErrClosed is returned if the client is closed
ErrClosed = fmt.Errorf("client closed")
)

// Versioned base for all types
Expand Down Expand Up @@ -87,7 +89,7 @@ func (p *mgrImpl) Substrate() (*Substrate, error) {
return nil, err
}

return newSubstrate(cl, meta, p.put)
return newSubstrate(cl, meta, p.put, p.connect)
}

// Raw returns a RPC substrate client. plus meta. The returned connection
Expand All @@ -105,7 +107,7 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {

boff := backoff.WithMaxRetries(
backoff.NewConstantBackOff(200*time.Millisecond),
2*uint64(len(p.urls)),
4*uint64(len(p.urls)),
)

var (
Expand Down Expand Up @@ -145,36 +147,80 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {
return cl, meta, err
}

// connect connects to the next endpoint in roundrobin fashion
// and replaces the current connection with the new one.
// need to be called while lock is acquired.
func (p *mgrImpl) connect(s *Substrate) error {
cl, meta, err := p.Raw()
if err != nil {
return err
}
// close the old connection if it exists
if s.cl != nil {
s.cl.Client.Close()
log.Info().Str("url", s.cl.Client.URL()).Msg("unhealthy connection closed")
}
// set the new connection
s.cl = cl
s.meta = meta
log.Info().Str("url", s.cl.Client.URL()).Msg("connection restored")
return nil
}

// TODO: implement reusable connections instead of
// closing the connection.
func (p *mgrImpl) put(cl *Substrate) {
func (p *mgrImpl) put(s *Substrate) {
// naive put implementation for now
// we just immediately kill the connection
if cl.cl != nil {
cl.cl.Client.Close()
if s.cl != nil {
s.cl.Client.Close()
}
cl.cl = nil
cl.meta = nil
s.cl = nil
s.meta = nil
}

// Substrate client
type Substrate struct {
cl Conn
meta Meta
mu sync.Mutex
cl Conn
meta Meta
closed bool

close func(s *Substrate)
close func(s *Substrate)
connect func(s *Substrate) error
}

// NewSubstrate creates a substrate client
func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
return &Substrate{cl: cl, meta: meta, close: close}, nil
func newSubstrate(cl Conn, meta Meta, close func(*Substrate), connect func(s *Substrate) error) (*Substrate, error) {
return &Substrate{cl: cl, meta: meta, close: close, connect: connect}, nil
}

func (s *Substrate) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.close(s)
s.closed = true
}

func (s *Substrate) GetClient() (Conn, Meta, error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.closed {
return nil, nil, ErrClosed
}

// check if connection is healthy
if _, err := getTime(s.cl, s.meta); err != nil {
log.Info().Str("url", s.cl.Client.URL()).Msg("connection unhealthy, attempting failover")
err := s.connect(s)
if err != nil {
return nil, nil, err // all attempts failed, no connection available
}
}
return s.cl, s.meta, nil
}

Expand Down
118 changes: 118 additions & 0 deletions clients/tfchain-client-go/impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package substrate

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFailoverMechanism(t *testing.T) {
t.Run("should failover to next URL when current node is unhealthy", func(t *testing.T) {
// Create manager with multiple URLs
urls := []string{"ws://fail1", getUrlBasedOnEnv()}
mgr := NewManager(urls...)

// Get initial substrate client
sub, err := mgr.Substrate()
require.NoError(t, err)
defer sub.Close()

// Store initial Client
initialClient := sub.cl.Client

// Force connection to become unhealthy by closing it
sub.cl.Client.Close()

// Try to use the connection - should trigger failover
_, err = sub.Time()
require.NoError(t, err)

// Check that we're now using a different URL
newClient := sub.cl.Client
assert.NotEqual(t, initialClient, newClient)
})

t.Run("should try all URLs in rotation", func(t *testing.T) {
urls := []string{
"ws://fail1",
"ws://fail2",
getUrlBasedOnEnv(),
}

mgr := NewManager(urls...)
sub, err := mgr.Substrate()
require.NoError(t, err)
defer sub.Close()

// The final URL should be the working one
assert.Equal(t, getUrlBasedOnEnv(), sub.cl.Client.URL())
})

t.Run("should reuse connection if healthy", func(t *testing.T) {
sub := startLocalConnection(t)
defer sub.Close()

initialClient := sub.cl.Client

// Use the connection multiple times
for i := 0; i < 3; i++ {
_, err := sub.Time()
require.NoError(t, err)
assert.Equal(t, initialClient, sub.cl.Client)
}
})

t.Run("should handle all nodes being down", func(t *testing.T) {
urls := []string{"ws://fail1", "ws://fail2"}
mgr := NewManager(urls...)
_, err := mgr.Substrate()
assert.Error(t, err)
})

t.Run("should handle concurrent failover attempts", func(t *testing.T) {
urls := []string{getUrlBasedOnEnv(), getUrlBasedOnEnv()}
mgr := NewManager(urls...)
sub1, err := mgr.Substrate()
require.NoError(t, err)
defer sub1.Close()

sub2, err := mgr.Substrate()
require.NoError(t, err)
defer sub2.Close()

// Force both connections to fail
sub1.cl.Client.Close()
sub2.cl.Client.Close()

// Create WaitGroup to ensure all goroutines complete before test ends
var wg sync.WaitGroup
wg.Add(2)

// Try to use both connections concurrently
errs := make(chan error, 2)
go func() {
defer wg.Done()
_, err := sub1.Time()
errs <- err
}()

go func() {
defer wg.Done()
_, err := sub2.Time()
errs <- err
}()

// Wait for both operations to complete
go func() {
wg.Wait()
close(errs)
}()

// Check errors from both goroutines
for err := range errs {
require.NoError(t, err)
}
})
}
3 changes: 0 additions & 3 deletions clients/tfchain-client-go/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
bal := big.NewInt(int64(amount))

c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
if err != nil {
panic(err)
}

if err != nil {
return errors.Wrap(err, "failed to create call")
Expand Down
8 changes: 8 additions & 0 deletions clients/tfchain-client-go/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func startLocalConnection(t *testing.T) *Substrate {
return cl
}

func getUrlBasedOnEnv() string {
if _, ok := os.LookupEnv("CI"); ok {
return "ws://127.0.0.1:9944"
} else {
return "wss://tfchain.dev.grid.tf"
}
}

func assertCreateTwin(t *testing.T, cl *Substrate, user AccountUser) uint32 {
u := Accounts[user]

Expand Down