Skip to content

Commit

Permalink
latency on perftest
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Nov 25, 2024
1 parent 3b0dc4c commit 6077958
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 45 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,23 @@ jobs:
- name: Install GNU make
run: choco install make
- run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }}
publish:
runs-on: ubuntu-latest
needs: [test]
steps:
- uses: docker/setup-buildx-action@v2
- uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: actions/checkout@v3
- name: Publish Docker Image
run: |
set -x
VERSION=latest
export VERSION
if [[ ! $GITHUB_REF =~ "/tags/" ]]
then
VERSION=dev
fi
make perf-test-docker-push
41 changes: 27 additions & 14 deletions examples/tls/getting_started_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"crypto/tls"
"fmt"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
Expand Down Expand Up @@ -48,13 +47,18 @@ func main() {
fmt.Println("Getting started with Streaming TLS client for RabbitMQ")
fmt.Println("Connecting to RabbitMQ streaming ...")

addressResolver := stream.AddressResolver{
Host: "35.234.132.231",
Port: 5551,
}
// Connect to the broker ( or brokers )
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost("localhost").
SetPort(5551). // standard TLS port
SetUser("guest").
SetPassword("guest").
SetAddressResolver(addressResolver).
SetPort(addressResolver.Port). // standard TLS port
SetHost(addressResolver.Host).
SetUser("remote").
SetPassword("remote").
IsTLS(true).
// use tls.Config to customize the TLS configuration
// for tests you may need InsecureSkipVerify: true
Expand All @@ -73,12 +77,12 @@ func main() {
// err = env.DeclareStream(streamName, nil)
// it is the best practise to define a size, 1GB for example:

streamName := uuid.New().String()
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
streamName := "perf-test-go"
//err = env.DeclareStream(streamName,
// &stream.StreamOptions{
// MaxLengthBytes: stream.ByteCapacity{}.GB(2),
// },
//)

CheckErr(err)

Expand All @@ -92,7 +96,7 @@ func main() {

// the send method automatically aggregates the messages
// based on batch size
for i := 0; i < 1000; i++ {
for i := 0; i < 10; i++ {
err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
CheckErr(err)
}
Expand All @@ -107,8 +111,17 @@ func main() {
//
//}, nil)
// if you need to track the offset you need a consumer name like:
consumed := 0
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)

consumed++
if consumed%1000 == 0 {

fmt.Printf("name: %s, offset %d, chunk entities count: %d, total: %d \n ",
consumerContext.Consumer.GetName(), consumerContext.Consumer.GetOffset(), consumerContext.GetEntriesCount(), consumed)

}

}

consumer, err := env.NewConsumer(
Expand All @@ -128,7 +141,7 @@ func main() {
err = consumer.Close()
time.Sleep(200 * time.Millisecond)
CheckErr(err)
err = env.DeleteStream(streamName)
//err = env.DeleteStream(streamName)
CheckErr(err)
err = env.Close()
CheckErr(err)
Expand Down
2 changes: 2 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var (
crcCheck bool
runDuration int
initialCredits int
isBatchSend bool
)

func init() {
Expand Down Expand Up @@ -76,6 +77,7 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random")
baseCmd.PersistentFlags().IntVarP(&initialCredits, "initial-credits", "", 10, "Consumer initial credits")
baseCmd.PersistentFlags().BoolVarP(&isBatchSend, "batch-send", "", false, "Enable batch send")
baseCmd.AddCommand(versionCmd)
baseCmd.AddCommand(newSilent())
}
Expand Down
84 changes: 53 additions & 31 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ func newSilent() *cobra.Command {
}

var (
publisherMessageCount int32
consumerMessageCount int32
publisherMessageCount int32
consumerMessageCount int32
//consumerMessageCountPerLatency int32
totalLatency int64
confirmedMessageCount int32
notConfirmedMessageCount int32
consumersCloseCount int32
Expand Down Expand Up @@ -77,9 +79,15 @@ func printStats() {

PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
//latency := float64(totalLatency) / float64(atomic.LoadInt32(&consumerMessageCount))
averageLatency := int64(0)
if atomic.LoadInt32(&consumerMessageCount) > 0 {
averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount))
}

ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v |",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
logInfo("Published %8.1f msg/s | Confirmed %8.1f msg/s | Consumed %8.1f msg/s | %3v | %3v | msg sent: %3v | latency: %d ms",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent), averageLatency)
}
}

Expand Down Expand Up @@ -273,28 +281,9 @@ func startPublisher(streamName string) error {
return err
}

var arr []message.StreamMessage
var body []byte
for z := 0; z < batchSize; z++ {

if fixedBody > 0 {
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
rand.Seed(time.Now().UnixNano())
body = make([]byte, rand.Intn(variableBody))
}
}
n := time.Now().UnixNano()
var buff = make([]byte, 8)
binary.BigEndian.PutUint64(buff, uint64(n))
/// added to calculate the latency
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}

go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
go func(prod *ha.ReliableProducer) {
for {

if rate > 0 {
rateWithBatchSize := float64(rate) / float64(batchSize)
sleepAfterMessage := float64(time.Second) / rateWithBatchSize
Expand All @@ -313,21 +302,50 @@ func startPublisher(streamName string) error {
}
time.Sleep(time.Duration(sleep) * time.Millisecond)
}
messages := buildMessages()

atomic.AddInt64(&messagesSent, int64(len(arr)))
for _, streamMessage := range arr {
err = prod.Send(streamMessage)
atomic.AddInt64(&messagesSent, int64(len(messages)))
if isBatchSend {
err = prod.BatchSend(messages)
checkErr(err)
} else {
for _, streamMessage := range messages {
err = prod.Send(streamMessage)
checkErr(err)
}
}
atomic.AddInt32(&publisherMessageCount, int32(len(arr)))

atomic.AddInt32(&publisherMessageCount, int32(len(messages)))

}
}(rPublisher, arr)
}(rPublisher)

return nil

}

func buildMessages() []message.StreamMessage {
var arr []message.StreamMessage
for z := 0; z < batchSize; z++ {
//var body []byte
if fixedBody > 0 {
// body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
rand.Seed(time.Now().UnixNano())
// body = make([]byte, rand.Intn(variableBody))
}
}
var buff = make([]byte, 8)
sentTime := time.Now().UnixMilli()
binary.BigEndian.PutUint64(buff, uint64(sentTime))
/// added to calculate the latency
msg := amqp.NewMessage(buff)
arr = append(arr, msg)
}
return arr
}

func startPublishers() error {

logInfo("Starting %d publishers...", publishers)
Expand Down Expand Up @@ -362,8 +380,12 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
func startConsumer(consumerName string, streamName string) error {

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&consumerMessageCount, 1)

sentTime := binary.BigEndian.Uint64(message.GetData()[:8]) // Decode the timestamp
startTimeFromMessage := time.UnixMilli(int64(sentTime))
latency := time.Now().Sub(startTimeFromMessage).Milliseconds()
totalLatency += latency
atomic.AddInt32(&consumerMessageCount, 1)
}
offsetSpec := stream.OffsetSpecification{}.Last()
switch consumerOffset {
Expand Down

0 comments on commit 6077958

Please sign in to comment.