#! /usr/bin/env python
# vim:fenc=utf-8
#
# Copyright © 2017 unga <giulioungaretti@me.com>
#
# Distributed under terms of the MIT license.
"""
Monitor a set of parameters in a background thread
stream output over websocket
To start monitor, run this file, or if qcodes is installed as a module:
``% python -m qcodes.monitor.monitor``
Add parameters to monitor in your measurement by creating a new monitor with a
list of parameters to monitor:
``monitor = qcodes.Monitor(param1, param2, param3, ...)``
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import socketserver
import time
import webbrowser
from asyncio import CancelledError
from collections import defaultdict
from contextlib import suppress
from importlib.resources import as_file, files
from threading import Event, Thread
from typing import TYPE_CHECKING, Any
import websockets
import websockets.exceptions
from qcodes.parameters import Parameter
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Sequence
from websockets.asyncio.server import ServerConnection
WEBSOCKET_PORT = 5678
SERVER_PORT = 3000
log = logging.getLogger(__name__)
def _get_metadata(
*parameters: Parameter, use_root_instrument: bool = True
) -> dict[str, Any]:
"""
Return a dictionary that contains the parameter metadata grouped by the
instrument it belongs to.
"""
metadata_timestamp = time.time()
# group metadata by instrument
metas: dict[Any, Any] = defaultdict(list)
for parameter in parameters:
# Get the latest value from the parameter,
# respecting the max_val_age parameter
meta: dict[str, float | str | None] = {}
meta["value"] = str(parameter.get_latest())
timestamp = parameter.get_latest.get_timestamp()
if timestamp is not None:
meta["ts"] = timestamp.timestamp()
else:
meta["ts"] = None
meta["name"] = parameter.label or parameter.name
meta["unit"] = parameter.unit
# find the base instrument that this parameter belongs to
if use_root_instrument:
baseinst = parameter.root_instrument
else:
baseinst = parameter.instrument
if baseinst is None:
metas["Unbound Parameter"].append(meta)
else:
metas[str(baseinst)].append(meta)
# Create list of parameters, grouped by instrument
parameters_out = []
for instrument, instrument_meta in metas.items():
temp = {"instrument": instrument, "parameters": instrument_meta}
parameters_out.append(temp)
state = {"ts": metadata_timestamp, "parameters": parameters_out}
return state
def _handler(
parameters: Sequence[Parameter], interval: float, use_root_instrument: bool = True
) -> Callable[[ServerConnection], Awaitable[None]]:
"""
Return the websockets server handler.
"""
async def server_func(websocket: ServerConnection) -> None:
"""
Create a websockets handler that sends parameter values to a listener
every "interval" seconds.
"""
while True:
try:
# Update the parameter values
try:
meta = _get_metadata(
*parameters, use_root_instrument=use_root_instrument
)
except ValueError:
log.exception("Error getting parameters")
break
log.debug("sending.. to %r", websocket)
await websocket.send(json.dumps(meta))
# Wait for interval seconds and then send again
await asyncio.sleep(interval)
except (CancelledError, websockets.exceptions.ConnectionClosed):
log.debug("Got CancelledError or ConnectionClosed", exc_info=True)
break
log.debug("Closing websockets connection")
return server_func
[docs]
class Monitor(Thread):
"""
QCodes Monitor - WebSockets server to monitor qcodes parameters.
"""
running: Monitor | None = None
def __init__(
self,
*parameters: Parameter,
interval: float = 1,
use_root_instrument: bool = True,
):
"""
Monitor qcodes parameters.
Args:
*parameters: Parameters to monitor.
interval: How often one wants to refresh the values.
use_root_instrument: Defines if parameters are grouped according to
parameter.root_instrument or parameter.instrument
"""
super().__init__(daemon=True)
# Check that all values are valid parameters
for parameter in parameters:
if not isinstance(parameter, Parameter):
raise TypeError(
f"We can only monitor QCodes Parameters, not {type(parameter)}"
)
self.loop: asyncio.AbstractEventLoop | None = None
self._stop_loop_future: asyncio.Future | None = None
self._parameters = parameters
self.loop_is_closed = Event()
self.server_is_started = Event()
self.handler = _handler(
parameters, interval=interval, use_root_instrument=use_root_instrument
)
log.debug("Start monitoring thread")
if Monitor.running:
# stop the old server
log.debug("Stopping and restarting server")
Monitor.running.stop()
self.start()
# Wait until the loop is running
self.server_is_started.wait(timeout=5)
if not self.server_is_started.is_set():
raise RuntimeError("Failed to start server")
Monitor.running = self
[docs]
def run(self) -> None:
"""
Start the event loop and run forever.
"""
log.debug("Running Websocket server")
async def run_loop() -> None:
self.loop = asyncio.get_running_loop()
self._stop_loop_future = self.loop.create_future()
async with websockets.serve(
self.handler, "127.0.0.1", WEBSOCKET_PORT, close_timeout=1
):
self.server_is_started.set()
await self._stop_loop_future
log.debug("Websocket server thread shutting down")
try:
asyncio.run(run_loop())
finally:
self.loop_is_closed.set()
[docs]
def update_all(self) -> None:
"""
Update all parameters in the monitor.
"""
for parameter in self._parameters:
# call get if it can be called without arguments
with suppress(TypeError):
parameter.get()
[docs]
def stop(self) -> None:
"""
Shutdown the server, close the event loop and join the thread.
Setting active Monitor to ``None``.
"""
self.join()
Monitor.running = None
[docs]
def join(self, timeout: float | None = None) -> None:
"""
Overwrite ``Thread.join`` to make sure server is stopped before
joining avoiding a potential deadlock.
"""
log.debug("Shutting down server")
if not self.is_alive():
# we run this check before trying to run to prevent a cryptic
# error message
log.debug("monitor is dead")
return
try:
if self.loop is not None and self._stop_loop_future is not None:
log.debug("Instructing server to stop event loop.")
self.loop.call_soon_threadsafe(self._stop_loop_future.set_result, True)
else:
log.debug("No event loop found. Cannot stop event loop.")
except RuntimeError:
# the above may throw a runtime error if the loop is already
# stopped in which case there is nothing more to do
log.exception("Could not close loop")
self.loop_is_closed.wait(timeout=5)
if not self.loop_is_closed.is_set():
raise RuntimeError("Failed to join loop")
log.debug("Loop reported closed")
super().join(timeout=timeout)
log.debug("Monitor Thread has joined")
[docs]
@staticmethod
def show() -> None:
"""
Overwrite this method to show/raise your monitor GUI
F.ex.
::
import webbrowser
url = "localhost:3000"
# Open URL in new window, raising the window if possible.
webbrowser.open_new(url)
"""
webbrowser.open(f"http://localhost:{SERVER_PORT}")
def main() -> None:
import http.server
# If this file is run, create a simple webserver that serves a simple
# website that can be used to view monitored parameters.
# # https://github.com/python/mypy/issues/4182
parent_module = ".".join(__loader__.name.split(".")[:-1]) # type: ignore[name-defined]
static_dir = files(parent_module).joinpath("dist")
try:
with as_file(static_dir) as extracted_dir:
os.chdir(extracted_dir)
log.info("Starting HTTP Server at http://localhost:%i", SERVER_PORT)
with socketserver.TCPServer(
("", SERVER_PORT), http.server.SimpleHTTPRequestHandler
) as httpd:
log.debug("serving directory %s", static_dir)
webbrowser.open(f"http://localhost:{SERVER_PORT}")
httpd.serve_forever()
except KeyboardInterrupt:
log.info("Shutting Down HTTP Server")
if __name__ == "__main__":
main()