diff --git a/cmd/servicecmd/servicestartcmd.go b/cmd/servicecmd/servicestartcmd.go index 52f3167c..36006bd2 100644 --- a/cmd/servicecmd/servicestartcmd.go +++ b/cmd/servicecmd/servicestartcmd.go @@ -153,7 +153,7 @@ func StartOutputHostService() { // and since we aren't using thrift anyway. We are selfish with our Frontend. frontendhost, _ := frontendhost.NewFrontendHost(common.FrontendServiceName, sCommon, meta, cfg) - h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil) + h, tc := outputhost.NewOutputHost(serviceName, sCommon, meta, frontendhost, nil, cfg.GetKafkaConfig()) h.Start(tc) // start websocket server diff --git a/common/configure/commonkafkaconfig.go b/common/configure/commonkafkaconfig.go index 1df4aa8a..39f8fbc9 100644 --- a/common/configure/commonkafkaconfig.go +++ b/common/configure/commonkafkaconfig.go @@ -24,6 +24,7 @@ import ( log "github.com/Sirupsen/logrus" "gopkg.in/yaml.v2" "io/ioutil" + "strings" ) // KafkaConfig holds the configuration for the Kafka client @@ -79,7 +80,7 @@ func (r *KafkaConfig) loadClusterConfigFileIfNecessary() { func (r *KafkaConfig) loadClusterConfigFile() { // TODO do we need to detect file change and reload on file change if len(r.KafkaClusterConfigFile) == 0 { - log.Warnf("Could not load kafka configu because kafka cluster config file is not configured") + log.Warnf("Could not load kafka config because kafka cluster config file is not configured") return } @@ -95,4 +96,16 @@ func (r *KafkaConfig) loadClusterConfigFile() { } else { r.ClustersConfig = clusters } + r.addPortNumbers() +} + +// addPortNumbers adds the default Kafka broker port number to all broker names +func (r *KafkaConfig) addPortNumbers() { + for _, c := range r.ClustersConfig.Clusters { + for i, b := range c.Brokers { + if !strings.Contains(b, `:`) { + c.Brokers[i] = b + `:9092` + } + } + } } diff --git a/common/configure/commonkafkaconfig_test.go b/common/configure/commonkafkaconfig_test.go index 0fcce8ac..1dbe45aa 100644 
--- a/common/configure/commonkafkaconfig_test.go +++ b/common/configure/commonkafkaconfig_test.go @@ -38,7 +38,7 @@ func TestLoadKafkaConfig(t *testing.T) { clusterConfig, ok := KafkaConfig.GetKafkaClusterConfig("local") assert.True(t, ok) assert.Equal(t, 1, len(clusterConfig.Brokers)) - assert.Equal(t, "localhost", clusterConfig.Brokers[0]) + assert.Equal(t, "localhost:9092", clusterConfig.Brokers[0]) assert.Equal(t, 1, len(clusterConfig.Zookeepers)) assert.Equal(t, "localhost", clusterConfig.Zookeepers[0]) assert.Equal(t, 0, len(clusterConfig.Chroot)) @@ -52,8 +52,8 @@ func TestLoadKafkaConfig(t *testing.T) { clusterConfig3, ok3 := KafkaConfig.GetKafkaClusterConfig("test") assert.True(t, ok3) assert.Equal(t, 2, len(clusterConfig3.Brokers)) - assert.Equal(t, "server1", clusterConfig3.Brokers[0]) - assert.Equal(t, "server2", clusterConfig3.Brokers[1]) + assert.Equal(t, "server1:9092", clusterConfig3.Brokers[0]) + assert.Equal(t, "server2:9092", clusterConfig3.Brokers[1]) assert.Equal(t, 2, len(clusterConfig3.Zookeepers)) assert.Equal(t, "server3", clusterConfig3.Zookeepers[0]) assert.Equal(t, "server4", clusterConfig3.Zookeepers[1]) diff --git a/common/configure/interfaces.go b/common/configure/interfaces.go index c2f458e6..7e99a933 100644 --- a/common/configure/interfaces.go +++ b/common/configure/interfaces.go @@ -83,7 +83,9 @@ type ( GetDestinationConfig() CommonDestinationConfig // GetDefaultServiceConfig is used to retrieve the default bootstrap service config GetDefaultServiceConfig() CommonServiceConfig - // SetServiceConfig is used to set the service config sepcific to the service + // GetKafkaConfig gets the Kafka configuration + GetKafkaConfig() CommonKafkaConfig + // SetServiceConfig is used to set the service config specific to the service SetServiceConfig(serviceName string, cfg CommonServiceConfig) } diff --git a/common/metadata.go b/common/metadata.go index eeda9431..b5a49d73 100644 --- a/common/metadata.go +++ b/common/metadata.go @@ -36,3 +36,8 @@ 
func IsDLQDestination(dstDesc *shared.DestinationDescription) bool { func IsDLQDestinationPath(path string) bool { return len(path) > 4 && strings.HasSuffix(path, ".dlq") } + +// IsKafkaConsumerGroupExtent determines if a consumer group extent is a Kafka consumption assignment +func IsKafkaConsumerGroupExtent(e *shared.ConsumerGroupExtent) bool { + return AreKafkaPhantomStores(e.GetStoreUUIDs()) +} diff --git a/services/outputhost/cgcache.go b/services/outputhost/cgcache.go index 536c22eb..d6e2a6ac 100644 --- a/services/outputhost/cgcache.go +++ b/services/outputhost/cgcache.go @@ -39,6 +39,10 @@ import ( "github.com/uber/cherami-thrift/.generated/go/shared" ) +const ( + kafkaConnectedStoreUUID = `cafca000-0000-0caf-ca00-0000000cafca` // Placeholder connected store for logs +) + type ( // cacheMsg is the message written to the cg cache // it has the actual message and a connection ID @@ -189,6 +193,12 @@ type ( // cfgMgr is the reference to the cassandra backed cfgMgr cfgMgr dconfig.ConfigManager + + // kafkaCluster is the Kafka cluster for this consumer group, if applicable + kafkaCluster string + + // kafkaTopics is the list of kafka topics consumed by this consumer group, if applicable + kafkaTopics []string } ) @@ -293,6 +303,7 @@ func (cgCache *consumerGroupCache) getConsumerGroupTags() map[string]string { // loadExtentCache loads the extent cache, if it doesn't already exist for this consumer group func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType shared.DestinationType, cge *shared.ConsumerGroupExtent) { + var committer Committer extUUID := cge.GetExtentUUID() if extCache, exists := cgCache.extentCache[extUUID]; !exists { extCache = &extentCache{ @@ -336,15 +347,24 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType extCache.initialCredits = cgCache.getMessageCacheSize(cfg, defaultNumOutstandingMsgs) } - committer := NewCheramiCommitter( - cgCache.metaClient, - cgCache.outputHostUUID, - 
cgCache.cachedCGDesc.GetConsumerGroupUUID(), - extCache.extUUID, - &extCache.connectedStoreUUID, - cgCache.cachedCGDesc.GetIsMultiZone(), - cgCache.tClients, - ) + if common.IsKafkaConsumerGroupExtent(cge) { + committer = NewKafkaCommitter( + cgCache.outputHostUUID, + cgCache.cachedCGDesc.GetConsumerGroupUUID(), + extCache.logger, + &extCache.kafkaClient, + ) + } else { + committer = NewCheramiCommitter( + cgCache.metaClient, + cgCache.outputHostUUID, + cgCache.cachedCGDesc.GetConsumerGroupUUID(), + extCache.extUUID, + &extCache.connectedStoreUUID, + cgCache.cachedCGDesc.GetIsMultiZone(), + cgCache.tClients, + ) + } extCache.ackMgr = newAckManager( cgCache, @@ -364,7 +384,16 @@ func (cgCache *consumerGroupCache) loadExtentCache(ctx thrift.Context, destType extCache.shutdownWG.Add(1) // if load fails we will unload it the usual way - go extCache.load(cgCache.outputHostUUID, cgCache.cachedCGDesc.GetConsumerGroupUUID(), cgCache.metaClient, cge) + go extCache.load( + cgCache.outputHostUUID, + cgCache.cachedCGDesc.GetConsumerGroupUUID(), + cgCache.cachedCGDesc.GetConsumerGroupName(), + cgCache.kafkaCluster, + cgCache.kafkaTopics, + common.UnixNanoTime(cgCache.cachedCGDesc.GetStartFrom()), + cgCache.metaClient, + cge, + ) // now notify the outputhost cgCache.ackMgrLoadCh <- ackMgrLoadMsg{uint32(extCache.ackMgr.ackMgrID), extCache.ackMgr} @@ -519,6 +548,11 @@ func (cgCache *consumerGroupCache) refreshCgCache(ctx thrift.Context) error { return ErrCgUnloaded } + if dstDesc.GetType() == shared.DestinationType_KAFKA { + cgCache.kafkaCluster = dstDesc.GetKafkaCluster() + cgCache.kafkaTopics = dstDesc.GetKafkaTopics() + } + readReq := &shared.ReadConsumerGroupRequest{ DestinationUUID: common.StringPtr(cgCache.cachedCGDesc.GetDestinationUUID()), ConsumerGroupName: common.StringPtr(cgCache.cachedCGDesc.GetConsumerGroupName()), diff --git a/services/outputhost/extcache.go b/services/outputhost/extcache.go index 9c09e53b..be607d32 100644 --- a/services/outputhost/extcache.go +++ 
b/services/outputhost/extcache.go @@ -21,11 +21,13 @@ package outputhost import ( + "encoding/json" "fmt" "math/rand" "net" "net/http" "os" + "strings" "sync" "time" @@ -34,6 +36,8 @@ import ( "github.com/uber-common/bark" "github.com/uber/tchannel-go/thrift" + "github.com/Shopify/sarama" + sc "github.com/bsm/sarama-cluster" "github.com/uber/cherami-server/common" "github.com/uber/cherami-server/common/metrics" "github.com/uber/cherami-server/services/outputhost/load" @@ -137,13 +141,30 @@ type extentCache struct { // consumerM3Client for metrics per consumer group consumerM3Client metrics.Client + + // kafkaClient is the client for the kafka connection, if any + kafkaClient *sc.Consumer } +var kafkaLogSetup sync.Once + // extentLoadReportingInterval is the freq which load // metrics are reported to the controller const extentLoadReportingInterval = 2 * time.Second -func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClient metadata.TChanMetadataService, cge *shared.ConsumerGroupExtent) (err error) { +// kafkaDefaultRetention is the default value of log.retention.hours in the Kafka system +const kafkaDefaultRetention = common.UnixNanoTime(time.Hour * 24 * 7) + +func (extCache *extentCache) load( + outputHostUUID, + cgUUID, + cgName, + kafkaCluster string, + kafkaTopics []string, + startFrom common.UnixNanoTime, + metaClient metadata.TChanMetadataService, + cge *shared.ConsumerGroupExtent, +) (err error) { // it is ok to take the local lock for this extent which will not affect // others extCache.cacheMutex.Lock() @@ -153,8 +174,14 @@ func (extCache *extentCache) load(outputHostUUID string, cgUUID string, metaClie extCache.ackMgr.start() // now try to load the replica streams - extCache.connection, extCache.pickedIndex, err = - extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs))) + + if common.IsKafkaConsumerGroupExtent(cge) { + extCache.connectedStoreUUID = 
kafkaConnectedStoreUUID + extCache.connection, err = extCache.loadKafkaStream(cgName, outputHostUUID, startFrom, kafkaCluster, kafkaTopics) + } else { + extCache.connection, extCache.pickedIndex, err = + extCache.loadReplicaStream(cge.GetAckLevelOffset(), common.SequenceNumber(cge.GetAckLevelSeqNo()), rand.Intn(len(extCache.storeUUIDs))) + } if err != nil { // Exhausted all replica streams.. giving up extCache.logger.Error(`unable to load replica stream for extent`) @@ -306,6 +333,72 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence return } +func (extCache *extentCache) loadKafkaStream( + cgName string, + outputHostUUID string, + startFrom common.UnixNanoTime, + kafkaCluster string, + kafkaTopics []string, +) (repl *replicaConnection, err error) { + groupID := getKafkaGroupIDForCheramiConsumerGroupName(cgName) + + // Configure sarama-cluster + cfg := sc.NewConfig() + + // Metadata for the Kafka group join + meta := KafkaGroupMetadata{ + Version: kafkaGroupMetadataVersion, + OutputHostUUID: outputHostUUID, + } + cfg.Group.Member.UserData, _ = json.Marshal(meta) + + // Get the notifications channel; we will just log it + cfg.Group.Return.Notifications = true + + // Older startFroms (e.g. 0, >3.5 days back) are considered to want the oldest offset + // The logic here is that the default Kafka retention is 7 days, so we just decide whether + // the oldest or newest offset is likely to be 'closer' to the desired startFrom time + // Obviously, startFrom = 0 always works perfectly, and startFrom = now also works, as long + // as the consumer group is used within 3.5 days of creation. 
+ // TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request + // to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster + // starts from the right place + if common.Now()-startFrom > kafkaDefaultRetention/2 { + cfg.Config.Consumer.Offsets.Initial = sarama.OffsetOldest + } + + // This is an ID that may appear in Kafka logs or metadata + cfg.Config.ClientID = `cherami_` + groupID + + // TODO: Sarama metrics registry + + // Build the Kafka client. Note that we would ideally like to have a factory for this, but the client + // has consumer-group-specific changes to its configuration + extCache.kafkaClient, err = sc.NewConsumer( + getKafkaBrokersForCluster(kafkaCluster), + groupID, + kafkaTopics, + cfg, + ) + if err != nil { + extCache.logger.WithField(common.TagErr, err).Error(`couldn't make Kafka client`) + return nil, err + } + + // Setup the notification logger + go kafkaNotificationsLogger(extCache.kafkaClient.Notifications(), extCache.logger) + + // Create the kafkaStream + call := OpenKafkaStream(extCache.kafkaClient.Messages(), extCache.logger) + + // Setup the replicaConnection + replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster) + repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0) + extCache.shutdownWG.Add(1) + repl.open() + return +} + // stop the extentCache stops the ackMgr and notifies the cgCache that this extent is done // Notification to the CG happens only when extent is closed after it is consumed. 
// If it is being unloaded by the CG, then no need to notify again @@ -417,6 +510,11 @@ func (extCache *extentCache) Report(reporter common.LoadReporter) { func (extCache *extentCache) unload() { extCache.cacheMutex.Lock() close(extCache.closeChannel) + if extCache.kafkaClient != nil { + if err := extCache.kafkaClient.Close(); err != nil { + extCache.logger.WithField(common.TagErr, err).Error(`error closing Kafka client`) + } + } extCache.cacheMutex.Unlock() } @@ -431,3 +529,57 @@ func (extCache *extentCache) getState() *admin.OutputCgExtent { return cge } + +// KafkaGroupMetadata is a structure used for JSON encoding/decoding of the metadata stored for +// Kafka groups joined by Cherami +type KafkaGroupMetadata struct { + // Version is the version of this structure + Version uint + + // CGUUID is the internal Cherami consumer group UUID that committed this offset + CGUUID string + + // OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset + OutputHostUUID string +} + +const kafkaGroupMetadataVersion = uint(0) // Current version of the KafkaGroupMetadata + +func kafkaNotificationsLogger(ch <-chan *sc.Notification, log bark.Logger) { + notificationNum := 0 +notificationsLoop: + for { + select { + case n, ok := <-ch: + if !ok { + break notificationsLoop + } + if n == nil { + log.Warn(`nil notification received`) + continue notificationsLoop + } + notificationNum++ + log = log.WithField(`notificationNum`, notificationNum) + if len(n.Claimed) > 0 { + log.WithField(`claimed`, n.Claimed).Info(`claimed partitions`) + } + if len(n.Released) > 0 { + log.WithField(`released`, n.Released).Info(`released partitions`) + } + if len(n.Current) > 0 { + log.WithField(`current`, n.Current).Info(`current partitions`) + } + } + } + log.Info(`Notifications channel closed`) +} + +func getKafkaGroupIDForCheramiConsumerGroupName(cgName string) string { + s := strings.Split(cgName, `/`) + return s[len(s)-1] +} + +func getKafkaBrokersForCluster(cluster string) 
[]string { + cfg, _ := thisOutputHost.kafkaCfg.GetKafkaClusterConfig(cluster) + return cfg.Brokers +} diff --git a/services/outputhost/kafkaCommitter.go b/services/outputhost/kafkaCommitter.go index 0de71b83..a1e79caa 100644 --- a/services/outputhost/kafkaCommitter.go +++ b/services/outputhost/kafkaCommitter.go @@ -23,22 +23,23 @@ package outputhost import ( "encoding/json" "sync" + "time" sc "github.com/bsm/sarama-cluster" "github.com/uber-common/bark" "github.com/uber/cherami-server/common" - "github.com/uber/cherami-thrift/.generated/go/metadata" ) +var outputHostStartTime = time.Now() + // KafkaCommitter is commits ackLevels to Cassandra through the TChanMetadataClient interface type KafkaCommitter struct { connectedStoreUUID *string commitLevel CommitterLevel readLevel CommitterLevel finalLevel CommitterLevel - metaclient metadata.TChanMetadataService *sc.OffsetStash - *sc.Consumer + consumer **sc.Consumer KafkaOffsetMetadata metadataString string // JSON version of KafkaOffsetMetadata logger bark.Logger @@ -55,6 +56,12 @@ type KafkaOffsetMetadata struct { // OutputHostUUID is the UUID of the Cherami Outputhost that committed this offset OutputHostUUID string + + // OutputHostStartTime is the time that the output host started + OutputHostStartTime string + + // CommitterStartTime is the time that this committer was started + CommitterStartTime string } const kafkaOffsetMetadataVersion = uint(0) // Current version of the KafkaOffsetMetadata @@ -84,11 +91,16 @@ func (c *KafkaCommitter) SetFinalLevel(l CommitterLevel) { // UnlockAndFlush pushes our commit and read levels to Cherami metadata, using SetAckOffset func (c *KafkaCommitter) UnlockAndFlush(l sync.Locker) error { + var err error os := c.OffsetStash c.OffsetStash = sc.NewOffsetStash() l.Unlock() // MarkOffsets may take some time, so we unlock the thread that owns us - c.MarkOffsets(os) - return nil + if *c.consumer != nil { + c.logger.WithField(`offsets`, os.Offsets()).Debug(`Flushing Offsets`) + 
(*c.consumer).MarkOffsets(os) + err = (*c.consumer).CommitOffsets() + } + return err } // GetReadLevel returns the next readlevel that will be flushed @@ -107,27 +119,29 @@ func (c *KafkaCommitter) GetCommitLevel() (l CommitterLevel) { * Setup & Utility */ -// NewKafkaCommitter instantiates a KafkaCommitter -func NewKafkaCommitter(metaclient metadata.TChanMetadataService, +// NewKafkaCommitter instantiates a kafkaCommitter +func NewKafkaCommitter( outputHostUUID string, cgUUID string, - extUUID string, - connectedStoreUUID *string, - logger bark.Logger) *KafkaCommitter { + logger bark.Logger, + client **sc.Consumer) *KafkaCommitter { + now := time.Now() + meta := KafkaOffsetMetadata{ - Version: kafkaOffsetMetadataVersion, - OutputHostUUID: outputHostUUID, - CGUUID: cgUUID, + Version: kafkaOffsetMetadataVersion, + OutputHostUUID: outputHostUUID, + CGUUID: cgUUID, + OutputHostStartTime: outputHostStartTime.Format(time.RFC3339), + CommitterStartTime: now.Format(time.RFC3339), } metaJSON, _ := json.Marshal(meta) return &KafkaCommitter{ - metaclient: metaclient, - connectedStoreUUID: connectedStoreUUID, OffsetStash: sc.NewOffsetStash(), metadataString: string(metaJSON), KafkaOffsetMetadata: meta, logger: logger, + consumer: client, } } diff --git a/services/outputhost/kafkaStream.go b/services/outputhost/kafkaStream.go index 16fe3afe..f060f33c 100644 --- a/services/outputhost/kafkaStream.go +++ b/services/outputhost/kafkaStream.go @@ -119,7 +119,10 @@ func (k *kafkaStream) convertKafkaMessageToCherami(m *s.ConsumerMessage, logger c.Message.Message = &store.AppendMessage{ SequenceNumber: common.Int64Ptr(atomic.AddInt64(&k.seqNo, 1)), - EnqueueTimeUtc: common.Int64Ptr(m.Timestamp.UnixNano()), + } + + if !m.Timestamp.IsZero() { // only set if kafka is version 0.10+ + c.Message.Message.EnqueueTimeUtc = common.Int64Ptr(m.Timestamp.UnixNano()) } c.Message.Message.Payload = &cherami.PutMessage{ diff --git a/services/outputhost/outputhost.go b/services/outputhost/outputhost.go 
index b00f971e..9c9c16df 100644 --- a/services/outputhost/outputhost.go +++ b/services/outputhost/outputhost.go @@ -32,8 +32,10 @@ import ( "github.com/uber-common/bark" "github.com/uber/tchannel-go/thrift" + "github.com/Shopify/sarama" ccommon "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-server/common" + "github.com/uber/cherami-server/common/configure" cassDconfig "github.com/uber/cherami-server/common/dconfig" dconfig "github.com/uber/cherami-server/common/dconfigclient" mm "github.com/uber/cherami-server/common/metadata" @@ -89,6 +91,7 @@ type ( ackMgrUnloadCh chan uint32 hostMetrics *load.HostMetrics cfgMgr cassDconfig.ConfigManager + kafkaCfg configure.CommonKafkaConfig common.SCommon } @@ -374,6 +377,7 @@ func (h *OutputHost) processAcks(ackIds []string, isNack bool) (invalidIDs []str if ackMgr == nil { h.logger.WithFields(bark.Fields{ common.TagAckID: ackIDStr, + `ackMgrID`: ackMgrID, }).Info("processAcks could not get ack manager (probably extent is consumed)") h.m3Client.IncCounter(metrics.AckMessagesScope, metrics.OutputhostMessageNoAckManager) continue @@ -743,7 +747,14 @@ func (h *OutputHost) SetFrontendClient(frontendClient ccherami.TChanBFrontend) { } // NewOutputHost is the constructor for BOut -func NewOutputHost(serviceName string, sVice common.SCommon, metadataClient metadata.TChanMetadataService, frontendClient ccherami.TChanBFrontend, opts *OutOptions) (*OutputHost, []thrift.TChanServer) { +func NewOutputHost( + serviceName string, + sVice common.SCommon, + metadataClient metadata.TChanMetadataService, + frontendClient ccherami.TChanBFrontend, + opts *OutOptions, + kafkaCfg configure.CommonKafkaConfig, +) (*OutputHost, []thrift.TChanServer) { // Get the deployment name for logger field deploymentName := sVice.GetConfig().GetDeploymentName() @@ -760,8 +771,11 @@ func NewOutputHost(serviceName string, sVice common.SCommon, metadataClient meta ackMgrUnloadCh: make(chan uint32, defaultAckMgrMapChSize), ackMgrIDGen: 
common.NewHostAckIDGenerator(defaultAckMgrIDStartFrom), hostMetrics: load.NewHostMetrics(), + kafkaCfg: kafkaCfg, } + sarama.Logger = NewSaramaLoggerFromBark(bs.logger, `sarama`) + bs.sessionID = common.UUIDToUint16(sVice.GetHostUUID()) bs.m3Client = metrics.NewClient(sVice.GetMetricsReporter(), metrics.Outputhost) diff --git a/services/outputhost/outputhost_test.go b/services/outputhost/outputhost_test.go index 6ec45e77..ea6934e5 100644 --- a/services/outputhost/outputhost_test.go +++ b/services/outputhost/outputhost_test.go @@ -156,7 +156,7 @@ func utilGetHTTPRequestWithPath(path string) *http.Request { // fail, if there is no path given to publish func (s *OutputHostSuite) TestOutputHostRejectNoPath() { sName := "cherami-test-out" - outputHost, _ := NewOutputHost(sName, s.mockService, nil, nil, nil) + outputHost, _ := NewOutputHost(sName, s.mockService, nil, nil, nil, nil) httpRequest := utilGetHTTPRequest() outputHost.OpenConsumerStreamHandler(s.mockHTTPResponse, httpRequest) s.mockHTTPResponse.AssertCalled(s.T(), "WriteHeader", http.StatusBadRequest) @@ -168,7 +168,7 @@ func (s *OutputHostSuite) TestOutputHostReadMessage() { count := 10 - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") destUUID := uuid.New() @@ -273,7 +273,7 @@ func (s *OutputHostSuite) TestOutputHostReadMessage() { // of whether write() succeeds or fails. 
func (s *OutputHostSuite) TestOutputHostPutsMsgIntoMsgCacheOnWriteError() { - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") destUUID := uuid.New() @@ -363,7 +363,7 @@ func (s *OutputHostSuite) TestOutputHostPutsMsgIntoMsgCacheOnWriteError() { } func (s *OutputHostSuite) TestOutputHostAckMessage() { - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") ctx, _ := utilGetThriftContext() @@ -445,7 +445,7 @@ func (s *OutputHostSuite) TestOutputHostAckMessage() { } func (s *OutputHostSuite) TestOutputHostInvalidAcks() { - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) ctx, _ := utilGetThriftContext() // ack a couple of invalid messages @@ -478,7 +478,7 @@ func (s *OutputHostSuite) TestOutputHostInvalidAcks() { } func (s *OutputHostSuite) TestOutputHostReconfigure() { - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") ctx, _ := utilGetThriftContext() @@ -607,7 +607,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch() { var count int32 count = 10 - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) ctx, _ := utilGetThriftContext() destUUID := uuid.New() @@ -676,7 +676,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch_NoMsg() { var count int32 count = 10 
- outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) ctx, _ := utilGetThriftContext() destUUID := uuid.New() @@ -726,7 +726,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch_SomeMsgAvailable() { var count int32 count = 10 - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) ctx, _ := utilGetThriftContext() destUUID := uuid.New() @@ -793,7 +793,7 @@ func (s *OutputHostSuite) TestOutputHostReceiveMessageBatch_SomeMsgAvailable() { } func (s *OutputHostSuite) TestOutputCgUnload() { - outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") ctx, _ := utilGetThriftContext() @@ -887,7 +887,7 @@ func (s *OutputHostSuite) TestOutputCgUnload() { } func (s *OutputHostSuite) TestOutputAckMgrReset() { - outputHost, _ := NewOutputHost("outputhost-test-reset", s.mockService, s.mockMeta, nil, nil) + outputHost, _ := NewOutputHost("outputhost-test-reset", s.mockService, s.mockMeta, nil, nil, nil) httpRequest := utilGetHTTPRequestWithPath("foo") go outputHost.manageCgCache() diff --git a/services/outputhost/saramaLogger.go b/services/outputhost/saramaLogger.go new file mode 100644 index 00000000..345e8525 --- /dev/null +++ b/services/outputhost/saramaLogger.go @@ -0,0 +1,88 @@ +// Copyright (c) 2016 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package outputhost + +import ( + "github.com/Shopify/sarama" + "github.com/uber-common/bark" + "strings" +) + +type saramaLogger struct { + l bark.Logger +} + +// NewSaramaLoggerFromBark provides a logger suitable for Sarama from a given bark logger +func NewSaramaLoggerFromBark(l bark.Logger, module string) sarama.StdLogger { + return &saramaLogger{ + l: l.WithField(`module`, module), + } +} + +// Print logs the given values at Error level when isError flags them and at Info level otherwise; the values are spread into trim (v...) so a leading string argument is whitespace-trimmed instead of the whole slice being logged as one element +func (l *saramaLogger) Print(v ...interface{}) { + if isError(v...) { + l.l.Error(trim(v...)...) + return + } + l.l.Info(trim(v...)...) +} + +// Printf prints the objects given with the specified format +func (l *saramaLogger) Printf(format string, v ...interface{}) { + if isError(format) || isError(v...) { + l.l.Errorf(strings.TrimSpace(format), v...) + return + } + l.l.Infof(strings.TrimSpace(format), v...) 
+} + +// Println is the same as Print +func (l *saramaLogger) Println(v ...interface{}) { + l.Print(v...) +} + +// Try to detect the level of the log line +func isError(v ...interface{}) bool { + if len(v) == 0 { + return false + } + if _, ok := v[0].(error); ok { + return true + } + if s, ok := v[0].(string); ok { + s = strings.ToLower(s) + if strings.Contains(s, `error`) || strings.Contains(s, `fail`) { + return true + } + } + return false +} + +func trim(v ...interface{}) []interface{} { + if len(v) == 0 { + return v + } + if s, ok := v[0].(string); ok { + v[0] = strings.TrimSpace(s) + } + return v +} diff --git a/test/integration/base.go b/test/integration/base.go index fe1e30b2..66f63af1 100644 --- a/test/integration/base.go +++ b/test/integration/base.go @@ -254,7 +254,14 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) { dClient := dconfig.NewDconfigClient(configure.NewCommonServiceConfig(), common.OutputServiceName) sCommon := common.NewService(common.OutputServiceName, hostID, cfg, tb.UUIDResolver, hwInfoReader, reporter, dClient) log.Infof("output ringHosts: %v", cfg.GetRingHosts()) - oh, tc := outputhost.NewOutputHost(common.OutputServiceName, sCommon, tb.mClient, frontendForOut, nil) + oh, tc := outputhost.NewOutputHost( + common.OutputServiceName, + sCommon, + tb.mClient, + frontendForOut, + nil, + cfgMap[common.OutputServiceName][i].GetKafkaConfig(), + ) oh.Start(tc) // start websocket server common.WSStart(cfg.GetListenAddress().String(), cfg.GetWebsocketPort(), oh)