Open RF Prototyping

ddsgen.py

This document describes an app for controlling the DDS signal generator reference design.

Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.

The Script

# Generated from ddsgen.org
#

<<imports>>

<<ddsgen-app-class>>

<<ddsgen-service>>

<<main-func>>

if __name__ == '__main__':
    main()

The main Function

The asyncio framework is used to coordinate communications over the serial device without blocking user interface events. For this purpose we make use of QEventLoop and QThreadExecutor from the qasync package.

The event loop is equipped with an instance of QThreadExecutor which executes a single thread. The idea here is to ensure that requests to the ECApp serial device will execute one by one in a manner which will not block the operation of the Qt event loop.

def main():
    global server_thread

    <<process-cmdline-args>>

    # This ensures that Cntl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    if args.nogui is True:
        app = QCoreApplication(sys.argv)
    else:
        app = QApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    ddsgen_app = DDSApp(
        clk_ref = args.reffreq,
        dds_board1 = args.dds1model,
        dds_board2 = args.dds2model,
        serial_device = args.device,
        baudrate = args.baudrate,
        headless = args.nogui)

    if args.dumphw is True:
        clk_ctl = ddsgen_app.clk_controller
        print(f'Clock Generator {clk_ctl.controller_id}: '
              f'{vars(clk_ctl._ad9552)}')
        for dds_id, dds_ctl in ddsgen_app.dds_controllers.items():
            print(f'{dds_id}: {vars(dds_ctl._ad9913)}')
        sys.exit(0)

    if not args.nogui:
        ddsgen_app.show()

    server_thread = QThread()
    server = RPyCServer(DDSGenService(ddsgen_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    with loop:
        loop.set_debug(True)
        sys.exit(app.exec_())

Process command line arguments

Command line arguments allow the clock generator reference frequency and DDS board models to be specified. This is useful when variants of the DDS boards are to be used.

Command line arguments also allow the specification of a serial device and baud rate for the purposes of testing.

defaultBaud = 0
defaultClkRefFreq = 10
defaultDDSBoard1 = '27dB-RF'
defaultDDSBoard2 = '20dB'

parser = ArgumentParser(description=
                        '''A DDS signal generator.''')

parser.add_argument("--nogui", action='store_true',
                    help="Disable GUI and run 'headless'")
parser.add_argument("-d", "--device", default=None,
                    help="The hardware serial device")
parser.add_argument("-b", "--baudrate", default=defaultBaud, type=int,
                    help="Baud rate (default: {})".format(defaultBaud))
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address for the RPyC server instance")
parser.add_argument("-P", "--port", default=18862, type=int,
                    help="TCP port for the RPyC server instance")
parser.add_argument("-H", "--dumphw", action='store_true',
                    help="Dump device hardware config to stdout and exit")
parser.add_argument("-F", "--reffreq", default=defaultClkRefFreq, type=int,
                    help="Clock reference frequency (in MHz).  Default: {}MHz".format(
                        defaultClkRefFreq))
parser.add_argument("--dds1model", default=defaultDDSBoard1,
                    help="Board variant model for DDS chan 1. Default: {}".format(
                        defaultDDSBoard1))
parser.add_argument("--dds2model", default=defaultDDSBoard2,
                    help="Board variant model for DDS chan 2. Default: {}".format(
                        defaultDDSBoard2))
args = parser.parse_args()

ddsgen-app-class

The class which encapsulates the overall application and builds the required on-screen controls.

The phase_sync method is used to bring the phase of both DDS channels into synchrony. This is done by writing the CFR1[14] bit for both DDS units and then subsequently issuing an IO_UPDATE on both channels simultaneously. This has the effect of clearing the phase accumulator for both DDS units. Finally, the CFR1[14] bit is cleared for both channels.

Another "synchronization" tool is the "Autoclear Phase Accum." bit (CFR1[2]). This bit ensures that each time IO_UPDATE is asserted, the phase accumulator resets to 0-phase (instead of just picking up where it left off).

<<twotone-dialog>>

class DDSApp(QObject):

    <<hardware-config>>

    def __init__(self,
                 clk_ref: float,
                 dds_board1: str,
                 dds_board2: str,
                 config: Dict = DEFAULT_APP_CONFIG,
                 serial_device: Optional[str] = None,
                 baudrate: int = 0,
                 headless: bool = False) -> None:
        super().__init__()
        self._nogui: bool = headless
        if self._nogui is False:
            self._widget = QWidget()
        self._initial_config: Dict = config
        self._ctl_device: Optional[str] = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE
        self._ddsreset: str = config['ddsreset']

        clk_hwconf = DDSApp.CLKMODULE_HWCONF.copy()
        clk_hwconf['fref'] = clk_ref
        self._clk_device: ad9552 = ad9552(**clk_hwconf)
        self._clk_controller: AD9552Controller = AD9552Controller(
            DDSApp.CLKMODULE_NAME,
            self._clk_device,
            config['clk'])

        # Dubious hack to back patch clock module reference frequency
        # from command line args.
        #self._clk_controller._ad9552.fref = clk_ref
        self._dds_devices: List[ad9913] = [ad9913(**hw_conf)
                                for hw_conf in DDSApp.CHANNELS_HWCONF]
        for dds, board_model in zip(
                self._dds_devices, [dds_board1, dds_board2]):
            dds.board_model = board_model
        d: Dict = config['channels']
        z = zip(d.keys(), self._dds_devices, d.values())
        self._dds_controllers = { ctl_id: AD9913Controller(ctl_id,
                                                           dds_device,
                                                           ctl_config)
                                  for ctl_id, dds_device, ctl_config in z }

        if not headless:
            has_ui = [ch['ui'] for ch in config['clk']['channels'].values()]
            self._clkmod: ClkModule = ClkModule(
                self,
                DDSApp.CLKMODULE_NAME,
                self._clk_controller,
                *has_ui)
            self._channels: Dict = {
                chan_id: DDSChan(self, dds_ctl) for chan_id, dds_ctl in
                             zip(d.keys(), self._dds_controllers.values())}
            self.build_ui()

    <<build-app-ui>>

    <<initialize>>

    <<app-configuration>>

    @property
    def clk_controller(self) -> AD9552Controller:
        return self._clk_controller

    @property
    def dds_controllers(self):
        return self._dds_controllers

    @property
    def ctl_device(self) -> Optional[str]:
        return self._ctl_device

    @property
    def serial_ports(self) -> List:
        ports = [DEFAULT_SOCKET_URL]
        ports.extend([port.device for port in comports() if port.serial_number])
        if self.ctl_device:
            if self.ctl_device in ports:
                ports.pop(ports.index(self.ctl_device))
            ports.insert(0, self.ctl_device)
        return ports

    @property
    def baudrate(self) -> int:
        return self._baudrate

    @asyncSlot(QAbstractButton)
    async def set_refsrc(self, btn) -> None:
        """Set the reference source for the generator.

        The reference source is controlled via the `Ref` pin
        on the ad9552 clock module.
        """
        clk_ctl = self.clk_controller
        if btn.src == clk_ctl.refsrc:
            return
        clk_ctl.refsrc = btn.src
        try:
            cmd = clk_ctl._ad9552.config_refsrc()
            if len(cmd):
                loop = asyncio.get_event_loop()
                with create_serial(self.ctl_device,
                                   self.baudrate) as ser:
                    await loop.run_in_executor(
                        None, write_cmd, ser, cmd)
        except serial.serialutil.SerialException as se:
            error_dialog = QErrorMessage(self)
            error_dialog.showMessage(str(se))

    def tty_changed(self, combo: QComboBox, idx: int) -> None:
        self._ctl_device = combo.itemText(idx)

    def show_twotone(self) -> None:
        twotone_dialog = TwoToneDialog(self, self._channels)
        twotone_dialog.exec_()

    def disable_dds(self) -> None:
        for chan in self._channels.values():
            chan.enabled = False
        for btn in self._refsrc_btns:
            btn.setEnabled(False)

    def disable_auxclk(self) -> None:
        self._clkmod.enabled = False

    def enable_dds(self) -> None:
        if not self._nogui:
            for chan in self._channels.values():
                chan.enabled = True
            for btn in self._refsrc_btns:
                btn.setEnabled(True)

    def enable_auxclk(self) -> None:
        self._clkmod.enabled = True

    def set_auxclk_state(self, state: bool) -> None:
        if state:
            self.disable_dds()
            self._twotone_btn.setEnabled(False)
            self.enable_auxclk()
        else:
            self.disable_auxclk()
            self.enable_dds()
            self._twotone_btn.setEnabled(True)
            self._clkmod.initialize_ui()
            try:
                with create_serial(self.ctl_device,
                                   self.baudrate) as ser:
                    self._clkmod.initialize_hw(ser)
            except serial.serialutil.SerialException as se:
                error_dialog = QErrorMessage(self)
                error_dialog.showMessage(str(se))

    @asyncSlot()
    async def phase_sync(self):
        """Synchronize the channel phases.

        This is done as follows:
        1. Set the clear DDS phase accumulator bit (14) in CFR1
           and shift the CFR1 value into the AD9913 buffer for
           both channels.
        2. Pulse the IOUP pin high on both channels simultaneously
           in order to load the buffered CFR1 values. This has the
           effect of holding the DDS phase accumulator at 0.
        3. Clear the clear DDS phase accumulator bit in CFR1
           and shift the CFR1 value into the AD9913 buffer for
           both channels.
        4. Pulse the IOUP pin high on both channels simultaneously
           in order to load the buffered CFR1 values. This has the
           effect of restarting the DDS phase accumulator for both
           channels simultaneously.

        Note that the command to manipulate the IOUP pins assumes
        that these pins are connected to PB7 and PB6.
        """
        cmd1 = ''
        for chan in self._channels.values():
            chan.ctl._ad9913.set_clear_phase_accumulator(True)
            cmd1 += chan.ctl._ad9913.config_cfr1(False)
        cmd1 += 'MB,C0,00,C0,00:'
        cmd2 = ''
        for chan in self._channels.values():
            chan.ctl._ad9913.set_clear_phase_accumulator(False)
            cmd2 += chan.ctl._ad9913.config_cfr1(False)
        cmd2 += 'MB,C0,00,C0,00:'
        try:
            with create_serial(self.ctl_device,
                               self.baudrate) as ser:
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, write_cmd, ser, cmd1)
                await loop.run_in_executor(None, write_cmd, ser, cmd2)
        except serial.serialutil.SerialException as se:
            error_dialog = QErrorMessage(self)
            error_dialog.showMessage(str(se))

hardware-config

The DDS clock generator module is ClkMod and is configured with an input reference frequency of 10 MHz. The B5 microcontroller pin is used to control the reference source using the Ref pin of ClkMod. CH1 and CH2 are the two DDS signal generator modules.

Table 1: Clock generator reference source control.

ClkMod Ref

Reference Source

0

External

1

Internal

Note that the clock generator module Ref pin is pulled high by 10K resistor and therefore the default clock reference will be the on-board 10 MHz source.

The default clock generator configuration is:

Table 2: Default clock generator configuration

Channel property

Value

Channel state (state)

ACTIVE

Output frequency

250 MHz

Output mode (mode)

CMOS_BOTH_ACTIVE

Drive strength (drive)

WEAK

CMOS polarity (polarity)

COMM_POS

Channel source (source)

PLL

CLKMODULE_NAME: str = 'clkmod'
DDSCH1_NAME: str = 'Chan 1'
DDSCH2_NAME: str = 'Chan 2'

CLKMODULE_HWCONF: Dict = { 'cs': 'C7', 'lockdetect': 'B4', 'reset': None,
                           'fref': 10.0, 'refselect': 'B5'}
DDSCH1_HWCONF: Dict = {'cs': 'C6', 'io_update': 'B6', 'reset': 'C4',
                       'ps0': 'D4', 'ps1': 'D5', 'ps2': 'D6',
                       'board_model': '27dB-RF'}
DDSCH2_HWCONF: Dict = {'cs': 'C5', 'io_update': 'B7', 'reset': 'C4',
                       'ps0': 'D0', 'ps1': 'D1', 'ps2': 'D2',
                       'board_model': '20dB'}
DDS_RESET: str = 'C4'
CHANNELS_HWCONF: List[Dict] = [ DDSCH1_HWCONF, DDSCH2_HWCONF ]

DEFAULT_CLK_FREQ: float = 250.0
DEFAULT_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.CMOS_BOTH_ACTIVE
DEFAULT_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.ACTIVE
DEFAULT_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.WEAK
DEFAULT_CHAN_POLARIY: ad9552.CmosPolarity = ad9552.CmosPolarity.COMM_POS
DEFAULT_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL

AUX_CHAN_MODE: ad9552.OutputMode = ad9552.OutputMode.LVPECL
AUX_CHAN_STATE: ad9552.OutputState = ad9552.OutputState.POWERED_DOWN
AUX_CHAN_DRIVE: ad9552.DriveStrength = ad9552.DriveStrength.STRONG
AUX_CHAN_POLARIY: ad9552.CmosPolarity = ad9552.CmosPolarity.DIFF_POS
AUX_CHAN_SOURCE: ad9552.SourceControl = ad9552.SourceControl.PLL

DEFAULT_CHAN_CONFIG: Dict = {
    'ui': False,
    'mode': DEFAULT_CHAN_MODE,
    'state': DEFAULT_CHAN_STATE,
    'drive': DEFAULT_CHAN_DRIVE,
    'polarity': DEFAULT_CHAN_POLARIY,
    'source': DEFAULT_CHAN_SOURCE
}
AUX_CHAN_CONFIG: Dict = {
    'ui': True,
    'mode': AUX_CHAN_MODE,
    'state': AUX_CHAN_STATE,
    'drive': AUX_CHAN_DRIVE,
    'polarity': AUX_CHAN_POLARIY,
    'source': AUX_CHAN_SOURCE
}
DEFAULT_CLK_CONFIG: Dict = {
    'freq': DEFAULT_CLK_FREQ,
    'channels': {
        '1': { 'label': '1', **DEFAULT_CHAN_CONFIG },
        '2': { 'label': '2', **AUX_CHAN_CONFIG }
    }
}

DEFAULT_DDS_FREQ = 10.0
DEFAULT_DDS_PHASE = 0.0
DEFAULT_DDS_LEVEL = 512

DEFAULT_DDS_CONFIG: Dict = {
    'freq':      DEFAULT_DDS_FREQ,
    'ph':        DEFAULT_DDS_PHASE,
    'lvl':       DEFAULT_DDS_LEVEL,
    'lvl_units': DDSChan.LVL_UNITS.DBM,
    'pmod':      False,
    'state':     ad9913.POWER_DOWN,
    'sweep': {
        'type':     ad9913.SweepType.FREQUENCY,
        'start':    0.5,
        'end':      5.0,
        'ramp':     ad9913.SweepRampType.SWEEP_OFF,
        'delta':    [0.1, 0.1],   # MHz per step
        'rate':     [10.0, 10.0], # microsecs per step
        'dwell':    False,
        'trigsrc':  ad9913.SweepTriggerSource.REGISTER,
        'trigtype': ad9913.SweepTriggerType.EDGE_TRIGGER
    },
    'profiles': [[0.5, 0.0], [1.0, 0.0], [2.0, 0.0], [5.0, 0.0],
                 [10.0, 0.0], [20.0, 0.0], [50.0, 0.0], [100.0, 0.0]],
    'selected_profile': 0,
    'profile_type': ad9913.SweepType.FREQUENCY
}

DEFAULT_APP_CONFIG: Dict = {
    'clk': DEFAULT_CLK_CONFIG,
    'channels': {
        DDSCH1_NAME: { 'label': DDSCH1_NAME, **DEFAULT_DDS_CONFIG },
        DDSCH2_NAME: { 'label': DDSCH2_NAME, **DEFAULT_DDS_CONFIG }
    },
    'ddsreset': DDS_RESET,
    'refsrc': ad9552.ReferenceSource.INTERNAL
}

build-app-ui

def build_ui(self):
    """Build the on-screen UI for the DDS signal generator app."""

    vbox = QVBoxLayout()

    hbox = QHBoxLayout()
    tty_combo = QComboBox()
    tty_combo.currentIndexChanged.connect(
        lambda idx, w=tty_combo: self.tty_changed(w, idx))
    tty_combo.addItems(self.serial_ports)
    line_edit = QLineEdit()
    tty_combo.setLineEdit(line_edit)
    hbox.addWidget(QLabel("Control Port:"))
    hbox.addWidget(tty_combo)
    hbox.addStretch(1)
    configuration_btn = QPushButton("Configuration")
    config_menu = QMenu()
    config_menu.addAction('Save', self.save_config)
    config_menu.addAction('Load', self.open_config)
    configuration_btn.setMenu(config_menu)
    hbox.addWidget(configuration_btn)
    initialize_btn = QPushButton("Initialize")
    initialize_btn.clicked.connect(self.initialize)
    hbox.addWidget(initialize_btn)

    vbox.addLayout(hbox)

    hbox2 = QHBoxLayout()
    hbox2.addWidget(QLabel("Ref. Source:"))
    self.refsrcGroup = QButtonGroup(hbox2)
    rb = QRadioButton("Internal")
    self.refsrcGroup.addButton(rb)
    rb.src = ad9552.ReferenceSource.INTERNAL
    rb.setChecked(True)
    hbox2.addWidget(rb)
    rb2 = QRadioButton("External (10MHz)")
    self.refsrcGroup.addButton(rb2)
    rb2.src = ad9552.ReferenceSource.EXTERNAL
    self.refsrcGroup.buttonClicked.connect(self.set_refsrc)
    hbox2.addWidget(rb2)
    self._refsrc_btns = [rb, rb2]
    hbox2.addStretch()
    vbox.addLayout(hbox2)

    hbox3 = QHBoxLayout()
    hbox3.addWidget(self._channels[DDSApp.DDSCH1_NAME].build_ui())
    hbox3.addWidget(self._channels[DDSApp.DDSCH2_NAME].build_ui())
    hbox3.addWidget(self._clkmod.build_ui())
    self._clkmod._group_box.setTitle('Aux. Clk')

    vbox.addLayout(hbox3)

    hbox5 = QHBoxLayout()
    self._twotone_btn = QPushButton("Two Tone...")
    self._twotone_btn.clicked.connect(self.show_twotone)
    hbox5.addWidget(self._twotone_btn)
    self._sync_btn = QPushButton("Sync")
    self._sync_btn.clicked.connect(self.phase_sync)
    hbox5.addWidget(self._sync_btn)
    hbox5.addStretch()

    fbox3 = QFormLayout()
    auxclk_cb = QCheckBox("")
    auxclk_cb.setChecked(False)
    auxclk_cb.stateChanged.connect(self.set_auxclk_state)
    fbox3.addRow(QLabel("Enable Aux. Clk:"), auxclk_cb)
    hbox5.addLayout(fbox3)
    hbox5.addStretch()

    vbox.addLayout(hbox5)

    self._widget.setLayout(vbox)
    self.disable_dds()
    self.disable_auxclk()

    self._widget.setGeometry(300, 300, 700, 150)
    self._widget.setWindowTitle('DDS Signal Generator')

def show(self):
    self._widget.show()

twotone-dialog

Two tone UI

Figure 1: DDSGen two tone UI controls

class TwoToneDialog(QDialog):
    """
    """
    def __init__(self,
                 ddsApp: 'DDSApp',
                 dds_chans: Dict,
                 initial_level: int = 512,
                 parent: QWidget = None):
        QDialog.__init__(self, parent)

        self._app: 'DDSApp' = ddsApp
        self._dds_chans: Dict = dds_chans
        self._lower_freq: float = 10.0
        self._lvl: int = initial_level
        self._upper_freq: float = 11.0
        self._centre_freq: float = 10.5
        self._span_freq: float = 1.0
        self._lowerf_box: Optional[QDoubleSpinBox] = None
        self._upperf_box: Optional[QDoubleSpinBox] = None
        self._centref_box: Optional[QDoubleSpinBox] = None
        self._spanf_box: Optional[QDoubleSpinBox] = None

        self.setWindowTitle("Two Tone Settings")
        self.build_ui()

    @property
    def lower_freq(self) -> float:
        return self._lower_freq

    def set_lower_freq(self, f: float) -> None:
        self._lower_freq = f
        if self._upperf_box:
            self._upperf_box.setMinimum(f)
        self.update_centre_span()

    @property
    def level(self) -> int:
        return self._lvl

    def set_level(self, l: int) -> None:
        self._lvl = l

    @property
    def millivolts(self) -> float:
        chan = list(self._dds_chans.values())[0]
        return chan.ctl.level_to_millivolts(self._lvl)

    def set_millivolts(self, mv: float) -> None:
        chan = list(self._dds_chans.values())[0]
        self._lvl = chan.ctl.millivolts_to_level(mv)

    @property
    def dbm(self) -> float:
        chan = list(self._dds_chans.values())[0]
        return chan.ctl.level_to_dbm(self._lvl)

    def set_dbm(self, d: float) -> None:
        chan = list(self._dds_chans.values())[0]
        self._lvl = chan.ctl.dbm_to_level(d)

    @property
    def upper_freq(self) -> float:
        return self._upper_freq

    def set_upper_freq(self, f: float) -> None:
        self._upper_freq = f
        if self._lowerf_box:
            self._lowerf_box.setMaximum(f)
        self.update_centre_span()

    def update_centre(self) -> None:
        if self._centref_box:
            self._centre_freq = self._lower_freq + (self._span_freq/2.0)
            self._centref_box.disconnect()
            self._centref_box.setValue(self._centre_freq)
            self._centref_box.valueChanged.connect(self.set_centre_freq)

    def update_span(self) -> None:
        if self._spanf_box:
            self._span_freq = self._upper_freq - self._lower_freq
            self._spanf_box.disconnect()
            self._spanf_box.setValue(self._span_freq)
            self._spanf_box.valueChanged.connect(self.set_span_freq)

    def update_centre_span(self) -> None:
        self.update_centre()
        self.update_span()

    @property
    def centre_freq(self) -> float:
        return self._centre_freq

    def set_centre_freq(self, f: float) -> None:
        self._centre_freq = f
        self.update_lower_upper()

    @property
    def span_freq(self) -> float:
        return self._span_freq

    def set_span_freq(self, f: float):
        self._span_freq = f
        self.update_lower_upper()
        self.update_centre()

    def update_lower_upper(self) -> None:
        if self._lowerf_box:
            self._lower_freq = self._centre_freq - (self._span_freq/2.0)
            self._upper_freq = self._centre_freq + (self._span_freq/2.0)
            self._lowerf_box.disconnect()
            self._upperf_box.disconnect()
            self._lowerf_box.setValue(self._lower_freq)
            self._lowerf_box.setMaximum(self._upper_freq)
            self._upperf_box.setValue(self._upper_freq)
            self._upperf_box.setMinimum(self._lower_freq)
            self._lowerf_box.valueChanged.connect(self.set_lower_freq)
            self._upperf_box.valueChanged.connect(self.set_upper_freq)

    def build_ui(self) -> None:
        vbox = QVBoxLayout()

        hbox1 = QHBoxLayout()
        fbox1 = QFormLayout()
        self._lowerf_box = QDoubleSpinBox()
        self._lowerf_box.setRange(DDSChan.MIN_FREQUENCY, self.upper_freq)
        self._lowerf_box.setDecimals(5)
        self._lowerf_box.setValue(self.lower_freq)
        self._lowerf_box.setSuffix(DDSChan.FREQ_SUFFIX)
        self._lowerf_box.valueChanged.connect(self.set_lower_freq)
        fbox1.addRow(QLabel("Lower Freq.:"), self._lowerf_box)

        self._centref_box = QDoubleSpinBox()
        self._centref_box.setRange(DDSChan.MIN_FREQUENCY, DDSChan.MAX_FREQUENCY)
        self._centref_box.setDecimals(5)
        self._centref_box.setValue(self.centre_freq)
        self._centref_box.setSuffix(DDSChan.FREQ_SUFFIX)
        self._centref_box.valueChanged.connect(self.set_centre_freq)
        fbox1.addRow(QLabel("Centre Freq.:"), self._centref_box)

        hbox2 = QHBoxLayout()
        self._lvl_box = QDoubleSpinBox()
        # Assume that DDS channels have identical configuration
        self._lvl_box.setRange(*list(self._dds_chans.values())[0].dbm_range)
        self._lvl_box.setDecimals(1)
        self._lvl_box.setSuffix(' dBm')
        self._lvl_box.setValue(self.dbm)
        self._lvl_box.valueChanged.connect(self.set_dbm)
        hbox2.addWidget(self._lvl_box)
        level_units_btn = DDSChan.level_units_control(self, self)
        hbox2.addWidget(level_units_btn)
        fbox1.addRow(QLabel("Level:"), hbox2)

        fbox2 = QFormLayout()
        self._upperf_box = QDoubleSpinBox()
        self._upperf_box.setRange(self.lower_freq, DDSChan.MAX_FREQUENCY)
        self._upperf_box.setDecimals(5)
        self._upperf_box.setValue(self.upper_freq)
        self._upperf_box.setSuffix(DDSChan.FREQ_SUFFIX)
        self._upperf_box.valueChanged.connect(self.set_upper_freq)
        fbox2.addRow(QLabel("Upper Freq.:"), self._upperf_box)

        self._spanf_box = QDoubleSpinBox()
        self._spanf_box.setRange(DDSChan.MIN_FREQUENCY, DDSChan.MAX_FREQUENCY)
        self._spanf_box.setDecimals(5)
        self._spanf_box.setValue(self.span_freq)
        self._spanf_box.setSuffix(DDSChan.FREQ_SUFFIX)
        self._spanf_box.valueChanged.connect(self.set_span_freq)
        fbox2.addRow(QLabel("Span.:"), self._spanf_box)

        hbox1.addLayout(fbox1)
        hbox1.addLayout(fbox2)
        hbox1.addStretch()
        vbox.addLayout(hbox1)

        config_btn = QPushButton("Configure")
        bbox = QDialogButtonBox(QDialogButtonBox.Apply | QDialogButtonBox.Close)
        bbox.addButton(config_btn, QDialogButtonBox.ActionRole)
        vbox.addWidget(bbox)
        bbox.rejected.connect(self.reject)
        bbox.button(QDialogButtonBox.Apply).clicked.connect(self.apply)
        config_btn.clicked.connect(self.configure)

        self.setLayout(vbox)

    def set_chan_level(self, chan: DDSChan) -> None:
        if chan.level_units == DDSChan.LVL_UNITS.DAC:
            chan._lvl_box.setValue(self.level)
        elif chan.level_units == DDSChan.LVL_UNITS.MV:
            chan._lvl_box.setValue(self.millivolts)
        else:
            chan._lvl_box.setValue(self.dbm)

    def apply(self) -> None:
        lower_chan = self._app._channels[DDSApp.DDSCH1_NAME]
        upper_chan = self._app._channels[DDSApp.DDSCH2_NAME]
        lower_chan._freq_box.setValue(self.lower_freq)
        upper_chan._freq_box.setValue(self.upper_freq)
        self.set_chan_level(lower_chan)
        self.set_chan_level(upper_chan)

    @asyncSlot()
    async def configure(self):
        self.apply()
        loop = asyncio.get_event_loop()
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            for chan in self._app._channels.values():
                await loop.run_in_executor(None, chan.configure_hw, ser)

    def reject(self) -> None:
        QDialog.reject(self)

    def set_dac_units(self) -> None:
        self._lvl_box.disconnect()
        self._lvl_box.setRange(*DDSChan.DAC_CODE_RANGE)
        self._lvl_box.setDecimals(0)
        self._lvl_box.setSuffix('')
        self._lvl_box.setValue(self.level)
        self._lvl_box.valueChanged.connect(self.set_level)

    def set_millivolt_units(self) -> None:
        self._lvl_box.disconnect()
        self._lvl_box.setRange(*list(self._dds_chans.values())[0].millivolts_range)
        self._lvl_box.setDecimals(1)
        self._lvl_box.setSuffix(' mV')
        self._lvl_box.setValue(self.millivolts)
        self._lvl_box.valueChanged.connect(self.set_millivolts)

    def set_dbm_units(self) -> None:
        self._lvl_box.disconnect()
        self._lvl_box.setRange(*list(self._dds_chans.values())[0].dbm_range)
        self._lvl_box.setDecimals(1)
        self._lvl_box.setSuffix(' dBm')
        self._lvl_box.setValue(self.dbm)
        self._lvl_box.valueChanged.connect(self.set_dbm)

initialize

@asyncSlot()
async def initialize(self):
    try:
        self.initialize_ui()
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_hw)
    except serial.serialutil.SerialException as se:
        error_dialog = QErrorMessage(self)
        error_dialog.showMessage(str(se))
    finally:
        self.enable_dds()

def initialize_ui(self) -> None:
    self._clkmod.initialize_ui()
    for chan in self._channels.values():
        chan.initialize_ui()

def initialize_hw(self) -> None:
    with create_serial(self.ctl_device,
                       self.baudrate) as ser:
        if self._nogui is False:
            self._clkmod.initialize_hw(ser)
            for chan in self._channels.values():
                chan.initialize_hw(ser)
        else:
            self.clk_controller.initialize(ser)
            for ctl in self.dds_controllers.values():
                ctl.initialize(ser)

def reset_dds(self) -> None:
    try:
        for chan in self._channels.values():
            chan.chip_reset()
        with create_serial(self.ctl_device,
                           self.baudrate) as ser:
            cmd = 'H{}:L{}'.format(self._ddsreset, self._ddsreset)
            write_cmd(ser, cmd)
    except serial.serialutil.SerialException as se:
        error_dialog = QErrorMessage(self)
        error_dialog.showMessage(str(se))

Upon assertion of RESET, the phase accumulator is cleared (i.e., 0-phase) and is stalled (i.e., not accumulating). At de-assertion of reset, the accumulator is free to begin accumulating, which starts the DDS sinusoidal generation process. So, the onset of the output sinusoid begins when the accumulator starts accumulating. Furthermore the sinusoidal generation process starts at 0-phase (as the accumulator starts off cleared).

app-configuration

def save_config(self) -> None:
    config = self.dump_config()
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    name, selfilter = QFileDialog.getSaveFileName(self, 'Save configuration',
                                                  def_filepath, filter_str)
    output_file = str(name).strip()
    if len(output_file) == 0:
        return
    with open(output_file, 'w') as fd:
        json.dump(config, fd)

def open_config(self) -> None:
    filter_str = "JSON Configuration (*.json)"
    def_filepath = '.'
    names, selfilter = QFileDialog.getOpenFileNames(self, 'Load Configuration',
                                                    def_filepath, filter_str)
    if len(names):
        input_file = names[0]
        with open(input_file, 'r') as fd:
            config = json.load(fd)
        self.load_config(config)

def dump_config(self) -> Dict:
    chan_config = {}
    for chan_id, chan in self._channels.items():
        chan_config[chan_id] = { 'label': chan.label, **chan.dump_config()}
    config = {
        'clk': self._clkmod.dump_config(),
        'channels': chan_config,
        'ddsreset': self._ddsreset,
        'refsrc': self._clkmod.refsrc
    }
    return config

def load_config(self, config: Dict) -> None:
    self._clkmod.load_config(config['clk'])
    self._ddsreset = config['ddsreset']
    self._clkmod.refsrc = config['refsrc']
    self._clkmod.configure_ui(config['clk'])
    for chan_id, chan in self._channels.items():
        chan_config = config['channels'][chan_id]
        chan.load_config(chan_config)
        chan.configure_ui(chan_config)

ddsgen-service

The Python remote procedure call framework RPyC is used to create a network service which makes the functionality of the ddsgen app available to other clients.

The DDSGenService implements the functionality which is accessed via RPyC.

class DDSGenService(rpyc.Service):

    def __init__(self, app):
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Initialize the signal generator hardware and software.

        >>> import rpyc
        >>> ddsgen = rpyc.connect("127.0.0.1", 18862)
        >>> ddsgen.root.initialize()

        """
        if not self._app._nogui:
            self._app.initialize_ui()
        self._app.initialize_hw()
        self._app.enable_dds()

    @property
    def dds_controllers(self) -> Dict[str, AD9913Controller]:
        """A dictionary containing the signal generator DDS controllers.

        The controllers are instances of AD9913Controller keyed using
        the channel identifiers 'Chan 1' and 'Chan 2' respectively.

        >>> import rpyc
        >>> ddsgen = rpyc.connect("127.0.0.1", 18862)
        >>> ddsgen.root.initialize()
        >>> dds1 = ddsgen.root.dds_controllers['Chan 1']
        >>> dds1
        <rfblocks.ad9913_controller.AD9913Controller object at 0x126f8b4c0>
        >>> dds2 = ddsgen.root.dds_controllers['Chan 2']

        """
        return self._app.dds_controllers

    @property
    def clk_controller(self) -> AD9552Controller:
        """The controller for the DDS reference clock.

        An instance of AD9552Controller.

        >>> import rpyc
        >>> ddsgen = rpyc.connect("127.0.0.1", 18862)
        >>> ddsgen.root.initialize()
        >>> refclk = ddsgen.root.clk_controller

        """
        return self._app.clk_controller

    def configure(self, ctl_id: str) -> None:
        """Configure AD9913 DDS hardware registers for a specified channel.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str

        >>> import rpyc
        >>> ddsgen = rpyc.connect("127.0.0.1", 18862)
        >>> ddsgen.root.initialize()
        >>> dds1 = ddsgen.root.dds_controllers['Chan 1']
        >>> dds1.freq = 25.0
        >>> dds1.dbm = -10.0
        >>> ddsgen.root.configure('Chan 1')

        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.configure(ser)

    def configure_sweep(self, ctl_id: str) -> None:
        """Configure AD9913 DDS sweep registers for a specified channel.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str
        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.configure_sweep(ser)

    def update_profile(self, ctl_id: str, profnum: int) -> None:
        """Update AD9913 DDS profile registers for a specified channel and profile.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str
        :param profnum: The AD9913 profile register to update.
        :type profnum: An integer in the range 0 to 7.
        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.update_profile(ser, profnum)

    def configure_profile(self, ctl_id: str) -> None:
        """
        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.configure_profile(ser)

    def start_sweep(self, ctl_id: str) -> None:
        """Start an AD9913 sweep operation for a specified channel.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str
        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.start_sweep(ser)

    def stop_sweep(self, ctl_id: str) -> None:
        """Stop an AD9913 sweep operation.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str
        """
        ctl = self._app.dds_controllers[ctl_id]
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            ctl.stop_sweep(ser)

    def set_pmod(self, ctl_id: str, state: bool) -> None:
        """Set the DDS programmable modulus state.

        :param ctl_id: A channel id.  This will be one of "Chan 1" or "Chan 2".
        :type ctl_id: str
        :param state: Set to True in order to enable progammable modulus.
            False to disable it.
        :type state: bool

        Note that :py:meth:`configure` must be invoked in order to
        update the DDS hardware
        """
        ctl = self._app.dds_controllers[ctl_id]
        ctl.pmod = state
        if state:
            # Disable direct switch mode when enabling
            # programmable modulus
            ctl.set_direct_switch_enabled = False


class RPyCServer(QObject):

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        print("DDSGen rpyc service on {}:{}".format(self._host, self._port))
        self._server = ThreadedServer(
            self._serviceInst,
            hostname = self._host,
            port = self._port,
            protocol_config = {
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_pickle': True})
        self._server.start()
        self.finished.emit()

Imports

from typing import (
    Optional, List, Dict
)
import sys
import copy
from math import sqrt, log10
from enum import Enum
import signal
import asyncio
from argparse import ArgumentParser
import serial
import json
from serial.tools.list_ports import comports

from rfblocks import (
    ad9552, AD9552Controller,
    ad9913, AD9913Controller,
    create_serial, write_cmd, query_cmd,
    DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    ClkModule, DDSChan
)

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)

from PyQt5.QtWidgets import (
    QWidget, QLabel, QAbstractSpinBox, QDoubleSpinBox, QVBoxLayout,
    QLineEdit, QHBoxLayout, QGroupBox, QMainWindow, QComboBox,
    QCheckBox, QPushButton, QRadioButton, QButtonGroup, QMessageBox,
    QFormLayout, QErrorMessage, QApplication, QScrollArea,
    QMenu, QFileDialog, QSpinBox, QDialog, QDialogButtonBox,
    QAction, QActionGroup, QAbstractButton
)

from PyQt5.QtCore import (
    Qt, QCoreApplication, QObject, QThread, pyqtSignal
)

import rpyc
from rpyc.utils.server import ThreadedServer