Source code for qcodes.instrument_drivers.Minicircuits.USBHIDMixin
"""A mixin module for USB Human Interface Device instruments"""

import os
import struct
import time
from typing import TYPE_CHECKING

from typing_extensions import deprecated

from qcodes.utils import QCoDeSDeprecationWarning

try:
    from pywinusb import hid  # pyright: ignore[reportMissingModuleSource]

    imported_hid = True
except ImportError:
    # We will raise a proper error when we attempt to instantiate a driver.
    # Raising an exception here will cause CI to fail under Linux
    imported_hid = False

from qcodes.instrument.base import Instrument

if TYPE_CHECKING:
    from typing_extensions import Unpack

    from qcodes.instrument import InstrumentBaseKWArgs


@deprecated(
    "USBHIDMixin is deprecated. This is unused in QCoDeS",
    category=QCoDeSDeprecationWarning,
)
class USBHIDMixin(Instrument):
    """
    Base class for instruments that are driven through a USB HID connection.

    Commands are converted to bytes by `_pack_string` and replies are
    converted back to strings by `_unpack_string`; both raise
    ``NotImplementedError`` here and have to be provided by subclasses.
    """

    # The following class attributes should be set by subclasses
    vendor_id = 0x0000
    product_id = 0x0000

    @staticmethod
    def _check_hid_import() -> None:
        """
        Verify that the HID backend can be used.

        Raises:
            ImportError: If the operating system is not Windows or if
                ``pywinusb`` could not be imported.
        """
        if os.name != "nt":
            raise ImportError("This driver only works on Windows.")

        if imported_hid is False:
            raise ImportError(
                "pywinusb is not installed. Please install it by typing "
                "'pip install pywinusb' in a qcodes environment terminal"
            )

    def __init__(
        self,
        name: str,
        instance_id: str | None = None,
        timeout: float = 2,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ):
        """
        Args:
            name: Name of the instrument.
            instance_id: The id of the instrument we want to connect to. If
                there is only one instrument, then this argument is optional.
                If more than one instrument happen to be connected, use
                `enumerate_devices` method to query their IDs
            timeout: Specify a timeout for this instrument in seconds
            **kwargs: Forwarded to base class.

        Raises:
            ImportError: If the HID backend is not usable on this system.
            RuntimeError: If no device, or more than one device, matches.
        """
        self._check_hid_import()

        devs = hid.HidDeviceFilter(  # pyright: ignore[reportPossiblyUnboundVariable]
            product_id=self.product_id,
            vendor_id=self.vendor_id,
            instance_id=instance_id,
        ).get_devices()

        if len(devs) == 0:
            raise RuntimeError("No instruments found!")
        elif len(devs) > 1:
            raise RuntimeError(
                "Multiple HID devices detected! Please supply a instance id"
            )

        self._device = devs[0]
        self._device.open()

        # Filled in by `_handler` when the device delivers data and emptied
        # again by `_get_data_buffer`.
        self._data_buffer: bytes | None = None
        self._device.set_raw_data_handler(self._handler)

        self._timeout = timeout
        # Polling rate used by `ask_raw` while waiting for a reply.
        self._tries_per_second = 5

        super().__init__(name, **kwargs)

    def _handler(self, data: bytes) -> None:
        """Store the most recent raw data handed over by the HID device."""
        self._data_buffer = data

    def _get_data_buffer(self) -> bytes | None:
        """Return the buffered data (or None) and reset the buffer to None."""
        data = self._data_buffer
        self._data_buffer = None
        return data

    def _pack_string(self, cmd: str) -> bytes:
        """Convert a command string to the bytes sent to the device."""
        raise NotImplementedError("Please subclass")

    def _unpack_string(self, response: bytes) -> str:
        """Convert the bytes received from the device to a string."""
        raise NotImplementedError("Please subclass")

    def write_raw(self, cmd: str) -> None:
        """
        Send a string command to the human interface device

        The given command is processed by `_pack_string` method to return a
        byte sequence that is going to be actually sent to the device.
        Subclasses must implement `_pack_string` method.

        Args:
            cmd: a command to send in a form of a string

        Raises:
            RuntimeError: If sending the output report is reported as failed.
        """
        data = self._pack_string(cmd)

        result = self._device.send_output_report(data)
        if not result:
            raise RuntimeError(f"Communication with device failed for command {cmd}")

    def ask_raw(self, cmd: str) -> str:
        """
        Send a string command to the human interface device and wait for a reply

        The given command is processed by `_pack_string` method to return a
        byte sequence that is going to be actually sent to the device.
        Subclasses must implement `_pack_string` method.

        The byte sequence of the reply is processed by `_unpack_string`
        method, and the resulting string is returned. Subclasses must
        implement `_unpack_string` method.

        Args:
            cmd: a command to send in a form of a string

        Returns:
            The reply as decoded by `_unpack_string`.

        Raises:
            TimeoutError: If no data arrived within the configured timeout.
        """
        self.write_raw(cmd)

        # The reply is delivered asynchronously through `_handler`, so poll
        # the buffer at a fixed rate until the timeout budget is used up.
        number_of_tries = int(self._tries_per_second * self._timeout)

        response = None
        for _ in range(number_of_tries):
            time.sleep(1 / self._tries_per_second)
            response = self._get_data_buffer()
            if response is not None:
                break

        if response is None:
            raise TimeoutError(f"Timed out for command {cmd}")

        return self._unpack_string(response)

    def close(self) -> None:
        """
        Close the HID device and then run the base class ``close``.

        The device is looked up defensively so that calling ``close`` again
        after the base class has already torn the instance down does not
        fail on a missing attribute.
        """
        device = getattr(self, "_device", None)
        if device is not None:
            device.close()
        # Without this the base-class teardown would never run.
        super().close()

    @classmethod
    def enumerate_devices(cls) -> list[str]:
        """
        This method returns the 'instance_id's of all connected devices
        with the given product and vendor IDs.
        """
        cls._check_hid_import()

        devs = hid.HidDeviceFilter(  # pyright: ignore[reportPossiblyUnboundVariable]
            product_id=cls.product_id, vendor_id=cls.vendor_id
        ).get_devices()

        return [dev.instance_id for dev in devs]
class MiniCircuitsHIDMixin(Instrument):
    """
    Mixin for MiniCircuits instruments that are driven over USB HID.

    Commands are packed into fixed-size packets by `_pack_string` and
    replies are decoded by `_unpack_string`.
    """

    # The following class attributes should be set by subclasses
    vendor_id = 0x0000
    product_id = 0x0000

    @staticmethod
    def _check_hid_import() -> None:
        """
        Verify that the HID backend can be used.

        Raises:
            ImportError: If the operating system is not Windows or if
                ``pywinusb`` could not be imported.
        """
        if os.name != "nt":
            raise ImportError("This driver only works on Windows.")

        if imported_hid is False:
            raise ImportError(
                "pywinusb is not installed. Please install it by typing "
                "'pip install pywinusb' in a qcodes environment terminal"
            )

    def __init__(
        self,
        name: str,
        instance_id: str | None = None,
        timeout: float = 2,
        **kwargs: "Unpack[InstrumentBaseKWArgs]",
    ):
        """
        The specific implementation for mini circuit human interface devices.

        This implementation allows to use `write`/`ask` methods of the
        instrument instance to send SCPI commands to MiniCircuits instruments
        over USB HID connection.

        Args:
            name: instrument name
            instance_id: The id of the instrument we want to connect. If
                there is only one instrument then this is an optional
                argument. If we have more then one instrument, use the class
                method `enumerate_devices` to query their IDs
            timeout: Specify a timeout for this instrument in seconds
            **kwargs: Forwarded to base class.

        Raises:
            ImportError: If the HID backend is not usable on this system.
            RuntimeError: If no device, or more than one device, matches.
        """
        self._check_hid_import()

        # USB interrupt code for sending SCPI commands
        self._sending_scpi_cmds_code = 1
        self._usb_endpoint = 0
        self._end_of_message = b"\x00"
        self.packet_size = 64

        devs = hid.HidDeviceFilter(  # pyright: ignore[reportPossiblyUnboundVariable]
            product_id=self.product_id,
            vendor_id=self.vendor_id,
            instance_id=instance_id,
        ).get_devices()

        if len(devs) == 0:
            raise RuntimeError("No instruments found!")
        elif len(devs) > 1:
            raise RuntimeError(
                "Multiple HID devices detected! Please supply a instance id"
            )

        self._device = devs[0]
        self._device.open()

        # Filled in by `_handler` when the device delivers data and emptied
        # again by `_get_data_buffer`.
        self._data_buffer: bytes | None = None
        self._device.set_raw_data_handler(self._handler)

        self._timeout = timeout
        # Polling rate used by `ask_raw` while waiting for a reply.
        self._tries_per_second = 5

        super().__init__(name, **kwargs)

    def _handler(self, data: bytes) -> None:
        """Store the most recent raw data handed over by the HID device."""
        self._data_buffer = data

    def _get_data_buffer(self) -> bytes | None:
        """Return the buffered data (or None) and reset the buffer to None."""
        data = self._data_buffer
        self._data_buffer = None
        return data

    def write_raw(self, cmd: str) -> None:
        """
        Send a string command to the human interface device

        The given command is processed by `_pack_string` method to return a
        byte sequence that is going to be actually sent to the device.

        Args:
            cmd: a command to send in a form of a string

        Raises:
            RuntimeError: If sending the output report is reported as failed.
            ValueError: If the command does not fit into one packet.
        """
        data = self._pack_string(cmd)

        result = self._device.send_output_report(data)
        if not result:
            raise RuntimeError(f"Communication with device failed for command {cmd}")

    def ask_raw(self, cmd: str) -> str:
        """
        Send a string command to the human interface device and wait for a reply

        The given command is processed by `_pack_string` method to return a
        byte sequence that is going to be actually sent to the device.

        The byte sequence of the reply is processed by `_unpack_string`
        method, and the resulting string is returned.

        Args:
            cmd: a command to send in a form of a string

        Returns:
            The reply as decoded by `_unpack_string`.

        Raises:
            TimeoutError: If no data arrived within the configured timeout.
        """
        self.write_raw(cmd)

        # The reply is delivered asynchronously through `_handler`, so poll
        # the buffer at a fixed rate until the timeout budget is used up.
        number_of_tries = int(self._tries_per_second * self._timeout)

        response = None
        for _ in range(number_of_tries):
            time.sleep(1 / self._tries_per_second)
            response = self._get_data_buffer()
            if response is not None:
                break

        if response is None:
            raise TimeoutError(f"Timed out for command {cmd}")

        return self._unpack_string(response)

    def close(self) -> None:
        """
        Close the HID device opened in ``__init__`` and then run the base
        class ``close``.

        The device is looked up defensively so that calling ``close`` on an
        instance that no longer has (or never got) a device does not fail
        on a missing attribute.
        """
        device = getattr(self, "_device", None)
        if device is not None:
            device.close()
        super().close()

    @classmethod
    def enumerate_devices(cls) -> list[str]:
        """
        This method returns the 'instance_id's of all connected devices
        with the given product and vendor IDs.
        """
        cls._check_hid_import()

        devs = hid.HidDeviceFilter(  # pyright: ignore[reportPossiblyUnboundVariable]
            product_id=cls.product_id, vendor_id=cls.vendor_id
        ).get_devices()

        return [dev.instance_id for dev in devs]

    def _pack_string(self, cmd: str) -> bytes:
        """
        Pack a string to a binary format such that it can be sent to the HID.

        Args:
            cmd: a SCPI command to send

        Returns:
            The packed bytes: the two header bytes, the ASCII encoded
            command and zero padding.

        Raises:
            ValueError: If the command is too long for one packet.
        """
        str_len = len(cmd)

        # "-1" is here because we need to compensate for the first byte in
        # the packet which is always the usb interrupt code of the command
        # (in this case the command tell the device that we are querying a
        # SCPI command)
        pad_len = self.packet_size - str_len - 1

        if pad_len < 0:
            raise ValueError(f"Length of data exceeds {self.packet_size} B")

        packed_data = struct.pack(
            f"BB{str_len}s{pad_len}x",
            self._usb_endpoint,
            self._sending_scpi_cmds_code,
            cmd.encode("ascii"),
        )

        return packed_data

    def _unpack_string(self, response: bytes) -> str:
        """
        Unpack data received from the instrument into a string

        Note that this method is not specific to SCPI-only responses.

        Args:
            response: a raw byte sequence response from the instrument

        Returns:
            The payload decoded as ASCII, cut at the first end-of-message
            byte. If the payload contains no end-of-message byte, the whole
            payload is decoded.
        """
        # The first two bytes are discarded; the rest is the payload.
        _, _, reply_data = struct.unpack(
            f"BB{self.packet_size - 1}s", bytes(response)
        )
        span = reply_data.find(self._end_of_message)
        if span < 0:
            # `find` returns -1 when there is no terminator; slicing with -1
            # would silently drop the last payload byte.
            span = len(reply_data)
        return reply_data[:span].decode("ascii")