Open RF Prototyping

rfgen.py

This document describes an app for controlling the simple RF generator reference design. When executed, the script displays a GUI window which allows the generator reference design to be controlled via its USB connection.

1 The Script

Generate the script using C-c C-v t.

#
# Generated from rfgen-app.org
#
<<imports>>

<<app-class>>

<<rfgen-service>>

<<main-func>>

# Run the application only when this file is executed as a script.
if __name__ == '__main__':
    main()

2 The main Function

The asyncio framework is used to coordinate communications over the serial device without blocking user interface events. For this purpose we make use of QEventLoop and QThreadExecutor from the qasync package.

The event loop is equipped with an instance of QThreadExecutor which executes a single thread. The idea here is to ensure that requests to the ECApp serial device will execute one by one in a manner which will not block the operation of the Qt event loop.

def main():
    """Run the RF generator application.

    Parses the command line (via the ``process-cmdline-args`` chunk,
    which binds ``args``), creates the Qt application object and a
    qasync event loop, builds the ``RFGenApp``, starts the RPyC server
    in its own ``QThread`` and then runs the event loop until exit.
    """
    # NOTE(review): presumably made global so that the QThread object
    # outlives this function's locals -- confirm.
    global server_thread

    <<process-cmdline-args>>

    # This ensures that Ctrl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # A headless run uses the core (non-widget) application object.
    if args.nogui is True:
        app = QCoreApplication(sys.argv)
    else:
        app = QApplication(sys.argv)
    loop = QEventLoop(app)
    # The default executor has exactly one worker thread, so work handed
    # to run_in_executor(None, ...) is executed one item at a time.
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    rfgen_app = RFGenApp(
        serial_device = args.device,
        baudrate = args.baudrate,
        no_cal_data = args.no_cal_data,
        headless = args.nogui)

    if not args.nogui:
        rfgen_app.show()

    # Run the RPyC server in its own thread: server.run is invoked when
    # the thread starts; when the server finishes the thread is asked to
    # quit and both objects are scheduled for deletion.
    server_thread = QThread()
    server = RPyCServer(RFGenService(rfgen_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    with loop:
        # asyncio debug mode is always switched on here.
        loop.set_debug(True)
        sys.exit(loop.run_forever())

    #sys.exit(app.exec_())

2.1 process-cmdline-args

Command line arguments also allow the specification of a serial device and baud rate for the purposes of testing.

# A baud rate of 0 is a sentinel value: RFGenApp replaces it with
# DEFAULT_BAUDRATE when it is constructed.
defaultBaud = 0

parser = ArgumentParser(description=
                        '''A simple RF signal generator.''')

parser.add_argument("--nogui", action='store_true',
                    help="Disable GUI and run 'headless'")
parser.add_argument("--no_cal_data", action='store_true',
                    help="No output power calibration data should be used")
parser.add_argument("-d", "--device", default=None,
                    help="The serial device for testing")
# The help text spells out what the 0 default means rather than
# advertising 0 as though it were a usable baud rate.
parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int,
                    help="Baud rate (default: {}, which selects the "
                         "built-in default rate)".format(defaultBaud))
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address to bind the RPyC server instance to")
parser.add_argument("-P", "--port", default=18864, type=int,
                    help="TCP port for the RPyC server instance")
args = parser.parse_args()

3 app-class

class RFGenApp(QObject):

    <<hardware-config>>

    def __init__(self,
                 config: Dict = DEFAULT_APP_CONFIG,
                 serial_device: Optional[str] = None,
                 baudrate: int = 0,
                 no_cal_data: bool = False,
                 headless: bool = False) -> None:
        super().__init__()
        self._nogui: bool = headless
        if self._nogui is False:
            self._widget = QWidget()
        self._ctl_device: str = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE

        self._synths: List[hmc833] = [
            hmc833(**hw_conf) for hw_conf in RFGenApp.RF_SYNTHS_HWCONF
        ]
        self._stepattens: List[pe43711] = [
            pe43711(**hw_conf) for hw_conf in RFGenApp.STEPATTENS_HWCONF
        ]
        self._detectors: List[LogDetector] = [
            LogDetector(**hw_conf) for hw_conf in RFGenApp.DETECTORS_HWCONF
        ]
        d: Dict = config['channels']
        z = zip(d.keys(), self._synths, d.values())
        self._synth_controllers = {
            ctl_id: HMC833Controller(ctl_id,
                                     synth,
                                     ctl_config)
            for ctl_id, synth, ctl_config in z
        }
        z = zip(d.keys(), self._stepattens)
        self._atten_controllers = {
            ctl_id: PE43711Controller(ctl_id, atten) for ctl_id, atten in z
        }
        z = zip(d.keys(), self._detectors)
        self._det_controllers = {
            ctl_id: PwrDetectorController(ctl_id, det) for ctl_id, det in z
        }
        if no_cal_data is True:
            self._channels: Dict = {
                chan_id: PLOChan(self, chan_id,
                                 self._synth_controllers[chan_id],
                                 self._atten_controllers[chan_id],
                                 self._det_controllers[chan_id])
                for chan_id in d.keys()}
        else:
            self._channels: Dict = {
                chan_id: PLOChan(self, chan_id,
                                 self._synth_controllers[chan_id],
                                 self._atten_controllers[chan_id],
                                 self._det_controllers[chan_id],
                                 channel_cal_data[chan_id],
                                 max_linear_gains)
                for chan_id in d.keys()}

        if not headless:
            self.build_ui()

    <<build-app-ui>>

    <<initialize>>

    <<app-configuration>>

    @property
    def channels(self):
        return self._channels

    @property
    def ctl_device(self) -> str:
        return self._ctl_device

    @property
    def baudrate(self) -> int:
        return self._baudrate

    @property
    def serial_ports(self) -> List:
        ports = [DEFAULT_SOCKET_URL]
        ports.extend([port.device for port in comports()
                      if port.serial_number])
        if self.ctl_device:
            if self.ctl_device in ports:
                ports.pop(ports.index(self.ctl_device))
            ports.insert(0, self.ctl_device)
        return ports

    def tty_changed(self, combo: QComboBox, idx: int) -> None:
        self._ctl_device = combo.itemText(idx)

    def disable_rfgen(self) -> None:
        if not self._nogui:
            for chan in self._channels.values():
                chan.enabled = False

    def enable_rfgen(self) -> None:
        if not self._nogui:
            for chan in self._channels.values():
                chan.enabled = True

3.1 hardware-config

# NOTE: this chunk is spliced into the RFGenApp class body, so the names
# below become class attributes (e.g. RFGenApp.RF_SYNTHS_HWCONF).

# Channel ids; also used as the default channel labels.
RFCH1_NAME: str = 'Chan 1'
RFCH2_NAME: str = 'Chan 2'

# Keyword arguments passed to hmc833(...) for each channel.
# NOTE(review): the values look like controller pin names -- confirm.
HMC833_CH1_HWCONF: Dict = { 'sen': 'D1', 'ld_sdo': 'C4'}
HMC833_CH2_HWCONF: Dict = { 'sen': 'D6', 'ld_sdo': 'D7'}

# Keyword arguments passed to pe43711(...) for each channel.
STEPATTEN_CH1_HWCONF: Dict = { 'le': 'D0'}
STEPATTEN_CH2_HWCONF: Dict = { 'le': 'B0'}

# Keyword arguments passed to LogDetector(...) for each channel.
DETECTOR_CH1_HWCONF: Dict = { 'cs': 'D5'}
DETECTOR_CH2_HWCONF: Dict = { 'cs': 'D4'}

# Per-device configuration lists, ordered channel 1 then channel 2.
RF_SYNTHS_HWCONF: List = [ HMC833_CH1_HWCONF, HMC833_CH2_HWCONF ]
STEPATTENS_HWCONF: List = [ STEPATTEN_CH1_HWCONF, STEPATTEN_CH2_HWCONF ]
DETECTORS_HWCONF: List = [ DETECTOR_CH1_HWCONF, DETECTOR_CH2_HWCONF ]

# NOTE(review): units are presumably MHz -- confirm.
DEFAULT_PLO_FREQ: float = 1000.0
DEFAULT_REF_FREQ: float = 50.0

# Default synthesizer settings shared by both channels.
DEFAULT_SYNTH_CONFIG: Dict = {
    'freq': DEFAULT_PLO_FREQ,
    'ref_freq': DEFAULT_REF_FREQ,
    'refsrc': hmc833.DEFAULT_REFSRC,
    'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
    'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
    'vco_mute': False
}

# Default application configuration: one entry per channel, each the
# default synthesizer settings plus a label.
DEFAULT_APP_CONFIG: Dict = {
    'channels': {
        RFCH1_NAME: { 'label': RFCH1_NAME, **DEFAULT_SYNTH_CONFIG },
        RFCH2_NAME: { 'label': RFCH2_NAME, **DEFAULT_SYNTH_CONFIG }
    }
}

3.2 initialize

@asyncSlot()
async def initialize(self):
    """Initialize the UI and the hardware, then configure each channel.

    The blocking serial work (hardware initialization and per-channel
    configuration) is handed to the event loop's default executor and the
    pause between channels uses ``asyncio.sleep`` so that the Qt event
    loop keeps running throughout.  A serial error is reported in an
    error dialog.  The channel UI is enabled afterwards in all cases.
    """
    try:
        self.initialize_ui()
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_hw)
        for chan in self._channels.values():
            # chan.configure() opens the serial device and talks to the
            # hardware, so it is run in the executor as well.
            await loop.run_in_executor(None, chan.configure)
            # time.sleep() here would stall the event loop for a second
            # per channel.
            await asyncio.sleep(1.0)
    except serial.serialutil.SerialException as se:
        # The dialog parent must be a QWidget; this object is a QObject,
        # so the app's top-level widget is used instead.
        error_dialog = QErrorMessage(self._widget)
        error_dialog.showMessage(str(se))
    finally:
        self.enable_rfgen()

def initialize_ui(self) -> None:
    """Ask every channel to initialize its on-screen controls."""
    channels = self._channels
    for chan_id in channels:
        channels[chan_id].initialize_ui()

def initialize_hw(self) -> None:
    """Initialize each channel's hardware over one serial connection.

    The connection is opened with the app's control device and baud
    rate.  There is a 0.1 second pause after opening the connection and
    after each channel is initialized.
    """
    pause = 0.1
    device, baud = self.ctl_device, self.baudrate
    with create_serial(device, baud) as ser:
        sleep(pause)
        for chan in self._channels.values():
            chan.initialize_hw(ser)
            sleep(pause)

3.3 app-configuration

def save_config(self) -> None:
    """Prompt for a file name and save the configuration as JSON.

    Nothing is written when the dialog is cancelled (empty file name).
    """
    config = self.dump_config()
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    # The dialog parent must be a QWidget; this object is a QObject, so
    # the app's top-level widget is used instead.
    name, _ = QFileDialog.getSaveFileName(
        self._widget, 'Save configuration',
        def_filepath, filter_str)
    output_file = str(name).strip()
    if not output_file:
        return
    with open(output_file, 'w') as fd:
        json.dump(config, fd)

def open_config(self) -> None:
    """Prompt for a JSON configuration file and load it.

    Only the first selected file is used.  Nothing happens when the
    dialog is cancelled.
    """
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    # The dialog parent must be a QWidget; this object is a QObject, so
    # the app's top-level widget is used instead.
    names, _ = QFileDialog.getOpenFileNames(
        self._widget, 'Load Configuration',
        def_filepath, filter_str)
    if not names:
        return
    input_file = names[0]
    with open(input_file, 'r') as fd:
        config = json.load(fd)
    self.load_config(config)

def dump_config(self) -> Dict:
    """Collect the channel and attenuator configuration.

    :return: A dict with a ``'channels'`` entry and an ``'attenuators'``
        entry, each keyed by channel id.
    """
    chan_config = {}
    for chan_id, chan in self._channels.items():
        # NOTE(review): relies on the channel objects providing
        # dump_config(); no such method is defined on PLOChan in the
        # code visible for review -- confirm.
        chan_config[chan_id] = { 'label': chan_id, **chan.dump_config()}
    atten_config = {}
    # NOTE(review): _chan_attenuators is not assigned in
    # RFGenApp.__init__ (which creates _atten_controllers), and
    # load_config uses yet another name (_chan_attens).  As written this
    # loop looks like it will raise AttributeError -- confirm and fix.
    for chan_id, atten in self._chan_attenuators.items():
        atten_config[chan_id] = {**atten.dump_config()}
    config = {
        'channels': chan_config,
        'attenuators': atten_config
    }
    return config

def load_config(self, config: Dict) -> None:
    """Apply a configuration dict to the channels and attenuators.

    :param config: A dict with ``'channels'`` and ``'attenuators'``
        entries keyed by channel id.
    """
    for chan_id, chan in self._channels.items():
        chan_config = config['channels'][chan_id]
        # NOTE(review): relies on the channel objects providing
        # load_config() and configure_ui(); neither is defined on PLOChan
        # in the code visible for review -- confirm.
        chan.load_config(chan_config)
        chan.configure_ui(chan_config)
    # NOTE(review): _chan_attens is not assigned in RFGenApp.__init__
    # (which creates _atten_controllers), and dump_config uses a
    # different name (_chan_attenuators).  As written this loop looks
    # like it will raise AttributeError -- confirm and fix.
    for chan_id, atten in self._chan_attens.items():
        atten_config = config['attenuators'][chan_id]
        atten.configure_ui(atten_config)

3.4 build-app-ui

def build_ui(self):
    """Build the on-screen UI for the RF generator app."""

    vbox = QVBoxLayout()

    # Top row: control port selector, calibration label, configuration
    # menu button and the initialize button.
    hbox = QHBoxLayout()
    tty_combo = QComboBox()
    # NOTE(review): the signal is connected before the combo box is
    # populated; presumably so that adding the items triggers
    # tty_changed and syncs _ctl_device with the first entry -- confirm.
    tty_combo.currentIndexChanged.connect(
        lambda idx, w=tty_combo: self.tty_changed(w, idx))
    tty_combo.addItems(self.serial_ports)
    line_edit = QLineEdit()
    tty_combo.setLineEdit(line_edit)
    hbox.addWidget(QLabel("Control Port:"))
    hbox.addWidget(tty_combo)
    hbox.addStretch(1)
    # Initially empty label, shown in red.
    self.cal_lbl = QLabel('')
    self.cal_lbl.setStyleSheet('QLabel { color: "red" }')
    hbox.addWidget(self.cal_lbl)
    hbox.addStretch(1)
    # Drop-down menu for saving and loading the configuration.
    configuration_btn = QPushButton("Configuration")
    config_menu = QMenu()
    config_menu.addAction('Save', self.save_config)
    config_menu.addAction('Load', self.open_config)
    configuration_btn.setMenu(config_menu)
    hbox.addWidget(configuration_btn)
    initialize_btn = QPushButton("Initialize")
    initialize_btn.clicked.connect(self.initialize)
    hbox.addWidget(initialize_btn)

    vbox.addLayout(hbox)

    # Second row: the two channel group boxes side by side.
    hbox = QHBoxLayout()

    hbox.addWidget(
        self._channels[RFGenApp.RFCH1_NAME].create_channel_group())
    hbox.addWidget(
        self._channels[RFGenApp.RFCH2_NAME].create_channel_group())
    vbox.addLayout(hbox)

    self._widget.setLayout(vbox)
    # The channel controls start out disabled; initialize() enables them.
    self.disable_rfgen()
    self._widget.setWindowTitle('Dual RF Generator')

def show(self):
    """Show the application's top-level widget."""
    widget = self._widget
    widget.show()

4 class PLOChan

PLOChan UI

Figure 1: PLOChan UI controls

from typing import (
    List, Dict, Optional
)

import sys
from time import sleep
from argparse import ArgumentParser
import asyncio
import serial
from serial.tools.list_ports import comports
from scipy import interpolate

from rfblocks import (
    hmc833, HMC833Controller, pe43711, PE43711Controller,
    LogDetector, PwrDetectorController, BridgeCoupler,
    create_serial, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    HMC833Module, StepAttenuator
)

from PyQt5.QtWidgets import (
    QWidget, QGroupBox, QHBoxLayout, QVBoxLayout, QPushButton, QLabel,
    QFormLayout, QCheckBox
)

from PyQt5.QtCore import (
    Qt, QObject, QThread, pyqtSignal, pyqtSlot
)


class CalibrationDataException(Exception):
    """Signals that an operation needed calibration data but none is
    available.
    """

class PLOChan(QObject):
    """A single RF generator channel.

    A channel combines an HMC833 PLO controller, a PE43711 step
    attenuator controller and an optional power detector controller.
    When calibration data is supplied the channel output level (in dBm)
    is set by choosing an output buffer gain and a step attenuator
    setting.

    The following signal is defined:

        - :py:`level_changed(float)`
    """

    MIN_FREQUENCY = 100.0
    MIN_LEVEL = -100.0

    # Output buffer gain settings in ascending order.
    GAIN_LEVELS = [hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
                   hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
                   hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
                   hmc833.OutputBufferGain.MAXGAIN]

    level_changed = pyqtSignal(float)

    def __init__(self,
                 app: QObject,
                 chan_id: str,
                 plo_controller: HMC833Controller,
                 atten_controller: PE43711Controller,
                 det_controller: Optional[PwrDetectorController] = None,
                 cal_data: Optional[Dict] = None,
                 max_linear_gains: Optional[Dict] = None):
        """Create an instance of ``PLOChan``.

        :param app: The parent application for the channel display.
        :type app: QObject
        :param chan_id: The channel id; also used as the title of the
            channel's group box.
        :type chan_id: str
        :param plo_controller: HMC833 PLO controller instance.
        :type plo_controller: :py:class:`rfblocks.HMC833Controller`
        :param atten_controller: PE43711 step attenuator controller
            instance.
        :type atten_controller: :py:class:`rfblocks.PE43711Controller`
        :param det_controller: Controller for optional power detector.
        :type det_controller: :py:class:`rfblocks.PwrDetectorController`
        :param cal_data: Calibrated channel output power as a function
            of frequency for each HMC833 output buffer gain setting.
            Format:

            .. code-block:: python

               {'max-minus-9dB': {100.0: 6.54, .. , 6000.0: -7.03},
                'max-minus-6dB': ..
                ..
                'maxgain': {100.0: 12.72, .. , 6000.0: -4.66}}

        :type cal_data: Dict
        :param max_linear_gains: HMC833 output buffer gain settings
            for optimizing signal linearity as a function of
            output frequency.  Format:

            .. code-block:: python

              {50.0: hmc833.OutputBufferGain.MAXGAIN,
               ...
               2000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
               3000.0: hmc833.OutputBufferGain.MAXGAIN,
               6000.0: hmc833.OutputBufferGain.MAXGAIN}

        :type max_linear_gains: Dict
        """
        super().__init__(app)
        self._app: QObject = app
        self._chan_id: str = chan_id
        self._plo_controller: HMC833Controller = plo_controller
        self._atten_controller: PE43711Controller = atten_controller
        self._det_controller: Optional[PwrDetectorController] = det_controller
        self._coupler: BridgeCoupler = BridgeCoupler()
        # The UI modules only exist when the app has a widget.
        if self._app._nogui is False:
            self._plo_module: HMC833Module = \
                HMC833Module(app._widget, plo_controller)
            self._atten_module: StepAttenuator = \
                StepAttenuator(app._widget, atten_controller)
        self._cal_data = cal_data
        # Breakpoint tables mapping a start frequency to the maximum
        # output buffer gain allowed from that frequency upwards.
        self._full_gains = {50.0: hmc833.OutputBufferGain.MAXGAIN}
        self._linear_gains = max_linear_gains
        self._max_gains = self._full_gains
        self._output_pwr_fns = {}
        self._max_output_pwr_fn = None
        # True while freq_changed is connected to
        # _update_channel_settings.
        self._freq_tracking: bool = False
        if self._cal_data is not None:
            self._compute_output_pwr_fns()
        self._level = 0.0

        self._group_box: Optional[QGroupBox] = None

    def _compute_output_pwr_fns(self):
        """Build one output power interpolation function (power versus
        frequency) per gain setting from the calibration data.
        """
        if self._cal_data is None:
            return
        gain_map = {
            'max-minus-9dB': PLOChan.GAIN_LEVELS[0],
            'max-minus-6dB': PLOChan.GAIN_LEVELS[1],
            'max-minus-3dB': PLOChan.GAIN_LEVELS[2],
            'maxgain': PLOChan.GAIN_LEVELS[3]
        }
        for gain_label, output_pwr in self._cal_data.items():
            pwr_interp_fn = interpolate.interp1d(
                list(output_pwr.keys()),
                list(output_pwr.values()),
                kind='linear')
            self._output_pwr_fns[gain_map[gain_label]] = pwr_interp_fn

    def _max_gain(self, freq):
        """Return the maximum buffer gain setting allowed at ``freq``.

        The setting is that of the breakpoint interval containing
        ``freq``; frequencies at or above the last breakpoint use the
        last breakpoint's setting.
        """
        # Sort the breakpoints so that the result does not depend on the
        # insertion order of the table.
        f_breakpts = sorted(self._max_gains)
        for f_lo, f_hi in zip(f_breakpts, f_breakpts[1:]):
            if freq < f_hi:
                return self._max_gains[f_lo]
        return self._max_gains[f_breakpts[-1]]

    def _update_channel_settings(self) -> None:
        """Recompute the gain and attenuation settings for the current
        level and PLO frequency and adopt the resulting level.
        """
        actual_pwr = self._compute_channel_settings(self.level,
                                                    self.plo_ctl.freq)
        self.level = actual_pwr

    def _compute_output_pwr(self, gain_level, freq) -> float:
        """Return the calibrated output power for ``gain_level`` at
        ``freq``.
        """
        # Frequencies strictly inside (1499, 1500) and (3000, 3001) are
        # moved to the outer edge of those intervals before
        # interpolating.
        # NOTE(review): presumably the calibration data is discontinuous
        # at 1500 and 3000 -- confirm.
        if freq < 1500.0 and freq > 1499.0:
            freq = 1499.0
        elif freq > 3000.0 and freq < 3001.0:
            freq = 3001.0
        return float(self._output_pwr_fns[gain_level](freq))

    def _compute_output_pwr_levels(self, freq: float):
        """Calculate output buffer power levels for the specified
           signal frequency.

        :param freq: The output signal frequency
        :type freq: float

        :returns: A tuple of a list and a dict.  The list contains
            the output power settings in descending order.
            The dict contains the gain settings keyed using the
            signal output power associated with the setting.
        """
        output_pwrs = []
        gain_settings = {}
        max_gain_level = self._max_gain(freq)
        for gain_level in PLOChan.GAIN_LEVELS:
            # GAIN_LEVELS is ascending: stop at the first disallowed gain.
            if gain_level > max_gain_level:
                break
            p = self._compute_output_pwr(gain_level, freq)
            gain_settings[p] = hmc833.OutputBufferGain(gain_level)
            output_pwrs.append(p)
        output_pwrs.reverse()
        return output_pwrs, gain_settings

    def _compute_channel_settings(self, pwr: float, freq: float) -> float:
        """Calculate channel gain and attenuation settings for the
        currently specified output power level.

        The settings are written to the PLO and attenuator controllers.

        :param pwr: The desired output signal power in dBm.
        :type pwr: float
        :param freq: The output signal frequency in MHz.
        :type freq: float

        :return: float
            The actual output power level using the calculated
            output buffer gain and attenuation settings.

        :raises: CalibrationDataException
        """
        if len(self._output_pwr_fns) == 0:
            raise CalibrationDataException

        output_pwrs, gain_settings = self._compute_output_pwr_levels(freq)

        # Step through the calculated output power levels
        # and determine where the desired power output level
        # sits in relation to these.  Select the output buffer
        # gain so that the PLO output power is equal to or above
        # the desired level and then select the step attenuator
        # setting to match the output power to the desired
        # power level.
        if pwr > output_pwrs[0]:
            # The desired power exceeds the maximum PLO power
            # output level.
            buffer_gain = gain_settings[output_pwrs[0]]
            attenuation = 0.0
            actual_pwr = output_pwrs[0]
        elif pwr <= output_pwrs[-1]:
            # At or below the lowest gain setting's power: use that
            # setting and attenuate, limited by the attenuator range.
            buffer_gain = gain_settings[output_pwrs[-1]]
            attenuation = output_pwrs[-1] - pwr
            actual_pwr = pwr
            if attenuation > pe43711.MAX_ATTENUATION:
                attenuation = pe43711.MAX_ATTENUATION
                actual_pwr = output_pwrs[-1] - attenuation
        else:
            for pwr_hi, pwr_lo in zip(output_pwrs, output_pwrs[1:]):
                if pwr > pwr_lo:
                    buffer_gain = gain_settings[pwr_hi]
                    attenuation = pwr_hi - pwr
                    actual_pwr = pwr
                    break
        self.plo_ctl.buffer_gain = buffer_gain
        # The attenuation is rounded to the nearest 0.25.
        self.atten_ctl.attenuation = round(attenuation * 4)/4
        return actual_pwr

    def power_range(self, freq):
        """Return the range of output signal powers at the specified
        frequency.

        :param freq: The output signal frequency
        :type freq: float

        :returns: A tuple containing the min and max output power
            available at the specified frequency.
        """
        output_pwrs, _ = self._compute_output_pwr_levels(freq)
        max_pwr = output_pwrs[0]
        min_pwr = output_pwrs[-1] - pe43711.MAX_ATTENUATION
        return min_pwr, max_pwr

    @property
    def enabled(self) -> bool:
        """Enable/disable the on screen PLO channel UI components.

        Reads as False when the channel group box has not been created.
        """
        if self._group_box:
            return self._group_box.isEnabled()
        return False

    @enabled.setter
    def enabled(self, en: bool) -> None:
        if self._group_box:
            self._group_box.setEnabled(en)

    @property
    def plo_ctl(self) -> HMC833Controller:
        """Return a reference to the channel's PLO controller.
        """
        return self._plo_controller

    @property
    def atten_ctl(self) -> PE43711Controller:
        """Return a reference to the channel's step attenuator
        controller.
        """
        return self._atten_controller

    @property
    def detector_ctl(self) -> Optional[PwrDetectorController]:
        """Return a reference to the channel's power detector controller.
        """
        return self._det_controller

    @property
    def coupler(self) -> BridgeCoupler:
        """Return a reference to the channel's bridge coupler instance.
        """
        return self._coupler

    @property
    def level(self) -> float:
        """The current channel output power (in dBm).

        :raises: CalibrationDataException
            If the channel has no calibration data.
        """
        if self._cal_data is None:
            raise CalibrationDataException
        return self._level

    @level.setter
    def level(self, lvl: float) -> None:
        """Set the current channel output level (in dBm).

        :param lvl: The desired output power in dBm
        :type lvl: float

        :raises: CalibrationDataException
            If the channel has no calibration data.
        """
        if self._cal_data is None:
            raise CalibrationDataException
        if self._level != lvl:
            actual_lvl = self._compute_channel_settings(lvl, self.plo_ctl.freq)
            level_moved = self._level != actual_lvl
            self._level = actual_lvl
            # The displayed level is only refreshed directly when the
            # stored level actually changed and a UI exists.
            if level_moved and self._app._nogui is False:
                self.update_chan_level(actual_lvl)
            self.level_changed.emit(actual_lvl)

    def set_level(self, lvl: float) -> None:
        """Method form of the ``level`` setter."""
        self.level = lvl

    @pyqtSlot(float)
    def update_chan_level(self, lvl: float) -> None:
        """Update the displayed channel output power.
        """
        self._plo_module._level_box.setValue(lvl)

    def set_max_gains(self, state):
        """Select the breakpoint table used to limit the buffer gain.

        :param state: A check box state; ``Qt.Checked`` selects the
            linearity optimized gains, anything else the full gains.
        """
        # Fall back to the full gains when no linearity table was
        # supplied, so that the active table is never None.
        if state == Qt.Checked and self._linear_gains is not None:
            self._max_gains = self._linear_gains
        else:
            self._max_gains = self._full_gains
        self._update_channel_settings()

    def initialize_hw(self, ser: serial.Serial):
        """Initialize the HMC833 and PE43711 board hardware.
        """
        self.plo_ctl.initialize(ser)
        self.atten_ctl.initialize(ser)

    def initialize_ui(self):
        """Initialize the PLO module's on-screen controls."""
        self._plo_module.initialize_ui()

    def configure(self) -> bool:
        """Configure currently set frequency and level values in channel hardware.

        :return: The value returned by the PLO controller's
            ``configure`` (stored as the lock status).
        """
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            lock_status = self.plo_ctl.configure(ser)
            self.atten_ctl.configure(ser)
        return lock_status

    def _track_freq_changes(self, enable: bool) -> None:
        """Connect or disconnect ``freq_changed`` and
        ``_update_channel_settings``, doing nothing when the connection
        is already in the requested state.
        """
        if enable == self._freq_tracking:
            return
        if enable:
            self.plo_ctl.freq_changed.connect(
                self._update_channel_settings)
        else:
            self.plo_ctl.freq_changed.disconnect(
                self._update_channel_settings)
        self._freq_tracking = enable

    def set_calibration_mode(self, mode: bool) -> None:
        """Set the channel calibration mode.

        :param mode: True to set the channel to calibration mode,
            False to set the channel for normal operation.
        :type mode: bool

        Setting the calibration mode to True will disconnect the
        HMC833 controller ``freq_changed`` signal and thus prevent
        the ``_update_channel_settings`` method being invoked.
        This in turn prevents any change to the channel attenuation.

        Repeating a request is harmless: the signal is never
        disconnected when it is not connected (for example when no
        channel UI was built) and is never connected twice.
        """
        self._track_freq_changes(mode is not True)

    def gain_linearity_layout(self):
        """Build the layout holding the "Optimize Linearity" check box."""
        vbox = QVBoxLayout()
        fbox = QFormLayout()
        hbox = QHBoxLayout()

        self._linearity_box = QCheckBox("")
        self._linearity_box.setChecked(False)
        self._linearity_box.stateChanged.connect(self.set_max_gains)
        fbox.addRow(QLabel("Optimize Linearity:"), self._linearity_box)

        hbox.addLayout(fbox)
        hbox.addStretch()
        vbox.addLayout(hbox)
        return vbox

    def create_channel_group(self):
        """Build the on-screen UI for the RF channel.

        :return: The channel's group box.
        """
        self._group_box = QGroupBox(self._chan_id)
        vbox = QVBoxLayout()
        if self._cal_data is None:
            # Without calibration data there is no level control.
            vbox.addLayout(self._plo_module.create_ploconfig_layout(
                has_level = False,
                has_configure = False))
            vbox.addLayout(self._atten_module.create_attenuator_layout(
                has_configure = False))
        else:
            # With calibration data the level control drives the buffer
            # gain and the attenuation, so their own controls are
            # disabled.
            vbox.addLayout(self._plo_module.create_ploconfig_layout(
                has_level = True,
                level_delegate = self,
                has_configure = False,
                has_divider_gain = False))
            self._plo_module.enable_buffer_gain(False)
            self._plo_module._freq_box.setMinimum(PLOChan.MIN_FREQUENCY)
            self._plo_module._level_box.setMinimum(PLOChan.MIN_LEVEL)
            self.level_changed.connect(self.update_chan_level)
            self._track_freq_changes(True)
            vbox.addLayout(self._atten_module.create_attenuator_layout(
                has_configure = False))
            self._atten_module._atten_box.setEnabled(False)
            self._update_channel_settings()
            vbox.addLayout(self.gain_linearity_layout())
        hbox = QHBoxLayout()
        hbox.addStretch(1)
        self._config_btn = QPushButton("Configure")
        self._config_btn.clicked.connect(self.configure)
        hbox.addWidget(self._config_btn)
        vbox.addLayout(hbox)
        self._group_box.setLayout(vbox)
        return self._group_box

5 rfgen-service

class RFGenService(rpyc.Service):

    OUTPUT_PWR_DELTA_THRESHOLD = 0.3
    OUTPUT_PWR_MAX_ADJUST_ATTEMPTS = 5

    def __init__(self, app):
        """Create the service.

        :param app: The application object the service operates on.
        """
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Bring up the app and its hardware modules.

        The UI is initialized only when the app has one.  The hardware is
        then initialized and each channel configured in turn, with a one
        second pause after each channel, before the generator UI is
        enabled.
        """
        app = self._app
        if not app._nogui:
            app.initialize_ui()
        app.initialize_hw()
        channels = self.channels
        for chan in channels.values():
            chan.configure()
            sleep(1.0)
        app.enable_rfgen()

    def initialize_detector(self, chan_id: str) -> bool:
        """Initialize the power detector connected to the specified channel.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str

        :return: True if there is a detector connected, False if no
            detector connected (the channel has no detector controller,
            or initializing it raised ``DetectorReadError``).
        """
        chan = self._app.channels[chan_id]
        detector = chan.detector_ctl
        # The channel's detector controller is optional; report "not
        # connected" without opening the serial device.
        if detector is None:
            return False
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            try:
                detector.initialize(ser)
            except DetectorReadError:
                return False
            return True

    @property
    def channels(self) -> Dict[str, PLOChan]:
        """Return a dictionary of the signal generator channel objects.

        :return: A dictionary of ``PLOChan`` instances keyed using the
            signal generator channel id ("Chan 1" and "Chan 2").

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels

        >>> chan_id = 'Chan 1'
        >>> chans[chan_id].plo_ctl.freq = 1500.0
        >>> rfgen.root.configure_plo_freq(chan_id)

        """
        # The dictionary is owned by the application object.
        return self._app.channels

    def configure_plo_freq(self, chan_id: str) -> None:
        """Write the channel's PLO output frequency to the hardware.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels

        >>> chan_id = 'Chan 1'
        >>> chans[chan_id].plo_ctl.freq = 1500.0
        >>> rfgen.root.configure_plo_freq(chan_id)

        """
        app = self._app
        # The channel is looked up before the serial device is opened.
        channel = app.channels[chan_id]
        with create_serial(app.ctl_device, app.baudrate) as ser:
            channel.plo_ctl.configure_freq(ser)

    def configure_plo(self, chan_id: str) -> None:
        """Write the channel's PLO settings to the hardware.

        Note that this should only be used when the generator is
        in calibration mode.

        .. seealso:: :meth:`.RFGenService.set_calibration_mode`

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels

        >>> chan_id = 'Chan 1'
        >>> gain = hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB
        >>> chans[chan_id].plo_ctl.buffer_gain = gain
        >>> rfgen.root.configure_plo(chan_id)

        """
        app = self._app
        # The channel is looked up before the serial device is opened.
        channel = app.channels[chan_id]
        with create_serial(app.ctl_device, app.baudrate) as ser:
            channel.plo_ctl.configure(ser)

    def configure_atten(self, chan_id: str) -> None:
        """Write the channel's step attenuator settings to the hardware.

        Note that this should only be used when the generator is
        in calibration mode.

        .. seealso:: :meth:`.RFGenService.set_calibration_mode`

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> chans = rfgen.root.channels

        >>> chan_id = 'Chan 2'
        >>> chans[chan_id].atten_ctl.attenuation = 10.0
        >>> rfgen.root.configure_atten(chan_id)

        """
        app = self._app
        # The channel is looked up before the serial device is opened.
        channel = app.channels[chan_id]
        with create_serial(app.ctl_device, app.baudrate) as ser:
            channel.atten_ctl.configure(ser)

    def detector(self, chan_id: str) -> Optional[PwrDetectorController]:
        """Look up the power detector controller of a channel.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str

        :return: The channel's PwrDetectorController instance, or None
            if no detector is connected.
        """
        return self._app.channels[chan_id].detector_ctl

    def measure_power(self, chan_id: str,
                      freq: Optional[float] = None) -> float:
        """Measure the signal power at the input of the power detector
        attached to the specified channel.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str
        :param freq: The input signal frequency (in MHz).  When omitted
            the channel's current PLO frequency is used.
        :type freq: Optional[float]

        :return: The measured signal power in dBm.

        :raises: KeyError
            If there is no detector currently connected to the specified
            channel.
        """
        app = self._app
        channel = app.channels[chan_id]
        det = channel.detector_ctl
        if det is None:
            raise KeyError(f"No detector attached to '{chan_id}'")
        with create_serial(app.ctl_device, app.baudrate) as ser:
            # Tell the detector the signal frequency before measuring.
            det.freq = channel.plo_ctl.freq if freq is None else freq
            det.measure(ser)
            return det.pwr

    def set_channel_output(self, chan_id: str, freq: float, lvl: float,
                           ignore_cal_state: bool = False) -> bool:
        """Set output frequency and power of the specified channel.

        The requested level is clamped to the range returned by the
        channel's ``power_range(freq)``.  The output is then adjusted
        iteratively: on each attempt the output power is estimated as the
        detector measurement minus the coupler's coupling value plus its
        insertion loss.  If the estimate differs from the target by more
        than ``OUTPUT_PWR_DELTA_THRESHOLD`` the channel level is corrected
        by the difference and the channel hardware is reconfigured.
        At most ``OUTPUT_PWR_MAX_ADJUST_ATTEMPTS`` measurements are made.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str
        :param freq: The output signal frequency (in MHz)
        :type freq: float
        :param lvl: The output signal power (in dBm)
        :type lvl: float
        :param ignore_cal_state: If this is set to False (the default value)
            an InvalidCalibrationDataError will be raised if the
            power detector connected to the associated channel monitoring
            output is not calibrated.  If set to True, the calibration
            state of the detector is ignored.
        :type ignore_cal_state: bool

        :return: True if a measurement within the threshold of the target
            level was obtained within the permitted number of attempts,
            False otherwise.

        :raises: KeyError
            If there is no detector currently connected to the specified
            channel.
        :raises: InvalidCalibrationDataError
            If the detector is not calibrated and the ignore_cal_state
            parameter is set to False.
        """
        chan = self._app.channels[chan_id]
        if chan.detector_ctl is None:
            raise KeyError(f"No detector attached to '{chan_id}'")
        if ignore_cal_state is False and chan.detector_ctl.cal_data is None:
            raise InvalidCalibrationDataError(
                f"Detector on '{chan_id}' is not calibrated")

        # Clamp the requested level to what the channel can deliver.
        min_pwr, max_pwr = chan.power_range(freq)
        if lvl < min_pwr:
            p_des = min_pwr
        elif lvl > max_pwr:
            p_des = max_pwr
        else:
            p_des = lvl

        max_attempts = RFGenService.OUTPUT_PWR_MAX_ADJUST_ATTEMPTS
        threshold = RFGenService.OUTPUT_PWR_DELTA_THRESHOLD

        # NOTE(review): the PLO frequency is only written when an adjustment
        # is made, so if the very first measurement is already within the
        # threshold the PLO is left at its previous frequency - confirm
        # that this is intended.
        for attempts in range(1, max_attempts + 1):
            delta_p = p_des - (self.measure_power(chan_id, freq)
                               - chan.coupler.coupling(freq)
                               + chan.coupler.insertion_loss(freq))
            print(f'{attempts=}, {delta_p=}')
            if abs(delta_p) > threshold:
                chan.level = chan.level + delta_p
                chan.plo_ctl.freq = freq
                chan.configure()
            else:
                # Report success at the point of convergence.  Inferring it
                # afterwards from the attempt count wrongly reported failure
                # when the final permitted measurement was within threshold.
                return True
        # Every permitted measurement was outside the threshold.
        return False

    def configure(self, chan_id: str) -> None:
        """Write a channel's PLO and step attenuator settings to the hardware.

        Both controllers are configured using the same serial object,
        the PLO first and then the attenuator.

        :param chan_id: A signal generator channel id.
            This will be one of "Chan 1" or "Chan 2".
        :type chan_id: str
        """
        app = self._app
        channel = app.channels[chan_id]
        with create_serial(app.ctl_device, app.baudrate) as port:
            channel.plo_ctl.configure(port)
            channel.atten_ctl.configure(port)

    def set_calibration_mode(self, mode: bool) -> None:
        """Switch the app into or out of calibration mode.

        :param mode: True to set the app to calibration mode,
            False to set the app for normal operation.
        :type mode: bool

        The mode is passed on to every signal generator channel and the
        app's calibration label is updated to match.

        Setting the calibration mode to True will disconnect the
        HMC833 controller ``freq_changed`` signal for each signal
        generator channel.  This then prevents the ``_update_channel_settings``
        method being invoked which in turn prevents any change to
        the channel attenuation.

        >>> rfgen = rpyc.connect('127.0.0.1', 18864)
        >>> rfgen.root.initialize()
        >>> rfgen.root.set_calibration_mode(True)

        """
        app = self._app
        for channel in app.channels.values():
            channel.set_calibration_mode(mode)
        # Only the literal True selects the 'Calibrating' label text;
        # any other value clears the label.
        status_text = 'Calibrating' if mode is True else ''
        app.cal_lbl.setText(status_text)


class RPyCServer(QObject):
    """Qt worker object which runs an rpyc ``ThreadedServer``.

    The object is meant to be moved to a ``QThread`` with :meth:`run`
    connected to the thread's ``started`` signal.  The ``finished``
    signal is emitted once the server's ``start()`` call returns.
    """

    # Emitted by run() after the rpyc server's start() call has returned.
    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """Store the rpyc service and the address to serve it on.

        :param serviceInst: The rpyc service passed to ``ThreadedServer``.
        :param host: Host name or address passed as ``hostname``.
        :param port: Port number passed as ``port``.
        """
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Create the rpyc server, run it, then emit ``finished``."""
        print("RFGen rpyc service on {}:{}".format(self._host, self._port))
        # NOTE(review): these options give connected clients broad access
        # to the service objects (attribute reads and writes, pickling).
        # Confirm the service is only reachable from trusted hosts.
        protocol_opts = {
            'allow_all_attrs': True,
            'allow_setattr': True,
            'allow_pickle': True,
        }
        rpyc_server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            protocol_config=protocol_opts)
        self._server = rpyc_server
        rpyc_server.start()
        self.finished.emit()

6 Imports

from typing import (
    List, Dict, Optional
)

import sys
from time import sleep
import signal
from argparse import ArgumentParser
import asyncio
import serial
from serial.tools.list_ports import comports
from scipy import interpolate

from rfblocks import (
    hmc833, HMC833Controller, pe43711, PE43711Controller,
    LogDetector, PwrDetectorController,
    InvalidCalibrationDataError, DetectorReadError,
    create_serial, write_cmd, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    HMC833Module, StepAttenuator
)

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)

from PyQt5.QtWidgets import (
    QApplication, QWidget, QComboBox, QGroupBox, QHBoxLayout,
    QVBoxLayout, QPushButton, QFileDialog, QErrorMessage, QLabel,
    QMenu, QLineEdit, QFormLayout, QCheckBox
)

from PyQt5.QtCore import (
    Qt, QCoreApplication, QObject, QThread, pyqtSignal, pyqtSlot
)

import rpyc
from rpyc.utils.server import ThreadedServer

from plochan import PLOChan, CalibrationDataException
from rfgen_cal_data import channel_cal_data, max_linear_gains