Skip to content

Commit

Permalink
Add files for ingester (#940)
Browse files Browse the repository at this point in the history
This commit contains core functionality for asynchronous ingestion, future commits will make use of it.  
Signed-off-by: Prithvi Raj <[email protected]>
  • Loading branch information
vprithvi authored Jul 20, 2018
1 parent 214c133 commit 933efb3
Show file tree
Hide file tree
Showing 26 changed files with 2,045 additions and 6 deletions.
51 changes: 51 additions & 0 deletions cmd/ingester/app/consumer/committing_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"errors"
"io"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
)

type comittingProcessor struct {
processor processor.SpanProcessor
marker offsetMarker
io.Closer
}

type offsetMarker interface {
MarkOffset(int64)
}

// NewCommittingProcessor returns a processor that commits message offsets to Kafka
func NewCommittingProcessor(processor processor.SpanProcessor, marker offsetMarker) processor.SpanProcessor {
return &comittingProcessor{
processor: processor,
marker: marker,
}
}

func (d *comittingProcessor) Process(message processor.Message) error {
if msg, ok := message.(Message); ok {
err := d.processor.Process(message)
if err == nil {
d.marker.MarkOffset(msg.Offset())
}
return err
}
return errors.New("committing processor used with non-kafka message")
}
75 changes: 75 additions & 0 deletions cmd/ingester/app/consumer/committing_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

kafka "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks"
)

type fakeOffsetMarker struct {
capturedOffset int64
}

func (f *fakeOffsetMarker) MarkOffset(o int64) {
f.capturedOffset = o
}

func TestNewCommittingProcessor(t *testing.T) {
msgOffset := int64(123)
offsetMarker := &fakeOffsetMarker{}
spanProcessor := &mocks.SpanProcessor{}
spanProcessor.On("Process", mock.Anything).Return(nil)
committingProcessor := NewCommittingProcessor(spanProcessor, offsetMarker)

msg := &kafka.Message{}
msg.On("Offset").Return(msgOffset)

assert.NoError(t, committingProcessor.Process(msg))

spanProcessor.AssertExpectations(t)
assert.Equal(t, msgOffset, offsetMarker.capturedOffset)
}

func TestNewCommittingProcessorError(t *testing.T) {
offsetMarker := &fakeOffsetMarker{}
spanProcessor := &mocks.SpanProcessor{}
spanProcessor.On("Process", mock.Anything).Return(errors.New("boop"))
committingProcessor := NewCommittingProcessor(spanProcessor, offsetMarker)
msg := &kafka.Message{}

assert.Error(t, committingProcessor.Process(msg))

spanProcessor.AssertExpectations(t)
assert.Equal(t, int64(0), offsetMarker.capturedOffset)
}

type fakeProcessorMessage struct{}

func (f fakeProcessorMessage) Value() []byte {
return nil
}

func TestNewCommittingProcessorErrorNoKafkaMessage(t *testing.T) {
committingProcessor := NewCommittingProcessor(&mocks.SpanProcessor{}, &fakeOffsetMarker{})

assert.Error(t, committingProcessor.Process(fakeProcessorMessage{}))
}
111 changes: 111 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"io"
"sync"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
)

type consumer struct {
metricsFactory metrics.Factory
logger *zap.Logger
processorFactory processorFactory

close chan struct{}
isClosed sync.WaitGroup

SaramaConsumer
}

// SaramaConsumer is an interface to features of Sarama that we use
type SaramaConsumer interface {
Partitions() <-chan sc.PartitionConsumer
MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
io.Closer
}

func (c *consumer) mainLoop() {
c.isClosed.Add(1)
c.logger.Info("Starting main loop")
go func() {
for {
select {
case pc := <-c.Partitions():
c.isClosed.Add(2)

go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())

case <-c.close:
c.isClosed.Done()
return
}
}
}()
}

func (c *consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler")
defer c.isClosed.Done()
defer c.closePartition(pc)

msgMetrics := c.newMsgMetrics(pc.Partition())
var msgProcessor processor.SpanProcessor

for msg := range pc.Messages() {
c.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)

if msgProcessor == nil {
msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
defer msgProcessor.Close()
}

msgProcessor.Process(&saramaMessageWrapper{msg})
}
}

func (c *consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
}

func (c *consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
c.logger.Info("Starting error handler")
defer c.isClosed.Done()

errMetrics := c.newErrMetrics(partition)
for err := range errChan {
errMetrics.errCounter.Inc(1)
c.logger.Error("Error consuming from Kafka", zap.Error(err))
}
}

func (c *consumer) Close() error {
close(c.close)
c.isClosed.Wait()
return c.SaramaConsumer.Close()
}
46 changes: 46 additions & 0 deletions cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumer

import (
"strconv"

"github.com/uber/jaeger-lib/metrics"
)

type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
lagGauge metrics.Gauge
}

type errMetrics struct {
errCounter metrics.Counter
}

func (c *consumer) newMsgMetrics(partition int32) msgMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return msgMetrics{
counter: f.Counter("messages", nil),
offsetGauge: f.Gauge("current-offset", nil),
lagGauge: f.Gauge("offset-lag", nil),
}
}

func (c *consumer) newErrMetrics(partition int32) errMetrics {
f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return errMetrics{errCounter: f.Counter("errors", nil)}

}
Loading

0 comments on commit 933efb3

Please sign in to comment.