Skip to content

Commit

Permalink
feat: MessageClient can Pub/Sub binary data
Browse files Browse the repository at this point in the history
MessageClient can Pub/Sub binary data without MessageEnvelope wrapped.

Signed-off-by: bruce <[email protected]>
  • Loading branch information
weichou1229 committed Oct 28, 2024
1 parent 2868ca7 commit 91b7407
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 31 deletions.
84 changes: 60 additions & 24 deletions internal/pkg/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type MessageMarshaller func(v interface{}) ([]byte, error)
// MessageUnmarshaller defines the function signature for unmarshaling []byte into structs.
type MessageUnmarshaller func(data []byte, v interface{}) error

type MessageHandlerCreator func(unmarshaler MessageUnmarshaller,
messageChannel chan<- types.MessageEnvelope, errorChannel chan<- error) pahoMqtt.MessageHandler

// Client facilitates communication to an MQTT server and provides functionality needed to send and receive MQTT
// messages.
type Client struct {
Expand Down Expand Up @@ -159,30 +162,7 @@ func (mc *Client) Publish(message types.MessageEnvelope, topic string) error {

// Subscribe creates a subscription for the specified topics.
func (mc *Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error {
optionsReader := mc.mqttClient.OptionsReader()

mc.subscriptionMutex.Lock()
defer mc.subscriptionMutex.Unlock()

for _, topic := range topics {
handler := newMessageHandler(mc.unmarshaller, topic.Messages, messageErrors)
qos := optionsReader.WillQos()

token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}

mc.existingSubscriptions[topic.Topic] = existingSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}

return nil
return mc.subscribe(topics, messageErrors, newMessageHandler)
}

// Request publishes a request and waits for a response
Expand Down Expand Up @@ -342,3 +322,59 @@ func createClientOptions(

return clientOptions, nil
}

func (mc *Client) PublishBinaryData(data []byte, topic string) error {
optionsReader := mc.mqttClient.OptionsReader()
return getTokenError(
mc.mqttClient.Publish(
topic,
optionsReader.WillQos(),
optionsReader.WillRetained(),
data),
optionsReader.ConnectTimeout(),
PublishOperation,
"Unable to publish message")
}

func (mc *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return mc.subscribe(topics, messageErrors, newBinaryDataMessageHandler)
}

// newBinaryDataMessageHandler creates a function which propagates the received messages to the proper channel.
func newBinaryDataMessageHandler(_ MessageUnmarshaller,
messageChannel chan<- types.MessageEnvelope,
_ chan<- error) pahoMqtt.MessageHandler {
return func(client pahoMqtt.Client, message pahoMqtt.Message) {
// Use MessageEnvelope.Payload to store the binary data instead of unmarshalling binary to MessageEnvelope
messageEnvelope := types.NewMessageEnvelopeForRequest(message.Payload(), nil)
messageEnvelope.ReceivedTopic = message.Topic()
messageChannel <- messageEnvelope
}
}

func (mc *Client) subscribe(topics []types.TopicChannel, messageErrors chan error, messageHandlerCreator MessageHandlerCreator) error {
optionsReader := mc.mqttClient.OptionsReader()

mc.subscriptionMutex.Lock()
defer mc.subscriptionMutex.Unlock()

for _, topic := range topics {
handler := messageHandlerCreator(mc.unmarshaller, topic.Messages, messageErrors)
qos := optionsReader.WillQos()

token := mc.mqttClient.Subscribe(topic.Topic, qos, handler)
err := getTokenError(token, optionsReader.ConnectTimeout(), SubscribeOperation, "Failed to create subscription")
if err != nil {
return err
}

mc.existingSubscriptions[topic.Topic] = existingSubscription{
topic: topic.Topic,
qos: qos,
handler: handler,
errors: messageErrors,
}
}

return nil
}
8 changes: 8 additions & 0 deletions internal/pkg/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,11 @@ func (c *Client) Disconnect() error {
}
return c.connection.Drain()
}

func (c *Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}

func (c *Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
8 changes: 8 additions & 0 deletions internal/pkg/noopclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package pkg

import (
"fmt"
"time"

"github.com/edgexfoundry/go-mod-messaging/v4/pkg/types"
Expand Down Expand Up @@ -48,3 +49,10 @@ func (n NoopClient) Subscribe(topics []types.TopicChannel, messageErrors chan er
func (n NoopClient) Disconnect() error {
panic("implement me")
}

func (n NoopClient) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (n NoopClient) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
7 changes: 7 additions & 0 deletions internal/pkg/redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,10 @@ func convertFromRedisTopicScheme(topic string) string {

return topic
}

func (c Client) PublishBinaryData(data []byte, topic string) error {
return fmt.Errorf("not supported PublishBinaryData func")
}
func (c Client) SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error {
return fmt.Errorf("not supported SubscribeBinaryData func")
}
6 changes: 6 additions & 0 deletions messaging/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type MessageClient interface {
// the timeout period, a timed out error returned.
Request(message types.MessageEnvelope, requestTopic string, responseTopicPrefix string, timeout time.Duration) (*types.MessageEnvelope, error)

// PublishBinaryData sends binary data to the message bus
PublishBinaryData(data []byte, topic string) error

// SubscribeBinaryData receives binary data from the specified topic, and wrap it in MessageEnvelope.
SubscribeBinaryData(topics []types.TopicChannel, messageErrors chan error) error

// Unsubscribe to unsubscribe from the specified topics.
Unsubscribe(topics ...string) error

Expand Down
76 changes: 69 additions & 7 deletions messaging/mocks/MessageClient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 91b7407

Please sign in to comment.