Open RF Prototyping

pwrmeter.py

This document describes an app for controlling the RF power meter reference design. When executed, the script displays a GUI window which allows the power meter to be controlled via it's USB connection.

Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.

The Script

Generate the script using C-c C-v t.

# Generated from pwrmeter.org
#

<<imports>>

<<app-class>>

<<pwrmeter-service>>

<<main-func>>

if __name__ == '__main__':
    main()

The main Function

The asyncio framework is used to coordinate communications over the serial device without blocking user interface events. For this purpose we make use of QEventLoop and QThreadExecutor from the asyncqt package.

The event loop is equipped with an instance of QThreadExecutor which executes a single thread. The idea here is to ensure that requests to the ECApp serial device will execute one by one in a manner which will not block the operation of the Qt event loop.

def main():
    global server_thread

    <<process-cmdline-args>>

    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    pwrmeter_app = PwrMeterApp(
        chan_cspins = [args.chan0_cspin, args.chan1_cspin],
        atten_lepins = [args.atten0_lepin, args.atten1_lepin],
        serial_device = args.device,
        baudrate = args.baudrate,
        headless = args.nogui)

    if not args.nogui:
        pwrmeter_app.show()

    server_thread = QThread()
    server = RPyCServer(PwrMeterService(pwrmeter_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    with loop:
        loop.set_debug(True)
        sys.exit(loop.run_forever())

process-cmdline-args

Command line arguments allow the specification of a serial device and baud rate for the purposes of testing.

defaultBaud = 0

parser = ArgumentParser(description=
                        '''An RF power meter.''')

parser.add_argument(
    "--nogui", action='store_true',
    help="Disable GUI and run 'headless'")
parser.add_argument(
    "-d", "--device", default=None,
    help="The hardware serial device")
parser.add_argument(
    "-b", "--baudrate", default=defaultBaud, type=int,
    help="Baud rate (default: {})".format(defaultBaud))
parser.add_argument(
    "-A", "--ipaddr", default="127.0.0.1",
    help="IP address for to bind the RPyC server instance")
parser.add_argument(
    "-P", "--port", default=18863, type=int,
    help="TCP port for the RPyC server instance")
parser.add_argument(
    "--chan0_cspin", default='D0',
    help="CS pin for Chan0 power detector. Default: D0")
parser.add_argument(
    "--chan1_cspin", default='D1',
    help="CS pin for Chan1 power detector. Default: D1")
parser.add_argument(
    "--atten0_lepin", default=None,
    help="LE pin for Chan0 step attenuator. Default: no attenuator")
parser.add_argument(
    "--atten1_lepin", default=None,
    help="LE pin for Chan1 step attenuator. Default: no attenuator")
args = parser.parse_args()

app-class

The application class encapsulates the instrument functionality including the on-screen representation and controls.

The power meter consists of two power detectors, one on each of the channels '0' and '1'. Optionally, the detectors may be equipped with step attenuators. The presence of a step attenuator is signalled by the specification of a LE pin via the --atten0_lepin or --atten1_lepin.

class PwrMeterApp(QMainWindow):

    CALSTATUS_INCAL = 0
    CALSTATUS_ESTIMATED = 1
    CALSTATUS_UNCAL = 2

    CALSTATUS_STR = {
        CALSTATUS_INCAL: 'INCAL',
        CALSTATUS_ESTIMATED: 'ESTIMATED',
        CALSTATUS_UNCAL: 'UNCAL'}
    CALSTATUS_COLOURS = {
        CALSTATUS_INCAL: 'black',
        CALSTATUS_ESTIMATED: 'orange',
        CALSTATUS_UNCAL: 'red'
    }

    calstatus_changed = pyqtSignal(int, int)

    def __init__(self,
                 chan_cspins: List = None,
                 atten_lepins: List = None,
                 serial_device: Optional[str] = None,
                 baudrate: int = 0,
                 headless: bool = False) -> None:
        """
        """
        super().__init__()
        self._nogui: bool = headless
        self._detectors: List[PwrDetectorController] = [
            PwrDetectorController('0', LogDetector(chan_cspins[0])),
            PwrDetectorController('1', LogDetector(chan_cspins[1]))]
        self._attenuators: List[Optional[PE43711Controller]] = [None, None]
        if atten_lepins:
            for att_id in[0, 1]:
                if atten_lepins[att_id]:
                    att_ctl = PE43711Controller(
                        str(att_id), pe43711(atten_lepins[att_id]))
                    att_ctl.attenuation_changed.connect(
                        self.attenuation_changed)
                    self._attenuators[att_id] = att_ctl
        self._continuous: bool = False
        self._acquiring: bool = False
        self._shuttingdown: bool = False
        self._acquire_group_box: Optional[QGroupBox] = None
        self._ctl_device: str = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE
        self._calstatus: List[int] = [
            PwrMeterApp.CALSTATUS_UNCAL, PwrMeterApp.CALSTATUS_UNCAL]
        for det_id, ctl in enumerate(self._detectors):
            ctl.caldata_changed.connect(partial(self.caldata_updated, det_id))
        self.calstatus_changed.connect(self.update_calstatus)

        if not headless:
            self._detector_displays: List[Optional[PowerDetector]] = [
                PowerDetector(ctl) for ctl in self.detectors]
            self._attenuator_displays: List[Optional[StepAttenuator]] = [None, None]
            for att_id in [0, 1]:
                att = self.attenuators[att_id]
                if att:
                    self._attenuator_displays[att_id] = StepAttenuator(
                        self, att)
            self.init_ui()

    def chan0_power_offset_fn(self, freq):
        """
        """
        return self._coupling + self._attn_insertion_loss

    def chan1_power_offset_fn(self, freq):
        """
        """
        return self._amplifier_gain + self._coupling

    @property
    def ctl_device(self) -> str:
        return self._ctl_device

    @property
    def serial_ports(self) -> List:
        ports = [DEFAULT_SOCKET_URL]
        ports.extend([port.device for port in comports() if port.serial_number])
        if self.ctl_device:
            if self.ctl_device in ports:
                ports.pop(ports.index(self.ctl_device))
            ports.insert(0, self.ctl_device)
        return ports

    @property
    def baudrate(self) -> int:
        return self._baudrate

    @property
    def detectors(self) -> List[PwrDetectorController]:
        """Returns a list containing the power detector controllers.
        """
        return self._detectors

    @property
    def attenuators(self) -> List[Optional[PE43711Controller]]:
        """Returns a list containing the step attenuator controllers.
        """
        return self._attenuators


    @property
    def calstatus(self) -> List[int]:
        """Returns a list containing the status of the detectors.
        """
        return self._calstatus

    @pyqtSlot(object)
    def attenuation_changed(self, att_ctl: PE43711Controller) -> None:
        """Device attenuation setting has changed.
        """
        det_id = int(att_ctl._controller_id)
        ctl: PwrDetectorController = self.detectors[det_id]

        # Check the calibration attenuation setting.
        # If there is no calibration attenuation setting
        # then just return without doing anything
        try:
            cal_atten = ctl.cal_data_conditions[
                PwrDetectorController.ATTEN_CAL_CONDITION]
        except KeyError:
            # There's no calibration attenuation value so
            # just set the insertion loss offset
            ctl.insertion_loss_offset = att_ctl.attenuation
            return

        # If the current attenuation setting for att_ctl
        # is different from the calibration attenuation
        # then calculate the insertion loss offset.
        loss_offset: float = att_ctl.attenuation - cal_atten
        if ctl.insertion_loss_offset == 0.0 and loss_offset != 0.0:
            # If the current value of the detector
            # controller insertion_loss_offset is zero and
            # the calculated loss offset is non-zero
            # emit a calstatus_changed signal.
            self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_ESTIMATED)
        elif ctl.insertion_loss_offset != 0.0 and loss_offset == 0.0:
            # Likewise, if the detector controller
            # insertion_loss_offset is non-zero and the
            # calculated loss offset is zero
            # emit a calstatus_changed signal.
            self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_INCAL)
        # Set the detector controller insertion loss offset
        ctl.insertion_loss_offset = loss_offset

    def disable_chan_controls(self, chan_id: int, f: bool) -> None:
        if chan_id in [0, 1]:
            self._detector_displays[chan_id].disable_controls(f)
            att_display = self._attenuator_displays[chan_id]
            if att_display:
                att_display.disable_controls(f)

    def disable_controls(self, f: bool) -> None:
        for det_id in [0, 1]:
            self.disable_chan_controls(det_id, f)
        if self._acquire_group_box:
            self._acquire_group_box.setDisabled(f)

    def initialize_detectors(self) -> None:
        with create_serial(self.ctl_device,
                           self.baudrate) as ser:
            for ctl in self.detectors:
                ctl.initialize(ser)

    @pyqtSlot()
    def caldata_updated(self, det_id):
        self.update_calstatus(det_id, PwrMeterApp.CALSTATUS_INCAL)

    @pyqtSlot(int, int)
    def update_calstatus(self, det_id: int, status: int):
        """
        """
        self.calstatus[det_id] = status
        if not self._nogui:
            grp: PowerDetector = self._detector_displays[det_id]
            grp._cal_status_lbl.setText(PwrMeterApp.CALSTATUS_STR[status])
            grp._cal_status_lbl.setStyleSheet(
                'QLabel { color: %s }' % PwrMeterApp.CALSTATUS_COLOURS[status])

    @property
    def continuous(self) -> bool:
        return self._continuous

    @property
    def acquiring(self) -> bool:
        return self._acquiring

    def set_continuous(self, c: bool) -> None:
        self._continuous = c

    def set_acquiring(self, a: bool) -> None:
        self._acquiring = a

    @property
    def shuttingdown(self) -> bool:
        return self._shuttingdown

    def shutdown(self, b: bool) -> None:
        self._shuttingdown = b

    <<hwconfig-funcs>>

    <<acquisition-funcs>>

    <<calibration>>

    def init_ui(self) -> None:
        vbox = QVBoxLayout()
        vbox.addWidget(self.create_hwconfig_group())

        hbox = QHBoxLayout()
        for det_id in [0, 1]:
            inner_vbox = QVBoxLayout()
            grp = self._detector_displays[det_id].create_detector_group(
                partial(self.calibrate, det_id))
            inner_vbox.addWidget(grp)
            att_display = self._attenuator_displays[det_id]
            if att_display:
                grp = att_display.create_attenuator_group()
                inner_vbox.addWidget(grp)
            else:
                inner_vbox.addStretch()
            hbox.addLayout(inner_vbox)
        self.disable_controls(True)
        vbox.addLayout(hbox)

        hbox2 = QHBoxLayout()
        grp = self.create_acquisition_control_group()
        hbox2.addWidget(grp)
        grp.setDisabled(True)
        vbox.addLayout(hbox2)

        self.container = QWidget()
        self.container.setLayout(vbox)

        self.setCentralWidget(self.container)

        self.setWindowTitle('Dual Detector Power Meter')

hwconfig-funcs

def create_hwconfig_group(self):
    group_box = QGroupBox("Hardware Config:")
    vbox = QVBoxLayout()
    hbox = QHBoxLayout()
    tty_combo = QComboBox()
    tty_combo.currentIndexChanged.connect(
        lambda idx, w=tty_combo: self.tty_changed(w, idx))
    tty_combo.addItems(self.serial_ports)
    line_edit = QLineEdit()
    line_edit.editingFinished.connect(
        lambda w=line_edit: self.tty_edit_finished(w))
    tty_combo.setLineEdit(line_edit)
    hbox.addWidget(QLabel("Control Port:"))
    hbox.addWidget(tty_combo)
    self.hw_config_btn = QPushButton("Initialize")
    self.hw_config_btn.clicked.connect(self.hw_config)
    hbox.addWidget(self.hw_config_btn)
    hbox.addStretch(1)
    vbox.addLayout(hbox)
    group_box.setLayout(vbox)
    return group_box

def tty_changed(self, combo, idx):
    self._ctl_device = combo.itemText(idx)

def tty_edit_finished(self, line_edit):
    self._ctl_device = line_edit.text()

def showErrorMessage(self, msg):
    error_dialog = QErrorMessage(self)
    error_dialog.setWindowModality(Qt.WindowModal)
    error_dialog.setParent(self, Qt.Sheet)
    error_dialog.setResult(0)
    error_dialog.showMessage(msg)

@asyncSlot()
async def hw_config(self):
    self.hw_config_btn.setEnabled(False)
    control_disable = False
    try:
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self.initialize_detectors)
    except serial.serialutil.SerialException:
        self.showErrorMessage("Bad serial device.")
        control_disable = True
    except DetectorReadError:
        self.showErrorMessage("Error reading detector data.")
        control_disable = True
    except Exception as e:
        print(e)
        control_disable = True
    finally:
        self.hw_config_btn.setEnabled(True)
        self.disable_controls(control_disable)

@asyncClose
async def closeEvent(self, event):
    if self.continuous:
        self.set_acquiring(False)
        self.shutdown(True)
        event.ignore()
    else:
        event.accept()

acquisition-funcs

On screen acquisition controls. Power levels are acquired from the detectors either as a single shot or continuously. Acquisition is initiated using the Acquire button. If continuous acquisition is in operation the acquire button will be re-labeled as the Stop button. During single shot acquisition the Acquire button is disabled until the power level is read.

The asyncio framework is used to manage the multi-threaded operation of the power level acquisition. This is implemented in the <<acquisition-slots>> coroutines. Note that the asyncqt module is used to integrate the Qt event loop into the asyncio event loop. This also necessitates the use of the asyncSlot and asyncClose decorators. The enable_stop_acquisition and enable_start_acquisition functions are used to manage the state of the Acquire~/~Stop button. During continuous acquisition the detector frequency controls and the hardware Initialize button are disabled.

<<acquisition-controls>>

<<acquire-pwr-levels>>

<<acquire-control>>

<<acquisition-slots>>
def create_acquisition_control_group(self):
    self._acquire_group_box = QGroupBox("Acquisition Control:")
    vbox = QVBoxLayout()
    hbox = QHBoxLayout()
    acqGroup = QButtonGroup(hbox)
    rb = QRadioButton("Single")
    acqGroup.addButton(rb)
    rb.setChecked(True)
    hbox.addWidget(rb)
    rb = QRadioButton("Continuous")
    acqGroup.addButton(rb)
    hbox.addWidget(rb)
    rb.toggled.connect(lambda state: self.set_continuous(state))
    hbox.addStretch(1)
    self.acquire_btn = QPushButton("Acquire")
    self.acquire_btn.clicked.connect(self.acquire_pwr)
    hbox.addWidget(self.acquire_btn)
    vbox.addLayout(hbox)
    self._acquire_group_box.setLayout(vbox)
    return self._acquire_group_box

The acquire_pwr and acquire_stop are async coroutines called from the Qt event loop when the Acquire/Stop button is pressed. The acquire_pwr coroutine will invoke the acquire_pwr_levels function inside a separate thread in order to communicate with the detector hardware without blocking the GUI code.

@asyncSlot()
async def acquire_pwr(self):
    loop = asyncio.get_event_loop()
    if self.continuous:
        self.enable_stop_acquisition()
        try:
            await loop.run_in_executor(None, self.acquire_pwr_levels)
            if self.shuttingdown:
                QApplication.quit()
        except serial.serialutil.SerialException as se:
            print(se)
            self.showErrorMessage("Bad serial device.")
        except Exception as e:
            print(e)
        finally:
            self.enable_start_acquisition()
    else:
        self.acquire_btn.setEnabled(False)
        try:
            await loop.run_in_executor(None, self.acquire_pwr_levels)
        except serial.serialutil.SerialException:
            self.showErrorMessage("Bad serial device.")
        except Exception as e:
            print(e)
        finally:
            self.acquire_btn.setEnabled(True)

@asyncSlot()
async def acquire_stop(self):
    if self.continuous:
        self.set_acquiring(False)
def enable_stop_acquisition(self):
    self.disable_freq_controls()
    self.hw_config_btn.setEnabled(False)
    self.acquire_btn.setText("Stop")
    self.set_acquiring(True)
    try:
        self.acquire_btn.clicked.disconnect()
    except Exception:
        pass
    self.acquire_btn.clicked.connect(self.acquire_stop)

def enable_start_acquisition(self):
    self.acquire_btn.setText("Acquire")
    try:
        self.acquire_btn.clicked.disconnect()
    except Exception:
        pass
    self.acquire_btn.clicked.connect(self.acquire_pwr)
    self.hw_config_btn.setEnabled(True)
    self.enable_freq_controls()

def disable_freq_controls(self):
    for det in self._detector_displays:
        det._freq_box.setEnabled(False)

def enable_freq_controls(self):
    for det in self._detector_displays:
        det._freq_box.setEnabled(True)
def acquire_pwr_levels(self):
    with create_serial(self.ctl_device,
                       self.baudrate) as ser:
        if self.continuous:
            while self.acquiring:
                for ctl in self.detectors:
                    ctl.measure(ser)
        else:
            for ctl in self.detectors:
                ctl.measure(ser)

calibration

The calibrate method is invoked as a callback from the rfblocks.PowerDetector class. A reference of the calibrate method is passed to the PowerDetector instances when building the PwrMeterApp UI - specifically, when creating the on-screen representations of the PowerDetector instances via create_detector_group.

The calibrate method is responsible for the actual measurement of the power detector response.

Calibration with the optional attenuator is carried out at the current attenuation level. If the attenuation level is changed after calibration the insertion loss is estimated by adding the difference between the new attenuation level and the attenuation level in effect during calibration. The app status line should display something like 'Estimating Cal' in this situation.

Note that when measurements are made during calibration no corrections should be made. This necessitates setting the detector controller's apply_correction attribute to False before starting the calibration and then restoring the old value after calibration is complete.

def calibrate(self, det_id, src_device, device_id, src_chan,
              srcpwr, start, stop, step):

    print("calibrate: {}, {}, {}".format(src_device, device_id, src_chan))
    mgr = InstrumentMgr()
    sigsrc = mgr.open_instrument(src_device, device_id)
    try:
        sigsrc.chan = src_chan
    except AttributeError:
        pass

    loss = {}
    ctl = self.detectors[det_id]
    old_correct_setting = ctl.apply_correction
    ctl.apply_correction = False
    ctl.calibrating = True
    sigsrc.output = True
    with create_serial(self.ctl_device,
                       self.baudrate) as ser:
        for f in np.arange(start, stop, step):
            # Note that the property assignment here may actually
            # be a rpyc remote operation depending on the selected
            # calibration device.  Since ~f~ (and possibly ~srcpwr~)
            # may be numpy array scalars it is necessary to explicitly
            # cast these to standard Python float values.  If this is
            # not done the remote rpyc process will almost certainly
            # generate strange numpy related exceptions.
            sigsrc.freq = float(f)
            sigsrc.level = float(srcpwr)
            sleep(1.0)
            ctl.freq = f
            ctl.measure(ser)
            loss['{}'.format(f)] = srcpwr - ctl.pwr

    sigsrc.output = False
    ctl.calibrating = False
    ctl.apply_correction = old_correct_setting
    mgr.close_instrument(sigsrc)
    mgr.close()

    # If this detector has a step attenuator associated with it
    # then we must take note of the attenuation setting for which
    # the calibration was carried out. This is done by
    # saving the attenuation setting as a value in the
    # detector controller cal_data_conditions dictionary.
    att = self.attenuators[det_id]
    if att:
        ctl.cal_data_conditions[
            PwrDetectorController.ATTEN_CAL_CONDITION] = att.attenuation
        # Any current loss offset should now be set to 0 since it
        # is now corrected by the calibration
        ctl.insertion_loss_offset = 0.0

    ctl.cal_data = loss
    self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_INCAL)
    return ''

pwrmeter-service

The Python remote procedure call framework RPyC is used to create a network service which makes the functionality of the pwrmeter app available to other clients.

The PwrMeterService implements the functionality which is accessed via RPyC.

class PwrMeterService(rpyc.Service):

    def __init__(self, app):
        super().__init__()
        self._app = app

    def initialize(self) -> None:
        """Initialize the service app and associated hardware modules.
        """
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            for ctl in self.detectors:
                ctl.initialize(ser)
        if not self._app._nogui:
            self._app.hw_config_btn.setEnabled(True)
            self._app.disable_controls(False)


    @property
    def detectors(self):
        """Return a list of the power detector objects.

        :return: A list of PwrDetectorController instances one for
        each of the power meter channels.
        """
        return self._app.detectors

    @property
    def attenuators(self):
        """Return a list of channel attenuator objects.
        Note that one of more of the objects may be ``None`` depending
        on whether the corresponding channel is actually equipped with
        a step attenuator.

        :return: A list of PE43711Controller instances one for each of
        the power meter channels.
        """
        return self._app.attenuators

    def set_attenuation(self, ctl_id, atten):
        """Set the attenuation for the specified power meter channel.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int
        :param atten: Required attenuation level.
        :type atten: float

        Note that this will only have any effect if the channel is
        actually equipped with a step attenuator.
        """
        att = self.attenuators[ctl_id]
        if att:
            if not self._app._nogui:
                grp = self._app._attenuator_displays[ctl_id]
                if grp:
                    grp._atten_box.setValue(atten)
            else:
                att.attenuation = atten
            with create_serial(self._app.ctl_device,
                               self._app.baudrate) as ser:
                att.configure(ser)

    def detector_enable(self, ctl_id, en):
        """Enable/disable the specified power meter channel.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int
        :param en: True to enable the channel, False disables the channel.
        :type en: bool
        """
        if not self._app._nogui:
            state = Qt.Checked if en else Qt.Unchecked
            self._app._detector_displays[ctl_id].set_enabled(state)
            att_display = self._app._attenuator_displays[ctl_id]
            if att_display is not None:
                att_display.disable_controls(en)
        else:
            ctl = self.detectors[ctl_id].enabled = en

    def calibrate(self, ctl_id, src_dev, device_id, src_chan,
                  srcpwr, startf, stopf, stepf):
        """Carry out a power meter channel calibration.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int
        :param src_dev: Calibration signal source device.
        :type src_dev: str
        :param device_id: Calibration signal source device id.
        :type device_id: str
        :param src_chan: Calibration signal source channel.
        :type src_chan: int
        :param srcpwr: Calibration signal source power level (in dBm).
        :type srcpwr: float
        :param startf: Calibration start frequency (in MHz).
        :type startf: float
        :param stopf: Calibration stop frequency (in MHz).
        :type stopf: float
        :param stepf: Calibration frequency step size (in MHz)
        :type stepf: float

        .. code:: python
            import rpyc
            from tam import DSG815_ID

            pwrmeter = rpyc.connect('127.0.0.1', 18863)
            pwrmeter.root.initialize()
            chan_controllers = pwrmeter.root.detectors
            chan_ctl0 = chan_controllers[0]
            chan_ctl1 = chan_controllers[1]

            # Calibration is a long running process and the calibrate call
            # is synchronous.  The rpyc request timeout should be either
            # disabled or set to a long period so that a timeout isn't triggered
            old_timeout = pwrmeter._config['sync_request_timeout']
            pwrmeter._config['sync_request_timeout'] = None
            status = pwrmeter.root.calibrate(0, 'DSG815', DSG815_ID, 0, 0.0, 100.0, 1500.0, 100.0)
            pwrmeter.root.calstatus[0]   # should return 0

            # Restore the rpyc request timeout value
            pwrmeter._config['sync_request_timeout'] = old_timeout

        .. seealso::

            :py:meth:`calstatus`
        """
        return self._app.calibrate(
            ctl_id, src_dev, device_id, src_chan,
            srcpwr, startf, stopf, stepf)

    def save_caldata(self, ctl_id, cal_data_file):
        """Save channel calibration data to the specified file.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int
        :param cal_data_file: Path to file where cal. data will be saved.
        :type cal_data_file: str
        """
        ctl = self.detectors[ctl_id]
        ctl.save_caldata(cal_data_file)

    def load_caldata(self, ctl_id, cal_data_file):
        """Load channel calibration data from the specified file.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int
        :param cal_data_file: Path to file from which cal. data will be loaded.
        :type cal_data_file: str
        """
        ctl = self.detectors[ctl_id]
        ctl.load_caldata(cal_data_file)

    @property
    def calstatus(self):
        """Return the status of the most recent calibration operation.

        :return: True if the calibration completed successfully,
        False otherwise.

        .. seealso::

            :py:meth:`calibrate`
        """
        return self._app.calstatus

    def measure(self, ctl_id: int) -> None:
        """Take a power measurement on the specified power meter channel.

        :param ctl_id: Power meter channel id.
        :type ctl_id: int

        .. code:: python
            import rpyc

            pwrmeter = rpyc.connect('127.0.0.1', 18863)
            pwrmeter.root.initialize()
            chan_controllers = pwrmeter.root.detectors
            chan_ctl0 = chan_controllers[0]
            chan_ctl1 = chan_controllers[1]

            chan_ctl0.freq = 500.0
            pwrmeter.root.measure(0)
            pwr = chan_ctl0.pwr
            chan_ctl0.apply_correction = True
            pwrmeter.root.measure(0)
            corrected_pwr = chan_ctl0.pwr

            chan_ctl1.freq = 150.0
            pwrmeter.root.measure(1)
            pwr = chan_ctl1.pwr

            pwrmeter.close()
        """
        with create_serial(self._app.ctl_device,
                           self._app.baudrate) as ser:
            self.detectors[ctl_id].measure(ser)


class RPyCServer(QObject):

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        print("PwrMeter rpyc service on {}:{}".format(self._host, self._port))
        self._server = ThreadedServer(
            self._serviceInst,
            hostname = self._host,
            port = self._port,
            protocol_config = {
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_pickle': True})
        self._server.start()
        self.finished.emit()

Generating calibration data from S-parameter measurements

In this example S21 measurements of a 12dB attenuator were made using a KC901V network analyzer. Because this is a "1 1/2" port analyzer, impedance mismatches cause a certain amount of ripple in the resulting measured transmission. To compensate for the ripple a cubic function is fitted prior to calculating the transmission at each of the calibration data frequencies. Figure 1 shows the measured transmission and the cubic fit used to estimate the calibration data.

transmission cal

Measured transmission and best fit cubic

The associated output calibration data looks like the following:

{
    "100.0": 11.78, "200.0": 11.77, "300.0": 11.77, "400.0": 11.77,
    "500.0": 11.77, "600.0": 11.78, "700.0": 11.78, "800.0": 11.78,
    "900.0": 11.78, "1000.0": 11.78, "1100.0": 11.79, "1200.0": 11.79,
    "1300.0": 11.79, "1400.0": 11.8, "1500.0": 11.8, "1600.0": 11.8,
    "1700.0": 11.81, "1800.0": 11.81, "1900.0": 11.82, "2000.0": 11.82,
    "2100.0": 11.83, "2200.0": 11.83, "2300.0": 11.84, "2400.0": 11.84,
    "2500.0": 11.85, "2600.0": 11.85, "2700.0": 11.86, "2800.0": 11.87,
    "2900.0": 11.87, "3000.0": 11.88, "3100.0": 11.88, "3200.0": 11.89,
    "3300.0": 11.89, "3400.0": 11.9, "3500.0": 11.9, "3600.0": 11.91,
    "3700.0": 11.91, "3800.0": 11.92, "3900.0": 11.92, "4000.0": 11.93,
    "4100.0": 11.93, "4200.0": 11.93, "4300.0": 11.94, "4400.0": 11.94,
    "4500.0": 11.94, "4600.0": 11.95, "4700.0": 11.95, "4800.0": 11.95,
    "4900.0": 11.95, "5000.0": 11.95, "5100.0": 11.95, "5200.0": 11.95,
    "5300.0": 11.95, "5400.0": 11.95, "5500.0": 11.95, "5600.0": 11.95,
    "5700.0": 11.95, "5800.0": 11.94, "5900.0": 11.94, "6000.0": 11.94
}
from pathlib import Path
import json
import skrf as rf
from skrf import Network, Frequency
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import curve_fit
from dyadic.splot import init_style

def cubic_func(x, a, b, c, d):
    return (((a*x + b)*x + c)*x + d)


freq_range = freqRange
max_freq = float(freqRange.split('-')[-1][:-3])*1e6
dataPath = Path(dataDir)
sparams = Network(str(dataPath / s21file))[freq_range]

popt, perr = curve_fit(cubic_func,
                       sparams.s21[freq_range].f/1e6,
                       sparams.s21[freq_range].s_db[:,0,0])

init_style()
annotation_color = 'darkgrey'
fig = plt.figure(num=None, figsize=(6.0, 4.0), dpi=72)
ax = fig.add_subplot(111)
ax.set_ylim(-15.0, -10.0)
ax.set_xlim(0, max_freq/1e6)
ax.grid(linestyle=':')
ax.grid(which='both', axis='x', linestyle=':')
ax.plot(sparams.s21[freq_range].f/1e6, sparams.s21[freq_range].s_db[:,0,0])
ax.plot(sparams.s21[freq_range].f/1e6,
        [cubic_func(f, *popt) for f in sparams.s21[freq_range].f/1e6],
        linestyle='dashed')

fig.tight_layout()
plt.savefig(outfile)

cal_data = {f"{f:.1f}": float(f"{cubic_func(f, *popt):.2f}")*(-1) for f in np.arange(100, 6050, 100)}
with open(calfile, 'w') as fd:
    json.dump(cal_data, fd)

Imports

from typing import (
    Optional, List, Dict, Tuple
)

from functools import (
    partial
)

import sys
import traceback
from time import sleep
import asyncio
import numpy as np
from argparse import ArgumentParser
import serial
from serial.tools.list_ports import comports

from rfblocks import (
    LogDetector, PwrDetectorController, pe43711, PE43711Controller,
    DetectorReadError, create_serial, write_cmd, query_cmd,
    DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL
)

from qtrfblocks import (
    PowerDetector, StepAttenuator
)

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)

import pyvisa

from tam import (
    InstrumentMgr
)

from PyQt5.QtWidgets import (
    QApplication, QWidget, QMainWindow, QLabel, QPushButton,
    QVBoxLayout, QHBoxLayout, QFormLayout, QComboBox, QGroupBox,
    QErrorMessage, QDoubleSpinBox, QLineEdit, QButtonGroup, QRadioButton,
    QCheckBox
)

from PyQt5.QtCore import (
    Qt, QObject, QThread, pyqtSignal, pyqtSlot
)

import rpyc
from rpyc.utils.server import ThreadedServer