ddsgen.py
This document describes an app for controlling the DDS signal generator reference design.
Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.
The Script
The main
Function
The asyncio
framework is used to coordinate communications over the
serial device without blocking user interface events. For this
purpose we make use of QEventLoop
and QThreadExecutor
from the
qasync package.
The event loop is equipped with an instance of QThreadExecutor
which
executes a single thread. The idea here is to ensure that requests to
the ECApp serial device will execute one by one in a manner which will
not block the operation of the Qt event loop.
def main(): global server_thread <<process-cmdline-args>> # This ensures that Cntl-C will work as expected: signal.signal(signal.SIGINT, signal.SIG_DFL) if args.nogui is True: app = QCoreApplication(sys.argv) else: app = QApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) ddsgen_app = DDSApp( clk_ref = args.reffreq, dds_board1 = args.dds1model, dds_board2 = args.dds2model, serial_device = args.device, baudrate = args.baudrate, headless = args.nogui) if args.dumphw is True: clk_ctl = ddsgen_app.clk_controller print(f'Clock Generator {clk_ctl.controller_id}: ' f'{vars(clk_ctl._ad9552)}') for dds_id, dds_ctl in ddsgen_app.dds_controllers.items(): print(f'{dds_id}: {vars(dds_ctl._ad9913)}') sys.exit(0) if not args.nogui: ddsgen_app.show() server_thread = QThread() server = RPyCServer(DDSGenService(ddsgen_app), args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() with loop: loop.set_debug(True) sys.exit(app.exec_())
Process command line arguments
Command line arguments allow the clock generator reference frequency and DDS board models to be specified. This is useful when variants of the DDS boards are to be used.
Command line arguments also allow the specification of a serial device and baud rate for the purposes of testing.
defaultBaud = 0 defaultClkRefFreq = 10 defaultDDSBoard1 = '27dB-RF' defaultDDSBoard2 = '20dB' parser = ArgumentParser(description= '''A DDS signal generator.''') parser.add_argument("--nogui", action='store_true', help="Disable GUI and run 'headless'") parser.add_argument("-d", "--device", default=None, help="The hardware serial device") parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int, help="Baud rate (default: {})".format(defaultBaud)) parser.add_argument("-A", "--ipaddr", default="127.0.0.1", help="IP address for the RPyC server instance") parser.add_argument("-P", "--port", default=18862, type=int, help="TCP port for the RPyC server instance") parser.add_argument("-H", "--dumphw", action='store_true', help="Dump device hardware config to stdout and exit") parser.add_argument("-F", "--reffreq", default=defaultClkRefFreq, type=int, help="Clock reference frequency (in MHz). Default: {}MHz".format( defaultClkRefFreq)) parser.add_argument("--dds1model", default=defaultDDSBoard1, help="Board variant model for DDS chan 1. Default: {}".format( defaultDDSBoard1)) parser.add_argument("--dds2model", default=defaultDDSBoard2, help="Board variant model for DDS chan 2. Default: {}".format( defaultDDSBoard2)) args = parser.parse_args()
ddsgen-app-class
The class which encapsulates the overall application and builds the required on-screen controls.
The phase_sync
method is used to bring the phase of both DDS
channels into synchrony. This is done by writing the CFR1[14]
bit
for both DDS units and then subsequently issuing an IO_UPDATE
on
both channels simultaneously. This has the effect of clearing the
phase accumulator for both DDS units. Finally, the CFR1[14]
bit is
cleared for both channels.
Another "synchronization" tool is the "Autoclear Phase Accum." bit
(CFR1[2]
). This bit ensures that each time IO_UPDATE
is asserted,
the phase accumulator resets to 0-phase (instead of just picking up
where it left off).
<<twotone-dialog>> class DDSApp(QObject): <<hardware-config>> def __init__(self, clk_ref: float, dds_board1: str, dds_board2: str, config: Dict = DEFAULT_APP_CONFIG, serial_device: Optional[str] = None, baudrate: int = 0, headless: bool = False) -> None: super().__init__() self._nogui: bool = headless if self._nogui is False: self._widget = QWidget() self._initial_config: Dict = config self._ctl_device: Optional[str] = serial_device self._baudrate: int = baudrate if baudrate == 0: self._baudrate = DEFAULT_BAUDRATE self._ddsreset: str = config['ddsreset'] clk_hwconf = DDSApp.CLKMODULE_HWCONF.copy() clk_hwconf['fref'] = clk_ref self._clk_device: ad9552 = ad9552(**clk_hwconf) self._clk_controller: AD9552Controller = AD9552Controller( DDSApp.CLKMODULE_NAME, self._clk_device, config['clk']) # Dubious hack to back patch clock module reference frequency # from command line args. #self._clk_controller._ad9552.fref = clk_ref self._dds_devices: List[ad9913] = [ad9913(**hw_conf) for hw_conf in DDSApp.CHANNELS_HWCONF] for dds, board_model in zip( self._dds_devices, [dds_board1, dds_board2]): dds.board_model = board_model d: Dict = config['channels'] z = zip(d.keys(), self._dds_devices, d.values()) self._dds_controllers = { ctl_id: AD9913Controller(ctl_id, dds_device, ctl_config) for ctl_id, dds_device, ctl_config in z } if not headless: has_ui = [ch['ui'] for ch in config['clk']['channels'].values()] self._clkmod: ClkModule = ClkModule( self, DDSApp.CLKMODULE_NAME, self._clk_controller, *has_ui) self._channels: Dict = { chan_id: DDSChan(self, dds_ctl) for chan_id, dds_ctl in zip(d.keys(), self._dds_controllers.values())} self.build_ui() <<build-app-ui>> <<initialize>> <<app-configuration>> @property def clk_controller(self) -> AD9552Controller: return self._clk_controller @property def dds_controllers(self): return self._dds_controllers @property def ctl_device(self) -> Optional[str]: return self._ctl_device @property def serial_ports(self) -> List: ports = [DEFAULT_SOCKET_URL] ports.extend([port.device for port in comports() if port.serial_number]) if self.ctl_device: if self.ctl_device in ports: ports.pop(ports.index(self.ctl_device)) ports.insert(0, self.ctl_device) return ports @property def baudrate(self) -> int: return self._baudrate @asyncSlot(QAbstractButton) async def set_refsrc(self, btn) -> None: """Set the reference source for the generator. The reference source is controlled via the `Ref` pin on the ad9552 clock module. """ clk_ctl = self.clk_controller if btn.src == clk_ctl.refsrc: return clk_ctl.refsrc = btn.src try: cmd = clk_ctl._ad9552.config_refsrc() if len(cmd): loop = asyncio.get_event_loop() with create_serial(self.ctl_device, self.baudrate) as ser: await loop.run_in_executor( None, write_cmd, ser, cmd) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se)) def tty_changed(self, combo: QComboBox, idx: int) -> None: self._ctl_device = combo.itemText(idx) def show_twotone(self) -> None: twotone_dialog = TwoToneDialog(self, self._channels) twotone_dialog.exec_() def disable_dds(self) -> None: for chan in self._channels.values(): chan.enabled = False for btn in self._refsrc_btns: btn.setEnabled(False) def disable_auxclk(self) -> None: self._clkmod.enabled = False def enable_dds(self) -> None: if not self._nogui: for chan in self._channels.values(): chan.enabled = True for btn in self._refsrc_btns: btn.setEnabled(True) def enable_auxclk(self) -> None: self._clkmod.enabled = True def set_auxclk_state(self, state: bool) -> None: if state: self.disable_dds() self._twotone_btn.setEnabled(False) self.enable_auxclk() else: self.disable_auxclk() self.enable_dds() self._twotone_btn.setEnabled(True) self._clkmod.initialize_ui() try: with create_serial(self.ctl_device, self.baudrate) as ser: self._clkmod.initialize_hw(ser) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se)) @asyncSlot() async def phase_sync(self): """Synchronize the channel phases. This is done as follows: 1. Set the clear DDS phase accumulator bit (14) in CFR1 and shift the CFR1 value into the AD9913 buffer for both channels. 2. Pulse the IOUP pin high on both channels simultaneously in order to load the buffered CFR1 values. This has the effect of holding the DDS phase accumulator at 0. 3. Clear the clear DDS phase accumulator bit in CFR1 and shift the CFR1 value into the AD9913 buffer for both channels. 4. Pulse the IOUP pin high on both channels simultaneously in order to load the buffered CFR1 values. This has the effect of restarting the DDS phase accumulator for both channels simultaneously. Note that the command to manipulate the IOUP pins assumes that these pins are connected to PB7 and PB6. """ cmd1 = '' for chan in self._channels.values(): chan.ctl._ad9913.set_clear_phase_accumulator(True) cmd1 += chan.ctl._ad9913.config_cfr1(False) cmd1 += 'MB,C0,00,C0,00:' cmd2 = '' for chan in self._channels.values(): chan.ctl._ad9913.set_clear_phase_accumulator(False) cmd2 += chan.ctl._ad9913.config_cfr1(False) cmd2 += 'MB,C0,00,C0,00:' try: with create_serial(self.ctl_device, self.baudrate) as ser: loop = asyncio.get_event_loop() await loop.run_in_executor(None, write_cmd, ser, cmd1) await loop.run_in_executor(None, write_cmd, ser, cmd2) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se))
hardware-config
The DDS clock generator module is ClkMod
and is configured with an
input reference frequency of 10 MHz. The B5
microcontroller pin is
used to control the reference source using the Ref
pin of ClkMod
.
CH1
and CH2
are the two DDS signal generator modules.
|
Reference Source |
---|---|
0 |
External |
1 |
Internal |
Note that the clock generator module Ref
pin is pulled high by 10K
resistor and therefore the default clock reference will be the
on-board 10 MHz source.
The default clock generator configuration is:
Channel property |
Value |
---|---|
Channel state ( |
ACTIVE |
Output frequency |
250 MHz |
Output mode ( |
CMOS_BOTH_ACTIVE |
Drive strength ( |
WEAK |
CMOS polarity ( |
COMM_POS |
Channel source ( |
PLL |
CLKMODULE_NAME: str = 'clkmod' DDSCH1_NAME: str = 'Chan 1' DDSCH2_NAME: str = 'Chan 2' CLKMODULE_HWCONF: Dict = { 'cs': 'C7', 'lockdetect': 'B4', 'reset': None, 'fref': 10.0, 'refselect': 'B5'} DDSCH1_HWCONF: Dict = {'cs': 'C6', 'io_update': 'B6', 'reset': 'C4', 'ps0': 'D4', 'ps1': 'D5', 'ps2': 'D6', 'board_model': '27dB-RF'} DDSCH2_HWCONF: Dict = {'cs': 'C5', 'io_update': 'B7', 'reset': 'C4', 'ps0': 'D0', 'ps1': 'D1', 'ps2': 'D2', 'board_model': '20dB'} DDS_RESET: str = 'C4' CHANNELS_HWCONF: List[Dict] = [ DDSCH1_HWCONF, DDSCH2_HWCONF ] DEFAULT_CLK_FREQ: float = 250.0 DEFAULT_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.CMOS_BOTH_ACTIVE DEFAULT_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.ACTIVE DEFAULT_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.WEAK DEFAULT_CHAN_POLARIY: ad9552.CmosPolarity = ad9552.CmosPolarity.COMM_POS DEFAULT_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL AUX_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.LVPECL AUX_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.POWERED_DOWN AUX_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.STRONG AUX_CHAN_POLARIY: ad9552.CmosPolarity = ad9552.CmosPolarity.DIFF_POS AUX_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL DEFAULT_CHAN_CONFIG: Dict = { 'ui': False, 'mode': DEFAULT_CHAN_MODE, 'state': DEFAULT_CHAN_STATE, 'drive': DEFAULT_CHAN_DRIVE, 'polarity': DEFAULT_CHAN_POLARIY, 'source': DEFAULT_CHAN_SOURCE } AUX_CHAN_CONFIG: Dict = { 'ui': True, 'mode': AUX_CHAN_MODE, 'state': AUX_CHAN_STATE, 'drive': AUX_CHAN_DRIVE, 'polarity': AUX_CHAN_POLARIY, 'source': AUX_CHAN_SOURCE } DEFAULT_CLK_CONFIG: Dict = { 'freq': DEFAULT_CLK_FREQ, 'channels': { '1': { 'label': '1', **DEFAULT_CHAN_CONFIG }, '2': { 'label': '2', **AUX_CHAN_CONFIG } } } DEFAULT_DDS_FREQ = 10.0 DEFAULT_DDS_PHASE = 0.0 DEFAULT_DDS_LEVEL = 512 DEFAULT_DDS_CONFIG: Dict = { 'freq': DEFAULT_DDS_FREQ, 'ph': DEFAULT_DDS_PHASE, 'lvl': DEFAULT_DDS_LEVEL, 'lvl_units': DDSChan.LVL_UNITS.DBM, 'pmod': False, 'state': ad9913.POWER_DOWN, 'sweep': { 'type': ad9913.SweepType.FREQUENCY, 'start': 0.5, 'end': 5.0, 'ramp': ad9913.SweepRampType.SWEEP_OFF, 'delta': [0.1, 0.1], # MHz per step 'rate': [10.0, 10.0], # microsecs per step 'dwell': False, 'trigsrc': ad9913.SweepTriggerSource.REGISTER, 'trigtype': ad9913.SweepTriggerType.EDGE_TRIGGER }, 'profiles': [[0.5, 0.0], [1.0, 0.0], [2.0, 0.0], [5.0, 0.0], [10.0, 0.0], [20.0, 0.0], [50.0, 0.0], [100.0, 0.0]], 'selected_profile': 0, 'profile_type': ad9913.SweepType.FREQUENCY } DEFAULT_APP_CONFIG: Dict = { 'clk': DEFAULT_CLK_CONFIG, 'channels': { DDSCH1_NAME: { 'label': DDSCH1_NAME, **DEFAULT_DDS_CONFIG }, DDSCH2_NAME: { 'label': DDSCH2_NAME, **DEFAULT_DDS_CONFIG } }, 'ddsreset': DDS_RESET, 'refsrc': ad9552.ReferenceSource.INTERNAL }
build-app-ui
def build_ui(self): """Build the on-screen UI for the DDS signal generator app.""" vbox = QVBoxLayout() hbox = QHBoxLayout() tty_combo = QComboBox() tty_combo.currentIndexChanged.connect( lambda idx, w=tty_combo: self.tty_changed(w, idx)) tty_combo.addItems(self.serial_ports) line_edit = QLineEdit() tty_combo.setLineEdit(line_edit) hbox.addWidget(QLabel("Control Port:")) hbox.addWidget(tty_combo) hbox.addStretch(1) configuration_btn = QPushButton("Configuration") config_menu = QMenu() config_menu.addAction('Save', self.save_config) config_menu.addAction('Load', self.open_config) configuration_btn.setMenu(config_menu) hbox.addWidget(configuration_btn) initialize_btn = QPushButton("Initialize") initialize_btn.clicked.connect(self.initialize) hbox.addWidget(initialize_btn) vbox.addLayout(hbox) hbox2 = QHBoxLayout() hbox2.addWidget(QLabel("Ref. Source:")) self.refsrcGroup = QButtonGroup(hbox2) rb = QRadioButton("Internal") self.refsrcGroup.addButton(rb) rb.src = ad9552.ReferenceSource.INTERNAL rb.setChecked(True) hbox2.addWidget(rb) rb2 = QRadioButton("External (10MHz)") self.refsrcGroup.addButton(rb2) rb2.src = ad9552.ReferenceSource.EXTERNAL self.refsrcGroup.buttonClicked.connect(self.set_refsrc) hbox2.addWidget(rb2) self._refsrc_btns = [rb, rb2] hbox2.addStretch() vbox.addLayout(hbox2) hbox3 = QHBoxLayout() hbox3.addWidget(self._channels[DDSApp.DDSCH1_NAME].build_ui()) hbox3.addWidget(self._channels[DDSApp.DDSCH2_NAME].build_ui()) hbox3.addWidget(self._clkmod.build_ui()) self._clkmod._group_box.setTitle('Aux. Clk') vbox.addLayout(hbox3) hbox5 = QHBoxLayout() self._twotone_btn = QPushButton("Two Tone...") self._twotone_btn.clicked.connect(self.show_twotone) hbox5.addWidget(self._twotone_btn) self._sync_btn = QPushButton("Sync") self._sync_btn.clicked.connect(self.phase_sync) hbox5.addWidget(self._sync_btn) hbox5.addStretch() fbox3 = QFormLayout() auxclk_cb = QCheckBox("") auxclk_cb.setChecked(False) auxclk_cb.stateChanged.connect(self.set_auxclk_state) fbox3.addRow(QLabel("Enable Aux. Clk:"), auxclk_cb) hbox5.addLayout(fbox3) hbox5.addStretch() vbox.addLayout(hbox5) self._widget.setLayout(vbox) self.disable_dds() self.disable_auxclk() self._widget.setGeometry(300, 300, 700, 150) self._widget.setWindowTitle('DDS Signal Generator') def show(self): self._widget.show()
twotone-dialog
class TwoToneDialog(QDialog): """ """ def __init__(self, ddsApp: 'DDSApp', dds_chans: Dict, initial_level: int = 512, parent: QWidget = None): QDialog.__init__(self, parent) self._app: 'DDSApp' = ddsApp self._dds_chans: Dict = dds_chans self._lower_freq: float = 10.0 self._lvl: int = initial_level self._upper_freq: float = 11.0 self._centre_freq: float = 10.5 self._span_freq: float = 1.0 self._lowerf_box: Optional[QDoubleSpinBox] = None self._upperf_box: Optional[QDoubleSpinBox] = None self._centref_box: Optional[QDoubleSpinBox] = None self._spanf_box: Optional[QDoubleSpinBox] = None self.setWindowTitle("Two Tone Settings") self.build_ui() @property def lower_freq(self) -> float: return self._lower_freq def set_lower_freq(self, f: float) -> None: self._lower_freq = f if self._upperf_box: self._upperf_box.setMinimum(f) self.update_centre_span() @property def level(self) -> int: return self._lvl def set_level(self, l: int) -> None: self._lvl = l @property def millivolts(self) -> float: chan = list(self._dds_chans.values())[0] return chan.ctl.level_to_millivolts(self._lvl) def set_millivolts(self, mv: float) -> None: chan = list(self._dds_chans.values())[0] self._lvl = chan.ctl.millivolts_to_level(mv) @property def dbm(self) -> float: chan = list(self._dds_chans.values())[0] return chan.ctl.level_to_dbm(self._lvl) def set_dbm(self, d: float) -> None: chan = list(self._dds_chans.values())[0] self._lvl = chan.ctl.dbm_to_level(d) @property def upper_freq(self) -> float: return self._upper_freq def set_upper_freq(self, f: float) -> None: self._upper_freq = f if self._lowerf_box: self._lowerf_box.setMaximum(f) self.update_centre_span() def update_centre(self) -> None: if self._centref_box: self._centre_freq = self._lower_freq + (self._span_freq/2.0) self._centref_box.disconnect() self._centref_box.setValue(self._centre_freq) self._centref_box.valueChanged.connect(self.set_centre_freq) def update_span(self) -> None: if self._spanf_box: self._span_freq = self._upper_freq - self._lower_freq self._spanf_box.disconnect() self._spanf_box.setValue(self._span_freq) self._spanf_box.valueChanged.connect(self.set_span_freq) def update_centre_span(self) -> None: self.update_centre() self.update_span() @property def centre_freq(self) -> float: return self._centre_freq def set_centre_freq(self, f: float) -> None: self._centre_freq = f self.update_lower_upper() @property def span_freq(self) -> float: return self._span_freq def set_span_freq(self, f: float): self._span_freq = f self.update_lower_upper() self.update_centre() def update_lower_upper(self) -> None: if self._lowerf_box: self._lower_freq = self._centre_freq - (self._span_freq/2.0) self._upper_freq = self._centre_freq + (self._span_freq/2.0) self._lowerf_box.disconnect() self._upperf_box.disconnect() self._lowerf_box.setValue(self._lower_freq) self._lowerf_box.setMaximum(self._upper_freq) self._upperf_box.setValue(self._upper_freq) self._upperf_box.setMinimum(self._lower_freq) self._lowerf_box.valueChanged.connect(self.set_lower_freq) self._upperf_box.valueChanged.connect(self.set_upper_freq) def build_ui(self) -> None: vbox = QVBoxLayout() hbox1 = QHBoxLayout() fbox1 = QFormLayout() self._lowerf_box = QDoubleSpinBox() self._lowerf_box.setRange(DDSChan.MIN_FREQUENCY, self.upper_freq) self._lowerf_box.setDecimals(5) self._lowerf_box.setValue(self.lower_freq) self._lowerf_box.setSuffix(DDSChan.FREQ_SUFFIX) self._lowerf_box.valueChanged.connect(self.set_lower_freq) fbox1.addRow(QLabel("Lower Freq.:"), self._lowerf_box) self._centref_box = QDoubleSpinBox() self._centref_box.setRange(DDSChan.MIN_FREQUENCY, DDSChan.MAX_FREQUENCY) self._centref_box.setDecimals(5) self._centref_box.setValue(self.centre_freq) self._centref_box.setSuffix(DDSChan.FREQ_SUFFIX) self._centref_box.valueChanged.connect(self.set_centre_freq) fbox1.addRow(QLabel("Centre Freq.:"), self._centref_box) hbox2 = QHBoxLayout() self._lvl_box = QDoubleSpinBox() # Assume that DDS channels have identical configuration self._lvl_box.setRange(*list(self._dds_chans.values())[0].dbm_range) self._lvl_box.setDecimals(1) self._lvl_box.setSuffix(' dBm') self._lvl_box.setValue(self.dbm) self._lvl_box.valueChanged.connect(self.set_dbm) hbox2.addWidget(self._lvl_box) level_units_btn = DDSChan.level_units_control(self, self) hbox2.addWidget(level_units_btn) fbox1.addRow(QLabel("Level:"), hbox2) fbox2 = QFormLayout() self._upperf_box = QDoubleSpinBox() self._upperf_box.setRange(self.lower_freq, DDSChan.MAX_FREQUENCY) self._upperf_box.setDecimals(5) self._upperf_box.setValue(self.upper_freq) self._upperf_box.setSuffix(DDSChan.FREQ_SUFFIX) self._upperf_box.valueChanged.connect(self.set_upper_freq) fbox2.addRow(QLabel("Upper Freq.:"), self._upperf_box) self._spanf_box = QDoubleSpinBox() self._spanf_box.setRange(DDSChan.MIN_FREQUENCY, DDSChan.MAX_FREQUENCY) self._spanf_box.setDecimals(5) self._spanf_box.setValue(self.span_freq) self._spanf_box.setSuffix(DDSChan.FREQ_SUFFIX) self._spanf_box.valueChanged.connect(self.set_span_freq) fbox2.addRow(QLabel("Span.:"), self._spanf_box) hbox1.addLayout(fbox1) hbox1.addLayout(fbox2) hbox1.addStretch() vbox.addLayout(hbox1) config_btn = QPushButton("Configure") bbox = QDialogButtonBox(QDialogButtonBox.Apply | QDialogButtonBox.Close) bbox.addButton(config_btn, QDialogButtonBox.ActionRole) vbox.addWidget(bbox) bbox.rejected.connect(self.reject) bbox.button(QDialogButtonBox.Apply).clicked.connect(self.apply) config_btn.clicked.connect(self.configure) self.setLayout(vbox) def set_chan_level(self, chan: DDSChan) -> None: if chan.level_units == DDSChan.LVL_UNITS.DAC: chan._lvl_box.setValue(self.level) elif chan.level_units == DDSChan.LVL_UNITS.MV: chan._lvl_box.setValue(self.millivolts) else: chan._lvl_box.setValue(self.dbm) def apply(self) -> None: lower_chan = self._app._channels[DDSApp.DDSCH1_NAME] upper_chan = self._app._channels[DDSApp.DDSCH2_NAME] lower_chan._freq_box.setValue(self.lower_freq) upper_chan._freq_box.setValue(self.upper_freq) self.set_chan_level(lower_chan) self.set_chan_level(upper_chan) @asyncSlot() async def configure(self): self.apply() loop = asyncio.get_event_loop() with create_serial(self._app.ctl_device, self._app.baudrate) as ser: for chan in self._app._channels.values(): await loop.run_in_executor(None, chan.configure_hw, ser) def reject(self) -> None: QDialog.reject(self) def set_dac_units(self) -> None: self._lvl_box.disconnect() self._lvl_box.setRange(*DDSChan.DAC_CODE_RANGE) self._lvl_box.setDecimals(0) self._lvl_box.setSuffix('') self._lvl_box.setValue(self.level) self._lvl_box.valueChanged.connect(self.set_level) def set_millivolt_units(self) -> None: self._lvl_box.disconnect() self._lvl_box.setRange(*list(self._dds_chans.values())[0].millivolts_range) self._lvl_box.setDecimals(1) self._lvl_box.setSuffix(' mV') self._lvl_box.setValue(self.millivolts) self._lvl_box.valueChanged.connect(self.set_millivolts) def set_dbm_units(self) -> None: self._lvl_box.disconnect() self._lvl_box.setRange(*list(self._dds_chans.values())[0].dbm_range) self._lvl_box.setDecimals(1) self._lvl_box.setSuffix(' dBm') self._lvl_box.setValue(self.dbm) self._lvl_box.valueChanged.connect(self.set_dbm)
initialize
@asyncSlot() async def initialize(self): try: self.initialize_ui() loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.initialize_hw) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se)) finally: self.enable_dds() def initialize_ui(self) -> None: self._clkmod.initialize_ui() for chan in self._channels.values(): chan.initialize_ui() def initialize_hw(self) -> None: with create_serial(self.ctl_device, self.baudrate) as ser: if self._nogui is False: self._clkmod.initialize_hw(ser) for chan in self._channels.values(): chan.initialize_hw(ser) else: self.clk_controller.initialize(ser) for ctl in self.dds_controllers.values(): ctl.initialize(ser) def reset_dds(self) -> None: try: for chan in self._channels.values(): chan.chip_reset() with create_serial(self.ctl_device, self.baudrate) as ser: cmd = 'H{}:L{}'.format(self._ddsreset, self._ddsreset) write_cmd(ser, cmd) except serial.serialutil.SerialException as se: error_dialog = QErrorMessage(self) error_dialog.showMessage(str(se))
Upon assertion of RESET
, the phase accumulator is cleared (i.e.,
0-phase) and is stalled (i.e., not accumulating). At de-assertion of
reset, the accumulator is free to begin accumulating, which starts
the DDS sinusoidal generation process. So, the onset of the output
sinusoid begins when the accumulator starts accumulating. Furthermore
the sinusoidal generation process starts at 0-phase (as the
accumulator starts off cleared).
app-configuration
def save_config(self) -> None: config = self.dump_config() filter_str = "JSON Configuration (*.json)" def_filepath = '.' name, selfilter = QFileDialog.getSaveFileName(self, 'Save configuration', def_filepath, filter_str) output_file = str(name).strip() if len(output_file) == 0: return with open(output_file, 'w') as fd: json.dump(config, fd) def open_config(self) -> None: filter_str = "JSON Configuration (*.json)" def_filepath = '.' names, selfilter = QFileDialog.getOpenFileNames(self, 'Load Configuration', def_filepath, filter_str) if len(names): input_file = names[0] with open(input_file, 'r') as fd: config = json.load(fd) self.load_config(config) def dump_config(self) -> Dict: chan_config = {} for chan_id, chan in self._channels.items(): chan_config[chan_id] = { 'label': chan.label, **chan.dump_config()} config = { 'clk': self._clkmod.dump_config(), 'channels': chan_config, 'ddsreset': self._ddsreset, 'refsrc': self._clkmod.refsrc } return config def load_config(self, config: Dict) -> None: self._clkmod.load_config(config['clk']) self._ddsreset = config['ddsreset'] self._clkmod.refsrc = config['refsrc'] self._clkmod.configure_ui(config['clk']) for chan_id, chan in self._channels.items(): chan_config = config['channels'][chan_id] chan.load_config(chan_config) chan.configure_ui(chan_config)
ddsgen-service
The Python remote procedure call framework RPyC is used to create a
network service which makes the functionality of the ddsgen
app
available to other clients.
The DDSGenService
implements the functionality which is accessed via
RPyC.
class DDSGenService(rpyc.Service): def __init__(self, app): super().__init__() self._app = app def initialize(self) -> None: """Initialize the signal generator hardware and software. >>> import rpyc >>> ddsgen = rpyc.connect("127.0.0.1", 18862) >>> ddsgen.root.initialize() """ if not self._app._nogui: self._app.initialize_ui() self._app.initialize_hw() self._app.enable_dds() @property def dds_controllers(self) -> Dict[str, AD9913Controller]: """A dictionary containing the signal generator DDS controllers. The controllers are instances of AD9913Controller keyed using the channel identifiers 'Chan 1' and 'Chan 2' respectively. >>> import rpyc >>> ddsgen = rpyc.connect("127.0.0.1", 18862) >>> ddsgen.root.initialize() >>> dds1 = ddsgen.root.dds_controllers['Chan 1'] >>> dds1 <rfblocks.ad9913_controller.AD9913Controller object at 0x126f8b4c0> >>> dds2 = ddsgen.root.dds_controllers['Chan 2'] """ return self._app.dds_controllers @property def clk_controller(self) -> AD9552Controller: """The controller for the DDS reference clock. An instance of AD9552Controller. >>> import rpyc >>> ddsgen = rpyc.connect("127.0.0.1", 18862) >>> ddsgen.root.initialize() >>> refclk = ddsgen.root.clk_controller """ return self._app.clk_controller def configure(self, ctl_id: str) -> None: """Configure AD9913 DDS hardware registers for a specified channel. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str >>> import rpyc >>> ddsgen = rpyc.connect("127.0.0.1", 18862) >>> ddsgen.root.initialize() >>> dds1 = ddsgen.root.dds_controllers['Chan 1'] >>> dds1.freq = 25.0 >>> dds1.dbm = -10.0 >>> ddsgen.root.configure('Chan 1') """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.configure(ser) def configure_sweep(self, ctl_id: str) -> None: """Configure AD9913 DDS sweep registers for a specified channel. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.configure_sweep(ser) def update_profile(self, ctl_id: str, profnum: int) -> None: """Update AD9913 DDS profile registers for a specified channel and profile. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str :param profnum: The AD9913 profile register to update. :type profnum: An integer in the range 0 to 7. """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.update_profile(ser, profnum) def configure_profile(self, ctl_id: str) -> None: """ """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.configure_profile(ser) def start_sweep(self, ctl_id: str) -> None: """Start an AD9913 sweep operation for a specified channel. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.start_sweep(ser) def stop_sweep(self, ctl_id: str) -> None: """Stop an AD9913 sweep operation. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str """ ctl = self._app.dds_controllers[ctl_id] with create_serial(self._app.ctl_device, self._app.baudrate) as ser: ctl.stop_sweep(ser) def set_pmod(self, ctl_id: str, state: bool) -> None: """Set the DDS programmable modulus state. :param ctl_id: A channel id. This will be one of "Chan 1" or "Chan 2". :type ctl_id: str :param state: Set to True in order to enable progammable modulus. False to disable it. :type state: bool Note that :py:meth:`configure` must be invoked in order to update the DDS hardware """ ctl = self._app.dds_controllers[ctl_id] ctl.pmod = state if state: # Disable direct switch mode when enabling # programmable modulus ctl.set_direct_switch_enabled = False class RPyCServer(QObject): finished = pyqtSignal() def __init__(self, serviceInst, host, port): super().__init__() self._serviceInst = serviceInst self._host = host self._port = port def run(self): print("DDSGen rpyc service on {}:{}".format(self._host, self._port)) self._server = ThreadedServer( self._serviceInst, hostname = self._host, port = self._port, protocol_config = { 'allow_all_attrs': True, 'allow_setattr': True, 'allow_pickle': True}) self._server.start() self.finished.emit()
Imports
from typing import ( Optional, List, Dict ) import sys import copy from math import sqrt, log10 from enum import Enum import signal import asyncio from argparse import ArgumentParser import serial import json from serial.tools.list_ports import comports from rfblocks import ( ad9552, AD9552Controller, ad9913, AD9913Controller, create_serial, write_cmd, query_cmd, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL ) from qtrfblocks import ( ClkModule, DDSChan ) from qasync import ( QEventLoop, QThreadExecutor, asyncSlot, asyncClose ) from PyQt5.QtWidgets import ( QWidget, QLabel, QAbstractSpinBox, QDoubleSpinBox, QVBoxLayout, QLineEdit, QHBoxLayout, QGroupBox, QMainWindow, QComboBox, QCheckBox, QPushButton, QRadioButton, QButtonGroup, QMessageBox, QFormLayout, QErrorMessage, QApplication, QScrollArea, QMenu, QFileDialog, QSpinBox, QDialog, QDialogButtonBox, QAction, QActionGroup, QAbstractButton ) from PyQt5.QtCore import ( Qt, QCoreApplication, QObject, QThread, pyqtSignal ) import rpyc from rpyc.utils.server import ThreadedServer