Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature]: Support RocketMQ Option After Streaming Service Finished #33962

Open
1 task done
CocaineCong opened this issue Jun 18, 2024 · 7 comments
Open
1 task done
Assignees
Labels
kind/feature Issues related to feature request from users

Comments

@CocaineCong
Copy link

Is there an existing issue for this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe.

Support for more types of MQ, more diversity

Describe the solution you'd like.

  • Provides Producer send msg , with option such as transaction、sync、async
  • Provides PushConsumer & PullConsumer
  • Provides RocketMQ deployment strategy & unit test

Describe an alternate solution.

No response

Anything else? (Additional Context)


title: support rocketmq in milvus

authors: @CocaineCong

creation-date: 2024-06-19


Support RocketMQ Option & RocketMQ API Controller

Milvus currently supports message queue (MQ) options for tasks such as asynchronous data processing, real-time data synchronization, and event-driven architectures. However, the existing MQ options are limited, and users may require additional flexibility and compatibility with different messaging systems.

image

Summary

This proposal suggests extending the message queue options in Milvus to include support for RocketMQ, a distributed messaging and streaming platform developed by the Apache Software Foundation.

RocketMQ provides features like high throughput, fault tolerance, scalability, and strong consistency, making it a popular choice for real-time messaging and event-driven applications.

Motivation

Goals

  • Provides Producer send msg , with option such as transaction、sync、async
  • Provides PushConsumer & PullConsumer
  • Provides RocketMQ deployment strategy & unit test

Preliminary Design

In total, we need to implement the interfaces in these files.

image
  • client 模块
// Client is the interface that provides operations of message queues
type Client interface {
	// CreateProducer creates a producer instance
	CreateProducer(options ProducerOptions) (Producer, error)
	// Subscribe creates a consumer instance and subscribe a topic
	Subscribe(options ConsumerOptions) (Consumer, error)
	// Get the earliest MessageID
	EarliestMessageID() MessageID
	// String to msg ID
	StringToMsgID(string) (MessageID, error)
	// Deserialize MessageId from a byte array
	BytesToMsgID([]byte) (MessageID, error)
	// Close the client and free associated resources
	Close()
}
  • Consumer
type Consumer interface {
	// returns the subscription for the consumer
	Subscription() string
	// Get Message channel, once you chan you can not seek again
	Chan() <-chan Message
	// Seek to the uniqueID position, the second bool param indicates whether the message is included in the position
	Seek(MessageID, bool) error //nolint:govet
	// Ack make sure that msg is received
	Ack(Message)
	// Close consumer
	Close()
	// GetLatestMsgID return the latest message ID
	GetLatestMsgID() (MessageID, error)
	// check created topic whether vaild or not
	CheckTopicValid(channel string) error
}
  • message id
type MessageID interface {
	// Serialize the message id into a sequence of bytes that can be stored somewhere else
	Serialize() []byte
	AtEarliestPosition() bool
	LessOrEqualThan(msgID []byte) (bool, error)
	Equal(msgID []byte) (bool, error)
}
  • message
type Message interface {
	// Topic get the topic from which this message originated from
	Topic() string
	// Properties are application defined key/value pairs that will be attached to the message.
	// Return the properties attached to the message.
	Properties() map[string]string
	// Payload get the payload of the message
	Payload() []byte
	// ID get the unique message ID associated with this message.
	// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
	ID() MessageID
}
  • producer
type Producer interface {
	// return the topic which producer is publishing to
	// Topic() string
	// publish a message
	Send(ctx context.Context, message *ProducerMessage) (MessageID, error)
	Close()
}

Design Details

later…

@CocaineCong CocaineCong added the kind/feature Issues related to feature request from users label Jun 18, 2024
@CocaineCong
Copy link
Author

If this proposal passes, please assign it to me.🫡

@xiaofan-luan
Copy link
Collaborator

@chyezh
could you helo on reviewing it?

@chyezh
Copy link
Contributor

chyezh commented Jun 19, 2024

@CocaineCong
In Milvus 2.4.5, we will introduce wal service instead of original mqwrapper interface.
new design will be given at #33285.

And we want a more simple interface of wal instead of mqwrapper.
Read + Append instead of Produce and Consume.

Recently, the interface definition PR #33745 will be merged.
You can see and review the definition in README, #33745 (comment).
And then I will give a new PR as an example to reimplement pulsar mq into wal.

Could you help to modify rocketMQ proposal after those PR merged?
rocketMQ can be implement as the wal interface directly, and to be published at v2.4.5.

@chyezh
Copy link
Contributor

chyezh commented Jun 19, 2024

we need implement the following interface:

  • message.MessageID
  • walimpls.OpenerBuilderImpls
  • walimpls.OpenerImpls
  • walimpls.ScannerImpls
  • walimpls.WALImpls

@CocaineCong
Copy link
Author

@CocaineCong In Milvus 2.4.5, we will introduce wal service instead of original mqwrapper interface. new design will be given at #33285.

And we want a more simple interface of wal instead of mqwrapper. Read + Append instead of Produce and Consume.

Recently, the interface definition PR #33745 will be merged. You can see and review the definition in README, #33745 (comment). And then I will give a new PR as an example to reimplement pulsar mq into wal.

Could you help to modify rocketMQ proposal after those PR merged? rocketMQ can be implement as the wal interface directly, and to be published at v2.4.5.

ok, I got it. anything need my help just let me know.

@CocaineCong CocaineCong changed the title [Feature]: Support RocketMQ Option [Feature]: Support RocketMQ Option After Streaming Service Finished Jun 19, 2024
@chyezh chyezh self-assigned this Jun 19, 2024
@chyezh
Copy link
Contributor

chyezh commented Jun 27, 2024

related PR is ready at master branch: #34046

You can design and implement wal services based on RocketMQ now.
pulsar implementation is here, pkg/streaming/walimpls/impls/pulsar/.

If the PR pass the walimpls.NewWALImplsTestFramework unit test, we can start to review it. And please link your PR with issue #33285.

Moreover, ScannerImpls interface should be implemented as a scanner without any server-side persistent state, such as Pulsar's Reader interface to avoid resource leak. If RocketMQ does not support this, please let me know. I will design scanner's garbage collection plan in future.

/assign @CocaineCong

@CocaineCong
Copy link
Author

related PR is ready at master branch: #34046

You can design and implement wal services based on RocketMQ now. pulsar implementation is here, pkg/streaming/walimpls/impls/pulsar/.

If the PR pass the walimpls.NewWALImplsTestFramework unit test, we can start to review it. And please link your PR with issue #33285.

Moreover, ScannerImpls interface should be implemented as a scanner without any server-side persistent state, such as Pulsar's Reader interface to avoid resource leak. If RocketMQ does not support this, please let me know. I will design scanner's garbage collection plan in future.

/assign @CocaineCong

got it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/feature Issues related to feature request from users
Projects
None yet
Development

No branches or pull requests

3 participants