Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom address broadcasting for ringpop to work in k8s #6288

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,14 @@ func (r *ring) emitHashIdentifier() float64 {
// in-case it overflowed something. The number itself is meaningless, so additional precision
// doesn't really give any advantage, besides reducing the risk of collision
trimmedForMetric := float64(hashedView % 1000)
r.logger.Debug("Hashring view", tag.Dynamic("hashring-view", sb.String()), tag.Dynamic("trimmed-hash-id", trimmedForMetric), tag.Service(r.service))
r.logger.Debug("Hashring view",
tag.Dynamic("hashring-view", sb.String()),
tag.Dynamic("trimmed-hash-id", trimmedForMetric),
tag.Service(r.service),
tag.Dynamic("self-addr", self.addr),
tag.Dynamic("self-identity", self.identity),
tag.Dynamic("self-ip", self.ip),
)
r.scope.Tagged(
metrics.ServiceTag(r.service),
metrics.HostTag(self.identity),
Expand Down
4 changes: 4 additions & 0 deletions common/peerprovider/ringpopprovider/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ const (
type Config struct {
// Name to be used in ringpop advertisement
Name string `yaml:"name" validate:"nonzero"`
// BroadcastAddress is communicated with peers to connect to this container/host.
// This is useful when running cadence in K8s and the containers need to listen on 0.0.0.0 but advertise their pod IP to peers.
// If not set, the listen address will be used as broadcast address.
BroadcastAddress string `yaml:"broadcastAddress"`
// BootstrapMode is a enum that defines the ringpop bootstrap method, currently supports: hosts, files, custom, dns, and dns-srv
BootstrapMode BootstrapMode `yaml:"bootstrapMode"`
// BootstrapHosts is a list of seed hosts to be used for ringpop bootstrap
Expand Down
2 changes: 2 additions & 0 deletions common/peerprovider/ringpopprovider/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (s *RingpopSuite) TestDNSMode() {
s.Nil(err)
s.Equal("test", cfg.Name)
s.Equal(BootstrapModeDNS, cfg.BootstrapMode)
s.Equal("10.66.1.71", cfg.BroadcastAddress)
s.Nil(cfg.validate())
logger := testlogger.New(s.T())

Expand Down Expand Up @@ -297,6 +298,7 @@ maxJoinDuration: 30s`
func getDNSConfig() string {
return `name: "test"
bootstrapMode: "dns"
broadcastAddress: "10.66.1.71"
bootstrapHosts:
- example.net:1111
- example.net:1112
Expand Down
34 changes: 30 additions & 4 deletions common/peerprovider/ringpopprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,17 @@
DiscoverProvider: discoveryProvider,
}

rp, err := ringpop.New(config.Name, ringpop.Channel(channel.(*tcg.Channel)))
ch := channel.(*tcg.Channel)
opts := []ringpop.Option{ringpop.Channel(ch)}
if config.BroadcastAddress != "" {
broadcastIP := net.ParseIP(config.BroadcastAddress)
if broadcastIP == nil {
return nil, fmt.Errorf("failed parsing broadcast address %q, err: %w", config.BroadcastAddress, err)

Check warning on line 93 in common/peerprovider/ringpopprovider/provider.go

View check run for this annotation

Codecov / codecov/patch

common/peerprovider/ringpopprovider/provider.go#L93

Added line #L93 was not covered by tests
}
logger.Info("Initializing ringpop with custom broadcast address", tag.Address(broadcastIP.String()))
opts = append(opts, ringpop.AddressResolverFunc(broadcastAddrResolver(ch, broadcastIP)))
}
rp, err := ringpop.New(config.Name, opts...)
if err != nil {
return nil, fmt.Errorf("ringpop instance creation: %w", err)
}
Expand Down Expand Up @@ -148,9 +158,7 @@
}

// HandleEvent handles updates from ringpop
func (r *Provider) HandleEvent(
event events.Event,
) {
func (r *Provider) HandleEvent(event events.Event) {
// We only care about RingChangedEvent
e, ok := event.(events.RingChangedEvent)
if !ok {
Expand Down Expand Up @@ -300,3 +308,21 @@
}
return uint16(port), nil
}

func broadcastAddrResolver(ch *tcg.Channel, broadcastIP net.IP) func() (string, error) {
return func() (string, error) {
peerInfo := ch.PeerInfo()
if peerInfo.IsEphemeralHostPort() {
// not listening yet
return "", ringpop.ErrEphemeralAddress

Check warning on line 317 in common/peerprovider/ringpopprovider/provider.go

View check run for this annotation

Codecov / codecov/patch

common/peerprovider/ringpopprovider/provider.go#L317

Added line #L317 was not covered by tests
}

_, port, err := net.SplitHostPort(peerInfo.HostPort)
if err != nil {
return "", fmt.Errorf("failed splitting tchannel's hostport %q, err: %w", peerInfo.HostPort, err)

Check warning on line 322 in common/peerprovider/ringpopprovider/provider.go

View check run for this annotation

Codecov / codecov/patch

common/peerprovider/ringpopprovider/provider.go#L322

Added line #L322 was not covered by tests
}

// return broadcast_ip:listen_port
return net.JoinHostPort(broadcastIP.String(), port), nil
}
}
188 changes: 113 additions & 75 deletions common/peerprovider/ringpopprovider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,114 +23,152 @@
package ringpopprovider

import (
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/discovery/statichosts"
"github.com/uber/ringpop-go/swim"
"github.com/uber/tchannel-go"
"go.uber.org/goleak"

"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/membership"
)

type HostInfo struct {
addr string
type srvAndCh struct {
service string
ch *tchannel.Channel
provider *Provider
}

// TestRingpopCluster is a type that represents a test ringpop cluster
type TestRingpopCluster struct {
hostUUIDs []string
hostAddrs []string
hostInfoList []HostInfo
func TestRingpopProvider(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t,
// ignore the goroutines leaked by ringpop library
goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"),
goleak.IgnoreTopFunction("github.com/uber/ringpop-go.(*Ringpop).startTimers.func1"),
goleak.IgnoreTopFunction("github.com/uber/ringpop-go.(*Ringpop).startTimers.func2"))
})
logger := testlogger.New(t)
serviceName := "matching"

channels []*tchannel.Channel
seedNode string
}
matchingChs, cleanupMatchingChs, err := createAndListenChannels(serviceName, 3)
t.Cleanup(cleanupMatchingChs)
if err != nil {
t.Fatalf("Failed to create and listen on channels: %v", err)
}

// NewTestRingpopCluster creates a new test cluster with the given name and cluster size
// All the nodes in the test cluster will register themselves in Ringpop
// with the specified name. This is only intended for unit tests.
func NewTestRingpopCluster(t *testing.T, ringPopApp string, size int, ipAddr string, seed string, serviceName string) *TestRingpopCluster {
irrelevantChs, cleanupIrrelevantChs, err := createAndListenChannels("random-svc", 1)
t.Cleanup(cleanupIrrelevantChs)
if err != nil {
t.Fatalf("Failed to create and listen on channels: %v", err)
}

logger := testlogger.New(t)
cluster := &TestRingpopCluster{
hostUUIDs: make([]string, size),
hostAddrs: make([]string, size),
channels: make([]*tchannel.Channel, size),
seedNode: seed,
allServicesAndChs := append(matchingChs, irrelevantChs...)
cfg := Config{
BootstrapMode: BootstrapModeHosts,
Name: "ring",
BootstrapHosts: toHosts(t, allServicesAndChs),
MaxJoinDuration: 10 * time.Second,
}

for i := 0; i < size; i++ {
var err error
cluster.channels[i], err = tchannel.NewChannel(ringPopApp, nil)
if err != nil {
logger.Error("Failed to create tchannel", tag.Error(err))
return nil
t.Logf("Config: %+v", cfg)

// start ringpop provider for each channel
var wg sync.WaitGroup
for i, svc := range allServicesAndChs {
svc := svc
cfg := cfg
if i == 0 {
// set broadcast address for the first provider to test that path
cfg.BroadcastAddress = "127.0.0.1"
}
listenAddr := ipAddr + ":0"
err = cluster.channels[i].ListenAndServe(listenAddr)
p, err := New(svc.service, &cfg, svc.ch, membership.PortMap{}, logger)
if err != nil {
logger.Error("tchannel listen failed", tag.Error(err))
return nil
t.Fatalf("Failed to create ringpop provider: %v", err)
}
cluster.hostUUIDs[i] = uuid.New()
cluster.hostAddrs[i] = cluster.channels[i].PeerInfo().HostPort
cluster.hostInfoList[i] = HostInfo{addr: cluster.hostAddrs[i]}

svc.provider = p

wg.Add(1)
go func() {
defer wg.Done()
p.Start()
}()
t.Cleanup(p.Stop)
}

// if seed node is already supplied, use it; if not, set it
if cluster.seedNode == "" {
cluster.seedNode = cluster.hostAddrs[0]
t.Logf("Waiting for %d ringpop providers to start", len(matchingChs)+len(irrelevantChs))
wg.Wait()

sleep := 5 * time.Second
t.Logf("Sleeping for %d seconds for ring to update", int(sleep.Seconds()))
time.Sleep(sleep)

provider := matchingChs[0].provider
hostInfo, err := provider.WhoAmI()
if err != nil {
t.Fatalf("Failed to get who am I: %v", err)
}
logger.Info(fmt.Sprintf("seedNode: %v", cluster.seedNode))
bOptions := new(swim.BootstrapOptions)
bOptions.DiscoverProvider = statichosts.New(cluster.seedNode) // just use the first addr as the seed
bOptions.MaxJoinDuration = time.Duration(time.Second * 2)
bOptions.JoinSize = 1

for i := 0; i < size; i++ {
ringPop, err := ringpop.New(ringPopApp, ringpop.Channel(cluster.channels[i]))
if err != nil {
logger.Error("failed to create ringpop instance", tag.Error(err))
return nil
}

NewRingpopProvider(ringPopApp, ringPop, membership.PortMap{}, bOptions, logger)
t.Logf("Who am I: %+v", hostInfo.GetAddress())

members, err := provider.GetMembers(serviceName)
if err != nil {
t.Fatalf("Failed to get members: %v", err)
}
return cluster
}

// GetSeedNode returns the seedNode for this cluster
func (c *TestRingpopCluster) GetSeedNode() string {
return c.seedNode
}
if len(members) != 3 {
t.Fatalf("Expected 3 members, got %v", len(members))
}

// GetHostInfoList returns the list of all hosts within the cluster
func (c *TestRingpopCluster) GetHostInfoList() []HostInfo {
return c.hostInfoList
// Evict one of the providers and make sure it's removed from the ring
matchingChs[1].provider.SelfEvict()
t.Logf("A peer is evicted. Sleeping for %d seconds for ring to update", int(sleep.Seconds()))
time.Sleep(sleep)

members, err = provider.GetMembers(serviceName)
if err != nil {
t.Fatalf("Failed to get members: %v", err)
}

if len(members) != 2 {
t.Fatalf("Expected 2 members, got %v", len(members))
}
}

// GetHostAddrs returns all host addrs within the cluster
func (c *TestRingpopCluster) GetHostAddrs() []string {
return c.hostAddrs
func createAndListenChannels(serviceName string, n int) ([]*srvAndCh, func(), error) {
var res []*srvAndCh
cleanupFn := func(srvs []*srvAndCh) func() {
return func() {
for _, s := range srvs {
s.ch.Close()
}
}
}
for i := 0; i < n; i++ {
ch, err := tchannel.NewChannel(serviceName, nil)
if err != nil {
return nil, cleanupFn(res), err
}

if err := ch.ListenAndServe("localhost:0"); err != nil {
return nil, cleanupFn(res), err
}

res = append(res, &srvAndCh{service: serviceName, ch: ch})
}
return res, cleanupFn(res), nil
}

// FindHostByAddr returns the host info corresponding to
// the given addr, if it exists
func (c *TestRingpopCluster) FindHostByAddr(addr string) (HostInfo, bool) {
for _, hi := range c.hostInfoList {
if strings.Compare(hi.addr, addr) == 0 {
return hi, true
func toHosts(t *testing.T, allServicesAndChs []*srvAndCh) []string {
var hosts []string
for _, svc := range allServicesAndChs {
if svc.ch.PeerInfo().IsEphemeralHostPort() {
t.Fatalf("Channel %v is not listening on a port", svc.ch)
}
hosts = append(hosts, svc.ch.PeerInfo().HostPort)
}
return HostInfo{}, false
return hosts
}

func TestLabelToPort(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions docker/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ persistence:

ringpop:
name: cadence
broadcastAddress: {{ default .Env.BROADCAST_ADDRESS "" }}
bootstrapMode: {{ default .Env.RINGPOP_BOOTSTRAP_MODE "hosts" }}
{{- if .Env.RINGPOP_SEEDS }}
bootstrapHosts:
Expand Down
Loading