From a9f74e574555e4405c64bcfe7f5a266c71a8102d Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Fri, 18 Feb 2022 16:01:14 +0800 Subject: [PATCH] [BUGFIX]Transfer test fix (#23) * Flowcontrol on empty messages * mock queue empty message flow control * Update streaming/src/channel/channel.h Co-authored-by: Qing Wang * convert -1 to uint64_t max * timer interval init * data reader timeout setting * fix empty message emit Co-authored-by: ashione Co-authored-by: Qing Wang --- streaming/src/channel/channel.cc | 2 +- streaming/src/channel/channel.h | 4 ++-- streaming/src/data_reader.cc | 6 ++++-- streaming/src/data_writer.cc | 3 ++- streaming/src/test/mock_transfer_tests.cc | 5 +++-- 5 files changed, 12 insertions(+), 8 deletions(-) diff --git a/streaming/src/channel/channel.cc b/streaming/src/channel/channel.cc index 6766e9c5..c8528fa5 100644 --- a/streaming/src/channel/channel.cc +++ b/streaming/src/channel/channel.cc @@ -122,7 +122,7 @@ StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data, return StreamingStatus::OK; } -uint64_t StreamingQueueProducer::GetLastBundleId() { return queue_->GetCurrentSeqId(); } +uint64_t StreamingQueueProducer::GetLastBundleId() const { return queue_->GetCurrentSeqId(); } Status StreamingQueueProducer::PushQueueItem(uint8_t *data, uint32_t data_size, uint64_t timestamp, uint64_t msg_id_start, diff --git a/streaming/src/channel/channel.h b/streaming/src/channel/channel.h index 4c099186..6adaf43d 100644 --- a/streaming/src/channel/channel.h +++ b/streaming/src/channel/channel.h @@ -105,7 +105,7 @@ class ProducerChannel { virtual StreamingStatus RefreshChannelInfo() = 0; virtual StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) = 0; virtual StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) = 0; - virtual uint64_t GetLastBundleId() = 0; + virtual uint64_t GetLastBundleId() const = 0; protected: std::shared_ptr transfer_config_; @@ -199,7 +199,7 @@ class MockProducer : public ProducerChannel { StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) override { return StreamingStatus::OK; } - uint64_t GetLastBundleId() { return current_bundle_id_; } + uint64_t GetLastBundleId() const { return current_bundle_id_; } private: uint64_t current_bundle_id_; diff --git a/streaming/src/data_reader.cc b/streaming/src/data_reader.cc index 99746768..18a30f84 100644 --- a/streaming/src/data_reader.cc +++ b/streaming/src/data_reader.cc @@ -33,13 +33,15 @@ void DataReader::Init(const std::vector &input_ids, void DataReader::Init(const std::vector &input_ids, const std::vector &init_params, int64_t timer_interval) { - STREAMING_LOG(INFO) << input_ids.size() << " queue to init."; + STREAMING_LOG(INFO) << "Reader " << input_ids.size() + << " queue to init, timer interval " << timer_interval; transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, input_ids); last_fetched_queue_item_ = nullptr; timer_interval_ = timer_interval; - last_message_ts_ = 0; + // NOTE(lingxuan.zlx): Last recived message timestamp is marked in current system time. + last_message_ts_ = current_sys_time_ms(); input_queue_ids_ = input_ids; last_message_latency_ = 0; last_bundle_unit_ = 0; diff --git a/streaming/src/data_writer.cc b/streaming/src/data_writer.cc index 40e4b50b..e0fd78be 100644 --- a/streaming/src/data_writer.cc +++ b/streaming/src/data_writer.cc @@ -137,7 +137,8 @@ StreamingStatus DataWriter::Init(const std::vector &queue_id_vec, const std::vector &channel_message_id_vec, const std::vector &queue_size_vec) { STREAMING_CHECK(!queue_id_vec.empty() && !channel_message_id_vec.empty()); - STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName(); + STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName() + << ", reliability level => " << runtime_context_->GetConfig().GetReliabilityLevel(); output_queue_ids_ = queue_id_vec; transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, queue_id_vec); diff --git a/streaming/src/test/mock_transfer_tests.cc b/streaming/src/test/mock_transfer_tests.cc index d7616f7d..8e199592 100644 --- a/streaming/src/test/mock_transfer_tests.cc +++ b/streaming/src/test/mock_transfer_tests.cc @@ -83,7 +83,7 @@ class StreamingTransferTest : public ::testing::Test { std::vector queue_vec; std::shared_ptr writer_runtime_context; std::shared_ptr reader_runtime_context; - int64_t timer_interval = -1; + int64_t timer_interval = std::numeric_limits::max(); }; TEST_F(StreamingTransferTest, exchange_single_channel_test) { @@ -102,6 +102,7 @@ TEST_F(StreamingTransferTest, exchange_single_channel_test) { } TEST_F(StreamingTransferTest, exchange_multichannel_test) { + timer_interval = 0xffff; int channel_num = 4; InitTransfer(4); writer->Run(); @@ -111,7 +112,7 @@ TEST_F(StreamingTransferTest, exchange_multichannel_test) { writer->WriteMessageToBufferRing(queue_vec[i], data, data_size); std::shared_ptr msg; reader->GetBundle(5000, msg); - EXPECT_EQ(msg->from, queue_vec[i]); + EXPECT_EQ(msg->from, queue_vec[i]) << *msg.get(); StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data); auto &message_list = bundle_ptr->GetMessageList(); auto &message = message_list.front();