rfgen.py
This document describes an app for controlling the simple RF generator reference design. When executed, the script displays a GUI window which allows the generator reference design to be controlled via its USB connection.
1 The Script
Generate the script using C-c C-v t
.
2 The main
Function
The asyncio
framework is used to coordinate communications over the
serial device without blocking user interface events. For this
purpose we make use of QEventLoop
and QThreadExecutor
from the
qasync package.
The event loop is equipped with an instance of QThreadExecutor
which
executes a single thread. The idea here is to ensure that requests to
the ECApp serial device will execute one by one in a manner which will
not block the operation of the Qt event loop.
def main(): global server_thread <<process-cmdline-args>> # This ensures that Cntl-C will work as expected: signal.signal(signal.SIGINT, signal.SIG_DFL) if args.nogui is True: app = QCoreApplication(sys.argv) else: app = QApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) rfgen_app = RFGenApp( serial_device = args.device, baudrate = args.baudrate, no_cal_data = args.no_cal_data, headless = args.nogui) if not args.nogui: rfgen_app.show() server_thread = QThread() server = RPyCServer(RFGenService(rfgen_app), args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() with loop: loop.set_debug(True) sys.exit(loop.run_forever()) #sys.exit(app.exec_())
2.1 process-cmdline-args
Command line arguments also allow the specification of a serial device and baud rate for the purposes of testing.
# Command line processing for the RF generator app.  This fragment is
# expanded inside ``main`` and leaves the parsed options in ``args``.

# A baud rate of 0 makes RFGenApp fall back to DEFAULT_BAUDRATE.
defaultBaud = 0

parser = ArgumentParser(description=
                        '''A simple RF signal generator.''')

parser.add_argument("--nogui", action='store_true',
                    help="Disable GUI and run 'headless'")
parser.add_argument("--no_cal_data", action='store_true',
                    help="No output power calibration data should be used")
parser.add_argument("-d", "--device", default=None,
                    help="The serial device for testing")
parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int,
                    help="Baud rate (default: {}, which selects the "
                    "built-in default baud rate)".format(defaultBaud))
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address to bind the RPyC server instance to")
parser.add_argument("-P", "--port", default=18864, type=int,
                    help="TCP port for the RPyC server instance")
args = parser.parse_args()
3 app-class
class RFGenApp(QObject): <<hardware-config>> def __init__(self, config: Dict = DEFAULT_APP_CONFIG, serial_device: Optional[str] = None, baudrate: int = 0, no_cal_data: bool = False, headless: bool = False) -> None: super().__init__() self._nogui: bool = headless if self._nogui is False: self._widget = QWidget() self._ctl_device: str = serial_device self._baudrate: int = baudrate if baudrate == 0: self._baudrate = DEFAULT_BAUDRATE self._synths: List[hmc833] = [ hmc833(**hw_conf) for hw_conf in RFGenApp.RF_SYNTHS_HWCONF ] self._stepattens: List[pe43711] = [ pe43711(**hw_conf) for hw_conf in RFGenApp.STEPATTENS_HWCONF ] self._detectors: List[LogDetector] = [ LogDetector(**hw_conf) for hw_conf in RFGenApp.DETECTORS_HWCONF ] d: Dict = config['channels'] z = zip(d.keys(), self._synths, d.values()) self._synth_controllers = { ctl_id: HMC833Controller(ctl_id, synth, ctl_config) for ctl_id, synth, ctl_config in z } z = zip(d.keys(), self._stepattens) self._atten_controllers = { ctl_id: PE43711Controller(ctl_id, atten) for ctl_id, atten in z } z = zip(d.keys(), self._detectors) self._det_controllers = { ctl_id: PwrDetectorController(ctl_id, det) for ctl_id, det in z } if no_cal_data is True: self._channels: Dict = { chan_id: PLOChan(self, chan_id, self._synth_controllers[chan_id], self._atten_controllers[chan_id], self._det_controllers[chan_id]) for chan_id in d.keys()} else: self._channels: Dict = { chan_id: PLOChan(self, chan_id, self._synth_controllers[chan_id], self._atten_controllers[chan_id], self._det_controllers[chan_id], channel_cal_data[chan_id], max_linear_gains) for chan_id in d.keys()} if not headless: self.build_ui() <<build-app-ui>> <<initialize>> <<app-configuration>> @property def channels(self): return self._channels @property def ctl_device(self) -> str: return self._ctl_device @property def baudrate(self) -> int: return self._baudrate @property def serial_ports(self) -> List: ports = [DEFAULT_SOCKET_URL] ports.extend([port.device for port in comports() if 
port.serial_number]) if self.ctl_device: if self.ctl_device in ports: ports.pop(ports.index(self.ctl_device)) ports.insert(0, self.ctl_device) return ports def tty_changed(self, combo: QComboBox, idx: int) -> None: self._ctl_device = combo.itemText(idx) def disable_rfgen(self) -> None: if not self._nogui: for chan in self._channels.values(): chan.enabled = False def enable_rfgen(self) -> None: if not self._nogui: for chan in self._channels.values(): chan.enabled = True
3.1 hardware-config
# Class level hardware and default configuration constants for RFGenApp.

# Channel ids.  These are used as the keys of the channel dictionaries
# and as the channel labels.
RFCH1_NAME: str = 'Chan 1'
RFCH2_NAME: str = 'Chan 2'

# Keyword arguments passed to the hmc833 constructor for each channel.
# NOTE(review): the values look like controller pin identifiers - confirm
# against the rfblocks hmc833 documentation.
HMC833_CH1_HWCONF: Dict = { 'sen': 'D1', 'ld_sdo': 'C4'}
HMC833_CH2_HWCONF: Dict = { 'sen': 'D6', 'ld_sdo': 'D7'}

# Keyword arguments passed to the pe43711 constructor for each channel.
STEPATTEN_CH1_HWCONF: Dict = { 'le': 'D0'}
STEPATTEN_CH2_HWCONF: Dict = { 'le': 'B0'}

# Keyword arguments passed to the LogDetector constructor for each channel.
DETECTOR_CH1_HWCONF: Dict = { 'cs': 'D5'}
DETECTOR_CH2_HWCONF: Dict = { 'cs': 'D4'}

# Per-device configuration lists.  The list order must match the order
# of the channels in the application configuration since the two are
# zipped together when the controllers are created.
RF_SYNTHS_HWCONF: List = [ HMC833_CH1_HWCONF, HMC833_CH2_HWCONF ]
STEPATTENS_HWCONF: List = [ STEPATTEN_CH1_HWCONF, STEPATTEN_CH2_HWCONF ]
DETECTORS_HWCONF: List = [ DETECTOR_CH1_HWCONF, DETECTOR_CH2_HWCONF ]

# Default synthesizer settings.
# NOTE(review): frequencies are presumably in MHz - confirm.
DEFAULT_PLO_FREQ: float = 1000.0
DEFAULT_REF_FREQ: float = 50.0

# Default configuration shared by both channels.
DEFAULT_SYNTH_CONFIG: Dict = {
    'freq': DEFAULT_PLO_FREQ,
    'ref_freq': DEFAULT_REF_FREQ,
    'refsrc': hmc833.DEFAULT_REFSRC,
    'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
    'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
    'vco_mute': False
}

# Default application configuration: one entry per channel, keyed using
# the channel id.
DEFAULT_APP_CONFIG: Dict = {
    'channels': {
        RFCH1_NAME: { 'label': RFCH1_NAME, **DEFAULT_SYNTH_CONFIG },
        RFCH2_NAME: { 'label': RFCH2_NAME, **DEFAULT_SYNTH_CONFIG }
    }
}
3.2 initialize
@asyncSlot()
async def initialize(self):
    """Initialize the channel UIs and hardware, then configure each channel.

    The hardware initialization runs in the event loop's default
    executor so that the Qt event loop is not blocked while the serial
    device is being accessed.  A serial error is reported to the user in
    an error dialog.  The channel UIs are enabled afterwards whether or
    not the initialization succeeded.
    """
    try:
        self.initialize_ui()
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_hw)
        for chan in self._channels.values():
            chan.configure()
            # NOTE(review): presumably settling time for the channel
            # hardware.  This blocks the event loop thread.
            sleep(1.0)
    except serial.serialutil.SerialException as se:
        # The dialog parent must be a QWidget.  RFGenApp is a plain
        # QObject so the app's top level widget is used instead.
        error_dialog = QErrorMessage(self._widget)
        error_dialog.showMessage(str(se))
    finally:
        self.enable_rfgen()

def initialize_ui(self) -> None:
    """Initialize the on-screen UI of every channel."""
    for chan in self._channels.values():
        chan.initialize_ui()

def initialize_hw(self) -> None:
    """Initialize the hardware of every channel.

    A single serial connection to the control device is opened and
    shared by all channels.

    :raises: serial.serialutil.SerialException if the control device
        cannot be accessed.
    """
    with create_serial(self.ctl_device, self.baudrate) as ser:
        sleep(0.1)
        for chan in self._channels.values():
            chan.initialize_hw(ser)
            sleep(0.1)
3.3 app-configuration
def save_config(self) -> None:
    """Prompt for a file name and save the app configuration as JSON.

    Nothing is written if the user cancels the file dialog.
    """
    config = self.dump_config()
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    # The dialog parent must be a QWidget.  RFGenApp is a plain QObject
    # so the app's top level widget is used instead.
    name, _ = QFileDialog.getSaveFileName(
        self._widget, 'Save configuration', def_filepath, filter_str)
    output_file = str(name).strip()
    if not output_file:
        return
    with open(output_file, 'w') as fd:
        json.dump(config, fd)

def open_config(self) -> None:
    """Prompt for a JSON configuration file and load it.

    Only the first selected file is used.  Nothing happens if the user
    cancels the file dialog.
    """
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    names, _ = QFileDialog.getOpenFileNames(
        self._widget, 'Load Configuration', def_filepath, filter_str)
    if not names:
        return
    with open(names[0], 'r') as fd:
        config = json.load(fd)
    self.load_config(config)

def dump_config(self) -> Dict:
    """Return the current app configuration.

    :return: A dictionary with a ``'channels'`` and an ``'attenuators'``
        entry, each keyed using the channel id.
    """
    # NOTE(review): the PLOChan class shown in this document defines no
    # dump_config method - confirm that this call can succeed.
    chan_config = {
        chan_id: {'label': chan_id, **chan.dump_config()}
        for chan_id, chan in self._channels.items()}
    # NOTE(review): self._chan_attenuators is not assigned anywhere in
    # the RFGenApp code shown here (and load_config uses the different
    # name self._chan_attens) - verify which attribute is intended.
    atten_config = {
        chan_id: {**atten.dump_config()}
        for chan_id, atten in self._chan_attenuators.items()}
    return {
        'channels': chan_config,
        'attenuators': atten_config
    }

def load_config(self, config: Dict) -> None:
    """Apply a configuration produced by :meth:`dump_config`.

    :param config: The configuration dictionary.
    :raises: KeyError if a channel is missing from ``config``.
    """
    # NOTE(review): the PLOChan class shown in this document defines
    # neither load_config nor configure_ui - confirm these calls.
    for chan_id, chan in self._channels.items():
        chan_config = config['channels'][chan_id]
        chan.load_config(chan_config)
        chan.configure_ui(chan_config)
    # NOTE(review): self._chan_attens is not assigned anywhere in the
    # RFGenApp code shown here (and dump_config uses the different name
    # self._chan_attenuators) - verify which attribute is intended.
    for chan_id, atten in self._chan_attens.items():
        atten_config = config['attenuators'][chan_id]
        atten.configure_ui(atten_config)
3.4 build-app-ui
def _build_control_row(self):
    """Return the top row layout: port selector, calibration status
    label and the Configuration/Initialize buttons."""
    row = QHBoxLayout()

    port_combo = QComboBox()

    def on_port_selected(idx, combo=port_combo):
        self.tty_changed(combo, idx)

    # The handler is connected before the items are added so that
    # populating the combo box already reports the first entry.
    port_combo.currentIndexChanged.connect(on_port_selected)
    port_combo.addItems(self.serial_ports)
    port_combo.setLineEdit(QLineEdit())
    row.addWidget(QLabel("Control Port:"))
    row.addWidget(port_combo)
    row.addStretch(1)

    # Status label, set to 'Calibrating' while in calibration mode.
    self.cal_lbl = QLabel('')
    self.cal_lbl.setStyleSheet('QLabel { color: "red" }')
    row.addWidget(self.cal_lbl)
    row.addStretch(1)

    config_btn = QPushButton("Configuration")
    menu = QMenu()
    menu.addAction('Save', self.save_config)
    menu.addAction('Load', self.open_config)
    config_btn.setMenu(menu)
    row.addWidget(config_btn)

    init_btn = QPushButton("Initialize")
    init_btn.clicked.connect(self.initialize)
    row.addWidget(init_btn)
    return row

def _build_channel_row(self):
    """Return the row layout holding the two channel group boxes."""
    row = QHBoxLayout()
    for chan_name in (RFGenApp.RFCH1_NAME, RFGenApp.RFCH2_NAME):
        row.addWidget(self._channels[chan_name].create_channel_group())
    return row

def build_ui(self):
    """Build the on-screen UI for the RF generator app.

    The channel controls start out disabled.
    """
    outer = QVBoxLayout()
    outer.addLayout(self._build_control_row())
    outer.addLayout(self._build_channel_row())
    self._widget.setLayout(outer)
    self.disable_rfgen()
    self._widget.setWindowTitle('Dual RF Generator')

def show(self):
    """Show the application window."""
    self._widget.show()
4 class PLOChan
from typing import (
    List, Dict, Optional
)
import sys
from time import sleep
from argparse import ArgumentParser
import asyncio

import serial
from serial.tools.list_ports import comports
from scipy import interpolate

from rfblocks import (
    hmc833, HMC833Controller, pe43711, PE43711Controller,
    LogDetector, PwrDetectorController, BridgeCoupler,
    create_serial, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)
from qtrfblocks import (
    HMC833Module, StepAttenuator
)
from PyQt5.QtWidgets import (
    QWidget, QGroupBox, QHBoxLayout, QVBoxLayout, QPushButton,
    QLabel, QFormLayout, QCheckBox
)
from PyQt5.QtCore import (
    Qt, QObject, QThread, pyqtSignal, pyqtSlot
)


class CalibrationDataException(Exception):
    """Raised when no calibration data is available.
    """
    pass


class PLOChan(QObject):
    """A signal generator channel.

    A channel combines an HMC833 PLO controller, a PE43711 step
    attenuator controller and an optional power detector controller.
    When calibration data is supplied, the channel output level may be
    set in dBm and the PLO output buffer gain and the attenuation are
    derived from the requested level.
    """

    # Minimum values applied to the frequency and level entry boxes.
    MIN_FREQUENCY = 100.0
    MIN_LEVEL = -100.0

    # Output buffer gain settings, lowest gain first.
    GAIN_LEVELS = [hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
                   hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
                   hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
                   hmc833.OutputBufferGain.MAXGAIN]

    level_changed = pyqtSignal(float)

    def __init__(self,
                 app: QObject,
                 chan_id: str,
                 plo_controller: HMC833Controller,
                 atten_controller: PE43711Controller,
                 det_controller: Optional[PwrDetectorController] = None,
                 cal_data: Optional[Dict] = None,
                 max_linear_gains: Optional[Dict] = None):
        """Create an instance of ``PLOChan``.

        :param app: The parent application for the channel display.
        :type app: QObject
        :param chan_id: The channel id, also used as the group box title.
        :type chan_id: str
        :param plo_controller: HMC833 PLO controller instance.
        :type plo_controller: :py:class:`rfblocks.HMC833Controller`
        :param atten_controller: PE43711 step attenuator controller
            instance.
        :type atten_controller: :py:class:`rfblocks.PE43711Controller`
        :param det_controller: Controller for optional power detector.
        :type det_controller: :py:class:`rfblocks.PwrDetectorController`
        :param cal_data: Calibrated channel output power as a function
            of frequency for each HMC833 output buffer gain setting.
            Format:

            .. code-block:: python

                {'max-minus-9dB': {100.0: 6.54, .. , 6000.0: -7.03},
                 'max-minus-6dB': ..
                 ..
                 'maxgain' {100.0: 12.72, .. , 6000.0: -4.66}}

        :type cal_data: Dict
        :param max_linear_gains: HMC833 output buffer gain settings for
            optimizing signal linearity as a function of output
            frequency.  Format:

            .. code-block:: python

                {50.0: hmc833.OutputBufferGain.MAXGAIN,
                 ...
                 2000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
                 3000.0: hmc833.OutputBufferGain.MAXGAIN,
                 6000.0: hmc833.OutputBufferGain.MAXGAIN}

        :type max_linear_gains: Dict

        The following signal is defined:

            - :py:`level_changed(float)`
        """
        super().__init__(app)
        self._app: QObject = app
        self._chan_id: str = chan_id
        self._plo_controller: HMC833Controller = plo_controller
        self._atten_controller: PE43711Controller = atten_controller
        self._det_controller: Optional[PwrDetectorController] = \
            det_controller
        self._coupler: BridgeCoupler = BridgeCoupler()
        # The UI modules need the app's top level widget, which only
        # exists when the app is running with a GUI.
        if self._app._nogui is False:
            self._plo_module: HMC833Module = \
                HMC833Module(app._widget, plo_controller)
            self._atten_module: StepAttenuator = \
                StepAttenuator(app._widget, atten_controller)
        self._cal_data = cal_data
        self._full_gains = {50.0: hmc833.OutputBufferGain.MAXGAIN}
        self._linear_gains = max_linear_gains
        self._max_gains = self._full_gains
        self._output_pwr_fns = {}
        self._max_output_pwr_fn = None
        if self._cal_data is not None:
            self._compute_output_pwr_fns()
        self._level = 0.0
        self._group_box: Optional[QGroupBox] = None
        # True while freq_changed is connected to
        # _update_channel_settings.
        self._freq_tracking: bool = False

    def _compute_output_pwr_fns(self):
        """Build a linear interpolation function of output power against
        frequency for each gain setting present in the calibration data.
        """
        if self._cal_data is None:
            return
        gain_map = {
            'max-minus-9dB': PLOChan.GAIN_LEVELS[0],
            'max-minus-6dB': PLOChan.GAIN_LEVELS[1],
            'max-minus-3dB': PLOChan.GAIN_LEVELS[2],
            'maxgain': PLOChan.GAIN_LEVELS[3]
        }
        for gain_label, output_pwr in self._cal_data.items():
            pwr_interp_fn = interpolate.interp1d(
                list(output_pwr.keys()),
                list(output_pwr.values()),
                kind='linear')
            gain_setting = gain_map[gain_label]
            self._output_pwr_fns[gain_setting] = pwr_interp_fn

    def _max_gain(self, freq):
        """Return the maximum output buffer gain setting allowed at the
        specified frequency.

        The active gain table maps frequency break points to gain
        settings.  The setting of a break point applies up to (but not
        including) the next break point, and the setting of the last
        break point applies to everything else.
        """
        f_breakpts = list(self._max_gains.keys())
        setting = self._max_gains[f_breakpts[-1]]
        for f_lo, f_hi in zip(f_breakpts, f_breakpts[1:]):
            if freq < f_hi:
                setting = self._max_gains[f_lo]
                break
        return setting

    def _update_channel_settings(self) -> None:
        """Recompute the gain and attenuation settings for the current
        level and PLO frequency and update the channel level.
        """
        actual_pwr = self._compute_channel_settings(self.level,
                                                    self.plo_ctl.freq)
        self.level = actual_pwr

    def _compute_output_pwr(self, gain_level, freq) -> float:
        """Return the calibrated output power at ``freq`` for the
        specified output buffer gain setting.
        """
        # NOTE(review): presumably this keeps the interpolation away
        # from discontinuities in the calibration data just below
        # 1500.0 and just above 3000.0 - confirm.
        if 1499.0 < freq < 1500.0:
            freq = 1499.0
        elif 3000.0 < freq < 3001.0:
            freq = 3001.0
        return float(self._output_pwr_fns[gain_level](freq))

    def _compute_output_pwr_levels(self, freq: float):
        """Calculate output buffer power levels for the specified
        signal frequency.

        :param freq: The output signal frequency
        :type freq: float

        :returns: A tuple of a list and a dict. The list contains
            the output power settings in descending order.
            The dict contains the gain settings keyed using the
            signal output power associated with the setting.
        """
        output_pwrs = []
        gain_settings = {}
        max_gain_level = self._max_gain(freq)
        for gain_level in PLOChan.GAIN_LEVELS:
            if gain_level > max_gain_level:
                break
            p = self._compute_output_pwr(gain_level, freq)
            gain_settings[p] = hmc833.OutputBufferGain(gain_level)
            output_pwrs.append(p)
        # GAIN_LEVELS is ordered lowest gain first.
        output_pwrs.reverse()
        return output_pwrs, gain_settings

    def _compute_channel_settings(self, pwr: float, freq: float) -> float:
        """Calculate channel gain and attenuation settings for the
        currently specified output power level.

        The computed buffer gain and attenuation are stored in the PLO
        and attenuator controllers.

        :param pwr: The desired output signal power in dBm.
        :type pwr: float
        :param freq: The output signal frequency in MHz.
        :type freq: float

        :return: The actual output power level using the calculated
            output buffer gain and attenuation settings.
        :rtype: float

        :raises: CalibrationDataException
        """
        if len(self._output_pwr_fns) == 0:
            raise CalibrationDataException

        output_pwrs, gain_settings = self._compute_output_pwr_levels(freq)

        # Step through the calculated output power levels
        # and determine where the desired power output level
        # sits in relation to these.  Select the output buffer
        # gain so that the PLO output power is equal to or above
        # the desired level and then select the step attenuator
        # setting to match the output power to the desired
        # power level.
        if pwr > output_pwrs[0]:
            # The desired power exceeds the maximum PLO power
            # output level.
            buffer_gain = gain_settings[output_pwrs[0]]
            attenuation = 0.0
            actual_pwr = output_pwrs[0]
        elif pwr <= output_pwrs[-1]:
            # At or below the lowest buffer output power: use the
            # lowest gain and attenuate, limited by the attenuator range.
            buffer_gain = gain_settings[output_pwrs[-1]]
            attenuation = output_pwrs[-1] - pwr
            actual_pwr = pwr
            if attenuation > pe43711.MAX_ATTENUATION:
                attenuation = pe43711.MAX_ATTENUATION
                actual_pwr = output_pwrs[-1] - attenuation
        else:
            for pwr_hi, pwr_lo in zip(output_pwrs, output_pwrs[1:]):
                if pwr > pwr_lo:
                    buffer_gain = gain_settings[pwr_hi]
                    attenuation = pwr_hi - pwr
                    actual_pwr = pwr
                    break
        self.plo_ctl.buffer_gain = buffer_gain
        # Round to the nearest multiple of 0.25.
        self.atten_ctl.attenuation = round(attenuation * 4)/4
        return actual_pwr

    def power_range(self, freq):
        """Return the range of output signal powers at the specified
        frequency.

        :param freq: The output signal frequency
        :type freq: float

        :returns: A tuple containing the min and max output power
            available at the specified frequency.
        """
        output_pwrs, _ = self._compute_output_pwr_levels(freq)
        max_pwr = output_pwrs[0]
        min_pwr = output_pwrs[-1] - pe43711.MAX_ATTENUATION
        return min_pwr, max_pwr

    @property
    def enabled(self) -> bool:
        """Enable/disable the on screen PLO channel UI components.

        Always False while the channel group box has not been created.
        """
        if self._group_box:
            return self._group_box.isEnabled()
        return False

    @enabled.setter
    def enabled(self, en: bool) -> None:
        if self._group_box:
            self._group_box.setEnabled(en)

    @property
    def plo_ctl(self) -> HMC833Controller:
        """Return a reference to the channel's PLO controller.
        """
        return self._plo_controller

    @property
    def atten_ctl(self) -> PE43711Controller:
        """Return a reference to the channel's step attenuator controller.
        """
        return self._atten_controller

    @property
    def detector_ctl(self) -> Optional[PwrDetectorController]:
        """Return a reference to the channel's power detector controller.
        """
        return self._det_controller

    @property
    def coupler(self) -> BridgeCoupler:
        """Return a reference to the channel's bridge coupler instance.
        """
        return self._coupler

    @property
    def level(self) -> float:
        """The current channel output power (in dBm).

        :raises: CalibrationDataException if the channel has no
            calibration data.
        """
        if self._cal_data is None:
            raise CalibrationDataException
        return self._level

    @level.setter
    def level(self, lvl: float) -> None:
        """Set the current channel output level (in dBm).

        The level actually set may differ from the requested level since
        it is limited to the power range available at the current PLO
        frequency.

        :param lvl: The desired output power in dBm
        :type lvl: float
        :raises: CalibrationDataException if the channel has no
            calibration data.
        """
        if self._cal_data is None:
            raise CalibrationDataException
        if self._level == lvl:
            return
        actual_lvl = self._compute_channel_settings(lvl,
                                                    self.plo_ctl.freq)
        if self._level != actual_lvl:
            self._level = actual_lvl
            if self._app._nogui is False:
                self.update_chan_level(actual_lvl)
        # Emitted even when the stored level is unchanged so that
        # listeners see the level actually in effect.
        self.level_changed.emit(actual_lvl)

    def set_level(self, lvl: float) -> None:
        """Set the channel output level (in dBm)."""
        self.level = lvl

    @pyqtSlot(float)
    def update_chan_level(self, lvl: float) -> None:
        """Update the displayed channel output power.
        """
        self._plo_module._level_box.setValue(lvl)

    def set_max_gains(self, state):
        """Select the gain table used to limit the output buffer gain.

        :param state: The check state of the linearity check box.
            ``Qt.Checked`` selects the linearity optimized gain table.
            Any other state selects the full gain table.
        """
        # Keep the full gain table when no linearity optimized table was
        # supplied, since ``_max_gain`` needs a dictionary to work with.
        if state == Qt.Checked and self._linear_gains is not None:
            self._max_gains = self._linear_gains
        else:
            self._max_gains = self._full_gains
        self._update_channel_settings()

    def initialize_hw(self, ser: serial.Serial):
        """Initialize the HMC833 and PE43711 board hardware.
        """
        self.plo_ctl.initialize(ser)
        self.atten_ctl.initialize(ser)

    def initialize_ui(self):
        """Initialize the PLO module UI.
        """
        self._plo_module.initialize_ui()

    def configure(self) -> bool:
        """Configure currently set frequency and level values
        in channel hardware.

        :return: The value returned by the PLO controller ``configure``
            call.
        """
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            lock_status = self.plo_ctl.configure(ser)
            self.atten_ctl.configure(ser)
        return lock_status

    def _set_freq_tracking(self, enable: bool) -> None:
        """Connect or disconnect ``freq_changed`` from
        ``_update_channel_settings``, at most once in either direction.

        Disconnecting a slot which is not connected raises TypeError and
        connecting a slot twice makes it run twice per signal, so the
        connection state is tracked and redundant requests are ignored.
        """
        if enable == self._freq_tracking:
            return
        if enable:
            self.plo_ctl.freq_changed.connect(
                self._update_channel_settings)
        else:
            self.plo_ctl.freq_changed.disconnect(
                self._update_channel_settings)
        self._freq_tracking = enable

    def set_calibration_mode(self, mode: bool) -> None:
        """Set the channel calibration mode.

        :param mode: True to set the channel to calibration mode,
            False to set the channel for normal operation.
        :type mode: bool

        Setting the calibration mode to True will disconnect the
        HMC833 controller ``freq_changed`` signal and thus
        prevent the ``_update_channel_settings`` method being invoked.
        This in turn prevents any change to the channel attenuation.
        """
        self._set_freq_tracking(mode is not True)

    def gain_linearity_layout(self):
        """Return a layout holding the 'Optimize Linearity' check box.
        """
        vbox = QVBoxLayout()
        fbox = QFormLayout()
        hbox = QHBoxLayout()
        self._linearity_box = QCheckBox("")
        self._linearity_box.setChecked(False)
        self._linearity_box.stateChanged.connect(self.set_max_gains)
        fbox.addRow(QLabel("Optimize Linearity:"), self._linearity_box)
        hbox.addLayout(fbox)
        hbox.addStretch()
        vbox.addLayout(hbox)
        return vbox

    def create_channel_group(self):
        """Build the on-screen UI for the RF channel.

        :return: The group box containing the channel controls.
        """
        self._group_box = QGroupBox(self._chan_id)
        vbox = QVBoxLayout()

        if self._cal_data is None:
            # Uncalibrated: no level control, the buffer gain and the
            # attenuation are set directly.
            vbox.addLayout(self._plo_module.create_ploconfig_layout(
                has_level = False, has_configure = False))
            vbox.addLayout(self._atten_module.create_attenuator_layout(
                has_configure = False))
        else:
            # Calibrated: the user sets the output level.  The buffer
            # gain and attenuation are derived from it so the controls
            # for these are disabled.
            vbox.addLayout(self._plo_module.create_ploconfig_layout(
                has_level = True, level_delegate = self,
                has_configure = False, has_divider_gain = False))
            self._plo_module.enable_buffer_gain(False)
            self._plo_module._freq_box.setMinimum(PLOChan.MIN_FREQUENCY)
            self._plo_module._level_box.setMinimum(PLOChan.MIN_LEVEL)
            self.level_changed.connect(self.update_chan_level)
            self._set_freq_tracking(True)
            vbox.addLayout(self._atten_module.create_attenuator_layout(
                has_configure = False))
            self._atten_module._atten_box.setEnabled(False)
            self._update_channel_settings()
            # Toggling the linearity option recomputes the channel
            # level, which requires calibration data.
            vbox.addLayout(self.gain_linearity_layout())

        hbox = QHBoxLayout()
        hbox.addStretch(1)
        self._config_btn = QPushButton("Configure")
        self._config_btn.clicked.connect(self.configure)
        hbox.addWidget(self._config_btn)
        vbox.addLayout(hbox)
        self._group_box.setLayout(vbox)
        return self._group_box
5 rfgen-service
class RFGenService(rpyc.Service):
    """RPyC service exposing the RF generator app to remote clients."""

    # Largest acceptable difference between the desired and the measured
    # output power when setting a channel output.
    OUTPUT_PWR_DELTA_THRESHOLD = 0.3
    # Maximum number of measurements made when setting a channel output.
    OUTPUT_PWR_MAX_ADJUST_ATTEMPTS = 5

    def __init__(self, app):
        """
        :param app: The RF generator app instance served by this service.
        """
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Initialize the service app and associated hardware modules.
        """
        if not self._app._nogui:
            self._app.initialize_ui()
        self._app.initialize_hw()
        for chan in self.channels.values():
            chan.configure()
            sleep(1.0)
        self._app.enable_rfgen()

    def initialize_detector(self, chan_id: str) -> bool:
        """Initialize the power detector connected to the specified channel.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str

        :return: True if there is a detector connected, False if no
            detector connected.
        """
        chan = self._app.channels[chan_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            try:
                chan.detector_ctl.initialize(ser)
            except DetectorReadError:
                return False
            return True

    @property
    def channels(self) -> Dict[str, PLOChan]:
        """Return a dictionary of the signal generator channel objects.

        :return: A dictionary of ``PLOChan`` instances keyed using the
            signal generator channel id ("Chan 1" and "Chan 2").

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels
        >>> chan_id = 'Chan 1'
        >>> chans[chan_id].plo_ctl.freq = 1500.0
        >>> rfgen.root.configure_plo_freq(chan_id)
        """
        return self._app.channels

    def configure_plo_freq(self, chan_id: str) -> None:
        """Update the PLO module output frequency for the specified channel.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels
        >>> chan_id = 'Chan 1'
        >>> chans[chan_id].plo_ctl.freq = 1500.0
        >>> rfgen.root.configure_plo_freq(chan_id)
        """
        chan = self._app.channels[chan_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            chan.plo_ctl.configure_freq(ser)

    def configure_plo(self, chan_id: str) -> None:
        """Update the PLO module hardware for the specified channel.
        Note that this should only be used when the generator is in
        calibration mode.

        .. seealso:: :meth:`.RFGenService.set_calibration_mode`

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels
        >>> chan_id = 'Chan 1'
        >>> gain = hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB
        >>> chans[chan_id].plo_ctl.buffer_gain = gain
        >>> rfgen.root.configure_plo(chan_id)
        """
        chan = self._app.channels[chan_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            chan.plo_ctl.configure(ser)

    def configure_atten(self, chan_id: str) -> None:
        """Update step attenuator module hardware for the specified channel.
        Note that this should only be used when the generator is in
        calibration mode.

        .. seealso:: :meth:`.RFGenService.set_calibration_mode`

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels
        >>> chan_id = 'Chan 2'
        >>> chans[chan_id].atten_ctl.attenuation = 10.0
        >>> rfgen.root.configure_atten(chan_id)
        """
        chan = self._app.channels[chan_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            chan.atten_ctl.configure(ser)

    def detector(self, chan_id: str) -> Optional[PwrDetectorController]:
        """Return the controller associated with the power detector
        connected to the specified channel.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str

        :return: An instance of PwrDetectorController if there is a
            detector connected, None if no detector connected.
        """
        return self._app.channels[chan_id].detector_ctl

    def measure_power(self, chan_id: str,
                      freq: Optional[float] = None) -> float:
        """Measure the signal power currently at the input of the power
        detector currently attached to the specified channel.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str
        :param freq: The input signal frequency (in MHz).  The current
            PLO frequency of the channel is used if this is None.
        :type freq: Optional[float]

        :return: The measured signal power in dBm.

        :raises: KeyError
            If there is no detector currently connected to the specified
            channel.
        """
        chan = self._app.channels[chan_id]
        if chan.detector_ctl is None:
            raise KeyError(f"No detector attached to '{chan_id}'")
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            if freq is None:
                chan.detector_ctl.freq = chan.plo_ctl.freq
            else:
                chan.detector_ctl.freq = freq
            chan.detector_ctl.measure(ser)
        return chan.detector_ctl.pwr

    def set_channel_output(self, chan_id, freq, lvl,
                           ignore_cal_state=False):
        """Set output frequency and power of the specified channel.

        The output power is measured using the channel's power detector
        and the channel level is adjusted until the measured power is
        within ``OUTPUT_PWR_DELTA_THRESHOLD`` of the desired power.  At
        most ``OUTPUT_PWR_MAX_ADJUST_ATTEMPTS`` measurements are made.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str
        :param freq: The output signal frequency (in MHz)
        :type freq: float
        :param lvl: The output signal power (in dBm).  This is limited
            to the power range available at the specified frequency.
        :type lvl: float
        :param ignore_cal_state: If this is set to False (the default
            value) an InvalidCalibrationDataError will be raised if the
            power detector connected to the associated channel
            monitoring output is not calibrated.  If set to True, the
            calibration state of the detector is ignored.
        :type ignore_cal_state: bool

        :return: True if the signal output was successfully set, False
            otherwise.

        :raises: KeyError
            If there is no detector currently connected to the specified
            channel.
        :raises: InvalidCalibrationDataError
            If the detector is not calibrated and the ignore_cal_state
            parameter is set to False.
        """
        chan = self._app.channels[chan_id]
        if chan.detector_ctl is None:
            raise KeyError(f"No detector attached to '{chan_id}'")
        if ignore_cal_state is False:
            if chan.detector_ctl.cal_data is None:
                raise InvalidCalibrationDataError(
                    f"Detector on '{chan_id}' is not calibrated")

        # Limit the desired power to what the channel can deliver.
        min_pwr, max_pwr = chan.power_range(freq)
        if lvl < min_pwr:
            p_des = min_pwr
        elif lvl > max_pwr:
            p_des = max_pwr
        else:
            p_des = lvl

        max_attempts = RFGenService.OUTPUT_PWR_MAX_ADJUST_ATTEMPTS
        for attempts in range(1, max_attempts + 1):
            # Refer the detector reading back to the channel output
            # using the coupler coupling and insertion loss.
            measured = self.measure_power(chan_id, freq)
            delta_p = p_des - (measured
                               - chan.coupler.coupling(freq)
                               + chan.coupler.insertion_loss(freq))
            print(f'{attempts=}, {delta_p=}')
            if abs(delta_p) > RFGenService.OUTPUT_PWR_DELTA_THRESHOLD:
                chan.level = chan.level + delta_p
                chan.plo_ctl.freq = freq
                chan.configure()
            else:
                # Within tolerance.  Success is reported here so that
                # converging on the final permitted measurement is not
                # mistaken for running out of attempts.
                return True
        return False

    def configure(self, chan_id: str) -> None:
        """Update the channel hardware - PLO and attenuator.

        :param chan_id: A signal generator channel id.  This will be one
            of "Chan 1" or "Chan 2".
        :type chan_id: str
        """
        chan = self._app.channels[chan_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            chan.plo_ctl.configure(ser)
            chan.atten_ctl.configure(ser)

    def set_calibration_mode(self, mode: bool) -> None:
        """Set the calibration mode.

        :param mode: True to set the app to calibration mode,
            False to set the app for normal operation.
        :type mode: bool

        Setting the calibration mode to True will disconnect the
        HMC833 controller ``freq_changed`` signal for each signal
        generator channel.  This then prevents the
        ``_update_channel_settings`` method being invoked which in turn
        prevents any change to the channel attenuation.

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> rfgen.root.set_calibration_mode(True)
        """
        for chan in self._app.channels.values():
            chan.set_calibration_mode(mode)
        # The status label is created as part of the on-screen UI and
        # so does not exist when the app is running headless.
        if not self._app._nogui:
            if mode is True:
                self._app.cal_lbl.setText('Calibrating')
            else:
                self._app.cal_lbl.setText('')


class RPyCServer(QObject):
    """Runs an RPyC ``ThreadedServer`` for a service instance.

    The ``finished`` signal is emitted when the server stops.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """
        :param serviceInst: The RPyC service instance to serve.
        :param host: The host name or IP address to bind to.
        :param port: The TCP port to listen on.
        """
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Start the server.  Does not return until the server stops.
        """
        print("RFGen rpyc service on {}:{}".format(self._host, self._port))
        # NOTE(review): this protocol configuration lets a connected
        # client read and set arbitrary attributes and exchange pickled
        # objects.  Only bind the server to a trusted interface.
        self._server = ThreadedServer(
            self._serviceInst,
            hostname = self._host,
            port = self._port,
            protocol_config = {
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_pickle': True})
        self._server.start()
        self.finished.emit()
6 Imports
from argparse import ArgumentParser
import asyncio
import json
import signal
import sys
from time import sleep
from typing import (
    List, Dict, Optional
)

from PyQt5.QtCore import (
    Qt, QCoreApplication, QObject, QThread, pyqtSignal, pyqtSlot
)
from PyQt5.QtWidgets import (
    QApplication, QWidget, QComboBox, QGroupBox, QHBoxLayout,
    QVBoxLayout, QPushButton, QFileDialog, QErrorMessage, QLabel,
    QMenu, QLineEdit, QFormLayout, QCheckBox
)
from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)
from qtrfblocks import (
    HMC833Module, StepAttenuator
)
from rfblocks import (
    hmc833, HMC833Controller, pe43711, PE43711Controller,
    LogDetector, PwrDetectorController, InvalidCalibrationDataError,
    DetectorReadError, create_serial, write_cmd,
    DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)
import rpyc
from rpyc.utils.server import ThreadedServer
from scipy import interpolate
import serial
from serial.tools.list_ports import comports

from plochan import PLOChan, CalibrationDataException
from rfgen_cal_data import channel_cal_data, max_linear_gains