This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Initial kafka integration on outputhost #134

Merged
merged 15 commits on Apr 17, 2017
2 changes: 1 addition & 1 deletion cmd/servicecmd/servicestartcmd.go
@@ -153,7 +153,7 @@ func StartOutputHostService() {
// and since we aren't using thrift anyway. We are selfish with our Frontend.
frontendhost, _ := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, meta, cfg)

- h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil)
+ h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil, cfg.GetKafkaConfig())
h.Start(tc)

// start websocket server
15 changes: 14 additions & 1 deletion common/configure/commonkafkaconfig.go
@@ -24,6 +24,7 @@ import (
log "github.com/Sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
"strings"
)

// KafkaConfig holds the configuration for the Kafka client
@@ -79,7 +80,7 @@ func (r *KafkaConfig) loadClusterConfigFileIfNecessary() {
func (r *KafkaConfig) loadClusterConfigFile() {
// TODO do we need to detect file change and reload on file change
if len(r.KafkaClusterConfigFile) == 0 {
log.Warnf("Could not load kafka configu because kafka cluster config file is not configured")
log.Warnf("Could not load kafka config because kafka cluster config file is not configured")
return
}

@@ -95,4 +96,16 @@ func (r *KafkaConfig) loadClusterConfigFile() {
} else {
r.ClustersConfig = clusters
}
r.addPortNumbers()
}

// addPortNumbers adds the default Kafka broker port (9092) to any broker name that does not already specify a port
func (r *KafkaConfig) addPortNumbers() {
for _, c := range r.ClustersConfig.Clusters {
for i, b := range c.Brokers {
if !strings.Contains(b, `:`) {
c.Brokers[i] = b + `:9092`
}
}
}
}
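For illustration, here is a minimal standalone sketch (not part of this change) of the default-port behavior that addPortNumbers applies: bare broker names pick up Kafka's default port 9092, while addresses that already carry a port are left untouched.

```go
package main

import (
	"fmt"
	"strings"
)

// addDefaultKafkaPort mirrors the addPortNumbers logic above: any broker
// address without an explicit port gets the default Kafka port 9092 appended.
func addDefaultKafkaPort(brokers []string) []string {
	out := make([]string, len(brokers))
	for i, b := range brokers {
		if !strings.Contains(b, ":") {
			b += ":9092"
		}
		out[i] = b
	}
	return out
}

func main() {
	fmt.Println(addDefaultKafkaPort([]string{"localhost", "10.6.1.1:9093"}))
	// Output: [localhost:9092 10.6.1.1:9093]
}
```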
6 changes: 3 additions & 3 deletions common/configure/commonkafkaconfig_test.go
@@ -38,7 +38,7 @@ func TestLoadKafkaConfig(t *testing.T) {
clusterConfig, ok := KafkaConfig.GetKafkaClusterConfig("local")
assert.True(t, ok)
assert.Equal(t, 1, len(clusterConfig.Brokers))
assert.Equal(t, "localhost", clusterConfig.Brokers[0])
assert.Equal(t, "localhost:9092", clusterConfig.Brokers[0])
assert.Equal(t, 1, len(clusterConfig.Zookeepers))
assert.Equal(t, "localhost", clusterConfig.Zookeepers[0])
assert.Equal(t, 0, len(clusterConfig.Chroot))
@@ -52,8 +52,8 @@ func TestLoadKafkaConfig(t *testing.T) {
clusterConfig3, ok3 := KafkaConfig.GetKafkaClusterConfig("test")
assert.True(t, ok3)
assert.Equal(t, 2, len(clusterConfig3.Brokers))
assert.Equal(t, "server1", clusterConfig3.Brokers[0])
assert.Equal(t, "server2", clusterConfig3.Brokers[1])
assert.Equal(t, "server1:9092", clusterConfig3.Brokers[0])
assert.Equal(t, "server2:9092", clusterConfig3.Brokers[1])
assert.Equal(t, 2, len(clusterConfig3.Zookeepers))
assert.Equal(t, "server3", clusterConfig3.Zookeepers[0])
assert.Equal(t, "server4", clusterConfig3.Zookeepers[1])
4 changes: 3 additions & 1 deletion common/configure/interfaces.go
@@ -83,7 +83,9 @@ type (
GetDestinationConfig() CommonDestinationConfig
// GetDefaultServiceConfig is used to retrieve the default bootstrap service config
GetDefaultServiceConfig() CommonServiceConfig
- // SetServiceConfig is used to set the service config sepcific to the service
+ // GetKafkaConfig gets the Kafka configuration
+ GetKafkaConfig() CommonKafkaConfig
+ // SetServiceConfig is used to set the service config specific to the service
SetServiceConfig(serviceName string, cfg CommonServiceConfig)
}

5 changes: 5 additions & 0 deletions common/metadata.go
@@ -36,3 +36,8 @@ func IsDLQDestination(dstDesc *shared.DestinationDescription) bool {
func IsDLQDestinationPath(path string) bool {
return len(path) > 4 && strings.HasSuffix(path, ".dlq")
}

// IsKafkaConsumerGroupExtent determines if a consumer group extent is a Kafka consumption assignment
func IsKafkaConsumerGroupExtent(e *shared.ConsumerGroupExtent) bool {
return AreKafkaPhantomStores(e.GetStoreUUIDs())
}
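For context, a rough sketch of the phantom-store idea behind this check; the placeholder UUID and the helper below are illustrative assumptions, not the actual AreKafkaPhantomStores implementation in common.

```go
// Sketch (assumptions): Kafka-backed extents are recorded with placeholder
// ("phantom") store UUIDs rather than real Cherami store replicas, so a
// consumer group extent counts as Kafka-backed when every store UUID it
// references is such a placeholder.
const kafkaPhantomStoreUUID = "00000000-0000-0000-0000-000000000000" // assumed placeholder value

func areKafkaPhantomStoresSketch(storeUUIDs []string) bool {
	if len(storeUUIDs) == 0 {
		return false
	}
	for _, uuid := range storeUUIDs {
		if uuid != kafkaPhantomStoreUUID {
			return false
		}
	}
	return true
}
```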
54 changes: 44 additions & 10 deletions services/outputhost/cgcache.go
@@ -39,6 +39,10 @@ import (
"github.com/uber/cherami-thrift/.generated/go/shared"
)

const (
kafkaConnectedStoreUUID = `cafca000-0000-0caf-ca00-0000000cafca` // Placeholder connected store for logs
)

type (
// cacheMsg is the message written to the cg cache
// it has the actual message and a connection ID
@@ -189,6 +193,12 @@ type (

// cfgMgr is the reference to the cassandra backed cfgMgr
cfgMgr dconfig.ConfigManager

// kafkaCluster is the Kafka cluster for this consumer group, if applicable
kafkaCluster string

// kafkaTopics is the list of kafka topics consumed by this consumer group, if applicable
kafkaTopics []string
}
)

@@ -293,6 +303,7 @@ func (cgCache *consumerGroupCache) getConsumerGroupTags() map[string]string {

// loadExtentCache loads the extent cache, if it doesn't already exist for this consumer group
func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType shared.DestinationType, cge *shared.ConsumerGroupExtent) {
var committer Committer
extUUID := cge.GetExtentUUID()
if extCache, exists := cgCache.extentCache[extUUID]; !exists {
extCache = &extentCache{
@@ -336,15 +347,24 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
extCache.initialCredits = cgCache.getMessageCacheSize(cfg, defaultNumOutstandingMsgs)
}

- committer := NewCheramiCommitter(
- cgCache.metaClient,
- cgCache.outputHostUUID,
- cgCache.cachedCGDesc.GetConsumerGroupUUID(),
- extCache.extUUID,
- &extCache.connectedStoreUUID,
- cgCache.cachedCGDesc.GetIsMultiZone(),
- cgCache.tClients,
- )
+ if common.IsKafkaConsumerGroupExtent(cge) {
+ committer = NewKafkaCommitter(
+ cgCache.outputHostUUID,
+ cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+ extCache.logger,
+ &extCache.kafkaClient,
+ )
+ } else {
+ committer = NewCheramiCommitter(
+ cgCache.metaClient,
+ cgCache.outputHostUUID,
+ cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+ extCache.extUUID,
+ &extCache.connectedStoreUUID,
+ cgCache.cachedCGDesc.GetIsMultiZone(),
+ cgCache.tClients,
+ )
+ }

extCache.ackMgr = newAckManager(
cgCache,
@@ -364,7 +384,16 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
extCache.shutdownWG.Add(1)

// if load fails we will unload it the usual way
- go extCache.load(cgCache.outputHostUUID, cgCache.cachedCGDesc.GetConsumerGroupUUID(), cgCache.metaClient, cge)
+ go extCache.load(
+ cgCache.outputHostUUID,
+ cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+ cgCache.cachedCGDesc.GetConsumerGroupName(),
+ cgCache.kafkaCluster,
+ cgCache.kafkaTopics,
+ common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
+ cgCache.metaClient,
+ cge,
+ )

// now notify the outputhost
cgCache.ackMgrLoadCh <- ackMgrLoadMsg{uint32(extCache.ackMgr.ackMgrID), extCache.ackMgr}
@@ -519,6 +548,11 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
return ErrCgUnloaded
}

if dstDesc.GetType() == shared.DestinationType_KAFKA {
cgCache.kafkaCluster = dstDesc.GetKafkaCluster()
cgCache.kafkaTopics = dstDesc.GetKafkaTopics()
}

readReq := &shared.ReadConsumerGroupRequest{
DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()),
ConsumerGroupName: common.StringPtr(cgCache.cachedCGDesc.GetConsumerGroupName()),
158 changes: 155 additions & 3 deletions services/outputhost/extcache.go
@@ -21,11 +21,13 @@
package outputhost

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"time"

@@ -34,6 +36,8 @@ import (
"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
@@ -137,13 +141,30 @@ type extentCache struct {

// consumerM3Client for metrics per consumer group
consumerM3Client metrics.Client

// kafkaClient is the client for the kafka connection, if any
kafkaClient *sc.Consumer
}

var kafkaLogSetup sync.Once

// extentLoadReportingInterval is the freq which load
// metrics are reported to the controller
const extentLoadReportingInterval = 2 * time.Second

- func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClient metadata.TChanMetadataService, cge *shared.ConsumerGroupExtent) (err error) {
// kafkaDefaultRetention is the default value of log.retention.hours in the Kafka system
const kafkaDefaultRetention = common.UnixNanoTime(time.Hour * 24 * 7)

+ func (extCache *extentCache) load(
+ outputHostUUID,
+ cgUUID,
+ cgName,
+ kafkaCluster string,
+ kafkaTopics []string,
+ startFrom common.UnixNanoTime,
+ metaClient metadata.TChanMetadataService,
+ cge *shared.ConsumerGroupExtent,
+ ) (err error) {
// it is ok to take the local lock for this extent which will not affect
// others
extCache.cacheMutex.Lock()
@@ -153,8 +174,14 @@ func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClie
extCache.ackMgr.start()

// now try to load the replica streams
- extCache.connection, extCache.pickedIndex, err =
- extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))

+ if common.IsKafkaConsumerGroupExtent(cge) {
+ extCache.connectedStoreUUID = kafkaConnectedStoreUUID
+ extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics)
+ } else {
+ extCache.connection, extCache.pickedIndex, err =
+ extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
+ }
if err != nil {
// Exhausted all replica streams.. giving up
extCache.logger.Error(`unable to load replica stream for extent`)
@@ -306,6 +333,72 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
return
}

func (extCache *extentCache) loadKafkaStream(
cgName string,
outputHostUUID string,
startFrom common.UnixNanoTime,
kafkaCluster string,
kafkaTopics []string,
) (repl *replicaConnection, err error) {
groupID := getKafkaGroupIDForCheramiConsumerGroupName(cgName)

// Configure sarama-cluster
cfg := sc.NewConfig()

// Metadata for the Kafka group join
meta := KafkaGroupMetadata{
Version: kafkaGroupMetadataVersion,
OutputHostUUID: outputHostUUID,
}
cfg.Group.Member.UserData, _ = json.Marshal(meta)

// Get the notifications channel; we will just log it
cfg.Group.Return.Notifications = true

// Older startFroms (e.g. 0, >3.5 days back) are considered to want the oldest offset
// The logic here is that the default Kafka retention is 7 days, so we just decide whether
// the oldest or newest offset is likely to be 'closer' to the desired startFrom time
// Obviously, startFrom = 0 always works perfectly, and startFrom = now also works, as long
// as the consumer group is used within 3.5 days of creation.
// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request
// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster
// starts from the right place
if common.Now()-startFrom > kafkaDefaultRetention/2 {
cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest
}

// This is an ID that may appear in Kafka logs or metadata
cfg.Config.ClientID = `cherami_` + groupID

// TODO: Sarama metrics registry

// Build the Kafka client. Note that we would ideally like to have a factory for this, but the client
// has consumer-group-specific changes to its configuration
extCache.kafkaClient, err = sc.NewConsumer(
getKafkaBrokersForCluster(kafkaCluster),
groupID,
kafkaTopics,
cfg,
)
if err != nil {
extCache.logger.WithField(common.TagErr, err).Error(`couldn't make Kafka client`)
Review comment (Contributor): Log the consumer group and Kafka topics here as well? It would also be good to check logs in other places and add the consumer group and Kafka topics where possible; it will help to narrow down issues when they happen :)
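A sketch of what the suggested log call could look like; hypothetical, assuming bark.Fields is acceptable here and using the kafkaCluster and kafkaTopics parameters already in scope:

```go
// Hypothetical form of the reviewer's suggestion, not part of this change.
extCache.logger.WithFields(bark.Fields{
	common.TagErr:  err,
	`kafkaCluster`: kafkaCluster,
	`kafkaTopics`:  kafkaTopics,
}).Error(`couldn't make Kafka client`)
```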

return nil, err
}

// Setup the notification logger
go kafkaNotificationsLogger(extCache.kafkaClient.Notifications(), extCache.logger)
Review comment (Contributor): Suggest passing the consumer group ID to this kafkaNotificationsLogger function and logging it as well. The notification itself does not contain consumer group info, so it will be hard to debug otherwise.

Reply (Contributor Author): I believe the extCache.logger already has WithField(tag.CnsmID...


// Create the kafkaStream
call := OpenKafkaStream(extCache.kafkaClient.Messages(), extCache.logger)

// Setup the replicaConnection
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
extCache.shutdownWG.Add(1)
repl.open()
return
}
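To make the startFrom heuristic above easier to follow, here is a minimal standalone sketch (not part of this change) of the same decision, assuming the 7-day default retention: a startFrom older than half the retention window is likely closer to the oldest retained offset, while anything newer keeps sarama's default of the newest offset.

```go
package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

// chooseInitialOffset mirrors the startFrom heuristic in loadKafkaStream.
// Times are UnixNano values, matching common.UnixNanoTime in the code above.
func chooseInitialOffset(nowNanos, startFromNanos int64) int64 {
	const kafkaDefaultRetention = int64(7 * 24 * time.Hour) // assumed log.retention.hours default
	if nowNanos-startFromNanos > kafkaDefaultRetention/2 {
		return sarama.OffsetOldest // startFrom is old; the oldest retained offset is likely closer
	}
	return sarama.OffsetNewest // startFrom is recent; keep the default of newest
}

func main() {
	now := time.Now().UnixNano()
	fmt.Println(chooseInitialOffset(now, 0))                    // -2 (sarama.OffsetOldest)
	fmt.Println(chooseInitialOffset(now, now-int64(time.Hour))) // -1 (sarama.OffsetNewest)
}
```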

// stop the extentCache stops the ackMgr and notifies the cgCache that this extent is done
// Notification to the CG happens only when extent is closed after it is consumed.
// If it is being unloaded by the CG, then no need to notify again
@@ -417,6 +510,11 @@ func (extCache *extentCache) Report(reporter common.LoadReporter) {
func (extCache *extentCache) unload() {
extCache.cacheMutex.Lock()
close(extCache.closeChannel)
if extCache.kafkaClient != nil {
if err := extCache.kafkaClient.Close(); err != nil {
extCache.logger.WithField(common.TagErr, err).Error(`error closing Kafka client`)
}
}
extCache.cacheMutex.Unlock()
}

@@ -431,3 +529,57 @@ func (extCache *extentCache) getState() *admin.OutputCgExtent {

return cge
}

// KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for
// Kafka groups joined by Cherami
type KafkaGroupMetadata struct {
// Version is the version of this structure
Version uint

// CGUUID is the internal Cherami consumer group UUID that committed this offset
CGUUID string

// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
OutputHostUUID string
}

const kafkaGroupMetadataVersion = uint(0) // Current version of the KafkaGroupMetadata
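As a quick illustration (assuming it sits in the same package as the KafkaGroupMetadata type above), this is roughly what the UserData attached to the Kafka group join by loadKafkaStream looks like on the wire:

```go
// Sketch: encode the metadata the way loadKafkaStream does for
// cfg.Group.Member.UserData, then decode it back.
func exampleGroupMetadataRoundTrip() error {
	meta := KafkaGroupMetadata{
		Version:        kafkaGroupMetadataVersion,
		OutputHostUUID: "outputhost-uuid-for-illustration", // hypothetical value
	}

	raw, err := json.Marshal(meta)
	if err != nil {
		return err
	}
	// raw now holds: {"Version":0,"CGUUID":"","OutputHostUUID":"outputhost-uuid-for-illustration"}

	var decoded KafkaGroupMetadata
	return json.Unmarshal(raw, &decoded)
}
```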

func kafkaNotificationsLogger(ch <-chan *sc.Notification, log bark.Logger) {
notificationNum := 0
notificationsLoop:
for {
select {
case n, ok := <-ch:
if !ok {
break notificationsLoop
}
if n == nil {
log.Warn(`nil notification received`)
continue notificationsLoop
}
notificationNum++
log = log.WithField(`notificationNum`, notificationNum)
if len(n.Claimed) > 0 {
log.WithField(`claimed`, n.Claimed).Info(`claimed partitions`)
}
if len(n.Released) > 0 {
log.WithField(`released`, n.Released).Info(`released partitions`)
}
if len(n.Current) > 0 {
log.WithField(`current`, n.Current).Info(`current partitions`)
}
}
}
log.Info(`Notifications channel closed`)
}

func getKafkaGroupIDForCheramiConsumerGroupName(cgName string) string {
s := strings.Split(cgName, `/`)
Review comment (Contributor): Why split the consumer group path and use only the last part? It may cause name conflicts for the Kafka consumer group.

Reply (Contributor Author): Kafka cannot use '/' in its consumer group names. This seems like a reasonable workaround, as long as the documentation is clear.

Reply (Contributor): Does the Cherami documentation require the full consumer group path (before KFC) to be unique, or only the last part? Also, I tried using '/' in a Kafka consumer group name (e.g. /kafka/consumer/group/xxx) and there was no error, so you may be able to just use the full path for Kafka, or replace "/" with "_".

Reply (Contributor Author): If that works, then I'm OK with it, but I wonder if it will break something.

Reply (Contributor Author): Followed up with the Kafka team; they discourage using '/' in consumer group names, even if it will work.
return s[len(s)-1]
}
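The review thread above debates dropping everything but the last path segment; a minimal sketch of the alternative mentioned there (keeping the full Cherami path and replacing the '/' characters that Kafka discourages), hypothetical and not what this change implements:

```go
// getKafkaGroupIDFromFullPath is a hypothetical alternative to the function
// above: it keeps the whole consumer group path (avoiding collisions between
// groups that share a last segment) and substitutes '_' for '/'.
func getKafkaGroupIDFromFullPath(cgName string) string {
	return strings.Replace(strings.TrimPrefix(cgName, `/`), `/`, `_`, -1)
}
```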

func getKafkaBrokersForCluster(cluster string) []string {
cfg, _ := thisOutputHost.kafkaCfg.GetKafkaClusterConfig(cluster)
return cfg.Brokers
}