Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX]Transfer test fix #23

Merged
merged 11 commits into from
Feb 18, 2022
2 changes: 1 addition & 1 deletion streaming/src/channel/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ StreamingStatus StreamingQueueProducer::ProduceItemToChannel(uint8_t *data,
return StreamingStatus::OK;
}

uint64_t StreamingQueueProducer::GetLastBundleId() { return queue_->GetCurrentSeqId(); }
uint64_t StreamingQueueProducer::GetLastBundleId() const { return queue_->GetCurrentSeqId(); }

Status StreamingQueueProducer::PushQueueItem(uint8_t *data, uint32_t data_size,
uint64_t timestamp, uint64_t msg_id_start,
Expand Down
4 changes: 2 additions & 2 deletions streaming/src/channel/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ProducerChannel {
virtual StreamingStatus RefreshChannelInfo() = 0;
virtual StreamingStatus ProduceItemToChannel(uint8_t *data, uint32_t data_size) = 0;
virtual StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) = 0;
virtual uint64_t GetLastBundleId() = 0;
virtual uint64_t GetLastBundleId() const = 0;

protected:
std::shared_ptr<Config> transfer_config_;
Expand Down Expand Up @@ -199,7 +199,7 @@ class MockProducer : public ProducerChannel {
StreamingStatus NotifyChannelConsumed(uint64_t channel_offset) override {
return StreamingStatus::OK;
}
uint64_t GetLastBundleId() { return current_bundle_id_; }
uint64_t GetLastBundleId() const { return current_bundle_id_; }

private:
uint64_t current_bundle_id_;
Expand Down
6 changes: 4 additions & 2 deletions streaming/src/data_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ void DataReader::Init(const std::vector<ObjectID> &input_ids,
void DataReader::Init(const std::vector<ObjectID> &input_ids,
const std::vector<ChannelCreationParameter> &init_params,
int64_t timer_interval) {
STREAMING_LOG(INFO) << input_ids.size() << " queue to init.";
STREAMING_LOG(INFO) << "Reader " << input_ids.size()
<< " queue to init, timer interval " << timer_interval;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add = here to make it make understandable?

"Reader, input id size =" << input_ids.size() << "xxxx timer interval = " << timer_interval;


transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, input_ids);

last_fetched_queue_item_ = nullptr;
timer_interval_ = timer_interval;
last_message_ts_ = 0;
// NOTE(lingxuan.zlx): Last recived message timestamp is marked in current system time.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

last_message_ts_ = current_sys_time_ms();
input_queue_ids_ = input_ids;
last_message_latency_ = 0;
last_bundle_unit_ = 0;
Expand Down
3 changes: 2 additions & 1 deletion streaming/src/data_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ StreamingStatus DataWriter::Init(const std::vector<ObjectID> &queue_id_vec,
const std::vector<uint64_t> &channel_message_id_vec,
const std::vector<uint64_t> &queue_size_vec) {
STREAMING_CHECK(!queue_id_vec.empty() && !channel_message_id_vec.empty());
STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName();
STREAMING_LOG(INFO) << "Job name => " << runtime_context_->GetConfig().GetJobName()
<< ", reliability level => " << runtime_context_->GetConfig().GetReliabilityLevel();

output_queue_ids_ = queue_id_vec;
transfer_config_->Set(ConfigEnum::QUEUE_ID_VECTOR, queue_id_vec);
Expand Down
5 changes: 3 additions & 2 deletions streaming/src/test/mock_transfer_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class StreamingTransferTest : public ::testing::Test {
std::vector<ObjectID> queue_vec;
std::shared_ptr<RuntimeContext> writer_runtime_context;
std::shared_ptr<RuntimeContext> reader_runtime_context;
int64_t timer_interval = -1;
int64_t timer_interval = std::numeric_limits<int64_t>::max();
};

TEST_F(StreamingTransferTest, exchange_single_channel_test) {
Expand All @@ -102,6 +102,7 @@ TEST_F(StreamingTransferTest, exchange_single_channel_test) {
}

TEST_F(StreamingTransferTest, exchange_multichannel_test) {
timer_interval = 0xffff;
int channel_num = 4;
InitTransfer(4);
writer->Run();
Expand All @@ -111,7 +112,7 @@ TEST_F(StreamingTransferTest, exchange_multichannel_test) {
writer->WriteMessageToBufferRing(queue_vec[i], data, data_size);
std::shared_ptr<DataBundle> msg;
reader->GetBundle(5000, msg);
EXPECT_EQ(msg->from, queue_vec[i]);
EXPECT_EQ(msg->from, queue_vec[i]) << *msg.get();
StreamingMessageBundlePtr bundle_ptr = StreamingMessageBundle::FromBytes(msg->data);
auto &message_list = bundle_ptr->GetMessageList();
auto &message = message_list.front();
Expand Down