diff --git a/message/message.go b/message/message.go index f496536b8..410f5e3a7 100644 --- a/message/message.go +++ b/message/message.go @@ -17,8 +17,13 @@ type Payload []byte // Message is the basic transfer unit. // Messages are emitted by Publishers and received by Subscribers. +// +// A publisher can modify the message during publishing, e.g. can alter the metadata. +// Avoid modifying the message in parallel with publishing, as it can lead to data races. +// In general, a message should be passed to a single Publish and then considered immutable. +// If needed, use the Copy method to create a new message. type Message struct { - // UUID is a unique identifier of message. + // UUID is a unique identifier of the message. // // It is only used by Watermill for debugging. // UUID can be empty. @@ -35,9 +40,9 @@ type Message struct { // Payload is the message's payload. Payload Payload - // ack is closed, when acknowledge is received. + // ack is closed when acknowledge is received. ack chan struct{} - // noACk is closed, when negative acknowledge is received. + // noACk is closed when negative acknowledge is received. noAck chan struct{} ackMutex sync.Mutex diff --git a/message/pubsub.go b/message/pubsub.go index 639b05b86..258e1767e 100644 --- a/message/pubsub.go +++ b/message/pubsub.go @@ -6,32 +6,35 @@ import ( // Publisher is the emitting part of a Pub/Sub. type Publisher interface { - // Publish publishes provided messages to given topic. + // Publish publishes provided messages to the given topic. // // Publish can be synchronous or asynchronous - it depends on the implementation. // - // Most publishers implementations don't support atomic publishing of messages. + // Most publisher implementations don't support atomic publishing of messages. // This means that if publishing one of the messages fails, the next messages will not be published. // + // Publish does not work with a single Context. + // Use the Context() method of each message instead. + // // Publish must be thread safe. Publish(topic string, messages ...*Message) error - // Close should flush unsent messages, if publisher is async. + // Close should flush unsent messages if publisher is async. Close() error } // Subscriber is the consuming part of the Pub/Sub. type Subscriber interface { - // Subscribe returns output channel with messages from provided topic. - // Channel is closed, when Close() was called on the subscriber. + // Subscribe returns an output channel with messages from the provided topic. + // The channel is closed after Close() is called on the subscriber. // // To receive the next message, `Ack()` must be called on the received message. - // If message processing failed and message should be redelivered `Nack()` should be called. + // If message processing fails and the message should be redelivered `Nack()` should be called instead. // - // When provided ctx is cancelled, subscriber will close subscribe and close output channel. - // Provided ctx is set to all produced messages. - // When Nack or Ack is called on the message, context of the message is canceled. + // When the provided ctx is canceled, the subscriber closes the subscription and the output channel. + // The provided ctx is passed to all produced messages. + // When Nack or Ack is called on the message, the context of the message is canceled. Subscribe(ctx context.Context, topic string) (<-chan *Message, error) - // Close closes all subscriptions with their output channels and flush offsets etc. when needed. + // Close closes all subscriptions with their output channels and flushes offsets etc. when needed. Close() error } diff --git a/message/router_test.go b/message/router_test.go index 995c9a91f..017ad393e 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -1356,7 +1356,7 @@ func TestRouter_context_cancel_does_not_log_error(t *testing.T) { require.Eventually(t, func() bool { return r.IsClosed() - }, 1*time.Second, 1*time.Millisecond, "Router should be closed after all handlers are stopped") + }, 3*time.Second, 5*time.Millisecond, "Router should be closed after all handlers are stopped") assert.Empty(t, logger.Captured()[watermill.ErrorLogLevel], "No error should be logged when context is canceled") } diff --git a/pubsub/tests/test_pubsub.go b/pubsub/tests/test_pubsub.go index 4defb7633..809a2f65c 100644 --- a/pubsub/tests/test_pubsub.go +++ b/pubsub/tests/test_pubsub.go @@ -334,6 +334,12 @@ func TestConcurrentSubscribeMultipleTopics( for i := 0; i < topicsCount; i++ { topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("-%d", i) + var messagesToPublishForTopic []*message.Message + for _, msg := range messagesToPublish { + newMsg := msg.Copy() + messagesToPublishForTopic = append(messagesToPublishForTopic, newMsg) + } + go func() { defer subsWg.Done() @@ -344,7 +350,7 @@ func TestConcurrentSubscribeMultipleTopics( } } - err := publishWithRetry(pub, topicName, messagesToPublish...) + err := publishWithRetry(pub, topicName, messagesToPublishForTopic...) if err != nil { t.Error(err) } @@ -353,7 +359,7 @@ func TestConcurrentSubscribeMultipleTopics( if err != nil { t.Error(err) } - topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublish), defaultTimeout*5) + topicMessages, _ := bulkRead(tCtx, messages, len(messagesToPublishForTopic), defaultTimeout*5) receivedMessagesCh <- topicMessages }() @@ -1270,7 +1276,7 @@ func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher messag for i := 0; i < publishers; i++ { go func() { for msg := range publishMsg { - err := publishWithRetry(publisher, topicName, msg) + err := publishWithRetry(publisher, topicName, msg.Copy()) require.NoError(t, err, "cannot publish messages") wg.Done() }