Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
integration test for kafka-for-cherami
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed Apr 26, 2017
1 parent a5a8629 commit f7514e8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 54 deletions.
2 changes: 1 addition & 1 deletion config/base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ReplicatorConfig:

# KafkaConfig specifies
KafkaConfig:
kafkaClusterConfigFile: "./config/local_kafka_clusters.yaml"
kafkaClusterConfigFile: "../../config/local_kafka_clusters.yaml"

# Logging configuration
logging:
Expand Down
121 changes: 68 additions & 53 deletions test/integration/kfc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ package integration

import (
"github.com/Shopify/sarama"
"github.com/pborman/uuid"
"github.com/stretchr/testify/suite"
"log"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

Expand All @@ -42,10 +42,11 @@ import (
type KfCIntegrationSuite struct {
testBase

cheramiConsumer cheramiclient.Consumer
cheramiConsumerCh cheramiclient.Delivery
kafkaProducer sarama.SyncProducer
kafkaTopics []string
destPath, cgPath string
cheramiConsumer cheramiclient.Consumer
cheramiMsgsCh chan cheramiclient.Delivery
kafkaProducer sarama.SyncProducer
kafkaTopics []string
}

func TestKfCIntegrationSuite(t *testing.T) {
Expand All @@ -54,11 +55,10 @@ func TestKfCIntegrationSuite(t *testing.T) {
suite.Run(t, s)
}

func (s *KfCIntegrationSuite) setupTest() {
func (s *KfCIntegrationSuite) TestKafkaForCherami() {

destPath := "/kafka_test_dest/kfc"
cgPath := "/kafka_test_cg/kfc"
s.kafkaTopics = []string{"kfc0", "kfc1"}
s.destPath, s.cgPath = "/kafka_test_dest/kfc", "/kafka_test_cg/kfc"
s.kafkaTopics = []string{uuid.New(), uuid.New(), uuid.New()}

// cherami client
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
Expand All @@ -67,23 +67,23 @@ func (s *KfCIntegrationSuite) setupTest() {

// create cherami kfc destination
destDesc, err := cheramiClient.CreateDestination(&cherami.CreateDestinationRequest{
Path: common.StringPtr(destPath),
Path: common.StringPtr(s.destPath),
Type: cherami.DestinationTypePtr(cherami.DestinationType_KAFKA),
ConsumedMessagesRetention: common.Int32Ptr(60),
UnconsumedMessagesRetention: common.Int32Ptr(120),
ChecksumOption: cherami.ChecksumOption_CRC32IEEE,
OwnerEmail: common.StringPtr("[email protected]"),
IsMultiZone: common.BoolPtr(false),
KafkaCluster: common.StringPtr("local"),
KafkaTopics: kafkaTopics,
KafkaTopics: s.kafkaTopics,
})
s.NotNil(destDesc)
s.NoError(err)

// create cherami kfc consumer group
cgDesc, err := cheramiClient.CreateConsumerGroup(&cherami.CreateConsumerGroupRequest{
ConsumerGroupName: common.StringPtr(cgPath),
DestinationPath: common.StringPtr(destPath),
ConsumerGroupName: common.StringPtr(s.cgPath),
DestinationPath: common.StringPtr(s.destPath),
LockTimeoutInSeconds: common.Int32Ptr(30),
MaxDeliveryCount: common.Int32Ptr(1),
OwnerEmail: common.StringPtr("[email protected]"),
Expand All @@ -93,73 +93,88 @@ func (s *KfCIntegrationSuite) setupTest() {
s.NotNil(cgDesc)

s.cheramiConsumer = cheramiClient.CreateConsumer(&cheramiclient.CreateConsumerRequest{
Path: destPath,
ConsumerGroupName: cgPath,
Path: s.destPath,
ConsumerGroupName: s.cgPath,
ConsumerName: "KfCIntegration",
PrefetchCount: 1,
Options: &cheramiclient.ClientOptions{Timeout: time.Second * 30}, // this is the thrift context timeout
})
s.NotNil(s.cheramiConsumer)
defer s.cheramiConsumer.Close()

// setup kafka producer
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true

brokers := []string{"localhost:9092"}

s.kafkaProducer, err = sarama.NewSyncProducer(brokers, config)
s.kafkaProducer, err = sarama.NewSyncProducer([]string{"localhost:9092"}, config)
s.NoError(err)
defer s.kafkaProducer.Close()

s.cheramiMsgsCh, err = s.cheramiConsumer.Open(make(chan cheramiclient.Delivery, 1))
s.NoError(err)

var wgConsumer sync.WaitGroup
const numMsgs = 10

log := common.GetDefaultLogger()
type kafkaMsg struct {
topic string
key string
val []byte
part int32
offs int64
}

wgConsumer.Add(1)
go func() {
defer wgConsumer.Done()
msgs := make(map[string]*kafkaMsg)

ReadLoop:
for {
timeout := time.NewTimer(time.Second * 45)
// publish messages to kafka
for i := 0; i < numMsgs; i++ {

select {
case msg := <-msgsCh:
log.Infof("consumer: recv msg id: %v [addr=%x]", msg.GetMessage().Payload.GetID(), msg.GetMessage().GetAddress())
msg.Ack()
const minSize, maxSize = 512, 8192

case <-timeout.C:
log.Errorf("consumer: timed-out")
break ReadLoop
}
}
}()
}
var topic = s.kafkaTopics[rand.Intn(len(s.kafkaTopics))] // pick one of the topics at random
var key = uuid.New() // random key
var val = make([]byte, minSize+rand.Intn(maxSize-minSize)) // random buf
rand.Read(val) // fill 'val' with random bytes

func (s *KfCIntegrationSuite) cleanupTest() {
part, offs, err :=
s.kafkaProducer.SendMessage(
&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(val),
},
)

err := s.kafkaProducer.Close()
s.NoError(err)
s.NoError(err)

s.cheramiConsumer.Close()
}
msgs[key] = &kafkaMsg{topic: topic, key: key, val: val, part: part, offs: offs}
}

func (s *KfCIntegrationSuite) consumeFromCherami() {
}
// consume messages from cherami
loop:
for i := 0; i < numMsgs; i++ {

select {
case cmsg := <-s.cheramiMsgsCh:
payload := cmsg.GetMessage().Payload
uc := payload.GetUserContext()

func (s *KfCIntegrationSuite) publishToKafka(topic, msg string) (partition int32, offset int64, err error) {
msg := msgs[uc["key"]]

kmsg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(msg)}
partition, offset, err = s.kafkaProducer.SendMessage(kmsg)
if err != nil {
log.Printf("Failed to send message: %s\n", err)
} else {
log.Printf("Sent message to partition %d at offset %d: %s\n", partition, offset, kmsg)
s.Equal(msg.topic, uc["topic"])
s.Equal(msg.key, uc["key"])
s.Equal(len(msg.val), len(payload.GetData()))
s.EqualValues(msg.val, payload.GetData())
s.Equal(strconv.Itoa(int(msg.part)), uc["partition"])
s.Equal(strconv.Itoa(int(msg.offs)), uc["offset"])

cmsg.Ack()

case <-time.After(45 * time.Second):
s.Fail("cherami-consumer: timed out")
break loop
}
}
s.Nil(err)

return
}

0 comments on commit f7514e8

Please sign in to comment.