Skip to content

Commit

Permalink
enhance: new messsage interfacefor log service
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed May 23, 2024
1 parent 5452376 commit 676b1c9
Show file tree
Hide file tree
Showing 28 changed files with 1,196 additions and 27 deletions.
15 changes: 15 additions & 0 deletions internal/mq/mqimpl/rocksmq/server/rmq_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func (rid *RmqID) Equal(msgID []byte) (bool, error) {
return rid.MessageID == rMsgID, nil
}

// LT less than
func (rid *RmqID) LT(id2 mqwrapper.MessageID) bool {
return rid.MessageID < id2.(*RmqID).MessageID
}

// LTE less than or equal to
func (rid *RmqID) LTE(id2 mqwrapper.MessageID) bool {
return rid.MessageID <= id2.(*RmqID).MessageID
}

// EQ Equal to.
func (rid *RmqID) EQ(id2 mqwrapper.MessageID) bool {
return rid.MessageID == id2.(*RmqID).MessageID
}

// SerializeRmqID is used to serialize a message ID to byte array
func SerializeRmqID(messageID int64) []byte {
b := make([]byte, 8)
Expand Down
25 changes: 25 additions & 0 deletions internal/mq/mqimpl/rocksmq/server/rmq_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,31 @@ func Test_AtEarliestPosition(t *testing.T) {
assert.False(t, rid.AtEarliestPosition())
}

func TestCompare(t *testing.T) {
rid1 := &RmqID{
MessageID: 1,
}
rid2 := &RmqID{
MessageID: 2,
}
assert.True(t, rid1.LT(rid2))
assert.True(t, rid1.LTE(rid2))
assert.False(t, rid1.EQ(rid2))
assert.False(t, rid2.LT(rid1))
assert.False(t, rid2.LTE(rid1))
assert.False(t, rid2.EQ(rid1))

rid1 = &RmqID{
MessageID: 2,
}
assert.False(t, rid1.LT(rid2))
assert.True(t, rid1.LTE(rid2))
assert.True(t, rid1.EQ(rid2))
assert.False(t, rid2.LT(rid1))
assert.True(t, rid2.LTE(rid1))
assert.True(t, rid2.EQ(rid1))
}

func TestLessOrEqualThan(t *testing.T) {
rid1 := &RmqID{
MessageID: 0,
Expand Down
49 changes: 49 additions & 0 deletions internal/proto/log.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
syntax = "proto3";

package milvus.proto.log;

option go_package = "github.com/milvus-io/milvus/internal/proto/logpb";

import "milvus.proto";
import "google/protobuf/empty.proto";

//
// Common
//

// MessageID is the unique identifier of messages in same channel.
// It's different if underlying message system is different.
message MessageID {
oneof id {
MessageIDKafka kafka = 1;
MessageIDPulsar pulsar = 2;
MessageIDRmq rmq = 3;
MessageIDNmq nmq = 4;
}
}

// MessageIDKafka is the message id like kafka.
message MessageIDKafka {
int64 Offset = 1;
}

// MessageIDRmq is the message id like rocksmq.
message MessageIDRmq {
int64 offset = 1;
}

// MessageIDNmq is the message id like natsmq.
message MessageIDNmq {
uint64 offset = 1;
}

// MessageIDPulsar is the message id like pulsar.
message MessageIDPulsar {
bytes serialized = 1;
}

// Message is the basic unit of communication between publisher and consumer.
message Message {
bytes payload = 1; // message body
map<string, string> properties = 2; // message properties
}
78 changes: 78 additions & 0 deletions internal/util/logserviceutil/message/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package message

// NewBuilder creates a new builder.
func NewBuilder() *Builder {
return &Builder{
id: nil,
payload: nil,
properties: make(propertiesImpl),
}
}

// Builder is the builder for message.
type Builder struct {
id MessageID
payload []byte
properties propertiesImpl
}

// WithMessageID creates a new builder with message id.
func (b *Builder) WithMessageID(id MessageID) *Builder {
b.id = id
return b
}

// WithMessageType creates a new builder with message type.
func (b *Builder) WithMessageType(t MessageType) *Builder {
b.properties.Set(messageTypeKey, t.marshal())
return b
}

// WithProperty creates a new builder with message property.
func (b *Builder) WithProperty(key string, val string) *Builder {
b.properties.Set(key, val)
return b
}

// WithProperties creates a new builder with message properties.
func (b *Builder) WithProperties(kvs map[string]string) *Builder {
for key, val := range kvs {
b.properties.Set(key, val)
}
return b
}

// WithPayload creates a new builder with message payload.
func (b *Builder) WithPayload(payload []byte) *Builder {
b.payload = payload
return b
}

// BuildMutable builds a mutable message.
// Panic if set the message id.
func (b *Builder) BuildMutable() MutableMessage {
if b.id != nil {
panic("build a mutable message, message id should be nil")
}
// Set message version.
b.properties.Set(messageVersion, VersionV1.String())
return &messageImpl{
payload: b.payload,
properties: b.properties,
}
}

// BuildImmutable builds a immutable message.
// Panic if not set the message id.
func (b *Builder) BuildImmutable() ImmutableMessage {
if b.id == nil {
panic("build a immutable message, message id should not be nil")
}
return &immutableMessageImpl{
id: b.id,
messageImpl: messageImpl{
payload: b.payload,
properties: b.properties,
},
}
}
98 changes: 98 additions & 0 deletions internal/util/logserviceutil/message/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package message

import (
"github.com/milvus-io/milvus/internal/proto/logpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
)

var (
_ BasicMessage = (*messageImpl)(nil)
_ MutableMessage = (*messageImpl)(nil)
_ ImmutableMessage = (*immutableMessageImpl)(nil)
)

// NewMQProducerMessageFromMutableMessage creates a new mq producer message from mutable message.
// TODO: remove in the future.
func NewMQProducerMessageFromMutableMessage(m MutableMessage) *mqwrapper.ProducerMessage {
return &mqwrapper.ProducerMessage{
Payload: m.Payload(),
Properties: m.Properties().ToRawMap(),
}
}

// NewImmutableMessageFromMQConsumedMessage creates a new immutable message from mq producer message.
// TODO: remove in the future.
func NewImmutableMessageFromMQConsumedMessage(msg mqwrapper.Message) ImmutableMessage {
return &immutableMessageImpl{
id: msg.ID(),
messageImpl: messageImpl{
payload: msg.Payload(),
properties: propertiesImpl(msg.Properties()),
},

Check warning on line 31 in internal/util/logserviceutil/message/message.go

View check run for this annotation

Codecov / codecov/patch

internal/util/logserviceutil/message/message.go#L25-L31

Added lines #L25 - L31 were not covered by tests
}
}

// NewMessageFromPBMessage creates a new message from pb message.
func NewMessageFromPBMessage(msg *logpb.Message) MutableMessage {
return &messageImpl{
payload: msg.Payload,
properties: propertiesImpl(msg.Properties),
}
}

// NewImmutableMessageFromPBMessage creates a new immutable message from pb message.
func NewImmutableMessageFromPBMessage(id *logpb.MessageID, msg *logpb.Message) ImmutableMessage {
return &immutableMessageImpl{
id: NewMessageIDFromPBMessageID(id),
messageImpl: messageImpl{
payload: msg.Payload,
properties: propertiesImpl(msg.Properties),
},
}
}

// BasicMessage is the basic interface of message.
type BasicMessage interface {
// MessageType returns the type of message.
MessageType() MessageType

// Message payload.
Payload() []byte

// EstimateSize returns the estimated size of message.
EstimateSize() int
}

// MutableMessage is the mutable message interface.
// Message can be modified before it is persistent by wal.
type MutableMessage interface {
BasicMessage

WithTimeTick(tt uint64) MutableMessage

// Properties returns the message properties.
Properties() Properties
}

// ImmutableMessage is the read-only message interface.
// Once a message is persistent by wal, it will be immutable.
// And the message id will be assigned.
type ImmutableMessage interface {
BasicMessage

// TimeTick returns the time tick of current message.
// Available only when the message's version greater than 0.
// Otherwise, it will panic.
TimeTick() uint64

// MessageID returns the message id.
MessageID() MessageID

// Properties returns the message read only properties.
Properties() RProperties

// Version returns the message format version.
// 0: old version before lognode.
// from 1: new version after lognode.
Version() Version
}
34 changes: 34 additions & 0 deletions internal/util/logserviceutil/message/message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package message

// Handler is used to handle message read from log.
type Handler interface {
// Handle is the callback for handling message.
Handle(msg ImmutableMessage)

// Close is called after all messages are handled or handling is interrupted.
Close()
}

var _ Handler = ChanMessageHandler(nil)

// ChanMessageHandler is a handler just forward the message into a channel.
type ChanMessageHandler chan ImmutableMessage

// Handle is the callback for handling message.
func (cmh ChanMessageHandler) Handle(msg ImmutableMessage) {
cmh <- msg
}

// Close is called after all messages are handled or handling is interrupted.
func (cmh ChanMessageHandler) Close() {
close(cmh)
}

// NopCloseHandler is a handler that do nothing when close.
type NopCloseHandler struct {
Handler
}

// Close is called after all messages are handled or handling is interrupted.
func (nch NopCloseHandler) Close() {
}
30 changes: 30 additions & 0 deletions internal/util/logserviceutil/message/message_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package message

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMessageHandler(t *testing.T) {
ch := make(chan ImmutableMessage, 100)
h := ChanMessageHandler(ch)
h.Handle(nil)
assert.Nil(t, <-ch)
h.Close()
_, ok := <-ch
assert.False(t, ok)

ch = make(chan ImmutableMessage, 100)
hNop := NopCloseHandler{
Handler: ChanMessageHandler(ch),
}
hNop.Handle(nil)
assert.Nil(t, <-ch)
hNop.Close()
select {
case <-ch:
panic("should not be closed")
default:
}
}
Loading

0 comments on commit 676b1c9

Please sign in to comment.