Initial kafka integration on outputhost (#134)
* Initial kafka integration on outputhost

* kFC fixes

* Lint

* Integrate Kafka config

* Adjust Kafka CGE detection heuristic

* Add special UUID to kafka CGE detection heuristic

* Adjust kafka startFrom strategy

* Remove unused file

* Fmt

* Integrate KafkaPhantom changes
Guillaume Bailey authored Apr 17, 2017
1 parent c4018e0 commit b249347
Showing 13 changed files with 380 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/servicecmd/servicestartcmd.go
@@ -153,7 +153,7 @@ func StartOutputHostService() {
// and since we aren't using thrift anyway. We are selfish with our Frontend.
frontendhost, _ := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, meta, cfg)

-	h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil)
+	h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil, cfg.GetKafkaConfig())
h.Start(tc)

// start websocket server
15 changes: 14 additions & 1 deletion common/configure/commonkafkaconfig.go
@@ -24,6 +24,7 @@ import (
log "github.com/Sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
"strings"
)

// KafkaConfig holds the configuration for the Kafka client
@@ -79,7 +80,7 @@ func (r *KafkaConfig) loadClusterConfigFileIfNecessary() {
func (r *KafkaConfig) loadClusterConfigFile() {
// TODO do we need to detect file change and reload on file change
if len(r.KafkaClusterConfigFile) == 0 {
log.Warnf("Could not load kafka configu because kafka cluster config file is not configured")
log.Warnf("Could not load kafka config because kafka cluster config file is not configured")
return
}

@@ -95,4 +96,16 @@ func (r *KafkaConfig) loadClusterConfigFile() {
} else {
r.ClustersConfig = clusters
}
r.addPortNumbers()
}

// addPortNumbers adds the default Kafka broker port number to all broker names
func (r *KafkaConfig) addPortNumbers() {
for _, c := range r.ClustersConfig.Clusters {
for i, b := range c.Brokers {
if !strings.Contains(b, `:`) {
c.Brokers[i] = b + `:9092`
}
}
}
}
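
For illustration, a standalone sketch of the port-defaulting rule that addPortNumbers applies per broker (the addDefaultPort helper below is hypothetical, not part of this commit):

package main

import (
	"fmt"
	"strings"
)

// addDefaultPort mirrors the per-broker logic of addPortNumbers above:
// append Kafka's default broker port, 9092, when no port is specified.
func addDefaultPort(broker string) string {
	if !strings.Contains(broker, ":") {
		return broker + ":9092"
	}
	return broker
}

func main() {
	fmt.Println(addDefaultPort("server1"))      // server1:9092
	fmt.Println(addDefaultPort("server2:9093")) // server2:9093
}
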
6 changes: 3 additions & 3 deletions common/configure/commonkafkaconfig_test.go
@@ -38,7 +38,7 @@ func TestLoadKafkaConfig(t *testing.T) {
clusterConfig, ok := KafkaConfig.GetKafkaClusterConfig("local")
assert.True(t, ok)
assert.Equal(t, 1, len(clusterConfig.Brokers))
assert.Equal(t, "localhost", clusterConfig.Brokers[0])
assert.Equal(t, "localhost:9092", clusterConfig.Brokers[0])
assert.Equal(t, 1, len(clusterConfig.Zookeepers))
assert.Equal(t, "localhost", clusterConfig.Zookeepers[0])
assert.Equal(t, 0, len(clusterConfig.Chroot))
@@ -52,8 +52,8 @@ func TestLoadKafkaConfig(t *testing.T) {
clusterConfig3, ok3 := KafkaConfig.GetKafkaClusterConfig("test")
assert.True(t, ok3)
assert.Equal(t, 2, len(clusterConfig3.Brokers))
assert.Equal(t, "server1", clusterConfig3.Brokers[0])
assert.Equal(t, "server2", clusterConfig3.Brokers[1])
assert.Equal(t, "server1:9092", clusterConfig3.Brokers[0])
assert.Equal(t, "server2:9092", clusterConfig3.Brokers[1])
assert.Equal(t, 2, len(clusterConfig3.Zookeepers))
assert.Equal(t, "server3", clusterConfig3.Zookeepers[0])
assert.Equal(t, "server4", clusterConfig3.Zookeepers[1])
4 changes: 3 additions & 1 deletion common/configure/interfaces.go
@@ -83,7 +83,9 @@ type (
GetDestinationConfig() CommonDestinationConfig
// GetDefaultServiceConfig is used to retrieve the default bootstrap service config
GetDefaultServiceConfig() CommonServiceConfig
-	// SetServiceConfig is used to set the service config sepcific to the service
+	// GetKafkaConfig gets the Kafka configuration
+	GetKafkaConfig() CommonKafkaConfig
+	// SetServiceConfig is used to set the service config specific to the service
SetServiceConfig(serviceName string, cfg CommonServiceConfig)
}

5 changes: 5 additions & 0 deletions common/metadata.go
@@ -36,3 +36,8 @@ func IsDLQDestination(dstDesc *shared.DestinationDescription) bool {
func IsDLQDestinationPath(path string) bool {
return len(path) > 4 && strings.HasSuffix(path, ".dlq")
}

// IsKafkaConsumerGroupExtent determines if a consumer group extent is a Kafka consumption assignment
func IsKafkaConsumerGroupExtent(e *shared.ConsumerGroupExtent) bool {
return AreKafkaPhantomStores(e.GetStoreUUIDs())
}
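
AreKafkaPhantomStores itself is not part of this diff; it comes with the KafkaPhantom changes noted in the commit message. A minimal sketch of the idea, assuming Kafka-backed extents are detected by a reserved placeholder store UUID (the name and UUID value below are assumptions, not the actual implementation):

// areKafkaPhantomStores (hypothetical sketch): a consumer group extent is
// treated as Kafka-backed when every store UUID is a reserved "phantom"
// placeholder rather than a real storehost UUID.
func areKafkaPhantomStores(storeUUIDs []string) bool {
	const kafkaPhantomStoreUUID = `00000000-0000-0000-0000-000000000000` // assumed value
	if len(storeUUIDs) == 0 {
		return false
	}
	for _, uuid := range storeUUIDs {
		if uuid != kafkaPhantomStoreUUID {
			return false
		}
	}
	return true
}
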
54 changes: 44 additions & 10 deletions services/outputhost/cgcache.go
@@ -39,6 +39,10 @@ import (
"github.com/uber/cherami-thrift/.generated/go/shared"
)

const (
kafkaConnectedStoreUUID = `cafca000-0000-0caf-ca00-0000000cafca` // Placeholder connected store for logs
)

type (
// cacheMsg is the message written to the cg cache
// it has the actual message and a connection ID
@@ -189,6 +193,12 @@ type (

// cfgMgr is the reference to the cassandra backed cfgMgr
cfgMgr dconfig.ConfigManager

// kafkaCluster is the Kafka cluster for this consumer group, if applicable
kafkaCluster string

// kafkaTopics is the list of kafka topics consumed by this consumer group, if applicable
kafkaTopics []string
}
)

@@ -293,6 +303,7 @@ func (cgCache *consumerGroupCache) getConsumerGroupTags() map[string]string {

// loadExtentCache loads the extent cache, if it doesn't already exist for this consumer group
func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType shared.DestinationType, cge *shared.ConsumerGroupExtent) {
var committer Committer
extUUID := cge.GetExtentUUID()
if extCache, exists := cgCache.extentCache[extUUID]; !exists {
extCache = &extentCache{
@@ -336,15 +347,24 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
extCache.initialCredits = cgCache.getMessageCacheSize(cfg, defaultNumOutstandingMsgs)
}

-	committer := NewCheramiCommitter(
-		cgCache.metaClient,
-		cgCache.outputHostUUID,
-		cgCache.cachedCGDesc.GetConsumerGroupUUID(),
-		extCache.extUUID,
-		&extCache.connectedStoreUUID,
-		cgCache.cachedCGDesc.GetIsMultiZone(),
-		cgCache.tClients,
-	)
+	if common.IsKafkaConsumerGroupExtent(cge) {
+		committer = NewKafkaCommitter(
+			cgCache.outputHostUUID,
+			cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+			extCache.logger,
+			&extCache.kafkaClient,
+		)
+	} else {
+		committer = NewCheramiCommitter(
+			cgCache.metaClient,
+			cgCache.outputHostUUID,
+			cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+			extCache.extUUID,
+			&extCache.connectedStoreUUID,
+			cgCache.cachedCGDesc.GetIsMultiZone(),
+			cgCache.tClients,
+		)
+	}

extCache.ackMgr = newAckManager(
cgCache,
@@ -364,7 +384,16 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType
extCache.shutdownWG.Add(1)

// if load fails we will unload it the usual way
-	go extCache.load(cgCache.outputHostUUID, cgCache.cachedCGDesc.GetConsumerGroupUUID(), cgCache.metaClient, cge)
+	go extCache.load(
+		cgCache.outputHostUUID,
+		cgCache.cachedCGDesc.GetConsumerGroupUUID(),
+		cgCache.cachedCGDesc.GetConsumerGroupName(),
+		cgCache.kafkaCluster,
+		cgCache.kafkaTopics,
+		common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()),
+		cgCache.metaClient,
+		cge,
+	)

// now notify the outputhost
cgCache.ackMgrLoadCh <- ackMgrLoadMsg{uint32(extCache.ackMgr.ackMgrID), extCache.ackMgr}
@@ -519,6 +548,11 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error {
return ErrCgUnloaded
}

if dstDesc.GetType() == shared.DestinationType_KAFKA {
cgCache.kafkaCluster = dstDesc.GetKafkaCluster()
cgCache.kafkaTopics = dstDesc.GetKafkaTopics()
}

readReq := &shared.ReadConsumerGroupRequest{
DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()),
ConsumerGroupName: common.StringPtr(cgCache.cachedCGDesc.GetConsumerGroupName()),
158 changes: 155 additions & 3 deletions services/outputhost/extcache.go
@@ -21,11 +21,13 @@
package outputhost

import (
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"strings"
"sync"
"time"

@@ -34,6 +36,8 @@ import (
"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/metrics"
"github.com/uber/cherami-server/services/outputhost/load"
@@ -137,13 +141,30 @@ type extentCache struct {

// consumerM3Client for metrics per consumer group
consumerM3Client metrics.Client

// kafkaClient is the client for the kafka connection, if any
kafkaClient *sc.Consumer
}

var kafkaLogSetup sync.Once

// extentLoadReportingInterval is the freq which load
// metrics are reported to the controller
const extentLoadReportingInterval = 2 * time.Second

-func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClient metadata.TChanMetadataService, cge *shared.ConsumerGroupExtent) (err error) {
+// kafkaDefaultRetention is the default value of log.retention.hours in the Kafka system
+const kafkaDefaultRetention = common.UnixNanoTime(time.Hour * 24 * 7)
+
+func (extCache *extentCache) load(
+	outputHostUUID,
+	cgUUID,
+	cgName,
+	kafkaCluster string,
+	kafkaTopics []string,
+	startFrom common.UnixNanoTime,
+	metaClient metadata.TChanMetadataService,
+	cge *shared.ConsumerGroupExtent,
+) (err error) {
// it is ok to take the local lock for this extent which will not affect
// others
extCache.cacheMutex.Lock()
@@ -153,8 +174,14 @@ func (extCache *extentCache) load(
extCache.ackMgr.start()

// now try to load the replica streams
-	extCache.connection, extCache.pickedIndex, err =
-		extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
+	if common.IsKafkaConsumerGroupExtent(cge) {
+		extCache.connectedStoreUUID = kafkaConnectedStoreUUID
+		extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics)
+	} else {
+		extCache.connection, extCache.pickedIndex, err =
+			extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs)))
+	}
if err != nil {
// Exhausted all replica streams.. giving up
extCache.logger.Error(`unable to load replica stream for extent`)
@@ -306,6 +333,72 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
return
}

func (extCache *extentCache) loadKafkaStream(
cgName string,
outputHostUUID string,
startFrom common.UnixNanoTime,
kafkaCluster string,
kafkaTopics []string,
) (repl *replicaConnection, err error) {
groupID := getKafkaGroupIDForCheramiConsumerGroupName(cgName)

// Configure sarama-cluster
cfg := sc.NewConfig()

// Metadata for the Kafka group join
meta := KafkaGroupMetadata{
Version: kafkaGroupMetadataVersion,
OutputHostUUID: outputHostUUID,
}
cfg.Group.Member.UserData, _ = json.Marshal(meta)
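// The marshaled user data here looks like (illustrative):
//   {"Version":0,"CGUUID":"","OutputHostUUID":"<this host's UUID>"}
// Note that CGUUID is left unset at this call site.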

// Get the notifications channel; we will just log it
cfg.Group.Return.Notifications = true

// Older startFroms (e.g. 0, >3.5 days back) are considered to want the oldest offset
// The logic here is that the default Kafka retention is 7 days, so we just decide whether
// the oldest or newest offset is likely to be 'closer' to the desired startFrom time
// Obviously, startFrom = 0 always works perfectly, and startFrom = now also works, as long
// as the consumer group is used within 3.5 days of creation.
// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request
// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster
// starts from the right place
if common.Now()-startFrom > kafkaDefaultRetention/2 {
cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
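// Worked example of the heuristic above: with the assumed 7-day default
// retention, the cutoff is 3.5 days. A startFrom of 0 (or any time more than
// 3.5 days ago) selects OffsetOldest; a group created with startFrom == now
// and consumed promptly keeps sarama's default, OffsetNewest.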

// This is an ID that may appear in Kafka logs or metadata
cfg.Config.ClientID = `cherami_` + groupID

// TODO: Sarama metrics registry

// Build the Kafka client. Note that we would ideally like to have a factory for this, but the client
// has consumer-group-specific changes to its configuration
extCache.kafkaClient, err = sc.NewConsumer(
getKafkaBrokersForCluster(kafkaCluster),
groupID,
kafkaTopics,
cfg,
)
if err != nil {
extCache.logger.WithField(common.TagErr, err).Error(`couldn't make Kafka client`)
return nil, err
}

// Setup the notification logger
go kafkaNotificationsLogger(extCache.kafkaClient.Notifications(), extCache.logger)

// Create the kafkaStream
call := OpenKafkaStream(extCache.kafkaClient.Messages(), extCache.logger)

// Setup the replicaConnection
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
extCache.shutdownWG.Add(1)
repl.open()
return
}

// stop the extentCache stops the ackMgr and notifies the cgCache that this extent is done
// Notification to the CG happens only when extent is closed after it is consumed.
// If it is being unloaded by the CG, then no need to notify again
@@ -417,6 +510,11 @@ func (extCache *extentCache) Report(reporter common.LoadReporter) {
func (extCache *extentCache) unload() {
extCache.cacheMutex.Lock()
close(extCache.closeChannel)
if extCache.kafkaClient != nil {
if err := extCache.kafkaClient.Close(); err != nil {
extCache.logger.WithField(common.TagErr, err).Error(`error closing Kafka client`)
}
}
extCache.cacheMutex.Unlock()
}

@@ -431,3 +529,57 @@ func (extCache *extentCache) getState() *admin.OutputCgExtent {

return cge
}

// KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for
// Kafka groups joined by Cherami
type KafkaGroupMetadata struct {
// Version is the version of this structure
Version uint

// CGUUID is the internal Cherami consumer group UUID that committed this offset
CGUUID string

// OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset
OutputHostUUID string
}

const kafkaGroupMetadataVersion = uint(0) // Current version of the KafkaGroupMetadata

func kafkaNotificationsLogger(ch <-chan *sc.Notification, log bark.Logger) {
notificationNum := 0
notificationsLoop:
for {
select {
case n, ok := <-ch:
if !ok {
break notificationsLoop
}
if n == nil {
log.Warn(`nil notification received`)
continue notificationsLoop
}
notificationNum++
log = log.WithField(`notificationNum`, notificationNum)
if len(n.Claimed) > 0 {
log.WithField(`claimed`, n.Claimed).Info(`claimed partitions`)
}
if len(n.Released) > 0 {
log.WithField(`released`, n.Released).Info(`released partitions`)
}
if len(n.Current) > 0 {
log.WithField(`current`, n.Current).Info(`current partitions`)
}
}
}
log.Info(`Notifications channel closed`)
}

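// getKafkaGroupIDForCheramiConsumerGroupName maps a Cherami consumer-group
// path to a Kafka group ID by taking its last path segment, e.g.
// `/myApp/myCG` -> `myCG` (example path assumed for illustration).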
func getKafkaGroupIDForCheramiConsumerGroupName(cgName string) string {
s := strings.Split(cgName, `/`)
return s[len(s)-1]
}

func getKafkaBrokersForCluster(cluster string) []string {
cfg, _ := thisOutputHost.kafkaCfg.GetKafkaClusterConfig(cluster)
return cfg.Brokers
}