rfgen.py
This document describes an app for controlling the low spurious RF signal generator design.
1 The Script
2 The main
Function
def main(): global server_thread <<process-cmdline-args>> spur_data = read_spur_data(args.spurdata) app = QApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) rfgen_app = RFGenApp( spur_data = spur_data, serial_device=args.device, baudrate=args.baudrate, no_cal_data = args.no_cal_data, headless = args.nogui) if not args.nogui: rfgen_app.show() server_thread = QThread() server = RPyCServer(LowSpurRFGenService(rfgen_app), args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() with loop: loop.set_debug(True) sys.exit(loop.run_forever())
2.1 process-cmdline-args
defaultBaud = 0 parser = ArgumentParser(description= '''A low spurious RF signal generator.''') parser.add_argument("--nogui", action='store_true', help="Disable GUI and run 'headless'") parser.add_argument("--no_cal_data", action='store_true', help="No output power calibration data should be used") parser.add_argument("-d", "--device", default=None, help="The hardware serial device") parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int, help="Baud rate (default: {})".format(defaultBaud)) parser.add_argument("-A", "--ipaddr", default="127.0.0.1", help="IP address for to bind the RPyC server instance") parser.add_argument("-P", "--port", default=18867, type=int, help="TCP port for the RPyC server instance") parser.add_argument("-D", "--spurdata", required=True, help="File containing freq. planner spur data") args = parser.parse_args()
3 App Class
<<low-spur-controller>> class RFGenApp(QWidget): <<rfgenapp-config>> def __init__(self, spur_data, config: Dict = DEFAULT_APP_CONFIG, serial_device: Optional[str] = None, baudrate: int = 0, no_cal_data: bool = False, headless: bool = False) -> None: super().__init__() self._nogui: bool = headless self._spur_data = spur_data self._ctl_device: str = serial_device self._baudrate: int = baudrate if baudrate == 0: self._baudrate = DEFAULT_BAUDRATE self._ref_plo: hmc833 = hmc833(**RFGenApp.HMC833_REF_HWCONF) self._plo: hmc833 = hmc833(**RFGenApp.HMC833_PLO_HWCONF) self._att: pe43711 = pe43711(**RFGenApp.STEPATTEN_HWCONF) self._ref_ctl: HMC833Controller = HMC833Controller( RFGenApp.REFPLO_NAME, self._ref_plo, RFGenApp.DEFAULT_APP_CONFIG[RFGenApp.REFPLO_NAME]) self._plo_ctl: LowSpurController = LowSpurController( RFGenApp.PLO_NAME, self._plo, self._ref_ctl, self._spur_data, RFGenApp.DEFAULT_APP_CONFIG[RFGenApp.PLO_NAME]) self._att_ctl: PE43711Controller = PE43711Controller( RFGenApp.ATTEN_NAME, self._att) chan_id = 'plo_chan' if no_cal_data is True: self._plo_chan = PLOChan(self, chan_id, self._plo_ctl, self._att_ctl) else: self._plo_chan = PLOChan(self, chan_id, self._plo_ctl, self._att_ctl, CHAN_CAL_DATA, max_linear_gains) if not headless: self.build_ui() <<build-app-ui>> <<initialize>> @property def ctl_device(self) -> Optional[str]: return self._ctl_device @property def baudrate(self) -> int: return self._baudrate @property def serial_ports(self) -> List: ports = [DEFAULT_SOCKET_URL] ports.extend([port.device for port in comports() if port.serial_number]) if self.ctl_device: if self.ctl_device in ports: ports.pop(ports.index(self.ctl_device)) ports.insert(0, self.ctl_device) return ports def tty_changed(self, combo: QComboBox, idx: int) -> None: self._ctl_device = combo.itemText(idx) def disable_rfgen(self) -> None: if not self._nogui: self._plo_chan.enabled = False def enable_rfgen(self) -> None: if not self._nogui: self._plo_chan.enabled = True @property def plo_chan(self) -> PLOChan: return self._plo_chan @property def att_ctl(self) -> PE43711Controller: return self._att_ctl
3.1 build-app-ui
def build_ui(self): """Build the on-screen UI for the RF generator app.""" vbox = QVBoxLayout() hbox = QHBoxLayout() tty_combo = QComboBox() tty_combo.currentIndexChanged.connect( lambda idx, w=tty_combo: self.tty_changed(w, idx)) tty_combo.addItems(self.serial_ports) line_edit = QLineEdit() tty_combo.setLineEdit(line_edit) hbox.addWidget(QLabel("Control Port:")) hbox.addWidget(tty_combo) hbox.addStretch(1) self.cal_lbl = QLabel('') self.cal_lbl.setStyleSheet('QLabel { color: "red" }') hbox.addWidget(self.cal_lbl) hbox.addStretch(1) initialize_btn = QPushButton("Initialize") initialize_btn.clicked.connect(self.initialize) hbox.addWidget(initialize_btn) vbox.addLayout(hbox) hbox = QHBoxLayout() hbox.addWidget(self._plo_chan.create_channel_group()) vbox.addLayout(hbox) self.setLayout(vbox) self.disable_rfgen() self.setWindowTitle('Low Spurious RF Generator')
3.2 low-spur-controller
class LowSpurController(HMC833Controller): def __init__(self, controller_id: str, device: hmc833, ref_ctl: HMC833Controller, spur_data: Dict, config: Optional[Dict] = None) -> None: super().__init__(controller_id, device, config) self._spur_data = spur_data self._ref_ctl = ref_ctl self._current_freq = 0.0 def initialize(self, ser: serial.Serial): print(f'LowSpurController.initialize: {self.freq=}') _ = write_cmd(ser, f'O{RFGenApp.PE42420_CTL_1}:O{RFGenApp.PE42420_CTL_2}') self.switch_to_low_band(ser) self._ref_ctl.initialize(ser) print(f' {self._ref_ctl.freq=}, {self._ref_ctl.buffer_gain=}') self.ref_freq = RFGenApp.DEFAULT_REF_OUTPUT_FREQ self.ref_div = 2 self.refsrc = hmc833.ReferenceSource.EXTERNAL self.freq = RFGenApp.DEFAULT_OUTPUT_FREQ super().initialize(ser) self._current_freq = self.freq def configure_freq(self, ser: serial.Serial): # retrieve reference parameters for the currently # set frequency value, self.freq. print('LowSpurController.configure_freq: ') if self.freq == self._current_freq: return True f = self.freq if f > 3000.0: f /= 2 self.switch_to_high_band(ser) else: self.switch_to_low_band(ser) f_str = str(round(f * 10)/10.0) print(f' {f=}, {f_str=}') try: ref_params = self._spur_data[f_str] except KeyError: # Fall back to using the on-board reference # and generate a warning message. print('No spur data available!') if self._ref_ctl.freq != ref_params.fref: self._ref_ctl.freq = ref_params.fref self._ref_ctl.configure_freq(ser) self.ref_freq = ref_params.fref self.ref_div = ref_params.R self.configure_refdiv(ser) locked = self.check_plo_lock(ser) print(f' Ref: {locked=}, {self._ref_ctl.freq=}, {ref_params=}') self.cp_gain = ref_params.icp self.configure_chargepump(ser) locked = super().configure_freq(ser) print(f' PLO: {locked=}, {self.freq=}') self._current_freq = self.freq return locked def switch_to_low_band(self, ser: serial.Serial): print('switch_to_low_band') _ = write_cmd(ser, f'L{RFGenApp.PE42420_CTL_1}:H{RFGenApp.PE42420_CTL_2}') def switch_to_high_band(self, ser: serial.Serial): print('switch_to_high_band') _ = write_cmd(ser, f'H{RFGenApp.PE42420_CTL_1}:L{RFGenApp.PE42420_CTL_2}')
3.3 rfgenapp-config
REFPLO_NAME: str = 'ref' PLO_NAME: str = 'plo1' ATTEN_NAME: str = 'att1' HMC833_REF_HWCONF: Dict = { 'sen': 'D0', 'ld_sdo': 'C4' } HMC833_PLO_HWCONF: Dict = { 'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2' } STEPATTEN_HWCONF: Dict = { 'le': 'B7' } PE42420_CTL_1: str = 'D6' PE42420_CTL_2: str = 'D7' DEFAULT_OUTPUT_FREQ: float = 1000.0 DEFAULT_REF_FREQ: float = 50.0 DEFAULT_REF_OUTPUT_FREQ: float = 100.0 DEFAULT_OUTPUT_ATTENUATION: float = 10.0 DEFAULT_REF_CONFIG: Dict = { 'freq': DEFAULT_REF_OUTPUT_FREQ, 'ref_freq': DEFAULT_REF_FREQ, 'refsrc': hmc833.DEFAULT_REFSRC, 'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB, 'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB, 'vco_mute': False } DEFAULT_PLO_CONFIG: Dict = { 'freq': DEFAULT_OUTPUT_FREQ, 'ref_freq': DEFAULT_REF_OUTPUT_FREQ, 'refsrc': hmc833.ReferenceSource.EXTERNAL, 'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB, 'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB, 'vco_mute': False } DEFAULT_ATTEN_CONFIG: Dict = { 'attenuation': DEFAULT_OUTPUT_ATTENUATION } DEFAULT_APP_CONFIG: Dict = { REFPLO_NAME: DEFAULT_REF_CONFIG, PLO_NAME: DEFAULT_PLO_CONFIG, ATTEN_NAME: DEFAULT_ATTEN_CONFIG }
3.4 initialize
@asyncSlot() async def initialize(self): try: self.initialize_ui() loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.initialize_hw) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se)) finally: self.enable_rfgen() def initialize_ui(self) -> None: self._plo_chan.initialize_ui() def initialize_hw(self) -> None: with create_serial(self.ctl_device, self.baudrate) as ser: self._plo_chan.initialize_hw(ser)
4 read-spur-data
def read_spur_data(spur_data_file): spur_data = {} with open(spur_data_file) as csv_file: csv_reader = csv.reader(csv_file, delimiter=',') line_count = 0 for row in csv_reader: if line_count < 7: line_count += 1 elif line_count == 7: header = row line_count += 1 else: key = row[3] line_count += 1 spur_data[key] = RefSetting( float(row[1]), int(row[2]), float(row[3]), ceil(float(row[6])*100)/100.0, int(row[7]), float(row[8]), float(row[9])) return spur_data
5 ref-settings-class
6 lowspur-rfgen-service
class LowSpurRFGenService(rpyc.Service): def __init__(self, app): super().__init__() self._app = app def initialize(self): """Initialize the service app and associated hardware modules. """ if not self._app._nogui: self._app.initialize_ui() self._app.initialize_hw() self._app.enable_rfgen() @property def plo_chan(self): """Return a reference to the signal generator channel object. :return: A ``PLOChan`` instance. .. code:: python rfgen = rpyc.connect('127.0.0.1', 18867) rfgen.root.initialize() plo = rfgen.root.plo_chan plo.plo_ctl.freq = 1500.0 rfgen.root.configure_plo_freq() """ return self._app.plo_chan def configure_plo_freq(self) -> bool: """Update the PLO module output frequency. .. code:: python rfgen = rpyc.connect('127.0.0.1', 18867) rfgen.root.initialize() plo = rfgen.root.plo_chan plo.plo_ctl.freq = 1500.0 rfgen.root.configure_plo_freq() """ plo = self._app.plo_chan with create_serial(self._app.ctl_device, self._app.baudrate) as ser: plo.plo_ctl.configure_freq(ser) return plo.plo_ctl.lock_status def configure_plo(self) -> bool: """Update the PLO module hardware. .. code:: python rfgen = rpyc.connect('127.0.0.1', 18867) rfgen.root.initialize() plo = rfgen.root.plo_chan plo.plo_ctl.buffer_gain = \ hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB rfgen.root.configure_plo() """ plo = self._app.plo_chan with create_serial(self._app.ctl_device, self._app.baudrate) as ser: plo.plo_ctl.configure(ser) return plo.plo_ctl.lock_status def configure_atten(self): """Update step attenuator module hardware. .. code:: python rfgen = rpyc.connect('127.0.0.1', 18867) rfgen.root.initialize() plo = rfgen.root.plo_chan plo.atten_ctl.attenuation = 10.0 rfgen.root.configure_atten() """ plo = self._app.plo_chan with create_serial(self._app.ctl_device, self._app.baudrate) as ser: plo.atten_ctl.configure(ser) def configure(self) -> bool: """Update the channel hardware - PLO and attenuator. """ plo = self._app.plo_chan with create_serial(self._app.ctl_device, self._app.baudrate) as ser: plo.plo_ctl.configure(ser) plo.atten_ctl.configure(ser) return plo.plo_ctl.lock_status def set_calibration_mode(self, mode): """Set the calibration mode. :param mode: True to set the app to calibration mode, False to set the app for normal operation. :type mode: bool Setting the calibration mode to True will disconnect the HMC833 controller ``freq_changed`` signal. This then prevents the ``_update_channel_settings`` method being invoked which in turn prevents any change to the channel attenuation. .. code:: python rfgen = rpyc.connect('127.0.0.1', 18864) rfgen.root.initialize() rfgen.root.set_calibration_mode(True) """ self._app.plo_chan.set_calibration_mode(mode) if mode is True: self._app.cal_lbl.setText('Calibrating') else: self._app.cal_lbl.setText('') class RPyCServer(QObject): finished = pyqtSignal() def __init__(self, serviceInst, host, port): super().__init__() self._serviceInst = serviceInst self._host = host self._port = port def run(self): print("Low Spur. RFGen rpyc service on {}:{}".format(self._host, self._port)) self._server = ThreadedServer( self._serviceInst, hostname = self._host, port = self._port, protocol_config = { 'allow_all_attrs': True, 'allow_setattr': True, 'allow_pickle': True}) self._server.start() self.finished.emit()
7 Imports
from typing import ( List, Dict, Optional ) from dataclasses import dataclass import sys from math import ceil import copy from math import sqrt, log10 from enum import Enum import asyncio from argparse import ArgumentParser import serial import json from serial.tools.list_ports import comports import csv from rfblocks import ( hmc833, HMC833Controller, create_serial, write_cmd, pe43711, PE43711Controller, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL ) from qtrfblocks import ( HMC833Module, StepAttenuator ) from qasync import ( QEventLoop, QThreadExecutor, asyncSlot, asyncClose ) from PyQt5.QtWidgets import ( QApplication, QComboBox, QLineEdit, QErrorMessage, QWidget, QGroupBox, QHBoxLayout, QVBoxLayout, QPushButton, QLabel, QFormLayout, QCheckBox ) from PyQt5.QtCore import ( Qt, QObject, QThread, pyqtSignal, pyqtSlot ) import rpyc from rpyc.utils.server import ThreadedServer from plochan import PLOChan, CalibrationDataException from rfgen_cal_data import CHAN_CAL_DATA, max_linear_gains