Open RF Prototyping

clkgen.py

This document describes an app for controlling clock generator reference design. When executed, the script displays a GUI window which allows the clock generator reference design to be controlled via it's USB connection.

Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.

The Script

The script is structured as a single instance of the ClkApp class. The ClkApp class includes instances of the ClkModule class which represent the clock generator hardware boards. The ClkModule class models the clock generator boards as instances of ClkChannel which contain configuration for the clock generator outputs.

Each class retains responsibility for the on-screen layout of UI controls for setting the configuration of the underlying hardware.

# Generated from clkgen.org
#

<<imports>>

<<msg-dialog>>

<<clockgen-app-class>>

<<clkgen-service>>

<<main-func>>

if __name__ == '__main__':
    main()

The main Function

The asyncio framework is used to coordinate communications over the serial device without blocking user interface events. For this purpose we make use of QEventLoop and QThreadExecutor from the asyncqt package.

The event loop is equipped with an instance of QThreadExecutor which executes a single thread. The idea here is to ensure that requests to the ECApp serial device will execute one by one in a manner which will not block the operation of the Qt event loop.

def main():
    global server_thread

    <<process-cmdline-args>>

    # This ensures that Cntl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    if args.nogui is True:
        app = QCoreApplication(sys.argv)
    else:
        app = QApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    clkgen_app = ClkApp(serial_device=args.device,
                        baudrate=args.baudrate,
                        headless=args.nogui)
    if args.dumphw:
        for mod_id, controller in clkgen_app.controllers.items():
            print(f'{mod_id}: {vars(controller._ad9552)}')
        sys.exit(0)

    if not args.nogui:
        clkgen_app.show()

    server_thread = QThread()
    server = RPyCServer(ClkgenService(clkgen_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    sys.exit(app.exec_())

process-cmdline-args

Command line arguments allow the specification of a serial device and baud rate for the purposes of testing.

defaultBaud = 0

parser = ArgumentParser(description=
                        '''A clock generator.''')

parser.add_argument("--nogui", action='store_true',
                    help="Disable GUI and run 'headless'")
parser.add_argument("-d", "--device", default=None,
                    help="The hardware serial device")
parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int,
                    help="Baud rate (default: {})".format(defaultBaud))
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address for to bind the RPyC server instance")
parser.add_argument("-P", "--port", default=18861, type=int,
                    help="TCP port for the RPyC server instance")
parser.add_argument("-H", "--dumphw", action='store_true',
                    help="Dump device hardware config to stdout and exit")
args = parser.parse_args()

clockgen-app-class

The class which encapsulates the overall application and builds the required on-screen controls.

class ClkApp(QObject):

    DEFAULT_CLKOUT_FREQ = 250.0

    <<clkapp-config>>

    def __init__(self,
                 config: Dict = DEFAULT_APP_CONFIG,
                 serial_device: Optional[str] = None,
                 baudrate: int = 0,
                 headless: bool = False) -> None:
        super().__init__()
        self._nogui: bool = headless
        if self._nogui is False:
            self._widget = QWidget()
        d: Dict = config['modules']
        devices: List[ad9552] = [
            ad9552(**hw_conf) for hw_conf in ClkApp.MODULES_HWCONF]
        cz = zip(d.keys(), devices, d.values())
        self.controllers: Dict = {
            mod_id: AD9552Controller(mod_id, dev, mod_config)
            for mod_id, dev, mod_config in cz }
        self._ctl_device: Optional[str] = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE

        if not headless:
            mz = zip(d.keys(), self.controllers.values(),
                     [[ch['ui'] for ch in mod['channels'].values()] \
                      for mod in d.values()])
            self._modules = {mod_id: ClkModule(self, mod_id, ctl, *has_ui)
                             for mod_id, ctl, has_ui in mz}
            self.build_ui()

    <<build-app-ui>>

    <<initialize>>

    <<app-configuration>>

    @property
    def ctl_device(self) -> Optional[str]:
        return self._ctl_device

    @property
    def serial_ports(self) -> List:
        ports = [DEFAULT_SOCKET_URL]
        ports.extend([port.device for port in comports() if port.serial_number])
        if self.ctl_device:
            if self.ctl_device in ports:
                ports.pop(ports.index(self.ctl_device))
            ports.insert(0, self.ctl_device)
        return ports

    @property
    def baudrate(self) -> int:
        return self._baudrate

    @asyncSlot(QAbstractButton)
    async def set_refsrc(self, btn) -> None:
        """Set the reference source for the generator.

        The reference source is controlled via the `Ref` pin
        on clock module 1.
        """
        clk_ctl = self.controllers[ClkApp.CLKMODULE1_NAME]
        if btn.src == clk_ctl.refsrc:
            return
        clk_ctl.refsrc = btn.src
        try:
            cmd = clk_ctl._ad9552.config_refsrc()
            if len(cmd):
                loop = asyncio.get_event_loop()
                with create_serial(self.ctl_device,
                                   self.baudrate) as ser:
                    await loop.run_in_executor(
                        None, write_cmd, ser, cmd)
        except serial.serialutil.SerialException as se:
            error_dialog = QErrorMessage(self)
            error_dialog.showMessage(str(se))

    def tty_changed(self, combo: QComboBox, idx: int) -> None:
        self._ctl_device = combo.itemText(idx)

    def disable_ui(self) -> None:
        for module in self._modules.values():
            module.enabled = False
        for btn in self._refsrc_btns:
            btn.setEnabled(False)

    def enable_ui(self) -> None:
        for module in self._modules.values():
            module.enabled = True
        for btn in self._refsrc_btns:
            btn.setEnabled(True)

clkapp-config

The clock modules are ClkMod1 and ClkMod2. Each is configured with an input reference frequency of 10 MHz. ClkMod1 is the clock module which may optionally use the external reference via the modified IF amplifier module U1. The D1 controller pin is used to control the reference source using the Ref pin of ClkMod1:

Table 1: Clock generator reference source control.

ClkMod1 Ref

Reference Source

0

External

1

Internal

Note that the clock generator module Ref pin is pulled high by 10K resistor and therefore the default clock reference will be the on-board 10 MHz source.

The default channel configuration is:

Table 2: Default clock channel configuration

Channel state (state)

POWERED_DOWN

Output frequency

250 MHz

Output mode (mode)

LVPECL

Drive strength (drive)

STRONG

CMOS polarity (polarity)

DIFF_POS

Channel source (source)

PLL

The reference channel (output 2 of clkMod1 in this design) should always be configured as ACTIVE and with it's source control set to REF. In this way there will always be a signal present of the reference input of clkMod2.

CLKMODULE1_NAME: str = 'clkmod1'
CLKMODULE2_NAME: str = 'clkmod2'

CLKMODULE1_HWCONF: Dict = {'cs': 'D0', 'lockdetect': 'C4', 'reset': None,
                           'fref': 10.0, 'refselect': 'D1'}
CLKMODULE2_HWCONF: Dict = {'cs': 'D3', 'lockdetect': 'C5', 'reset': None,
                           'fref': 10.0}
MODULES_HWCONF: List[Dict] = [ CLKMODULE1_HWCONF, CLKMODULE2_HWCONF ]

DEFAULT_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.LVPECL
DEFAULT_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.POWERED_DOWN
DEFAULT_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.STRONG
DEFAULT_CHAN_POLARIY: ad9552.CmosPolarity = ad9552.CmosPolarity.DIFF_POS
DEFAULT_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL

DEFAULT_CHAN_CONFIG: Dict = {
    'ui': True,
    'mode': DEFAULT_CHAN_MODE,
    'state': DEFAULT_CHAN_STATE,
    'drive': DEFAULT_CHAN_DRIVE,
    'polarity': DEFAULT_CHAN_POLARIY,
    'source': DEFAULT_CHAN_SOURCE
}
# Only the negative output of the reference channel is
# enabled.
REF_CHAN_CONFIG: Dict = {
    'ui': False,
    'mode': ad9552.OutputMode.CMOS_NEG_ACTIVE,
    'state': ad9552.OutputState.ACTIVE,
    'drive': DEFAULT_CHAN_DRIVE,
    'polarity': DEFAULT_CHAN_POLARIY,
    'source': ad9552.SourceControl.REF
}
DEFAULT_MODULE_CONFIG: Dict = {
    'freq': DEFAULT_CLKOUT_FREQ,
    'channels': {
        '1': { 'label': '3', **DEFAULT_CHAN_CONFIG },
        '2': { 'label': '2', **DEFAULT_CHAN_CONFIG }
    }
}
REF_MODULE_CONFIG: Dict = {
    'freq': DEFAULT_CLKOUT_FREQ,
    'channels': {
        '1': { 'label': '1', **DEFAULT_CHAN_CONFIG },
        '2': { 'label': '', **REF_CHAN_CONFIG }
    }
}

DEFAULT_APP_CONFIG: Dict = {
    'modules': {
        CLKMODULE1_NAME: REF_MODULE_CONFIG,
        CLKMODULE2_NAME: DEFAULT_MODULE_CONFIG
    },
    'refsrc': ad9552.ReferenceSource.INTERNAL
}

The outputs of channel 1 of ClkMod1 are made available as Clk1+ and Clk1- on the rear panel of the clock generator enclosure. Channel 2 of ClkMod1 is configured to pass through the input reference. One of the channel 2 outputs from ClkMod1 connects to the external reference input of the ClkMod2 clock module. The Ref pin of ClkMod2 is tied to ground so that the reference clock is always sourced from ClkMod1.

The other channel 2 output from ClkMod1 is powered down.

build-app-ui

def build_ui(self):
    """Build the on-screen UI for the clock generator app."""

    vbox = QVBoxLayout()

    hbox = QHBoxLayout()
    tty_combo = QComboBox()
    tty_combo.currentIndexChanged.connect(
        lambda idx, w=tty_combo: self.tty_changed(w, idx))
    tty_combo.addItems(self.serial_ports)
    line_edit = QLineEdit()
    tty_combo.setLineEdit(line_edit)
    hbox.addWidget(QLabel("Control Port:"))
    hbox.addWidget(tty_combo)
    hbox.addStretch(1)
    configuration_btn = QPushButton("Configuration")
    config_menu = QMenu()
    config_menu.addAction('Save', self.save_config)
    config_menu.addAction('Load', self.open_config)
    configuration_btn.setMenu(config_menu)
    hbox.addWidget(configuration_btn)
    initialize_btn = QPushButton("Initialize")
    initialize_btn.clicked.connect(self.initialize)
    hbox.addWidget(initialize_btn)

    vbox.addLayout(hbox)

    hbox2 = QHBoxLayout()
    hbox2.addWidget(QLabel("Ref. Source:"))
    self.refsrcGroup = QButtonGroup(hbox2)
    rb = QRadioButton("Internal")
    self.refsrcGroup.addButton(rb)
    rb.src = ad9552.ReferenceSource.INTERNAL
    rb.setChecked(True)
    # rb.toggled.connect(lambda state, w=rb: self.set_refsrc(w.src))
    hbox2.addWidget(rb)
    rb2 = QRadioButton("External (10MHz)")
    self.refsrcGroup.addButton(rb2)
    rb2.src = ad9552.ReferenceSource.EXTERNAL
    self.refsrcGroup.buttonClicked.connect(self.set_refsrc)
    # rb2.toggled.connect(lambda state, w=rb2: self.set_refsrc(w.src))
    hbox2.addWidget(rb2)
    self._refsrc_btns = [rb, rb2]
    hbox2.addStretch(1)
    vbox.addLayout(hbox2)

    hbox3 = QHBoxLayout()
    hbox3.addWidget(self._modules[ClkApp.CLKMODULE1_NAME].build_ui())
    hbox3.addWidget(self._modules[ClkApp.CLKMODULE2_NAME].build_ui())

    vbox.addLayout(hbox3)

    self._widget.setLayout(vbox)
    self.disable_ui()

    self._widget.setGeometry(300, 300, 700, 150)
    self._widget.setWindowTitle('Clock Generator')

def show(self):
    self._widget.show()

app-configuration

def save_config(self) -> None:
    config = self.dump_config()
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    name, selfilter = QFileDialog.getSaveFileName(self, 'Save configuration',
                                                  def_filepath, filter_str)
    output_file = str(name).strip()
    if len(output_file) == 0:
        return
    with open(output_file, 'w') as fd:
        json.dump(config, fd)

def open_config(self) -> None:
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    names, selfilter = QFileDialog.getOpenFileNames(self, 'Load Configuration',
                                                    def_filepath, filter_str)
    if len(names):
        input_file = names[0]
        with open(input_file, 'r') as fd:
            config = json.load(fd)
        self.load_config(config)

def dump_config(self) -> Dict:
    mod1 = self._modules[ClkApp.CLKMODULE1_NAME]
    mod2 = self._modules[ClkApp.CLKMODULE2_NAME]
    config = {
        'refsrc':               mod1.refsrc,
        ClkApp.CLKMODULE1_NAME: mod1.dump_config(),
        ClkApp.CLKMODULE2_NAME: mod2.dump_config() }
    return config

def load_config(self, config: Dict) -> None:
    mod1 = self._modules[ClkApp.CLKMODULE1_NAME]
    mod2 = self._modules[ClkApp.CLKMODULE2_NAME]
    mod1.load_config(config[ClkApp.CLKMODULE1_NAME])
    mod2.load_config(config[ClkApp.CLKMODULE2_NAME])
    mod1.refsrc = config['refsrc']
    mod1.configure_ui(config[ClkApp.CLKMODULE1_NAME])
    mod2.configure_ui(config[ClkApp.CLKMODULE2_NAME])
    with create_serial(self.ctl_device, self.baudrate) as ser:
        mod1.configure_hw(ser)
        mod2.configure_hw(ser)
    self.enable_ui()

initialize

@asyncSlot()
async def initialize(self) -> None:
    try:
        self.initialize_ui()
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_modules)
    except serial.serialutil.SerialException as se:
        error_dialog = QErrorMessage(self)
        error_dialog.showMessage(str(se))
    finally:
        self.enable_ui()

def initialize_ui(self) -> None:
    for module in self._modules.values():
        module.initialize_ui()

def initialize_modules(self) -> None:
    with create_serial(self.ctl_device, self.baudrate) as ser:
        for module in self._modules.values():
            module.initialize_hw(ser)

clkgen-service

The Python remote procedure call framework RPyC is used to create a network service which makes the functionality of the clkgen app available to other clients.

The ClkgenService implements the functionality which is accessed via RPyC.

class ClkgenService(rpyc.Service):

    def __init__(self, app):
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Initialize the clock generator hardware and software.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()

        """
        if not self._app._nogui:
            self._app.initialize_ui()
        self._app.initialize_modules()
        if not self._app._nogui:
            self._app.enable_ui()

    @property
    def controllers(self) -> Dict[str, AD9552Controller]:
        """A dictionary containing the clock generator clock module controllers.

        The controllers are instances of AD9552Controller keyed using
        the clock module identifiers 'clkmod1' and 'clkmod2' respectively.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod2 = clkgen.root.controllers['clkmod2']

        """
        return self._app.controllers

    def configure(self, ctl_id: str) -> bool:
        """Update the clock module hardware using the currently set configuration.

        :param ctl_id: A module id.  This will be either 'clkmod1' or 'clkmod2'.
        :type ctl_id: str

        Returns a boolean value indicating the clock module PLL lock status.
        True for locked.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod1.freq = 150.0
        >>> chan1 = clkmod1.channels['1']
        >>> chan1.state = ad9552.OutputState.ACTIVE
        >>> pll_lock = clkgen.root.configure('clkmod1')

        """
        ctl = self._app.controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            pll_lock = ctl.configure(ser)
        return pll_lock

    def configure_freq(self, ctl_id: str) -> bool:
        """Update the clock module hardware with the currently set
        output frequency.

        :param ctl_id: A module id.  This will be either 'clkmod1' or 'clkmod2'.
        :type ctl_id: str

        Returns a boolean value indicating the clock module PLL lock status.
        True for locked.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod1.freq = 150.0
        >>> clkgen.root.configure_freq('clkmod1')

        """
        ctl = self._app.controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            pll_lock = ctl.configure_freq(ser)
        return pll_lock


class RPyCServer(QObject):

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        print("ClockGen rpyc service on {}:{}".format(self._host, self._port))
        self._server = ThreadedServer(
            self._serviceInst,
            hostname = self._host,
            port = self._port,
            protocol_config = {
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_getattr': True})
        self._server.start()
        self.finished.emit()

msg-dialog

def popMsgDialog(icon, title, msg, info=''):
    """
    """
    msgBox = QMessageBox()
    msgBox.setIcon(QMessageBox.NoIcon)
    msgBox.setWindowTitle(title)
    msgBox.setText(msg)
    if len(info):
        msgBox.setInformativeText(info)
    msgBox.setStandardButtons(QMessageBox.Ok)
    status = msgBox.exec_()

Imports

from typing import (
    Optional, List, Dict
)
import sys
import signal
import asyncio
from argparse import ArgumentParser
import serial
import json
from serial.tools.list_ports import comports

from rfblocks import (
    ad9552, AD9552Controller,
    DividerRangeException, list_available_serial_ports,
    create_serial, write_cmd, query_cmd,
    DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    ClkModule, ClkChannel
)

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)

from PyQt5.QtWidgets import (
    QWidget, QLabel, QAbstractSpinBox, QDoubleSpinBox, QVBoxLayout,
    QLineEdit, QHBoxLayout, QGroupBox, QMainWindow, QComboBox,
    QCheckBox, QPushButton, QRadioButton, QButtonGroup, QMessageBox,
    QFormLayout, QErrorMessage, QApplication, QScrollArea,
    QMenu, QFileDialog, QAbstractButton
)

from PyQt5.QtCore import (
    Qt, QCoreApplication, QObject, QThread, pyqtSignal, pyqtSlot
)

import rpyc
from rpyc.utils.server import ThreadedServer