Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Flux simulator #2561

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/bindings/python/_flux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ _core_build.py: $(MAKE_BINDING)
--modname _core \
--add_sub '.*va_list.*|||' \
--additional_headers src/bindings/python/_flux/callbacks.h \
src/common/libutil/fluid.h \
--ignore_header 'macros' \
--extra_source '#include <src/common/libutil/fluid.h>' \
src/include/flux/core.h

BUILT_SOURCES= _core.c
Expand Down
2 changes: 1 addition & 1 deletion src/bindings/python/flux/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
MOD = sys.modules[__name__]
# Inject enum/define names matching ^FLUX_[A-Z_]+$ into module
ALL_LIST = []
PATTERN = re.compile("^FLUX_[A-Z_]+")
PATTERN = re.compile("^(FLUX|FLUID)_[A-Z_]+")
for k in dir(lib):
if PATTERN.match(k):
setattr(MOD, k, getattr(lib, k))
Expand Down
58 changes: 58 additions & 0 deletions src/bindings/python/flux/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import six
import yaml

import flux.constants
from flux.wrapper import Wrapper
from flux.util import check_future_error, parse_fsd
from flux.future import Future
Expand Down Expand Up @@ -561,3 +562,60 @@ def from_command(
tasks = [{"command": command, "slot": "task", "count": task_count_dict}]
attributes = {"system": {"duration": 0}}
return cls(resources, tasks, attributes=attributes)

def convert_id(jobid, src="dec", dst="dec"):
valid_id_types = six.string_types + six.integer_types
if not any((isinstance(jobid, id_type) for id_type in valid_id_types)):
raise TypeError("Jobid must be an integer or string, not {}".format(type(jobid)))

valid_formats = ("dec", "hex", "kvs", "words")
if src not in valid_formats:
raise EnvironmentError(errno.EINVAL, "src must be one of {}", valid_formats)
if dst not in valid_formats:
raise EnvironmentError(errno.EINVAL, "dst must be one of {}", valid_formats)

if isinstance(jobid, six.text_type):
jobid = jobid.encode('utf-8')

if src == dst:
return jobid

dec_jobid = ffi.new('uint64_t [1]') # uint64_t*
if src == "dec":
dec_jobid = jobid
elif src == "hex":
if (lib.fluid_decode (jobid, dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0):
raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src));
dec_jobid = dec_jobid[0]
elif src == "kvs":
if jobid[0:4] != 'job.':
raise EnvironmentError(errno.EINVAL, "missing 'job.' prefix")
if (lib.fluid_decode (jobid[4:], dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0):
raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src));
dec_jobid = dec_jobid[0]
elif src == "words":
if (lib.fluid_decode (jobid, dec_jobid, flux.constants.FLUID_STRING_MNEMONIC) < 0):
raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src));
dec_jobid = dec_jobid[0]


buf_size = 64
buf = ffi.new('char []', buf_size)
def encode(id_format):
pass

if dst == 'dec':
return dec_jobid
elif dst == 'kvs':
key_len = RAW.flux_job_kvs_key(buf, buf_size, dec_jobid, ffi.NULL)
if key_len < 0:
raise RuntimeError("error enconding id")
return ffi.string(buf, key_len).decode('utf-8')
elif dst == 'hex':
if (lib.fluid_encode (buf, buf_size, dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0):
raise RuntimeError("error enconding id")
return ffi.string(buf).decode('utf-8')
elif dst == 'words':
if (lib.fluid_encode (buf, buf_size, dec_jobid, flux.constants.FLUID_STRING_MNEMONIC) < 0):
raise RuntimeError("error enconding id")
return ffi.string(buf).decode('utf-8')
7 changes: 4 additions & 3 deletions src/bindings/python/flux/kvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,18 @@ class KVSWrapper(Wrapper):
RAW.flux_kvsitr_next.set_error_check(lambda x: False)


def get_key_direct(flux_handle, key):
def get_key_raw(flux_handle, key):
valp = ffi.new("char *[1]")
future = RAW.flux_kvs_lookup(flux_handle, None, 0, key)
RAW.flux_kvs_lookup_get(future, valp)
if valp[0] == ffi.NULL:
return None

ret = json.loads(ffi.string(valp[0]).decode("utf-8"))
ret = ffi.string(valp[0]).decode("utf-8")
RAW.flux_future_destroy(future)
return ret

def get_key_direct(flux_handle, key):
return json.loads(get_key_raw(flux_handle, key))

def exists(flux_handle, key):
try:
Expand Down
10 changes: 10 additions & 0 deletions src/bindings/python/flux/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def type(self, value):
def type_str(self):
return msg_typestr(self.type)

def copy(self, payload=True):
"""Duplicate message
:param payload: Whether the payload should be included in the message copy
:type payload: boolean
:return type: Message
"""
return Message(
type_id=self.type, handle=self.pimpl.copy(payload), destruct=True
)


# Residing here to avoid cyclic references

Expand Down
46 changes: 32 additions & 14 deletions src/bindings/python/flux/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# SPDX-License-Identifier: LGPL-3.0
###############################################################

import os
import re
import sys
import errno
Expand All @@ -27,6 +28,7 @@
"encode_topic",
"CLIMain",
"parse_fsd",
"modfind",
]


Expand All @@ -53,25 +55,41 @@ def func_wrapper(calling_obj, *args, **kwargs):


def encode_payload(payload):
# Convert payload to ffi.NULL or utf-8 string
if payload is None or payload == ffi.NULL:
payload = ffi.NULL
elif isinstance(payload, six.text_type):
payload = payload.encode("UTF-8")
elif not isinstance(payload, six.binary_type):
payload = json.dumps(payload, ensure_ascii=False).encode("UTF-8")
return payload

return ffi.NULL
try:
return six.ensure_binary(payload)
except TypeError:
return json.dumps(payload, ensure_ascii=False).encode("UTF-8")

def encode_topic(topic):
# Convert topic to utf-8 binary string
# Convert topic to utf-8 string
if topic is None or topic == ffi.NULL:
raise EnvironmentError(errno.EINVAL, "Topic must not be None/NULL")
elif isinstance(topic, six.text_type):
topic = topic.encode("UTF-8")
elif not isinstance(topic, six.binary_type):
errmsg = "Topic must be a string, not {}".format(type(topic))
raise TypeError(errno.EINVAL, errmsg)
return topic
try:
return six.ensure_binary(topic)
except TypeError:
errmsg = "Topic must be a string, not {}".format(topic, type(topic))
raise EnvironmentError(errno.EINVAL, errmsg)

def modfind(modname):
"""Search FLUX_MODULE_PATH for a shared library (.so) of a given name

:param modname: name of the module to search for
:type modname: str, bytes, unicode
:returns: path of the first matching shared library in FLUX_MODULE_PATH
"""
searchpath = os.getenv("FLUX_MODULE_PATH")
if searchpath is None:
raise ValueError("FLUX_MODULE_PATH not set")
modname = six.ensure_binary(modname)
ret = raw.modfind(searchpath, modname, ffi.NULL, ffi.NULL)
if ret is None:
raise EnvironmentError(
errno.ENOENT, "{} not found in module search path".format(modname)
)
return ret


class CLIMain(object):
Expand Down
Loading