Skip to content

Commit

Permalink
Merge pull request PaddlePaddle#46 from ChunweiYan/feature/support_we…
Browse files Browse the repository at this point in the history
…b_server_storage_read
  • Loading branch information
Superjomn authored Dec 11, 2017
2 parents b7f95aa + baf4d1b commit 8bf1415
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 12 deletions.
2 changes: 1 addition & 1 deletion visualdl/backend/logic/pybind.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ PYBIND11_PLUGIN(core) {
&vs::start_write_service,
"global information-maintainer object.");
m.def("im", &vs::im);
m.def("stop_threads", &vs::StopThreads);
m.def("stop_threads", &vs::stop_threads);

// interfaces for components
#define ADD_SCALAR_TYPED_INTERFACE(T, name__) \
Expand Down
2 changes: 0 additions & 2 deletions visualdl/backend/logic/sdk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ std::string TabletHelper::human_readable_buffer() const {
return buffer;
}

void ImHelper::PersistToDisk() const { IM::Global().PersistToDisk(); }

// implementations for components
namespace components {

Expand Down
67 changes: 61 additions & 6 deletions visualdl/backend/logic/sdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,69 @@ class ImHelper {
public:
ImHelper() {}

/*
* mode:
* 0: read
* 1: write
* 2: none
*/
StorageHelper storage() {
return StorageHelper(IM::Global().storage().mutable_data());
}
TabletHelper tablet(const std::string &tag) {
return TabletHelper(IM::Global().storage().tablet(tag));
}
TabletHelper AddTablet(const std::string &tag, int num_samples) {
return TabletHelper(IM::Global().AddTablet(tag, num_samples));
}
void ClearTablets() {
IM::Global().storage().mutable_data()->clear_tablets();
}

void PersistToDisk() const { IM::Global().PersistToDisk(); }
};

namespace components {

/*
* Read and write support for Scalar component.
*/
template <typename T>
class ScalarHelper {
public:
ScalarHelper(storage::Tablet *tablet) : data_(tablet) {}

void SetCaptions(const std::vector<std::string> &captions);

void AddRecord(int id, const std::vector<T> &values);

std::vector<std::vector<T>> GetRecords() const;

std::vector<int> GetIds() const;

std::vector<int> GetTimestamps() const;

std::vector<std::string> GetCaptions() const;

size_t GetSize() const { return data_->records_size(); }

private:
storage::Tablet *data_;
};

} // namespace components

static ImHelper &im() {
static ImHelper im;
return im;
}

static void start_read_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainRead();
}

static void start_write_service(const std::string &dir) {
IM::Global().SetPersistDest(dir);
IM::Global().MaintainWrite();
}

static void stop_threads() { cc::PeriodExector::Global().Quit(); }

} // namespace visualdl

#endif // VISUALDL_BACKEND_LOGIC_SDK_H
2 changes: 1 addition & 1 deletion visualdl/backend/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ add_library(storage_proto ${PROTO_SRCS})
add_dependencies(storage_proto protobuf)

## add storage as target
add_library(storage storage.cc ${PROTO_SRCS} ${PROTO_HDRS})
add_library(storage storage.cc storage.h ${PROTO_SRCS} ${PROTO_HDRS})
add_dependencies(storage storage_proto)
19 changes: 17 additions & 2 deletions visualdl/backend/utils/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,27 @@ struct PeriodExector {
void Start() { quit = false; }

void operator()(task_t&& task, int msec) {
const int interval = 500;

auto task_wrapper = [=] {
while (!quit) {
if (!task()) break;
std::this_thread::sleep_for(std::chrono::milliseconds(msec));
// if the program is terminated, quit while as soon as possible.
// this is just trick, but should works.
if (msec > 1000) {
int i;
for (i = 0; i < msec / interval; i++) {
if (quit) break;
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
std::this_thread::sleep_for(
std::chrono::milliseconds(msec - i * interval));
if (quit) break;
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(msec));
}
}
LOG(INFO) << "quit job";
LOG(INFO) << "quit concurrent job";
};
threads_.emplace_back(std::thread(std::move(task_wrapper)));
msec_ = msec;
Expand Down

0 comments on commit 8bf1415

Please sign in to comment.