Open RF Prototyping

rfgen.py

This document describes an app for controlling the low spurious RF signal generator design.

1 The Script

# Generated from LowSpurRFGen-app.org
#

<<imports>>

<<ref-settings-class>>

<<rfgen-app-class>>

<<lowspur-rfgen-service>>

<<read-spur-data>>

<<main-func>>

# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

2 The main Function

def main():
    """Entry point: set up the Qt application, the RPyC server and run.

    The function parses the command line, loads the frequency planner
    spur data, creates the ``RFGenApp`` widget (shown unless ``--nogui``
    was given), starts an ``RPyCServer`` in its own ``QThread`` and then
    runs the qasync event loop until it stops.  The process exits via
    ``sys.exit`` with the value returned by ``loop.run_forever()``.
    """

    # NOTE(review): the thread object is stored in a module-level name,
    # presumably so that it stays referenced for the life of the
    # process - confirm.
    global server_thread

    <<process-cmdline-args>>

    spur_data = read_spur_data(args.spurdata)

    # Use a qasync event loop so that asyncio coroutines run on top of
    # the Qt event loop.  The default executor has a single worker.
    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    rfgen_app = RFGenApp(
        spur_data = spur_data,
        serial_device=args.device,
        baudrate=args.baudrate,
        no_cal_data = args.no_cal_data,
        headless = args.nogui)

    if not args.nogui:
        rfgen_app.show()

    # Run the RPyC server on a worker object moved to its own thread.
    # When the server signals ``finished`` the thread is asked to quit
    # and both objects are scheduled for deletion.
    server_thread = QThread()
    server = RPyCServer(LowSpurRFGenService(rfgen_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    with loop:
        # NOTE(review): asyncio debug mode is switched on
        # unconditionally here - confirm this is wanted outside of
        # development.
        loop.set_debug(True)
        sys.exit(loop.run_forever())

2.1 process-cmdline-args

# Command line processing for main().  The parsed options end up in
# ``args``.
#
# A baud rate of 0 means "not specified": RFGenApp replaces it with
# DEFAULT_BAUDRATE.
defaultBaud = 0

parser = ArgumentParser(description=
                        '''A low spurious RF signal generator.''')


parser.add_argument("--nogui", action='store_true',
                    help="Disable GUI and run 'headless'")
parser.add_argument("--no_cal_data", action='store_true',
                    help="No output power calibration data should be used")
parser.add_argument("-d", "--device", default=None,
                    help="The hardware serial device")
parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int,
                    help="Baud rate (default: {}, which selects the "
                         "built-in default baud rate)".format(defaultBaud))
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address to bind the RPyC server instance to")
parser.add_argument("-P", "--port", default=18867, type=int,
                    help="TCP port for the RPyC server instance")
parser.add_argument("-D", "--spurdata", required=True,
                    help="File containing freq. planner spur data")
args = parser.parse_args()

3 App Class

<<low-spur-controller>>


class RFGenApp(QWidget):

    <<rfgenapp-config>>

    def __init__(self,
                 spur_data,
                 config: Dict = DEFAULT_APP_CONFIG,
                 serial_device: Optional[str] = None,
                 baudrate: int = 0,
                 no_cal_data: bool = False,
                 headless: bool = False) -> None:
        super().__init__()
        self._nogui: bool = headless
        self._spur_data = spur_data
        self._ctl_device: str = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE

        self._ref_plo: hmc833 = hmc833(**RFGenApp.HMC833_REF_HWCONF)
        self._plo: hmc833 = hmc833(**RFGenApp.HMC833_PLO_HWCONF)
        self._att: pe43711 = pe43711(**RFGenApp.STEPATTEN_HWCONF)
        self._ref_ctl: HMC833Controller = HMC833Controller(
            RFGenApp.REFPLO_NAME,
            self._ref_plo,
            RFGenApp.DEFAULT_APP_CONFIG[RFGenApp.REFPLO_NAME])
        self._plo_ctl: LowSpurController = LowSpurController(
            RFGenApp.PLO_NAME,
            self._plo,
            self._ref_ctl,
            self._spur_data,
            RFGenApp.DEFAULT_APP_CONFIG[RFGenApp.PLO_NAME])
        self._att_ctl: PE43711Controller = PE43711Controller(
            RFGenApp.ATTEN_NAME, self._att)

        chan_id = 'plo_chan'
        if no_cal_data is True:
            self._plo_chan = PLOChan(self, chan_id,
                                     self._plo_ctl, self._att_ctl)
        else:
            self._plo_chan = PLOChan(self, chan_id,
                                     self._plo_ctl, self._att_ctl,
                                     CHAN_CAL_DATA,
                                     max_linear_gains)

        if not headless:
            self.build_ui()

    <<build-app-ui>>

    <<initialize>>

    @property
    def ctl_device(self) -> Optional[str]:
        return self._ctl_device

    @property
    def baudrate(self) -> int:
        return self._baudrate

    @property
    def serial_ports(self) -> List:
        ports = [DEFAULT_SOCKET_URL]
        ports.extend([port.device for port in comports()
                      if port.serial_number])
        if self.ctl_device:
            if self.ctl_device in ports:
                ports.pop(ports.index(self.ctl_device))
            ports.insert(0, self.ctl_device)
        return ports

    def tty_changed(self, combo: QComboBox, idx: int) -> None:
        self._ctl_device = combo.itemText(idx)

    def disable_rfgen(self) -> None:
        if not self._nogui:
            self._plo_chan.enabled = False

    def enable_rfgen(self) -> None:
        if not self._nogui:
            self._plo_chan.enabled = True

    @property
    def plo_chan(self) -> PLOChan:
        return self._plo_chan

    @property
    def att_ctl(self) -> PE43711Controller:
        return self._att_ctl

3.1 build-app-ui

def build_ui(self):
    """Create the widgets and layouts that make up the app window.

    The window has two rows: a control row (port selector, calibration
    status label and "Initialize" button) and below it the channel
    group supplied by the PLO channel.  The generator is left disabled.
    """

    root_layout = QVBoxLayout()

    # --- Control row -------------------------------------------------
    control_row = QHBoxLayout()
    port_combo = QComboBox()
    # The signal is connected before the items are added.
    port_combo.currentIndexChanged.connect(
        lambda idx, w=port_combo: self.tty_changed(w, idx))
    port_combo.addItems(self.serial_ports)
    port_edit = QLineEdit()
    port_combo.setLineEdit(port_edit)
    control_row.addWidget(QLabel("Control Port:"))
    control_row.addWidget(port_combo)
    control_row.addStretch(1)

    # Status label, also updated by the RPyC service when the
    # calibration mode changes.
    self.cal_lbl = QLabel('')
    self.cal_lbl.setStyleSheet('QLabel { color: "red" }')
    control_row.addWidget(self.cal_lbl)
    control_row.addStretch(1)

    init_button = QPushButton("Initialize")
    init_button.clicked.connect(self.initialize)
    control_row.addWidget(init_button)

    root_layout.addLayout(control_row)

    # --- Channel row -------------------------------------------------
    channel_row = QHBoxLayout()
    channel_row.addWidget(self._plo_chan.create_channel_group())
    root_layout.addLayout(channel_row)

    self.setLayout(root_layout)
    self.disable_rfgen()
    self.setWindowTitle('Low Spurious RF Generator')

3.2 low-spur-controller

class LowSpurController(HMC833Controller):
    """HMC833 controller which picks reference settings from spur data.

    Before the output frequency is programmed, the spur data is
    consulted for the requested frequency and the reference PLO,
    reference divider and charge pump gain are set accordingly.  The
    controller also drives the two PE42420 control lines used to
    switch between the low and high band output paths.
    """

    def __init__(self,
                 controller_id: str,
                 device: hmc833,
                 ref_ctl: HMC833Controller,
                 spur_data: Dict,
                 config: Optional[Dict] = None) -> None:
        """
        :param controller_id: Identifier passed to ``HMC833Controller``.
        :param device: The ``hmc833`` device being controlled.
        :param ref_ctl: Controller for the PLO providing the reference.
        :param spur_data: Mapping from a frequency string (one decimal
            place, e.g. ``'1000.0'``) to the reference settings to use.
        :param config: Initial configuration passed to the base class.
        """
        super().__init__(controller_id, device, config)
        self._spur_data = spur_data
        self._ref_ctl = ref_ctl
        # The frequency most recently programmed into the hardware.
        self._current_freq = 0.0

    def initialize(self, ser: serial.Serial):
        """Initialize the band switch, the reference PLO and this PLO.

        :param ser: The open serial connection to the hardware.
        """
        print(f'LowSpurController.initialize: {self.freq=}')
        # NOTE(review): presumably the 'O' command sets up the two
        # switch control pins as outputs - confirm.
        _ = write_cmd(ser,
                      f'O{RFGenApp.PE42420_CTL_1}:O{RFGenApp.PE42420_CTL_2}')
        self.switch_to_low_band(ser)
        self._ref_ctl.initialize(ser)
        print(f'  {self._ref_ctl.freq=}, {self._ref_ctl.buffer_gain=}')
        self.ref_freq = RFGenApp.DEFAULT_REF_OUTPUT_FREQ
        self.ref_div = 2
        self.refsrc = hmc833.ReferenceSource.EXTERNAL
        self.freq = RFGenApp.DEFAULT_OUTPUT_FREQ
        super().initialize(ser)
        self._current_freq = self.freq

    def configure_freq(self, ser: serial.Serial):
        """Program the hardware for the currently set ``self.freq``.

        :param ser: The open serial connection to the hardware.
        :return: True if the frequency is unchanged since it was last
            programmed, otherwise the value returned by the base class
            ``configure_freq``.

        If the spur data has no entry for the requested frequency a
        warning is printed, the reference and charge pump settings are
        left as they are and only the output frequency is programmed.
        """
        # retrieve reference parameters for the currently
        # set frequency value, self.freq.
        print('LowSpurController.configure_freq: ')
        if self.freq == self._current_freq:
            return True
        f = self.freq
        if f > 3000.0:
            # Above 3000 the high band path is selected and the spur
            # data is looked up at half the requested frequency.
            f /= 2
            self.switch_to_high_band(ser)
        else:
            self.switch_to_low_band(ser)

        # Spur data keys are frequency strings with one decimal place.
        f_str = str(round(f * 10)/10.0)
        print(f'  {f=}, {f_str=}')
        try:
            ref_params = self._spur_data[f_str]
        except KeyError:
            # Previously execution carried on and failed with an
            # UnboundLocalError on ``ref_params``.
            # NOTE(review): the original intent was to fall back to the
            # on-board reference here; that is not implemented, the
            # current reference settings are simply kept.
            print('No spur data available!')
        else:
            self._apply_ref_params(ser, ref_params)

        locked = super().configure_freq(ser)
        print(f'  PLO: {locked=}, {self.freq=}')
        self._current_freq = self.freq
        return locked

    def _apply_ref_params(self, ser: serial.Serial, ref_params):
        """Apply reference frequency, divider and charge pump settings."""
        if self._ref_ctl.freq != ref_params.fref:
            # Retune the reference PLO and tell this PLO about its new
            # reference frequency and divider.
            self._ref_ctl.freq = ref_params.fref
            self._ref_ctl.configure_freq(ser)
            self.ref_freq = ref_params.fref
            self.ref_div = ref_params.R
            self.configure_refdiv(ser)
            locked = self.check_plo_lock(ser)
            print(f'  Ref: {locked=}, {self._ref_ctl.freq=}, {ref_params=}')
        self.cp_gain = ref_params.icp
        self.configure_chargepump(ser)

    def switch_to_low_band(self, ser: serial.Serial):
        """Select the low band path (CTL_1 low, CTL_2 high)."""
        print('switch_to_low_band')
        _ = write_cmd(ser,
                      f'L{RFGenApp.PE42420_CTL_1}:H{RFGenApp.PE42420_CTL_2}')

    def switch_to_high_band(self, ser: serial.Serial):
        """Select the high band path (CTL_1 high, CTL_2 low)."""
        print('switch_to_high_band')
        _ = write_cmd(ser,
                      f'H{RFGenApp.PE42420_CTL_1}:L{RFGenApp.PE42420_CTL_2}')

3.3 rfgenapp-config

# Controller names.  They are passed to the controller constructors
# and are also the keys of DEFAULT_APP_CONFIG.
REFPLO_NAME: str = 'ref'
PLO_NAME: str = 'plo1'
ATTEN_NAME: str = 'att1'

# Keyword arguments used to construct the hmc833 and pe43711 device
# objects.
# NOTE(review): the values look like controller pin names - confirm.
HMC833_REF_HWCONF: Dict = { 'sen': 'D0', 'ld_sdo': 'C4' }
HMC833_PLO_HWCONF: Dict = { 'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2' }
STEPATTEN_HWCONF: Dict = { 'le': 'B7' }
# The two control lines driven by LowSpurController to switch between
# the low and high band paths.
PE42420_CTL_1: str = 'D6'
PE42420_CTL_2: str = 'D7'

# Default settings.
# NOTE(review): frequencies are presumably in MHz and the attenuation
# in dB - confirm against the rfblocks documentation.
DEFAULT_OUTPUT_FREQ: float = 1000.0
DEFAULT_REF_FREQ: float = 50.0
DEFAULT_REF_OUTPUT_FREQ: float = 100.0
DEFAULT_OUTPUT_ATTENUATION: float = 10.0

# Initial configuration for the reference PLO controller.
DEFAULT_REF_CONFIG: Dict = {
    'freq': DEFAULT_REF_OUTPUT_FREQ,
    'ref_freq': DEFAULT_REF_FREQ,
    'refsrc': hmc833.DEFAULT_REFSRC,
    'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
    'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
    'vco_mute': False
}

# Initial configuration for the output PLO controller.  Its reference
# frequency is the reference PLO output and the reference source is
# external.
DEFAULT_PLO_CONFIG: Dict = {
    'freq': DEFAULT_OUTPUT_FREQ,
    'ref_freq': DEFAULT_REF_OUTPUT_FREQ,
    'refsrc': hmc833.ReferenceSource.EXTERNAL,
    'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
    'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
    'vco_mute': False
}

# Initial configuration for the step attenuator.
DEFAULT_ATTEN_CONFIG: Dict = {
    'attenuation': DEFAULT_OUTPUT_ATTENUATION
}

# The complete default app configuration, keyed by controller name.
DEFAULT_APP_CONFIG: Dict = {
    REFPLO_NAME: DEFAULT_REF_CONFIG,
    PLO_NAME: DEFAULT_PLO_CONFIG,
    ATTEN_NAME: DEFAULT_ATTEN_CONFIG
}

3.4 initialize

@asyncSlot()
async def initialize(self):
    """Initialize the UI and then the hardware.

    The hardware initialization is run in the event loop's default
    executor and awaited.  A serial port error is reported in an error
    dialog.  The generator is enabled afterwards whether or not the
    initialization succeeded.
    """
    try:
        self.initialize_ui()
        hw_init = asyncio.get_event_loop().run_in_executor(
            None, self.initialize_hw)
        await hw_init
    except serial.serialutil.SerialException as exc:
        dialog = QErrorMessage(self)
        dialog.showMessage(str(exc))
    finally:
        self.enable_rfgen()

def initialize_ui(self) -> None:
    """Initialize the UI of the PLO channel."""
    self._plo_chan.initialize_ui()

def initialize_hw(self) -> None:
    """Initialize the PLO channel hardware.

    A serial connection to the control device is opened for the
    duration of the call using the app's device and baud rate.
    """
    with create_serial(self.ctl_device,
                       self.baudrate) as ser:
        self._plo_chan.initialize_hw(ser)

4 read-spur-data

def read_spur_data(spur_data_file):
    """Read frequency planner spur data from a CSV file.

    The first eight rows of the file (seven preamble rows followed by
    a column header row) are skipped.  Each following row produces one
    ``RefSetting`` built from columns 1, 2, 3, 6, 7, 8 and 9.  The
    reference frequency (column 6) is rounded up to two decimal places.
    Rows without any content, for example trailing blank lines, are
    ignored.

    :param spur_data_file: Path of the CSV file.
    :return: A dict mapping the output frequency exactly as written in
        column 3 of the file to the ``RefSetting`` for that frequency.
    """
    # Seven preamble rows plus one column header row.
    first_data_row = 8

    spur_data = {}
    # newline='' lets the csv module do its own line ending handling.
    with open(spur_data_file, newline='') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        for row_idx, row in enumerate(csv_reader):
            if row_idx < first_data_row:
                continue
            if not any(field.strip() for field in row):
                # Blank row: there is nothing to parse.
                continue
            spur_data[row[3]] = RefSetting(
                vco_freq=float(row[1]),
                out_div=int(row[2]),
                output_freq=float(row[3]),
                fref=ceil(float(row[6])*100)/100.0,
                R=int(row[7]),
                fpd=float(row[8]),
                icp=float(row[9]))
    return spur_data

5 ref-settings-class

@dataclass
class RefSetting:
    """Reference settings for one output frequency.

    Instances are created by ``read_spur_data``, one per data row of
    the spur data file.  ``LowSpurController.configure_freq`` uses
    ``fref``, ``R`` and ``icp``.
    """
    # Column 1 of the spur data file.
    vco_freq: float
    # Column 2 of the spur data file.
    out_div: int
    # Column 3 of the spur data file; its text is also the dict key.
    output_freq: float
    # Reference frequency (column 6, rounded up to two decimals).
    # Used as the reference PLO output frequency.
    fref: float
    # Column 7; assigned to the PLO controller's ``ref_div``.
    R: int
    # Column 8.
    # NOTE(review): presumably the phase detector frequency - confirm.
    fpd: float
    # Column 9; assigned to the PLO controller's ``cp_gain``.
    icp: float

6 lowspur-rfgen-service

class LowSpurRFGenService(rpyc.Service):
    """RPyC service giving remote access to the RF generator app.

    Each hardware operation opens a serial connection to the app's
    control device for the duration of the call.
    """

    def __init__(self, app):
        """
        :param app: The ``RFGenApp`` instance to be controlled.
        """
        super().__init__()
        self._app = app

    def initialize(self):
        """Initialize the service app and associated hardware modules.
        """
        if not self._app._nogui:
            self._app.initialize_ui()
        self._app.initialize_hw()
        self._app.enable_rfgen()

    @property
    def plo_chan(self):
        """Return a reference to the signal generator channel object.

        :return: A ``PLOChan`` instance.

        .. code:: python

            rfgen = rpyc.connect('127.0.0.1', 18867)
            rfgen.root.initialize()
            plo = rfgen.root.plo_chan

            plo.plo_ctl.freq = 1500.0
            rfgen.root.configure_plo_freq()
        """
        return self._app.plo_chan

    def configure_plo_freq(self) -> bool:
        """Update the PLO module output frequency.

        :return: The PLO controller's ``lock_status``.

        .. code:: python

            rfgen = rpyc.connect('127.0.0.1', 18867)
            rfgen.root.initialize()
            plo = rfgen.root.plo_chan

            plo.plo_ctl.freq = 1500.0
            rfgen.root.configure_plo_freq()
        """
        plo = self._app.plo_chan
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            plo.plo_ctl.configure_freq(ser)
        return plo.plo_ctl.lock_status

    def configure_plo(self) -> bool:
        """Update the PLO module hardware.

        :return: The PLO controller's ``lock_status``.

        .. code:: python

            rfgen = rpyc.connect('127.0.0.1', 18867)
            rfgen.root.initialize()
            plo = rfgen.root.plo_chan

            plo.plo_ctl.buffer_gain = (
                hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB)
            rfgen.root.configure_plo()
        """
        plo = self._app.plo_chan
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            plo.plo_ctl.configure(ser)
        return plo.plo_ctl.lock_status

    def configure_atten(self):
        """Update step attenuator module hardware.

        .. code:: python

            rfgen = rpyc.connect('127.0.0.1', 18867)
            rfgen.root.initialize()
            plo = rfgen.root.plo_chan

            plo.atten_ctl.attenuation = 10.0
            rfgen.root.configure_atten()
        """
        plo = self._app.plo_chan
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            plo.atten_ctl.configure(ser)

    def configure(self) -> bool:
        """Update the channel hardware - PLO and attenuator.

        :return: The PLO controller's ``lock_status``.
        """
        plo = self._app.plo_chan
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            plo.plo_ctl.configure(ser)
            plo.atten_ctl.configure(ser)
        return plo.plo_ctl.lock_status

    def set_calibration_mode(self, mode):
        """Set the calibration mode.

        :param mode: True to set the app to calibration mode,
            False to set the app for normal operation.
        :type mode: bool

        Setting the calibration mode to True will disconnect the
        HMC833 controller ``freq_changed`` signal.
        This then prevents the ``_update_channel_settings``
        method being invoked which in turn prevents any change to
        the channel attenuation.

        When the app has a GUI, its calibration status label is
        updated as well.

        .. code:: python

            rfgen = rpyc.connect('127.0.0.1', 18867)
            rfgen.root.initialize()
            rfgen.root.set_calibration_mode(True)
        """
        self._app.plo_chan.set_calibration_mode(mode)
        if self._app._nogui:
            # The status label is only created when the UI is built,
            # so there is nothing to update in headless mode.
            return
        # NOTE(review): this is presumably called from an RPyC server
        # thread rather than the Qt GUI thread - confirm that updating
        # the label from here is safe.
        if mode is True:
            self._app.cal_lbl.setText('Calibrating')
        else:
            self._app.cal_lbl.setText('')


class RPyCServer(QObject):
    """Worker object which runs an RPyC ``ThreadedServer``.

    ``run`` is intended to be invoked in a worker thread.  The
    ``finished`` signal is emitted when the server stops, including
    when it fails to start.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """
        :param serviceInst: The RPyC service (instance) to be served.
        :param host: The host name or IP address to bind to.
        :param port: The TCP port to listen on.
        """
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Create and start the server, then emit ``finished``.

        ``finished`` is emitted even if creating or starting the
        server raises, so that anything connected to it (for example a
        thread's ``quit``) still runs.  The exception is not
        suppressed.
        """
        print("Low Spur. RFGen rpyc service on {}:{}".format(self._host, self._port))
        try:
            # NOTE(review): this configuration exposes all attributes,
            # allows attribute assignment and enables pickle for
            # connected clients.  That is only appropriate when every
            # client that can reach the bound address is trusted.
            self._server = ThreadedServer(
                self._serviceInst,
                hostname = self._host,
                port = self._port,
                protocol_config = {
                    'allow_all_attrs': True,
                    'allow_setattr': True,
                    'allow_pickle': True})
            self._server.start()
        finally:
            self.finished.emit()

7 Imports

from typing import (
    List, Dict, Optional
)
from dataclasses import dataclass

import sys
from math import ceil
import copy
from math import sqrt, log10
from enum import Enum
import asyncio
from argparse import ArgumentParser
import serial
import json
from serial.tools.list_ports import comports
import csv

from rfblocks import (
    hmc833, HMC833Controller, create_serial, write_cmd,
    pe43711, PE43711Controller, DEFAULT_BAUDRATE,
    DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    HMC833Module, StepAttenuator
)

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)

from PyQt5.QtWidgets import (
    QApplication, QComboBox, QLineEdit, QErrorMessage,
    QWidget, QGroupBox, QHBoxLayout, QVBoxLayout, QPushButton, QLabel,
    QFormLayout, QCheckBox
)

from PyQt5.QtCore import (
    Qt, QObject, QThread, pyqtSignal, pyqtSlot
)

import rpyc
from rpyc.utils.server import ThreadedServer

from plochan import PLOChan, CalibrationDataException
from rfgen_cal_data import CHAN_CAL_DATA, max_linear_gains