Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality so Starcoder can send back arbitrary PMTs, not just blobs #78

Merged
merged 31 commits into from
Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
93c33b5
rename convert_pmt_proto to convert_proto_to_pmt
Aug 29, 2018
68e50c0
init commit of pmt_to_proto
Aug 29, 2018
3b353b5
finish pmt_to_proto?
Aug 29, 2018
c7b6653
clang
Aug 29, 2018
044f5ad
update proto
Aug 29, 2018
c873e2d
convert in enqueue_message_sink
Aug 29, 2018
e62c48d
update starcoder to parse the protobufs
Aug 30, 2018
dc1308a
modify waterfall plotter to pass up protobuf
Aug 30, 2018
2efc6e4
update noaa
Aug 30, 2018
359b1c1
introduce blob_value
Aug 30, 2018
f8c4b65
add blob to proto_to_pmt
Aug 30, 2018
2c33d2e
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Aug 30, 2018
a610968
test for proto_to_pmt
Aug 30, 2018
dc5fc29
use pmt in meteor decoder
Aug 30, 2018
044768e
experimental drop payload
Aug 30, 2018
e4fbdbc
fix closing
Aug 30, 2018
bf3e2ee
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Aug 31, 2018
c6e7ab7
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 7, 2018
ad27a4d
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 10, 2018
db77770
Merge remote-tracking branch 'upstream/master' into convert-pmt-to-proto
Sep 10, 2018
df2b028
check if queue is closed() to exit goroutine
Sep 10, 2018
5686c67
revert
Sep 10, 2018
1215f26
factor out function
Sep 10, 2018
8c1522d
dont expose in header file
Sep 10, 2018
26a5a50
use back inserter
Sep 10, 2018
55f1eea
use repeatedptrfieldbackinsertiterator
Sep 10, 2018
d9d22b9
clang
Sep 10, 2018
3e95525
use std::copy
Sep 11, 2018
c2b222e
rename request to respose
Sep 11, 2018
d0df957
change signature of pmt_to_proto
Sep 11, 2018
a9b4413
changes to test.grc
Sep 11, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/starcoder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ message BlockMessage {
UniformVector uniform_vector_value = 9;

Dict dict_value = 10;

bytes blob_value = 11;
}
}

Expand Down Expand Up @@ -203,7 +205,10 @@ message RunFlowgraphResponse {
string block_id = 1;

// Serialized arbitrary PMT (GNURadio Polymorphic Message Type)
bytes payload = 2;
bytes payload = 2 [deprecated=true];

// PMT (GNURadio Polymorphic Message Type) response sent from the block
BlockMessage pmt = 3;
}

// Complex number
Expand Down
4 changes: 4 additions & 0 deletions cqueue/c_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (q *CStringQueue) Close() {
q.queue.Close()
}

func (q *CStringQueue) Closed() bool {
return q.queue.Closed()
}

func (q *CStringQueue) Push(str string) {
q.queue.Push(str)
}
Expand Down
5 changes: 5 additions & 0 deletions cqueue/string_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ void string_queue::close() {
condition_var_.notify_one();
}

bool string_queue::closed() {
std::unique_lock<std::mutex> lock(mutex_);
return closed_;
}

uint64_t string_queue::get_ptr() const {
return reinterpret_cast<uint64_t>(this);
}
Expand Down
1 change: 1 addition & 0 deletions cqueue/string_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class string_queue {
std::string blocking_pop();
unsigned long get_ptr() const;
void close();
bool closed();
reiinakano marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Make this uint64_t
static string_queue *queue_from_pointer(unsigned long long ptr);
private:
Expand Down
1 change: 1 addition & 0 deletions gr-starcoder/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ list(APPEND starcoder_sources
${CMAKE_CURRENT_SOURCE_DIR}/../../cqueue/string_queue.cc
${hw_proto_srcs}
proto_to_pmt.cc
pmt_to_proto.cc
init.c
driver.c
firmware.c
Expand Down
2 changes: 1 addition & 1 deletion gr-starcoder/lib/command_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void command_source_impl::readloop() {
GR_LOG_WARN(d_logger, "Failed to deserialize gRPC");
continue;
}
pmt::pmt_t m = convert_pmt_proto(grpc_pmt);
pmt::pmt_t m = convert_proto_to_pmt(grpc_pmt);
message_port_pub(port_, m);
}
}
Expand Down
7 changes: 6 additions & 1 deletion gr-starcoder/lib/enqueue_message_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <gnuradio/io_signature.h>
#include "enqueue_message_sink_impl.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand All @@ -51,7 +53,10 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {}

void enqueue_message_sink_impl::handler(pmt::pmt_t msg) {
if (string_queue_ != NULL) {
std::string serialized = pmt::serialize_str(msg);
::starcoder::BlockMessage grpc_pmt;
convert_pmt_to_proto(msg, &grpc_pmt);
std::string serialized = grpc_pmt.SerializeAsString();

if (serialized.length() > 10485760) {
GR_LOG_ERROR(d_logger,
boost::format("Received large packet of length %d in "
Expand Down
26 changes: 21 additions & 5 deletions gr-starcoder/lib/meteor_decoder_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "meteor/meteor_decoder.h"
#include "meteor/meteor_packet.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand Down Expand Up @@ -94,7 +96,8 @@ bool meteor_decoder_sink_impl::stop() {
int ok = 0;
while (decoder.pos() < total_size_ - meteor::SOFT_FRAME_LEN) {
total++;
bool res = decoder.decode_one_frame(raw.get(), total_size_, error_corrected_data.get());
bool res = decoder.decode_one_frame(raw.get(), total_size_,
error_corrected_data.get());
if (res) {
ok++;
std::cout << std::dec << 100. * decoder.pos() / total_size_ << "% "
Expand All @@ -114,10 +117,23 @@ bool meteor_decoder_sink_impl::stop() {
std::string png_b = packeter.dump_gray_image(meteor::BLUE_APID);

if (string_queue_ != NULL) {
if (!png_img.empty()) string_queue_->push(png_img);
if (!png_r.empty()) string_queue_->push(png_r);
if (!png_g.empty()) string_queue_->push(png_g);
if (!png_b.empty()) string_queue_->push(png_b);
::starcoder::BlockMessage grpc_pmt;
if (!png_img.empty()) {
grpc_pmt.set_blob_value(png_img);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_r.empty()) {
grpc_pmt.set_blob_value(png_r);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_g.empty()) {
grpc_pmt.set_blob_value(png_g);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_b.empty()) {
grpc_pmt.set_blob_value(png_b);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}

if (!filename_.empty()) {
Expand Down
10 changes: 8 additions & 2 deletions gr-starcoder/lib/noaa_apt_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <cmath>

#include "gil_util.h"
#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {
Expand Down Expand Up @@ -128,9 +129,14 @@ void noaa_apt_sink_impl::write_image(std::string filename) {
if (!d_flip)
image_string = (store_gray_to_png_string(image_received_view_));
else
image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_));
image_string =
store_gray_to_png_string(flipped_up_down_view(image_received_view_));

if (!image_string.empty()) string_queue_->push(image_string);
if (!image_string.empty()) {
::starcoder::BlockMessage grpc_pmt;
grpc_pmt.set_blob_value(image_string);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}
}

Expand Down
171 changes: 171 additions & 0 deletions gr-starcoder/lib/pmt_to_proto.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/* -*- c++ -*- */
/*
* Copyright 2018 Infostellar, Inc.
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
* Boston, MA 02110-1301, USA.
*/

#include "pmt_to_proto.h"

void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg,
starcoder::BlockMessage *proto_msg) {
if (pmt::is_blob(pmt_msg)) {
proto_msg->set_blob_value(pmt::blob_data(pmt_msg),
pmt::blob_length(pmt_msg));
} else if (pmt::is_uniform_vector(pmt_msg)) {
starcoder::UniformVector *uni_vector =
proto_msg->mutable_uniform_vector_value();
convert_proto_uniform_vector(pmt_msg, uni_vector);
} else if (pmt::is_bool(pmt_msg)) {
proto_msg->set_boolean_value(pmt::to_bool(pmt_msg));
} else if (pmt::is_symbol(pmt_msg)) {
proto_msg->set_symbol_value(pmt::symbol_to_string(pmt_msg));
} else if (pmt::is_integer(pmt_msg)) {
proto_msg->set_integer_value(pmt::to_long(pmt_msg));
} else if (pmt::is_uint64(pmt_msg)) {
proto_msg->set_integer_value(pmt::to_uint64(pmt_msg));
} else if (pmt::is_real(pmt_msg)) {
proto_msg->set_double_value(pmt::to_double(pmt_msg));
} else if (pmt::is_complex(pmt_msg)) {
std::complex<double> val = pmt::to_complex(pmt_msg);
starcoder::Complex *complex = proto_msg->mutable_complex_value();
complex->set_real_value(val.real());
complex->set_imaginary_value(val.imag());
} else if (pmt::is_pair(pmt_msg)) {
starcoder::Pair *pair = proto_msg->mutable_pair_value();
starcoder::BlockMessage *car = pair->mutable_car();
starcoder::BlockMessage *cdr = pair->mutable_cdr();
convert_pmt_to_proto(pmt::car(pmt_msg), car);
convert_pmt_to_proto(pmt::cdr(pmt_msg), cdr);
} else if (pmt::is_tuple(pmt_msg)) {
starcoder::List *list = proto_msg->mutable_list_value();
list->set_type(starcoder::List::TUPLE);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage *element = list->add_value();
convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i), element);
}
} else if (pmt::is_vector(pmt_msg)) {
starcoder::List *list = proto_msg->mutable_list_value();
list->set_type(starcoder::List::VECTOR);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage *element = list->add_value();
convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i), element);
}
} else if (pmt::is_dict(pmt_msg)) {
starcoder::Dict *dict = proto_msg->mutable_dict_value();
pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg);
for (int i = 0; i < pmt::length(key_value_pairs_list); i++) {
starcoder::Dict_Entry *entry = dict->add_entry();
starcoder::BlockMessage *key = entry->mutable_key();
starcoder::BlockMessage *value = entry->mutable_value();
convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)), key);
convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)), value);
}
}
}

void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg,
starcoder::UniformVector *uni_vector) {
if (pmt::is_u8vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size8);
const std::vector<uint8_t> vector_elements =
pmt::u8vector_elements(pmt_msg);
*u_vector->mutable_value() = { vector_elements.begin(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My C++ knowledge hasn't been update for C++11 and I really don't know how initializer expressions work... Can you explain how this assignment works? My reasoning is the right hand side calls RepeatedPtrField::RepeatedPtrField(Iter begin, const Iter & end) and then the whole content is copied for the another time, into the left hand side, which is redundant. You used transform and an inserter below. Why this piece of code follows a different pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to use std::copy. Wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 👍

vector_elements.end() };
} else if (pmt::is_s8vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size8);
const std::vector<int8_t> vector_elements = pmt::s8vector_elements(pmt_msg);
*i_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_u16vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size16);
const std::vector<uint16_t> vector_elements =
pmt::u16vector_elements(pmt_msg);
*u_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_s16vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size16);
const std::vector<int16_t> vector_elements =
pmt::s16vector_elements(pmt_msg);
*i_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_u32vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size32);
const std::vector<uint32_t> vector_elements =
pmt::u32vector_elements(pmt_msg);
*u_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_s32vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size32);
const std::vector<int32_t> vector_elements =
pmt::s32vector_elements(pmt_msg);
*i_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_u64vector(pmt_msg)) {
starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value();
const std::vector<uint64_t> vector_elements =
pmt::u64vector_elements(pmt_msg);
*u64_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_s64vector(pmt_msg)) {
starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value();
const std::vector<int64_t> vector_elements =
pmt::s64vector_elements(pmt_msg);
*i64_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_f32vector(pmt_msg)) {
starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value();
const std::vector<float> vector_elements = pmt::f32vector_elements(pmt_msg);
*f32_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_f64vector(pmt_msg)) {
starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value();
const std::vector<double> vector_elements =
pmt::f64vector_elements(pmt_msg);
*f64_vector->mutable_value() = { vector_elements.begin(),
vector_elements.end() };
} else if (pmt::is_c32vector(pmt_msg)) {
starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value();
const std::vector<std::complex<float>> vector_elements =
pmt::c32vector_elements(pmt_msg);
std::transform(vector_elements.begin(), vector_elements.end(),
c32_vector->mutable_value()->begin(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to use a back inserter? Is it guaranteed that c32_vector has enough buffer to store all transformed elements?
https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.repeated_field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I modified it. Could you check if I did it correctly? Sorry, I don't fully understand STL containers yet...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

STL algorithms write to output iterators. And a vector's iterator is just the address of underlying memory buffer. So outputting to an iterator is no more than writing to a memory buffer. It won't change the container size, it won't reallocate the memory buffer if there is enough room.

[](std::complex<float> c)->starcoder::Complex32 {
starcoder::Complex32 new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
} else if (pmt::is_c64vector(pmt_msg)) {
starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value();
const std::vector<std::complex<double>> vector_elements =
pmt::c64vector_elements(pmt_msg);
std::transform(vector_elements.begin(), vector_elements.end(),
c64_vector->mutable_value()->begin(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

[](std::complex<double> c)->starcoder::Complex {
starcoder::Complex new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
}
}
32 changes: 32 additions & 0 deletions gr-starcoder/lib/pmt_to_proto.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* -*- c++ -*- */
/*
* Copyright 2018 Infostellar, Inc.
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
* Boston, MA 02110-1301, USA.
*/

#ifndef INCLUDED_PMT_TO_PROTO_H
#define INCLUDED_PMT_TO_PROTO_H

#include <pmt/pmt.h>
#include "starcoder.pb.h"

void convert_pmt_to_proto(const pmt::pmt_t &pmt_msg,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to pass BlockMessage back as a return value. I believe the compiler is smart enough to avoid copying an object when returning it (and actually this behavior is required by the language specification.)

Copy link
Contributor Author

@reiinakano reiinakano Sep 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I did first. The problem is the C++ generated protobuf has no way to set embedded message fields other than through getting a pointer to the embedded message field with the mutable_xxx() method and directly modifying that pointer (there is no set_xxx() method). See https://stackoverflow.com/questions/43268845/do-i-need-to-delete-objects-passed-to-google-protocol-buffer-protobuf. For the recursion to work, not sure if there's another way than having the function take a pointer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems you can call Swap() on a mutable message. I haven't verified this API on my side and leave the decision up to you.
https://developers.google.com/protocol-buffers/docs/reference/cpp-generated#message

AMessageType parentMessage;

...
AMessageType submessage = convert_pmt_to_proto(...);
parentMessage.get_mutable_field()->Swap(&submessage);

....

return parentMessage;

Copy link
Contributor Author

@reiinakano reiinakano Sep 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this would work. Is there a specific reason to prefer returning the BlockMessage?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more or less a matter of personal preference so I left it up to you. But do you think it is more straightforward to return a value via a pointer argument than to use a return value? You wouldn't do it if you are programming in another language.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for reference, found these about the protobuf move semantics. Not sure if copy when reallocating affects us

protocolbuffers/protobuf#2791
protocolbuffers/protobuf#3630

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI. No explicit move is needed in this case. Its optimization for returning a local variable from a function.
https://gist.github.com/kenichi-fukushima/197238e0cd23b82666a27abfa5fbd540

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kenichi-fukushima Converted this to your suggested function signature. It does seem more intuitive to use.

starcoder::BlockMessage *proto_msg);
void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this function is only used by convert_pmt_to_proto and does not need to be exposes in a header file.

starcoder::UniformVector *uni_vector);

#endif /* INCLUDED_PMT_TO_PROTO_H */
Loading