Spectrum Analyzer Testbed Application
The Application
This is a spectrum analyzer application targeting systems using the Red Pitaya ADC/DAC board in combination with RF front ends constructed using RF Blocks modules. Figure 1 illustrates the relationships between the various hardware and software components of a typical system.
Figure 1: RPSA hardware environment. Heavy black lines indicate RF signal connections, light black lines indicate control connections, heavy blue lines indicate TCP/IP network connections, heavy orange lines indicate USB connections.
Hardware Setup
Hardware setup for the Red Pitaya board is presented in more detail here: A Dual ADC/DAC Using the Red Pitaya Board. The hardware can be used in a base band configuration or with an RF front end which down converts the test signal into the base band.
Base band
Direct conversion RF front end
Superhet RF front end
Installing and Running the Application
Install the dual ADC/DAC SD card image as documented.
Install the adcdma application
Create a virtual environment and install rfblocks as documented here: install rfblocks into a virtual environment.
Install the rp-sa reference application
Command line usage
```
usage: sa [-h] [-a ADCADDR] [-p ADCPORT] [-A IPADDR] [-P PORT]
          [--frontend_ip FRONTEND_IP] [--frontend_port FRONTEND_PORT]
          [--cal_device CAL_DEVICE] [--cal_data CAL_DATA]
          [-F FRONTEND_CONFIG]

Red Pitaya spectrum analyzer application

optional arguments:
  -h, --help            show this help message and exit
  -a ADCADDR, --adcaddr ADCADDR
                        IP address for the ADC DMA server instance
  -p ADCPORT, --adcport ADCPORT
                        TCP port for the ADC DMA server instance
  -A IPADDR, --ipaddr IPADDR
                        IP address for the SA RPyC server instance
  -P PORT, --port PORT  TCP port for the SA RPyC server instance
  --frontend_ip FRONTEND_IP
                        IP address for the RF frontend server instance
  --frontend_port FRONTEND_PORT
                        TCP port for the RF frontend server instance
  --cal_device CAL_DEVICE
                        Calibration source device. The device is specified
                        as a comma separated string of the form:
                        DEVICE_NAME,DEVICE_ID for example: SMHU58,28
  --cal_data CAL_DATA   A file containing ADC calibration data.
  -F FRONTEND_CONFIG, --frontend_config FRONTEND_CONFIG
                        Frontend hardware configuration.
```
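The `--cal_device` value is a comma separated `DEVICE_NAME,DEVICE_ID` pair. A minimal sketch of how such a string could be split is shown below. `parse_cal_device` is a hypothetical helper, not part of the application, and treating the ID as an integer is an assumption based on the `SMHU58,28` example.

```python
from typing import Tuple


def parse_cal_device(spec: str) -> Tuple[str, int]:
    """Split 'DEVICE_NAME,DEVICE_ID' into its two parts (hypothetical helper)."""
    name, sep, dev_id = spec.partition(",")
    if not sep or not name.strip() or not dev_id.strip():
        raise ValueError(f"expected DEVICE_NAME,DEVICE_ID, got {spec!r}")
    # Assumption: the device ID is numeric, as in the example 'SMHU58,28'.
    return name.strip(), int(dev_id)


print(parse_cal_device("SMHU58,28"))
```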
Note that on some Linux platforms a bug in the Qt library can cause a segfault under some circumstances. A workaround for this bug is to set the following environment variable before running the application:
See stackoverflow for more information.
Configuration and capabilities
On startup, the `sa` application will attempt to read spectrum analyzer capabilities from the default front end application service address (`127.0.0.1:18870`). If the front end application service is listening on a different address, the `--frontend_ip` and `--frontend_port` command line parameters should be used to specify it.

If the `sa` application fails to read analyzer capabilities from a front end application service, it will default to the base band capabilities specified in the `BaseFrontEndService.BASE_CAPABILITIES` class attribute (see the RF Front End section of the sa application source code).
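The fallback behaviour described above can be sketched roughly as follows. This is not the application's code: `read_capabilities`, the placeholder `BASE_CAPABILITIES` contents and the `capabilities` attribute on the service root are all assumptions made for illustration. The connector is passed in so that the sketch runs without a live service; in practice `rpyc.connect` could be supplied.

```python
from typing import Any, Callable, Dict

# Placeholder standing in for BaseFrontEndService.BASE_CAPABILITIES; the real
# contents are defined in the sa application source and are not shown here.
BASE_CAPABILITIES: Dict[str, Any] = {"type": "baseband"}


def read_capabilities(connect: Callable[[str, int], Any],
                      host: str = "127.0.0.1",
                      port: int = 18870) -> Dict[str, Any]:
    """Return front end capabilities, or the base band defaults on failure."""
    try:
        conn = connect(host, port)
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return dict(BASE_CAPABILITIES)
    try:
        # Assumption: the service exposes `capabilities` on its root object.
        return dict(conn.root.capabilities)
    finally:
        conn.close()


def refuse(host: str, port: int) -> Any:
    """Simulate a front end service that is not running."""
    raise ConnectionRefusedError(f"nothing listening on {host}:{port}")


print(read_capabilities(refuse))
```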
Application Source Code
A detailed description of the spectrum analyzer app code is provided here: Red Pitaya Spectrum Analyzer.
The Application Main Window
Figure 2: The spectrum analyzer application window
The application is presented with channel signal displays on the left (either one or two channels may be displayed), and application controls on the right.

Channel sweep controls allow for a `Single` sweep, with continuous sweeping initiated using the `Start` button. Continuous sweeping is suspended by selecting the `Stop` button.

Channel controls allow either of two channels to be displayed or, if the `Split` button is selected, both channels to be displayed simultaneously.

Per channel controls are available by selecting one of the three panes at the bottom right of the application window: `Freq/Ampl`, `Markers` and `Trace`.
| Shortcut | Description |
|---|---|
| | Start continuous sweep |
| | Single sweep |
| | Stop sweep |
| | Select channel 1 spectrum display |
| | Select channel 2 spectrum display |
| | Select the |
| | Select the |
| | Select the |

Figure 3a: Single channel display

Figure 3b: Split channel display
Taking Measurements
Figure 4: Frequency and amplitude controls
The spectrum frequency controls, `Centre`, `Start`, `Stop` and `Span`, are coupled so that a change to one will cause appropriate updates in the others. The `Full Span` button will reset the spectrum display to the full span, generally 0.5 to 55 MHz, although this may vary if an external RF front end is used with the Red Pitaya board.

Resolution bandwidth (RBW) of the spectrum display is, by default, set automatically. The `Auto` checkbox can be un-checked to allow the manual selection of RBW using the associated spinbox controls.

The `ADC Attenuation` control sets the value of the channel attenuator connected to the ADC input of the Red Pitaya board. Maximum attenuation is 31.75 dB in steps of 0.25 dB.
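The coupling between the four frequency controls can be illustrated with a small model. `FreqWindow` is a hypothetical class written for this example and is not taken from the application; it assumes that changing the span keeps the centre fixed, which is consistent with the `freq_span` docstring example in the Software Control section.

```python
class FreqWindow:
    """Hypothetical model of coupled Centre/Start/Stop/Span values (MHz)."""

    def __init__(self, start: float, stop: float) -> None:
        self.start = start
        self.stop = stop

    @property
    def centre(self) -> float:
        return (self.start + self.stop) / 2.0

    @centre.setter
    def centre(self, f: float) -> None:
        # Moving the centre keeps the span and shifts both edges.
        half = self.span / 2.0
        self.start, self.stop = f - half, f + half

    @property
    def span(self) -> float:
        return self.stop - self.start

    @span.setter
    def span(self, s: float) -> None:
        # Changing the span keeps the centre and moves both edges.
        c = self.centre
        self.start, self.stop = c - s / 2.0, c + s / 2.0


w = FreqWindow(999.95, 1000.5)
w.span = 1.0
print(f"{w.start:.3f} {w.stop:.3f}")  # 999.725 1000.725
```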
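As a rough illustration of the attenuator range, the sketch below clamps a requested value to 0 to 31.75 dB and rounds it to the nearest 0.25 dB step. `quantize_atten` is a hypothetical helper; whether the application rounds, truncates or rejects out-of-range values is not stated here, so the rounding behaviour is an assumption.

```python
ATTEN_STEP_DB = 0.25
ATTEN_MAX_DB = 31.75


def quantize_atten(db: float) -> float:
    """Clamp to 0..31.75 dB and round to the nearest 0.25 dB step."""
    clamped = min(max(db, 0.0), ATTEN_MAX_DB)
    # Assumption: values between steps are rounded to the nearest step.
    return round(clamped / ATTEN_STEP_DB) * ATTEN_STEP_DB


for requested in (10.1, 40.0, -3.0):
    print(requested, "->", quantize_atten(requested))
```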
Figure 5: Marker controls
A marker is selected by clicking on one of the four Marker selection buttons. By default, markers are set to the `Off` state. Selecting one of the marker types `Normal`, `Delta pair` or `Span pair` will make the marker visible within the channel signal display.

Only one of the markers is selected at any particular time.

Figure 6a: Delta marker

Figure 6b: Span marker
The `Ref` and `Delta` buttons become active when the selected marker is a delta marker. The reference marker is denoted by an `R` next to the marker number.

Hold down the `Alt` key and move the cursor to move the selected marker. `Ctl-Left-Arrow` and `Ctl-Right-Arrow` move the selected marker left and right respectively.

Keyboard accelerators: `<Alt>-p` moves the currently selected marker to the signal peak, `<Alt>-c` changes the signal display centre frequency to the position of the currently selected marker, and `z` zooms on the span marker.
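The two marker accelerators can be pictured with a short sketch: moving a marker to the signal peak amounts to finding the largest power value in the trace, and re-centring keeps the span while shifting the window to the marker frequency. `peak_position` and `centre_on` are hypothetical helpers for illustration only; the application's own peak search may use a different method.

```python
from typing import Sequence, Tuple


def peak_position(freq: Sequence[float],
                  pwr: Sequence[float]) -> Tuple[float, float]:
    """Return (frequency, power) of the largest power value in a trace."""
    if len(freq) != len(pwr) or len(pwr) == 0:
        raise ValueError("freq and pwr must be non-empty and the same length")
    idx = max(range(len(pwr)), key=lambda i: pwr[i])
    return freq[idx], pwr[idx]


def centre_on(f_marker: float, span: float) -> Tuple[float, float]:
    """Return (start, stop) for a window of `span` centred on the marker."""
    return f_marker - span / 2.0, f_marker + span / 2.0


# Illustrative trace values (MHz, dBm), not measured data.
trace_freq = [24.8, 24.9, 25.0, 25.1]
trace_pwr = [-80.0, -42.5, -10.2, -61.0]

f_peak, p_peak = peak_position(trace_freq, trace_pwr)
print(f_peak, p_peak)
print(centre_on(f_peak, 0.5))
```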
Figure 7: Trace controls
Calibration
Figure 8: Channel calibration menu
RF Front Ends
Front end capabilities are read from the RPyC service associated with the front end.
Direct Conversion

Figure 9: SA App showing front end controls
Software Control
A quick start to using software control with the SA application. More detailed examples of the use of software control are given in the Scripts.
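```python
import rpyc

# NOTE: `ad9913` (the source of the POWER_UP constant) is not imported in
# the original listing; it must be made available before running this code.

# Connect to the spectrum analyzer and DDS signal source RPyC services.
sa = rpyc.connect("127.0.0.1", 18865)
dds = rpyc.connect("127.0.0.1", 18862)

# Configure the DDS to output a 25 MHz signal at -10 dBm.
dds.root.initialize()
dds1 = dds.root.dds_controllers['Chan 1']
dds1.state = ad9913.POWER_UP
dds1.freq = 25.0
dds1.dbm = -10.0
dds.root.configure('Chan 1')

# Set up the analyzer, average five sweeps and read back the marker.
sa_app = sa.root.sa_app
sa_app.channel = 1
sa_app.centre_freq = 25.0
sa_app.freq_span = 0.5
sa_app.detector_type_pos()
sa_app.trace_type_avg()
sa_app.sweeps(5)
sa_app.marker_normal()
# marker_posn is documented below as a property, so it is not called.
freq, pwr = sa_app.marker_posn
print(f"{freq=:.2f}, {pwr=:.2f}")

sa.close()
dds.close()
```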
Frequency and bandwidth
```python
@property
def centre_freq(self) -> float:
    """The centre frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.channel = 1
    >>> sa_app.centre_freq
    2027.5
    >>> sa_app.centre_freq = 1000
    >>> sa_app.centre_freq
    1000.0
    """

@property
def start_freq(self) -> float:
    """The start frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    """

@property
def stop_freq(self) -> float:
    """The stop frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    """

@property
def freq_span(self) -> float:
    """The frequency span for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    >>> sa_app.freq_span = 1.0
    >>> sa_app.start_freq
    999.725
    >>> sa_app.stop_freq
    1000.725
    """

@property
def rbw(self) -> float:
    """The resolution bandwidth of the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.rbw
    5000
    >>> sa_app.rbw = 2000
    >>> sa_app.sweeps(5)
    >>> sa_app.auto_rbw
    False
    """

@property
def auto_rbw(self) -> bool:
    """Enable/disable auto setting of RBW for the currently selected
    channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.auto_rbw
    True
    """
```
Amplitude
```python
@property
def ref_level(self) -> float:
    """The reference level for the currently selected channel (in dBm).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    """

@property
def ampl_scale(self) -> float:
    """The amplitude scaling for the currently selected channel (in dB).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    """

@property
def atten(self) -> float:
    """The ADC attenuator setting for the currently selected channel (in dB)

    This is set in steps of 0.25dB from 0.0dB to 31.75dB.

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    >>> sa_app.atten
    0.0
    >>> sa_app.atten = 10
    >>> sa_app.atten
    10
    >>> sa_app.sweep()
    """
```
Markers
```python
def marker_normal(self, mkr_id: int = 1) -> None:
    """Set the specified marker for the currently selected channel
    to be in 'normal' mode.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

def marker_delta(self, mkr_id: int = 1) -> None:
    """Set the specified marker for the currently selected channel
    to be in 'delta' mode.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

def marker_off(self, mkr_id: int = 1) -> None:
    """Turn the specified marker for the currently selected channel off.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

@property
def selected_marker(self) -> SpectrumMarker:
    """Return a reference to the currently selected marker.
    """

@property
def marker_type(self) -> int:
    """Return the type of the currently selected marker
    """

@property
def marker_posn(self) -> Tuple[float, float]:
    """Return the position of the currently selected marker.

    :returns: A tuple containing the frequency (in MHz) and
        power (in dBm) of the currently selected marker.
        If the marker is either a delta or span type then the
        position will be for the reference marker.
    """

@property
def marker_delta_posn(self) -> Tuple[float, float]:
    """Return the delta frequency and power values for the
    currently selected marker.

    Note that if the currently selected marker is not either a
    delta or span type marker the values returned in the tuple
    will be None.

    :returns: A tuple containing the frequency and power delta
        of the currently selected marker.
    """

def set_marker_delta_posn(self, f: float) -> None:
    """Move the offset frequency position of the currently selected marker.

    If the type of the currently selected marker is not
    ``MKR_TYPE_DELTA`` this function will do nothing.

    :param f: The offset from the reference marker to move the
        delta marker to.
    :type f: float

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.marker_delta()
    >>> from plot import SpectrumMarker
    >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.REF_DELTA_TYPE)
    >>> sa_app.peak_search()
    >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.DELTA_DELTA_TYPE)
    >>> sa_app.set_marker_delta_posn(0.5)
    >>> sa_app.marker_delta_posn
    (0.5009602327528455, -91.42057798668877)
    """

def peak_search(self) -> None:
    """ """

def next_peak(self) -> None:
    """ """
```
Trace
```python
def detector_type_normal(self) -> None:
    """ """

def detector_type_pos(self) -> None:
    """ """

def detector_type_neg(self) -> None:
    """ """

def detector_type_sample(self) -> None:
    """ """

def trace_type_normal(self, trace_id: int = 1) -> None:
    """ """

def trace_type_avg(self, trace_id: int = 1) -> None:
    """ """

def trace_type_min(self, trace_id: int = 1) -> None:
    """ """

def trace_type_max(self, trace_id: int = 1) -> None:
    """ """

@property
def trace_average(self) -> int:
    """ """
```
Sweeping
```python
def sweep(self) -> None:
    """Perform a single spectrum sweep on the currently selected channel.
    """

def sweeps(self, sweep_count: int) -> None:
    """Perform the specified number of spectrum sweeps on the
    currently selected channel.

    :param sweep_count: The number of spectrum sweeps to carry out.
    :type sweep_count: int
    """

@property
def running(self) -> bool:
    """The status of the currently selected channel.

    :return: True if the channel is currently running and
        performing spectrum sweeps. False if the channel is
        not running.
    """
```
Front end controls
```python
@property
def chan_capabilities(self) -> Dict:
    """ """

@property
def chan_type_is_direct(self) -> bool:
    """ """

@property
def fe_atten(self) -> float:
    """The value of the RF front end attenuator for the currently
    selected channel.

    :returns: The current front end attenuation in dB.
    :raises: NotImplementedError if there is no RF front end
        hardware configured.

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.fe_atten
    10.0
    >>> sa_app.fe_atten = 0.0
    >>> sa_app.fe_atten
    0.0
    """

@property
def fe_gain(self) -> float:
    """The value of the RF front end gain for the currently
    selected channel.

    :returns: The current front end gain in dB.
    """
```
Data store
```python
class DataStore:

    @property
    def spectra(self) -> List[Spectrum]:
        """A list containing the spectra for each of the traces.
        """

    @property
    def selected_trace_id(self) -> int:
        """The index of the currently selected trace.
        """

    @property
    def selected_trace_type(self) -> int:
        """The currently selected trace type.
        """

    @property
    def freq(self) -> np.ndarray:
        """The frequency value array for the currently selected trace.
        """

    @property
    def pwr(self) -> np.ndarray:
        """The power value array for the currently selected trace.
        """

    @property
    def sweep_params(self) -> SweepParameters:
        """The sweep parameters for the currently selected trace.
        """


@dataclass
class Spectrum():
    freq: np.ndarray
    pwr: np.ndarray
    iq_data: np.ndarray
    parameters: SweepParameters
    timestamp: datetime


@dataclass
class SweepParameters():
    # Start and stop frequencies in MHz.
    f_start: float
    f_stop: float
    fe_lo: float = 0.0
    s1: float = 0.0
    s2: float = 0.0
    window_type: str = 'HFT144D'
    window_w3db: float = 4.4697
    f_clk: float = ADC_122_16_F_CLK

    @property
    def fs(self):
        """ """

    @property
    def bin_width(self):
        """ """

    @property
    def decimation_rate(self):
        """ """

    @property
    def enbw(self):
        """ """

    @property
    def rbw(self):
        """ """

    @property
    def window(self):
        """ """
```
Channel capabilities
Scripts
Some of these scripts may import modules from the RPSA application. In order to allow this, PYTHONPATH should be set as follows:
PYTHONPATH=$PYTHONPATH:$RFBLOCKS_REFDESIGNS/DualDacAdc/scripts/rp:$RFBLOCKS_REFDESIGNS/DualDacAdc/scripts/rp/sa
Save and restore SA settings
This script will save or restore spectrum analyzer application settings using a JSON formatted file.
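As a rough guide to the file layout, the sketch below builds a settings dictionary with the same keys that `save_settings()` in the script listing writes for each channel, then round-trips it through JSON. The numeric values are illustrative only and are not taken from a real instrument.

```python
import json

# Illustrative values only; the keys mirror those written by save_settings().
settings = {
    "Ch 1": {
        "centre_freq": 25.0,
        "start_freq": 24.75,
        "stop_freq": 25.25,
        "freq_span": 0.5,
        "ref_level": 0.0,
        "ampl_scale": 10.0,
        "atten": 0.0,
        "auto_rbw": True,
        "rbw": 5000.0,
    }
}

# Serialise and read back, as the script does with a file on disk.
text = json.dumps(settings, indent=2)
restored = json.loads(text)
print(text)
```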
```
usage: settings.py [-h] [-a SAADDR] [-p SAPORT] [-S] [-R] [-f SETTINGS_FILE]
                   [-C {1,2}]

Save/restore settings for the RPSA app.

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -S, --save            Save the current RPSA settings
  -R, --restore         Restore RPSA settings
  -f SETTINGS_FILE, --settings_file SETTINGS_FILE
                        File to save or restore SA settings to or from (sa-settings.json)
  -C {1,2}, --channel {1,2}
                        Channel settings to save or restore.
                        Note that not specifying a channel will save or
                        restore both channels.
```
```python
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
import json
import textwrap

import rpyc

DEFAULT_SETTINGS_FILE = "sa-settings.json"
CHANNEL_LABELS = ['Ch 1', 'Ch_2']


def save_settings(sa_app, outfile, chan_list):
    # Collect the settings for each requested channel and write them as JSON.
    settings = {}
    for ch in chan_list:
        ch_settings = {}
        chan_label = CHANNEL_LABELS[ch]
        sa_app.channel = ch
        ch_settings['centre_freq'] = sa_app.centre_freq
        ch_settings['start_freq'] = sa_app.start_freq
        ch_settings['stop_freq'] = sa_app.stop_freq
        ch_settings['freq_span'] = sa_app.freq_span
        ch_settings['ref_level'] = sa_app.ref_level
        ch_settings['ampl_scale'] = sa_app.ampl_scale
        ch_settings['atten'] = sa_app.atten
        ch_settings['auto_rbw'] = sa_app.auto_rbw
        ch_settings['rbw'] = sa_app.rbw
        settings[chan_label] = ch_settings
    with open(outfile, 'w') as fd:
        json.dump(settings, fd)


def restore_settings(sa_app, infile, chan_list):
    # Read the JSON file and apply the stored settings to each channel.
    with open(infile) as fd:
        settings = json.load(fd)
    for ch in chan_list:
        chan_label = CHANNEL_LABELS[ch]
        ch_settings = settings[chan_label]
        sa_app.channel = ch
        sa_app.centre_freq = float(ch_settings['centre_freq'])
        sa_app.start_freq = float(ch_settings['start_freq'])
        sa_app.stop_freq = float(ch_settings['stop_freq'])
        sa_app.freq_span = float(ch_settings['freq_span'])
        sa_app.ref_level = float(ch_settings['ref_level'])
        sa_app.ampl_scale = float(ch_settings['ampl_scale'])
        sa_app.atten = float(ch_settings['atten'])
        sa_app.auto_rbw = ch_settings['auto_rbw']
        sa_app.rbw = float(ch_settings['rbw'])


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865

    parser = ArgumentParser(description=
                            "Save/restore settings for the RPSA app.",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-S", "--save", action='store_true',
        help="Save the current RPSA settings")
    parser.add_argument(
        "-R", "--restore", action='store_true',
        help="Restore RPSA settings")
    parser.add_argument(
        "-f", "--settings_file", default=DEFAULT_SETTINGS_FILE,
        help=f"File to save or restore SA settings to or from ({DEFAULT_SETTINGS_FILE})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        Channel settings to save or restore.
        Note that not specifying a channel will save or
        restore both channels.
        '''))
    args = parser.parse_args()

    if args.save and args.restore:
        print("Both 'save' and 'restore' have been specified.")
        print("Please choose just one operation.")
        sys.exit(0)

    # Channel numbers on the command line are 1-based, indexes are 0-based.
    if args.channel is not None:
        channels = [int(args.channel) - 1]
    else:
        channels = [0, 1]

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    if args.save:
        save_settings(sa_app, args.settings_file, channels)
    else:
        restore_settings(sa_app, args.settings_file, channels)


if __name__ == '__main__':
    main()
```
Measure/set signal generator output level
This makes use of the Simple RF Signal Generator as the signal source.
Figure 10: Setup for setting/measuring signal generator output level
```
usage: rfgen.py [-h] [-a SAADDR] [-p SAPORT] [-A RFGEN_ADDR] [-P RFGEN_PORT]
                [--rfgen_chan {1,2}] [--sa_chan {1,2}] [--sa_span SA_SPAN]
                [--sweep_count SWEEP_COUNT] -F FREQ [-L LEVEL]

Measure/set RF signal generator output

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -A RFGEN_ADDR, --rfgen_addr RFGEN_ADDR
                        IP address for the RFGen RPyC service (127.0.0.1)
  -P RFGEN_PORT, --rfgen_port RFGEN_PORT
                        TCP port for the RFGen RPyC service (18864)
  --rfgen_chan {1,2}    Signal generator channel to use. (2)
  --sa_chan {1,2}       Spectrum analyzer channel to use. (1)
  --sa_span SA_SPAN     Spectrum analyzer span to use (in MHz). (0.1)
  --sweep_count SWEEP_COUNT
                        Number of SA sweeps to average measurement over. (5)
  -F FREQ, --freq FREQ  Signal generator frequency (in MHz). (1000.0)
  -L LEVEL, --level LEVEL
                        Signal generator output power, in dBm.
```
Note that this uses a variation of the `RFGenService.set_channel_output` method in order to (optionally) set the output level of the signal generator. Instead of using a power detector head connected to the signal generator monitoring port, the spectrum analyzer facilities are used to measure the power output. See Control output power using the monitor port for more details on using `RFGenService.set_channel_output`.
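The level-setting approach described above is a simple feedback loop: measure at the monitor port, correct for the coupler, and nudge the generator by the remaining error until it is within tolerance. The sketch below runs that loop against a simulated generator so it can be tried without hardware. `settle_level` and `SimulatedGenerator` are hypothetical names, and the sign convention (coupling and insertion loss both expressed as negative dB values) is an assumption; the 0.3 dB tolerance and the limit of five attempts follow the script listing.

```python
from typing import Callable, Tuple


def settle_level(set_level: Callable[[float], None],
                 measure: Callable[[], float],
                 target_dbm: float,
                 coupling_db: float,
                 insertion_loss_db: float,
                 tol_db: float = 0.3,
                 max_attempts: int = 5) -> Tuple[float, float]:
    """Adjust the generator until the corrected output is within tol_db."""
    level = target_dbm
    set_level(level)
    actual = float("nan")
    for _ in range(max_attempts):
        measured = measure()
        # Refer the monitor port reading back to the main output port.
        actual = measured - coupling_db + insertion_loss_db
        error = target_dbm - actual
        if abs(error) <= tol_db:
            break
        level += error
        set_level(level)
    return level, actual


class SimulatedGenerator:
    """Stand-in for real hardware: delivers 1.5 dB less than requested."""
    COUPLING_DB = -20.0        # assumed sign convention (negative dB)
    INSERTION_LOSS_DB = -1.0   # assumed sign convention (negative dB)

    def __init__(self) -> None:
        self.level = 0.0

    def set_level(self, dbm: float) -> None:
        self.level = dbm

    def monitor_port_power(self) -> float:
        return (self.level - 1.5) + self.COUPLING_DB


gen = SimulatedGenerator()
level, actual = settle_level(gen.set_level, gen.monitor_port_power, -10.0,
                             gen.COUPLING_DB, gen.INSERTION_LOSS_DB)
print(f"{level=:.2f}, {actual=:.2f}")
```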
```python
import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import rpyc

from rfblocks import BridgeCoupler

AVERAGING_SWEEPS = 5
MEASUREMENT_SPAN = 0.1


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def sweep(sa_service, sa_app, sweep_count=AVERAGING_SWEEPS):
    # Multiple sweeps may take longer than the default RPyC request
    # timeout, so disable it for the duration of the call.
    rpyc_timeout = sa_service._config['sync_request_timeout']
    sa_service._config['sync_request_timeout'] = None
    try:
        sa_app.sweeps(sweep_count)
    finally:
        sa_service._config['sync_request_timeout'] = rpyc_timeout


def measure_signal_level(sa_service, sa_app, sa_chan,
                         freq, span, sweep_count):
    sa_app.trace_type_normal()
    sa_app.channel = sa_chan
    sa_app.centre_freq = freq
    sa_app.freq_span = span
    sa_app.sweep()
    sa_app.trace_type_avg()
    sweep(sa_service, sa_app, sweep_count)
    sa_app.marker_normal()
    sa_app.peak_search()
    signal_freq, signal_pwr = sa_app.marker_posn
    return signal_freq, signal_pwr


def measure_or_set_signal_level(sa_service, sa_app, rfgen_chan, sa_chan,
                                freq, level, span, sweep_count):
    lo_chan = rfgen_chan   # Default RFGen channel 2
    lo_sa_chan = sa_chan   # Default RPSA 'Chan 1'

    if level is not None:
        # Clamp the desired level to the generator's power range. This is
        # only done when a level was given, since comparing None with a
        # float raises TypeError.
        min_pwr, max_pwr = lo_chan.power_range(freq)
        if level < min_pwr:
            p_des = min_pwr
        elif level > max_pwr:
            p_des = max_pwr
        else:
            p_des = level

        # Set freq and level, measure actual level
        lo_chan.plo_ctl.freq = freq
        lo_chan.level = level
        lo_chan.configure()
        attempts = 0
        while attempts < 5:
            measured_freq, measured_pwr = measure_signal_level(
                sa_service, sa_app, lo_sa_chan, freq, span, sweep_count)
            actual_pwr = (measured_pwr
                          - lo_chan.coupler.coupling(freq)
                          + lo_chan.coupler.insertion_loss(freq))
            delta_p = p_des - actual_pwr
            attempts += 1
            if abs(delta_p) > 0.3:
                lo_chan.level = lo_chan.level + delta_p
                lo_chan.configure()
            else:
                break
    else:
        # No level specified: just measure the current output.
        measured_freq, measured_pwr = measure_signal_level(
            sa_service, sa_app, lo_sa_chan, freq, span, sweep_count)
        actual_pwr = (measured_pwr
                      - lo_chan.coupler.coupling(measured_freq)
                      + lo_chan.coupler.insertion_loss(measured_freq))
    return (measured_freq, actual_pwr)


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_signal_freq = 1000.0
    default_rfgen_chan = '2'
    default_sa_chan = '1'
    default_sa_span = MEASUREMENT_SPAN
    default_sweep_count = AVERAGING_SWEEPS

    parser = ArgumentParser(description=
                            "Measure/set RF signal generator output",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr", default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port", default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "--rfgen_chan", default=default_rfgen_chan, choices=["1", "2"],
        help=f"Signal generator channel to use. ({default_rfgen_chan})")
    parser.add_argument(
        "--sa_chan", default=default_sa_chan, choices=["1", "2"],
        help=f"Spectrum analyzer channel to use. ({default_sa_chan})")
    parser.add_argument(
        "--sa_span", default=default_sa_span, type=float,
        help=f"Spectrum analyzer span to use (in MHz). ({default_sa_span})")
    parser.add_argument(
        "--sweep_count", default=default_sweep_count, type=int,
        help=f"Number of SA sweeps to average measurement over. ({default_sweep_count})")
    parser.add_argument(
        "-F", "--freq", required=True, type=float,
        help=f"Signal generator frequency (in MHz). ({default_signal_freq})")
    parser.add_argument(
        "-L", "--level", type=float,
        help=f"Signal generator output power, in dBm.")
    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root
    rfgen_chan = rfgen_app.channels[f"Chan {args.rfgen_chan}"]
    sa_chan = int(args.sa_chan) - 1

    freq, pwr = measure_or_set_signal_level(sa, sa_app, rfgen_chan, sa_chan,
                                            args.freq, args.level,
                                            args.sa_span, args.sweep_count)
    print(f"{freq=}, {pwr=}")


if __name__ == '__main__':
    main()
```
Produce spectrum plots
Figure 11: Example plot produced by plot.py
```
usage: plot.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [-O PLOT_FILE]

Generate spectrum plots from the RPSA app.

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        Channel spectra to plot.
                        Note that not specifying a channel will cause
                        plots of both channels to be produced.
  -O PLOT_FILE, --plot_file PLOT_FILE
                        File to save the generated plot to (plot.svg)
```
```python
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
import json
import textwrap

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')

import rpyc

from dyadic.splot import init_style
from sa.utils import (
    FREQ_SUFFIXES, TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN,
    TRACE_TYPE_AVG, TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE
)
from sa.plot import (
    SpectrumPlotWidget, SpectrumMarker
)

DEFAULT_PLOT_FILE = "plot.svg"
ANNOTATION_SIZE = 'x-small'
TITLE_SIZE = "xx-large"
ANNOTATION_COLOUR = "#0f0f0f80"
TITLE_COLOUR = "#0f0f0f40"
MARKER_COLOUR = 'r'
DELTA_MARKER_COLOUR = 'b'
MARKER_SIZE = 4
DOWN_TRIANGLE = 7
UP_TRIANGLE = 6


def format_freq(f: float, prec: int = 3) -> str:
    """f is in MHz
    """
    if abs(f) < 0.001:
        return f"{(f*1e6):4.1f}{FREQ_SUFFIXES[2]}"
    elif abs(f) < 1.0:
        return f"{(f*1e3):6.2f}{FREQ_SUFFIXES[1]}"
    else:
        return f"{f:7.{prec}f}{FREQ_SUFFIXES[0]}"


def draw_marker(sa_app, ax, mkr):
    ampl_scale = sa_app.ampl_scale
    fspan = sa_app.freq_span
    f_offset = fspan * 0.005
    a_offset = ampl_scale * 0.175
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
        mkr_text = f"{mkr.marker_id:d}R"
    else:
        mkr_text = f"{mkr.marker_id:d}"
    # Markers above the top of the display are pinned to the reference level.
    if mkr.pwr > (sa_app.ref_level - (ampl_scale * 0.2)):
        ax.plot(mkr.freq, sa_app.ref_level, marker=UP_TRIANGLE,
                markersize=MARKER_SIZE, color=MARKER_COLOUR)
        ax.annotate(mkr_text,
                    xy=(mkr.freq - f_offset,
                        sa_app.ref_level - (2.0 * a_offset)),
                    fontsize=ANNOTATION_SIZE, color=MARKER_COLOUR)
    else:
        ax.plot(mkr.freq, mkr.pwr, marker=DOWN_TRIANGLE,
                markersize=MARKER_SIZE, color=MARKER_COLOUR)
        ax.annotate(mkr_text,
                    xy=(mkr.freq - f_offset, mkr.pwr + a_offset),
                    fontsize=ANNOTATION_SIZE, color=MARKER_COLOUR)
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
        if mkr.aux_pwr > (sa_app.ref_level - (sa_app.ampl_scale * 0.2)):
            ax.plot(mkr.aux_freq, sa_app.ref_level, marker=UP_TRIANGLE,
                    markersize=MARKER_SIZE, color=MARKER_COLOUR)
            ax.annotate(f"{mkr.marker_id:d}",
                        xy=(mkr.aux_freq - f_offset,
                            sa_app.ref_level - (2.0 * a_offset)),
                        fontsize=ANNOTATION_SIZE, color=MARKER_COLOUR)
        else:
            ax.plot(mkr.aux_freq, mkr.aux_pwr, marker=DOWN_TRIANGLE,
                    markersize=MARKER_SIZE, color=MARKER_COLOUR)
            ax.annotate(f"{mkr.marker_id:d}",
                        xy=(mkr.aux_freq - f_offset, mkr.aux_pwr + a_offset),
                        fontsize=ANNOTATION_SIZE, color=MARKER_COLOUR)


def annotate_marker(sa_app, ax, mkr):
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_NORMAL,
                           SpectrumMarker.MKR_TYPE_DELTA]:
        mkr_id = mkr.marker_id
        draw_marker(sa_app, ax, mkr)
        ax.annotate(f"Mkr{mkr_id}: {mkr.pwr:.2f} dBm",
                    xy=(0.74, 0.95), xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
        fspan = sa_app.freq_span
        if fspan < SpectrumPlotWidget.HIGH_PREC_SPAN:
            freq_text = f" {format_freq(mkr.freq, prec=6)}"
        else:
            freq_text = f" {format_freq(mkr.freq)}"
        ax.annotate(freq_text,
                    xy=(0.74, 0.91), xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
        if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
            ax.annotate(f"\u0394Mkr{mkr_id}: {mkr.delta_pwr:.2f} dBm",
                        xy=(0.74, 0.87), xycoords="axes fraction",
                        fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
            ax.annotate(f" {format_freq(mkr.delta_freq)}",
                        xy=(0.74, 0.83), xycoords="axes fraction",
                        fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)


def annotate_plot(sa_app, chan, ax):
    sweep_params = sa_app.data_store().sweep_params
    ax.annotate(f"Ch {chan+1:d}",
                xy=(0.21, 0.92), xycoords="axes fraction",
                fontsize=TITLE_SIZE, fontweight="bold", color=TITLE_COLOUR)
    ax.annotate(f"Start: {format_freq(sa_app.start_freq)}",
                xy=(0.01, -0.05), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Span: {format_freq(sa_app.freq_span)}",
                xy=(0.41, -0.05), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Stop: {format_freq(sa_app.stop_freq)}",
                xy=(0.74, -0.05), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Ref: {sa_app.ref_level:.2f} dBm",
                xy=(0.01, 0.95), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"RBW: {format_freq(sweep_params.rbw/1e6)}",
                xy=(0.01, 0.91), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Atten: {sa_app.atten:.2f} dB",
                xy=(0.41, 0.95), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    trace_type = sa_app.data_store().selected_trace_type
    if trace_type in [TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG]:
        avg_count = sa_app.data_store().average_counter
        if trace_type == TRACE_TYPE_AVG:
            avg_text = f"Avg: {avg_count:d}"
        elif trace_type == TRACE_TYPE_MAX:
            avg_text = f"Max: {avg_count:d}"
        elif trace_type == TRACE_TYPE_MIN:
            avg_text = f"Min: {avg_count:d}"
        ax.annotate(avg_text,
                    xy=(0.01, 0.65), xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    mkr = sa_app.selected_marker
    annotate_marker(sa_app, ax, mkr)


def plot(sa_app, chan_list, plot_file):
    init_style()
    # Two stacked subplots when both channels are plotted, otherwise one.
    if len(chan_list) == 2:
        fig = plt.figure(num=None, figsize=(6.0, 6.5), dpi=72)
        ax = [fig.add_subplot(2, 1, 1), fig.add_subplot(2, 1, 2)]
    else:
        fig = plt.figure(num=None, figsize=(6.0, 3), dpi=72)
        ax = [fig.add_subplot(1, 1, 1)]

    for ch in chan_list:
        if len(chan_list) == 1:
            axes = ax[0]
        else:
            axes = ax[ch]
        sa_app.channel = ch
        min_freq = sa_app.start_freq
        max_freq = sa_app.stop_freq
        min_pwr = sa_app.ref_level - 10*sa_app.ampl_scale
        max_pwr = sa_app.ref_level
        data = sa_app.data_store()
        pwr = np.array(data.pwr)
        freq = np.array(data.freq)
        axes.set_xlim(min_freq, max_freq)
        axes.set_xticks(np.linspace(min_freq, max_freq, 11))
        axes.xaxis.set_ticklabels([])
        axes.set_ylim(min_pwr, max_pwr)
        axes.tick_params(axis="x", which="both", direction="in")
        axes.tick_params(axis="y", which="both", length=0.0)
        axes.grid(linestyle=':')
        axes.grid(which='both', axis='x', linestyle=':')
        axes.plot(freq, pwr)
        annotate_plot(sa_app, ch, axes)

    fig.tight_layout()
    plt.savefig(plot_file)


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865

    parser = ArgumentParser(description=
                            "Generate spectrum plots from the RPSA app.",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        Channel spectra to plot.
        Note that not specifying a channel will cause
        plots of both channels to be produced.
        '''))
    parser.add_argument(
        "-O", "--plot_file", default=DEFAULT_PLOT_FILE,
        help=f"File to save the generated plot to ({DEFAULT_PLOT_FILE})"
    )
    args = parser.parse_args()

    if args.channel is not None:
        channels = [int(args.channel) - 1]
    else:
        channels = [0, 1]

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    plot(sa_app, channels, args.plot_file)


if __name__ == '__main__':
    main()
```
Sweep a wideband frequency range
This script carries out a wideband sweep by dividing the requested frequency span into smaller sections. Each section is swept multiple times, each time with a slightly different LO setting, and the resulting sweeps are averaged. Spurs and images produced by the down conversion move as the LO setting changes, while genuine signals do not, so the averaging removes or greatly attenuates them.
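The sectioning described above can be sketched as follows. This is a minimal illustration modelled on the `wideband_sweep` and `sweep` functions of the script, not a replacement for them. The helper names `section_centres` and `random_lo_offsets` are hypothetical, and frequencies are assumed to be in MHz.

```python
import random

import numpy as np


def section_centres(start_freq, stop_freq, section_span):
    """Centre frequency of each section of the wideband sweep.

    Hypothetical helper: the first section is centred half a section
    span above the start frequency, and subsequent centres are spaced
    one section span apart.
    """
    return np.arange(start_freq + section_span / 2, stop_freq, section_span)


def random_lo_offsets(max_offset, sweep_count, min_offset=0.25, seed=None):
    """Random LO offsets, one per averaged sweep of a section.

    Hypothetical helper: offsets are drawn uniformly from
    [min_offset, max_offset). The seed parameter is an addition made
    here so that runs can be repeated.
    """
    rng = random.Random(seed)
    return [min_offset + rng.random() * (max_offset - min_offset)
            for _ in range(sweep_count)]


if __name__ == '__main__':
    for centre in section_centres(100.0, 200.0, 20.0):
        print(f"section centre: {centre:.1f}")
    print(random_lo_offsets(5.0, 3, seed=1))
```

A 100 to 200 sweep with a section span of 20 is therefore covered by five sections, each of which is averaged over `sweep_count` sweeps taken at different LO offsets.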
Figure 12: Example wideband sweep of a 2 GHz signal (-10 dBm)
usage: wideband.py [-h] [-a SAADDR] [-p SAPORT] -s START_FREQ -e STOP_FREQ
                   [-S SECTION_SPAN] [-R RBW] [-D {NORMAL,SAMPLE,POS,NEG}]
                   [-T {LINEAR,RANDOM}] [-C SWEEP_COUNT] [-P PLOT_FILE]
                   [-O OUTPUT_FILE]

Generate a wideband spectrum plot

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -s START_FREQ, --start_freq START_FREQ
                        Start frequency for the wideband sweep
  -e STOP_FREQ, --stop_freq STOP_FREQ
                        Stop frequency for the wideband sweep
  -S SECTION_SPAN, --section_span SECTION_SPAN
                        Frequency span of each section of the wideband sweep.
                        Default: 20.0
  -R RBW, --rbw RBW     RBW of the sweeps (in Hz).
  -D {NORMAL,SAMPLE,POS,NEG}, --detector {NORMAL,SAMPLE,POS,NEG}
                        The detector type to use for the wideband sweep.
                        Default: NORMAL
  -T {LINEAR,RANDOM}, --sweep_type {LINEAR,RANDOM}
                        The type of sweep for each section of the wideband sweep.
                        Default: RANDOM
  -C SWEEP_COUNT, --sweep_count SWEEP_COUNT
                        The number of sweeps to average for each section
                        of the wideband sweep.
                        Default: 40
  -P PLOT_FILE, --plot_file PLOT_FILE
                        The file to write the plot of the wideband sweep.
                        Default: wb-plot.svg
  -O OUTPUT_FILE, --output_file OUTPUT_FILE
                        The file to write the wideband sweep data to.
from math import floor
import random
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')
import rpyc

from dyadic.splot import init_style
from sa.utils import (
    FREQ_SUFFIXES, DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE,
    DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG
)

LINEAR = 0
RANDOM = 1

WIDEBAND_SWEEP_BINS = 1000

DETECTOR_TYPES = {
    "NORMAL": DETECTOR_TYPE_NORMAL,
    "SAMPLE": DETECTOR_TYPE_SAMPLE,
    "POS": DETECTOR_TYPE_POS,
    "NEG": DETECTOR_TYPE_NEG}

ANNOTATION_SIZE = 'x-small'
TITLE_SIZE = "xx-large"
ANNOTATION_COLOUR = "#0f0f0f80"
TITLE_COLOUR = "#0f0f0f40"


def format_freq(f: float, prec: int = 3) -> str:
    """f is in MHz
    """
    if abs(f) < 0.001:
        return f"{(f*1e6):4.1f}{FREQ_SUFFIXES[2]}"
    elif abs(f) < 1.0:
        return f"{(f*1e3):6.2f}{FREQ_SUFFIXES[1]}"
    else:
        return f"{f:7.{prec}f}{FREQ_SUFFIXES[0]}"


def peak_detect(pwr, resampled_idx, window_size):
    """Find peaks within a spectrum prior to resampling.

    :param pwr: A numpy array containing the spectrum power values
        as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values.  Each value corresponds
        to a value in the resampled spectrum and is True if the
        associated power value is a peak and False if the value
        is noise.
    """
    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    def stats(arr):
        return np.mean(arr), np.std(arr)

    j = k = window_size//2
    if window_size % 2:
        k += 1

    means = []
    stds = []
    for i in resampled_idx:
        if i - j < 0:
            m, s = stats(pwr[0:k])
        elif i + k >= len(pwr):
            m, s = stats(pwr[i-j:])
        else:
            m, s = stats(pwr[i-j:i+k])
        means.append(m)
        stds.append(s)

    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)
    bins = len(resampled_idx)
    if baseline_std == 0 or baseline_mean == 0:
        is_peak = np.ones(bins, dtype=bool)
    else:
        is_peak = np.where(
            ((stds/baseline_std > STD_THRESHOLD)
             & (means/baseline_mean > MEAN_THRESHOLD)),
            np.ones(bins, dtype=bool),
            np.zeros(bins, dtype=bool))
    return is_peak


def detector(detect_type, freq, pwr, bins):
    """Resample a power spectrum.

    :param detect_type: The detector type to use when resampling.
        One of DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG,
        DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_NORMAL.
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int
    """
    def max_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.max()

    def min_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.min()

    def sample_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return np.take(a, a.size // 2)

    f_start = freq[0]
    f_stop = freq[-1]
    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1

    resampled_freqs = np.linspace(f_start, f_stop, bins)
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]
    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        for i in indexes:
            resampled_pwr.append(max_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NEG:
        for i in indexes:
            resampled_pwr.append(min_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        for i in indexes:
            resampled_pwr.append(sample_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = peak_detect(pwr, indexes, window_size)
        for i in range(bins):
            if is_peak[i]:
                resampled_pwr.append(max_pwr(pwr, indexes[i]))
            else:
                if i % 2:
                    # Odd
                    resampled_pwr.append(min_pwr(pwr, indexes[i]))
                else:
                    resampled_pwr.append(max_pwr(pwr, indexes[i]))
    return (resampled_freqs, resampled_pwr)


def set_span(sa_app, freq_span, rbw, detector):
    if freq_span > 45.0:
        freq_span = 45.0
    elif freq_span < 5.0:
        freq_span = 5.0

    if freq_span < 20.01:
        offset_step = 1.0
    elif freq_span < 40.01:
        offset_step = 0.5
    else:
        offset_step = 0.25

    sa_app.trace_type_normal()
    sa_app.detector_type = detector
    sa_app.freq_span = freq_span
    if rbw is not None:
        sa_app.rbw = rbw
    else:
        sa_app.auto_rbw = True
    sa_app.sweep()
    return offset_step


def sweep(sa_app, centre_freq, sweep_type=LINEAR,
          sweep_count=20, offset_step=1.0):
    sa_app.trace_type_normal()
    sa_app.centre_freq = centre_freq
    sa_app.sweep()
    max_offset = sa_app.max_felo_offset
    sa_app.trace_type_avg()
    if sweep_type == RANDOM:
        for offset in [0.25 + (random.random() * (max_offset - 0.25))
                       for i in range(sweep_count)]:
            sa_app.felo_offset = float(offset)
            sa_app.sweep()
    else:
        for offset in np.arange(max_offset, 0.0, -offset_step):
            sa_app.felo_offset = float(offset)
            sa_app.sweep()
    sa_app.trace_type_normal()
    data = sa_app.data_store()
    return (np.array(data.freq), np.array(data.pwr))


def wideband_sweep(sa_app, start_freq, stop_freq,
                   section_span, rbw, detector, **kwargs):
    offset_step = set_span(sa_app, section_span, rbw, detector)
    centre_freqs = np.arange(start_freq + (section_span/2),
                             stop_freq, section_span)
    freq = np.array([])
    pwr = np.array([])
    for centre_freq in centre_freqs:
        f, p = sweep(sa_app, centre_freq,
                     offset_step=offset_step, **kwargs)
        freq = np.concatenate((freq, f))
        pwr = np.concatenate((pwr, p))
    return (freq, pwr)


def annotate_plot(sa_app, ax, rbw):
    chan = sa_app.channel
    if rbw is None:
        rbw = sa_app.rbw
    ax.annotate(f"Ch {chan+1:d}",
                xy=(0.21, 0.92), xycoords="axes fraction",
                fontsize=TITLE_SIZE, fontweight="bold",
                color=TITLE_COLOUR)
    ax.annotate(f"Start: {format_freq(sa_app.start_freq)}",
                xy=(0.01, 0.05), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Stop: {format_freq(sa_app.stop_freq)}",
                xy=(0.74, 0.05), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Ref: {sa_app.ref_level:.2f} dBm",
                xy=(0.01, 0.95), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"RBW: {format_freq(rbw/1e6)}",
                xy=(0.01, 0.91), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)
    ax.annotate(f"Atten: {sa_app.atten:.2f} dB",
                xy=(0.41, 0.95), xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE, color=ANNOTATION_COLOUR)


def plot(sa_app, min_freq, max_freq, rbw, freq, pwr, plot_file):
    init_style()
    fig = plt.figure(num=None, figsize=(6.0, 3), dpi=72)
    axes = fig.add_subplot(1, 1, 1)
    min_pwr = sa_app.ref_level - 10*sa_app.ampl_scale
    max_pwr = sa_app.ref_level
    axes.set_xlim(min_freq, max_freq)
    #axes.set_xticks(np.linspace(min_freq, max_freq, 11))
    # axes.xaxis.set_ticklabels([])
    axes.set_ylim(min_pwr, max_pwr)
    #axes.set_yticks(np.linspace(min_pwr, max_pwr, 11))
    axes.tick_params(axis="x", which="both", direction="in")
    axes.tick_params(axis="y", which="both", length=0.0)
    axes.grid(linestyle=':')
    axes.grid(which='both', axis='x', linestyle=':')
    axes.plot(freq, pwr)
    annotate_plot(sa_app, axes, rbw)
    fig.tight_layout()
    plt.savefig(plot_file)


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_sweep_type = "RANDOM"
    default_sweep_count = 40
    default_section_span = 20.0
    default_plot_file = "wb-plot.svg"
    default_detector_type = "NORMAL"

    parser = ArgumentParser(description=
                            "Generate a wideband spectrum plot",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-s", "--start_freq", type=float, required=True,
        help="Start frequency for the wideband sweep")
    parser.add_argument(
        "-e", "--stop_freq", type=float, required=True,
        help="Stop frequency for the wideband sweep")
    parser.add_argument(
        "-S", "--section_span", type=float, default=default_section_span,
        help=textwrap.dedent(f'''\
        Frequency span of each section of the wideband sweep.
        Default: {default_section_span}'''))
    parser.add_argument(
        "-R", "--rbw", type=float, default=None,
        help="RBW of the sweeps (in Hz).")
    parser.add_argument(
        "-D", "--detector", choices=["NORMAL", "SAMPLE", "POS", "NEG"],
        default=default_detector_type,
        help=textwrap.dedent(f'''\
        The detector type to use for the wideband sweep.
        Default: {default_detector_type}'''))
    parser.add_argument(
        "-T", "--sweep_type", choices=["LINEAR", "RANDOM"],
        default=default_sweep_type,
        help=textwrap.dedent(f'''\
        The type of sweep for each section of the wideband sweep.
        Default: {default_sweep_type}'''))
    parser.add_argument(
        "-C", "--sweep_count", type=int, default=default_sweep_count,
        help=textwrap.dedent(f'''\
        The number of sweeps to average for each section
        of the wideband sweep.
        Default: {default_sweep_count}'''))
    parser.add_argument(
        "-P", "--plot_file", default=default_plot_file,
        help=textwrap.dedent(f'''\
        The file to write the plot of the wideband sweep.
        Default: {default_plot_file}'''))
    parser.add_argument(
        "-O", "--output_file",
        help="The file to write the wideband sweep data to.")
    args = parser.parse_args()

    if args.sweep_type == "RANDOM":
        sweep_type = RANDOM
    else:
        sweep_type = LINEAR

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app

    raw_freq, raw_pwr = wideband_sweep(
        sa_app, args.start_freq, args.stop_freq,
        args.section_span, args.rbw,
        detector=DETECTOR_TYPES[args.detector],
        sweep_type=sweep_type, sweep_count=args.sweep_count)
    freq, pwr = detector(DETECTOR_TYPES[args.detector],
                         raw_freq, raw_pwr, WIDEBAND_SWEEP_BINS)

    if args.output_file is not None:
        # Writing the sweep data to a file is not yet implemented.
        pass

    plot(sa_app, args.start_freq, args.stop_freq, args.rbw,
         freq, pwr, args.plot_file)


if __name__ == '__main__':
    main()
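The resampling step performed by `detector` can be illustrated with a much simpler sketch. The version below is a deliberate simplification and not the script's algorithm: it splits the spectrum into contiguous chunks with `np.array_split` rather than centring a window on the nearest frequency index, and it implements only the positive and negative peak detectors. The function name `resample_peak` is hypothetical.

```python
import numpy as np


def resample_peak(pwr, bins, positive=True):
    """Reduce a power spectrum to `bins` values.

    Simplified illustration: each output value is the maximum
    (positive peak) or minimum (negative peak) of one contiguous
    chunk of the input. Requires len(pwr) >= bins so that no chunk
    is empty.
    """
    chunks = np.array_split(np.asarray(pwr, dtype=float), bins)
    pick = np.max if positive else np.min
    return np.array([pick(chunk) for chunk in chunks])


if __name__ == '__main__':
    spectrum = np.arange(10)
    print(resample_peak(spectrum, 5))                  # positive peak
    print(resample_peak(spectrum, 5, positive=False))  # negative peak
```

A positive peak detector preserves narrow signals that might otherwise fall between resampled bins, at the cost of biasing the displayed noise floor upwards. The NORMAL detector in the script alternates between maximum and minimum values for bins classified as noise.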
Measure phase noise
Figure 13: Example phase noise plot for a 1 GHz signal.
usage: pn.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [-P PLOT_FILE]
             [-S SAVE_FILE] [-T PLOT_TITLE] [-R]
             carrier_freq

Measure phase noise

positional arguments:
  carrier_freq          Frequency of the carrier tone (in MHz)

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        Channel spectra for phase noise measurement (1).
  -P PLOT_FILE, --plot_file PLOT_FILE
                        Write the phase noise plot to this file (pn.svg)
  -S SAVE_FILE, --save_file SAVE_FILE
                        Write phase noise data to this file (pn.json)
  -T PLOT_TITLE, --plot_title PLOT_TITLE
                        Title to use for the phase noise plot
  -R, --resample        Resample the phase noise plot.
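The core of the measurement is the conversion of measured spectrum power to phase noise in dBc/Hz, which the `measure_pn` function in the script performs. The following is a minimal sketch of that conversion on its own. The function name `to_dbc_per_hz` is hypothetical, and the sketch assumes, as the script does, that a plain 10*log10(RBW) normalisation is adequate; it applies no correction for the noise bandwidth of the RBW filter.

```python
from math import log10

import numpy as np


def to_dbc_per_hz(pwr_dbm, carrier_dbm, rbw_hz):
    """Convert spectrum power readings to phase noise in dBc/Hz.

    Subtracting the carrier power gives the level relative to the
    carrier (dBc). Subtracting 10*log10(RBW) normalises the reading
    from the analyzer's resolution bandwidth to a 1 Hz bandwidth.
    """
    pwr = np.asarray(pwr_dbm, dtype=float)
    return (pwr - carrier_dbm) - 10 * log10(rbw_hz)


if __name__ == '__main__':
    # Noise read as -80 dBm in a 100 Hz RBW beside a -10 dBm carrier.
    print(to_dbc_per_hz([-80.0], -10.0, 100.0))
```

Because each entry in the script's sweep table uses a different RBW, the normalisation has to be applied to each sweep before the segments are joined into a single phase noise curve.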
import sys
from math import log10, floor
from dataclasses import dataclass
from argparse import ArgumentParser, RawTextHelpFormatter
import json

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')
from dyadic.splot import init_style
import rpyc

from sa.utils import (
    FREQ_SUFFIXES, DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE,
    DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG
)

RESAMPLED_SWEEP_BINS = 1000

# Phase noise sweeps are specified as tuples with the following values:
#
#   span (MHz),
#   centre freq. offset (from carrier, MHz)
#   RBW (Hz)
#   Trace averages
#   Frequency of first sample bin (in MHz, offset from carrier)
#   Frequency of last sample bin (in MHz, offset from carrier)
#
# For example: (1.0, 0.45, 2000, 30, 0.25, 0.5)
# Uses a spectrum analyzer frequency span of 1 MHz, places the
# centre frequency of the analyzer as (carrier freq.) + 0.45 MHz,
# sets the analyzer RBW to 2000 Hz, averages over 30 sweeps.
# The samples extracted from the resulting spectrum are taken
# from 0.25 to 0.5 MHz (offset from carrier frequency).
#
pn_sweeps = [
    (2.0, 0.9, 2000, 30, 0.5, 1.5),
    (1.0, 0.45, 2000, 30, 0.25, 0.5),
    (0.5, 0.24, 1000, 30, 0.1, 0.25),
    (0.2, 0.095, 500, 30, 0.05, 0.1),
    (0.1, 0.045, 200, 20, 0.025, 0.05),
    (0.05, 0.022, 200, 20, 0.01, 0.025),
    (0.02, 0.008, 100, 20, 0.005, 0.01),
    (0.01, 0.004, 20, 20, 0.002, 0.005),
    (0.005, 0.002, 20, 20, 0.001, 0.002),
    (0.002, 0.0008, 10, 10, 0.0005, 0.001),
    (0.001, 0.0004, 5, 10, 0.00025, 0.0005),
    (0.0005, 0.0002, 5, 10, 0.0001, 0.00025)
]


@dataclass
class SweepParameters:
    span: float
    freq_offset: float
    rbw: float
    trace_avg: int
    sample_low: float
    sample_high: float


def peak_detect(pwr, resampled_idx, window_size):
    """Find peaks within a spectrum prior to resampling.

    :param pwr: A numpy array containing the spectrum power values
        as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values.  Each value corresponds
        to a value in the resampled spectrum and is True if the
        associated power value is a peak and False if the value
        is noise.
    """
    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    def stats(arr):
        return np.mean(arr), np.std(arr)

    j = k = window_size//2
    if window_size % 2:
        k += 1

    means = []
    stds = []
    for i in resampled_idx:
        if i - j < 0:
            m, s = stats(pwr[0:k])
        elif i + k >= len(pwr):
            m, s = stats(pwr[i-j:])
        else:
            m, s = stats(pwr[i-j:i+k])
        means.append(m)
        stds.append(s)

    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)
    bins = len(resampled_idx)
    if baseline_std == 0 or baseline_mean == 0:
        is_peak = np.ones(bins, dtype=bool)
    else:
        is_peak = np.where(
            ((stds/baseline_std > STD_THRESHOLD)
             & (means/baseline_mean > MEAN_THRESHOLD)),
            np.ones(bins, dtype=bool),
            np.zeros(bins, dtype=bool))
    return is_peak


def detector(detect_type, freq, pwr, bins):
    """Resample a power spectrum.

    :param detect_type: The detector type to use when resampling.
        One of DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG,
        DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_NORMAL.
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int
    """
    def max_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.max()

    def min_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.min()

    def sample_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return np.take(a, a.size // 2)

    f_start = freq[0]
    f_stop = freq[-1]
    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1

    resampled_freqs = np.linspace(f_start, f_stop, bins)
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]
    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        for i in indexes:
            resampled_pwr.append(max_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NEG:
        for i in indexes:
            resampled_pwr.append(min_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        for i in indexes:
            resampled_pwr.append(sample_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = peak_detect(pwr, indexes, window_size)
        for i in range(bins):
            if is_peak[i]:
                resampled_pwr.append(max_pwr(pwr, indexes[i]))
            else:
                if i % 2:
                    # Odd
                    resampled_pwr.append(min_pwr(pwr, indexes[i]))
                else:
                    resampled_pwr.append(max_pwr(pwr, indexes[i]))
    return (resampled_freqs, resampled_pwr)


def find_nearest_index(array, value):
    return (np.abs(array - value)).argmin()


def initialize(sa_app, sa_chan):
    sa_app.channel = sa_chan
    sa_app.fe_atten = 0.0
    sa_app.ampl_scale = 12.0
    sa_app.detector_type_pos()
    sa_app.auto_rbw = False
    sa_app.marker_normal()


def sweep(sa_app, f_centre, f_span, rbw, sweep_count=20):
    sa_app.freq_span = f_span
    sa_app.centre_freq = f_centre
    sa_app.rbw = rbw
    sa_app.trace_type_normal()
    sa_app.sweep()
    if sweep_count > 1:
        sa_app.trace_type_avg()
        sa_app.sweeps(sweep_count)
    sa_app.peak_search()
    return sa_app.marker_posn


def measure_pn(sa_app, peak_pwr, rbw, f_low, f_high):
    data = sa_app.data_store()
    freq_arr = np.array(data.freq)
    pwr_arr = np.array(data.pwr)
    # Power relative to the carrier, normalised to a 1 Hz bandwidth.
    pn_arr = (pwr_arr - peak_pwr) - 10*log10(rbw)
    idx_low = find_nearest_index(freq_arr, f_low)
    idx_high = find_nearest_index(freq_arr, f_high)
    return freq_arr[idx_low:idx_high], pn_arr[idx_low:idx_high]


def create_fig(title):
    fig = plt.figure(figsize=(6.0, 4.0), dpi=72)
    ax = fig.add_subplot(111)
    # The title is None when the -T option is not given.
    if title:
        _ = ax.set_title(title)
    _ = ax.set_xlim(100, 2e6)
    _ = ax.set_xscale('log')
    _ = ax.set_xticks([100, 200, 500, 1e3, 2e3, 5e3, 1e4, 2e4, 5e4,
                       1e5, 2e5, 5e5, 1e6, 2e6])
    _ = ax.set_xticklabels(['100', '', '', '1K', '', '', '10K', '', '',
                            '100K', '', '', '1M', ''])
    _ = ax.set_xlabel('Frequency (Hz)')
    _ = ax.set_ylim(-160, -60)
    _ = ax.set_yticks(np.linspace(-160, -60, 11))
    _ = ax.set_yticklabels(['', '-150', '-140', '-130', '-120', '-110',
                            '-100', '-90', '-80', '-70', '-60'])
    _ = ax.set_ylabel('Phase Noise (dBc/Hz)')
    ax.grid(linestyle=':')
    ax.grid(which='both', axis='x', linestyle=':')
    return fig, ax


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_sa_chan = "1"
    default_plotfile = "pn.svg"
    default_savefile = "pn.json"

    parser = ArgumentParser(description=
                            "Measure phase noise",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "carrier_freq",
        help="Frequency of the carrier tone (in MHz)")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"], default=default_sa_chan,
        help=f"Channel spectra for phase noise measurement ({default_sa_chan}).")
    parser.add_argument(
        "-P", "--plot_file", default=default_plotfile,
        help=f"Write the phase noise plot to this file ({default_plotfile})")
    parser.add_argument(
        "-S", "--save_file", default=default_savefile,
        help=f"Write phase noise data to this file ({default_savefile})")
    parser.add_argument(
        "-T", "--plot_title",
        help="Title to use for the phase noise plot")
    parser.add_argument(
        "-R", "--resample", action='store_true',
        help="Resample the phase noise plot.")
    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    sa_chan = int(args.channel) - 1

    freq_arr = np.array([])
    pn_arr = np.array([])

    initialize(sa_app, sa_chan)
    f_sig, p_sig = sweep(sa_app, float(args.carrier_freq), 2.0, 5000, 1)
    for pn_sweep in pn_sweeps:
        sweep_params = SweepParameters(*pn_sweep)
        f_sig, p_sig = sweep(sa_app,
                             f_sig + sweep_params.freq_offset,
                             sweep_params.span,
                             sweep_params.rbw,
                             sweep_params.trace_avg)
        sample_low = f_sig + sweep_params.sample_low
        sample_high = f_sig + sweep_params.sample_high
        f_arr, p_arr = measure_pn(sa_app, p_sig, sweep_params.rbw,
                                  sample_low, sample_high)
        freq_arr = np.insert(freq_arr, 0, f_arr)
        pn_arr = np.insert(pn_arr, 0, p_arr)

    # Convert the frequencies to offsets from the carrier.
    # Note that we use the last measured value of the carrier
    # frequency peak
    freq_arr -= f_sig

    if args.resample is True:
        freq_arr, pn_arr = detector(DETECTOR_TYPE_NORMAL, freq_arr, pn_arr,
                                    RESAMPLED_SWEEP_BINS)

    with open(args.save_file, 'w') as fd:
        json.dump({'freq': list(freq_arr), 'pn': list(pn_arr)}, fd)

    fig, ax = create_fig(args.plot_title)
    _ = ax.plot(freq_arr * 1e6, pn_arr)
    fig.tight_layout()
    fig.savefig(args.plot_file)


if __name__ == '__main__':
    main()
Measure IM3 and IP3
This measurement uses the Simple RF Signal Generator and the Low Spurious RF Signal Generator to produce both the two tone test signal and the mixer LO signal.
An external resistive bridge coupler is used to combine tones from channel 1 of the RFGen and the low spurious signal generator. Two generators are used in this configuration in order to keep channel cross talk to a minimum.
LO signal power is taken from channel 2 of the RFGen signal generator with the signal power being measured from the channel 2 monitor port.
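The arithmetic behind the measurement can be sketched as follows, mirroring the calculation in the `measure_ip3` function of the ip3.py script. The function names here are hypothetical. Frequencies are in MHz and powers in dBm.

```python
def im3_frequencies(lower, upper):
    """Frequencies of the third order products of a two tone signal.

    For tones f1 < f2 the products fall at 2*f1 - f2 and 2*f2 - f1,
    that is, one tone separation below the lower tone and one tone
    separation above the upper tone.
    """
    separation = upper - lower
    return lower - separation, upper + separation


def ip3_estimates(p_lower, p_upper, p_im3_lower, p_im3_upper):
    """IP3 estimates (dBm) from the lower and upper IM3 products.

    Each estimate allows for unequal tone powers. With equal tone
    powers P and IM3 power P_im3 both reduce to P + (P - P_im3)/2.
    """
    lower = p_lower + (p_upper - p_im3_lower) / 2
    upper = p_upper + (p_lower - p_im3_upper) / 2
    return lower, upper


if __name__ == '__main__':
    print(im3_frequencies(30.0, 32.0))
    print(ip3_estimates(-10.0, -10.0, -70.0, -70.0))
```

For tones at 30 and 32 MHz the products fall at 28 and 34 MHz, and two -10 dBm tones with IM3 products at -70 dBm give an IP3 of 20 dBm.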
Note that when using a down converter RF front end for the spectrum analyzer, mixer spurs and images may become a problem. To manage this, the --im3_lo_offset command line parameter is provided to shift any such spurious mixer products away from the signal of interest. The same thing can be done for the fundamental tones by using the --lo_offset command line parameter.
Figure 14a: Test setup for measuring mixer down conversion IM3 and IP3.
Figure 14b: Test setup for measuring mixer up conversion IM3 and IP3.
usage: ip3.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [--fe_atten FE_ATTEN]
              [--im3_span IM3_SPAN] [--tone_span TONE_SPAN]
              [--lo_freq LO_FREQ] [--lo_offset LO_OFFSET]
              [--im3_lo_offset IM3_LO_OFFSET]
              lower_tone upper_tone

Measure intermodulation distortion and IP3

positional arguments:
  lower_tone            Frequency of the lower fundamental tone (in MHz)
  upper_tone            Frequency of the upper fundamental tone (in MHz)

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        The target channel for the measurement.
  --fe_atten FE_ATTEN   Front end attenuation to use (0.0 dB)
  --im3_span IM3_SPAN   SA frequency span when measuring IM3 amplitudes (in MHz)
  --tone_span TONE_SPAN
                        SA frequency span when measuring tone amplitudes (in MHz)
  --lo_freq LO_FREQ     LO frequency (in MHz).
                        Used to adjust reported frequencies.
                        Positive LO frequencies imply low side injection.
                        Negative LO frequencies imply high side injection
  --lo_offset LO_OFFSET
                        SA LO offset (in MHz) when using a down converter
                        RF frontend
  --im3_lo_offset IM3_LO_OFFSET
                        SA LO offset (in MHz) when measuring IM3 amplitudes
                        using a down converter RF frontend
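The sign convention for --lo_freq can be made concrete with a short sketch based on the frequency adjustment at the end of `measure_ip3`. The function name `reported_rf` is hypothetical, and all frequencies are in MHz.

```python
def reported_rf(measured_freq, lo_freq=None):
    """Map a frequency measured at the IF back to the RF frequency.

    A positive lo_freq means low side injection (RF = LO + IF).
    A negative lo_freq means high side injection (RF = |LO| - IF).
    With no lo_freq the measured frequency is reported unchanged.
    """
    if lo_freq is None:
        return measured_freq
    if lo_freq > 0.0:
        return measured_freq + lo_freq
    return (-lo_freq) - measured_freq


if __name__ == '__main__':
    print(reported_rf(30.0, 820.0))    # low side injection
    print(reported_rf(30.0, -880.0))   # high side injection
```

In both cases shown, a tone measured at 30 MHz is reported as an RF frequency of 850 MHz.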
Code for the ip3.py script:
import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import rpyc

AVERAGING_SWEEPS = 10


def save_settings(sa_app):
    settings = {}
    settings['centre_freq'] = sa_app.centre_freq
    settings['start_freq'] = sa_app.start_freq
    settings['stop_freq'] = sa_app.stop_freq
    settings['freq_span'] = sa_app.freq_span
    settings['ref_level'] = sa_app.ref_level
    settings['ampl_scale'] = sa_app.ampl_scale
    settings['atten'] = sa_app.atten
    settings['auto_rbw'] = sa_app.auto_rbw
    settings['rbw'] = sa_app.rbw
    settings['detector_type'] = sa_app.detector_type
    settings['trace_type'] = sa_app.trace_type
    if sa_app.chan_type_is_direct is False:
        settings['fe_atten'] = sa_app.fe_atten
    return settings


def restore_settings(sa_app, settings):
    sa_app.centre_freq = settings['centre_freq']
    sa_app.start_freq = settings['start_freq']
    sa_app.stop_freq = settings['stop_freq']
    sa_app.freq_span = settings['freq_span']
    sa_app.ref_level = settings['ref_level']
    sa_app.ampl_scale = settings['ampl_scale']
    sa_app.atten = settings['atten']
    sa_app.auto_rbw = settings['auto_rbw']
    sa_app.rbw = settings['rbw']
    sa_app.detector_type = settings['detector_type']
    sa_app.trace_type = settings['trace_type']
    if sa_app.chan_type_is_direct is False:
        sa_app.fe_atten = settings['fe_atten']


def sweep(sa_service, sa_app, sweep_count=AVERAGING_SWEEPS):
    # Disable the RPyC request timeout while the sweeps run.
    rpyc_timeout = sa_service._config['sync_request_timeout']
    sa_service._config['sync_request_timeout'] = None
    sa_app.sweeps(sweep_count)
    sa_service._config['sync_request_timeout'] = rpyc_timeout


def measure_tone(sa_service, sa_app, freq, span,
                 sweep_count=AVERAGING_SWEEPS):
    sa_app.centre_freq = freq
    sa_app.freq_span = span
    sa_app.sweep()
    sa_app.trace_type_avg()
    sweep(sa_service, sa_app, sweep_count)
    sa_app.marker_normal()
    sa_app.peak_search()
    tone_freq, tone_pwr = sa_app.marker_posn
    return tone_freq, tone_pwr


def measure_ip3(sa_service, sa_app, lower: float, upper: float,
                fe_atten=0.0, rbw=None, tone_span=None, im3_span=None,
                lo_offset=None, im3_lo_offset=None, lo_freq=None) -> str:
    LOWER = 0
    UPPER = 1
    PWR = 1
    FREQ = 0

    tone_separation = upper - lower
    lower_im3 = lower - tone_separation
    upper_im3 = upper + tone_separation

    if rbw is None:
        sa_app.auto_rbw = True
    else:
        sa_app.rbw = rbw
    if sa_app.chan_type_is_direct is False:
        sa_app.fe_atten = fe_atten
    if lo_offset is not None:
        sa_app.felo_offset = lo_offset
    if tone_span is None:
        tone_span = tone_separation

    tone1_freq, tone1_pwr = measure_tone(sa_service, sa_app, upper,
                                         tone_span, sweep_count=5)
    tone2_freq, tone2_pwr = measure_tone(sa_service, sa_app, lower,
                                         tone_span, sweep_count=5)
    if tone1_freq < tone2_freq:
        fundamentals = [(tone1_freq, tone1_pwr), (tone2_freq, tone2_pwr)]
    else:
        fundamentals = [(tone2_freq, tone2_pwr), (tone1_freq, tone1_pwr)]

    if im3_span is None:
        im3_span = tone_separation / 2
    sa_app.freq_span = im3_span

    sa_app.centre_freq = lower_im3
    if im3_lo_offset is not None:
        sa_app.felo_offset = im3_lo_offset
    sa_app.sweeps(1)
    sweep(sa_service, sa_app)
    sa_app.peak_search()
    im3_freq, im3_pwr = sa_app.marker_posn
    im3 = [(im3_freq, im3_pwr)]

    sa_app.centre_freq = upper_im3
    if im3_lo_offset is not None:
        sa_app.felo_offset = im3_lo_offset
    sa_app.sweeps(1)
    sweep(sa_service, sa_app)
    sa_app.peak_search()
    im3_freq, im3_pwr = sa_app.marker_posn
    im3.append((im3_freq, im3_pwr))

    ip3 = [
        fundamentals[LOWER][PWR]
        + ((fundamentals[UPPER][PWR] - im3[LOWER][PWR]) / 2),
        fundamentals[UPPER][PWR]
        + ((fundamentals[LOWER][PWR] - im3[UPPER][PWR]) / 2)
    ]

    rf1 = fundamentals[LOWER][FREQ]
    rf2 = fundamentals[UPPER][FREQ]
    im3_f1 = im3[LOWER][FREQ]
    im3_f2 = im3[UPPER][FREQ]
    if lo_freq is not None:
        if lo_freq > 0.0:
            rf1 += lo_freq
            rf2 += lo_freq
            im3_f1 += lo_freq
            im3_f2 += lo_freq
        else:
            rf1 = (-lo_freq) - rf1
            rf2 = (-lo_freq) - rf2
            im3_f1 = (-lo_freq) - im3_f1
            im3_f2 = (-lo_freq) - im3_f2

    return (f"{rf1:.2f}:{fundamentals[LOWER][PWR]:.2f} "
            f"{rf2:.2f}:{fundamentals[UPPER][PWR]:.2f} "
            f"{im3_f1:.2f}:{im3[LOWER][PWR]:.2f} "
            f"{im3_f2:.2f}:{im3[UPPER][PWR]:.2f} "
            f"{ip3[LOWER]:.2f} {ip3[UPPER]:.2f} {fe_atten:.1f}")


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_fe_atten = 0.0

    parser = ArgumentParser(description=
                            "Measure intermodulation distortion and IP3",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        The target channel for the measurement.
        '''))
    parser.add_argument(
        "lower_tone",
        help="Frequency of the lower fundamental tone (in MHz)")
    parser.add_argument(
        "upper_tone",
        help="Frequency of the upper fundamental tone (in MHz)")
    parser.add_argument(
        "--fe_atten", type=float, default=default_fe_atten,
        help=f"Front end attenuation to use ({default_fe_atten} dB)")
    parser.add_argument(
        "--im3_span", type=float,
        help="SA frequency span when measuring IM3 amplitudes (in MHz)")
    parser.add_argument(
        "--tone_span", type=float,
        help="SA frequency span when measuring tone amplitudes (in MHz)")
    parser.add_argument(
        "--lo_freq", type=float,
        help=textwrap.dedent('''\
        LO frequency (in MHz).
        Used to adjust reported frequencies.
        Positive LO frequencies imply low side injection.
        Negative LO frequencies imply high side injection'''))
    parser.add_argument(
        "--lo_offset", type=float,
        help=textwrap.dedent('''\
        SA LO offset (in MHz) when using a down converter
        RF frontend'''))
    parser.add_argument(
        "--im3_lo_offset", type=float,
        help=textwrap.dedent('''\
        SA LO offset (in MHz) when measuring IM3 amplitudes
        using a down converter RF frontend'''))
    args = parser.parse_args()

    chan = 0
    if args.channel is not None:
        chan = int(args.channel) - 1

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    sa_app.channel = chan
    settings = save_settings(sa_app)

    im3_str = measure_ip3(
        sa, sa_app,
        float(args.lower_tone), float(args.upper_tone),
        tone_span=args.tone_span, im3_span=args.im3_span,
        fe_atten=args.fe_atten, lo_offset=args.lo_offset,
        im3_lo_offset=args.im3_lo_offset, lo_freq=args.lo_freq)
    print(f"{im3_str}")

    restore_settings(sa_app, settings)
    sa_app.sweep()


if __name__ == '__main__':
    main()
Functions from ip3.py and rfgen.py (from Measure/set signal generator output level) can be used in a script to characterise mixer IP3/IM3 over a range of frequencies.
The ip3-downconv.py script is intended to characterise IP3/IM3 for mixer down conversion. See Figure 14a for the test setup. Note that the single conversion down converter is configured to make use of one channel only: the IF output is base band and is connected directly to the RPSA.
import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import numpy as np
import rpyc

from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID, InstrumentMgr,
    InstrumentInitializeException,
    UnknownInstrumentModelException
)
from ip3 import measure_ip3
from rfgen import measure_or_set_signal_level

TONE_SEPARATION = 2.0


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def read_ip3_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()
    settings = {}
    for line in lines:
        l = line.strip()
        # Skip blank lines and comments
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, tone1_pwr, tone2_pwr, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            # Make the upper limit of the LO power range inclusive
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = ((tone_freq, float(tone1_pwr)),
                                   (tone_freq+TONE_SEPARATION, float(tone2_pwr)),
                                   float(lo_freq), lo_pwr_array)
    return settings


def main():
    default_im3_span = 0.1
    default_lo_offset = 12.5
    default_im3_lo_offset = 15.5

    parser = ArgumentParser(description=
        "Measure mixer IMD and IP3",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )
    parser.add_argument(
        "--im3_span", type=float, default=default_im3_span,
        help=f"SA frequency span when measuring IM3 amplitudes (Default: {default_im3_span} in MHz)")
    parser.add_argument(
        "--lo_offset", type=float, default=default_lo_offset,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when using a down converter RF frontend
        (Default: {default_lo_offset})'''))
    parser.add_argument(
        "--im3_lo_offset", type=float, default=default_im3_lo_offset,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when measuring IM3 amplitudes
        using a down converter RF frontend.
        (Default: {default_im3_lo_offset})'''))
    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect('127.0.0.1', 18864)
    lowspur_rfgen = rpyc.connect('127.0.0.1', 18867)
    rfgen_app = rfgen.root
    rfgen_chan = rfgen_app.channels['Chan 1']
    sa_tone_chan = 1
    rfgen_lo_chan = rfgen_app.channels['Chan 2']
    sa_lo_chan = 0
    lowspur_chan = lowspur_rfgen.root.plo_chan

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_ip3_settings('ip3-downconv-settings.txt')

    for tone_str in ['850']:
        tone1, tone2, lo_freq, lo_pwrs = settings[tone_str]
        lock_status = set_rf(rfgen_chan, tone1[0], tone1[1])
        lock_status = set_rf(lowspur_chan, tone2[0], tone2[1])
        for lo_pwr in lo_pwrs:
            if args.smhu_lo is True:
                smhu.output = True
                smhu.freq = float(lo_freq)
                smhu.level = float(lo_pwr)
                time.sleep(0.25)
                freq, pwr = lo_freq, lo_pwr
            else:
                _, lo_pwr = measure_or_set_signal_level(
                    sa, sa_app, rfgen_lo_chan, sa_lo_chan,
                    lo_freq, float(lo_pwr), 0.1, 5)
            sa_app.channel = sa_tone_chan
            #
            # Note: currently set up to measure IP3 for down conversion
            # to base band so lower and upper tones are 30.0 and 32.0 MHz
            #
            im3_str = measure_ip3(sa, sa_app, 30.0, 32.0,
                                  tone_span=None, im3_span=args.im3_span,
                                  fe_atten=15.0,
                                  lo_offset=args.lo_offset,
                                  im3_lo_offset=args.im3_lo_offset,
                                  lo_freq=lo_freq)
            print(f"{im3_str} {lo_pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()
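For reference, the third-order products of two tones at f1 and f2 fall at 2*f1 - f2 and 2*f2 - f1, and the usual two-tone estimate of the intercept point is the tone power plus half the difference between the tone and IM3 powers. The following is a minimal sketch of that arithmetic only. The measure_ip3 function is not listed here, so this should not be read as its implementation; the function names im3_freqs and ip3_estimate and the power values are assumptions made for illustration.

```python
def im3_freqs(f_lower, f_upper):
    """Return the (lower, upper) third-order product frequencies in MHz."""
    return 2.0 * f_lower - f_upper, 2.0 * f_upper - f_lower


def ip3_estimate(tone_pwr, im3_pwr):
    """Two-tone intercept estimate in dBm from tone and IM3 powers in dBm."""
    return tone_pwr + (tone_pwr - im3_pwr) / 2.0


if __name__ == '__main__':
    # Base band tones used by the script above.
    print(im3_freqs(30.0, 32.0))       # (28.0, 34.0)
    # Illustrative (made up) powers: -20 dBm tones, -80 dBm IM3 products.
    print(ip3_estimate(-20.0, -80.0))  # 10.0
```

With the 30.0 and 32.0 MHz base band tones the analyzer would therefore be looking for IM3 products near 28.0 and 34.0 MHz.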
The ip3-upconv.py script is intended to characterise IP3/IM3 for mixer up
conversion. See Figure 14b for the test setup. In this case the single
conversion down converter front end is configured for both channels:
channel 1 for the LO signal and channel 2 for the upconverted RF signal.
import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import numpy as np
import rpyc

from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID, InstrumentMgr,
    InstrumentInitializeException,
    UnknownInstrumentModelException
)
from ip3 import measure_ip3
from rfgen import measure_or_set_signal_level

TONE_SEPARATION = 2.0


def read_ip3_settings(settings_file, tone_offset):
    with open(settings_file) as fd:
        lines = fd.readlines()
    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str) + tone_offset
        settings[tone_freq_str] = (tone_freq, tone_freq+TONE_SEPARATION,
                                   float(lo_freq), lo_pwr_array)
    return settings


def main():
    default_tone_offset = 0.0

    parser = ArgumentParser(description=
        "Measure mixer IMD and IP3",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )
    parser.add_argument(
        "--tone_offset", type=float, default=default_tone_offset,
        help="Offset for the specified tone frequency (in MHz)"
    )
    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    if args.smhu_lo is False:
        rfgen = rpyc.connect('127.0.0.1', 18864)
        rfgen_app = rfgen.root
        rfgen_lo_chan = rfgen_app.channels['Chan 2']
    ddsgen = rpyc.connect('127.0.0.1', 18862)
    ddsgen_app = ddsgen.root
    sa_lo_chan = 1
    sa_rf_chan = 0

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_ip3_settings('ip3-upconv-settings.txt', args.tone_offset)

    for tone_str in ['200', '210', '220', '230', '240',
                     '250', '260', '270', '280', '290']:
        lower_tone, upper_tone, lo_freq, lo_pwrs = settings[tone_str]
        for lo_pwr in lo_pwrs:
            if args.smhu_lo is True:
                smhu.output = True
                smhu.freq = float(lo_freq)
                smhu.level = float(lo_pwr)
                time.sleep(0.25)
                freq, pwr = lo_freq, lo_pwr
            else:
                freq, pwr = measure_or_set_signal_level(
                    sa, sa_app, rfgen_lo_chan, sa_lo_chan,
                    lo_freq, float(lo_pwr), 0.1, 5)
            sa_app.channel = sa_rf_chan
            im3_str = measure_ip3(
                sa, sa_app, lower_tone, upper_tone,
                tone_span=None, im3_span=0.05, fe_atten=15.0,
                lo_offset=12.5, im3_lo_offset=15.5)
            print(f"{im3_str} {pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()
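The last field of each settings line is either a single LO power or a comma separated pair giving the first and last power of a sweep in 1 dB steps, with the last value included. The sketch below mirrors that expansion in plain Python so it can be tried without numpy; expand_lo_pwrs is a hypothetical helper name, and the sample values are made up rather than taken from a real settings file.

```python
import math


def expand_lo_pwrs(field):
    """Expand an LO power field such as '7' or '-3,3' into a list of dBm values.

    A single value is returned as is. A 'first,last' pair is expanded in
    1 dB steps with the last value included, which is what the scripts
    arrange by adding 1.0 to the upper limit before calling numpy.arange.
    """
    values = [float(v) for v in field.split(',')]
    if len(values) == 1:
        return values
    first, last = values
    count = max(0, math.ceil(last + 1.0 - first))
    return [first + float(i) for i in range(count)]


if __name__ == '__main__':
    print(expand_lo_pwrs('7'))     # [7.0]
    print(expand_lo_pwrs('-3,3'))  # [-3.0, -2.0, -1.0, 0.0, 1.0, 2.0, 3.0]
```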
Measure mixer insertion loss
Test setup for measuring mixer insertion loss.
A single channel of the spectrum analyzer down converter frontend is used with the RP spectrum analyzer to measure the RF signal power. The mixer IF signal power is measured directly by the spectrum analyzer. The down converter frontend software is therefore configured to match this setup:
usage: mixer_il.py [-h] [-a SAADDR] [-p SAPORT] [-A RFGEN_ADDR] [-P RFGEN_PORT]
                   [-R RF_FREQ RF_FREQ RF_FREQ] [-I IF_FREQ] [-L LO_PWR]

Measure mixer insertion loss or feedthrough

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -A RFGEN_ADDR, --rfgen_addr RFGEN_ADDR
                        IP address for the RFGen RPyC service (127.0.0.1)
  -P RFGEN_PORT, --rfgen_port RFGEN_PORT
                        TCP port for the RFGen RPyC service (18864)
  -R RF_FREQ RF_FREQ RF_FREQ, --rf_freq RF_FREQ RF_FREQ RF_FREQ
                        Mixer RF freq. range to cover.
                        This is a triplet of float numbers in the format:
                        <start> <stop> <step>.
                        These values are passed to numpy.arange in order
                        to generate an array of RF freq. values.
                        Default: [200.0, 3950.0, 100.0]
  -I IF_FREQ, --if_freq IF_FREQ
                        The Mixer IF freq. to use (30.0)
  -L LO_PWR, --lo_pwr LO_PWR
                        The LO power level to use, in dBm. (0.0)
import os
import sys
import time
import rpyc
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np

from rfblocks import BridgeCoupler
from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID, InstrumentMgr,
    InstrumentInitializeException,
    UnknownInstrumentModelException
)


def read_loss_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()
    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = (tone_freq, float(lo_freq), lo_pwr_array)
    return settings


def measure_rf(sa_app, rf_sa_chan, rf_chan, f_rf):
    sa_app.channel = rf_sa_chan
    sa_app.centre_freq = f_rf
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    mon_freq, mon_pwr = sa_app.marker_posn
    actual_rf_pwr = mon_pwr - rf_chan.coupler.coupling(mon_freq) \
        + rf_chan.coupler.insertion_loss(mon_freq)
    return actual_rf_pwr


def measure_if(sa_app, if_sa_chan, freq_if):
    sa_app.channel = if_sa_chan
    sa_app.centre_freq = freq_if
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    if_freq, if_pwr = sa_app.marker_posn
    return if_pwr


def set_lo(lo_chan, f_lo, lo_pwr):
    """Set the frequency and level of the LO source channel.
    """
    lo_chan.plo_ctl.freq = float(f_lo)
    lo_chan.level = float(lo_pwr)
    lock_status = lo_chan.configure()
    return lock_status


def set_rf(rf_chan, f_rf, rf_pwr):
    """Set the frequency and level of the RF source channel.
    """
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = float(rf_pwr)
    lock_status = rf_chan.configure()
    return lock_status


def measure_insertion_loss(sa_app, rf_chan,
                           rf_sa_chan, if_sa_chan,
                           freq_rf, freq_if):
    """Measure mixer insertion loss for one specific RF frequency

    :param rf_chan: RFGen source channel for the RF signal
    :type rf_chan: PLOChan
    :param rf_sa_chan: The RPSA channel used to measure the RF
        monitor port signal.
    :type rf_sa_chan: int
    :param if_sa_chan: The RPSA channel used to measure the mixer
        IF signal.
    :type if_sa_chan: int
    """
    actual_rf_pwr = measure_rf(sa_app, rf_sa_chan, rf_chan, freq_rf)
    if_pwr = measure_if(sa_app, if_sa_chan, abs(freq_if))
    mixer_gain = if_pwr - actual_rf_pwr
    return mixer_gain


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_rf_freq = [200.0, 3950.0, 100.0]
    default_if_freq = 30.0
    default_lo_pwr = 1.0
    default_cal_file = f"{os.environ['RFBLOCKS_REFDESIGNS']}/PowerMeter/app/cal-files/24dB-atten-48-0007-Det3-100-6000MHz.json"

    parser = ArgumentParser(description=
        "Measure mixer insertion loss or feedthrough",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr", default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port", default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "-R", "--rf_freq", nargs=3, type=float,
        default=default_rf_freq,
        help=textwrap.dedent(f'''\
        Mixer RF freq. range to cover.
        This is a triplet of float numbers in the format:
        <start> <stop> <step>.
        These values are passed to numpy.arange in order
        to generate an array of RF freq. values.
        Default: {default_rf_freq}'''))
    parser.add_argument(
        "-I", "--if_freq", type=float, default=default_if_freq,
        help=f"The Mixer IF freq. to use ({default_if_freq})")
    parser.add_argument(
        "-L", "--lo_pwr", type=float, default=default_lo_pwr,
        help=f"The LO power level to use, in dBm. ({default_lo_pwr})")
    parser.add_argument(
        "--upconv", action='store_true',
        help="Measurements using the up conversion test setup"
    )
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )
    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root
    rfgen_app.initialize()

    rf_pwr = -10.0
    lo_chan_id = 'Chan 1'
    rf_chan_id = 'Chan 2'
    lo_chan = rfgen_app.channels[lo_chan_id]
    rf_chan = rfgen_app.channels[rf_chan_id]
    rf_sa_chan = 0  # RPSA 'Chan 1'
    if_sa_chan = 0  # RPSA 'Chan 2'

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_loss_settings('loss-settings.txt')

    for tone_str in ['3000', '3049', '3105', '3150', '3200', '3255', '3300',
                     '3355', '3400', '3450', '3500', '3550', '3600', '3650',
                     '3700', '3755', '3800', '3850', '3900', '3950']:
            # '3105', '3155', '3200', '3250', '3300', '3350', '3400',
            # '3450', '3500', '3550', '3600', '3650', '3700', '3750',
            # '3800', '3850', '3900', '3950']:
            # '1000', '1050', '1100', '1150', '1200', '1250', '1300', '1350',
            # '1400', '1450', '1500', '1550', '1600', '1650', '1700', '1750',
            # '1800', '1850', '1900', '1950', '2000']:
            # '2050', '2100', '2150', '2200', '2250', '2300', '2350', '2400',
            # '2450', '2500', '2550', '2600', '2650', '2700', '2750', '2800',
            # '2850', '2900', '2950', '3000']:
            # '1000', '1050', '1100', '1150', '1200', '1250', '1300', '1350',
            # '1400', '1450', '1500', '1550', '1600', '1650', '1700', '1750',
            # '1800', '1850', '1900', '1950', '2000']:
            # '200', '210', '220', '230', '240', '250', '260', '270',
            # '280', '290', '300', '350', '400', '450', '500', '550',
            # '600', '650', '700', '750', '800', '850', '900', '950',
            # '2900', '2950', '3000',
            # '3105', '3155', '3200', '3250', '3300', '3350', '3400',
            # '3450', '3500', '3550', '3600', '3650', '3700', '3750',
            # '3800', '3850', '3900', '3950']:
        rf_freq, lo_freq, lo_pwrs = settings[tone_str]
        if_freq = abs(rf_freq - lo_freq)
        if args.upconv:
            for lo_pwr in lo_pwrs:
                if args.smhu_lo is True:
                    smhu.output = True
                    smhu.freq = float(lo_freq)
                    smhu.level = float(lo_pwr)
                    time.sleep(0.25)
                    freq, pwr = lo_freq, lo_pwr
                else:
                    if set_lo(lo_chan, lo_freq, lo_pwr) is not True:
                        print(f"Warning: Can't lock LO source for freq {lo_freq:.2f}")
                if_pwr = measure_if(sa_app, if_sa_chan, rf_freq)
                mixer_gain = if_pwr - rf_pwr
                print(f"{rf_freq:.2f}:{mixer_gain:.2f} {lo_pwr:.2f}")
        else:
            if set_rf(rf_chan, rf_freq, rf_pwr) is not True:
                print(f"Warning: Can't lock RF source for freq {rf_freq:.2f}")
            for lo_pwr in lo_pwrs:
                if args.smhu_lo is True:
                    smhu.output = True
                    smhu.freq = float(lo_freq)
                    smhu.level = float(lo_pwr)
                    time.sleep(0.25)
                    freq, pwr = lo_freq, lo_pwr
                else:
                    if set_lo(lo_chan, lo_freq, lo_pwr) is not True:
                        print(f"Warning: Can't lock LO source for freq {lo_freq:.2f}")
                mixer_gain = measure_insertion_loss(
                    sa_app, rf_chan, rf_sa_chan, if_sa_chan,
                    rf_freq, if_freq)
                print(f"{rf_freq:.2f}:{mixer_gain:.2f} {lo_pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()
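The arithmetic performed by measure_rf and measure_insertion_loss can be followed with a small worked example. This is a sketch only: it assumes that the coupler's coupling and insertion loss are both reported as negative dB values, which is not confirmed by the listing, and the helper names and all power figures are made up for illustration.

```python
def actual_rf_power(mon_pwr, coupling, insertion_loss):
    """RF power delivered to the mixer (dBm), from the monitor port reading.

    Assumes coupling and insertion_loss are negative dB values, following
    the expression used in measure_rf: mon_pwr - coupling + insertion_loss.
    """
    return mon_pwr - coupling + insertion_loss


def mixer_gain(if_pwr, rf_pwr):
    """Conversion gain in dB; a negative value is an insertion loss."""
    return if_pwr - rf_pwr


if __name__ == '__main__':
    # Illustrative values: -30.5 dBm at the monitor port, -20 dB coupling,
    # -1 dB coupler insertion loss, -18.5 dBm measured at the mixer IF port.
    rf_pwr = actual_rf_power(-30.5, -20.0, -1.0)
    print(rf_pwr)                     # -11.5
    print(mixer_gain(-18.5, rf_pwr))  # -7.0
```

In the up conversion branch of the script the gain is instead referenced to the nominal rf_pwr setting rather than to a measured monitor port value.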
Measure mixer feedthrough
By default the test hardware is assumed to be set up as follows:
LO signal is taken from channel 2 of the RFGen signal generator.
Mixer RF (or IF) output is connected to the channel 1 RF input of the direct down converter.
RFGen channel 2 monitor output (the LO signal monitoring port) is connected to the channel 2 RF input of the direct down converter.
Direct down converter channel 1 IF output is connected to ADC1 input of the dual DAC/ADC.
Direct down converter channel 2 IF output is connected to ADC2 input of the dual DAC/ADC.
import sys
import rpyc
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np

from rfblocks import BridgeCoupler


def read_lo_settings(lo_settings_file):
    with open(lo_settings_file) as fd:
        lines = fd.readlines()
    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        lo_freq, lo_pwr = line.split()
        settings[lo_freq] = float(lo_pwr)
    return settings


def measure_rf(sa_app, rf_sa_chan, f_rf):
    sa_app.channel = rf_sa_chan
    sa_app.centre_freq = f_rf
    sa_app.freq_span = 1.0
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    rf_freq, rf_pwr = sa_app.marker_posn
    return rf_pwr


def measure_lo(sa_app, lo_sa_chan, coupler, freq_lo):
    sa_app.channel = lo_sa_chan
    sa_app.centre_freq = freq_lo
    sa_app.freq_span = 1.0
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    lo_freq, lo_pwr = sa_app.marker_posn
    actual_lo_pwr = lo_pwr - coupler.coupling(lo_freq) \
        + coupler.insertion_loss(lo_freq)
    return actual_lo_pwr


def set_lo(lo_chan, f_lo, lo_pwr):
    lo_chan.plo_ctl.freq = float(f_lo)
    lo_chan.level = lo_pwr
    lock_status = lo_chan.configure()
    return lock_status


def measure_feedthrough(sa_app, rfgen_app, lo_settings_file):
    lo_chan = rfgen_app.channels['Chan 2']
    rf_sa_chan = 0
    lo_sa_chan = 1
    coupler = BridgeCoupler()

    lo_settings = read_lo_settings(lo_settings_file)
    for lo_freq_str, lo_pwr in lo_settings.items():
        lo_freq = float(lo_freq_str)
        lock_status = set_lo(lo_chan, lo_freq, lo_pwr)
        actual_lo_pwr = measure_lo(sa_app, lo_sa_chan, coupler, lo_freq)
        actual_rf_pw = measure_rf(sa_app, rf_sa_chan, lo_freq)
        feedthru = abs(actual_rf_pw) + actual_lo_pwr
        print(f"{lo_freq_str} {feedthru:.2f} {actual_lo_pwr:.2f}")


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_lo_settings = 'lo_settings.txt'

    parser = ArgumentParser(description=
        "Measure mixer feedthrough",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr", default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port", default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "-L", "--lo_settings", default=default_lo_settings,
        help=textwrap.dedent(f'''\
        A file containing the settings for the LO signal.
        The file must have pairs of values on each line
        with the first being the LO freq. (in MHz) and the
        second being the LO power (in dBm).
        Default: {default_lo_settings}'''))
    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root

    measure_feedthrough(sa_app, rfgen_app, args.lo_settings)


if __name__ == '__main__':
    main()
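As an illustration of the LO settings file format and of the feedthrough figure the script prints, the sketch below parses the same "frequency power" pairs from a string and repeats the feedthrough arithmetic. The helper names parse_lo_settings and feedthrough_db and the sample values are assumptions for illustration; note that taking abs() of the leaked power, as the script does, only yields the LO to port difference when the leaked power is negative in dBm.

```python
def parse_lo_settings(text):
    """Parse 'LO freq (MHz)  LO power (dBm)' pairs, one per line.

    Blank lines and lines starting with '#' are skipped, as in
    read_lo_settings. Keys are kept as strings, values become floats.
    """
    settings = {}
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue
        lo_freq, lo_pwr = stripped.split()
        settings[lo_freq] = float(lo_pwr)
    return settings


def feedthrough_db(leak_pwr, lo_pwr):
    """Feedthrough figure as computed by the script: |leak| + LO power."""
    return abs(leak_pwr) + lo_pwr


EXAMPLE = """\
# LO freq (MHz)  LO power (dBm)
500   7.0

1000  4.0
"""

if __name__ == '__main__':
    print(parse_lo_settings(EXAMPLE))  # {'500': 7.0, '1000': 4.0}
    # Made up reading: 7 dBm LO, -35 dBm leaking through to the RF port.
    print(feedthrough_db(-35.0, 7.0))  # 42.0
```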
Measure mixer modulation products
import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import rpyc

FREQ_DIFF = 0.02


def freq_close_to(f1, f2):
    """Returns True if f1 and f2 are within FREQ_DIFF MHz.
    """
    return abs(f1-f2) < FREQ_DIFF


def measure_power(sa_app, sa_chan, freq):
    sa_app.channel = sa_chan
    sa_app.centre_freq = freq
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(10)
    sa_app.marker_normal()
    sa_app.peak_search()
    rf_freq, rf_pwr = sa_app.marker_posn
    return rf_freq, rf_pwr


def measure_products(sa_app, sa_chan, f1, f2, order):
    # Row 0 holds the harmonics n*f1, row m holds |n*f1 + m*f2|
    f = []
    f.append([n*f1 for n in range(1, order+1)])
    for m in range(1, order+1):
        f.append([abs(freq+(m*f2)) for freq in f[0]])

    # Measured frequencies and powers, same shape as f
    p = []
    meas_f = []
    for n in range(order+1):
        p.append([])
        meas_f.append([])
        for m in range(order):
            p[n].append(0.0)
            meas_f[n].append(0.0)

    for n in range(order+1):
        for m in range(order):
            freq = f[n][m]
            meas_f[n][m], p[n][m] = measure_power(sa_app, sa_chan, freq)
    return f, meas_f, p


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_f1 = 500.0
    default_f2 = 30.0
    default_order = 5
    default_chan = "2"

    parser = ArgumentParser(description=
        "Measure mixer modulation products",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr", default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport", default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"], default=default_chan,
        help=textwrap.dedent(f'''\
        The target channel for the measurement (default: {default_chan}).
        '''))
    parser.add_argument(
        "--flo", type=float, default=default_f1,
        help=textwrap.dedent(f'''\
        The mixer LO frequency in MHz (default: {default_f1}).
        '''))
    parser.add_argument(
        "--fsignal", type=float, default=default_f2,
        help=textwrap.dedent(f'''\
        The mixer signal frequency in MHz (default: {default_f2}).
        '''))
    parser.add_argument(
        "--order", type=int, default=default_order,
        help=textwrap.dedent(f'''\
        Measure modulation products up to this order (default: {default_order}).
        '''))
    args = parser.parse_args()

    sa_chan = 0
    if args.channel is not None:
        sa_chan = int(args.channel) - 1

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app

    (f, measured_f, p) = measure_products(
        sa_app, sa_chan, args.flo, args.fsignal, args.order)

    # Table of expected product frequencies
    for n in range(args.order+1):
        for m in range(args.order):
            print(f"{f[n][m]:.1f} ", end='')
        print()
    print()
    # Table of measured powers; '*' marks a peak found off frequency
    for n in range(args.order+1):
        for m in range(args.order):
            if freq_close_to(f[n][m], measured_f[n][m]):
                print(f"{p[n][m]:.2f} ", end='')
            else:
                print("* ", end='')
        print()


if __name__ == '__main__':
    main()
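The table of frequencies visited by measure_products can be reproduced without any hardware. The sketch below repeats only the frequency grid calculation from the listing; product_freqs is a hypothetical name used here for illustration.

```python
def product_freqs(f1, f2, order):
    """Build the grid of product frequencies (MHz) used by measure_products.

    Row 0 holds the harmonics n*f1 for n = 1..order. Row m (m = 1..order)
    holds |n*f1 + m*f2| for the same n.
    """
    rows = [[n * f1 for n in range(1, order + 1)]]
    for m in range(1, order + 1):
        rows.append([abs(freq + m * f2) for freq in rows[0]])
    return rows


if __name__ == '__main__':
    for row in product_freqs(500.0, 30.0, 2):
        print(row)
    # [500.0, 1000.0]
    # [530.0, 1030.0]
    # [560.0, 1060.0]
```

As written, the grid contains only the sum products for a positive signal frequency. Since the expression is abs(n*f1 + m*f2), supplying a negative value for the signal frequency would give the difference products instead.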
Measure SA IP3
import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter

import numpy as np
import rpyc

from ip3 import measure_ip3

TONE_SEPARATION = 2.0


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def read_ip3_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()
    settings = {}
    for line in lines:
        l = line.strip()
        # Skip blank lines and comments
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, tone1_pwr, tone2_pwr, lo_freq = line.split()
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = ((tone_freq, float(tone1_pwr)),
                                   (tone_freq+TONE_SEPARATION, float(tone2_pwr)),
                                   float(lo_freq))
    return settings


def main():
    default_im3_span = 0.1
    default_lo_offset = 12.5
    default_im3_lo_offset = 15.5

    parser = ArgumentParser(description=
        "Measure mixer IMD and IP3",
        formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--im3_span", type=float, default=default_im3_span,
        help=f"SA frequency span when measuring IM3 amplitudes (Default: {default_im3_span} in MHz)")
    parser.add_argument(
        "--lo_offset", type=float, default=default_lo_offset,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when using a down converter RF frontend
        (Default: {default_lo_offset})'''))
    parser.add_argument(
        "--im3_lo_offset", type=float, default=default_im3_lo_offset,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when measuring IM3 amplitudes
        using a down converter RF frontend.
        (Default: {default_im3_lo_offset})'''))
    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect('127.0.0.1', 18864)
    lowspur_rfgen = rpyc.connect('127.0.0.1', 18867)
    rfgen_app = rfgen.root
    rfgen_chan = rfgen_app.channels['Chan 1']
    sa_tone_chan = 0
    lowspur_chan = lowspur_rfgen.root.plo_chan

    settings = read_ip3_settings('ip3-sa-settings.txt')

    for tone_str in ['850']:
        tone1, tone2, lo_freq = settings[tone_str]
        lock_status = set_rf(rfgen_chan, tone1[0], tone1[1])
        lock_status = set_rf(lowspur_chan, tone2[0], tone2[1])
        sa_app.channel = sa_tone_chan
        # Repeat the measurement over a range of front end attenuations
        for fe_atten in [0, 5, 7, 10, 12, 15, 17, 20, 25]:
            im3_str = measure_ip3(sa, sa_app, tone1[0], tone2[0],
                                  tone_span=None, im3_span=args.im3_span,
                                  fe_atten=fe_atten,
                                  lo_offset=args.lo_offset,
                                  im3_lo_offset=args.im3_lo_offset,
                                  lo_freq=None)
            print(f"{im3_str}")
        print("----------")


if __name__ == '__main__':
    main()