Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #596] Fix command register conflict, add consumer and producer logic code #597

Merged
merged 2 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 69 additions & 65 deletions benchmark/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package main

import (
"context"
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -91,7 +95,7 @@ func (s *consumeSnapshots) printStati() {
)
}

type consumer struct {
type consumerBenchmark struct {
topic string
groupPrefix string
nameSrv string
Expand All @@ -107,7 +111,7 @@ type consumer struct {
}

func init() {
c := &consumer{}
c := &consumerBenchmark{}
flags := flag.NewFlagSet("consumer", flag.ExitOnError)
c.flags = flags

Expand All @@ -123,87 +127,87 @@ func init() {
registerCommand("consumer", c)
}

func (c *consumer) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
//consumer, err := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{
// ClientConfig: rocketmq.ClientConfig{
// GroupID: c.groupID,
// NameServer: c.nameSrv,
// },
// ThreadCount: c.instanceCount,
// MessageBatchMaxSize: 16,
//})
//if err != nil {
// panic("new push consumer error:" + err.Error())
//}
//
//consumer.Subscribe(c.topic, c.expression, func(m *rocketmq.MessageExt) rocketmq.ConsumeStatus {
// atomic.AddInt64(&stati.receiveMessageTotal, 1)
// now := time.Now().UnixNano() / int64(time.Millisecond)
// b2cRT := now - m.BornTimestamp
// atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
// s2cRT := now - m.StoreTimestamp
// atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)
//
// for {
// old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
// if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
// break
// }
// }
//
// for {
// old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
// if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
// break
// }
// }
//
// return rocketmq.ConsumeSuccess
//})
//println("Start")
//consumer.Start()
//select {
//case <-exit:
// consumer.Shutdown()
// return
//}
func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(bc.groupID),
consumer.WithNameServer([]string{bc.nameSrv}),
)
if err != nil {
panic("new push consumer error:" + err.Error())
}

selector := consumer.MessageSelector{}
err = c.Subscribe(bc.topic, selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
atomic.AddInt64(&stati.receiveMessageTotal, 1)
now := time.Now().UnixNano() / int64(time.Millisecond)
b2cRT := now - msg.BornTimestamp
atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
s2cRT := now - msg.StoreTimestamp
atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)

for {
old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
break
}
}

for {
old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
break
}
}
}
return consumer.ConsumeSuccess, nil
})

println("Start")
c.Start()
select {
case <-exit:
c.Shutdown()
return
}
}

func (c *consumer) run(args []string) {
c.flags.Parse(args)
if c.topic == "" {
func (bc *consumerBenchmark) run(args []string) {
bc.flags.Parse(args)
if bc.topic == "" {
println("empty topic")
c.usage()
bc.usage()
return
}

if c.groupPrefix == "" {
if bc.groupPrefix == "" {
println("empty group prefix")
c.usage()
bc.usage()
return
}

if c.nameSrv == "" {
if bc.nameSrv == "" {
println("empty name server")
c.usage()
bc.usage()
return
}

if c.testMinutes <= 0 {
if bc.testMinutes <= 0 {
println("test time must be positive integer")
c.usage()
bc.usage()
return
}

if c.instanceCount <= 0 {
if bc.instanceCount <= 0 {
println("thread count must be positive integer")
c.usage()
bc.usage()
return
}

c.groupID = c.groupPrefix
if c.isPrefixEnable {
c.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
bc.groupID = bc.groupPrefix
if bc.isPrefixEnable {
bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
}

stati := statiBenchmarkConsumerSnapshot{}
Expand All @@ -214,7 +218,7 @@ func (c *consumer) run(args []string) {

wg.Add(1)
go func() {
c.consumeMsg(&stati, exitChan)
bc.consumeMsg(&stati, exitChan)
wg.Done()
}()

Expand Down Expand Up @@ -253,7 +257,7 @@ func (c *consumer) run(args []string) {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
select {
case <-time.Tick(time.Minute * time.Duration(c.testMinutes)):
case <-time.Tick(time.Minute * time.Duration(bc.testMinutes)):
case <-signalChan:
}

Expand All @@ -264,6 +268,6 @@ func (c *consumer) run(args []string) {
snapshots.printStati()
}

func (c *consumer) usage() {
c.flags.Usage()
func (bc *consumerBenchmark) usage() {
bc.flags.Usage()
}
114 changes: 60 additions & 54 deletions benchmark/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
package main

import (
"context"
"flag"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -93,7 +97,7 @@ func (s *produceSnapshots) printStati() {
)
}

type producer struct {
type producerBenchmark struct {
topic string
nameSrv string
groupID string
Expand All @@ -105,8 +109,8 @@ type producer struct {
}

func init() {
p := &producer{}
flags := flag.NewFlagSet("consumer", flag.ExitOnError)
p := &producerBenchmark{}
flags := flag.NewFlagSet("producer", flag.ExitOnError)
p.flags = flags

flags.StringVar(&p.topic, "t", "", "topic name")
Expand All @@ -116,60 +120,62 @@ func init() {
flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
flags.IntVar(&p.bodySize, "s", 32, "body size")

registerCommand("consumer", p)
registerCommand("producer", p)
}

func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
//p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
// ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
//})
//if err != nil {
// fmt.Printf("new consumer error:%s\n", err)
// return
//}
//
//p.Start()
//defer p.Shutdown()

//topic, tag := bp.topic, "benchmark-consumer"
//
//AGAIN:
// select {
// case <-exit:
// return
// default:
// }

//now := time.Now()
//r, err := p.SendMessageSync(&rocketmq.Message{
// Topic: bp.topic, Body: buildMsg(bp.bodySize),
//})
//
//if err != nil {
// fmt.Printf("send message sync error:%s", err)
// goto AGAIN
//}
//
//if r.Status == rocketmq.SendOK {
// atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
// atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
// currentRT := int64(time.Since(now) / time.Millisecond)
// atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
// prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
// for currentRT > prevRT {
// if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
// break
// }
// prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
// }
// goto AGAIN
//}
//
//fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
//goto AGAIN
func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
p, err := rocketmq.NewProducer(
producer.WithNameServer([]string{bp.nameSrv}),
producer.WithRetry(2),
)

if err != nil {
fmt.Printf("new producer error: %s\n", err)
return
}

err = p.Start()

defer p.Shutdown()

topic, tag := bp.topic, "benchmark-producer"
msgStr := buildMsg(bp.bodySize)

AGAIN:
select {
case <-exit:
return
default:
}

now := time.Now()
r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))

if err != nil {
fmt.Printf("send message sync error:%s", err)
goto AGAIN
}

if r.Status == primitive.SendOK {
atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
currentRT := int64(time.Since(now) / time.Millisecond)
atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
for currentRT > prevRT {
if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
break
}
prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
}
goto AGAIN
}

fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
goto AGAIN
}

func (bp *producer) run(args []string) {
func (bp *producerBenchmark) run(args []string) {
bp.flags.Parse(args)

if bp.topic == "" {
Expand Down Expand Up @@ -266,6 +272,6 @@ func (bp *producer) run(args []string) {
fmt.Println("TEST DONE")
}

func (bp *producer) usage() {
func (bp *producerBenchmark) usage() {
bp.flags.Usage()
}