#
# Copyright (C) 2021 Dyadic Pty Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import (
Any, Optional, Callable, List, Sequence, Dict
)
from functools import partial
import asyncio
import serial
from qasync import (
asyncSlot
)
from rfblocks import (
PwrDetectorController, create_serial, write_cmd
)
from tam import (
SIGGEN_LIMITS, SMHU58_GPIBID, DSG815_ID, SIGGEN_MODELS
)
from PyQt5.QtWidgets import (
QGroupBox, QVBoxLayout, QHBoxLayout, QFormLayout, QButtonGroup,
QRadioButton, QLabel, QDoubleSpinBox, QPushButton, QCheckBox,
QMessageBox, QErrorMessage, QWidget, QLineEdit, QDialog,
QComboBox, QSpinBox, QDialogButtonBox, QFileDialog
)
from PyQt5.QtCore import (
Qt, pyqtSignal, pyqtSlot, QObject
)
class PowerDetector(QObject):
    """Qt front end for a :py:class:`PwrDetectorController`.

    The class builds a ``QGroupBox`` holding the detector controls
    (signal frequency, measured power read-out, enable and correction
    check boxes and, optionally, a *Calibration* button) and keeps
    those widgets in step with the controller by connecting to the
    controller's change signals.
    """

    # Distance (dB) from either end of the controller's linear range
    # inside which the power read-out is drawn in orange as a warning.
    RANGE_WARNING_MARGIN = 0.5

    def __init__(self,
                 controller: PwrDetectorController,
                 cal_devices: Optional[Dict] = None) -> None:
        """
        :param controller: The detector controller driven by this UI.
        :type controller: PwrDetectorController
        :param cal_devices: Optional mapping of calibration source
            device name to device identifier.  When not ``None`` it
            replaces the built-in default mapping entirely.
        :type cal_devices: Optional[Dict]
        """
        super().__init__()
        self._controller: PwrDetectorController = controller

        # Widgets are created lazily by create_detector_group(); they are
        # all declared here so every attribute exists from construction.
        self._group_box: Optional[QGroupBox] = None
        self._pwr_textbox: Optional[QLineEdit] = None
        self._freq_box: Optional[QDoubleSpinBox] = None
        self._enable_box: Optional[QCheckBox] = None
        self._correct_box: Optional[QCheckBox] = None
        self._correct_label: Optional[QLabel] = None
        self._cal_btn: Optional[QPushButton] = None
        self._cal_status_lbl: Optional[QLabel] = None

        self._cal_callback: Optional[Callable] = None
        self._caldev_list: Optional[List[str]] = None

        if cal_devices is not None:
            self._caldevices: Dict = cal_devices
        else:
            self._caldevices = {'SMHU58': SMHU58_GPIBID,
                                'DSG815': DSG815_ID,
                                'DDSGEN': '127.0.0.1:18862'}

    @property
    def ctl(self) -> PwrDetectorController:
        """The controller instance passed to the constructor."""
        return self._controller

    @property
    def caldevices(self) -> Dict:
        """Mapping of calibration source device name to device identifier."""
        return self._caldevices

    @pyqtSlot(float)
    def update_freq(self, f: float) -> None:
        """Update the displayed detector signal frequency.
        """
        self._freq_box.setValue(f)

    @pyqtSlot(float)
    def update_pwr(self, pwr: float) -> None:
        """Update the displayed detector measured power level.

        The text colour reflects where ``pwr`` sits relative to the
        controller's ``linear_range``: red when outside the range,
        orange when within :py:attr:`RANGE_WARNING_MARGIN` of either
        end, black otherwise.
        """
        (min_pwr, max_pwr) = self.ctl.linear_range
        margin = self.RANGE_WARNING_MARGIN
        if (pwr < min_pwr) or (pwr > max_pwr):
            color = 'red'
        elif (pwr - min_pwr < margin) or (max_pwr - pwr < margin):
            color = 'orange'
        else:
            color = 'black'
        self._pwr_textbox.setStyleSheet(
            'QLineEdit { font-size: 25pt; color: %s }' % color)
        self._pwr_textbox.setText("{:5.1f}".format(pwr))

    @pyqtSlot()
    def caldata_updated(self) -> None:
        """Enable the correction controls only while the controller
        holds calibration data.
        """
        no_caldata = not self.ctl.cal_data
        self._correct_box.setDisabled(no_caldata)
        self._correct_label.setDisabled(no_caldata)

    @pyqtSlot()
    def update_freq_limits(self) -> None:
        """Refresh the frequency spin box range and the group box title
        from the controller's current frequency limits.
        """
        min_f = self.ctl.min_frequency
        max_f = self.ctl.max_frequency
        self._freq_box.setRange(min_f, max_f)
        self._group_box.setTitle("Detector {}: ({} MHz -> {} MHz)".format(
            self.ctl._controller_id, min_f, max_f))

    @pyqtSlot(bool)
    def update_apply_correction(self, f: bool) -> None:
        """Reflect the controller's correction state in the check box."""
        self._correct_box.setChecked(f)

    def disable_controls(self, f: bool) -> None:
        """Disable (``f`` true) or enable the whole control group.

        Does nothing if the group box has not been created yet.
        """
        if self._group_box is not None:
            self._group_box.setDisabled(f)

    @pyqtSlot(bool)
    def update_enable_state(self, f: bool) -> None:
        """Reflect the controller's enable state in the UI.

        The enable check box is set to ``f`` and the power read-out,
        frequency box and (when present) the calibration button are
        enabled or disabled to match.
        """
        self._enable_box.setChecked(f)
        disabled = not f
        self._pwr_textbox.setDisabled(disabled)
        self._freq_box.setDisabled(disabled)
        if self._cal_btn is not None:
            self._cal_btn.setDisabled(disabled)

    def set_enabled(self, state) -> None:
        """Check box handler: enable the detector when ``state`` is
        ``Qt.Checked``, disable it otherwise.
        """
        self.ctl.enabled = (state == Qt.Checked)

    def set_correct(self, state) -> None:
        """Check box handler: apply calibration correction when
        ``state`` is ``Qt.Checked``, otherwise do not.
        """
        self.ctl.apply_correction = (state == Qt.Checked)

    def create_detector_group(
            self,
            cal_callback: Optional[Callable] = None,
            caldev_list: Optional[List[str]] = None) -> QGroupBox:
        """Create UI controls for a power detector.

        If ``cal_callback`` is not ``None`` a *Calibration* button will
        be included in the UI control group.  This button gives access
        to the Calibrate dialog window which allows the specification
        of calibration parameters and the initiation of the calibration
        process.

        :param cal_callback: An optional callback which will perform a
            calibration on the detector.
        :type cal_callback: Callable
        :param caldev_list: An optional list of device names for the
            calibration signal source.  It is stored on the instance
            when a calibration callback is supplied; note that the
            dialog opened by :py:meth:`show_calibration` is populated
            from the keys of :py:attr:`caldevices`.
        :type caldev_list: List[str]
        :return: The group box containing the detector controls.
        :rtype: QGroupBox
        """
        self._group_box = QGroupBox(
            "Detector {}:".format(self.ctl._controller_id))

        if cal_callback:
            self._cal_callback = cal_callback

        layout = QVBoxLayout()
        layout.addLayout(self._build_detector_form())
        # Widgets must exist before the controller can signal into them.
        self._connect_controller_signals()

        if cal_callback:
            if caldev_list:
                self._caldev_list = caldev_list
            layout.addLayout(self._build_calibration_row())

        self._group_box.setLayout(layout)
        return self._group_box

    def _build_detector_form(self) -> QFormLayout:
        """Create the frequency, power, enable and correction rows."""
        form = QFormLayout()

        self._freq_box = QDoubleSpinBox()
        self._freq_box.setStyleSheet("""QDoubleSpinBox {
            font-size: 25pt; }""")
        self._freq_box.setRange(self.ctl.min_frequency,
                                self.ctl.max_frequency)
        self._freq_box.setValue(self.ctl.min_frequency)
        self._freq_box.setSuffix(" MHz")
        # The size hint is taken after range/suffix are set so the fixed
        # width accommodates the widest displayable value.
        self._freq_box.setFixedSize(
            self._freq_box.sizeHint().width(), 30)
        self._freq_box.valueChanged.connect(self.ctl.set_freq)
        form.addRow(QLabel("Frequency:"), self._freq_box)

        self._pwr_textbox = QLineEdit()
        self._pwr_textbox.setStyleSheet("""QLineEdit {
            font-size: 25pt; }""")
        self._pwr_textbox.setText("0.0")
        self._pwr_textbox.setReadOnly(True)
        self._pwr_textbox.setFixedSize(5*25, 30)
        form.addRow(QLabel("Power (dBm):"), self._pwr_textbox)

        self._enable_box = QCheckBox("")
        self._enable_box.setChecked(True)
        self._enable_box.stateChanged.connect(self.set_enabled)
        form.addRow(QLabel("Enabled:"), self._enable_box)

        # The correction row is always created: the controller's
        # caldata_changed / correction_state_changed signals are connected
        # unconditionally and their slots update these widgets.  It starts
        # disabled and is enabled by caldata_updated().
        self._correct_box = QCheckBox("")
        self._correct_box.setChecked(False)
        self._correct_box.stateChanged.connect(self.set_correct)
        self._correct_box.setDisabled(True)
        self._correct_label = QLabel("Correction:")
        self._correct_label.setDisabled(True)
        form.addRow(self._correct_label, self._correct_box)

        return form

    def _connect_controller_signals(self) -> None:
        """Route the controller's change signals to the UI update slots."""
        self.ctl.freq_changed.connect(self.update_freq)
        self.ctl.pwr_changed.connect(self.update_pwr)
        self.ctl.caldata_changed.connect(self.caldata_updated)
        self.ctl.detector_initialized.connect(self.update_freq_limits)
        self.ctl.correction_state_changed.connect(
            self.update_apply_correction)
        self.ctl.enable_state_changed.connect(
            self.update_enable_state)

    def _build_calibration_row(self) -> QHBoxLayout:
        """Create the *Calibration* button and its status label."""
        row = QHBoxLayout()
        self._cal_btn = QPushButton("Calibration")
        self._cal_btn.clicked.connect(self.show_calibration)
        row.addWidget(self._cal_btn)
        self._cal_status_lbl = QLabel("UNCAL")
        self._cal_status_lbl.setStyleSheet(
            'QLabel { color: %s }' % 'red')
        row.addWidget(self._cal_status_lbl)
        row.addStretch()
        return row

    def show_calibration(self) -> None:
        """Open the calibration dialog and block until it is closed.

        The dialog's source device list is the set of names in
        :py:attr:`caldevices`.
        """
        calibration_dialog = PwrCalibrationDialog(
            self,
            self.ctl,
            self._cal_callback,
            list(self._caldevices))
        calibration_dialog.exec_()
class PwrCalibrationDialog(QDialog):
    """Dialog for configuring and running a power detector calibration.

    The dialog lets the user choose the calibration signal source
    device (and channel, for devices exposing a ``CHANNELS``
    attribute), the source power and the frequency sweep (start, stop
    and step size).  *Calibrate* runs the supplied calibration
    callback in an executor; *Save* and *Load* write and read the
    controller's calibration data via file dialogs.
    """

    MAX_SRCPWR = 30.0
    MIN_SRCPWR = -100.0
    FREQ_SUFFIX = ' MHz'
    PWR_SUFFIX = ' dBm'

    # Settings shared by the save and load file dialogs.
    _CALDATA_FILTER = "JSON Configuration (*.json)"
    _CALDATA_DIR = '.'

    def __init__(self,
                 app: PowerDetector,
                 controller: PwrDetectorController,
                 cal_callback: Callable,
                 caldev_list: List[str],
                 parent: Optional[QWidget] = None) -> None:
        """
        :param app: The owning detector UI; its ``caldevices`` mapping
            is used to look up the identifier of the selected device.
        :type app: PowerDetector
        :param controller: The detector controller being calibrated.
        :type controller: PwrDetectorController
        :param cal_callback: Callable performing the calibration.  It is
            invoked with ``(model, device_id, channel, source_power,
            start_freq, stop_freq, step_size)`` and its return value is
            treated as a status string (non-empty means an error to
            display).
        :type cal_callback: Callable
        :param caldev_list: Names of the selectable source devices.
        :type caldev_list: List[str]
        :param parent: Optional parent widget.
        :type parent: Optional[QWidget]
        """
        QDialog.__init__(self, parent)
        self._app: PowerDetector = app
        self._controller: PwrDetectorController = controller
        self._cal_callback: Callable = cal_callback
        # Materialise the names so any iterable (e.g. a dict key view)
        # can be handed to QComboBox.addItems.
        self._caldev_list: List[str] = list(caldev_list)
        self._caldev: Optional[str] = None
        self._srcpwr: float = 0.0
        self._start_freq: float = 100.0
        self._stop_freq: float = 4000.0
        self._step_size: float = 100.0
        self._sigsrc_chan: int = 0
        self._stopf_box: Optional[QDoubleSpinBox] = None
        self._startf_box: Optional[QDoubleSpinBox] = None
        self._srcpwr_box: Optional[QDoubleSpinBox] = None
        self._steps_box: Optional[QDoubleSpinBox] = None
        self._bbox: Optional[QDialogButtonBox] = None
        self._save_cal_btn: Optional[QPushButton] = None
        self._load_cal_btn: Optional[QPushButton] = None
        self._srcdev_layout: Optional[QFormLayout] = None
        self._sigsrc_chan_controls: Optional[QHBoxLayout] = None
        self._chan_widgets: Dict = {}
        self.setWindowTitle("Detector Calibration")
        self.build_ui()

    @property
    def srcpwr(self) -> float:
        """Calibration source power selected in the dialog."""
        return self._srcpwr

    def set_srcpwr(self, pwr: float) -> None:
        """Record the source power chosen in the spin box."""
        self._srcpwr = pwr

    @property
    def start_freq(self) -> float:
        """Sweep start frequency selected in the dialog."""
        return self._start_freq

    def set_start_freq(self, f: float) -> None:
        """Record the start frequency and keep the stop frequency box
        from going below it.
        """
        self._start_freq = f
        if self._stopf_box is not None:
            self._stopf_box.setMinimum(f)

    @property
    def stop_freq(self) -> float:
        """Sweep stop frequency selected in the dialog."""
        return self._stop_freq

    def set_stop_freq(self, f: float) -> None:
        """Record the stop frequency and keep the start frequency box
        from going above it.
        """
        self._stop_freq = f
        # Guard on the widget, not on the start frequency value: a start
        # frequency of 0.0 must still have its maximum updated.
        if self._startf_box is not None:
            self._startf_box.setMaximum(f)

    @property
    def step_size(self) -> float:
        """Sweep frequency step size selected in the dialog."""
        return self._step_size

    def set_step_size(self, s: float) -> None:
        """Record the step size chosen in the spin box."""
        self._step_size = s

    def set_sigsrc_chan(self, state: bool, chan: int) -> None:
        """Radio button handler: select ``chan`` when its button is
        toggled on (``state`` true); toggle-off events are ignored.
        """
        if state:
            self._sigsrc_chan = chan

    @property
    def caldev(self) -> Optional[str]:
        """Name of the currently selected source device, or ``None``
        if no device has been selected yet.
        """
        return self._caldev

    def caldev_changed(self, combo: QComboBox, idx: int) -> None:
        """Adapt the dialog to the newly selected source device.

        The power and frequency spin box ranges are reset from the
        device's ``SIGGEN_LIMITS`` entry (``limits[1]`` is used as the
        upper frequency bound and ``limits[2:]`` as the power range),
        and the channel selection row is rebuilt when the device model
        declares a ``CHANNELS`` count.

        :param combo: The source device combo box.
        :type combo: QComboBox
        :param idx: Index of the selected item in ``combo``.
        :type idx: int
        """
        self._caldev = combo.itemText(idx)
        limits = SIGGEN_LIMITS[self._caldev]
        max_freq = limits[1]
        min_freq = self._controller.min_frequency

        # The order of these calls matters: each setRange/setValue may
        # emit valueChanged, which feeds back into start_freq/stop_freq.
        self._srcpwr_box.setRange(*limits[2:])
        self._stopf_box.setRange(self.start_freq, max_freq)
        self._startf_box.setRange(min_freq, self.stop_freq)
        self._startf_box.setValue(min_freq)
        self._stopf_box.setValue(max_freq)

        self._remove_channel_controls()
        # Do not carry a channel selection over from the previous device.
        self._sigsrc_chan = 0

        # Check the number of channels for the calibration device.
        # Models without a CHANNELS attribute get no channel row.
        chan_count = getattr(SIGGEN_MODELS[self._caldev], 'CHANNELS', None)
        if chan_count:
            self._add_channel_controls(chan_count)

    def _remove_channel_controls(self) -> None:
        """Remove the channel selection row, if one is present."""
        if self._sigsrc_chan_controls is None:
            return
        # Disconnect first so removing the buttons cannot alter the
        # recorded channel.
        for rb in self._chan_widgets['sigsrc']:
            rb.toggled.disconnect()
        self._srcdev_layout.removeRow(self._sigsrc_chan_controls)
        self._chan_widgets['sigsrc'] = None
        self._sigsrc_chan_controls = None

    def _add_channel_controls(self, chan_count: int) -> None:
        """Add a row of ``chan_count`` radio buttons, channel 0 checked."""
        row = QHBoxLayout()
        buttons = []
        for chan in range(chan_count):
            rb = QRadioButton(str(chan))
            rb.toggled.connect(partial(self.set_sigsrc_chan, chan=chan))
            buttons.append(rb)
            row.addWidget(rb)
        self._chan_widgets['sigsrc'] = buttons
        buttons[0].setChecked(True)
        self._sigsrc_chan_controls = row
        self._srcdev_layout.addRow("Channels:", row)

    def _choose_caldata_file(self, file_mode, accept_mode=None) -> Optional[str]:
        """Run a JSON file dialog and return the chosen path.

        :param file_mode: ``QFileDialog`` file mode to use.
        :param accept_mode: Optional ``QFileDialog`` accept mode; the
            dialog default is kept when ``None``.
        :return: The first selected file name, or ``None`` if the
            dialog was cancelled or nothing was selected.
        """
        dialog = QFileDialog(self)
        dialog.setFileMode(file_mode)
        dialog.setNameFilter(self._CALDATA_FILTER)
        dialog.setDirectory(self._CALDATA_DIR)
        if accept_mode is not None:
            dialog.setAcceptMode(accept_mode)
        if not dialog.exec_():
            return None
        names = dialog.selectedFiles()
        return names[0] if names else None

    def save_caldata(self) -> None:
        """Ask for an output file and save the controller's calibration
        data to it.  Does nothing if the dialog is cancelled.
        """
        chosen = self._choose_caldata_file(QFileDialog.AnyFile,
                                           QFileDialog.AcceptSave)
        if chosen is None:
            return
        output_file = str(chosen).strip()
        self._controller.save_caldata(output_file)

    def load_caldata(self) -> None:
        """Ask for an existing file and load calibration data from it
        into the controller.  Does nothing if the dialog is cancelled.
        """
        input_file = self._choose_caldata_file(QFileDialog.ExistingFile)
        if input_file is None:
            return
        self._controller.load_caldata(input_file)

    def build_ui(self) -> None:
        """Create the dialog's widgets and lay them out."""
        caldev_combo = QComboBox()

        columns = QHBoxLayout()
        columns.addLayout(self._build_source_column(caldev_combo))
        columns.addLayout(self._build_sweep_form())

        # Connect and populate the combo only after every widget that
        # caldev_changed() touches exists: addItems selects the first
        # entry and so triggers the slot immediately.
        caldev_combo.currentIndexChanged.connect(
            lambda idx, w=caldev_combo: self.caldev_changed(w, idx))
        caldev_combo.addItems(self._caldev_list)

        outer_vbox = QVBoxLayout()
        outer_vbox.addLayout(columns)
        outer_vbox.addLayout(self._build_button_row())
        self.setLayout(outer_vbox)

    def _build_source_column(self, caldev_combo: QComboBox) -> QVBoxLayout:
        """Create the source device and source power rows."""
        form = QFormLayout()
        form.addRow(QLabel("Source Device:"), caldev_combo)

        self._srcpwr_box = QDoubleSpinBox()
        self._srcpwr_box.setRange(self._controller.min_power,
                                  self._controller.max_power)
        self._srcpwr_box.setDecimals(1)
        self._srcpwr_box.setSuffix(PwrCalibrationDialog.PWR_SUFFIX)
        self._srcpwr_box.setValue(self.srcpwr)
        self._srcpwr_box.valueChanged.connect(self.set_srcpwr)
        form.addRow(QLabel("Source Power:"), self._srcpwr_box)

        # caldev_changed() adds/removes the "Channels:" row on this form.
        self._srcdev_layout = form

        column = QVBoxLayout()
        column.addLayout(form)
        column.addStretch()
        return column

    def _build_sweep_form(self) -> QFormLayout:
        """Create the start/stop frequency and step size rows."""
        form = QFormLayout()

        self._startf_box = QDoubleSpinBox()
        self._startf_box.setRange(self._controller.min_frequency,
                                  self.stop_freq)
        self._startf_box.setDecimals(1)
        self._startf_box.setSuffix(PwrCalibrationDialog.FREQ_SUFFIX)
        self._startf_box.setValue(self.start_freq)
        self._startf_box.valueChanged.connect(self.set_start_freq)
        form.addRow(QLabel("Start Freq:"), self._startf_box)

        self._stopf_box = QDoubleSpinBox()
        self._stopf_box.setRange(self.start_freq,
                                 self._controller.max_frequency)
        self._stopf_box.setDecimals(1)
        self._stopf_box.setSuffix(PwrCalibrationDialog.FREQ_SUFFIX)
        self._stopf_box.setValue(self.stop_freq)
        self._stopf_box.valueChanged.connect(self.set_stop_freq)
        form.addRow(QLabel("Stop Freq:"), self._stopf_box)

        self._steps_box = QDoubleSpinBox()
        self._steps_box.setRange(10.0, 500.0)
        self._steps_box.setDecimals(1)
        self._steps_box.setSuffix(PwrCalibrationDialog.FREQ_SUFFIX)
        self._steps_box.setValue(self.step_size)
        self._steps_box.valueChanged.connect(self.set_step_size)
        form.addRow(QLabel("Step Size:"), self._steps_box)

        return form

    def _build_button_row(self) -> QHBoxLayout:
        """Create the Calibrate / Close / Save / Load button box."""
        self._save_cal_btn = QPushButton("Save")
        # Nothing to save until the controller holds calibration data.
        self._save_cal_btn.setDisabled(not self._controller.cal_data)
        self._load_cal_btn = QPushButton("Load")

        self._bbox = QDialogButtonBox(
            QDialogButtonBox.Apply | QDialogButtonBox.Close)
        self._bbox.addButton(self._save_cal_btn, QDialogButtonBox.ActionRole)
        self._bbox.addButton(self._load_cal_btn, QDialogButtonBox.ActionRole)

        calibrate_btn = self._bbox.button(QDialogButtonBox.Apply)
        calibrate_btn.setText('Calibrate')
        calibrate_btn.clicked.connect(self.calibrate)
        self._save_cal_btn.clicked.connect(self.save_caldata)
        self._load_cal_btn.clicked.connect(self.load_caldata)
        self._bbox.rejected.connect(self.close)

        row = QHBoxLayout()
        row.addWidget(self._bbox)
        return row

    def close(self) -> None:
        """Close the dialog by rejecting it."""
        QDialog.reject(self)

    def showErrorMessage(self, msg) -> None:
        """Show ``msg`` in a window-modal error sheet attached to this
        dialog.
        """
        error_dialog = QErrorMessage(self)
        error_dialog.setWindowModality(Qt.WindowModal)
        error_dialog.setParent(self, Qt.Sheet)
        error_dialog.setResult(0)
        error_dialog.showMessage(msg)

    @asyncSlot()
    async def calibrate(self):
        """Run the calibration callback with the dialog's settings.

        The callback is executed in the event loop's default executor
        so the UI stays responsive; the button box is disabled for the
        duration.  A non-empty status string returned by the callback,
        or a ``SerialException`` raised by it, is shown as an error
        message.  The *Save* button is enabled afterwards when the run
        completed without an error status or when the controller holds
        calibration data.
        """
        loop = asyncio.get_running_loop()
        self._bbox.setDisabled(True)
        completed_ok = False
        try:
            model = self.caldev
            device_id = self._app.caldevices[model]
            status_str = await loop.run_in_executor(
                None,
                self._cal_callback, model, device_id, self._sigsrc_chan,
                self.srcpwr, self.start_freq, self.stop_freq, self.step_size)
            if status_str:
                self.showErrorMessage(status_str)
            else:
                completed_ok = True
        except serial.serialutil.SerialException as se:
            self.showErrorMessage(str(se))
        finally:
            self._bbox.setDisabled(False)
            if completed_ok or self._controller.cal_data:
                self._save_cal_btn.setDisabled(False)