from __future__ import annotations
import ctypes
import logging
import sys
import time
import warnings
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
import numpy as np
from typing_extensions import deprecated
from qcodes.instrument import Instrument, InstrumentBaseKWArgs
from qcodes.utils import QCoDeSDeprecationWarning
from .ats_api import AlazarATSAPI
from .constants import NUMBER_OF_CHANNELS_FROM_BYTE_REPR, max_buffer_size
from .helpers import CapabilityHelper
from .utils import TraceParameter
if TYPE_CHECKING:
from collections.abc import Iterator, Sequence
from typing_extensions import Unpack
# Module-level logger; used by ``Buffer.__del__`` to report buffers that were
# garbage collected without being freed explicitly.
logger = logging.getLogger(__name__)
# Type variable for the value an acquisition controller returns from
# ``post_acquire`` (and therefore the return type of ``AlazarTechATS.acquire``).
OutputType = TypeVar("OutputType")
# Union of the ctypes element types that are accepted as the sample type of a
# DMA ``Buffer``.
CtypesTypes = (
    type[ctypes.c_uint8]
    | type[ctypes.c_uint16]
    | type[ctypes.c_uint32]
    | type[ctypes.c_int32]
    | type[ctypes.c_float]
)
[docs]
class AlazarTechATS(Instrument):
    """
    Base class for the QCoDeS drivers for Alazar data acquisition cards.

    This class should not be instantiated directly; instead, the subclass
    written for the specific card should be used.

    Args:
        name: name for this instrument
        system_id: target system id for this board
        board_id: target board id within the system for this board
        dll_path: path to the ATS driver dll library file
        api: AlazarATSAPI interface, defaults to the dll api. This argument
            makes it possible to provide another api, e.g. for a simulated
            driver for which the binary Alazar drivers do not need to be
            installed.

    """

    # override dll_path in your init script or in the board constructor
    # if you have it somewhere else
    dll_path = "C:\\WINDOWS\\System32\\ATSApi"
    # API wrapper used for every call to the card; assigned in ``__init__``
    api: AlazarATSAPI
    # override channels in a subclass if needed
    channels = 2
[docs]
@classmethod
def find_boards(cls, dll_path: str | None = None) -> list[dict[str, Any]]:
    """
    Enumerate every Alazar board that the API reports as connected.

    Systems and the boards inside each system are numbered from 1; they are
    visited in ascending order (all boards of system 1 first, and so on).

    Args:
        dll_path: path to the Alazar API DLL library. When omitted (or
            empty) the class attribute ``dll_path`` is used.

    Returns:
        One info dictionary (as produced by ``get_board_info``) per board.

    """
    api = AlazarATSAPI(dll_path or cls.dll_path)
    return [
        cls.get_board_info(api, system_id, board_id)
        for system_id in range(1, api.num_of_systems() + 1)
        for board_id in range(1, api.boards_in_system_by_system_id(system_id) + 1)
    ]
[docs]
@classmethod
def get_board_info(
    cls, api: AlazarATSAPI, system_id: int, board_id: int
) -> dict[str, str | int]:
    """
    Get the information from a connected Alazar board.

    A temporary instrument is created for the board, queried and closed
    again. The temporary instrument is closed even when one of the queries
    raises, so a failed lookup does not leave a stray instrument behind.

    Args:
        api: An AlazarATSAPI that wraps around the CTypes CDLL
        system_id: id of the Alazar system
        board_id: id of the board within the alazar system

    Returns:
        Dictionary containing

            - system_id
            - board_id
            - board_kind (as string)
            - max_samples
            - bits_per_sample

    """
    # Make a temporary instrument for this board, to make it easier to get
    # its info. The name embeds a nanosecond counter so repeated calls do
    # not reuse an instrument name. The supplied ``api`` is handed to the
    # instrument so that the board handle is obtained from the same API
    # object that is queried with it below.
    board = cls(
        f"alazar_temp_{time.perf_counter_ns()}",
        system_id=system_id,
        board_id=board_id,
        api=api,
    )
    try:
        handle = board._handle
        board_model = api.get_board_model(handle)
        max_s, bps = api.get_channel_info_(handle)
    finally:
        board.close()
    return {
        "system_id": system_id,
        "board_id": board_id,
        "board_kind": board_model,
        "max_samples": max_s,
        "bits_per_sample": bps,
    }
def __init__(
    self,
    name: str,
    system_id: int = 1,
    board_id: int = 1,
    dll_path: str | None = None,
    api: AlazarATSAPI | None = None,
    **kwargs: Unpack[InstrumentBaseKWArgs],
) -> None:
    """
    Connect to the board identified by ``system_id`` / ``board_id``.

    Args:
        name: name for this instrument
        system_id: target system id for this board
        board_id: target board id within the system for this board
        dll_path: path to the ATS driver dll; only used when no ``api`` is
            given, and falls back to the class attribute ``dll_path``
        api: API object to use instead of one created from the dll
        **kwargs: forwarded to the ``Instrument`` base class

    Raises:
        Exception: if the API returns no handle for the requested board.

    """
    super().__init__(name, **kwargs)
    if api:
        self.api = api
    else:
        self.api = AlazarATSAPI(dll_path or self.dll_path)
    # ``acquire`` refuses to run until ``sync_settings_to_card`` flips this.
    self._parameters_synced = False
    board_handle = self.api.get_board_by_system_id(system_id, board_id)
    self._handle = board_handle
    if not board_handle:
        raise Exception(
            f"AlazarTech_ATS not found at system {system_id}, board {board_id}"
        )
    self.capability = CapabilityHelper(self.api, board_handle)
    # Buffers currently allocated for an acquisition.
    self.buffer_list: list[Buffer] = []
[docs]
def get_idn(self) -> dict[str, str | int | None]:  # type: ignore[override]
    # TODO return type is inconsistent with the super class. We should consider
    # if ints and floats are allowed as values in the dict
    """
    Collect the most relevant identification data of this instrument.

    The firmware version reported should match the version number of
    downloadable fw files from AlazarTech. But note that the firmware
    version has often been found to be incorrect for several firmware
    versions. At the time of writing it is known to be correct for the
    9360 (v 21.07) and 9373 (v 30.04) but incorrect for several earlier
    versions. In Alazar DSO this is reported as FPGA Version.

    Returns:
        Dictionary containing

            - 'firmware': as string
            - 'model': as string
            - 'max_samples': board memory size in samples
            - 'bits_per_sample': number of bits per one sample
            - 'serial': board serial number
            - 'vendor': 'AlazarTech'
            - 'CPLD_version': version of the CPLD
            - 'driver_version': version of the driver dll
            - 'SDK_version': version of the SDK
            - 'latest_cal_date': date of the latest calibration (as string)
            - 'memory_size': size of the memory in samples (as string)
            - 'asopc_type': type of asopc (as decimal number)
            - 'pcie_link_speed': the speed of a single pcie link (in GB/s)
            - 'pcie_link_width': number of pcie links (as string)

    """
    api = self.api
    capability = self.capability
    handle = self._handle

    # The queries are issued in the same order as the entries of the
    # returned dictionary, after the two up-front queries below.
    max_samples, bits_per_sample = api.get_channel_info_(handle)
    link_speed = f"{capability.query_pcie_link_speed()!s}GB/s"

    firmware = capability.query_firmware_version()
    model = api.get_board_model(handle)
    serial = capability.query_serial()
    cpld_version = api.get_cpld_version_(handle)
    driver_version = api.get_driver_version_()
    sdk_version = api.get_sdk_version_()
    calibration_date = capability.query_latest_calibration()
    memory_size = str(capability.query_memory_size())
    asopc_type = capability.query_asopc_type()
    link_width = str(capability.query_pcie_link_width())

    return {
        "firmware": firmware,
        "model": model,
        "max_samples": max_samples,
        "bits_per_sample": bits_per_sample,
        "serial": serial,
        "vendor": "AlazarTech",
        "CPLD_version": cpld_version,
        "driver_version": driver_version,
        "SDK_version": sdk_version,
        "latest_cal_date": calibration_date,
        "memory_size": memory_size,
        "asopc_type": asopc_type,
        "pcie_link_speed": link_speed,
        "pcie_link_width": link_width,
    }
[docs]
@contextmanager
def syncing(self) -> Iterator[None]:
    """
    Context manager that pushes the settings to the Alazar card on exit.

    ``sync_settings_to_card`` is called once, after the body of the
    ``with`` statement has finished without raising.

    Example:
        Wrap several parameter sets so that syncing is done exactly
        once::

            with alazar.syncing():
                alazar.trigger_source1('EXTERNAL')
                alazar.trigger_level1(100)

    """
    # Run the caller's ``with`` body first ...
    yield
    # ... then transfer everything to the card in one go.
    self.sync_settings_to_card()
[docs]
def sync_settings_to_card(self) -> None:
    """
    Syncs all parameters to Alazar card.

    The capture clock, the per-channel input settings (plus the bandwidth
    limit for channels that have a ``bwlimit<N>`` parameter), the trigger
    operation, the external trigger, the trigger delay, the trigger
    timeout and the AUX I/O configuration are written to the card, in that
    order. Afterwards the instrument is flagged as synced, which
    ``acquire`` requires.

    Raises:
        RuntimeError: if the sample-rate parameter that belongs to the
            selected clock source is still ``"UNDEFINED"``.

    """
    if self.clock_source() == "EXTERNAL_CLOCK_10MHz_REF":
        sample_rate = self.external_sample_rate
        if self.external_sample_rate() == "UNDEFINED":
            raise RuntimeError(
                "Using external 10 MHz Ref but external sample_rate is not set"
            )
        if self.sample_rate() != "UNDEFINED":
            warnings.warn(
                "Using external 10 MHz Ref but parameter sample_rate is set. "
                "This will have no effect and is ignored"
            )
        # mark the unused parameter as up to date
        self.sample_rate._set_updated()
    else:
        if self.sample_rate() == "UNDEFINED":
            raise RuntimeError(
                "Using Internal clock but parameter sample_rate is not set"
            )
        if self.external_sample_rate() != "UNDEFINED":
            # The two sentences used to be concatenated without a
            # separating space ("is set.This will ...").
            warnings.warn(
                "Using Internal clock but parameter external_sample_rate is set. "
                "This will have no effect and is ignored"
            )
        # mark the unused parameter as up to date
        self.external_sample_rate._set_updated()
        sample_rate = self.sample_rate

    self.api.set_capture_clock(
        self._handle,
        self.clock_source,
        sample_rate,
        self.clock_edge,
        self.decimation,
    )

    for i in range(1, self.channels + 1):
        # Channel i is addressed by the single-bit mask 2 ** (i - 1).
        channel_mask = 2 ** (i - 1)
        self.api.input_control(
            self._handle,
            channel_mask,
            self.parameters["coupling" + str(i)],
            self.parameters["channel_range" + str(i)],
            self.parameters["impedance" + str(i)],
        )
        # Only channels that define a bwlimit parameter get a limit set.
        bwlimit = self.parameters.get("bwlimit" + str(i), None)
        if bwlimit is not None:
            self.api.set_bw_limit(self._handle, channel_mask, bwlimit)

    self.api.set_trigger_operation(
        self._handle,
        self.trigger_operation,
        self.trigger_engine1,
        self.trigger_source1,
        self.trigger_slope1,
        self.trigger_level1,
        self.trigger_engine2,
        self.trigger_source2,
        self.trigger_slope2,
        self.trigger_level2,
    )
    self.api.set_external_trigger(
        self._handle, self.external_trigger_coupling, self.external_trigger_range
    )
    self.api.set_trigger_delay(self._handle, self.trigger_delay)
    self.api.set_trigger_time_out(self._handle, self.timeout_ticks)
    self.api.configure_aux_io(self._handle, self.aux_io_mode, self.aux_io_param)
    self._parameters_synced = True
[docs]
def allocate_and_post_buffer(
    self, sample_type: CtypesTypes, n_bytes: int
) -> Buffer:
    """
    Allocate a new ``Buffer`` and post it to the board.

    Args:
        sample_type: ctypes type of a single sample in the buffer
        n_bytes: size of the buffer in bytes

    Returns:
        The newly allocated buffer, already posted via
        ``api.post_async_buffer``.

    """
    new_buffer = Buffer(sample_type, n_bytes)
    # The API takes the buffer as a void pointer to its start address.
    address = ctypes.cast(new_buffer.addr, ctypes.c_void_p)
    self.api.post_async_buffer(self._handle, address, new_buffer.size_bytes)
    return new_buffer
[docs]
def acquire(  # noqa: D417 (missing args documentation)
    self,
    mode: str | None = None,
    samples_per_record: int | None = None,
    records_per_buffer: int | None = None,
    buffers_per_acquisition: int | None = None,
    channel_selection: str | None = None,
    transfer_offset: int | None = None,
    external_startcapture: str | None = None,
    enable_record_headers: str | None = None,
    alloc_buffers: str | None = None,
    fifo_only_streaming: str | None = None,
    interleave_samples: str | None = None,
    get_processed_data: str | None = None,
    allocated_buffers: int | None = None,
    buffer_timeout: int | None = None,
    acquisition_controller: AcquisitionController[OutputType] | None = None,
) -> OutputType:
    """
    Perform a single acquisition with the Alazar board.

    Every argument except ``acquisition_controller`` corresponds to the
    instrument parameter of the same name: when the argument is not None
    the parameter is set to it before the acquisition is configured,
    otherwise the parameter keeps its current value. For the meaning of
    the parameters, see the ATS-SDK programmer's guide.

    Args:
        mode: new value for the ``mode`` parameter; only 'TS' and 'NPT'
            are accepted by this method
        samples_per_record: new value for ``samples_per_record``
        records_per_buffer: new value for ``records_per_buffer``; reset
            to 1 (with a warning) in 'TS' mode
        buffers_per_acquisition: new value for ``buffers_per_acquisition``
        channel_selection: new value for ``channel_selection``
        transfer_offset: new value for ``transfer_offset``
        external_startcapture: new value for ``external_startcapture``
        enable_record_headers: new value for ``enable_record_headers``
        alloc_buffers: new value for ``alloc_buffers``
        fifo_only_streaming: new value for ``fifo_only_streaming``
        interleave_samples: new value for ``interleave_samples``
        get_processed_data: new value for ``get_processed_data``
        allocated_buffers: new value for ``allocated_buffers``; reduced
            to ``buffers_per_acquisition`` (with a warning) if larger
        buffer_timeout: new value for ``buffer_timeout``; passed to the
            API when waiting for each buffer
        acquisition_controller: An instance of an acquisition controller
            that handles the dataflow of an acquisition

    Returns:
        Whatever is given by acquisition_controller.post_acquire method

    Raises:
        RuntimeError: if no acquisition controller is given, if
            ``sync_settings_to_card`` has not been called, if the
            requested buffer exceeds ``max_buffer_size``, if the channel
            selection is invalid, or if a ``TraceParameter`` is found
            not synced to the card after the acquisition.
        Exception: if the mode is neither 'TS' nor 'NPT'.

    """
    if acquisition_controller is None:
        raise RuntimeError("Cannot call acquire without an acquisition_controller")
    # region set parameters from args
    start_func = time.perf_counter()
    if self._parameters_synced is False:
        raise RuntimeError(
            "You must sync parameters to Alazar card "
            "before calling acquire by calling "
            "sync_settings_to_card"
        )
    self._set_if_present("mode", mode)
    self._set_if_present("samples_per_record", samples_per_record)
    self._set_if_present("records_per_buffer", records_per_buffer)
    self._set_if_present("buffers_per_acquisition", buffers_per_acquisition)
    self._set_if_present("channel_selection", channel_selection)
    self._set_if_present("transfer_offset", transfer_offset)
    self._set_if_present("external_startcapture", external_startcapture)
    self._set_if_present("enable_record_headers", enable_record_headers)
    self._set_if_present("alloc_buffers", alloc_buffers)
    self._set_if_present("fifo_only_streaming", fifo_only_streaming)
    self._set_if_present("interleave_samples", interleave_samples)
    self._set_if_present("get_processed_data", get_processed_data)
    self._set_if_present("allocated_buffers", allocated_buffers)
    self._set_if_present("buffer_timeout", buffer_timeout)
    # endregion
    # From here on the values are read back from the parameters, so they
    # reflect both the arguments above and previously set values.
    mode = self.mode.get()
    if mode not in ("TS", "NPT"):
        raise Exception(
            "Only the 'TS' and 'NPT' modes are implemented at this point"
        )
    # -----set final configurations-----
    buffers_per_acquisition = cast(int, self.buffers_per_acquisition())
    samples_per_record = cast(int, self.samples_per_record())
    records_per_buffer = cast(int, self.records_per_buffer())
    # bits per sample
    _, bits_per_sample = self.api.get_channel_info_(self._handle)
    # channels
    channels_binrep = self.channel_selection.raw_value
    number_of_channels = self.get_num_channels(channels_binrep)
    # In the following we need to consider the size of the buffer
    # in two different scenarios as several Alazar cards have sample sizes
    # that are in fractions of bytes. (such as 12 bits).
    # We are transferring data padded to
    # whole bytes. I.e a sample of 12 bits will take up 16 bits when
    # transferred so we are allocating buffers of that size.
    # However, when calculating internal limitations on the card we are
    # using the fractional sizes of samples
    # number of bytes per sample rounded up to the nearest integer
    whole_bytes_per_sample = (bits_per_sample + 7) // 8
    transfer_record_size = whole_bytes_per_sample * samples_per_record
    transfer_buffer_size = (
        transfer_record_size * records_per_buffer * number_of_channels
    )
    # Samples wider than one byte are transferred as 16 bit words.
    sample_type: type[ctypes.c_uint16] | type[ctypes.c_uint8] = (
        ctypes.c_uint16 if whole_bytes_per_sample > 1 else ctypes.c_uint8
    )
    internal_buffer_size_requested = (
        bits_per_sample * samples_per_record * records_per_buffer
    ) // 8
    # In TS mode a record is spread over all buffers of the acquisition.
    # NOTE(review): both sizes use records_per_buffer as read above, i.e.
    # before the TS branch further down resets it to 1 — confirm intended.
    if mode == "TS":
        transfer_buffer_size //= buffers_per_acquisition
        internal_buffer_size_requested //= buffers_per_acquisition
    if internal_buffer_size_requested > max_buffer_size:
        raise RuntimeError(
            f"Requested a buffer of size: "
            f"{internal_buffer_size_requested / 1024 ** 2}"
            f" MB. The maximum supported size is "
            f"{max_buffer_size / 1024 ** 2} MB "
            f"(recommended is <8MB)."
        )
    # Set record size for NPT mode
    if mode == "NPT":
        pretriggersize = 0  # pretriggersize is 0 for NPT always
        post_trigger_size = samples_per_record
        self.api.set_record_size(self._handle, pretriggersize, post_trigger_size)
    # set acquisition parameters here for NPT, TS mode
    samples_per_buffer = 0
    # The raw values of the individual options are OR-ed into a single
    # flags argument for before_async_read.
    acquire_flags = (
        self.mode.raw_value
        | self.external_startcapture.raw_value
        | self.enable_record_headers.raw_value
        | self.alloc_buffers.raw_value
        | self.fifo_only_streaming.raw_value
        | self.interleave_samples.raw_value
        | self.get_processed_data.raw_value
    )
    if mode == "NPT":
        records_per_acquisition = records_per_buffer * buffers_per_acquisition
        self.api.before_async_read(
            self._handle,
            self.channel_selection.raw_value,
            self.transfer_offset.raw_value,
            samples_per_record,
            records_per_buffer,
            records_per_acquisition,
            acquire_flags,
        )
    elif mode == "TS":
        if samples_per_record % buffers_per_acquisition != 0:
            self.log.warning(
                "buffers_per_acquisition is not a divisor "
                "of samples per record which it should be "
                "in TS mode, rounding down in samples per "
                "buffer calculation"
            )
        samples_per_buffer = int(samples_per_record / buffers_per_acquisition)
        if self.records_per_buffer() != 1:
            self.log.warning(
                "records_per_buffer should be 1 in TS mode, defauling to 1"
            )
            self.records_per_buffer.set(1)
        records_per_buffer = cast(int, self.records_per_buffer())
        self.api.before_async_read(
            self._handle,
            self.channel_selection.raw_value,
            self.transfer_offset.raw_value,
            samples_per_buffer,
            records_per_buffer,
            buffers_per_acquisition,
            acquire_flags,
        )
    # Free any buffers still held from an earlier acquisition.
    self.clear_buffers()
    # make sure that allocated_buffers <= buffers_per_acquisition
    allocated_buffers = cast(int, self.allocated_buffers())
    buffers_per_acquisition = cast(int, self.buffers_per_acquisition())
    if allocated_buffers > buffers_per_acquisition:
        self.log.warning(
            "'allocated_buffers' should be <= "
            "'buffers_per_acquisition'. Defaulting "
            "'allocated_buffers' to "
            f"{buffers_per_acquisition}"
        )
        self.allocated_buffers.set(buffers_per_acquisition)
    allocated_buffers = cast(int, self.allocated_buffers())
    # With fewer buffers than the acquisition needs, each buffer has to be
    # handed to the controller and re-posted as soon as it is filled.
    buffer_recycling = buffers_per_acquisition > allocated_buffers
    # post buffers to Alazar
    try:
        for _ in range(allocated_buffers):
            buf = self.allocate_and_post_buffer(sample_type, transfer_buffer_size)
            self.buffer_list.append(buf)
        # -----start capture here-----
        acquisition_controller.pre_start_capture()
        start = time.perf_counter()  # Keep track of when acquisition started
        # call the startcapture method
        self.api.start_capture(self._handle)
        acquisition_controller.pre_acquire()
        # buffer handling from acquisition
        buffers_completed = 0
        bytes_transferred = 0
        buffer_timeout = cast(int, self.buffer_timeout())
        done_setup = time.perf_counter()
        while buffers_completed < self.buffers_per_acquisition.get():
            # Wait for the buffer at the head of the list of available
            # buffers to be filled by the board.
            buf = self.buffer_list[buffers_completed % allocated_buffers]
            self.api.wait_async_buffer_complete(
                self._handle, ctypes.cast(buf.addr, ctypes.c_void_p), buffer_timeout
            )
            acquisition_controller.buffer_done_callback(buffers_completed)
            # if buffers must be recycled, extract data and repost them
            # otherwise continue to next buffer
            if buffer_recycling:
                acquisition_controller.handle_buffer(buf.buffer, buffers_completed)
                self.api.post_async_buffer(
                    self._handle,
                    ctypes.cast(buf.addr, ctypes.c_void_p),
                    buf.size_bytes,
                )
            buffers_completed += 1
            bytes_transferred += buf.size_bytes
    finally:
        # stop measurement here
        # (runs even when posting or waiting for a buffer raised)
        done_capture = time.perf_counter()
        self.api.abort_async_read(self._handle)
    time_done_abort = time.perf_counter()
    # -----cleanup here-----
    # extract data if not yet done
    if not buffer_recycling:
        for i, buf in enumerate(self.buffer_list):
            acquisition_controller.handle_buffer(buf.buffer, i)
    time_done_handling = time.perf_counter()
    # free up memory
    self.clear_buffers()
    time_done_free_mem = time.perf_counter()
    # check if all parameters are up to date
    # Getting IDN is very slow so skip that
    for _, p in self.parameters.items():
        if isinstance(p, TraceParameter):
            if p.synced_to_card is False:
                raise RuntimeError(
                    f"TraceParameter {p} not synced to "
                    f"Alazar card detected. Aborting. Data "
                    f"may be corrupt"
                )
    # Compute the total transfer time, and display performance information.
    end_time = time.perf_counter()
    tot_time = end_time - start_func
    transfer_time_sec = end_time - start
    presetup_time = start - start_func
    setup_time = done_setup - start
    capture_time = done_capture - done_setup
    abort_time = time_done_abort - done_capture
    handling_time = time_done_handling - time_done_abort
    free_mem_time = time_done_free_mem - time_done_handling
    buffers_per_sec: float = 0
    bytes_per_sec: float = 0
    records_per_sec: float = 0
    if transfer_time_sec > 0:
        buffers_per_sec = buffers_completed / transfer_time_sec
        bytes_per_sec = bytes_transferred / transfer_time_sec
        records_per_sec = records_per_buffer * buffers_completed / transfer_time_sec
    # Timing statistics are only formatted when debug logging is active.
    if self.log.isEnabledFor(logging.DEBUG):
        self.log.debug(
            "Captured %d buffers (%f buffers per sec)",
            buffers_completed,
            buffers_per_sec,
        )
        self.log.debug(
            "Captured %d records (%f records per sec)",
            records_per_buffer * buffers_completed,
            records_per_sec,
        )
        self.log.debug(
            "Transferred %d bytes (%f bytes per sec)",
            bytes_transferred,
            bytes_per_sec,
        )
        self.log.debug(f"Pre setup took {presetup_time}")
        self.log.debug(f"Pre capture setup took {setup_time}")
        self.log.debug(f"Capture took {capture_time}")
        self.log.debug(f"abort took {abort_time}")
        self.log.debug(f"handling took {handling_time}")
        self.log.debug(f"free mem took {free_mem_time}")
        self.log.debug(f"tot acquire time is {tot_time}")
    # return result
    return acquisition_controller.post_acquire()
def _set_if_present(self, param_name: str, value: str | float | None) -> None:
    """
    Set the parameter called ``param_name`` to ``value`` unless it is None.

    Args:
        param_name: key of the parameter in ``self.parameters``
        value: value to set; ``None`` leaves the parameter untouched

    """
    if value is None:
        return
    self.parameters[param_name].set(value)
def _set_list_if_present(
    self, param_base: str, value: Sequence[str | float]
) -> None:
    """
    Set the numbered parameters ``<param_base>1``, ``<param_base>2``, ...

    Args:
        param_base: common prefix of the parameter names
        value: one value per parameter, in numbering order starting at 1;
            ``None`` leaves all parameters untouched

    """
    if value is None:
        return
    for position, item in enumerate(value, start=1):
        self.parameters[param_base + str(position)].set(item)
[docs]
def clear_buffers(self) -> None:
    """
    Free every buffer in ``buffer_list`` and reset the list.

    ``acquire`` calls this itself before posting new buffers and after a
    completed acquisition, so calling it by hand is only needed after an
    acquisition that crashed.

    Returns:
        None

    """
    held_buffers = self.buffer_list
    for held in held_buffers:
        held.free_mem()
    self.log.debug("buffers cleared")
    # Rebind to a fresh list rather than emptying the old one in place.
    self.buffer_list = []
[docs]
def signal_to_volt(self, channel: int, signal: float) -> float:
    """
    Convert a raw buffer value into volts using the range of its channel.

    The value is first mapped linearly so that 0 becomes -1 and 255
    becomes +1 (midpoint 127.5), then scaled by the channel range.

    Args:
        channel: number of the channel where the signal value came from
        signal: the value that needs to be converted

    Returns:
        the corresponding value in volts

    """
    normalised = (signal - 127.5) / 127.5
    channel_range = self.parameters["channel_range" + str(channel)].get()
    return normalised * channel_range
[docs]
def get_sample_rate(self, include_decimation: bool = True) -> float:
    """
    Obtain the effective sampling rate of the acquisition
    based on clock speed and decimation.

    Args:
        include_decimation: when True, the rate is divided by the value
            of the ``decimation`` parameter if that value is positive

    Returns:
        the number of samples (per channel) per second

    Raises:
        Exception: if the ``sample_rate`` parameter is 'EXTERNAL_CLOCK',
            in which case the rate cannot be determined by the driver.

    """
    uses_external_reference = (
        self.clock_source.get() == "EXTERNAL_CLOCK_10MHz_REF"
        and "external_sample_rate" in self.parameters
    )
    if uses_external_reference:
        # With an external reference clock the sample rate is set as an
        # integer and is not value mapped, so a separate parameter
        # represents it.
        rate = self.external_sample_rate.get()
    elif self.sample_rate.get() == "EXTERNAL_CLOCK":
        raise Exception(
            "External clock is used, alazar driver "
            "could not determine sample speed."
        )
    else:
        rate = self.sample_rate.get()

    if rate == "1GHz_REFERENCE_CLOCK":
        rate = 1e9

    # A decimation of 0 means "no division".
    decimation = self.decimation.get() if include_decimation else 0
    return rate / decimation if decimation > 0 else rate
[docs]
@staticmethod
def get_num_channels(byte_rep: int) -> int:
    """
    Return the number of channels for a specific channel mask

    Each single channel is represented by a bitarray with one
    non zero entry i.e. powers of two. All multichannel masks can be
    constructed by summing the single channel ones. However, not all
    configurations are supported. See table 4 Input Channel Configurations
    on page 241 of the Alazar SDK manual. This contains the complete
    mapping for all current Alazar cards. It's left to the driver to
    ensure that only the ones supported for a specific card can be
    selected

    Args:
        byte_rep: channel mask to look up

    Raises:
        RuntimeError: if the mask has no entry in the lookup table.

    """
    channel_count = NUMBER_OF_CHANNELS_FROM_BYTE_REPR.get(byte_rep, None)
    if channel_count is not None:
        return channel_count
    raise RuntimeError(f"Invalid channel configuration {byte_rep!r} supplied")
def _read_register(self, offset: int) -> int:
    """Return what ``api.read_register_`` reports for ``offset`` on this board."""
    register_value = self.api.read_register_(self._handle, offset)
    return register_value
def _write_register(self, offset: int, value: int) -> None:
    """Pass ``offset`` and ``value`` to ``api.write_register_`` for this board."""
    board_handle = self._handle
    self.api.write_register_(board_handle, offset, value)
def _setup_ctypes_for_windll_lib_functions() -> None:
    """
    Declare ``argtypes`` and ``restype`` for the ``kernel32`` functions
    (``VirtualAlloc`` and ``VirtualFree``) that this module calls through
    ``ctypes.windll``.

    Does nothing on platforms other than Windows.
    """
    if sys.platform != "win32":
        return

    kernel32 = ctypes.windll.kernel32

    virtual_alloc = kernel32.VirtualAlloc
    virtual_alloc.argtypes = [
        ctypes.c_void_p,
        ctypes.c_long,
        ctypes.c_long,
        ctypes.c_long,
    ]
    virtual_alloc.restype = ctypes.c_void_p

    virtual_free = kernel32.VirtualFree
    virtual_free.argtypes = [
        ctypes.c_void_p,
        ctypes.c_long,
        ctypes.c_long,
    ]
    virtual_free.restype = ctypes.c_int


# Run once at import time so the prototypes are in place before any Buffer
# is allocated.
_setup_ctypes_for_windll_lib_functions()
class Buffer:
    """Buffer suitable for DMA transfers.

    AlazarTech digitizers use direct memory access (DMA) to transfer
    data from digitizers to the computer's main memory. This class
    abstracts a memory buffer on the host, and ensures that all the
    requirements for DMA transfers are met.

    Buffer exports a 'buffer' member, which is a NumPy array view
    of the underlying memory buffer, the start address of that memory as
    'addr' and the ctypes array view of it as 'ctypes_buffer'.

    On Windows the memory is obtained with ``VirtualAlloc`` and must be
    released with :meth:`free_mem`. On other platforms the memory is owned
    by the ctypes array stored in 'ctypes_buffer' and lives as long as that
    array (or the NumPy view of it) is referenced.

    Args:
        c_sample_type: The datatype of the buffer to create. Should be a valid
            ctypes type.
        size_bytes: The size of the buffer to allocate, in bytes.

    Raises:
        MemoryError: if ``VirtualAlloc`` fails to provide the memory.

    """

    def __init__(self, c_sample_type: CtypesTypes, size_bytes: int):
        # Start out as "nothing to free" so that __del__ is harmless if
        # anything below raises before the allocation succeeded.
        self._allocated = False
        self.size_bytes = size_bytes
        self.buffer: np.ndarray
        # Ask ctypes for the element size instead of using a fixed lookup
        # table; an unknown type then no longer ends in a division by zero.
        bytes_per_sample = ctypes.sizeof(c_sample_type)
        array_type = c_sample_type * (size_bytes // bytes_per_sample)

        if sys.platform == "win32":
            MEM_COMMIT = 0x1000
            PAGE_READWRITE = 0x4
            addr = ctypes.windll.kernel32.VirtualAlloc(
                0, ctypes.c_long(size_bytes), MEM_COMMIT, PAGE_READWRITE
            )
            if not addr:
                raise MemoryError(
                    f"VirtualAlloc failed to allocate {size_bytes} bytes"
                )
            self.addr = addr
            ctypes_array = array_type.from_address(addr)
        else:
            # Keep the array that owns the memory. Replacing it with a
            # ``from_address`` view would drop the only reference to the
            # owner and leave ``addr`` pointing at freed memory.
            ctypes_array = array_type()
            self.addr = ctypes.addressof(ctypes_array)

        self._allocated = True
        self.buffer = np.ctypeslib.as_array(ctypes_array)
        self.ctypes_buffer = ctypes_array

    def free_mem(self) -> None:
        """
        uncommit memory allocated with this buffer object

        Calling this more than once is safe; only the first call releases
        the memory.
        """
        if not self._allocated:
            return
        self._allocated = False
        if sys.platform == "win32":
            MEM_RELEASE = 0x8000
            ctypes.windll.kernel32.VirtualFree(
                ctypes.c_void_p(self.addr), 0, MEM_RELEASE
            )

    def __del__(self) -> None:
        """
        If python garbage collects this object, __del__ should be called and it
        is the last chance to uncommit the memory to prevent a memory leak.
        This method is not very reliable so users should not rely on this
        functionality
        """
        # getattr: the attribute is missing if __init__ never got to run.
        if getattr(self, "_allocated", False):
            self.free_mem()
            logger.warning(
                "Buffer prevented memory leak; Memory released to Windows.\n"
                "Memory should have been released before buffer was deleted."
            )
[docs]
class AcquisitionInterface(Generic[OutputType]):
    """
    Collection of the hooks through which the end-user decides how the data
    of an acquisition is handled. Subclass it and override the hooks to
    program these choices.

    During ``AlazarTechATS.acquire`` the hooks are used as follows:

        - the driver configures the acquisition internally
        - :meth:`pre_start_capture` is called
        - the capture is started on the Alazar board
        - :meth:`pre_acquire` is called
        - for every buffer that is acquired :meth:`buffer_done_callback`
          is called; if buffers have to be recycled to finish the
          acquisition the buffer is also passed to :meth:`handle_buffer`
        - if buffers were not recycled, all of them are passed to
          :meth:`handle_buffer` once the capture is over
        - the return value of :meth:`post_acquire` is returned
    """

    def pre_start_capture(self) -> None:
        """
        Hook for preparing the data acquisition.

        The Alazar instrument calls this right before 'AlazarStartCapture'
        is called. The default implementation does nothing.
        """

    def pre_acquire(self) -> None:
        """
        Hook that is called immediately after 'AlazarStartCapture' is called.

        The default implementation does nothing.
        """

    def handle_buffer(
        self, buffer: np.ndarray, buffer_number: int | None = None
    ) -> None:
        """
        Store or process the data contained in a buffer obtained during the
        acquisition. Must be overridden.

        Args:
            buffer: np.array with the data from the Alazar card
            buffer_number: counter for which buffer we are handling

        Raises:
            NotImplementedError: always, unless overridden in a subclass.

        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def post_acquire(self) -> OutputType:
        """
        Produce the result of the acquisition. Must be overridden.

        The acquire method of the Alazar driver uses the value returned
        here as its own return value.

        Returns:
            all relevant data that should be kept from the acquisition

        Raises:
            NotImplementedError: always, unless overridden in a subclass.

        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def buffer_done_callback(self, buffers_completed: int) -> None:
        """
        Hook that is called each time a buffer is completed.

        It can be used to implement an event that happens for each buffer.
        You will probably want to combine this with `AUX_IN_TRIGGER_ENABLE`
        to wait before starting capture of the next buffer. The default
        implementation does nothing.

        Args:
            buffers_completed: how many buffers have been completed and copied
                to local memory at the time of this callback.

        """
@deprecated(
    "AlazarTech_ATS is deprecated, use AlazarTechATS instead.",
    category=QCoDeSDeprecationWarning,
)
class AlazarTech_ATS(AlazarTechATS):
    """Deprecated alias kept for the old class name; adds nothing to ``AlazarTechATS``."""
[docs]
class AcquisitionController(Instrument, AcquisitionInterface[Any], Generic[OutputType]):
    """
    Compatibility class. The acquisition hooks formerly defined here now
    live in :class:`AcquisitionInterface`. This class is the base class for
    AcquisitionInterfaces that are intended to be QCoDeS instruments at the
    same time.
    """

    def __init__(
        self, name: str, alazar_name: str, **kwargs: Unpack[InstrumentBaseKWArgs]
    ):
        """
        Args:
            name: The name of the AcquisitionController
            alazar_name: The name of the alazar instrument; it is looked up
                with ``find_instrument`` and must be an ``AlazarTechATS``.
            **kwargs: kwargs are forwarded to base class.

        """
        super().__init__(name, **kwargs)
        alazar = self.find_instrument(alazar_name, instrument_class=AlazarTechATS)
        self._alazar: AlazarTechATS = alazar

    def _get_alazar(self) -> AlazarTechATS:
        """
        Return the Alazar instrument this controller is attached to.

        Reading ``self._alazar`` directly is quicker, so use that if in
        need for speed.

        Returns:
            reference to the Alazar instrument

        """
        return self._alazar