From 91b74073917e063e8818dc571c129eb4cd65a148 Mon Sep 17 00:00:00 2001 From: bruce Date: Fri, 25 Oct 2024 11:31:47 +0800 Subject: [PATCH] feat: MessageClient can Pub/Sub binary data MessageClient can Pub/Sub binary data without MessageEnvelope wrapped. Signed-off-by: bruce --- internal/pkg/mqtt/client.go | 84 +++++++++++++++++++++++--------- internal/pkg/nats/client.go | 8 +++ internal/pkg/noopclient.go | 8 +++ internal/pkg/redis/client.go | 7 +++ messaging/interface.go | 6 +++ messaging/mocks/MessageClient.go | 76 ++++++++++++++++++++++++++--- 6 files changed, 158 insertions(+), 31 deletions(-) diff --git a/internal/pkg/mqtt/client.go b/internal/pkg/mqtt/client.go index 65eb20b..3574bb6 100644 --- a/internal/pkg/mqtt/client.go +++ b/internal/pkg/mqtt/client.go @@ -39,6 +39,9 @@ type MessageMarshaller func(v interface{}) ([]byte, error) // MessageUnmarshaller defines the function signature for unmarshaling []byte into structs. type MessageUnmarshaller func(data []byte, v interface{}) error +type MessageHandlerCreator func(unmarshaler MessageUnmarshaller, + messageChannel chan<- types.MessageEnvelope, errorChannel chan<- error) pahoMqtt.MessageHandler + // Client facilitates communication to an MQTT server and provides functionality needed to send and receive MQTT // messages. type Client struct { @@ -159,30 +162,7 @@ func (mc *Client) Publish(message types.MessageEnvelope, topic string) error { // Subscribe creates a subscription for the specified topics. func (mc *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error { - optionsReader := mc.mqttClient.OptionsReader() - - mc.subscriptionMutex.Lock() - defer mc.subscriptionMutex.Unlock() - - for _, topic := range topics { - handler := newMessageHandler(mc.unmarshaller, topic.Messages, messageErrors) - qos := optionsReader.WillQos() - - token := mc.mqttClient.Subscribe(topic.Topic, qos, handler) - err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription") - if err != nil { - return err - } - - mc.existingSubscriptions[topic.Topic] = existingSubscription{ - topic: topic.Topic, - qos: qos, - handler: handler, - errors: messageErrors, - } - } - - return nil + return mc.subscribe(topics, messageErrors, newMessageHandler) } // Request publishes a request and waits for a response @@ -342,3 +322,59 @@ func createClientOptions( return clientOptions, nil } + +func (mc *Client) PublishBinaryData(data []byte, topic string) error { + optionsReader := mc.mqttClient.OptionsReader() + return getTokenError( + mc.mqttClient.Publish( + topic, + optionsReader.WillQos(), + optionsReader.WillRetained(), + data), + optionsReader.ConnectTimeout(), + PublishOperation, + "Unable to publish message") +} + +func (mc *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error { + return mc.subscribe(topics, messageErrors, newBinaryDataMessageHandler) +} + +// newBinaryDataMessageHandler creates a function which propagates the received messages to the proper channel. +func newBinaryDataMessageHandler(_ MessageUnmarshaller, + messageChannel chan<- types.MessageEnvelope, + _ chan<- error) pahoMqtt.MessageHandler { + return func(client pahoMqtt.Client, message pahoMqtt.Message) { + // Use MessageEnvelope.Payload to store the binary data instead of unmarshalling binary to MessageEnvelope + messageEnvelope := types.NewMessageEnvelopeForRequest(message.Payload(), nil) + messageEnvelope.ReceivedTopic = message.Topic() + messageChannel <- messageEnvelope + } +} + +func (mc *Client) subscribe(topics []types.TopicChannel, messageErrors chan error, messageHandlerCreator MessageHandlerCreator) error { + optionsReader := mc.mqttClient.OptionsReader() + + mc.subscriptionMutex.Lock() + defer mc.subscriptionMutex.Unlock() + + for _, topic := range topics { + handler := messageHandlerCreator(mc.unmarshaller, topic.Messages, messageErrors) + qos := optionsReader.WillQos() + + token := mc.mqttClient.Subscribe(topic.Topic, qos, handler) + err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription") + if err != nil { + return err + } + + mc.existingSubscriptions[topic.Topic] = existingSubscription{ + topic: topic.Topic, + qos: qos, + handler: handler, + errors: messageErrors, + } + } + + return nil +} diff --git a/internal/pkg/nats/client.go b/internal/pkg/nats/client.go index 6a18de8..36d482b 100644 --- a/internal/pkg/nats/client.go +++ b/internal/pkg/nats/client.go @@ -223,3 +223,11 @@ func (c *Client) Disconnect() error { } return c.connection.Drain() } + +func (c *Client) PublishBinaryData(data []byte, topic string) error { + return fmt.Errorf("not supported PublishBinaryData func") +} + +func (c *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error { + return fmt.Errorf("not supported SubscribeBinaryData func") +} diff --git a/internal/pkg/noopclient.go b/internal/pkg/noopclient.go index 31b5151..fa96e7b 100644 --- a/internal/pkg/noopclient.go +++ b/internal/pkg/noopclient.go @@ -18,6 +18,7 @@ package pkg import ( + "fmt" "time" "github.com/edgexfoundry/go-mod-messaging/v4/pkg/types" @@ -48,3 +49,10 @@ func (n NoopClient) Subscribe(topics []types.TopicChannel, messageErrors chan er func (n NoopClient) Disconnect() error { panic("implement me") } + +func (n NoopClient) PublishBinaryData(data []byte, topic string) error { + return fmt.Errorf("not supported PublishBinaryData func") +} +func (n NoopClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error { + return fmt.Errorf("not supported SubscribeBinaryData func") +} diff --git a/internal/pkg/redis/client.go b/internal/pkg/redis/client.go index c8c348b..b40ecd9 100644 --- a/internal/pkg/redis/client.go +++ b/internal/pkg/redis/client.go @@ -311,3 +311,10 @@ func convertFromRedisTopicScheme(topic string) string { return topic } + +func (c Client) PublishBinaryData(data []byte, topic string) error { + return fmt.Errorf("not supported PublishBinaryData func") +} +func (c Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error { + return fmt.Errorf("not supported SubscribeBinaryData func") +} diff --git a/messaging/interface.go b/messaging/interface.go index e8d59ea..31092d9 100644 --- a/messaging/interface.go +++ b/messaging/interface.go @@ -47,6 +47,12 @@ type MessageClient interface { // the timeout period, a timed out error returned. Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error) + // PublishBinaryData sends binary data to the message bus + PublishBinaryData(data []byte, topic string) error + + // SubscribeBinaryData receives binary data from the specified topic, and wrap it in MessageEnvelope. + SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error + // Unsubscribe to unsubscribe from the specified topics. Unsubscribe(topics ...string) error diff --git a/messaging/mocks/MessageClient.go b/messaging/mocks/MessageClient.go index 8fe72b3..1373986 100644 --- a/messaging/mocks/MessageClient.go +++ b/messaging/mocks/MessageClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.46.3. DO NOT EDIT. package mocks @@ -19,6 +19,10 @@ type MessageClient struct { func (_m *MessageClient) Connect() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Connect") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -33,6 +37,10 @@ func (_m *MessageClient) Connect() error { func (_m *MessageClient) Disconnect() error { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Disconnect") + } + var r0 error if rf, ok := ret.Get(0).(func() error); ok { r0 = rf() @@ -47,6 +55,10 @@ func (_m *MessageClient) Disconnect() error { func (_m *MessageClient) Publish(message types.MessageEnvelope, topic string) error { ret := _m.Called(message, topic) + if len(ret) == 0 { + panic("no return value specified for Publish") + } + var r0 error if rf, ok := ret.Get(0).(func(types.MessageEnvelope, string) error); ok { r0 = rf(message, topic) @@ -57,11 +69,37 @@ func (_m *MessageClient) Publish(message types.MessageEnvelope, topic string) er return r0 } +// PublishBinaryData provides a mock function with given fields: data, topic +func (_m *MessageClient) PublishBinaryData(data []byte, topic string) error { + ret := _m.Called(data, topic) + + if len(ret) == 0 { + panic("no return value specified for PublishBinaryData") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]byte, string) error); ok { + r0 = rf(data, topic) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Request provides a mock function with given fields: message, requestTopic, responseTopicPrefix, timeout func (_m *MessageClient) Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error) { ret := _m.Called(message, requestTopic, responseTopicPrefix, timeout) + if len(ret) == 0 { + panic("no return value specified for Request") + } + var r0 *types.MessageEnvelope + var r1 error + if rf, ok := ret.Get(0).(func(types.MessageEnvelope, string, string, time.Duration) (*types.MessageEnvelope, error)); ok { + return rf(message, requestTopic, responseTopicPrefix, timeout) + } if rf, ok := ret.Get(0).(func(types.MessageEnvelope, string, string, time.Duration) *types.MessageEnvelope); ok { r0 = rf(message, requestTopic, responseTopicPrefix, timeout) } else { @@ -70,7 +108,6 @@ func (_m *MessageClient) Request(message types.MessageEnvelope, requestTopic str } } - var r1 error if rf, ok := ret.Get(1).(func(types.MessageEnvelope, string, string, time.Duration) error); ok { r1 = rf(message, requestTopic, responseTopicPrefix, timeout) } else { @@ -84,6 +121,28 @@ func (_m *MessageClient) Request(message types.MessageEnvelope, requestTopic str func (_m *MessageClient) Subscribe(topics []types.TopicChannel, messageErrors chan error) error { ret := _m.Called(topics, messageErrors) + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]types.TopicChannel, chan error) error); ok { + r0 = rf(topics, messageErrors) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SubscribeBinaryData provides a mock function with given fields: topics, messageErrors +func (_m *MessageClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error { + ret := _m.Called(topics, messageErrors) + + if len(ret) == 0 { + panic("no return value specified for SubscribeBinaryData") + } + var r0 error if rf, ok := ret.Get(0).(func([]types.TopicChannel, chan error) error); ok { r0 = rf(topics, messageErrors) @@ -104,6 +163,10 @@ func (_m *MessageClient) Unsubscribe(topics ...string) error { _ca = append(_ca, _va...) ret := _m.Called(_ca...) + if len(ret) == 0 { + panic("no return value specified for Unsubscribe") + } + var r0 error if rf, ok := ret.Get(0).(func(...string) error); ok { r0 = rf(topics...) @@ -114,13 +177,12 @@ func (_m *MessageClient) Unsubscribe(topics ...string) error { return r0 } -type mockConstructorTestingTNewMessageClient interface { +// NewMessageClient creates a new instance of MessageClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMessageClient(t interface { mock.TestingT Cleanup(func()) -} - -// NewMessageClient creates a new instance of MessageClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMessageClient(t mockConstructorTestingTNewMessageClient) *MessageClient { +}) *MessageClient { mock := &MessageClient{} mock.Mock.Test(t)