Skip to content

Commit

Permalink
Merge pull request #1462 from mxinden/cut-0.15.1
Browse files Browse the repository at this point in the history
*: Cut 0.15.1
  • Loading branch information
mxinden authored Jul 12, 2018
2 parents 898cfbe + b0cb197 commit 8397de1
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 50 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 0.15.1 / 2018-07-10

* [BUGFIX] Fix email template typo in alert-warning style (#1421)
* [BUGFIX] Fix regression in Pager Duty config (#1455)
* [BUGFIX] Catch templating errors in Wechat Notify (#1436)
* [BUGFIX] Fail when no private address can be found for cluster (#1437)
* [BUGFIX] Make sure we don't miss the first pushPull when joining cluster (#1456)
* [BUGFIX] Fix concurrent read and wirte group error in dispatch (#1447)

## 0.15.0 / 2018-06-22

* [CHANGE] [amtool] Update silence add and update flags (#1298)
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.15.0
0.15.1
9 changes: 7 additions & 2 deletions cluster/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (
"github.com/pkg/errors"
)

type getPrivateIPFunc func() (string, error)

// This is overriden in unit tests to mock the sockaddr.GetPrivateIP function.
var getPrivateAddress getPrivateIPFunc = sockaddr.GetPrivateIP

// calculateAdvertiseAddress attempts to clone logic from deep within memberlist
// (NetTransport.FinalAdvertiseAddr) in order to surface its conclusions to the
// application, so we can provide more actionable error messages if the user has
Expand All @@ -39,12 +44,12 @@ func calculateAdvertiseAddress(bindAddr, advertiseAddr string) (net.IP, error) {
}

if isAny(bindAddr) {
privateIP, err := sockaddr.GetPrivateIP()
privateIP, err := getPrivateAddress()
if err != nil {
return nil, errors.Wrap(err, "failed to get private IP")
}
if privateIP == "" {
return nil, errors.Wrap(err, "no private IP found, explicit advertise addr not provided")
return nil, errors.New("no private IP found, explicit advertise addr not provided")
}
ip := net.ParseIP(privateIP)
if ip == nil {
Expand Down
92 changes: 92 additions & 0 deletions cluster/advertise_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
"errors"
"net"
"testing"

"github.com/stretchr/testify/require"
)

func TestCalculateAdvertiseAddress(t *testing.T) {
old := getPrivateAddress
defer func() {
getPrivateAddress = old
}()

cases := []struct {
fn getPrivateIPFunc
bind, advertise string

expectedIP net.IP
err bool
}{
{
bind: "192.0.2.1",
advertise: "",

expectedIP: net.ParseIP("192.0.2.1"),
err: false,
},
{
bind: "192.0.2.1",
advertise: "192.0.2.2",

expectedIP: net.ParseIP("192.0.2.2"),
err: false,
},
{
fn: func() (string, error) { return "192.0.2.1", nil },
bind: "0.0.0.0",
advertise: "",

expectedIP: net.ParseIP("192.0.2.1"),
err: false,
},
{
fn: func() (string, error) { return "", errors.New("some error") },
bind: "0.0.0.0",
advertise: "",

err: true,
},
{
fn: func() (string, error) { return "invalid", nil },
bind: "0.0.0.0",
advertise: "",

err: true,
},
{
fn: func() (string, error) { return "", nil },
bind: "0.0.0.0",
advertise: "",

err: true,
},
}

for _, c := range cases {
getPrivateAddress = c.fn
got, err := calculateAdvertiseAddress(c.bind, c.advertise)
if c.err {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, c.expectedIP.String(), got.String())
}
}
}
33 changes: 21 additions & 12 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Peer struct {
mlist *memberlist.Memberlist
delegate *delegate

resolvedPeers []string

mtx sync.RWMutex
states map[string]State
stopc chan struct{}
Expand Down Expand Up @@ -98,7 +100,7 @@ const (
maxGossipPacketSize = 1400
)

func Join(
func Create(
l log.Logger,
reg prometheus.Registerer,
bindAddr string,
Expand All @@ -110,8 +112,6 @@ func Join(
tcpTimeout time.Duration,
probeTimeout time.Duration,
probeInterval time.Duration,
reconnectInterval time.Duration,
reconnectTimeout time.Duration,
) (*Peer, error) {
bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
if err != nil {
Expand Down Expand Up @@ -164,11 +164,12 @@ func Join(
}

p := &Peer{
states: map[string]State{},
stopc: make(chan struct{}),
readyc: make(chan struct{}),
logger: l,
peers: map[string]peer{},
states: map[string]State{},
stopc: make(chan struct{}),
readyc: make(chan struct{}),
logger: l,
peers: map[string]peer{},
resolvedPeers: resolvedPeers,
}

p.register(reg)
Expand Down Expand Up @@ -207,12 +208,20 @@ func Join(
return nil, errors.Wrap(err, "create memberlist")
}
p.mlist = ml
return p, nil
}

n, err := ml.Join(resolvedPeers)
func (p *Peer) Join(
reconnectInterval time.Duration,
reconnectTimeout time.Duration) error {
n, err := p.mlist.Join(p.resolvedPeers)
if err != nil {
level.Warn(l).Log("msg", "failed to join cluster", "err", err)
level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err)
if reconnectInterval != 0 {
level.Info(p.logger).Log("msg", fmt.Sprintf("will retry joining cluster every %v", reconnectInterval.String()))
}
} else {
level.Debug(l).Log("msg", "joined cluster", "peers", n)
level.Debug(p.logger).Log("msg", "joined cluster", "peers", n)
}

if reconnectInterval != 0 {
Expand All @@ -222,7 +231,7 @@ func Join(
go p.handleReconnectTimeout(5*time.Minute, reconnectTimeout)
}

return p, nil
return err
}

// All peers are initially added to the failed list. They will be removed from
Expand Down
42 changes: 30 additions & 12 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func TestJoinLeave(t *testing.T) {
logger := log.NewNopLogger()
p, err := Join(
p, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -38,19 +38,22 @@ func TestJoinLeave(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
err = p.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p)
require.False(t, p.Ready())
require.Equal(t, p.Status(), "settling")
go p.Settle(context.Background(), 0*time.Second)
p.WaitReady()
require.Equal(t, p.Status(), "ready")

// Create the peer who joins the first.
p2, err := Join(
p2, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -62,11 +65,14 @@ func TestJoinLeave(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p2)
err = p2.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p2)
go p2.Settle(context.Background(), 0*time.Second)

require.Equal(t, 2, p.ClusterSize())
Expand All @@ -79,7 +85,7 @@ func TestJoinLeave(t *testing.T) {

func TestReconnect(t *testing.T) {
logger := log.NewNopLogger()
p, err := Join(
p, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -91,15 +97,18 @@ func TestReconnect(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
err = p.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p)
go p.Settle(context.Background(), 0*time.Second)
p.WaitReady()

p2, err := Join(
p2, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -111,11 +120,14 @@ func TestReconnect(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p2)
err = p2.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p2)
go p2.Settle(context.Background(), 0*time.Second)
p2.WaitReady()

Expand All @@ -134,7 +146,7 @@ func TestReconnect(t *testing.T) {

func TestRemoveFailedPeers(t *testing.T) {
logger := log.NewNopLogger()
p, err := Join(
p, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -146,11 +158,14 @@ func TestRemoveFailedPeers(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
err = p.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p)
n := p.Self()

now := time.Now()
Expand Down Expand Up @@ -180,7 +195,7 @@ func TestInitiallyFailingPeers(t *testing.T) {
logger := log.NewNopLogger()
myAddr := "1.2.3.4:5000"
peerAddrs := []string{myAddr, "2.3.4.5:5000", "3.4.5.6:5000", "foo.example.com:5000"}
p, err := Join(
p, err := Create(
logger,
prometheus.NewRegistry(),
"0.0.0.0:0",
Expand All @@ -192,11 +207,14 @@ func TestInitiallyFailingPeers(t *testing.T) {
DefaultTcpTimeout,
DefaultProbeTimeout,
DefaultProbeInterval,
)
require.NoError(t, err)
require.NotNil(t, p)
err = p.Join(
DefaultReconnectInterval,
DefaultReconnectTimeout,
)
require.NoError(t, err)
require.NotNil(t, p)

p.setInitialFailed(peerAddrs, myAddr)

Expand Down
4 changes: 3 additions & 1 deletion cluster/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (d *delegate) NotifyMsg(b []byte) {
level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
return
}

s, ok := d.states[p.Key]
if !ok {
return
Expand All @@ -160,6 +161,7 @@ func (d *delegate) LocalState(_ bool) []byte {
all := &clusterpb.FullState{
Parts: make([]clusterpb.Part, 0, len(d.states)),
}

for key, s := range d.states {
b, err := s.MarshalBinary()
if err != nil {
Expand Down Expand Up @@ -189,10 +191,10 @@ func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
}
d.mtx.RLock()
defer d.mtx.RUnlock()

for _, p := range fs.Parts {
s, ok := d.states[p.Key]
if !ok {
level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key)
continue
}
if err := s.Merge(p.Data); err != nil {
Expand Down
Loading

0 comments on commit 8397de1

Please sign in to comment.