Skip to content

Commit

Permalink
Merge pull request #794 from rfratto/forward-ingester-chunks
Browse files Browse the repository at this point in the history
ingester: support chunk transfers on ingester shutdown.
  • Loading branch information
rfratto authored Jul 26, 2019
2 parents 4a569f2 + e64c7e5 commit d21c30d
Show file tree
Hide file tree
Showing 12 changed files with 1,796 additions and 108 deletions.
5 changes: 3 additions & 2 deletions pkg/chunkenc/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ func NewMemChunk(enc Encoding) *MemChunk {
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
cPool: &Gzip,
head: &headBlock{}, // Dummy, empty headblock.
cPool: &Gzip,
encoding: EncGZIP,
head: &headBlock{}, // Dummy, empty headblock.
}

db := decbuf{b: b}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
return struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
Closer: conn,
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
IngesterClient: logproto.NewIngesterClient(conn),
Closer: conn,
}, nil
}
3 changes: 2 additions & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -61,7 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
chunks: map[string][]chunk.Chunk{},
}

ing, err := New(cfg, store)
ing, err := New(cfg, client.Config{}, store)
require.NoError(t, err)

return store, ing
Expand Down
47 changes: 29 additions & 18 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
)

// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var ErrReadOnly = errors.New("Ingester is shutting down")

var readinessProbeSuccess = []byte("Ready")

var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
Expand All @@ -31,18 +36,25 @@ var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

// Config for transferring chunks.
MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
}

// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushed", 16, "")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
Expand All @@ -53,10 +65,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// Ingester builds chunks for incoming log streams.
type Ingester struct {
cfg Config
cfg Config
clientConfig client.Config

instancesMtx sync.RWMutex
instances map[string]*instance
readonly bool

lifecycler *ring.Lifecycler
store ChunkStore
Expand All @@ -77,14 +91,19 @@ type ChunkStore interface {
}

// New makes a new Ingester.
func New(cfg Config, store ChunkStore) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}

i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
}

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
Expand Down Expand Up @@ -138,21 +157,13 @@ func (i *Ingester) Stopping() {
}
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {

}

// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(context.Context) error {
return nil
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
} else if i.readonly {
return nil, ErrReadOnly
}

instance := i.getOrCreateInstance(instanceID)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
Expand All @@ -21,7 +22,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, store)
i, err := New(ingesterConfig, client.Config{}, store)
require.NoError(t, err)
defer i.Shutdown()

Expand Down
19 changes: 19 additions & 0 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ func newInstance(instanceID string, blockSize int) *instance {
}
}

// consumeChunk manually adds a chunk that was received during ingester chunk
// transfer.
func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

fp := client.FastFingerprint(labels)
stream, ok := i.streams[fp]
if !ok {
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
}

return stream.consumeChunk(ctx, chunk)
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int
}
}

// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data)
if err != nil {
return err
}

s.chunks = append(s.chunks, chunkDesc{
chunk: c,
})
chunksCreatedTotal.Inc()
return nil
}

func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
Expand Down
Loading

0 comments on commit d21c30d

Please sign in to comment.