Source code for mimiq_qiskit.job
"""Qiskit ``JobV1`` returned by :meth:`MimiqBackend.run`.
The job executes in a background thread so ``backend.run`` returns
immediately and ``job.result()`` is the blocking call, matching the
Qiskit Aer and IBM provider conventions. Cancellation is not supported:
the MIMIQ client does not yet expose a ``stopExecution`` endpoint, so
``cancel`` raises rather than pretending to abort the remote job.
"""
from __future__ import annotations
import threading
import uuid
from typing import Callable
from qiskit.providers import JobStatus, JobV1
from qiskit.result import Result
from mimiq_qiskit.result import qcsresults_to_qiskit_result
[docs]
class MimiqJob(JobV1):
"""Background-thread job. One thread per ``run()`` call.
``work`` is a zero-argument callable returning the list of
``QCSResults`` (one per submitted circuit); the whole batch is one
MIMIQ job.
"""
def __init__(
self,
backend,
*,
qiskit_circuits,
work: Callable,
shots: int,
job_id: str | None = None,
):
super().__init__(backend, job_id or str(uuid.uuid4()))
self._qiskit_circuits = qiskit_circuits
self._work = work
self._shots = shots
self._qcs_results = None
self._exc: BaseException | None = None
self._thread: threading.Thread | None = None
self._status = JobStatus.INITIALIZING
# ── lifecycle ──────────────────────────────────────────────────────
[docs]
def submit(self) -> None:
if self._thread is not None:
raise RuntimeError("MimiqJob has already been submitted")
self._status = JobStatus.RUNNING
self._thread = threading.Thread(
target=self._target, name=f"mimiq-job-{self.job_id()}", daemon=True
)
self._thread.start()
def _target(self) -> None:
try:
self._qcs_results = self._work()
self._status = JobStatus.DONE
except BaseException as exc:
self._exc = exc
self._status = JobStatus.ERROR
# ── status ─────────────────────────────────────────────────────────
[docs]
def status(self) -> JobStatus:
return self._status
[docs]
def cancel(self) -> None:
raise NotImplementedError(
"MimiqJob does not support cancellation; the MIMIQ client "
"exposes no stopExecution endpoint yet"
)
# ── result ─────────────────────────────────────────────────────────
[docs]
def result(self, timeout: float | None = None) -> Result:
if self._thread is None:
raise RuntimeError("MimiqJob has not been submitted")
self._thread.join(timeout=timeout)
if self._thread.is_alive():
raise TimeoutError(
f"MimiqJob {self.job_id()} did not finish within {timeout}s"
)
if self._exc is not None:
raise self._exc
return qcsresults_to_qiskit_result(
self._qcs_results,
qiskit_circuits=self._qiskit_circuits,
backend_name=self.backend().name,
backend_version=self.backend().backend_version,
job_id=self.job_id(),
shots=self._shots,
)