Skip to content

Commit

Permalink
flux-simulator: apply black formatter
Browse files Browse the repository at this point in the history
  • Loading branch information
SteVwonder committed May 14, 2019
1 parent e303663 commit b2afbad
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions src/cmd/flux-simulator
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,9 @@ class Job(object):
logger.debug("Starting job {}".format(self.jobid))
self.start_time = start_time
self._start_msg = start_msg.copy()
flux_handle.respond(self._start_msg, payload={
"id": self.jobid,
"type": "start",
"data": {}
})
flux_handle.respond(
self._start_msg, payload={"id": self.jobid, "type": "start", "data": {}}
)

def complete(self):
raise NotImplementedError()
Expand Down Expand Up @@ -180,10 +178,7 @@ class Simulation(object):
def start_job(self, jobid, start_msg):
job = self.job_map[jobid]
job.start(self.flux_handle, start_msg, self.current_time)
self.add_event(
job.complete_time,
lambda: self.complete_job(job)
)
self.add_event(job.complete_time, lambda: self.complete_job(job))

def complete_job(self, job):
pass
Expand All @@ -199,10 +194,9 @@ class Simulation(object):
for event in events_at_time:
event()
logger.debug("Sending quiescent request for time {}".format(self.current_time))
self.flux_handle.rpc(
"job-manager.quiescent",
{"time":self.current_time}
).then(system_quiescent_continuation, arg=self)
self.flux_handle.rpc("job-manager.quiescent", {"time": self.current_time}).then(
system_quiescent_continuation, arg=self
)


def datetime_to_epoch(dt):
Expand Down Expand Up @@ -368,12 +362,13 @@ def job_exception_cb(flux_handle, watcher, msg, cb_args):
def sim_exec_start_cb(flux_handle, watcher, msg, simulation):
payload = msg.payload
logger.debug("Received sim-exec.start request. Payload: {}".format(payload))
jobid = payload['id']
jobid = payload["id"]
simulation.start_job(jobid, msg)


def exec_hello(flux_handle):
logger.debug("Registering sim-exec with job-manager")
flux_handle.rpc("job-manager.exec-hello", payload={"service":"sim-exec"}).get()
flux_handle.rpc("job-manager.exec-hello", payload={"service": "sim-exec"}).get()


def service_add(f, name):
Expand All @@ -390,16 +385,18 @@ def setup_watchers(flux_handle, simulation):
watchers = []
services = set()
for type_mask, topic, cb, args in [
(flux.constants.FLUX_MSGTYPE_EVENT, "job-state", job_state_cb, simulation),
(flux.constants.FLUX_MSGTYPE_REQUEST, "sim-exec.start", sim_exec_start_cb, simulation),
(flux.constants.FLUX_MSGTYPE_EVENT, "job-state", job_state_cb, simulation),
(
flux.constants.FLUX_MSGTYPE_REQUEST,
"sim-exec.start",
sim_exec_start_cb,
simulation,
),
]:
if type_mask == flux.constants.FLUX_MSGTYPE_EVENT:
flux_handle.event_subscribe(topic)
watcher = flux_handle.msg_watcher_create(
cb,
type_mask=type_mask,
topic_glob=topic,
args=args,
cb, type_mask=type_mask, topic_glob=topic, args=args
)
watcher.start()
watchers.append(watcher)
Expand Down Expand Up @@ -452,5 +449,6 @@ def main():
flux_handle.reactor_run(flux_handle.get_reactor(), 0)
teardown_watchers(flux_handle, watchers, services)


if __name__ == "__main__":
main()

0 comments on commit b2afbad

Please sign in to comment.