pwrmeter.py
This document describes an app for controlling the RF power meter reference design. When executed, the script displays a GUI window which allows the power meter to be controlled via it's USB connection.
The Script
Generate the script using C-c C-v t
.
The main
Function
The asyncio
framework is used to coordinate communications over the
serial device without blocking user interface events. For this
purpose we make use of QEventLoop
and QThreadExecutor
from the
asyncqt package.
The event loop is equipped with an instance of QThreadExecutor
which
executes a single thread. The idea here is to ensure that requests to
the ECApp serial device will execute one by one in a manner which will
not block the operation of the Qt event loop.
def main(): global server_thread <<process-cmdline-args>> app = QApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) pwrmeter_app = PwrMeterApp( chan_cspins = [args.chan0_cspin, args.chan1_cspin], atten_lepins = [args.atten0_lepin, args.atten1_lepin], serial_device = args.device, baudrate = args.baudrate, headless = args.nogui) if not args.nogui: pwrmeter_app.show() server_thread = QThread() server = RPyCServer(PwrMeterService(pwrmeter_app), args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() with loop: loop.set_debug(True) sys.exit(loop.run_forever())
process-cmdline-args
Command line arguments allow the specification of a serial device and baud rate for the purposes of testing.
defaultBaud = 0 parser = ArgumentParser(description= '''An RF power meter.''') parser.add_argument( "--nogui", action='store_true', help="Disable GUI and run 'headless'") parser.add_argument( "-d", "--device", default=None, help="The hardware serial device") parser.add_argument( "-b", "--baudrate", default=defaultBaud, type=int, help="Baud rate (default: {})".format(defaultBaud)) parser.add_argument( "-A", "--ipaddr", default="127.0.0.1", help="IP address for to bind the RPyC server instance") parser.add_argument( "-P", "--port", default=18863, type=int, help="TCP port for the RPyC server instance") parser.add_argument( "--chan0_cspin", default='D0', help="CS pin for Chan0 power detector. Default: D0") parser.add_argument( "--chan1_cspin", default='D1', help="CS pin for Chan1 power detector. Default: D1") parser.add_argument( "--atten0_lepin", default=None, help="LE pin for Chan0 step attenuator. Default: no attenuator") parser.add_argument( "--atten1_lepin", default=None, help="LE pin for Chan1 step attenuator. Default: no attenuator") args = parser.parse_args()
app-class
The application class encapsulates the instrument functionality including the on-screen representation and controls.
The power meter consists of two power detectors, one on each of the channels
'0' and '1'. Optionally, the detectors may be equipped with step
attenuators. The presence of a step attenuator is signalled by the
specification of a LE
pin via the --atten0_lepin
or
--atten1_lepin
.
class PwrMeterApp(QMainWindow): CALSTATUS_INCAL = 0 CALSTATUS_ESTIMATED = 1 CALSTATUS_UNCAL = 2 CALSTATUS_STR = { CALSTATUS_INCAL: 'INCAL', CALSTATUS_ESTIMATED: 'ESTIMATED', CALSTATUS_UNCAL: 'UNCAL'} CALSTATUS_COLOURS = { CALSTATUS_INCAL: 'black', CALSTATUS_ESTIMATED: 'orange', CALSTATUS_UNCAL: 'red' } calstatus_changed = pyqtSignal(int, int) def __init__(self, chan_cspins: List = None, atten_lepins: List = None, serial_device: Optional[str] = None, baudrate: int = 0, headless: bool = False) -> None: """ """ super().__init__() self._nogui: bool = headless self._detectors: List[PwrDetectorController] = [ PwrDetectorController('0', LogDetector(chan_cspins[0])), PwrDetectorController('1', LogDetector(chan_cspins[1]))] self._attenuators: List[Optional[PE43711Controller]] = [None, None] if atten_lepins: for att_id in[0, 1]: if atten_lepins[att_id]: att_ctl = PE43711Controller( str(att_id), pe43711(atten_lepins[att_id])) att_ctl.attenuation_changed.connect( self.attenuation_changed) self._attenuators[att_id] = att_ctl self._continuous: bool = False self._acquiring: bool = False self._shuttingdown: bool = False self._acquire_group_box: Optional[QGroupBox] = None self._ctl_device: str = serial_device self._baudrate: int = baudrate if baudrate == 0: self._baudrate = DEFAULT_BAUDRATE self._calstatus: List[int] = [ PwrMeterApp.CALSTATUS_UNCAL, PwrMeterApp.CALSTATUS_UNCAL] for det_id, ctl in enumerate(self._detectors): ctl.caldata_changed.connect(partial(self.caldata_updated, det_id)) self.calstatus_changed.connect(self.update_calstatus) if not headless: self._detector_displays: List[Optional[PowerDetector]] = [ PowerDetector(ctl) for ctl in self.detectors] self._attenuator_displays: List[Optional[StepAttenuator]] = [None, None] for att_id in [0, 1]: att = self.attenuators[att_id] if att: self._attenuator_displays[att_id] = StepAttenuator( self, att) self.init_ui() def chan0_power_offset_fn(self, freq): """ """ return self._coupling + self._attn_insertion_loss def chan1_power_offset_fn(self, freq): """ """ return self._amplifier_gain + self._coupling @property def ctl_device(self) -> str: return self._ctl_device @property def serial_ports(self) -> List: ports = [DEFAULT_SOCKET_URL] ports.extend([port.device for port in comports() if port.serial_number]) if self.ctl_device: if self.ctl_device in ports: ports.pop(ports.index(self.ctl_device)) ports.insert(0, self.ctl_device) return ports @property def baudrate(self) -> int: return self._baudrate @property def detectors(self) -> List[PwrDetectorController]: """Returns a list containing the power detector controllers. """ return self._detectors @property def attenuators(self) -> List[Optional[PE43711Controller]]: """Returns a list containing the step attenuator controllers. """ return self._attenuators @property def calstatus(self) -> List[int]: """Returns a list containing the status of the detectors. """ return self._calstatus @pyqtSlot(object) def attenuation_changed(self, att_ctl: PE43711Controller) -> None: """Device attenuation setting has changed. """ det_id = int(att_ctl._controller_id) ctl: PwrDetectorController = self.detectors[det_id] # Check the calibration attenuation setting. # If there is no calibration attenuation setting # then just return without doing anything try: cal_atten = ctl.cal_data_conditions[ PwrDetectorController.ATTEN_CAL_CONDITION] except KeyError: # There's no calibration attenuation value so # just set the insertion loss offset ctl.insertion_loss_offset = att_ctl.attenuation return # If the current attenuation setting for att_ctl # is different from the calibration attenuation # then calculate the insertion loss offset. loss_offset: float = att_ctl.attenuation - cal_atten if ctl.insertion_loss_offset == 0.0 and loss_offset != 0.0: # If the current value of the detector # controller insertion_loss_offset is zero and # the calculated loss offset is non-zero # emit a calstatus_changed signal. self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_ESTIMATED) elif ctl.insertion_loss_offset != 0.0 and loss_offset == 0.0: # Likewise, if the detector controller # insertion_loss_offset is non-zero and the # calculated loss offset is zero # emit a calstatus_changed signal. self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_INCAL) # Set the detector controller insertion loss offset ctl.insertion_loss_offset = loss_offset def disable_chan_controls(self, chan_id: int, f: bool) -> None: if chan_id in [0, 1]: self._detector_displays[chan_id].disable_controls(f) att_display = self._attenuator_displays[chan_id] if att_display: att_display.disable_controls(f) def disable_controls(self, f: bool) -> None: for det_id in [0, 1]: self.disable_chan_controls(det_id, f) if self._acquire_group_box: self._acquire_group_box.setDisabled(f) def initialize_detectors(self) -> None: with create_serial(self.ctl_device, self.baudrate) as ser: for ctl in self.detectors: ctl.initialize(ser) @pyqtSlot() def caldata_updated(self, det_id): self.update_calstatus(det_id, PwrMeterApp.CALSTATUS_INCAL) @pyqtSlot(int, int) def update_calstatus(self, det_id: int, status: int): """ """ self.calstatus[det_id] = status if not self._nogui: grp: PowerDetector = self._detector_displays[det_id] grp._cal_status_lbl.setText(PwrMeterApp.CALSTATUS_STR[status]) grp._cal_status_lbl.setStyleSheet( 'QLabel { color: %s }' % PwrMeterApp.CALSTATUS_COLOURS[status]) @property def continuous(self) -> bool: return self._continuous @property def acquiring(self) -> bool: return self._acquiring def set_continuous(self, c: bool) -> None: self._continuous = c def set_acquiring(self, a: bool) -> None: self._acquiring = a @property def shuttingdown(self) -> bool: return self._shuttingdown def shutdown(self, b: bool) -> None: self._shuttingdown = b <<hwconfig-funcs>> <<acquisition-funcs>> <<calibration>> def init_ui(self) -> None: vbox = QVBoxLayout() vbox.addWidget(self.create_hwconfig_group()) hbox = QHBoxLayout() for det_id in [0, 1]: inner_vbox = QVBoxLayout() grp = self._detector_displays[det_id].create_detector_group( partial(self.calibrate, det_id)) inner_vbox.addWidget(grp) att_display = self._attenuator_displays[det_id] if att_display: grp = att_display.create_attenuator_group() inner_vbox.addWidget(grp) else: inner_vbox.addStretch() hbox.addLayout(inner_vbox) self.disable_controls(True) vbox.addLayout(hbox) hbox2 = QHBoxLayout() grp = self.create_acquisition_control_group() hbox2.addWidget(grp) grp.setDisabled(True) vbox.addLayout(hbox2) self.container = QWidget() self.container.setLayout(vbox) self.setCentralWidget(self.container) self.setWindowTitle('Dual Detector Power Meter')
hwconfig-funcs
def create_hwconfig_group(self): group_box = QGroupBox("Hardware Config:") vbox = QVBoxLayout() hbox = QHBoxLayout() tty_combo = QComboBox() tty_combo.currentIndexChanged.connect( lambda idx, w=tty_combo: self.tty_changed(w, idx)) tty_combo.addItems(self.serial_ports) line_edit = QLineEdit() line_edit.editingFinished.connect( lambda w=line_edit: self.tty_edit_finished(w)) tty_combo.setLineEdit(line_edit) hbox.addWidget(QLabel("Control Port:")) hbox.addWidget(tty_combo) self.hw_config_btn = QPushButton("Initialize") self.hw_config_btn.clicked.connect(self.hw_config) hbox.addWidget(self.hw_config_btn) hbox.addStretch(1) vbox.addLayout(hbox) group_box.setLayout(vbox) return group_box def tty_changed(self, combo, idx): self._ctl_device = combo.itemText(idx) def tty_edit_finished(self, line_edit): self._ctl_device = line_edit.text() def showErrorMessage(self, msg): error_dialog = QErrorMessage(self) error_dialog.setWindowModality(Qt.WindowModal) error_dialog.setParent(self, Qt.Sheet) error_dialog.setResult(0) error_dialog.showMessage(msg) @asyncSlot() async def hw_config(self): self.hw_config_btn.setEnabled(False) control_disable = False try: loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.initialize_detectors) except serial.serialutil.SerialException: self.showErrorMessage("Bad serial device.") control_disable = True except DetectorReadError: self.showErrorMessage("Error reading detector data.") control_disable = True except Exception as e: print(e) control_disable = True finally: self.hw_config_btn.setEnabled(True) self.disable_controls(control_disable) @asyncClose async def closeEvent(self, event): if self.continuous: self.set_acquiring(False) self.shutdown(True) event.ignore() else: event.accept()
acquisition-funcs
On screen acquisition controls. Power levels are acquired from the
detectors either as a single shot or continuously. Acquisition is
initiated using the Acquire
button. If continuous acquisition is
in operation the acquire button will be re-labeled as the Stop
button. During single shot acquisition the Acquire
button is
disabled until the power level is read.
The asyncio
framework is used to manage the multi-threaded
operation of the power level acquisition. This is implemented in the
<<acquisition-slots>>
coroutines. Note that the asyncqt
module
is used to integrate the Qt event loop into the asyncio
event
loop. This also necessitates the use of the asyncSlot
and
asyncClose
decorators. The enable_stop_acquisition
and
enable_start_acquisition
functions are used to manage the state of
the Acquire~/~Stop
button. During continuous acquisition the
detector frequency controls and the hardware Initialize
button are
disabled.
def create_acquisition_control_group(self): self._acquire_group_box = QGroupBox("Acquisition Control:") vbox = QVBoxLayout() hbox = QHBoxLayout() acqGroup = QButtonGroup(hbox) rb = QRadioButton("Single") acqGroup.addButton(rb) rb.setChecked(True) hbox.addWidget(rb) rb = QRadioButton("Continuous") acqGroup.addButton(rb) hbox.addWidget(rb) rb.toggled.connect(lambda state: self.set_continuous(state)) hbox.addStretch(1) self.acquire_btn = QPushButton("Acquire") self.acquire_btn.clicked.connect(self.acquire_pwr) hbox.addWidget(self.acquire_btn) vbox.addLayout(hbox) self._acquire_group_box.setLayout(vbox) return self._acquire_group_box
The acquire_pwr
and acquire_stop
are async coroutines called from
the Qt event loop when the Acquire/Stop
button is pressed. The
acquire_pwr
coroutine will invoke the acquire_pwr_levels
function
inside a separate thread in order to communicate with the detector
hardware without blocking the GUI code.
@asyncSlot() async def acquire_pwr(self): loop = asyncio.get_event_loop() if self.continuous: self.enable_stop_acquisition() try: await loop.run_in_executor(None, self.acquire_pwr_levels) if self.shuttingdown: QApplication.quit() except serial.serialutil.SerialException as se: print(se) self.showErrorMessage("Bad serial device.") except Exception as e: print(e) finally: self.enable_start_acquisition() else: self.acquire_btn.setEnabled(False) try: await loop.run_in_executor(None, self.acquire_pwr_levels) except serial.serialutil.SerialException: self.showErrorMessage("Bad serial device.") except Exception as e: print(e) finally: self.acquire_btn.setEnabled(True) @asyncSlot() async def acquire_stop(self): if self.continuous: self.set_acquiring(False)
def enable_stop_acquisition(self): self.disable_freq_controls() self.hw_config_btn.setEnabled(False) self.acquire_btn.setText("Stop") self.set_acquiring(True) try: self.acquire_btn.clicked.disconnect() except Exception: pass self.acquire_btn.clicked.connect(self.acquire_stop) def enable_start_acquisition(self): self.acquire_btn.setText("Acquire") try: self.acquire_btn.clicked.disconnect() except Exception: pass self.acquire_btn.clicked.connect(self.acquire_pwr) self.hw_config_btn.setEnabled(True) self.enable_freq_controls() def disable_freq_controls(self): for det in self._detector_displays: det._freq_box.setEnabled(False) def enable_freq_controls(self): for det in self._detector_displays: det._freq_box.setEnabled(True)
calibration
The calibrate
method is invoked as a callback from the
rfblocks.PowerDetector
class. A reference of the calibrate
method
is passed to the PowerDetector
instances when building the
PwrMeterApp UI - specifically, when creating the on-screen
representations of the PowerDetector instances via
create_detector_group
.
The calibrate
method is responsible for the actual measurement of
the power detector response.
Calibration with the optional attenuator is carried out at the current attenuation level. If the attenuation level is changed after calibration the insertion loss is estimated by adding the difference between the new attenuation level and the attenuation level in effect during calibration. The app status line should display something like 'Estimating Cal' in this situation.
Note that when measurements are made during calibration no corrections
should be made. This necessitates setting the detector controller's
apply_correction
attribute to False
before starting the
calibration and then restoring the old value after calibration is
complete.
def calibrate(self, det_id, src_device, device_id, src_chan, srcpwr, start, stop, step): print("calibrate: {}, {}, {}".format(src_device, device_id, src_chan)) mgr = InstrumentMgr() sigsrc = mgr.open_instrument(src_device, device_id) try: sigsrc.chan = src_chan except AttributeError: pass loss = {} ctl = self.detectors[det_id] old_correct_setting = ctl.apply_correction ctl.apply_correction = False ctl.calibrating = True sigsrc.output = True with create_serial(self.ctl_device, self.baudrate) as ser: for f in np.arange(start, stop, step): # Note that the property assignment here may actually # be a rpyc remote operation depending on the selected # calibration device. Since ~f~ (and possibly ~srcpwr~) # may be numpy array scalars it is necessary to explicitly # cast these to standard Python float values. If this is # not done the remote rpyc process will almost certainly # generate strange numpy related exceptions. sigsrc.freq = float(f) sigsrc.level = float(srcpwr) sleep(1.0) ctl.freq = f ctl.measure(ser) loss['{}'.format(f)] = srcpwr - ctl.pwr sigsrc.output = False ctl.calibrating = False ctl.apply_correction = old_correct_setting mgr.close_instrument(sigsrc) mgr.close() # If this detector has a step attenuator associated with it # then we must take note of the attenuation setting for which # the calibration was carried out. This is done by # saving the attenuation setting as a value in the # detector controller cal_data_conditions dictionary. att = self.attenuators[det_id] if att: ctl.cal_data_conditions[ PwrDetectorController.ATTEN_CAL_CONDITION] = att.attenuation # Any current loss offset should now be set to 0 since it # is now corrected by the calibration ctl.insertion_loss_offset = 0.0 ctl.cal_data = loss self.calstatus_changed.emit(det_id, PwrMeterApp.CALSTATUS_INCAL) return ''
pwrmeter-service
The Python remote procedure call framework RPyC is used to create a
network service which makes the functionality of the pwrmeter
app
available to other clients.
The PwrMeterService
implements the functionality which is accessed via
RPyC.
class PwrMeterService(rpyc.Service): def __init__(self, app): super().__init__() self._app = app def initialize(self) -> None: """Initialize the service app and associated hardware modules. """ with create_serial(self._app.ctl_device, self._app.baudrate) as ser: for ctl in self.detectors: ctl.initialize(ser) if not self._app._nogui: self._app.hw_config_btn.setEnabled(True) self._app.disable_controls(False) @property def detectors(self): """Return a list of the power detector objects. :return: A list of PwrDetectorController instances one for each of the power meter channels. """ return self._app.detectors @property def attenuators(self): """Return a list of channel attenuator objects. Note that one of more of the objects may be ``None`` depending on whether the corresponding channel is actually equipped with a step attenuator. :return: A list of PE43711Controller instances one for each of the power meter channels. """ return self._app.attenuators def set_attenuation(self, ctl_id, atten): """Set the attenuation for the specified power meter channel. :param ctl_id: Power meter channel id. :type ctl_id: int :param atten: Required attenuation level. :type atten: float Note that this will only have any effect if the channel is actually equipped with a step attenuator. """ att = self.attenuators[ctl_id] if att: if not self._app._nogui: grp = self._app._attenuator_displays[ctl_id] if grp: grp._atten_box.setValue(atten) else: att.attenuation = atten with create_serial(self._app.ctl_device, self._app.baudrate) as ser: att.configure(ser) def detector_enable(self, ctl_id, en): """Enable/disable the specified power meter channel. :param ctl_id: Power meter channel id. :type ctl_id: int :param en: True to enable the channel, False disables the channel. :type en: bool """ if not self._app._nogui: state = Qt.Checked if en else Qt.Unchecked self._app._detector_displays[ctl_id].set_enabled(state) att_display = self._app._attenuator_displays[ctl_id] if att_display is not None: att_display.disable_controls(en) else: ctl = self.detectors[ctl_id].enabled = en def calibrate(self, ctl_id, src_dev, device_id, src_chan, srcpwr, startf, stopf, stepf): """Carry out a power meter channel calibration. :param ctl_id: Power meter channel id. :type ctl_id: int :param src_dev: Calibration signal source device. :type src_dev: str :param device_id: Calibration signal source device id. :type device_id: str :param src_chan: Calibration signal source channel. :type src_chan: int :param srcpwr: Calibration signal source power level (in dBm). :type srcpwr: float :param startf: Calibration start frequency (in MHz). :type startf: float :param stopf: Calibration stop frequency (in MHz). :type stopf: float :param stepf: Calibration frequency step size (in MHz) :type stepf: float .. code:: python import rpyc from tam import DSG815_ID pwrmeter = rpyc.connect('127.0.0.1', 18863) pwrmeter.root.initialize() chan_controllers = pwrmeter.root.detectors chan_ctl0 = chan_controllers[0] chan_ctl1 = chan_controllers[1] # Calibration is a long running process and the calibrate call # is synchronous. The rpyc request timeout should be either # disabled or set to a long period so that a timeout isn't triggered old_timeout = pwrmeter._config['sync_request_timeout'] pwrmeter._config['sync_request_timeout'] = None status = pwrmeter.root.calibrate(0, 'DSG815', DSG815_ID, 0, 0.0, 100.0, 1500.0, 100.0) pwrmeter.root.calstatus[0] # should return 0 # Restore the rpyc request timeout value pwrmeter._config['sync_request_timeout'] = old_timeout .. seealso:: :py:meth:`calstatus` """ return self._app.calibrate( ctl_id, src_dev, device_id, src_chan, srcpwr, startf, stopf, stepf) def save_caldata(self, ctl_id, cal_data_file): """Save channel calibration data to the specified file. :param ctl_id: Power meter channel id. :type ctl_id: int :param cal_data_file: Path to file where cal. data will be saved. :type cal_data_file: str """ ctl = self.detectors[ctl_id] ctl.save_caldata(cal_data_file) def load_caldata(self, ctl_id, cal_data_file): """Load channel calibration data from the specified file. :param ctl_id: Power meter channel id. :type ctl_id: int :param cal_data_file: Path to file from which cal. data will be loaded. :type cal_data_file: str """ ctl = self.detectors[ctl_id] ctl.load_caldata(cal_data_file) @property def calstatus(self): """Return the status of the most recent calibration operation. :return: True if the calibration completed successfully, False otherwise. .. seealso:: :py:meth:`calibrate` """ return self._app.calstatus def measure(self, ctl_id: int) -> None: """Take a power measurement on the specified power meter channel. :param ctl_id: Power meter channel id. :type ctl_id: int .. code:: python import rpyc pwrmeter = rpyc.connect('127.0.0.1', 18863) pwrmeter.root.initialize() chan_controllers = pwrmeter.root.detectors chan_ctl0 = chan_controllers[0] chan_ctl1 = chan_controllers[1] chan_ctl0.freq = 500.0 pwrmeter.root.measure(0) pwr = chan_ctl0.pwr chan_ctl0.apply_correction = True pwrmeter.root.measure(0) corrected_pwr = chan_ctl0.pwr chan_ctl1.freq = 150.0 pwrmeter.root.measure(1) pwr = chan_ctl1.pwr pwrmeter.close() """ with create_serial(self._app.ctl_device, self._app.baudrate) as ser: self.detectors[ctl_id].measure(ser) class RPyCServer(QObject): finished = pyqtSignal() def __init__(self, serviceInst, host, port): super().__init__() self._serviceInst = serviceInst self._host = host self._port = port def run(self): print("PwrMeter rpyc service on {}:{}".format(self._host, self._port)) self._server = ThreadedServer( self._serviceInst, hostname = self._host, port = self._port, protocol_config = { 'allow_all_attrs': True, 'allow_setattr': True, 'allow_pickle': True}) self._server.start() self.finished.emit()
Generating calibration data from S-parameter measurements
In this example S21 measurements of a 12dB attenuator were made using a KC901V network analyzer. Because this is a "1 1/2" port analyzer, impedance mismatches cause a certain amount of ripple in the resulting measured transmission. To compensate for the ripple a cubic function is fitted prior to calculating the transmission at each of the calibration data frequencies. Figure 1 shows the measured transmission and the cubic fit used to estimate the calibration data.
The associated output calibration data looks like the following:
{ "100.0": 11.78, "200.0": 11.77, "300.0": 11.77, "400.0": 11.77, "500.0": 11.77, "600.0": 11.78, "700.0": 11.78, "800.0": 11.78, "900.0": 11.78, "1000.0": 11.78, "1100.0": 11.79, "1200.0": 11.79, "1300.0": 11.79, "1400.0": 11.8, "1500.0": 11.8, "1600.0": 11.8, "1700.0": 11.81, "1800.0": 11.81, "1900.0": 11.82, "2000.0": 11.82, "2100.0": 11.83, "2200.0": 11.83, "2300.0": 11.84, "2400.0": 11.84, "2500.0": 11.85, "2600.0": 11.85, "2700.0": 11.86, "2800.0": 11.87, "2900.0": 11.87, "3000.0": 11.88, "3100.0": 11.88, "3200.0": 11.89, "3300.0": 11.89, "3400.0": 11.9, "3500.0": 11.9, "3600.0": 11.91, "3700.0": 11.91, "3800.0": 11.92, "3900.0": 11.92, "4000.0": 11.93, "4100.0": 11.93, "4200.0": 11.93, "4300.0": 11.94, "4400.0": 11.94, "4500.0": 11.94, "4600.0": 11.95, "4700.0": 11.95, "4800.0": 11.95, "4900.0": 11.95, "5000.0": 11.95, "5100.0": 11.95, "5200.0": 11.95, "5300.0": 11.95, "5400.0": 11.95, "5500.0": 11.95, "5600.0": 11.95, "5700.0": 11.95, "5800.0": 11.94, "5900.0": 11.94, "6000.0": 11.94 }
from pathlib import Path import json import skrf as rf from skrf import Network, Frequency import numpy as np import matplotlib.pyplot as plt from scipy.optimize import curve_fit from dyadic.splot import init_style def cubic_func(x, a, b, c, d): return (((a*x + b)*x + c)*x + d) freq_range = freqRange max_freq = float(freqRange.split('-')[-1][:-3])*1e6 dataPath = Path(dataDir) sparams = Network(str(dataPath / s21file))[freq_range] popt, perr = curve_fit(cubic_func, sparams.s21[freq_range].f/1e6, sparams.s21[freq_range].s_db[:,0,0]) init_style() annotation_color = 'darkgrey' fig = plt.figure(num=None, figsize=(6.0, 4.0), dpi=72) ax = fig.add_subplot(111) ax.set_ylim(-15.0, -10.0) ax.set_xlim(0, max_freq/1e6) ax.grid(linestyle=':') ax.grid(which='both', axis='x', linestyle=':') ax.plot(sparams.s21[freq_range].f/1e6, sparams.s21[freq_range].s_db[:,0,0]) ax.plot(sparams.s21[freq_range].f/1e6, [cubic_func(f, *popt) for f in sparams.s21[freq_range].f/1e6], linestyle='dashed') fig.tight_layout() plt.savefig(outfile) cal_data = {f"{f:.1f}": float(f"{cubic_func(f, *popt):.2f}")*(-1) for f in np.arange(100, 6050, 100)} with open(calfile, 'w') as fd: json.dump(cal_data, fd)
Imports
from typing import ( Optional, List, Dict, Tuple ) from functools import ( partial ) import sys import traceback from time import sleep import asyncio import numpy as np from argparse import ArgumentParser import serial from serial.tools.list_ports import comports from rfblocks import ( LogDetector, PwrDetectorController, pe43711, PE43711Controller, DetectorReadError, create_serial, write_cmd, query_cmd, DEFAULT_BAUDRATE, DEFAULT_SOCKET_URL ) from qtrfblocks import ( PowerDetector, StepAttenuator ) from qasync import ( QEventLoop, QThreadExecutor, asyncSlot, asyncClose ) import pyvisa from tam import ( InstrumentMgr ) from PyQt5.QtWidgets import ( QApplication, QWidget, QMainWindow, QLabel, QPushButton, QVBoxLayout, QHBoxLayout, QFormLayout, QComboBox, QGroupBox, QErrorMessage, QDoubleSpinBox, QLineEdit, QButtonGroup, QRadioButton, QCheckBox ) from PyQt5.QtCore import ( Qt, QObject, QThread, pyqtSignal, pyqtSlot ) import rpyc from rpyc.utils.server import ThreadedServer