"""Ethernet instrument driver class based on sockets."""from__future__importannotationsimportloggingimportsocketfromtypingimportTYPE_CHECKING,Anyfrom.instrumentimportInstrumentifTYPE_CHECKING:fromcollections.abcimportSequencefromtypesimportTracebackTypefromtyping_extensionsimportUnpackfrom.instrument_baseimportInstrumentBaseKWArgslog=logging.getLogger(__name__)
class IPInstrument(Instrument):
    r"""
    Bare socket ethernet instrument implementation.

    Use of `VisaInstrument` is promoted instead of this.

    Args:
        name: What this instrument is called locally.
        address: The IP address or name. If not given on construction, must
            be provided (via :meth:`set_address`) before any communication.
        port: The IP port. If not given on construction, must
            be provided (via :meth:`set_address`) before any communication.
        timeout: Seconds to allow for responses. Default 5.
        terminator: Character(s) to terminate each send. Default '\n'.
        persistent: Whether to leave the socket open between calls.
            Default True.
        write_confirmation: Whether the instrument acknowledges writes with
            some response we should read. Default True.
        **kwargs: Forwarded to the base class.

    See help for ``qcodes.Instrument`` for additional information on writing
    instrument subclasses.

    """

    def __init__(
        self,
        name: str,
        address: str | None = None,
        port: int | None = None,
        timeout: float = 5,
        terminator: str = "\n",
        persistent: bool = True,
        write_confirmation: bool = True,
        **kwargs: Unpack[InstrumentBaseKWArgs],
    ):
        super().__init__(name, **kwargs)

        self._address = address
        self._port = port
        self._timeout = timeout
        self._terminator = terminator
        self._confirmation = write_confirmation

        self._ensure_connection = EnsureConnection(self)
        # Maximum number of bytes read by a single ``recv`` call.
        self._buffer_size = 1400

        self._socket: socket.socket | None = None

        if address is None or port is None:
            # The endpoint is not fully known yet, so there is nothing to
            # connect to. Only remember the requested mode; ``set_address``
            # re-applies it (and connects if persistent) once the endpoint
            # has been supplied.
            self._persistent = persistent
        else:
            self.set_persistent(persistent)

    def set_address(self, address: str | None = None, port: int | None = None) -> None:
        """
        Change the IP address and/or port of this instrument.

        Any open socket is closed, and a new one is opened right away if the
        instrument is persistent.

        Args:
            address: The IP address or name. May be omitted only if an
                address has already been set.
            port: The IP port. May be omitted only if a port has already
                been set.

        Raises:
            TypeError: If ``address`` (or ``port``) is omitted and the
                instrument does not have one yet.

        """
        if address is not None:
            self._address = address
        elif getattr(self, "_address", None) is None:
            # ``__init__`` always creates the attribute (possibly as None),
            # so the value has to be checked, not just its existence.
            raise TypeError(
                "This instrument doesn't have an address yet, you must provide one."
            )

        if port is not None:
            self._port = port
        elif getattr(self, "_port", None) is None:
            raise TypeError(
                "This instrument doesn't have a port yet, you must provide one."
            )

        self._disconnect()
        self.set_persistent(self._persistent)

    def set_persistent(self, persistent: bool) -> None:
        """
        Change whether this instrument keeps its socket open between calls.

        Args:
            persistent: Set True to keep the socket open all the time.

        """
        self._persistent = persistent
        if persistent:
            self._connect()
        else:
            self._disconnect()

    def _connect(self) -> None:
        """
        Open a fresh TCP socket to the configured address and port.

        An already open socket is closed first. If anything goes wrong while
        connecting, the half-open socket is closed, ``self._socket`` is reset
        to ``None`` and the original exception is re-raised.

        Raises:
            TypeError: If the address or port has not been set yet.

        """
        if self._socket is not None:
            self._disconnect()

        if self._address is None or self._port is None:
            raise TypeError(
                "This instrument doesn't have an address and port yet, "
                "you must provide them with set_address() before connecting."
            )

        try:
            log.info("Opening socket")
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            log.info("Connecting socket to %s:%s", self._address, self._port)
            self._socket.connect((self._address, self._port))
            self.set_timeout(self._timeout)
        except Exception:
            # Cleanup only, the error is always re-raised. Catching just
            # ConnectionRefusedError would leak the socket on timeouts,
            # name-resolution failures, unreachable hosts, bad arguments...
            log.warning("Socket connection failed")
            if self._socket is not None:
                self._socket.close()
                self._socket = None
            raise

    def _disconnect(self) -> None:
        """
        Shut down and close the socket, if there is one.

        Afterwards ``self._socket`` is always ``None``, even if the peer had
        already dropped the connection.
        """
        sock = self._socket
        if sock is None:
            return

        # Forget the socket up front so the instrument never keeps a
        # reference to a dead socket, whatever happens below.
        self._socket = None
        try:
            log.info("Socket shutdown")
            sock.shutdown(socket.SHUT_RDWR)
        except OSError as err:
            # shutdown() fails (e.g. ENOTCONN) when the connection is already
            # gone; that must not prevent releasing the file descriptor.
            log.debug("Socket shutdown failed (%s), closing anyway", err)
        finally:
            log.info("Socket closing")
            sock.close()
            log.info("Socket closed")

    def set_timeout(self, timeout: float) -> None:
        """
        Change the read timeout for the socket.

        The value is remembered and also applied to sockets opened later.

        Args:
            timeout: Seconds to allow for responses.

        """
        self._timeout = timeout

        if self._socket is not None:
            self._socket.settimeout(float(self._timeout))

    def set_terminator(self, terminator: str) -> None:
        r"""
        Change the write terminator to use.

        Args:
            terminator: Character(s) to terminate each send. Default '\n'.

        """
        self._terminator = terminator

    def _send(self, cmd: str) -> None:
        """
        Send ``cmd`` followed by the write terminator over the open socket.

        Raises:
            RuntimeError: If the socket is not connected.

        """
        if self._socket is None:
            raise RuntimeError(f"IPInstrument {self.name} is not connected")
        data = cmd + self._terminator
        log.debug("Writing %s to instrument %s", data, self.name)
        self._socket.sendall(data.encode())

    def _recv(self) -> str:
        """
        Read one chunk (at most ``self._buffer_size`` bytes) and decode it.

        An empty chunk is logged as a warning and returned as an empty
        string.

        Raises:
            RuntimeError: If the socket is not connected.

        """
        if self._socket is None:
            raise RuntimeError(f"IPInstrument {self.name} is not connected")
        result = self._socket.recv(self._buffer_size)
        log.debug("Got %r from instrument %s", result, self.name)
        if result == b"":
            log.warning("Got empty response from Socket recv() Connection broken.")
        return result.decode()

    def close(self) -> None:
        """Disconnect and irreversibly tear down the instrument."""
        self._disconnect()
        super().close()

    def write_raw(self, cmd: str) -> None:
        """
        Low-level interface to send a command that gets no response.

        If write confirmation is enabled, the acknowledgement sent by the
        instrument is read and discarded.

        Args:
            cmd: The command to send to the instrument.

        """
        with self._ensure_connection:
            self._send(cmd)
            if self._confirmation:
                self._recv()

    def ask_raw(self, cmd: str) -> str:
        """
        Low-level interface to send a command and read a response.

        Args:
            cmd: The command to send to the instrument.

        Returns:
            The instrument's string response.

        """
        with self._ensure_connection:
            self._send(cmd)
            return self._recv()

    def snapshot_base(
        self,
        update: bool | None = False,
        params_to_skip_update: Sequence[str] | None = None,
    ) -> dict[Any, Any]:
        """
        State of the instrument as a JSON-compatible dict (everything that
        the custom JSON encoder class :class:`.NumpyJSONEncoder`
        supports).

        Args:
            update: If True, update the state by querying the
                instrument. If None only update if the state is known to be
                invalid. If False, just use the latest values in memory and
                never update.
            params_to_skip_update: List of parameter names that will be
                skipped in update even if update is True. This is useful if
                you have parameters that are slow to update but can be
                updated in a different way (as in the qdac). If you want to
                skip the update of certain parameters in all snapshots, use
                the ``snapshot_get`` attribute of those parameters instead.

        Returns:
            dict: base snapshot, extended with the connection settings
            (port, confirmation, address, terminator, timeout, persistent).

        """
        snap = super().snapshot_base(
            update=update, params_to_skip_update=params_to_skip_update
        )

        snap["port"] = self._port
        snap["confirmation"] = self._confirmation
        snap["address"] = self._address
        snap["terminator"] = self._terminator
        snap["timeout"] = self._timeout
        snap["persistent"] = self._persistent

        return snap
class EnsureConnection:
    """
    Context manager that guarantees an open connection inside its block.

    The instrument's ``_persistent`` flag decides what happens on exit: a
    non-persistent instrument is disconnected again as soon as the block
    finishes, a persistent one keeps its socket.

    Args:
        instrument: the instance to connect.

    """

    def __init__(self, instrument: IPInstrument):
        self.instrument = instrument

    def __enter__(self) -> None:
        """Connect on entry unless a persistent socket is already open."""
        inst = self.instrument
        if inst._persistent and inst._socket is not None:
            # Persistent instrument with a live socket: reuse it.
            return
        inst._connect()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Disconnect on exit when the instrument is not persistent."""
        inst = self.instrument
        if inst._persistent:
            return
        inst._disconnect()