Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plugin: add multifactor priority plugin #122

Merged
merged 14 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,8 @@ jobs:
- name: coverage report
if: success() && matrix.coverage
run: DOCKER_REPO= bash <(curl -s https://codecov.io/bash)

- name: annotate errors
if: failure() || cancelled()
env: ${{matrix.env}}
run: src/test/checks-annotate.sh
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ AC_SUBST(acct_db_path)
AC_CONFIG_MACRO_DIR([config])
X_AC_EXPAND_INSTALL_DIRS

fluxplugin_ldflags="-avoid-version -export-symbols-regex '^flux_plugin_init\$\$' --disable-static -shared -export-dynamic"
AC_SUBST(fluxplugin_ldflags)

AC_CONFIG_FILES([Makefile
src/Makefile
src/common/Makefile
Expand All @@ -125,6 +128,7 @@ AC_CONFIG_FILES([Makefile
src/bindings/python/fluxacct/accounting/Makefile
src/bindings/python/fluxacct/accounting/__init__.py
src/cmd/Makefile
src/plugins/Makefile
t/Makefile
])
AC_OUTPUT
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile.am
Original file line number Diff line number Diff line change
@@ -1 +1 @@
SUBDIRS = common cmd fairness bindings
SUBDIRS = common cmd fairness bindings plugins
20 changes: 11 additions & 9 deletions src/bindings/python/fluxacct/accounting/create_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,16 @@ def create_db(
conn.execute(
"""
CREATE TABLE IF NOT EXISTS association_table (
creation_time bigint(20) NOT NULL,
mod_time bigint(20) DEFAULT 0 NOT NULL,
deleted tinyint(4) DEFAULT 0 NOT NULL,
username tinytext NOT NULL,
admin_level smallint(6) DEFAULT 1 NOT NULL,
bank tinytext NOT NULL,
shares int(11) DEFAULT 1 NOT NULL,
job_usage real DEFAULT 0.0 NOT NULL,
fairshare real DEFAULT 0.0 NOT NULL,
creation_time bigint(20) NOT NULL,
mod_time bigint(20) DEFAULT 0 NOT NULL,
deleted tinyint(4) DEFAULT 0 NOT NULL,
username tinytext NOT NULL,
userid int(11) DEFAULT 65534 NOT NULL,
admin_level smallint(6) DEFAULT 1 NOT NULL,
bank tinytext NOT NULL,
shares int(11) DEFAULT 1 NOT NULL,
job_usage real DEFAULT 0.0 NOT NULL,
fairshare real DEFAULT 0.5 NOT NULL,
PRIMARY KEY (username, bank)
);"""
)
Expand All @@ -122,6 +123,7 @@ def create_db(
"""
CREATE TABLE IF NOT EXISTS job_usage_factor_table (
username tinytext NOT NULL,
userid int(11) NOT NULL,
bank tinytext NOT NULL,
last_job_timestamp real DEFAULT 0.0,
PRIMARY KEY (username, bank)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def fetch_usg_bins(acct_conn, user=None, bank=None):
),
)

for val in dataframe.iloc[0].values[3:]:
for val in dataframe.iloc[0].values[4:]:
if isinstance(val, float):
past_usage_factors.append(val)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def test_02_create_association(self):
conn.execute(
"""
INSERT INTO association_table
(creation_time, mod_time, deleted, username, admin_level,
(creation_time, mod_time, deleted, username, userid, admin_level,
bank, shares)
VALUES
(0, 0, 0, "test user", 1, "test account", 0)
(0, 0, 0, "test user", 1234, 1, "test account", 0)
"""
)
cursor = conn.cursor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def setUpClass(self):
b.add_bank(acct_conn, bank="D", parent_bank="B", shares=1)

# add users
u.add_user(acct_conn, username="1001", bank="C")
u.add_user(acct_conn, username="1002", bank="C")
u.add_user(acct_conn, username="1003", bank="D")
u.add_user(acct_conn, username="1004", bank="D")
u.add_user(acct_conn, username="1001", uid="1001", bank="C")
u.add_user(acct_conn, username="1002", uid="1002", bank="C")
u.add_user(acct_conn, username="1003", uid="1003", bank="D")
u.add_user(acct_conn, username="1004", uid="1004", bank="D")

jobid = 100
interval = 0 # add to job timestamps to diversify job-archive records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def test_01_add_valid_user(self):
u.add_user(
acct_conn,
username="fluxuser",
uid="1234",
admin_level="1",
bank="acct",
shares="10",
Expand All @@ -44,19 +45,21 @@ def test_01_add_valid_user(self):

self.assertEqual(num_rows_assoc_table, num_rows_job_usage_factor_table)

# adding a user with the same primary key (user_name, account) should
# adding a user with the same primary key (username, account) should
# return an IntegrityError
def test_02_add_duplicate_primary_key(self):
u.add_user(
acct_conn,
username="fluxuser",
uid="1234",
admin_level="1",
bank="acct",
shares="10",
)
u.add_user(
acct_conn,
username="fluxuser",
uid="1234",
admin_level="1",
bank="acct",
shares="10",
Expand All @@ -70,13 +73,15 @@ def test_03_add_duplicate_user(self):
u.add_user(
acct_conn,
username="dup_user",
uid="5678",
admin_level="1",
bank="acct",
shares="10",
)
u.add_user(
acct_conn,
username="dup_user",
uid="5678",
admin_level="1",
bank="other_acct",
shares="10",
Expand Down
36 changes: 33 additions & 3 deletions src/bindings/python/fluxacct/accounting/user_subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
###############################################################
import sqlite3
import time
import pwd

import pandas as pd

Expand All @@ -30,7 +31,32 @@ def view_user(conn, user):
print(e_database_error)


def add_user(conn, username, bank, admin_level=1, shares=1):
def get_uid(username):
try:
return pwd.getpwnam(username).pw_uid
except KeyError:
return str(username)


def add_user(
conn,
username,
bank,
uid=65534,
admin_level=1,
shares=1,
):

# get uid of user
fetched_uid = get_uid(username)

try:
if isinstance(fetched_uid, int):
uid = fetched_uid
else:
raise KeyError
except KeyError as key_error:
print(key_error)

try:
# insert the user values into association_table
Expand All @@ -41,17 +67,19 @@ def add_user(conn, username, bank, admin_level=1, shares=1):
mod_time,
deleted,
username,
userid,
admin_level,
bank,
shares
)
VALUES (?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
int(time.time()),
int(time.time()),
0,
username,
uid,
admin_level,
bank,
shares,
Expand All @@ -64,12 +92,14 @@ def add_user(conn, username, bank, admin_level=1, shares=1):
"""
INSERT INTO job_usage_factor_table (
username,
userid,
bank
)
VALUES (?, ?)
VALUES (?, ?, ?)
""",
(
username,
uid,
bank,
),
)
Expand Down
6 changes: 6 additions & 0 deletions src/cmd/flux-account.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ def add_add_user_arg(subparsers):
help="username",
metavar="USERNAME",
)
subparser_add_user.add_argument(
"--userid",
help="userid",
metavar="USERID",
)
subparser_add_user.add_argument(
"--admin-level",
help="admin level",
Expand Down Expand Up @@ -295,6 +300,7 @@ def select_accounting_function(args, conn, output_file, parser):
conn,
args.username,
args.bank,
args.userid,
args.admin_level,
args.shares,
args.max_jobs,
Expand Down
9 changes: 9 additions & 0 deletions src/plugins/Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
AM_LDFLAGS = -module -shared $(CODE_COVERAGE_LDFLAGS)

AM_CPPFLAGS = -I$(top_srcdir) $(FLUX_CORE_CFLAGS)

AM_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS) -fPIC -shared

lib_LTLIBRARIES = mf_priority.la
mf_priority_la_SOURCES = mf_priority.cpp
mf_priority_la_LDFLAGS = $(fluxplugin_ldflags) -module
76 changes: 76 additions & 0 deletions src/plugins/bulk_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python3

###############################################################
# Copyright 2020 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
###############################################################
import flux
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this file have a copyright header? I'm actually unsure, but might as well throw it in here just in case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, thank you for pointing this out. I just pushed up a fix that adds a copyright header. Sorry for forgetting to put one. 🤦‍♂️

import argparse
import sys
import os
import sqlite3

import fluxacct.accounting


def set_db_loc(args):
path = args.path if args.path else fluxacct.accounting.db_path

return path


def est_sqlite_conn(path):
# try to open database file; will exit with -1 if database file not found
if not os.path.isfile(path):
print(f"Database file does not exist: {path}", file=sys.stderr)
sys.exit(1)

db_uri = "file:" + path + "?mode=rw"
try:
conn = sqlite3.connect(db_uri, uri=True)
# set foreign keys constraint
conn.execute("PRAGMA foreign_keys = 1")
except sqlite3.OperationalError:
print(f"Unable to open database file: {db_uri}", file=sys.stderr)
sys.exit(1)

return conn


def bulk_update(path):
conn = est_sqlite_conn(path)
cur = conn.cursor()

# fetch all rows from association_table (will print out tuples)
for row in cur.execute("SELECT userid, bank, fairshare FROM association_table"):
# create a JSON payload with the results of the query
data = {"userid": str(row[0]), "bank": str(row[1]), "fairshare": str(row[2])}

flux.Flux().rpc("job-manager.mf_priority.rec_update", data).get()


def main():
parser = argparse.ArgumentParser(
description="""
Description: Send a bulk update of user information from a
flux-accounting database to the multi-factor priority plugin.
"""
)

parser.add_argument(
"-p", "--path", dest="path", help="specify location of database file"
)
args = parser.parse_args()

path = set_db_loc(args)

bulk_update(path)


if __name__ == "__main__":
main()
Loading