Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

自动创建Rocket topic #21

Merged
merged 10 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions v2/brokers/rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package brokers

import (
"fmt"

"github.com/gojuukaze/YTask/v2/drive"
"github.com/gojuukaze/YTask/v2/message"
"github.com/gojuukaze/YTask/v2/util/yjson"
Expand All @@ -11,31 +12,35 @@ import (

type RocketMqBroker struct {
client *drive.RocketMqClient
host string
port string
namesrvAddr []string
brokerAddr []string
auto bool
}

func NewRocketMqBroker(host, port string) RocketMqBroker {
func NewRocketMqBroker(namesrvAddr []string,brokerAddr... []string) RocketMqBroker {
/*
1、目前不能自动创建topic (mqadmin手动创建,并设置读写队列数为1)
2、rocketmq topic名称不允许存在 ‘:’ ,
(所以在生产、消费前先做了名称转换topic RocketMqClient.topicChecker 将非法字符全部转换为 ‘_’)
3、为提供pullConsumer实现,所以添加了在worker和consumer之间添加了 RocketMqClient.MsgChan
4、consumerOffset不能同步更新,所以任务执行时间更长
FIX:1、目前不能自动创建topic (mqadmin手动创建,并设置读写队列数为1)
2、consumerOffset不能同步更新,所以任务执行时间更长
(需要将队列中多余的message消费掉才能消费到当前taskId对应的消息)
5、未支持RocketMqBroker.LSend
6、rockemq日志级别设置,设置环境变量"ROCKETMQ_GO_LOG_LEVEL"="error"||"info"||“debug”||...
3、未支持RocketMqBroker.LSend
*/
var auto bool
if len(brokerAddr)>0 {
auto=true
}
return RocketMqBroker{
host: host,
port: port,
//poolSize: 0,
namesrvAddr: namesrvAddr,
brokerAddr: brokerAddr[0],
auto: auto,
}
}
func (r *RocketMqBroker) Activate() {

client := drive.NewRocketMqClient(r.host, r.port)
client := drive.NewRocketMqClient(
drive.WithNameSrvAddr(r.namesrvAddr),
drive.WithBrokerAddr(r.brokerAddr),
drive.WithAutoCreateTopic(r.auto))
r.client = &client

}

func (r *RocketMqBroker) SetPoolSize(n int) {
Expand All @@ -58,12 +63,11 @@ func (r *RocketMqBroker) Next(topic string) (message.Message, error) {
select {
case value=<-queue:

case <-time.After(2 * time.Second):
case <-time.After(5 * time.Second):
return msg,yerrors.ErrEmptyQuery{}
}

err = yjson.YJson.UnmarshalFromString(value, &msg)

return msg, err
}

Expand Down Expand Up @@ -92,28 +96,36 @@ func (r *RocketMqBroker) LSend(queueName string, msg message.Message) error {
func (r RocketMqBroker) Clone() BrokerInterface {

return &RocketMqBroker{
host: r.host,
port: r.port,

//poolSize: 0,
namesrvAddr: r.namesrvAddr,
brokerAddr: r.brokerAddr,
auto: r.auto,
}
}
//目前不做使用
func (r RocketMqBroker)Shutdown(){
for topic,producer:=range r.client.RocketMqProducerMap{
TRY1:
err:=producer.Shutdown()
if err !=nil{
fmt.Println(topic,"producer shutdown err",err)
goto TRY1
}
TRY1:
err:=r.client.Producer.Shutdown()
if err !=nil{
fmt.Println("YTask[RocketMQ]: producer shutdown err:",err)
goto TRY1
}
for topic,consumer:=range r.client.RocketMqConsumerMap{
close(r.client.MsgChanMap[topic])
for topic,consumer:=range r.client.ConsumerMap{
err:=consumer.Unsubscribe(topic)
if err!=nil {
fmt.Println("YTask[RocketMQ]: Unsubscribe err: ",err)
}
TRY2:
err:=consumer.Shutdown()
err=consumer.Shutdown()
if err !=nil{
fmt.Println(topic,"consumer shutdown err",err)
fmt.Println(topic,"YTask[RocketMQ]: consumer shutdown err: ",err)
goto TRY2
}
close(r.client.MsgChanMap[topic])

r.client.TopicDeleter(topic)
//consumer.Shutdown()方法没法及时同步,所以在异步任务结束后删除topic
//重新开启任务时创建topic,代理点位和消费点位重置为0
//不得已为之,待改善
}
r.client.Admin.Close()
}
170 changes: 116 additions & 54 deletions v2/drive/rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,137 @@ import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/producer"
"regexp"
"github.com/apache/rocketmq-client-go/v2/admin"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"regexp"
"sync"
)

type RocketMqClient struct {
addr primitive.NamesrvAddr
group string
RocketMqProducerMap map[string]rocketmq.Producer
RocketMqConsumerMap map[string]rocketmq.PushConsumer
MsgChanMap map[string]chan string
options *clientOptions
Producer rocketmq.Producer
ConsumerMap map[string]rocketmq.PushConsumer
MsgChanMap map[string]chan string
Admin admin.Admin
topicMap sync.Map

}
type clientOptions struct {
NamesrvAddr primitive.NamesrvAddr
AutoCreateTopic bool
BrokerAddr []string
auto bool
}
type ClientOption func(options *clientOptions)

func defaultAdminOptions() *clientOptions {
return &clientOptions{}
}
func WithNameSrvAddr(addr []string) ClientOption{
return func(opts *clientOptions) {
opts.NamesrvAddr=addr
}
}
func WithBrokerAddr(addr []string) ClientOption{
return func(opts *clientOptions) {
opts.BrokerAddr=addr
}
}
func WithAutoCreateTopic(auto bool) ClientOption {
return func(opts *clientOptions) {
opts.auto=auto
}
}

func NewRocketMqClient(host,port string) RocketMqClient{

func NewRocketMqClient(opts... ClientOption) RocketMqClient{
defaultOpts := defaultAdminOptions()
for _, opt := range opts {
opt(defaultOpts)
}
var adm admin.Admin
var err error
addr,err:=primitive.NewNamesrvAddr(host+":"+port)

adm, err = admin.NewAdmin(admin.WithResolver(
primitive.NewPassthroughResolver(defaultOpts.NamesrvAddr)))
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
panic("YTask: admin create error : "+err.Error())
}

input, err := rocketmq.NewProducer(
producer.WithNameServer(defaultOpts.NamesrvAddr),
//producer.WithCreateTopicKey(topic),
)
if err!=nil {
panic("YTask[RockerMQ]: Producer create error : " + err.Error())
}
err=input.Start()
if err!=nil {
panic("YTask[RockerMQ]: Producer start error : " +err.Error())
}
return RocketMqClient{
addr:addr,
RocketMqProducerMap: make(map[string]rocketmq.Producer) ,
RocketMqConsumerMap: make(map[string]rocketmq.PushConsumer),
options: defaultOpts,
Producer: input,
ConsumerMap: make(map[string]rocketmq.PushConsumer),
MsgChanMap: make(map[string]chan string),
Admin: adm,
}
}
func (c *RocketMqClient) topicCreator(topic string) {
if c.options.BrokerAddr == nil {
return
}
//create topic
for _, addr := range c.options.BrokerAddr {
err := c.Admin.CreateTopic(
context.Background(),
admin.WithTopicCreate(topic),
admin.WithBrokerAddrCreate(addr),
admin.WithReadQueueNums(1),
admin.WithWriteQueueNums(1),
admin.WithPerm(6),
)
if err != nil {
fmt.Println("YTask[RocketMQ]: create topic error:", err.Error())
}
}
}
func (c *RocketMqClient) TopicDeleter(topic string) {
//delete topic
err:=c.Admin.DeleteTopic(
context.Background(),
admin.WithTopicDelete(topic),
)
if err != nil {
fmt.Println("YTask[RocketMQ]: delete topic error:", err.Error())
}
}


func (c *RocketMqClient) topicChecker(topic string)(string) {
//rocketmq topic 只能包含%数字大小写字母及下划线和中划线
re := regexp.MustCompile("[^A-z0-9_-]")
//所以用下划线替换非法字符
return re.ReplaceAllString(topic, "_")
}
func (c *RocketMqClient) Register(topic string) (<-chan string,error){
topic=c.topicChecker(topic)


if _,ok:=c.MsgChanMap[topic];!ok{
func (c *RocketMqClient) Register(topic string)(<-chan string,error){
topic=c.topicChecker(topic)
if _,ok:=c.ConsumerMap[topic];!ok{
if _,ok:=c.topicMap.LoadOrStore(topic,1);!ok {
if c.options.auto {
c.topicCreator(topic)
}
}
c.MsgChanMap[topic]=make(chan string,0)
output,err:=rocketmq.NewPushConsumer(
consumer.WithNameServer(c.addr),
consumer.WithNameServer(c.options.NamesrvAddr),
consumer.WithGroupName(topic),
)
c.RocketMqConsumerMap[topic]=output
/*addr,_:=internal.NewNamesrv(c.addr)
options:=internal.ClientOptions{
GroupName: topic,
Namesrv: addr,
if err!=nil {
panic("YTask[RockerMQ]: Consumer create error : " + err.Error())
}
callBackChan:=make(chan interface{})
client:=internal.GetOrNewRocketMQClient(options,callBackChan)
offset:=consumer.NewRemoteOffsetStore(topic,client,addr)*/
output.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

Expand All @@ -69,40 +145,26 @@ func (c *RocketMqClient) Register(topic string) (<-chan string,error){
return consumer.ConsumeSuccess, nil
})
err=output.Start()
if err!=nil{
fmt.Println("consumer start error ",err.Error())
if err!=nil {
panic("YTask[RockerMQ]: Consumer start error : " +err.Error())
}
return c.MsgChanMap[topic],err
c.ConsumerMap[topic]=output
return c.MsgChanMap[topic],nil
}
//pull方式未实现
//ref:=reflect.ValueOf(c.rocketMqConsumer)
//method:=ref.MethodByName("Pull")
//args:=[]reflect.Value{reflect.ValueOf(context.Background()),
// reflect.ValueOf(topic),
// reflect.ValueOf(consumer.MessageSelector{}),
// reflect.ValueOf(1)}
//result:=method.Call(args)
//res,err:=result[0].Interface().((*primitive.PullResult)),result[1].Interface().(error)

return c.MsgChanMap[topic],nil
}
func (c *RocketMqClient) Publish(topic string,value interface{}, Priority uint8) error {

topic=c.topicChecker(topic)
if _,ok:=c.RocketMqProducerMap[topic];!ok{
input, err := rocketmq.NewProducer(
producer.WithNameServer(c.addr),
producer.WithCreateTopicKey(topic),
producer.WithGroupName(topic),
)
err=input.Start()
if err!=nil {
panic("YTask: rocketMq error : " + err.Error())
return err

func (c *RocketMqClient) Publish(topic string,value interface{}, Priority uint8) error {
if _,ok:=c.topicMap.LoadOrStore(topic,1);!ok {
if c.options.auto {
c.topicCreator(topic)
}
c.RocketMqProducerMap[topic]=input
}
fmt.Println("product:",string(value.([]byte)))
_, err := c.RocketMqProducerMap[topic].SendSync(context.Background(),
topic=c.topicChecker(topic)
fmt.Println("produce:",string(value.([]byte)))
_, err := c.Producer.SendSync(context.Background(),
primitive.NewMessage(topic,value.([]byte)))
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/gojuukaze/YTask/v2
go 1.12

require (
github.com/apache/rocketmq-client-go/v2 v2.0.0
github.com/apache/rocketmq-client-go/v2 v2.1.0-rc5.0.20201102074636-e1d9be806c18
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.3.3 // indirect
Expand Down
9 changes: 7 additions & 2 deletions v2/test/rockemqBroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (

func TestRocketMqBroker(t *testing.T) {

broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")

broker := brokers.NewRocketMqBroker([]string{"127.0.0.1:9876"},[]string{"127.0.0.1:10911"})

broker.Activate()
//broker.Shutdown()主要是为了关闭consumer,同步offset到broker
//BUG:会出现同步失败
Expand Down Expand Up @@ -47,7 +49,10 @@ func TestRocketMqBroker(t *testing.T) {
}

func TestRocketMqBrokerLSend(t *testing.T) {
broker := brokers.NewRocketMqBroker("127.0.0.1", "9876")
broker := brokers.NewRocketMqBroker(
[]string{"127.0.0.1:9876"},
[]string{"127.0.0.1:10911"})

broker.Activate()
defer broker.Shutdown()
msg := message.NewMessage(controller.NewTaskCtl())
Expand Down
1 change: 1 addition & 0 deletions v2/test/ytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func testWorker1(ser server.Server, t *testing.T) {
if !result.IsSuccess() {
t.Fatal("result is not success")
}

}

func testWorker2(ser server.Server, t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions v2/ytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (i iBroker) NewRabbitMqBroker(host, port, user, password, vhost string) bro
return brokers.NewRabbitMqBroker(host, port, user, password, vhost)
}

func (i iBroker) NewRocketMqBroker(host, port string) brokers.RocketMqBroker {
return brokers.NewRocketMqBroker(host, port)
func (i iBroker) NewRocketMqBroker(namesrvAddr []string,brokerAddr... []string) brokers.RocketMqBroker {
return brokers.NewRocketMqBroker(namesrvAddr,brokerAddr...)
}

type iConfig struct {
Expand Down