Skip to content

Commit

Permalink
[BUGFIX]Transfer test fix (#23)
Browse files Browse the repository at this point in the history
* Flowcontrol on empty messages

* mock queue empty message flow control

* Update streaming/src/channel/channel.h

Co-authored-by: Qing Wang <[email protected]>

* convert -1 to uint64_t max

* timer interval init

* data reader timeout setting

* fix empty message emit

Co-authored-by: ashione <[email protected]>
Co-authored-by: Qing Wang <[email protected]>
  • Loading branch information
3 people committed Feb 18, 2022
1 parent 626cc38 commit a9f74e5
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 8 deletions.
2 changes: 1 addition & 1 deletion streaming/src/channel/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data,
return StreamingStatus::OK;
}

uint64_t StreamingQueueProducer::GetLastBundleId() { return queue_->GetCurrentSeqId(); }
uint64_t StreamingQueueProducer::GetLastBundleId() const { return queue_->GetCurrentSeqId(); }

Status StreamingQueueProducer::PushQueueItem(uint8_t *data, uint32_t data_size,
uint64_t timestamp, uint64_t msg_id_start,
Expand Down
4 changes: 2 additions & 2 deletions streaming/src/channel/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ProducerChannel {
virtual StreamingStatus RefreshChannelInfo() = 0;
virtual StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) = 0;
virtual StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) = 0;
virtual uint64_t GetLastBundleId() = 0;
virtual uint64_t GetLastBundleId() const = 0;

protected:
std::shared_ptr<Config> transfer_config_;
Expand Down Expand Up @@ -199,7 +199,7 @@ class MockProducer : public ProducerChannel {
StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) override {
return StreamingStatus::OK;
}
uint64_t GetLastBundleId() { return current_bundle_id_; }
uint64_t GetLastBundleId() const { return current_bundle_id_; }

private:
uint64_t current_bundle_id_;
Expand Down
6 changes: 4 additions & 2 deletions streaming/src/data_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ void DataReader::Init(const std::vector<ObjectID> &input_ids,
void DataReader::Init(const std::vector<ObjectID> &input_ids,
const std::vector<ChannelCreationParameter> &init_params,
int64_t timer_interval) {
STREAMING_LOG(INFO) << input_ids.size() << " queue to init.";
STREAMING_LOG(INFO) << "Reader " << input_ids.size()
<< " queue to init, timer interval " << timer_interval;

transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, input_ids);

last_fetched_queue_item_ = nullptr;
timer_interval_ = timer_interval;
last_message_ts_ = 0;
// NOTE(lingxuan.zlx): Last recived message timestamp is marked in current system time.
last_message_ts_ = current_sys_time_ms();
input_queue_ids_ = input_ids;
last_message_latency_ = 0;
last_bundle_unit_ = 0;
Expand Down
3 changes: 2 additions & 1 deletion streaming/src/data_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ StreamingStatus DataWriter::Init(const std::vector<ObjectID> &queue_id_vec,
const std::vector<uint64_t> &channel_message_id_vec,
const std::vector<uint64_t> &queue_size_vec) {
STREAMING_CHECK(!queue_id_vec.empty() && !channel_message_id_vec.empty());
STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName();
STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName()
<< ", reliability level => " << runtime_context_->GetConfig().GetReliabilityLevel();

output_queue_ids_ = queue_id_vec;
transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, queue_id_vec);
Expand Down
5 changes: 3 additions & 2 deletions streaming/src/test/mock_transfer_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StreamingTransferTest : public ::testing::Test {
std::vector<ObjectID> queue_vec;
std::shared_ptr<RuntimeContext> writer_runtime_context;
std::shared_ptr<RuntimeContext> reader_runtime_context;
int64_t timer_interval = -1;
int64_t timer_interval = std::numeric_limits<int64_t>::max();
};

TEST_F(StreamingTransferTest, exchange_single_channel_test) {
Expand All @@ -102,6 +102,7 @@ TEST_F(StreamingTransferTest, exchange_single_channel_test) {
}

TEST_F(StreamingTransferTest, exchange_multichannel_test) {
timer_interval = 0xffff;
int channel_num = 4;
InitTransfer(4);
writer->Run();
Expand All @@ -111,7 +112,7 @@ TEST_F(StreamingTransferTest, exchange_multichannel_test) {
writer->WriteMessageToBufferRing(queue_vec[i], data, data_size);
std::shared_ptr<DataBundle> msg;
reader->GetBundle(5000, msg);
EXPECT_EQ(msg->from, queue_vec[i]);
EXPECT_EQ(msg->from, queue_vec[i]) << *msg.get();
StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data);
auto &message_list = bundle_ptr->GetMessageList();
auto &message = message_list.front();
Expand Down

0 comments on commit a9f74e5

Please sign in to comment.