Skip to content

Commit

Permalink
Add functionality so Starcoder can send back arbitrary PMTs, not just…
Browse files Browse the repository at this point in the history
… blobs (#78)

* rename convert_pmt_proto to convert_proto_to_pmt

* init commit of pmt_to_proto

* finish pmt_to_proto?

* clang

* update proto

* convert in enqueue_message_sink

* update starcoder to parse the protobufs

* modify waterfall plotter to pass up protobuf

* update noaa

* introduce blob_value

* add blob to proto_to_pmt

* test for proto_to_pmt

* use pmt in meteor decoder

* experimental drop payload

* fix closing

* check if queue is closed() to exit goroutine

* revert

* factor out function

* dont expose in header file

* use back inserter

* use repeatedptrfieldbackinsertiterator

* clang

* use std::copy

* rename request to respose

* change signature of pmt_to_proto

* changes to test.grc
  • Loading branch information
reiinakano authored Sep 11, 2018
1 parent 5f9406a commit 7c119d8
Show file tree
Hide file tree
Showing 19 changed files with 468 additions and 153 deletions.
7 changes: 6 additions & 1 deletion api/starcoder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ message BlockMessage {
UniformVector uniform_vector_value = 9;

Dict dict_value = 10;

bytes blob_value = 11;
}
}

Expand Down Expand Up @@ -203,7 +205,10 @@ message RunFlowgraphResponse {
string block_id = 1;

// Serialized arbitrary PMT (GNURadio Polymorphic Message Type)
bytes payload = 2;
bytes payload = 2 [deprecated=true];

// PMT (GNURadio Polymorphic Message Type) response sent from the block
BlockMessage pmt = 3;
}

// Complex number
Expand Down
4 changes: 4 additions & 0 deletions cqueue/c_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (q *CStringQueue) Close() {
q.queue.Close()
}

func (q *CStringQueue) Closed() bool {
return q.queue.Closed()
}

func (q *CStringQueue) Push(str string) {
q.queue.Push(str)
}
Expand Down
5 changes: 5 additions & 0 deletions cqueue/string_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ void string_queue::close() {
condition_var_.notify_one();
}

bool string_queue::closed() {
std::unique_lock<std::mutex> lock(mutex_);
return closed_;
}

uint64_t string_queue::get_ptr() const {
return reinterpret_cast<uint64_t>(this);
}
Expand Down
1 change: 1 addition & 0 deletions cqueue/string_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class string_queue {
std::string blocking_pop();
unsigned long get_ptr() const;
void close();
bool closed();
// TODO: Make this uint64_t
static string_queue *queue_from_pointer(unsigned long long ptr);
private:
Expand Down
2 changes: 1 addition & 1 deletion flowgraphs/test.grc
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@
</param>
<param>
<key>msg</key>
<value>pmt.make_u8vector(6, ord('b'))</value>
<value>pmt.make_c64vector(2, -7+3.2j)</value>
</param>
<param>
<key>minoutbuf</key>
Expand Down
1 change: 1 addition & 0 deletions gr-starcoder/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ list(APPEND starcoder_sources
${CMAKE_CURRENT_SOURCE_DIR}/../../cqueue/string_queue.cc
${hw_proto_srcs}
proto_to_pmt.cc
pmt_to_proto.cc
init.c
driver.c
firmware.c
Expand Down
2 changes: 1 addition & 1 deletion gr-starcoder/lib/command_source_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void command_source_impl::readloop() {
GR_LOG_WARN(d_logger, "Failed to deserialize gRPC");
continue;
}
pmt::pmt_t m = convert_pmt_proto(grpc_pmt);
pmt::pmt_t m = convert_proto_to_pmt(grpc_pmt);
message_port_pub(port_, m);
}
}
Expand Down
5 changes: 4 additions & 1 deletion gr-starcoder/lib/enqueue_message_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <gnuradio/io_signature.h>
#include "enqueue_message_sink_impl.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand All @@ -51,7 +53,8 @@ enqueue_message_sink_impl::~enqueue_message_sink_impl() {}

void enqueue_message_sink_impl::handler(pmt::pmt_t msg) {
if (string_queue_ != NULL) {
std::string serialized = pmt::serialize_str(msg);
std::string serialized = convert_pmt_to_proto(msg).SerializeAsString();

if (serialized.length() > 10485760) {
GR_LOG_ERROR(d_logger,
boost::format("Received large packet of length %d in "
Expand Down
26 changes: 21 additions & 5 deletions gr-starcoder/lib/meteor_decoder_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
#include "meteor/meteor_decoder.h"
#include "meteor/meteor_packet.h"

#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {

Expand Down Expand Up @@ -94,7 +96,8 @@ bool meteor_decoder_sink_impl::stop() {
int ok = 0;
while (decoder.pos() < total_size_ - meteor::SOFT_FRAME_LEN) {
total++;
bool res = decoder.decode_one_frame(raw.get(), total_size_, error_corrected_data.get());
bool res = decoder.decode_one_frame(raw.get(), total_size_,
error_corrected_data.get());
if (res) {
ok++;
std::cout << std::dec << 100. * decoder.pos() / total_size_ << "% "
Expand All @@ -114,10 +117,23 @@ bool meteor_decoder_sink_impl::stop() {
std::string png_b = packeter.dump_gray_image(meteor::BLUE_APID);

if (string_queue_ != NULL) {
if (!png_img.empty()) string_queue_->push(png_img);
if (!png_r.empty()) string_queue_->push(png_r);
if (!png_g.empty()) string_queue_->push(png_g);
if (!png_b.empty()) string_queue_->push(png_b);
::starcoder::BlockMessage grpc_pmt;
if (!png_img.empty()) {
grpc_pmt.set_blob_value(png_img);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_r.empty()) {
grpc_pmt.set_blob_value(png_r);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_g.empty()) {
grpc_pmt.set_blob_value(png_g);
string_queue_->push(grpc_pmt.SerializeAsString());
}
if (!png_b.empty()) {
grpc_pmt.set_blob_value(png_b);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}

if (!filename_.empty()) {
Expand Down
10 changes: 8 additions & 2 deletions gr-starcoder/lib/noaa_apt_sink_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include <cmath>

#include "gil_util.h"
#include "pmt_to_proto.h"

namespace gr {
namespace starcoder {
Expand Down Expand Up @@ -128,9 +129,14 @@ void noaa_apt_sink_impl::write_image(std::string filename) {
if (!d_flip)
image_string = (store_gray_to_png_string(image_received_view_));
else
image_string = store_gray_to_png_string(flipped_up_down_view(image_received_view_));
image_string =
store_gray_to_png_string(flipped_up_down_view(image_received_view_));

if (!image_string.empty()) string_queue_->push(image_string);
if (!image_string.empty()) {
::starcoder::BlockMessage grpc_pmt;
grpc_pmt.set_blob_value(image_string);
string_queue_->push(grpc_pmt.SerializeAsString());
}
}
}

Expand Down
201 changes: 201 additions & 0 deletions gr-starcoder/lib/pmt_to_proto.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/* -*- c++ -*- */
/*
* Copyright 2018 Infostellar, Inc.
*
* This is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this software; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
* Boston, MA 02110-1301, USA.
*/

#include "pmt_to_proto.h"

void convert_proto_uniform_vector(const pmt::pmt_t &pmt_msg,
starcoder::UniformVector *uni_vector) {
if (pmt::is_u8vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size8);
const std::vector<uint8_t> vector_elements =
pmt::u8vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s8vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size8);
const std::vector<int8_t> vector_elements = pmt::s8vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u16vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size16);
const std::vector<uint16_t> vector_elements =
pmt::u16vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s16vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size16);
const std::vector<int16_t> vector_elements =
pmt::s16vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u32vector(pmt_msg)) {
starcoder::UVector *u_vector = uni_vector->mutable_u_value();
u_vector->set_size(starcoder::IntSize::Size32);
const std::vector<uint32_t> vector_elements =
pmt::u32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint32_t>(
u_vector->mutable_value()));
} else if (pmt::is_s32vector(pmt_msg)) {
starcoder::IVector *i_vector = uni_vector->mutable_i_value();
i_vector->set_size(starcoder::IntSize::Size32);
const std::vector<int32_t> vector_elements =
pmt::s32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int32_t>(
i_vector->mutable_value()));
} else if (pmt::is_u64vector(pmt_msg)) {
starcoder::U64Vector *u64_vector = uni_vector->mutable_u64_value();
const std::vector<uint64_t> vector_elements =
pmt::u64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<uint64_t>(
u64_vector->mutable_value()));
} else if (pmt::is_s64vector(pmt_msg)) {
starcoder::I64Vector *i64_vector = uni_vector->mutable_i64_value();
const std::vector<int64_t> vector_elements =
pmt::s64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<int64_t>(
i64_vector->mutable_value()));
} else if (pmt::is_f32vector(pmt_msg)) {
starcoder::F32Vector *f32_vector = uni_vector->mutable_f32_value();
const std::vector<float> vector_elements = pmt::f32vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<float>(
f32_vector->mutable_value()));
} else if (pmt::is_f64vector(pmt_msg)) {
starcoder::F64Vector *f64_vector = uni_vector->mutable_f64_value();
const std::vector<double> vector_elements =
pmt::f64vector_elements(pmt_msg);
std::copy(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedFieldBackInsertIterator<double>(
f64_vector->mutable_value()));
} else if (pmt::is_c32vector(pmt_msg)) {
starcoder::C32Vector *c32_vector = uni_vector->mutable_c32_value();
const std::vector<std::complex<float>> vector_elements =
pmt::c32vector_elements(pmt_msg);
std::transform(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedPtrFieldBackInsertIterator<
starcoder::Complex32>(c32_vector->mutable_value()),
[](std::complex<float> c)->starcoder::Complex32 {
starcoder::Complex32 new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
} else if (pmt::is_c64vector(pmt_msg)) {
starcoder::C64Vector *c64_vector = uni_vector->mutable_c64_value();
const std::vector<std::complex<double>> vector_elements =
pmt::c64vector_elements(pmt_msg);
std::transform(
vector_elements.begin(), vector_elements.end(),
google::protobuf::internal::RepeatedPtrFieldBackInsertIterator<
starcoder::Complex>(c64_vector->mutable_value()),
[](std::complex<double> c)->starcoder::Complex {
starcoder::Complex new_val;
new_val.set_real_value(c.real());
new_val.set_imaginary_value(c.imag());
return new_val;
});
}
}

starcoder::BlockMessage convert_pmt_to_proto(const pmt::pmt_t &pmt_msg) {
starcoder::BlockMessage proto_msg;
if (pmt::is_blob(pmt_msg)) {
proto_msg.set_blob_value(pmt::blob_data(pmt_msg),
pmt::blob_length(pmt_msg));
} else if (pmt::is_uniform_vector(pmt_msg)) {
convert_proto_uniform_vector(pmt_msg,
proto_msg.mutable_uniform_vector_value());
} else if (pmt::is_bool(pmt_msg)) {
proto_msg.set_boolean_value(pmt::to_bool(pmt_msg));
} else if (pmt::is_symbol(pmt_msg)) {
proto_msg.set_symbol_value(pmt::symbol_to_string(pmt_msg));
} else if (pmt::is_integer(pmt_msg)) {
proto_msg.set_integer_value(pmt::to_long(pmt_msg));
} else if (pmt::is_uint64(pmt_msg)) {
proto_msg.set_integer_value(pmt::to_uint64(pmt_msg));
} else if (pmt::is_real(pmt_msg)) {
proto_msg.set_double_value(pmt::to_double(pmt_msg));
} else if (pmt::is_complex(pmt_msg)) {
std::complex<double> val = pmt::to_complex(pmt_msg);
starcoder::Complex *complex = proto_msg.mutable_complex_value();
complex->set_real_value(val.real());
complex->set_imaginary_value(val.imag());
} else if (pmt::is_pair(pmt_msg)) {
starcoder::Pair *pair = proto_msg.mutable_pair_value();
starcoder::BlockMessage car = convert_pmt_to_proto(pmt::car(pmt_msg));
starcoder::BlockMessage cdr = convert_pmt_to_proto(pmt::cdr(pmt_msg));
pair->mutable_car()->Swap(&car);
pair->mutable_cdr()->Swap(&cdr);
} else if (pmt::is_tuple(pmt_msg)) {
starcoder::List *list = proto_msg.mutable_list_value();
list->set_type(starcoder::List::TUPLE);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage element =
convert_pmt_to_proto(pmt::tuple_ref(pmt_msg, i));
list->add_value()->Swap(&element);
}
} else if (pmt::is_vector(pmt_msg)) {
starcoder::List *list = proto_msg.mutable_list_value();
list->set_type(starcoder::List::VECTOR);
for (int i = 0; i < pmt::length(pmt_msg); i++) {
starcoder::BlockMessage element =
convert_pmt_to_proto(pmt::vector_ref(pmt_msg, i));
list->add_value()->Swap(&element);
}
} else if (pmt::is_dict(pmt_msg)) {
starcoder::Dict *dict = proto_msg.mutable_dict_value();
pmt::pmt_t key_value_pairs_list = pmt::dict_items(pmt_msg);
for (int i = 0; i < pmt::length(key_value_pairs_list); i++) {
starcoder::Dict_Entry *entry = dict->add_entry();

starcoder::BlockMessage key =
convert_pmt_to_proto(pmt::car(pmt::nth(i, key_value_pairs_list)));
starcoder::BlockMessage value =
convert_pmt_to_proto(pmt::cdr(pmt::nth(i, key_value_pairs_list)));

entry->mutable_key()->Swap(&key);
entry->mutable_value()->Swap(&value);
}
}
return proto_msg;
}
Loading

0 comments on commit 7c119d8

Please sign in to comment.