import array as arr
import logging
import re
import struct
from collections import abc
from collections.abc import Sequence
from io import BytesIO
from time import localtime, sleep
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Literal,
NamedTuple,
cast,
)
import numpy as np
from pyvisa.errors import VisaIOError
from qcodes import validators as vals
from qcodes.instrument import VisaInstrument, VisaInstrumentKWArgs
if TYPE_CHECKING:
from typing_extensions import Unpack
from qcodes.parameters import Parameter
log = logging.getLogger(__name__)
class _MarkerDescriptor(NamedTuple):
    """Pair of indices identifying one marker output of one channel."""

    # Index of the marker within its channel.
    marker: int
    # Index of the channel the marker belongs to.
    channel: int
def parsestr(v: str) -> str:
    """Strip surrounding whitespace from ``v``, then any surrounding double quotes."""
    without_padding = v.strip()
    return without_padding.strip('"')
[docs]
class TektronixAWG5014(VisaInstrument):
"""
This is the QCoDeS driver for the Tektronix AWG5014
Arbitrary Waveform Generator.
The driver makes some assumptions on the settings of the instrument:
- The output channels are always in Amplitude/Offset mode
- The output markers are always in High/Low mode
Todo:
- Implement support for cable transfer function compensation
- Implement more instrument functionality in the driver
- Remove double functionality
- Remove inconsistencies between the name of a parameter and
the name of the same variable in the Tektronix manual
In the future, we should consider the following:
* Removing test_send??
* That sequence element (SQEL) parameter functions exist but no
corresponding parameters.
"""
# Instrument-level ("head") records of the .awg file format: record name ->
# data-type character. The characters match the dtype values documented for
# _pack_record ('h', 'd'), which passes them to struct.pack; in the struct
# module "d" is an 8-byte double and "h" a 2-byte signed short.
# The trailing comments list the options each record encodes.
AWG_FILE_FORMAT_HEAD: ClassVar[dict[str, str]] = {
"SAMPLING_RATE": "d",  # double; generate_sequence_cfg fills it from clock_freq
# NOTE(review): the original comment on the next entry was garbled
# ("# NAME?"); its meaning cannot be recovered from this file.
"REPETITION_RATE": "d",
"HOLD_REPETITION_RATE": "h",  # True | False
"CLOCK_SOURCE": "h",  # Internal | External
"REFERENCE_SOURCE": "h",  # Internal | External
"EXTERNAL_REFERENCE_TYPE": "h",  # Fixed | Variable
"REFERENCE_CLOCK_FREQUENCY_SELECTION": "h",
"REFERENCE_MULTIPLIER_RATE": "h",
"DIVIDER_RATE": "h",  # 1 | 2 | 4 | 8 | 16 | 32 | 64 | 128 | 256
# NOTE(review): generate_sequence_cfg writes 1 for EXT and 2 for INT, so the
# option order listed on the next entry may be reversed -- confirm.
"TRIGGER_SOURCE": "h",  # Internal | External
"INTERNAL_TRIGGER_RATE": "d",
"TRIGGER_INPUT_IMPEDANCE": "h",  # 50 ohm | 1 kohm
"TRIGGER_INPUT_SLOPE": "h",  # Positive | Negative
"TRIGGER_INPUT_POLARITY": "h",  # Positive | Negative
"TRIGGER_INPUT_THRESHOLD": "d",
"EVENT_INPUT_IMPEDANCE": "h",  # 50 ohm | 1 kohm
"EVENT_INPUT_POLARITY": "h",  # Positive | Negative
"EVENT_INPUT_THRESHOLD": "d",
"JUMP_TIMING": "h",  # Sync | Async
# NOTE(review): the original comments on INTERLEAVE and COUPLING were
# truncated ("This setting is stronger than ." / "setting is weaker than .");
# presumably each refers to the other setting -- confirm against the manual.
"INTERLEAVE": "h",  # On | (presumably) Off
"ZEROING": "h",  # On | Off
"COUPLING": "h",  # Off | Pair | All
"RUN_MODE": "h",  # Continuous | Triggered | Gated | Sequence
"WAIT_VALUE": "h",  # First | Last
"RUN_STATE": "h",  # On | Off
"INTERLEAVE_ADJ_PHASE": "d",
"INTERLEAVE_ADJ_AMPLITUDE": "d",
}
# Per-channel records of the .awg file format: record name -> data-type
# character ("s" string, "h" short, "d" double, as documented for
# _pack_record). generate_channel_cfg produces keys of the same form with the
# channel number in place of the trailing "N" (e.g. ANALOG_AMPLITUDE_1).
AWG_FILE_FORMAT_CHANNEL: ClassVar[dict[str, str]] = {
# Include NULL. (Output Waveform Name for Non-Sequence mode)
"OUTPUT_WAVEFORM_NAME_N": "s",
"CHANNEL_STATE_N": "h",  # On | Off
"ANALOG_DIRECT_OUTPUT_N": "h",  # On | Off
"ANALOG_FILTER_N": "h",  # Enum type.
"ANALOG_METHOD_N": "h",  # Amplitude/Offset, High/Low
# When the Input Method is High/Low, it is skipped.
"ANALOG_AMPLITUDE_N": "d",
# When the Input Method is High/Low, it is skipped.
"ANALOG_OFFSET_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"ANALOG_HIGH_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"ANALOG_LOW_N": "d",
"MARKER1_SKEW_N": "d",
"MARKER1_METHOD_N": "h",  # Amplitude/Offset, High/Low
# When the Input Method is High/Low, it is skipped.
"MARKER1_AMPLITUDE_N": "d",
# When the Input Method is High/Low, it is skipped.
"MARKER1_OFFSET_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"MARKER1_HIGH_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"MARKER1_LOW_N": "d",
"MARKER2_SKEW_N": "d",
"MARKER2_METHOD_N": "h",  # Amplitude/Offset, High/Low
# When the Input Method is High/Low, it is skipped.
"MARKER2_AMPLITUDE_N": "d",
# When the Input Method is High/Low, it is skipped.
"MARKER2_OFFSET_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"MARKER2_HIGH_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"MARKER2_LOW_N": "d",
"DIGITAL_METHOD_N": "h",  # Amplitude/Offset, High/Low
# When the Input Method is High/Low, it is skipped.
"DIGITAL_AMPLITUDE_N": "d",
# When the Input Method is High/Low, it is skipped.
"DIGITAL_OFFSET_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"DIGITAL_HIGH_N": "d",
# When the Input Method is Amplitude/Offset, it is skipped.
"DIGITAL_LOW_N": "d",
"EXTERNAL_ADD_N": "h",  # AWG5000 only
"PHASE_DELAY_INPUT_METHOD_N": "h",  # Phase/DelayInTime/DelayInPoints
"PHASE_N": "d",  # When the Input Method is not Phase, it is skipped.
# When the Input Method is not DelayInTime, it is skipped.
"DELAY_IN_TIME_N": "d",
# When the Input Method is not DelayInPoints, it is skipped.
"DELAY_IN_POINTS_N": "d",
"CHANNEL_SKEW_N": "d",
"DC_OUTPUT_LEVEL_N": "d",  # V
}
# NOTE(review): presumably the default communication timeout (in seconds)
# picked up by the VisaInstrument base class -- confirm.
default_timeout = 180
def __init__(
    self,
    name: str,
    address: str,
    *,
    num_channels: int = 4,
    **kwargs: "Unpack[VisaInstrumentKWArgs]",
):
    """
    Initializes the AWG5014.

    Registers the instrument-wide parameters (run mode, clock, sequence,
    trigger and event settings) and, for every channel from 1 to
    ``num_channels``, the channel parameters ``ch{i}_...`` together with
    the parameters of its two markers ``ch{i}_m{j}_...``. Finally the
    trigger impedance is set to 50 Ohm and the connect message is emitted.

    Args:
        name: name of the instrument
        address: GPIB or ethernet address as used by VISA
        num_channels: number of channels on the device
        **kwargs: kwargs are forwarded to base class.
    """
    super().__init__(name, address, **kwargs)

    self._address = address
    self.num_channels = num_channels

    self._values: dict[str, dict[str, dict[str, np.ndarray | float | None]]] = {}
    self._values["files"] = {}

    self.add_function("reset", call_cmd="*RST")

    self.state: Parameter = self.add_parameter("state", get_cmd=self.get_state)
    """Parameter state"""
    self.run_mode: Parameter = self.add_parameter(
        "run_mode",
        get_cmd="AWGControl:RMODe?",
        set_cmd="AWGControl:RMODe " + "{}",
        vals=vals.Enum("CONT", "TRIG", "SEQ", "GAT"),
        get_parser=self.newlinestripper,
    )
    """Parameter run_mode"""
    self.clock_source: Parameter = self.add_parameter(
        "clock_source",
        label="Clock source",
        get_cmd="AWGControl:CLOCk:SOURce?",
        set_cmd="AWGControl:CLOCk:SOURce " + "{}",
        vals=vals.Enum("INT", "EXT"),
        get_parser=self.newlinestripper,
    )
    """Parameter clock_source"""
    self.ref_source: Parameter = self.add_parameter(
        "ref_source",
        label="Reference source",
        get_cmd="SOURce1:ROSCillator:SOURce?",
        set_cmd="SOURce1:ROSCillator:SOURce " + "{}",
        vals=vals.Enum("INT", "EXT"),
        get_parser=self.newlinestripper,
    )
    """Parameter ref_source"""
    self.DC_output: Parameter = self.add_parameter(
        "DC_output",
        label="DC Output (ON/OFF)",
        get_cmd="AWGControl:DC:STATe?",
        set_cmd="AWGControl:DC:STATe {}",
        vals=vals.Ints(0, 1),
        get_parser=int,
    )
    """Parameter DC_output"""

    # sequence parameter(s)
    # NOTE: the lines of the ``docstring`` argument below are runtime text;
    # they are deliberately kept flush left so the string stays unchanged.
    self.sequence_length: Parameter = self.add_parameter(
        "sequence_length",
        label="Sequence length",
        get_cmd="SEQuence:LENGth?",
        set_cmd="SEQuence:LENGth " + "{}",
        get_parser=int,
        vals=vals.Ints(0, 8000),
        docstring=(
            """
This command sets the sequence length.
Use this command to create an
uninitialized sequence. You can also
use the command to clear all sequence
elements in a single action by passing
0 as the parameter. However, this
action cannot be undone so exercise
necessary caution. Also note that
passing a value less than the
sequence's current length will cause
some sequence elements to be deleted at
the end of the sequence. For example if
self.get_sq_length returns 200 and you
subsequently set sequence_length to 21,
all sequence elements except the first
20 will be deleted.
"""
        ),
    )
    """
    This command sets the sequence length.
    Use this command to create an
    uninitialized sequence. You can also
    use the command to clear all sequence
    elements in a single action by passing
    0 as the parameter. However, this
    action cannot be undone so exercise
    necessary caution. Also note that
    passing a value less than the
    sequence's current length will cause
    some sequence elements to be deleted at
    the end of the sequence. For example if
    self.get_sq_length returns 200 and you
    subsequently set sequence_length to 21,
    all sequence elements except the first
    20 will be deleted.
    """

    self.sequence_pos: Parameter = self.add_parameter(
        "sequence_pos",
        label="Sequence position",
        get_cmd="AWGControl:SEQuencer:POSition?",
        set_cmd="SEQuence:JUMP:IMMediate {}",
        vals=vals.PermissiveInts(1),
        set_parser=lambda x: int(round(x)),
    )
    """Parameter sequence_pos"""

    # Trigger parameters #
    # Warning: `trigger_mode` is the same as `run_mode`, do not use! exists
    # solely for legacy purposes
    self.trigger_mode: Parameter = self.add_parameter(
        "trigger_mode",
        get_cmd="AWGControl:RMODe?",
        set_cmd="AWGControl:RMODe " + "{}",
        vals=vals.Enum("CONT", "TRIG", "SEQ", "GAT"),
        get_parser=self.newlinestripper,
    )
    """Parameter trigger_mode"""
    self.trigger_impedance: Parameter = self.add_parameter(
        "trigger_impedance",
        label="Trigger impedance",
        unit="Ohm",
        get_cmd="TRIGger:IMPedance?",
        set_cmd="TRIGger:IMPedance " + "{}",
        vals=vals.Enum(50, 1000),
        get_parser=float,
    )
    """Parameter trigger_impedance"""
    self.trigger_level: Parameter = self.add_parameter(
        "trigger_level",
        unit="V",
        label="Trigger level",
        get_cmd="TRIGger:LEVel?",
        set_cmd="TRIGger:LEVel " + "{:.3f}",
        vals=vals.Numbers(-5, 5),
        get_parser=float,
    )
    """Parameter trigger_level"""
    self.trigger_slope: Parameter = self.add_parameter(
        "trigger_slope",
        get_cmd="TRIGger:SLOPe?",
        set_cmd="TRIGger:SLOPe " + "{}",
        vals=vals.Enum("POS", "NEG"),
        get_parser=self.newlinestripper,
    )
    """Parameter trigger_slope"""

    self.trigger_source: Parameter = self.add_parameter(
        "trigger_source",
        get_cmd="TRIGger:SOURce?",
        set_cmd="TRIGger:SOURce " + "{}",
        vals=vals.Enum("INT", "EXT"),
        get_parser=self.newlinestripper,
    )
    """Parameter trigger_source"""

    # Event parameters
    self.event_polarity: Parameter = self.add_parameter(
        "event_polarity",
        get_cmd="EVENt:POL?",
        set_cmd="EVENt:POL " + "{}",
        vals=vals.Enum("POS", "NEG"),
        get_parser=self.newlinestripper,
    )
    """Parameter event_polarity"""
    self.event_impedance: Parameter = self.add_parameter(
        "event_impedance",
        label="Event impedance",
        unit="Ohm",
        get_cmd="EVENt:IMPedance?",
        set_cmd="EVENt:IMPedance " + "{}",
        vals=vals.Enum(50, 1000),
        get_parser=float,
    )
    """Parameter event_impedance"""
    self.event_level: Parameter = self.add_parameter(
        "event_level",
        label="Event level",
        unit="V",
        get_cmd="EVENt:LEVel?",
        set_cmd="EVENt:LEVel " + "{:.3f}",
        vals=vals.Numbers(-5, 5),
        get_parser=float,
    )
    """Parameter event_level"""
    self.event_jump_timing: Parameter = self.add_parameter(
        "event_jump_timing",
        get_cmd="EVENt:JTIMing?",
        set_cmd="EVENt:JTIMing {}",
        vals=vals.Enum("SYNC", "ASYNC"),
        get_parser=self.newlinestripper,
    )
    """Parameter event_jump_timing"""

    self.clock_freq: Parameter = self.add_parameter(
        "clock_freq",
        label="Clock frequency",
        unit="Hz",
        get_cmd="SOURce:FREQuency?",
        set_cmd="SOURce:FREQuency " + "{}",
        vals=vals.Numbers(1e6, 1.2e9),
        get_parser=float,
    )
    """Parameter clock_freq"""

    self.setup_filename: Parameter = self.add_parameter(
        "setup_filename", get_cmd="AWGControl:SNAMe?"
    )
    """Parameter setup_filename"""

    # Channel parameters #
    for i in range(1, self.num_channels + 1):
        amp_cmd = f"SOURce{i}:VOLTage:LEVel:IMMediate:AMPLitude"
        offset_cmd = f"SOURce{i}:VOLTage:LEVel:IMMediate:OFFS"
        state_cmd = f"OUTPUT{i}:STATE"
        waveform_cmd = f"SOURce{i}:WAVeform"
        directoutput_cmd = f"AWGControl:DOUTput{i}:STATE"
        filter_cmd = f"OUTPut{i}:FILTer:FREQuency"
        add_input_cmd = f"SOURce{i}:COMBine:FEED"
        dc_out_cmd = f"AWGControl:DC{i}:VOLTage:OFFSet"

        # Set channel first to ensure sensible sorting of pars
        self.add_parameter(
            f"ch{i}_state",
            label=f"Status channel {i}",
            get_cmd=state_cmd + "?",
            set_cmd=state_cmd + " {}",
            vals=vals.Ints(0, 1),
            get_parser=int,
        )
        self.add_parameter(
            f"ch{i}_amp",
            label=f"Amplitude channel {i}",
            unit="Vpp",
            get_cmd=amp_cmd + "?",
            set_cmd=amp_cmd + " {:.6f}",
            vals=vals.Numbers(0.02, 4.5),
            get_parser=float,
        )
        self.add_parameter(
            f"ch{i}_offset",
            label=f"Offset channel {i}",
            unit="V",
            get_cmd=offset_cmd + "?",
            set_cmd=offset_cmd + " {:.3f}",
            vals=vals.Numbers(-2.25, 2.25),
            get_parser=float,
        )
        self.add_parameter(
            f"ch{i}_waveform",
            label=f"Waveform channel {i}",
            get_cmd=waveform_cmd + "?",
            set_cmd=waveform_cmd + ' "{}"',
            vals=vals.Strings(),
            get_parser=parsestr,
        )
        self.add_parameter(
            f"ch{i}_direct_output",
            label=f"Direct output channel {i}",
            get_cmd=directoutput_cmd + "?",
            set_cmd=directoutput_cmd + " {}",
            vals=vals.Ints(0, 1),
        )
        self.add_parameter(
            f"ch{i}_add_input",
            # Fixed: the label used to be the unformatted template
            # "Add input channel {}", so the channel number never appeared.
            label=f"Add input channel {i}",
            get_cmd=add_input_cmd + "?",
            set_cmd=add_input_cmd + " {}",
            vals=vals.Enum('"ESIG"', '"ESIGnal"', '""'),
            get_parser=self.newlinestripper,
        )
        self.add_parameter(
            f"ch{i}_filter",
            label=f"Low pass filter channel {i}",
            unit="Hz",
            get_cmd=filter_cmd + "?",
            set_cmd=filter_cmd + " {}",
            vals=vals.Enum(20e6, 100e6, float("inf"), "INF", "INFinity"),
            get_parser=self._tek_outofrange_get_parser,
        )
        self.add_parameter(
            f"ch{i}_DC_out",
            label=f"DC output level channel {i}",
            unit="V",
            get_cmd=dc_out_cmd + "?",
            set_cmd=dc_out_cmd + " {}",
            vals=vals.Numbers(-3, 5),
            get_parser=float,
        )

        # Marker channels
        for j in range(1, 3):
            m_del_cmd = f"SOURce{i}:MARKer{j}:DELay"
            m_high_cmd = f"SOURce{i}:MARKer{j}:VOLTage:LEVel:IMMediate:HIGH"
            m_low_cmd = f"SOURce{i}:MARKer{j}:VOLTage:LEVel:IMMediate:LOW"

            self.add_parameter(
                f"ch{i}_m{j}_del",
                label=f"Channel {i} Marker {j} delay",
                unit="ns",
                get_cmd=m_del_cmd + "?",
                # the value is given in ns; the "e-9" suffix converts it
                # to seconds in the command sent to the instrument
                set_cmd=m_del_cmd + " {:.3f}e-9",
                vals=vals.Numbers(0, 1),
                get_parser=float,
            )
            self.add_parameter(
                f"ch{i}_m{j}_high",
                label=f"Channel {i} Marker {j} high level",
                unit="V",
                get_cmd=m_high_cmd + "?",
                set_cmd=m_high_cmd + " {:.3f}",
                vals=vals.Numbers(-0.9, 2.7),
                get_parser=float,
            )
            self.add_parameter(
                f"ch{i}_m{j}_low",
                label=f"Channel {i} Marker {j} low level",
                unit="V",
                get_cmd=m_low_cmd + "?",
                set_cmd=m_low_cmd + " {:.3f}",
                vals=vals.Numbers(-1.0, 2.6),
                get_parser=float,
            )

    self.trigger_impedance.set(50)
    if self.clock_freq.get() != 1e9:
        log.info("AWG clock freq not set to 1GHz")

    self.connect_message()
# Convenience parser
[docs]
def newlinestripper(self, string: str) -> str:
    """
    Remove one trailing newline character from ``string``, if present.

    Only a single final newline is dropped; any other character,
    including a second newline before it, is left in place.
    """
    return string.removesuffix("\n")
def _tek_outofrange_get_parser(self, string: str) -> float:
    """
    Convert an instrument reply to a float, mapping out-of-range to infinity.

    Tektronix instruments use 9.9e37 as a generic out-of-range value, so
    any parsed value at or above that threshold is reported as ``inf``.
    """
    parsed = float(string)
    return float("INF") if parsed >= 9.9e37 else parsed
# Functions
[docs]
def get_state(self) -> Literal["Idle", "Waiting for trigger", "Running"]:
    """
    Query the run state of the arbitrary waveform generator or the
    sequencer and translate it to a readable name.

    The reply to ``AWGControl:RSTATe?`` is classified by its first
    character: ``0``, ``1`` and ``2`` map to the three return values.

    Returns:
        Either 'Idle', 'Waiting for trigger', or 'Running'.

    Raises:
        ValueError: if the reply starts with none of ``0``, ``1``, ``2``.
    """
    reply = self.ask("AWGControl:RSTATe?")
    if reply.startswith("0"):
        return "Idle"
    if reply.startswith("1"):
        return "Waiting for trigger"
    if reply.startswith("2"):
        return "Running"
    raise ValueError(f'{__name__} : AWG in undefined state "{reply}"')
[docs]
def start(self) -> str:
    """Alias for :meth:`run`; returns whatever ``self.run()`` returns."""
    state_after_run = self.run()
    return state_after_run
[docs]
def run(self) -> str:
    """
    Start the output of a waveform or a sequence.

    Equivalent to pressing the Run/Stop button on the front panel. The
    instrument can only enter the run state when output waveforms are
    assigned to channels.

    Returns:
        The run state reported by ``self.get_state()`` after the run
        command has been sent.
    """
    run_command = "AWGControl:RUN"
    self.write(run_command)
    return self.get_state()
[docs]
def stop(self) -> None:
    """Stop the output of a waveform or a sequence."""
    stop_command = "AWGControl:STOP"
    self.write(stop_command)
[docs]
def force_trigger(self) -> None:
    """
    Generate a trigger event by sending ``*TRG``.

    Equivalent to pressing the Force Trigger button on the front panel.
    """
    trigger_command = "*TRG"
    self.write(trigger_command)
[docs]
def get_folder_contents(self, print_contents: bool = True) -> str:
    """
    Query the contents and state of the mass storage media (on the AWG
    Windows machine) for the current directory.

    Args:
        print_contents: If True (the default), print the current folder
            name followed by a reformatted copy of the reply.

    Returns:
        The unmodified reply to ``MMEMory:CATalog?``: a comma-separated
        string of the folder contents.
    """
    catalog = self.ask("MMEMory:CATalog?")
    if not print_contents:
        return catalog

    print("Current folder:", self.get_current_folder_name())
    # Put the entries on separate lines, then turn the commas that remain
    # into tabs.
    readable = catalog.replace(',"$', "\n$")
    readable = readable.replace('","', "\n")
    readable = readable.replace(",", "\t")
    print(readable)
    return catalog
[docs]
def get_current_folder_name(self) -> str:
    """
    Query the current directory of the file system on the arbitrary
    waveform generator.

    The current directory of the programmatic interface is independent
    of the directory selected in Windows Explorer on the instrument.

    Returns:
        The reply to ``MMEMory:CDIRectory?``: the full path of the
        current folder.
    """
    query = "MMEMory:CDIRectory?"
    return self.ask(query)
[docs]
def set_current_folder_name(self, file_path: str) -> int:
    """
    Change the current directory of the file system on the arbitrary
    waveform generator.

    The current directory of the programmatic interface is independent
    of the directory selected in Windows Explorer on the instrument.
    The command is written directly to the VISA handle.

    Args:
        file_path: The full path.

    Returns:
        The number of bytes written to instrument
    """
    command = 'MMEMory:CDIRectory "{}"'.format(file_path)
    return self.visa_handle.write(command)
[docs]
def change_folder(self, folder: str) -> int:
    """
    Duplicate of self.set_current_folder_name.

    Args:
        folder: The path of the directory to make current.

    Returns:
        The value returned by the VISA handle's ``write``.
    """
    command = r'MMEMory:CDIRectory "{}"'.format(folder)
    return self.visa_handle.write(command)
[docs]
def goto_root(self) -> None:
    """
    Change the current directory of the file system on the arbitrary
    waveform generator to C: (the 'root' location in Windows).
    """
    root_command = 'MMEMory:CDIRectory "c:\\.."'
    self.write(root_command)
[docs]
def create_and_goto_dir(self, folder: str) -> str:
    """
    Make ``folder`` the current directory of the file system on the
    arbitrary waveform generator, creating it if it doesn't exist, and
    query the resulting folder for its contents.

    Three cases are distinguished, in this order:

    * the catalog of the current folder lists ``folder`` as a directory:
      change into it;
    * the current folder already is ``folder``: nothing is sent;
    * otherwise: create the directory and change into it.

    Args:
        folder: The path of the directory to set as current.
            Note: this function expects only root level directories.

    Returns:
        A comma-separated string of the folder contents.
    """
    listed_as_dir = f"{folder}, DIR" in self.get_folder_contents()
    if listed_as_dir:
        self.change_folder(folder)
        log.debug("Directory already exists")
        log.warning(f"Directory already exists, changed path to {folder}")
        content = self.ask("MMEMory:cat?")
        log.info("Contents of folder is %s", content)
        return self.get_folder_contents()

    if self.get_current_folder_name() == f'"\\{folder}"':
        log.info(f"Directory already set to {folder}")
        return self.get_folder_contents()

    for command in (
        f'MMEMory:MDIRectory "{folder}"',
        f'MMEMory:CDIRectory "{folder}"',
    ):
        self.write(command)
    return self.get_folder_contents()
[docs]
def all_channels_on(self) -> None:
    """
    Switch every channel ON by setting each ``ch{i}_state`` parameter to 1.

    Note: only channels with defined waveforms can be ON.
    """
    state_names = [f"ch{number}_state" for number in range(1, self.num_channels + 1)]
    for state_name in state_names:
        self.parameters[state_name].set(1)
[docs]
def all_channels_off(self) -> None:
    """Switch every channel OFF by setting each ``ch{i}_state`` parameter to 0."""
    state_names = [f"ch{number}_state" for number in range(1, self.num_channels + 1)]
    for state_name in state_names:
        self.parameters[state_name].set(0)
#####################
# Sequences section #
#####################
[docs]
def force_trigger_event(self) -> None:
    """
    Generate a trigger event by sending ``TRIGger:IMMediate``.

    Equivalent to self.force_trigger.
    """
    trigger_command = "TRIGger:IMMediate"
    self.write(trigger_command)
[docs]
def force_event(self) -> None:
    """
    Generate a forced event.

    Used to produce the event while the sequence is waiting for an event
    jump. Equivalent to pressing the Force Event button on the front
    panel of the instrument.
    """
    event_command = "EVENt:IMMediate"
    self.write(event_command)
[docs]
def set_sqel_event_target_index(self, element_no: int, index: int) -> None:
    """
    Set the target index for the sequencer's event jump operation.

    Note that this only takes effect when the event jump target type is
    set to INDEX.

    Args:
        element_no: The sequence element number
        index: The index to set the target to
    """
    command = f"SEQuence:ELEMent{element_no}:JTARGet:INDex {index}"
    self.write(command)
[docs]
def set_sqel_goto_target_index(
    self, element_no: int, goto_to_index_no: int
) -> None:
    """
    Set the target index for the GOTO command of the sequencer.

    After generating the waveform specified in a sequence element, the
    sequencer jumps to the element specified as GOTO target. This is an
    unconditional jump. Without a GOTO target the sequencer simply moves
    on to the next element. If the Loop Count is Infinite, the GOTO
    target specified in the element is not used. For this command to
    work, the goto state of the sequencer must be ON and the sequence
    element must exist.

    Note that the first element of a sequence is taken to be 1 not 0.

    Args:
        element_no: The sequence element number
        goto_to_index_no: The target index number
    """
    command = f"SEQuence:ELEMent{element_no}:GOTO:INDex {goto_to_index_no}"
    self.write(command)
[docs]
def set_sqel_goto_state(self, element_no: int, goto_state: int) -> None:
    """
    Set the GOTO state of the sequencer for the specified sequence
    element.

    A value other than 0 or 1 is not rejected: a warning is logged and
    0 (OFF) is sent instead.

    Args:
        element_no: The sequence element number
        goto_state: The GOTO state of the sequencer. Must be either
            0 (OFF) or 1 (ON).
    """
    if goto_state in (0, 1):
        state_to_send = int(goto_state)
    else:
        log.warning(
            f"{goto_state} not recognized as a valid goto"
            " state. Setting to 0 (OFF)."
        )
        state_to_send = 0
    self.write(f"SEQuence:ELEMent{element_no}:GOTO:STATe {state_to_send}")
[docs]
def set_sqel_loopcnt_to_inf(self, element_no: int, state: int = 1) -> None:
    """
    Set the infinite looping state for a sequence element.

    When an infinite loop is set on an element, the sequencer
    continuously executes that element. To break the infinite loop,
    issue self.stop(). A value other than 0 or 1 is not rejected: a
    warning is logged and 0 (OFF) is sent instead.

    Args:
        element_no (int): The sequence element number
        state (int): The infinite loop state. Must be either 0 (OFF) or
            1 (ON).
    """
    if state in (0, 1):
        state_to_send = int(state)
    else:
        log.warning(
            f"{state} not recognized as a valid loop state. Setting to 0 (OFF)."
        )
        state_to_send = 0
    self.write(f"SEQuence:ELEMent{element_no}:LOOP:INFinite {state_to_send}")
[docs]
def get_sqel_loopcnt(self, element_no: int = 1) -> str:
    """
    Query the loop count (number of repetitions) of a sequence element.

    The loop count setting of an element is ignored if its infinite
    looping state is set to ON.

    Args:
        element_no: The sequence element number. Default: 1.

    Returns:
        The instrument's reply, as returned by ``self.ask``.
    """
    query = f"SEQuence:ELEMent{element_no}:LOOP:COUNt?"
    return self.ask(query)
[docs]
def set_sqel_loopcnt(self, loopcount: int, element_no: int = 1) -> None:
    """
    Set the loop count of a sequence element.

    The loop count setting of an element is ignored if its infinite
    looping state is set to ON.

    Args:
        loopcount: The number of times the sequence is being output.
            The maximal possible number is 65536, beyond that: infinity.
        element_no: The sequence element number. Default: 1.
    """
    command = f"SEQuence:ELEMent{element_no}:LOOP:COUNt {loopcount}"
    self.write(command)
[docs]
def set_sqel_trigger_wait(self, element_no: int, state: int = 1) -> str:
    """
    Set the wait trigger state for an element and read it back.

    A trigger signal can be sent in one of the following ways:

    * By using an external trigger signal.
    * By pressing the "Force Trigger" button on the front panel
    * By using self.force_trigger or self.force_trigger_event

    Args:
        element_no: The sequence element number.
        state: The wait trigger state. Must be either 0 (OFF)
            or 1 (ON). Default: 1.

    Returns:
        The current state (after setting it).
    """
    command = f"SEQuence:ELEMent{element_no}:TWAit {state}"
    self.write(command)
    state_after_write = self.get_sqel_trigger_wait(element_no)
    return state_after_write
[docs]
def get_sqel_trigger_wait(self, element_no: int) -> str:
    """
    Query the wait trigger state for an element.

    A trigger signal can be sent in one of the following ways:

    * By using an external trigger signal.
    * By pressing the "Force Trigger" button on the front panel
    * By using self.force_trigger or self.force_trigger_event

    Args:
        element_no: The sequence element number.

    Returns:
        The current state. Example: '1'.
    """
    query = f"SEQuence:ELEMent{element_no}:TWAit?"
    return self.ask(query)
[docs]
def set_sqel_event_jump_target_index(
    self, element_no: int, jtar_index_no: int
) -> None:
    """
    Duplicate of set_sqel_event_target_index.

    Args:
        element_no: The sequence element number
        jtar_index_no: The index to set the event jump target to
    """
    command = f"SEQuence:ELEMent{element_no}:JTARget:INDex {jtar_index_no}"
    self.write(command)
[docs]
def set_sqel_event_jump_type(self, element_no: int, jtar_state: str) -> None:
    """
    Set the event jump target type for the specified sequence element.

    An event can be generated in one of the following ways:

    * By connecting an external cable to instrument rear panel
      for external event.
    * By pressing the Force Event button on the front panel.
    * By using self.force_event

    The value is forwarded to the instrument without validation.

    Args:
        element_no: The sequence element number
        jtar_state: The jump target type. Must be either 'INDEX',
            'NEXT', or 'OFF'.
    """
    command = f"SEQuence:ELEMent{element_no}:JTARget:TYPE {jtar_state}"
    self.write(command)
[docs]
def get_sq_mode(self) -> str:
    """
    Query the type of the arbitrary waveform generator's sequencer.

    The sequence is executed by the hardware sequencer whenever possible.

    Returns:
        str: Either 'HARD' or 'SOFT' indicating that the instrument is in
        either hardware or software sequencer mode.
    """
    query = "AWGControl:SEQuence:TYPE?"
    return self.ask(query)
######################
# AWG file functions #
######################
def _pack_record(
self, name: str, value: float | str | Sequence[Any] | np.ndarray, dtype: str
) -> bytes:
"""
packs awg_file record into a struct in the folowing way:
struct.pack(fmtstring, namesize, datasize, name, data)
where fmtstring = '<IIs"dtype"'
The file record format is as follows:
Record Name Size: (32-bit unsigned integer)
Record Data Size: (32-bit unsigned integer)
Record Name: (ASCII) (Include NULL.)
Record Data
For details see "File and Record Format" in the AWG help
< denotes little-endian encoding, I and other dtypes are format
characters denoted in the documentation of the struct package
Args:
name: Name of the record (Example: 'MAGIC' or 'SAMPLING_RATE')
value: The value of that record.
dtype: String specifying the data type of the record.
Allowed values: 'h', 'd', 's'.
"""
if len(dtype) == 1:
record_data = struct.pack("<" + dtype, value)
elif dtype[-1] == "s":
assert isinstance(value, str)
record_data = value.encode("ASCII")
else:
assert isinstance(value, (abc.Sequence, np.ndarray))
if dtype[-1] == "H" and isinstance(value, np.ndarray):
# numpy conversion is fast
record_data = value.astype("<u2").tobytes()
else:
# argument unpacking is slow
record_data = struct.pack("<" + dtype, *value)
# the zero byte at the end the record name is the "(Include NULL.)"
record_name = name.encode("ASCII") + b"\x00"
record_name_size = len(record_name)
record_data_size = len(record_data)
size_struct = struct.pack("<II", record_name_size, record_data_size)
packed_record = size_struct + record_name + record_data
return packed_record
[docs]
def generate_sequence_cfg(self) -> dict[str, float]:
    """
    Build the head configuration used when generating sequence files
    from the settings currently active on the awg.

    Two-option settings are encoded as 1 or 2; the comment next to each
    entry lists the options in that order. Querying the AWG for these
    settings takes ~0.7 seconds.

    Returns:
        A dict mapping record names to the values to write.
    """
    log.info("Generating sequence_cfg")

    def first_or_second(is_first_option: bool) -> int:
        # 1 selects the first listed option, 2 the second
        return 1 if is_first_option else 2

    # The entries are evaluated top to bottom, so the instrument is
    # queried in the order in which the keys appear.
    return {
        "SAMPLING_RATE": self.clock_freq.get(),
        # Internal | External
        "CLOCK_SOURCE": first_or_second(self.clock_source().startswith("INT")),
        # Internal | External
        "REFERENCE_SOURCE": first_or_second(self.ref_source().startswith("INT")),
        "EXTERNAL_REFERENCE_TYPE": 1,  # Fixed | Variable
        "REFERENCE_CLOCK_FREQUENCY_SELECTION": 1,
        # 10 MHz | 20 MHz | 100 MHz
        # External | Internal
        "TRIGGER_SOURCE": first_or_second(
            self.trigger_source.get().startswith("EXT")
        ),
        # 50 ohm | 1 kohm
        "TRIGGER_INPUT_IMPEDANCE": first_or_second(
            self.trigger_impedance.get() == 50.0
        ),
        # Positive | Negative
        "TRIGGER_INPUT_SLOPE": first_or_second(
            self.trigger_slope.get().startswith("POS")
        ),
        # Positive | Negative
        "TRIGGER_INPUT_POLARITY": first_or_second(
            self.ask("TRIGger:POLarity?").startswith("POS")
        ),
        "TRIGGER_INPUT_THRESHOLD": self.trigger_level.get(),  # V
        # 50 ohm | 1 kohm
        "EVENT_INPUT_IMPEDANCE": first_or_second(
            self.event_impedance.get() == 50.0
        ),
        # Positive | Negative
        "EVENT_INPUT_POLARITY": first_or_second(
            self.event_polarity.get().startswith("POS")
        ),
        "EVENT_INPUT_THRESHOLD": self.event_level(),  # V
        # Sync | Async
        "JUMP_TIMING": first_or_second(
            self.event_jump_timing.get().startswith("SYNC")
        ),
        "RUN_MODE": 4,  # Continuous | Triggered | Gated | Sequence
        "RUN_STATE": 0,  # On | Off
    }
[docs]
def generate_channel_cfg(self) -> dict[str, float | None]:
    """
    Collect the channel settings that have been changed from their
    default value into a dictionary that can easily be written into an
    awg file, so as to prevent said awg file from falling back to
    default values.
    (See :meth:`~make_awg_file` and :meth:`~AWG_FILE_FORMAT_CHANNEL`)

    The values come from ``get_latest()`` of the ``ch{i}_...``
    parameters of every channel from 1 to ``self.num_channels``; a
    parameter whose latest value is ``None`` is left out.

    NOTE: This only works for settings changed via the corresponding
    QCoDeS parameter.

    Returns:
        A dict with the current setting for each entry in
        AWG_FILE_FORMAT_CHANNEL iff this entry applies to the
        AWG5014 AND has been changed from its default value.
    """
    log.info("Getting channel configurations.")

    # the return value of the parameter is different from what goes
    # into the .awg file, so we translate it
    filtertrans = {
        20e6: 1,
        100e6: 3,
        9.9e37: 10,
        "INF": 10,
        "INFinity": 10,
        float("inf"): 10,
        None: None,
    }

    # '"ESIGnal"' is also accepted by the ch{i}_add_input validator, so it
    # must be translatable as well; it was missing here (KeyError)
    addinptrans = {'"ESIG"': 1, '"ESIGnal"': 1, '""': 0, None: None}

    def mrkdeltrans(x: float | None) -> float | None:
        # the marker delay parameters are given in ns (see their unit);
        # scale by 1e-9 for the file
        return None if x is None else x * 1e-9

    def latest(chan: int, suffix: str) -> Any:
        # look the parameter up by name so that any number of channels works
        return self.parameters[f"ch{chan}_{suffix}"].get_latest()

    AWG_channel_cfg: dict[str, float | None] = {}
    for chan in range(1, self.num_channels + 1):
        dirout = latest(chan, "direct_output")
        # insertion order matches the order in which the records have
        # always been added for a channel
        records = {
            f"ANALOG_DIRECT_OUTPUT_{chan}": (
                None if dirout is None else int(dirout)
            ),
            f"ANALOG_FILTER_{chan}": filtertrans[latest(chan, "filter")],
            f"ANALOG_AMPLITUDE_{chan}": latest(chan, "amp"),
            f"ANALOG_OFFSET_{chan}": latest(chan, "offset"),
            f"MARKER1_HIGH_{chan}": latest(chan, "m1_high"),
            f"MARKER1_LOW_{chan}": latest(chan, "m1_low"),
            f"MARKER2_HIGH_{chan}": latest(chan, "m2_high"),
            f"MARKER2_LOW_{chan}": latest(chan, "m2_low"),
            f"MARKER1_SKEW_{chan}": mrkdeltrans(latest(chan, "m1_del")),
            f"MARKER2_SKEW_{chan}": mrkdeltrans(latest(chan, "m2_del")),
            f"EXTERNAL_ADD_{chan}": addinptrans[latest(chan, "add_input")],
        }
        AWG_channel_cfg.update(
            {key: val for key, val in records.items() if val is not None}
        )

    return AWG_channel_cfg
[docs]
@staticmethod
def parse_marker_channel_name(name: str) -> _MarkerDescriptor:
    """
    Split a marker descriptor string into its channel and marker numbers.

    The expected form is ``<channel>M<marker>`` where both parts are
    decimal digits, e.g. ``'1M1'`` describes marker 1 of channel 1.

    Args:
        name: The marker descriptor string to parse.

    Returns:
        A ``_MarkerDescriptor`` whose ``marker`` and ``channel`` fields
        hold the two numbers as integers.

    Raises:
        AssertionError: if ``name`` does not have the expected form.
    """
    match = re.match(r"^(?P<channel>\d+)M(?P<marker>\d+)$", name)
    assert match is not None
    # The named groups are exactly the field names of _MarkerDescriptor.
    fields = {key: int(digits) for key, digits in match.groupdict().items()}
    return _MarkerDescriptor(**fields)
def _generate_awg_file(
    self,
    packed_waveforms: dict[str, np.ndarray],
    wfname_l: np.ndarray,
    nrep: Sequence[int],
    trig_wait: Sequence[int],
    goto_state: Sequence[int],
    jump_to: Sequence[int],
    channel_cfg: dict[str, Any],
    sequence_cfg: dict[str, float] | None = None,
    preservechannelsettings: bool = False,
) -> bytes:
    """
    Assemble the binary contents of an .awg file for upload to the AWG.

    The result is the concatenation of four groups of records, in this
    order: general (head) settings, channel settings, the waveform list
    and the sequence definition. Every record is produced by
    ``self._pack_record``.

    Args:
        packed_waveforms: dictionary containing packed waveforms
            with keys wfname_l
        wfname_l: array of waveform names, e.g.
            array([[segm1_ch1,segm2_ch1..], [segm1_ch2,segm2_ch2..],...])
            The last character of each name is used as the channel
            number of that waveform.
        nrep: list of len(segments) of integers specifying the
            no. of repetitions per sequence element.
            Allowed values: 1 to 65536.
        trig_wait: list of len(segments) of integers specifying the
            trigger wait state of each sequence element.
            Allowed values: 0 (OFF) or 1 (ON).
        goto_state: list of len(segments) of integers specifying the
            goto state of each sequence element. Allowed values: 0 to 65536
            (0 means next)
        jump_to: list of len(segments) of integers specifying
            the logic jump state for each sequence element. Allowed values:
            0 (OFF) or 1 (ON).
        channel_cfg: dictionary of valid channel configuration
            records. See self.AWG_FILE_FORMAT_CHANNEL for a complete
            overview of valid configuration parameters. Keys that are
            not recognized are skipped with a logged warning.
        sequence_cfg: dictionary of valid head configuration records
            (see self.AWG_FILE_FORMAT_HEAD). If None, the result of
            ``self.generate_sequence_cfg()`` is used. Keys that are not
            recognized are skipped with a logged warning.
            When an awg file is uploaded these settings will be set
            onto the AWG, any parameter not specified will be set to
            its default value (even overwriting current settings)
        preservechannelsettings: If True, the settings returned by
            ``self.generate_channel_cfg()`` are added in place to
            channel_cfg (existing entries are not overwritten).
            Default: False.

    Returns:
        The complete .awg file as a bytes object.

    for info on filestructure and valid record names, see AWG Help,
    File and Record Format (Under 'Record Name List' in Help)
    """
    if preservechannelsettings:
        # Only fill the gaps: anything the caller put into channel_cfg wins.
        for setting, value in self.generate_channel_cfg().items():
            if setting not in channel_cfg:
                channel_cfg[setting] = value

    # Reordered struct_time fields; all but the last one are written as the
    # eight-field timestamp of every waveform record.
    timetuple = tuple(np.array(localtime())[[0, 1, 8, 2, 3, 4, 5, 6, 7]])
    timestamp = timetuple[:-1]

    pack = self._pack_record

    # general settings
    header = BytesIO()
    header.write(pack("MAGIC", 5000, "h") + pack("VERSION", 1, "h"))

    if sequence_cfg is None:
        sequence_cfg = self.generate_sequence_cfg()

    head_formats = self.AWG_FILE_FORMAT_HEAD
    for key, value in list(sequence_cfg.items()):
        if key not in head_formats:
            log.warning(f"AWG: {key} not recognized as valid AWG setting")
            continue
        header.write(pack(key, value, head_formats[key]))

    # channel settings
    channel_records = BytesIO()
    channel_formats = self.AWG_FILE_FORMAT_CHANNEL
    for key, value in list(channel_cfg.items()):
        # The format table is keyed by the record name with the trailing
        # channel digit replaced by "N".
        generic_key = key[:-1] + "N"
        if generic_key not in channel_formats:
            log.warning(f"AWG: {key} not recognized as valid AWG channel setting")
            continue
        channel_records.write(pack(key, value, channel_formats[generic_key]))

    # waveforms, written in sorted name order and numbered from 21 upwards
    # NOTE(review): presumably the lower record numbers are reserved by the
    # instrument - confirm against the AWG file format help.
    waveform_records = BytesIO()
    for index, name in enumerate(sorted(packed_waveforms), start=21):
        data = packed_waveforms[name]
        npoints = len(data)
        terminated_name = name + "\x00"
        waveform_records.write(
            pack(
                f"WAVEFORM_NAME_{index}",
                terminated_name,
                f"{len(terminated_name)}s",
            )
            + pack(f"WAVEFORM_TYPE_{index}", 1, "h")
            + pack(f"WAVEFORM_LENGTH_{index}", npoints, "l")
            + pack(f"WAVEFORM_TIMESTAMP_{index}", timestamp, "8H")
            + pack(f"WAVEFORM_DATA_{index}", data, f"{npoints}H")
        )

    # sequence: one column of wfname_l per sequence element, numbered from 1
    sequence_records = BytesIO()
    for element, segment in enumerate(wfname_l.transpose(), start=1):
        pos = element - 1
        sequence_records.write(
            pack(f"SEQUENCE_WAIT_{element}", trig_wait[pos], "h")
            + pack(f"SEQUENCE_LOOP_{element}", int(nrep[pos]), "l")
            + pack(f"SEQUENCE_JUMP_{element}", jump_to[pos], "h")
            + pack(f"SEQUENCE_GOTO_{element}", goto_state[pos], "h")
        )
        for wfname in segment:
            if wfname is None:
                continue
            # TODO (WilliamHPNielsen): maybe infer ch automatically
            # from the data size?
            ch = wfname[-1]
            terminated_wfname = wfname + "\x00"
            sequence_records.write(
                pack(
                    "SEQUENCE_WAVEFORM_NAME_CH_" + ch + f"_{element}",
                    terminated_wfname,
                    "{}s".format(len(terminated_wfname)),
                )
            )

    return (
        header.getvalue()
        + channel_records.getvalue()
        + waveform_records.getvalue()
        + sequence_records.getvalue()
    )
[docs]
def send_awg_file(
    self, filename: str, awg_file: bytes, verbose: bool = False
) -> None:
    """
    Writes an .awg-file onto the disk of the AWG.
    Overwrites existing files.

    The file is sent as a single raw VISA message consisting of the
    ``MMEMory:DATA "<filename>",`` command, a ``#<n><length>`` block
    header (``<length>`` is the byte count of the file in decimal and
    ``<n>`` the number of digits of ``<length>``) and the file itself.

    Args:
        filename: The name that the file will get on
            the AWG.
        awg_file: A byte sequence containing the awg_file.
            Usually the output of self.make_awg_file.
        verbose: A boolean to allow/suppress printing of messages
            about the status of the file writing. Default: False.
    """
    if verbose:
        current_dir = self.ask("MMEMory:CDIRectory?").replace("\n", "\\ ")
        print("Writing to:", current_dir, filename)

    # Command part naming the target file
    command = f'MMEMory:DATA "{filename}",'.encode("ASCII")

    # Block header announcing how many payload bytes follow
    payload_length = str(len(awg_file))
    block_header = f"#{len(payload_length)}{payload_length}".encode("ASCII")

    self.visa_handle.write_raw(command + block_header + awg_file)
[docs]
def load_awg_file(self, filename: str) -> None:
    """
    Loads an .awg-file from the disc of the AWG into the AWG memory.
    This may overwrite all instrument settings, the waveform list, and the
    sequence in the sequencer.

    The ``AWGControl:SREStore`` command is sent as a raw ASCII message;
    afterwards the ``sequence_length`` parameter is read back and set to
    the value it reported.

    Args:
        filename: The filename of the .awg-file to load.
    """
    command = f'AWGControl:SREStore "{filename}"'
    raw_command = command.encode(encoding="ASCII")
    log.debug(f"Loading awg file using {command}")
    self.visa_handle.write_raw(raw_command)

    # we must update the appropriate parameter(s) for the sequence
    seq_length = self.sequence_length
    seq_length.set(seq_length.get())
[docs]
def make_awg_file(
self,
waveforms: Sequence[Sequence[np.ndarray]] | Sequence[np.ndarray],
m1s: Sequence[Sequence[np.ndarray]] | Sequence[np.ndarray],
m2s: Sequence[Sequence[np.ndarray]] | Sequence[np.ndarray],
nreps: Sequence[int],
trig_waits: Sequence[int],
goto_states: Sequence[int],
jump_tos: Sequence[int],
channels: Sequence[int] | None = None,
preservechannelsettings: bool = True,
) -> bytes:
"""
Args:
waveforms: A list of the waveforms to be packed. The list
should be filled like so:
[[wfm1ch1, wfm2ch1, ...], [wfm1ch2, wfm2ch2], ...]
Each waveform should be a numpy array with values in the range
-1 to 1 (inclusive). If you do not wish to send waveforms to
channels 1 and 2, use the channels parameter.
m1s: A list of marker 1's. The list should be filled
like so:
[[elem1m1ch1, elem2m1ch1, ...], [elem1m1ch2, elem2m1ch2], ...]
Each marker should be a numpy array containing only 0's and 1's
m2s: A list of marker 2's. The list should be filled
like so:
[[elem1m2ch1, elem2m2ch1, ...], [elem1m2ch2, elem2m2ch2], ...]
Each marker should be a numpy array containing only 0's and 1's
nreps: List of integers specifying the no. of
repetitions per sequence element. Allowed values: 0 to
65536. O corresponds to Infinite repetitions.
trig_waits: List of len(segments) of integers specifying the
trigger wait state of each sequence element.
Allowed values: 0 (OFF) or 1 (ON).
goto_states: List of len(segments) of integers
specifying the goto state of each sequence
element. Allowed values: 0 to 65536 (0 means next)
jump_tos: List of len(segments) of integers specifying
the logic jump state for each sequence element. Allowed values:
0 (OFF) or 1 (ON).
channels (list): List of channels to send the waveforms to.
Example: [1, 3, 2]
preservechannelsettings (bool): If True, the current channel
settings are found from the parameter history and added to
the .awg file. Else, channel settings are not written in the
file and will be reset to factory default when the file is
loaded. Default: True.
"""
packed_wfs = {}
waveform_names = []
if not isinstance(waveforms[0], abc.Sequence):
waveforms_int: Sequence[Sequence[np.ndarray]] = [
cast(Sequence[np.ndarray], waveforms)
]
m1s_int: Sequence[Sequence[np.ndarray]] = [cast(Sequence[np.ndarray], m1s)]
m2s_int: Sequence[Sequence[np.ndarray]] = [cast(Sequence[np.ndarray], m2s)]
else:
waveforms_int = cast(Sequence[Sequence[np.ndarray]], waveforms)
m1s_int = cast(Sequence[Sequence[np.ndarray]], m1s)
m2s_int = cast(Sequence[Sequence[np.ndarray]], m2s)
for ii in range(len(waveforms_int)):
namelist = []
for jj in range(len(waveforms_int[ii])):
if channels is None:
thisname = f"wfm{jj + 1:03d}ch{ii + 1}"
else:
thisname = f"wfm{jj + 1:03d}ch{channels[ii]}"
namelist.append(thisname)
package = self._pack_waveform(
waveforms_int[ii][jj], m1s_int[ii][jj], m2s_int[ii][jj]
)
packed_wfs[thisname] = package
waveform_names.append(namelist)
wavenamearray = np.array(waveform_names, dtype="str")
channel_cfg: dict[str, Any] = {}
return self._generate_awg_file(
packed_wfs,
wavenamearray,
nreps,
trig_waits,
goto_states,
jump_tos,
channel_cfg,
preservechannelsettings=preservechannelsettings,
)
[docs]
def make_send_and_load_awg_file(
    self,
    waveforms: Sequence[Sequence[np.ndarray]],
    m1s: Sequence[Sequence[np.ndarray]],
    m2s: Sequence[Sequence[np.ndarray]],
    nreps: Sequence[int],
    trig_waits: Sequence[int],
    goto_states: Sequence[int],
    jump_tos: Sequence[int],
    channels: Sequence[int] | None = None,
    filename: str = "customawgfile.awg",
    preservechannelsettings: bool = True,
) -> None:
    """
    Makes an .awg-file, sends it to the AWG and loads it. The .awg-file
    is uploaded to C:\\\\Users\\\\OEM\\\\Documents. The waveforms appear in
    the user defined waveform list with names wfm001ch1, wfm002ch1, ...

    Args:
        waveforms: A list of the waveforms to upload. The list
            should be filled like so:
            [[wfm1ch1, wfm2ch1, ...], [wfm1ch2, wfm2ch2], ...]
            Each waveform should be a numpy array with values in the range
            -1 to 1 (inclusive). If you do not wish to send waveforms to
            channels 1 and 2, use the channels parameter.
        m1s: A list of marker 1's. The list should be filled
            like so:
            [[elem1m1ch1, elem2m1ch1, ...], [elem1m1ch2, elem2m1ch2], ...]
            Each marker should be a numpy array containing only 0's and 1's
        m2s: A list of marker 2's. The list should be filled
            like so:
            [[elem1m2ch1, elem2m2ch1, ...], [elem1m2ch2, elem2m2ch2], ...]
            Each marker should be a numpy array containing only 0's and 1's
        nreps: List of integers specifying the no. of
            repetitions per sequence element. Allowed values: 0 to
            65536. 0 corresponds to Infinite repetitions.
        trig_waits: List of len(segments) of integers specifying the
            trigger wait state of each sequence element.
            Allowed values: 0 (OFF) or 1 (ON).
        goto_states: List of len(segments) of integers
            specifying the goto state of each sequence
            element. Allowed values: 0 to 65536 (0 means next)
        jump_tos: List of len(segments) of integers specifying
            the logic jump state for each sequence element. Allowed values:
            0 (OFF) or 1 (ON).
        channels: List of channels to send the waveforms to.
            Example: [1, 3, 2]
        filename: The name of the .awg-file. Should end with the .awg
            extension. Default: 'customawgfile.awg'
        preservechannelsettings: If True, the current channel
            settings are found from the parameter history and added to
            the .awg file. Else, channel settings are reset to the factory
            default values. Default: True.
    """
    # Build the file contents (packed waveforms plus sequencing info)
    awg_file = self.make_awg_file(
        waveforms,
        m1s,
        m2s,
        nreps,
        trig_waits,
        goto_states,
        jump_tos,
        channels=channels,
        preservechannelsettings=preservechannelsettings,
    )

    # by default, an unusable directory is targeted on the AWG
    self.visa_handle.write('MMEMory:CDIRectory "C:\\Users\\OEM\\Documents"')

    self.send_awg_file(filename, awg_file)

    # Turn the instrument's reply into a path prefix: drop the quotes and
    # replace the newline by a backslash separator.
    currentdir = (
        self.visa_handle.query("MMEMory:CDIRectory?")
        .replace('"', "")
        .replace("\n", "\\")
    )
    self.load_awg_file(f"{currentdir}{filename}")
[docs]
def make_and_save_awg_file(
    self,
    waveforms: Sequence[Sequence[np.ndarray]],
    m1s: Sequence[Sequence[np.ndarray]],
    m2s: Sequence[Sequence[np.ndarray]],
    nreps: Sequence[int],
    trig_waits: Sequence[int],
    goto_states: Sequence[int],
    jump_tos: Sequence[int],
    channels: Sequence[int] | None = None,
    filename: str = "customawgfile.awg",
    preservechannelsettings: bool = True,
) -> None:
    """
    Makes an .awg-file and saves it locally.

    The file contents come from ``self.make_awg_file`` and are written in
    binary mode to ``filename``, replacing any existing file of that name.

    Args:
        waveforms: A list of the waveforms to upload. The list
            should be filled like so:
            [[wfm1ch1, wfm2ch1, ...], [wfm1ch2, wfm2ch2], ...]
            Each waveform should be a numpy array with values in the range
            -1 to 1 (inclusive). If you do not wish to send waveforms to
            channels 1 and 2, use the channels parameter.
        m1s: A list of marker 1's. The list should be filled
            like so:
            [[elem1m1ch1, elem2m1ch1, ...], [elem1m1ch2, elem2m1ch2], ...]
            Each marker should be a numpy array containing only 0's and 1's
        m2s: A list of marker 2's. The list should be filled
            like so:
            [[elem1m2ch1, elem2m2ch1, ...], [elem1m2ch2, elem2m2ch2], ...]
            Each marker should be a numpy array containing only 0's and 1's
        nreps: List of integers specifying the no. of
            repetitions per sequence element. Allowed values: 0 to
            65536. 0 corresponds to Infinite repetitions.
        trig_waits: List of len(segments) of integers specifying the
            trigger wait state of each sequence element.
            Allowed values: 0 (OFF) or 1 (ON).
        goto_states: List of len(segments) of integers
            specifying the goto state of each sequence
            element. Allowed values: 0 to 65536 (0 means next)
        jump_tos: List of len(segments) of integers specifying
            the logic jump state for each sequence element. Allowed values:
            0 (OFF) or 1 (ON).
        channels: List of channels to send the waveforms to.
            Example: [1, 3, 2]
        filename: The full path of the .awg-file. Should end with the
            .awg extension. Default: 'customawgfile.awg'
        preservechannelsettings: If True, the current channel
            settings are found from the parameter history and added to
            the .awg file. Else, channel settings are not written in the
            file and will be reset to factory default when the file is
            loaded. Default: True.
    """
    file_contents = self.make_awg_file(
        waveforms,
        m1s,
        m2s,
        nreps,
        trig_waits,
        goto_states,
        jump_tos,
        channels=channels,
        preservechannelsettings=preservechannelsettings,
    )

    with open(filename, "wb") as outfile:
        outfile.write(file_contents)
[docs]
def get_error(self) -> str:
    """
    Ask the instrument for the next entry of its error/event queue.

    Returns:
        The reply to ``SYSTEM:ERRor:NEXT?``: a string containing the
        error/event number and the error/event description.
    """
    next_entry = self.ask("SYSTEM:ERRor:NEXT?")
    return next_entry
def _pack_waveform(
    self, wf: np.ndarray, m1: np.ndarray, m2: np.ndarray
) -> np.ndarray:
    """
    Converts/packs a waveform and two markers into a 16-bit format
    according to the AWG Integer format specification.
    The waveform occupies 14 bits and the markers one bit each.
    See Table 2-25 in the Programmer's manual for more information

    Each output word is ``16384 * m1 + 32768 * m2`` plus the waveform
    sample mapped from [-1, 1] onto the integers 0..16382.

    Since markers can only be in one of two states, the marker input
    arrays should consist only of 0's and 1's.

    Args:
        wf: A numpy array (or array-like) containing the waveform. The
            data type of wf is unimportant.
        m1: A numpy array (or array-like) containing the first marker.
            Any integer, boolean or float dtype holding only 0 and 1
            is accepted.
        m2: A numpy array (or array-like) containing the second marker.
            Same requirements as for m1.

    Returns:
        An array of unsigned 16 bit integers.

    Raises:
        ValueError: if the lengths of wf, m1, and m2 don't match
        TypeError: if the waveform contains values outside [-1, 1]
        TypeError: if the markers contain values that are not 0 or 1
    """
    # Accept plain sequences as well as arrays; for arrays this is a no-op.
    wf = np.asarray(wf)
    m1 = np.asarray(m1)
    m2 = np.asarray(m2)

    # Input validation
    if not (len(wf) == len(m1) == len(m2)):
        raise ValueError("error: sizes of the waveforms do not match")
    if np.min(wf) < -1 or np.max(wf) > 1:
        # TypeError (rather than ValueError) is kept for backwards
        # compatibility with existing callers.
        raise TypeError(
            "Waveform values out of bonds. Allowed values: -1 to 1 (inclusive)"
        )
    if not np.all(np.isin(m1, np.array([0, 1]))):
        raise TypeError(
            "Marker 1 contains invalid values. Only 0 and 1 are allowed"
        )
    if not np.all(np.isin(m2, np.array([0, 1]))):
        raise TypeError(
            "Marker 2 contains invalid values. Only 0 and 1 are allowed"
        )

    # The marker weights are written as floats so that the products are
    # computed in floating point whatever the marker dtype is; with integer
    # weights a compact dtype such as uint8 cannot hold 16384 or 32768.
    # Adding 0.5 before truncating rounds the (non-negative) value to the
    # nearest integer; np.trunc is used rather than np.round because the
    # original implementation found it to be considerably faster.
    packed_wf = np.trunc(
        16384.0 * m1 + 32768.0 * m2 + wf * 8191 + 8191.5
    ).astype(np.uint16)
    return packed_wf
###########################
# Waveform file functions #
###########################
def _file_dict(
self, wf: np.ndarray, m1: np.ndarray, m2: np.ndarray, clock: float | None
) -> dict[str, np.ndarray | float | None]:
"""
Make a file dictionary as used by self.send_waveform_to_list
Args:
wf: A numpy array containing the waveform. The
data type of wf is unimportant.
m1: A numpy array containing the first marker.
m2: A numpy array containing the second marker.
clock: The desired clock frequency
Returns:
dict: A dictionary with keys 'w', 'm1', 'm2', 'clock_freq', and
'numpoints' and corresponding values.
"""
outdict = {
"w": wf,
"m1": m1,
"m2": m2,
"clock_freq": clock,
"numpoints": len(wf),
}
return outdict
[docs]
def get_filenames(self) -> str:
    """
    Query the instrument with ``MMEMory:CATalog?`` and return the reply.

    Described by the original documentation as a duplicate of
    self.get_folder_contents.
    """
    catalog = self.ask("MMEMory:CATalog?")
    return catalog
[docs]
def send_DC_pulse(
    self, DC_channel_number: int, set_level: float, length: float
) -> None:
    """
    Sets the DC level on the specified channel, waits a while and then
    resets it to what it was before.

    The previous level is restored even if the wait is interrupted or
    fails (e.g. by a KeyboardInterrupt).

    Note: Make sure that the output DC state is ON.

    Args:
        DC_channel_number (int): The channel number (1-4).
        set_level (float): The voltage level to set to (V).
        length (float): The time to wait before resetting (s).

    Raises:
        IndexError: if DC_channel_number is not one of the available
            channels. No instrument setting is touched in that case.
    """
    chandcs = (self.ch1_DC_out, self.ch2_DC_out, self.ch3_DC_out, self.ch4_DC_out)

    # Check the range explicitly: plain list indexing would silently map
    # channel 0 and negative numbers onto other channels.
    if not 1 <= DC_channel_number <= len(chandcs):
        raise IndexError(
            f"DC_channel_number must be between 1 and {len(chandcs)}, "
            f"got {DC_channel_number}"
        )

    dc_out = chandcs[DC_channel_number - 1]
    restore = dc_out.get()
    dc_out.set(set_level)
    try:
        sleep(length)
    finally:
        # Never leave the pulse level applied, whatever happened meanwhile.
        dc_out.set(restore)
[docs]
def is_awg_ready(self) -> bool:
    """
    Wait for the AWG to answer an ``*OPC?`` query.

    If the query raises, the exception is logged as a warning and a
    second read is issued on the VISA handle. An exception raised by that
    second read is not caught.

    Returns:
        True, irrespective of anything.
    """
    try:
        self.ask("*OPC?")
    except Exception as err:
        log.warning(err)
        log.warning("AWG is not ready")
        # makes the awg read again if there is a timeout
        self.visa_handle.read()
    return True
[docs]
def clear_message_queue(self, verbose: bool = False) -> None:
    """
    Function to clear up (flush) the VISA message queue of the AWG
    instrument. Reads all messages in the queue.

    The VISA timeout is lowered to one second while reading and messages
    are read until a read raises ``VisaIOError``. The original timeout is
    always restored afterwards, also when an unexpected exception
    interrupts the loop.

    Args:
        verbose: If True, the read messages are printed.
            Default: False.
    """
    handle = self.visa_handle
    original_timeout = handle.timeout
    handle.timeout = 1000  # 1 second as VISA counts in ms
    try:
        while True:
            try:
                message = handle.read()
            except VisaIOError:
                # Taken as the sign that nothing is left in the queue
                # (presumably the read timed out).
                break
            if verbose:
                print(message)
    finally:
        handle.timeout = original_timeout
class Tektronix_AWG5014(TektronixAWG5014):
    """
    Alias of ``TektronixAWG5014`` under its old, non-conformant name.

    Left for backwards compatibility; it adds nothing to the parent class.
    """