From 7c119d8d1fa285ab63fc86c10fcd99468315a8b2 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Tue, 11 Sep 2018 15:26:38 +0900 Subject: [PATCH] Add functionality so Starcoder can send back arbitrary PMTs, not just blobs (#78) * rename convert_pmt_proto to convert_proto_to_pmt * init commit of pmt_to_proto * finish pmt_to_proto? * clang * update proto * convert in enqueue_message_sink * update starcoder to parse the protobufs * modify waterfall plotter to pass up protobuf * update noaa * introduce blob_value * add blob to proto_to_pmt * test for proto_to_pmt * use pmt in meteor decoder * experimental drop payload * fix closing * check if queue is closed() to exit goroutine * revert * factor out function * dont expose in header file * use back inserter * use repeatedptrfieldbackinsertiterator * clang * use std::copy * rename request to respose * change signature of pmt_to_proto * changes to test.grc --- api/starcoder.proto | 7 +- cqueue/c_queue.go | 4 + cqueue/string_queue.cc | 5 + cqueue/string_queue.h | 1 + flowgraphs/test.grc | 2 +- gr-starcoder/lib/CMakeLists.txt | 1 + gr-starcoder/lib/command_source_impl.cc | 2 +- gr-starcoder/lib/enqueue_message_sink_impl.cc | 5 +- gr-starcoder/lib/meteor_decoder_sink_impl.cc | 26 ++- gr-starcoder/lib/noaa_apt_sink_impl.cc | 10 +- gr-starcoder/lib/pmt_to_proto.cc | 201 ++++++++++++++++++ gr-starcoder/lib/pmt_to_proto.h | 29 +++ gr-starcoder/lib/proto_to_pmt.cc | 176 +++++++-------- gr-starcoder/lib/proto_to_pmt.h | 6 +- gr-starcoder/lib/qa_enqueue_message_sink.cc | 10 +- gr-starcoder/lib/waterfall_plotter_impl.cc | 9 +- gr-starcoder/python/qa_command_source.py | 24 ++- sampleclient/main.go | 27 ++- server/starcoder.go | 76 ++++--- 19 files changed, 468 insertions(+), 153 deletions(-) create mode 100644 gr-starcoder/lib/pmt_to_proto.cc create mode 100644 gr-starcoder/lib/pmt_to_proto.h diff --git a/api/starcoder.proto b/api/starcoder.proto index 23829f11..df45591a 100644 --- a/api/starcoder.proto +++ b/api/starcoder.proto @@ -84,6 +84,8 @@ message BlockMessage { UniformVector uniform_vector_value = 9; Dict dict_value = 10; + + bytes blob_value = 11; } } @@ -203,7 +205,10 @@ message RunFlowgraphResponse { string block_id = 1; // Serialized arbitrary PMT (GNURadio Polymorphic Message Type) - bytes payload = 2; + bytes payload = 2 [deprecated=true]; + + // PMT (GNURadio Polymorphic Message Type) response sent from the block + BlockMessage pmt = 3; } // Complex number diff --git a/cqueue/c_queue.go b/cqueue/c_queue.go index 71a190d0..94de2eae 100644 --- a/cqueue/c_queue.go +++ b/cqueue/c_queue.go @@ -49,6 +49,10 @@ func (q *CStringQueue) Close() { q.queue.Close() } +func (q *CStringQueue) Closed() bool { + return q.queue.Closed() +} + func (q *CStringQueue) Push(str string) { q.queue.Push(str) } diff --git a/cqueue/string_queue.cc b/cqueue/string_queue.cc index d0c73030..d22e9d18 100644 --- a/cqueue/string_queue.cc +++ b/cqueue/string_queue.cc @@ -65,6 +65,11 @@ void string_queue::close() { condition_var_.notify_one(); } +bool string_queue::closed() { + std::unique_lock lock(mutex_); + return closed_; +} + uint64_t string_queue::get_ptr() const { return reinterpret_cast(this); } diff --git a/cqueue/string_queue.h b/cqueue/string_queue.h index 21108d75..8c438132 100644 --- a/cqueue/string_queue.h +++ b/cqueue/string_queue.h @@ -42,6 +42,7 @@ class string_queue { std::string blocking_pop(); unsigned long get_ptr() const; void close(); + bool closed(); // TODO: Make this uint64_t static string_queue *queue_from_pointer(unsigned long long ptr); private: diff --git a/flowgraphs/test.grc b/flowgraphs/test.grc index 83394a3c..0767f16e 100644 --- a/flowgraphs/test.grc +++ b/flowgraphs/test.grc @@ -430,7 +430,7 @@ msg - pmt.make_u8vector(6, ord('b')) + pmt.make_c64vector(2, -7+3.2j) minoutbuf diff --git a/gr-starcoder/lib/CMakeLists.txt b/gr-starcoder/lib/CMakeLists.txt index 9cfd217d..276ce15c 100644 --- a/gr-starcoder/lib/CMakeLists.txt +++ b/gr-starcoder/lib/CMakeLists.txt @@ -60,6 +60,7 @@ list(APPEND starcoder_sources ${CMAKE_CURRENT_SOURCE_DIR}/../../cqueue/string_queue.cc ${hw_proto_srcs} proto_to_pmt.cc + pmt_to_proto.cc init.c driver.c firmware.c diff --git a/gr-starcoder/lib/command_source_impl.cc b/gr-starcoder/lib/command_source_impl.cc index 9cccbc8a..9dae3606 100644 --- a/gr-starcoder/lib/command_source_impl.cc +++ b/gr-starcoder/lib/command_source_impl.cc @@ -68,7 +68,7 @@ void command_source_impl::readloop() { GR_LOG_WARN(d_logger, "Failed to deserialize gRPC"); continue; } - pmt::pmt_t m = convert_pmt_proto(grpc_pmt); + pmt::pmt_t m = convert_proto_to_pmt(grpc_pmt); message_port_pub(port_, m); } } diff --git a/gr-starcoder/lib/enqueue_message_sink_impl.cc b/gr-starcoder/lib/enqueue_message_sink_impl.cc index dd5f5557..3becee74 100644 --- a/gr-starcoder/lib/enqueue_message_sink_impl.cc +++ b/gr-starcoder/lib/enqueue_message_sink_impl.cc @@ -25,6 +25,8 @@ #include #include "enqueue_message_sink_impl.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -51,7 +53,8 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {} void enqueue_message_sink_impl::handler(pmt::pmt_t msg) { if (string_queue_ != NULL) { - std::string serialized = pmt::serialize_str(msg); + std::string serialized = convert_pmt_to_proto(msg).SerializeAsString(); + if (serialized.length() > 10485760) { GR_LOG_ERROR(d_logger, boost::format("Received large packet of length %d in " diff --git a/gr-starcoder/lib/meteor_decoder_sink_impl.cc b/gr-starcoder/lib/meteor_decoder_sink_impl.cc index c93fd7ce..66cc06d6 100644 --- a/gr-starcoder/lib/meteor_decoder_sink_impl.cc +++ b/gr-starcoder/lib/meteor_decoder_sink_impl.cc @@ -28,6 +28,8 @@ #include "meteor/meteor_decoder.h" #include "meteor/meteor_packet.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -94,7 +96,8 @@ bool meteor_decoder_sink_impl::stop() { int ok = 0; while (decoder.pos() < total_size_ - meteor::SOFT_FRAME_LEN) { total++; - bool res = decoder.decode_one_frame(raw.get(), total_size_, error_corrected_data.get()); + bool res = decoder.decode_one_frame(raw.get(), total_size_, + error_corrected_data.get()); if (res) { ok++; std::cout << std::dec << 100. * decoder.pos() / total_size_ << "% " @@ -114,10 +117,23 @@ bool meteor_decoder_sink_impl::stop() { std::string png_b = packeter.dump_gray_image(meteor::BLUE_APID); if (string_queue_ != NULL) { - if (!png_img.empty()) string_queue_->push(png_img); - if (!png_r.empty()) string_queue_->push(png_r); - if (!png_g.empty()) string_queue_->push(png_g); - if (!png_b.empty()) string_queue_->push(png_b); + ::starcoder::BlockMessage grpc_pmt; + if (!png_img.empty()) { + grpc_pmt.set_blob_value(png_img); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_r.empty()) { + grpc_pmt.set_blob_value(png_r); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_g.empty()) { + grpc_pmt.set_blob_value(png_g); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_b.empty()) { + grpc_pmt.set_blob_value(png_b); + string_queue_->push(grpc_pmt.SerializeAsString()); + } } if (!filename_.empty()) { diff --git a/gr-starcoder/lib/noaa_apt_sink_impl.cc b/gr-starcoder/lib/noaa_apt_sink_impl.cc index b62e031e..cd8318a8 100644 --- a/gr-starcoder/lib/noaa_apt_sink_impl.cc +++ b/gr-starcoder/lib/noaa_apt_sink_impl.cc @@ -47,6 +47,7 @@ #include #include "gil_util.h" +#include "pmt_to_proto.h" namespace gr { namespace starcoder { @@ -128,9 +129,14 @@ void noaa_apt_sink_impl::write_image(std::string filename) { if (!d_flip) image_string = (store_gray_to_png_string(image_received_view_)); else - image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_)); + image_string = + store_gray_to_png_string(flipped_up_down_view(image_received_view_)); - if (!image_string.empty()) string_queue_->push(image_string); + if (!image_string.empty()) { + ::starcoder::BlockMessage grpc_pmt; + grpc_pmt.set_blob_value(image_string); + string_queue_->push(grpc_pmt.SerializeAsString()); + } } } diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc new file mode 100644 index 00000000..d6c1ce1b --- /dev/null +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -0,0 +1,201 @@ +/* -*- c++ -*- */ +/* + * Copyright 2018 Infostellar, Inc. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#include "pmt_to_proto.h" + +void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, + starcoder::UniformVector *uni_vector) { + if (pmt::is_u8vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size8); + const std::vector vector_elements = + pmt::u8vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); + } else if (pmt::is_s8vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size8); + const std::vector vector_elements = pmt::s8vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); + } else if (pmt::is_u16vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size16); + const std::vector vector_elements = + pmt::u16vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); + } else if (pmt::is_s16vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size16); + const std::vector vector_elements = + pmt::s16vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); + } else if (pmt::is_u32vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size32); + const std::vector vector_elements = + pmt::u32vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); + } else if (pmt::is_s32vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size32); + const std::vector vector_elements = + pmt::s32vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); + } else if (pmt::is_u64vector(pmt_msg)) { + starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value(); + const std::vector vector_elements = + pmt::u64vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u64_vector->mutable_value())); + } else if (pmt::is_s64vector(pmt_msg)) { + starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value(); + const std::vector vector_elements = + pmt::s64vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i64_vector->mutable_value())); + } else if (pmt::is_f32vector(pmt_msg)) { + starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value(); + const std::vector vector_elements = pmt::f32vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + f32_vector->mutable_value())); + } else if (pmt::is_f64vector(pmt_msg)) { + starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value(); + const std::vector vector_elements = + pmt::f64vector_elements(pmt_msg); + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + f64_vector->mutable_value())); + } else if (pmt::is_c32vector(pmt_msg)) { + starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); + const std::vector> vector_elements = + pmt::c32vector_elements(pmt_msg); + std::transform( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator< + starcoder::Complex32>(c32_vector->mutable_value()), + [](std::complex c)->starcoder::Complex32 { + starcoder::Complex32 new_val; + new_val.set_real_value(c.real()); + new_val.set_imaginary_value(c.imag()); + return new_val; + }); + } else if (pmt::is_c64vector(pmt_msg)) { + starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); + const std::vector> vector_elements = + pmt::c64vector_elements(pmt_msg); + std::transform( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator< + starcoder::Complex>(c64_vector->mutable_value()), + [](std::complex c)->starcoder::Complex { + starcoder::Complex new_val; + new_val.set_real_value(c.real()); + new_val.set_imaginary_value(c.imag()); + return new_val; + }); + } +} + +starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg) { + starcoder::BlockMessage proto_msg; + if (pmt::is_blob(pmt_msg)) { + proto_msg.set_blob_value(pmt::blob_data(pmt_msg), + pmt::blob_length(pmt_msg)); + } else if (pmt::is_uniform_vector(pmt_msg)) { + convert_proto_uniform_vector(pmt_msg, + proto_msg.mutable_uniform_vector_value()); + } else if (pmt::is_bool(pmt_msg)) { + proto_msg.set_boolean_value(pmt::to_bool(pmt_msg)); + } else if (pmt::is_symbol(pmt_msg)) { + proto_msg.set_symbol_value(pmt::symbol_to_string(pmt_msg)); + } else if (pmt::is_integer(pmt_msg)) { + proto_msg.set_integer_value(pmt::to_long(pmt_msg)); + } else if (pmt::is_uint64(pmt_msg)) { + proto_msg.set_integer_value(pmt::to_uint64(pmt_msg)); + } else if (pmt::is_real(pmt_msg)) { + proto_msg.set_double_value(pmt::to_double(pmt_msg)); + } else if (pmt::is_complex(pmt_msg)) { + std::complex val = pmt::to_complex(pmt_msg); + starcoder::Complex *complex = proto_msg.mutable_complex_value(); + complex->set_real_value(val.real()); + complex->set_imaginary_value(val.imag()); + } else if (pmt::is_pair(pmt_msg)) { + starcoder::Pair *pair = proto_msg.mutable_pair_value(); + starcoder::BlockMessage car = convert_pmt_to_proto(pmt::car(pmt_msg)); + starcoder::BlockMessage cdr = convert_pmt_to_proto(pmt::cdr(pmt_msg)); + pair->mutable_car()->Swap(&car); + pair->mutable_cdr()->Swap(&cdr); + } else if (pmt::is_tuple(pmt_msg)) { + starcoder::List *list = proto_msg.mutable_list_value(); + list->set_type(starcoder::List::TUPLE); + for (int i = 0; i < pmt::length(pmt_msg); i++) { + starcoder::BlockMessage element = + convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i)); + list->add_value()->Swap(&element); + } + } else if (pmt::is_vector(pmt_msg)) { + starcoder::List *list = proto_msg.mutable_list_value(); + list->set_type(starcoder::List::VECTOR); + for (int i = 0; i < pmt::length(pmt_msg); i++) { + starcoder::BlockMessage element = + convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i)); + list->add_value()->Swap(&element); + } + } else if (pmt::is_dict(pmt_msg)) { + starcoder::Dict *dict = proto_msg.mutable_dict_value(); + pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); + for (int i = 0; i < pmt::length(key_value_pairs_list); i++) { + starcoder::Dict_Entry *entry = dict->add_entry(); + + starcoder::BlockMessage key = + convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list))); + starcoder::BlockMessage value = + convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list))); + + entry->mutable_key()->Swap(&key); + entry->mutable_value()->Swap(&value); + } + } + return proto_msg; +} diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h new file mode 100644 index 00000000..23f8dabb --- /dev/null +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -0,0 +1,29 @@ +/* -*- c++ -*- */ +/* + * Copyright 2018 Infostellar, Inc. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_PMT_TO_PROTO_H +#define INCLUDED_PMT_TO_PROTO_H + +#include +#include "starcoder.pb.h" + +starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg); + +#endif /* INCLUDED_PMT_TO_PROTO_H */ diff --git a/gr-starcoder/lib/proto_to_pmt.cc b/gr-starcoder/lib/proto_to_pmt.cc index c4971e66..2aef76b1 100644 --- a/gr-starcoder/lib/proto_to_pmt.cc +++ b/gr-starcoder/lib/proto_to_pmt.cc @@ -20,34 +20,6 @@ #include "proto_to_pmt.h" -pmt::pmt_t convert_pmt_proto(const starcoder::BlockMessage &proto_msg) { - starcoder::BlockMessage::MessageOneofCase type = - proto_msg.message_oneof_case(); - switch (type) { - case starcoder::BlockMessage::MessageOneofCase::kBooleanValue: - return pmt::from_bool(proto_msg.boolean_value()); - case starcoder::BlockMessage::MessageOneofCase::kSymbolValue: - return pmt::string_to_symbol(proto_msg.symbol_value()); - case starcoder::BlockMessage::MessageOneofCase::kIntegerValue: - return pmt::from_long(proto_msg.integer_value()); - case starcoder::BlockMessage::MessageOneofCase::kDoubleValue: - return pmt::from_double(proto_msg.double_value()); - case starcoder::BlockMessage::MessageOneofCase::kComplexValue: - return pmt::from_complex(proto_msg.complex_value().real_value(), - proto_msg.complex_value().imaginary_value()); - case starcoder::BlockMessage::MessageOneofCase::kPairValue: - return pmt::cons(convert_pmt_proto(proto_msg.pair_value().car()), - convert_pmt_proto(proto_msg.pair_value().cdr())); - case starcoder::BlockMessage::MessageOneofCase::kListValue: - return convert_pmt_list(proto_msg.list_value()); - case starcoder::BlockMessage::MessageOneofCase::kUniformVectorValue: - return convert_pmt_uniform_vector(proto_msg.uniform_vector_value()); - case starcoder::BlockMessage::MessageOneofCase::kDictValue: - return convert_pmt_dict(proto_msg.dict_value()); - } - return pmt::get_PMT_NIL(); -} - pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { int size = proto_pmt_list.value_size(); if (proto_pmt_list.type() == starcoder::List::TUPLE) { @@ -55,70 +27,70 @@ pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { case 0: return pmt::make_tuple(); case 1: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0))); case 2: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1))); case 3: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2))); case 4: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3))); case 5: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4))); case 6: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5))); case 7: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6))); case 8: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7))); case 9: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7)), - convert_pmt_proto(proto_pmt_list.value(8))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7)), + convert_proto_to_pmt(proto_pmt_list.value(8))); case 10: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7)), - convert_pmt_proto(proto_pmt_list.value(8)), - convert_pmt_proto(proto_pmt_list.value(9))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7)), + convert_proto_to_pmt(proto_pmt_list.value(8)), + convert_proto_to_pmt(proto_pmt_list.value(9))); default: throw("PMT tuple sizes >10 not supported"); } @@ -126,7 +98,7 @@ pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { pmt::pmt_t vec = pmt::make_vector(proto_pmt_list.value_size(), pmt::get_PMT_NIL()); for (int i = 0; i < proto_pmt_list.value_size(); i++) { - pmt::vector_set(vec, i, convert_pmt_proto(proto_pmt_list.value(i))); + pmt::vector_set(vec, i, convert_proto_to_pmt(proto_pmt_list.value(i))); } return vec; } else @@ -246,8 +218,40 @@ pmt::pmt_t convert_pmt_uniform_vector( pmt::pmt_t convert_pmt_dict(const starcoder::Dict &proto_pmt_dict) { pmt::pmt_t dict = pmt::make_dict(); for (const starcoder::Dict_Entry &entry : proto_pmt_dict.entry()) { - dict = pmt::dict_add(dict, convert_pmt_proto(entry.key()), - convert_pmt_proto(entry.value())); + dict = pmt::dict_add(dict, convert_proto_to_pmt(entry.key()), + convert_proto_to_pmt(entry.value())); } return dict; } + +pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { + starcoder::BlockMessage::MessageOneofCase type = + proto_msg.message_oneof_case(); + switch (type) { + case starcoder::BlockMessage::MessageOneofCase::kBooleanValue: + return pmt::from_bool(proto_msg.boolean_value()); + case starcoder::BlockMessage::MessageOneofCase::kSymbolValue: + return pmt::string_to_symbol(proto_msg.symbol_value()); + case starcoder::BlockMessage::MessageOneofCase::kIntegerValue: + return pmt::from_long(proto_msg.integer_value()); + case starcoder::BlockMessage::MessageOneofCase::kDoubleValue: + return pmt::from_double(proto_msg.double_value()); + case starcoder::BlockMessage::MessageOneofCase::kComplexValue: + return pmt::from_complex(proto_msg.complex_value().real_value(), + proto_msg.complex_value().imaginary_value()); + case starcoder::BlockMessage::MessageOneofCase::kPairValue: + return pmt::cons(convert_proto_to_pmt(proto_msg.pair_value().car()), + convert_proto_to_pmt(proto_msg.pair_value().cdr())); + case starcoder::BlockMessage::MessageOneofCase::kListValue: + return convert_pmt_list(proto_msg.list_value()); + case starcoder::BlockMessage::MessageOneofCase::kUniformVectorValue: + return convert_pmt_uniform_vector(proto_msg.uniform_vector_value()); + case starcoder::BlockMessage::MessageOneofCase::kDictValue: + return convert_pmt_dict(proto_msg.dict_value()); + case starcoder::BlockMessage::MessageOneofCase::kBlobValue: + std::vector vec(proto_msg.blob_value().begin(), + proto_msg.blob_value().end()); + return pmt::init_u8vector(proto_msg.blob_value().size(), vec); + } + return pmt::get_PMT_NIL(); +} diff --git a/gr-starcoder/lib/proto_to_pmt.h b/gr-starcoder/lib/proto_to_pmt.h index 24862a11..f3923dc2 100644 --- a/gr-starcoder/lib/proto_to_pmt.h +++ b/gr-starcoder/lib/proto_to_pmt.h @@ -24,10 +24,6 @@ #include #include "starcoder.pb.h" -pmt::pmt_t convert_pmt_proto(const starcoder::BlockMessage &proto_msg); -pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list); -pmt::pmt_t convert_pmt_uniform_vector( - const starcoder::UniformVector &proto_pmt_uniform_vector); -pmt::pmt_t convert_pmt_dict(const starcoder::Dict &proto_pmt_dict); +pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg); #endif /* INCLUDED_PROTO_TO_PMT_H */ diff --git a/gr-starcoder/lib/qa_enqueue_message_sink.cc b/gr-starcoder/lib/qa_enqueue_message_sink.cc index 8455b132..d1a6c62f 100644 --- a/gr-starcoder/lib/qa_enqueue_message_sink.cc +++ b/gr-starcoder/lib/qa_enqueue_message_sink.cc @@ -85,10 +85,12 @@ void qa_enqueue_message_sink::test_registered_queue() { tb->stop(); tb->wait(); - // Check that the passed PMT blobs are accessible from the queue - // in the expected serialized binary format. - CPPUNIT_ASSERT_EQUAL(q.pop(), pmt::serialize_str(pmt::make_u8vector(10, 97))); - CPPUNIT_ASSERT_EQUAL(q.pop(), pmt::serialize_str(pmt::make_u8vector(3, 98))); + // TODO (rei): Explicitly check the contents of the returned string are in the + // expected + // binary format. Currently I'm unable to include the protobuf class in C++ + // tests. + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 12); + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 5); // Check that after retrieving all available messages, the queue // returns the empty string. diff --git a/gr-starcoder/lib/waterfall_plotter_impl.cc b/gr-starcoder/lib/waterfall_plotter_impl.cc index 0840b7bb..c95180e7 100644 --- a/gr-starcoder/lib/waterfall_plotter_impl.cc +++ b/gr-starcoder/lib/waterfall_plotter_impl.cc @@ -30,6 +30,8 @@ #include "numpy/arrayobject.h" #include "waterfall_plotter_impl.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -152,8 +154,11 @@ bool waterfall_plotter_impl::stop() { Py_ssize_t image_size = PyString_Size(result); char *image_buffer = PyString_AsString(result); if (image_buffer == NULL) goto error; - const std::string image_binary(image_buffer, image_size); - string_queue_->push(image_binary); + + ::starcoder::BlockMessage grpc_pmt; + grpc_pmt.set_blob_value(image_buffer, image_size); + + string_queue_->push(grpc_pmt.SerializeAsString()); } error: diff --git a/gr-starcoder/python/qa_command_source.py b/gr-starcoder/python/qa_command_source.py index 76681f23..4408ce05 100755 --- a/gr-starcoder/python/qa_command_source.py +++ b/gr-starcoder/python/qa_command_source.py @@ -103,6 +103,26 @@ def test_tuple(self): self.assertTrue(pmt.is_tuple(snk.get_message(0))) self.assertTrue(pmt.equal(snk.get_message(0), expected)) + def test_blob(self): + cs = starcoder.command_source() + snk = blocks.message_debug() + self.tb.msg_connect((cs, 'out'), (snk, 'store')) + + msg = starcoder_pb2.BlockMessage() + msg.blob_value = "data" + + expected = pmt.init_u8vector(4, [ord('d'), ord('a'), ord('t'), ord('a')]) + + self.tb.start() + cs.push(msg.SerializeToString()) + time.sleep(0.1) + self.tb.stop() + self.tb.wait() + + self.assertEqual(snk.num_messages(), 1) + self.assertTrue(pmt.is_u8vector(snk.get_message(0))) + self.assertTrue(pmt.equal(snk.get_message(0), expected)) + def test_u8_vector(self): cs = starcoder.command_source() snk = blocks.message_debug() @@ -130,10 +150,10 @@ def test_i32_vector(self): self.tb.msg_connect((cs, 'out'), (snk, 'store')) msg = starcoder_pb2.BlockMessage() - msg.uniform_vector_value.i_value.value.extend([12, -2, 3]) + msg.uniform_vector_value.i_value.value.extend([12, -65500, 3]) msg.uniform_vector_value.i_value.size = starcoder_pb2.Size32 - expected = pmt.init_s32vector(3, [12, -2, 3]) + expected = pmt.init_s32vector(3, [12, -65500, 3]) self.tb.start() cs.push(msg.SerializeToString()) diff --git a/sampleclient/main.go b/sampleclient/main.go index 2c5d5e00..7cfe44b3 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -22,6 +22,7 @@ package main import ( "context" "fmt" + "github.com/gogo/protobuf/proto" pb "github.com/infostellarinc/starcoder/api" "google.golang.org/grpc" "io" @@ -52,19 +53,19 @@ func main() { log.Println("error receiving!") log.Fatalf("%v", err) } - if len(r.GetPayload()) > 20 { - log.Println(r.GetBlockId(), len(r.GetPayload())) + if len(r.GetPayload()) > 50 || proto.Size(r.GetPmt()) > 50 { + log.Println(r.GetBlockId(), len(r.GetPayload()), proto.Size(r.GetPmt())) } else { - log.Println(r.GetBlockId(), r.GetPayload()) + log.Println(r.GetBlockId(), r.GetPayload(), r.GetPmt()) } if r.GetBlockId() == "starcoder_waterfall_sink_0" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", r.GetPayload(), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", r.GetPmt().GetBlobValue(), 0644) } if r.GetBlockId() == "noaa_apt_decoded" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPayload(), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPmt().GetBlobValue(), 0644) } if r.GetBlockId() == "meteor_decoder_sink" { - ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPayload(), 0644) + ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPmt().GetBlobValue(), 0644) meteor_decoder_sink_idx++ } } @@ -83,7 +84,7 @@ func main() { time.Sleep(1 * time.Second) commandReq := &pb.SendCommandRequest{ BlockId: "starcoder_command_source_0", - Pmt: constructPDU(), + Pmt: constructPDU(), } req = &pb.RunFlowgraphRequest{ Request: &pb.RunFlowgraphRequest_SendCommandRequest{ @@ -96,7 +97,7 @@ func main() { time.Sleep(4 * time.Second) commandReq = &pb.SendCommandRequest{ BlockId: "starcoder_command_source_1", - Pmt: constructPDU(), + Pmt: constructPDU(), } req = &pb.RunFlowgraphRequest{ Request: &pb.RunFlowgraphRequest_SendCommandRequest{ @@ -148,3 +149,13 @@ func constructU8Vector() *pb.BlockMessage { MessageOneof: &pb.BlockMessage_UniformVectorValue{pmtUVector}, } } + +func convertUint32SliceToByteSlice(in []uint32) []byte { + out := make([]byte, len(in)) + var v uint32 + var i int + for i, v = range in { + out[i] = byte(v) + } + return out +} diff --git a/server/starcoder.go b/server/starcoder.go index 8c1775c4..df9e3436 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -85,9 +85,9 @@ func NewStarcoderServer(flowgraphDir string, perfCtrInterval time.Duration, sile deregisterStreamHandler: make(chan *streamHandler), closeAllStreamsChannel: make(chan chan bool), filepathToModAndClassName: make(map[string]*moduleAndClassNames), - log: log, - perfCtrInterval: perfCtrInterval, - silencedCommandBlocks: silencedCommandBlocksMap, + log: log, + perfCtrInterval: perfCtrInterval, + silencedCommandBlocks: silencedCommandBlocksMap, } tempDir, err := ioutil.TempDir("", "starcoder") @@ -163,7 +163,7 @@ type streamHandler struct { perfCtrStopChannel chan struct{} clientFinished bool streamError error - mustCloseMutex sync.Mutex + errorMutex sync.Mutex wg sync.WaitGroup log *zap.SugaredLogger } @@ -176,7 +176,7 @@ func newStreamHandler(sc *Starcoder, stream pb.Starcoder_RunFlowgraphServer, flo perfCtrStopChannel: make(chan struct{}), clientFinished: false, streamError: nil, - mustCloseMutex: sync.Mutex{}, + errorMutex: sync.Mutex{}, wg: sync.WaitGroup{}, log: log, } @@ -239,37 +239,31 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString sh.wg.Add(1) defer sh.wg.Done() for { - // TODO: Convert PMT to a gRPC native data structure. - // Use built-in PMT serialization for now. bytes := []byte(q.BlockingPop()) - if len(bytes) > 10485760 { - sh.log.Errorw( - "Length of packet received from Starcoder much bigger than expected.", - "block", blockName, "size", len(bytes)) - } else if len(bytes) != 0 { - // Could be woken up spuriously or by something else calling q.Close() - if err := sh.stream.Send(&pb.RunFlowgraphResponse{ - BlockId: blockName, - Payload: bytes, - }); err != nil { + + if len(bytes) != 0 { // Could be woken up spuriously or by something else calling q.Close() + response, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) + if err != nil { + sh.log.Errorw("Error constructing flowgraph response", "error", err) + continue + } + + if err := sh.stream.Send(response); err != nil { sh.log.Errorf("Error sending stream: %v", err) sh.finish(err) return } } - if sh.finished() { + + if q.Closed() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { - if len(bytes) > 10485760 { - sh.log.Errorw( - "Length of packet received from Starcoder much bigger than expected.", - "block", blockName, "size", len(bytes)) + response, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) + if err != nil { + sh.log.Errorw("Error constructing flowgraph response", "error", err) continue } - if err := sh.stream.Send(&pb.RunFlowgraphResponse{ - BlockId: blockName, - Payload: bytes, - }); err != nil { + if err := sh.stream.Send(response); err != nil { sh.log.Errorf("Error sending stream: %v", err) } } @@ -347,8 +341,8 @@ func (sh *streamHandler) Wait() { // Called within streamHandler whenever an error happens or client finished. // Signals starcoder server that it needs to end this stream handler func (sh *streamHandler) finish(err error) { - sh.mustCloseMutex.Lock() - defer sh.mustCloseMutex.Unlock() + sh.errorMutex.Lock() + defer sh.errorMutex.Unlock() if err == nil { sh.clientFinished = true } @@ -358,13 +352,6 @@ func (sh *streamHandler) finish(err error) { }() } -// Called by goroutines of streamHandler when they want to know when to close -func (sh *streamHandler) finished() bool { - sh.mustCloseMutex.Lock() - defer sh.mustCloseMutex.Unlock() - return sh.clientFinished || sh.streamError != nil -} - // Must only be called by starcoder server func (sh *streamHandler) Close() { defer func() { @@ -778,3 +765,22 @@ func getExceptionString() string { return fmt.Sprintf("Exception: %v \n Value: %v ", safeAsString(exc), safeAsString(val)) } + +func constructFlowgraphResponseFromSerializedPMT(blockName string, serialized []byte) (*pb.RunFlowgraphResponse, error) { + message := &pb.BlockMessage{} + err := proto.Unmarshal(serialized, message) + if err != nil { + return nil, err + } + + response := &pb.RunFlowgraphResponse{ + BlockId: blockName, + Pmt: message, + } + + if proto.Size(response) > 10485670 { + return nil, errors.New(fmt.Sprintf("Length of request message from Starcoder much bigger than expected. Block name: %v, Message size: %v", blockName, proto.Size(response))) + } + + return response, nil +}