A Spectrum Analyzer Testbed Application
The Application
This is a spectrum analyzer application targeting systems using the Red Pitaya ADC/DAC board in combination with RF front ends constructed using RF Blocks modules. Figure 1 illustrates the relationships between the various hardware and software components of a typical system.
Software components in the system are:
sa.py
-
The spectrum analyzer application described in this document.
adcdma.py
-
Provides application-level access to the Red Pitaya ADC hardware.
rprf.py
-
Application level control for the RF front end hardware.
bridge.py
-
A network to USB/RS232 serial bridge providing control access to USB connected RF Blocks hardware.
Hardware components of the system are:
Figure 1: RPSA hardware environment. Heavy black lines indicate RF signal connections, light black lines indicate control connections, heavy blue lines indicate TCP/IP network connections, and heavy orange lines indicate USB connections.
Figure 2: RPSA class summary
import sys
from os import path
from pathlib import Path
import json
import warnings
import traceback
from scipy import signal
from argparse import ArgumentParser
import asyncio
import faulthandler

from PyQt5.QtCore import QThread
from PyQt5.QtGui import qt_set_sequence_auto_mnemonic
from PyQt5.QtWidgets import QApplication

from qasync import (
    QEventLoop
)

from tam import (
    BASEBAND_CAL, FREQRESP_CAL, NO_CAL
)
from app import AnalyzerApp
from service import RPyCServer, RPSaService

# Keep a handle on the stock formatter so the wrapper below can delegate to it.
_formatwarning = warnings.formatwarning


def formatwarning_tb(*args, **kwargs):
    """Format a warning and append the call stack that led to it.

    All arguments are passed straight through to the formatter that was
    installed in ``warnings.formatwarning`` when this module was loaded.

    Returns:
        The formatted warning text followed by the formatted stack.
    """
    s = _formatwarning(*args, **kwargs)
    tb = traceback.format_stack()
    # The final stack entry is this function's own frame; leave it out.
    s += ''.join(tb[:-1])
    return s


# From here on every warning is reported together with its call stack.
warnings.formatwarning = formatwarning_tb


def main():
    """Start the spectrum analyzer application.

    Parses the command line, loads the optional front end configuration
    and ADC calibration JSON files, creates the Qt application and the
    ``AnalyzerApp`` main window, starts the RPyC server on its own
    ``QThread`` and finally runs the Qt event loop.  The process exits
    with the status returned by ``app.exec_()``.
    """
    # The server thread is stored in a module level global rather than a
    # local of this function.
    global server_thread

    # The command line parsing chunk is expected to define ``args``.
    <<process-cmdline-args>>

    faulthandler.enable()

    # If specified, read HW config from file
    hw_config = None
    if args.frontend_config is not None:
        try:
            with open(args.frontend_config) as fd:
                hw_config = json.load(fd)
        except FileNotFoundError:
            # A missing file is not fatal: hw_config stays None.
            print(f'No such file "{args.frontend_config}" for front end HW config.')
            print('Using default front end HW config. instead.')

    # If specified, read the ADC calibration data from file
    cal_data = None
    if args.cal_data is not None:
        try:
            with open(args.cal_data) as fd:
                cal_data = json.load(fd)
        except FileNotFoundError:
            # A missing file is not fatal: cal_data stays None.
            print(f'No such file "{args.cal_data}" for ADC calibration data.')
            print('Using the default calibration data instead.')

    # -----------------------------
    # FFTW provides no appreciable improvement in performance
    # over that for scipy.fftpack routines.
    #
    # Monkey patch fftpack with pyfftw.interfaces.scipy_fftpack
    # scipy.fftpack = pyfftw.interfaces.scipy_fftpack
    # Turn on the cache for optimum performance
    # pyfftw.interfaces.cache.enable()

    app = QApplication(sys.argv)
    if args.qt_style is not None:
        qt_path = Path(args.qt_style)
        if qt_path.is_file():
            with open(args.qt_style) as fd:
                app.setStyleSheet(fd.read())
        else:
            print(f"WARNING: Can't find the Qt style file '{args.qt_style}'")

    # Install the qasync loop as the asyncio event loop.
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)
    qt_set_sequence_auto_mnemonic(True)

    # Suppress the user warning from the scipy.signal.welch function
    # This just tells us that the output from scipy.signal.welch will
    # be two sided since the input is complex.
    warnings.filterwarnings("ignore", category=UserWarning,
                            module=signal.__name__)

    adc_device_props = {'adc_ip': args.adcaddr,
                        'adc_port': args.adcport}
    sa_app = AnalyzerApp(adc_device_props,
                         frontend_ip=args.frontend_ip,
                         frontend_port=args.frontend_port,
                         frontend_config=hw_config,
                         cal_device=args.cal_device,
                         cal_data=cal_data)
    sa_app.show()

    # Run the RPyC server on its own QThread.  The server's ``finished``
    # signal stops the thread and schedules both objects for deletion.
    server_thread = QThread()
    server = RPyCServer(RPSaService(sa_app),
                        args.ipaddr,
                        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    with loop:
        loop.set_debug(True)
        sys.exit(app.exec_())
process-cmdline-args
# Command line handling for the spectrum analyzer application.  This chunk
# is spliced into main() and must leave the parsed options in ``args``.

# NOTE(review): defaultBaud is not referenced by the visible main();
# it looks vestigial - confirm before removing.
defaultBaud = 0

parser = ArgumentParser(
    description='''Red Pitaya spectrum analyzer application''')

# ADC DMA server (running on the Red Pitaya board).
parser.add_argument("-a", "--adcaddr", default="192.168.0.155",
                    help="IP address for the ADC DMA server instance")
parser.add_argument("-p", "--adcport", default=18900, type=int,
                    help="TCP port for the ADC DMA server instance")

# RPyC server exposed by this application.
parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                    help="IP address for the SA RPyC server instance")
parser.add_argument("-P", "--port", default=18865, type=int,
                    help="TCP port for the SA RPyC server instance")

# RF front end server.  The port is parsed as an int so that it has the
# same type as the other port options and matches the ``frontend_port: int``
# parameter of AnalyzerApp (it was previously passed through as a string).
parser.add_argument("--frontend_ip", default="127.0.0.1",
                    help="IP address for the RF frontend server instance")
parser.add_argument("--frontend_port", default=18870, type=int,
                    help="TCP port for the RF frontend server instance")

# Calibration and configuration inputs.
parser.add_argument("--cal_device", default=None,
                    help="Calibration source device. The device is specified"
                    " as a comma separated string of the form:"
                    " DEVICE_NAME,DEVICE_ID for example: SMHU58,28")
parser.add_argument("--cal_data", default=None,
                    help="A file containing ADC calibration data.")
parser.add_argument("-F", "--frontend_config", default=None,
                    help="Frontend hardware configuration.")
parser.add_argument("--qt_style", default=None,
                    help="Qt style sheet file")

args = parser.parse_args()
Front end configuration
AnalyzerApp Class
from typing import ( Optional, Dict, List ) import sys from math import ceil from functools import partial import numpy as np import rpyc from PyQt5.QtCore import ( Qt, QObject, QRunnable, QThread, QThreadPool, QPointF, QEventLoop, QWaitCondition, QMutex, pyqtSignal, pyqtSlot ) from PyQt5.QtGui import ( QFont, QKeySequence ) from PyQt5.QtWidgets import ( QWidget, QLabel, QDoubleSpinBox, QLayout, QVBoxLayout, QHBoxLayout, QGroupBox, QMainWindow, QComboBox, QPushButton, QMessageBox, QFormLayout, QErrorMessage, QApplication, QTableView, QStatusBar, QMenu, QGridLayout, QCheckBox, QFrame, QSpinBox, QRadioButton, QGraphicsView, QToolBox, QSizePolicy, QButtonGroup, QTabWidget, QAction, QActionGroup, QMenu, QAbstractButton, QDialog, QLineEdit, QDialogButtonBox, QSizePolicy ) import pyqtgraph as pg from tam import ( ChannelCapabilities ) from utils import ( format_freq, parse_freq_spec, FREQ_SUFFIXES, PWR_SUFFIX, REFLEVEL_SUFFIX, TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG, TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE, DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG ) from data_store import DataStore from device import AdcDevice from channel import AnalyzerChannel, CalibrationDialog, RbwSpinBox from plot import SpectrumPlotWidget, SpectrumMarker from fe_client import FrontEndClient, BaseFrontEndService from adc import AdcThread <<noise-floor-calibration-dialog>> <<freq-calibration-dialog>> class AnalyzerLayoutWidget(pg.GraphicsLayoutWidget): DARK_BACKGROUND_COLOR = "#2a2d31" LIGHT_BACKGROUND_COLOR = 1.0 sigKeyPress = pyqtSignal(object) sigKeyRelease = pyqtSignal(object) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def keyPressEvent(self, ev): self.scene().keyPressEvent(ev) self.sigKeyPress.emit(ev) def keyReleaseEvent(self, ev): self.scene().keyReleaseEvent(ev) self.sigKeyRelease.emit(ev) class AnalyzerApp(QMainWindow): MIN_PLOT_WIDTH = 600 MIN_PLOT_HEIGHT = 400 DEFAULT_CAL_LEVEL = -30.0 MIN_CAL_LEVEL = 
-80.0 MAX_CAL_LEVEL = -5.0 FREQAMPL_TAB = 0 MARKER_TAB = 1 TRACE_TAB = 2 processAppEvents = pyqtSignal() def __init__(self, adc_device_props: Dict, frontend_ip: str, frontend_port: int, frontend_config: Optional[Dict] = None, cal_device: str = None, cal_data: Optional[List] = None, min_plot_height: int = MIN_PLOT_HEIGHT, min_plot_width: int = MIN_PLOT_WIDTH) -> None: """ """ super().__init__() self._adc_device = AdcDevice(adc_device_props) self._adc_device.initialize() self.frontend: FrontEndClient = FrontEndClient( frontend_ip, frontend_port, frontend_config) self.channels: Dict[str, AnalyzerChannel] = { fe_chan_id: AnalyzerChannel( fe_chan_id, self, self.frontend, self._adc_device, ) for fe_chan_id in self.frontend.capabilities['channels']} self._selected_chan = None selected_chan_id: str = [*self.channels][0] self._selected_chan: AnalyzerChannel = self.channels[selected_chan_id] self.selected_chan.select() self._split_display: bool = False self._cal_device: str = cal_device if cal_data is not None: self.selected_chan.set_cal_data(cal_data) self._min_plot_width: int = min_plot_width self._min_plot_height: int = min_plot_height self.processEventsMutex = QMutex() self.eventsProcessedCond = QWaitCondition() self.processAppEvents.connect(self.processEvents) self.build_ui() self.update_sweep_buttons() @property def selected_chan(self) -> AnalyzerChannel: return self._selected_chan def processEvents(self) -> None: self.processEventsMutex.lock() QApplication.sendPostedEvents() QApplication.processEvents(QEventLoop.AllEvents, 200) self.eventsProcessedCond.wakeAll() self.processEventsMutex.unlock() def closeEvent(self, event) -> None: for chan in self.channels.values(): chan.stop() self.frontend.cleanup() def channel_select(self, chan_id: str) -> None: if self.selected_chan: if self._split_display: self.selected_chan.spectrum_plot.viewbox.setBorder( color=SpectrumPlotWidget.BORDER_COLOR, width=SpectrumPlotWidget.BORDER_WIDTH) 
self.selected_chan.spectrum_plot.enable_mouse_events(False) else: self.selected_chan.remove_plots(self._graphic_layout) if self.selected_chan.show_waterfall: self.hide_wf_histogram() self.selected_chan.deselect() self.base_cal_action.triggered.disconnect() self.fe_cal_action.triggered.disconnect() self.noise_floor_action.triggered.disconnect() self.load_cal_action.triggered.disconnect() self.save_cal_action.triggered.disconnect() self._selected_chan = self.channels[chan_id] self.selected_chan.select() self.update_marker_controls() self.base_cal_action.triggered.connect(partial(self.selected_chan.calibrate, AdcThread.CAL_START)) self.fe_cal_action.triggered.connect(self.selected_chan.freqresp_calibrate) self.noise_floor_action.triggered.connect(self.selected_chan.cal_noise_floor) self.load_cal_action.triggered.connect(self.selected_chan.load_cal_data) self.save_cal_action.triggered.connect(self.selected_chan.save_cal_data) if self._split_display: self.selected_chan.spectrum_plot.viewbox.setBorder( color=SpectrumPlotWidget.SELECTED_BORDER_COLOR, width=SpectrumPlotWidget.SELECTED_BORDER_WIDTH) self.selected_chan.spectrum_plot.enable_mouse_events(True) else: self.selected_chan.add_plots(self._graphic_layout) if self.selected_chan.show_waterfall: self.show_wf_histogram() self.update_freq_controls() self.update_rbw_controls() self.update_ampl_controls() self.update_trace_controls() self.update_sweep_buttons() def build_ui(self): vbox = QVBoxLayout() vbox.setSizeConstraint(QLayout.SetMinimumSize) vbox.addWidget(self.create_analyzer_group()) self.container = QWidget() self.container.setSizePolicy(QSizePolicy.Minimum, QSizePolicy.Minimum) self.container.setLayout(vbox) self.setCentralWidget(self.container) self.status_bar = QStatusBar(self) self.setStatusBar(self.status_bar) self.setWindowTitle('Red Pitaya Spectrum Analyzer') return vbox def set_fe_cal_action_state(self): if isinstance(self.frontend.front_end, BaseFrontEndService): self.fe_cal_action.setEnabled(False) else: 
self.fe_cal_action.setEnabled(True) if (self.selected_chan.capabilities.chan_type in [ChannelCapabilities.DIRECT]): self.fe_cal_action.setEnabled(False) def show_freqresp_calibration_dialog(self): if self._cal_device is not None: cal_name, dev_id = self._cal_device.split(',') calibration_dialog = FreqRespCalibrationDialog( self, {cal_name: dev_id}, cal_level=AnalyzerApp.DEFAULT_CAL_LEVEL) else: calibration_dialog = FreqRespCalibrationDialog( self, cal_level=AnalyzerApp.DEFAULT_CAL_LEVEL) result = calibration_dialog.exec_() if result == QDialog.Accepted: return (calibration_dialog.calibration_device, calibration_dialog.calibration_device_addr, calibration_dialog.calibration_level) else: return (None, None, None) def show_noise_floor_calibration_dialog(self): noise_floor_dialog = NoiseFloorCalibrationDialog(self) result = noise_floor_dialog.exec_() if result == QDialog.Accepted: return True else: return False <<analyzer-controls>> <<control-update-functions>>
control-update-functions
@pyqtSlot(float)
def _update_centre_freq(self, f):
    """Slot: apply a new centre frequency."""
    self._update_freq_values(centre=f)


@pyqtSlot(float)
def _update_start_freq(self, f):
    """Slot: apply a new start frequency."""
    self._update_freq_values(start=f)


@pyqtSlot(float)
def _update_stop_freq(self, f):
    """Slot: apply a new stop frequency."""
    self._update_freq_values(stop=f)


@pyqtSlot(float)
def _update_freq_span(self, f):
    """Slot: apply a new frequency span."""
    self._update_freq_values(span=f)


@pyqtSlot(float)
def _update_rbw(self, rbw):
    """Slot: pass a new resolution bandwidth to the selected channel."""
    self.selected_chan.update_rbw(rbw)


@pyqtSlot(bool)
def _update_auto_rbw(self, auto):
    """Slot: mirror the auto RBW state in its check box."""
    self._auto_rbw_box.setChecked(auto)


@pyqtSlot(float)
def _update_felo_offset(self, offset):
    """Slot: refresh the frequency boxes; ``offset`` itself is not used."""
    self._update_displayed_frequencies()


def _update_freq_values(self, start=None, stop=None,
                        centre=None, span=None) -> None:
    """Keeps the start, stop, centre and span values consistent.

    The values are handed to the selected channel, after which the
    frequency boxes, plot annotations and markers are refreshed.
    """
    self.selected_chan.update_freq_values(start, stop, centre, span)
    self._update_displayed_frequencies()
    plot = self.selected_chan.spectrum_plot
    plot.update_plot_annotations()
    plot.update_markers()


def _update_displayed_frequencies(self):
    """Have the selected channel rewrite every frequency control."""
    chan = self.selected_chan
    chan.update_displayed_freq(self._fstart_box, chan.start_freq)
    chan.update_displayed_freq(self._fstop_box, chan.stop_freq)
    chan.update_displayed_freq(self._fcentre_box, chan.centre_freq)
    chan.update_displayed_fspan(self._fspan_box, chan.freq_span)
    chan.update_displayed_felo_offset(self._fe_lo_box)


@pyqtSlot(float)
def _update_ref_level(self, p):
    """Slot: show a new reference level once its control exists."""
    if self._reflevel_box is None:
        return
    self._update_ampl_values(reflevel=p)


@pyqtSlot(float)
def _update_ampl_scale(self, s):
    """Slot: show a new amplitude scale once the channel has a plot."""
    if not self.selected_chan.spectrum_plot:
        return
    self._update_ampl_values(ampl_scale=s)


@pyqtSlot(float)
def _update_atten(self, p):
    """Slot: set the ADC channel attenuation and show the new value."""
    self._adc_device.set_channel_attenuation(
        self.selected_chan.adc_chan_id, p)
    if self._atten_box is None:
        return
    self._update_ampl_values(atten=p)


@pyqtSlot(float)
def _update_fe_atten(self, p):
    """Slot: set the front end attenuation and show the new value."""
    self.frontend.set_fe_atten(self.selected_chan.fe_chan_id, p)
    if self._fe_atten_box is None:
        return
    self._update_ampl_values(fe_atten=p)


@pyqtSlot(float)
def _update_fe_gain(self, p):
    """Slot: show a new front end gain once its control exists."""
    if self._fe_gain_box is None:
        return
    self._update_ampl_values(fe_gain=p)


def _update_ampl_values(self, reflevel=None, ampl_scale=None,
                        atten=None, fe_atten=None, fe_gain=None):
    """Write the supplied amplitude values into their spin boxes.

    Arguments left as ``None`` are skipped.  The plot annotations and
    markers of the selected channel are refreshed afterwards.
    """
    # Boxes are looked up by name so that only the ones actually being
    # updated are touched.
    updates = (
        (reflevel, '_reflevel_box'),
        (ampl_scale, '_ampl_scale_box'),
        (atten, '_atten_box'),
        (fe_atten, '_fe_atten_box'),
        (fe_gain, '_fe_gain_box'),
    )
    for new_value, box_name in updates:
        if new_value is not None:
            getattr(self, box_name).setValue(new_value)
    self.selected_chan.spectrum_plot.update_plot_annotations()
    self.selected_chan.spectrum_plot.update_markers()


def _update_trace_average(self, avg):
    """Show a new trace average count in its spin box."""
    self._trace_avg_box.setValue(avg)
analyzer-controls
def create_display_layout(self) -> QHBoxLayout:
    """Build the main display: the plot area on the left, controls on the right.

    The control column holds the sweep buttons, the channel buttons and a
    tab widget with the Freq/Ampl, Markers and Trace pages.

    Returns:
        The horizontal layout holding both columns (also stored in
        ``self._display_layout``).
    """
    self._display_layout = QHBoxLayout()
    # self._display_layout.setSizeConstraint(QLayout.SetMinimumSize)

    # Left column: the spectrum plot.
    vbox = QVBoxLayout()
    pg.setConfigOptions(foreground='k')
    self._graphic_layout = AnalyzerLayoutWidget()
    self._graphic_layout.setMinimumWidth(self._min_plot_width)
    self._graphic_layout.setMinimumHeight(self._min_plot_height)
    self._graphic_layout.setSizePolicy(QSizePolicy.MinimumExpanding,
                                       QSizePolicy.Minimum)
    self._graphic_layout.setBackground(AnalyzerLayoutWidget.DARK_BACKGROUND_COLOR)
    self._graphic_layout.setViewportUpdateMode(
        QGraphicsView.FullViewportUpdate)
    self.selected_chan.spectrum_plot.add_plot(self._graphic_layout)
    vbox.addWidget(self._graphic_layout)
    self._display_layout.addLayout(vbox)

    # Right column: sweep buttons, channel buttons and the control tabs.
    vbox = QVBoxLayout()
    vbox.setSpacing(0)
    vbox.setSizeConstraint(QLayout.SetMinimumSize)
    vbox.addLayout(self._create_sweepbtn_layout())
    vbox.addLayout(self._create_chanbtn_layout())

    self._tb = QTabWidget()
    self._tb.setSizePolicy(QSizePolicy.Minimum,
                           QSizePolicy.Preferred)
    self._tb.setTabPosition(QTabWidget.South)
    self._tb.currentChanged.connect(self.tab_changed)

    # Tabs are added in the order Freq/Ampl, Markers, Trace; this matches
    # the FREQAMPL_TAB, MARKER_TAB and TRACE_TAB indices.
    self._freqampl_ctl_frame = QFrame()
    freqampl_vbox = QVBoxLayout()
    # freqampl_vbox.setSpacing(10)
    freqampl_vbox.addLayout(self._create_freqctl_layout())
    freqampl_vbox.addLayout(self._create_amplctl_layout())
    freqampl_vbox.addStretch()
    self._freqampl_ctl_frame.setLayout(freqampl_vbox)
    self._tb.addTab(self._freqampl_ctl_frame, '&Freq/Ampl')

    self._marker_ctl_frame = QFrame()
    self._marker_ctl_frame.setLayout(self._create_marker_layout())
    self._tb.addTab(self._marker_ctl_frame, '&Markers')

    self._trace_ctl_frame = QFrame()
    self._trace_ctl_frame.setLayout(self._create_trace_layout())
    self._tb.addTab(self._trace_ctl_frame, 'Tr&ace')

    vbox.addWidget(self._tb)
    self._display_layout.addLayout(vbox)

    # Key events seen by the plot area are handled by this window.
    self._graphic_layout.sigKeyPress.connect(self.key_pressed)
    self._graphic_layout.sigKeyRelease.connect(self.key_released)

    return self._display_layout

@pyqtSlot(int)
def tab_changed(self, tab_idx: int) -> None:
    """Slot for the tab widget's ``currentChanged`` signal.

    Every branch is currently a no-op placeholder.
    """
    if tab_idx == AnalyzerApp.FREQAMPL_TAB:
        pass
    elif tab_idx == AnalyzerApp.MARKER_TAB:
        pass
    elif tab_idx == AnalyzerApp.TRACE_TAB:
        pass

def key_pressed(self, key_ev) -> None:
    """Handle a key press coming from the plot area.

    ``R`` and ``D`` click the reference-delta and delta-delta marker
    buttons respectively; every key press is then forwarded to the
    selected channel's spectrum plot.
    """
    if key_ev.key() == Qt.Key_R:
        self._delta_buttons[SpectrumMarker.REF_DELTA_TYPE].click()
    elif key_ev.key() == Qt.Key_D:
        self._delta_buttons[SpectrumMarker.DELTA_DELTA_TYPE].click()
    self.selected_chan.spectrum_plot.key_pressed(key_ev)

def key_released(self, key_ev) -> None:
    """Forward a key release to the selected channel's spectrum plot."""
    self.selected_chan.spectrum_plot.key_released(key_ev)

def create_analyzer_group(self) -> QGroupBox:
    """Wrap the display layout in an untitled group box and return it."""
    self._analyzer_group = QGroupBox("")
    self._analyzer_group.setSizePolicy(QSizePolicy.Minimum,
                                       QSizePolicy.Minimum)
    self._analyzer_group.setLayout(self.create_display_layout())
    return self._analyzer_group

def enable_controls(self, en) -> None:
    """Enable or disable the control tabs, sweep and channel buttons."""
    self._tb.setEnabled(en)
    self.enable_sweep_buttons(en)
    self.enable_chan_buttons(en)

<<sweep-controls>>

<<channel-controls>>

<<frequency-controls>>

<<amplitude-controls>>

<<marker-controls>>

<<trace-controls>>
sweep-controls
def _create_sweepbtn_layout(self):
    """Build the row of Start, Stop and Single sweep buttons.

    Returns:
        The grid layout holding the three buttons (also stored in
        ``self._btn_layout``).
    """
    self._btn_layout = QGridLayout()

    self._start_btn = QPushButton("&Start")
    self._btn_layout.addWidget(self._start_btn, 0, 0)
    self._start_btn.clicked.connect(self.start_btn_clicked)

    self._stop_btn = QPushButton("S&top")
    self._btn_layout.addWidget(self._stop_btn, 0, 1)
    self._stop_btn.clicked.connect(self.stop_btn_clicked)

    self._single_btn = QPushButton("Si&ngle")
    self._btn_layout.addWidget(self._single_btn, 0, 2)
    self._single_btn.clicked.connect(self.single_btn_clicked)

    return self._btn_layout


@pyqtSlot()
def on_adc_thread_started(self):
    """Slot: lock the channel buttons (unless split) while a sweep runs."""
    if not self._split_display:
        self.enable_chan_buttons(False)
    self.update_sweep_buttons()


@pyqtSlot()
def on_adc_thread_stopped(self):
    """Slot: restore the sweep and channel buttons after a sweep ends."""
    self.update_sweep_buttons()
    if not self._split_display:
        self.enable_chan_buttons(True)


def enable_sweep_buttons(self, en):
    """Enable or disable all three sweep buttons together."""
    self._start_btn.setEnabled(en)
    self._single_btn.setEnabled(en)
    self._stop_btn.setEnabled(en)


def update_sweep_buttons(self):
    """Set the sweep buttons from the selected channel's ADC thread state.

    Start and Single are enabled only while the thread is not running and
    Stop only while it is.  The running flag is read once so that all
    three buttons reflect the same state even if the flag changes while
    this method executes.
    """
    running = self.selected_chan.adc_thread.running
    self._start_btn.setEnabled(not running)
    self._single_btn.setEnabled(not running)
    self._stop_btn.setEnabled(running)


@pyqtSlot()
def start_btn_clicked(self):
    """Slot: start sweeping on the selected channel."""
    self.selected_chan.start()


@pyqtSlot()
def stop_btn_clicked(self):
    """Slot: stop sweeping on the selected channel."""
    self.selected_chan.stop()


@pyqtSlot()
def single_btn_clicked(self):
    """Slot: run a single sweep on the selected channel."""
    self.selected_chan.start(single_sweep=True)


@pyqtSlot(int)
def start_sweep_count(self, count):
    """Slot: start the selected channel with ``sweep_count=count``."""
    self.selected_chan.start(sweep_count=count)
channel-controls
def _create_chanbtn_layout(self):
    """Build the channel selection, split display and calibration buttons.

    One checkable button is created per entry of ``self.channels`` (so
    the buttons always map onto existing channels); the button ids are
    the channel positions used by ``select_channel``.  With two channels
    this yields the same 'CH &1' / 'CH &2' pair as before.

    Returns:
        The horizontal layout holding the buttons (also stored in
        ``self.chan_hbox``).
    """
    self.chan_hbox = QHBoxLayout()
    self.chan_grid = QGridLayout()
    self.chan_grid.setHorizontalSpacing(0)
    self._chan_buttons = []
    chan_grp = QButtonGroup(self)

    for idx in range(len(self.channels)):
        chan_btn = QPushButton(f'CH &{idx + 1}')
        chan_btn.setCheckable(True)
        chan_grp.addButton(chan_btn)
        chan_grp.setId(chan_btn, idx)
        self.chan_grid.addWidget(chan_btn, 0, idx)
        self._chan_buttons.append(chan_btn)

    self._chan_button_group = chan_grp
    chan_grp.buttonClicked.connect(self.select_channel)
    # The first channel is the one selected at start up.
    self._chan_buttons[0].setChecked(True)
    self.chan_hbox.addLayout(self.chan_grid)

    self.split_plot_btn = QPushButton('Sp&lit')
    self.split_plot_btn.setCheckable(True)
    self.split_plot_btn.toggled.connect(self.split_spectrum_display)
    self.split_plot_btn.setChecked(False)
    self.chan_hbox.addWidget(self.split_plot_btn)

    # Calibration menu.  The actions are bound to methods of the selected
    # channel and are re-bound by channel_select() on a channel change.
    self.cal_btn = QPushButton('Cal')
    self.cal_type_menu = QMenu()
    cal_type_group = QActionGroup(self)
    self.base_cal_action = QAction('Base Cal', cal_type_group,
                                   checkable=False)
    self.base_cal_action.triggered.connect(
        partial(self.selected_chan.calibrate, AdcThread.CAL_START))
    self.fe_cal_action = QAction('Freq. Response', cal_type_group,
                                 checkable=False)
    self.fe_cal_action.triggered.connect(
        self.selected_chan.freqresp_calibrate)
    self.noise_floor_action = QAction('Noise Floor', cal_type_group,
                                      checkable=False)
    self.noise_floor_action.triggered.connect(
        self.selected_chan.cal_noise_floor)
    self.load_cal_action = QAction('Load Cal. Data', cal_type_group,
                                   checkable=False)
    self.load_cal_action.triggered.connect(
        self.selected_chan.load_cal_data)
    self.save_cal_action = QAction('Save Cal. Data', cal_type_group,
                                   checkable=False)
    self.save_cal_action.triggered.connect(
        self.selected_chan.save_cal_data)
    self.cal_type_menu.addAction(self.fe_cal_action)
    self.cal_type_menu.addAction(self.noise_floor_action)
    self.cal_type_menu.addAction(self.base_cal_action)
    self.cal_type_menu.addSeparator()
    self.cal_type_menu.addAction(self.load_cal_action)
    self.cal_type_menu.addAction(self.save_cal_action)
    self.cal_btn.setMenu(self.cal_type_menu)
    self.chan_hbox.addWidget(self.cal_btn)

    return self.chan_hbox


def enable_chan_buttons(self, en):
    """Enable or disable the channel, split and calibration buttons."""
    for btn in self._chan_buttons:
        btn.setEnabled(en)
    self.split_plot_btn.setEnabled(en)
    self.cal_btn.setEnabled(en)


@pyqtSlot(QAbstractButton)
def select_channel(self, btn):
    """Slot: select the channel whose button was clicked.

    The button id is the position of the channel in ``self.channels``.
    """
    chan_btn_id = self._chan_button_group.id(btn)
    chan_id = [*self.channels][chan_btn_id]
    self.channel_select(chan_id)


def split_spectrum_display(self, is_checked):
    """Switch between the single plot and the one-plot-per-channel view.

    In split mode every channel gets its own plot row, the selected
    channel is highlighted and the waterfall display is unavailable.

    Args:
        is_checked: True to enter split mode, False to leave it.
    """
    if is_checked:
        self.selected_chan.remove_plots(self._graphic_layout)
        for row, chan in enumerate(self.channels.values()):
            chan.spectrum_plot.set_split_display(True)
            chan.spectrum_plot.add_split_plot(self._graphic_layout, row)
        self.selected_chan.spectrum_plot.viewbox.setBorder(
            color=SpectrumPlotWidget.SELECTED_BORDER_COLOR,
            width=SpectrumPlotWidget.SELECTED_BORDER_WIDTH)
        self.selected_chan.spectrum_plot.enable_mouse_events(True)
        self._show_waterfall_cb.setEnabled(False)
        if self.selected_chan.show_waterfall:
            self.hide_wf_histogram()
        self._split_display = True
    else:
        for chan in self.channels.values():
            chan.spectrum_plot.remove_split_plot(self._graphic_layout)
            chan.spectrum_plot.set_split_display(False)
        self.selected_chan.spectrum_plot.viewbox.setBorder(
            color=SpectrumPlotWidget.BORDER_COLOR,
            width=SpectrumPlotWidget.BORDER_WIDTH)
        self.selected_chan.add_plots(self._graphic_layout)
        self._show_waterfall_cb.setEnabled(True)
        if self.selected_chan.show_waterfall:
            self.show_wf_histogram()
        self._split_display = False
frequency-controls
class RbwSpinBox(QSpinBox):
    """Spin box for the resolution bandwidth (RBW), held as an int in Hz.

    Values are displayed with a ``Hz`` or ``kHz`` suffix and the step
    buttons move through ``ACCEPTED_RBW_VALUES`` instead of changing the
    value by a fixed increment.
    """

    # Accepted RBW values in Hz, in ascending order.
    ACCEPTED_RBW_VALUES = [
        1, 2, 5, 10, 20, 50, 100, 200, 500,
        1000, 2000, 5000, 10_000, 20_000, 50_000,
        100_000, 200_000, 400_000
    ]

    def __init__(self) -> None:
        super().__init__()
        self.setRange(RbwSpinBox.ACCEPTED_RBW_VALUES[0],
                      RbwSpinBox.ACCEPTED_RBW_VALUES[-1])

    def textFromValue(self, value: int) -> str:
        """Return the display text for ``value`` (in Hz).

        Whole multiples of 1000 Hz are shown in kHz.  Any other value is
        shown in Hz so that, for example, 1500 is displayed as
        ``1500 Hz`` rather than being truncated to ``1 kHz``.
        """
        if value < 1000 or value % 1000:
            return f"{value} Hz"
        return f"{value//1000} kHz"

    def valueFromText(self, text: str) -> int:
        """Parse ``"<number> <suffix>"`` text into a value in Hz.

        A bare number without a suffix is taken to be in kHz.
        """
        try:
            value, suffix = text.split()
        except ValueError:
            # Only one value in text.  Assume that it's kHz
            suffix = 'kHz'
            value = text
        if suffix == 'Hz':
            return int(value)
        else:
            return int(value) * 1000

    def stepBy(self, steps: int) -> None:
        """Move ``steps`` entries through the accepted RBW values.

        The current value may have been typed in and therefore need not be
        one of the accepted values.  In that case stepping up starts from
        the accepted value just below it and stepping down from the one
        just above it, so a single step always lands on the neighbouring
        accepted value (previously this raised ``ValueError``).  Stepping
        past either end of the list wraps around to the other end.
        """
        from bisect import bisect_left, bisect_right

        values = RbwSpinBox.ACCEPTED_RBW_VALUES
        current = self.value()
        if steps > 0:
            # Index of the largest accepted value <= current.
            index = bisect_right(values, current) - 1
        else:
            # Index of the smallest accepted value >= current.
            index = bisect_left(values, current)
        # For an accepted value both expressions give its own index.
        index = (index + steps) % len(values)
        self.setValue(values[index])
def _create_freqctl_layout(self) -> QVBoxLayout:
    """Build the frequency controls.

    The form holds, in row order: centre, start, stop, span, RBW (with
    its Auto check box) and the front end LO offset.  A centred
    "Full Span" button is placed below the form.

    Returns:
        The vertical layout holding the controls (also stored in
        ``self._freq_vbox``).
    """
    def freq_spinbox(decimals, on_edit_finished):
        # All frequency boxes are configured alike; only the number of
        # decimals and the editingFinished handler differ.
        box = QDoubleSpinBox()
        box.setDecimals(decimals)
        box.setSuffix(FREQ_SUFFIXES[0])
        box.lineEdit().editingFinished.connect(on_edit_finished)
        return box

    self._freq_vbox = QVBoxLayout()
    form = QFormLayout()
    form.setFormAlignment(Qt.AlignHCenter | Qt.AlignTop)
    form.setFieldGrowthPolicy(QFormLayout.FieldsStayAtSizeHint)
    self._freq_vbox.addLayout(form)

    self._fcentre_box = freq_spinbox(3, self.set_centre_freq)
    form.addRow(QLabel("Centre Freq:"), self._fcentre_box)

    self._fstart_box = freq_spinbox(3, self.set_start_freq)
    form.addRow(QLabel("Start Freq:"), self._fstart_box)

    self._fstop_box = freq_spinbox(3, self.set_stop_freq)
    form.addRow(QLabel("Stop Freq:"), self._fstop_box)

    self._fspan_box = freq_spinbox(4, self.set_freq_span)
    form.addRow(QLabel("Span:"), self._fspan_box)

    # RBW row.  The signals are connected before the initial values are
    # set, so the handlers run for those initial values as well.
    rbw_row = QHBoxLayout()
    self._rbw_box = RbwSpinBox()
    self._rbw_box.valueChanged.connect(self.set_rbw)
    self._rbw_box.setValue(RbwSpinBox.ACCEPTED_RBW_VALUES[-1])
    rbw_row.addWidget(self._rbw_box)
    rbw_row.addWidget(QLabel("Auto:"))
    self._auto_rbw_box = QCheckBox()
    self._auto_rbw_box.stateChanged.connect(self.set_auto_rbw)
    self._auto_rbw_box.setChecked(True)
    rbw_row.addWidget(self._auto_rbw_box)
    form.addRow(QLabel("RBW:"), rbw_row)

    self._fe_lo_box = freq_spinbox(3, self.set_felo_offset)
    self._fe_lo_label = QLabel("FE LO Offset:")
    form.addRow(self._fe_lo_label, self._fe_lo_box)
    self._fe_lo_box_displayed = True
    # QFormLayout rows start at 0 for the first row.
    self._fe_lo_box_row = 5
    self._freq_fbox = form

    self.update_freq_controls()

    span_row = QHBoxLayout()
    self._full_span_btn = QPushButton("Full Span")
    self._full_span_btn.clicked.connect(self.set_full_span)
    span_row.addStretch()
    span_row.addWidget(self._full_span_btn)
    span_row.addStretch()
    self._freq_vbox.addLayout(span_row)
    #self._freq_vbox.addStretch()

    return self._freq_vbox


def update_freq_controls(self):
    """Apply the selected channel's frequency limits and values."""
    chan = self.selected_chan
    self._fcentre_box.setRange(chan.min_freq, chan.max_freq)
    self._fstart_box.setRange(chan.min_freq, chan.max_freq)
    self._fstop_box.setRange(chan.min_freq, chan.max_freq)
    self._fspan_box.setRange(chan.min_span, chan.max_span)
    self.update_felo_offset_frequency()
    self._update_displayed_frequencies()


def update_rbw_controls(self):
    """Show the selected channel's RBW and auto RBW setting."""
    self._rbw_box.setValue(self.selected_chan.rbw)
    check_state = (Qt.CheckState.Checked
                   if self.selected_chan.auto_rbw is True
                   else Qt.CheckState.Unchecked)
    self._auto_rbw_box.setCheckState(check_state)


def update_felo_offset_frequency(self):
    """Show the FE LO offset row only for ``SINGLE`` type ADC channels.

    The row is inserted into or taken out of the form as needed;
    ``self._fe_lo_box_displayed`` records whether it is currently in.
    """
    wanted = (self.selected_chan.adc_chan_type
              == ChannelCapabilities.SINGLE)
    if wanted and self._fe_lo_box_displayed is False:
        self._fe_lo_box.show()
        self._fe_lo_label.show()
        self._freq_fbox.insertRow(self._fe_lo_box_row,
                                  self._fe_lo_label,
                                  self._fe_lo_box)
        self._fe_lo_box_displayed = True
    elif not wanted and self._fe_lo_box_displayed is True:
        self._freq_fbox.takeRow(self._fe_lo_box_row)
        self._fe_lo_box_displayed = False
        self._fe_lo_box.hide()
        self._fe_lo_label.hide()


def set_start_freq(self) -> None:
    """Parse the start frequency box text into the selected channel."""
    self.selected_chan.start_freq = parse_freq_spec(
        self._fstart_box.lineEdit().text())


def set_stop_freq(self) -> None:
    """Parse the stop frequency box text into the selected channel."""
    self.selected_chan.stop_freq = parse_freq_spec(
        self._fstop_box.lineEdit().text())


def set_centre_freq(self) -> None:
    """Parse the centre frequency box text into the selected channel."""
    self.selected_chan.centre_freq = parse_freq_spec(
        self._fcentre_box.lineEdit().text())


def set_freq_span(self) -> None:
    """Parse the span box text into the selected channel."""
    self.selected_chan.freq_span = parse_freq_spec(
        self._fspan_box.lineEdit().text())


def set_full_span(self) -> None:
    """Set the sweep to the channel's minimum..maximum frequency."""
    chan = self.selected_chan
    chan.start_freq = chan.min_freq
    chan.stop_freq = chan.max_freq


def set_rbw(self, value: int) -> None:
    """Store a new RBW value in the selected channel."""
    self.selected_chan.rbw = value


def set_nearest_accepted_rbw(self, rbw: float) -> None:
    """Set the RBW box to the accepted value closest to ``rbw``."""
    distances = np.abs(np.array(RbwSpinBox.ACCEPTED_RBW_VALUES) - rbw)
    nearest = distances.argmin()
    self._rbw_box.setValue(RbwSpinBox.ACCEPTED_RBW_VALUES[nearest])


def set_auto_rbw(self, state: int) -> None:
    """Handle a state change of the Auto RBW check box.

    Checked disables the RBW box and turns auto RBW on; unchecked does
    the opposite.  Any other state is ignored.
    """
    if state == Qt.CheckState.Checked:
        auto = True
    elif state == Qt.CheckState.Unchecked:
        auto = False
    else:
        return
    self._rbw_box.setEnabled(not auto)
    self.selected_chan.auto_rbw = auto


def set_felo_offset(self) -> None:
    """Store the number at the start of the FE LO offset box text."""
    text = self._fe_lo_box.lineEdit().text()
    self.selected_chan._felo_offset = float(text.split()[0])
amplitude-controls
def _create_amplctl_layout(self):
    """Build the amplitude control panel and return its top-level layout.

    The panel is a form holding one spin box per amplitude setting:
    reference level, ADC attenuation, front end (FE) attenuation, FE
    gain and amplitude scale.  Every spin box reports a completed edit
    through the ``editingFinished`` signal of its line edit.

    The form layout is kept in ``self._ampl_fbox`` and the row index of
    the FE attenuation control in ``self._fe_atten_box_row`` so that
    ``update_fe_attenuation`` can remove and re-insert that row.
    """
    def new_spin_box(decimals, suffix, on_edit):
        # Every amplitude spin box is constructed the same way.
        spin_box = QDoubleSpinBox()
        spin_box.setDecimals(decimals)
        spin_box.setSuffix(suffix)
        spin_box.lineEdit().editingFinished.connect(on_edit)
        return spin_box

    form = QFormLayout()
    form.setFormAlignment(Qt.AlignHCenter | Qt.AlignTop)
    form.setFieldGrowthPolicy(QFormLayout.FieldsStayAtSizeHint)
    self._ampl_vbox = QVBoxLayout()
    self._ampl_vbox.addLayout(form)

    self._reflevel_box = new_spin_box(1, REFLEVEL_SUFFIX, self.set_reflevel)
    form.addRow(QLabel("Ref. Level:"), self._reflevel_box)

    self._atten_box = new_spin_box(2, PWR_SUFFIX, self.set_atten)
    form.addRow(QLabel("ADC Attenuation:"), self._atten_box)

    self._fe_atten_box = new_spin_box(2, PWR_SUFFIX, self.set_fe_atten)
    self._fe_atten_label = QLabel("FE Attenuation:")
    self._fe_atten_box_displayed = True
    # QFormLayout rows start at 0 for the first row.
    self._fe_atten_box_row = 2
    form.addRow(self._fe_atten_label, self._fe_atten_box)

    # The FE gain spin box sits in its own horizontal layout.
    gain_row = QHBoxLayout()
    self._fe_gain_box = new_spin_box(1, PWR_SUFFIX, self.set_fe_gain)
    gain_row.addWidget(self._fe_gain_box)
    # self._load_fegain_btn = QPushButton("Load...")
    # self._load_fegain_btn.clicked.connect(self.load_fe_gain)
    # hbox.addWidget(self._load_fegain_btn)
    form.addRow(QLabel("FE Gain:"), gain_row)

    self._ampl_scale_box = new_spin_box(1, PWR_SUFFIX, self.set_ampl_scale)
    form.addRow(QLabel("Ampl. Scale:"), self._ampl_scale_box)

    self._ampl_fbox = form
    self._ampl_vbox.addStretch()
    # Load the widgets with the selected channel's current settings.
    self.update_ampl_controls()
    return self._ampl_vbox


def update_ampl_controls(self):
    """Refresh every amplitude widget from the selected channel."""
    chan = self.selected_chan

    ref_box = self._reflevel_box
    ref_box.setRange(chan.min_reflevel, chan.max_reflevel)
    ref_box.setValue(chan.reflevel)
    ref_box.setSingleStep(chan.reflevel_step)

    atten_box = self._atten_box
    atten_box.setRange(chan.min_atten, chan.max_atten)
    atten_box.setValue(chan.atten)
    atten_box.setSingleStep(chan.atten_step)

    self.update_fe_attenuation()

    scale_box = self._ampl_scale_box
    scale_box.setRange(chan.min_ampl_scale, chan.max_ampl_scale)
    scale_box.setValue(chan.ampl_scale)
    scale_box.setSingleStep(1.0)

    self._fe_gain_box.setValue(chan.fe_gain)


def update_fe_attenuation(self):
    """Show or hide the FE attenuation row to suit the selected channel.

    When the channel's ``fe_attenuation`` is ``None`` the row is taken
    out of the form and its widgets hidden.  Otherwise the row is
    re-inserted if necessary and the spin box range, value and step are
    loaded from the channel.
    """
    chan = self.selected_chan

    if chan.fe_attenuation is None:
        if self._fe_atten_box_displayed is True:
            # takeRow() only detaches the widgets from the layout, so
            # they are hidden explicitly afterwards.
            self._ampl_fbox.takeRow(self._fe_atten_box_row)
            self._fe_atten_box_displayed = False
            self._fe_atten_box.hide()
            self._fe_atten_label.hide()
        return

    if self._fe_atten_box_displayed is False:
        self._fe_atten_box.show()
        self._fe_atten_label.show()
        self._ampl_fbox.insertRow(
            self._fe_atten_box_row,
            self._fe_atten_label, self._fe_atten_box)
        self._fe_atten_box_displayed = True

    limits = chan.fe_attenuation
    self._fe_atten_box.setRange(limits.min, limits.max)
    self._fe_atten_box.setValue(chan.fe_atten)
    self._fe_atten_box.setSingleStep(chan.fe_attenuation.step)


def set_reflevel(self) -> None:
    """Set the channel reference level from the spin box text."""
    text = self._reflevel_box.lineEdit().text()
    # The text carries the unit suffix; the number is the first token.
    number = text.split()[0]
    self.selected_chan.reflevel = float(number)


def set_ampl_scale(self) -> None:
    """Set the channel amplitude scale from the spin box text."""
    text = self._ampl_scale_box.lineEdit().text()
    number = text.split()[0]
    self.selected_chan.ampl_scale = float(number)


def set_atten(self) -> None:
    """Set the channel ADC attenuation from the spin box text."""
    text = self._atten_box.lineEdit().text()
    number = text.split()[0]
    self.selected_chan.atten = float(number)


def set_fe_atten(self) -> None:
    """Set the channel front end attenuation from the spin box text."""
    text = self._fe_atten_box.lineEdit().text()
    number = text.split()[0]
    self.selected_chan.fe_atten = float(number)


def set_fe_gain(self) -> None:
    """Set the channel front end gain from the spin box text."""
    text = self._fe_gain_box.lineEdit().text()
    number = text.split()[0]
    self.selected_chan.fe_gain = float(number)

# def load_fe_gain(self) -> None:
#     pass
marker-controls
Figure 3: Activate a spectrum plot marker
Figure 4: Change a spectrum plot marker type
def _create_marker_layout(self) -> QVBoxLayout:
    """Build the marker control panel and return its top-level layout.

    The panel contains, top to bottom: the marker selection buttons
    (one per ``SpectrumPlotWidget.MKR_COUNT``), the marker type combo
    box, the Ref/Delta buttons, the peak search buttons, the
    "marker to" buttons and the marker function buttons.

    Signal connections for the exclusive button groups are made only
    after all widgets exist, and the initial selection is applied with
    ``setChecked`` so that no ``buttonClicked`` signal is emitted while
    the panel is being built.
    """
    self._mkr_vbox = QVBoxLayout()
    fbox = QFormLayout()
    fbox.setFormAlignment(Qt.AlignHCenter | Qt.AlignTop)
    fbox.setFieldGrowthPolicy(QFormLayout.FieldsStayAtSizeHint)
    fbox.setVerticalSpacing(5)
    self._mkr_vbox.addLayout(fbox)

    # Marker selection buttons, two per grid row.  Button ids are
    # zero based; the visible labels and shortcuts are one based.
    mkr_grid = QGridLayout()
    self._mkr_buttons = []
    mbg = QButtonGroup(self)
    for i in range(SpectrumPlotWidget.MKR_COUNT):
        mb = QPushButton(f'{i+1}')
        mb.setCheckable(True)
        mb.setShortcut(QKeySequence(self.tr(f"Ctrl+{i+1}")))
        mbg.addButton(mb)
        mbg.setId(mb, i)
        mkr_grid.addWidget(mb, i//2, i%2)
        self._mkr_buttons.append(mb)
    fbox.addRow(QLabel("Marker:"), mkr_grid)

    cb = QComboBox()
    cb.addItems(SpectrumMarker.MKR_TYPES)
    fbox.addRow(QLabel("Type:"), cb)
    self._mkr_type_box = cb
    self._mkr_button_group = mbg

    # Ref/Delta buttons; only enabled while a delta marker is selected
    # (see select_marker_type).
    self.md_grid = QGridLayout()
    self.md_grid.setHorizontalSpacing(0)
    self._delta_buttons = []
    mdg = QButtonGroup(self)
    for col, (lbl, delta_id) in enumerate(
            [('Ref', SpectrumMarker.REF_DELTA_TYPE),
             ('Delta', SpectrumMarker.DELTA_DELTA_TYPE)]):
        mdb = QPushButton(lbl)
        mdb.setCheckable(True)
        mdg.addButton(mdb)
        mdg.setId(mdb, delta_id)
        self.md_grid.addWidget(mdb, 0, col)
        self._delta_buttons.append(mdb)
    fbox.addRow(QLabel(""), self.md_grid)
    self._mkr_delta_group = mdg
    for b in self._delta_buttons:
        b.setEnabled(False)

    # Peak search buttons.
    peak_grid = QGridLayout()
    peak_grid.setHorizontalSpacing(0)
    peak_grid.setVerticalSpacing(0)
    for i, (lbl, slot_fn) in enumerate(
            [('&Peak', self.peak_search),
             ('Next', self.next_peak),
             ('Right', self.peak_right),
             ('Left', self.peak_left)]):
        btn = QPushButton(lbl)
        btn.clicked.connect(slot_fn)
        peak_grid.addWidget(btn, i//2, i%2)
    fbox.addRow(QLabel('Peak:'), peak_grid)

    # "Marker to" buttons; disabled until a marker type other than
    # "off" is selected (see select_marker_type).
    mkr_move_grid = QGridLayout()
    mkr_move_grid.setHorizontalSpacing(0)
    mkr_move_grid.setVerticalSpacing(0)
    self._mkr_move_buttons = []
    for i, (lbl, slot_fn) in enumerate(
            [('Mkr \u2192 &CF', self.marker_to_cf),
             ('Mkr \u2192 RL', self.marker_to_rl)]):
        btn = QPushButton(lbl)
        btn.clicked.connect(slot_fn)
        mkr_move_grid.addWidget(btn, i//2, i%2)
        self._mkr_move_buttons.append(btn)
    fbox.addRow(QLabel('Mkr \u2192:'), mkr_move_grid)
    for b in self._mkr_move_buttons:
        b.setEnabled(False)

    # Marker function buttons.
    self.mf_grid = QGridLayout()
    self.mf_grid.setHorizontalSpacing(0)
    self._mkr_fn_buttons = []
    mfg = QButtonGroup(self)
    for col, (lbl, fn_id) in enumerate(
            [('Off', SpectrumMarker.MKR_FN_OFF),
             ('Noise', SpectrumMarker.MKR_FN_NOISE)]):
        mfb = QPushButton(lbl)
        mfb.setCheckable(True)
        mfg.addButton(mfb)
        mfg.setId(mfb, fn_id)
        self.mf_grid.addWidget(mfb, 0, col)
        self._mkr_fn_buttons.append(mfb)
    fbox.addRow(QLabel("Function:"), self.mf_grid)
    self._mkr_fn_group = mfg

    mbg.buttonClicked.connect(self.select_marker)
    self._mkr_buttons[0].setChecked(True)
    self._mkr_type_box.currentIndexChanged.connect(
        self.select_marker_type)
    mdg.buttonClicked.connect(self.select_delta_marker)
    self._delta_buttons[0].setChecked(True)
    mfg.buttonClicked.connect(self.select_mkr_fn)
    self._mkr_fn_buttons[0].setChecked(True)

    self._mkr_vbox.addStretch()
    return self._mkr_vbox


def marker_type(self):
    """Return the type of the selected marker on the selected channel."""
    mkr = self.selected_chan.spectrum_plot.selected_mkr
    return mkr.marker_type


def marker_posn(self):
    """Return ``(freq, pwr)`` for the selected marker."""
    mkr = self.selected_chan.spectrum_plot.selected_mkr
    return (mkr.freq, mkr.pwr)


def marker_delta_posn(self):
    """Return ``(delta_freq, delta_pwr)`` for the selected marker.

    Both values are ``None`` unless the marker is a delta or span
    marker.
    """
    mkr = self.selected_chan.spectrum_plot.selected_mkr
    delta_pwr = delta_freq = None
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA,
                           SpectrumMarker.MKR_TYPE_SPAN]:
        delta_pwr = mkr.delta_pwr
        delta_freq = mkr.delta_freq
    return (delta_freq, delta_pwr)


def select_marker(self, btn):
    """Select the marker belonging to ``btn`` and show its type."""
    mkr_id = self._mkr_button_group.id(btn)
    self.selected_chan.spectrum_plot.select_marker(mkr_id)
    mkr = self.selected_chan.spectrum_plot.marker(mkr_id)
    self._mkr_type_box.setCurrentIndex(
        SpectrumMarker.MKR_TYPES.index(mkr.marker_type))


def select_marker_type(self, type_idx):
    """Apply the marker type at ``type_idx`` to the selected marker.

    The Ref/Delta buttons are enabled only for delta markers and the
    "marker to" buttons are enabled for every type except "off".
    """
    spectrum_plot = self.selected_chan.spectrum_plot
    spectrum_plot.select_marker_type(type_idx)
    new_marker_type = SpectrumMarker.MKR_TYPES[type_idx]

    is_delta = new_marker_type == SpectrumMarker.MKR_TYPE_DELTA
    for b in self._delta_buttons:
        b.setEnabled(is_delta)
    if is_delta:
        # Re-apply the plot's current Ref/Delta choice.
        self._delta_buttons[spectrum_plot.delta_mkr_type].click()

    is_active = new_marker_type != SpectrumMarker.MKR_TYPE_OFF
    for b in self._mkr_move_buttons:
        b.setEnabled(is_active)


def select_delta_marker(self, btn):
    """Forward the Ref/Delta choice belonging to ``btn`` to the plot."""
    mkr_id = self._mkr_delta_group.id(btn)
    self.selected_chan.spectrum_plot.select_delta_marker(mkr_id)


def select_mkr_fn(self, btn):
    """Forward the marker function belonging to ``btn`` to the plot."""
    mkr_id = self._mkr_fn_group.id(btn)
    self.selected_chan.spectrum_plot.select_mkr_fn(mkr_id)


def update_marker_controls(self):
    """Bring the marker buttons in line with the selected channel's plot."""
    spectrum_plot = self.selected_chan.spectrum_plot
    self._mkr_buttons[spectrum_plot.selected_mkr_id].click()
    self._delta_buttons[spectrum_plot.delta_mkr_type].click()
    self._mkr_fn_buttons[spectrum_plot.mkr_function].click()


@pyqtSlot(int, str)
def update_marker_type(self, mkr_id, mkr_type):
    """Select marker ``mkr_id`` (one based) and set its type.

    Requests naming a marker for which no button exists, or an unknown
    marker type, are ignored.
    """
    # Validate against the buttons that were actually created rather
    # than a fixed marker count.
    if not 1 <= mkr_id <= len(self._mkr_buttons):
        return
    if mkr_type not in SpectrumMarker.MKR_TYPES:
        return
    self._mkr_buttons[mkr_id-1].click()
    self._mkr_type_box.setCurrentIndex(
        SpectrumMarker.MKR_TYPES.index(mkr_type))


@pyqtSlot(int, int)
def update_marker_delta_type(self, mkr_id, delta_type):
    """Click the Ref or Delta button matching ``delta_type``.

    Requests naming a marker for which no button exists, or an unknown
    delta type, are ignored.
    """
    if not 1 <= mkr_id <= len(self._mkr_buttons):
        return
    if delta_type not in [SpectrumMarker.DELTA_DELTA_TYPE,
                          SpectrumMarker.REF_DELTA_TYPE]:
        return
    self._delta_buttons[delta_type].click()


@pyqtSlot(bool)
def peak_search(self, checked):
    """Run a peak search on the selected channel's spectrum plot."""
    self.selected_chan.spectrum_plot.peak_search(checked)


@pyqtSlot(bool)
def next_peak(self, checked):
    """Ask the selected channel's spectrum plot for the next peak."""
    self.selected_chan.spectrum_plot.next_peak(checked)


@pyqtSlot(bool)
def peak_right(self, checked):
    """Forward a "peak right" request to the selected channel's plot."""
    self.selected_chan.spectrum_plot.peak_right(checked)


@pyqtSlot(bool)
def peak_left(self, checked):
    """Forward a "peak left" request to the selected channel's plot."""
    self.selected_chan.spectrum_plot.peak_left(checked)


@pyqtSlot(bool)
def marker_to_cf(self, checked):
    """Set the channel centre frequency to the selected marker's frequency."""
    mkr = self.selected_chan.spectrum_plot.selected_mkr
    self.selected_chan.centre_freq = mkr.freq


@pyqtSlot(bool)
def marker_to_rl(self, checked):
    """Set the channel reference level to the selected marker's power."""
    mkr = self.selected_chan.spectrum_plot.selected_mkr
    self.selected_chan.reflevel = mkr.pwr
trace-controls
def _create_trace_layout(self) -> QVBoxLayout:
    """Build the trace control panel and return its top-level layout.

    The panel contains the detector type buttons, the trace selection
    buttons (one per ``SpectrumPlotWidget.TRACE_COUNT``), the trace
    type buttons, the trace average spin box and the noise floor,
    calibration and waterfall check boxes.  The waterfall histogram
    widget is created hidden; ``show_wf_histogram`` inserts it into the
    form when the waterfall display is switched on.
    """
    self._trace_vbox = QVBoxLayout()
    fbox = QFormLayout()
    fbox.setFormAlignment(Qt.AlignHCenter | Qt.AlignTop)
    fbox.setFieldGrowthPolicy(QFormLayout.FieldsStayAtSizeHint)
    self._trace_fbox = fbox
    fbox.setVerticalSpacing(5)
    self._trace_vbox.addLayout(fbox)

    # Detector type buttons.  The button id is the detector type
    # constant itself.
    detector_type_grid = QGridLayout()
    self._detector_type_buttons = []
    detector_type_grp = QButtonGroup(self)
    detector_type_grid.setHorizontalSpacing(0)
    detector_type_grid.setVerticalSpacing(0)
    for i, (lbl, detector_type) in enumerate(
            [('Normal', DETECTOR_TYPE_NORMAL),
             ('Sample', DETECTOR_TYPE_SAMPLE),
             ('Pos. Peak', DETECTOR_TYPE_POS),
             ('Neg. Peak', DETECTOR_TYPE_NEG)]):
        btn = QPushButton(lbl)
        btn.setCheckable(True)
        detector_type_grp.addButton(btn)
        detector_type_grp.setId(btn, detector_type)
        self._detector_type_buttons.append(btn)
        detector_type_grid.addWidget(btn, i//2, i%2)
    fbox.addRow(QLabel('Detector:'), detector_type_grid)
    detector_type_grp.buttonClicked.connect(self.select_detector_type)
    self._detector_type_buttons[0].setChecked(True)
    self._detector_type_group = detector_type_grp

    # Trace selection buttons.  Button ids are zero based; the labels
    # and the channel's ``selected_trace`` are one based.
    trace_grid = QGridLayout()
    self._trace_buttons = []
    trace_group = QButtonGroup(self)
    for i in range(SpectrumPlotWidget.TRACE_COUNT):
        tb = QPushButton(f'{i+1}')
        tb.setCheckable(True)
        trace_group.addButton(tb)
        trace_group.setId(tb, i)
        trace_grid.addWidget(tb, i//2, i%2)
        self._trace_buttons.append(tb)
    fbox.addRow(QLabel("Trace:"), trace_grid)
    trace_group.buttonClicked.connect(self.set_selected_trace)
    self._trace_buttons[0].setChecked(True)
    self._trace_group = trace_group

    # Trace type buttons.  The button id is the trace type constant.
    trace_type_grid = QGridLayout()
    self._trace_type_buttons = []
    trace_type_grp = QButtonGroup(self)
    trace_type_grid.setHorizontalSpacing(0)
    trace_type_grid.setVerticalSpacing(0)
    for i, (lbl, trace_type) in enumerate(
            [('Normal', TRACE_TYPE_NORMAL),
             ('Max Hold', TRACE_TYPE_MAX),
             ('Min Hold', TRACE_TYPE_MIN),
             ('Trace Avg', TRACE_TYPE_AVG),
             ('Blank', TRACE_TYPE_BLANK),
             ('Freeze', TRACE_TYPE_FREEZE)]):
        btn = QPushButton(lbl)
        btn.setCheckable(True)
        trace_type_grp.addButton(btn)
        trace_type_grp.setId(btn, trace_type)
        self._trace_type_buttons.append(btn)
        trace_type_grid.addWidget(btn, i//2, i%2)
    fbox.addRow(QLabel('Trace type:'), trace_type_grid)
    trace_type_grp.buttonClicked.connect(self.select_trace_type)
    self._trace_type_buttons[0].setChecked(True)
    self._trace_type_group = trace_type_grp

    self._trace_avg_box = QSpinBox()
    self._trace_avg_box.setRange(0, AnalyzerChannel.MAX_TRACE_AVG)
    self._trace_avg_box.lineEdit().editingFinished.connect(
        self.set_trace_avg)
    self._trace_avg_box.setValue(AnalyzerChannel.DEFAULT_TRACE_AVG)
    fbox.addRow(QLabel("Average:"), self._trace_avg_box)

    self._noise_floor_cb = QCheckBox("")
    self._noise_floor_cb.setChecked(False)
    self._noise_floor_cb.stateChanged.connect(self.set_display_noise_floor)
    fbox.addRow(QLabel("Noise Floor:"), self._noise_floor_cb)

    self._apply_cal_cb = QCheckBox("")
    self._apply_cal_cb.setChecked(True)
    self._apply_cal_cb.stateChanged.connect(self.set_apply_cal)
    fbox.addRow(QLabel("Apply Cal.:"), self._apply_cal_cb)

    self._show_waterfall_cb = QCheckBox("")
    self._show_waterfall_cb.setChecked(False)
    self._show_waterfall_cb.stateChanged.connect(self.set_waterfall_status)
    fbox.addRow(QLabel("Waterfall:"), self._show_waterfall_cb)
    # The waterfall check box is unavailable in split display mode.
    self._show_waterfall_cb.setEnabled(self._split_display is not True)

    # QFormLayout row count starts at 0
    self._wf_lut_box_row = 6
    self._wf_histogram = pg.HistogramLUTWidget()
    self._wf_histogram.gradient.loadPreset("flame")
    self._wf_histogram_label = QLabel("")
    self._wf_histogram.hide()
    self._wf_histogram_label.hide()

    self._trace_vbox.addStretch()
    return self._trace_vbox


def update_trace_controls(self):
    """Bring every trace widget in line with the selected channel."""
    chan = self.selected_chan
    self._detector_type_buttons[chan.detector_type].click()
    self._trace_type_buttons[chan.trace_type].click()
    self._trace_buttons[chan.selected_trace - 1].click()
    self._trace_avg_box.setValue(chan.trace_average)
    self._noise_floor_cb.setChecked(chan.show_noise_floor)
    self._apply_cal_cb.setChecked(chan.apply_freq_resp_correction)
    self._show_waterfall_cb.setChecked(chan.show_waterfall)


def show_wf_histogram(self):
    """Show the waterfall histogram and insert its row into the form."""
    self._wf_histogram.show()
    self._wf_histogram_label.show()
    self._trace_fbox.insertRow(
        self._wf_lut_box_row,
        self._wf_histogram_label, self._wf_histogram)


def hide_wf_histogram(self):
    """Take the waterfall histogram row out of the form and hide it."""
    self._trace_fbox.takeRow(self._wf_lut_box_row)
    self._wf_histogram.hide()
    self._wf_histogram_label.hide()


def select_detector_type(self, btn) -> None:
    """Set the channel detector type from the clicked button's id."""
    detector_type = self._detector_type_group.id(btn)
    self.selected_chan.detector_type = detector_type


@pyqtSlot(int)
def update_detector_type(self, detector_type: int) -> None:
    """Set the channel detector type and click the matching button."""
    self.selected_chan.detector_type = detector_type
    self._detector_type_buttons[self.selected_chan.detector_type].click()


@pyqtSlot(int, int)
def update_trace_type(self, trace_id: int, trace_type: int) -> None:
    """Set the channel trace type and click the matching button.

    ``trace_id`` is accepted to match the connected signal's signature
    but is not used here.
    """
    self.selected_chan.trace_type = trace_type
    self._trace_type_buttons[self.selected_chan.trace_type].click()


@pyqtSlot(int)
def update_selected_trace(self, trace_id: int):
    """Select trace ``trace_id`` (one based) and click its button."""
    self.selected_chan.selected_trace = trace_id
    self._trace_buttons[self.selected_chan.selected_trace - 1].click()


def set_selected_trace(self, btn) -> None:
    """Set the channel's selected trace from the clicked button."""
    # Button ids are zero based, trace numbers are one based.
    trace_id = self._trace_group.id(btn) + 1
    self.selected_chan.selected_trace = trace_id


def select_trace_type(self, btn) -> None:
    """Set the channel trace type from the clicked button's id."""
    trace_type = self._trace_type_group.id(btn)
    self.selected_chan.trace_type = trace_type


def set_trace_avg(self) -> None:
    """Set the channel trace average count from the spin box text.

    The count is stored as an ``int``: the value comes from an integer
    spin box and is handed back to ``QSpinBox.setValue`` by
    ``update_trace_controls``, which does not accept a float.
    """
    val = self._trace_avg_box.lineEdit().text()
    # Parse via float first so any text accepted before still parses.
    self.selected_chan.trace_average = int(float(val.split()[0]))


def set_display_noise_floor(self, state: int) -> None:
    """Store the noise floor check box state on the selected channel."""
    self.selected_chan.show_noise_floor = state


def set_apply_cal(self, state: int) -> None:
    """Store the "apply calibration" check box state on the channel."""
    self.selected_chan.apply_freq_resp_correction = state


def set_waterfall_status(self, state: int) -> None:
    """Switch the waterfall display on or off for the selected channel.

    Nothing happens when the channel already holds ``state``.  Otherwise
    the new state is stored and the histogram row is shown or hidden.
    """
    # NOTE(review): ``state`` is the check box state value, not a bool.
    # If the channel keeps ``show_waterfall`` as a plain bool this
    # comparison is also true for an already enabled waterfall --
    # confirm against the ``show_waterfall`` property setter.
    if self.selected_chan.show_waterfall != state:
        self.selected_chan.show_waterfall = state
        if state:
            self.show_wf_histogram()
        else:
            self.hide_wf_histogram()
AnalyzerChannel Class
from typing import ( Optional, Dict ) from time import sleep import json import numpy as np from PyQt5.QtCore import ( Qt, QObject, QThread, QWaitCondition, QMutex, pyqtSignal, pyqtSlot ) from PyQt5.QtWidgets import ( QMessageBox, QDialog, QFileDialog, QMainWindow, QComboBox, QVBoxLayout, QHBoxLayout, QLabel, QFormLayout, QLineEdit, QDialogButtonBox, QSpinBox ) from tam import ( BASEBAND_CAL, FREQRESP_CAL, NO_CAL, SIGGEN_MODELS, RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY, RP_ADC_MAX_SPAN, RP_ADC_MIN_SPAN, SIGGEN_LIMITS, SMHU58_GPIBID, DSG815_ID, DG4162_ID, InstrumentMgr, InstrumentInitializeException, UnknownInstrumentModelException, ChannelCapabilities ) from utils import ( FREQ_SUFFIXES, PWR_SUFFIX, REFLEVEL_SUFFIX, TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG, TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE, DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG ) from fe_client import ( FrontEndClient, BaseFrontEndApp ) from data_store import DataStore from adccal import AdcCalibration from adc import AdcThread from plot import SpectrumPlotWidget, SpectrumMarker from waterfall import WaterfallPlotWidget <<calibration-dialog>> <<noise-floor-calibrator>> <<freq-response-calibrator>> <<rbw-spin-box>> class AnalyzerChannel(QObject): DEFAULT_ADC_ATTEN = 0.0 DEFAULT_DETECTOR_TYPE = DETECTOR_TYPE_NORMAL DEFAULT_TRACE = 1 DEFAULT_TRACE_TYPE = TRACE_TYPE_NORMAL DEFAULT_TRACE_AVG = 100 MAX_TRACE_AVG = 1000 centre_freq_changed = pyqtSignal(float) start_freq_changed = pyqtSignal(float) stop_freq_changed = pyqtSignal(float) freq_span_changed = pyqtSignal(float) auto_rbw_changed = pyqtSignal(bool) rbw_changed = pyqtSignal(float) ref_level_changed = pyqtSignal(float) ampl_scale_changed = pyqtSignal(float) atten_changed = pyqtSignal(float) fe_atten_changed = pyqtSignal(float) fe_gain_changed = pyqtSignal(float) felo_offset_changed = pyqtSignal(float) invoke_single_sweep = pyqtSignal() invoke_sweep_count = pyqtSignal(int) modify_marker_state = 
pyqtSignal(int, str) modify_marker_delta_type = pyqtSignal(int, int) modify_detector_state = pyqtSignal(int) modify_trace_state = pyqtSignal(int, int) move_marker = pyqtSignal(int) selected_trace_changed = pyqtSignal(int) trace_avg_changed = pyqtSignal(int) invoke_peak_search = pyqtSignal(bool) invoke_next_peak = pyqtSignal(bool) spectrum_plot_updated = pyqtSignal() def __init__(self, fe_chan_id: str, app, frontend: FrontEndClient, adc_device): super().__init__() self._fe_chan_id: str = fe_chan_id self._app = app self._frontend: FrontEndClient = frontend self.capabilities = frontend.capabilities[fe_chan_id] self._adc_device = adc_device self.adc_thread = None self._min_plot_width: int = self.app.MIN_PLOT_WIDTH self._min_plot_height: int = self.app.MIN_PLOT_HEIGHT self._min_ampl_scale: float = 1.0 self._max_ampl_scale: float = 20.0 self._detector_type: int = AnalyzerChannel.DEFAULT_DETECTOR_TYPE self._trace_avg: int = AnalyzerChannel.DEFAULT_TRACE_AVG self._trace_type: int = AnalyzerChannel.DEFAULT_TRACE_TYPE self._selected_trace: int = AnalyzerChannel.DEFAULT_TRACE self._is_selected = False self.is_calibrating = False self._show_noise_floor = False self._show_waterfall = False self.initialize_frequency_params() self.initialize_amplitude_params() self.updatePlotMutex = QMutex() self.plotUpdatedCond = QWaitCondition() self._final_sweep = True self.spectrum_plot_updated.connect(self.plot_updated) self.spectrum_plot: SpectrumPlotWidget = SpectrumPlotWidget(app, self) self.move_marker.connect(self.move_selected_marker) self.waterfall_plot: WaterfallPlotWidget = WaterfallPlotWidget(app, self) self.create_adc_thread() @property def fe_chan_id(self) -> str: return self._fe_chan_id @property def fe(self) -> FrontEndClient: return self._frontend @property def fe_attenuation(self): return self.capabilities.fe_attenuation @property def app(self): return self._app @property def adc_chan_id(self) -> int: return self.fe.capabilities[self.fe_chan_id].adcdma_channel @property def 
chan_capabilities(self) -> Dict: return self.fe.capabilities[self.fe_chan_id] @property def adc_chan_type(self) -> int: return self.fe.capabilities[self.fe_chan_id].chan_type @property def chan_cal_data(self): if self.adc_thread is not None: return self.adc_thread.cal_data._cal_config[self.adc_chan_id] else: return None @property def is_selected(self) -> bool: return self._is_selected def create_adc_thread(self) -> None: if self.adc_thread is not None: self.adc_thread.stop() self.data_store = DataStore(averages=AnalyzerChannel.DEFAULT_TRACE_AVG) self.data_store.selected_trace_type = AnalyzerChannel.DEFAULT_TRACE_TYPE self.data_store.data_updated.connect( self.spectrum_plot.update_plot) self.data_store.data_updated.connect( self.waterfall_plot.update_plot) self._sweep_data_reset_required = False self.adc_thread = AdcThread(self._adc_device, self) self.adc_thread.adc_thread_started.connect( self.app.on_adc_thread_started) self.adc_thread.adc_thread_stopped.connect( self.app.on_adc_thread_stopped) self.adc_thread.cal_state_changed.connect(self.calibrate) self.adc_thread.sweep_started.connect(self.sweep_started) self.adc_thread.enable_sweep_progress.connect(self.enable_sweep_progress) self.adc_thread.show_sweep_progress.connect(self.set_sweep_progress) def start(self, bin_width: float=0.0, sweep_count=0, single_sweep: bool=False) -> None: if self._sweep_data_reset_required: self.data_store.reset_trace() self._sweep_data_reset_required = False self.adc_thread.set_sweep_parameters( self.start_freq, self.stop_freq, scaling='spectrum', single=single_sweep, sweep_count=sweep_count) self.adc_thread.start() def stop(self) -> None: self.spectrum_plot.set_sweep_progress(0.0) if self.adc_thread and self.adc_thread.running is True: self.adc_thread.stop() @property def running(self) -> bool: if self.adc_thread: return self.adc_thread.running else: return False def close(self) -> None: if self.adc_thread is not None: self.adc_thread.close() self.adc_thread = None def select(self) 
-> None: """Select the current channel as being active.""" self.centre_freq_changed.connect(self.app._update_centre_freq) self.start_freq_changed.connect(self.app._update_start_freq) self.stop_freq_changed.connect(self.app._update_stop_freq) self.freq_span_changed.connect(self.app._update_freq_span) self.rbw_changed.connect(self.app._update_rbw) self.auto_rbw_changed.connect(self.app._update_auto_rbw) self.ref_level_changed.connect(self.app._update_ref_level) self.ampl_scale_changed.connect(self.app._update_ampl_scale) self.atten_changed.connect(self.app._update_atten) self.fe_atten_changed.connect(self.app._update_fe_atten) self.fe_gain_changed.connect(self.app._update_fe_gain) self.felo_offset_changed.connect(self.app._update_felo_offset) self.trace_avg_changed.connect(self.app._update_trace_average) self.invoke_single_sweep.connect(self.app.single_btn_clicked) self.invoke_sweep_count.connect(self.app.start_sweep_count) self.modify_marker_state.connect(self.app.update_marker_type) self.modify_marker_delta_type.connect(self.app.update_marker_delta_type) self.modify_detector_state.connect(self.app.update_detector_type) self.modify_trace_state.connect(self.app.update_trace_type) self.selected_trace_changed.connect(self.app.update_selected_trace) self.invoke_peak_search.connect(self.app.peak_search) self.invoke_next_peak.connect(self.app.next_peak) self._is_selected = True def deselect(self) -> None: """Make the current channel inactive""" self._is_selected = False self.centre_freq_changed.disconnect(self.app._update_centre_freq) self.start_freq_changed.disconnect(self.app._update_start_freq) self.stop_freq_changed.disconnect(self.app._update_stop_freq) self.freq_span_changed.disconnect(self.app._update_freq_span) self.rbw_changed.disconnect(self.app._update_rbw) self.auto_rbw_changed.disconnect(self.app._update_auto_rbw) self.ref_level_changed.disconnect(self.app._update_ref_level) self.ampl_scale_changed.disconnect(self.app._update_ampl_scale) 
self.atten_changed.disconnect(self.app._update_atten) self.fe_atten_changed.disconnect(self.app._update_fe_atten) self.fe_gain_changed.disconnect(self.app._update_fe_gain) self.felo_offset_changed.disconnect(self.app._update_felo_offset) self.trace_avg_changed.disconnect(self.app._update_trace_average) self.invoke_sweep_count.disconnect(self.app.start_sweep_count) self.invoke_single_sweep.disconnect(self.app.single_btn_clicked) self.selected_trace_changed.disconnect(self.app.update_selected_trace) self.modify_trace_state.disconnect(self.app.update_trace_type) self.modify_detector_state.disconnect(self.app.update_detector_type) self.modify_marker_state.disconnect(self.app.update_marker_type) self.modify_marker_delta_type.disconnect(self.app.update_marker_delta_type) self.invoke_peak_search.disconnect(self.app.peak_search) self.invoke_next_peak.disconnect(self.app.next_peak) def initialize_frequency_params(self) -> None: self._centre_freq: float = ( (self.max_freq - self.min_freq) / 2) + self.min_freq self._start_freq: float = self._centre_freq - (self.max_span / 2) self._stop_freq: float = self._centre_freq + (self.max_span / 2) self._freq_span: float = self.fe.freq_window(self.fe_chan_id) self._auto_rbw: bool = True self._rbw: float = RbwSpinBox.ACCEPTED_RBW_VALUES[-1] self.update_felo_offset() def initialize_amplitude_params(self): self._reflevel: float = 0.0 self._atten: float = AnalyzerChannel.DEFAULT_ADC_ATTEN self._adc_device.set_channel_attenuation(self.adc_chan_id, self._atten) self._fe_atten: float = 0.0 self._fe_gain: float = 0.0 if self.fe_attenuation is not None: self._fe_atten: float = self._frontend.fe_atten(self.fe_chan_id) self._auto_att: bool = False self._ampl_scale: float = 10.0 def add_plots(self, layout): self.spectrum_plot.add_plot(layout) if self.show_waterfall: self.waterfall_plot.add_plot(layout) def remove_plots(self, layout): self.spectrum_plot.remove_plot(layout) if self.show_waterfall: self.waterfall_plot.remove_plot(layout) 
@pyqtSlot(int) def move_selected_marker(self, idx): if self.spectrum_plot.selected_mkr is not None: self.spectrum_plot.move_current_marker(idx) @pyqtSlot() def plot_updated(self): if self._final_sweep is True: self.plotUpdatedCond.wakeAll() @pyqtSlot(int, int) def sweep_started(self, reqd_sweeps: int, sweep_count: int) -> None: self._final_sweep = True if reqd_sweeps > 0: if (sweep_count + 1) == reqd_sweeps: self._final_sweep = True else: self._final_sweep = False @pyqtSlot(int) def enable_sweep_progress(self, enable: int): self.spectrum_plot.enable_sweep_progress(enable) @pyqtSlot(float) def set_sweep_progress(self, progress: float): self.spectrum_plot.set_sweep_progress(progress) def update_freq_values(self, start: Optional[float]=None, stop: Optional[float]=None, centre: Optional[float]=None, span: Optional[float]=None) -> None: """Keeps the start, stop, centre and span values consistent. """ if start is not None: stop = self.stop_freq if start < self.min_freq: start = self.min_freq elif start > self.max_freq - self.min_span: start = self.max_freq - self.min_span span = stop - start if span < self.min_span: span = self.min_span if span > self.max_span: span = self.max_span stop = start + span centre = start + (span/2) elif stop is not None: start = self.start_freq if stop < self.min_freq + self.min_span: stop = self.min_freq + self.min_span elif stop > self.max_freq: stop = self.max_freq span = stop - start if span < self.min_span: span = self.min_span if span > self.max_span: span = self.max_span start = stop - span centre = start + (span/2) elif centre is not None: # Keep the span constant if possible start = self.start_freq stop = self.stop_freq span = self.freq_span if centre < self.min_freq + self.min_span/2: centre = self.min_freq + self.min_span/2 elif centre > self.max_freq - (self.min_span/2): centre = self.max_freq - (self.min_span/2) stop = centre + (span/2) start = centre - (span/2) if stop > self.max_freq: stop = self.max_freq span = 2 * (stop - 
centre) start = centre - (span/2) if start < self.min_freq: start = self.min_freq span = 2 * (centre - start) stop = centre + (span/2) elif span is not None: # Keep the centre freq. constant if possible centre = self.centre_freq if span < self.min_span: span = self.min_span elif span > self.max_span: span = self.max_span stop = centre + (span/2) start = centre - (span/2) if stop > self.max_freq: stop = self.max_freq span = stop - start centre = start + (span/2) start = stop - span if start < self.min_freq: start = self.min_freq span = stop - start centre = start + (span/2) else: return # Reset the data store only if the sweep parameters are going # to change. Reset should be carried out just prior to # capturing the next set of sweep data. if ((start != self._start_freq) or (stop != self._stop_freq) or (centre != self.centre_freq) or (span != self._freq_span)): self._sweep_data_reset_required = True self._start_freq = start self._stop_freq = stop self._centre_freq = centre self._freq_span = span self.update_felo_offset() if self._sweep_data_reset_required: self.adc_thread.data_store_reset(start, stop) self._sweep_data_reset_required = False def update_rbw(self, rbw: float) -> None: self.adc_thread.data_store_reset(self._start_freq, self._stop_freq) if self.is_selected is True: self.app.set_nearest_accepted_rbw(rbw) def update_felo_offset(self): adc_centre_freq = (RP_ADC_MIN_FREQUENCY + ((RP_ADC_MAX_FREQUENCY - RP_ADC_MIN_FREQUENCY)/2)) self._min_felo_offset = 0.0 if self._freq_span > RP_ADC_MAX_SPAN: self._max_felo_offset = (adc_centre_freq - (self.fe.freq_window(self.fe_chan_id)/2)) self._felo_offset = self._min_felo_offset else: self._max_felo_offset = adc_centre_freq - (self._freq_span/2) self._felo_offset = self._max_felo_offset def update_displayed_felo_offset(self, w) -> None: if self.adc_chan_type == ChannelCapabilities.SINGLE: w.setEnabled(True) w.setRange(self.min_felo_offset, self.max_felo_offset) w.setValue(self.felo_offset) def 
update_displayed_fspan(self, w, f: float) -> None: if f < 0.001: f_disp = f * 1e6 f_suffix = FREQ_SUFFIXES[2] w.setRange(self.min_span * 1e6, self.max_freq * 1e6) elif f < 1.0: f_disp = f * 1e3 f_suffix = FREQ_SUFFIXES[1] w.setRange(self.min_span * 1e3, self.max_freq * 1e3) else: f_disp = f f_suffix = FREQ_SUFFIXES[0] w.setRange(self.min_span, self.max_freq) w.setValue(f_disp) w.setSuffix(f_suffix) def update_displayed_freq(self, w, f: float) -> None: if f < 0.001: f_disp = f * 1e6 f_suffix = FREQ_SUFFIXES[2] w.setRange(self.min_freq * 1e6, self.max_freq * 1e6) elif f < 1.0: f_disp = f * 1e3 f_suffix = FREQ_SUFFIXES[1] w.setRange(self.min_freq * 1e3, self.max_freq * 1e3) else: f_disp = f f_suffix = FREQ_SUFFIXES[0] w.setRange(self.min_freq, self.max_freq) w.setValue(f_disp) w.setSuffix(f_suffix) <<chan-noise-floor-cal-methods>> <<chan-freq-resp-cal-methods>> <<load-save-cal-data>> @pyqtSlot(int) def calibrate(self, cal_state: int) -> None: print(f'calibrate: {cal_state=}') self.stop() self.adc_thread.cal_state = cal_state if cal_state == AdcThread.CAL_START: self.show_calibrating_msg() self.app.enable_controls(False) print(' CAL_START') self.start() elif cal_state == AdcThread.CALIBRATING: # Display dialog for start of calibration for the channel calibration_dialog = CalibrationDialog(self.app) result = calibration_dialog.exec_() if result == QDialog.Accepted: self.adc_thread.cal_device = calibration_dialog.calibration_device self.adc_thread.cal_device_addr = calibration_dialog.calibration_device_addr self.start() else: self.adc_thread.cal_state = AdcThread.CAL_COMPLETE self.remove_calibrating_msg() self.app.enable_controls(True) else: print(' CAL_COMPLETE') # Do a single sweep self.remove_calibrating_msg() self.start(single_sweep=True) self.app.enable_controls(True) def show_calibrating_msg(self): self.is_calibrating = True self.spectrum_plot.update_calibrating_msg() def remove_calibrating_msg(self): self.is_calibrating = False 
self.spectrum_plot.update_calibrating_msg() @property def start_freq(self) -> float: return self._start_freq @start_freq.setter def start_freq(self, f: float) -> None: if self._start_freq != f: self.start_freq_changed.emit(f) @property def stop_freq(self) -> float: return self._stop_freq @stop_freq.setter def stop_freq(self, f: float) -> None: if self._stop_freq != f: self.stop_freq_changed.emit(f) @property def centre_freq(self) -> float: return self._centre_freq @centre_freq.setter def centre_freq(self, f: float) -> None: if self._centre_freq != f: self.centre_freq_changed.emit(f) @property def freq_span(self) -> float: return self._freq_span @freq_span.setter def freq_span(self, f: float) -> None: if self._freq_span != f: self.freq_span_changed.emit(f) @property def rbw(self) -> float: return self._rbw @rbw.setter def rbw(self, value: float) -> None: if self._rbw != value: self._rbw = value self.rbw_changed.emit(value) @property def auto_rbw(self) -> bool: return self._auto_rbw @auto_rbw.setter def auto_rbw(self, state: bool) -> None: if self._auto_rbw != state: self._auto_rbw = state self.auto_rbw_changed.emit(state) @property def reflevel(self) -> float: return self._reflevel @reflevel.setter def reflevel(self, p: float) -> None: if self._reflevel != p: self._reflevel = p self.ref_level_changed.emit(p) @property def ampl_scale(self) -> float: return self._ampl_scale @ampl_scale.setter def ampl_scale(self, s: float) -> None: if self._ampl_scale != s: self._ampl_scale = s self.ampl_scale_changed.emit(s) @property def atten(self) -> float: return self._atten @atten.setter def atten(self, p: float) -> None: if self._atten != p: self._atten = p self.atten_changed.emit(p) @property def fe_atten(self) -> float: return self._fe_atten @fe_atten.setter def fe_atten(self, p: float) -> None: if self._fe_atten != p: self._fe_atten = p self.fe_atten_changed.emit(p) @property def fe_gain(self) -> float: return self._fe_gain @fe_gain.setter def fe_gain(self, p: float) -> 
None: if self._fe_gain != p: self._fe_gain = p self.fe_gain_changed.emit(p) @property def felo_offset(self) -> float: return self._felo_offset @felo_offset.setter def felo_offset(self, offset: float) -> None: if self._felo_offset != offset: self._felo_offset = offset self.felo_offset_changed.emit(offset) @property def min_felo_offset(self) -> float: return self._min_felo_offset @property def max_felo_offset(self) -> float: return self._max_felo_offset @property def detector_type(self): return self._detector_type @detector_type.setter def detector_type(self, t): self._detector_type = t @property def trace_type(self): return self.data_store.selected_trace_type @trace_type.setter def trace_type(self, t): self._trace_type = t self.data_store.selected_trace_type = t @property def selected_trace(self) -> int: return self._selected_trace @selected_trace.setter def selected_trace(self, trace_id: int) -> None: self._selected_trace = trace_id self.data_store.selected_trace_id = trace_id self.app.update_trace_type(trace_id, self.data_store.selected_trace_type) @property def trace_average(self): return self._trace_avg @trace_average.setter def trace_average(self, avg): if self._trace_avg != avg: self._trace_avg = avg self.data_store.averages = avg self.trace_avg_changed.emit(avg) @property def min_freq(self) -> float: if self.fe.calibration_mode == NO_CAL: return self.capabilities.freq.min else: return self.capabilities.cal_freq.min @property def max_freq(self) -> float: if self.fe.calibration_mode == NO_CAL: return self.capabilities.freq.max else: return self.capabilities.cal_freq.max @property def min_span(self) -> float: return self.capabilities.min_span @property def max_span(self) -> float: #return self.fe.freq_window(self.fe_chan_id) return self.max_freq - self.min_freq @property def min_reflevel(self) -> float: return self.capabilities.reflevel.min @property def max_reflevel(self) -> float: return self.capabilities.reflevel.max @property def reflevel_step(self) -> 
float: return self.capabilities.reflevel.step @property def min_atten(self) -> float: return self.capabilities.adc_attenuation.min @property def max_atten(self) -> float: return self.capabilities.adc_attenuation.max @property def atten_step(self) -> float: return self.capabilities.adc_attenuation.step @property def min_ampl_scale(self) -> float: return self._min_ampl_scale @property def max_ampl_scale(self) -> float: return self._max_ampl_scale @property def show_noise_floor(self) -> bool: return self._show_noise_floor @show_noise_floor.setter def show_noise_floor(self, state) -> None: if self._show_noise_floor != state: self._show_noise_floor = state if state: self.spectrum_plot.add_noise_floor_line() else: self.spectrum_plot.remove_noise_floor_line() @property def show_waterfall(self) -> bool: return self._show_waterfall @show_waterfall.setter def show_waterfall(self, state) -> None: if self._show_waterfall != state: self._show_waterfall = state if state: self.waterfall_plot.add_plot(self.app._graphic_layout) self.spectrum_plot.set_waterfall_display(True) else: self.spectrum_plot.set_waterfall_display(False) self.waterfall_plot.remove_plot(self.app._graphic_layout) @property def constrained_rbw(self): rbw = self.rbw if rbw > RbwSpinBox.ACCEPTED_RBW_VALUES[-1]: rbw = RbwSpinBox.ACCEPTED_RBW_VALUES[-1] if rbw < RbwSpinBox.ACCEPTED_RBW_VALUES[0]: rbw = RbwSpinBox.ACCEPTED_RBW_VALUES[0] return rbw @property def noise_floor(self) -> float: return (float(self.fe.noise_floor(self.fe_chan_id, self.constrained_rbw)) + self.fe_atten + self.atten - self.fe_gain) @property def noise_stddev(self) -> float: return (self.noise_floor + self.fe.noise_stddev(self.fe_chan_id, self.constrained_rbw)) @property def apply_freq_resp_correction(self) -> bool: return self.adc_thread.apply_freq_resp_corrections @apply_freq_resp_correction.setter def apply_freq_resp_correction(self, state: bool) -> None: if state != self.adc_thread.apply_freq_resp_corrections: 
self.adc_thread.apply_freq_resp_corrections = True if state else False self.spectrum_plot.update_nocorrection_msg()
SpectrumPlotWidget Class
from typing import ( Dict, List, Tuple ) import sys from math import log10 from datetime import datetime import numpy as np from scipy import signal from PyQt5.QtCore import ( Qt, QObject, QTimer, QPointF, pyqtSlot ) from PyQt5.QtGui import ( QFont, QFontMetrics ) import pyqtgraph as pg import time from utils import ( format_freq, TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG, TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE, DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG ) <<spectrum-marker>> class SpectrumPlotWidget(QObject): """Main spectrum plot""" PLOT_LINE_WIDTH = 1 MACOS_PLOT_LINE_WIDTH = 2 MKR_COUNT = 4 TRACE_COUNT = 4 PLOT_COLORS = [(52,138,189), (183,67,49), (142,186,66), (251,193,94), (152,142,213), (255,181,184), (119,119,119)] MKR_COLORS = [(255,153,153), (255,204,153), (255,255,153), (204,255,153), (153,255,153), (153,255,204), (153,255,255), (153,204,255), (153,153,255), (204,153,255), (255,153,255), (255,153,204), (224,224,224)] LIGHT_BACKGROUND_COLOR = 1.0 DARK_BACKGROUND_COLOR = 0.2 BORDER_COLOR = 0.9 BORDER_WIDTH = 1 SELECTED_BORDER_COLOR = 0.5 SELECTED_BORDER_WIDTH = 2 SWEEP_PROGRESS_COLOR = "#F006" SWEEP_PROGRESS_WIDTH = 5 TITLE_FONT_SIZE = 20 DEFAULT_FONT_SIZE = 9 SPLIT_FONT_SIZE = 7 RESIZE_EVENT_TIMEOUT = 400 default_annotation_font = QFont('Arial', pointSize=DEFAULT_FONT_SIZE) split_annotation_font = QFont('Arial', pointSize=SPLIT_FONT_SIZE) PEAK_THRESHOLD = 10.0 PEAK_WIDTH = 1.0 # Separation between lines in the plot annotations. 
# These are in units of 1/10 of the plot y-scale DEFAULT_LINE_SEP = 0.3 SPLIT_LINE_SEP = 0.4 HIGH_PREC_SPAN = 0.05 def __init__(self, app, chan): super().__init__() self._app = app self._chan = chan self.peaks = [] self._mkr_table = [] self._selected_mkr = None self._selected_mkr_id = 0 self._line_sep = SpectrumPlotWidget.DEFAULT_LINE_SEP self.data = None self.main_curve = True self.curve_colors = [pg.mkColor(SpectrumPlotWidget.PLOT_COLORS[i]) for i in range(SpectrumPlotWidget.TRACE_COUNT)] self._annotation_font = SpectrumPlotWidget.default_annotation_font self._annotation_font_metrics = QFontMetrics(self._annotation_font) self.create_plot() @property def app(self): return self._app @property def chan(self): return self._chan @property def scene(self): return self._plot.scene() @property def viewbox(self): return self._plot.getViewBox() @property def selected_mkr(self): return self._selected_mkr @selected_mkr.setter def selected_mkr(self, m) -> None: self._selected_mkr = m @property def annotation_font(self): return self._annotation_font @property def annotation_line_spacing(self): return self._annotation_font_metrics.lineSpacing() def create_plot(self): self._plot = pg.PlotItem() self.viewbox.setMenuEnabled(False) self.viewbox.setBackgroundColor(SpectrumPlotWidget.DARK_BACKGROUND_COLOR) self._plot.setMouseEnabled(x=False, y=False) self._plot.showAxis('right', True) self._plot.getAxis('right').setStyle(showValues=False) self._plot.showAxis('top', True) self._plot.getAxis('top').setStyle(showValues=False) self._plot.showGrid(True, True, alpha=0.15) #self._plot.setLogMode(x=True) x_axis = self._plot.getAxis('bottom') x_axis.setStyle(showValues=False) self.ref_text = pg.TextItem() self.rbw_text = pg.TextItem() self.fe_lo_text = pg.TextItem() self.atten_text = pg.TextItem() self.fe_atten_text = pg.TextItem() self.startf_text = pg.TextItem() self.fspan_text = pg.TextItem() self.stopf_text = pg.TextItem() self.mkr_pwr_text = pg.TextItem() self.mkr_freq_text = 
pg.TextItem() self.delta_pwr_text = pg.TextItem() self.delta_freq_text = pg.TextItem() self.cal_msg_text = pg.TextItem() self.no_correct_msg_text = pg.TextItem() self.trace_avg_text = pg.TextItem() self.channel_text = pg.TextItem() self.sweep_progress = pg.GraphItem() self._sweep_progress = 0.0 self.channel_text.setFont(QFont('Arial', SpectrumPlotWidget.TITLE_FONT_SIZE)) self.set_annotation_font(SpectrumPlotWidget.default_annotation_font) self._plot.addItem(self.ref_text) self._plot.addItem(self.rbw_text) self._plot.addItem(self.fe_lo_text) self._plot.addItem(self.atten_text) self._plot.addItem(self.fe_atten_text) self._plot.addItem(self.startf_text) self._plot.addItem(self.fspan_text) self._plot.addItem(self.stopf_text) self._plot.addItem(self.mkr_pwr_text) self._plot.addItem(self.mkr_freq_text) self._plot.addItem(self.delta_pwr_text) self._plot.addItem(self.delta_freq_text) self._plot.addItem(self.cal_msg_text) self._plot.addItem(self.no_correct_msg_text) self._plot.addItem(self.trace_avg_text) self._plot.addItem(self.channel_text) #self._plot.addItem(self.sweep_progress) self.sweep_progress.setZValue(1000) self._mkr_table = [SpectrumMarker(self, i+1) for i in range(SpectrumPlotWidget.MKR_COUNT)] self.selected_mkr = self._mkr_table[0] self.selected_mkr.is_selected = True self._delta_mkr_type = SpectrumMarker.REF_DELTA_TYPE self._mkr_function = SpectrumMarker.MKR_FN_OFF self.noise_floor_line = pg.InfiniteLine( pen=pg.mkPen('r', style=Qt.DashLine), pos=-60.0, angle=0) self.noise_stddev_line = pg.InfiniteLine( pen=pg.mkPen('g', style=Qt.DashLine), pos=-50.0, angle=0) self.create_main_curves() self._last_time = time.time() self._fps = None self._ctl_pressed = False self._alt_pressed = False self._resize_timer = QTimer() self._resize_timer.setSingleShot(True) self._resize_timer.setInterval(SpectrumPlotWidget.RESIZE_EVENT_TIMEOUT) self._resize_timer.timeout.connect(self.resize_plot) self._plot.geometryChanged.connect(self.plot_resized) @pyqtSlot() def 
plot_resized(self): if self._resize_timer.isActive() is True: return self._resize_timer.start() @pyqtSlot() def resize_plot(self): self.update_plot_annotations() def add_plot(self, layout): layout.addItem(self._plot, row=0, col=0) self.enable_mouse_events(True) self.update_plot_annotations() self.update_markers() self._plot.geometryChanged.connect(self.plot_resized) def add_split_plot(self, layout, row=0, col=0): layout.addItem(self._plot, row, col) self.update_plot_annotations() self.update_markers() self._plot.geometryChanged.connect(self.plot_resized) def remove_plot(self, layout): self._plot.geometryChanged.disconnect(self.plot_resized) self.enable_mouse_events(False) layout.removeItem(self._plot) def remove_split_plot(self, layout): self._plot.geometryChanged.disconnect(self.plot_resized) layout.removeItem(self._plot) def create_main_curves(self): """Create main spectrum curve for each trace""" line_width = SpectrumPlotWidget.PLOT_LINE_WIDTH if sys.platform == "darwin": line_width = SpectrumPlotWidget.MACOS_PLOT_LINE_WIDTH self.curves = [ self._plot.plot(pen=pg.mkPen(self.curve_colors[i], width=line_width)) for i in range(SpectrumPlotWidget.TRACE_COUNT)] for curve in self.curves: curve.setZValue(900) def enable_mouse_events(self, en): if en: self.scene.sigMouseMoved.connect(self.mouse_moved) else: try: self.scene.sigMouseMoved.disconnect() except TypeError as te: print(te) def set_waterfall_display(self, show_wf): if not self.app._split_display: if show_wf: self._line_sep = SpectrumPlotWidget.SPLIT_LINE_SEP self.set_annotation_font(SpectrumPlotWidget.split_annotation_font) else: self._line_sep = SpectrumPlotWidget.DEFAULT_LINE_SEP self.set_annotation_font(SpectrumPlotWidget.default_annotation_font) for mkr in self._mkr_table: mkr.set_split_display(show_wf) def set_split_display(self, is_split): if is_split: self._line_sep = SpectrumPlotWidget.SPLIT_LINE_SEP self.set_annotation_font(SpectrumPlotWidget.split_annotation_font) else: self._line_sep = 
SpectrumPlotWidget.DEFAULT_LINE_SEP self.set_annotation_font(SpectrumPlotWidget.default_annotation_font) for mkr in self._mkr_table: mkr.set_split_display(is_split) def set_annotation_font(self, font): self._annotation_font = font self._annotation_font_metrics = QFontMetrics(font) self.ref_text.setFont(font) self.rbw_text.setFont(font) self.fe_lo_text.setFont(font) self.atten_text.setFont(font) self.fe_atten_text.setFont(font) self.startf_text.setFont(font) self.fspan_text.setFont(font) self.stopf_text.setFont(font) self.mkr_pwr_text.setFont(font) self.mkr_freq_text.setFont(font) self.delta_pwr_text.setFont(font) self.delta_freq_text.setFont(font) self.cal_msg_text.setFont(font) self.no_correct_msg_text.setFont(font) def mouse_moved(self, pos): if self._selected_mkr is not None and self.data is not None: if ((self._alt_pressed is True) and (self._selected_mkr.is_selected)): mouse_pt = self.viewbox.mapSceneToView(pos) idx = (np.abs(self.data.freq - mouse_pt.x())).argmin() self.move_current_marker(idx) def key_pressed(self, key_ev): if key_ev.key() == Qt.Key_Meta: self._ctl_pressed = True elif key_ev.key() == Qt.Key_Alt: self._alt_pressed = True elif key_ev.key() == Qt.Key_Z: self.zoom_freq() if self._ctl_pressed is True: if self.selected_mkr and self.data: if self.selected_mkr.is_selected: if key_ev.key() == Qt.Key_Left: self.move_current_marker_left() elif key_ev.key() == Qt.Key_Right: self.move_current_marker_right() def key_released(self, key_ev): if key_ev.key() == Qt.Key_Meta: self._ctl_pressed = False elif key_ev.key() == Qt.Key_Alt: self._alt_pressed = False def update_plot(self, data_storage): """Update main spectrum curve """ self.data = data_storage for i in range(SpectrumPlotWidget.TRACE_COUNT): if self.data.trace_types[i] == TRACE_TYPE_BLANK: self.curves[i].setData(np.array([]), np.array([])) continue else: spectrum = self.data.spectra[i] if ((spectrum.freq is not None) and (spectrum.pwr is not None)): self.curves[i].setData(spectrum.freq, spectrum.pwr) 
if i+1 == self.chan.selected_trace: self.update_markers() self.update_rbw_annotations() self.update_trace_annotations(data_storage) self.chan.spectrum_plot_updated.emit() def update_markers(self): for mkr in self._mkr_table: mkr.update_marker() def peak_search(self, checked): if (self.data is not None and self.selected_mkr and self.selected_mkr.is_enabled): self.peaks = self.find_peaks(self.data.pwr) if self.peaks and len(self.peaks): self.move_current_marker(self.peaks[0]) def next_peak(self, checked): if len(self.peaks) == 0: return if self.selected_mkr and self.selected_mkr.is_enabled: try: idx = self.peaks.index(self.selected_mkr.data_index) except ValueError: return idx += 1 if idx >= len(self.peaks): return self.move_current_marker(self.peaks[idx]) def peak_right(self, checked): if len(self.peaks) == 0: return if self.selected_mkr and self.selected_mkr.is_enabled: pk_arr = np.array(self.peaks) res_arr = np.where(pk_arr > self.selected_mkr.data_index) if len(res_arr[0]): self.move_current_marker(self.peaks[res_arr[0][0]]) def peak_left(self, checked): if len(self.peaks) == 0: return if self.selected_mkr and self.selected_mkr.is_enabled: pk_arr = np.array(self.peaks) res_arr = np.where(pk_arr < self.selected_mkr.data_index) if len(res_arr[0]): self.move_current_marker(self.peaks[res_arr[0][-1]]) def find_peaks(self, data): """Return a list of signal peak indexes. The indexes are sorted in descending order of the signal power at each index. 
""" if data is None: return [] peaks, _ = signal.find_peaks( data, prominence=SpectrumPlotWidget.PEAK_THRESHOLD, width=SpectrumPlotWidget.PEAK_WIDTH) sorted_peaks = sorted(peaks, key=lambda idx: data[idx], reverse=True) if sorted_peaks: p = sorted_peaks[0] return sorted_peaks def zoom_freq(self): if self.selected_mkr and self.selected_mkr.is_enabled: mkr = self.selected_mkr if mkr.marker_type == SpectrumMarker.MKR_TYPE_SPAN: self.chan.start_freq = mkr.freq self.chan.stop_freq = mkr.aux_freq self.chan.start(single_sweep=True) @property def selected_mkr_type(self): return self.selected_mkr.marker_type @property def selected_mkr_id(self): return self._selected_mkr_id @property def mkr_function(self): return self._mkr_function @property def delta_mkr_type(self): return self._delta_mkr_type def marker(self, mkr_id): return self._mkr_table[mkr_id] def select_marker(self, mkr_id): if len(self._mkr_table) >= mkr_id: if self._selected_mkr_id != mkr_id: if self.selected_mkr: self.selected_mkr.is_selected = False self._selected_mkr_id = mkr_id mkr = self._mkr_table[mkr_id] self.selected_mkr = mkr mkr.is_selected = True self.update_marker_annotations() def select_marker_type(self, type_idx): mkr = self._mkr_table[self._selected_mkr_id] new_marker_type = SpectrumMarker.MKR_TYPES[type_idx] if new_marker_type == SpectrumMarker.MKR_TYPE_OFF: mkr.is_enabled = False mkr.marker_type = new_marker_type else: mkr.marker_type = new_marker_type mkr.is_enabled = True self.selected_mkr = mkr mkr.is_selected = True mkr.update_marker() self.update_marker_annotations() def select_delta_marker(self, mkr_id): self._delta_mkr_type = mkr_id def select_mkr_fn(self, mkr_id): self._mkr_function = mkr_id self.update_marker_annotations() def move_current_marker(self, idx): mkr_type = self._selected_mkr.marker_type if mkr_type == SpectrumMarker.MKR_TYPE_DELTA: if self._delta_mkr_type == SpectrumMarker.DELTA_DELTA_TYPE: self._selected_mkr.move_aux_marker(idx) else: self._selected_mkr.move_marker(idx) 
else: self._selected_mkr.move_marker(idx) def move_current_marker_right(self): mkr_type = self._selected_mkr.marker_type if mkr_type == SpectrumMarker.MKR_TYPE_DELTA: if self._delta_mkr_type == SpectrumMarker.DELTA_DELTA_TYPE: self.selected_mkr.move_aux_marker_right() else: self.selected_mkr.move_marker_right() else: self.selected_mkr.move_marker_right() def move_current_marker_left(self): mkr_type = self._selected_mkr.marker_type if mkr_type == SpectrumMarker.MKR_TYPE_DELTA: if self._delta_mkr_type == SpectrumMarker.DELTA_DELTA_TYPE: self.selected_mkr.move_aux_marker_left() else: self.selected_mkr.move_marker_left() else: self.selected_mkr.move_marker_left() def index_from_freq(self, f: float) -> int: if f < self.data.freq[0]: f = self.data.freq[0] if f > self.data.freq[-1]: f = self.data.freq[-1] return (np.abs(self.data.freq - f)).argmin() def add_noise_floor_line(self): self._plot.addItem(self.noise_floor_line) self._plot.addItem(self.noise_stddev_line) self.update_noise_floor_line() def remove_noise_floor_line(self): self._plot.removeItem(self.noise_stddev_line) self._plot.removeItem(self.noise_floor_line) def update_plot_annotations(self): rl_low = self.chan.reflevel - 10*self.chan.ampl_scale try: y_pixel_size = self._plot.getViewBox().viewPixelSize()[1] except TypeError: return line_spacing = y_pixel_size * self.annotation_line_spacing self._plot.setXRange(self.chan.start_freq, self.chan.stop_freq, padding=0.0) self._plot.setYRange(rl_low, self.chan.reflevel, padding=0.0) xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) xticks = [] for f in xtick_posns: xticks.append((f, '')) self._plot.getAxis('bottom').setTicks([xticks]) self._plot.getAxis('top').setTicks([xticks]) self.channel_text.setText(self.chan.fe_chan_id) self.channel_text.setPos(xtick_posns[2], self.chan.reflevel) self.ref_text.setText(f"Ref: {self.chan.reflevel:.2f} dBm") self.ref_text.setPos(xtick_posns[0], self.chan.reflevel) self.atten_text.setText(f"Att: 
{self.chan.atten} dB") self.atten_text.setPos(xtick_posns[4], self.chan.reflevel) if self.chan.fe_attenuation is not None: self.fe_atten_text.setText(f"FE Att: {self.chan.fe_atten} dB") self.fe_atten_text.setPos( xtick_posns[4], self.chan.reflevel - (1.25 * line_spacing)) else: self.fe_atten_text.setText(f"") if self.chan.freq_span < SpectrumPlotWidget.HIGH_PREC_SPAN: self.startf_text.setText( f"Start: {format_freq(self.chan.start_freq, prec=6)}") else: self.startf_text.setText( f"Start: {format_freq(self.chan.start_freq)}") self.startf_text.setPos(xtick_posns[0], rl_low + (1.5 * line_spacing)) self.fspan_text.setText( f"Span: {format_freq(self.chan.freq_span)}") self.fspan_text.setPos(xtick_posns[4], rl_low + (1.5 * line_spacing)) if self.chan.freq_span < SpectrumPlotWidget.HIGH_PREC_SPAN: self.stopf_text.setText( f"Stop: {format_freq(self.chan.stop_freq, prec=6)}") else: self.stopf_text.setText( f"Stop: {format_freq(self.chan.stop_freq)}") self.stopf_text.setPos(xtick_posns[7], rl_low + (1.5 * line_spacing)) self.update_marker_annotations() self.update_rbw_annotations() self.update_nocorrection_msg() self.update_calibrating_msg() self.update_sweep_progress() def update_marker_annotations(self): if (self.selected_mkr is None or self.data is None or self.data.pwr is None): return if self.selected_mkr.is_enabled: y_pixel_size = self._plot.getViewBox().viewPixelSize()[1] line_spacing = y_pixel_size * self.annotation_line_spacing xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) mkr_id = self._selected_mkr_id mkr = self.selected_mkr if self._mkr_function == SpectrumMarker.MKR_FN_NOISE: pwr = mkr.pwr - 10*log10(self.data.sweep_params.enbw) pwr_text = f"Mkr{mkr_id+1}: {pwr:.2f} dBm/Hz" else: pwr_text = f"Mkr{mkr_id+1}: {mkr.pwr:.2f} dBm" if self.chan.freq_span < SpectrumPlotWidget.HIGH_PREC_SPAN: freq_text = f" {format_freq(mkr.freq, prec=6)}" else: freq_text = f" {format_freq(mkr.freq)}" self.mkr_pwr_text.setText(pwr_text) 
self.mkr_pwr_text.setPos(xtick_posns[7], self.chan.reflevel) self.mkr_freq_text.setText(freq_text) self.mkr_freq_text.setPos( xtick_posns[7], self.chan.reflevel - (1.25 * line_spacing)) if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA, SpectrumMarker.MKR_TYPE_SPAN]: pwr_text = f"\u0394Mkr{mkr_id+1}: {mkr.delta_pwr:.2f} dBm" freq_text = f" {format_freq(mkr.delta_freq)}" self.delta_pwr_text.setText(pwr_text) self.delta_freq_text.setText(freq_text) self.delta_pwr_text.setPos( xtick_posns[7], self.chan.reflevel - (3.0 * line_spacing)) self.delta_freq_text.setPos( xtick_posns[7], self.chan.reflevel - (4.25 * line_spacing)) else: self.delta_pwr_text.setText('') self.delta_freq_text.setText('') else: self.mkr_pwr_text.setText('') self.mkr_freq_text.setText('') self.delta_pwr_text.setText('') self.delta_freq_text.setText('') def update_rbw_annotations(self): if (self.data is None or self.data.sweep_params is None): self.rbw_text.setText('') self.fe_lo_text.setText('') return rbw = self.data.sweep_params.rbw/1e6 lo = self.data.sweep_params.fe_lo xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) y_pixel_size = self._plot.getViewBox().viewPixelSize()[1] line_spacing = y_pixel_size * self.annotation_line_spacing self.rbw_text.setText( f"RBW: {format_freq(rbw)}") self.rbw_text.setPos( xtick_posns[0], self.chan.reflevel - (1.25 * line_spacing)) if lo > 0.0: self.fe_lo_text.setText( f"LO: {format_freq(lo)}") self.fe_lo_text.setPos( xtick_posns[0], self.chan.reflevel - (2.5 * line_spacing)) else: self.fe_lo_text.setText('') self.update_noise_floor_line() def update_trace_annotations(self, data_store): if self.chan.trace_type == TRACE_TYPE_AVG: xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) rl_low = self.chan.reflevel - 10*self.chan.ampl_scale self.trace_avg_text.setText( f"Avg: {data_store.average_counter:d}") self.trace_avg_text.setPos(xtick_posns[0], rl_low + (7*self.chan.ampl_scale)) else: self.trace_avg_text.setText('') def 
update_calibrating_msg(self): if self.chan.is_calibrating is True: xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) self.cal_msg_text.setText("Calibrating...") self.cal_msg_text.setColor('r') self.cal_msg_text.setPos( xtick_posns[0], self.chan.reflevel - (3*self.chan.ampl_scale * self._line_sep)) else: self.cal_msg_text.setText('') def update_nocorrection_msg(self): if not self.chan.apply_freq_resp_correction: xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) self.no_correct_msg_text.setText("No correction") self.no_correct_msg_text.setColor('r') self.no_correct_msg_text.setPos( xtick_posns[0], self.chan.reflevel - (3*self.chan.ampl_scale * self._line_sep)) else: self.no_correct_msg_text.setText('') def update_noise_floor_line(self): if self.chan.show_noise_floor: self.noise_floor_line.setValue(self.chan.noise_floor) self.noise_stddev_line.setValue(self.chan.noise_stddev) def enable_sweep_progress(self, enable: int): if enable > 0: self._plot.addItem(self.sweep_progress) else: self._plot.removeItem(self.sweep_progress) def set_sweep_progress(self, progress: float): self._sweep_progress = progress self.update_sweep_progress() def update_sweep_progress(self): xtick_posns = np.linspace(self.chan.start_freq, self.chan.stop_freq, 11) rl_low = self.chan.reflevel - 10*self.chan.ampl_scale sweep_pos = (xtick_posns[1] + ((xtick_posns[9] - xtick_posns[1]) * self._sweep_progress)) self.sweep_progress.setData( pos=np.array([[xtick_posns[1], rl_low + (self.chan.ampl_scale)/3], [sweep_pos, rl_low + (self.chan.ampl_scale)/3]]), adj=np.array([[0, 1],]), pen=pg.mkPen(color=pg.mkColor(SpectrumPlotWidget.SWEEP_PROGRESS_COLOR), width=SpectrumPlotWidget.SWEEP_PROGRESS_WIDTH), size=0)
Spectrum marker objects are stored in the SpectrumPlotWidget._mkr_table list. They are sorted in order of descending
amplitude with the marker at index 0 being positioned at the highest
peak of the spectrum. By default this becomes the currently selected
marker.
class SpectrumMarker:
    """A marker (single arrow, delta pair or span region) on a spectrum plot.

    A marker references one or two points of the trace held by the owning
    plot widget (``plot_widget.data.freq`` / ``plot_widget.data.pwr``) by
    index:

    * ``data_index`` is the position of the primary arrow,
    * ``aux_data_index`` is the position of the auxiliary arrow, which is
      only shown for ``MKR_TYPE_DELTA`` markers.

    ``MKR_TYPE_SPAN`` markers are drawn as a ``pg.LinearRegionItem`` whose
    edges are mapped back onto the two indices by :meth:`span_changed`.
    """

    # Marker types.  ``marker_type`` only accepts members of MKR_TYPES.
    MKR_TYPE_OFF: str = 'Off'
    MKR_TYPE_NORMAL: str = 'Normal'
    MKR_TYPE_DELTA: str = 'Delta Pair'
    MKR_TYPE_SPAN: str = 'Span Pair'
    MKR_TYPES: List[str] = [MKR_TYPE_OFF, MKR_TYPE_NORMAL,
                            MKR_TYPE_DELTA, MKR_TYPE_SPAN]

    # Marker function codes (not referenced within this class).
    MKR_FN_OFF: int = 0
    MKR_FN_NOISE: int = 1

    # Colours used for the selected and the unselected marker.
    SELECTED_PEN_COLOR: str = 'r'
    SELECTED_BRUSH_COLOR: str = 'r'
    NORMAL_PEN_COLOR = (200,200,200)
    NORMAL_BRUSH_COLOR = (50,50,200)

    # Arrow geometry and label offset for the normal and the split display.
    DEFAULT_MKR_LEN: float = 10.0
    SPLIT_MKR_LEN: float = 7.0
    DEFAULT_MKR_TIPANGLE: float = 25.0
    SPLIT_MKR_TIPANGLE: float = 20.0
    DEFAULT_MKRID_OFFSET: float = 30.0
    SPLIT_MKRID_OFFSET: float = 23.0

    # Delta marker role codes (not referenced within this class).
    REF_DELTA_TYPE: int = 0
    DELTA_DELTA_TYPE: int = 1

    def __init__(self, plot_widget, mkr_id: int) -> None:
        """Create the graphics items for marker ``mkr_id``.

        :param plot_widget: The owning plot widget.  This class reads its
            ``_plot``, ``viewbox``, ``data`` and ``chan`` attributes and
            calls its ``update_marker_annotations()`` method.
        :param mkr_id: Identifier shown next to the marker arrow.
        """
        self._plot_widget = plot_widget
        self._mkr_id: int = mkr_id
        self._mkr_type: str = SpectrumMarker.MKR_TYPE_OFF
        self._headlen: float = SpectrumMarker.DEFAULT_MKR_LEN
        self._tipangle: float = SpectrumMarker.DEFAULT_MKR_TIPANGLE
        self._is_enabled: bool = False
        self._is_selected: bool = False
        # -1 means "not yet placed"; update_marker() assigns a position.
        self._data_index: int = -1
        self._aux_data_index: int = -1
        # True while span_changed is connected to the region item's
        # sigRegionChangeFinished signal.
        self._span_connected: bool = False

        self._mkr = pg.ArrowItem(pos=(0, 0), headLen=self._headlen,
                                 tipAngle=self._tipangle)
        self._mkr_id_text = pg.TextItem()
        self._mkr_id_text.setText(str(self.marker_id))
        self._mkr_id_text.setColor(SpectrumMarker.NORMAL_PEN_COLOR)

        self._aux_mkr = pg.ArrowItem(pos=(0, 0), headLen=self._headlen,
                                     tipAngle=self._tipangle)
        self._aux_mkr_id_text = pg.TextItem()
        self._aux_mkr_id_text.setText(str(self.marker_id)+'R')
        self._aux_mkr_id_text.setColor(SpectrumMarker.NORMAL_PEN_COLOR)

        self._mkr_id_text_offset = SpectrumMarker.DEFAULT_MKRID_OFFSET

        self._rgn_mkr = pg.LinearRegionItem()
        self._rgn_mkr.setZValue(10)

    # ------------------------------------------------------------------
    # Span region signal management
    # ------------------------------------------------------------------

    def _connect_span_signal(self) -> None:
        """Connect ``span_changed`` to the region item exactly once."""
        if not self._span_connected:
            self._rgn_mkr.sigRegionChangeFinished.connect(self.span_changed)
            self._span_connected = True

    def _disconnect_span_signal(self) -> None:
        """Disconnect ``span_changed`` if, and only if, it is connected.

        PyQt raises ``TypeError`` when asked to disconnect a slot that is
        not connected, which happens when a span marker is disabled or
        retyped before it was ever enabled.
        """
        if self._span_connected:
            self._rgn_mkr.sigRegionChangeFinished.disconnect(
                self.span_changed)
            self._span_connected = False

    # ------------------------------------------------------------------
    # Trace values at the marker positions
    # ------------------------------------------------------------------

    @property
    def data_index(self) -> int:
        """Trace index of the primary marker."""
        return self._data_index

    @property
    def aux_data_index(self) -> int:
        """Trace index of the auxiliary marker."""
        return self._aux_data_index

    @property
    def freq(self) -> float:
        """Frequency of the trace point at ``data_index``."""
        return float(self._plot_widget.data.freq[self.data_index])

    @property
    def aux_freq(self) -> float:
        """Frequency of the trace point at ``aux_data_index``."""
        return float(self._plot_widget.data.freq[self.aux_data_index])

    @property
    def pwr(self) -> float:
        """Power of the trace point at ``data_index``."""
        return float(self._plot_widget.data.pwr[self.data_index])

    @property
    def aux_pwr(self) -> float:
        """Power of the trace point at ``aux_data_index``."""
        return float(self._plot_widget.data.pwr[self.aux_data_index])

    @property
    def delta_freq(self) -> float:
        """Auxiliary frequency minus primary frequency."""
        fdata = self._plot_widget.data.freq
        return float(fdata[self.aux_data_index] - fdata[self.data_index])

    @property
    def delta_pwr(self) -> float:
        """Auxiliary power minus primary power."""
        pdata = self._plot_widget.data.pwr
        return float(pdata[self.aux_data_index] - pdata[self.data_index])

    # ------------------------------------------------------------------
    # Enable / select state
    # ------------------------------------------------------------------

    @property
    def is_enabled(self) -> bool:
        """Whether the marker's graphics items are shown on the plot."""
        return self._is_enabled

    @is_enabled.setter
    def is_enabled(self, b: bool) -> None:
        """Add the items for the current marker type to the plot or remove them.

        Only the literal ``True`` enables the marker; any other value takes
        the removal path.
        """
        self._is_enabled = b
        plot = self._plot_widget._plot
        if b is True:
            graphItems = plot.allChildItems()
            if self.marker_type in [SpectrumMarker.MKR_TYPE_SPAN]:
                plot.addItem(self._rgn_mkr)
                self._connect_span_signal()
            elif self.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
                for item in (self._mkr, self._mkr_id_text,
                             self._aux_mkr, self._aux_mkr_id_text):
                    if item not in graphItems:
                        plot.addItem(item)
            else:
                for item in (self._mkr, self._mkr_id_text):
                    if item not in graphItems:
                        plot.addItem(item)
        else:
            if self.marker_type in [SpectrumMarker.MKR_TYPE_SPAN]:
                plot.removeItem(self._rgn_mkr)
                self._disconnect_span_signal()
            elif self.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
                plot.removeItem(self._mkr)
                plot.removeItem(self._mkr_id_text)
                plot.removeItem(self._aux_mkr)
                plot.removeItem(self._aux_mkr_id_text)
            else:
                plot.removeItem(self._mkr)
                plot.removeItem(self._mkr_id_text)

    @property
    def is_selected(self) -> bool:
        """Whether this marker is drawn in the "selected" colours."""
        return self._is_selected

    @is_selected.setter
    def is_selected(self, b: bool) -> None:
        """Restyle the marker items for the selected / unselected state.

        A span region is only movable while its marker is selected.
        """
        self._is_selected = b
        if self.is_selected:
            pen = pg.mkPen(SpectrumMarker.SELECTED_PEN_COLOR, width=2)
            brush = SpectrumMarker.SELECTED_BRUSH_COLOR
            text_color = SpectrumMarker.SELECTED_PEN_COLOR
        else:
            pen = SpectrumMarker.NORMAL_PEN_COLOR
            brush = SpectrumMarker.NORMAL_BRUSH_COLOR
            text_color = SpectrumMarker.NORMAL_PEN_COLOR

        self._mkr.setStyle(pen=pen, brush=brush)
        self._mkr_id_text.setColor(text_color)
        if self.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
            self._aux_mkr.setStyle(pen=pen, brush=brush)
            self._aux_mkr_id_text.setColor(text_color)
        if self.marker_type in [SpectrumMarker.MKR_TYPE_SPAN]:
            self._rgn_mkr.setMovable(bool(self.is_selected))

    def set_split_display(self, is_split):
        """Switch arrow size, label offset and label font for split display.

        :param is_split: Truthy to use the ``SPLIT_*`` geometry and the plot
            widget's split annotation font, falsy for the defaults.
        """
        if is_split:
            self._mkr_id_text_offset = SpectrumMarker.SPLIT_MKRID_OFFSET
            self._headlen = SpectrumMarker.SPLIT_MKR_LEN
            self._tipangle = SpectrumMarker.SPLIT_MKR_TIPANGLE
            font = SpectrumPlotWidget.split_annotation_font
        else:
            self._mkr_id_text_offset = SpectrumMarker.DEFAULT_MKRID_OFFSET
            self._headlen = SpectrumMarker.DEFAULT_MKR_LEN
            self._tipangle = SpectrumMarker.DEFAULT_MKR_TIPANGLE
            font = SpectrumPlotWidget.default_annotation_font
        self._mkr_id_text.setFont(font)
        self._aux_mkr_id_text.setFont(font)
        self.update_marker()

    # ------------------------------------------------------------------
    # Identity and type
    # ------------------------------------------------------------------

    @property
    def marker_id(self):
        """Identifier passed to the constructor."""
        return self._mkr_id

    @property
    def marker_item(self):
        """The primary ``pg.ArrowItem``."""
        return self._mkr

    @property
    def marker_type(self):
        """Current marker type, one of ``MKR_TYPES``."""
        return self._mkr_type

    @marker_type.setter
    def marker_type(self, t):
        """Change the marker type, removing items the new type does not use.

        Values that are not in ``MKR_TYPES`` are ignored.  Items required by
        the new type are not added here; that happens when ``is_enabled`` is
        next set to ``True``.
        """
        if t not in SpectrumMarker.MKR_TYPES:
            return

        plot = self._plot_widget._plot
        previous = self._mkr_type

        if previous in [SpectrumMarker.MKR_TYPE_DELTA]:
            if t in [SpectrumMarker.MKR_TYPE_NORMAL,
                     SpectrumMarker.MKR_TYPE_SPAN]:
                plot.removeItem(self._aux_mkr)
                plot.removeItem(self._aux_mkr_id_text)
            if t in [SpectrumMarker.MKR_TYPE_SPAN]:
                plot.removeItem(self._mkr)
                plot.removeItem(self._mkr_id_text)

        if previous in [SpectrumMarker.MKR_TYPE_SPAN]:
            if t in [SpectrumMarker.MKR_TYPE_NORMAL,
                     SpectrumMarker.MKR_TYPE_DELTA]:
                plot.removeItem(self._rgn_mkr)
                # Safe even when the span marker was never enabled.
                self._disconnect_span_signal()

        if previous in [SpectrumMarker.MKR_TYPE_NORMAL]:
            if t in [SpectrumMarker.MKR_TYPE_SPAN]:
                plot.removeItem(self._mkr)
                plot.removeItem(self._mkr_id_text)

        self._mkr_type = t

    # ------------------------------------------------------------------
    # Positioning
    # ------------------------------------------------------------------

    def _get_text_posn(self, mkr_pos):
        """Return the view position for a marker label.

        The label is shifted 7 scene units left of, and
        ``_mkr_id_text_offset`` scene units up from, ``mkr_pos``.  The shift
        is applied in scene coordinates so that it does not depend on the
        axis scaling.
        """
        text_pos = QPointF(mkr_pos[0], mkr_pos[1])
        text_scene_pos = self._plot_widget.viewbox.mapViewToScene(text_pos)
        text_scene_pos.setX(text_scene_pos.x()-7.0)
        text_scene_pos.setY(text_scene_pos.y()-self._mkr_id_text_offset)
        return self._plot_widget.viewbox.mapSceneToView(text_scene_pos)

    def _arrow_placement(self, freq: float, pwr: float):
        """Return ``((x, y), angle)`` for an arrow marking ``(freq, pwr)``.

        Points within 0.2 amplitude divisions of the reference level (or
        above it) are pinned to the reference level with the arrow flipped.
        """
        chan = self._plot_widget.chan
        if pwr > (chan.reflevel - (chan.ampl_scale * 0.2)):
            return (freq, chan.reflevel), 90
        return (freq, pwr), -90

    def update_marker(self):
        """Re-validate the marker indices and redraw all marker items.

        Does nothing when the marker is off or when the plot widget has no
        trace data.  An unplaced primary marker is put at the centre of the
        trace and an unplaced auxiliary marker 10 points away from it.
        """
        data = self._plot_widget.data
        if ((self._mkr_type == SpectrumMarker.MKR_TYPE_OFF) or
                (data is None) or
                (data.pwr is None) or
                (data.freq is None)):
            return

        data_len = len(data.freq)
        if data_len == 0:
            # Nothing to point at; indexing an empty trace would raise.
            return

        if self.data_index < 0:
            self._data_index = data_len // 2
        if self.data_index >= data_len:
            self._data_index = data_len - 1
        if self.aux_data_index < 0:
            self._aux_data_index = self.data_index + 10
        if self.aux_data_index >= data_len:
            # Clamp at 0: a negative index would silently wrap around to the
            # other end of the trace (or raise on traces under 11 points).
            self._aux_data_index = max(self.data_index - 10, 0)

        mkr_pos, mkr_angle = self._arrow_placement(self.freq, self.pwr)
        aux_mkr_pos, aux_mkr_angle = self._arrow_placement(
            self.aux_freq, self.aux_pwr)

        self._mkr.setPos(mkr_pos[0], mkr_pos[1])
        self._aux_mkr.setPos(aux_mkr_pos[0], aux_mkr_pos[1])
        self._mkr.setStyle(angle=mkr_angle, headLen=self._headlen,
                           tipAngle=self._tipangle)
        self._aux_mkr.setStyle(angle=aux_mkr_angle, headLen=self._headlen,
                               tipAngle=self._tipangle)

        if self.marker_type == SpectrumMarker.MKR_TYPE_DELTA:
            # For a delta pair the labels are exchanged: the plain id is
            # shown at the auxiliary arrow and the 'R' id at the primary.
            self._mkr_id_text.setPos(self._get_text_posn(aux_mkr_pos))
            self._aux_mkr_id_text.setPos(self._get_text_posn(mkr_pos))
        else:
            self._mkr_id_text.setPos(self._get_text_posn(mkr_pos))
            self._aux_mkr_id_text.setPos(self._get_text_posn(aux_mkr_pos))

        self._rgn_mkr.setBounds((data.freq[0], data.freq[-1]))
        self._rgn_mkr.setRegion((mkr_pos[0], aux_mkr_pos[0]))
        self._plot_widget.update_marker_annotations()

    def span_changed(self, rgn):
        """Slot for the region item: snap both indices to the region edges.

        :param rgn: Signal argument; unused, the region is read from the
            region item itself.
        """
        rgn_pos = self._rgn_mkr.getRegion()
        farr = self._plot_widget.data.freq
        # argmin() yields a numpy integer; store plain ints as annotated.
        self._data_index = int((np.abs(farr - rgn_pos[0])).argmin())
        self._aux_data_index = int((np.abs(farr - rgn_pos[1])).argmin())
        self._plot_widget.update_marker_annotations()

    # ------------------------------------------------------------------
    # Marker movement
    # ------------------------------------------------------------------

    def move_marker(self, data_idx) -> None:
        """Place the primary marker at trace index ``data_idx`` and redraw."""
        self._data_index = data_idx
        self.update_marker()

    def move_marker_left(self) -> None:
        """Move the primary marker one point left, stopping at index 0."""
        self.move_marker(max(self.data_index - 1, 0))

    def move_marker_right(self) -> None:
        """Move the primary marker one point right, stopping at the end."""
        data_len = len(self._plot_widget.data.freq)
        self.move_marker(min(self.data_index + 1, data_len - 1))

    def move_aux_marker(self, data_idx) -> None:
        """Place the auxiliary marker at trace index ``data_idx`` and redraw."""
        self._aux_data_index = data_idx
        self.update_marker()

    def move_aux_marker_left(self) -> None:
        """Move the auxiliary marker one point left, stopping at index 0."""
        self.move_aux_marker(max(self.aux_data_index - 1, 0))

    def move_aux_marker_right(self) -> None:
        """Move the auxiliary marker one point right, stopping at the end."""
        data_len = len(self._plot_widget.data.freq)
        self.move_aux_marker(min(self.aux_data_index + 1, data_len - 1))
WaterfallPlotWidget Class
from typing import (
    Dict, List, Tuple
)
from math import log10
from datetime import datetime

import numpy as np
from scipy import signal
import pyqtgraph as pg


class WaterfallPlotWidget:
    """Waterfall view of a channel's spectrum history.

    Every call to :meth:`update_plot` redraws the image from the most
    recent rows of the data store's history buffer.
    """

    def __init__(self, app, chan):
        """Build the plot item for channel ``chan`` of application ``app``."""
        self._app = app
        self._chan = chan
        self.create_plot()
        # Number of update_plot() calls that have drawn a frame so far.
        self._counter = 0

    @property
    def app(self):
        """The owning application object."""
        return self._app

    @property
    def chan(self):
        """The channel this waterfall belongs to."""
        return self._chan

    @chan.setter
    def chan(self, ch):
        self._chan = ch

    @property
    def data_storage(self):
        """The channel's data storage object."""
        return self.chan.data_storage

    @property
    def history_size(self):
        """History size reported by the channel's data storage."""
        return self.chan.data_storage.history.history_size

    @property
    def scene(self):
        """Graphics scene of the plot item."""
        return self._plot.scene()

    @property
    def viewbox(self):
        """View box of the plot item."""
        return self._plot.getViewBox()

    def create_plot(self):
        """Create the plot item with all four axes visible.

        Only the left axis shows tick values; the context menu and mouse
        interaction are disabled.
        """
        self._plot = pg.PlotItem()
        self.viewbox.setMenuEnabled(False)
        self._plot.setMouseEnabled(x=False, y=False)

        axis_values = (
            ('right', False),
            ('top', False),
            ('left', True),
            ('bottom', False),
        )
        for axis_name, show_values in axis_values:
            self._plot.showAxis(axis_name, True)
            self._plot.getAxis(axis_name).setStyle(showValues=show_values)

        bottom_axis = self._plot.getAxis('bottom')
        bottom_axis.setStyle(showValues=False)

    def add_plot(self, layout):
        """Insert the plot item into ``layout`` at row 1, column 0."""
        layout.addItem(self._plot, row=1, col=0)

    def remove_plot(self, layout):
        """Take the plot item out of ``layout``."""
        layout.removeItem(self._plot)

    def update_plot(self, data_storage):
        """Redraw the waterfall from ``data_storage``.

        Does nothing unless the channel's ``show_waterfall`` flag is set.
        The image item is created on the first drawn frame.
        """
        if not self.chan.show_waterfall:
            return

        self._counter += 1
        is_first_frame = (self._counter == 1)
        history = data_storage.history

        if is_first_frame:
            self._waterfallImg = pg.ImageItem()
            # Disabled in the original source:
            #self._waterfallImg.scale(
            #    (data.freq[-1] - data.freq[0]) / len(data.freq), 1)
            self._plot.clear()
            self._plot.addItem(self._waterfallImg)

        # Show the newest `_counter` history rows, transposed for the image.
        recent_rows = history.buffer[-self._counter:]
        self._waterfallImg.setImage(recent_rows.T,
                                    autoLevels=False, autoRange=False)

        # Anchor the image at the first frequency; the vertical offset grows
        # with the number of frames until it reaches the history size.
        if self._counter < history.history_size:
            y_origin = -self._counter
        else:
            y_origin = -history.history_size
        self._waterfallImg.setPos(data_storage.freq[0], y_origin)

        # The histogram widget is linked only once the image holds data,
        # otherwise its levels would be wrong.
        if is_first_frame:
            self.app._wf_histogram.setImageItem(self._waterfallImg)
AdcThread Class
from typing import(
    Optional, Union
)
from math import ceil, floor, log2
from random import gauss
import time
from datetime import datetime

import numpy as np
import rpyc
from scipy import interpolate, signal, optimize
from scipy.stats import truncnorm
from numba import jit

from PyQt5 import QtTest
from PyQt5.QtCore import (
    QObject, QRunnable, QThread, QThreadPool, QTimer, QMutex,
    QThread, QEventLoop, pyqtSignal, pyqtSlot
)

# import yappi

from tam import (
    RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY, RP_ADC_MAX_SPAN, RP_ADC_MIN_SPAN,
    BASEBAND_CAL, FREQRESP_CAL, NO_CAL,
    ChannelCapabilities, InstrumentMgr, InstrumentInitializeException,
    UnknownInstrumentModelException
)
from design import (
    CIC_BANDPASS, MIN_DECIMATION_RATE, CHAN_BUFSIZE, AVG_CPG_CORRECTIONS,
    BIN_COUNT, ADC_122_16_F_CLK, ADC_125_14_F_CLK
)
from utils import (
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS,
    DETECTOR_TYPE_NEG
)
from adccal import AdcCalibration
from plot import SpectrumPlotWidget
from data import SweepParameters, Spectrum
from device import AdcDevice
from fe_client import FrontEndClient


class AdcThread(QThread):
    """Worker thread that runs spectrum sweeps (or a calibration) for a channel.

    The thread body is :meth:`run`.  The sweep parameters it uses are
    written by :meth:`set_sweep_parameters` and :meth:`data_store_reset`
    under ``self._mutex``.  ``run`` holds the same mutex for the duration
    of each sweep, so those methods (and :meth:`stop`) block until the
    sweep in progress has returned.

    The ``<<...>>`` lines in the class body are literate-programming chunk
    references; the methods they expand to are defined in other sections
    of the document.
    """

    # Frequency limits and calibration frequency.
    # NOTE(review): presumably MHz, as in the surrounding prose - confirm.
    MIN_FREQUENCY: float = 0.01
    MAX_FREQUENCY: float = 61.44
    CAL_FREQUENCY: float = 10.0

    # Values taken by ``cal_state``.
    CAL_START: int = 0
    CALIBRATING: int = 1
    CAL_COMPLETE: int = 2
    CAL_ERROR: int = 3

    # NOTE(review): presumably a delay in seconds used by the calibration
    # code, which is not visible in this chunk - confirm.
    CAL_DELAY: float = 0.25

    # Qt signals.  ``adc_thread_started``, ``adc_thread_stopped`` and
    # ``sweep_started(required_sweeps, sweep_count)`` are emitted by run().
    # The others are emitted by code outside this chunk.
    cal_state_changed = pyqtSignal(int)
    adc_thread_started = pyqtSignal()
    adc_thread_stopped = pyqtSignal()
    sweep_started = pyqtSignal(int, int)
    enable_sweep_progress = pyqtSignal(int)
    show_sweep_progress = pyqtSignal(float)

    def __init__(self, adc_device: AdcDevice, chan) -> None:
        """Initialise the thread state; no hardware is touched here.

        :param adc_device: Device from which the ADC channel is allocated.
        :param chan: The owning analyzer channel; provides ``fe``,
            ``fe_chan_id`` and ``data_store``.
        """
        super().__init__()
        self._adc_device: AdcDevice = adc_device
        self._ch = chan
        # Loop flag for run(); cleared by stop().
        self.running: bool = False
        # Number of sweeps completed towards ``_reqd_sweeps``.
        self._sweep_count: int = 0

        # Working state for narrow band sweeps.
        self._narrowband_sweep_params: Optional[SweepParameters] = None
        self._narrowband_freq_offset: float = 0.0
        self._narrowband_freq: Optional[np.ndarray] = None
        self._narrowband_pwr: Optional[np.ndarray] = None

        # Working state for wide band sweeps.
        self._wideband_sweep_params: Optional[SweepParameters] = None
        self._wideband_freq: Optional[np.ndarray] = None
        self._wideband_pwr: Optional[np.ndarray] = None
        self._wideband_fe_lo: float = 0.0
        self._wideband_start_f: float = 0.0
        self._wideband_sweep_count = 1
        self._wideband_sweep_num = 0

        # Sweeps to run before run() stops by itself; 0 means no limit.
        self._reqd_sweeps: int = 0

        # Calibration state.  run() performs a calibration instead of
        # sweeping unless this is CAL_COMPLETE or CAL_ERROR.
        self.cal_state: int = AdcThread.CAL_COMPLETE
        self.cal_device: Optional[str] = None
        self.cal_device_addr: Optional[Union[str, int]] = None

        # Allocated lazily by alloc_adc_chan().
        self.adc_chan = None
        self.freq_resp_fn = None
        # Cubic interpolation over the AVG_CPG_CORRECTIONS table.
        self.cpg_corr_fn = interpolate.interp1d(AVG_CPG_CORRECTIONS['x'],
                                                AVG_CPG_CORRECTIONS['y'],
                                                kind='cubic')

        # Guards the sweep parameters below and serialises sweeps.
        self._mutex = QMutex()
        self._reset_datastore: bool = False
        self._single_sweep: bool = False
        self._start_freq: float = 0.0
        self._stop_freq: float = 0.0
        self._scaling: str = 'spectrum'
        self.apply_freq_resp_corrections: bool = True
        self.wideband_enabled: bool = False
        self.sweep_progress_enabled: bool = False

    @property
    def adc_device(self) -> AdcDevice:
        """The ADC device passed to the constructor."""
        return self._adc_device

    @property
    def chan(self):
        """Return a reference to the associated AnalyzerChannel instance"""
        return self._ch

    @property
    def data_store(self):
        """The channel's data store."""
        return self._ch.data_store

    @property
    def required_sweeps(self) -> int:
        """Number of sweeps requested; 0 means sweep until stopped."""
        return self._reqd_sweeps

    @property
    def sweep_count(self) -> int:
        """Number of sweeps completed in the current run."""
        return self._sweep_count

    @property
    def fe(self) -> FrontEndClient:
        """The channel's RF front end client."""
        return self._ch.fe

    @property
    def fe_chan_id(self) -> str:
        """Identifier of this channel on the front end."""
        return self._ch.fe_chan_id

    @property
    def adc_chan_id(self) -> int:
        """ADC channel id, taken from the front end channel capabilities."""
        return self.fe.capabilities[self.fe_chan_id].adcdma_channel

    @property
    def adc_chan_type(self) -> int:
        """Front end channel type, taken from the channel capabilities."""
        return self.fe.capabilities[self.fe_chan_id].chan_type

    @property
    def single_sweep(self) -> bool:
        """Whether run() stops after one sweep."""
        return self._single_sweep

    @property
    def start_freq(self) -> float:
        """Start frequency used for the next sweep."""
        return self._start_freq

    @property
    def stop_freq(self) -> float:
        """Stop frequency used for the next sweep."""
        return self._stop_freq

    @property
    def scaling(self) -> str:
        """Scaling mode set by set_sweep_parameters()."""
        return self._scaling

    def alloc_adc_chan(self, sample_callback_fn=None) -> None:
        """Allocate the ADC channel from the device unless already allocated.

        :param sample_callback_fn: Passed through to the device's
            ``alloc_adc_chan`` as ``sample_callback_fn``.
        """
        if self.adc_chan is None:
            self.adc_chan = self.adc_device.alloc_adc_chan(
                self.adc_chan_id,
                AdcCalibration.dc_offset(self.adc_chan_id),
                sample_callback_fn=sample_callback_fn)

    def free_adc_chan(self) -> None:
        """Release the ADC channel on the device and forget it.

        The device call is made whether or not a channel is currently held.
        """
        self.adc_device.free_adc_chan(self.adc_chan_id)
        self.adc_chan = None

    def set_sweep_parameters(self, start_freq: float, stop_freq: float,
                             scaling: str, single: bool, sweep_count: int):
        """Store the parameters for subsequent sweeps.

        Takes the mutex, so this blocks while a sweep is in progress.

        :param start_freq: Sweep start frequency.
        :param stop_freq: Sweep stop frequency.
        :param scaling: Scaling mode.
        :param single: True to stop after one sweep.
        :param sweep_count: Number of sweeps to run; 0 for no limit.
        """
        self._mutex.lock()
        self._start_freq = start_freq
        self._stop_freq = stop_freq
        self._scaling = scaling
        self._single_sweep = single
        self._reqd_sweeps = sweep_count
        self._mutex.unlock()

    def run(self):
        """Thread body: run a calibration, or sweep until told to stop.

        If a calibration is pending (``cal_state`` is neither
        ``CAL_COMPLETE`` nor ``CAL_ERROR``) only the calibration is run.
        Otherwise sweeps are repeated until one of:

        * ``running`` is cleared by :meth:`stop`,
        * ``single_sweep`` is set (one sweep only),
        * ``required_sweeps`` sweeps have been completed,
        * an ``EOFError`` is raised during a sweep.
        """
        if self.cal_state not in [AdcThread.CAL_COMPLETE, AdcThread.CAL_ERROR]:
            # run_calibration is not defined in this chunk; presumably it is
            # supplied by the <<adc-calibration>> section.
            self.run_calibration()
            return

        # yappi.set_clock_type("cpu")
        # yappi.start()
        self.running = True
        # sweep_sample_cb is likewise defined outside this chunk.
        self.alloc_adc_chan(self.sweep_sample_cb)
        self.adc_thread_started.emit()
        while self.running:
            # Held for the whole sweep so that the sweep parameters cannot
            # change part way through.
            self._mutex.lock()
            try:
                self.sweep_started.emit(self.required_sweeps, self.sweep_count)
                self.sweep(self.start_freq, self.stop_freq)
                if self.single_sweep is True:
                    break
                if self.required_sweeps > 0:
                    self._sweep_count += 1
                    if self._sweep_count >= self.required_sweeps:
                        # Reset so the next run counts from zero again.
                        self._sweep_count = 0
                        break
            except EOFError as eof_error:
                # NOTE(review): presumably raised when the rpyc connection
                # to the ADC is lost - confirm.
                print(f'{eof_error}')
                break
            finally:
                # Runs on every exit path, including the breaks above.
                self._mutex.unlock()
        self.running = False
        self.adc_thread_stopped.emit()
        # yappi.get_func_stats().print_all()

    <<frequency-compensation>>

    <<sweeping-the-spectrum>>

    <<power-spectrum>>

    <<adc-calibration>>

    def data_store_reset(self, start_f, stop_f):
        """Request a trace reset and set a new frequency range.

        Sets ``_reset_datastore``; the reset itself is carried out by the
        sweep code when it next sees the flag.

        :param start_f: New start frequency.
        :param stop_f: New stop frequency.
        """
        self._mutex.lock()
        self._reset_datastore = True
        self._start_freq = start_f
        self._stop_freq = stop_f
        self._mutex.unlock()

    def stop(self):
        """Clear ``running`` so that run() exits after the current sweep.

        Takes the mutex, so this returns only once the sweep in progress
        (if any) has finished.
        """
        self._mutex.lock()
        self.running = False
        self._mutex.unlock()

    def close(self):
        """Stop sweeping and release the ADC channel."""
        self.stop()
        self.free_adc_chan()
Sweeping the spectrum
For SINGLE
conversion type RF front ends, given that the Red Pitaya ADC
frequency range (together with the anti-alias filter) is approximately 55
MHz, narrow band sweeps are those where the requested frequency span is
less than about 55 MHz. For SUPERHET
type RF front ends the maximum
frequency span for narrow band sweeps is generally determined by the
bandwidth of the image reject filter. For the front end designs used here
the image reject filter will be provided by some combination of SAW
filters with usable pass bands of approximately 15 to 30 MHz.
Note that for the case of DIRECT
front ends no frequency conversion
takes place and all spectrum sweeps are, in effect, narrow band.
The AdcThread.sweep
method is invoked in order to carry out a spectrum
sweep. The RF front end client freq_window
method is called in order to
determine the bandwidth capability for the channel. The requested
frequency span for the sweep is compared against the channel bandwidth and
either AdcThread.initiate_wideband_sweep
or AdcThread.initiate_narrowband_sweep
is invoked.
def sweep(self, start_freq, stop_freq):
    """Start one spectrum sweep over ``start_freq`` .. ``stop_freq``.

    The requested span is compared with the front end frequency window
    reported for this channel.  Spans wider than the window are handed to
    :py:meth:`initiate_wideband_sweep`; everything else goes to
    :py:meth:`initiate_narrowband_sweep`.

    :param start_freq: Sweep start frequency (presumably MHz - TODO confirm)
    :param stop_freq: Sweep stop frequency (same units as ``start_freq``)
    """
    freq_window = self.fe.freq_window(self.fe_chan_id)
    span = stop_freq - start_freq
    if span > freq_window:
        if self.wideband_enabled is not True:
            self.wideband_enabled = True
        self.initiate_wideband_sweep(start_freq, stop_freq)
    else:
        if self.wideband_enabled is True:
            self.wideband_enabled = False
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm that the two resets below are meant to run on every
        # narrow band sweep rather than only on the wide->narrow transition.
        # A count of 1 and a number of 0 make narrowband_sweep_update()
        # report plain narrow band progress.
        self._wideband_sweep_count = 1
        self._wideband_sweep_num = 0
        self.initiate_narrowband_sweep(start_freq, stop_freq)

def initiate_wideband_sweep(self, start_freq, stop_freq):
    """Sweep a span wider than the front end window as a series of segments.

    The span is cut into segments no wider than the front end frequency
    window.  For each segment the front end LO is set, sweep parameters are
    derived and stored on the instance, and
    :py:meth:`initiate_narrowband_sweep` is invoked for the segment.
    Results are accumulated by the sample callback
    (:py:meth:`sweep_sample_cb`).

    :param start_freq: Start of the requested span
    :param stop_freq: End of the requested span
    """
    # Accumulators for the concatenated segment results.
    self._wideband_freq = np.array([])
    self._wideband_pwr = np.array([])
    adc_centre_freq = self.fe.adc_centre_freq(self.fe_chan_id)
    bandwidth = self.fe.freq_window(self.fe_chan_id)
    start_freqs = np.arange(start_freq, stop_freq, bandwidth)
    stop_freqs = start_freqs + bandwidth
    # Clip the final segment so that it does not run past stop_freq.
    stop_freqs[-1] = stop_freq if stop_freqs[-1] > stop_freq else stop_freqs[-1]
    spans = np.cumsum(stop_freqs - start_freqs)
    frac_spans = spans / spans[-1]
    # Segment bookkeeping used for progress reporting and for detecting
    # completion of the wide band sweep in sweep_sample_cb().
    self._wideband_sweep_count = len(frac_spans)
    self._wideband_sweep_num = 0
    if self._reset_datastore is True:
        self.data_store.reset_trace()
        self._reset_datastore = False
    # LO setting of the first segment; recorded in the sweep parameters of
    # every segment (see the assignment to sweep_params.fe_lo below).
    if self.adc_chan_type == ChannelCapabilities.SUPERHET:
        initial_lo = start_freqs[0] + (stop_freqs[0] - start_freqs[0])/2
    else:
        initial_lo = start_freqs[0] - self.chan.felo_offset
    # NOTE: frac_span is unpacked but not used inside the loop.
    for start_f, stop_f, frac_span in zip(start_freqs, stop_freqs, frac_spans):
        span = stop_f - start_f
        if self.adc_chan_type == ChannelCapabilities.SUPERHET:
            # Centre the segment on the ADC centre frequency.
            offset_freq = adc_centre_freq - (span/2)
            fe_lo = start_f + span/2
            adc_start_freq = offset_freq
            adc_stop_freq = offset_freq + span
        else:
            # Place the segment at the channel's fixed front end LO offset.
            offset_freq = self.chan.felo_offset
            fe_lo = start_f - offset_freq
            adc_start_freq = offset_freq
            adc_stop_freq = offset_freq + span
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        if self.chan.auto_rbw is True:
            sweep_params = self.configure(adc_start_freq, adc_stop_freq)
            if self.chan.is_selected is True:
                self.chan.app.set_nearest_accepted_rbw(sweep_params.rbw)
        else:
            # chan.rbw is divided by 1e6 here (presumably Hz -> MHz;
            # TODO confirm against the channel settings).
            sweep_params = self.configure(
                adc_start_freq, adc_stop_freq,
                rbw=self.chan.rbw/1e6)
        sweep_params.fe_lo = initial_lo
        # This will block until the rpyc async call to adc_chan.fft_sweep
        # has completed.
        self._wideband_sweep_params = sweep_params
        self._wideband_fe_lo = fe_lo
        self._wideband_start_f = start_f
        print(f"{sweep_params.nperseg=}")
        self.initiate_narrowband_sweep(start_f, stop_f)

def initiate_narrowband_sweep(self, start_freq, stop_freq):
    """Run one narrow band sweep via the remote ``fft_sweep`` call.

    The front end LO is set according to the channel type, sweep parameters
    are derived, sweep progress reporting is enabled or disabled, and the
    ADC channel's ``fft_sweep`` is invoked asynchronously through rpyc.
    The method then polls the ADC connection until the call completes.

    ``self._mutex`` is unlocked while polling and locked again before
    returning, on both exit paths.  The unlock assumes the caller holds the
    mutex on entry (``run`` locks it around ``sweep``).

    :param start_freq: Start of the span to sweep
    :param stop_freq: End of the span to sweep
    """
    if self.adc_chan_type == ChannelCapabilities.DIRECT:
        # No frequency conversion: the ADC sees the requested span directly.
        fe_lo = 0.0
        adc_start_freq = start_freq
        adc_stop_freq = stop_freq
    elif self.adc_chan_type == ChannelCapabilities.SUPERHET:
        adc_centre_freq = self.fe.adc_centre_freq(self.fe_chan_id)
        # NOTE: bandwidth is fetched but not used in this method.
        bandwidth = self.fe.freq_window(self.fe_chan_id)
        span = stop_freq - start_freq
        offset_freq = adc_centre_freq - (span/2)
        fe_lo = start_freq + span/2
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        adc_start_freq = offset_freq
        adc_stop_freq = offset_freq + span
    else:
        # ChannelCapabilities.SINGLE
        span = stop_freq - start_freq
        offset_freq = self.chan.felo_offset
        fe_lo = start_freq - offset_freq
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        adc_start_freq = offset_freq
        adc_stop_freq = offset_freq + span
    if self.chan.auto_rbw is True:
        sweep_params = self.configure(adc_start_freq, adc_stop_freq)
        if self.chan.is_selected is True:
            self.chan.app.set_nearest_accepted_rbw(sweep_params.rbw)
    else:
        sweep_params = self.configure(
            adc_start_freq, adc_stop_freq,
            rbw=self.chan.rbw/1e6)
    # Estimate narrowband sweep timing
    # If sweep progress not enabled and (sweep timing > threshold or wideband enabled):
    #     enable narrowband sweep progress
    # else: disable narrowband sweep progress
    if (sweep_params.decimation_rate > 1) or (self.wideband_enabled is True):
        if self.sweep_progress_enabled is False:
            self.sweep_progress_enabled = True
            self.enable_sweep_progress.emit(1)
    else:
        if self.sweep_progress_enabled is True:
            self.enable_sweep_progress.emit(0)
            self.sweep_progress_enabled = False
    # Reset the per-sweep accumulators filled by sweep_sample_cb().
    self._narrowband_freq = np.array([])
    self._narrowband_pwr = np.array([])
    self._narrowband_sweep_params = sweep_params
    # Offset added back to the ADC frequencies in
    # narrowband_sweep_complete() to recover the requested frequencies.
    self._narrowband_freq_offset = start_freq - adc_start_freq
    adc_conv_factor = 1.0 / AdcCalibration.adc_to_volts(self.adc_chan_id)
    async_fft_sweep = rpyc.async_(self.adc_chan.fft_sweep)
    res = async_fft_sweep(adc_start_freq, adc_stop_freq,
                          sweep_params.rbw/1e6, adc_conv_factor)
    # Release the mutex while waiting for the remote call to finish.
    self._mutex.unlock()
    while not res.ready:
        QtTest.QTest.qWait(5)
        # Poll the connection between waits while the async call is
        # outstanding.
        self.adc_device.poll()
        if not self.running:
            # Check to see if there are more aync responses
            # incoming. If there are, process them.
            while not res.ready:
                QtTest.QTest.qWait(5)
                self.adc_device.poll()
            self._mutex.lock()
            return
    self._mutex.lock()

def wideband_sweep_update(self):
    """Append the latest narrow band segment to the wide band accumulators."""
    self._wideband_freq = np.concatenate(
        (self._wideband_freq, self._narrowband_freq))
    self._wideband_pwr = np.concatenate(
        (self._wideband_pwr, self._narrowband_pwr))

def wideband_sweep_complete(self):
    """Resample the accumulated wide band data and publish it.

    The accumulated power (dB) is converted to linear units for resampling
    and back to dB afterwards.  The result is stored in the data store and
    sweep progress is reset to 0.
    """
    lin_pwr = 10**(self._wideband_pwr/10.0)
    freq, lin_pwr = self.resample_spectrum(self._wideband_freq, lin_pwr)
    pwr = 10.0 * np.log10(lin_pwr)
    sweep_data = Spectrum(freq, pwr, None, self._wideband_sweep_params,
                          datetime.now())
    self.data_store.update(sweep_data)
    self.show_sweep_progress.emit(0.0)

def narrowband_sweep_update(self, lo_num: int, lo_count: int) -> None:
    """Emit overall sweep progress after a sample has been received.

    :param lo_num: Zero based index of the sample just received
    :param lo_count: Total number of samples in the current narrow band sweep
    """
    # Completed wide band segments plus the completed fraction of the
    # current segment, scaled by the segment count.
    sweep_frac = ((self._wideband_sweep_num/self._wideband_sweep_count)
                  + (((lo_num+1)/lo_count) * 1/self._wideband_sweep_count))
    self.show_sweep_progress.emit(sweep_frac)

def narrowband_sweep_complete(self):
    """Post-process the samples collected for one narrow band sweep.

    The accumulated spectrum is resampled, converted to dB, shifted back to
    the requested frequencies and corrected for attenuation/gain and
    frequency response.  In wide band mode the result is appended to the
    wide band accumulators, otherwise it is stored in the data store.
    Does nothing when no samples were collected.
    """
    if len(self._narrowband_freq) == 0:
        return
    # Frequency extent actually covered by the received samples.
    df = self._narrowband_freq[-1] - self._narrowband_freq[0]
    bandwidth = self.fe.freq_window(self.fe_chan_id)
    if self.wideband_enabled is True:
        # Scale the power and the bin count by the ratio between the
        # covered extent and the front end window.
        freq, pwr = self.resample_spectrum(
            self._narrowband_freq,
            self._narrowband_pwr * (bandwidth/df),
            bw_frac=df/bandwidth)
    else:
        freq, pwr = self.resample_spectrum(self._narrowband_freq,
                                           self._narrowband_pwr)
    rate_correction = AdcCalibration.rate_correction(
        self.adc_chan_id, self._narrowband_sweep_params.decimation_rate)
    avg_corr = self.avg_cpg_correction(
        self._narrowband_sweep_params)
    pwr = self.rms_volts_sq_to_dB(pwr,
                                  rate_correct=rate_correction,
                                  avg_corr=avg_corr)
    # Translate ADC frequencies back to the requested frequencies.
    freq += self._narrowband_freq_offset
    pwr += self.chan.fe_atten + self.chan.atten - self.chan.fe_gain
    if self.wideband_enabled is True:
        if self.adc_chan_type == ChannelCapabilities.SUPERHET:
            freq_shift = self._wideband_start_f - self._narrowband_freq[0]
        else:
            freq_shift = self._wideband_fe_lo
        pwr += self.frequency_response_correction(
            pwr, freq, self.chan.rbw, lo=freq_shift)
    else:
        pwr += self.frequency_response_correction(
            pwr, freq, self.chan.rbw, lo=self._narrowband_freq_offset)
    if self.wideband_enabled is True:
        self._narrowband_freq = freq
        self._narrowband_pwr = pwr
        self.wideband_sweep_update()
    else:
        sweep_data = Spectrum(
            freq, pwr, None, self._narrowband_sweep_params, datetime.now())
        self.data_store.update(sweep_data)
        # NOTE(review): nesting reconstructed from a flattened listing;
        # confirm the progress reset belongs to the narrow band branch only.
        # Reset narrowband sweep progress to 0
        self.show_sweep_progress.emit(0.0)

def sweep_sample_cb(self, freq: np.ndarray, pwr: np.ndarray,
                    num: int, count: int):
    """Callback receiving one block of sweep results.

    The block is appended to the narrow band accumulators and progress is
    updated.  When the last block of a sweep arrives (``num == count-1``)
    the narrow band sweep is completed and, in wide band mode, the segment
    counter is advanced and the wide band sweep is completed once every
    segment has been received.

    :param freq: Frequencies for this block
    :param pwr: Power values for this block
    :param num: Zero based index of this block
    :param count: Total number of blocks in the sweep
    """
    print("sweep_sample_cb:")
    print(f" {np.array(freq).mean()=}")
    self._narrowband_freq = np.concatenate((self._narrowband_freq,
                                            np.array(freq)))
    self._narrowband_pwr = np.concatenate((self._narrowband_pwr,
                                           np.array(pwr)))
    self.narrowband_sweep_update(num, count)
    if num == count-1:
        self.narrowband_sweep_complete()
        if self.wideband_enabled is True:
            self._wideband_sweep_num += 1
            if self._wideband_sweep_num == self._wideband_sweep_count:
                self.wideband_sweep_complete()

def narrowband_sweep(self, start_freq, stop_freq):
    """Synchronous narrow band sweep based on ``self.power_spectrum``.

    Unlike :py:meth:`initiate_narrowband_sweep` the spectrum is computed
    and stored before this method returns.  It is not called by
    :py:meth:`sweep` in this listing.

    :param start_freq: Start of the span to sweep
    :param stop_freq: End of the span to sweep
    """
    if self.adc_chan_type == ChannelCapabilities.DIRECT:
        fe_lo = 0.0
        adc_start_freq = start_freq
        adc_stop_freq = stop_freq
    elif self.adc_chan_type == ChannelCapabilities.SUPERHET:
        adc_centre_freq = self.fe.adc_centre_freq(self.fe_chan_id)
        # NOTE: bandwidth is fetched but not used in this method.
        bandwidth = self.fe.freq_window(self.fe_chan_id)
        span = stop_freq - start_freq
        offset_freq = adc_centre_freq - (span/2)
        fe_lo = start_freq + span/2
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        adc_start_freq = offset_freq
        adc_stop_freq = offset_freq + span
    else:
        span = stop_freq - start_freq
        offset_freq = self.chan.felo_offset
        fe_lo = start_freq - offset_freq
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        adc_start_freq = offset_freq
        adc_stop_freq = offset_freq + span
    if self.chan.auto_rbw is True:
        sweep_params = self.configure(adc_start_freq, adc_stop_freq)
        if self.chan.is_selected is True:
            self.chan.app.set_nearest_accepted_rbw(sweep_params.rbw)
    else:
        sweep_params = self.configure(
            adc_start_freq, adc_stop_freq,
            rbw=self.chan.rbw/1e6)
    sweep_params.fe_lo = fe_lo
    if self._reset_datastore is True:
        self.data_store.reset_trace()
        self._reset_datastore = False
    raw_freq, raw_pwr, iq_data = self.power_spectrum(sweep_params)
    # power_spectrum signals "no data" with None; nothing is stored then.
    if raw_freq is None or raw_pwr is None:
        return
    rate_correction = AdcCalibration.rate_correction(self.adc_chan_id,
                                                     sweep_params.decimation_rate)
    avg_corr = self.avg_cpg_correction(sweep_params)
    freq, pwr = self.resample_spectrum(raw_freq, raw_pwr)
    pwr = self.rms_volts_sq_to_dB(pwr,
                                  rate_correct=rate_correction,
                                  avg_corr=avg_corr)
    # Translate ADC frequencies back to the requested frequencies.
    freq += start_freq - adc_start_freq
    pwr += self.chan.fe_atten + self.chan.atten - self.chan.fe_gain
    pwr += self.frequency_response_correction(
        pwr, freq, self.chan.rbw, lo=start_freq - adc_start_freq)
    sweep_data = Spectrum(freq, pwr, iq_data, sweep_params, datetime.now())
    self.data_store.update(sweep_data)

def wideband_sweep(self, start_freq, stop_freq, max_sample_span=40.0):
    """Synchronous wide band sweep based on ``self.power_spectrum``.

    The span is cut into segments of at most ``max_sample_span``; each
    segment is measured, corrected and concatenated, and the combined
    spectrum is resampled and stored.  It is not called by
    :py:meth:`sweep` in this listing.

    :param start_freq: Start of the requested span
    :param stop_freq: End of the requested span
    :param max_sample_span: Maximum width of one segment (same units as
        the frequencies)
    """
    freq = np.array([])
    pwr = np.array([])
    adc_centre_freq = self.fe.adc_centre_freq(self.fe_chan_id)
    # NOTE: bandwidth is fetched but not used in this method.
    bandwidth = self.fe.freq_window(self.fe_chan_id)
    start_freqs = np.arange(start_freq, stop_freq, max_sample_span)
    stop_freqs = start_freqs + max_sample_span
    # Clip the final segment so that it does not run past stop_freq.
    stop_freqs[-1] = stop_freq if stop_freqs[-1] > stop_freq else stop_freqs[-1]
    spans = np.cumsum(stop_freqs - start_freqs)
    frac_spans = spans / spans[-1]
    if self._reset_datastore is True:
        self.data_store.reset_trace()
        self._reset_datastore = False
    if self.adc_chan_type == ChannelCapabilities.SUPERHET:
        initial_lo = start_freqs[0] + (stop_freqs[0] - start_freqs[0])/2
    else:
        initial_lo = start_freqs[0] - self.chan.felo_offset
    # NOTE: frac_span is unpacked but not used inside the loop.
    for start_f, stop_f, frac_span in zip(start_freqs, stop_freqs, frac_spans):
        span = stop_f - start_f
        if self.adc_chan_type == ChannelCapabilities.SUPERHET:
            offset_freq = adc_centre_freq - (span/2)
            fe_lo = start_f + span/2
            adc_start_freq = offset_freq
            adc_stop_freq = offset_freq + span
        else:
            offset_freq = self.chan.felo_offset
            fe_lo = start_f - offset_freq
            adc_start_freq = offset_freq
            adc_stop_freq = offset_freq + span
        self.fe.set_lo(self.fe_chan_id, fe_lo)
        if self.chan.auto_rbw is True:
            sweep_params = self.configure(adc_start_freq, adc_stop_freq)
            if self.chan.is_selected is True:
                self.chan.app.set_nearest_accepted_rbw(sweep_params.rbw)
        else:
            sweep_params = self.configure(
                adc_start_freq, adc_stop_freq,
                rbw=self.chan.rbw/1e6)
        sweep_params.fe_lo = initial_lo
        raw_freq, raw_pwr, iq_data = self.power_spectrum(sweep_params)
        # Abandon the whole sweep if any segment yields no data.
        if raw_freq is None or raw_pwr is None:
            return
        if self.adc_chan_type == ChannelCapabilities.SUPERHET:
            freq_shift = start_f - raw_freq[0]
        else:
            freq_shift = fe_lo
        rate_correction = AdcCalibration.rate_correction(self.adc_chan_id,
                                                         sweep_params.decimation_rate)
        avg_corr = self.avg_cpg_correction(sweep_params)
        sample_freq, sample_pwr = self.resample_spectrum(
            raw_freq + freq_shift, raw_pwr)
        sample_pwr = self.rms_volts_sq_to_dB(sample_pwr,
                                             rate_correct=rate_correction,
                                             avg_corr=avg_corr)
        sample_pwr += self.chan.fe_atten + self.chan.atten - self.chan.fe_gain
        sample_pwr += self.frequency_response_correction(
            sample_pwr, sample_freq, self.chan.rbw, lo=freq_shift)
        freq = np.concatenate((freq, sample_freq))
        pwr = np.concatenate((pwr, sample_pwr))
    # Resample the concatenated segments in linear power units.
    lin_pwr = 10**(pwr/10.0)
    freq, lin_pwr = self.resample_spectrum(freq, lin_pwr)
    pwr = 10.0 * np.log10(lin_pwr)
    # The stored IQ data and sweep parameters are those of the last segment.
    sweep_data = Spectrum(freq, pwr, iq_data, sweep_params, datetime.now())
    self.data_store.update(sweep_data)

def configure(self, start_freq: float, stop_freq: float,
              scaling: str='spectrum',
              rbw: Optional[float]=None,
              reqd_decimation_rate: Optional[int]=None):
    """Build the :py:class:`SweepParameters` for an ADC frequency range.

    The range is clamped to ``RP_ADC_MIN_FREQUENCY`` ..
    ``RP_ADC_MAX_FREQUENCY``.  ``scaling`` is accepted but not used here.

    :param start_freq: ADC start frequency
    :param stop_freq: ADC stop frequency
    :param scaling: Unused
    :param rbw: Requested resolution bandwidth, or None
    :param reqd_decimation_rate: Decimation rate to force, or None
    :return: A new SweepParameters instance
    """
    if start_freq < RP_ADC_MIN_FREQUENCY:
        start_freq = RP_ADC_MIN_FREQUENCY
    if stop_freq > RP_ADC_MAX_FREQUENCY:
        stop_freq = RP_ADC_MAX_FREQUENCY
    return SweepParameters(start_freq, stop_freq, rbw, reqd_decimation_rate)

def resample_spectrum(self, freq, pwr, bw_frac=1.0):
    """Resample a spectrum to the displayed bin count.

    Spectra with more points than required are reduced with the channel's
    detector; spectra with fewer points are linearly interpolated; spectra
    of exactly the required length are returned unchanged.

    :param freq: Spectrum frequency array
    :param pwr: Spectrum power array
    :param bw_frac: Fraction of the displayed bin count to produce; values
        of 0.99 and above are treated as 1.0
    :return: Tuple of (resampled frequencies, resampled power)
    """
    if bw_frac < 0.99:
        bin_count = int(SweepParameters.displayed_bin_count * bw_frac)
    else:
        bin_count = SweepParameters.displayed_bin_count
    if len(freq) > bin_count:
        resampled_freq, resampled_pwr = self.detector(
            self.chan.detector_type, freq, pwr, bin_count)
    elif len(freq) < bin_count:
        resampled_freq, resampled_pwr = self.interpolate_sweep(
            freq[0], freq[-1], freq, pwr, bin_count)
    else:
        resampled_freq = freq
        resampled_pwr = pwr
    return (resampled_freq, resampled_pwr)

def rms_volts_sq_to_dB(self, pwr, rate_correct=0.0, avg_corr=0.0):
    """Convert power spectrum values in V**2 to dBm.

    :param pwr: Power values (V**2)
    :param rate_correct: Correction in dB added to the result
    :param avg_corr: Further correction in dB added to the result
    :return: Array of dBm values
    """
    # Note that the scipy welch function returns the spectrum in units
    # of V**2. This needs to be converted to dBm. Some care is required
    # here since the input to welch is in peak volts (rather than rms).
    # Since the system has a characteristic impedance of 50 ohms this
    # then implies that the output from welch is converted to dBm as
    # follows where the constant 1.60206 is actually log10(2/50e-3) which
    # takes care of the characteristic impedance and the conversion of
    # peak to rms voltage.
    p = np.array(pwr)
    # Non-positive values are replaced by 1e-18 before taking the log.
    log_pwr = (10.0 * (1.60206 + np.log10(
        np.where(p > 0, p, np.full(len(p), 1e-18))))
               + rate_correct + avg_corr)
    return log_pwr

def avg_cpg_correction(self, sweep_params: SweepParameters) -> float:
    """Return the averaging correction for the given sweep parameters.

    The correction is looked up with ``self.cpg_corr_fn`` when fewer than
    32 windows fit into one chunk (the window count is clamped to at least
    1), and is 0.0 otherwise.
    """
    window_count = sweep_params.chunk_size / sweep_params.nperseg
    if window_count < 32:
        if window_count < 1:
            window_count = 1
        avg_corr = self.cpg_corr_fn(window_count)
    else:
        avg_corr = 0.0
    return avg_corr

def interpolate_sweep(self, f_start, f_stop, freq, pwr, bins):
    """Linearly interpolate a spectrum onto ``bins`` evenly spaced points.

    :return: Tuple of (resampled frequencies, resampled power)
    """
    resampled_freq = np.linspace(f_start, f_stop, bins)
    resampled_pwr = np.interp(resampled_freq, freq, pwr)
    return (resampled_freq, resampled_pwr)

def resample_sweep(self, f_start, f_stop, freq, pwr, bins):
    """Reduce a spectrum to ``bins`` points by taking window maxima.

    For every required frequency the nearest input bin is located and the
    maximum power within a window around it is taken.

    :return: Tuple of (resampled frequencies, resampled power)
    """
    def resample(start, stop, f, p, window_size, count):
        # NOTE: the f and p parameters are rebound inside the loop; the
        # input data is read from the enclosing freq and pwr instead.
        j = k = window_size//2
        if window_size % 2:
            k += 1
        reqd_freq_bins = np.linspace(start, stop, count)
        resampled_list = []
        for f in reqd_freq_bins:
            i = (np.abs(freq-f)).argmin()
            if i - j < 0:
                p = pwr[0:k].max()
            elif i + k >= len(pwr):
                p = pwr[i-j:].max()
            else:
                p = pwr[i-j:i+k].max()
            resampled_list.append(p)
        resampled_freq = reqd_freq_bins
        resampled_pwr = np.array(resampled_list)
        return (resampled_freq, resampled_pwr)
    resample_window_size = floor(len(freq)/bins)
    return resample(f_start, f_stop, freq, pwr, resample_window_size, bins)
Wide band sweeps
Figure 5: Wide band sweep strategy
In order to cover the requested frequency span a number of front end
local oscillator settings are required. A set of ADC samples must be
retrieved for each LO setting. In comparison to the speed of the narrow
band sweep this makes wide band sweeps quite slow. If fast wide band
sweeps are a requirement then as a first step to achieving this the
ADCService
and FrontEndService
would need to be integrated into a
single process running on the Red Pitaya board. This single process
would also need to include the spectrum sweep code.
from typing import (
    Optional, Dict
)
import sys
from dataclasses import dataclass
from math import ceil, floor
from datetime import datetime

import numpy as np
import scipy
from scipy import signal, interpolate

from tam import (
    RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY,
    RP_ADC_MAX_SPAN, RP_ADC_MIN_SPAN
)
from utils import (
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE,
    DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG
)
from design import (
    CIC_BANDPASS, ADC_122_16_F_CLK, HFT144D, BIN_COUNT, CHAN_BUFSIZE,
    MIN_DECIMATION_RATE, MAX_DECIMATION_RATE, AVG_CPG_CORRECTIONS
)
from adccal import AdcCalibration
from fe_client import FrontEndClient


@dataclass
class SweepParameters():
    """Derived FFT/decimation parameters for one ADC frequency sweep.

    Only ``f_start`` and ``f_stop`` are required.  Everything else is
    either optional input or computed on demand by the properties.
    """
    # Class level settings (not dataclass fields).
    cic_bandpass = CIC_BANDPASS
    offset_bins = 5
    # displayed_bin_count = BIN_COUNT
    displayed_bin_count = 1000

    # Start and stop frequencies in MHz.
    f_start: float
    f_stop: float
    # Resolution bandwidth in MHz.
    res_bw: Optional[float] = None
    reqd_decimation_rate: Optional[int] = None
    fe_lo: float = 0.0
    # Window sum and sum of squares, filled in by initialize_window().
    s1: float = 0.0
    s2: float = 0.0
    window_type: str = 'HFT144D'
    window_w3db: float = 4.4697
    f_clk: float = ADC_122_16_F_CLK
    # Lazily created window samples (see the ``window`` property).
    _window = None

    @property
    def f_span(self):
        """The requested frequency span (MHz)."""
        return self.f_stop - self.f_start

    @property
    def initial_bin_width(self):
        """The bandwidth of each bin in the FFT spectrum (in Hz)
        """
        if self.res_bw is not None:
            return (self.res_bw*1e6) / self.window_w3db
        return (self.f_span*1e6)/SweepParameters.displayed_bin_count

    @property
    def bin_count(self):
        """Number of FFT bins to aim for.

        The displayed bin count is multiplied by a factor that grows as
        the initial bin width shrinks.
        """
        width = self.initial_bin_width
        # This must be a single if/elif chain.  With two separate ``if``
        # statements the multiplier chosen for widths above 10 kHz is
        # immediately overwritten by the "> 5 kHz" case.
        if width > 10000.0:
            multiplier = 1
        elif width > 5000.0:
            multiplier = 2
        elif width > 1000.0:
            multiplier = 3
        elif width > 100.0:
            multiplier = 4
        elif width > 20.0:
            multiplier = 6
        elif width > 10.0:
            multiplier = 8
        elif width > 1.0:
            multiplier = 15
        else:
            multiplier = 30
        return self.displayed_bin_count * multiplier

    @property
    def bin_width(self):
        """Actual FFT bin width (Hz) for the chosen rate and segment size."""
        return self.fs/self.nperseg

    @property
    def decimation_rate(self):
        """The decimation rate to use for the sweep.

        ``reqd_decimation_rate`` is returned unchanged when set.  Otherwise
        the rate is estimated from the clock, the bin count and the initial
        bin width, and rounded to an integer.  Rates below
        ``MIN_DECIMATION_RATE`` collapse to 1.
        """
        if self.reqd_decimation_rate is not None:
            return self.reqd_decimation_rate

        N = self.bin_count
        R = self.f_clk / (N * 2 * self.initial_bin_width)
        if R < MIN_DECIMATION_RATE:
            R = 1
        else:
            R *= SweepParameters.cic_bandpass

        # Adjust the sample size to make the decimation rate an integer value
        delta_R = R - round(R)
        delta_N = delta_R * N * N * (self.initial_bin_width/self.f_clk)
        actual_bin_count = N + delta_N
        R = self.f_clk / (actual_bin_count * 2 * self.initial_bin_width)
        if R < MIN_DECIMATION_RATE:
            return 1
        rate = round(R * SweepParameters.cic_bandpass)
        if rate < MIN_DECIMATION_RATE:
            rate = 1
        return rate

    @property
    def fs(self):
        """Sample rate after decimation (Hz)."""
        return self.f_clk/self.decimation_rate

    @property
    def fft_bins(self):
        """Number of FFT bins needed to cover the span."""
        return floor((self.f_span*1e6)/self.bin_width)

    @property
    def sample_span(self):
        """Frequency span (MHz) covered by a single sample."""
        # The CIC decimator is used to generate samples with decimation
        # rates >1. The bandpass for the decimator is approximately
        # 0.3 * Nyquist. In order to prevent aliases within the bandpass
        # region the sample span is set so that the frequency span of
        # interest falls within the decimator bandpass.
        if self.decimation_rate == 1:
            return self.f_span
        return ((self.f_clk/(self.decimation_rate * 1e6))
                * SweepParameters.cic_bandpass)

    @property
    def nperseg(self):
        """FFT segment length, rounded up to a fast FFT size."""
        n = round(self.f_clk / (self.decimation_rate * self.initial_bin_width))
        return scipy.fft.next_fast_len(n)

    @property
    def sample_bins(self):
        """A count of the valid bins within a single sample.
        """
        if self.decimation_rate == 1:
            return self.nperseg
        return round(self.nperseg * SweepParameters.cic_bandpass)

    @property
    def chunk_size(self):
        """Twice the segment length, limited to ``CHAN_BUFSIZE``."""
        return min(2 * self.nperseg, CHAN_BUFSIZE)

    def initialize_window(self):
        """Create the FFT window and cache its sum and sum of squares."""
        self._window = scipy.signal.get_window(
            HFT144D, Nx=self.nperseg + self.lo_offset_bins)
        self.s1 = self._window.sum()
        self.s2 = (self._window * self._window).sum()

    @property
    def window(self):
        """The FFT window samples, created on first use."""
        if self._window is None:
            self.initialize_window()
        return self._window

    @property
    def lo_offset_bins(self):
        """Number of bins by which the LO is offset below ``f_start``.

        Starts at ``offset_bins`` and is reduced until the offset no
        longer pushes the start frequency below zero.
        """
        bins = SweepParameters.offset_bins
        while bins > 0 and (self.f_start - (bins * self.bin_width)/1e6) < 0:
            bins -= 1
        return bins

    @property
    def lo_offset(self):
        """The LO offset in MHz."""
        return (self.lo_offset_bins * self.bin_width)/1e6

    @property
    def enbw(self):
        """``fs * s2 / s1**2`` for the current window."""
        if self._window is None:
            self.initialize_window()
        return (self.fs * self.s2)/(self.s1 * self.s1)

    @property
    def rbw(self):
        """The resolution bandwidth (in Hz).
        """
        return self.window_w3db * self.bin_width

    @property
    def lo_count(self):
        """Number of LO settings (samples) needed to cover the span."""
        return ceil(self.f_span / self.sample_span)


def resample_sweep_2(f_start, f_stop, freq, pwr, bins):
    """Resample a spectrum by differentiating its interpolated integral.

    :return: Tuple of (new frequencies, resampled power)
    """
    new_freq = np.linspace(f_start, f_stop, bins)
    df = freq[1] - freq[0]
    int_pwr = np.cumsum(pwr) * df
    int_pwr2 = np.interp(new_freq, freq, int_pwr)
    pwr2 = (np.diff(np.concatenate(([0], int_pwr2)))
            / np.diff(np.concatenate(([0], new_freq))))
    return (new_freq, pwr2)


def peak_detect(pwr, resampled_idx, window_size):
    """Find peaks within a spectrum prior to resampling.

    Peaks are found by first finding the mean and std. dev. of the
    spectrum power values in each resampling window (bin).  The values of
    the lowest NOISE_PERCENTILE means and std. devs. are then calculated.
    If both the mean and std. dev. of a given resampled spectrum bin is
    greater than the MEAN_THRESHOLD and STD_THRESHOLD respectively then the
    bin is marked as a peak otherwise it is marked as noise.

    :param pwr: A numpy array containing the spectrum power values as
        computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values.  Each value corresponds to a
        value in the resampled spectrum and is True if the associated power
        value is a peak and False if the value is noise.
    """
    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    j = k = window_size//2
    if window_size % 2:
        k += 1

    means = []
    stds = []
    for i in resampled_idx:
        if i - j < 0:
            bucket = pwr[0:k]
        elif i + k >= len(pwr):
            bucket = pwr[i-j:]
        else:
            bucket = pwr[i-j:i+k]
        means.append(np.mean(bucket))
        stds.append(np.std(bucket))
    means = np.array(means)
    stds = np.array(stds)

    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)
    print(f"{baseline_mean=}, {baseline_std=}")

    bins = len(resampled_idx)
    if baseline_std == 0 or baseline_mean == 0:
        # No usable baseline: treat every bin as a peak.
        return np.ones(bins, dtype=bool)
    return ((stds/baseline_std > STD_THRESHOLD)
            & (means/baseline_mean > MEAN_THRESHOLD))


def detector(detect_type, freq, pwr, bins):
    """Resample a power spectrum.

    An unrecognised ``detect_type`` yields an empty power list.

    :param detect_type: The detector type to use when resampling.
        One of DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG, DETECTOR_TYPE_SAMPLE,
        DETECTOR_TYPE_NORMAL.
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int

    :return: Tuple of (resampled frequencies, list of resampled power values)
    """
    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1

    def bucket(arr, idx):
        """Return the slice of arr forming the bucket around index idx.
        """
        if idx - j < 0:
            return arr[0:k]
        if idx + k >= len(pwr):
            return arr[idx-j:]
        return arr[idx-j:idx+k]

    def max_pwr(arr, idx):
        """Find the maximum value of the idx'th bucket in arr.
        """
        return bucket(arr, idx).max()

    def min_pwr(arr, idx):
        """Find the minimum value of the idx'th bucket in arr.
        """
        return bucket(arr, idx).min()

    def sample_pwr(arr, idx):
        """Return the middle value of the idx'th bucket in arr.
        """
        a = bucket(arr, idx)
        return np.take(a, a.size // 2)

    resampled_freqs = np.linspace(freq[0], freq[-1], bins)
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]

    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        resampled_pwr = [max_pwr(pwr, i) for i in indexes]
    elif detect_type == DETECTOR_TYPE_NEG:
        resampled_pwr = [min_pwr(pwr, i) for i in indexes]
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        resampled_pwr = [sample_pwr(pwr, i) for i in indexes]
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = peak_detect(pwr, indexes, window_size)
        for n, idx in enumerate(indexes):
            # Peaks always report the maximum; noise bins alternate
            # between maximum (even bins) and minimum (odd bins).
            if is_peak[n] or n % 2 == 0:
                resampled_pwr.append(max_pwr(pwr, idx))
            else:
                resampled_pwr.append(min_pwr(pwr, idx))
    return (resampled_freqs, resampled_pwr)


def resample_sweep(f_start, f_stop, freq, pwr, bins):
    """Reduce a spectrum to ``bins`` points by taking window maxima.

    :return: Tuple of (resampled frequencies, resampled power array)
    """
    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1
    reqd_freq_bins = np.linspace(f_start, f_stop, bins)
    resampled_list = []
    for f in reqd_freq_bins:
        i = (np.abs(freq-f)).argmin()
        if i - j < 0:
            p = pwr[0:k].max()
        elif i + k >= len(pwr):
            p = pwr[i-j:].max()
        else:
            p = pwr[i-j:i+k].max()
        resampled_list.append(p)
    return (reqd_freq_bins, np.array(resampled_list))


def interpolate_sweep(f_start, f_stop, freq, pwr, bins):
    """Linearly interpolate a spectrum onto ``bins`` evenly spaced points.

    :return: Tuple of (resampled frequencies, resampled power)
    """
    resampled_freq = np.linspace(f_start, f_stop, bins)
    resampled_pwr = np.interp(resampled_freq, freq, pwr)
    return (resampled_freq, resampled_pwr)


def convolve_sweep(f_start, f_stop, freq, pwr, bins):
    """Smooth a spectrum by convolving it with a ``bins`` point window.

    ``f_start`` and ``f_stop`` are accepted but not used.

    :return: Tuple of (unchanged frequencies, convolved power)
    """
    window = scipy.signal.get_window(HFT144D, Nx=bins, fftbins=False)
    resampled_pwr = scipy.signal.convolve(pwr, window, mode='same')
    return (freq, resampled_pwr)


def resample_spectrum(freq, pwr):
    """Resample a spectrum to ``SweepParameters.displayed_bin_count`` points.

    Longer spectra are reduced with the positive peak detector, shorter
    ones are linearly interpolated, and spectra of exactly the required
    length are returned unchanged.

    :return: Tuple of (resampled frequencies, resampled power)
    """
    if len(freq) > SweepParameters.displayed_bin_count:
        resampled_freq, resampled_pwr = detector(
            DETECTOR_TYPE_POS, freq, pwr,
            SweepParameters.displayed_bin_count)
    elif len(freq) < SweepParameters.displayed_bin_count:
        resampled_freq, resampled_pwr = interpolate_sweep(
            freq[0], freq[-1], freq, pwr,
            SweepParameters.displayed_bin_count)
    else:
        resampled_freq = freq
        resampled_pwr = pwr
    return (resampled_freq, resampled_pwr)


def felo_offset(span):
    """Return the ADC start frequency that centres ``span`` in the ADC range.

    Spans larger than ``RP_ADC_MAX_SPAN`` yield 0.0.
    """
    adc_centre_freq = (RP_ADC_MIN_FREQUENCY
                       + ((RP_ADC_MAX_FREQUENCY - RP_ADC_MIN_FREQUENCY)/2))
    if span > RP_ADC_MAX_SPAN:
        return 0.0
    return adc_centre_freq - (span/2)


def configure(start_freq: float, stop_freq: float,
              scaling: str='spectrum',
              rbw: Optional[float]=None,
              reqd_decimation_rate: Optional[int]=None):
    """Build and print the sweep parameters for an ADC frequency range.

    The range is clamped to ``RP_ADC_MIN_FREQUENCY`` ..
    ``RP_ADC_MAX_FREQUENCY``.  ``scaling`` is accepted but not used.

    :param start_freq: ADC start frequency (MHz)
    :param stop_freq: ADC stop frequency (MHz)
    :param rbw: Requested resolution bandwidth (MHz), or None
    :param reqd_decimation_rate: Decimation rate to force, or None
    :return: The new :py:class:`SweepParameters` instance
    """
    if start_freq < RP_ADC_MIN_FREQUENCY:
        start_freq = RP_ADC_MIN_FREQUENCY
    if stop_freq > RP_ADC_MAX_FREQUENCY:
        stop_freq = RP_ADC_MAX_FREQUENCY
    sweep_params = SweepParameters(start_freq, stop_freq,
                                   rbw, reqd_decimation_rate)
    # Same output as a run of print(f'{sweep_params.<name>=}') statements.
    for name in ('decimation_rate', 'bin_width', 'nperseg', 'fs',
                 'sample_span', 'sample_bins', 'rbw', 'fft_bins',
                 'lo_count', 'f_start', 'f_stop', 'lo_offset_bins',
                 'lo_offset'):
        print(f'sweep_params.{name}={getattr(sweep_params, name)!r}')

    lo_list = np.arange(
        sweep_params.f_start, sweep_params.f_stop, sweep_params.sample_span)
    bucket_size = sweep_params.fft_bins / SweepParameters.displayed_bin_count
    print(f'{lo_list=}')
    print(f'{bucket_size=}')
    return sweep_params


def adc_conv_factor(chan_id: int) -> float:
    """Return the reciprocal of the calibrated ADC-to-volts factor."""
    return 1.0 / AdcCalibration.adc_to_volts(chan_id)


def adc_dc_offset(chan_id: int) -> float:
    """Return the calibrated DC offset for the channel."""
    return AdcCalibration.dc_offset(chan_id)


def rate_correction(adc_chan_id: int, decimation_rate: int) -> float:
    """Return the calibrated correction for the given decimation rate."""
    return AdcCalibration.rate_correction(adc_chan_id, decimation_rate)


def avg_cpg_correction(sweep_params: SweepParameters) -> float:
    """Return the averaging correction for the given sweep parameters.

    The correction is interpolated (cubic) from ``AVG_CPG_CORRECTIONS``
    when fewer than 32 windows fit into one chunk (the window count is
    clamped to at least 1), and is 0.0 otherwise.
    """
    cpg_corr_fn = interpolate.interp1d(AVG_CPG_CORRECTIONS['x'],
                                       AVG_CPG_CORRECTIONS['y'],
                                       kind='cubic')
    window_count = sweep_params.chunk_size / sweep_params.nperseg
    if window_count >= 32:
        return 0.0
    return cpg_corr_fn(max(window_count, 1))


def rms_volts_sq_to_dB(pwr, rate_correct=0.0, avg_corr=0.0):
    """Convert power spectrum values in V**2 to dBm.

    :param pwr: Power values (V**2)
    :param rate_correct: Correction in dB added to the result
    :param avg_corr: Further correction in dB added to the result
    :return: dBm values
    """
    # Note that the scipy welch function returns the spectrum in units
    # of V**2. This needs to be converted to dBm. Some care is required
    # here since the input to welch is in peak volts (rather than rms).
    # Since the system has a characteristic impedance of 50 ohms this
    # then implies that the output from welch is converted to dBm as
    # follows where the constant 1.60206 is actually log10(2/50e-3) which
    # takes care of the characteristic impedance and the conversion of
    # peak to rms voltage.
    p = np.asarray(pwr)
    # Replace non-positive values by a tiny floor so that the log stays
    # finite; this matches AdcThread.rms_volts_sq_to_dB.
    log_pwr = (10.0 * (1.60206 + np.log10(np.where(p > 0, p, 1e-18)))
               + rate_correct + avg_corr)
    return log_pwr


def sweep(lo_list, params, freq, pwr):
    """Acquire samples for every LO setting and append their spectra.

    Relies on the module level names ``ch`` (allocated ADC channel),
    ``channel`` (ADC channel number) and ``fe_lo`` (current front end LO)
    which are set up by the ``__main__`` script below.

    :param lo_list: ADC LO frequencies, one per sample
    :param params: The :py:class:`SweepParameters` for this sweep
    :param freq: Frequencies accumulated so far
    :param pwr: Power values accumulated so far
    :return: Tuple of (freq, pwr) extended with the newly acquired data.
        The input arrays are not modified in place.
    """
    # The number of (time domain) samples should be
    # sweep_params.nperseg * number of windows to use
    window_count = 2
    samples = ch.sweep(
        lo_list - params.lo_offset,
        params.decimation_rate,
        (params.nperseg * window_count) + params.lo_offset_bins)
    print(f'{len(samples)=}')

    if params.fft_bins >= params.sample_bins:
        sample_sizes = [params.sample_bins for _ in range(params.lo_count)]
        sample_sizes[-1] = params.fft_bins % params.sample_bins
    else:
        sample_sizes = [params.fft_bins+2]

    # Use a distinct local name.  Assigning to ``adc_conv_factor`` here
    # would make that name local to this function and the call on the
    # right hand side would raise UnboundLocalError.
    conv_factor = adc_conv_factor(channel)

    for i, sample in samples.items():
        sample_lo = lo_list[i]
        sample_arr = np.array(sample)
        scaled_data = (conv_factor*sample_arr.real
                       + (1j)*conv_factor*sample_arr.imag)
        f, Pxx_den = signal.welch(
            scaled_data, params.fs, detrend='linear',
            noverlap=params.nperseg*0.8, average='median',
            window=params.window, return_onesided=False,
            scaling='spectrum')
        freq = np.concatenate(
            (freq, (fe_lo + sample_lo + f[:sample_sizes[i]] / 1e6)))
        pwr = np.concatenate(
            (pwr, Pxx_den[params.lo_offset_bins:(sample_sizes[i]
                                                 + params.lo_offset_bins)]))
    # np.concatenate creates new arrays, so the results must be returned.
    return freq, pwr


import matplotlib
import matplotlib.pyplot as plt
from dyadic.splot import init_style
matplotlib.use('Qt5Agg')

import rpyc

from utils import (
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE,
    DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG
)


def baseband_test():
    """Print the sweep parameters derived for a set of baseband sweeps.

    :return: The sweep parameters of the final configuration
    """
    def freq_limits(fspan):
        centre_freq = 10.0
        return (centre_freq-(fspan/2), centre_freq+(fspan/2))

    # (span in MHz, forced decimation rate, decimals used in the banner)
    for fspan, rate, decimals in ((10.0, 1, 1), (1.2, 16, 1),
                                  (0.29, 64, 3), (0.0722, 256, 4),
                                  (0.0045, 4096, 5), (0.00225, 8192, 5)):
        startf, stopf = freq_limits(fspan)
        print(f'{startf:.{decimals}f} -> {stopf:.{decimals}f}, '
              f'reqd_decimation_rate={rate}')
        configure(startf, stopf, reqd_decimation_rate=rate)
        print()

    # (banner, start in MHz, stop in MHz, rbw in MHz or None)
    cases = (
        # Banner corrected to match the frequencies actually configured.
        ('6.0 -> 16.0, rbw=500Hz', 6.0, 16.0, 0.0005),
        ('10.0 -> 15.0, rbw=10kHz', 10.0, 15.0, 0.01),
        ('10.0 -> 15.0', 10.0, 15.0, None),
        ('11.99995 -> 12.00005', 11.99995, 12.00005, None),
        ('10.0 -> 15.0, rbw=1kHz', 10.0, 15.0, 0.001),
        ('9.5 -> 14.5, rbw=1kHz', 9.5, 14.5, 0.001),
        ('11.5 -> 12.5, rbw=100Hz', 11.5, 12.5, 0.0001),
        ('11.9 -> 12.1, rbw=10Hz', 11.9, 12.1, 0.00001),
        ('11.95 -> 12.05, rbw=5Hz', 11.95, 12.05, 0.000005),
        ('11.9975 -> 12.0025, rbw=1Hz', 11.9975, 12.0025, 0.000001),
        ('11.99 -> 12.01, rbw=1Hz', 11.99, 12.01, 0.000001),
        ('0.5 -> 55.0', 0.5, 55.0, None),
    )
    for banner, startf, stopf, res_bw in cases:
        print(banner)
        configure(startf, stopf, rbw=res_bw)
        print()

    print('0.5 -> 55.0, rbw=500kHz')
    configure(0.5, 55.0, rbw=0.5)
    print('----------')

    print('17.3728 -> 24.0, rbw=1kHz')
    params = configure(17.3728, 24.0, rbw=0.001)
    return params


if __name__ == '__main__':
    frontend_ip = '127.0.0.1'
    frontend_port = 18870
    frontend = FrontEndClient(frontend_ip, frontend_port, fe_config=None)

    adc_ip = '192.168.0.155'
    channel = 0
    chan_id = 'Ch 1'
    adc = rpyc.connect(adc_ip, 18900)
    status = adc.root.initialize()
    if status is False:
        print(f"Can't connect to ADC at {adc_ip}")
        sys.exit(-1)

    ch = adc.root.allocate_channel(
        channel, 1, 0.0, 0, 32*1024, 8*1024, 'F',
        double_buffer=False, prefill_buffers=False)
    if ch is None:
        print(f"Can't allocate ADC channel at {adc_ip}")
        sys.exit(-2)

    start_freq = 200.0
    stop_freq = 1000.0
    rbw = 0.1
    max_sample_span = 40.0
    # Banner derived from the values in use so that it cannot go stale.
    print(f'{start_freq} - {stop_freq}, rbw={rbw * 1e3:g}kHz')

    freq = np.array([])
    pwr = np.array([])
    fe_lo_list = []
    lo_lists = []

    init_style()

    start_freqs = np.arange(start_freq, stop_freq, max_sample_span)
    stop_freqs = start_freqs + max_sample_span
    # Clip the final segment so that it does not run past stop_freq.
    stop_freqs[-1] = min(stop_freqs[-1], stop_freq)
    for start_f, stop_f in zip(start_freqs, stop_freqs):
        print(f'{start_f=}, {stop_f=}')
        span = stop_f - start_f
        offset_freq = felo_offset(span)
        # fe_lo is read as a module level name by sweep().
        fe_lo = start_f - offset_freq
        fe_lo_list.append(fe_lo)
        print(f"{offset_freq=}, {fe_lo=}")
        adc_start_freq = offset_freq
        adc_stop_freq = offset_freq + span
        frontend.set_lo(chan_id, fe_lo)
        params = configure(adc_start_freq, adc_stop_freq, rbw=rbw)
        lo_list = np.arange(params.f_start, params.f_stop, params.sample_span)
        lo_lists.append(lo_list)
        # sweep() returns the extended arrays; it cannot update them in place.
        freq, pwr = sweep(lo_list, params, freq, pwr)

    fig = plt.figure(num=None, figsize=(6.0, 4.0), dpi=72)
    ax = fig.add_subplot(111)
    _ = ax.set_xlabel('Frequency (MHz)')
    _ = ax.set_ylabel('Power (dBm)')
    _ = ax.set_xlim(start_freq, stop_freq)
    _ = ax.set_ylim(-160.0, 0.0)
    ax.grid(linestyle=':')
    ax.grid(which='both', axis='x', linestyle=':')

    if len(freq) > 0:
        # Corrections are computed here as plain numbers (for the last
        # segment's parameters) before being added to the spectrum.
        rate_corr = rate_correction(channel, params.decimation_rate)
        avg_corr = AVG_CPG_CORRECTIONS['y'][0]
        resampled_freq, resampled_pwr = resample_spectrum(freq, pwr)
        log_pwr = rms_volts_sq_to_dB(resampled_pwr,
                                     rate_correct=rate_corr,
                                     avg_corr=avg_corr)
        _ = ax.plot(resampled_freq, log_pwr)
        print(f"{len(resampled_freq)=}")
    else:
        print('No sweep data was acquired')

    # Mark every LO position used during the sweep.
    for fe_lo, lo_list in zip(fe_lo_list, lo_lists):
        for lo in lo_list + fe_lo:
            _ = ax.axvline(lo, color='r', linestyle='dotted')

    fig.tight_layout()
    fig.show()

    adc.root.free_channel(channel)
    adc.close()

    input('Press a key to exit> ')
from typing import (
    Optional, List, Dict
)
import sys
import signal
import time
import asyncio
from argparse import ArgumentParser

from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)
from PyQt5.QtCore import (
    Qt, QCoreApplication, QObject, QThread, pyqtSignal, pyqtSlot
)

import numpy as np

import rpyc
from rpyc.utils.server import ThreadedServer

from test_fns import (
    configure, adc_conv_factor, adc_dc_offset, rate_correction,
    avg_cpg_correction, resample_spectrum, rms_volts_sq_to_dB
)


class SweepTestApp(QObject):
    """Test harness which drives frequency sweeps on a remote ADCDMA service.

    Sample data is delivered by the remote service through the
    :py:meth:`sweep_sample_cb` callback and accumulated until the last
    sample of the sweep has arrived, at which point the spectrum is
    resampled and converted to dB.
    """

    # Emitted for every sample received: (sample number, sample count).
    sample_data_updated = pyqtSignal(int, int)
    # Emitted once the final sample of a sweep has been received.
    sample_data_complete = pyqtSignal()

    def __init__(self, adc_addr: str, adc_port: int) -> None:
        """
        :param adc_addr: IP address of the ADCDMA service.
        :param adc_port: TCP port of the ADCDMA service.
        """
        super().__init__()
        self._adc_addr = adc_addr
        self._adc_port = adc_port
        self.adc_service = None
        self.adc_chan = None
        self._sample_freq = None
        self._sample_pwr = None
        self._freq = None
        self._pwr = None
        self._busy = False
        self._sweep_res = None
        # Set by initiate_sweep; defined here so that the attributes
        # always exist.
        self._sweep_params = None
        self._sample_size = 0
        self._sample_count = 0
        # Guards against connecting the signals more than once when
        # initialize_adc_service is invoked repeatedly.
        self._signals_connected = False

    @property
    def freq(self) -> np.ndarray:
        """Frequencies of the most recently completed sweep (or None)."""
        return self._freq

    @property
    def pwr(self) -> np.ndarray:
        """Power values of the most recently completed sweep (or None)."""
        return self._pwr

    def initialize_adc_service(self) -> None:
        """(Re)connect to the ADCDMA service and initialize the board.

        Exits the process if the connection cannot be established.
        """
        try:
            if self.adc_service is not None:
                self.adc_service.close()
                # Any channel allocated over the old connection is no
                # longer usable once that connection has been closed.
                self.adc_chan = None
                self._busy = False
            self.adc_service = rpyc.connect(
                self._adc_addr, self._adc_port)
        except ConnectionError as ce:
            print(ce)
            print('Please check to ensure that the ADCDMA service is running.')
            sys.exit(-1)

        status = self.adc_service.root.initialize()
        if status is not True:
            print("Unable to initialize the Red Pitaya board")
            return

        # Connect the signals once only.  Qt would otherwise add a
        # duplicate connection on every call and the slots would be
        # invoked multiple times per emission.
        if not self._signals_connected:
            self.sample_data_updated.connect(self.update_sample_data)
            self.sample_data_complete.connect(self.sample_complete)
            self._signals_connected = True

    def alloc_chan(self, chan_id: int) -> None:
        """Allocate the ADC channel ``chan_id`` unless one is already held."""
        if self.adc_chan is None:
            self.adc_chan = self.adc_service.root.allocate_channel(
                chan_id, 1, 0.0, adc_dc_offset(chan_id),
                buffer_size=32*1024, chunk_size=8*1024, dtype='F',
                double_buffer=False, prefill_buffers=False,
                sample_callback_fn=self.sweep_sample_cb
            )

    def free_chan(self) -> None:
        """Release the currently allocated ADC channel, if any."""
        if self.adc_chan is not None:
            self.adc_service.root.free_channel(self.adc_chan.chan_id)
            self.adc_chan = None

    @property
    def is_busy(self) -> bool:
        """True while a sweep is in progress."""
        return self._busy

    def initiate_sweep(self, chan_id: int, start_freq: float,
                       stop_freq: float, rbw: float) -> None:
        """Run a sweep on the allocated channel and wait for it to finish.

        :param chan_id: ADC channel id, used to look up the conversion factor.
        :param start_freq: Sweep start frequency.
        :param stop_freq: Sweep stop frequency.
        :param rbw: Resolution bandwidth.
        """
        if self.adc_chan is None:
            print("An ADC channel must be allocated using alloc_chan")
            return None
        if self.is_busy:
            print("ADC is currently busy acquiring sample data")
            return None

        self._busy = True
        self._sample_freq = np.array([])
        self._sample_pwr = np.array([])
        self._sweep_params = configure(start_freq, stop_freq, rbw=rbw)
        lo_list = np.arange(self._sweep_params.f_start,
                            self._sweep_params.f_stop,
                            self._sweep_params.sample_span)
        self._sample_size = len(lo_list)
        self._sample_count = 0

        # Call the remote ADCDMA fft_sweep method asynchronously.
        # This then allows us to actively poll the local RPyC service
        # which is necessary for the remote callback invocations (Events)
        # to be delivered.  For more information look at the bottom of:
        # https://rpyc.readthedocs.io/en/latest/tutorial/tut5.html#tut5-events
        try:
            async_fft_sweep = rpyc.async_(self.adc_chan.fft_sweep)
            res = async_fft_sweep(start_freq, stop_freq, rbw,
                                  adc_conv_factor(chan_id))
            while not res.ready:
                time.sleep(0.01)
                self.adc_service.poll_all()
        except Exception:
            # Without this the busy flag would remain set forever and
            # every subsequent sweep request would be refused.
            self._busy = False
            raise

    def sweep_sample_cb(self, freq, pwr, num, count):
        """Callback invoked by the ADCDMA service for each sweep sample.

        :param freq: Frequencies for this sample.
        :param pwr: Power values for this sample.
        :param num: Zero based index of this sample.
        :param count: Total number of samples in the sweep.
        """
        self._sample_freq = np.concatenate((self._sample_freq, np.array(freq)))
        self._sample_pwr = np.concatenate((self._sample_pwr, np.array(pwr)))
        self.sample_data_updated.emit(num, count)
        if num == count-1:
            self._busy = False
            self.sample_data_complete.emit()

    @pyqtSlot(int, int)
    def update_sample_data(self, sample_number: int, sample_count: int) -> None:
        """Report progress as each sample arrives."""
        print(f"Sample data updated, {sample_number=}, {sample_count=}")
        print(f"{len(self._sample_pwr)=}")

    @pyqtSlot()
    def sample_complete(self):
        """Resample the accumulated sweep data and convert it to dB."""
        print("Sample acquisition complete")
        print(f"{len(self._sample_pwr)=}")
        rate_correct = rate_correction(
            self.adc_chan.chan_id, self._sweep_params.decimation_rate)
        avg_corr = avg_cpg_correction(self._sweep_params)
        self._freq, self._pwr = resample_spectrum(self._sample_freq,
                                                  self._sample_pwr)
        self._pwr = rms_volts_sq_to_dB(self._pwr,
                                       rate_correct=rate_correct,
                                       avg_corr=avg_corr)
        print(f"{len(self._pwr)=}")


class SweepTestService(rpyc.Service):
    """RPyC service exposing the :py:class:`SweepTestApp` operations."""

    def __init__(self, app):
        super().__init__()
        self._app = app

    @property
    def app(self) -> SweepTestApp:
        return self._app

    def initialize(self) -> None:
        self._app.initialize_adc_service()

    def allocate_channel(self, chan_id: int) -> None:
        self._app.alloc_chan(chan_id)

    def free_channel(self) -> None:
        self._app.free_chan()

    def initiate_sweep(self, chan_id: int, start_freq: float,
                       stop_freq: float, rbw: float) -> None:
        self._app.initiate_sweep(chan_id, start_freq, stop_freq, rbw)


class RPyCServer(QObject):
    """Runs an RPyC ``ThreadedServer``; intended to be moved to a QThread."""

    # Emitted when the server's start() call returns.
    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Start serving; blocks until the server is stopped."""
        print("SweepTest rpyc service on {}:{}".format(self._host, self._port))
        self._server = ThreadedServer(
            self._serviceInst,
            hostname = self._host,
            port = self._port,
            protocol_config = {
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_getattr': True})
        self._server.start()
        self.finished.emit()


def main():
    """Parse the command line and run the sweep test RPyC service."""
    global server_thread

    default_adcaddr = "192.168.0.155"
    default_adcport = 18900
    default_svraddr = "127.0.0.1"
    default_svrport = 19010

    parser = ArgumentParser(description="Test for adcdma frequency sweeps.")
    parser.add_argument("-a", "--adcaddr", default=default_adcaddr,
        help=f"IP address for the ADC DMA server instance ({default_adcaddr})")
    parser.add_argument("-p", "--adcport", default=default_adcport, type=int,
        help=f"TCP port for the ADC DMA server instance ({default_adcport})")
    parser.add_argument("-A", "--ipaddr", default=default_svraddr,
        help=f"IP address for to bind the RPyC server instance ({default_svraddr})")
    parser.add_argument("-P", "--port", default=default_svrport, type=int,
        help=f"TCP port for the RPyC server instance ({default_svrport})")
    args = parser.parse_args()

    # This ensures that Cntl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    app = QCoreApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    test_app = SweepTestApp(args.adcaddr, args.adcport)

    # The RPyC server blocks in start(), so it runs in its own thread.
    server_thread = QThread()
    server = RPyCServer(SweepTestService(test_app), args.ipaddr, args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
Narrow band sweeps
Figure 6: Narrow band sweep strategy
The simplest case occurs when the channel is configured as DIRECT
then
\(\textrm{FE}_{min} = \textrm{ADC}_{min}\) and
\(\textrm{FE}_{max} = \textrm{ADC}_{max}\) so \(\textrm{FE}_{LO} = 0\).
For the other channel types, \(\textrm{FE}_{LO}\) must be chosen so that the ADC effective frequency range will contain the specified down converted frequency span, \(\textrm{f}_{adc\_start}\) to \(\textrm{f}_{adc\_stop}\). Initially, \(\textrm{FE}_{LO}\) is set to:
where \(\textrm{f}_{span} = \textrm{f}_{stop} - \textrm{f}_{start}\) and \(\textrm{ADC}_{centre} = \textrm{ADC}_{min} + (\textrm{ADC}_{max} - \textrm{ADC}_{min})/2\).
In general we can write:
where \(0\le\textrm{f}_{offset}\le(\textrm{ADC}_{centre} - \textrm{f}_{span}/2)\) so that:
If \(\textrm{f}_{span}\) is substantially less than the full ADC effective
frequency range then there will be some scope for varying
\(\textrm{FE}_{LO}\). This is useful for channels with the SINGLE
configuration for the purposes of identifying images in the spectrum.
Estimating the power spectrum
The heavy lifting is done by the Python scipy.signal.welch function.
# Window specification in ('general_cosine', coefficients) form: a window
# name followed by the list of cosine-term weighting coefficients.
# NOTE(review): the name suggests the HFT144D flat-top window; presumably
# this tuple is handed to scipy.signal.get_window -- confirm against the
# code which builds SweepParameters.window.
HFT144D = ('general_cosine',
           [1, 1.96760033, 1.57983607, 0.81123644,
            0.22583558, 0.02773848, 0.00090360])
# NOTE(review): presumably the 3 dB bandwidth of the window above,
# expressed in FFT bins -- confirm where W_3DB is used.
W_3DB = 4.4697
def power_spectrum(self, sweep_params: SweepParameters):
    """Acquire sweep data from the ADC and estimate its power spectrum.

    The ADC channel's ``sweep`` method is invoked asynchronously, once for
    the whole list of local oscillator settings.  Each returned segment is
    scaled using the ADC conversion factor and passed through
    :py:func:`scipy.signal.welch`, and the per-segment results are
    concatenated.

    The instance mutex is released while waiting for the sweep data and
    re-acquired before the data is read.

    :param sweep_params: The sweep configuration.
    :type sweep_params: SweepParameters
    :return: A tuple ``(freq, pwr, iq_data)``, or ``(None, None, None)``
        if the thread was stopped while waiting or a timeout occurred.
    """
    adc_conv_factor = 1.0 / AdcCalibration.adc_to_volts(self.adc_chan_id)
    # One LO setting per spectral segment of width sample_span.
    lo_list = np.arange(
        sweep_params.f_start, sweep_params.f_stop, sweep_params.sample_span)
    start = time.time()
    # The local oscillator setting for each spectral segment must be offset
    # a little so as to eliminate the fourier DC components causing
    # spurious artifacts in the final spectrum. The SweepParameters.lo_offset
    # is used to compute the offset.
    try:
        async_sweep = rpyc.async_(self.adc_chan.sweep)
        res = async_sweep(
            lo_list - sweep_params.lo_offset,
            sweep_params.decimation_rate,
            sweep_params.nperseg + sweep_params.lo_offset_bins)
        # Release the mutex while polling for the result.
        # NOTE(review): presumably the caller holds the mutex on entry --
        # confirm against the callers.
        self._mutex.unlock()
        while not res.ready:
            # Hack to get the thread to wait n millisecs
            # https://doc.qt.io/qt-6/qtest.html#qWait
            QtTest.QTest.qWait(5)
            if not self.running:
                # Re-acquire before returning so that the mutex is in the
                # same state as on the normal return path.
                self._mutex.lock()
                return (None, None, None)
        self._mutex.lock()
        samples = res.value
    except TimeoutError:
        print("Timeout waiting for sweep data.")
        print("Stopping ADC thread.")
        self.stop()
        return (None, None, None)
    duration = time.time() - start
    print(f'acquisition took {duration:.4f} seconds')
    # Number of spectrum bins to keep from each segment.  When the sweep
    # spans several segments the final one keeps only the remainder.
    if sweep_params.fft_bins >= sweep_params.sample_bins:
        sample_sizes = [
            sweep_params.sample_bins for i in range(sweep_params.lo_count)]
        sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins
    else:
        sample_sizes = [sweep_params.fft_bins+2]
    print("----------------")
    print(f"sample_span: {sweep_params.sample_span}")
    print(f"nperseg: {sweep_params.nperseg}")
    print(f"bin_width: {sweep_params.bin_width}")
    print(f"lo_offset_bins: {sweep_params.lo_offset_bins}")
    print(f"decimation_rate: {sweep_params.decimation_rate}")
    print(f"lo_offset: {sweep_params.lo_offset}")
    print(f"sample_bins: {sweep_params.sample_bins}")
    print(f"fft_bins: {sweep_params.fft_bins}")
    print(f"{sample_sizes=}")
    print(f"{lo_list=}")
    print(f"sample item len: {len(samples[0])}")
    freq = np.array([])
    pwr = np.array([])
    iq_data = np.array([], dtype='F')
    start = time.time()
    # samples is indexed by segment number; the same index selects the
    # segment's LO frequency and the number of bins to keep.
    for i, sample in samples.items():
        sample_lo = lo_list[i]
        sample_arr = np.array(sample)
        # Scale the I and Q components using the ADC conversion factor.
        scaled_data = (adc_conv_factor*sample_arr.real +
                       (1j)*adc_conv_factor*sample_arr.imag)
        iq_data = np.concatenate((iq_data, scaled_data))
        f, Pxx_den = signal.welch(
            scaled_data, sweep_params.fs, detrend='linear',
            noverlap=sweep_params.nperseg*0.8, average='median',
            window=sweep_params.window, return_onesided=False,
            scaling='spectrum')
        # NOTE(review): the division by 1e6 suggests f is in Hz and the
        # LO frequencies are in MHz -- confirm.
        freq = np.concatenate(
            (freq, (sample_lo + f[:sample_sizes[i]] / 1e6)))
        # The LO is shifted when the sweep data is captured.
        # That shift is compensated for here.
        offset_bins = sweep_params.lo_offset_bins
        pwr = np.concatenate(
            (pwr, Pxx_den[offset_bins:(sample_sizes[i] + offset_bins)]))
    duration = time.time() - start
    print(f'fft took {duration:.4f} seconds')
    return (freq, pwr, iq_data)

<<detector>>

<<peak-detector>>
Detectors
def detector(self, detect_type, freq, pwr, bins):
    """Reduce a power spectrum to ``bins`` points using a detector.

    Each output point is derived from a window of the input power values
    centred on the input frequency closest to the output frequency.

    :param detect_type: The detector used to reduce each window.  One of
        DETECTOR_TYPE_POS (window maximum), DETECTOR_TYPE_NEG (window
        minimum), DETECTOR_TYPE_SAMPLE (window mid point) or
        DETECTOR_TYPE_NORMAL (maximum for bins flagged as peaks,
        otherwise alternating maximum/minimum on even/odd bins).
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int
    :return: A tuple of the resampled frequency and power arrays.
    """
    f_start = freq[0]
    f_stop = freq[-1]

    # Split the window into the samples taken below and above the
    # centre index; an odd window takes the extra sample from above.
    window_size = floor(len(freq)/bins)
    below = above = window_size//2
    if window_size % 2:
        above += 1

    def window(arr, idx):
        # Windows which would overrun either end of the spectrum are
        # truncated at that end.
        if idx - below < 0:
            return arr[0:above]
        if idx + above >= len(pwr):
            return arr[idx-below:]
        return arr[idx-below:idx+above]

    def max_pwr(arr, idx):
        return window(arr, idx).max()

    def min_pwr(arr, idx):
        return window(arr, idx).min()

    def sample_pwr(arr, idx):
        win = window(arr, idx)
        return np.take(win, win.size // 2)

    resampled_freqs = np.linspace(f_start, f_stop, bins)
    # Index of the input frequency closest to each output frequency.
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]

    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        resampled_pwr = [max_pwr(pwr, idx) for idx in indexes]
    elif detect_type == DETECTOR_TYPE_NEG:
        resampled_pwr = [min_pwr(pwr, idx) for idx in indexes]
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        resampled_pwr = [sample_pwr(pwr, idx) for idx in indexes]
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = self.peak_detect(pwr, indexes, window_size)
        for n in range(bins):
            # Peaks always report the maximum; noise alternates between
            # the maximum (even bins) and the minimum (odd bins).
            if is_peak[n] or not (n % 2):
                resampled_pwr.append(max_pwr(pwr, indexes[n]))
            else:
                resampled_pwr.append(min_pwr(pwr, indexes[n]))
    return (np.array(resampled_freqs), np.array(resampled_pwr))
Finding peaks
Peaks are found by first finding the mean and std. dev. of the spectrum power values in each resampling window (bin). The NOISE_PERCENTILE-th percentile of the means and of the std. devs. is then calculated to give baseline values for each. If both the mean and the std. dev. of a given resampled spectrum bin exceed their respective baselines by more than a factor of MEAN_THRESHOLD and STD_THRESHOLD respectively, then the bin is marked as a peak; otherwise it is marked as noise.
def peak_detect(self, pwr, resampled_idx, window_size):
    """Classify each bin of a resampled spectrum as peak or noise.

    The mean and standard deviation of the power values in the window
    around each resampled index are compared against baseline values
    taken as a percentile of all the window means and standard deviations.

    :param pwr: A numpy array containing the spectrum power values
        as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values, one per resampled index.
        A value is True if the associated bin is a peak and False if it
        is noise.  Every bin is reported as a peak when either baseline
        is zero.
    """
    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    # An odd window takes its extra sample from above the centre index.
    below = above = window_size//2
    if window_size % 2:
        above += 1
    pwr_len = len(pwr)

    def window(idx):
        # Windows which would overrun either end are truncated there.
        if idx - below < 0:
            return pwr[0:above]
        if idx + above >= pwr_len:
            return pwr[idx-below:]
        return pwr[idx-below:idx+above]

    windows = [window(idx) for idx in resampled_idx]
    means = np.array([np.mean(win) for win in windows])
    stds = np.array([np.std(win) for win in windows])

    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)

    if baseline_std == 0 or baseline_mean == 0:
        # No usable baseline to compare against.
        return np.ones(len(resampled_idx), dtype=bool)

    return ((stds/baseline_std > STD_THRESHOLD) &
            (means/baseline_mean > MEAN_THRESHOLD))
Frequency compensation
Frequency response compensation is carried out using frequency response characteristics previously measured using the Frequency response calibration and Noise floor calibration facilities.
The frequency response correction must take into account the noise floor of the spectrum. This is done by looking at the noise margin as calculated by subtracting the noise floor from the spectrum power values.
If a spectrum power value is under the noise floor (that is, if the margin is < 0) then no frequency response correction is applied.
If the noise margin is >0 but less than the mean of the frequency response correction across the spectrum then the correction is applied in proportion to the size of the margin but modulated with noise taken from a (truncated) normal distribution.
If the noise margin is greater than the mean of the frequency response correction then the full correction is applied.
def frequency_response_correction(self, pwr, freq, rbw, lo=None):
    """Calculate the frequency response correction for the spectrum.

    The raw correction is weighted by the noise margin of each power
    value (the power value less the noise floor):

    * margin < 0: no correction is applied;
    * 0 <= margin < mean(abs(correction)): the correction is scaled by
      the margin, dithered with noise drawn from a truncated normal
      distribution;
    * otherwise the full correction is applied.

    :param pwr: A numpy array containing the spectrum power values
        as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param freq: A numpy array containing the frequencies of each of the
        spectrum power values.
    :type freq: nparray
    :param rbw: The prevailing resolution bandwidth when the sweep data was
        captured.
    :type rbw: float
    :param lo: The LO frequency used to shift the spectrum frequencies
        to baseband, or None if no shift is required.

    :return: A numpy nparray containing the corrections to apply to the
        spectrum (in dB).  An array of zeros is returned if correction
        is disabled or if the weighted correction cannot be calculated.
    """
    if ((self.chan.apply_freq_resp_correction is False) or
            (self.fe.calibration_mode not in [NO_CAL, FREQRESP_CAL])):
        return np.zeros(len(freq))

    fresp_corr_fn = self.fe.freq_response_correction(self.fe_chan_id)
    baseband_freq_corr_fn = self.fe.baseband_freq_response_correction(
        self.fe_chan_id)

    # If an LO frequency has been specified then calculate the baseband
    # frequency values by shifting the given spectrum frequencies.
    bb_freq = freq - lo if lo is not None else freq

    if self.fe.calibration_mode == FREQRESP_CAL:
        # If this method is being called in the frequency
        # response calibration mode then only the baseband
        # frequency response correction is used.
        freq_corr = baseband_freq_corr_fn(bb_freq)
    else:
        # For other cases the frequency response correction will
        # be the sum of the baseband and front end corrections.
        freq_corr = fresp_corr_fn(freq)
        if self.adc_chan_type in [ChannelCapabilities.SINGLE,
                                  ChannelCapabilities.SUPERHET]:
            # For these channel capabilities additional front end
            # hardware is present.
            freq_corr += baseband_freq_corr_fn(bb_freq)

    atten = self.chan.fe_atten + self.chan.atten
    gain = self.chan.fe_gain
    noise_floor = self.fe.noise_floor(self.fe_chan_id, rbw) + atten - gain
    noise_stddev = self.fe.noise_stddev(self.fe_chan_id, rbw)

    try:
        freq_corr_mean = np.mean(np.abs(freq_corr))
        noise_margin = np.asarray(pwr - noise_floor, dtype=float)

        # Full correction by default, none below the noise floor.
        multiplier = np.ones(noise_margin.shape, dtype=float)
        multiplier[noise_margin < 0.0] = 0.0

        # Partial, dithered correction just above the noise floor.  The
        # noise is drawn in a single call rather than once per element.
        partial = (noise_margin >= 0.0) & (noise_margin < freq_corr_mean)
        partial_count = int(np.count_nonzero(partial))
        if partial_count:
            if noise_stddev > 0:
                noise = truncnorm.rvs(
                    -noise_stddev, noise_stddev, size=partial_count)
            else:
                # truncnorm rejects an empty interval; a zero std. dev.
                # simply means that no dither is applied.
                noise = 0.0
            multiplier[partial] = (
                (noise_margin[partial] + noise) / freq_corr_mean)
        fresp_corr = freq_corr * multiplier
    except ValueError as ve:
        print(ve)
        print(f'{freq=}')
        # Fall back to applying no correction rather than failing on an
        # unassigned result.
        fresp_corr = np.zeros(len(freq))
    return fresp_corr
Analyzer Calibration
Calibration of the analyzer is divided into the following parts:
-
Base calibration. This consists of:
Measuring the ADC input DC offset (The ADC input DC offset).
Measuring the conversion factor which takes the ADC raw readings and returns voltages (The ADC code to voltage conversion)
Measuring the parameters required to compensate for the CIC decimator gain variations (Correcting for CIC decimator gain variations).
Frequency response calibration (Frequency response calibration).
Noise floor calibration (Noise floor calibration).
Running the calibration
def run_calibration(self): print('run_calibration:') if self.cal_state == AdcThread.CAL_START: print(' CAL_START') time.sleep(1) self.cal_state_changed.emit(AdcThread.CALIBRATING) elif self.cal_state == AdcThread.CALIBRATING: print(' CALIBRATING') self.running = True self.cal_dc_offset() self.cal_adc_to_volts() self.cal_base_rate_corrections() self.running = False self.cal_state_changed.emit(AdcThread.CAL_COMPLETE) <<cal-dc-offset>> <<cal-adc-to-volts>> <<cal-base-rate-corrections>>
class CalibrationDialog(QDialog):
    """Dialog for selecting the signal source used for base calibration.

    The user selects a source device and confirms its address.  After
    the dialog has been accepted the selection is available from the
    :py:attr:`calibration_device` and :py:attr:`calibration_device_addr`
    properties.
    """

    # Known calibration source devices and their default addresses.
    CAL_DEVICES = {
        'SMHU58': SMHU58_GPIBID,
        'DSG815': DSG815_ID,
        'RFGen': '127.0.0.1:18864',
        'DG4162': DG4162_ID,
        'DDSGEN': '127.0.0.1:18862',
    }

    def __init__(self, app: QMainWindow,
                 cal_devices: Optional[Dict] = None) -> None:
        """
        :param app: The application main window (the dialog's parent).
        :param cal_devices: Optional mapping of device name to address
            which overrides the devices found by :py:meth:`valid_devices`.
        """
        super().__init__(app)
        self._app = app
        # Only query the front end capabilities when the caller has not
        # supplied the device list.
        if cal_devices is not None:
            self._cal_devices = cal_devices
        else:
            self._cal_devices = self.valid_devices()
        self._caldev = None
        self._caldev_addr = None
        self._fbox = None
        self.build_ui()
        self.setWindowTitle("Base Calibration")

    @property
    def calibration_device(self):
        """The name of the selected calibration source device."""
        return self._caldev

    @property
    def calibration_device_addr(self):
        """The address of the selected calibration source device."""
        return self._caldev_addr

    @property
    def dialog_form(self):
        """The form layout, so that subclasses can add further rows."""
        return self._fbox

    def valid_devices(self):
        """Return the devices which cover the selected channel's range.

        :return: A dict mapping device name to address for every entry of
            CAL_DEVICES whose frequency range spans the full frequency
            range of the selected front end channel.
        """
        chan_id = self._app.selected_chan.fe_chan_id
        min_freq = self._app.frontend.capabilities[chan_id].freq.min
        max_freq = self._app.frontend.capabilities[chan_id].freq.max

        def covers_range(dev_name):
            dev_class = SIGGEN_MODELS[dev_name]
            return ((dev_class.MIN_FREQUENCY <= min_freq) and
                    (dev_class.MAX_FREQUENCY >= max_freq))

        return {name: addr
                for name, addr in CalibrationDialog.CAL_DEVICES.items()
                if covers_range(name)}

    def caldev_changed(self, combo: QComboBox, idx: int) -> None:
        """Record the newly selected device and show its default address."""
        self._caldev = combo.itemText(idx)
        self._caldev_addr = str(self._cal_devices[self._caldev])
        self._caldev_addr_ledit.setText(self._caldev_addr)

    def caldev_addr_changed(self):
        """Record an address edited by the user."""
        self._caldev_addr = self._caldev_addr_ledit.text()

    def build_ui(self):
        """Construct the dialog widgets and layouts."""
        outer_vbox = QVBoxLayout()
        hbox = QHBoxLayout()
        hbox3 = QHBoxLayout()

        msg_text = QLabel()
        msg_text.setTextFormat(Qt.RichText)
        msg_text.setText(f"""<b><ol><li>Select a calibration source device.</li><li>Ensure that the source device address is correct.</li><li>Connect the source device signal output to the {self._app.selected_chan.fe_chan_id} input.</li><li>Select 'Ok' to begin the calibration process or 'Cancel' to abandon it.</li></ol></b>""")
        hbox3.addWidget(msg_text)

        self._fbox = QFormLayout()
        caldev_combo = QComboBox()
        self._fbox.addRow(QLabel("Source Device:"), caldev_combo)
        self._caldev_addr_ledit = QLineEdit()
        self._caldev_addr_ledit.editingFinished.connect(self.caldev_addr_changed)
        self._fbox.addRow(QLabel("Source Addr:"), self._caldev_addr_ledit)
        hbox.addLayout(self._fbox)

        hbox2 = QHBoxLayout()
        self._bbox = QDialogButtonBox(
            QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        hbox2.addWidget(self._bbox)
        self._bbox.button(QDialogButtonBox.Ok).clicked.connect(self.calibrate)
        self._bbox.rejected.connect(self.close)

        # The signal is connected before the items are added so that
        # adding the first item selects it and fills in the address field.
        caldev_combo.currentIndexChanged.connect(
            lambda idx, w=caldev_combo: self.caldev_changed(w, idx))
        caldev_combo.addItems(self._cal_devices.keys())

        outer_vbox.addLayout(hbox3)
        outer_vbox.addLayout(hbox)
        outer_vbox.addLayout(hbox2)
        self.setLayout(outer_vbox)

    def calibrate(self):
        """Accept the dialog ('Ok' selected)."""
        QDialog.accept(self)
        print('FreqRespCalibrationDialog.calibrate')

    def close(self) -> None:
        """Reject the dialog ('Cancel' selected)."""
        QDialog.reject(self)
The ADC input DC offset
def cal_dc_offset(self):
    """Measure the ADC input DC offset and store it in the calibration data.

    The calibration source is set to output a signal at the calibration
    frequency and the DC offset is taken as the average of one chunk of
    raw ADC samples for the channel being calibrated.  The stored
    calibration value is left unchanged if the source instrument cannot
    be opened.
    """
    print('cal_dc_offset:')
    try:
        mgr = InstrumentMgr()
        sigsrc = mgr.open_instrument(self.cal_device, self.cal_device_addr)
        sigsrc.output = True
        sigsrc.level = -10.0
        sigsrc.freq = AdcThread.CAL_FREQUENCY
        # Allow the source output to settle.
        time.sleep(AdcThread.CAL_DELAY)

        buf = self.alloc_raw_adc_buffer()
        data = np.array(buf.get_next_chunk())
        # The raw data for the two channels is interleaved: channel 0
        # at the even indexes and the other channel at the odd indexes.
        if self.adc_chan_id == 0:
            dc_offset = np.average(data[::2])
        else:
            dc_offset = np.average(data[1::2])
        self.free_raw_adc_buffer()
        mgr.close_instrument(sigsrc)
        mgr.close()
    except (InstrumentInitializeException,
            UnknownInstrumentModelException) as ie:
        print(ie)
        # No measurement was made so there is nothing to store.
        return
    print(f' {dc_offset=}')
    AdcCalibration.CAL_CONFIG[self.adc_chan_id].dc_offset = -int(dc_offset)
The ADC code to voltage conversion
Calibration with RLP-40 anti-alias filter and PE43711 step attenuator (set to 0.0dB attenuation). Calibration is carried out at a signal frequency of 10 MHz using the DSG815 and the front SMA connectors are the reference plane. Voltages are measured at the RP board input SMA connectors. Note that saturation of the ADC takes place with an input power of between -1.0 and 0.0dBm. Measured peak voltage at some input power P (in dBm) is:
for a system impedance of 50 ohm.
So for P = -10dBm:
def measure_signal_amplitude(self):
    """Measure the amplitude of the calibration signal in raw ADC codes.

    A sine function is fitted to 200 consecutive raw ADC samples (with
    the calibrated DC offset applied) for the channel being calibrated.

    :return: The fitted amplitude.  The sign depends on the fitted phase
        so the result may be negative.
    """
    def sine_fn(t, a, b, phase):
        return a*np.sin(2*np.pi*b*t + phase)

    buf = self.alloc_raw_adc_buffer()
    data = np.array(buf.get_next_chunk())
    dc_offset = AdcCalibration.dc_offset(self.adc_chan_id)
    # The raw data for the two channels is interleaved: channel 0
    # at the even indexes and the other channel at the odd indexes.
    if self.adc_chan_id == 0:
        chan_data = data[::2] + dc_offset
    else:
        chan_data = data[1::2] + dc_offset
    N = 200
    Ts = 1.0/self.f_clk
    xdata = np.linspace(0, Ts*(N-1), N)
    ydata = chan_data[100:300]
    # The initial guess is required for the fit to converge on the
    # calibration frequency (given in MHz, hence the scaling).
    popt, _ = optimize.curve_fit(sine_fn, xdata, ydata,
                                 p0=[1e4, AdcThread.CAL_FREQUENCY*1e6, 0])
    self.free_raw_adc_buffer()
    return popt[0]

def cal_adc_to_volts(self):
    """Measure the ADC code to voltage conversion factor and store it.

    The stored calibration value is left unchanged if the source
    instrument cannot be opened.
    """
    print('cal_adc_to_volts:')
    try:
        mgr = InstrumentMgr()
        sigsrc = mgr.open_instrument(self.cal_device, self.cal_device_addr)
        sigsrc.output = True
        # NOTE(review): the comment below assumes an input power of
        # -10 dBm but the source is set to -9.9 dBm -- confirm that the
        # difference is intentional.
        sigsrc.level = -9.9
        sigsrc.freq = AdcThread.CAL_FREQUENCY
        # Allow the source output to settle.
        time.sleep(AdcThread.CAL_DELAY)

        ampl_10 = self.measure_signal_amplitude()
        print(f' {ampl_10=}')
        mgr.close_instrument(sigsrc)
        mgr.close()
    except (InstrumentInitializeException,
            UnknownInstrumentModelException) as ie:
        print(ie)
        # No measurement was made so there is nothing to store.
        return

    # The input filter and attenuator has insertion loss of 1.52 dB
    # at 10 MHz for both ADC channels.  This means that an input power
    # of -10 dBm will be -11.52 at the ADC input connector.
    # Assuming an input impedance of exactly 50 ohms this produces
    # a peak input voltage of 83.946 mV.
    adc_to_volts = int(abs(ampl_10/0.083946))
    print(f' {adc_to_volts=}')

    # Update the stored ADC -> volts conversion factor in the
    # calibration data
    AdcCalibration.CAL_CONFIG[self.adc_chan_id].adc_to_volts = adc_to_volts
Correcting for CIC decimator gain variations
The ADC design uses a variable rate CIC decimator followed by a FIR filter to compensate for the CIC frequency response characteristics. The CIC decimator is capable of decimation rates from 4 through to 8192. The gain of the CIC decimator will vary with the decimation rate. An empirical approach is taken to estimating the gain variation and measurements of the output power from the CIC decimator/FIR filter show that these deviations may be up to 6dB from the actual, true power value.
To explore these effects further focus turns to the expression for calculating the bit width of a CIC decimator output with full precision (eqn 3-7 from the Xilinx CIC Compiler 4.0 product guide):
where \(R\) is the decimation rate and \(N\), \(M\) and \(B\) (the number of decimator stages, the differential delay and the input sample precision respectively) are constants of the design. Closer examination of measured CIC decimator gain deviations indicates that they are proportional to the size of the fractional part of the expression for \(B_{max}\) contained inside the ceiling operator. For convenience this is referred to as the \(B_{max}\) 'residue'. For example, the \(B_{max}\) residue for a decimation rate of 1019 is calculated as 0.04237 (to 5 decimal places) using:
from math import log2, ceil

rate = 1019  # R: the decimation rate
N = 6        # number of decimator stages
M = 1        # differential delay
B = 24       # input sample precision
# The expression inside the ceiling operator of the B_max formula.
f_bmax = N * log2(M*rate) + B
bmax = ceil(f_bmax)
# The B_max 'residue': the amount by which the ceiling exceeds the
# unrounded value.
bmax_residue = bmax - f_bmax
Note that for decimation rates which are powers of 2 (4, 8, 16, 32...), the \(B_{max}\) residue is 0 and the deviations for these rates are 0. Table 5 shows examples:
Decimation rate |
Measured power (dBm) |
B(max) residue |
---|---|---|
1024 |
-10.292 |
0 |
1025 |
-16.261 |
0.991551 |
1026 |
-16.221 |
0.983110 |
1027 |
-16.171 |
0.974677 |
1028 |
-16.119 |
0.966253 |
1029 |
-16.069 |
0.957836 |
1030 |
-16.019 |
0.949428 |
1129 |
-11.233 |
0.155021 |
1130 |
-11.188 |
0.147358 |
1149 |
-10.321 |
0.003022 |
1150 |
-16.296 |
0.995491 |
To a good approximation there is a linear relationship between the \(B_{max}\) residue and the deviation between the measured and actual signal power. This relationship can be given as:
where \(D_T\) is the total power deviation at decimation rate \(R\)
, \(\Delta D\)
is the rate of change of the deviation as a function of the \(B_{max}\)
residue, \(B_{res}\), and \(K\) is the change in \(B_{res}\) which gives rise
to a change \(\Delta D\) in the deviation.
For example, \(\Delta D\) can be estimated by dividing the total deviation between decimation rates 1024 and 1149 and dividing by the change in rate:
Similarly, \(K\) is estimated as:
The deviation for a decimation rate of 1129 can now be calculated as:
which gives a corrected power of -11.233 + 0.936 = -10.297 dBm. This is in close agreement with the measured power at decimation rate 1024 (a power of 2 with zero \(B_{max}\) residue).
Decimation range |
\(\Delta D\) |
\(K\) |
---|---|---|
4-8 |
1.4712 |
0.2451125 |
9-16 |
5.4889 |
0.9120185 |
17-32 |
2.7465 |
0.4560093 |
33-64 |
1.3731 |
0.2280046 |
65-128 |
0.6864264 |
0.1140023 |
129-256 |
0.3421029 |
0.05682066 |
257-512 |
0.1522719 |
0.02530085 |
513-1024 |
0.06048189 |
0.01004718 |
1025-2048 |
0.02693499 |
0.004474957 |
2049-4096 |
0.02138473 |
0.003552269 |
4097-8192 |
0.01069906 |
0.0017761345 |
def cal_base_rate_corrections(self):
    """Measure the base rate corrections and store them in the calibration data.

    The power of the calibration signal peak is measured at a decimation
    rate of 1 and at each of the power of 2 decimation rates.  The
    correction stored for each rate is the peak power measured at rate 1
    less the peak power measured at that rate.

    ``cal_state`` is set to ``AdcThread.CAL_ERROR``, and the stored
    calibration data is left unchanged, if no sweep data could be
    acquired or no signal peak was found at one of the rates.
    """
    self.alloc_adc_chan()
    self._mutex.lock()

    # This is a dictionary of the power of 2 decimation rates
    # with their associated frequency spans in MHz.
    rates = {1: 10.0, 4: 6.0, 8: 2.5, 16: 1.2, 32: 0.58, 64: 0.29,
             128: 0.145, 256: 0.0722, 512: 0.03605, 1024: 0.01801,
             2048: 0.009, 4096: 0.0045, 8192: 0.00225}
    peak_pwrs = {}
    for rate, fspan in rates.items():
        # Centre the sweep on the calibration signal.
        startf = AdcThread.CAL_FREQUENCY - (fspan/2)
        stopf = AdcThread.CAL_FREQUENCY + (fspan/2)
        sweep_params = self.configure(startf, stopf,
                                      reqd_decimation_rate=rate)
        self.adc_chan.lo_freq = sweep_params.f_start - sweep_params.lo_offset
        self.adc_chan.decimation_rate = sweep_params.decimation_rate
        self.adc_chan.chunk_size = sweep_params.chunk_size
        self.adc_chan.flush_buffers()

        raw_freq, raw_pwr, _ = self.power_spectrum(sweep_params)
        if raw_freq is None or raw_pwr is None:
            # power_spectrum returns (None, None, None) when it was
            # stopped or timed out while waiting for the sweep data.
            print("Calibration Error: No sweep data acquired.")
            self.cal_state = AdcThread.CAL_ERROR
            break

        # The rate correction is what is being measured so none is
        # applied here.
        rate_correction = 0.0
        avg_corr = self.avg_cpg_correction(sweep_params)
        freq, pwr = self.resample_spectrum(raw_freq, raw_pwr)
        pwr = self.rms_volts_sq_to_dB(pwr,
                                      rate_correct=rate_correction,
                                      avg_corr=avg_corr)
        peaks, _ = signal.find_peaks(
            pwr,
            prominence=SpectrumPlotWidget.PEAK_THRESHOLD,
            width=SpectrumPlotWidget.PEAK_WIDTH)
        # Order the peaks from the highest power to the lowest.
        sorted_peaks = sorted(peaks, key=lambda idx: pwr[idx], reverse=True)
        try:
            p = sorted_peaks[0]
        except IndexError:
            print("Calibration Error: No signal peaks found.")
            print(" Check calibration signal input.")
            self.cal_state = AdcThread.CAL_ERROR
            break
        else:
            peak_pwrs[rate] = pwr[p]

    self._mutex.unlock()
    self.free_adc_chan()

    if self.cal_state != AdcThread.CAL_ERROR:
        # Update the stored base rate corrections in the calibration data
        ref_pwr = peak_pwrs[1]
        AdcCalibration.CAL_CONFIG[self.adc_chan_id].base_rate_correction = {
            rate: float(ref_pwr - peak_pwr)
            for rate, peak_pwr in peak_pwrs.items()}
        print(f'{AdcCalibration.CAL_CONFIG[self.adc_chan_id].base_rate_correction=}')
Frequency response calibration
Frequency response calibration is guided by the capabilities of the signal processing front end being used at the time of calibration.
Base band frequency response calibration is carried out when the dual DAC/ADC is configured for base band operation.
RF front end calibration is carried out when the dual DAC/ADC is configured with front end RF signal processing hardware.
class FreqRespCalibrationDialog(CalibrationDialog):
    """Calibration dialog extended with a calibration signal level field."""

    def __init__(self, app: QMainWindow,
                 cal_devices: Optional[Dict] = None,
                 cal_level = -30.0) -> None:
        """
        :param app: The application main window (the dialog's parent).
        :param cal_devices: Optional mapping of device name to address.
        :param cal_level: The initial calibration signal level.
        """
        # The level must be in place before the base class initializer
        # runs because that is where build_ui() is invoked.
        self._cal_level = cal_level
        super().__init__(app, cal_devices)
        self.setWindowTitle("Freq. Response Calibration")

    @property
    def calibration_level(self):
        """The calibration signal level selected in the dialog."""
        return self._cal_level

    def set_cal_level(self):
        """Update the calibration level from the edited spin box text."""
        entered = self._caldev_lvl_box.lineEdit().text()
        # The text carries a units suffix; the number is its first word.
        number = entered.split()[0]
        self._cal_level = float(number)
        print(f'{self._cal_level=}')

    def build_ui(self):
        """Build the base dialog then append the calibration level row."""
        super().build_ui()
        level_box = QDoubleSpinBox()
        self._caldev_lvl_box = level_box
        level_box.setDecimals(1)
        level_box.setSuffix(PWR_SUFFIX)
        level_box.setRange(AnalyzerApp.MIN_CAL_LEVEL,
                           AnalyzerApp.MAX_CAL_LEVEL)
        level_box.lineEdit().editingFinished.connect(self.set_cal_level)
        level_box.setValue(self._cal_level)
        self.dialog_form.addRow(QLabel("Calibration Level:"), level_box)
class FreqResponseCalibrator(QObject):
    """Worker object which measures the frequency response of a channel.

    A signal source is stepped across the front end frequency range.  At
    each step the analyzer is swept, a peak search is done and the
    difference between the calibration level and the measured marker power
    is stored in :py:attr:`cal_data`, keyed by the marker frequency.

    The object is intended to have :py:meth:`run` invoked from a worker
    thread; ``freqresp_cal_start`` and ``freqresp_cal_complete`` are
    emitted at the beginning and end of the run.
    """

    # Settling delay (seconds) between control changes and measurements.
    DELAY = 0.25
    # Number of sweeps taken at each calibration frequency.
    SWEEPS = 5

    freqresp_cal_start = pyqtSignal()
    freqresp_cal_complete = pyqtSignal()

    def __init__(self, app: QObject, chan_id: str,
                 caldev: str, caldev_addr: str, cal_level: float) -> None:
        """
        :param app: The application object.  Its ``selected_chan`` is the
            channel which is calibrated.
        :param chan_id: The front end channel identifier used to look up
            the front end capabilities.
        :param caldev: The signal source device to use for calibration.
        :param caldev_addr: The address of the signal source device.
        :param cal_level: The calibration signal level.
        """
        super().__init__()
        self._app = app
        self._chan_id = chan_id
        self._ch = self._app.selected_chan
        self.cal_level = cal_level
        self.cal_span = 0.2
        self.cal_data = {}
        capability = self._ch.fe.capabilities[chan_id]
        self.freq_step = capability.cal_freq.step
        self.start_freq = capability.freq.min - (self.cal_span/2)
        self.stop_freq = capability.freq.max + (2*self.freq_step)
        self._caldev = caldev
        self._caldev_addr = caldev_addr

    def save_settings(self, chan):
        """Remember the channel settings which the calibration modifies."""
        self.centre_freq = chan.centre_freq
        self.freq_span = chan.freq_span
        self.reflevel = chan.reflevel
        self.fe_atten = chan.fe_atten

    def restore_settings(self, chan):
        """Restore the channel settings saved by :py:meth:`save_settings`."""
        chan.fe_atten = self.fe_atten
        chan.reflevel = self.reflevel
        chan.centre_freq = self.centre_freq
        chan.freq_span = self.freq_span

    def _wait_for_sweep(self):
        """Block until the channel reports that it is no longer running."""
        while self._ch.running is True:
            sleep(FreqResponseCalibrator.DELAY)

    def _measure_response(self, sigsrc):
        """Step the signal source over the range, filling ``cal_data``."""
        sigsrc.output = True
        sigsrc.level = float(self.cal_level)
        sleep(FreqResponseCalibrator.DELAY)
        for f in np.arange(self.start_freq, self.stop_freq, self.freq_step):
            sigsrc.freq = float(f)
            self._ch.centre_freq = f
            sleep(FreqResponseCalibrator.DELAY)
            self._ch.invoke_sweep_count.emit(FreqResponseCalibrator.SWEEPS)
            sleep(FreqResponseCalibrator.DELAY)
            self._wait_for_sweep()
            self._ch.invoke_peak_search.emit(False)
            sleep(FreqResponseCalibrator.DELAY)
            freq, pwr = self._app.marker_posn()
            self.cal_data[freq] = self.cal_level - pwr

    def run(self):
        """Carry out the frequency response calibration.

        Instrument initialisation errors are reported and end the run
        early.  Whatever happens, the signal source and instrument manager
        are closed, the saved channel settings are restored and
        ``freqresp_cal_complete`` is emitted, so that a failure can not
        leak the instrument connection or leave the channel in its
        calibration configuration.
        """
        self.freqresp_cal_start.emit()
        self.save_settings(self._ch)
        self._ch.fe_atten = 0.0
        try:
            self._ch.reflevel = 0.0
            self._ch.freq_span = self.cal_span
            self._ch.invoke_single_sweep.emit()
            self._wait_for_sweep()
            self._ch.modify_marker_state.emit(
                1, SpectrumMarker.MKR_TYPE_NORMAL)

            mgr = InstrumentMgr()
            try:
                sigsrc = mgr.open_instrument(self._caldev, self._caldev_addr)
                try:
                    self._measure_response(sigsrc)
                finally:
                    mgr.close_instrument(sigsrc)
            finally:
                # Runs even when open_instrument() fails, so the manager
                # is never left open.
                mgr.close()
        except (InstrumentInitializeException,
                UnknownInstrumentModelException) as ie:
            print(ie)
        finally:
            self.restore_settings(self._ch)
            self.freqresp_cal_complete.emit()
@pyqtSlot()
def freqresp_calibrate(self):
    """Start a frequency response calibration.

    A dialog asks the user to connect and select a signal source.  When a
    device has been selected, its details are handed on to
    :py:meth:`calibrate_freqresp`.
    """
    dialog_result = self.app.show_freqresp_calibration_dialog()
    caldev, caldev_addr, cal_level = dialog_result
    if caldev is None:
        return
    self.calibrate_freqresp(self.fe_chan_id, caldev, caldev_addr, cal_level)

def calibrate_freqresp(self, chan_id: str,
                       caldev: str, caldev_addr: str, cal_level: float):
    """Run a freq. response calibration for a front end channel.

    After the user confirms, a :py:class:`FreqResponseCalibrator` is
    created, moved to its own thread and started.

    :param chan_id: The front end channel identifier.
    :type chan_id: str
    :param caldev: The signal source device to use for the calibration.
        This will be one of the devices listed in
        :py:`tam.SIGGEN_MODELS`.
    :type caldev: str
    :param caldev_addr: The address of the signal source device.
        The format for this will depend on the type of device being
        used and will be one of GPIB, VISA, or RPYC.
    :type caldev_addr: str
    :param cal_level: The calibration signal level (in dBm).
    :type cal_level: float
    """
    # NOTE(review): chan_id is not used below; the channel is always
    # taken from self.fe_chan_id.
    prompt = QMessageBox()
    prompt.setIcon(QMessageBox.Information)
    prompt.setWindowTitle("Freq. Response Calibration")
    prompt.setTextFormat(Qt.RichText)
    prompt.setText(
        f"<p>Calibrating channel {self.fe_chan_id}</p>"
        "<p>Press OK when ready.</p>")
    prompt.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
    if prompt.exec() != QMessageBox.Ok:
        return

    worker = self.calibrator = FreqResponseCalibrator(
        self.app, self.fe_chan_id, caldev, caldev_addr, cal_level)
    # Set the calibration mode after creating the FreqResponseCalibrator
    # instance so that the calibrator will access the correct
    # front end min and max frequency limits
    direct_channel = self.adc_chan_type == ChannelCapabilities.DIRECT
    self.fe.calibration_mode = (
        BASEBAND_CAL if direct_channel else FREQRESP_CAL)

    thread = self.calibrator_thread = QThread()
    worker.moveToThread(thread)
    thread.started.connect(worker.run)
    worker.freqresp_cal_start.connect(self.freqresp_cal_started)
    # Connection order is kept: handle results, stop the thread, then
    # schedule the worker for deletion.
    worker.freqresp_cal_complete.connect(self.freqresp_cal_complete)
    worker.freqresp_cal_complete.connect(thread.quit)
    worker.freqresp_cal_complete.connect(worker.deleteLater)
    thread.finished.connect(thread.deleteLater)
    thread.start()

@pyqtSlot()
def freqresp_cal_started(self):
    """Show the calibrating message and disable the controls."""
    print('Starting freq. response calibration...')
    self.show_calibrating_msg()
    self.app.enable_controls(False)

@pyqtSlot()
def freqresp_cal_complete(self):
    """Store the measured response and return to normal operation."""
    print('Freq. response calibration complete.')
    measured = self.calibrator.cal_data
    self.fe.set_freq_resp_cal_data(self.fe_chan_id, measured)
    summary = ', '.join(
        f'{freq:.2f}: {pwr:.3f}' for freq, pwr in measured.items())
    print(summary)
    self.fe.calibration_mode = NO_CAL
    self.fe.initialize_cal_data()
    self.remove_calibrating_msg()
    self.start(single_sweep=True)
    self.app.enable_controls(True)
Noise floor calibration
Noise floor calibration is guided by the capabilities of the signal processing front end being used at the time of calibration.
class NoiseFloorCalibrationDialog(QDialog):
    """Dialog asking the user to terminate the input before calibrating."""

    def __init__(self, app: QMainWindow) -> None:
        super().__init__(app)
        self._app: AnalyzerApp = app
        self.setWindowTitle("Noise Floor Calibration")
        self.build_ui()

    def build_ui(self):
        """Lay out the instruction text above an Ok/Cancel button box."""
        chan_name = self._app.selected_chan.fe_chan_id
        instructions = QLabel()
        instructions.setTextFormat(Qt.RichText)
        instructions.setText(
            "<b><ol><li>Please ensure that the ADC input port for "
            f"{chan_name} is terminated with a 50 ohm load.</li>"
            "<li>Select 'Ok' to begin the calibration process or "
            "'Cancel' to abandon it.</li></ol></b>")
        text_row = QHBoxLayout()
        text_row.addWidget(instructions)

        self._bbox = QDialogButtonBox(
            QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        button_row = QHBoxLayout()
        button_row.addWidget(self._bbox)
        ok_button = self._bbox.button(QDialogButtonBox.Ok)
        ok_button.clicked.connect(self.calibrate)
        self._bbox.rejected.connect(self.close)

        layout = QVBoxLayout()
        layout.addLayout(text_row)
        layout.addLayout(button_row)
        self.setLayout(layout)

    def calibrate(self):
        """Accept the dialog; invoked when 'Ok' is clicked."""
        super().accept()
        print('NoiseFloorCalibrationDialog.calibrate')

    def close(self) -> None:
        """Reject the dialog; invoked when the button box is rejected."""
        super().reject()
class NoiseFloorCalibrator(QObject):
    """Worker object which measures the noise floor at each RBW setting.

    For every (RBW, span) pair in :py:attr:`RBWS` an averaged trace is
    acquired and the marker power together with the data store's standard
    deviation is saved in :py:attr:`cal_data`, keyed by the RBW.
    """

    # Settling delay (seconds) after control changes.
    DELAY = 0.1

    # (RBW, frequency span) pairs to measure.
    RBWS = [
        (400_000, RP_ADC_MAX_SPAN),
        (200_000, 50.0),
        (100_000, 25.0),
        (50_000, 12.0),
        (20_000, 5.0),
        (10_000, 2.5),
        (5000, 1.2),
        (2000, 0.5),
        (1000, 0.25),
        (500, 0.12),
        (200, 0.05),
        (100, 0.025),
        (50, 0.012),
        (20, 0.005),
        (10, 0.0025),
        (5, 0.0012),
        (2, 0.0005),
        (1, 0.0005)]

    # Number of sweeps averaged for each measurement.
    AVERAGES = 40

    noisefloor_cal_start = pyqtSignal()
    noisefloor_cal_complete = pyqtSignal()

    # Channel attributes in the order they are saved ...
    _SAVE_ORDER = ('centre_freq', 'freq_span', 'reflevel',
                   'fe_atten', 'auto_rbw', 'rbw')
    # ... and the order in which they are written back.
    _RESTORE_ORDER = ('rbw', 'auto_rbw', 'fe_atten',
                      'reflevel', 'centre_freq', 'freq_span')

    def __init__(self, app: QObject, chan_id: str) -> None:
        super().__init__()
        self._app = app
        self._chan_id = chan_id
        self._ch = self._app.selected_chan
        self.cal_data = {}
        freq_limits = self._ch.fe.capabilities[chan_id].freq
        # NOTE(review): this is half the width of the frequency range,
        # not (min + max)/2 -- confirm that this is the intended
        # measurement frequency.
        self._measure_freq = (freq_limits.max - freq_limits.min)/2

    def save_settings(self, chan):
        """Copy the channel settings changed by the calibration to self."""
        for attr in NoiseFloorCalibrator._SAVE_ORDER:
            setattr(self, attr, getattr(chan, attr))

    def restore_settings(self, chan):
        """Write the previously saved settings back to the channel."""
        for attr in NoiseFloorCalibrator._RESTORE_ORDER:
            setattr(chan, attr, getattr(self, attr))

    def run(self):
        """Measure the noise floor for each entry in :py:attr:`RBWS`."""
        settle = NoiseFloorCalibrator.DELAY
        averages = NoiseFloorCalibrator.AVERAGES
        first_rbw, first_span = NoiseFloorCalibrator.RBWS[0]

        print("NoiseFloorCalibrator.run")
        self.noisefloor_cal_start.emit()
        self.save_settings(self._ch)

        # Initial analyzer configuration for the measurement.
        self._ch.fe_atten = 0.0
        self._ch.reflevel = -60.0
        self._ch.centre_freq = self._measure_freq
        self._ch.auto_rbw = False
        self._ch.rbw = first_rbw
        self._ch.freq_span = first_span
        print(f" {self._ch.rbw=}, {self._ch.freq_span=}")

        self._ch.invoke_single_sweep.emit()
        while self._ch.running is True:
            sleep(settle)

        self._ch.modify_marker_state.emit(1, SpectrumMarker.MKR_TYPE_NORMAL)
        self._ch.modify_trace_state.emit(1, TRACE_TYPE_AVG)
        self._ch.trace_average = averages
        sleep(settle)

        for rbw, span in NoiseFloorCalibrator.RBWS:
            self._ch.rbw = rbw
            self._ch.freq_span = span
            print(f" {self._ch.rbw=}, {self._ch.freq_span=}")
            self._ch.invoke_sweep_count.emit(averages)
            sleep(settle)
            # Averaged sweeps are polled at a coarser interval.
            while self._ch.running is True:
                sleep(1.0)
            _, pwr = self._app.marker_posn()
            self.cal_data[rbw] = (pwr, self._ch.data_store.stddev)

        self._ch.modify_trace_state.emit(1, TRACE_TYPE_NORMAL)
        sleep(settle)
        self.restore_settings(self._ch)
        self.noisefloor_cal_complete.emit()
@pyqtSlot()
def cal_noise_floor(self):
    """Ask the user to confirm, then start a noise floor calibration."""
    print('cal_noise_floor')
    confirmed = self.app.show_noise_floor_calibration_dialog()
    if confirmed is not True:
        return
    self.calibrate_noise_floor()

def calibrate_noise_floor(self):
    """Run a noise floor calibration for the currently selected channel.

    A :py:class:`NoiseFloorCalibrator` is created, moved to its own
    thread and started.
    """
    print('calibrate_noise_floor')
    print(f' {self.fe_chan_id=}')
    worker = self.calibrator = NoiseFloorCalibrator(
        self.app, self.fe_chan_id)
    self.fe.calibration_mode = BASEBAND_CAL

    thread = self.calibrator_thread = QThread()
    worker.moveToThread(thread)
    thread.started.connect(worker.run)
    worker.noisefloor_cal_start.connect(self.noisefloor_cal_started)
    # Connection order is kept: handle results, stop the thread, then
    # schedule the worker for deletion.
    worker.noisefloor_cal_complete.connect(self.noisefloor_cal_complete)
    worker.noisefloor_cal_complete.connect(thread.quit)
    worker.noisefloor_cal_complete.connect(worker.deleteLater)
    thread.finished.connect(thread.deleteLater)
    thread.start()

@pyqtSlot()
def noisefloor_cal_started(self):
    """Show the calibrating message and disable the controls."""
    print('Starting noise floor calibration...')
    self.show_calibrating_msg()
    self.app.enable_controls(False)

@pyqtSlot()
def noisefloor_cal_complete(self):
    """Store the measured noise floor and return to normal operation."""
    print('Noise floor calibration complete.')
    measured = self.calibrator.cal_data
    self.fe.set_noise_floor_cal_data(self.fe_chan_id, measured)
    summary = ', '.join(
        f'{rbw:.4f}: ({stats[0]:.3f}, {stats[1]:.3f})'
        for rbw, stats in measured.items())
    print(summary)
    self.fe.calibration_mode = NO_CAL
    self.fe.initialize_cal_data()
    self.remove_calibrating_msg()
    self.start(single_sweep=True)
    self.app.enable_controls(True)
Calibration data
For the 14-bit 125MHz Red Pitaya board:
{ "channels": { "Ch 1": { "dc_offset": -184.0, "adc_to_volts": 7917, "base_rate_correction": { "1": 0.0, "4": 0.001156807, "8": 0.0037240982, "16": 0.0019836426, "32": -0.000664711, "64": 0.0012378693, "128": 0.0045871735, "256": 0.0011301041, "512": 1.7166138e-05, "1024": 0.0020961761, "2048": 0.0018911362, "4096": 0.0032072067, "8192": -0.0024461746} }, "Ch 2": { "dc_offset": -83.0, "adc_to_volts": 7859, "base_rate_correction": { "1": 0.0, "4": -0.000869751, "8": -0.0017118454, "16": -0.001282692, "32": -0.002169609, "64": -0.0013494492, "128": -0.00020980835, "256": 0.00016212463, "512": -0.00094127655, "1024": -0.002076149, "2048": -0.00088882446, "4096": -0.0030088425, "8192": -0.0030851364 } } }, "noise_floor": { "Ch 1": { "54.500": [-76.691, 1.750], "50.000": [-76.649, 1.581], "40.000": [-77.099, 1.562], "30.000": [-78.586, 1.370], "20.000": [-80.378, 1.384], "15.000": [-81.723, 1.697], "10.000": [-82.995, 2.107], "8.000": [-84.270, 2.456], "5.000": [-84.026, 1.367], "2.000": [-87.675, 1.397], "1.000": [-90.423, 1.371], "0.500": [-93.593, 1.393], "0.200": [-96.758, 1.382], "0.100": [-100.026, 1.381], "0.050": [-102.391, 1.385], "0.020": [-109.102, 1.371], "0.010": [-117.922, 1.374], "0.003": [-109.625, 1.460] }, "Ch 2": { "54.500": [-76.196, 1.700], "50.000": [-76.600, 1.423], "40.000": [-77.867, 1.403], "30.000": [-78.935, 1.388], "20.000": [-80.762, 1.400], "15.000": [-81.796, 1.702], "10.000": [-84.051, 2.103], "8.000": [-84.154, 2.459], "5.000": [-83.262, 1.406], "2.000": [-87.594, 1.388], "1.000": [-90.164, 1.369], "0.500": [-93.407, 1.377], "0.200": [-96.295, 1.384], "0.100": [-99.647, 1.388], "0.050": [-102.324, 1.389], "0.020": [-108.650, 1.372], "0.010": [-117.568, 1.361], "0.003": [-109.431, 1.431] } }, "freq_response": { "Ch 1": { "0.40": -0.256, "0.90": -0.123, "1.40": -0.137, "1.90": -0.164, "2.40": -0.121, "2.90": -0.148, "3.40": -0.082, "3.90": -0.044, "4.40": -0.022, "4.90": -0.023, "5.40": -0.031, "5.90": -0.045, "6.40": -0.068, "6.90": 
-0.076, "7.40": -0.072, "7.90": -0.054, "8.40": -0.027, "8.90": -0.007, "9.40": 0.009, "9.90": 0.027, "10.40": 0.042, "10.90": 0.050, "11.40": 0.068, "11.90": 0.067, "12.40": 0.080, "12.90": 0.094, "13.40": 0.104, "13.90": 0.122, "14.40": 0.116, "14.90": 0.119, "15.40": 0.125, "15.90": 0.122, "16.40": 0.134, "16.90": 0.137, "17.40": 0.141, "17.90": 0.130, "18.40": 0.131, "18.90": 0.136, "19.40": 0.152, "19.90": 0.173, "20.40": 0.197, "20.90": 0.208, "21.40": 0.221, "21.90": 0.232, "22.40": 0.269, "22.90": 0.282, "23.40": 0.294, "23.90": 0.309, "24.40": 0.330, "24.90": 0.338, "25.40": 0.367, "25.90": 0.373, "26.40": 0.382, "26.90": 0.406, "27.40": 0.418, "27.90": 0.430, "28.40": 0.454, "28.90": 0.450, "29.40": 0.475, "29.90": 0.484, "30.40": 0.491, "30.90": 0.511, "31.40": 0.517, "31.90": 0.540, "32.40": 0.543, "32.90": 0.552, "33.40": 0.569, "33.90": 0.575, "34.40": 0.594, "34.90": 0.610, "35.40": 0.616, "35.90": 0.620, "36.40": 0.635, "36.90": 0.643, "37.40": 0.645, "37.90": 0.654, "38.40": 0.668, "38.90": 0.665, "39.40": 0.677, "39.90": 0.687, "40.40": 0.685, "40.90": 0.679, "41.40": 0.690, "41.90": 0.685, "42.40": 0.694, "42.90": 0.691, "43.40": 0.698, "43.90": 0.691, "44.40": 0.698, "44.90": 0.693, "45.40": 0.700, "45.90": 0.694, "46.40": 0.689, "46.90": 0.692, "47.40": 0.704, "47.90": 0.703, "48.40": 0.709, "48.90": 0.714, "49.40": 0.725, "49.90": 0.742, "50.40": 0.763, "50.90": 0.783, "51.40": 0.804, "51.90": 0.840, "52.40": 0.874, "52.90": 0.926, "53.40": 0.975, "53.90": 1.034, "54.40": 1.112, "54.90": 1.194, "55.40": 1.286, "55.90": 1.381 }, "Ch 2": { "0.40": -0.255, "0.90": -0.225, "1.40": -0.146, "1.90": -0.161, "2.40": -0.131, "2.90": -0.164, "3.40": -0.099, "3.90": -0.062, "4.40": -0.037, "4.90": -0.040, "5.40": -0.052, "5.90": -0.062, "6.40": -0.086, "6.90": -0.099, "7.40": -0.090, "7.90": -0.061, "8.40": -0.038, "8.90": -0.023, "9.40": 0.003, "9.90": 0.010, "10.40": 0.029, "10.90": 0.049, "11.40": 0.065, "11.90": 0.084, "12.40": 0.087, "12.90": 0.094, 
"13.40": 0.115, "13.90": 0.113, "14.40": 0.112, "14.90": 0.124, "15.40": 0.138, "15.90": 0.140, "16.40": 0.140, "16.90": 0.148, "17.40": 0.141, "17.90": 0.147, "18.40": 0.151, "18.90": 0.144, "19.40": 0.174, "19.90": 0.183, "20.40": 0.207, "20.90": 0.230, "21.40": 0.243, "21.90": 0.264, "22.40": 0.280, "22.90": 0.299, "23.40": 0.321, "23.90": 0.327, "24.40": 0.351, "24.90": 0.358, "25.40": 0.386, "25.90": 0.412, "26.40": 0.413, "26.90": 0.438, "27.40": 0.449, "27.90": 0.475, "28.40": 0.474, "28.90": 0.488, "29.40": 0.515, "29.90": 0.534, "30.40": 0.534, "30.90": 0.553, "31.40": 0.563, "31.90": 0.584, "32.40": 0.592, "32.90": 0.600, "33.40": 0.621, "33.90": 0.627, "34.40": 0.630, "34.90": 0.649, "35.40": 0.653, "35.90": 0.670, "36.40": 0.681, "36.90": 0.692, "37.40": 0.692, "37.90": 0.706, "38.40": 0.713, "38.90": 0.718, "39.40": 0.726, "39.90": 0.732, "40.40": 0.725, "40.90": 0.736, "41.40": 0.735, "41.90": 0.741, "42.40": 0.738, "42.90": 0.747, "43.40": 0.742, "43.90": 0.747, "44.40": 0.745, "44.90": 0.738, "45.40": 0.746, "45.90": 0.740, "46.40": 0.746, "46.90": 0.749, "47.40": 0.752, "47.90": 0.752, "48.40": 0.759, "48.90": 0.779, "49.40": 0.797, "49.90": 0.818, "50.40": 0.838, "50.90": 0.869, "51.40": 0.912, "51.90": 0.955, "52.40": 1.003, "52.90": 1.064, "53.40": 1.130, "53.90": 1.211, "54.40": 1.294, "54.90": 1.389, "55.40": 1.502, "55.90": 1.616 } } }
For the 16-bit 122.88MHz SDR board:
{ "channels": { "Ch 1": { "dc_offset": 31, "adc_to_volts": 134321, "base_rate_correction": { "1": 0.0, "4": 0.225387, "8": 0.152197, "16": 0.130298, "32": 0.083155, "64": 0.084689, "128": 0.084476, "256": 0.148412, "512": 0.185514, "1024": 0.079623, "2048": 0.075906, "4096": 0.074382, "8192": 0.065472 } }, "Ch 2": { "dc_offset": 47, "adc_to_volts": 136966, "base_rate_correction": { "1": 0.0, "4": 0.6493232, "8": 0.154616, "16": 0.131082, "32": 0.084392, "64": 0.085693, "128": 0.087580, "256": 0.147904, "512": 0.183568, "1024": 0.079818, "2048": 0.076223, "4096": 0.074909, "8192": 0.068031 } } }, "noise_floor": { "Ch 1": { "400000": [-96.844, 4.913], "200000": [-99.515, 5.582], "100000": [-101.416, 5.593], "50000": [-104.877, 5.491], "20000": [-109.576, 5.519], "10000": [-111.158, 5.564], "5000": [-117.337, 5.562], "2000": [-118.675, 5.555], "1000": [-123.148, 5.537], "500": [-124.314, 5.497], "200": [-130.157, 5.600], "100": [-132.071, 5.537], "50": [-134.248, 5.533], "20": [-139.382, 5.617], "10": [-140.746, 5.595], "5": [-144.465, 5.560], "2": [-148.017, 5.526], "1": [-150.230, 5.526] }, "Ch 2": { "400000": [-91.365, 5.116], "200000": [-98.336, 5.495], "100000": [-103.596, 5.586], "50000": [-105.337, 5.540], "20000": [-109.331, 5.559], "10000": [-112.460, 5.617], "5000": [-114.616, 5.481], "2000": [-119.935, 5.527], "1000": [-122.643, 5.533], "500": [-123.989, 5.535], "200": [-130.619, 5.577], "100": [-130.507, 5.579], "50": [-136.482, 5.557], "20": [-139.912, 5.572], "10": [-140.932, 5.580], "5": [-142.238, 5.548], "2": [-147.861, 5.539], "1": [-149.752, 5.539] } }, "freq_response": { "Ch 1": { "0.40": 2.824, "0.90": 2.179, "1.40": 2.063, "1.90": 1.801, "2.40": 1.485, "2.90": 1.195, "3.40": 1.020, "3.90": 0.861, "4.40": 0.725, "4.90": 0.592, "5.40": 0.484, "5.90": 0.392, "6.40": 0.313, "6.90": 0.230, "7.40": 0.212, "7.90": 0.208, "8.40": 0.212, "8.90": 0.223, "9.40": 0.232, "9.90": 0.254, "10.40": 0.270, "10.90": 0.277, "11.40": 0.296, "11.90": 0.314, "12.40": 
0.320, "12.90": 0.339, "13.40": 0.348, "13.90": 0.365, "14.40": 0.362, "14.90": 0.377, "15.40": 0.382, "15.90": 0.389, "16.40": 0.401, "16.90": 0.407, "17.40": 0.407, "17.90": 0.412, "18.40": 0.410, "18.90": 0.414, "19.40": 0.434, "19.90": 0.451, "20.40": 0.470, "20.90": 0.472, "21.40": 0.497, "21.90": 0.512, "22.40": 0.519, "22.90": 0.532, "23.40": 0.554, "23.90": 0.563, "24.40": 0.584, "24.90": 0.607, "25.40": 0.622, "25.90": 0.635, "26.40": 0.652, "26.90": 0.670, "27.40": 0.697, "27.90": 0.705, "28.40": 0.739, "28.90": 0.758, "29.40": 0.780, "29.90": 0.802, "30.40": 0.824, "30.90": 0.847, "31.40": 0.883, "31.90": 0.907, "32.40": 0.945, "32.90": 0.978, "33.40": 1.002, "33.90": 1.041, "34.40": 1.073, "34.90": 1.105, "35.40": 1.149, "35.90": 1.183, "36.40": 1.230, "36.90": 1.266, "37.40": 1.308, "37.90": 1.346, "38.40": 1.385, "38.90": 1.426, "39.40": 1.467, "39.90": 1.508, "40.40": 1.548, "40.90": 1.588, "41.40": 1.624, "41.90": 1.666, "42.40": 1.704, "42.90": 1.741, "43.40": 1.773, "43.90": 1.806, "44.40": 1.840, "44.90": 1.872, "45.40": 1.898, "45.90": 1.930, "46.40": 1.958, "46.90": 1.985, "47.40": 2.016, "47.90": 2.043, "48.40": 2.072, "48.90": 2.101, "49.40": 2.130, "49.90": 2.167, "50.40": 2.207, "50.90": 2.249, "51.40": 2.299, "51.90": 2.357, "52.40": 2.429, "52.90": 2.508, "53.40": 2.611, "53.90": 2.732, "54.40": 2.872, "54.90": 3.045, "55.40": 3.253, "55.90": 3.496 }, "Ch 2": { "0.40": 2.152, "0.90": 1.662, "1.40": 1.569, "1.90": 1.351, "2.40": 1.059, "2.90": 0.809, "3.40": 0.691, "3.90": 0.585, "4.40": 0.492, "4.90": 0.394, "5.40": 0.328, "5.90": 0.260, "6.40": 0.206, "6.90": 0.166, "7.40": 0.156, "7.90": 0.165, "8.40": 0.187, "8.90": 0.214, "9.40": 0.244, "9.90": 0.261, "10.40": 0.286, "10.90": 0.309, "11.40": 0.339, "11.90": 0.351, "12.40": 0.377, "12.90": 0.402, "13.40": 0.421, "13.90": 0.430, "14.40": 0.442, "14.90": 0.456, "15.40": 0.474, "15.90": 0.484, "16.40": 0.492, "16.90": 0.508, "17.40": 0.521, "17.90": 0.531, "18.40": 0.546, "18.90": 0.558, 
"19.40": 0.574, "19.90": 0.588, "20.40": 0.615, "20.90": 0.629, "21.40": 0.662, "21.90": 0.685, "22.40": 0.700, "22.90": 0.719, "23.40": 0.729, "23.90": 0.753, "24.40": 0.764, "24.90": 0.788, "25.40": 0.803, "25.90": 0.823, "26.40": 0.830, "26.90": 0.847, "27.40": 0.875, "27.90": 0.882, "28.40": 0.912, "28.90": 0.931, "29.40": 0.949, "29.90": 0.970, "30.40": 0.988, "30.90": 1.015, "31.40": 1.043, "31.90": 1.067, "32.40": 1.092, "32.90": 1.124, "33.40": 1.153, "33.90": 1.187, "34.40": 1.216, "34.90": 1.256, "35.40": 1.293, "35.90": 1.330, "36.40": 1.368, "36.90": 1.408, "37.40": 1.448, "37.90": 1.491, "38.40": 1.533, "38.90": 1.571, "39.40": 1.613, "39.90": 1.654, "40.40": 1.694, "40.90": 1.733, "41.40": 1.770, "41.90": 1.810, "42.40": 1.843, "42.90": 1.880, "43.40": 1.915, "43.90": 1.949, "44.40": 1.981, "44.90": 2.011, "45.40": 2.035, "45.90": 2.061, "46.40": 2.083, "46.90": 2.110, "47.40": 2.128, "47.90": 2.145, "48.40": 2.167, "48.90": 2.188, "49.40": 2.216, "49.90": 2.237, "50.40": 2.266, "50.90": 2.302, "51.40": 2.341, "51.90": 2.385, "52.40": 2.442, "52.90": 2.511, "53.40": 2.602, "53.90": 2.708, "54.40": 2.837, "54.90": 2.995, "55.40": 3.177, "55.90": 3.408 } } }
Applying the calibration
from typing import (
    Dict, List, Tuple
)
from math import ceil, floor, log2
from dataclasses import dataclass

from scipy import interpolate

from design import CIC_STAGES, CIC_DIFF_DELAY


@dataclass
class AdcChannelCalData:
    """Calibration values held for a single ADC channel."""
    dc_offset: float
    adc_to_volts: float
    # Correction per decimation rate, keyed by rate.
    base_rate_correction: Dict
    # Pair of coefficients per decimation rate, keyed by rate.
    rate_correction_coeffs: Dict


class AdcCalibration(object):
    """Class level access to the calibration data for the ADC channels.

    ``CAL_CONFIG`` is indexed by ADC channel number.
    """

    CAL_CONFIG = [
        AdcChannelCalData(
            dc_offset=31,
            adc_to_volts=134321,
            base_rate_correction={
                1: 0.0, 4: 0.225387, 8: 0.152197, 16: 0.130298,
                32: 0.083155, 64: 0.084689, 128: 0.084476,
                256: 0.148412, 512: 0.185514, 1024: 0.079623,
                2048: 0.075906, 4096: 0.074382, 8192: 0.065472},
            rate_correction_coeffs={
                1: (0.0, 1.0),
                4: (1.4712, 0.2451125),
                8: (1.4712, 0.2451125),
                16: (5.4889, 0.9120185),
                32: (2.7465, 0.4560093),
                64: (1.3731, 0.2280046),
                128: (0.6864264, 0.1140023),
                256: (0.3421029, 0.05682066),
                512: (0.1522719, 0.02530085),
                1024: (0.06048189, 0.01004718),
                2048: (0.02693499, 0.004474957),
                4096: (0.02138473, 0.003552269),
                8192: (0.01069906, 0.0017761345)},
        ),
        AdcChannelCalData(
            dc_offset=47,
            adc_to_volts=136966,
            base_rate_correction={
                1: 0.0, 4: 0.6493232, 8: 0.154616, 16: 0.131082,
                32: 0.084392, 64: 0.085693, 128: 0.087580,
                256: 0.147904, 512: 0.183568, 1024: 0.079818,
                2048: 0.076223, 4096: 0.074909, 8192: 0.068031},
            rate_correction_coeffs={
                1: (0.0, 1.0),
                4: (1.4712, 0.2451125),
                8: (1.4712, 0.2451125),
                16: (5.4889, 0.9120185),
                32: (2.7465, 0.4560093),
                64: (1.3731, 0.2280046),
                128: (0.6864264, 0.1140023),
                256: (0.3421029, 0.05682066),
                512: (0.1522719, 0.02530085),
                1024: (0.06048189, 0.01004718),
                2048: (0.02693499, 0.004474957),
                4096: (0.02138473, 0.003552269),
                8192: (0.01069906, 0.0017761345)},
        ),
    ]

    # Decimation rates which have calibration entries (2 is not one).
    POWERS_OF_2 = [1, 4, 8, 16, 32, 64, 128, 256,
                   512, 1024, 2048, 4096, 8192]

    @classmethod
    def next_power_of_2(cls, r):
        """Return ``r`` if it is a listed rate, else the power of two
        strictly greater than ``r``.
        """
        if r in AdcCalibration.POWERS_OF_2:
            return r
        exponent = floor(log2(r)) + 1
        return 1 << exponent

    @classmethod
    def dc_offset(cls, ch):
        """DC offset stored for ADC channel ``ch``."""
        chan_cal = cls.CAL_CONFIG[ch]
        return chan_cal.dc_offset

    @classmethod
    def adc_to_volts(cls, ch):
        """ADC to volts conversion value stored for ADC channel ``ch``."""
        chan_cal = cls.CAL_CONFIG[ch]
        return chan_cal.adc_to_volts

    @classmethod
    def base_rate_correction(cls, ch, rate):
        """Base rate correction for ``rate`` on ADC channel ``ch``."""
        table = cls.CAL_CONFIG[ch].base_rate_correction
        return table[cls.next_power_of_2(rate)]

    @classmethod
    def rate_correction_coeffs(cls, ch, rate):
        """Rate correction coefficients for ``rate`` on channel ``ch``."""
        table = cls.CAL_CONFIG[ch].rate_correction_coeffs
        return table[cls.next_power_of_2(rate)]

    @classmethod
    def rate_correction(cls, ch, rate):
        """Total correction for decimation rate ``rate`` on channel ``ch``.

        No correction is applied at rate 1.  Otherwise the result is the
        scaled fractional part needed to round
        ``CIC_STAGES * log2(CIC_DIFF_DELAY * rate)`` up to an integer,
        plus the base rate correction.
        """
        if rate == 1:
            return 0.0
        coeffs = cls.rate_correction_coeffs(ch, rate)
        f_bmax = CIC_STAGES * log2(CIC_DIFF_DELAY*rate)
        residue = ceil(f_bmax) - f_bmax
        scaled_residue = coeffs[0] * (residue/coeffs[1])
        return scaled_residue + cls.base_rate_correction(ch, rate)
Load and save calibration data
def set_cal_data(self, config):
    """Install calibration data from a loaded configuration dictionary.

    For both ADC channels the DC offset, ADC to volts value and base rate
    corrections are copied into ``AdcCalibration.CAL_CONFIG``, and the
    noise floor and frequency response tables are stored on
    ``BaseFrontEndApp``.  The string keys of the tables are converted to
    numbers.
    """
    chan_names = [BaseFrontEndApp.ADCCH1_NAME, BaseFrontEndApp.ADCCH2_NAME]
    for idx, chan in enumerate(chan_names):
        loaded = config['channels'][chan]
        target = AdcCalibration.CAL_CONFIG[idx]
        target.dc_offset = int(loaded['dc_offset'])
        target.adc_to_volts = loaded['adc_to_volts']
        target.base_rate_correction = {
            int(rate): corr
            for rate, corr in loaded['base_rate_correction'].items()}

        noise_floor = {
            float(rbw): tuple(stats)
            for rbw, stats in config['noise_floor'][chan].items()}
        BaseFrontEndApp.NOISE_FLOOR['channels'][chan] = noise_floor

        freq_response = {
            float(freq): corr
            for freq, corr in config['freq_response'][chan].items()}
        BaseFrontEndApp.FREQ_RESPONSE['channels'][chan] = freq_response

        print(f'{AdcCalibration.CAL_CONFIG[idx]=}')

@pyqtSlot()
def load_cal_data(self):
    """Let the user choose a JSON file and load calibration data from it.

    Nothing happens if the user does not select a file.
    """
    names, _ = QFileDialog.getOpenFileNames(
        self.app, 'Load Configuration', '.', "JSON Configuration (*.json)")
    if not names:
        return
    # Only the first selected file is used.
    with open(names[0], 'r') as fd:
        config = json.load(fd)
    self.set_cal_data(config)

@pyqtSlot()
def save_cal_data(self):
    """Let the user choose a file and save the calibration data as JSON.

    Nothing is written if the user does not supply a file name.
    """
    name, _ = QFileDialog.getSaveFileName(
        self.app, 'Save configuration', '.', "JSON Configuration (*.json)")
    output_file = str(name).strip()
    if not output_file:
        return

    chan_names = [BaseFrontEndApp.ADCCH1_NAME, BaseFrontEndApp.ADCCH2_NAME]
    config = {
        section: {chan: {} for chan in chan_names}
        for section in ("channels", "noise_floor", "freq_response")}

    for idx, chan in enumerate(chan_names):
        chan_cal = AdcCalibration.CAL_CONFIG[idx]
        config['channels'][chan].update(
            dc_offset=chan_cal.dc_offset,
            adc_to_volts=chan_cal.adc_to_volts,
            base_rate_correction=chan_cal.base_rate_correction)
        fe_chan = self.fe.channel(chan)
        config['noise_floor'][chan] = fe_chan.noise_floor_cal_data
        config['freq_response'][chan] = self.fe.channel(chan).freq_resp_cal_data

    with open(output_file, 'w') as fd:
        json.dump(config, fd)
AdcDevice Class
from typing import (
    Dict
)
import sys

import numpy as np
from scipy import interpolate
from scipy import optimize

import rpyc


class AdcDevice:
    """Client side access to the ADC service reached over RPyC.

    The service is located using the ``adc_ip`` and ``adc_port`` entries
    of the properties dictionary passed to the constructor.
    """

    CHANNEL_COUNT=2

    def __init__(self, properties: Dict) -> None:
        self._adc_ip: str = properties['adc_ip']
        self._adc_port: int = properties['adc_port']
        self._adcdma_service = None
        # Allocated channel objects, indexed by ADC channel id.
        self._adc_channels = [None, None]

    def initialize(self) -> None:
        """Connect (or reconnect) to the ADC service and initialize it.

        The process exits with status -1 if the connection fails and with
        status -2 if the service reports that initialization failed.
        """
        try:
            if self._adcdma_service is not None:
                self._adcdma_service.close()
            self._adcdma_service = rpyc.connect(self._adc_ip, self._adc_port)
        except ConnectionError as ce:
            print(ce)
            print('Please check to ensure that the Red Pitaya board is')
            print('powered on and connected to the network.')
            sys.exit(-1)

        status = self._adcdma_service.root.initialize()
        if status is False:
            print('Unable to initialize the Red Pitaya board.')
            sys.exit(-2)

    def _alloc_raw_adc_buffer(self):
        """Ask the service for a raw sample buffer."""
        return self._adcdma_service.root.allocate_raw_buffer()

    def _free_raw_adc_buffer(self) -> None:
        """Release the raw sample buffer held by the service."""
        self._adcdma_service.root.free_raw_buffer()

    @property
    def f_clk(self) -> float:
        """The ADC clock as reported by the service."""
        return self._adcdma_service.root.adc_clock

    @property
    def adc_resolution(self) -> int:
        """The ADC resolution as reported by the service."""
        return self._adcdma_service.root.adc_resolution

    def poll(self):
        """Serve any pending requests on the service connection."""
        self._adcdma_service.poll_all()

    def measure_dc_offset(self, adc_chan_id: int) -> float:
        """Return the average of one chunk of raw samples for a channel.

        The raw chunk interleaves the two channels: channel 0 uses the
        even indexed samples, any other channel id the odd ones.  The raw
        buffer is released even if reading it fails.
        """
        buf = self._alloc_raw_adc_buffer()
        try:
            data = np.array(buf.get_next_chunk())
            if adc_chan_id == 0:
                dc_offset = np.average(data[::2])
            else:
                dc_offset = np.average(data[1::2])
        finally:
            self._free_raw_adc_buffer()
        return dc_offset

    def measure_signal_amplitude(self, adc_chan_id: int,
                                 signal_freq: float,
                                 dc_offset: float) -> float:
        """Estimate the amplitude of a sine wave on an ADC channel.

        A sine function is least squares fitted to 200 raw samples of the
        channel (after adding ``dc_offset``) and the fitted amplitude
        parameter is returned.  The raw buffer is released even if the
        read or the fit fails.

        :param adc_chan_id: 0 selects the even indexed raw samples, any
            other value the odd indexed ones.
        :param signal_freq: Expected signal frequency, used as the
            starting point of the fit after scaling by 1e6 (presumably
            MHz -- confirm against callers).
        :param dc_offset: Value added to every sample before fitting.
        :return: The fitted amplitude in raw ADC units.  The sign is
            whatever the fit converges to.
        """
        def sine_fn(t, a, b, phase):
            return a*np.sin(2*np.pi*b*t + phase)

        buf = self._alloc_raw_adc_buffer()
        try:
            data = np.array(buf.get_next_chunk())
            if adc_chan_id == 0:
                chan_data = data[::2] + dc_offset
            else:
                chan_data = data[1::2] + dc_offset
            N = 200
            Ts = 1.0/self.f_clk
            xdata = np.linspace(0, Ts*(N-1), N)
            # Samples 100..299 are used; the first 100 are skipped.
            ydata = chan_data[100:300]
            popt, pcov = optimize.curve_fit(
                sine_fn, xdata, ydata, p0=[1e4, signal_freq*1e6, 0])
        finally:
            self._free_raw_adc_buffer()
        return popt[0]

    def alloc_adc_chan(self, adc_chan_id: int, dc_offset: float,
                       sample_callback_fn=None):
        """Allocate an ADC channel from the service.

        :param adc_chan_id: Channel id in ``range(CHANNEL_COUNT)``.
        :param dc_offset: DC offset passed on to the service.
        :param sample_callback_fn: Optional callback passed on to the
            service.
        :return: The allocated channel object, or None if the id is out
            of range or the service could not allocate the channel.
        """
        if adc_chan_id not in range(AdcDevice.CHANNEL_COUNT):
            print(f"ADC chan id, {adc_chan_id}, is out of range")
            return None
        adc_chan = self._adcdma_service.root.allocate_channel(
            adc_chan_id, decimation_rate=1, lo_freq=0.0,
            dc_offset=dc_offset,
            buffer_size=32*1024, chunk_size=8*1024, dtype='F',
            double_buffer=False, prefill_buffers=False,
            sample_callback_fn=sample_callback_fn)
        if adc_chan is None:
            print(f"Can't allocate ADC chan {adc_chan_id}")
        else:
            self._adc_channels[adc_chan_id] = adc_chan
        return adc_chan

    def free_adc_chan(self, adc_chan_id: int) -> None:
        """Free a previously allocated ADC channel.

        Does nothing if the channel is not currently allocated; an out of
        range id is reported on stdout.
        """
        try:
            adc_chan = self._adc_channels[adc_chan_id]
            if adc_chan is not None:
                self._adcdma_service.root.free_channel(adc_chan_id)
                self._adc_channels[adc_chan_id] = None
        except IndexError:
            print(f"ADC channel id, {adc_chan_id}, out of range")

    def adc_channel(self, adc_chan_id: int):
        """Return the allocated channel object for an id, or None."""
        return self._adc_channels[adc_chan_id]

    def set_channel_attenuation(self, adc_chan_id: int, att: float):
        """Ask the service to set the attenuation of an ADC channel."""
        self._adcdma_service.root.set_channel_attenuation(adc_chan_id, att)
Data Storage
from typing import (
    Optional, List
)
import dataclasses

import numpy as np
from PyQt5.QtCore import (
    QObject, QRunnable, QThreadPool, pyqtSignal
)

from data import SweepParameters, Spectrum
from utils import (
    TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN,
    TRACE_TYPE_AVG, TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE,
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE,
    DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG
)
from plot import SpectrumPlotWidget


class HistoryBuffer:
    """Fixed-size NumPy array ring buffer.

    Rows are kept oldest first; the most recently appended row is always
    ``buffer[-1]``.
    """

    def __init__(self, data_size, max_history_size, dtype=float):
        """
        :param data_size: Number of values in each stored row.
        :param max_history_size: Maximum number of rows retained.
        :param dtype: NumPy dtype of the stored values.
        """
        self.data_size = data_size
        self.max_history_size = max_history_size
        self.history_size = 0
        self.counter = 0
        self.buffer = np.empty(shape=(max_history_size, data_size),
                               dtype=dtype)

    def append(self, data):
        """Append new data to ring buffer"""
        self.counter += 1
        if self.history_size < self.max_history_size:
            self.history_size += 1
        # Shift every row up by one so that the last row can take the
        # new data; the oldest row wraps around and is overwritten.
        self.buffer = np.roll(self.buffer, -1, axis=0)
        self.buffer[-1] = data

    def get_buffer(self):
        """Return buffer stripped to size of actual data"""
        # Slice from an explicit start index: ``buffer[-0:]`` would return
        # the whole (uninitialised) buffer when nothing has been stored.
        first_valid = self.max_history_size - self.history_size
        return self.buffer[first_valid:]

    def __getitem__(self, key):
        return self.buffer[key]


class Task(QRunnable):
    """Threaded task (run it with QThreadPool worker threads)"""

    def __init__(self, task, *args, **kwargs):
        """
        :param task: Callable to run in the worker thread.
        :param args: Positional arguments for ``task``.
        :param kwargs: Keyword arguments for ``task``.
        """
        super().__init__()
        self.task = task
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Run task in worker thread; its return value is discarded."""
        self.task(*self.args, **self.kwargs)


class DataStore(QObject):
    """Per-trace spectrum storage plus a history of power measurements.

    Trace updates are run on a single-threaded :class:`QThreadPool`.
    ``history_updated`` and ``data_updated`` are emitted with this object
    as their argument.
    """

    history_updated = pyqtSignal(object)
    data_updated = pyqtSignal(object)
    #average_updated = pyqtSignal(object)
    #peak_hold_max_updated = pyqtSignal(object)
    #peak_hold_min_updated = pyqtSignal(object)

    def __init__(self, max_history_size=100, averages=100):
        """
        :param max_history_size: Maximum number of rows in the history.
        :param averages: Initial averaging depth used for every trace.
        """
        super().__init__()
        self.max_history_size = max_history_size
        self.thread_pool = QThreadPool()
        self.thread_pool.setMaxThreadCount(1)
        self._initialize_traces()
        trace_count = SpectrumPlotWidget.TRACE_COUNT
        self._averages = [averages] * trace_count
        self._stds = [0.0] * trace_count
        self._average_counters = [0] * trace_count
        self.reset()

    def _initialize_traces(self) -> None:
        """Create empty spectra, set trace types and select trace 1."""
        self._initialize_spectra()
        self._trace_types = [
            TRACE_TYPE_NORMAL, TRACE_TYPE_BLANK,
            TRACE_TYPE_BLANK, TRACE_TYPE_BLANK]
        self._selected_trace_id = 1

    def _initialize_spectra(self) -> None:
        """Replace every trace's spectrum with an empty one."""
        self._spectra: List[Spectrum] = [
            Spectrum(None, None, None, None, None)
            for i in range(SpectrumPlotWidget.TRACE_COUNT)]

    def _reset_trace_data(self):
        """Reset power data in the currently selected trace.
        """
        trace_idx = self.selected_trace_id - 1
        self.wait()
        self._spectra[trace_idx].pwr = None
        self._average_counters[trace_idx] = 0
        self._stds[trace_idx] = 0.0

    def reset(self):
        """Reset all traces.
        """
        self.wait()
        self._initialize_spectra()
        # The averaging state must be restarted together with the spectra,
        # otherwise the next averaged update would be combined with the
        # (now empty) power data of the fresh spectrum.
        trace_count = SpectrumPlotWidget.TRACE_COUNT
        self._average_counters = [0] * trace_count
        self._stds = [0.0] * trace_count
        self.history = None

    def reset_trace(self):
        """Reset currently selected trace data only.
        """
        trace_idx = self.selected_trace_id - 1
        self.wait()
        spectrum = self._spectra[trace_idx]
        spectrum.pwr = spectrum.freq = None
        self._average_counters[trace_idx] = 0
        self._stds[trace_idx] = 0.0

    @property
    def selected_trace_type(self) -> int:
        """The trace type of the currently selected trace."""
        return self._trace_types[self.selected_trace_id - 1]

    @selected_trace_type.setter
    def selected_trace_type(self, t: int):
        trace_idx = self.selected_trace_id - 1
        if t != self._trace_types[trace_idx]:
            self._trace_types[trace_idx] = t
            self.wait()
            # A change of type restarts the averaging statistics.
            self._average_counters[trace_idx] = 0
            self._stds[trace_idx] = 0.0

    @property
    def selected_trace_id(self) -> int:
        """The selected trace id.

        This is in the range 1..SpectrumPlotWidget.TRACE_COUNT.
        """
        return self._selected_trace_id

    @selected_trace_id.setter
    def selected_trace_id(self, trace_id: int) -> None:
        self._selected_trace_id = trace_id

    @property
    def spectra(self) -> List[Spectrum]:
        """The spectra of all traces (index 0 holds trace id 1)."""
        return self._spectra

    @property
    def trace_types(self) -> List[int]:
        """The trace types of all traces (index 0 holds trace id 1)."""
        return self._trace_types

    @property
    def averages(self):
        """The averaging depth of the currently selected trace."""
        return self._averages[self.selected_trace_id - 1]

    @averages.setter
    def averages(self, avg):
        trace_idx = self.selected_trace_id - 1
        if avg != self._averages[trace_idx]:
            self._averages[trace_idx] = avg
            self._reset_trace_data()

    @property
    def freq(self) -> np.ndarray:
        """Frequency data of the currently selected trace (may be None)."""
        return self._spectra[self.selected_trace_id - 1].freq

    @property
    def pwr(self) -> np.ndarray:
        """Power data of the currently selected trace (may be None)."""
        return self._spectra[self.selected_trace_id - 1].pwr

    @property
    def sweep_params(self) -> SweepParameters:
        """Sweep parameters of the currently selected trace."""
        return self._spectra[self.selected_trace_id - 1].parameters

    @property
    def average_counter(self) -> int:
        """Number of sweeps accumulated so far for the selected trace."""
        return self._average_counters[self.selected_trace_id - 1]

    @property
    def stddev(self) -> float:
        """Mean of the per-sweep standard deviations of the selected trace.

        Returns 0.0 while no sweeps have been accumulated.
        """
        trace_idx = self.selected_trace_id - 1
        if self._average_counters[trace_idx] == 0:
            return 0.0
        else:
            return self._stds[trace_idx] / self._average_counters[trace_idx]

    def valid(self):
        """True when the selected trace has both frequency and power data."""
        if self.freq is None or self.pwr is None:
            return False
        else:
            return True

    def wait(self):
        """Wait for worker threads to complete all tasks
        """
        self.thread_pool.waitForDone()

    def start_task(self, fn, *args, **kwargs):
        """Run function asynchronously in worker thread"""
        task = Task(fn, *args, **kwargs)
        self.thread_pool.start(task)

    def update(self, data: Spectrum):
        """Record a new spectrum in the history and update the trace.

        The history is updated in the calling thread; the trace update is
        queued on the worker thread pool.
        """
        self.update_history(data.pwr.copy())
        self.start_task(self.update_data, data)

    def update_data(self, data: Spectrum):
        """Fold a new spectrum into the currently selected trace.

        What happens depends on the trace type: NORMAL replaces the stored
        spectrum with a copy, AVG keeps a running average, MAX/MIN keep the
        element-wise extremes, FREEZE and BLANK leave the trace untouched.
        ``data_updated`` is emitted in every case.
        """
        trace_idx = self.selected_trace_id - 1
        trace_type = self.selected_trace_type
        spectrum = self._spectra[trace_idx]

        if trace_type == TRACE_TYPE_NORMAL:
            if data.iq_data is not None:
                iq_data = data.iq_data.copy()
            else:
                iq_data = None
            self._spectra[trace_idx] = Spectrum(
                data.freq.copy(),
                data.pwr.copy(),
                iq_data,
                dataclasses.replace(data.parameters),
                data.timestamp
            )
        elif trace_type == TRACE_TYPE_AVG:
            if spectrum.freq is None:
                spectrum.freq = data.freq.copy()
            if self._average_counters[trace_idx] == 0:
                spectrum.pwr = data.pwr.copy()
            else:
                # Running average: the stored data carries the weight of
                # the sweeps accumulated so far, the new sweep weighs 1.
                spectrum.pwr = np.average(
                    (spectrum.pwr, data.pwr), axis=0,
                    weights=(self._average_counters[trace_idx], 1))
            if self._average_counters[trace_idx] < self._averages[trace_idx]:
                self._average_counters[trace_idx] += 1
                # Accumulated in step with the counter because ``stddev``
                # divides this sum by the counter.
                self._stds[trace_idx] += np.std(data.pwr)
        elif trace_type == TRACE_TYPE_MAX:
            if spectrum.freq is None:
                spectrum.freq = data.freq.copy()
            if spectrum.pwr is None:
                spectrum.pwr = data.pwr.copy()
            else:
                spectrum.pwr = np.maximum(spectrum.pwr, data.pwr)
        elif trace_type == TRACE_TYPE_MIN:
            if spectrum.freq is None:
                spectrum.freq = data.freq.copy()
            if spectrum.pwr is None:
                spectrum.pwr = data.pwr.copy()
            else:
                spectrum.pwr = np.minimum(spectrum.pwr, data.pwr)
        elif trace_type == TRACE_TYPE_FREEZE:
            pass
        elif trace_type == TRACE_TYPE_BLANK:
            pass
        else:
            print(f'update_data: unknown trace_type, {self.selected_trace_type=}')
            # ``spectrum`` is bound before the branches so that this
            # fallback can store the raw power data without failing.
            spectrum.pwr = data.pwr
        self.data_updated.emit(self)

    def update_history(self, pwr):
        """Update spectrum measurements history"""
        if self.history is None:
            # Created lazily so that the row size matches the data.
            self.history = HistoryBuffer(len(pwr), self.max_history_size)
        self.history.append(pwr)
        self.history_updated.emit(self)
from typing import (
    Optional, Dict
)
from dataclasses import dataclass
from math import ceil, floor
from datetime import datetime

import numpy as np
import scipy
# Imported explicitly because the code below uses them through attribute
# access (``scipy.fft``, ``scipy.signal``).
import scipy.fft
import scipy.signal

from design import (
    CIC_BANDPASS, ADC_122_16_F_CLK, HFT144D,
    BIN_COUNT, CHAN_BUFSIZE, MIN_DECIMATION_RATE
)


@dataclass
class SweepParameters():
    """Parameters of one sweep and the sampling values derived from them.

    Only the annotated attributes are dataclass fields; ``cic_bandpass``,
    ``offset_bins``, ``displayed_bin_count`` and ``_window`` are plain
    class attributes.
    """

    cic_bandpass = CIC_BANDPASS
    offset_bins = 5
    displayed_bin_count = 1000

    # Start and stop frequencies in MHz.
    f_start: float
    f_stop: float
    # Resolution bandwidth in MHz.
    res_bw: Optional[float] = None
    # When set, overrides the computed decimation rate.
    reqd_decimation_rate: Optional[int] = None
    fe_lo: float = 0.0
    # Sum of the window coefficients and sum of their squares; both are
    # filled in by initialize_window().
    s1: float = 0.0
    s2: float = 0.0
    window_type: str = 'HFT144D'
    window_w3db: float = 4.4697
    f_clk: float = ADC_122_16_F_CLK

    # Cached window coefficients, created on first use.
    _window = None

    @property
    def f_span(self):
        """The frequency span (in MHz)."""
        return self.f_stop - self.f_start

    @property
    def initial_bin_width(self):
        """The bandwidth of each bin in the FFT spectrum (in Hz)
        """
        if self.res_bw is not None:
            return (self.res_bw*1e6) / self.window_w3db
        else:
            return (self.f_span*1e6)/SweepParameters.displayed_bin_count

    @property
    def bin_count(self):
        """Bin count: displayed bin count times a width-dependent factor.

        The narrower the initial bin width, the larger the multiplier.
        """
        width = self.initial_bin_width
        # A single if/elif ladder: exactly one multiplier applies.
        if width > 10000.0:
            multiplier = 1
        elif width > 5000.0:
            multiplier = 2
        elif width > 1000.0:
            multiplier = 3
        elif width > 100.0:
            multiplier = 4
        elif width > 20.0:
            multiplier = 6
        elif width > 10.0:
            multiplier = 8
        elif width > 1.0:
            multiplier = 15
        else:
            multiplier = 30
        return self.displayed_bin_count * multiplier

    @property
    def bin_width(self):
        """The actual FFT bin width: sample rate over segment length."""
        return self.fs/self.nperseg

    @property
    def decimation_rate(self):
        """The decimation rate to use for the sweep.

        ``reqd_decimation_rate`` is returned unchanged when set.  Otherwise
        the rate is derived from the clock, the bin count and the initial
        bin width; values below ``MIN_DECIMATION_RATE`` collapse to 1.
        """
        def estimate_rate():
            N = self.bin_count
            R = self.f_clk / (N * 2 * self.initial_bin_width)
            if R < MIN_DECIMATION_RATE:
                R = 1
            else:
                R *= SweepParameters.cic_bandpass
            return R

        if self.reqd_decimation_rate is not None:
            return self.reqd_decimation_rate

        R = estimate_rate()
        # Adjust the sample size to make the decimation rate an integer value
        N = self.bin_count
        delta_R = R - round(R)
        delta_N = delta_R * N * N * (self.initial_bin_width/self.f_clk)
        actual_bin_count = N + delta_N
        R = self.f_clk / (actual_bin_count * 2 * self.initial_bin_width)
        if R < MIN_DECIMATION_RATE:
            rate = 1
        else:
            rate = round(R * SweepParameters.cic_bandpass)
            if rate < MIN_DECIMATION_RATE:
                rate = 1
        return rate

    @property
    def fs(self):
        """The sample rate after decimation."""
        return self.f_clk/self.decimation_rate

    @property
    def fft_bins(self):
        """The number of whole bins covering the frequency span."""
        return floor((self.f_span*1e6)/self.bin_width)

    @property
    def sample_span(self):
        """The frequency span covered by a single sample (in MHz)."""
        # The CIC decimator is used to generate samples with decimation
        # rates >1.  The bandpass for the decimator is approximately
        # cic_bandpass * Nyquist. In order to prevent aliases within the bandpass
        # region the sample span is set so that the frequency span of
        # interest falls within the decimator bandpass.
        if self.decimation_rate == 1:
            return self.f_span
        else:
            return ((self.f_clk/(self.decimation_rate * 1e6))
                    * SweepParameters.cic_bandpass)

    @property
    def nperseg(self):
        """The FFT segment length, rounded up to a fast FFT size."""
        n = round(self.f_clk / (self.decimation_rate * self.initial_bin_width))
        return scipy.fft.next_fast_len(n)

    @property
    def sample_bins(self):
        """A count of the valid bins within a single sample.
        """
        if self.decimation_rate == 1:
            return self.nperseg
        else:
            return round(self.nperseg * SweepParameters.cic_bandpass)

    @property
    def chunk_size(self):
        """Five segments' worth of samples, capped at ``CHAN_BUFSIZE``."""
        chunk_size = 5 * self.nperseg
        if chunk_size > CHAN_BUFSIZE:
            chunk_size = CHAN_BUFSIZE
        return chunk_size

    def initialize_window(self):
        """Compute the window coefficients and their sums ``s1``/``s2``."""
        self._window = scipy.signal.get_window(
            HFT144D, Nx=self.nperseg + self.lo_offset_bins)
        self.s1 = self._window.sum()
        self.s2 = (self._window * self._window).sum()

    @property
    def window(self):
        """The window coefficients (computed on first access)."""
        if self._window is None:
            self.initialize_window()
        return self._window

    @property
    def lo_offset_bins(self):
        """The LO offset in bins.

        Starts at ``offset_bins`` and is reduced until the offset no longer
        takes the start frequency below zero.
        """
        bins = SweepParameters.offset_bins
        while bins > 0 and (self.f_start - (bins * self.bin_width)/1e6) < 0:
            bins -= 1
        return bins

    @property
    def lo_offset(self):
        """The LO offset (in MHz)."""
        return (self.lo_offset_bins * self.bin_width)/1e6

    @property
    def enbw(self):
        """The effective noise bandwidth of the window.

        Computed as ``fs * S2 / S1**2`` where S1 is the sum of the window
        coefficients and S2 the sum of their squares.
        """
        if self._window is None:
            self.initialize_window()
        return (self.fs * self.s2)/(self.s1 * self.s1)

    @property
    def rbw(self):
        """The resolution bandwidth (in Hz).
        """
        return self.window_w3db * self.bin_width

    @property
    def lo_count(self):
        """The number of sample spans needed to cover the frequency span."""
        return ceil(self.f_span / self.sample_span)


@dataclass
class Spectrum():
    """One measured spectrum together with the parameters that produced it."""
    freq: np.ndarray
    pwr: np.ndarray
    iq_data: np.ndarray
    parameters: SweepParameters
    timestamp: datetime
RF Front End
# Names identifying the two ADC channels; they are the keys of the
# 'channels' mapping below.
ADCCH1_NAME: str = 'Ch 1'
ADCCH2_NAME: str = 'Ch 2'

# Hardware configuration: one (empty by default) mapping per channel,
# collected in channel order.
CH1_HWCONF: Dict = {}
CH2_HWCONF: Dict = {}
APP_HWCONF: List = [CH1_HWCONF, CH2_HWCONF]

# Application configuration.  Note that both channel entries refer to the
# *same* DEFAULT_CHAN_CONFIG object.
DEFAULT_CHAN_CONFIG: Dict = {}
DEFAULT_APP_CONFIG: Dict = {
    'channels': {
        ADCCH1_NAME: DEFAULT_CHAN_CONFIG,
        ADCCH2_NAME: DEFAULT_CHAN_CONFIG,
    },
}
When there is no actual RF front end service available, we still need to be able to access the ADC attenuator hardware. This is done using the `BaseFrontEndApp` and `BaseFrontEndService` classes.
class BaseFrontEndApp(object): <<base-frontend-config>> NOISE_FLOOR: Dict = { 'channels': { ADCCH1_NAME: { 400000: (-96.844, 4.913), 200000: (-99.515, 5.582), 100000: (-101.416, 5.593), 50000: (-104.877, 5.491), 20000: (-109.576, 5.519), 10000: (-111.158, 5.564), 5000: (-117.337, 5.562), 2000: (-118.675, 5.555), 1000: (-123.148, 5.537), 500: (-124.314, 5.497), 200: (-130.157, 5.600), 100: (-132.071, 5.537), 50: (-134.248, 5.533), 20: (-139.382, 5.617), 10: (-140.746, 5.595), 5: (-144.465, 5.560), 2: (-148.017, 5.526), 1: (-150.230, 5.526) }, ADCCH2_NAME: { 400000: (-91.365, 5.116), 200000: (-98.336, 5.495), 100000: (-103.596, 5.586), 50000: (-105.337, 5.540), 20000: (-109.331, 5.559), 10000: (-112.460, 5.617), 5000: (-114.616, 5.481), 2000: (-119.935, 5.527), 1000: (-122.643, 5.533), 500: (-123.989, 5.535), 200: (-130.619, 5.577), 100: (-130.507, 5.579), 50: (-136.482, 5.557), 20: (-139.912, 5.572), 10: (-140.932, 5.580), 5: (-142.238, 5.548), 2: (-147.861, 5.539), 1: (-149.752, 5.539) } } } FREQ_RESPONSE: Dict = { 'channels': { ADCCH1_NAME: { 0.40: 2.824, 0.90: 2.179, 1.40: 2.063, 1.90: 1.801, 2.40: 1.485, 2.90: 1.195, 3.40: 1.020, 3.90: 0.861, 4.40: 0.725, 4.90: 0.592, 5.40: 0.484, 5.90: 0.392, 6.40: 0.313, 6.90: 0.230, 7.40: 0.212, 7.90: 0.208, 8.40: 0.212, 8.90: 0.223, 9.40: 0.232, 9.90: 0.254, 10.40: 0.270, 10.90: 0.277, 11.40: 0.296, 11.90: 0.314, 12.40: 0.320, 12.90: 0.339, 13.40: 0.348, 13.90: 0.365, 14.40: 0.362, 14.90: 0.377, 15.40: 0.382, 15.90: 0.389, 16.40: 0.401, 16.90: 0.407, 17.40: 0.407, 17.90: 0.412, 18.40: 0.410, 18.90: 0.414, 19.40: 0.434, 19.90: 0.451, 20.40: 0.470, 20.90: 0.472, 21.40: 0.497, 21.90: 0.512, 22.40: 0.519, 22.90: 0.532, 23.40: 0.554, 23.90: 0.563, 24.40: 0.584, 24.90: 0.607, 25.40: 0.622, 25.90: 0.635, 26.40: 0.652, 26.90: 0.670, 27.40: 0.697, 27.90: 0.705, 28.40: 0.739, 28.90: 0.758, 29.40: 0.780, 29.90: 0.802, 30.40: 0.824, 30.90: 0.847, 31.40: 0.883, 31.90: 0.907, 32.40: 0.945, 32.90: 0.978, 33.40: 1.002, 33.90: 1.041, 34.40: 
1.073, 34.90: 1.105, 35.40: 1.149, 35.90: 1.183, 36.40: 1.230, 36.90: 1.266, 37.40: 1.308, 37.90: 1.346, 38.40: 1.385, 38.90: 1.426, 39.40: 1.467, 39.90: 1.508, 40.40: 1.548, 40.90: 1.588, 41.40: 1.624, 41.90: 1.666, 42.40: 1.704, 42.90: 1.741, 43.40: 1.773, 43.90: 1.806, 44.40: 1.840, 44.90: 1.872, 45.40: 1.898, 45.90: 1.930, 46.40: 1.958, 46.90: 1.985, 47.40: 2.016, 47.90: 2.043, 48.40: 2.072, 48.90: 2.101, 49.40: 2.130, 49.90: 2.167, 50.40: 2.207, 50.90: 2.249, 51.40: 2.299, 51.90: 2.357, 52.40: 2.429, 52.90: 2.508, 53.40: 2.611, 53.90: 2.732, 54.40: 2.872, 54.90: 3.045, 55.40: 3.253, 55.90: 3.496 }, ADCCH2_NAME: { 0.40: 2.152, 0.90: 1.662, 1.40: 1.569, 1.90: 1.351, 2.40: 1.059, 2.90: 0.809, 3.40: 0.691, 3.90: 0.585, 4.40: 0.492, 4.90: 0.394, 5.40: 0.328, 5.90: 0.260, 6.40: 0.206, 6.90: 0.166, 7.40: 0.156, 7.90: 0.165, 8.40: 0.187, 8.90: 0.214, 9.40: 0.244, 9.90: 0.261, 10.40: 0.286, 10.90: 0.309, 11.40: 0.339, 11.90: 0.351, 12.40: 0.377, 12.90: 0.402, 13.40: 0.421, 13.90: 0.430, 14.40: 0.442, 14.90: 0.456, 15.40: 0.474, 15.90: 0.484, 16.40: 0.492, 16.90: 0.508, 17.40: 0.521, 17.90: 0.531, 18.40: 0.546, 18.90: 0.558, 19.40: 0.574, 19.90: 0.588, 20.40: 0.615, 20.90: 0.629, 21.40: 0.662, 21.90: 0.685, 22.40: 0.700, 22.90: 0.719, 23.40: 0.729, 23.90: 0.753, 24.40: 0.764, 24.90: 0.788, 25.40: 0.803, 25.90: 0.823, 26.40: 0.830, 26.90: 0.847, 27.40: 0.875, 27.90: 0.882, 28.40: 0.912, 28.90: 0.931, 29.40: 0.949, 29.90: 0.970, 30.40: 0.988, 30.90: 1.015, 31.40: 1.043, 31.90: 1.067, 32.40: 1.092, 32.90: 1.124, 33.40: 1.153, 33.90: 1.187, 34.40: 1.216, 34.90: 1.256, 35.40: 1.293, 35.90: 1.330, 36.40: 1.368, 36.90: 1.408, 37.40: 1.448, 37.90: 1.491, 38.40: 1.533, 38.90: 1.571, 39.40: 1.613, 39.90: 1.654, 40.40: 1.694, 40.90: 1.733, 41.40: 1.770, 41.90: 1.810, 42.40: 1.843, 42.90: 1.880, 43.40: 1.915, 43.90: 1.949, 44.40: 1.981, 44.90: 2.011, 45.40: 2.035, 45.90: 2.061, 46.40: 2.083, 46.90: 2.110, 47.40: 2.128, 47.90: 2.145, 48.40: 2.167, 48.90: 2.188, 49.40: 2.216, 49.90: 
2.237, 50.40: 2.266, 50.90: 2.302, 51.40: 2.341, 51.90: 2.385, 52.40: 2.442, 52.90: 2.511, 53.40: 2.602, 53.90: 2.708, 54.40: 2.837, 54.90: 2.995, 55.40: 3.177, 55.90: 3.408 }, } } def __init__(self, hw_config: List = APP_HWCONF, app_config: Dict = DEFAULT_APP_CONFIG) -> None: d: Dict = app_config['channels'] z = zip(d.keys(), hw_config, d.values()) self._channels: Dict = { chan_id: ADCChan(chan_id, hw_conf, chan_config) for chan_id, hw_conf, chan_config in z} for chan_id, nf in BaseFrontEndApp.NOISE_FLOOR['channels'].items(): self._channels[chan_id].noise_floor_cal_data = nf for chan_id, fr in BaseFrontEndApp.FREQ_RESPONSE['channels'].items(): self._channels[chan_id].freq_resp_cal_data = fr @property def channels(self) -> Dict[str, ADCChan]: return self._channels
class BaseFrontEndService(object):
    """Stand-in front end service exposing only the baseband ADC channels.

    It offers the capabilities and channel access of a front end service
    while ``initialize`` and ``cleanup`` do nothing.
    """

    # Both channels share the same capability values and differ only in
    # their ADC DMA channel index (0 for the first, 1 for the second).
    BASE_CAPABILITIES = {
        "channels": [
            BaseFrontEndApp.ADCCH1_NAME,
            BaseFrontEndApp.ADCCH2_NAME],
        **{
            chan_name: {
                "adcdma_channel": dma_index,
                "adc_attenuation": {
                    "min": RP_ADC_MIN_ATT,
                    "max": RP_ADC_MAX_ATT,
                    "step": RP_ADC_ATT_STEP
                },
                "fe_attenuation": None,
                "freq": {
                    "min": RP_ADC_MIN_FREQUENCY,
                    "max": RP_ADC_MAX_FREQUENCY,
                    "step": 0.0
                },
                "cal_freq": {
                    "min": 0.1,
                    "max": 56.5,
                    "step": 0.5
                },
                "reflevel": {
                    "min": RP_ADC_MIN_REF_LEVEL,
                    "max": RP_ADC_MAX_REF_LEVEL,
                    "step": 1.0
                },
                "min_span": RP_ADC_MIN_SPAN
            }
            for dma_index, chan_name in enumerate(
                (BaseFrontEndApp.ADCCH1_NAME, BaseFrontEndApp.ADCCH2_NAME))
        },
    }

    def __init__(self, app,
                 capabilities: Dict = BASE_CAPABILITIES) -> None:
        """
        :param app: The front end application providing the channels.
        :param capabilities: Capability description with a ``'channels'``
            entry and one entry per channel name; the per-channel entries
            are converted with ``ChannelCapabilities.from_dict``.
        """
        self._app = app
        converted = {'channels': capabilities['channels']}
        for chan_name in (BaseFrontEndApp.ADCCH1_NAME,
                          BaseFrontEndApp.ADCCH2_NAME):
            converted[chan_name] = ChannelCapabilities.from_dict(
                capabilities[chan_name])
        self._capabilities = converted

    @property
    def capabilities(self):
        """The capability mapping built by the constructor."""
        return self._capabilities

    def initialize(self) -> None:
        """Nothing to initialize for the base service."""
        pass

    def cleanup(self) -> None:
        """Nothing to clean up for the base service."""
        pass

    @property
    def adc_channels(self) -> Dict[str, ADCChan]:
        """The channels of the underlying application."""
        return self._app.channels

    @classmethod
    def save_config(cls):
        """Print the default application configuration."""
        print(BaseFrontEndApp.DEFAULT_APP_CONFIG)
from typing import ( Optional, List, Dict, Callable ) import copy import rpyc from scipy import interpolate from tam import ( RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY, RP_ADC_MIN_SPAN, RP_ADC_MAX_REF_LEVEL, RP_ADC_MIN_REF_LEVEL, RP_ADC_REF_LEVEL_STEP, RP_ADC_MIN_ATT, RP_ADC_MAX_ATT, RP_ADC_ATT_STEP, BASEBAND_CAL, FREQRESP_CAL, NO_CAL, Capability, ChannelCapabilities ) from rfblocks import ( DEFAULT_BAUDRATE, create_serial ) from frontend.adcchan import ADCChan <<base-frontend-app-class>> <<base-frontend-service>> class FrontEndClient(object): def __init__(self, fe_ip, fe_port, fe_config): """ """ self._calibration_mode = NO_CAL try: self.fe = rpyc.connect(fe_ip, fe_port, config = {"allow_all_attrs" : True}) self.front_end = self.fe.root if fe_config is None: print(f'Using capabilities from {fe_ip}:{fe_port}') self.capabilities = self.fe.root.capabilities else: capabilities = fe_config['capabilities'] self.capabilities = { 'channels': capabilities['channels'], BaseFrontEndApp.ADCCH1_NAME: ChannelCapabilities.from_dict( capabilities[BaseFrontEndApp.ADCCH1_NAME]), BaseFrontEndApp.ADCCH2_NAME: ChannelCapabilities.from_dict( capabilities[BaseFrontEndApp.ADCCH2_NAME]) } self.front_end.initialize() self._lo_freq_cache = {} for chan_id in self.capabilities['channels']: self._lo_freq_cache[chan_id] = 0.0 chan_type = self.capabilities[chan_id].chan_type if chan_type in [ChannelCapabilities.SINGLE, ChannelCapabilities.SUPERHET]: self.set_fe_atten(chan_id, 10.0) except OSError: print(f'FrontEnd service at {fe_ip}:{fe_port} is not available.') print(f'Using BASE capabilities.') if fe_config is not None: app = BaseFrontEndApp( hw_config=fe_config['hw_config'], app_config=fe_config['app_config']) else: app = BaseFrontEndApp() if fe_config is not None and 'capabilities' in fe_config: self.front_end = BaseFrontEndService( app, fe_config['capabilities']) else: self.front_end = BaseFrontEndService(app) self.capabilities = self.front_end.capabilities self.front_end.initialize() 
self.initialize_cal_data() def initialize_cal_data(self): self._noise_floor_fns = {} self._noise_stddev_fns = {} self._freq_resp_fns = {} self._baseband_freq_resp_fns = {} for chan_id in self.capabilities['channels']: chan_type = self.capabilities[chan_id].chan_type if chan_type == ChannelCapabilities.DIRECT: nf = BaseFrontEndApp.NOISE_FLOOR['channels'][chan_id] else: nf = self.channel(chan_id).noise_floor_cal_data s = list(nf.keys()) means = [stats[0] for stats in nf.values()] self._noise_floor_fns[chan_id] = interpolate.interp1d( s, means, kind='cubic') stddevs = [stats[1] for stats in nf.values()] self._noise_stddev_fns[chan_id] = interpolate.interp1d( s, stddevs, kind='cubic') if chan_type == ChannelCapabilities.DIRECT: fr = BaseFrontEndApp.FREQ_RESPONSE['channels'][chan_id] else: fr = self.channel(chan_id).freq_resp_cal_data f = list(fr.keys()) p = list(fr.values()) self._freq_resp_fns[chan_id] = interpolate.interp1d( f, p, kind='cubic') if chan_type == ChannelCapabilities.DIRECT: self._baseband_freq_resp_fns[chan_id] = self._freq_resp_fns[chan_id] else: fr = BaseFrontEndApp.FREQ_RESPONSE['channels'][chan_id] f = list(fr.keys()) p = list(fr.values()) self._baseband_freq_resp_fns[chan_id] = interpolate.interp1d( f, p, kind='cubic') def cleanup(self) -> None: if isinstance(self.front_end, BaseFrontEndService): self.front_end.cleanup() @property def calibration_mode(self) -> int: return self._calibration_mode @calibration_mode.setter def calibration_mode(self, mode: int) -> None: if self._calibration_mode != mode: self._calibration_mode = mode def freq_window(self, chan_id: str) -> float: chan_type = self.fe_chan_type(chan_id) if chan_type in [ChannelCapabilities.SUPERHET, ChannelCapabilities.SINGLE]: return self.capabilities[chan_id].bandwidth else: # DIRECT return RP_ADC_MAX_FREQUENCY - RP_ADC_MIN_FREQUENCY def adc_centre_freq(self, chan_id: str) -> Optional[float]: if self.fe_chan_type(chan_id) in [ChannelCapabilities.SUPERHET]: return 
self.capabilities[chan_id].adc_centre_freq else: return None def channel(self, chan_id: str) -> ADCChan: return self.front_end.adc_channels[chan_id] def fe_chan_type(self, chan_id: str): return self.capabilities[chan_id].chan_type def set_fe_atten(self, chan_id: str, att: float) -> None: if self.fe_chan_type(chan_id) == ChannelCapabilities.DIRECT: raise NotImplementedError self.front_end.configure_stepatten(chan_id, att) def fe_atten(self, chan_id: str) -> float: if self.fe_chan_type(chan_id) == ChannelCapabilities.DIRECT: raise NotImplementedError ch = self.channel(chan_id) return ch.stepatten_ctl.attenuation def set_lo(self, chan_id: str, lo_freq: float) -> None: """Set the LO frequency of the front end for the specified channel. :param chan_id: The front end channel identifier. :type chan_id: str :param lo_freq: The new LO frequency (in MHz) for the channel. :type lo_freq: float Note that the LO frequency values for the front end channels are cached here so that (a possibly expensive) round trip to the FrontEnd service is not required each time the LO frequency is read. """ if lo_freq != self._lo_freq_cache[chan_id]: chan_type = self.fe_chan_type(chan_id) if chan_type == ChannelCapabilities.SINGLE: self.front_end.configure_lo1_freq(chan_id, lo_freq) elif chan_type == ChannelCapabilities.SUPERHET: self.front_end.configure_lo_freq(chan_id, lo_freq) else: raise NotImplementedError self._lo_freq_cache[chan_id] = lo_freq def lo(self, chan_id: str) -> float: if self.fe_chan_type(chan_id) == ChannelCapabilities.SINGLE: return self._lo_freq_cache[chan_id] else: raise NotImplementedError def freq_response_correction(self, chan_id: str) -> Callable: """Returns a function which produces the front end frequency response for any frequency within the front end frequency range. 
Note that if the front end consists of only the usual baseband hardware then the correction will be the baseband correction (as would be returned by :py:meth:`baseband_freq_response_correction`) """ return self._freq_resp_fns[chan_id] def baseband_freq_response_correction(self, chan_id: str) -> Callable: """Returns a function which produces the baseband frequency response for any frequency within the baseband frequency range. """ return self._baseband_freq_resp_fns[chan_id] def set_freq_resp_cal_data(self, chan_id: str, fr: Dict[float, float]) -> None: ch = self.channel(chan_id) ch.freq_resp_cal_data = fr def noise_floor(self, chan_id: str, rbw: float) -> float: noise = self._noise_floor_fns[chan_id](rbw) return float(noise) def noise_stddev(self, chan_id: str, rbw: float) -> float: stddev = self._noise_stddev_fns[chan_id](rbw) return float(stddev) def set_noise_floor_cal_data( self, chan_id: str, nf: Dict[float, float]) -> None: ch = self.channel(chan_id) ch.noise_floor_cal_data = nf
RPyC Service
The Python remote procedure call framework RPyC is used to create a network service which allows the spectrum analyzer to be accessed remotely.
`RPSaService` implements the functionality which is accessed via RPyC.
from typing import (
    List, Tuple, Dict, Optional
)
from time import sleep

import rpyc
from rpyc.utils.server import ThreadedServer

from PyQt5.QtCore import (
    QObject, QDeadlineTimer, pyqtSignal
)

from tam import ChannelCapabilities
from utils import (
    TRACE_TYPE_NORMAL, TRACE_TYPE_AVG, TRACE_TYPE_MAX, TRACE_TYPE_MIN,
    TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE,
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS,
    DETECTOR_TYPE_NEG
)
from plot import SpectrumMarker
from app import AnalyzerApp


class ProxyApp:
    """Remote-control facade for an :py:class:`AnalyzerApp` instance.

    Every property and method acts on the application's currently
    selected channel.  Operations which change state finish by calling
    :py:meth:`processAppEvents`, which blocks until the application
    signals that its pending events have been processed, so that a
    subsequent read observes the new state.
    """

    def __init__(self, app: AnalyzerApp):
        """
        :param app: The analyzer application being proxied.
        :type app: AnalyzerApp
        """
        self._app: AnalyzerApp = app

    def processAppEvents(self):
        """Ask the application to process its pending events and block
        until it reports that this has been done.

        The mutex is acquired *before* the ``processAppEvents`` signal is
        emitted and is only released by the wait on
        ``eventsProcessedCond``.
        """
        # NOTE(review): taking the lock before emitting only prevents a
        # missed wake-up if the application wakes the condition while
        # holding the same mutex -- that code is not visible here.
        self._app.processEventsMutex.lock()
        self._app.processAppEvents.emit()
        try:
            self._app.eventsProcessedCond.wait(
                self._app.processEventsMutex)
        finally:
            self._app.processEventsMutex.unlock()

    def _wait_for_plot_update(self, chan) -> None:
        """Block until ``chan`` signals ``plotUpdatedCond``.

        :param chan: The channel whose ``updatePlotMutex`` and
            ``plotUpdatedCond`` are used for the wait.
        """
        chan.updatePlotMutex.lock()
        try:
            chan.plotUpdatedCond.wait(chan.updatePlotMutex)
        finally:
            chan.updatePlotMutex.unlock()

    @property
    def channel(self):
        """The currently selected SA channel.

        This will be either 0 ("Chan 1") or 1 ("Chan 2").

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.channel
        0
        >>> sa_app.channel = 1
        """
        selected_chan_id: str = self._app.selected_chan.fe_chan_id
        return list(self._app.channels.keys()).index(selected_chan_id)

    @channel.setter
    def channel(self, chan_id):
        # ``chan_id`` is used as an index into the application's channel
        # buttons; selection is done by simulating a button click.
        self._app._chan_buttons[chan_id].click()
        self.processAppEvents()

    @property
    def centre_freq(self) -> float:
        """The centre frequency for the currently selected channel (in MHz).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.channel = 1
        >>> sa_app.centre_freq
        2027.5
        >>> sa_app.centre_freq = 1000
        >>> sa_app.centre_freq
        1000.0
        """
        return self._app.selected_chan.centre_freq

    @centre_freq.setter
    def centre_freq(self, f: float) -> None:
        self._app.selected_chan.centre_freq = f
        self.processAppEvents()

    @property
    def start_freq(self) -> float:
        """The start frequency for the currently selected channel (in MHz).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.start_freq = 999.95
        >>> sa_app.stop_freq = 1000.5
        >>> f"{sa_app.freq_span:.4f}"
        '0.5500'
        """
        return self._app.selected_chan.start_freq

    @start_freq.setter
    def start_freq(self, f: float) -> None:
        self._app.selected_chan.start_freq = f
        self.processAppEvents()

    @property
    def stop_freq(self) -> float:
        """The stop frequency for the currently selected channel (in MHz).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.start_freq = 999.95
        >>> sa_app.stop_freq = 1000.5
        >>> f"{sa_app.freq_span:.4f}"
        '0.5500'
        """
        return self._app.selected_chan.stop_freq

    @stop_freq.setter
    def stop_freq(self, f: float) -> None:
        self._app.selected_chan.stop_freq = f
        self.processAppEvents()

    @property
    def freq_span(self) -> float:
        """The frequency span for the currently selected channel (in MHz).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.start_freq = 999.95
        >>> sa_app.stop_freq = 1000.5
        >>> f"{sa_app.freq_span:.4f}"
        '0.5500'
        >>> sa_app.freq_span = 1.0
        >>> sa_app.start_freq
        999.725
        >>> sa_app.stop_freq
        1000.725
        """
        return self._app.selected_chan.freq_span

    @freq_span.setter
    def freq_span(self, s: float) -> None:
        self._app.selected_chan.freq_span = s
        self.processAppEvents()

    @property
    def rbw(self) -> float:
        """The resolution bandwidth of the currently selected channel
        (in Hz).

        Note that setting an RBW value using this property will force
        the 'auto RBW' property to False.

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.rbw
        5000
        >>> sa_app.rbw = 2000
        >>> sa_app.sweeps(5)
        """
        return self._app.selected_chan.rbw

    @rbw.setter
    def rbw(self, f: float) -> None:
        # Auto RBW is switched off first so that the explicit value is
        # the one in force after the assignment.
        self._app.selected_chan.auto_rbw = False
        self._app.selected_chan.rbw = f
        self.processAppEvents()

    @property
    def auto_rbw(self) -> bool:
        """Enable/disable auto setting of RBW for the currently selected
        channel.

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.auto_rbw
        True
        """
        return self._app.selected_chan.auto_rbw

    @auto_rbw.setter
    def auto_rbw(self, b: bool) -> None:
        self._app.selected_chan.auto_rbw = b
        self.processAppEvents()

    @property
    def felo_offset(self) -> float:
        """The ``felo_offset`` value of the currently selected channel.

        NOTE(review): presumably the front end LO offset; the units are
        not visible from this module -- confirm.
        """
        return self._app.selected_chan.felo_offset

    @felo_offset.setter
    def felo_offset(self, offset: float) -> None:
        self._app.selected_chan.felo_offset = offset
        self.processAppEvents()

    @property
    def max_felo_offset(self) -> float:
        """The ``max_felo_offset`` value of the currently selected
        channel (read only).

        NOTE(review): presumably the upper limit for
        :py:attr:`felo_offset` -- confirm.
        """
        return self._app.selected_chan.max_felo_offset

    @property
    def ref_level(self) -> float:
        """The reference level for the currently selected channel (in dBm).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.ref_level = 5.0
        >>> sa_app.ampl_scale = 12
        >>> sa_app.sweep()
        """
        return self._app.selected_chan.reflevel

    @ref_level.setter
    def ref_level(self, p: float) -> None:
        self._app.selected_chan.reflevel = p
        self.processAppEvents()

    @property
    def ampl_scale(self) -> float:
        """The amplitude scaling for the currently selected channel (in dB).

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.ref_level = 5.0
        >>> sa_app.ampl_scale = 12
        >>> sa_app.sweep()
        """
        return self._app.selected_chan.ampl_scale

    @ampl_scale.setter
    def ampl_scale(self, s: float) -> None:
        self._app.selected_chan.ampl_scale = s
        self.processAppEvents()

    @property
    def atten(self) -> float:
        """The ADC attenuator setting for the currently selected channel
        (in dB)

        This is set in steps of 0.25dB from 0.0dB to 31.75dB.

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000
        >>> sa_app.ref_level = 5.0
        >>> sa_app.ampl_scale = 12
        >>> sa_app.sweep()
        >>> sa_app.atten
        0.0
        >>> sa_app.atten = 10
        >>> sa_app.atten
        10
        >>> sa_app.sweep()
        """
        return self._app.selected_chan.atten

    @atten.setter
    def atten(self, a: float) -> None:
        self._app.selected_chan.atten = a
        self.processAppEvents()

    def sweep(self) -> None:
        """Perform a single spectrum sweep on the currently selected
        channel.

        Note that this will block until the sweep operation completes.
        """
        # The channel is captured once so that the signal, mutex and
        # condition all belong to the same channel object.
        chan = self._app.selected_chan
        # NOTE(review): the signal is emitted before the mutex is taken
        # (the reverse of processAppEvents()).  If the plot update could
        # complete before the wait starts the wake-up would be missed;
        # the original ordering is kept because the slot side is not
        # visible here.
        chan.invoke_single_sweep.emit()
        self._wait_for_plot_update(chan)

    def sweeps(self, sweep_count: int) -> None:
        """Perform the specified number of spectrum sweeps on the
        currently selected channel.

        :param sweep_count: The number of spectrum sweeps to carry out.
        :type sweep_count: int

        Note that this will block until the sweep operation completes.
        """
        chan = self._app.selected_chan
        # NOTE(review): same emit-then-lock ordering as sweep().
        chan.invoke_sweep_count.emit(sweep_count)
        self._wait_for_plot_update(chan)

    @property
    def running(self) -> bool:
        """The status of the currently selected channel.

        :return: True if the channel is currently running and performing
            spectrum sweeps.  False if the channel is not running.
        """
        return self._app.selected_chan.running

    def marker_normal(self, mkr_id: int = 1) -> None:
        """Set the specified marker for the currently selected channel to
        be in 'normal' mode.

        :param mkr_id: The id for the marker (1 to 4).
        :type mkr_id: int
        """
        self._app.selected_chan.modify_marker_state.emit(
            mkr_id, SpectrumMarker.MKR_TYPE_NORMAL)
        self.processAppEvents()

    def marker_delta(self, mkr_id: int = 1) -> None:
        """Set the specified marker for the currently selected channel to
        be in 'delta' mode.

        :param mkr_id: The id for the marker (1 to 4).
        :type mkr_id: int
        """
        self._app.selected_chan.modify_marker_state.emit(
            mkr_id, SpectrumMarker.MKR_TYPE_DELTA)
        self.processAppEvents()

    def marker_delta_type(
            self, mkr_id: int = 1,
            mkr_type: int = SpectrumMarker.DELTA_DELTA_TYPE) -> None:
        """Set the delta type of the specified marker for the currently
        selected channel.

        :param mkr_id: The id for the marker.
        :type mkr_id: int
        :param mkr_type: The delta type to apply, for example
            ``SpectrumMarker.REF_DELTA_TYPE`` or
            ``SpectrumMarker.DELTA_DELTA_TYPE`` (the default).
        :type mkr_type: int
        """
        self._app.selected_chan.modify_marker_delta_type.emit(
            mkr_id, mkr_type)
        self.processAppEvents()

    def marker_off(self, mkr_id: int = 1) -> None:
        """Turn the specified marker for the currently selected channel
        off.

        :param mkr_id: The id for the marker (1 to 4).
        :type mkr_id: int
        """
        self._app.selected_chan.modify_marker_state.emit(
            mkr_id, SpectrumMarker.MKR_TYPE_OFF)
        self.processAppEvents()

    @property
    def selected_marker(self) -> SpectrumMarker:
        """Return a reference to the currently selected marker.
        """
        return self._app.selected_chan.spectrum_plot.selected_mkr

    @property
    def marker_type(self) -> int:
        """Return the type of the currently selected marker
        """
        return self._app.marker_type()

    @property
    def marker_posn(self) -> Tuple[float, float]:
        """Return the position of the currently selected marker.

        :returns: A tuple containing the frequency (in MHz) and
            power (in dBm) of the currently selected marker.
            If the marker is either a delta or span type then the
            position will be for the reference marker.
        """
        return self._app.marker_posn()

    @property
    def marker_delta_posn(self) -> Tuple[float, float]:
        """Return the delta frequency and power values for the currently
        selected marker.

        Note that if the currently selected marker is not either a delta
        or span type marker the values returned in the tuple will be
        None.

        :returns: A tuple containing the frequency and power delta of
            the currently selected marker.
        """
        return self._app.marker_delta_posn()

    def set_marker_delta_posn(self, f: float) -> None:
        """Move the offset frequency position of the currently selected
        marker.

        If the type of the currently selected marker is not
        ``MKR_TYPE_DELTA`` this function will do nothing.

        :param f: The offset from the reference marker to move the
            delta marker to.
        :type f: float

        >>> import rpyc
        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.marker_delta()
        >>> from plot import SpectrumMarker
        >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.REF_DELTA_TYPE)
        >>> sa_app.peak_search()
        >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.DELTA_DELTA_TYPE)
        >>> sa_app.set_marker_delta_posn(0.5)
        >>> sa_app.marker_delta_posn
        (0.5009602327528455, -91.42057798668877)
        """
        if self.selected_marker.marker_type == SpectrumMarker.MKR_TYPE_DELTA:
            # Only the frequency of the reference position is needed.
            freq, _ = self._app.marker_posn()
            idx = self._app.selected_chan.spectrum_plot.index_from_freq(
                freq + f)
            self._app.selected_chan.move_marker.emit(idx)
            self.processAppEvents()

    @property
    def detector_type(self) -> int:
        """The detector type of the currently selected channel.

        Values are the ``DETECTOR_TYPE_*`` constants.
        """
        return self._app.selected_chan.detector_type

    @detector_type.setter
    def detector_type(self, detector: int) -> None:
        self._app.selected_chan.modify_detector_state.emit(detector)
        # Wait for the change to be applied, as the detector_type_*()
        # methods do, so that an immediate read sees the new value.
        self.processAppEvents()

    def detector_type_normal(self) -> None:
        """Set the currently selected channel to use the normal detector
        """
        self._app.selected_chan.modify_detector_state.emit(
            DETECTOR_TYPE_NORMAL)
        self.processAppEvents()

    def detector_type_pos(self) -> None:
        """Set the currently selected channel to use the positive peak
        detector
        """
        self._app.selected_chan.modify_detector_state.emit(
            DETECTOR_TYPE_POS)
        self.processAppEvents()

    def detector_type_neg(self) -> None:
        """Set the currently selected channel to use the negative peak
        detector
        """
        self._app.selected_chan.modify_detector_state.emit(
            DETECTOR_TYPE_NEG)
        self.processAppEvents()

    def detector_type_sample(self) -> None:
        """Set the currently selected channel to use the sample detector
        """
        self._app.selected_chan.modify_detector_state.emit(
            DETECTOR_TYPE_SAMPLE)
        self.processAppEvents()

    @property
    def trace(self) -> int:
        """The index of the currently selected trace for the currently
        selected channel.

        This is in the range 1..SpectrumPlotWidget.TRACE_COUNT.
        """
        return self._app.selected_chan.selected_trace

    @trace.setter
    def trace(self, trace_id: int) -> None:
        self._app.selected_chan.selected_trace_changed.emit(trace_id)
        self.processAppEvents()

    @property
    def trace_type(self) -> int:
        """The trace type of the currently active trace.
        """
        return self._app.selected_chan.trace_type

    @trace_type.setter
    def trace_type(self, trace_type: int) -> None:
        self._app.selected_chan.modify_trace_state.emit(
            self.trace, trace_type)
        # Wait for the change to be applied, as the trace_type_*()
        # methods do, so that an immediate read sees the new value.
        self.processAppEvents()

    def trace_type_normal(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_NORMAL``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_NORMAL)
        self.processAppEvents()

    def trace_type_avg(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_AVG``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_AVG)
        self.processAppEvents()

    def trace_type_min(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_MIN``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_MIN)
        self.processAppEvents()

    def trace_type_max(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_MAX``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_MAX)
        self.processAppEvents()

    def trace_type_freeze(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_FREEZE``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_FREEZE)
        self.processAppEvents()

    def trace_type_blank(self, trace_id: int = 1) -> None:
        """Set the specified trace to ``TRACE_TYPE_BLANK``.

        :param trace_id: The id of the trace to modify.
        :type trace_id: int
        """
        self._app.selected_chan.modify_trace_state.emit(
            trace_id, TRACE_TYPE_BLANK)
        self.processAppEvents()

    @property
    def trace_average(self) -> int:
        """The ``trace_average`` setting of the currently selected
        channel.
        """
        return self._app.selected_chan.trace_average

    @trace_average.setter
    def trace_average(self, avg: int) -> None:
        self._app.selected_chan.trace_average = avg
        self.processAppEvents()

    def peak_search(self) -> None:
        """Invoke a peak search on the currently selected channel.

        The ``invoke_peak_search`` signal is emitted with ``False``; the
        meaning of that flag is defined by the receiving slot.
        """
        self._app.selected_chan.invoke_peak_search.emit(False)
        self.processAppEvents()

    def next_peak(self) -> None:
        """Invoke a 'next peak' search on the currently selected channel.

        The ``invoke_next_peak`` signal is emitted with ``False``; the
        meaning of that flag is defined by the receiving slot.
        """
        self._app.selected_chan.invoke_next_peak.emit(False)
        self.processAppEvents()

    def noise_stats(self) -> Tuple[float, float]:
        """Return the noise floor statistics for the currently selected
        channel.

        :returns: A tuple containing
            (<the noise floor in dBm>, <the std. deviation of the noise
            in dB>)

        >>> sa = rpyc.connect("127.0.0.1", 18865)
        >>> sa_app = sa.root.sa_app
        >>> sa_app.centre_freq = 1000.0
        >>> sa_app.sweep()
        >>> sa_app.noise_stats()
        (-87.886, 5.5859999999999985)
        """
        noise_floor = self._app.selected_chan.noise_floor
        # NOTE(review): the second element is the channel's
        # ``noise_stddev`` minus the noise floor, which implies the
        # channel attribute holds a level rather than a deviation --
        # confirm against the channel implementation.
        noise_stddev = self._app.selected_chan.noise_stddev - noise_floor
        return (noise_floor, noise_stddev)

    def data_store(self):
        """Return the ``data_store`` of the ADC thread of the currently
        selected channel.
        """
        return self._app.selected_chan.adc_thread.data_store

    @property
    def chan_capabilities(self) -> Dict:
        """The ``chan_capabilities`` of the currently selected channel.
        """
        return self._app.selected_chan.chan_capabilities

    @property
    def chan_type_is_direct(self) -> bool:
        """True if the ADC channel type of the currently selected channel
        is ``ChannelCapabilities.DIRECT``.
        """
        return (self._app.selected_chan.adc_chan_type
                == ChannelCapabilities.DIRECT)

    @property
    def fe_atten(self) -> float:
        """The front end attenuation of the currently selected channel.

        :raises NotImplementedError: If the channel type is
            ``ChannelCapabilities.DIRECT``.
        """
        if self._app.selected_chan.adc_chan_type == ChannelCapabilities.DIRECT:
            raise NotImplementedError
        return self._app.selected_chan.fe_atten

    @fe_atten.setter
    def fe_atten(self, a: float) -> None:
        if self._app.selected_chan.adc_chan_type == ChannelCapabilities.DIRECT:
            raise NotImplementedError
        self._app.selected_chan.fe_atten = a
        self.processAppEvents()

    @property
    def fe_gain(self) -> float:
        """The ``fe_gain`` value of the currently selected channel.
        """
        return self._app.selected_chan.fe_gain

    @fe_gain.setter
    def fe_gain(self, a: float) -> None:
        self._app.selected_chan.fe_gain = a
        self.processAppEvents()


class RPSaService(rpyc.Service):
    """RPyC service giving remote clients access to the spectrum
    analyzer through a :py:class:`ProxyApp` instance.
    """

    def __init__(self, app: AnalyzerApp) -> None:
        """
        :param app: The analyzer application to expose.
        :type app: AnalyzerApp
        """
        super().__init__()
        self._app = ProxyApp(app)

    @property
    def sa_app(self) -> ProxyApp:
        """The proxy through which clients control the analyzer
        (``conn.root.sa_app`` on the client side).
        """
        return self._app


class RPyCServer(QObject):
    """QObject wrapper which runs an RPyC ``ThreadedServer`` and emits
    ``finished`` once the server has stopped.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """
        :param serviceInst: The service (instance) handed to
            ``ThreadedServer``.
        :param host: The host name or address to listen on.
        :param port: The TCP port to listen on.
        """
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Create the server, serve requests, then emit ``finished``
        once ``ThreadedServer.start()`` has returned.
        """
        print("RPSA rpyc service on {}:{}".format(self._host, self._port))
        # SECURITY NOTE(review): 'allow_all_attrs', 'allow_setattr' and
        # 'allow_pickle' give every connected client very broad access
        # to objects in this process (unpickling client data can execute
        # arbitrary code).  Only listen on trusted interfaces/networks.
        self._server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            protocol_config={
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_pickle': True})
        self._server.start()
        self.finished.emit()
Design Parameters
The design parameters
# DDC design parameters CIC_STAGES = 6 CIC_DIFF_DELAY = 2 CIC_BANDPASS = 0.14 MIN_DECIMATION_RATE = 4 MAX_DECIMATION_RATE = 8192 # RP board model parameters ADC_122_16_F_CLK = 122.88e6 ADC_125_14_F_CLK = 125e6 ADC_FULL_SPAN = 55.0 - 0.5 # Power spectrum parameters BIN_COUNT = 600 CHAN_BUFSIZE = BIN_COUNT*50 AVG_CPG_CORRECTIONS = { 'x': [1, 2, 4, 8, 16, 32, 64, 128], 'y': [1.587, 0.525, 0.195, 0.086, 0.042, 0.017, 0.01, 0.0] } <<fft-window-parameters>>
Globals
A note on using Welch's method
The code uses the `scipy` implementation of Welch's method to find the power spectrum of the signal. The required parameters for the `scipy.signal.welch` function are described in more detail in Estimating the power spectrum. In general, Welch's method is used with large data sets and a correspondingly large number of windows for averaging the computed spectra. When a small sample size is used we have noticed a discrepancy between the calculated power and the actual power. As the size of the input sample increases this discrepancy becomes smaller until the computed power levels converge to the actual signal power. This effect is illustrated in Table 1.
| No. of Windows | Measured power (dBm) | Correction (dB) |
|---|---|---|
| 1 | -11.740 | 1.592 |
| 2 | -10.680 | 0.532 |
| 4 | -10.346 | 0.198 |
| 8 | -10.234 | 0.086 |
| 16 | -10.191 | 0.043 |
| 32 | -10.166 | 0.018 |
| 64 | -10.158 | 0.010 |
| 128 | -10.152 | 0.004 |
| 256 | -10.146 | -0.002 |
| 512 | -10.148 | 0.000 |
We are uncertain as to the source of this discrepancy. Examination of the `scipy` source code indicates that all the appropriate corrections seem to be made when calculating the power spectrum. In the absence of an explanation for the discrepancy we will use the difference between measured and actual power levels for each of the sample sizes listed in Table 1 to generate a set of corrections. These corrections (from column 3 in Table 1) are entered into the `AVG_CPG_CORRECTIONS` dictionary and the appropriate value selected at run time using the prevailing value of `CHAN_BUFSIZE` and stored in `AVG_CPG_CORR`. This correction is later applied to the output from the call to `scipy.signal.welch` in the `AdcThread.power_spectrum` method.
Utility Functions
# Display suffixes, ordered from the largest unit to the smallest.
FREQ_SUFFIXES = [' MHz', ' kHz', ' Hz']
PWR_SUFFIX = ' dB'
REFLEVEL_SUFFIX = ' dBm'

# Trace type identifiers.
(TRACE_TYPE_NORMAL,
 TRACE_TYPE_MAX,
 TRACE_TYPE_MIN,
 TRACE_TYPE_AVG,
 TRACE_TYPE_BLANK,
 TRACE_TYPE_FREEZE) = range(6)

# Detector type identifiers.
(DETECTOR_TYPE_NORMAL,
 DETECTOR_TYPE_SAMPLE,
 DETECTOR_TYPE_POS,
 DETECTOR_TYPE_NEG,
 DETECTOR_TYPE_AVG_PWR,
 DETECTOR_TYPE_AVG_VOLT) = range(6)


def parse_freq_spec(s):
    """Convert a '<number> <unit>' frequency string to a value in MHz.

    A unit of 'Hz' or 'kHz' is scaled accordingly; any other unit is
    taken to be MHz already.  The string must split into exactly two
    whitespace separated fields, otherwise ``ValueError`` is raised.
    """
    number_text, unit = s.split()
    # Divisors that take a value in the given unit to MHz.
    divisors = {
        FREQ_SUFFIXES[2].strip(): 1e6,
        FREQ_SUFFIXES[1].strip(): 1e3,
    }
    divisor = divisors.get(unit)
    value = float(number_text)
    if divisor is None:
        return value
    return value / divisor


def format_freq(f: float, prec: int = 3) -> str:
    """Format a frequency given in MHz using the best fitting unit.

    Magnitudes below 0.001 MHz are shown in Hz, magnitudes below 1 MHz
    in kHz and everything else in MHz.  ``prec`` only affects the number
    of decimal places of the MHz form.
    """
    magnitude = abs(f)
    if magnitude < 0.001:
        scaled, layout, unit = f * 1e6, "4.1f", FREQ_SUFFIXES[2]
    elif magnitude < 1.0:
        scaled, layout, unit = f * 1e3, "6.2f", FREQ_SUFFIXES[1]
    else:
        scaled, layout, unit = f, f"7.{prec}f", FREQ_SUFFIXES[0]
    return format(scaled, layout) + unit