Skip to content

Commit

Permalink
Merge pull request #258 from Rblp/fix-subscribe
Browse files Browse the repository at this point in the history
Fix subscribe
  • Loading branch information
eddelbuettel authored Jan 20, 2018
2 parents 6966507 + 7281c2f commit 2ab9e0e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: Rblpapi
Title: R Interface to 'Bloomberg'
Version: 0.3.6.2
Version: 0.3.6.3
Date: 2017-07-01
Maintainer: Dirk Eddelbuettel <[email protected]>
Author: Whit Armstrong, Dirk Eddelbuettel and John Laing
Expand Down
52 changes: 28 additions & 24 deletions src/subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ SEXP eleToDatetime(const Element& e) {
}

SEXP eleToArray(const Element& e) {
if(e.isNull()) { R_NilValue; }
SEXP ans;
if(e.isNull()) { return R_NilValue; }
switch(e.datatype()) {
case BLPAPI_DATATYPE_BOOL:
return eleToLogical(e);
Expand Down Expand Up @@ -189,9 +188,8 @@ SEXP subscribe_Impl(SEXP con_, std::vector<std::string> securities, std::vector<
options_collapsed = vectorToCSVString(options);
}

for(const std::string& security : securities) {
CorrelationId cid(reinterpret_cast<void*>(const_cast<char *>(security.c_str())));
subscriptions.add(security.c_str(),fields_collapsed.c_str(),options_collapsed.c_str(),cid);
for(size_t i = 0; i < securities.size(); ++i) {
subscriptions.add(securities[i].c_str(),fields_collapsed.c_str(),options_collapsed.c_str(),CorrelationId(i));
}

// check if identity was passed, if so, use it
Expand All @@ -203,28 +201,34 @@ SEXP subscribe_Impl(SEXP con_, std::vector<std::string> securities, std::vector<
session->subscribe(subscriptions);
}

while (true) {
Event event = session->nextEvent();
Rcpp::checkUserInterrupt();
MessageIterator msgIter(event);
while (msgIter.next()) {
Message msg = msgIter.message();
const char *topic = (char *)msg.correlationId().asPointer();
if (event.eventType() == Event::SUBSCRIPTION_STATUS ||
event.eventType() == Event::SUBSCRIPTION_DATA) {

Rcpp::List ans;
auto it = BlpapiEventToString.find(event.eventType());
if(it==BlpapiEventToString.end()) {
throw Rcpp::exception("Unknown event type.");
try {
while (true) {
Event event = session->nextEvent();
Rcpp::checkUserInterrupt();
MessageIterator msgIter(event);
while (msgIter.next()) {
Message msg = msgIter.message();
if (event.eventType() == Event::SUBSCRIPTION_STATUS ||
event.eventType() == Event::SUBSCRIPTION_DATA) {

Rcpp::List ans;
auto it = BlpapiEventToString.find(event.eventType());
if(it==BlpapiEventToString.end()) {
throw Rcpp::exception("Unknown event type.");
}
ans["event.type"] = it->second;
size_t cid(msg.correlationId().asInteger());
if(cid >= 0 && cid < securities.size()) {
ans["topic"] = securities[cid];
}
ans["data"] = recursiveParse(msg.asElement());
// call user function
fun(ans);
}
ans["event.type"] = it->second;
ans["topic"] = topic;
ans["data"] = recursiveParse(msg.asElement());
// call user function
fun(ans);
}
}
} catch (const Rcpp::internal::InterruptedException& e) {
session->unsubscribe(subscriptions);
}
return R_NilValue;
}

0 comments on commit 2ab9e0e

Please sign in to comment.