From dbde93e923abdf0246b09f40e47f3e15c4af6341 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Mon, 6 May 2024 08:34:13 -0400 Subject: [PATCH 1/5] retrieve job_ids Signed-off-by: Akihiko Kuroda --- gateway/api/views.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gateway/api/views.py b/gateway/api/views.py index 862843061..9d2023770 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -415,6 +415,9 @@ def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen logs = job.logs return Response({"logs": logs}) + def get_runtime_job(self, job): + return RuntimeJob.objects.filter(job=job) + @action(methods=["POST"], detail=True) def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argument """Stops job""" @@ -426,6 +429,11 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen job.status = Job.STOPPED job.save(update_fields=["status"]) message = "Job has been stopped." + runtime_jobs = self.get_runtime_job(job) + for runtime_job_entry in runtime_jobs: + print(runtime_job_entry.runtime_job) + + if job.compute_resource: if job.compute_resource.active: job_handler = get_job_handler(job.compute_resource.host) From fea165f93a1de72272e29524fb05272d19f4b5f4 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Tue, 7 May 2024 17:12:18 -0400 Subject: [PATCH 2/5] add runtime job cancel in serverless job.stop() --- client/qiskit_serverless/core/job.py | 22 ++++++++++++++------ gateway/api/views.py | 30 ++++++++++++++++++++++------ gateway/requirements.txt | 3 ++- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/client/qiskit_serverless/core/job.py b/client/qiskit_serverless/core/job.py index d1976a5d9..7285ad749 100644 --- a/client/qiskit_serverless/core/job.py +++ b/client/qiskit_serverless/core/job.py @@ -48,6 +48,7 @@ from ray.dashboard.modules.job.sdk import JobSubmissionClient from opentelemetry import trace +from qiskit_ibm_runtime import QiskitRuntimeService from qiskit_serverless.core.constants import ( OT_PROGRAM_NAME, @@ -130,7 +131,7 @@ def status(self, job_id: str): """Check status.""" raise NotImplementedError - def stop(self, job_id: str): + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): """Stops job/program.""" raise NotImplementedError @@ -166,7 +167,7 @@ def __init__(self, client: JobSubmissionClient): def status(self, job_id: str): return self._job_client.get_job_status(job_id).value - def stop(self, job_id: str): + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): return self._job_client.stop_job(job_id) def logs(self, job_id: str): @@ -239,7 +240,7 @@ def __init__(self): def status(self, job_id: str): return self._jobs[job_id]["status"] - def stop(self, job_id: str): + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): """Stops job/program.""" return f"job:{job_id} has already stopped" @@ -528,14 +529,23 @@ def status(self, job_id: str): return response_data.get("status", default_status) - def stop(self, job_id: str): + def stop(self, job_id: str, service: Optional[QiskitRuntimeService] = None): tracer = trace.get_tracer("client.tracer") with tracer.start_as_current_span("job.stop"): + if service: + data = { + "service": json.dumps(service, cls=QiskitObjectsEncoder), + } + else: + data = { + "service": None, + } response_data = safe_json_request( request=lambda: requests.post( f"{self.host}/api/{self.version}/jobs/{job_id}/stop/", headers={"Authorization": f"Bearer {self._token}"}, timeout=REQUESTS_TIMEOUT, + json=data, ) ) @@ -668,9 +678,9 @@ def status(self): """Returns status of the job.""" return _map_status_to_serverless(self._job_client.status(self.job_id)) - def stop(self): + def stop(self, service: Optional[QiskitRuntimeService] = None): """Stops the job from running.""" - return self._job_client.stop(self.job_id) + return self._job_client.stop(self.job_id, service=service) def logs(self) -> str: """Returns logs of the job.""" diff --git a/gateway/api/views.py b/gateway/api/views.py index 9d2023770..7ad244400 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -29,6 +29,11 @@ from rest_framework.decorators import action from rest_framework.generics import get_object_or_404 from rest_framework.response import Response + +from quantum_serverless.serializers.program_serializers import ( + QiskitObjectsDecoder, +) +from qiskit_ibm_runtime import RuntimeInvalidStateError from utils import sanitize_file_path from .models import VIEW_PROGRAM_PERMISSION, Program, Job, RuntimeJob @@ -415,9 +420,10 @@ def logs(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen logs = job.logs return Response({"logs": logs}) - def get_runtime_job(self, job): + def get_runtime_job(self, job): + """get runtime job for job""" return RuntimeJob.objects.filter(job=job) - + @action(methods=["POST"], detail=True) def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argument """Stops job""" @@ -430,10 +436,22 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen job.save(update_fields=["status"]) message = "Job has been stopped." runtime_jobs = self.get_runtime_job(job) - for runtime_job_entry in runtime_jobs: - print(runtime_job_entry.runtime_job) - - + if runtime_jobs and len(runtime_jobs) != 0: + if request.data.get("service"): + service = json.loads( + request.data.get("service"), cls=QiskitObjectsDecoder + ) + for runtime_job_entry in runtime_jobs: + jobinstance = service.job(runtime_job_entry.runtime_job) + if jobinstance: + try: + logger.info( + "canceling [%s]", runtime_job_entry.runtime_job + ) + jobinstance.cancel() + except RuntimeInvalidStateError: + logger.warning("cancel failed") + if job.compute_resource: if job.compute_resource.active: job_handler = get_job_handler(job.compute_resource.host) diff --git a/gateway/requirements.txt b/gateway/requirements.txt index 167d4a728..efb721b5f 100644 --- a/gateway/requirements.txt +++ b/gateway/requirements.txt @@ -18,4 +18,5 @@ drf-yasg>=1.21.7 cryptography>=41.0.1 # Django dependency, but we need a newer version (IBMQ#246) sqlparse>=0.5.0 - +qiskit_ibm_runtime +quantum_serverless From 7272d9792389e3963784b725918410d938e4b5cb Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Thu, 9 May 2024 13:34:43 -0400 Subject: [PATCH 3/5] review comments --- gateway/api/views.py | 9 ++------- gateway/requirements.txt | 4 ++-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/gateway/api/views.py b/gateway/api/views.py index 7ad244400..0c05b690f 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -30,10 +30,7 @@ from rest_framework.generics import get_object_or_404 from rest_framework.response import Response -from quantum_serverless.serializers.program_serializers import ( - QiskitObjectsDecoder, -) -from qiskit_ibm_runtime import RuntimeInvalidStateError +from qiskit_ibm_runtime import RuntimeInvalidStateError, QiskitRuntimeService from utils import sanitize_file_path from .models import VIEW_PROGRAM_PERMISSION, Program, Job, RuntimeJob @@ -438,9 +435,7 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen runtime_jobs = self.get_runtime_job(job) if runtime_jobs and len(runtime_jobs) != 0: if request.data.get("service"): - service = json.loads( - request.data.get("service"), cls=QiskitObjectsDecoder - ) + service = QiskitRuntimeService(**json.loads(request.data.get("service"), cls=json.JSONDecoder)["__value__"]) for runtime_job_entry in runtime_jobs: jobinstance = service.job(runtime_job_entry.runtime_job) if jobinstance: diff --git a/gateway/requirements.txt b/gateway/requirements.txt index efb721b5f..76bb34107 100644 --- a/gateway/requirements.txt +++ b/gateway/requirements.txt @@ -18,5 +18,5 @@ drf-yasg>=1.21.7 cryptography>=41.0.1 # Django dependency, but we need a newer version (IBMQ#246) sqlparse>=0.5.0 -qiskit_ibm_runtime -quantum_serverless +qiskit_ibm_runtime>=0.22.0 + From de0e2fa38e76b7c91ede72589ac82858617870a9 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Thu, 9 May 2024 13:38:12 -0400 Subject: [PATCH 4/5] lint --- gateway/api/views.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gateway/api/views.py b/gateway/api/views.py index 0c05b690f..33a3dd21d 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -435,7 +435,11 @@ def stop(self, request, pk=None): # pylint: disable=invalid-name,unused-argumen runtime_jobs = self.get_runtime_job(job) if runtime_jobs and len(runtime_jobs) != 0: if request.data.get("service"): - service = QiskitRuntimeService(**json.loads(request.data.get("service"), cls=json.JSONDecoder)["__value__"]) + service = QiskitRuntimeService( + **json.loads(request.data.get("service"), cls=json.JSONDecoder)[ + "__value__" + ] + ) for runtime_job_entry in runtime_jobs: jobinstance = service.job(runtime_job_entry.runtime_job) if jobinstance: From 107e82ab1699a163002041f670cdc11442446888 Mon Sep 17 00:00:00 2001 From: Akihiko Kuroda Date: Thu, 9 May 2024 13:42:27 -0400 Subject: [PATCH 5/5] lint --- gateway/api/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gateway/api/views.py b/gateway/api/views.py index 33a3dd21d..1c3a0eb53 100644 --- a/gateway/api/views.py +++ b/gateway/api/views.py @@ -30,7 +30,7 @@ from rest_framework.generics import get_object_or_404 from rest_framework.response import Response -from qiskit_ibm_runtime import RuntimeInvalidStateError, QiskitRuntimeService +from qiskit_ibm_runtime import RuntimeInvalidStateError, QiskitRuntimeService from utils import sanitize_file_path from .models import VIEW_PROGRAM_PERMISSION, Program, Job, RuntimeJob