Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush Cell cache when we run out. #33

Merged
merged 2 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bu/Processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,21 @@ Processor::writeOctantCompressed(const OctantInfo& o, Index& index, IndexIter po
}
}
flush:
flushCompressed(table, view, o);
try
{
flushCompressed(table, view, o);
}
catch (pdal::pdal_error& err)
{
fatal(err.what());
}

m_manager.logOctant(o.key(), count);
return pos;
}


// Copy data from the source file to the point view.
void Processor::appendCompressed(pdal::PointViewPtr view, const DimInfoList& dims,
const FileInfo& fi, IndexIter begin, IndexIter end)
{
Expand Down
13 changes: 4 additions & 9 deletions epf/BufferCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ namespace epf
{

// If we have a buffer in the cache, return it. Otherwise create a new one and return that.
DataVecPtr BufferCache::fetch()
// If nonblock is true and there are no available buffers, return null.
DataVecPtr BufferCache::fetch(std::unique_lock<std::mutex>& lock, bool nonblock)
{
std::unique_lock<std::mutex> lock(m_mutex);
if (nonblock && m_buffers.empty() && m_count >= MaxBuffers)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this buffers.empty() check be locked?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably. Thx.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, no. Or to put it another way, it already is.

The lock that was in the BufferCache has been removed. The lock that's used is now the one that's in the Writer, that owns the buffer cache. Writer::fetchBuffer() acquires the lock before calling BufferCache::fetch(). It passes the lock along so that it can be UN-locked if we need to wait on a CV.

return nullptr;

m_cv.wait(lock, [this](){ return m_buffers.size() || m_count < MaxBuffers; });
if (m_buffers.size())
Expand All @@ -42,17 +44,10 @@ DataVecPtr BufferCache::fetch()
// Put a buffer back in the cache.
void BufferCache::replace(DataVecPtr&& buf)
{
std::unique_lock<std::mutex> lock(m_mutex);

//ABELL - Fix this.
// buf->resize(BufSize);
m_buffers.push_back(std::move(buf));

if (m_count == MaxBuffers)
{
lock.unlock();
m_cv.notify_one();
}
}

} // namespace epf
Expand Down
3 changes: 1 addition & 2 deletions epf/BufferCache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ class BufferCache
BufferCache() : m_count(0)
{}

DataVecPtr fetch();
DataVecPtr fetch(std::unique_lock<std::mutex>& lock, bool nonblock);
void replace(DataVecPtr&& buf);

private:
std::deque<DataVecPtr> m_buffers;
std::mutex m_mutex;
std::condition_variable m_cv;
int m_count;
};
Expand Down
41 changes: 32 additions & 9 deletions epf/Cell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,26 @@ namespace epf

void Cell::initialize()
{
m_buf = m_writer->bufferCache().fetch();
m_buf = m_writer->fetchBuffer();

// If we couldn't fetch a buffer, flush all the buffers for this processor and
// try again, but block.
if (!m_buf)
{
m_flush(this);
m_buf = m_writer->fetchBufferBlocking();
}
m_pos = m_buf->data();

m_endPos = m_pos + m_pointSize * (BufSize / m_pointSize);
}

// NOTE - After write(), the cell is invalid and must be initialized or destroyed.
// Hands the cell's buffer to the writer, together with the amount of data it holds,
// provided that something has been written to the buffer.
void Cell::write()
{
// Number of bytes between the start of the buffer and the current write position.
// This count is passed to the writer directly; the buffer itself is not resized
// (the resize call below is commented out).
size_t size = m_pos - m_buf->data();
if (size)
// {
// m_buf->resize(size);
m_writer->enqueue(m_key, std::move(m_buf), size);
// }
}

void Cell::advance()
Expand All @@ -61,17 +66,35 @@ Cell *CellMgr::get(const VoxelKey& key)
auto it = m_cells.find(key);
if (it == m_cells.end())
{
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer));
Cell::FlushFunc f = [this](Cell *exclude)
{
flush(exclude);
};
std::unique_ptr<Cell> cell(new Cell(key, m_pointSize, m_writer, f));
it = m_cells.insert( {key, std::move(cell)} ).first;
}
Cell& c = *(it->second);
return &c;
}

void CellMgr::flush()
// Eliminate all the cells and their associated data buffers except the `exclude`
// cell.
void CellMgr::flush(Cell *exclude)
{
for (auto& cp : m_cells)
cp.second->write();
CellMap::iterator it = m_cells.end();
if (exclude)
it = m_cells.find(exclude->key());

// If there was no exclude cell or it isn't in our list, just clear the cells.
// Otherwise, save the exclude cell, clear the list, and reinsert.
if (it == m_cells.end())
m_cells.clear();
else
{
std::unique_ptr<Cell> c = std::move(it->second);
m_cells.clear();
m_cells.insert({c->key(), std::move(c)});
}
}

} // namespace epf
Expand Down
23 changes: 17 additions & 6 deletions epf/Cell.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

#include <cstdint>
#include <cstddef>
#include <map>
#include <functional>
#include <unordered_map>
#include <memory>

#include "EpfTypes.hpp"
Expand All @@ -35,12 +36,18 @@ class Writer;
class Cell
{
public:
Cell(const VoxelKey& key, int pointSize, Writer *writer) :
m_key(key), m_pointSize(pointSize), m_writer(writer)
using FlushFunc = std::function<void(Cell *)>;

Cell(const VoxelKey& key, int pointSize, Writer *writer, FlushFunc flush) :
m_key(key), m_pointSize(pointSize), m_writer(writer), m_flush(flush)
{
assert(pointSize < BufSize);
initialize();
}
~Cell()
{
write();
}

void initialize();
Point point()
Expand All @@ -49,7 +56,6 @@ class Cell
{ return m_key; }
void copyPoint(Point& b)
{ std::copy(b.data(), b.data() + m_pointSize, m_pos); }
void write();
void advance();

private:
Expand All @@ -59,19 +65,24 @@ class Cell
Writer *m_writer;
uint8_t *m_pos;
uint8_t *m_endPos;
FlushFunc m_flush;

void write();
};

class CellMgr
{
public:
CellMgr(int pointSize, Writer *writer);

Cell *get(const VoxelKey& key);
void flush();
void flush(Cell *exclude);

private:
using CellMap = std::unordered_map<VoxelKey, std::unique_ptr<Cell>>;
int m_pointSize;
Writer *m_writer;
std::map<VoxelKey, std::unique_ptr<Cell>> m_cells;
CellMap m_cells;
};


Expand Down
6 changes: 3 additions & 3 deletions epf/Epf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void writeMetadata(const std::string& tempDir, const Grid& grid,

/// Epf

Epf::Epf() : m_pool(8)
Epf::Epf() : m_pool(NumFileProcessors)
{}


Expand Down Expand Up @@ -120,8 +120,8 @@ void Epf::run(const Options& options, ProgressWriter& progress)
}
}

// Make a writer with 4 threads.
m_writer.reset(new Writer(options.tempDir, 4, layout->pointSize()));
// Make a writer with NumWriters threads.
m_writer.reset(new Writer(options.tempDir, NumWriters, layout->pointSize()));

// Sort file infos so the largest files come first. This helps to make sure we don't delay
// processing big files that take the longest (use threads more efficiently).
Expand Down
2 changes: 2 additions & 0 deletions epf/EpfTypes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ using Totals = std::unordered_map<VoxelKey, size_t>;
constexpr int MaxPointsPerNode = 100000;
constexpr int BufSize = 4096 * 10;
constexpr int MaxBuffers = 1000;
constexpr int NumWriters = 4;
constexpr int NumFileProcessors = 8;

struct FileInfo
{
Expand Down
15 changes: 10 additions & 5 deletions epf/FileProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,17 @@ void FileProcessor::run()

pdal::FixedPointTable t(1000);

f.prepare(t);
f.execute(t);
m_progress.update(count % CountIncrement);
try
{
f.prepare(t);
f.execute(t);
}
catch (const pdal::pdal_error& err)
{
fatal(err.what());
}

// Flush any data remaining in the cells.
m_cellMgr.flush();
m_progress.update(count % CountIncrement);
}

} // namespace epf
Expand Down
1 change: 0 additions & 1 deletion epf/Reprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ void Reprocessor::run()
cell->advance();
pos += m_pointSize;
}
m_mgr.flush();
pdal::FileUtils::unmapFile(ctx);
pdal::FileUtils::deleteFile(m_filename);
}
Expand Down
20 changes: 19 additions & 1 deletion epf/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ Totals Writer::totals(size_t minSize)
return t;
}

// Fetch a data buffer from the buffer cache while holding the writer's mutex.
// The result may be a null pointer when the cache is asked not to block, so callers
// must check the returned pointer before using it.
DataVecPtr Writer::fetchBuffer()
{
// The lock is passed by reference to BufferCache::fetch() along with the non-blocking flag.
std::unique_lock<std::mutex> lock(m_mutex);

// If there are fewer items in the queue than we have FileProcessors, we may choose not
// to block and return a nullptr, expecting that the caller will flush outstanding cells.
return m_bufferCache.fetch(lock, m_queue.size() < NumFileProcessors);
}


// Fetch a data buffer from the buffer cache with the non-blocking flag set to false,
// holding the writer's mutex for the call.
DataVecPtr Writer::fetchBufferBlocking()
{
// The lock is passed by reference to BufferCache::fetch().
std::unique_lock<std::mutex> lock(m_mutex);

return m_bufferCache.fetch(lock, false);
}


void Writer::enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize)
{
{
Expand Down Expand Up @@ -123,9 +141,9 @@ void Writer::run()
out.close();
if (!out)
fatal("Failure writing to '" + path(wd.key) + "'.");
m_bufferCache.replace(std::move(wd.data));

std::lock_guard<std::mutex> lock(m_mutex);
m_bufferCache.replace(std::move(wd.data));
m_active.remove(wd.key);
}
}
Expand Down
4 changes: 2 additions & 2 deletions epf/Writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class Writer

void enqueue(const VoxelKey& key, DataVecPtr data, size_t dataSize);
void stop();
BufferCache& bufferCache()
{ return m_bufferCache; }
const Totals& totals()
{ return m_totals; }
Totals totals(size_t minSize);
DataVecPtr fetchBuffer();
DataVecPtr fetchBufferBlocking();

private:
std::string path(const VoxelKey& key);
Expand Down