clkgen.py
This document describes an app for controlling the clock generator reference design. When executed, the script displays a GUI window which allows the clock generator reference design to be controlled via its USB connection.
Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.
The Script
The script is structured as a single instance of the ClkApp
class.
The ClkApp
class includes instances of the ClkModule class which
represent the clock generator hardware boards. The ClkModule
class
models the clock generator boards as instances of ClkChannel which
contain configuration for the clock generator outputs.
Each class retains responsibility for the on-screen layout of UI controls for setting the configuration of the underlying hardware.
The main
Function
The asyncio
framework is used to coordinate communications over the
serial device without blocking user interface events. For this
purpose we make use of QEventLoop
and QThreadExecutor
from the
qasync package.
The event loop is equipped with an instance of QThreadExecutor
which
executes a single thread. The idea here is to ensure that requests to
the ECApp serial device will execute one by one in a manner which will
not block the operation of the Qt event loop.
def main(): global server_thread <<process-cmdline-args>> # This ensures that Cntl-C will work as expected: signal.signal(signal.SIGINT, signal.SIG_DFL) if args.nogui is True: app = QCoreApplication(sys.argv) else: app = QApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) clkgen_app = ClkApp(serial_device=args.device, baudrate=args.baudrate, headless=args.nogui) if args.dumphw: for mod_id, controller in clkgen_app.controllers.items(): print(f'{mod_id}: {vars(controller._ad9552)}') sys.exit(0) if not args.nogui: clkgen_app.show() server_thread = QThread() server = RPyCServer(ClkgenService(clkgen_app), args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() sys.exit(app.exec_())
process-cmdline-args
Command line arguments allow the specification of a serial device and baud rate for the purposes of testing.
# Command line handling for the clock generator app.  The parsed result is
# left in `args` for the code that follows this fragment.

# A baud rate of 0 is a sentinel: ClkApp.__init__ replaces it with
# DEFAULT_BAUDRATE.
defaultBaud = 0

parser = ArgumentParser(description='''A clock generator.''')
parser.add_argument(
    "--nogui", action='store_true',
    help="Disable GUI and run 'headless'")
parser.add_argument(
    "-d", "--device", default=None,
    help="The hardware serial device")
parser.add_argument(
    "-b", "--baudrate", default=defaultBaud, type=int,
    help="Baud rate (default: {}, which selects the rfblocks "
         "default baud rate)".format(defaultBaud))
parser.add_argument(
    "-A", "--ipaddr", default="127.0.0.1",
    help="IP address to bind the RPyC server instance to")
parser.add_argument(
    "-P", "--port", default=18861, type=int,
    help="TCP port for the RPyC server instance")
parser.add_argument(
    "-H", "--dumphw", action='store_true',
    help="Dump device hardware config to stdout and exit")

args = parser.parse_args()
clockgen-app-class
The class which encapsulates the overall application and builds the required on-screen controls.
class ClkApp(QObject): DEFAULT_CLKOUT_FREQ = 250.0 <<clkapp-config>> def __init__(self, config: Dict = DEFAULT_APP_CONFIG, serial_device: Optional[str] = None, baudrate: int = 0, headless: bool = False) -> None: super().__init__() self._nogui: bool = headless if self._nogui is False: self._widget = QWidget() d: Dict = config['modules'] devices: List[ad9552] = [ ad9552(**hw_conf) for hw_conf in ClkApp.MODULES_HWCONF] cz = zip(d.keys(), devices, d.values()) self.controllers: Dict = { mod_id: AD9552Controller(mod_id, dev, mod_config) for mod_id, dev, mod_config in cz } self._ctl_device: Optional[str] = serial_device self._baudrate: int = baudrate if baudrate == 0: self._baudrate = DEFAULT_BAUDRATE if not headless: mz = zip(d.keys(), self.controllers.values(), [[ch['ui'] for ch in mod['channels'].values()] \ for mod in d.values()]) self._modules = {mod_id: ClkModule(self, mod_id, ctl, *has_ui) for mod_id, ctl, has_ui in mz} self.build_ui() <<build-app-ui>> <<initialize>> <<app-configuration>> @property def ctl_device(self) -> Optional[str]: return self._ctl_device @property def serial_ports(self) -> List: ports = [DEFAULT_SOCKET_URL] ports.extend([port.device for port in comports() if port.serial_number]) if self.ctl_device: if self.ctl_device in ports: ports.pop(ports.index(self.ctl_device)) ports.insert(0, self.ctl_device) return ports @property def baudrate(self) -> int: return self._baudrate @asyncSlot(QAbstractButton) async def set_refsrc(self, btn) -> None: """Set the reference source for the generator. The reference source is controlled via the `Ref` pin on clock module 1. 
""" clk_ctl = self.controllers[ClkApp.CLKMODULE1_NAME] if btn.src == clk_ctl.refsrc: return clk_ctl.refsrc = btn.src try: cmd = clk_ctl._ad9552.config_refsrc() if len(cmd): loop = asyncio.get_event_loop() with create_serial(self.ctl_device, self.baudrate) as ser: await loop.run_in_executor( None, write_cmd, ser, cmd) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se)) def tty_changed(self, combo: QComboBox, idx: int) -> None: self._ctl_device = combo.itemText(idx) def disable_ui(self) -> None: for module in self._modules.values(): module.enabled = False for btn in self._refsrc_btns: btn.setEnabled(False) def enable_ui(self) -> None: for module in self._modules.values(): module.enabled = True for btn in self._refsrc_btns: btn.setEnabled(True)
clkapp-config
The clock modules are ClkMod1
and ClkMod2
. Each is configured
with an input reference frequency of 10 MHz. ClkMod1
is the clock
module which may optionally use the external reference via the
modified IF amplifier module U1
. The D1
controller pin is
used to control the reference source using the Ref
pin of
ClkMod1
:
| D1 | Reference Source |
|---|---|
| 0 | External |
| 1 | Internal |
Note that the clock generator module Ref
pin is pulled high by 10K
resistor and therefore the default clock reference will be the
on-board 10 MHz source.
The default channel configuration is:
| Setting | Default |
|---|---|
| Channel state (`state`) | POWERED_DOWN |
| Output frequency | 250 MHz |
| Output mode (`mode`) | LVPECL |
| Drive strength (`drive`) | STRONG |
| CMOS polarity (`polarity`) | DIFF_POS |
| Channel source (`source`) | PLL |
The reference channel (output 2 of clkMod1
in this design) should
always be configured as ACTIVE
and with its source control set to
REF
. In this way there will always be a signal present at the
reference input of clkMod2
.
# Identifiers used as keys for the two clock modules.
CLKMODULE1_NAME: str = 'clkmod1'
CLKMODULE2_NAME: str = 'clkmod2'

# Per-module hardware settings; each dict is expanded into the keyword
# arguments of an ad9552 instance by ClkApp.__init__.  Only module 1 has a
# 'refselect' pin.
CLKMODULE1_HWCONF: Dict = {
    'cs': 'D0',
    'lockdetect': 'C4',
    'reset': None,
    'fref': 10.0,
    'refselect': 'D1'
}
CLKMODULE2_HWCONF: Dict = {
    'cs': 'D3',
    'lockdetect': 'C5',
    'reset': None,
    'fref': 10.0
}
MODULES_HWCONF: List[Dict] = [
    CLKMODULE1_HWCONF,
    CLKMODULE2_HWCONF
]

# Defaults applied to an ordinary output channel.
DEFAULT_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.LVPECL
DEFAULT_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.POWERED_DOWN
DEFAULT_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.STRONG
DEFAULT_CHAN_POLARITY: ad9552.CmosPolarity = ad9552.CmosPolarity.DIFF_POS
# Original (misspelled) name, retained so existing references keep working.
DEFAULT_CHAN_POLARIY: ad9552.CmosPolarity = DEFAULT_CHAN_POLARITY
DEFAULT_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL

DEFAULT_CHAN_CONFIG: Dict = {
    'ui': True,
    'mode': DEFAULT_CHAN_MODE,
    'state': DEFAULT_CHAN_STATE,
    'drive': DEFAULT_CHAN_DRIVE,
    'polarity': DEFAULT_CHAN_POLARITY,
    'source': DEFAULT_CHAN_SOURCE
}

# Only the negative output of the reference channel is
# enabled.
REF_CHAN_CONFIG: Dict = {
    'ui': False,
    'mode': ad9552.OutputMode.CMOS_NEG_ACTIVE,
    'state': ad9552.OutputState.ACTIVE,
    'drive': DEFAULT_CHAN_DRIVE,
    'polarity': DEFAULT_CHAN_POLARITY,
    'source': ad9552.SourceControl.REF
}

# Module with two ordinary channels.
DEFAULT_MODULE_CONFIG: Dict = {
    'freq': DEFAULT_CLKOUT_FREQ,
    'channels': {
        '1': {'label': '3', **DEFAULT_CHAN_CONFIG},
        '2': {'label': '2', **DEFAULT_CHAN_CONFIG}
    }
}

# Module whose channel 2 is the (UI-less) reference channel.
REF_MODULE_CONFIG: Dict = {
    'freq': DEFAULT_CLKOUT_FREQ,
    'channels': {
        '1': {'label': '1', **DEFAULT_CHAN_CONFIG},
        '2': {'label': '', **REF_CHAN_CONFIG}
    }
}

DEFAULT_APP_CONFIG: Dict = {
    'modules': {
        CLKMODULE1_NAME: REF_MODULE_CONFIG,
        CLKMODULE2_NAME: DEFAULT_MODULE_CONFIG
    },
    'refsrc': ad9552.ReferenceSource.INTERNAL
}
The outputs of channel 1 of ClkMod1
are made available as Clk1+
and Clk1-
on the rear panel of the clock generator enclosure.
Channel 2 of ClkMod1
is configured to pass through the input
reference. One of the channel 2 outputs from ClkMod1
connects to
the external reference input of the ClkMod2
clock module. The
Ref
pin of ClkMod2
is tied to ground so that the reference clock
is always sourced from ClkMod1
.
The other channel 2 output from ClkMod1
is powered down.
build-app-ui
def build_ui(self):
    """Build the on-screen UI for the clock generator app.

    Lays out, top to bottom: a row with the control port selector, the
    configuration menu button and the initialize button; a row with the
    reference source radio buttons; and a row with the two clock module
    panels.  The UI is left disabled once it has been built.
    """
    # --- Row 1: control port, configuration menu, initialize -------------
    port_combo = QComboBox()
    # The signal is connected before the items are added; presumably so
    # that populating the combo already reports the first entry through
    # tty_changed -- TODO confirm.
    port_combo.currentIndexChanged.connect(
        lambda idx, w=port_combo: self.tty_changed(w, idx))
    port_combo.addItems(self.serial_ports)
    port_combo.setLineEdit(QLineEdit())

    top_row = QHBoxLayout()
    top_row.addWidget(QLabel("Control Port:"))
    top_row.addWidget(port_combo)
    top_row.addStretch(1)

    config_menu = QMenu()
    config_menu.addAction('Save', self.save_config)
    config_menu.addAction('Load', self.open_config)
    config_button = QPushButton("Configuration")
    config_button.setMenu(config_menu)
    top_row.addWidget(config_button)

    init_button = QPushButton("Initialize")
    init_button.clicked.connect(self.initialize)
    top_row.addWidget(init_button)

    main_layout = QVBoxLayout()
    main_layout.addLayout(top_row)

    # --- Row 2: reference source selection -------------------------------
    refsrc_row = QHBoxLayout()
    refsrc_row.addWidget(QLabel("Ref. Source:"))
    self.refsrcGroup = QButtonGroup(refsrc_row)

    internal_btn = QRadioButton("Internal")
    self.refsrcGroup.addButton(internal_btn)
    # Each button carries the reference source it stands for; set_refsrc
    # reads it back from the clicked button.
    internal_btn.src = ad9552.ReferenceSource.INTERNAL
    internal_btn.setChecked(True)
    refsrc_row.addWidget(internal_btn)

    external_btn = QRadioButton("External (10MHz)")
    self.refsrcGroup.addButton(external_btn)
    external_btn.src = ad9552.ReferenceSource.EXTERNAL
    # One group-level connection serves both radio buttons.
    self.refsrcGroup.buttonClicked.connect(self.set_refsrc)
    refsrc_row.addWidget(external_btn)

    self._refsrc_btns = [internal_btn, external_btn]
    refsrc_row.addStretch(1)
    main_layout.addLayout(refsrc_row)

    # --- Row 3: the clock module panels ----------------------------------
    modules_row = QHBoxLayout()
    for module_name in (ClkApp.CLKMODULE1_NAME, ClkApp.CLKMODULE2_NAME):
        modules_row.addWidget(self._modules[module_name].build_ui())
    main_layout.addLayout(modules_row)

    self._widget.setLayout(main_layout)
    self.disable_ui()
    self._widget.setGeometry(300, 300, 700, 150)
    self._widget.setWindowTitle('Clock Generator')

def show(self):
    """Show the application's top-level widget."""
    self._widget.show()
app-configuration
def save_config(self) -> None:
    """Ask for a file name and write the current configuration as JSON.

    Nothing is written when the dialog yields an empty file name.
    """
    config = self.dump_config()
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    # The dialog parent must be a QWidget; ClkApp is a plain QObject, so
    # the top-level widget is passed instead of `self`.
    name, _ = QFileDialog.getSaveFileName(
        self._widget, 'Save configuration', def_filepath, filter_str)
    output_file = str(name).strip()
    if not output_file:
        return
    with open(output_file, 'w') as fd:
        json.dump(config, fd)

def open_config(self) -> None:
    """Ask for a JSON configuration file and load it.

    Only the first selected file is used.  Nothing happens when no file is
    selected.
    """
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    # See save_config: the dialog parent must be a QWidget.
    names, _ = QFileDialog.getOpenFileNames(
        self._widget, 'Load Configuration', def_filepath, filter_str)
    if names:
        input_file = names[0]
        with open(input_file, 'r') as fd:
            config = json.load(fd)
        self.load_config(config)

def dump_config(self) -> Dict:
    """Return the current configuration as a dict.

    The dict holds module 1's reference source under 'refsrc' and each
    module's own ``dump_config()`` result under its module name.
    """
    mod1 = self._modules[ClkApp.CLKMODULE1_NAME]
    mod2 = self._modules[ClkApp.CLKMODULE2_NAME]
    config = {
        'refsrc': mod1.refsrc,
        ClkApp.CLKMODULE1_NAME: mod1.dump_config(),
        ClkApp.CLKMODULE2_NAME: mod2.dump_config()
    }
    return config

def load_config(self, config: Dict) -> None:
    """Apply ``config`` to both modules, their UI and the hardware.

    :param config: A dict laid out as produced by ``dump_config``.

    The serial device is opened here without catching any exception that
    raises, so such errors propagate to the caller.
    """
    mod1 = self._modules[ClkApp.CLKMODULE1_NAME]
    mod2 = self._modules[ClkApp.CLKMODULE2_NAME]
    mod1_config = config[ClkApp.CLKMODULE1_NAME]
    mod2_config = config[ClkApp.CLKMODULE2_NAME]

    mod1.load_config(mod1_config)
    mod2.load_config(mod2_config)
    mod1.refsrc = config['refsrc']
    mod1.configure_ui(mod1_config)
    mod2.configure_ui(mod2_config)
    with create_serial(self.ctl_device, self.baudrate) as ser:
        mod1.configure_hw(ser)
        mod2.configure_hw(ser)
    self.enable_ui()
initialize
@asyncSlot()
async def initialize(self) -> None:
    """Initialize the module UIs and then the clock module hardware.

    The hardware initialization runs on the event loop's default executor.
    A serial error is reported in an error dialog.  The UI is enabled
    afterwards whether or not initialization succeeded.
    """
    try:
        self.initialize_ui()
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_modules)
    except serial.serialutil.SerialException as se:
        # QErrorMessage needs a QWidget parent.  ClkApp itself is a plain
        # QObject, so the top-level widget is used.
        error_dialog = QErrorMessage(self._widget)
        error_dialog.showMessage(str(se))
    finally:
        self.enable_ui()

def initialize_ui(self) -> None:
    """Call ``initialize_ui()`` on every clock module."""
    for module in self._modules.values():
        module.initialize_ui()

def initialize_modules(self) -> None:
    """Open the control device and initialize every module's hardware."""
    with create_serial(self.ctl_device, self.baudrate) as ser:
        for module in self._modules.values():
            module.initialize_hw(ser)
clkgen-service
The Python remote procedure call framework RPyC is used to create a
network service which makes the functionality of the clkgen
app
available to other clients.
The ClkgenService
implements the functionality which is accessed via
RPyC.
class ClkgenService(rpyc.Service):
    """RPyC service giving remote clients access to a ClkApp instance."""

    def __init__(self, app):
        """Remember the application object that requests are forwarded to."""
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Initialize the clock generator hardware and software.

        The UI initialization and re-enabling steps are skipped when the
        application runs without a GUI.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        """
        app = self._app
        has_gui = not app._nogui
        if has_gui:
            app.initialize_ui()
        app.initialize_modules()
        if has_gui:
            app.enable_ui()

    @property
    def controllers(self) -> Dict[str, AD9552Controller]:
        """The clock module controllers, keyed by module identifier.

        The values are instances of AD9552Controller; the keys are the
        clock module identifiers 'clkmod1' and 'clkmod2'.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod2 = clkgen.root.controllers['clkmod2']
        """
        return self._app.controllers

    def configure(self, ctl_id: str) -> bool:
        """Write the currently set configuration to a clock module.

        :param ctl_id: A module id.  This will be either 'clkmod1'
            or 'clkmod2'.
        :type ctl_id: str

        Returns a boolean value indicating the clock module PLL lock
        status.  True for locked.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod1.freq = 150.0
        >>> chan1 = clkmod1.channels['1']
        >>> chan1.state = ad9552.OutputState.ACTIVE
        >>> pll_lock = clkgen.root.configure('clkmod1')
        """
        app = self._app
        controller = app.controllers[ctl_id]
        with create_serial(app.ctl_device, app.baudrate) as ser:
            return controller.configure(ser)

    def configure_freq(self, ctl_id: str) -> bool:
        """Write the currently set output frequency to a clock module.

        :param ctl_id: A module id.  This will be either 'clkmod1'
            or 'clkmod2'.
        :type ctl_id: str

        Returns a boolean value indicating the clock module PLL lock
        status.  True for locked.

        >>> import rpyc
        >>> clkgen = rpyc.connect("127.0.0.1", 18861)
        >>> clkgen.root.initialize()
        >>> clkmod1 = clkgen.root.controllers['clkmod1']
        >>> clkmod1.freq = 150.0
        >>> clkgen.root.configure_freq('clkmod1')
        """
        app = self._app
        controller = app.controllers[ctl_id]
        with create_serial(app.ctl_device, app.baudrate) as ser:
            return controller.configure_freq(ser)


class RPyCServer(QObject):
    """QObject wrapper that runs an rpyc ThreadedServer from ``run``.

    ``finished`` is emitted after the server's ``start()`` call returns.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """Store the service instance and the address to serve it on."""
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Create and start the server, then emit ``finished``."""
        print("ClockGen rpyc service on {}:{}".format(self._host, self._port))
        protocol_config = {
            'allow_all_attrs': True,
            'allow_setattr': True,
            'allow_getattr': True,
        }
        self._server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            protocol_config=protocol_config)
        self._server.start()
        self.finished.emit()
msg-dialog
Imports
from typing import ( Optional, List, Dict ) import sys import signal import asyncio from argparse import ArgumentParser import serial import json from serial.tools.list_ports import comports from rfblocks import ( ad9552, AD9552Controller, DividerRangeException, list_available_serial_ports, create_serial, write_cmd, query_cmd, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL ) from qtrfblocks import ( ClkModule, ClkChannel ) from qasync import ( QEventLoop, QThreadExecutor, asyncSlot, asyncClose ) from PyQt5.QtWidgets import ( QWidget, QLabel, QAbstractSpinBox, QDoubleSpinBox, QVBoxLayout, QLineEdit, QHBoxLayout, QGroupBox, QMainWindow, QComboBox, QCheckBox, QPushButton, QRadioButton, QButtonGroup, QMessageBox, QFormLayout, QErrorMessage, QApplication, QScrollArea, QMenu, QFileDialog, QAbstractButton ) from PyQt5.QtCore import ( Qt, QCoreApplication, QObject, QThread, pyqtSignal, pyqtSlot ) import rpyc from rpyc.utils.server import ThreadedServer