From 93c33b5f0a619077f8060e83b920f8b80b4a620a Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 14:07:12 +0900 Subject: [PATCH 01/26] rename convert_pmt_proto to convert_proto_to_pmt --- gr-starcoder/lib/command_source_impl.cc | 2 +- gr-starcoder/lib/proto_to_pmt.cc | 122 ++++++++++++------------ gr-starcoder/lib/proto_to_pmt.h | 2 +- 3 files changed, 63 insertions(+), 63 deletions(-) diff --git a/gr-starcoder/lib/command_source_impl.cc b/gr-starcoder/lib/command_source_impl.cc index 9cccbc8a..9dae3606 100644 --- a/gr-starcoder/lib/command_source_impl.cc +++ b/gr-starcoder/lib/command_source_impl.cc @@ -68,7 +68,7 @@ void command_source_impl::readloop() { GR_LOG_WARN(d_logger, "Failed to deserialize gRPC"); continue; } - pmt::pmt_t m = convert_pmt_proto(grpc_pmt); + pmt::pmt_t m = convert_proto_to_pmt(grpc_pmt); message_port_pub(port_, m); } } diff --git a/gr-starcoder/lib/proto_to_pmt.cc b/gr-starcoder/lib/proto_to_pmt.cc index c4971e66..b74f2615 100644 --- a/gr-starcoder/lib/proto_to_pmt.cc +++ b/gr-starcoder/lib/proto_to_pmt.cc @@ -20,7 +20,7 @@ #include "proto_to_pmt.h" -pmt::pmt_t convert_pmt_proto(const starcoder::BlockMessage &proto_msg) { +pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { starcoder::BlockMessage::MessageOneofCase type = proto_msg.message_oneof_case(); switch (type) { @@ -36,8 +36,8 @@ pmt::pmt_t convert_pmt_proto(const starcoder::BlockMessage &proto_msg) { return pmt::from_complex(proto_msg.complex_value().real_value(), proto_msg.complex_value().imaginary_value()); case starcoder::BlockMessage::MessageOneofCase::kPairValue: - return pmt::cons(convert_pmt_proto(proto_msg.pair_value().car()), - convert_pmt_proto(proto_msg.pair_value().cdr())); + return pmt::cons(convert_proto_to_pmt(proto_msg.pair_value().car()), + convert_proto_to_pmt(proto_msg.pair_value().cdr())); case starcoder::BlockMessage::MessageOneofCase::kListValue: return convert_pmt_list(proto_msg.list_value()); case starcoder::BlockMessage::MessageOneofCase::kUniformVectorValue: @@ -55,70 +55,70 @@ pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { case 0: return pmt::make_tuple(); case 1: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0))); case 2: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1))); case 3: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2))); case 4: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3))); case 5: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4))); case 6: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5))); case 7: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6))); case 8: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7))); case 9: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7)), - convert_pmt_proto(proto_pmt_list.value(8))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7)), + convert_proto_to_pmt(proto_pmt_list.value(8))); case 10: - return pmt::make_tuple(convert_pmt_proto(proto_pmt_list.value(0)), - convert_pmt_proto(proto_pmt_list.value(1)), - convert_pmt_proto(proto_pmt_list.value(2)), - convert_pmt_proto(proto_pmt_list.value(3)), - convert_pmt_proto(proto_pmt_list.value(4)), - convert_pmt_proto(proto_pmt_list.value(5)), - convert_pmt_proto(proto_pmt_list.value(6)), - convert_pmt_proto(proto_pmt_list.value(7)), - convert_pmt_proto(proto_pmt_list.value(8)), - convert_pmt_proto(proto_pmt_list.value(9))); + return pmt::make_tuple(convert_proto_to_pmt(proto_pmt_list.value(0)), + convert_proto_to_pmt(proto_pmt_list.value(1)), + convert_proto_to_pmt(proto_pmt_list.value(2)), + convert_proto_to_pmt(proto_pmt_list.value(3)), + convert_proto_to_pmt(proto_pmt_list.value(4)), + convert_proto_to_pmt(proto_pmt_list.value(5)), + convert_proto_to_pmt(proto_pmt_list.value(6)), + convert_proto_to_pmt(proto_pmt_list.value(7)), + convert_proto_to_pmt(proto_pmt_list.value(8)), + convert_proto_to_pmt(proto_pmt_list.value(9))); default: throw("PMT tuple sizes >10 not supported"); } @@ -126,7 +126,7 @@ pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { pmt::pmt_t vec = pmt::make_vector(proto_pmt_list.value_size(), pmt::get_PMT_NIL()); for (int i = 0; i < proto_pmt_list.value_size(); i++) { - pmt::vector_set(vec, i, convert_pmt_proto(proto_pmt_list.value(i))); + pmt::vector_set(vec, i, convert_proto_to_pmt(proto_pmt_list.value(i))); } return vec; } else @@ -246,8 +246,8 @@ pmt::pmt_t convert_pmt_uniform_vector( pmt::pmt_t convert_pmt_dict(const starcoder::Dict &proto_pmt_dict) { pmt::pmt_t dict = pmt::make_dict(); for (const starcoder::Dict_Entry &entry : proto_pmt_dict.entry()) { - dict = pmt::dict_add(dict, convert_pmt_proto(entry.key()), - convert_pmt_proto(entry.value())); + dict = pmt::dict_add(dict, convert_proto_to_pmt(entry.key()), + convert_proto_to_pmt(entry.value())); } return dict; } diff --git a/gr-starcoder/lib/proto_to_pmt.h b/gr-starcoder/lib/proto_to_pmt.h index 24862a11..43f8139e 100644 --- a/gr-starcoder/lib/proto_to_pmt.h +++ b/gr-starcoder/lib/proto_to_pmt.h @@ -24,7 +24,7 @@ #include #include "starcoder.pb.h" -pmt::pmt_t convert_pmt_proto(const starcoder::BlockMessage &proto_msg); +pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg); pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list); pmt::pmt_t convert_pmt_uniform_vector( const starcoder::UniformVector &proto_pmt_uniform_vector); From 68e50c0fea66b891747173fc4d6c5b63202cfd0f Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 18:58:11 +0900 Subject: [PATCH 02/26] init commit of pmt_to_proto --- gr-starcoder/lib/CMakeLists.txt | 1 + gr-starcoder/lib/pmt_to_proto.cc | 115 +++++++++++++++++++++++ gr-starcoder/lib/pmt_to_proto.h | 30 ++++++ gr-starcoder/python/qa_command_source.py | 4 +- 4 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 gr-starcoder/lib/pmt_to_proto.cc create mode 100644 gr-starcoder/lib/pmt_to_proto.h diff --git a/gr-starcoder/lib/CMakeLists.txt b/gr-starcoder/lib/CMakeLists.txt index 9cfd217d..276ce15c 100644 --- a/gr-starcoder/lib/CMakeLists.txt +++ b/gr-starcoder/lib/CMakeLists.txt @@ -60,6 +60,7 @@ list(APPEND starcoder_sources ${CMAKE_CURRENT_SOURCE_DIR}/../../cqueue/string_queue.cc ${hw_proto_srcs} proto_to_pmt.cc + pmt_to_proto.cc init.c driver.c firmware.c diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc new file mode 100644 index 00000000..06693ae4 --- /dev/null +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -0,0 +1,115 @@ +/* -*- c++ -*- */ +/* + * Copyright 2018 Infostellar, Inc. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#include "pmt_to_proto.h" + +void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg) { + if (pmt::is_bool(pmt_msg)) { + proto_msg->set_boolean_value(pmt::to_bool(pmt_msg)); + } else if (pmt::is_symbol(pmt_msg)) { + proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg)); + } else if (pmt::is_integer(pmt_msg)) { + proto_msg->set_integer_value(pmt::to_long(pmt_msg)); + } else if (pmt::is_uint64(pmt_msg)) { + proto_msg->set_integer_value(pmt::to_uint64(pmt_msg)); + } else if (pmt::is_real(pmt_msg)) { + proto_msg->set_double_value(pmt::to_double(pmt_msg)); + } else if (pmt::is_complex(pmt_msg)) { + std::complex val = pmt::to_complex(pmt_msg); + starcoder::Complex *complex = proto_msg->mutable_complex_value(); + complex->set_real_value(val.real()); + complex->set_imaginary_value(val.imag()); + } else if (pmt::is_pair(pmt_msg)) { + starcoder::Pair *pair = proto_msg->mutable_pair_value(); + starcoder::BlockMessage *car = pair->mutable_car(); + starcoder::BlockMessage *cdr = pair->mutable_cdr(); + convert_pmt_to_proto(pmt::car(pmt_msg), car); + convert_pmt_to_proto(pmt::cdr(pmt_msg), cdr); + } else if (pmt::is_tuple(pmt_msg)) { + starcoder::List *list = proto_msg->mutable_list_value(); + list->set_type(starcoder::List::TUPLE); + for (int i=0; iadd_value(); + convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element); + } + } else if (pmt::is_vector(pmt_msg)) { + starcoder::List *list = proto_msg->mutable_list_value(); + list->set_type(starcoder::List::VECTOR); + for (int i=0; iadd_value(); + convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element); + } + } else if (pmt::is_dict(pmt_msg)) { + starcoder::Dict *dict = proto_msg->mutable_dict_value(); + pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); + for (int i=0; iadd_entry(); + starcoder::BlockMessage *key = entry->mutable_key(); + starcoder::BlockMessage *value = entry->mutable_value(); + convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key); + convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value); + } + } else if (pmt::is_uniform_vector(pmt_msg)) { + starcoder::UniformVector *uni_vector = proto_msg->mutable_uniform_vector_value(); + convert_proto_uniform_vector(pmt_msg, uni_vector); + } +} + +void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector) { + if (pmt::is_u8vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size8); + const std::vector < uint8_t > vector_elements = pmt::u8vector_elements(pmt_msg); + *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_s8vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size8); + const std::vector vector_elements = pmt::s8vector_elements(pmt_msg); + *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_u16vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size16); + const std::vector < uint16_t > vector_elements = pmt::u16vector_elements(pmt_msg); + *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_s16vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size16); + const std::vector vector_elements = pmt::s16vector_elements(pmt_msg); + *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_u32vector(pmt_msg)) { + starcoder::UVector *u_vector = uni_vector->mutable_u_value(); + u_vector->set_size(starcoder::IntSize::Size32); + const std::vector < uint32_t > vector_elements = pmt::u32vector_elements(pmt_msg); + *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_s32vector(pmt_msg)) { + starcoder::IVector *i_vector = uni_vector->mutable_i_value(); + i_vector->set_size(starcoder::IntSize::Size32); + const std::vector vector_elements = pmt::s32vector_elements(pmt_msg); + *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_u64vector(pmt_msg)) { + starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value(); + const std::vector vector_elements = pmt::u64vector_elements(pmt_msg); + *u64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_s64vector(pmt_msg)) { + starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value(); + const std::vector vector_elements = pmt::s64vector_elements(pmt_msg); + *i64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } +} diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h new file mode 100644 index 00000000..1a74698a --- /dev/null +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -0,0 +1,30 @@ +/* -*- c++ -*- */ +/* + * Copyright 2018 Infostellar, Inc. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_PMT_TO_PROTO_H +#define INCLUDED_PMT_TO_PROTO_H + +#include +#include "starcoder.pb.h" + +void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg); +void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector); + +#endif /* INCLUDED_PMT_TO_PROTO_H */ diff --git a/gr-starcoder/python/qa_command_source.py b/gr-starcoder/python/qa_command_source.py index 76681f23..97881282 100755 --- a/gr-starcoder/python/qa_command_source.py +++ b/gr-starcoder/python/qa_command_source.py @@ -130,10 +130,10 @@ def test_i32_vector(self): self.tb.msg_connect((cs, 'out'), (snk, 'store')) msg = starcoder_pb2.BlockMessage() - msg.uniform_vector_value.i_value.value.extend([12, -2, 3]) + msg.uniform_vector_value.i_value.value.extend([12, -65500, 3]) msg.uniform_vector_value.i_value.size = starcoder_pb2.Size32 - expected = pmt.init_s32vector(3, [12, -2, 3]) + expected = pmt.init_s32vector(3, [12, -65500, 3]) self.tb.start() cs.push(msg.SerializeToString()) From 3b353b5714b11312d64373930eff7940d745e566 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 19:36:31 +0900 Subject: [PATCH 03/26] finish pmt_to_proto? --- gr-starcoder/lib/pmt_to_proto.cc | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 06693ae4..91dc4600 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -111,5 +111,37 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformV starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value(); const std::vector vector_elements = pmt::s64vector_elements(pmt_msg); *i64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_f32vector(pmt_msg)) { + starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value(); + const std::vector vector_elements = pmt::f32vector_elements(pmt_msg); + *f32_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_f64vector(pmt_msg)) { + starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value(); + const std::vector vector_elements = pmt::f64vector_elements(pmt_msg); + *f64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + } else if (pmt::is_c32vector(pmt_msg)) { + starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); + const std::vector> vector_elements = pmt::c32vector_elements(pmt_msg); + std::transform(vector_elements.begin(), + vector_elements.end(), + c32_vector->mutable_value()->begin(), + [](std::complex c)->starcoder::Complex32 { + starcoder::Complex32 new_val; + new_val.set_real_value(c.real()); + new_val.set_imaginary_value(c.imag()); + return new_val; + }); + } else if (pmt::is_c64vector(pmt_msg)) { + starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); + const std::vector> vector_elements = pmt::c64vector_elements(pmt_msg); + std::transform(vector_elements.begin(), + vector_elements.end(), + c64_vector->mutable_value()->begin(), + [](std::complex c)->starcoder::Complex { + starcoder::Complex new_val; + new_val.set_real_value(c.real()); + new_val.set_imaginary_value(c.imag()); + return new_val; + }); } } From c7b66535ac08d5ec478f3e410a29e87fdeda5400 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 19:41:50 +0900 Subject: [PATCH 04/26] clang --- gr-starcoder/lib/pmt_to_proto.cc | 87 ++++++++++++++++++++------------ gr-starcoder/lib/pmt_to_proto.h | 6 ++- 2 files changed, 58 insertions(+), 35 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 91dc4600..f9438eb6 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -20,8 +20,13 @@ #include "pmt_to_proto.h" -void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg) { - if (pmt::is_bool(pmt_msg)) { +void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, + starcoder::BlockMessage *proto_msg) { + if (pmt::is_uniform_vector(pmt_msg)) { + starcoder::UniformVector *uni_vector = + proto_msg->mutable_uniform_vector_value(); + convert_proto_uniform_vector(pmt_msg, uni_vector); + } else if (pmt::is_bool(pmt_msg)) { proto_msg->set_boolean_value(pmt::to_bool(pmt_msg)); } else if (pmt::is_symbol(pmt_msg)) { proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg)); @@ -45,85 +50,101 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *pr } else if (pmt::is_tuple(pmt_msg)) { starcoder::List *list = proto_msg->mutable_list_value(); list->set_type(starcoder::List::TUPLE); - for (int i=0; iadd_value(); convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element); } } else if (pmt::is_vector(pmt_msg)) { starcoder::List *list = proto_msg->mutable_list_value(); list->set_type(starcoder::List::VECTOR); - for (int i=0; iadd_value(); convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element); } } else if (pmt::is_dict(pmt_msg)) { starcoder::Dict *dict = proto_msg->mutable_dict_value(); pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); - for (int i=0; iadd_entry(); starcoder::BlockMessage *key = entry->mutable_key(); starcoder::BlockMessage *value = entry->mutable_value(); convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key); convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value); } - } else if (pmt::is_uniform_vector(pmt_msg)) { - starcoder::UniformVector *uni_vector = proto_msg->mutable_uniform_vector_value(); - convert_proto_uniform_vector(pmt_msg, uni_vector); } } -void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector) { +void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, + starcoder::UniformVector *uni_vector) { if (pmt::is_u8vector(pmt_msg)) { starcoder::UVector *u_vector = uni_vector->mutable_u_value(); u_vector->set_size(starcoder::IntSize::Size8); - const std::vector < uint8_t > vector_elements = pmt::u8vector_elements(pmt_msg); - *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::u8vector_elements(pmt_msg); + *u_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_s8vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size8); const std::vector vector_elements = pmt::s8vector_elements(pmt_msg); - *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + *i_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_u16vector(pmt_msg)) { starcoder::UVector *u_vector = uni_vector->mutable_u_value(); u_vector->set_size(starcoder::IntSize::Size16); - const std::vector < uint16_t > vector_elements = pmt::u16vector_elements(pmt_msg); - *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::u16vector_elements(pmt_msg); + *u_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_s16vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size16); - const std::vector vector_elements = pmt::s16vector_elements(pmt_msg); - *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::s16vector_elements(pmt_msg); + *i_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_u32vector(pmt_msg)) { starcoder::UVector *u_vector = uni_vector->mutable_u_value(); u_vector->set_size(starcoder::IntSize::Size32); - const std::vector < uint32_t > vector_elements = pmt::u32vector_elements(pmt_msg); - *u_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::u32vector_elements(pmt_msg); + *u_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_s32vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size32); - const std::vector vector_elements = pmt::s32vector_elements(pmt_msg); - *i_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::s32vector_elements(pmt_msg); + *i_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_u64vector(pmt_msg)) { starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value(); - const std::vector vector_elements = pmt::u64vector_elements(pmt_msg); - *u64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::u64vector_elements(pmt_msg); + *u64_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_s64vector(pmt_msg)) { starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value(); - const std::vector vector_elements = pmt::s64vector_elements(pmt_msg); - *i64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::s64vector_elements(pmt_msg); + *i64_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_f32vector(pmt_msg)) { starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value(); const std::vector vector_elements = pmt::f32vector_elements(pmt_msg); - *f32_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + *f32_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_f64vector(pmt_msg)) { starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value(); - const std::vector vector_elements = pmt::f64vector_elements(pmt_msg); - *f64_vector->mutable_value() = {vector_elements.begin(), vector_elements.end()}; + const std::vector vector_elements = + pmt::f64vector_elements(pmt_msg); + *f64_vector->mutable_value() = { vector_elements.begin(), + vector_elements.end() }; } else if (pmt::is_c32vector(pmt_msg)) { starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); - const std::vector> vector_elements = pmt::c32vector_elements(pmt_msg); - std::transform(vector_elements.begin(), - vector_elements.end(), + const std::vector> vector_elements = + pmt::c32vector_elements(pmt_msg); + std::transform(vector_elements.begin(), vector_elements.end(), c32_vector->mutable_value()->begin(), [](std::complex c)->starcoder::Complex32 { starcoder::Complex32 new_val; @@ -133,9 +154,9 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformV }); } else if (pmt::is_c64vector(pmt_msg)) { starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); - const std::vector> vector_elements = pmt::c64vector_elements(pmt_msg); - std::transform(vector_elements.begin(), - vector_elements.end(), + const std::vector> vector_elements = + pmt::c64vector_elements(pmt_msg); + std::transform(vector_elements.begin(), vector_elements.end(), c64_vector->mutable_value()->begin(), [](std::complex c)->starcoder::Complex { starcoder::Complex new_val; diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h index 1a74698a..b1de532f 100644 --- a/gr-starcoder/lib/pmt_to_proto.h +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -24,7 +24,9 @@ #include #include "starcoder.pb.h" -void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg); -void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector); +void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, + starcoder::BlockMessage *proto_msg); +void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, + starcoder::UniformVector *uni_vector); #endif /* INCLUDED_PMT_TO_PROTO_H */ From 044f5adb9b09ec3bd770c2243f9825119271435a Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 19:50:48 +0900 Subject: [PATCH 05/26] update proto --- api/starcoder.proto | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/starcoder.proto b/api/starcoder.proto index 23829f11..eeb2a1b7 100644 --- a/api/starcoder.proto +++ b/api/starcoder.proto @@ -203,7 +203,10 @@ message RunFlowgraphResponse { string block_id = 1; // Serialized arbitrary PMT (GNURadio Polymorphic Message Type) - bytes payload = 2; + bytes payload = 2 [deprecated=true]; + + // PMT (GNURadio Polymorphic Message Type) response sent from the block + BlockMessage pmt = 3; } // Complex number From c873e2df0eddfe26c19efa8202b8aa9ca433d90d Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Wed, 29 Aug 2018 20:43:42 +0900 Subject: [PATCH 06/26] convert in enqueue_message_sink --- gr-starcoder/lib/enqueue_message_sink_impl.cc | 7 ++++++- gr-starcoder/lib/qa_enqueue_message_sink.cc | 10 ++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/gr-starcoder/lib/enqueue_message_sink_impl.cc b/gr-starcoder/lib/enqueue_message_sink_impl.cc index dd5f5557..6a31372a 100644 --- a/gr-starcoder/lib/enqueue_message_sink_impl.cc +++ b/gr-starcoder/lib/enqueue_message_sink_impl.cc @@ -25,6 +25,8 @@ #include #include "enqueue_message_sink_impl.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -51,7 +53,10 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {} void enqueue_message_sink_impl::handler(pmt::pmt_t msg) { if (string_queue_ != NULL) { - std::string serialized = pmt::serialize_str(msg); + ::starcoder::BlockMessage grpc_pmt; + convert_pmt_to_proto(msg, &grpc_pmt); + std::string serialized = grpc_pmt.SerializeAsString(); + if (serialized.length() > 10485760) { GR_LOG_ERROR(d_logger, boost::format("Received large packet of length %d in " diff --git a/gr-starcoder/lib/qa_enqueue_message_sink.cc b/gr-starcoder/lib/qa_enqueue_message_sink.cc index 8455b132..720d09f6 100644 --- a/gr-starcoder/lib/qa_enqueue_message_sink.cc +++ b/gr-starcoder/lib/qa_enqueue_message_sink.cc @@ -85,10 +85,12 @@ void qa_enqueue_message_sink::test_registered_queue() { tb->stop(); tb->wait(); - // Check that the passed PMT blobs are accessible from the queue - // in the expected serialized binary format. - CPPUNIT_ASSERT_EQUAL(q.pop(), pmt::serialize_str(pmt::make_u8vector(10, 97))); - CPPUNIT_ASSERT_EQUAL(q.pop(), pmt::serialize_str(pmt::make_u8vector(3, 98))); + // TODO (rei): Make sure the contents of the returned string are in the + // expected + // binary format. Currently I'm unable to include the protobuf class in C++ + // tests. + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 16); + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 9); // Check that after retrieving all available messages, the queue // returns the empty string. From e62c48d1c26bdafa3957983ebf25eb3276084a9d Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 11:50:50 +0900 Subject: [PATCH 07/26] update starcoder to parse the protobufs --- sampleclient/main.go | 7 ++++--- server/starcoder.go | 9 +++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/sampleclient/main.go b/sampleclient/main.go index 2c5d5e00..11377720 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -28,6 +28,7 @@ import ( "io/ioutil" "log" "time" + "github.com/gogo/protobuf/proto" ) func main() { @@ -52,10 +53,10 @@ func main() { log.Println("error receiving!") log.Fatalf("%v", err) } - if len(r.GetPayload()) > 20 { - log.Println(r.GetBlockId(), len(r.GetPayload())) + if len(r.GetPayload()) > 20 || proto.Size(r.GetPmt()) > 20 { + log.Println(r.GetBlockId(), len(r.GetPayload()), proto.Size(r.GetPmt())) } else { - log.Println(r.GetBlockId(), r.GetPayload()) + log.Println(r.GetBlockId(), r.GetPayload(), r.GetPmt()) } if r.GetBlockId() == "starcoder_waterfall_sink_0" { ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", r.GetPayload(), 0644) diff --git a/server/starcoder.go b/server/starcoder.go index b9106f66..f4cada1a 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -230,9 +230,12 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString sh.wg.Add(1) defer sh.wg.Done() for { - // TODO: Convert PMT to a gRPC native data structure. - // Use built-in PMT serialization for now. bytes := []byte(q.BlockingPop()) + message := &pb.BlockMessage{} + err := proto.Unmarshal(bytes, message) + if err != nil { + sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) + } if len(bytes) > 10485760 { sh.log.Errorw( "Length of packet received from Starcoder much bigger than expected.", @@ -242,6 +245,7 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString if err := sh.stream.Send(&pb.RunFlowgraphResponse{ BlockId: blockName, Payload: bytes, + Pmt: message, }); err != nil { sh.log.Errorf("Error sending stream: %v", err) sh.finish(err) @@ -260,6 +264,7 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString if err := sh.stream.Send(&pb.RunFlowgraphResponse{ BlockId: blockName, Payload: bytes, + Pmt: message, }); err != nil { sh.log.Errorf("Error sending stream: %v", err) } From dc1308a18fa3abb1096197a15f004789eca7e0a5 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 12:09:14 +0900 Subject: [PATCH 08/26] modify waterfall plotter to pass up protobuf --- gr-starcoder/lib/qa_enqueue_message_sink.cc | 2 +- gr-starcoder/lib/waterfall_plotter_impl.cc | 11 +++++++++-- sampleclient/main.go | 12 +++++++++++- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/gr-starcoder/lib/qa_enqueue_message_sink.cc b/gr-starcoder/lib/qa_enqueue_message_sink.cc index 720d09f6..502a66f0 100644 --- a/gr-starcoder/lib/qa_enqueue_message_sink.cc +++ b/gr-starcoder/lib/qa_enqueue_message_sink.cc @@ -85,7 +85,7 @@ void qa_enqueue_message_sink::test_registered_queue() { tb->stop(); tb->wait(); - // TODO (rei): Make sure the contents of the returned string are in the + // TODO (rei): Explicitly check the contents of the returned string are in the // expected // binary format. Currently I'm unable to include the protobuf class in C++ // tests. diff --git a/gr-starcoder/lib/waterfall_plotter_impl.cc b/gr-starcoder/lib/waterfall_plotter_impl.cc index 0840b7bb..5892b5a0 100644 --- a/gr-starcoder/lib/waterfall_plotter_impl.cc +++ b/gr-starcoder/lib/waterfall_plotter_impl.cc @@ -30,6 +30,8 @@ #include "numpy/arrayobject.h" #include "waterfall_plotter_impl.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -152,8 +154,13 @@ bool waterfall_plotter_impl::stop() { Py_ssize_t image_size = PyString_Size(result); char *image_buffer = PyString_AsString(result); if (image_buffer == NULL) goto error; - const std::string image_binary(image_buffer, image_size); - string_queue_->push(image_binary); + + pmt::pmt_t blob = pmt::make_blob(image_buffer, image_size); + ::starcoder::BlockMessage grpc_pmt; + convert_pmt_to_proto(blob, &grpc_pmt); + + const std::string serialized = grpc_pmt.SerializeAsString(); + string_queue_->push(serialized); } error: diff --git a/sampleclient/main.go b/sampleclient/main.go index 11377720..09c21c01 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -59,7 +59,7 @@ func main() { log.Println(r.GetBlockId(), r.GetPayload(), r.GetPmt()) } if r.GetBlockId() == "starcoder_waterfall_sink_0" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", r.GetPayload(), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", convertUint32SliceToByteSlice(r.GetPmt().GetUniformVectorValue().GetUValue().GetValue()), 0644) } if r.GetBlockId() == "noaa_apt_decoded" { ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPayload(), 0644) @@ -149,3 +149,13 @@ func constructU8Vector() *pb.BlockMessage { MessageOneof: &pb.BlockMessage_UniformVectorValue{pmtUVector}, } } + +func convertUint32SliceToByteSlice(in []uint32) []byte { + out := make([]byte, len(in)) + var v uint32 + var i int + for i, v = range in { + out[i] = byte(v) + } + return out +} From 2efc6e4617bc0ede0b0a3390ce4b63318d27213d Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 13:45:36 +0900 Subject: [PATCH 09/26] update noaa --- gr-starcoder/lib/noaa_apt_sink_impl.cc | 7 ++++++- gr-starcoder/lib/pmt_to_proto.cc | 5 +++++ gr-starcoder/lib/pmt_to_proto.h | 1 + sampleclient/main.go | 2 +- 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/gr-starcoder/lib/noaa_apt_sink_impl.cc b/gr-starcoder/lib/noaa_apt_sink_impl.cc index b62e031e..fc6edea5 100644 --- a/gr-starcoder/lib/noaa_apt_sink_impl.cc +++ b/gr-starcoder/lib/noaa_apt_sink_impl.cc @@ -47,6 +47,7 @@ #include #include "gil_util.h" +#include "pmt_to_proto.h" namespace gr { namespace starcoder { @@ -130,7 +131,11 @@ void noaa_apt_sink_impl::write_image(std::string filename) { else image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_)); - if (!image_string.empty()) string_queue_->push(image_string); + if (!image_string.empty()) { + ::starcoder::BlockMessage grpc_pmt; + convert_string_to_proto(image_string, &grpc_pmt); + string_queue_->push(grpc_pmt.SerializeAsString()); + } } } diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index f9438eb6..bbbb5149 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -166,3 +166,8 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, }); } } + +void convert_string_to_proto(const std::string &in, starcoder::BlockMessage *proto_msg) { + pmt::pmt_t blob = pmt::make_blob(in.c_str(), in.size()); + convert_pmt_to_proto(blob, proto_msg); +} \ No newline at end of file diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h index b1de532f..376ef3b1 100644 --- a/gr-starcoder/lib/pmt_to_proto.h +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -28,5 +28,6 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg); void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector); +void convert_string_to_proto(const std::string &in, starcoder::BlockMessage *proto_msg); #endif /* INCLUDED_PMT_TO_PROTO_H */ diff --git a/sampleclient/main.go b/sampleclient/main.go index 09c21c01..e8da9066 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -62,7 +62,7 @@ func main() { ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", convertUint32SliceToByteSlice(r.GetPmt().GetUniformVectorValue().GetUValue().GetValue()), 0644) } if r.GetBlockId() == "noaa_apt_decoded" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPayload(), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", convertUint32SliceToByteSlice(r.GetPmt().GetUniformVectorValue().GetUValue().GetValue()), 0644) } if r.GetBlockId() == "meteor_decoder_sink" { ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPayload(), 0644) From 359b1c1c0e2801a1fb8dbef88b270c0bcaccca82 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 14:31:45 +0900 Subject: [PATCH 10/26] introduce blob_value --- api/starcoder.proto | 2 ++ gr-starcoder/lib/noaa_apt_sink_impl.cc | 2 +- gr-starcoder/lib/pmt_to_proto.cc | 4 +++- gr-starcoder/lib/qa_enqueue_message_sink.cc | 4 ++-- gr-starcoder/lib/waterfall_plotter_impl.cc | 6 ++---- sampleclient/main.go | 4 ++-- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/api/starcoder.proto b/api/starcoder.proto index eeb2a1b7..df45591a 100644 --- a/api/starcoder.proto +++ b/api/starcoder.proto @@ -84,6 +84,8 @@ message BlockMessage { UniformVector uniform_vector_value = 9; Dict dict_value = 10; + + bytes blob_value = 11; } } diff --git a/gr-starcoder/lib/noaa_apt_sink_impl.cc b/gr-starcoder/lib/noaa_apt_sink_impl.cc index fc6edea5..d5774ff8 100644 --- a/gr-starcoder/lib/noaa_apt_sink_impl.cc +++ b/gr-starcoder/lib/noaa_apt_sink_impl.cc @@ -133,7 +133,7 @@ void noaa_apt_sink_impl::write_image(std::string filename) { if (!image_string.empty()) { ::starcoder::BlockMessage grpc_pmt; - convert_string_to_proto(image_string, &grpc_pmt); + grpc_pmt.set_blob_value(image_string); string_queue_->push(grpc_pmt.SerializeAsString()); } } diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index bbbb5149..9cd06527 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -22,7 +22,9 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg) { - if (pmt::is_uniform_vector(pmt_msg)) { + if (pmt::is_blob(pmt_msg)) { + proto_msg->set_blob_value(pmt::blob_data(pmt_msg), pmt::blob_length(pmt_msg)); + } else if (pmt::is_uniform_vector(pmt_msg)) { starcoder::UniformVector *uni_vector = proto_msg->mutable_uniform_vector_value(); convert_proto_uniform_vector(pmt_msg, uni_vector); diff --git a/gr-starcoder/lib/qa_enqueue_message_sink.cc b/gr-starcoder/lib/qa_enqueue_message_sink.cc index 502a66f0..d1a6c62f 100644 --- a/gr-starcoder/lib/qa_enqueue_message_sink.cc +++ b/gr-starcoder/lib/qa_enqueue_message_sink.cc @@ -89,8 +89,8 @@ void qa_enqueue_message_sink::test_registered_queue() { // expected // binary format. Currently I'm unable to include the protobuf class in C++ // tests. - CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 16); - CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 9); + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 12); + CPPUNIT_ASSERT_EQUAL(q.pop().size(), (size_t) 5); // Check that after retrieving all available messages, the queue // returns the empty string. diff --git a/gr-starcoder/lib/waterfall_plotter_impl.cc b/gr-starcoder/lib/waterfall_plotter_impl.cc index 5892b5a0..c95180e7 100644 --- a/gr-starcoder/lib/waterfall_plotter_impl.cc +++ b/gr-starcoder/lib/waterfall_plotter_impl.cc @@ -155,12 +155,10 @@ bool waterfall_plotter_impl::stop() { char *image_buffer = PyString_AsString(result); if (image_buffer == NULL) goto error; - pmt::pmt_t blob = pmt::make_blob(image_buffer, image_size); ::starcoder::BlockMessage grpc_pmt; - convert_pmt_to_proto(blob, &grpc_pmt); + grpc_pmt.set_blob_value(image_buffer, image_size); - const std::string serialized = grpc_pmt.SerializeAsString(); - string_queue_->push(serialized); + string_queue_->push(grpc_pmt.SerializeAsString()); } error: diff --git a/sampleclient/main.go b/sampleclient/main.go index e8da9066..143d2382 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -59,10 +59,10 @@ func main() { log.Println(r.GetBlockId(), r.GetPayload(), r.GetPmt()) } if r.GetBlockId() == "starcoder_waterfall_sink_0" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", convertUint32SliceToByteSlice(r.GetPmt().GetUniformVectorValue().GetUValue().GetValue()), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/waterfall_rec.png", r.GetPmt().GetBlobValue(), 0644) } if r.GetBlockId() == "noaa_apt_decoded" { - ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", convertUint32SliceToByteSlice(r.GetPmt().GetUniformVectorValue().GetUValue().GetValue()), 0644) + ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPmt().GetBlobValue(), 0644) } if r.GetBlockId() == "meteor_decoder_sink" { ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPayload(), 0644) From f8c4b65b6d3d3437daf7d2d41e42c810602485f8 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 16:07:37 +0900 Subject: [PATCH 11/26] add blob to proto_to_pmt --- gr-starcoder/lib/pmt_to_proto.cc | 5 ----- gr-starcoder/lib/pmt_to_proto.h | 1 - gr-starcoder/lib/proto_to_pmt.cc | 5 +++++ 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 9cd06527..bccaa888 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -168,8 +168,3 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, }); } } - -void convert_string_to_proto(const std::string &in, starcoder::BlockMessage *proto_msg) { - pmt::pmt_t blob = pmt::make_blob(in.c_str(), in.size()); - convert_pmt_to_proto(blob, proto_msg); -} \ No newline at end of file diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h index 376ef3b1..b1de532f 100644 --- a/gr-starcoder/lib/pmt_to_proto.h +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -28,6 +28,5 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg); void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector); -void convert_string_to_proto(const std::string &in, starcoder::BlockMessage *proto_msg); #endif /* INCLUDED_PMT_TO_PROTO_H */ diff --git a/gr-starcoder/lib/proto_to_pmt.cc b/gr-starcoder/lib/proto_to_pmt.cc index b74f2615..29e53cfe 100644 --- a/gr-starcoder/lib/proto_to_pmt.cc +++ b/gr-starcoder/lib/proto_to_pmt.cc @@ -44,6 +44,11 @@ pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { return convert_pmt_uniform_vector(proto_msg.uniform_vector_value()); case starcoder::BlockMessage::MessageOneofCase::kDictValue: return convert_pmt_dict(proto_msg.dict_value()); + case starcoder::BlockMessage::MessageOneofCase::kBlobValue: + std::vector vec( + proto_msg.blob_value().begin(), + proto_msg.blob_value().end()); + return pmt::init_u8vector(proto_msg.blob_value().size(), vec); } return pmt::get_PMT_NIL(); } From a6109686230134c5b0970fb891abf7f60ed820be Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 16:31:36 +0900 Subject: [PATCH 12/26] test for proto_to_pmt --- gr-starcoder/lib/pmt_to_proto.cc | 3 ++- gr-starcoder/lib/proto_to_pmt.cc | 5 ++--- gr-starcoder/python/qa_command_source.py | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index bccaa888..c73ce168 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -23,7 +23,8 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg) { if (pmt::is_blob(pmt_msg)) { - proto_msg->set_blob_value(pmt::blob_data(pmt_msg), pmt::blob_length(pmt_msg)); + proto_msg->set_blob_value(pmt::blob_data(pmt_msg), + pmt::blob_length(pmt_msg)); } else if (pmt::is_uniform_vector(pmt_msg)) { starcoder::UniformVector *uni_vector = proto_msg->mutable_uniform_vector_value(); diff --git a/gr-starcoder/lib/proto_to_pmt.cc b/gr-starcoder/lib/proto_to_pmt.cc index 29e53cfe..12241ed6 100644 --- a/gr-starcoder/lib/proto_to_pmt.cc +++ b/gr-starcoder/lib/proto_to_pmt.cc @@ -45,9 +45,8 @@ pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { case starcoder::BlockMessage::MessageOneofCase::kDictValue: return convert_pmt_dict(proto_msg.dict_value()); case starcoder::BlockMessage::MessageOneofCase::kBlobValue: - std::vector vec( - proto_msg.blob_value().begin(), - proto_msg.blob_value().end()); + std::vector vec(proto_msg.blob_value().begin(), + proto_msg.blob_value().end()); return pmt::init_u8vector(proto_msg.blob_value().size(), vec); } return pmt::get_PMT_NIL(); diff --git a/gr-starcoder/python/qa_command_source.py b/gr-starcoder/python/qa_command_source.py index 97881282..4408ce05 100755 --- a/gr-starcoder/python/qa_command_source.py +++ b/gr-starcoder/python/qa_command_source.py @@ -103,6 +103,26 @@ def test_tuple(self): self.assertTrue(pmt.is_tuple(snk.get_message(0))) self.assertTrue(pmt.equal(snk.get_message(0), expected)) + def test_blob(self): + cs = starcoder.command_source() + snk = blocks.message_debug() + self.tb.msg_connect((cs, 'out'), (snk, 'store')) + + msg = starcoder_pb2.BlockMessage() + msg.blob_value = "data" + + expected = pmt.init_u8vector(4, [ord('d'), ord('a'), ord('t'), ord('a')]) + + self.tb.start() + cs.push(msg.SerializeToString()) + time.sleep(0.1) + self.tb.stop() + self.tb.wait() + + self.assertEqual(snk.num_messages(), 1) + self.assertTrue(pmt.is_u8vector(snk.get_message(0))) + self.assertTrue(pmt.equal(snk.get_message(0), expected)) + def test_u8_vector(self): cs = starcoder.command_source() snk = blocks.message_debug() From dc5fc29cee138317d47817a7848a3a812371f227 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 16:48:26 +0900 Subject: [PATCH 13/26] use pmt in meteor decoder --- gr-starcoder/lib/meteor_decoder_sink_impl.cc | 26 ++++++++++++++++---- gr-starcoder/lib/noaa_apt_sink_impl.cc | 3 ++- sampleclient/main.go | 2 +- server/starcoder.go | 5 ++++ 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/gr-starcoder/lib/meteor_decoder_sink_impl.cc b/gr-starcoder/lib/meteor_decoder_sink_impl.cc index c93fd7ce..66cc06d6 100644 --- a/gr-starcoder/lib/meteor_decoder_sink_impl.cc +++ b/gr-starcoder/lib/meteor_decoder_sink_impl.cc @@ -28,6 +28,8 @@ #include "meteor/meteor_decoder.h" #include "meteor/meteor_packet.h" +#include "pmt_to_proto.h" + namespace gr { namespace starcoder { @@ -94,7 +96,8 @@ bool meteor_decoder_sink_impl::stop() { int ok = 0; while (decoder.pos() < total_size_ - meteor::SOFT_FRAME_LEN) { total++; - bool res = decoder.decode_one_frame(raw.get(), total_size_, error_corrected_data.get()); + bool res = decoder.decode_one_frame(raw.get(), total_size_, + error_corrected_data.get()); if (res) { ok++; std::cout << std::dec << 100. * decoder.pos() / total_size_ << "% " @@ -114,10 +117,23 @@ bool meteor_decoder_sink_impl::stop() { std::string png_b = packeter.dump_gray_image(meteor::BLUE_APID); if (string_queue_ != NULL) { - if (!png_img.empty()) string_queue_->push(png_img); - if (!png_r.empty()) string_queue_->push(png_r); - if (!png_g.empty()) string_queue_->push(png_g); - if (!png_b.empty()) string_queue_->push(png_b); + ::starcoder::BlockMessage grpc_pmt; + if (!png_img.empty()) { + grpc_pmt.set_blob_value(png_img); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_r.empty()) { + grpc_pmt.set_blob_value(png_r); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_g.empty()) { + grpc_pmt.set_blob_value(png_g); + string_queue_->push(grpc_pmt.SerializeAsString()); + } + if (!png_b.empty()) { + grpc_pmt.set_blob_value(png_b); + string_queue_->push(grpc_pmt.SerializeAsString()); + } } if (!filename_.empty()) { diff --git a/gr-starcoder/lib/noaa_apt_sink_impl.cc b/gr-starcoder/lib/noaa_apt_sink_impl.cc index d5774ff8..cd8318a8 100644 --- a/gr-starcoder/lib/noaa_apt_sink_impl.cc +++ b/gr-starcoder/lib/noaa_apt_sink_impl.cc @@ -129,7 +129,8 @@ void noaa_apt_sink_impl::write_image(std::string filename) { if (!d_flip) image_string = (store_gray_to_png_string(image_received_view_)); else - image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_)); + image_string = + store_gray_to_png_string(flipped_up_down_view(image_received_view_)); if (!image_string.empty()) { ::starcoder::BlockMessage grpc_pmt; diff --git a/sampleclient/main.go b/sampleclient/main.go index 143d2382..56d94d9f 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -65,7 +65,7 @@ func main() { ioutil.WriteFile("/home/rei/sampleAR2300IQ/noaa_apt_rec.png", r.GetPmt().GetBlobValue(), 0644) } if r.GetBlockId() == "meteor_decoder_sink" { - ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPayload(), 0644) + ioutil.WriteFile(fmt.Sprintf("/home/rei/sampleAR2300IQ/meteor_decoded_%v.png", meteor_decoder_sink_idx), r.GetPmt().GetBlobValue(), 0644) meteor_decoder_sink_idx++ } } diff --git a/server/starcoder.go b/server/starcoder.go index f4cada1a..006c5249 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -255,6 +255,11 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString if sh.finished() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { + message := &pb.BlockMessage{} + err := proto.Unmarshal(bytes, message) + if err != nil { + sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) + } if len(bytes) > 10485760 { sh.log.Errorw( "Length of packet received from Starcoder much bigger than expected.", From 044768ec6e5c01458b79a5fc518b44d1b92fba04 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 18:05:17 +0900 Subject: [PATCH 14/26] experimental drop payload --- server/starcoder.go | 52 ++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/server/starcoder.go b/server/starcoder.go index 006c5249..efeaaa05 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -231,27 +231,34 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString defer sh.wg.Done() for { bytes := []byte(q.BlockingPop()) - message := &pb.BlockMessage{} - err := proto.Unmarshal(bytes, message) - if err != nil { - sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) - } - if len(bytes) > 10485760 { - sh.log.Errorw( - "Length of packet received from Starcoder much bigger than expected.", - "block", blockName, "size", len(bytes)) - } else if len(bytes) != 0 { - // Could be woken up spuriously or by something else calling q.Close() - if err := sh.stream.Send(&pb.RunFlowgraphResponse{ + + if len(bytes) != 0 { // Could be woken up spuriously or by something else calling q.Close() + message := &pb.BlockMessage{} + err := proto.Unmarshal(bytes, message) + if err != nil { + sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) + continue + } + + request := &pb.RunFlowgraphResponse{ BlockId: blockName, - Payload: bytes, Pmt: message, - }); err != nil { + } + + if proto.Size(request) > 10485670 { + sh.log.Errorw( + "Length of request message from Starcoder much bigger than expected.", + "block", blockName, "size", proto.Size(request)) + continue + } + + if err := sh.stream.Send(request); err != nil { sh.log.Errorf("Error sending stream: %v", err) sh.finish(err) return } } + if sh.finished() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { @@ -259,18 +266,19 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString err := proto.Unmarshal(bytes, message) if err != nil { sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) - } - if len(bytes) > 10485760 { - sh.log.Errorw( - "Length of packet received from Starcoder much bigger than expected.", - "block", blockName, "size", len(bytes)) continue } - if err := sh.stream.Send(&pb.RunFlowgraphResponse{ + request := &pb.RunFlowgraphResponse{ BlockId: blockName, - Payload: bytes, Pmt: message, - }); err != nil { + } + if proto.Size(request) > 10485760 { + sh.log.Errorw( + "Length of request message from Starcoder much bigger than expected.", + "block", blockName, "size", proto.Size(request)) + continue + } + if err := sh.stream.Send(request); err != nil { sh.log.Errorf("Error sending stream: %v", err) } } From e4fbdbc86d53f87c87356b29e9ef7567911c3e84 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Thu, 30 Aug 2018 19:39:48 +0900 Subject: [PATCH 15/26] fix closing --- server/starcoder.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/starcoder.go b/server/starcoder.go index efeaaa05..2834c384 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -383,6 +383,12 @@ func (sh *streamHandler) Close() { close(sh.perfCtrStopChannel) sh.wg.Done() }() + + // Detect if finish() was not called from inside stream handler i.e. Starcoder closing + if !sh.finished() { + sh.finish(nil) + } + // TODO: Make this call unblock by getting rid of `wait` err := sh.starcoder.stopFlowGraph(sh.flowgraphProps.pyInstance) if err != nil { From df2b0284abe5a8df5a0621d3164a8bc3469bbb93 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 16:23:54 +0900 Subject: [PATCH 16/26] check if queue is closed() to exit goroutine --- cqueue/c_queue.go | 4 ++++ cqueue/string_queue.cc | 5 +++++ cqueue/string_queue.h | 1 + server/starcoder.go | 32 ++++++++++---------------------- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/cqueue/c_queue.go b/cqueue/c_queue.go index 71a190d0..94de2eae 100644 --- a/cqueue/c_queue.go +++ b/cqueue/c_queue.go @@ -49,6 +49,10 @@ func (q *CStringQueue) Close() { q.queue.Close() } +func (q *CStringQueue) Closed() bool { + return q.queue.Closed() +} + func (q *CStringQueue) Push(str string) { q.queue.Push(str) } diff --git a/cqueue/string_queue.cc b/cqueue/string_queue.cc index d0c73030..d22e9d18 100644 --- a/cqueue/string_queue.cc +++ b/cqueue/string_queue.cc @@ -65,6 +65,11 @@ void string_queue::close() { condition_var_.notify_one(); } +bool string_queue::closed() { + std::unique_lock lock(mutex_); + return closed_; +} + uint64_t string_queue::get_ptr() const { return reinterpret_cast(this); } diff --git a/cqueue/string_queue.h b/cqueue/string_queue.h index 21108d75..8c438132 100644 --- a/cqueue/string_queue.h +++ b/cqueue/string_queue.h @@ -42,6 +42,7 @@ class string_queue { std::string blocking_pop(); unsigned long get_ptr() const; void close(); + bool closed(); // TODO: Make this uint64_t static string_queue *queue_from_pointer(unsigned long long ptr); private: diff --git a/server/starcoder.go b/server/starcoder.go index cf81bd12..3ce050f5 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -85,9 +85,9 @@ func NewStarcoderServer(flowgraphDir string, perfCtrInterval time.Duration, sile deregisterStreamHandler: make(chan *streamHandler), closeAllStreamsChannel: make(chan chan bool), filepathToModAndClassName: make(map[string]*moduleAndClassNames), - log: log, - perfCtrInterval: perfCtrInterval, - silencedCommandBlocks: silencedCommandBlocksMap, + log: log, + perfCtrInterval: perfCtrInterval, + silencedCommandBlocks: silencedCommandBlocksMap, } tempDir, err := ioutil.TempDir("", "starcoder") @@ -163,7 +163,7 @@ type streamHandler struct { perfCtrStopChannel chan struct{} clientFinished bool streamError error - mustCloseMutex sync.Mutex + errorMutex sync.Mutex wg sync.WaitGroup log *zap.SugaredLogger } @@ -176,7 +176,7 @@ func newStreamHandler(sc *Starcoder, stream pb.Starcoder_RunFlowgraphServer, flo perfCtrStopChannel: make(chan struct{}), clientFinished: false, streamError: nil, - mustCloseMutex: sync.Mutex{}, + errorMutex: sync.Mutex{}, wg: sync.WaitGroup{}, log: log, } @@ -251,7 +251,7 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString request := &pb.RunFlowgraphResponse{ BlockId: blockName, - Pmt: message, + Pmt: message, } if proto.Size(request) > 10485670 { @@ -268,7 +268,7 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString } } - if sh.finished() { + if q.Closed() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { message := &pb.BlockMessage{} @@ -279,7 +279,7 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString } request := &pb.RunFlowgraphResponse{ BlockId: blockName, - Pmt: message, + Pmt: message, } if proto.Size(request) > 10485760 { sh.log.Errorw( @@ -365,8 +365,8 @@ func (sh *streamHandler) Wait() { // Called within streamHandler whenever an error happens or client finished. // Signals starcoder server that it needs to end this stream handler func (sh *streamHandler) finish(err error) { - sh.mustCloseMutex.Lock() - defer sh.mustCloseMutex.Unlock() + sh.errorMutex.Lock() + defer sh.errorMutex.Unlock() if err == nil { sh.clientFinished = true } @@ -376,13 +376,6 @@ func (sh *streamHandler) finish(err error) { }() } -// Called by goroutines of streamHandler when they want to know when to close -func (sh *streamHandler) finished() bool { - sh.mustCloseMutex.Lock() - defer sh.mustCloseMutex.Unlock() - return sh.clientFinished || sh.streamError != nil -} - // Must only be called by starcoder server func (sh *streamHandler) Close() { defer func() { @@ -393,11 +386,6 @@ func (sh *streamHandler) Close() { sh.wg.Done() }() - // Detect if finish() was not called from inside stream handler i.e. Starcoder closing - if !sh.finished() { - sh.finish(nil) - } - // TODO: Make this call unblock by getting rid of `wait` err := sh.starcoder.stopFlowGraph(sh.flowgraphProps.pyInstance) if err != nil { From 5686c6701b3f669697e0cf7fbc19f73cd3699681 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 16:48:33 +0900 Subject: [PATCH 17/26] revert --- server/starcoder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/starcoder.go b/server/starcoder.go index 3ce050f5..4f6be160 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -385,7 +385,6 @@ func (sh *streamHandler) Close() { close(sh.perfCtrStopChannel) sh.wg.Done() }() - // TODO: Make this call unblock by getting rid of `wait` err := sh.starcoder.stopFlowGraph(sh.flowgraphProps.pyInstance) if err != nil { From 1215f26b82056b94be5210cc7507b10bae68556a Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 17:26:15 +0900 Subject: [PATCH 18/26] factor out function --- server/starcoder.go | 51 ++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/server/starcoder.go b/server/starcoder.go index 4f6be160..7ab78c6e 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -242,22 +242,9 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString bytes := []byte(q.BlockingPop()) if len(bytes) != 0 { // Could be woken up spuriously or by something else calling q.Close() - message := &pb.BlockMessage{} - err := proto.Unmarshal(bytes, message) + request, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) if err != nil { - sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) - continue - } - - request := &pb.RunFlowgraphResponse{ - BlockId: blockName, - Pmt: message, - } - - if proto.Size(request) > 10485670 { - sh.log.Errorw( - "Length of request message from Starcoder much bigger than expected.", - "block", blockName, "size", proto.Size(request)) + sh.log.Errorw("Error constructing flowgraph response", "error", err) continue } @@ -271,20 +258,9 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString if q.Closed() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { - message := &pb.BlockMessage{} - err := proto.Unmarshal(bytes, message) + request, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) if err != nil { - sh.log.Errorw("Failed to unmarshal protobuf", "block", blockName) - continue - } - request := &pb.RunFlowgraphResponse{ - BlockId: blockName, - Pmt: message, - } - if proto.Size(request) > 10485760 { - sh.log.Errorw( - "Length of request message from Starcoder much bigger than expected.", - "block", blockName, "size", proto.Size(request)) + sh.log.Errorw("Error constructing flowgraph response", "error", err) continue } if err := sh.stream.Send(request); err != nil { @@ -789,3 +765,22 @@ func getExceptionString() string { return fmt.Sprintf("Exception: %v \n Value: %v ", safeAsString(exc), safeAsString(val)) } + +func constructFlowgraphResponseFromSerializedPMT(blockName string, serialized []byte) (*pb.RunFlowgraphResponse, error) { + message := &pb.BlockMessage{} + err := proto.Unmarshal(serialized, message) + if err != nil { + return nil, err + } + + request := &pb.RunFlowgraphResponse{ + BlockId: blockName, + Pmt: message, + } + + if proto.Size(request) > 10485670 { + return nil, errors.New(fmt.Sprintf("Length of request message from Starcoder much bigger than expected. Block name: %v, Message size: %v", blockName, proto.Size(request))) + } + + return request, nil +} From 8c1522d891343189f96efec3f2415b444b898d19 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 17:58:29 +0900 Subject: [PATCH 19/26] dont expose in header file --- gr-starcoder/lib/pmt_to_proto.cc | 114 +++++++++++++++---------------- gr-starcoder/lib/pmt_to_proto.h | 2 - gr-starcoder/lib/proto_to_pmt.cc | 64 ++++++++--------- gr-starcoder/lib/proto_to_pmt.h | 4 -- 4 files changed, 89 insertions(+), 95 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index c73ce168..584eed9f 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -20,63 +20,6 @@ #include "pmt_to_proto.h" -void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, - starcoder::BlockMessage *proto_msg) { - if (pmt::is_blob(pmt_msg)) { - proto_msg->set_blob_value(pmt::blob_data(pmt_msg), - pmt::blob_length(pmt_msg)); - } else if (pmt::is_uniform_vector(pmt_msg)) { - starcoder::UniformVector *uni_vector = - proto_msg->mutable_uniform_vector_value(); - convert_proto_uniform_vector(pmt_msg, uni_vector); - } else if (pmt::is_bool(pmt_msg)) { - proto_msg->set_boolean_value(pmt::to_bool(pmt_msg)); - } else if (pmt::is_symbol(pmt_msg)) { - proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg)); - } else if (pmt::is_integer(pmt_msg)) { - proto_msg->set_integer_value(pmt::to_long(pmt_msg)); - } else if (pmt::is_uint64(pmt_msg)) { - proto_msg->set_integer_value(pmt::to_uint64(pmt_msg)); - } else if (pmt::is_real(pmt_msg)) { - proto_msg->set_double_value(pmt::to_double(pmt_msg)); - } else if (pmt::is_complex(pmt_msg)) { - std::complex val = pmt::to_complex(pmt_msg); - starcoder::Complex *complex = proto_msg->mutable_complex_value(); - complex->set_real_value(val.real()); - complex->set_imaginary_value(val.imag()); - } else if (pmt::is_pair(pmt_msg)) { - starcoder::Pair *pair = proto_msg->mutable_pair_value(); - starcoder::BlockMessage *car = pair->mutable_car(); - starcoder::BlockMessage *cdr = pair->mutable_cdr(); - convert_pmt_to_proto(pmt::car(pmt_msg), car); - convert_pmt_to_proto(pmt::cdr(pmt_msg), cdr); - } else if (pmt::is_tuple(pmt_msg)) { - starcoder::List *list = proto_msg->mutable_list_value(); - list->set_type(starcoder::List::TUPLE); - for (int i = 0; i < pmt::length(pmt_msg); i++) { - starcoder::BlockMessage *element = list->add_value(); - convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element); - } - } else if (pmt::is_vector(pmt_msg)) { - starcoder::List *list = proto_msg->mutable_list_value(); - list->set_type(starcoder::List::VECTOR); - for (int i = 0; i < pmt::length(pmt_msg); i++) { - starcoder::BlockMessage *element = list->add_value(); - convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element); - } - } else if (pmt::is_dict(pmt_msg)) { - starcoder::Dict *dict = proto_msg->mutable_dict_value(); - pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); - for (int i = 0; i < pmt::length(key_value_pairs_list); i++) { - starcoder::Dict_Entry *entry = dict->add_entry(); - starcoder::BlockMessage *key = entry->mutable_key(); - starcoder::BlockMessage *value = entry->mutable_value(); - convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key); - convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value); - } - } -} - void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::UniformVector *uni_vector) { if (pmt::is_u8vector(pmt_msg)) { @@ -169,3 +112,60 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, }); } } + +void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, + starcoder::BlockMessage *proto_msg) { + if (pmt::is_blob(pmt_msg)) { + proto_msg->set_blob_value(pmt::blob_data(pmt_msg), + pmt::blob_length(pmt_msg)); + } else if (pmt::is_uniform_vector(pmt_msg)) { + starcoder::UniformVector *uni_vector = + proto_msg->mutable_uniform_vector_value(); + convert_proto_uniform_vector(pmt_msg, uni_vector); + } else if (pmt::is_bool(pmt_msg)) { + proto_msg->set_boolean_value(pmt::to_bool(pmt_msg)); + } else if (pmt::is_symbol(pmt_msg)) { + proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg)); + } else if (pmt::is_integer(pmt_msg)) { + proto_msg->set_integer_value(pmt::to_long(pmt_msg)); + } else if (pmt::is_uint64(pmt_msg)) { + proto_msg->set_integer_value(pmt::to_uint64(pmt_msg)); + } else if (pmt::is_real(pmt_msg)) { + proto_msg->set_double_value(pmt::to_double(pmt_msg)); + } else if (pmt::is_complex(pmt_msg)) { + std::complex val = pmt::to_complex(pmt_msg); + starcoder::Complex *complex = proto_msg->mutable_complex_value(); + complex->set_real_value(val.real()); + complex->set_imaginary_value(val.imag()); + } else if (pmt::is_pair(pmt_msg)) { + starcoder::Pair *pair = proto_msg->mutable_pair_value(); + starcoder::BlockMessage *car = pair->mutable_car(); + starcoder::BlockMessage *cdr = pair->mutable_cdr(); + convert_pmt_to_proto(pmt::car(pmt_msg), car); + convert_pmt_to_proto(pmt::cdr(pmt_msg), cdr); + } else if (pmt::is_tuple(pmt_msg)) { + starcoder::List *list = proto_msg->mutable_list_value(); + list->set_type(starcoder::List::TUPLE); + for (int i = 0; i < pmt::length(pmt_msg); i++) { + starcoder::BlockMessage *element = list->add_value(); + convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element); + } + } else if (pmt::is_vector(pmt_msg)) { + starcoder::List *list = proto_msg->mutable_list_value(); + list->set_type(starcoder::List::VECTOR); + for (int i = 0; i < pmt::length(pmt_msg); i++) { + starcoder::BlockMessage *element = list->add_value(); + convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element); + } + } else if (pmt::is_dict(pmt_msg)) { + starcoder::Dict *dict = proto_msg->mutable_dict_value(); + pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); + for (int i = 0; i < pmt::length(key_value_pairs_list); i++) { + starcoder::Dict_Entry *entry = dict->add_entry(); + starcoder::BlockMessage *key = entry->mutable_key(); + starcoder::BlockMessage *value = entry->mutable_value(); + convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key); + convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value); + } + } +} diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h index b1de532f..47a09f54 100644 --- a/gr-starcoder/lib/pmt_to_proto.h +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -26,7 +26,5 @@ void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, starcoder::BlockMessage *proto_msg); -void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, - starcoder::UniformVector *uni_vector); #endif /* INCLUDED_PMT_TO_PROTO_H */ diff --git a/gr-starcoder/lib/proto_to_pmt.cc b/gr-starcoder/lib/proto_to_pmt.cc index 12241ed6..2aef76b1 100644 --- a/gr-starcoder/lib/proto_to_pmt.cc +++ b/gr-starcoder/lib/proto_to_pmt.cc @@ -20,38 +20,6 @@ #include "proto_to_pmt.h" -pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { - starcoder::BlockMessage::MessageOneofCase type = - proto_msg.message_oneof_case(); - switch (type) { - case starcoder::BlockMessage::MessageOneofCase::kBooleanValue: - return pmt::from_bool(proto_msg.boolean_value()); - case starcoder::BlockMessage::MessageOneofCase::kSymbolValue: - return pmt::string_to_symbol(proto_msg.symbol_value()); - case starcoder::BlockMessage::MessageOneofCase::kIntegerValue: - return pmt::from_long(proto_msg.integer_value()); - case starcoder::BlockMessage::MessageOneofCase::kDoubleValue: - return pmt::from_double(proto_msg.double_value()); - case starcoder::BlockMessage::MessageOneofCase::kComplexValue: - return pmt::from_complex(proto_msg.complex_value().real_value(), - proto_msg.complex_value().imaginary_value()); - case starcoder::BlockMessage::MessageOneofCase::kPairValue: - return pmt::cons(convert_proto_to_pmt(proto_msg.pair_value().car()), - convert_proto_to_pmt(proto_msg.pair_value().cdr())); - case starcoder::BlockMessage::MessageOneofCase::kListValue: - return convert_pmt_list(proto_msg.list_value()); - case starcoder::BlockMessage::MessageOneofCase::kUniformVectorValue: - return convert_pmt_uniform_vector(proto_msg.uniform_vector_value()); - case starcoder::BlockMessage::MessageOneofCase::kDictValue: - return convert_pmt_dict(proto_msg.dict_value()); - case starcoder::BlockMessage::MessageOneofCase::kBlobValue: - std::vector vec(proto_msg.blob_value().begin(), - proto_msg.blob_value().end()); - return pmt::init_u8vector(proto_msg.blob_value().size(), vec); - } - return pmt::get_PMT_NIL(); -} - pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list) { int size = proto_pmt_list.value_size(); if (proto_pmt_list.type() == starcoder::List::TUPLE) { @@ -255,3 +223,35 @@ pmt::pmt_t convert_pmt_dict(const starcoder::Dict &proto_pmt_dict) { } return dict; } + +pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg) { + starcoder::BlockMessage::MessageOneofCase type = + proto_msg.message_oneof_case(); + switch (type) { + case starcoder::BlockMessage::MessageOneofCase::kBooleanValue: + return pmt::from_bool(proto_msg.boolean_value()); + case starcoder::BlockMessage::MessageOneofCase::kSymbolValue: + return pmt::string_to_symbol(proto_msg.symbol_value()); + case starcoder::BlockMessage::MessageOneofCase::kIntegerValue: + return pmt::from_long(proto_msg.integer_value()); + case starcoder::BlockMessage::MessageOneofCase::kDoubleValue: + return pmt::from_double(proto_msg.double_value()); + case starcoder::BlockMessage::MessageOneofCase::kComplexValue: + return pmt::from_complex(proto_msg.complex_value().real_value(), + proto_msg.complex_value().imaginary_value()); + case starcoder::BlockMessage::MessageOneofCase::kPairValue: + return pmt::cons(convert_proto_to_pmt(proto_msg.pair_value().car()), + convert_proto_to_pmt(proto_msg.pair_value().cdr())); + case starcoder::BlockMessage::MessageOneofCase::kListValue: + return convert_pmt_list(proto_msg.list_value()); + case starcoder::BlockMessage::MessageOneofCase::kUniformVectorValue: + return convert_pmt_uniform_vector(proto_msg.uniform_vector_value()); + case starcoder::BlockMessage::MessageOneofCase::kDictValue: + return convert_pmt_dict(proto_msg.dict_value()); + case starcoder::BlockMessage::MessageOneofCase::kBlobValue: + std::vector vec(proto_msg.blob_value().begin(), + proto_msg.blob_value().end()); + return pmt::init_u8vector(proto_msg.blob_value().size(), vec); + } + return pmt::get_PMT_NIL(); +} diff --git a/gr-starcoder/lib/proto_to_pmt.h b/gr-starcoder/lib/proto_to_pmt.h index 43f8139e..f3923dc2 100644 --- a/gr-starcoder/lib/proto_to_pmt.h +++ b/gr-starcoder/lib/proto_to_pmt.h @@ -25,9 +25,5 @@ #include "starcoder.pb.h" pmt::pmt_t convert_proto_to_pmt(const starcoder::BlockMessage &proto_msg); -pmt::pmt_t convert_pmt_list(const starcoder::List &proto_pmt_list); -pmt::pmt_t convert_pmt_uniform_vector( - const starcoder::UniformVector &proto_pmt_uniform_vector); -pmt::pmt_t convert_pmt_dict(const starcoder::Dict &proto_pmt_dict); #endif /* INCLUDED_PROTO_TO_PMT_H */ From 26a5a50edbfce5d927bafd8b42cada641c2548da Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 18:27:49 +0900 Subject: [PATCH 20/26] use back inserter --- gr-starcoder/lib/pmt_to_proto.cc | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 584eed9f..3ed3df16 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -90,26 +90,30 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); const std::vector> vector_elements = pmt::c32vector_elements(pmt_msg); + std::vector vec; std::transform(vector_elements.begin(), vector_elements.end(), - c32_vector->mutable_value()->begin(), + std::back_inserter(vec), [](std::complex c)->starcoder::Complex32 { starcoder::Complex32 new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); return new_val; }); + *c32_vector->mutable_value() = {vec.begin(), vec.end()}; } else if (pmt::is_c64vector(pmt_msg)) { starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); const std::vector> vector_elements = pmt::c64vector_elements(pmt_msg); + std::vector vec; std::transform(vector_elements.begin(), vector_elements.end(), - c64_vector->mutable_value()->begin(), + std::back_inserter(vec), [](std::complex c)->starcoder::Complex { starcoder::Complex new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); return new_val; }); + *c64_vector->mutable_value() = {vec.begin(), vec.end()}; } } From 55f1eea72932ae5adf73d896fe0aaae6bffb65f2 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 18:39:48 +0900 Subject: [PATCH 21/26] use repeatedptrfieldbackinsertiterator --- gr-starcoder/lib/pmt_to_proto.cc | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 3ed3df16..c1b782df 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -90,30 +90,26 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); const std::vector> vector_elements = pmt::c32vector_elements(pmt_msg); - std::vector vec; std::transform(vector_elements.begin(), vector_elements.end(), - std::back_inserter(vec), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator(c32_vector->mutable_value()), [](std::complex c)->starcoder::Complex32 { starcoder::Complex32 new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); return new_val; }); - *c32_vector->mutable_value() = {vec.begin(), vec.end()}; } else if (pmt::is_c64vector(pmt_msg)) { starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); const std::vector> vector_elements = pmt::c64vector_elements(pmt_msg); - std::vector vec; std::transform(vector_elements.begin(), vector_elements.end(), - std::back_inserter(vec), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator(c64_vector->mutable_value()), [](std::complex c)->starcoder::Complex { starcoder::Complex new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); return new_val; }); - *c64_vector->mutable_value() = {vec.begin(), vec.end()}; } } From d9d22b90cd64554508527150f97c69399ce6b048 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Mon, 10 Sep 2018 18:41:34 +0900 Subject: [PATCH 22/26] clang --- gr-starcoder/lib/pmt_to_proto.cc | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index c1b782df..bfc53fca 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -90,9 +90,11 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); const std::vector> vector_elements = pmt::c32vector_elements(pmt_msg); - std::transform(vector_elements.begin(), vector_elements.end(), - google::protobuf::internal::RepeatedPtrFieldBackInsertIterator(c32_vector->mutable_value()), - [](std::complex c)->starcoder::Complex32 { + std::transform( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator< + starcoder::Complex32>(c32_vector->mutable_value()), + [](std::complex c)->starcoder::Complex32 { starcoder::Complex32 new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); @@ -102,9 +104,11 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value(); const std::vector> vector_elements = pmt::c64vector_elements(pmt_msg); - std::transform(vector_elements.begin(), vector_elements.end(), - google::protobuf::internal::RepeatedPtrFieldBackInsertIterator(c64_vector->mutable_value()), - [](std::complex c)->starcoder::Complex { + std::transform( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedPtrFieldBackInsertIterator< + starcoder::Complex>(c64_vector->mutable_value()), + [](std::complex c)->starcoder::Complex { starcoder::Complex new_val; new_val.set_real_value(c.real()); new_val.set_imaginary_value(c.imag()); From 3e9552533fd42f919bf35fd723ee97a021dfaf07 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Tue, 11 Sep 2018 12:57:39 +0900 Subject: [PATCH 23/26] use std::copy --- gr-starcoder/lib/pmt_to_proto.cc | 60 +++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index bfc53fca..306c0ad3 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -27,65 +27,85 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, u_vector->set_size(starcoder::IntSize::Size8); const std::vector vector_elements = pmt::u8vector_elements(pmt_msg); - *u_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); } else if (pmt::is_s8vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size8); const std::vector vector_elements = pmt::s8vector_elements(pmt_msg); - *i_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); } else if (pmt::is_u16vector(pmt_msg)) { starcoder::UVector *u_vector = uni_vector->mutable_u_value(); u_vector->set_size(starcoder::IntSize::Size16); const std::vector vector_elements = pmt::u16vector_elements(pmt_msg); - *u_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); } else if (pmt::is_s16vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size16); const std::vector vector_elements = pmt::s16vector_elements(pmt_msg); - *i_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); } else if (pmt::is_u32vector(pmt_msg)) { starcoder::UVector *u_vector = uni_vector->mutable_u_value(); u_vector->set_size(starcoder::IntSize::Size32); const std::vector vector_elements = pmt::u32vector_elements(pmt_msg); - *u_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u_vector->mutable_value())); } else if (pmt::is_s32vector(pmt_msg)) { starcoder::IVector *i_vector = uni_vector->mutable_i_value(); i_vector->set_size(starcoder::IntSize::Size32); const std::vector vector_elements = pmt::s32vector_elements(pmt_msg); - *i_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i_vector->mutable_value())); } else if (pmt::is_u64vector(pmt_msg)) { starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value(); const std::vector vector_elements = pmt::u64vector_elements(pmt_msg); - *u64_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + u64_vector->mutable_value())); } else if (pmt::is_s64vector(pmt_msg)) { starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value(); const std::vector vector_elements = pmt::s64vector_elements(pmt_msg); - *i64_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + i64_vector->mutable_value())); } else if (pmt::is_f32vector(pmt_msg)) { starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value(); const std::vector vector_elements = pmt::f32vector_elements(pmt_msg); - *f32_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + f32_vector->mutable_value())); } else if (pmt::is_f64vector(pmt_msg)) { starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value(); const std::vector vector_elements = pmt::f64vector_elements(pmt_msg); - *f64_vector->mutable_value() = { vector_elements.begin(), - vector_elements.end() }; + std::copy( + vector_elements.begin(), vector_elements.end(), + google::protobuf::internal::RepeatedFieldBackInsertIterator( + f64_vector->mutable_value())); } else if (pmt::is_c32vector(pmt_msg)) { starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value(); const std::vector> vector_elements = From c2b222e581c9556b69aa3dcad7d9c32d5f46d296 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Tue, 11 Sep 2018 13:28:23 +0900 Subject: [PATCH 24/26] rename request to respose --- server/starcoder.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/starcoder.go b/server/starcoder.go index 7ab78c6e..df9e3436 100644 --- a/server/starcoder.go +++ b/server/starcoder.go @@ -242,13 +242,13 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString bytes := []byte(q.BlockingPop()) if len(bytes) != 0 { // Could be woken up spuriously or by something else calling q.Close() - request, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) + response, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) if err != nil { sh.log.Errorw("Error constructing flowgraph response", "error", err) continue } - if err := sh.stream.Send(request); err != nil { + if err := sh.stream.Send(response); err != nil { sh.log.Errorf("Error sending stream: %v", err) sh.finish(err) return @@ -258,12 +258,12 @@ func (sh *streamHandler) observableQueueLoop(blockName string, q *cqueue.CString if q.Closed() { // Send the rest of the bytes if any are left for bytes = []byte(q.Pop()); len(bytes) != 0; bytes = []byte(q.Pop()) { - request, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) + response, err := constructFlowgraphResponseFromSerializedPMT(blockName, bytes) if err != nil { sh.log.Errorw("Error constructing flowgraph response", "error", err) continue } - if err := sh.stream.Send(request); err != nil { + if err := sh.stream.Send(response); err != nil { sh.log.Errorf("Error sending stream: %v", err) } } @@ -773,14 +773,14 @@ func constructFlowgraphResponseFromSerializedPMT(blockName string, serialized [] return nil, err } - request := &pb.RunFlowgraphResponse{ + response := &pb.RunFlowgraphResponse{ BlockId: blockName, Pmt: message, } - if proto.Size(request) > 10485670 { - return nil, errors.New(fmt.Sprintf("Length of request message from Starcoder much bigger than expected. Block name: %v, Message size: %v", blockName, proto.Size(request))) + if proto.Size(response) > 10485670 { + return nil, errors.New(fmt.Sprintf("Length of request message from Starcoder much bigger than expected. Block name: %v, Message size: %v", blockName, proto.Size(response))) } - return request, nil + return response, nil } From d0df95712b2dea7821ace45cbd751e729cb4c5b5 Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Tue, 11 Sep 2018 14:57:58 +0900 Subject: [PATCH 25/26] change signature of pmt_to_proto --- gr-starcoder/lib/enqueue_message_sink_impl.cc | 4 +- gr-starcoder/lib/pmt_to_proto.cc | 64 ++++++++++--------- gr-starcoder/lib/pmt_to_proto.h | 3 +- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/gr-starcoder/lib/enqueue_message_sink_impl.cc b/gr-starcoder/lib/enqueue_message_sink_impl.cc index 6a31372a..3becee74 100644 --- a/gr-starcoder/lib/enqueue_message_sink_impl.cc +++ b/gr-starcoder/lib/enqueue_message_sink_impl.cc @@ -53,9 +53,7 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {} void enqueue_message_sink_impl::handler(pmt::pmt_t msg) { if (string_queue_ != NULL) { - ::starcoder::BlockMessage grpc_pmt; - convert_pmt_to_proto(msg, &grpc_pmt); - std::string serialized = grpc_pmt.SerializeAsString(); + std::string serialized = convert_pmt_to_proto(msg).SerializeAsString(); if (serialized.length() > 10485760) { GR_LOG_ERROR(d_logger, diff --git a/gr-starcoder/lib/pmt_to_proto.cc b/gr-starcoder/lib/pmt_to_proto.cc index 306c0ad3..d6c1ce1b 100644 --- a/gr-starcoder/lib/pmt_to_proto.cc +++ b/gr-starcoder/lib/pmt_to_proto.cc @@ -137,59 +137,65 @@ void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg, } } -void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, - starcoder::BlockMessage *proto_msg) { +starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg) { + starcoder::BlockMessage proto_msg; if (pmt::is_blob(pmt_msg)) { - proto_msg->set_blob_value(pmt::blob_data(pmt_msg), - pmt::blob_length(pmt_msg)); + proto_msg.set_blob_value(pmt::blob_data(pmt_msg), + pmt::blob_length(pmt_msg)); } else if (pmt::is_uniform_vector(pmt_msg)) { - starcoder::UniformVector *uni_vector = - proto_msg->mutable_uniform_vector_value(); - convert_proto_uniform_vector(pmt_msg, uni_vector); + convert_proto_uniform_vector(pmt_msg, + proto_msg.mutable_uniform_vector_value()); } else if (pmt::is_bool(pmt_msg)) { - proto_msg->set_boolean_value(pmt::to_bool(pmt_msg)); + proto_msg.set_boolean_value(pmt::to_bool(pmt_msg)); } else if (pmt::is_symbol(pmt_msg)) { - proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg)); + proto_msg.set_symbol_value(pmt::symbol_to_string(pmt_msg)); } else if (pmt::is_integer(pmt_msg)) { - proto_msg->set_integer_value(pmt::to_long(pmt_msg)); + proto_msg.set_integer_value(pmt::to_long(pmt_msg)); } else if (pmt::is_uint64(pmt_msg)) { - proto_msg->set_integer_value(pmt::to_uint64(pmt_msg)); + proto_msg.set_integer_value(pmt::to_uint64(pmt_msg)); } else if (pmt::is_real(pmt_msg)) { - proto_msg->set_double_value(pmt::to_double(pmt_msg)); + proto_msg.set_double_value(pmt::to_double(pmt_msg)); } else if (pmt::is_complex(pmt_msg)) { std::complex val = pmt::to_complex(pmt_msg); - starcoder::Complex *complex = proto_msg->mutable_complex_value(); + starcoder::Complex *complex = proto_msg.mutable_complex_value(); complex->set_real_value(val.real()); complex->set_imaginary_value(val.imag()); } else if (pmt::is_pair(pmt_msg)) { - starcoder::Pair *pair = proto_msg->mutable_pair_value(); - starcoder::BlockMessage *car = pair->mutable_car(); - starcoder::BlockMessage *cdr = pair->mutable_cdr(); - convert_pmt_to_proto(pmt::car(pmt_msg), car); - convert_pmt_to_proto(pmt::cdr(pmt_msg), cdr); + starcoder::Pair *pair = proto_msg.mutable_pair_value(); + starcoder::BlockMessage car = convert_pmt_to_proto(pmt::car(pmt_msg)); + starcoder::BlockMessage cdr = convert_pmt_to_proto(pmt::cdr(pmt_msg)); + pair->mutable_car()->Swap(&car); + pair->mutable_cdr()->Swap(&cdr); } else if (pmt::is_tuple(pmt_msg)) { - starcoder::List *list = proto_msg->mutable_list_value(); + starcoder::List *list = proto_msg.mutable_list_value(); list->set_type(starcoder::List::TUPLE); for (int i = 0; i < pmt::length(pmt_msg); i++) { - starcoder::BlockMessage *element = list->add_value(); - convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element); + starcoder::BlockMessage element = + convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i)); + list->add_value()->Swap(&element); } } else if (pmt::is_vector(pmt_msg)) { - starcoder::List *list = proto_msg->mutable_list_value(); + starcoder::List *list = proto_msg.mutable_list_value(); list->set_type(starcoder::List::VECTOR); for (int i = 0; i < pmt::length(pmt_msg); i++) { - starcoder::BlockMessage *element = list->add_value(); - convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element); + starcoder::BlockMessage element = + convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i)); + list->add_value()->Swap(&element); } } else if (pmt::is_dict(pmt_msg)) { - starcoder::Dict *dict = proto_msg->mutable_dict_value(); + starcoder::Dict *dict = proto_msg.mutable_dict_value(); pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg); for (int i = 0; i < pmt::length(key_value_pairs_list); i++) { starcoder::Dict_Entry *entry = dict->add_entry(); - starcoder::BlockMessage *key = entry->mutable_key(); - starcoder::BlockMessage *value = entry->mutable_value(); - convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key); - convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value); + + starcoder::BlockMessage key = + convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list))); + starcoder::BlockMessage value = + convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list))); + + entry->mutable_key()->Swap(&key); + entry->mutable_value()->Swap(&value); } } + return proto_msg; } diff --git a/gr-starcoder/lib/pmt_to_proto.h b/gr-starcoder/lib/pmt_to_proto.h index 47a09f54..23f8dabb 100644 --- a/gr-starcoder/lib/pmt_to_proto.h +++ b/gr-starcoder/lib/pmt_to_proto.h @@ -24,7 +24,6 @@ #include #include "starcoder.pb.h" -void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg, - starcoder::BlockMessage *proto_msg); +starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg); #endif /* INCLUDED_PMT_TO_PROTO_H */ From a9b44137cdba27b0914255b7e3e6d144f5f4ec4e Mon Sep 17 00:00:00 2001 From: Reiichiro Nakano Date: Tue, 11 Sep 2018 15:10:30 +0900 Subject: [PATCH 26/26] changes to test.grc --- flowgraphs/test.grc | 2 +- sampleclient/main.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flowgraphs/test.grc b/flowgraphs/test.grc index 83394a3c..0767f16e 100644 --- a/flowgraphs/test.grc +++ b/flowgraphs/test.grc @@ -430,7 +430,7 @@ msg - pmt.make_u8vector(6, ord('b')) + pmt.make_c64vector(2, -7+3.2j) minoutbuf diff --git a/sampleclient/main.go b/sampleclient/main.go index 56d94d9f..7cfe44b3 100644 --- a/sampleclient/main.go +++ b/sampleclient/main.go @@ -22,13 +22,13 @@ package main import ( "context" "fmt" + "github.com/gogo/protobuf/proto" pb "github.com/infostellarinc/starcoder/api" "google.golang.org/grpc" "io" "io/ioutil" "log" "time" - "github.com/gogo/protobuf/proto" ) func main() { @@ -53,7 +53,7 @@ func main() { log.Println("error receiving!") log.Fatalf("%v", err) } - if len(r.GetPayload()) > 20 || proto.Size(r.GetPmt()) > 20 { + if len(r.GetPayload()) > 50 || proto.Size(r.GetPmt()) > 50 { log.Println(r.GetBlockId(), len(r.GetPayload()), proto.Size(r.GetPmt())) } else { log.Println(r.GetBlockId(), r.GetPayload(), r.GetPmt()) @@ -84,7 +84,7 @@ func main() { time.Sleep(1 * time.Second) commandReq := &pb.SendCommandRequest{ BlockId: "starcoder_command_source_0", - Pmt: constructPDU(), + Pmt: constructPDU(), } req = &pb.RunFlowgraphRequest{ Request: &pb.RunFlowgraphRequest_SendCommandRequest{ @@ -97,7 +97,7 @@ func main() { time.Sleep(4 * time.Second) commandReq = &pb.SendCommandRequest{ BlockId: "starcoder_command_source_1", - Pmt: constructPDU(), + Pmt: constructPDU(), } req = &pb.RunFlowgraphRequest{ Request: &pb.RunFlowgraphRequest_SendCommandRequest{