Skip to content

Commit

Permalink
Add asynchronous ability to simulated engine. (#4811)
Browse files Browse the repository at this point in the history
* Add asynchronous ability to simulated engine.

Adds ASYNCHRONOUS type which starts the simulation
immediately in the background and then reports it
as RUNNING until it is completed.  Once completed
or if results is called, the state will be SUCCESS.
  • Loading branch information
dstrain115 authored Jan 31, 2022
1 parent d7a3906 commit 8c8ee43
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 14 deletions.
56 changes: 43 additions & 13 deletions cirq-google/cirq_google/engine/simulated_local_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
and a provided sampler to execute circuits."""
from typing import cast, List, Optional, Sequence, Tuple

import concurrent.futures

import cirq
from cirq_google.engine.client import quantum
from cirq_google.engine.calibration_result import CalibrationResult
Expand Down Expand Up @@ -55,6 +57,16 @@ def __init__(
self._type = simulation_type
self._failure_code = ''
self._failure_message = ''
if self._type == LocalSimulationType.ASYNCHRONOUS:
# If asynchronous mode, just kick off a new task and move on.
self._thread = concurrent.futures.ThreadPoolExecutor(max_workers=1)
try:
self._future = self._thread.submit(self._execute_results)
finally:
# We only expect the one future to run in this thread,
# So we can call shutdown immediately, which will
# close the thread pool once the future is complete.
self._thread.shutdown(wait=False)

def execution_status(self) -> quantum.enums.ExecutionStatus.State:
"""Return the execution status of the job."""
Expand Down Expand Up @@ -98,22 +110,40 @@ def batched_results(self) -> Sequence[Sequence[cirq.Result]]:
raise e
raise ValueError('Unsupported simulation type {self._type}')

def _execute_results(self) -> Sequence[cirq.Result]:
"""Executes the circuit and sweeps on the sampler.
For synchronous execution, this is called when the results()
function is called. For asynchronous execution, this function
is run in a thread pool that begins when the object is
instantiated.
Returns: a List of results from the sweep's execution.
"""
reps, sweeps = self.get_repetitions_and_sweeps()
program = self.program().get_circuit()
try:
self._state = quantum.enums.ExecutionStatus.State.RUNNING
results = self._sampler.run_sweep(
program=program, params=sweeps[0] if sweeps else None, repetitions=reps
)
self._state = quantum.enums.ExecutionStatus.State.SUCCESS
return results
except Exception as e:
self._failure_code = '500'
self._failure_message = str(e)
self._state = quantum.enums.ExecutionStatus.State.FAILURE
raise e

def results(self) -> Sequence[cirq.Result]:
"""Returns the job results, blocking until the job is complete."""
if self._type == LocalSimulationType.SYNCHRONOUS:
reps, sweeps = self.get_repetitions_and_sweeps()
program = self.program().get_circuit()
try:
self._state = quantum.enums.ExecutionStatus.State.SUCCESS
return self._sampler.run_sweep(
program=program, params=sweeps[0] if sweeps else None, repetitions=reps
)
except Exception as e:
self._failure_code = '500'
self._failure_message = str(e)
self._state = quantum.enums.ExecutionStatus.State.FAILURE
raise e
raise ValueError('Unsupported simulation type {self._type}')
return self._execute_results()
elif self._type == LocalSimulationType.ASYNCHRONOUS:
return self._future.result()

else:
raise ValueError('Unsupported simulation type {self._type}')

def calibration_results(self) -> Sequence[CalibrationResult]:
"""Returns the results of a run_calibration() call.
Expand Down
20 changes: 19 additions & 1 deletion cirq-google/cirq_google/engine/simulated_local_job_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def test_unsupported_types():
parent_program=program,
repetitions=100,
sweeps=[],
simulation_type=LocalSimulationType.ASYNCHRONOUS,
simulation_type=LocalSimulationType.ASYNCHRONOUS_WITH_DELAY,
)
with pytest.raises(ValueError, match='Unsupported simulation type'):
job.results()
Expand Down Expand Up @@ -138,6 +138,24 @@ def test_failure():
assert 'Circuit contains ops whose symbols were not specified' in message


def test_run_async():
qubits = cirq.LineQubit.range(20)
c = cirq.testing.random_circuit(qubits, n_moments=20, op_density=1.0)
c.append(cirq.measure(*qubits))
program = ParentProgram([c], None)
job = SimulatedLocalJob(
job_id='test_job',
processor_id='test1',
parent_program=program,
repetitions=100,
sweeps=[],
simulation_type=LocalSimulationType.ASYNCHRONOUS,
)
assert job.execution_status() == quantum.enums.ExecutionStatus.State.RUNNING
_ = job.results()
assert job.execution_status() == quantum.enums.ExecutionStatus.State.SUCCESS


def test_run_calibration_unsupported():
program = ParentProgram([cirq.Circuit()], None)
job = SimulatedLocalJob(
Expand Down

0 comments on commit 8c8ee43

Please sign in to comment.