From eda0b07371bf9b47a4c12f114a3e4f09e5151947 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 24 Nov 2020 16:53:59 +0800 Subject: [PATCH 1/6] add key based batcher --- pulsar/batcher_builder.go | 41 +++++ pulsar/consumer_test.go | 154 ++++++++++++++++ pulsar/internal/batch_builder.go | 69 ++++++-- pulsar/internal/batch_builder_test.go | 18 ++ pulsar/internal/commands_test.go | 12 +- pulsar/internal/key_based_batch_builder.go | 194 +++++++++++++++++++++ pulsar/producer.go | 2 + pulsar/producer_partition.go | 69 ++++++-- 8 files changed, 525 insertions(+), 34 deletions(-) create mode 100644 pulsar/batcher_builder.go create mode 100644 pulsar/internal/batch_builder_test.go create mode 100644 pulsar/internal/key_based_batch_builder.go diff --git a/pulsar/batcher_builder.go b/pulsar/batcher_builder.go new file mode 100644 index 0000000000..63e8f90bc5 --- /dev/null +++ b/pulsar/batcher_builder.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "errors" + "github.com/apache/pulsar-client-go/pulsar/internal" +) + +type BatcherBuilderType int + +const ( + DefaultBatchBuilder BatcherBuilderType = iota + KeyBasedBatchBuilder +) + +func GetBatcherBuilderProvider(typ BatcherBuilderType) (internal.BatcherBuilderProvider, error) { + switch typ { + case DefaultBatchBuilder: + return internal.NewBatchBuilder, nil + case KeyBasedBatchBuilder: + return internal.NewKeyBasedBatchBuilder, nil + default: + return nil, errors.New("unsupported batcher builder provider type") + } +} diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f31bb690f3..dcbef0971f 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1712,3 +1712,157 @@ func TestConsumerName(t *testing.T) { assert.Equal(consumerName, consumer.Name()) } + +func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { + const MsgBatchCount = 100 + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/test-key-based-batch-with-key-shared" + + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer1.Close() + + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer2.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + BatcherBuilderType: KeyBasedBatchBuilder, + BatchingMaxMessages: 10, + }) + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + keys := []string{"key1", "key2", "key3"} + for i := 0; i < MsgBatchCount; i++ { + for 
_, k := range keys { + producer.SendAsync(ctx, &ProducerMessage{ + Key: k, + Payload: []byte(fmt.Sprintf("value-%d", i)), + }, func(id MessageID, producerMessage *ProducerMessage, err error) { + assert.Nil(t, err) + }) + } + } + + receivedConsumer1 := 0 + receivedConsumer2 := 0 + consumer1Keys := make(map[string]int, 0) + consumer2Keys := make(map[string]int, 0) + for (receivedConsumer1 + receivedConsumer2) < 300 { + select { + case cm, ok := <-consumer1.Chan(): + if !ok { + break + } + receivedConsumer1++ + if cnt, has := consumer1Keys[cm.Key()]; !has { + consumer1Keys[cm.Key()] = 1 + } else { + consumer1Keys[cm.Key()] = cnt + 1 + } + consumer1.Ack(cm.Message) + case cm, ok := <-consumer2.Chan(): + if !ok { + break + } + receivedConsumer2++ + if cnt, has := consumer2Keys[cm.Key()]; !has { + consumer2Keys[cm.Key()] = 1 + } else { + consumer2Keys[cm.Key()] = cnt + 1 + } + consumer2.Ack(cm.Message) + } + } + + assert.NotEqual(t, 0, receivedConsumer1) + assert.NotEqual(t, 0, receivedConsumer2) + assert.Equal(t, len(consumer1Keys)*MsgBatchCount, receivedConsumer1) + assert.Equal(t, len(consumer2Keys)*MsgBatchCount, receivedConsumer2) + + fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages consumer1: %d consumser2: %d\n", + receivedConsumer1, receivedConsumer2) + assert.Equal(t, 300, receivedConsumer1+receivedConsumer2) + + fmt.Printf("TestKeyBasedBatchProducerConsumerKeyShared received messages keys consumer1: %v consumser2: %v\n", + consumer1Keys, consumer2Keys) +} + +func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { + const MsgBatchCount = 10 + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/test-ordering-of-key-based-batch-with-key-shared" + + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "sub-1", + Type: KeyShared, + }) + assert.Nil(t, err) + defer consumer1.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + BatcherBuilderType: KeyBasedBatchBuilder, + BatchingMaxMessages: 30, + BatchingMaxPublishDelay: time.Second * 5, + }) + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + keys := []string{"key1", "key2", "key3"} + for i := 0; i < MsgBatchCount; i++ { + for _, k := range keys { + producer.SendAsync(ctx, &ProducerMessage{ + Key: k, + Payload: []byte(fmt.Sprintf("value-%d", i)), + }, func(id MessageID, producerMessage *ProducerMessage, err error) { + assert.Nil(t, err) + }) + } + } + + var receivedKey string + var receivedMessageIndex int + for i := 0; i < len(keys)*MsgBatchCount; i += 1 { + cm, ok := <-consumer1.Chan() + if !ok { + break + } + if receivedKey != cm.Key() { + receivedKey = cm.Key() + receivedMessageIndex = 0 + } + assert.Equal(t, fmt.Sprintf("value-%d", receivedMessageIndex%10), string(cm.Payload())) + consumer1.Ack(cm.Message) + receivedMessageIndex += 1 + } + + // TODO: add OrderingKey support +} diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index ecf2b88f86..03e45d5928 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -31,8 +31,23 @@ type BuffersPool interface { GetBuffer() Buffer } -// BatchBuilder wraps the objects needed to build a batch. 
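For orientation before the batch_builder.go changes below: this is how an application is expected to opt into the new batcher, mirroring the producer options exercised by the tests above. A minimal sketch only; the broker URL and topic name are placeholders, not part of this patch.

package main

import (
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder broker URL; adjust for your deployment.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// DisableBatching must stay false for BatcherBuilderType to take effect;
	// messages that share a key are then batched together per key.
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:               "persistent://public/default/my-topic",
		DisableBatching:     false,
		BatcherBuilderType:  pulsar.KeyBasedBatchBuilder,
		BatchingMaxMessages: 100,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
}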
-type BatchBuilder struct { +type BatcherBuilderProvider func(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + compressionType pb.CompressionType, level compression.Level, + bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) + +type BatchBuilder interface { + IsFull() bool + Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, + callback interface{}, replicateTo []string, deliverAt time.Time) bool + Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) + FlushBatches() (batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}) + reset() + Close() error + IsMultiBatches() bool +} + +// batchContainer wraps the objects needed to a batch. +type batchContainer struct { buffer Buffer // Current number of messages in the batch @@ -41,7 +56,7 @@ type BatchBuilder struct { // Max number of message allowed in the batch maxMessages uint - // The largest size for a batch sent from this praticular producer. + // The largest size for a batch sent from this particular producer. // This is used as a baseline to allocate a new buffer that can hold the entire batch // without needing costly re-allocations. maxBatchSize uint @@ -59,12 +74,19 @@ type BatchBuilder struct { log log.Logger } -// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. -func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, +func (bb *batchContainer) FlushBatches() (batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}) { + panic("single batch container not support FlushBatches(), please use Flush() instead") +} + +func (bb *batchContainer) IsMultiBatches() bool { + return false +} + +func newBatchContainer(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error) { + bufferPool BuffersPool, logger log.Logger) batchContainer { - bb := &BatchBuilder{ + bc := batchContainer{ buffer: NewBuffer(4096), numMessages: 0, maxMessages: maxMessages, @@ -85,24 +107,34 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p } if compressionType != pb.CompressionType_NONE { - bb.msgMetadata.Compression = &compressionType + bc.msgMetadata.Compression = &compressionType } - return bb, nil + return bc +} + +// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. +func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + compressionType pb.CompressionType, level compression.Level, + bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) { + + bc := newBatchContainer(maxMessages, maxBatchSize, producerName, producerID, compressionType, level, bufferPool, logger) + + return &bc, nil } // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch -func (bb *BatchBuilder) IsFull() bool { +func (bb *batchContainer) IsFull() bool { return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize) } -func (bb *BatchBuilder) hasSpace(payload []byte) bool { +func (bb *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize) } // Add will add single message to batch. 
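The reworked Add below returns false when a message cannot join the current batch. A short illustrative sketch of the caller-side contract that producer_partition.go follows; addWithRetry and the flush callback are hypothetical names for this sketch, not code from the patch:

package internal

import (
	"time"

	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

// addWithRetry (hypothetical helper) shows how Add is meant to be driven:
// a false return means "flush the current batch, then retry once"; a second
// false means the single message can never fit (e.g. it is larger than the
// maximum batch size).
func addWithRetry(bb BatchBuilder, smm *pb.SingleMessageMetadata, seqGen *uint64,
	payload []byte, callback interface{}, flush func()) bool {
	if bb.Add(smm, seqGen, payload, callback, nil, time.Time{}) {
		return true
	}
	flush() // batch full, or a replication-cluster list forced an early flush
	return bb.Add(smm, seqGen, payload, callback, nil, time.Time{})
}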
-func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte, +func (bb *batchContainer) Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time) bool { if replicateTo != nil && bb.numMessages != 0 { // If the current batch is not empty and we're trying to set the replication clusters, @@ -118,9 +150,14 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint6 } if bb.numMessages == 0 { + var sequenceID uint64 + if metadata.SequenceId != nil { + sequenceID = uint64(*metadata.SequenceId) + } else { + sequenceID = GetAndAdd(sequenceIDGenerator, 1) + } bb.msgMetadata.SequenceId = proto.Uint64(sequenceID) bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now())) - bb.msgMetadata.SequenceId = proto.Uint64(sequenceID) bb.msgMetadata.ProducerName = &bb.producerName bb.msgMetadata.ReplicateTo = replicateTo bb.msgMetadata.PartitionKey = metadata.PartitionKey @@ -138,7 +175,7 @@ func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint6 return true } -func (bb *BatchBuilder) reset() { +func (bb *batchContainer) reset() { bb.numMessages = 0 bb.buffer.Clear() bb.callbacks = []interface{}{} @@ -147,7 +184,7 @@ func (bb *BatchBuilder) reset() { } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. -func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) { +func (bb *batchContainer) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) { if bb.numMessages == 0 { // No-Op for empty batch return nil, 0, nil @@ -172,7 +209,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks return buffer, sequenceID, callbacks } -func (bb *BatchBuilder) Close() error { +func (bb *batchContainer) Close() error { return bb.compressionProvider.Close() } diff --git a/pulsar/internal/batch_builder_test.go b/pulsar/internal/batch_builder_test.go new file mode 100644 index 0000000000..89c1144cff --- /dev/null +++ b/pulsar/internal/batch_builder_test.go @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
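This test file is added as an empty placeholder here and removed again in patch 4. Purely for illustration, a test it could plausibly host is sketched below; the testPool stub is invented for the sketch, and the availability of log.DefaultNopLogger and compression.Level(0) is an assumption about the surrounding packages:

package internal

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
	"github.com/apache/pulsar-client-go/pulsar/log"
)

// testPool is a stub BuffersPool; returning nil makes Flush allocate a fresh buffer.
type testPool struct{}

func (*testPool) GetBuffer() Buffer { return nil }

// TestSingleBatchContainer checks the single-batch container's contract:
// it reports IsMultiBatches() == false and panics on FlushBatches().
func TestSingleBatchContainer(t *testing.T) {
	bb, err := NewBatchBuilder(10, 1024, "test-producer", 1,
		pb.CompressionType_NONE, compression.Level(0), &testPool{}, log.DefaultNopLogger())
	assert.NoError(t, err)
	assert.False(t, bb.IsMultiBatches())
	assert.Panics(t, func() { bb.FlushBatches() })
}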
+ +package internal diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go index b43335a573..b182c35b96 100644 --- a/pulsar/internal/commands_test.go +++ b/pulsar/internal/commands_test.go @@ -26,7 +26,7 @@ import ( func TestConvertStringMap(t *testing.T) { m := make(map[string]string) m["a"] = "1" - m["b"] = "2" + m["containers"] = "2" pbm := ConvertFromStringMap(m) @@ -35,7 +35,7 @@ func TestConvertStringMap(t *testing.T) { m2 := ConvertToStringMap(pbm) assert.Equal(t, 2, len(m2)) assert.Equal(t, "1", m2["a"]) - assert.Equal(t, "2", m2["b"]) + assert.Equal(t, "2", m2["containers"]) } func TestReadMessageMetadata(t *testing.T) { @@ -50,7 +50,7 @@ func TestReadMessageMetadata(t *testing.T) { assert.Equal(t, len(props), 2) assert.Equal(t, "a", props[0].GetKey()) assert.Equal(t, "1", props[0].GetValue()) - assert.Equal(t, "b", props[1].GetKey()) + assert.Equal(t, "containers", props[1].GetKey()) assert.Equal(t, "2", props[1].GetValue()) // read message with batch of 1 @@ -132,7 +132,7 @@ func TestReadMessagesBatchSize10(t *testing.T) { } // Raw single message in old format -// metadata properties: properties: +// metadata properties: properties: // payload = "hello" var rawCompatSingleMessage = []byte{ 0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00, @@ -146,7 +146,7 @@ var rawCompatSingleMessage = []byte{ } // Message with batch of 1 -// singe message metadata properties: properties: +// singe message metadata properties: properties: // payload = "hello" var rawBatchMessage1 = []byte{ 0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00, @@ -161,7 +161,7 @@ var rawBatchMessage1 = []byte{ } // Message with batch of 10 -// singe message metadata properties: properties: +// singe message metadata properties: properties: // payload = "hello" var rawBatchMessage10 = []byte{ 0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08, diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go new file mode 100644 index 0000000000..4f6a25f009 --- /dev/null +++ b/pulsar/internal/key_based_batch_builder.go @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
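The new file below groups messages per key before batching. As a toy model of the intended semantics, using plain maps instead of batch containers (standalone illustration, not patch code):

package main

import "fmt"

// Toy model of keyBasedBatches: incoming (key, value) pairs are grouped per
// key, so each flushed batch carries messages for exactly one key.
func main() {
	incoming := []struct{ key, value string }{
		{"k1", "v1"}, {"k2", "v1"}, {"k3", "v1"},
		{"k1", "v2"}, {"k2", "v2"}, {"k3", "v2"},
	}
	batches := map[string][]string{}
	for _, m := range incoming {
		batches[m.key] = append(batches[m.key], m.value)
	}
	// Prints: map[k1:[v1 v2] k2:[v1 v2] k3:[v1 v2]] (map order not guaranteed)
	fmt.Println(batches)
}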
+ +package internal + +import ( + "encoding/base64" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" + "sort" + "sync" + "time" +) + +type keyBasedBatches struct { + containers map[string]*batchContainer + l *sync.RWMutex +} + +type keyBasedBatchContainer struct { + batches keyBasedBatches + batchContainer + compressionType pb.CompressionType + level compression.Level +} + +func newKeyBasedBatches() keyBasedBatches { + return keyBasedBatches{ + containers: map[string]*batchContainer{}, + l: &sync.RWMutex{}, + } +} + +func (h *keyBasedBatches) Add(key string, val *batchContainer) { + h.l.Lock() + defer h.l.Unlock() + h.containers[key] = val +} + +func (h *keyBasedBatches) Del(key string) { + h.l.Lock() + defer h.l.Unlock() + delete(h.containers, key) +} + +func (h *keyBasedBatches) Val(key string) *batchContainer { + h.l.RLock() + defer h.l.RUnlock() + return h.containers[key] +} + +func NewKeyBasedBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + compressionType pb.CompressionType, level compression.Level, + bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) { + + bb := &keyBasedBatchContainer{ + batches: newKeyBasedBatches(), + batchContainer: newBatchContainer(maxMessages, maxBatchSize, producerName, producerID, compressionType, level, bufferPool, logger), + compressionType: compressionType, + level: level, + } + + if compressionType != pb.CompressionType_NONE { + bb.msgMetadata.Compression = &compressionType + } + + return bb, nil +} + +// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch +func (bc *keyBasedBatchContainer) IsFull() bool { + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) +} + +func (bc *keyBasedBatchContainer) IsMultiBatches() bool { + return true +} + +func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { + msgSize := uint32(len(payload)) + return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize) +} + +// Add will add single message to batch. +func (bc *keyBasedBatchContainer) Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, + callback interface{}, replicateTo []string, deliverAt time.Time) bool { + if replicateTo != nil && bc.numMessages != 0 { + // If the current batch is not empty and we're trying to set the replication clusters, + // then we need to force the current batch to flush and send the message individually + return false + } else if bc.msgMetadata.ReplicateTo != nil { + // There's already a message with cluster replication list. need to flush before next + // message can be sent + return false + } else if bc.hasSpace(payload) { + // The current batch is full. 
Producer has to call Flush() to send the batch out before this message can be added.
+		return false
+	}
+
+	var msgKey = getMessageKey(metadata)
+	batchPart := bc.batches.Val(msgKey)
+	if batchPart == nil {
+		t := newBatchContainer(bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, bc.compressionType, bc.level, bc.buffersPool, bc.log)
+		batchPart = &t
+		bc.batches.Add(msgKey, &t)
+	}
+
+	batchPart.Add(metadata, sequenceIDGenerator, payload, callback, replicateTo, deliverAt)
+	addSingleMessageToBatch(bc.buffer, metadata, payload)
+
+	bc.numMessages++
+	bc.callbacks = append(bc.callbacks, callback)
+	return true
+}
+
+func (bc *keyBasedBatchContainer) reset() {
+	bc.batches.l.RLock()
+	defer bc.batches.l.RUnlock()
+	for _, container := range bc.batches.containers {
+		container.reset()
+	}
+	bc.numMessages = 0
+	bc.buffer.Clear()
+	bc.callbacks = []interface{}{}
+	bc.msgMetadata.ReplicateTo = nil
+	bc.msgMetadata.DeliverAtTime = nil
+	bc.batches.containers = map[string]*batchContainer{}
+}
+
+func (bc *keyBasedBatchContainer) FlushBatches() (batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}) {
+	if bc.numMessages == 0 {
+		// No-Op for empty batch
+		return nil, nil, nil
+	}
+
+	bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages)
+	var batchesLen = len(bc.batches.containers)
+	var idx = 0
+	sortedKeys := make([]string, 0, batchesLen)
+
+	batchesData = make([]Buffer, batchesLen)
+	sequenceIDs = make([]uint64, batchesLen)
+	callbacks = make([][]interface{}, batchesLen)
+
+	bc.batches.l.RLock()
+	defer bc.batches.l.RUnlock()
+	for k := range bc.batches.containers {
+		sortedKeys = append(sortedKeys, k)
+	}
+	sort.Strings(sortedKeys)
+	for _, k := range sortedKeys {
+		container := bc.batches.containers[k]
+		b, s, c := container.Flush()
+		if b != nil {
+			batchesData[idx] = b
+			sequenceIDs[idx] = s
+			callbacks[idx] = c
+		}
+		idx += 1
+	}
+
+	bc.reset()
+	return batchesData, sequenceIDs, callbacks
+}
+
+func (bc *keyBasedBatchContainer) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) {
+	panic("multi batches container not support Flush(), please use FlushBatches() instead")
+}
+
+func (bc *keyBasedBatchContainer) Close() error {
+	return bc.compressionProvider.Close()
+}
+
+func getMessageKey(metadata *pb.SingleMessageMetadata) string {
+	if k := metadata.GetOrderingKey(); k != nil {
+		return base64.StdEncoding.EncodeToString(k)
+	}
+	return metadata.GetPartitionKey()
+}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 1dc0775b0e..66c7d0e58c 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -152,6 +152,8 @@ type ProducerOptions struct {
 	// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker.
 (default: unlimited)
 	MaxReconnectToBroker *uint
+
+	BatcherBuilderType
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index bea73aa0fc..f8718c78af 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -108,7 +108,7 @@ type partitionProducer struct {
 	options             *ProducerOptions
 	producerName        string
 	producerID          uint64
-	batchBuilder        *internal.BatchBuilder
+	batchBuilder        internal.BatchBuilder
 	sequenceIDGenerator *uint64
 	batchFlushTicker    *time.Ticker
 
@@ -235,8 +235,23 @@ func (p *partitionProducer) grabCnx() error {
 	}
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
-	if p.batchBuilder == nil {
-		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
+	if p.options.DisableBatching {
+		provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
+		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
+			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
+			compression.Level(p.options.CompressionLevel),
+			p,
+			p.log)
+		if err != nil {
+			return err
+		}
+	} else if p.batchBuilder == nil {
+		provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)
+		if err != nil {
+			provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder)
+		}
+
+		p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
 			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
 			compression.Level(p.options.CompressionLevel),
 			p,
@@ -331,7 +346,11 @@ func (p *partitionProducer) runEventsLoop() {
 		case <-p.connectClosedCh:
 			p.reconnectToBroker()
 		case <-p.batchFlushTicker.C:
-			p.internalFlushCurrentBatch()
+			if p.batchBuilder.IsMultiBatches() {
+				p.internalFlushCurrentBatches()
+			} else {
+				p.internalFlushCurrentBatch()
+			}
 		}
 	}
 }
@@ -400,29 +419,27 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		smm.Properties = internal.ConvertFromStringMap(msg.Properties)
 	}
 
-	var sequenceID uint64
 	if msg.SequenceID != nil {
+		var sequenceID uint64
 		sequenceID = uint64(*msg.SequenceID)
-	} else {
-		sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
+		smm.SequenceId = proto.Uint64(sequenceID)
 	}
 
 	if !sendAsBatch {
 		p.internalFlushCurrentBatch()
 	}
 
-	added := p.batchBuilder.Add(smm, sequenceID, payload, request,
+	added := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
 		msg.ReplicationClusters, deliverAt)
 	if !added {
 		// The current batch is full. Flush it and retry
 		p.internalFlushCurrentBatch()
 
 		// after flushing try again to add the current payload
-		if ok := p.batchBuilder.Add(smm, sequenceID, payload, request,
+		if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
 			msg.ReplicationClusters, deliverAt); !ok {
 			p.publishSemaphore.Release()
 			request.callback(nil, request.msg, errFailAddBatch)
 			p.log.WithField("size", len(payload)).
-				WithField("sequenceID", sequenceID).
 				WithField("properties", msg.Properties).
Error("unable to add message to batch") return @@ -430,7 +447,11 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } if !sendAsBatch || request.flushImmediately { - p.internalFlushCurrentBatch() + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + } else { + p.internalFlushCurrentBatch() + } } } @@ -506,8 +527,32 @@ func (p *partitionProducer) failTimeoutMessages() { } } +func (p *partitionProducer) internalFlushCurrentBatches() { + batchesData, sequenceIDs, callbacks := p.batchBuilder.FlushBatches() + if batchesData == nil { + return + } + + for i := range batchesData { + if batchesData[i] == nil { + continue + } + p.pendingQueue.Put(&pendingItem{ + batchData: batchesData[i], + sequenceID: sequenceIDs[i], + sendRequests: callbacks[i], + }) + p.cnx.WriteData(batchesData[i]) + } + +} + func (p *partitionProducer) internalFlush(fr *flushRequest) { - p.internalFlushCurrentBatch() + if p.batchBuilder.IsMultiBatches() { + p.internalFlushCurrentBatches() + } else { + p.internalFlushCurrentBatch() + } pi, ok := p.pendingQueue.PeekLast().(*pendingItem) if !ok { From 7c2f98054ac5fc9820fccc842e91e06af3fd5aa7 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 24 Nov 2020 17:11:33 +0800 Subject: [PATCH 2/6] lint --- pulsar/batcher_builder.go | 5 +- pulsar/consumer_test.go | 19 +++--- pulsar/internal/batch_builder.go | 67 +++++++++++++++------- pulsar/internal/key_based_batch_builder.go | 47 ++++++++++----- pulsar/producer_partition.go | 3 +- 5 files changed, 97 insertions(+), 44 deletions(-) diff --git a/pulsar/batcher_builder.go b/pulsar/batcher_builder.go index 63e8f90bc5..caefa8d463 100644 --- a/pulsar/batcher_builder.go +++ b/pulsar/batcher_builder.go @@ -19,6 +19,7 @@ package pulsar import ( "errors" + "github.com/apache/pulsar-client-go/pulsar/internal" ) @@ -29,7 +30,9 @@ const ( KeyBasedBatchBuilder ) -func GetBatcherBuilderProvider(typ BatcherBuilderType) (internal.BatcherBuilderProvider, error) { +func GetBatcherBuilderProvider(typ BatcherBuilderType) ( + internal.BatcherBuilderProvider, error, +) { switch typ { case DefaultBatchBuilder: return internal.NewBatchBuilder, nil diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index dcbef0971f..f499b913c9 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1758,14 +1758,15 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { assert.Nil(t, err) - }) + }, + ) } } receivedConsumer1 := 0 receivedConsumer2 := 0 - consumer1Keys := make(map[string]int, 0) - consumer2Keys := make(map[string]int, 0) + consumer1Keys := make(map[string]int) + consumer2Keys := make(map[string]int) for (receivedConsumer1 + receivedConsumer2) < 300 { select { case cm, ok := <-consumer1.Chan(): @@ -1844,13 +1845,14 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { Payload: []byte(fmt.Sprintf("value-%d", i)), }, func(id MessageID, producerMessage *ProducerMessage, err error) { assert.Nil(t, err) - }) + }, + ) } } var receivedKey string var receivedMessageIndex int - for i := 0; i < len(keys)*MsgBatchCount; i += 1 { + for i := 0; i < len(keys)*MsgBatchCount; i++ { cm, ok := <-consumer1.Chan() if !ok { break @@ -1859,9 +1861,12 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) { receivedKey = cm.Key() receivedMessageIndex = 0 } - assert.Equal(t, fmt.Sprintf("value-%d", receivedMessageIndex%10), string(cm.Payload())) + assert.Equal( + 
t, fmt.Sprintf("value-%d", receivedMessageIndex%10), + string(cm.Payload()), + ) consumer1.Ack(cm.Message) - receivedMessageIndex += 1 + receivedMessageIndex++ } // TODO: add OrderingKey support diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 03e45d5928..5a77cdeba6 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -31,16 +31,23 @@ type BuffersPool interface { GetBuffer() Buffer } -type BatcherBuilderProvider func(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, +type BatcherBuilderProvider func( + maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) + bufferPool BuffersPool, logger log.Logger, +) (BatchBuilder, error) type BatchBuilder interface { IsFull() bool - Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, - callback interface{}, replicateTo []string, deliverAt time.Time) bool + Add( + metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + payload []byte, + callback interface{}, replicateTo []string, deliverAt time.Time, + ) bool Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) - FlushBatches() (batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}) + FlushBatches() ( + batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, + ) reset() Close() error IsMultiBatches() bool @@ -74,7 +81,9 @@ type batchContainer struct { log log.Logger } -func (bb *batchContainer) FlushBatches() (batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}) { +func (bb *batchContainer) FlushBatches() ( + batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, +) { panic("single batch container not support FlushBatches(), please use Flush() instead") } @@ -82,9 +91,11 @@ func (bb *batchContainer) IsMultiBatches() bool { return false } -func newBatchContainer(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, +func newBatchContainer( + maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger) batchContainer { + bufferPool BuffersPool, logger log.Logger, +) batchContainer { bc := batchContainer{ buffer: NewBuffer(4096), @@ -93,10 +104,12 @@ func newBatchContainer(maxMessages uint, maxBatchSize uint, producerName string, maxBatchSize: maxBatchSize, producerName: producerName, producerID: producerID, - cmdSend: baseCommand(pb.BaseCommand_SEND, + cmdSend: baseCommand( + pb.BaseCommand_SEND, &pb.CommandSend{ ProducerId: &producerID, - }), + }, + ), msgMetadata: &pb.MessageMetadata{ ProducerName: &producerName, }, @@ -114,11 +127,16 @@ func newBatchContainer(maxMessages uint, maxBatchSize uint, producerName string, } // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. 
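A note on the sequence-ID change in the hunk above: when the metadata carries no explicit ID, Add claims one from sequenceIDGenerator via GetAndAdd. A standalone sketch of the assumed get-then-add semantics follows; the local name getAndAdd is illustrative, and the real helper lives in the internal package:

package main

import (
	"fmt"
	"sync/atomic"
)

// getAndAdd mirrors what internal.GetAndAdd is assumed to do: atomically add
// diff to *n and return the value *before* the addition, so each new batch
// claims a fresh, monotonically increasing sequence ID.
func getAndAdd(n *uint64, diff uint64) uint64 {
	return atomic.AddUint64(n, diff) - diff
}

func main() {
	var seq uint64
	fmt.Println(getAndAdd(&seq, 1)) // 0
	fmt.Println(getAndAdd(&seq, 1)) // 1
}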
-func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, +func NewBatchBuilder( + maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) { + bufferPool BuffersPool, logger log.Logger, +) (BatchBuilder, error) { - bc := newBatchContainer(maxMessages, maxBatchSize, producerName, producerID, compressionType, level, bufferPool, logger) + bc := newBatchContainer( + maxMessages, maxBatchSize, producerName, producerID, compressionType, + level, bufferPool, logger, + ) return &bc, nil } @@ -134,8 +152,11 @@ func (bb *batchContainer) hasSpace(payload []byte) bool { } // Add will add single message to batch. -func (bb *batchContainer) Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, - callback interface{}, replicateTo []string, deliverAt time.Time) bool { +func (bb *batchContainer) Add( + metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + payload []byte, + callback interface{}, replicateTo []string, deliverAt time.Time, +) bool { if replicateTo != nil && bb.numMessages != 0 { // If the current batch is not empty and we're trying to set the replication clusters, // then we need to force the current batch to flush and send the message individually @@ -152,7 +173,7 @@ func (bb *batchContainer) Add(metadata *pb.SingleMessageMetadata, sequenceIDGene if bb.numMessages == 0 { var sequenceID uint64 if metadata.SequenceId != nil { - sequenceID = uint64(*metadata.SequenceId) + sequenceID = *metadata.SequenceId } else { sequenceID = GetAndAdd(sequenceIDGenerator, 1) } @@ -184,7 +205,9 @@ func (bb *batchContainer) reset() { } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. 
-func (bb *batchContainer) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) { +func (bb *batchContainer) Flush() ( + batchData Buffer, sequenceID uint64, callbacks []interface{}, +) { if bb.numMessages == 0 { // No-Op for empty batch return nil, 0, nil @@ -201,7 +224,9 @@ func (bb *batchContainer) Flush() (batchData Buffer, sequenceID uint64, callback if buffer == nil { buffer = NewBuffer(int(uncompressedSize * 3 / 2)) } - serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider) + serializeBatch( + buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider, + ) callbacks = bb.callbacks sequenceID = bb.cmdSend.Send.GetSequenceId() @@ -213,8 +238,10 @@ func (bb *batchContainer) Close() error { return bb.compressionProvider.Close() } -func getCompressionProvider(compressionType pb.CompressionType, - level compression.Level) compression.Provider { +func getCompressionProvider( + compressionType pb.CompressionType, + level compression.Level, +) compression.Provider { switch compressionType { case pb.CompressionType_NONE: return compression.NewNoopProvider() diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 4f6a25f009..6f91658a2f 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -19,12 +19,13 @@ package internal import ( "encoding/base64" - "github.com/apache/pulsar-client-go/pulsar/internal/compression" - pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" - "github.com/apache/pulsar-client-go/pulsar/log" "sort" "sync" "time" + + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" ) type keyBasedBatches struct { @@ -64,13 +65,18 @@ func (h *keyBasedBatches) Val(key string) *batchContainer { return h.containers[key] } -func NewKeyBasedBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, +func NewKeyBasedBatchBuilder( + maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, - bufferPool BuffersPool, logger log.Logger) (BatchBuilder, error) { + bufferPool BuffersPool, logger log.Logger, +) (BatchBuilder, error) { bb := &keyBasedBatchContainer{ - batches: newKeyBasedBatches(), - batchContainer: newBatchContainer(maxMessages, maxBatchSize, producerName, producerID, compressionType, level, bufferPool, logger), + batches: newKeyBasedBatches(), + batchContainer: newBatchContainer( + maxMessages, maxBatchSize, producerName, producerID, + compressionType, level, bufferPool, logger, + ), compressionType: compressionType, level: level, } @@ -97,8 +103,11 @@ func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { } // Add will add single message to batch. 
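The reflowed key-based Add below keys its per-batch map by getMessageKey. Restated as a standalone sketch, with plain parameters in place of pb.SingleMessageMetadata:

package main

import (
	"encoding/base64"
	"fmt"
)

// messageKey restates getMessageKey from this patch: the ordering key wins
// when present (base64-encoded, because it is raw bytes and must serve as a
// map key), otherwise the partition key is used.
func messageKey(orderingKey []byte, partitionKey string) string {
	if orderingKey != nil {
		return base64.StdEncoding.EncodeToString(orderingKey)
	}
	return partitionKey
}

func main() {
	fmt.Println(messageKey(nil, "key1"))          // key1
	fmt.Println(messageKey([]byte{0x01}, "key1")) // AQ==
}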
-func (bc *keyBasedBatchContainer) Add(metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, - callback interface{}, replicateTo []string, deliverAt time.Time) bool { +func (bc *keyBasedBatchContainer) Add( + metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + payload []byte, + callback interface{}, replicateTo []string, deliverAt time.Time, +) bool { if replicateTo != nil && bc.numMessages != 0 { // If the current batch is not empty and we're trying to set the replication clusters, // then we need to force the current batch to flush and send the message individually @@ -115,12 +124,18 @@ func (bc *keyBasedBatchContainer) Add(metadata *pb.SingleMessageMetadata, sequen var msgKey = getMessageKey(metadata) batchPart := bc.batches.Val(msgKey) if batchPart == nil { - t := newBatchContainer(bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, bc.compressionType, bc.level, bc.buffersPool, bc.log) + t := newBatchContainer( + bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, + bc.compressionType, bc.level, bc.buffersPool, bc.log, + ) batchPart = &t bc.batches.Add(msgKey, &t) } - batchPart.Add(metadata, sequenceIDGenerator, payload, callback, replicateTo, deliverAt) + batchPart.Add( + metadata, sequenceIDGenerator, payload, callback, replicateTo, + deliverAt, + ) addSingleMessageToBatch(bc.buffer, metadata, payload) bc.numMessages++ @@ -142,7 +157,9 @@ func (bc *keyBasedBatchContainer) reset() { bc.batches.containers = map[string]*batchContainer{} } -func (bc *keyBasedBatchContainer) FlushBatches() (batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}) { +func (bc *keyBasedBatchContainer) FlushBatches() ( + batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, +) { if bc.numMessages == 0 { // No-Op for empty batch return nil, nil, nil @@ -171,14 +188,16 @@ func (bc *keyBasedBatchContainer) FlushBatches() (batchesData []Buffer, sequence sequenceIDs[idx] = s callbacks[idx] = c } - idx += 1 + idx++ } bc.reset() return batchesData, sequenceIDs, callbacks } -func (bc *keyBasedBatchContainer) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) { +func (bc *keyBasedBatchContainer) Flush() ( + batchData Buffer, sequenceID uint64, callbacks []interface{}, +) { panic("multi batches container not support Flush(), please use FlushBatches() instead") } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f8718c78af..bb528c2094 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -420,8 +420,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } if msg.SequenceID != nil { - var sequenceID uint64 - sequenceID = uint64(*msg.SequenceID) + sequenceID := uint64(*msg.SequenceID) smm.SequenceId = proto.Uint64(sequenceID) } From 36c0f10123c5864f326999d2777b2d92c3ed22f0 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 24 Nov 2020 17:26:54 +0800 Subject: [PATCH 3/6] revert --- go.mod | 2 ++ pulsar/internal/commands_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index bf0b627d0a..817e2239a0 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/klauspost/compress v1.10.8 github.com/kr/pretty v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.9.1 
github.com/prometheus/client_golang v1.7.1 diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go index b182c35b96..b43335a573 100644 --- a/pulsar/internal/commands_test.go +++ b/pulsar/internal/commands_test.go @@ -26,7 +26,7 @@ import ( func TestConvertStringMap(t *testing.T) { m := make(map[string]string) m["a"] = "1" - m["containers"] = "2" + m["b"] = "2" pbm := ConvertFromStringMap(m) @@ -35,7 +35,7 @@ func TestConvertStringMap(t *testing.T) { m2 := ConvertToStringMap(pbm) assert.Equal(t, 2, len(m2)) assert.Equal(t, "1", m2["a"]) - assert.Equal(t, "2", m2["containers"]) + assert.Equal(t, "2", m2["b"]) } func TestReadMessageMetadata(t *testing.T) { @@ -50,7 +50,7 @@ func TestReadMessageMetadata(t *testing.T) { assert.Equal(t, len(props), 2) assert.Equal(t, "a", props[0].GetKey()) assert.Equal(t, "1", props[0].GetValue()) - assert.Equal(t, "containers", props[1].GetKey()) + assert.Equal(t, "b", props[1].GetKey()) assert.Equal(t, "2", props[1].GetValue()) // read message with batch of 1 @@ -132,7 +132,7 @@ func TestReadMessagesBatchSize10(t *testing.T) { } // Raw single message in old format -// metadata properties: properties: +// metadata properties: properties: // payload = "hello" var rawCompatSingleMessage = []byte{ 0x0e, 0x01, 0x08, 0x36, 0xb4, 0x66, 0x00, 0x00, @@ -146,7 +146,7 @@ var rawCompatSingleMessage = []byte{ } // Message with batch of 1 -// singe message metadata properties: properties: +// singe message metadata properties: properties: // payload = "hello" var rawBatchMessage1 = []byte{ 0x0e, 0x01, 0x1f, 0x80, 0x09, 0x68, 0x00, 0x00, @@ -161,7 +161,7 @@ var rawBatchMessage1 = []byte{ } // Message with batch of 10 -// singe message metadata properties: properties: +// singe message metadata properties: properties: // payload = "hello" var rawBatchMessage10 = []byte{ 0x0e, 0x01, 0x7b, 0x28, 0x8c, 0x08, From 7e23a685ca9e3e89b71be596e5ce2bf46602c930 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Tue, 24 Nov 2020 18:04:31 +0800 Subject: [PATCH 4/6] add comments --- pulsar/internal/batch_builder.go | 117 ++++++++++++--------- pulsar/internal/batch_builder_test.go | 18 ---- pulsar/internal/key_based_batch_builder.go | 26 ++++- 3 files changed, 92 insertions(+), 69 deletions(-) delete mode 100644 pulsar/internal/batch_builder_test.go diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 5a77cdeba6..3e1601f156 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -31,29 +31,43 @@ type BuffersPool interface { GetBuffer() Buffer } +// BatcherBuilderProvider defines func which returns the BatchBuilder. type BatcherBuilderProvider func( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, ) (BatchBuilder, error) +// BatchBuilder is a interface of batch builders type BatchBuilder interface { + // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch IsFull() bool + + // Add will add single message to batch. Add( metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, ) bool + + // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. 
Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) + + // Flush all the messages buffered in multiple batches and wait until all + // messages have been successfully persisted. FlushBatches() ( batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, ) + + // Return the batch container batch message in multiple batches. + IsMultiBatches() bool + reset() Close() error - IsMultiBatches() bool } // batchContainer wraps the objects needed to a batch. +// batchContainer implement BatchBuilder as a single batch container. type batchContainer struct { buffer Buffer @@ -81,16 +95,7 @@ type batchContainer struct { log log.Logger } -func (bb *batchContainer) FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, -) { - panic("single batch container not support FlushBatches(), please use Flush() instead") -} - -func (bb *batchContainer) IsMultiBatches() bool { - return false -} - +// newBatchContainer init a batchContainer func newBatchContainer( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, @@ -142,100 +147,112 @@ func NewBatchBuilder( } // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch -func (bb *batchContainer) IsFull() bool { - return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize) +func (bc *batchContainer) IsFull() bool { + return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) } -func (bb *batchContainer) hasSpace(payload []byte) bool { +func (bc *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) - return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize) + return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize) } // Add will add single message to batch. -func (bb *batchContainer) Add( +func (bc *batchContainer) Add( metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, ) bool { - if replicateTo != nil && bb.numMessages != 0 { + if replicateTo != nil && bc.numMessages != 0 { // If the current batch is not empty and we're trying to set the replication clusters, // then we need to force the current batch to flush and send the message individually return false - } else if bb.msgMetadata.ReplicateTo != nil { + } else if bc.msgMetadata.ReplicateTo != nil { // There's already a message with cluster replication list. need to flush before next // message can be sent return false - } else if bb.hasSpace(payload) { + } else if bc.hasSpace(payload) { // The current batch is full. 
Producer has to call Flush() to send the batch out before this message can be added.
 		return false
 	}
 
-	if bb.numMessages == 0 {
+	if bc.numMessages == 0 {
 		var sequenceID uint64
 		if metadata.SequenceId != nil {
 			sequenceID = *metadata.SequenceId
 		} else {
 			sequenceID = GetAndAdd(sequenceIDGenerator, 1)
 		}
-		bb.msgMetadata.SequenceId = proto.Uint64(sequenceID)
-		bb.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
-		bb.msgMetadata.ProducerName = &bb.producerName
-		bb.msgMetadata.ReplicateTo = replicateTo
-		bb.msgMetadata.PartitionKey = metadata.PartitionKey
+		bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
+		bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
+		bc.msgMetadata.ProducerName = &bc.producerName
+		bc.msgMetadata.ReplicateTo = replicateTo
+		bc.msgMetadata.PartitionKey = metadata.PartitionKey
 
 		if deliverAt.UnixNano() > 0 {
-			bb.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
+			bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
 		}
 
-		bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
+		bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
 	}
-	addSingleMessageToBatch(bb.buffer, metadata, payload)
+	addSingleMessageToBatch(bc.buffer, metadata, payload)
 
-	bb.numMessages++
-	bb.callbacks = append(bb.callbacks, callback)
+	bc.numMessages++
+	bc.callbacks = append(bc.callbacks, callback)
 	return true
 }
 
-func (bb *batchContainer) reset() {
-	bb.numMessages = 0
-	bb.buffer.Clear()
-	bb.callbacks = []interface{}{}
-	bb.msgMetadata.ReplicateTo = nil
-	bb.msgMetadata.DeliverAtTime = nil
+func (bc *batchContainer) reset() {
+	bc.numMessages = 0
+	bc.buffer.Clear()
+	bc.callbacks = []interface{}{}
+	bc.msgMetadata.ReplicateTo = nil
+	bc.msgMetadata.DeliverAtTime = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
-func (bb *batchContainer) Flush() ( +func (bc *batchContainer) Flush() ( batchData Buffer, sequenceID uint64, callbacks []interface{}, ) { - if bb.numMessages == 0 { + if bc.numMessages == 0 { // No-Op for empty batch return nil, 0, nil } - bb.log.Debug("BatchBuilder flush: messages: ", bb.numMessages) + bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages) - bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages)) - bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages)) + bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages)) + bc.cmdSend.Send.NumMessages = proto.Int32(int32(bc.numMessages)) - uncompressedSize := bb.buffer.ReadableBytes() - bb.msgMetadata.UncompressedSize = &uncompressedSize + uncompressedSize := bc.buffer.ReadableBytes() + bc.msgMetadata.UncompressedSize = &uncompressedSize - buffer := bb.buffersPool.GetBuffer() + buffer := bc.buffersPool.GetBuffer() if buffer == nil { buffer = NewBuffer(int(uncompressedSize * 3 / 2)) } serializeBatch( - buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider, + buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, ) - callbacks = bb.callbacks - sequenceID = bb.cmdSend.Send.GetSequenceId() - bb.reset() + callbacks = bc.callbacks + sequenceID = bc.cmdSend.Send.GetSequenceId() + bc.reset() return buffer, sequenceID, callbacks } -func (bb *batchContainer) Close() error { - return bb.compressionProvider.Close() +// FlushBatches only for multiple batches container +func (bc *batchContainer) FlushBatches() ( + batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, +) { + panic("single batch container not support FlushBatches(), please use Flush() instead") +} + +// batchContainer as a single batch container +func (bc *batchContainer) IsMultiBatches() bool { + return false +} + +func (bc *batchContainer) Close() error { + return bc.compressionProvider.Close() } func getCompressionProvider( diff --git a/pulsar/internal/batch_builder_test.go b/pulsar/internal/batch_builder_test.go deleted file mode 100644 index 89c1144cff..0000000000 --- a/pulsar/internal/batch_builder_test.go +++ /dev/null @@ -1,18 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package internal diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 6f91658a2f..545c2c8996 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -28,11 +28,25 @@ import ( "github.com/apache/pulsar-client-go/pulsar/log" ) +/** + * Key based batch message container + * + * incoming single messages: + * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) + * + * batched into multiple batch messages: + * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] + */ + +// keyBasedBatches is a simple concurrent-safe map for the batchContainer type type keyBasedBatches struct { containers map[string]*batchContainer l *sync.RWMutex } +// keyBasedBatchContainer wraps the objects needed to key based batch. +// keyBasedBatchContainer implement BatchBuilder as a multiple batches +// container. type keyBasedBatchContainer struct { batches keyBasedBatches batchContainer @@ -40,6 +54,7 @@ type keyBasedBatchContainer struct { level compression.Level } +// newKeyBasedBatches init a keyBasedBatches func newKeyBasedBatches() keyBasedBatches { return keyBasedBatches{ containers: map[string]*batchContainer{}, @@ -65,6 +80,8 @@ func (h *keyBasedBatches) Val(key string) *batchContainer { return h.containers[key] } +// NewKeyBasedBatchBuilder init batch builder and return BatchBuilder +// pointer. Build a new key based batch message container. func NewKeyBasedBatchBuilder( maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, @@ -102,7 +119,7 @@ func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { return bc.numMessages > 0 && (bc.buffer.ReadableBytes()+msgSize) > uint32(bc.maxBatchSize) } -// Add will add single message to batch. +// Add will add single message to key-based batch with message key. func (bc *keyBasedBatchContainer) Add( metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, payload []byte, @@ -124,6 +141,7 @@ func (bc *keyBasedBatchContainer) Add( var msgKey = getMessageKey(metadata) batchPart := bc.batches.Val(msgKey) if batchPart == nil { + // create batchContainer for new key t := newBatchContainer( bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, bc.compressionType, bc.level, bc.buffersPool, bc.log, @@ -132,6 +150,7 @@ func (bc *keyBasedBatchContainer) Add( bc.batches.Add(msgKey, &t) } + // add message to batch container batchPart.Add( metadata, sequenceIDGenerator, payload, callback, replicateTo, deliverAt, @@ -157,6 +176,8 @@ func (bc *keyBasedBatchContainer) reset() { bc.batches.containers = map[string]*batchContainer{} } +// Flush all the messages buffered in multiple batches and wait until all +// messages have been successfully persisted. func (bc *keyBasedBatchContainer) FlushBatches() ( batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, ) { @@ -205,6 +226,9 @@ func (bc *keyBasedBatchContainer) Close() error { return bc.compressionProvider.Close() } +// getMessageKey extracts message key from message metadata. +// If the OrderingKey exists, the base64-encoded string is returned, +// otherwise the PartitionKey is returned. 
func getMessageKey(metadata *pb.SingleMessageMetadata) string {
 	if k := metadata.GetOrderingKey(); k != nil {
 		return base64.StdEncoding.EncodeToString(k)
 	}
 	return metadata.GetPartitionKey()
 }

From f0915887e74f9cd88f033e418f2d850f35861a6e Mon Sep 17 00:00:00 2001
From: Rui Fu
Date: Wed, 25 Nov 2020 09:38:31 +0800
Subject: [PATCH 5/6] add comments

---
 pulsar/producer.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar/producer.go b/pulsar/producer.go
index 66c7d0e58c..b41415a4aa 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -153,6 +153,11 @@ type ProducerOptions struct {
 	// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: unlimited)
 	MaxReconnectToBroker *uint
 
+	// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
+	// It is used to create the batch container when batching is enabled.
+	// Options:
+	// - DefaultBatchBuilder
+	// - KeyBasedBatchBuilder
 	BatcherBuilderType
 }

From 3e27bc5c30f1d5d2671ea5f3ae79c65b6d637d1b Mon Sep 17 00:00:00 2001
From: Rui Fu
Date: Mon, 30 Nov 2020 11:19:23 +0800
Subject: [PATCH 6/6] add order test & issue tracking

---
 pulsar/consumer_test.go | 26 +++++++++++++++++---------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f499b913c9..6d58cd4091 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1774,22 +1774,30 @@ func TestKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
 				break
 			}
 			receivedConsumer1++
-			if cnt, has := consumer1Keys[cm.Key()]; !has {
-				consumer1Keys[cm.Key()] = 1
-			} else {
-				consumer1Keys[cm.Key()] = cnt + 1
+			cnt := 0
+			if _, has := consumer1Keys[cm.Key()]; has {
+				cnt = consumer1Keys[cm.Key()]
 			}
+			assert.Equal(
+				t, fmt.Sprintf("value-%d", cnt),
+				string(cm.Payload()),
+			)
+			consumer1Keys[cm.Key()] = cnt + 1
 			consumer1.Ack(cm.Message)
 		case cm, ok := <-consumer2.Chan():
 			if !ok {
 				break
 			}
 			receivedConsumer2++
-			if cnt, has := consumer2Keys[cm.Key()]; !has {
-				consumer2Keys[cm.Key()] = 1
-			} else {
-				consumer2Keys[cm.Key()] = cnt + 1
+			cnt := 0
+			if _, has := consumer2Keys[cm.Key()]; has {
+				cnt = consumer2Keys[cm.Key()]
 			}
+			assert.Equal(
+				t, fmt.Sprintf("value-%d", cnt),
+				string(cm.Payload()),
+			)
+			consumer2Keys[cm.Key()] = cnt + 1
 			consumer2.Ack(cm.Message)
 		}
 	}
@@ -1869,5 +1877,5 @@ func TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
 		receivedMessageIndex++
 	}
 
-	// TODO: add OrderingKey support
+	// TODO: add OrderingKey support, see GH issue #401
 }
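Taken together, the series leaves the producer with two flush paths, dispatched on IsMultiBatches(). A compact sketch of that pattern as producer_partition.go applies it; flushAll and send are illustrative names only, not symbols from this series:

package internal

// flushAll sketches the dispatch the partition producer performs: multi-batch
// builders (the key-based one) hand back one buffer per key from
// FlushBatches(), while the single-batch builder returns one buffer from Flush().
func flushAll(bb BatchBuilder, send func(data Buffer, sequenceID uint64, callbacks []interface{})) {
	if bb.IsMultiBatches() {
		data, sequenceIDs, callbacks := bb.FlushBatches()
		for i := range data {
			if data[i] == nil {
				continue // this key contributed no messages
			}
			send(data[i], sequenceIDs[i], callbacks[i])
		}
		return
	}
	data, sequenceID, callbacks := bb.Flush()
	if data != nil {
		send(data, sequenceID, callbacks)
	}
}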