Skip to content

Commit

Permalink
Load data in two separate data regions
Browse files Browse the repository at this point in the history
  • Loading branch information
TheMarex committed Apr 6, 2018
1 parent c7daa52 commit 59e2d93
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 429 deletions.
53 changes: 38 additions & 15 deletions include/engine/data_watchdog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,23 @@ class DataWatchdogImpl<AlgorithmT, datafacade::ContiguousInternalMemoryDataFacad
boost::interprocess::scoped_lock<mutex_type> current_region_lock(barrier.get_mutex());

auto &shared_register = barrier.data();
auto region_id = shared_register.Find(dataset_name + "/data");
if (region_id == storage::SharedRegionRegister::INVALID_REGION_ID)
auto static_region_id = shared_register.Find(dataset_name + "/static");
auto updatable_region_id = shared_register.Find(dataset_name + "/updatable");
if (static_region_id == storage::SharedRegionRegister::INVALID_REGION_ID ||
updatable_region_id == storage::SharedRegionRegister::INVALID_REGION_ID)
{
throw util::exception("Could not find shared memory region for \"" + dataset_name +
"/data\". Did you run osrm-datastore?");
}
shared_region = &shared_register.GetRegion(region_id);
region = *shared_region;
static_shared_region = &shared_register.GetRegion(static_region_id);
updatable_shared_region = &shared_register.GetRegion(updatable_region_id);
static_region = *static_shared_region;
updatable_region = *updatable_shared_region;

facade_factory =
DataFacadeFactory<datafacade::ContiguousInternalMemoryDataFacade, AlgorithmT>(
std::make_shared<datafacade::SharedMemoryAllocator>(region.shm_key));
std::make_shared<datafacade::SharedMemoryAllocator>(
std::vector<storage::SharedRegionRegister::ShmKey> {static_region.shm_key, updatable_region.shm_key}));
}

watcher = std::thread(&DataWatchdogImpl::Run, this);
Expand Down Expand Up @@ -83,20 +88,36 @@ class DataWatchdogImpl<AlgorithmT, datafacade::ContiguousInternalMemoryDataFacad
{
boost::interprocess::scoped_lock<mutex_type> current_region_lock(barrier.get_mutex());

while (active && region.timestamp == shared_region->timestamp)
while (active && static_region.timestamp == static_shared_region->timestamp &&
updatable_region.timestamp == updatable_shared_region->timestamp)
{
barrier.wait(current_region_lock);
}

if (region.timestamp != shared_region->timestamp)
if (!active)
break;

if (static_region.timestamp != static_shared_region->timestamp)
{
static_region = *static_shared_region;
}
else if (updatable_region.timestamp != updatable_shared_region->timestamp)
{
updatable_region = *updatable_shared_region;
}
else
{
region = *shared_region;
facade_factory =
DataFacadeFactory<datafacade::ContiguousInternalMemoryDataFacade, AlgorithmT>(
std::make_shared<datafacade::SharedMemoryAllocator>(region.shm_key));
util::Log() << "updated facade to region " << (int)region.shm_key
<< " with timestamp " << region.timestamp;
BOOST_ASSERT_MSG(false, "One region always needs to update");
}

util::Log() << "updated facade to regions " << (int)static_region.shm_key << " and "
<< (int)updatable_region.shm_key << " with timestamps "
<< static_region.timestamp << " and " << updatable_region.timestamp;

facade_factory =
DataFacadeFactory<datafacade::ContiguousInternalMemoryDataFacade, AlgorithmT>(
std::make_shared<datafacade::SharedMemoryAllocator>(
std::vector<storage::SharedRegionRegister::ShmKey>{static_region.shm_key, updatable_region.shm_key}));
}

util::Log() << "DataWatchdog thread stopped";
Expand All @@ -106,8 +127,10 @@ class DataWatchdogImpl<AlgorithmT, datafacade::ContiguousInternalMemoryDataFacad
storage::SharedMonitor<storage::SharedRegionRegister> barrier;
std::thread watcher;
bool active;
storage::SharedRegion region;
storage::SharedRegion *shared_region;
storage::SharedRegion static_region;
storage::SharedRegion updatable_region;
storage::SharedRegion *static_shared_region;
storage::SharedRegion *updatable_shared_region;
DataFacadeFactory<datafacade::ContiguousInternalMemoryDataFacade, AlgorithmT> facade_factory;
};
}
Expand Down
5 changes: 2 additions & 3 deletions include/engine/datafacade/contiguous_block_allocator.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef OSRM_ENGINE_DATAFACADE_CONTIGUOUS_BLOCK_ALLOCATOR_HPP_
#define OSRM_ENGINE_DATAFACADE_CONTIGUOUS_BLOCK_ALLOCATOR_HPP_

#include "storage/shared_datatype.hpp"
#include "storage/shared_data_index.hpp"

namespace osrm
{
Expand All @@ -16,8 +16,7 @@ class ContiguousBlockAllocator
virtual ~ContiguousBlockAllocator() = default;

// interface to give access to the datafacades
virtual const storage::DataLayout &GetLayout() = 0;
virtual char *GetMemory() = 0;
virtual const storage::SharedDataIndex &GetIndex() = 0;
};

} // namespace datafacade
Expand Down
67 changes: 29 additions & 38 deletions include/engine/datafacade/contiguous_internalmem_datafacade.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,15 @@ class ContiguousInternalMemoryAlgorithmDataFacade<CH> : public datafacade::Algor
std::size_t exclude_index)
: allocator(std::move(allocator_))
{
InitializeInternalPointers(
allocator->GetLayout(), allocator->GetMemory(), metric_name, exclude_index);
InitializeInternalPointers(allocator->GetIndex(), metric_name, exclude_index);
}

void InitializeInternalPointers(const storage::DataLayout &data_layout,
char *memory_block,
void InitializeInternalPointers(const storage::SharedDataIndex &index,
const std::string &metric_name,
const std::size_t exclude_index)
{
m_query_graph = make_filtered_graph_view(
memory_block, data_layout, "/ch/metrics/" + metric_name, exclude_index);
m_query_graph =
make_filtered_graph_view(index, "/ch/metrics/" + metric_name, exclude_index);
}

// search graph access
Expand Down Expand Up @@ -172,55 +170,51 @@ class ContiguousInternalMemoryDataFacadeBase : public BaseDataFacade
// allocator that keeps the allocation data
std::shared_ptr<ContiguousBlockAllocator> allocator;

void InitializeInternalPointers(const storage::DataLayout &layout,
char *memory_ptr,
void InitializeInternalPointers(const storage::SharedDataIndex &index,
const std::string &metric_name,
const std::size_t exclude_index)
{
// TODO: For multi-metric support we need to have separate exclude classes per metric
(void)metric_name;

m_profile_properties =
layout.GetBlockPtr<extractor::ProfileProperties>(memory_ptr, "/common/properties");
index.GetBlockPtr<extractor::ProfileProperties>("/common/properties");

exclude_mask = m_profile_properties->excludable_classes[exclude_index];

m_check_sum =
*layout.GetBlockPtr<std::uint32_t>(memory_ptr, "/common/connectivity_checksum");
m_check_sum = *index.GetBlockPtr<std::uint32_t>("/common/connectivity_checksum");

std::tie(m_coordinate_list, m_osmnodeid_list) =
make_nbn_data_view(memory_ptr, layout, "/common/nbn_data");
make_nbn_data_view(index, "/common/nbn_data");

m_static_rtree = make_search_tree_view(memory_ptr, layout, "/common/rtree");
m_static_rtree = make_search_tree_view(index, "/common/rtree");
m_geospatial_query.reset(
new SharedGeospatialQuery(m_static_rtree, m_coordinate_list, *this));

edge_based_node_data = make_ebn_data_view(memory_ptr, layout, "/common/ebg_node_data");
edge_based_node_data = make_ebn_data_view(index, "/common/ebg_node_data");

turn_data = make_turn_data_view(memory_ptr, layout, "/common/turn_data");
turn_data = make_turn_data_view(index, "/common/turn_data");

m_name_table = make_name_table_view(memory_ptr, layout, "/common/names");
m_name_table = make_name_table_view(index, "/common/names");

std::tie(m_lane_description_offsets, m_lane_description_masks) =
make_turn_lane_description_views(memory_ptr, layout, "/common/turn_lanes");
m_lane_tupel_id_pairs = make_lane_data_view(memory_ptr, layout, "/common/turn_lanes");
make_turn_lane_description_views(index, "/common/turn_lanes");
m_lane_tupel_id_pairs = make_lane_data_view(index, "/common/turn_lanes");

m_turn_weight_penalties = make_turn_weight_view(memory_ptr, layout, "/common/turn_penalty");
m_turn_duration_penalties =
make_turn_duration_view(memory_ptr, layout, "/common/turn_penalty");
m_turn_weight_penalties = make_turn_weight_view(index, "/common/turn_penalty");
m_turn_duration_penalties = make_turn_duration_view(index, "/common/turn_penalty");

segment_data = make_segment_data_view(memory_ptr, layout, "/common/segment_data");
segment_data = make_segment_data_view(index, "/common/segment_data");

m_datasources =
layout.GetBlockPtr<extractor::Datasources>(memory_ptr, "/common/data_sources_names");
m_datasources = index.GetBlockPtr<extractor::Datasources>("/common/data_sources_names");

intersection_bearings_view =
make_intersection_bearings_view(memory_ptr, layout, "/common/intersection_bearings");
make_intersection_bearings_view(index, "/common/intersection_bearings");

m_entry_class_table = make_entry_classes_view(memory_ptr, layout, "/common/entry_classes");
m_entry_class_table = make_entry_classes_view(index, "/common/entry_classes");

std::tie(m_maneuver_overrides, m_maneuver_override_node_sequences) =
make_maneuver_overrides_views(memory_ptr, layout, "/common/maneuver_overrides");
make_maneuver_overrides_views(index, "/common/maneuver_overrides");
}

public:
Expand All @@ -231,8 +225,7 @@ class ContiguousInternalMemoryDataFacadeBase : public BaseDataFacade
const std::size_t exclude_index)
: allocator(std::move(allocator_))
{
InitializeInternalPointers(
allocator->GetLayout(), allocator->GetMemory(), metric_name, exclude_index);
InitializeInternalPointers(allocator->GetIndex(), metric_name, exclude_index);
}

// node and edge information access
Expand Down Expand Up @@ -661,16 +654,15 @@ template <> class ContiguousInternalMemoryAlgorithmDataFacade<MLD> : public Algo

QueryGraph query_graph;

void InitializeInternalPointers(const storage::DataLayout &layout,
char *memory_ptr,
void InitializeInternalPointers(const storage::SharedDataIndex &index,
const std::string &metric_name,
const std::size_t exclude_index)
{
mld_partition = make_partition_view(memory_ptr, layout, "/mld/multilevelpartition");
mld_cell_metric = make_filtered_cell_metric_view(
memory_ptr, layout, "/mld/metrics/" + metric_name, exclude_index);
mld_cell_storage = make_cell_storage_view(memory_ptr, layout, "/mld/cellstorage");
query_graph = make_multi_level_graph_view(memory_ptr, layout, "/mld/multilevelgraph");
mld_partition = make_partition_view(index, "/mld/multilevelpartition");
mld_cell_metric =
make_filtered_cell_metric_view(index, "/mld/metrics/" + metric_name, exclude_index);
mld_cell_storage = make_cell_storage_view(index, "/mld/cellstorage");
query_graph = make_multi_level_graph_view(index, "/mld/multilevelgraph");
}

// allocator that keeps the allocation data
Expand All @@ -683,8 +675,7 @@ template <> class ContiguousInternalMemoryAlgorithmDataFacade<MLD> : public Algo
const std::size_t exclude_index)
: allocator(std::move(allocator_))
{
InitializeInternalPointers(
allocator->GetLayout(), allocator->GetMemory(), metric_name, exclude_index);
InitializeInternalPointers(allocator->GetIndex(), metric_name, exclude_index);
}

const partitioner::MultiLevelPartitionView &GetMultiLevelPartition() const override
Expand Down
5 changes: 2 additions & 3 deletions include/engine/datafacade/mmap_memory_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ class MMapMemoryAllocator : public ContiguousBlockAllocator
~MMapMemoryAllocator() override final;

// interface to give access to the datafacades
storage::DataLayout &GetLayout() override final;
char *GetMemory() override final;
const storage::SharedDataIndex &GetIndex() override final;

private:
storage::DataLayout *data_layout;
storage::SharedDataIndex index;
util::vector_view<char> mapped_memory;
boost::iostreams::mapped_file mapped_memory_file;
};
Expand Down
5 changes: 2 additions & 3 deletions include/engine/datafacade/process_memory_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ class ProcessMemoryAllocator : public ContiguousBlockAllocator
~ProcessMemoryAllocator() override final;

// interface to give access to the datafacades
const storage::DataLayout &GetLayout() override final;
char *GetMemory() override final;
const storage::SharedDataIndex &GetIndex() override final;

private:
storage::DataLayout internal_layout;
storage::SharedDataIndex index;
std::unique_ptr<char[]> internal_memory;
};

Expand Down
12 changes: 5 additions & 7 deletions include/engine/datafacade/shared_memory_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "engine/datafacade/contiguous_block_allocator.hpp"

#include "storage/shared_datatype.hpp"
#include "storage/shared_data_index.hpp"
#include "storage/shared_memory.hpp"

#include <memory>
Expand All @@ -23,17 +23,15 @@ namespace datafacade
class SharedMemoryAllocator : public ContiguousBlockAllocator
{
public:
explicit SharedMemoryAllocator(storage::SharedRegionRegister::ShmKey data_shm_key);
explicit SharedMemoryAllocator(const std::vector<storage::SharedRegionRegister::ShmKey> &shm_keys);
~SharedMemoryAllocator() override final;

// interface to give access to the datafacades
const storage::DataLayout &GetLayout() override final;
char *GetMemory() override final;
const storage::SharedDataIndex &GetIndex() override final;

private:
std::size_t layout_size;
storage::DataLayout data_layout;
std::unique_ptr<storage::SharedMemory> m_large_memory;
storage::SharedDataIndex index;
std::vector<std::unique_ptr<storage::SharedMemory>> memory_regions;
};

} // namespace datafacade
Expand Down
12 changes: 5 additions & 7 deletions include/engine/datafacade_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,14 @@ template <template <typename A> class FacadeT, typename AlgorithmT> class DataFa
template <typename AllocatorT>
DataFacadeFactory(std::shared_ptr<AllocatorT> allocator, std::true_type)
{
const auto &layout = allocator->GetLayout();
properties = layout.template GetBlockPtr<extractor::ProfileProperties>(
allocator->GetMemory(), "/common/properties");
const auto &index = allocator->GetIndex();
properties = index.template GetBlockPtr<extractor::ProfileProperties>("/common/properties");
const auto &metric_name = properties->GetWeightName();

std::vector<std::string> exclude_prefixes;
auto exclude_path = std::string("/") + routing_algorithms::identifier<AlgorithmT>() +
std::string("/metrics/") + metric_name + "/exclude/";
layout.List(exclude_path, std::back_inserter(exclude_prefixes));
index.List(exclude_path, std::back_inserter(exclude_prefixes));
facades.resize(exclude_prefixes.size());

if (facades.empty())
Expand Down Expand Up @@ -89,9 +88,8 @@ template <template <typename A> class FacadeT, typename AlgorithmT> class DataFa
template <typename AllocatorT>
DataFacadeFactory(std::shared_ptr<AllocatorT> allocator, std::false_type)
{
const auto &layout = allocator->GetLayout();
properties = layout.template GetBlockPtr<extractor::ProfileProperties>(
allocator->GetMemory(), "/common/properties");
const auto &index = allocator->GetIndex();
properties = index.template GetBlockPtr<extractor::ProfileProperties>("/common/properties");
const auto &metric_name = properties->GetWeightName();
facades.push_back(std::make_shared<const Facade>(allocator, metric_name, 0));
}
Expand Down
Loading

0 comments on commit 59e2d93

Please sign in to comment.