Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ring handler on lifecycler #112

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
* [ENHANCEMENT] Trigger metrics update on ring changes instead of doing it periodically to speed up tests that wait for certain metrics. #107
* [ENHANCEMENT] Ring: Add ring page handler to BasicLifecycler and Lifecycler. #112
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
18 changes: 18 additions & 0 deletions ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ring
import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error {
return <-errCh
}
}

func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return l.store.CAS(ctx, l.ringKey, f)
}

func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) {
obj, err := l.store.Get(ctx, l.ringKey)
if err != nil {
return nil, err
}

return GetOrCreateRingDesc(obj), nil
}

func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req)
}
73 changes: 48 additions & 25 deletions ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sort"
"strings"
"time"

"github.com/go-kit/log/level"
)

const pageContent = `
Expand Down Expand Up @@ -90,19 +88,6 @@ func init() {
pageTemplate = template.Must(t.Parse(pageContent))
}

func (r *Ring) forget(ctx context.Context, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return r.KVClient.CAS(ctx, r.key, unregister)
}

type ingesterDesc struct {
ID string `json:"id"`
State string `json:"state"`
Expand All @@ -121,11 +106,33 @@ type httpResponse struct {
ShowTokens bool `json:"-"`
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
type ringAccess interface {
casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error
getRing(context.Context) (*Desc, error)
}

type ringPageHandler struct {
r ringAccess
heartbeatPeriod time.Duration
}

func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler {
return &ringPageHandler{
r: r,
heartbeatPeriod: heartbeatPeriod,
}
}

func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
ingesterID := req.FormValue("forget")
if err := r.forget(req.Context(), ingesterID); err != nil {
level.Error(r.logger).Log("msg", "error forgetting instance", "err", err)
if err := h.forget(req.Context(), ingesterID); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: it would be nice if this error was reported to the user, instead of (or in addition to) being logged. (If we removed logging here, we could remove logger completely)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have removed it in 13169e4.

I guess there is no riks of exposing additional information to the HTTP caller, as the caller has to have quite a lot of access to the cluster anyhow, if they are able to remove ingesters.

http.Error(
w,
fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(),
http.StatusInternalServerError,
)
return
}

// Implement PRG pattern to prevent double-POST and work with CSRF middleware.
Expand All @@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

r.mtx.RLock()
defer r.mtx.RUnlock()
ringDesc, err := h.r.getRing(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, ownedTokens := ringDesc.countTokens()

ingesterIDs := []string{}
for id := range r.ringDesc.Ingesters {
for id := range ringDesc.Ingesters {
ingesterIDs = append(ingesterIDs, id)
}
sort.Strings(ingesterIDs)

now := time.Now()
var ingesters []ingesterDesc
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
ing := ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting, now) {
if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) {
state = unhealthy
}

Expand All @@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100,
Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100,
})
}

Expand Down Expand Up @@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ
}
}

func (h *ringPageHandler) forget(ctx context.Context, id string) error {
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
}

ringDesc := in.(*Desc)
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return h.r.casRing(ctx, unregister)
}

// WriteJSONResponse writes some JSON as a HTTP response.
func writeJSONResponse(w http.ResponseWriter, v httpResponse) {
w.Header().Set("Content-Type", "application/json")
Expand Down
18 changes: 18 additions & 0 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -849,6 +850,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
time.Sleep(i.cfg.FinalSleep)
}

func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return i.KVStore.CAS(ctx, i.RingKey, f)
}

func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) {
obj, err := i.KVStore.Get(ctx, i.RingKey)
if err != nil {
return nil, err
}

return GetOrCreateRingDesc(obj), nil
}

func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req)
}

// unregister removes our entry from consul.
func (i *Lifecycler) unregister(ctx context.Context) error {
level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName)
Expand Down
46 changes: 35 additions & 11 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}

// countTokens returns the number of tokens and tokens within the range for each instance.
// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
owned := map[string]uint32{}
numTokens := map[string]uint32{}
for i, token := range r.ringTokens {
func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) {
var (
owned = map[string]uint32{}
numTokens = map[string]uint32{}

ringTokens = r.GetTokens()
ringInstanceByToken = r.getTokensInfo()
)

for i, token := range ringTokens {
var diff uint32

// Compute how many tokens are within the range.
if i+1 == len(r.ringTokens) {
diff = (math.MaxUint32 - token) + r.ringTokens[0]
if i+1 == len(ringTokens) {
diff = (math.MaxUint32 - token) + ringTokens[0]
} else {
diff = r.ringTokens[i+1] - token
diff = ringTokens[i+1] - token
}

info := r.ringInstanceByToken[token]
info := ringInstanceByToken[token]
numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1
owned[info.InstanceID] = owned[info.InstanceID] + diff
}

// Set to 0 the number of owned tokens by instances which don't have tokens yet.
for id := range r.ringDesc.Ingesters {
for id := range r.Ingesters {
if _, ok := owned[id]; !ok {
owned[id] = 0
numTokens[id] = 0
Expand Down Expand Up @@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {

prevOwners := r.reportedOwners
r.reportedOwners = make(map[string]struct{})
numTokens, ownedRange := r.countTokens()
numTokens, ownedRange := r.ringDesc.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
Expand Down Expand Up @@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) {
}
}

func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return r.KVClient.CAS(ctx, r.key, f)
}

func (r *Ring) getRing(ctx context.Context) (*Desc, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

ringDesc := proto.Clone(r.ringDesc).(*Desc)

return ringDesc, nil
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req)
}

// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
Expand Down