Skip to content

Commit

Permalink
Merge a42c8f9 into e7c31a4
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser authored Aug 11, 2023
2 parents e7c31a4 + a42c8f9 commit 0cfa98f
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 70 deletions.
28 changes: 18 additions & 10 deletions libs/full/agas/src/route.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#if defined(HPX_HAVE_NETWORKING)
#include <hpx/actions_base/plain_action.hpp>
#include <hpx/agas/addressing_service.hpp>
#include <hpx/agas_base/route.hpp>
#include <hpx/agas_base/server/primary_namespace.hpp>
#include <hpx/assert.hpp>
#include <hpx/async_distributed/continuation.hpp>
Expand Down Expand Up @@ -41,32 +42,28 @@ HPX_PLAIN_ACTION_ID(hpx::detail::update_agas_cache, update_agas_cache_action,

namespace hpx::agas::server {

void primary_namespace::route(parcelset::parcel&& p)
void route_impl(primary_namespace& server, parcelset::parcel&& p)
{
LPT_(debug).format("primary_namespace::route: {}", p.parcel_id());

util::scoped_timer<std::atomic<std::int64_t>> update(
counter_data_.route_.time_, counter_data_.route_.enabled_);
counter_data_.increment_route_count();
LPT_(debug).format("agas::server::route_impl: {}", p.parcel_id());

naming::gid_type const& gid = p.destination();
naming::address& addr = p.addr();
resolved_type cache_address;
primary_namespace::resolved_type cache_address;

// resolve destination addresses, we should be able to resolve all of
// them, otherwise it's an error
{
std::unique_lock<mutex_type> l(mutex_);
std::unique_lock<primary_namespace::mutex_type> l(server.mutex());

error_code& ec = throws;

// wait for any migration to be completed
if (naming::detail::is_migratable(gid))
{
wait_for_migration_locked(l, gid, ec);
server.wait_for_migration_locked(l, gid, ec);
}

cache_address = resolve_gid_locked(l, gid, ec);
cache_address = server.resolve_gid_locked(l, gid, ec);

if (ec || hpx::get<0>(cache_address) == naming::invalid_gid)
{
Expand Down Expand Up @@ -133,6 +130,17 @@ namespace hpx::agas::server {
}
}
}

///////////////////////////////////////////////////////////////////////////
struct init_route_function
{
init_route_function()
{
server::route = &route_impl;
}
};

init_route_function init;
} // namespace hpx::agas::server

#endif
1 change: 1 addition & 0 deletions libs/full/agas_base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(agas_base_headers
hpx/agas_base/gva.hpp
hpx/agas_base/locality_namespace.hpp
hpx/agas_base/primary_namespace.hpp
hpx/agas_base/route.hpp
hpx/agas_base/server/component_namespace.hpp
hpx/agas_base/server/locality_namespace.hpp
hpx/agas_base/server/primary_namespace.hpp
Expand Down
21 changes: 21 additions & 0 deletions libs/full/agas_base/include/hpx/agas_base/route.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2007-2023 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>

#if defined(HPX_HAVE_NETWORKING)
#include <hpx/agas_base/agas_fwd.hpp>
#include <hpx/parcelset/parcel.hpp>

namespace hpx::agas::server {

extern HPX_EXPORT void (*route)(
primary_namespace& server, parcelset::parcel&& p);
} // namespace hpx::agas::server

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ namespace hpx::agas::server {
using resolved_type =
hpx::tuple<naming::gid_type, gva, naming::gid_type>;

mutex_type& mutex()
{
return mutex_;
}

private:
// REVIEW: Separate mutexes might reduce contention here. This has to be
// investigated carefully.
Expand Down Expand Up @@ -241,6 +246,7 @@ namespace hpx::agas::server {
char const* func_name);
#endif

public:
// helper function
void wait_for_migration_locked(std::unique_lock<mutex_type>& l,
naming::gid_type const& id, error_code& ec);
Expand Down Expand Up @@ -295,10 +301,10 @@ namespace hpx::agas::server {
std::pair<naming::gid_type, naming::gid_type> allocate(
std::uint64_t count);

private:
resolved_type resolve_gid_locked(std::unique_lock<mutex_type>& l,
naming::gid_type const& gid, error_code& ec);

private:
resolved_type resolve_gid_locked_non_local(
std::unique_lock<mutex_type>& l, naming::gid_type const& gid,
error_code& ec);
Expand Down
14 changes: 14 additions & 0 deletions libs/full/agas_base/src/server/primary_namespace_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/config.hpp>
#include <hpx/agas_base/route.hpp>
#include <hpx/agas_base/server/primary_namespace.hpp>
#include <hpx/assert.hpp>
#include <hpx/components_base/agas_interface.hpp>
Expand Down Expand Up @@ -1119,6 +1120,19 @@ namespace hpx::agas::server {
return resolved_type(naming::invalid_gid, gva(), naming::invalid_gid);
}

#if defined(HPX_HAVE_NETWORKING)
void (*route)(primary_namespace& server, parcelset::parcel&& p) = nullptr;

void primary_namespace::route(parcelset::parcel&& p)
{
util::scoped_timer<std::atomic<std::int64_t>> update(
counter_data_.route_.time_, counter_data_.route_.enabled_);
counter_data_.increment_route_count();

(*server::route)(*this, HPX_MOVE(p));
}
#endif

// access current counter values
std::int64_t primary_namespace::counter_data::get_bind_gid_count(bool reset)
{
Expand Down
5 changes: 3 additions & 2 deletions libs/full/naming/include/hpx/naming/credit_handling.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ namespace hpx::naming {
gid_type& id, std::int64_t debit);

HPX_EXPORT std::int64_t fill_credit_for_gid(gid_type& id,
std::int64_t credits = std::int64_t(HPX_GLOBALCREDIT_INITIAL));
std::int64_t credits = static_cast<std::int64_t>(
HPX_GLOBALCREDIT_INITIAL));

///////////////////////////////////////////////////////////////////////
HPX_EXPORT gid_type move_gid(gid_type& id);
Expand All @@ -58,7 +59,7 @@ namespace hpx::naming {
std::unique_lock<gid_type::mutex_type>& l, gid_type& id);

///////////////////////////////////////////////////////////////////////
HPX_EXPORT void decrement_refcnt(id_type_impl* gid) noexcept;
HPX_EXPORT void decrement_refcnt(id_type_impl const* gid) noexcept;

///////////////////////////////////////////////////////////////////////
// credit management (called during serialization), this function
Expand Down
62 changes: 37 additions & 25 deletions libs/full/naming/src/credit_handling.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2022 Hartmut Kaiser
// Copyright (c) 2007-2023 Hartmut Kaiser
// Copyright (c) 2011 Bryce Lelbach
//
// SPDX-License-Identifier: BSL-1.0
Expand Down Expand Up @@ -114,7 +114,7 @@ namespace hpx::naming {
///////////////////////////////////////////////////////////////////////////
namespace detail {

void decrement_refcnt(id_type_impl* p) noexcept
void decrement_refcnt(id_type_impl const* p) noexcept
{
// do nothing if it's too late in the game
if (!get_runtime_ptr())
Expand All @@ -136,7 +136,7 @@ namespace hpx::naming {
if (gid_was_split(*p) || !agas::resolve_cached(*p, addr))
{
// decrement global reference count for the given gid,
std::int64_t credits = detail::get_credit_from_gid(*p);
std::int64_t const credits = get_credit_from_gid(*p);
HPX_ASSERT(0 != credits);

if (get_runtime_ptr()) // -V547
Expand Down Expand Up @@ -206,13 +206,13 @@ namespace hpx::naming {
// Get the current credit for our gid. If no other concurrent
// split has happened since we invoked incref below, the credit
// of this gid is equal to 2, otherwise it is larger.
std::int64_t src_credit = get_credit_from_gid(gid);
std::int64_t const src_credit = get_credit_from_gid(gid);
HPX_ASSERT(src_credit >= 2);

std::int64_t split_credit =
constexpr std::int64_t split_credit =
static_cast<std::int64_t>(HPX_GLOBALCREDIT_INITIAL) - 2;
std::int64_t new_credit = src_credit + split_credit;
std::int64_t overflow_credit = new_credit -
std::int64_t const overflow_credit = new_credit -
static_cast<std::int64_t>(HPX_GLOBALCREDIT_INITIAL);
HPX_ASSERT(overflow_credit >= 0);

Expand Down Expand Up @@ -258,7 +258,8 @@ namespace hpx::naming {
// credit is guaranteed to arrive only after we incremented the
// credit successfully in agas.
HPX_ASSERT(get_log2credit_from_gid(gid) > 0);
std::int16_t src_log2credits = get_log2credit_from_gid(gid);
std::int16_t const src_log2credits =
get_log2credit_from_gid(gid);

// Credit exhaustion - we need to get more.
if (src_log2credits == 1)
Expand All @@ -279,11 +280,11 @@ namespace hpx::naming {

// We add HPX_GLOBALCREDIT_INITIAL credits for the new gid
// and HPX_GLOBALCREDIT_INITIAL - 2 for the old one.
std::int64_t new_credit = 2 *
constexpr std::int64_t new_credit = 2 *
(static_cast<std::int64_t>(HPX_GLOBALCREDIT_INITIAL) -
1);

naming::gid_type new_gid = gid; // strips lock-bit
naming::gid_type const new_gid = gid; // strips lock-bit
HPX_ASSERT(new_gid != invalid_gid);
return agas::incref(new_gid, new_credit)
.then(hpx::launch::sync,
Expand Down Expand Up @@ -338,7 +339,7 @@ namespace hpx::naming {
{
HPX_ASSERT_OWNS_LOCK(l);

std::uint16_t log2credits = get_log2credit_from_gid(id);
std::int16_t const log2credits = get_log2credit_from_gid(id);
HPX_ASSERT(log2credits > 0);

gid_type newid = id; // strips lock-bit
Expand All @@ -362,21 +363,19 @@ namespace hpx::naming {
std::int64_t replenish_credits_locked(
std::unique_lock<gid_type::mutex_type>& l, gid_type& gid)
{
std::int64_t added_credit = 0;

HPX_ASSERT(0 == get_credit_from_gid(gid));

added_credit = naming::detail::fill_credit_for_gid(gid);
std::int64_t const added_credit =
naming::detail::fill_credit_for_gid(gid);
naming::detail::set_credit_split_mask_for_gid(gid);

gid_type unlocked_gid = gid; // strips lock-bit
gid_type const unlocked_gid = gid; // strips lock-bit

std::int64_t result = 0;
std::int64_t result;
{
hpx::unlock_guard<std::unique_lock<gid_type::mutex_type>> ul(l);
result = agas::incref(launch::sync, unlocked_gid, added_credit);
}

return result;
}

Expand All @@ -403,10 +402,10 @@ namespace hpx::naming {

std::int64_t fill_credit_for_gid(gid_type& id, std::int64_t credits)
{
std::int64_t c = get_credit_from_gid(id);
std::int64_t const c = get_credit_from_gid(id);
HPX_ASSERT(c <= credits);

std::int64_t added = credits - c;
std::int64_t const added = credits - c;
set_credit_for_gid(id, credits);

return added;
Expand All @@ -419,7 +418,7 @@ namespace hpx::naming {
HPX_ASSERT(detail::gid_was_split(gid));

// decrement global reference count for the given gid,
std::int64_t credits = detail::get_credit_from_gid(gid);
std::int64_t const credits = detail::get_credit_from_gid(gid);
HPX_ASSERT(0 != credits);

// Fire-and-forget semantics.
Expand All @@ -432,7 +431,7 @@ namespace hpx::naming {

// custom deleter for managed gid_types, will be called when the last
// copy of the corresponding hpx::id_type goes out of scope
void gid_managed_deleter(id_type_impl* p) noexcept
void gid_managed_deleter_impl(id_type_impl const* p) noexcept
{
// a credit of zero means the component is not (globally) reference
// counted
Expand All @@ -450,11 +449,23 @@ namespace hpx::naming {

// custom deleter for unmanaged gid_types, will be called when the last
// copy of the corresponding hpx::id_type goes out of scope
void gid_unmanaged_deleter(id_type_impl* p) noexcept
void gid_unmanaged_deleter_impl(id_type_impl const* p) noexcept
{
delete p; // delete local gid representation only
}

// break cyclic dependency with naming_base
struct init_deleter_functions
{
init_deleter_functions()
{
gid_managed_deleter = &gid_managed_deleter_impl;
gid_unmanaged_deleter = &gid_unmanaged_deleter_impl;
}
};

init_deleter_functions init;

///////////////////////////////////////////////////////////////////////
// prepare the given id, note: this function modifies the passed id
void handle_credit_splitting(serialization::output_archive& ar,
Expand Down Expand Up @@ -551,7 +562,8 @@ namespace hpx::naming {
{
preprocess_gid(id_impl, ar);

gid_serialization_data data{id_impl, type};
gid_serialization_data const data{
static_cast<gid_type const&>(id_impl), type};
ar << data;
return;
}
Expand All @@ -569,7 +581,7 @@ namespace hpx::naming {
gid_type new_gid;
if (hpx::id_type::management_type::unmanaged == type)
{
new_gid = id_impl;
new_gid = static_cast<gid_type const&>(id_impl);
}
else if (hpx::id_type::management_type::managed_move_credit == type)
{
Expand All @@ -587,12 +599,12 @@ namespace hpx::naming {
}

#if defined(HPX_DEBUG)
auto* split_gids = ar.try_get_extra_data<
auto const* split_gids = ar.try_get_extra_data<
serialization::detail::preprocess_gid_types>();
HPX_ASSERT(!split_gids || !split_gids->has_gid(id_impl));
#endif

gid_serialization_data data{new_gid, type};
gid_serialization_data const data{new_gid, type};
ar << data;
}

Expand Down
Loading

0 comments on commit 0cfa98f

Please sign in to comment.