Skip to content

Commit

Permalink
plugin: initialize plugin w/ cores-per-node count
Browse files Browse the repository at this point in the history
Problem: The priority plugin does not know about basic system
information it will need in order to enforce a max-cores limit per
association, such as the number of cores on a node.

Estimate the cores-per-node count during the initialization of the
priority plugin by fetching resource.R from the KVS. Store this
estimate in a global variable in the plugin.

Add this estimate to the list of information returned in the
plugin.query callback.
  • Loading branch information
cmoussa1 committed Jan 6, 2025
1 parent 87d8c01 commit 5d1a247
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
3 changes: 2 additions & 1 deletion src/plugins/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS)

AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS)
AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS) $(FLUX_IDSET_CFLAGS)

AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared

Expand All @@ -11,3 +11,4 @@ jobtap_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp accounting.cpp
mf_priority_la_CPPFLAGS = -I$(top_srcdir)/src/plugins
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
mf_priority_la_LIBADD = $(FLUX_IDSET_LIBS)
53 changes: 51 additions & 2 deletions src/plugins/mf_priority.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#endif
#include <flux/core.h>
#include <flux/jobtap.h>
#include <flux/idset.h>
#include <jansson.h>
}

Expand All @@ -43,6 +44,9 @@ extern "C" {
#define DEFAULT_QUEUE_WEIGHT 10000
#define DEFAULT_AGE_WEIGHT 1000

// Estimated number of cores on each node of the system. Starts out at 0
// and is meant to be filled in once, when the plugin is initialized.
size_t ncores_per_node {0};

std::map<int, std::map<std::string, Association>> users;
std::map<std::string, Queue> queues;
std::map<int, std::string> users_def_bank;
Expand Down Expand Up @@ -254,9 +258,11 @@ static int query_cb (flux_plugin_t *p,

if (flux_plugin_arg_pack (args,
FLUX_PLUGIN_ARG_OUT,
"{s:O}",
"{s:O s:i}",
"mf_priority_map",
accounting_data) < 0)
accounting_data,
"ncores_per_node",
ncores_per_node) < 0)
flux_log_error (flux_jobtap_get_flux (p),
"mf_priority: query_cb: flux_plugin_arg_pack: %s",
flux_plugin_arg_strerror (args));
Expand Down Expand Up @@ -1184,6 +1190,49 @@ extern "C" int flux_plugin_init (flux_plugin_t *p)
priority_weights["queue"] = DEFAULT_QUEUE_WEIGHT;
priority_weights["age"] = DEFAULT_AGE_WEIGHT;

// initialize the plugin with the system's cores-per-node count
flux_t *h;
flux_future_t *f;
const char *core;

h = flux_jobtap_get_flux (p);
// This synchronous call to fetch R from the KVS is needed in order to
// validate and enforce resource limits on jobs. The job manager will
// block here while waiting for R when the plugin is loaded but it *should*
// occur over a very short time.
if (!(f = flux_kvs_lookup (h,
NULL,
FLUX_KVS_WAITCREATE,
"resource.R"))) {
flux_log_error (h, "flux_kvs_lookup");
return -1;
}
// Equal number of cores on all nodes in R is assumed here; thus, only
// the first entry is looked at
if (flux_kvs_lookup_get_unpack (f,
"{s{s[{s{s:s}}]}}",
"execution",
"R_lite",
"children",
"core", &core) < 0) {
flux_log_error (h, "flux_kvs_lookup_unpack");
return -1;
}

if (core == NULL) {
flux_log_error (h,
"mf_priority: could not get system "
"cores-per-node information");
return -1;
}

// calculate number of cores-per-node on system
idset* cores_decoded = idset_decode (core);
ncores_per_node = idset_count (cores_decoded);

flux_future_destroy (f);
idset_destroy (cores_decoded);

return 0;
}

Expand Down

0 comments on commit 5d1a247

Please sign in to comment.