diff --git a/epf/Cell.cpp b/epf/Cell.cpp index fa5a8cc..400493a 100644 --- a/epf/Cell.cpp +++ b/epf/Cell.cpp @@ -24,9 +24,7 @@ void Cell::initialize() m_buf = m_writer->bufferCache().fetch(); m_pos = m_buf->data(); - // The end position is one less than the buffer size to allow for - // speculative writes into the cell. See FileProcessor for details. - m_endPos = m_pos + m_pointSize * ((BufSize / m_pointSize) - 1); + m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize); } void Cell::write() @@ -43,7 +41,7 @@ void Cell::write() void Cell::advance() { m_pos += m_pointSize; - if (m_pos > m_endPos) + if (m_pos >= m_endPos) { write(); initialize(); diff --git a/epf/Epf.cpp b/epf/Epf.cpp index 1dce392..3aed693 100644 --- a/epf/Epf.cpp +++ b/epf/Epf.cpp @@ -158,8 +158,6 @@ void Epf::run(const std::vector& options) // Make a writer with 4 threads. m_writer.reset(new Writer(m_outputDir, 4)); - std::vector> processors; - // Sort file infos so the largest files come first. This helps to make sure we don't delay // processing big files that take the longest (use threads more efficiently). std::sort(fileInfos.begin(), fileInfos.end(), [](const FileInfo& f1, const FileInfo& f2) @@ -167,11 +165,12 @@ void Epf::run(const std::vector& options) // Add the files to the processing pool for (const FileInfo& fi : fileInfos) { - std::unique_ptr fp( - new FileProcessor(fi, layout->pointSize(), m_grid, m_writer.get())); - std::function processor = std::bind(&FileProcessor::operator(), fp.get()); - m_pool.add(processor); - processors.push_back(std::move(fp)); + int pointSize = layout->pointSize(); + m_pool.add([&fi, pointSize, this]() + { + FileProcessor fp(fi, pointSize, m_grid, m_writer.get()); + fp.run(); + }); } // Wait for all the processors to finish and restart. diff --git a/epf/FileProcessor.cpp b/epf/FileProcessor.cpp index d045301..ac720c9 100644 --- a/epf/FileProcessor.cpp +++ b/epf/FileProcessor.cpp @@ -30,7 +30,7 @@ FileProcessor::FileProcessor(const FileInfo& fi, size_t pointSize, const Grid& g m_fi(fi), m_cellMgr(pointSize, writer), m_grid(grid), m_cnt(++m_totalCnt) {} -void FileProcessor::operator()() +void FileProcessor::run() { Options opts; opts.add("filename", m_fi.filename); diff --git a/epf/FileProcessor.hpp b/epf/FileProcessor.hpp index a25b714..266a8c6 100644 --- a/epf/FileProcessor.hpp +++ b/epf/FileProcessor.hpp @@ -29,7 +29,7 @@ class FileProcessor FileProcessor(const FileInfo& fi, size_t pointSize, const Grid& grid, Writer *writer); Cell *getCell(const VoxelKey& key); - void operator()(); + void run(); private: FileInfo m_fi; diff --git a/epf/Writer.cpp b/epf/Writer.cpp index a96e0c3..563fe89 100644 --- a/epf/Writer.cpp +++ b/epf/Writer.cpp @@ -26,12 +26,6 @@ namespace ept2 namespace epf { -struct WriteData -{ - VoxelKey key; - DataVecPtr data; -}; - Writer::Writer(const std::string& directory, int numThreads) : m_directory(directory), m_pool(numThreads), m_stop(false) { @@ -114,6 +108,9 @@ void Writer::run() // Remove the key from the active key list. std::ofstream out(path(wd.key), std::ios::app | std::ios::binary); out.write(reinterpret_cast(wd.data->data()), wd.data->size()); + out.close(); + if (!out) + throw Error("Failure writing to '" + path(wd.key) + "'."); m_bufferCache.replace(std::move(wd.data)); std::lock_guard lock(m_mutex); diff --git a/epf/Writer.hpp b/epf/Writer.hpp index cc7c196..2b4f102 100644 --- a/epf/Writer.hpp +++ b/epf/Writer.hpp @@ -35,7 +35,7 @@ class Writer struct WriteData { VoxelKey key; - std::unique_ptr> data; + DataVecPtr data; }; public: