-
Notifications
You must be signed in to change notification settings - Fork 96
Initial kafka integration on outputhost #134
Conversation
services/outputhost/extcache.go
Outdated
// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request | ||
// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster | ||
// starts from the right place | ||
if common.Now()-startFrom > common.UnixNanoTime(time.Hour*24*7) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the startFrom is less than 1 week, will it use lastest offset? If yes, feel a little risky here, since users may not be aware of that they are actually consuming from latest even though they specify a "start from" time. Like the TODO here. And feel we'd better default to oldest offset before we implement that TODO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be compatible with our current system, we need to distinguish between people that recently created a consumer group and those that want to receive all messages. In practice, I believe I have seen only one customer that had a specific startFrom time desired. This seems like an acceptable workaround.
services/outputhost/extcache.go
Outdated
log.WithField(`released`, n.Released).Info(`released partitions`) | ||
} | ||
if len(n.Current) > 0 { | ||
log.WithField(`current`, n.Released).Info(`current partitions`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log n.Current instead of n.Released?
} | ||
|
||
// Setup the notification logger | ||
go kafkaNotificationsLogger(extCache.kafkaClient.Notifications(), extCache.logger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest passing consumer group id to this kafkaNotificationsLogger function and log the consumer group id as well. The notification itself does not contain consumer group info, thus will be hard to debug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the extCache.logger already has WithField(tag.CnsmID...
cfg, | ||
) | ||
if err != nil { | ||
extCache.logger.WithField(common.TagErr, err).Error(`couldn't make Kafka client`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log consumer group and kafka topics here as well? Also better check logs in other places and add consumer group and kafka topics when possible. It will help to narrow down issues when they happen :)
} | ||
|
||
func getKafkaGroupIDForCheramiConsumerGroupName(cgName string) string { | ||
s := strings.Split(cgName, `/`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why split the consumer group path and use the last part? It may cause name conflict for kafka consumer group?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kafka cannot use '/' in their consumer group names. This seems like a reasonable workaround, as long as the documentation is clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the Cherami document require the full path of consumer group (before KFC) to be unique for only the last part to be unique?
Also, I tried to use / in kafka consumer group name (e.g. /kafka/consumer/group/xxx), and there is no error. So you may be able to just use the full path for kafka, or, replace "/" with "_"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if that works, then I'm OK with it, but I wonder if it will break something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Followed up with the Kafka team; they discourage using '/' in consumer group names, even if it will work.
services/outputhost/extcache.go
Outdated
// TODO: Use Sarama GetMetadata to get the list of partitions, then build the offset request | ||
// to use with GetAvailableOffsets, and then "somehow" manually commit it so that sarama-cluster | ||
// starts from the right place | ||
if common.Now()-startFrom > common.UnixNanoTime(time.Hour*24*7) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move the 7 day to a const..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
common/stdLoggerFromBark.go
Outdated
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package common |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this used? if it is you can leave as is, if not we can remove this..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
No description provided.