diff --git a/benchmark/consumer.go b/benchmark/consumer.go index 941713cf..cada9333 100644 --- a/benchmark/consumer.go +++ b/benchmark/consumer.go @@ -18,8 +18,12 @@ package main import ( + "context" "flag" "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" "os" "os/signal" "sync" @@ -91,7 +95,7 @@ func (s *consumeSnapshots) printStati() { ) } -type consumer struct { +type consumerBenchmark struct { topic string groupPrefix string nameSrv string @@ -107,7 +111,7 @@ type consumer struct { } func init() { - c := &consumer{} + c := &consumerBenchmark{} flags := flag.NewFlagSet("consumer", flag.ExitOnError) c.flags = flags @@ -123,87 +127,87 @@ func init() { registerCommand("consumer", c) } -func (c *consumer) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) { - //consumer, err := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{ - // ClientConfig: rocketmq.ClientConfig{ - // GroupID: c.groupID, - // NameServer: c.nameSrv, - // }, - // ThreadCount: c.instanceCount, - // MessageBatchMaxSize: 16, - //}) - //if err != nil { - // panic("new push consumer error:" + err.Error()) - //} - // - //consumer.Subscribe(c.topic, c.expression, func(m *rocketmq.MessageExt) rocketmq.ConsumeStatus { - // atomic.AddInt64(&stati.receiveMessageTotal, 1) - // now := time.Now().UnixNano() / int64(time.Millisecond) - // b2cRT := now - m.BornTimestamp - // atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT) - // s2cRT := now - m.StoreTimestamp - // atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT) - // - // for { - // old := atomic.LoadInt64(&stati.born2ConsumerMaxRT) - // if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) { - // break - // } - // } - // - // for { - // old := atomic.LoadInt64(&stati.store2ConsumerMaxRT) - // if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) { - // break - // } - // } - // - // return rocketmq.ConsumeSuccess - //}) - //println("Start") - //consumer.Start() - //select { - //case <-exit: - // consumer.Shutdown() - // return - //} +func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) { + c, err := rocketmq.NewPushConsumer( + consumer.WithGroupName(bc.groupID), + consumer.WithNameServer([]string{bc.nameSrv}), + ) + if err != nil { + panic("new push consumer error:" + err.Error()) + } + + selector := consumer.MessageSelector{} + err = c.Subscribe(bc.topic, selector, func(ctx context.Context, + msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for _, msg := range msgs { + atomic.AddInt64(&stati.receiveMessageTotal, 1) + now := time.Now().UnixNano() / int64(time.Millisecond) + b2cRT := now - msg.BornTimestamp + atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT) + s2cRT := now - msg.StoreTimestamp + atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT) + + for { + old := atomic.LoadInt64(&stati.born2ConsumerMaxRT) + if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) { + break + } + } + + for { + old := atomic.LoadInt64(&stati.store2ConsumerMaxRT) + if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) { + break + } + } + } + return consumer.ConsumeSuccess, nil + }) + + println("Start") + c.Start() + select { + case <-exit: + c.Shutdown() + return + } } -func (c *consumer) run(args []string) { - c.flags.Parse(args) - if c.topic == "" { +func (bc *consumerBenchmark) run(args []string) { + bc.flags.Parse(args) + if bc.topic == "" { println("empty topic") - c.usage() + bc.usage() return } - if c.groupPrefix == "" { + if bc.groupPrefix == "" { println("empty group prefix") - c.usage() + bc.usage() return } - if c.nameSrv == "" { + if bc.nameSrv == "" { println("empty name server") - c.usage() + bc.usage() return } - if c.testMinutes <= 0 { + if bc.testMinutes <= 0 { println("test time must be positive integer") - c.usage() + bc.usage() return } - if c.instanceCount <= 0 { + if bc.instanceCount <= 0 { println("thread count must be positive integer") - c.usage() + bc.usage() return } - c.groupID = c.groupPrefix - if c.isPrefixEnable { - c.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100) + bc.groupID = bc.groupPrefix + if bc.isPrefixEnable { + bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100) } stati := statiBenchmarkConsumerSnapshot{} @@ -214,7 +218,7 @@ func (c *consumer) run(args []string) { wg.Add(1) go func() { - c.consumeMsg(&stati, exitChan) + bc.consumeMsg(&stati, exitChan) wg.Done() }() @@ -253,7 +257,7 @@ func (c *consumer) run(args []string) { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) select { - case <-time.Tick(time.Minute * time.Duration(c.testMinutes)): + case <-time.Tick(time.Minute * time.Duration(bc.testMinutes)): case <-signalChan: } @@ -264,6 +268,6 @@ func (c *consumer) run(args []string) { snapshots.printStati() } -func (c *consumer) usage() { - c.flags.Usage() +func (bc *consumerBenchmark) usage() { + bc.flags.Usage() } diff --git a/benchmark/producer.go b/benchmark/producer.go index f3c1c608..537ffbe3 100644 --- a/benchmark/producer.go +++ b/benchmark/producer.go @@ -18,8 +18,12 @@ package main import ( + "context" "flag" "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" "os" "os/signal" "sync" @@ -93,7 +97,7 @@ func (s *produceSnapshots) printStati() { ) } -type producer struct { +type producerBenchmark struct { topic string nameSrv string groupID string @@ -105,8 +109,8 @@ type producer struct { } func init() { - p := &producer{} - flags := flag.NewFlagSet("consumer", flag.ExitOnError) + p := &producerBenchmark{} + flags := flag.NewFlagSet("producer", flag.ExitOnError) p.flags = flags flags.StringVar(&p.topic, "t", "", "topic name") @@ -116,60 +120,62 @@ func init() { flags.IntVar(&p.testMinutes, "m", 10, "test minutes") flags.IntVar(&p.bodySize, "s", 32, "body size") - registerCommand("consumer", p) + registerCommand("producer", p) } -func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) { - //p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{ - // ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv}, - //}) - //if err != nil { - // fmt.Printf("new consumer error:%s\n", err) - // return - //} - // - //p.Start() - //defer p.Shutdown() - - //topic, tag := bp.topic, "benchmark-consumer" - // - //AGAIN: - // select { - // case <-exit: - // return - // default: - // } - - //now := time.Now() - //r, err := p.SendMessageSync(&rocketmq.Message{ - // Topic: bp.topic, Body: buildMsg(bp.bodySize), - //}) - // - //if err != nil { - // fmt.Printf("send message sync error:%s", err) - // goto AGAIN - //} - // - //if r.Status == rocketmq.SendOK { - // atomic.AddInt64(&stati.receiveResponseSuccessCount, 1) - // atomic.AddInt64(&stati.sendRequestSuccessCount, 1) - // currentRT := int64(time.Since(now) / time.Millisecond) - // atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT) - // prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT) - // for currentRT > prevRT { - // if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) { - // break - // } - // prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT) - // } - // goto AGAIN - //} - // - //fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error()) - //goto AGAIN +func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) { + p, err := rocketmq.NewProducer( + producer.WithNameServer([]string{bp.nameSrv}), + producer.WithRetry(2), + ) + + if err != nil { + fmt.Printf("new producer error: %s\n", err) + return + } + + err = p.Start() + + defer p.Shutdown() + + topic, tag := bp.topic, "benchmark-producer" + msgStr := buildMsg(bp.bodySize) + +AGAIN: + select { + case <-exit: + return + default: + } + + now := time.Now() + r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr))) + + if err != nil { + fmt.Printf("send message sync error:%s", err) + goto AGAIN + } + + if r.Status == primitive.SendOK { + atomic.AddInt64(&stati.receiveResponseSuccessCount, 1) + atomic.AddInt64(&stati.sendRequestSuccessCount, 1) + currentRT := int64(time.Since(now) / time.Millisecond) + atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT) + prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT) + for currentRT > prevRT { + if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) { + break + } + prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT) + } + goto AGAIN + } + + fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error()) + goto AGAIN } -func (bp *producer) run(args []string) { +func (bp *producerBenchmark) run(args []string) { bp.flags.Parse(args) if bp.topic == "" { @@ -266,6 +272,6 @@ func (bp *producer) run(args []string) { fmt.Println("TEST DONE") } -func (bp *producer) usage() { +func (bp *producerBenchmark) usage() { bp.flags.Usage() }