Open RF Prototyping

Spectrum Analyzer Testbed Application

The Application

This is a spectrum analyzer application targeting systems using the Red Pitaya ADC/DAC board in combination with RF front ends constructed using RF Blocks modules. Figure 1 illustrates the relationships between the various hardware and software components of a typical system.

RPSA HW environment

Figure 1: RPSA hardware environment. Heavy black lines indicate RF signal connections, light black lines indication control connections, heavy blue lines indicate TCP/IP network connections, heavy orange lines indicate USB connections.

Hardware Setup

Hardware setup for the Red Pitaya board is presented in more detail here: A Dual ADC/DAC Using the Red Pitaya Board. The hardware can be used in a base band configuration or with an RF front end which down converts the test signal into the base band.

  • Base band

  • Direct conversion RF front end

  • Superhet RF front end

Installing and Running the Application

  • Install the dual ADC/DAC SD card image as documented.

  • Install the adcdma application

  • Create a virtual environment and install rfblocks as documented here: install rfblocks into a virtual environment.

  • Install the rp-sa reference application

Command line usage

cd ${RFBLOCKS_REFDESIGNS}/DualDacAdc/scripts/rp
export PYTHONPATH=.
python sa
usage: sa [-h] [-a ADCADDR] [-p ADCPORT] [-A IPADDR] [-P PORT] [--frontend_ip FRONTEND_IP] [--frontend_port FRONTEND_PORT]
          [--cal_device CAL_DEVICE] [--cal_data CAL_DATA] [-F FRONTEND_CONFIG]

Red Pitaya spectrum analyzer application

optional arguments:
  -h, --help            show this help message and exit
  -a ADCADDR, --adcaddr ADCADDR
                        IP address for the ADC DMA server instance
  -p ADCPORT, --adcport ADCPORT
                        TCP port for the ADC DMA server instance
  -A IPADDR, --ipaddr IPADDR
                        IP address for the SA RPyC server instance
  -P PORT, --port PORT  TCP port for the SA RPyC server instance
  --frontend_ip FRONTEND_IP
                        IP address for the RF frontend server instance
  --frontend_port FRONTEND_PORT
                        TCP port for the RF frontend server instance
  --cal_device CAL_DEVICE
                        Calibration source device. The device is specified as a comma separated string of the form:
                        DEVICE_NAME,DEVICE_ID for example: SMHU58,28
  --cal_data CAL_DATA   A file containing ADC calibration data.
  -F FRONTEND_CONFIG, --frontend_config FRONTEND_CONFIG
                        Frontend hardware configuration.

Note that on some Linux platforms a bug in the QT library manifests as a segfault under some circumstances. A workaround for this bug is to set the following environment variable before running the application:

export QT_NO_FT_CACHE=1

See stackoverflow for more information.

Configuration and capabilities

On startup, the sa application will attempt to read spectrum analyzer capabilities from the default front end application service address (127.0.0.1:18870). If the front end application service is at a listening on a different address the --frontend_ip and --frontend_port command line parameters should be used to specify it.

If the sa application fails to read analyzer capabilities from a front end application service it will default to making use of base band capabilities as specified in BaseFrontEndService.BASE_CAPABILITIES class attribute (see RF Front End section of the sa application source code).

Application Source Code

A detailed description of the spectrum analyzer app code is provided here: Red Pitaya Spectrum Analyzer.

The Application Main Window

SA app window

Figure 2: The spectrum analyzer application window

The application is presented with channel signal displays on the left (either one or two channels may be displayed), and application controls on the right. Channel sweep controls allow for Single sweep with continuous sweeping initiated using the Start button. Continuous sweeping is suspended by selecting the Stop button. Channel controls allow either of two channels to be displayed or, if the Split button is selected, both channels displayed simultaneously. Per channel controls are available by selectcing one of the three panes at the bottom right of the application window: Freq/Ampl, Markers and Trace.

Table 1. Global keyboard shortcuts

Shortcut

Description

<Alt>-s

Start continuous sweep

<Alt>-n

Single sweep

<Alt>-t

Stop sweep

<Alt>-1

Select channel 1 spectrum display

<Alt>-2

Select channel 2 spectrum display

<Alt>-f

Select the Freq/Ampl control pane

<Alt>-m

Select the Markers control pane

<Alt>-t

Select the Trace control pane

Single channel

Figure 3a: Single channel display

Split channel

Figure 3b: Split channel display

Taking Measurements

Freq/ampl ctls

Figure 4: Frequency and amplitude controls

The spectrum frequency controls, Centre, Start, Stop and Span are coupled so that changes to one with cause appropriate updates in the others. The Full Span button will reset the spectum display to be the full span, generally 0.5 to 55 MHz although this may vary if an external RF frontend is used with the Red Pitaya board.

Resolution bandwidth (RBW) of the spectrum display is, by default, set automatically. The Auto checkbox can be un-checked to allow the manual selection of RBW using the associated spinbox controls.

The ADC Attenuation control sets the value of the channel attenuator connected to the ADC input of the Red Pitaya board. Maximum attenuation is 31.75 dB in steps of 0.25 dB.

Marker ctls

Figure 5: Marker controls

A marker is selected by clicking on one of the four Marker selection buttons. By default, markers are set to the Off state. Selecting one of the marker types Normal, Delta pair or Span pair will make the marker visible within the channel signal display.

Only one of the markers is selected at any particular time.

Delta marker

Figure 6a: Delta marker

Span marker

Figure 6b: Span marker

The Ref and Delta buttons become active when the selected marker is a delta marker. The reference marker is denoted by an R next to the marker number. Hold down Alt key and move cursor in order to move the selected marker. Ctl-Left-Arrow and Ctl-Right-Arrow to move selected marker left and right respectively.

Keyboard accelerators: <Alt>-p move currently selected marker to signal peak, <Alt>-c change signal display centre frequency to position of the currently selected marke z to zoom on span marker.

Trace ctls

Figure 7: Trace controls

Calibration

Cal. menu

Figure 8: Channel calibration menu

RF Front Ends

Front end capabilities are read from the RPyC service associated with the front end.

Direct Conversion

RF front end ctrls

Figure 9: SA App showing front end controls

Software Control

A quick start to using software control with the SA application. More detailed examples of the use of software control are given in the Scripts.

import rpyc

sa = rpyc.connect("127.0.0.1", 18865)
dds = rpyc.connect("127.0.0.1", 18862)

dds.root.initialize()
dds1 = dds.root.dds_controllers['Chan 1']
dds1.state = ad9913.POWER_UP
dds1.freq = 25.0
dds1.dbm = -10.0
dds.root.configure('Chan 1')

sa_app = sa.root.sa_app
sa_app.channel = 1
sa_app.centre_freq = 25.0
sa_app.freq_span = 0.5
sa_app.detector_type_pos()
sa_app.trace_type_avg()
sa_app.sweeps(5)

sa_app.marker_normal()
freq, pwr = sa_app.marker_posn()
print(f"{freq=:.2f}, {pwr=:.2f}")

sa.close()
dds.close()

Frequency and bandwidth

@property
def centre_freq(self) -> float:
    """The centre frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.channel = 1
    >>> sa_app.centre_freq
    2027.5
    >>> sa_app.centre_freq = 1000
    >>> sa_app.centre_freq
    1000.0
    """

@property
def start_freq(self) -> float:
    """The start frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    """

@property
def stop_freq(self) -> float:
    """The stop frequency for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    """

@property
def freq_span(self) -> float:
    """The frequency span for the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.start_freq = 999.95
    >>> sa_app.stop_freq = 1000.5
    >>> f"{sa_app.freq_span:.4f}"
    '0.5500'
    >>> sa_app.freq_span = 1.0
    >>> sa_app.start_freq
    999.725
    >>> sa_app.stop_freq
    1000.725
    """

@property
def rbw(self) -> float:
    """The resolution bandwidth of the currently selected channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.rbw
    5000
    >>> sa_app.rbw = 2000
    >>> sa_app.sweeps(5)
    >>> sa_app.auto_rbw
    False
    """

@property
def auto_rbw(self) -> bool:
    """Enable/disable auto setting of RBW for the currently selected
    channel (in MHz).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.auto_rbw
    True
    """

Amplitude

@property
def ref_level(self) -> float:
    """The reference level for the currently selected channel (in dBm).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    """

@property
def ampl_scale(self) -> float:
    """The amplitude scaling for the currently selected channel (in dB).

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    """

@property
def atten(self) -> float:
    """The ADC attenuator setting for the currently selected channel (in dB)

    This is set in steps of 0.25dB from 0.0dB to 31.75dB.

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.centre_freq = 1000
    >>> sa_app.ref_level = 5.0
    >>> sa_app.ampl_scale = 12
    >>> sa_app.sweep()
    >>> sa_app.atten
    0.0
    >>> sa_app.atten = 10
    >>> sa_app.atten
    10
    >>> sa_app.sweep()
    """

Markers

def marker_normal(self, mkr_id: int=1) -> None:
    """Set the specified marker for the currently selected channel to
    be in 'normal' mode.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

def marker_delta(self, mkr_id: int=1) -> None:
    """Set the specified marker for the currently selected channel to
    be in 'delta' mode.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

def marker_off(self, mkr_id: int=1) -> None:
    """Turn the specified marker for the currently selected channel off.

    :param mkr_id: The id for the marker (1 to 4).
    :type mkr_id: int
    """

@property
def selected_marker(self) -> SpectrumMarker:
    """Return a reference to the currently selected marker.
    """

@property
def marker_type(self) -> int:
    """Return the type of the currently selected marker
    """

@property
def marker_posn(self) -> Tuple[float, float]:
    """Return the position of the currently selected marker.

    :returns: A tuple containing the frequency (in MHz)
        and power (in dBm) of the currently selected marker.
        If the marker is either a delta or span type then the
        position will be for the reference marker.
    """

@property
def marker_delta_posn(self) -> Tuple[float, float]:
    """Return the delta frequency and power values for the currently
    selected marker.

    Note that if the currently selected marker is not either a delta
    or span type marker the values returned in the tuple will be None.

    :returns: A tuple containing the frequency and power delta of the
        currently selected marker.
    """

def set_marker_delta_posn(self, f:float) -> None:
    """Move the offset frequency position of the currently selected marker.
    If the type of the currently selected marker is not ``MKR_TYPE_DELTA``
    this function will do nothing.

    :param f: The offset from the reference marker to move the delta
        marker to.
    :type f: float

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.marker_delta()
    >>> from plot import SpectrumMarker
    >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.REF_DELTA_TYPE)
    >>> sa_app.peak_search()
    >>> sa_app.marker_delta_type(mkr_id=1, mkr_type=SpectrumMarker.DELTA_DELTA_TYPE)
    >>> sa_app.set_marker_delta_posn(0.5)
    >>> sa_app.marker_delta_posn
    (0.5009602327528455, -91.42057798668877)
    """

def peak_search(self) -> None:
    """
    """

def next_peak(self) -> None:
    """
    """

Trace

def detector_type_normal(self) -> None:
    """
    """

def detector_type_pos(self) -> None:
    """
    """

def detector_type_neg(self) -> None:
    """
    """

def detector_type_sample(self) -> None:
    """
    """

def trace_type_normal(self, trace_id: int=1) -> None:
    """
    """

def trace_type_avg(self, trace_id: int=1) -> None:
    """
    """

def trace_type_min(self, trace_id: int=1) -> None:
    """
    """

def trace_type_max(self, trace_id: int=1) -> None:
    """
    """

@property
def trace_average(self) -> int:
    """
    """

Sweeping

def sweep(self) -> None:
    """Perform a single spectrum sweep on the currently selected channel.
    """

def sweeps(self, sweep_count: int) -> None:
    """Perform the specified number of spectrum sweeps on the currently
    selected channel.

    :param sweep_count: The number of spectrum sweeps to carry out.
    :type sweep_count: int
    """

@property
def running(self) -> bool:
    """The status of the currently selected channel.

    :return: True if the channel is currently running and performing
    spectrum sweeps.  False if the channel is not running.
    """

Front end controls

@property
def chan_capabilities(self) -> Dict:
    """
    """

@property
def chan_type_is_direct(self) -> bool:
    """
    """

@property
def fe_atten(self) -> float:
    """The value of the RF front end attenuator for the currently
    selected channel.

    :returns: The current front end attenuation in dB.

    :raises: NotImplementedError if there is no RF front end hardware
        configured.

    >>> import rpyc
    >>> sa = rpyc.connect("127.0.0.1", 18865)
    >>> sa_app = sa.root.sa_app
    >>> sa_app.fe_atten
    10.0
    >>> sa_app.fe_atten = 0.0
    >>> sa_app.fe_atten
    0.0
    """

@property
def fe_gain(self) -> float:
    """The value of the RF front end gain for the recently selected
    channel.

    :returns: The current front end gain in dB.

    """

Data store

class DataStore:

    @property
    def spectra(self) -> List[Spectrum]:
    """A list containing the spectra for each of the traces.
    """

    @property
    def selected_trace_id(self) -> int:
    """The index of the currently selected trace.
    """

    @property
    def selected_trace_type(self) -> int:
    """The currently selected trace type.
    """

    @property
    def freq(self) -> np.ndarray:
    """The frequency value array for the currently selected trace.
    """

    @property
    def pwr(self) -> np.ndarray:
    """The power value array for the currently selected trace.
    """

    @property
    def sweep_params(self) -> SweepParameters:
    """The sweep parameters for the currently selected trace.
    """


@dataclass
class Spectrum():
    freq: np.ndarray
    pwr: np.ndarray
    iq_data: np.ndarray
    parameters: SweepParameters
    timestamp: datetime


@dataclass
class SweepParameters():
    # Start and stop frequencies in MHz.
    f_start: float
    f_stop: float

    fe_lo: float = 0.0
    s1: float = 0.0
    s2: float = 0.0
    window_type: str = 'HFT144D'
    window_w3db: float = 4.4697
    f_clk: float = ADC_122_16_F_CLK


    @property
    def fs(self):
    """
    """

    @property
    def bin_width(self):
    """
    """

    @property
    def decimation_rate(self):
    """
    """

    @property
    def enbw(self):
    """
    """

    @property
    def rbw(self):
    """
    """

    @property
    def window(self):
    """
    """

Channel capabilities

Scripts

Some of these scripts may import modules from the RPSA application. In order to allow this, PYTHONPATH should be set as follows:

PYTHONPATH=$PYTHONPATH:$RFBLOCKS_REFDESIGNS/DualDacAdc/scripts/rp:$RFBLOCKS_REFDESIGNS/DualDacAdc/scripts/rp/sa

Save and restore SA settings

This script will save or restore spectrum analyzer application settings using a JSON formatted file.

usage: settings.py [-h] [-a SAADDR] [-p SAPORT] [-S] [-R] [-f SETTINGS_FILE] [-C {1,2}]

Save/restore settings for the RPSA app.

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -S, --save            Save the current RPSA settings
  -R, --restore         Restore RPSA settings
  -f SETTINGS_FILE, --settings_file SETTINGS_FILE
                        File to save or restore SA settings to or from (sa-settings.json)
  -C {1,2}, --channel {1,2}
                        Channel settings to save or restore.
                        Note that not specifying a channel will save or
                        restore both channels.
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
import json
import textwrap
import rpyc


DEFAULT_SETTINGS_FILE = "sa-settings.json"
CHANNEL_LABELS = ['Ch 1', 'Ch_2']


def save_settings(sa_app, outfile, chan_list):
    settings = {}

    for ch in chan_list:
        ch_settings = {}
        chan_label = CHANNEL_LABELS[ch]
        sa_app.channel = ch
        ch_settings['centre_freq'] = sa_app.centre_freq
        ch_settings['start_freq'] = sa_app.start_freq
        ch_settings['stop_freq'] = sa_app.stop_freq
        ch_settings['freq_span'] = sa_app.freq_span
        ch_settings['ref_level'] = sa_app.ref_level
        ch_settings['ampl_scale'] = sa_app.ampl_scale
        ch_settings['atten'] = sa_app.atten
        ch_settings['auto_rbw'] = sa_app.auto_rbw
        ch_settings['rbw'] = sa_app.rbw
        settings[chan_label] = ch_settings

    with open(outfile, 'w') as fd:
        json.dump(settings, fd)


def restore_settings(sa_app, infile, chan_list):
    with open(infile) as fd:
        settings = json.load(fd)

    for ch in chan_list:
        chan_label = CHANNEL_LABELS[ch]
        ch_settings = settings[chan_label]
        sa_app.channel = ch
        sa_app.centre_freq = float(ch_settings['centre_freq'])
        sa_app.start_freq = float(ch_settings['start_freq'])
        sa_app.stop_freq = float(ch_settings['stop_freq'])
        sa_app.freq_span = float(ch_settings['freq_span'])
        sa_app.ref_level = float(ch_settings['ref_level'])
        sa_app.ampl_scale = float(ch_settings['ampl_scale'])
        sa_app.atten = float(ch_settings['atten'])
        sa_app.auto_rbw = ch_settings['auto_rbw']
        sa_app.rbw = float(ch_settings['rbw'])


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865

    parser = ArgumentParser(description=
                            "Save/restore settings for the RPSA app.",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-S", "--save", action='store_true',
        help="Save the current RPSA settings")
    parser.add_argument(
        "-R", "--restore", action='store_true',
        help="Restore RPSA settings")
    parser.add_argument(
        "-f", "--settings_file", default=DEFAULT_SETTINGS_FILE,
        help=f"File to save or restore SA settings to or from ({DEFAULT_SETTINGS_FILE})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        Channel settings to save or restore.
        Note that not specifying a channel will save or
        restore both channels.
        '''))

    args = parser.parse_args()
    if args.save and args.restore:
        print("Both 'save' and 'restore' have been specified.")
        print("Please choose just one operation.")
        sys.exit(0)
    if args.channel is not None:
        channels = [int(args.channel) - 1]
    else:
        channels = [0, 1]

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app

    if args.save:
        save_settings(sa_app, args.settings_file, channels)
    else:
        restore_settings(sa_app, args.settings_file, channels)


if __name__ == '__main__':
    main()

Measure/set signal generator output level

This makes use of the Simple RF Signal Generator as the signal source.

Set/measure setup

Figure 10: Setup for setting/measuring signal generator output level

usage: rfgen.py [-h] [-a SAADDR] [-p SAPORT] [-A RFGEN_ADDR] [-P RFGEN_PORT] [--rfgen_chan {1,2}] [--sa_chan {1,2}]
                [--sa_span SA_SPAN] [--sweep_count SWEEP_COUNT] -F FREQ [-L LEVEL]

Measure/set RF signal generator output

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -A RFGEN_ADDR, --rfgen_addr RFGEN_ADDR
                        IP address for the RFGen RPyC service (127.0.0.1)
  -P RFGEN_PORT, --rfgen_port RFGEN_PORT
                        TCP port for the RFGen RPyC service (18864)
  --rfgen_chan {1,2}    Signal generator channel to use. (2)
  --sa_chan {1,2}       Spectrum analyzer channel to use. (1)
  --sa_span SA_SPAN     Spectrum analyzer span to use (in MHz). (0.1)
  --sweep_count SWEEP_COUNT
                        Number of SA sweeps to average measurement over. (5)
  -F FREQ, --freq FREQ  Signal generator frequency (in MHz). (1000.0)
  -L LEVEL, --level LEVEL
                        Signal generator output power, in dBm.

Note that this uses a variation of the RFGenService.set_channel_output method in order to (optionally) set the output level of the signal generator. Instead of using a power detector head connected to the signal generator monitoring port the spectrum analyzer facilities are used to measure the power output. See Control output power using the monitor port for more details on using RFGenService.set_channel_output.

import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import rpyc
from rfblocks import BridgeCoupler


AVERAGING_SWEEPS = 5
MEASUREMENT_SPAN = 0.1


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def sweep(sa_service, sa_app, sweep_count=AVERAGING_SWEEPS):
    rpyc_timeout = sa_service._config['sync_request_timeout']
    sa_service._config['sync_request_timeout'] = None
    sa_app.sweeps(sweep_count)
    sa_service._config['sync_request_timeout'] = rpyc_timeout


def measure_signal_level(sa_service, sa_app, sa_chan, freq,
                         span, sweep_count):
    sa_app.trace_type_normal()
    sa_app.channel = sa_chan
    sa_app.centre_freq = freq
    sa_app.freq_span = span
    sa_app.sweep()
    sa_app.trace_type_avg()
    sweep(sa_service, sa_app, sweep_count)
    sa_app.marker_normal()
    sa_app.peak_search()
    signal_freq, signal_pwr = sa_app.marker_posn
    return signal_freq, signal_pwr


def measure_or_set_signal_level(sa_service, sa_app,
                                rfgen_chan, sa_chan,
                                freq, level, span, sweep_count):
    lo_chan = rfgen_chan  # Default RFGen channel 2
    lo_sa_chan = sa_chan  # Default RPSA 'Chan 1'

    min_pwr, max_pwr = lo_chan.power_range(freq)
    if level < min_pwr:
        p_des = min_pwr
    elif level > max_pwr:
        p_des = max_pwr
    else:
        p_des = level

    if level is not None:
        # Set freq and level, measure actual level
        lo_chan.plo_ctl.freq = freq
        lo_chan.level = level
        lo_chan.configure()

        attempts = 0
        while attempts < 5:
            measured_freq, measured_pwr = measure_signal_level(
                sa_service, sa_app, lo_sa_chan,
                freq, span, sweep_count)
            actual_pwr = (measured_pwr - lo_chan.coupler.coupling(freq)
                          + lo_chan.coupler.insertion_loss(freq))
            delta_p = p_des - actual_pwr
            attempts += 1
            if abs(delta_p) > 0.3:
                lo_chan.level = lo_chan.level + delta_p
                lo_chan.configure()
            else:
                break
    else:
        measured_freq, measured_pwr = measure_signal_level(
            sa_service, sa_app, lo_sa_chan, freq, span, sweep_count)
        actual_pwr = (measured_pwr
                      - lo_chan.coupler.coupling(measured_freq)
                      + lo_chan.coupler.insertion_loss(measured_freq))
    return (measured_freq, actual_pwr)


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_signal_freq = 1000.0
    default_rfgen_chan = '2'
    default_sa_chan = '1'
    default_sa_span = MEASUREMENT_SPAN
    default_sweep_count = AVERAGING_SWEEPS

    parser = ArgumentParser(description=
                            "Measure/set RF signal generator output",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr",
        default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port",
        default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "--rfgen_chan",
        default=default_rfgen_chan, choices=["1", "2"],
        help=f"Signal generator channel to use. ({default_rfgen_chan})")
    parser.add_argument(
        "--sa_chan",
        default=default_sa_chan, choices=["1", "2"],
        help=f"Spectrum analyzer channel to use. ({default_sa_chan})")
    parser.add_argument(
        "--sa_span",
        default=default_sa_span, type=float,
        help=f"Spectrum analyzer span to use (in MHz). ({default_sa_span})")
    parser.add_argument(
        "--sweep_count",
        default=default_sweep_count, type=int,
        help=f"Number of SA sweeps to average measurement over. ({default_sweep_count})")
    parser.add_argument(
        "-F", "--freq", required=True, type=float,
        help=f"Signal generator frequency (in MHz). ({default_signal_freq})")
    parser.add_argument(
        "-L", "--level", type=float,
        help=f"Signal generator output power, in dBm.")

    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root
    rfgen_chan = rfgen_app.channels[f"Chan {args.rfgen_chan}"]
    sa_chan = int(args.sa_chan) - 1
    freq, pwr = measure_or_set_signal_level(sa, sa_app,
                                            rfgen_chan, sa_chan,
                                            args.freq, args.level,
                                            args.sa_span, args.sweep_count)
    print(f"{freq=}, {pwr=}")


if __name__ == '__main__':
    main()

Produce spectrum plots

Example plot

Figure 11: Example plot produced by plot.py

usage: plot.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [-O PLOT_FILE]

Generate spectrum plots from the RPSA app.

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        Channel spectra to plot.  Note that not specifying a channel will
                        cause plots of both channels to be produced.
  -O PLOT_FILE, --plot_file PLOT_FILE
                        File to save the generated plot to (plot.svg)
import sys
from argparse import ArgumentParser, RawTextHelpFormatter
import json
import textwrap

import numpy as np

import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')

import rpyc

from dyadic.splot import init_style
from sa.utils import (
    FREQ_SUFFIXES,
    TRACE_TYPE_NORMAL, TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG,
    TRACE_TYPE_BLANK, TRACE_TYPE_FREEZE
)
from sa.plot import (
    SpectrumPlotWidget, SpectrumMarker
)


DEFAULT_PLOT_FILE = "plot.svg"
ANNOTATION_SIZE = 'x-small'
TITLE_SIZE = "xx-large"
ANNOTATION_COLOUR = "#0f0f0f80"
TITLE_COLOUR = "#0f0f0f40"

MARKER_COLOUR = 'r'
DELTA_MARKER_COLOUR = 'b'
MARKER_SIZE = 4
DOWN_TRIANGLE = 7
UP_TRIANGLE = 6


def format_freq(f: float, prec: int = 3) -> str:
    """f is in MHz
    """
    if abs(f) < 0.001:
        return f"{(f*1e6):4.1f}{FREQ_SUFFIXES[2]}"
    elif abs(f) < 1.0:
        return f"{(f*1e3):6.2f}{FREQ_SUFFIXES[1]}"
    else:
        return f"{f:7.{prec}f}{FREQ_SUFFIXES[0]}"


def draw_marker(sa_app, ax, mkr):
    ampl_scale = sa_app.ampl_scale
    fspan = sa_app.freq_span
    f_offset = fspan * 0.005
    a_offset = ampl_scale * 0.175
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
        mkr_text = f"{mkr.marker_id:d}R"
    else:
        mkr_text = f"{mkr.marker_id:d}"
    if mkr.pwr > (sa_app.ref_level - (ampl_scale * 0.2)):
        ax.plot(mkr.freq, sa_app.ref_level,
                marker=UP_TRIANGLE,
                markersize=MARKER_SIZE,
                color=MARKER_COLOUR)
        ax.annotate(mkr_text,
                    xy=(mkr.freq - f_offset, sa_app.ref_level - (2.0 * a_offset)),
                    fontsize=ANNOTATION_SIZE,
                    color=MARKER_COLOUR)
    else:
        ax.plot(mkr.freq, mkr.pwr,
                marker=DOWN_TRIANGLE,
                markersize=MARKER_SIZE,
                color=MARKER_COLOUR)
        ax.annotate(mkr_text,
                    xy=(mkr.freq - f_offset, mkr.pwr + a_offset),
                    fontsize=ANNOTATION_SIZE,
                    color=MARKER_COLOUR)
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
        if mkr.aux_pwr > (sa_app.ref_level - (sa_app.ampl_scale * 0.2)):
            ax.plot(mkr.aux_freq, sa_app.ref_level,
                    marker=UP_TRIANGLE,
                    markersize=MARKER_SIZE,
                    color=MARKER_COLOUR)
            ax.annotate(f"{mkr.marker_id:d}",
                        xy=(mkr.aux_freq - f_offset,
                            sa_app.ref_level - (2.0 * a_offset)),
                        fontsize=ANNOTATION_SIZE,
                        color=MARKER_COLOUR)
        else:
            ax.plot(mkr.aux_freq, mkr.aux_pwr,
                    marker=DOWN_TRIANGLE,
                    markersize=MARKER_SIZE,
                    color=MARKER_COLOUR)
            ax.annotate(f"{mkr.marker_id:d}",
                        xy=(mkr.aux_freq - f_offset, mkr.aux_pwr + a_offset),
                        fontsize=ANNOTATION_SIZE,
                        color=MARKER_COLOUR)


def annotate_marker(sa_app, ax, mkr):
    if mkr.marker_type in [SpectrumMarker.MKR_TYPE_NORMAL,
                           SpectrumMarker.MKR_TYPE_DELTA]:
        mkr_id = mkr.marker_id
        draw_marker(sa_app, ax, mkr)
        ax.annotate(f"Mkr{mkr_id}: {mkr.pwr:.2f} dBm",
                    xy=(0.74, 0.95),
                    xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE,
                    color=ANNOTATION_COLOUR)
        fspan = sa_app.freq_span
        if fspan < SpectrumPlotWidget.HIGH_PREC_SPAN:
            freq_text = f"          {format_freq(mkr.freq, prec=6)}"
        else:
            freq_text = f"          {format_freq(mkr.freq)}"
        ax.annotate(freq_text,
                    xy=(0.74, 0.91),
                    xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE,
                    color=ANNOTATION_COLOUR)
        if mkr.marker_type in [SpectrumMarker.MKR_TYPE_DELTA]:
            ax.annotate(f"\u0394Mkr{mkr_id}: {mkr.delta_pwr:.2f} dBm",
                        xy=(0.74, 0.87),
                        xycoords="axes fraction",
                        fontsize=ANNOTATION_SIZE,
                        color=ANNOTATION_COLOUR)
            ax.annotate(f"            {format_freq(mkr.delta_freq)}",
                        xy=(0.74, 0.83),
                        xycoords="axes fraction",
                        fontsize=ANNOTATION_SIZE,
                        color=ANNOTATION_COLOUR)


def annotate_plot(sa_app, chan, ax):
    sweep_params = sa_app.data_store().sweep_params
    ax.annotate(f"Ch {chan+1:d}",
                xy=(0.21, 0.92),
                xycoords="axes fraction",
                fontsize=TITLE_SIZE,
                fontweight="bold",
                color=TITLE_COLOUR)
    ax.annotate(f"Start: {format_freq(sa_app.start_freq)}",
                xy=(0.01, -0.05),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Span: {format_freq(sa_app.freq_span)}",
                xy=(0.41, -0.05),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Stop: {format_freq(sa_app.stop_freq)}",
                xy=(0.74, -0.05),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Ref: {sa_app.ref_level:.2f} dBm",
                xy=(0.01, 0.95),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"RBW: {format_freq(sweep_params.rbw/1e6)}",
                xy=(0.01, 0.91),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Atten: {sa_app.atten:.2f} dB",
                xy=(0.41, 0.95),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    trace_type = sa_app.data_store().selected_trace_type
    if trace_type in [TRACE_TYPE_MAX, TRACE_TYPE_MIN, TRACE_TYPE_AVG]:
        avg_count = sa_app.data_store().average_counter
        if trace_type == TRACE_TYPE_AVG:
            avg_text = f"Avg: {avg_count:d}"
        elif trace_type == TRACE_TYPE_MAX:
            avg_text = f"Max: {avg_count:d}"
        elif trace_type == TRACE_TYPE_MIN:
            avg_text = f"Min: {avg_count:d}"
        ax.annotate(avg_text,
                    xy=(0.01, 0.65),
                    xycoords="axes fraction",
                    fontsize=ANNOTATION_SIZE,
                    color=ANNOTATION_COLOUR)
    mkr = sa_app.selected_marker
    annotate_marker(sa_app, ax, mkr)


def plot(sa_app, chan_list, plot_file):
    init_style()
    if len(chan_list) == 2:
        fig = plt.figure(num=None, figsize=(6.0, 6.5), dpi=72)
        ax = [fig.add_subplot(2, 1, 1), fig.add_subplot(2, 1, 2)]
    else:
        fig = plt.figure(num=None, figsize=(6.0, 3), dpi=72)
        ax = [fig.add_subplot(1, 1, 1)]

    for ch in chan_list:
        if len(chan_list) == 1:
            axes = ax[0]
        else:
            axes = ax[ch]
        sa_app.channel = ch

        min_freq = sa_app.start_freq
        max_freq = sa_app.stop_freq
        min_pwr = sa_app.ref_level - 10*sa_app.ampl_scale
        max_pwr = sa_app.ref_level

        data = sa_app.data_store()
        pwr = np.array(data.pwr)
        freq = np.array(data.freq)
        axes.set_xlim(min_freq, max_freq)
        axes.set_xticks(np.linspace(min_freq, max_freq, 11))
        axes.xaxis.set_ticklabels([])
        axes.set_ylim(min_pwr, max_pwr)
        axes.tick_params(axis="x", which="both", direction="in")
        axes.tick_params(axis="y", which="both", length=0.0)
        axes.grid(linestyle=':')
        axes.grid(which='both', axis='x', linestyle=':')
        axes.plot(freq, pwr)
        annotate_plot(sa_app, ch, axes)

    fig.tight_layout()
    plt.savefig(plot_file)


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865

    parser = ArgumentParser(description=
                            "Generate spectrum plots from the RPSA app.",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        Channel spectra to plot.  Note that not specifying a channel will
        cause plots of both channels to be produced.
        '''))
    parser.add_argument(
        "-O", "--plot_file", default=DEFAULT_PLOT_FILE,
        help=f"File to save the generated plot to ({DEFAULT_PLOT_FILE})"
    )

    args = parser.parse_args()
    if args.channel is not None:
        channels = [int(args.channel) - 1]
    else:
        channels = [0, 1]

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    plot(sa_app, channels, args.plot_file)


if __name__ == '__main__':
    main()

Sweep a wideband frequency range

This carries out a wideband sweep by breaking the specified sweep frequency span into smaller spans. Each smaller span is swept multiple times using a slightly different LO setting and the resulting sweeps are averaged. This process serves to remove or greatly attenuate any spurs or images produced by the down conversion.

Example wideband sweep

Figure 12: Example wideband sweep of a 2 GHz signal (-10 dBm)

usage: wideband.py [-h] [-a SAADDR] [-p SAPORT] -s START_FREQ -e STOP_FREQ [-S SECTION_SPAN] [-R RBW] [-D {NORMAL,SAMPLE,POS,NEG}]
                   [-T {LINEAR,RANDOM}] [-C SWEEP_COUNT] [-P PLOT_FILE] [-O OUTPUT_FILE]

Generate a wideband spectrum plot

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -s START_FREQ, --start_freq START_FREQ
                        Start frequency for the wideband sweep
  -e STOP_FREQ, --stop_freq STOP_FREQ
                        Stop frequency for the wideband sweep
  -S SECTION_SPAN, --section_span SECTION_SPAN
                        Frequency span of each section of the wideband sweep.
                        Default: 20.0
  -R RBW, --rbw RBW     RBW of the sweeps (in Hz).
  -D {NORMAL,SAMPLE,POS,NEG}, --detector {NORMAL,SAMPLE,POS,NEG}
                        The detector type to use for the wideband sweep.
                        Default: NORMAL
  -T {LINEAR,RANDOM}, --sweep_type {LINEAR,RANDOM}
                        The type of sweep for each section of the wideband sweep.
                        Default: RANDOM
  -C SWEEP_COUNT, --sweep_count SWEEP_COUNT
                        The number of sweeps to average for each section of the wideband sweep.
                        Default: 40
  -P PLOT_FILE, --plot_file PLOT_FILE
                        The file to write the plot of the wideband sweep.
                        Default: wb-plot.svg
  -O OUTPUT_FILE, --output_file OUTPUT_FILE
                        The file to write the wideband sweep data to.
from math import floor
import random
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np

import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')

import rpyc

from dyadic.splot import init_style
from sa.utils import (
    FREQ_SUFFIXES,
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS,
    DETECTOR_TYPE_NEG
)


LINEAR = 0
RANDOM = 1

WIDEBAND_SWEEP_BINS = 1000
DETECTOR_TYPES = {
    "NORMAL": DETECTOR_TYPE_NORMAL,
    "SAMPLE": DETECTOR_TYPE_SAMPLE,
    "POS": DETECTOR_TYPE_POS,
    "NEG": DETECTOR_TYPE_NEG}

ANNOTATION_SIZE = 'x-small'
TITLE_SIZE = "xx-large"
ANNOTATION_COLOUR = "#0f0f0f80"
TITLE_COLOUR = "#0f0f0f40"


def format_freq(f: float, prec: int = 3) -> str:
    """f is in MHz
    """
    if abs(f) < 0.001:
        return f"{(f*1e6):4.1f}{FREQ_SUFFIXES[2]}"
    elif abs(f) < 1.0:
        return f"{(f*1e3):6.2f}{FREQ_SUFFIXES[1]}"
    else:
        return f"{f:7.{prec}f}{FREQ_SUFFIXES[0]}"


def peak_detect(pwr, resampled_idx, window_size):
    """Find peaks within a spectrum prior to resampling.

    :param pwr: A numpy array containing the spectrum power values
             as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values. Each value corresponds to
             a value in the resampled spectrum and is True if the
             associated power value is a peak and False if the value
             is noise.
    """

    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    def stats(arr):
        return np.mean(arr), np.std(arr)

    j = k = window_size//2
    if window_size % 2:
        k += 1
    means = []
    stds = []
    for i in resampled_idx:
        if i - j < 0:
            m, s = stats(pwr[0:k])
        elif i + k >= len(pwr):
            m, s = stats(pwr[i-j:])
        else:
            m, s = stats(pwr[i-j:i+k])
        means.append(m)
        stds.append(s)
    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)
    bins = len(resampled_idx)
    if baseline_std == 0 or baseline_mean == 0:
        is_peak = np.ones(bins, dtype=bool)
    else:
        is_peak = np.where(
            ((stds/baseline_std > STD_THRESHOLD)
             & (means/baseline_mean > MEAN_THRESHOLD)),
            np.ones(bins, dtype=bool),
            np.zeros(bins, dtype=bool))
    return is_peak


def detector(detect_type, freq, pwr, bins):
    """Resample a power spectrum.

    :param detect_type: The detector type to use when resampling.
                       One of DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG,
                       DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_NORMAL.
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int
    """

    def max_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.max()

    def min_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.min()

    def sample_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return np.take(a, a.size // 2)

    f_start = freq[0]
    f_stop = freq[-1]

    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1
    resampled_freqs = np.linspace(f_start, f_stop, bins)
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]
    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        for i in indexes:
            resampled_pwr.append(max_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NEG:
        for i in indexes:
            resampled_pwr.append(min_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        for i in indexes:
            resampled_pwr.append(sample_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = peak_detect(pwr, indexes, window_size)
        for i in range(bins):
            if is_peak[i]:
                resampled_pwr.append(max_pwr(pwr, indexes[i]))
            else:
                if i % 2:
                    # Odd
                    resampled_pwr.append(min_pwr(pwr, indexes[i]))
                else:
                    resampled_pwr.append(max_pwr(pwr, indexes[i]))

    return (resampled_freqs, resampled_pwr)


def set_span(sa_app, freq_span, rbw, detector):
    if freq_span > 45.0:
        freq_span = 45.0
    elif freq_span < 5.0:
        freq_span = 5.0
    if freq_span < 20.01:
        offset_step = 1.0
    elif freq_span < 40.01:
        offset_step = 0.5
    else:
        offset_step = 0.25
    sa_app.trace_type_normal()
    sa_app.detector_type = detector
    sa_app.freq_span = freq_span
    if rbw is not None:
        sa_app.rbw = rbw
    else:
        sa_app.auto_rbw = True
    sa_app.sweep()
    return offset_step


def sweep(sa_app, centre_freq,
          sweep_type=LINEAR, sweep_count=20, offset_step=1.0):
    sa_app.trace_type_normal()
    sa_app.centre_freq = centre_freq
    sa_app.sweep()
    max_offset = sa_app.max_felo_offset
    sa_app.trace_type_avg()
    if sweep_type == RANDOM:
        for offset in [0.25 + (random.random() * (max_offset - 0.25))
                       for i in range(sweep_count)]:
            sa_app.felo_offset = float(offset)
            sa_app.sweep()
    else:
        for offset in np.arange(max_offset, 0.0, -offset_step):
            sa_app.felo_offset = float(offset)
            sa_app.sweep()
    sa_app.trace_type_normal()
    data = sa_app.data_store()
    return (np.array(data.freq), np.array(data.pwr))


def wideband_sweep(sa_app, start_freq, stop_freq,
                   section_span, rbw, detector, **kwargs):
    offset_step = set_span(sa_app, section_span, rbw, detector)
    centre_freqs = np.arange(start_freq + (section_span/2), stop_freq,
                             section_span)
    freq = np.array([])
    pwr = np.array([])
    for centre_freq in centre_freqs:
        f, p = sweep(sa_app, centre_freq, offset_step=offset_step, **kwargs)
        freq = np.concatenate((freq, f))
        pwr = np.concatenate((pwr, p))
    return (freq, pwr)


def annotate_plot(sa_app, ax, rbw):
    chan = sa_app.channel
    if rbw is None:
        rbw = sa_app.rbw
    ax.annotate(f"Ch {chan+1:d}",
                xy=(0.21, 0.92),
                xycoords="axes fraction",
                fontsize=TITLE_SIZE,
                fontweight="bold",
                color=TITLE_COLOUR)
    ax.annotate(f"Start: {format_freq(sa_app.start_freq)}",
                xy=(0.01, 0.05),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Stop: {format_freq(sa_app.stop_freq)}",
                xy=(0.74, 0.05),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Ref: {sa_app.ref_level:.2f} dBm",
                xy=(0.01, 0.95),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"RBW: {format_freq(rbw/1e6)}",
                xy=(0.01, 0.91),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)
    ax.annotate(f"Atten: {sa_app.atten:.2f} dB",
                xy=(0.41, 0.95),
                xycoords="axes fraction",
                fontsize=ANNOTATION_SIZE,
                color=ANNOTATION_COLOUR)


def plot(sa_app, min_freq, max_freq, rbw, freq, pwr, plot_file):
    init_style()
    fig = plt.figure(num=None, figsize=(6.0, 3), dpi=72)
    axes = fig.add_subplot(1, 1, 1)

    min_pwr = sa_app.ref_level - 10*sa_app.ampl_scale
    max_pwr = sa_app.ref_level

    axes.set_xlim(min_freq, max_freq)
    #axes.set_xticks(np.linspace(min_freq, max_freq, 11))
    # axes.xaxis.set_ticklabels([])
    axes.set_ylim(min_pwr, max_pwr)
    #axes.set_yticks(np.linspace(min_pwr, max_pwr, 11))
    axes.tick_params(axis="x", which="both", direction="in")
    axes.tick_params(axis="y", which="both", length=0.0)
    axes.grid(linestyle=':')
    axes.grid(which='both', axis='x', linestyle=':')
    axes.plot(freq, pwr)

    annotate_plot(sa_app, axes, rbw)

    fig.tight_layout()
    plt.savefig(plot_file)


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_sweep_type = "RANDOM"
    default_sweep_count = 40
    default_section_span = 20.0
    default_plot_file = "wb-plot.svg"
    default_detector_type = "NORMAL"

    parser = ArgumentParser(description=
                            "Generate a wideband spectrum plot",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-s", "--start_freq", type=float, required=True,
        help="Start frequency for the wideband sweep")
    parser.add_argument(
        "-e", "--stop_freq", type=float, required=True,
        help="Stop frequency for the wideband sweep")
    parser.add_argument(
        "-S", "--section_span", type=float, default=default_section_span,
        help=textwrap.dedent(f'''\
        Frequency span of each section of the wideband sweep.
        Default: {default_section_span}'''))
    parser.add_argument(
        "-R", "--rbw", type=float, default=None,
       help="RBW of the sweeps (in Hz).")
    parser.add_argument(
        "-D", "--detector",
        choices=["NORMAL", "SAMPLE", "POS", "NEG"],
        default=default_detector_type,
        help=textwrap.dedent(f'''\
        The detector type to use for the wideband sweep.
        Default: {default_detector_type}'''))
    parser.add_argument(
        "-T", "--sweep_type", choices=["LINEAR", "RANDOM"],
        default=default_sweep_type,
        help=textwrap.dedent(f'''\
        The type of sweep for each section of the wideband sweep.
        Default: {default_sweep_type}'''))
    parser.add_argument(
        "-C", "--sweep_count", type=int, default=default_sweep_count,
        help=textwrap.dedent(f'''\
        The number of sweeps to average for each section of the wideband sweep.
        Default: {default_sweep_count}'''))
    parser.add_argument(
        "-P", "--plot_file", default=default_plot_file,
        help=textwrap.dedent(f'''\
        The file to write the plot of the wideband sweep.
        Default: {default_plot_file}'''))
    parser.add_argument(
        "-O", "--output_file",
        help="The file to write the wideband sweep data to.")

    args = parser.parse_args()

    if args.sweep_type == "RANDOM":
        sweep_type = RANDOM
    else:
        sweep_type = LINEAR

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app

    raw_freq, raw_pwr = wideband_sweep(
        sa_app, args.start_freq, args.stop_freq,
        args.section_span, args.rbw, detector=DETECTOR_TYPES[args.detector],
        sweep_type=sweep_type, sweep_count=args.sweep_count)
    freq, pwr = detector(DETECTOR_TYPES[args.detector],
                         raw_freq, raw_pwr, WIDEBAND_SWEEP_BINS)
    if args.output_file is not None:
        pass
    plot(sa_app, args.start_freq, args.stop_freq, args.rbw,
         freq, pwr, args.plot_file)


if __name__ == '__main__':
    main()

Measure phase noise

Example phase noise plot

Figure 13: Example phase noise plot for a 1 GHz signal.

usage: pn.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [-P PLOT_FILE] [-T PLOT_TITLE] [-R] carrier_freq

Measure phase noise

positional arguments:
  carrier_freq          Frequency of the carrier tone (in MHz)

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        Channel spectra for phase noise measurement (1).
  -P PLOT_FILE, --plot_file PLOT_FILE
                        Write the phase noise plot to this file (pn.svg)
  -T PLOT_TITLE, --plot_title PLOT_TITLE
                        Title to use for the phase noise plot
  -R, --resample        Resample the phase noise plot.
import sys
from math import log10, floor
from dataclasses import dataclass
from argparse import ArgumentParser, RawTextHelpFormatter
import json

import numpy as np

import matplotlib
import matplotlib.pyplot as plt
matplotlib.use('Agg')
from dyadic.splot import init_style

import rpyc

from sa.utils import (
    FREQ_SUFFIXES,
    DETECTOR_TYPE_NORMAL, DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_POS,
    DETECTOR_TYPE_NEG
)


RESAMPLED_SWEEP_BINS = 1000

# Phase noise sweeps are specified as tuples with the following values:
#
#  span (MHz),
#  centre freq. offset (from carrier, MHz)
#  RBW (Hz)
#  Trace averages
#  Frequency of first sample bin (in MHz, offset from carrier)
#  Frequency of last sample bin (in MHz, offset from carrier)
#
# For example: (1.0, 0.45, 2000, 30, 0.25, 0.5)
# Uses a spectrum analyzer frequency span of 1 MHz, places the
# centre frequency of the analyzer as (carrier freq.) + 0.45 MHz,
# sets the analyzer RBW to 2000 Hz, averages over 30 sweeps.
# The samples extracted from the resulting spectrum are taken
# from 0.25 to 0.5 MHz (offset from carrier frequency).
#
pn_sweeps = [
    (2.0, 0.9, 2000, 30, 0.5, 1.5),
    (1.0, 0.45, 2000, 30, 0.25, 0.5),
    (0.5, 0.24, 1000, 30, 0.1, 0.25),
    (0.2, 0.095, 500, 30, 0.05, 0.1),
    (0.1, 0.045, 200, 20, 0.025, 0.05),
    (0.05, 0.022, 200, 20, 0.01, 0.025),
    (0.02, 0.008, 100, 20, 0.005, 0.01),
    (0.01, 0.004, 20, 20, 0.002, 0.005),
    (0.005, 0.002, 20, 20, 0.001, 0.002),
    (0.002, 0.0008, 10, 10, 0.0005, 0.001),
    (0.001, 0.0004, 5, 10, 0.00025, 0.0005),
    (0.0005, 0.0002, 5, 10, 0.0001, 0.00025)
]


@dataclass
class SweepParameters:
    span: float
    freq_offset: float
    rbw: float
    trace_avg: int
    sample_low: float
    sample_high: float


def peak_detect(pwr, resampled_idx, window_size):
    """Find peaks within a spectrum prior to resampling.

    :param pwr: A numpy array containing the spectrum power values
             as computed by the :py:meth:`power_spectrum` function.
    :type pwr: nparray
    :param resampled_idx: Array indexes for the resampled spectrum
    :type resampled_idx: List[int]
    :param window_size: The window size for the resampled spectrum
    :type window_size: int

    :return: A numpy array of bool values. Each value corresponds to
             a value in the resampled spectrum and is True if the
             associated power value is a peak and False if the value
             is noise.
    """

    NOISE_PERCENTILE = 50
    STD_THRESHOLD = 4.0
    MEAN_THRESHOLD = 4.0

    def stats(arr):
        return np.mean(arr), np.std(arr)

    j = k = window_size//2
    if window_size % 2:
        k += 1
    means = []
    stds = []
    for i in resampled_idx:
        if i - j < 0:
            m, s = stats(pwr[0:k])
        elif i + k >= len(pwr):
            m, s = stats(pwr[i-j:])
        else:
            m, s = stats(pwr[i-j:i+k])
        means.append(m)
        stds.append(s)
    baseline_mean = np.percentile(means, NOISE_PERCENTILE)
    baseline_std = np.percentile(stds, NOISE_PERCENTILE)
    bins = len(resampled_idx)
    if baseline_std == 0 or baseline_mean == 0:
        is_peak = np.ones(bins, dtype=bool)
    else:
        is_peak = np.where(
            ((stds/baseline_std > STD_THRESHOLD)
             & (means/baseline_mean > MEAN_THRESHOLD)),
            np.ones(bins, dtype=bool),
            np.zeros(bins, dtype=bool))
    return is_peak


def detector(detect_type, freq, pwr, bins):
    """Resample a power spectrum.

    :param detect_type: The detector type to use when resampling.
                       One of DETECTOR_TYPE_POS, DETECTOR_TYPE_NEG,
                       DETECTOR_TYPE_SAMPLE, DETECTOR_TYPE_NORMAL.
    :type detect_type: int.
    :param freq: Spectrum frequency array
    :type freq: nparray
    :param pwr: Spectrum power array
    :type pwr: nparray
    :param bins: The number of bins required in the resampled spectrum.
    :type bins: int
    """

    def max_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.max()

    def min_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return a.min()

    def sample_pwr(arr, idx):
        if idx - j < 0:
            a = arr[0:k]
        elif idx + k >= len(pwr):
            a = arr[idx-j:]
        else:
            a = arr[idx-j:idx+k]
        return np.take(a, a.size // 2)

    f_start = freq[0]
    f_stop = freq[-1]

    window_size = floor(len(freq)/bins)
    j = k = window_size//2
    if window_size % 2:
        k += 1
    resampled_freqs = np.linspace(f_start, f_stop, bins)
    indexes = [(np.abs(freq-f)).argmin() for f in resampled_freqs]
    resampled_pwr = []
    if detect_type == DETECTOR_TYPE_POS:
        for i in indexes:
            resampled_pwr.append(max_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NEG:
        for i in indexes:
            resampled_pwr.append(min_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_SAMPLE:
        for i in indexes:
            resampled_pwr.append(sample_pwr(pwr, i))
    elif detect_type == DETECTOR_TYPE_NORMAL:
        is_peak = peak_detect(pwr, indexes, window_size)
        for i in range(bins):
            if is_peak[i]:
                resampled_pwr.append(max_pwr(pwr, indexes[i]))
            else:
                if i % 2:
                    # Odd
                    resampled_pwr.append(min_pwr(pwr, indexes[i]))
                else:
                    resampled_pwr.append(max_pwr(pwr, indexes[i]))

    return (resampled_freqs, resampled_pwr)


def find_nearest_index(array, value):
    return (np.abs(array - value)).argmin()


def initialize(sa_app, sa_chan):
    sa_app.channel = sa_chan
    sa_app.fe_atten = 0.0
    sa_app.ampl_scale = 12.0
    sa_app.detector_type_pos()
    sa_app.auto_rbw = False
    sa_app.marker_normal()


def sweep(sa_app, f_centre, f_span, rbw, sweep_count=20):
    sa_app.freq_span = f_span
    sa_app.centre_freq = f_centre
    sa_app.rbw = rbw
    sa_app.trace_type_normal()
    sa_app.sweep()
    if sweep_count > 1:
        sa_app.trace_type_avg()
        sa_app.sweeps(sweep_count)
    sa_app.peak_search()
    return sa_app.marker_posn


def measure_pn(sa_app, peak_pwr, rbw, f_low, f_high):
    data = sa_app.data_store()
    freq_arr = np.array(data.freq)
    pwr_arr = np.array(data.pwr)
    pn_arr = (pwr_arr - peak_pwr) - 10*log10(rbw)
    idx_low = find_nearest_index(freq_arr, f_low)
    idx_high = find_nearest_index(freq_arr, f_high)
    return freq_arr[idx_low:idx_high], pn_arr[idx_low:idx_high]


def create_fig(title):
    fig = plt.figure(figsize=(6.0, 4.0), dpi=72)
    ax = fig.add_subplot(111)
    if len(title):
        _ = ax.set_title(title)
    _ = ax.set_xlim(100, 2e6)
    _ = ax.set_xscale('log')
    _ = ax.set_xticks([100, 200, 500, 1e3, 2e3, 5e3, 1e4, 2e4, 5e4,
                       1e5, 2e5, 5e5, 1e6, 2e6])
    _ = ax.set_xticklabels(['100', '', '', '1K', '', '', '10K', '', '',
                            '100K', '', '', '1M', ''])
    _ = ax.set_xlabel('Frequency (Hz)')
    _ = ax.set_ylim(-160, -60)
    _ = ax.set_yticks(np.linspace(-160, -60, 11))
    _ = ax.set_yticklabels(['', '-150', '-140', '-130', '-120',
                            '-110', '-100', '-90', '-80', '-70', '-60'])
    _ = ax.set_ylabel('Phase Noise (dBc/Hz)')
    ax.grid(linestyle=':')
    ax.grid(which='both', axis='x', linestyle=':')
    return fig, ax


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_sa_chan = "1"
    default_plotfile = "pn.svg"
    default_savefile = "pn.json"

    parser = ArgumentParser(description=
                            "Measure phase noise",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "carrier_freq",
        help="Frequency of the carrier tone (in MHz)")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"], default=default_sa_chan,
        help=f"Channel spectra for phase noise measurement ({default_sa_chan}).")
    parser.add_argument(
        "-P", "--plot_file",
        default=default_plotfile,
        help=f"Write the phase noise plot to this file ({default_plotfile})")
    parser.add_argument(
        "-S", "--save_file",
        default=default_savefile,
        help=f"Write phase noise data to this file ({default_savefile})")
    parser.add_argument(
        "-T", "--plot_title",
        help=f"Title to use for the phase noise plot")
    parser.add_argument(
        "-R", "--resample", action='store_true',
        help="Resample the phase noise plot.")

    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    sa_chan = int(args.channel) - 1

    freq_arr = np.array([])
    pn_arr = np.array([])
    initialize(sa_app, sa_chan)
    f_sig, p_sig = sweep(sa_app, float(args.carrier_freq), 2.0, 5000, 1)
    for pn_sweep in pn_sweeps:
        sweep_params = SweepParameters(*pn_sweep)
        f_sig, p_sig = sweep(sa_app, f_sig + sweep_params.freq_offset,
                             sweep_params.span, sweep_params.rbw,
                             sweep_params.trace_avg)
        sample_low = f_sig + sweep_params.sample_low
        sample_high = f_sig + sweep_params.sample_high
        f_arr, p_arr = measure_pn(sa_app, p_sig, sweep_params.rbw,
                                  sample_low, sample_high)
        freq_arr = np.insert(freq_arr, 0, f_arr)
        pn_arr = np.insert(pn_arr, 0, p_arr)

    # Convert the frequencies to offsets from the carrier.
    # Note that we use the last measured value of the carrier
    # frequency peak
    freq_arr -= f_sig
    if args.resample is True:
        freq_arr, pn_arr = detector(DETECTOR_TYPE_NORMAL,
                                     freq_arr, pn_arr, RESAMPLED_SWEEP_BINS)

    with open(args.save_file, 'w') as fd:
        json.dump({'freq': list(freq_arr), 'pn': list(pn_arr)}, fd)

    fig, ax = create_fig(args.plot_title)
    _ = ax.plot(freq_arr * 1e6, pn_arr)
    fig.tight_layout()
    fig.savefig(args.plot_file)


if __name__ == '__main__':
    main()

Measure IM3 and IP3

This makes use of the Simple RF Signal Generator and the Low Spurious RF Signal Generator to produce both the two tone test signal and the mixer LO signal.

An external resistive bridge coupler is used to combine tones from channel 1 of the RFGen and the low spurious signal generator. Two generators are used in this configuration in order to keep channel cross talk to a minimum.

LO signal power is taken from channel 2 of the RFGen signal generator with the signal power being measured from the channel 2 monitor port.

Note that when using a down converter RF front end for the spectrum analyzer, mixer spurs and images may become a problem. In order to manage this the --im3_lo_offset command line parameter is provided in order to shift any such spurious mixer products away from the signal of interest. The same thing can be done for the fundamental tones by using the --lo_offset command line parameter.

IM3 down conv. test setup

Figure 14a: Test setup for measuring mixer down conversion IM3 and IP3.

IM3 up conv. test setup

Figure 14b: Test setup for measuring mixer up conversion IM3 and IP3.

usage: ip3.py [-h] [-a SAADDR] [-p SAPORT] [-C {1,2}] [--fe_atten FE_ATTEN] [--im3_span IM3_SPAN] [--tone_span TONE_SPAN]
              [--lo_freq LO_FREQ] [--lo_offset LO_OFFSET] [--im3_lo_offset IM3_LO_OFFSET]
              lower_tone upper_tone

Measure intermodulation distortion and IP3

positional arguments:
  lower_tone            Frequency of the lower fundamental tone (in MHz)
  upper_tone            Frequency of the upper fundamental tone (in MHz)

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -C {1,2}, --channel {1,2}
                        The target channel for the measurement.
  --fe_atten FE_ATTEN   Front end attenuation to use ({default_fe_atten} dB)
  --im3_span IM3_SPAN   SA frequency span when measuring IM3 amplitudes (in MHz)
  --tone_span TONE_SPAN
                        SA frequency span when measuring tone amplitudes (in MHz)
  --lo_freq LO_FREQ     LO frequency (in MHz). Used to adjust reported frequencies.
                        Positive LO frequencies imply low side injection.
                        Negative LO frequencies imply high side injection
  --lo_offset LO_OFFSET
                        SA LO offset (in MHz) when using a down converter RF frontend
  --im3_lo_offset IM3_LO_OFFSET
                        SA LO offset (in MHz) when measuring IM3 amplitudes using
                        a down converter RF frontend

Code for the ip3.py script:

import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import rpyc


AVERAGING_SWEEPS = 10


def save_settings(sa_app):
    settings = {}

    settings['centre_freq'] = sa_app.centre_freq
    settings['start_freq'] = sa_app.start_freq
    settings['stop_freq'] = sa_app.stop_freq
    settings['freq_span'] = sa_app.freq_span
    settings['ref_level'] = sa_app.ref_level
    settings['ampl_scale'] = sa_app.ampl_scale
    settings['atten'] = sa_app.atten
    settings['auto_rbw'] = sa_app.auto_rbw
    settings['rbw'] = sa_app.rbw
    settings['detector_type'] = sa_app.detector_type
    settings['trace_type'] = sa_app.trace_type
    if sa_app.chan_type_is_direct is False:
        settings['fe_atten'] = sa_app.fe_atten
    return settings


def restore_settings(sa_app, settings):
    sa_app.centre_freq = settings['centre_freq']
    sa_app.start_freq = settings['start_freq']
    sa_app.stop_freq = settings['stop_freq']
    sa_app.freq_span = settings['freq_span']
    sa_app.ref_level = settings['ref_level']
    sa_app.ampl_scale = settings['ampl_scale']
    sa_app.atten = settings['atten']
    sa_app.auto_rbw = settings['auto_rbw']
    sa_app.rbw = settings['rbw']
    sa_app.detector_type = settings['detector_type']
    sa_app.trace_type = settings['trace_type']
    if sa_app.chan_type_is_direct is False:
        sa_app.fe_atten = settings['fe_atten']


def sweep(sa_service, sa_app, sweep_count=AVERAGING_SWEEPS):
    rpyc_timeout = sa_service._config['sync_request_timeout']
    sa_service._config['sync_request_timeout'] = None
    sa_app.sweeps(sweep_count)
    sa_service._config['sync_request_timeout'] = rpyc_timeout


def measure_tone(sa_service, sa_app, freq, span, sweep_count=AVERAGING_SWEEPS):
    sa_app.centre_freq = freq
    sa_app.freq_span = span
    sa_app.sweep()
    sa_app.trace_type_avg()
    sweep(sa_service, sa_app, sweep_count)
    sa_app.marker_normal()
    sa_app.peak_search()
    tone_freq, tone_pwr = sa_app.marker_posn
    return tone_freq, tone_pwr


def measure_ip3(sa_service, sa_app,
                lower: float, upper: float,
                fe_atten=0.0, rbw=None,
                tone_span=None, im3_span=None, lo_offset=None,
                im3_lo_offset=None, lo_freq=None) -> str:

    LOWER = 0
    UPPER = 1
    PWR = 1
    FREQ = 0

    tone_separation = upper - lower
    lower_im3 = lower - tone_separation
    upper_im3 = upper + tone_separation

    if rbw is None:
        sa_app.auto_rbw = True
    else:
        sa_app.rbw = rbw
    if sa_app.chan_type_is_direct is False:
        sa_app.fe_atten = fe_atten
    if lo_offset is not None:
        sa_app.felo_offset = lo_offset

    if tone_span is None:
        tone_span = tone_separation
    tone1_freq, tone1_pwr = measure_tone(sa_service, sa_app,
                                         upper, tone_span, sweep_count=5)
    tone2_freq, tone2_pwr = measure_tone(sa_service, sa_app,
                                         lower, tone_span, sweep_count=5)
    if tone1_freq < tone2_freq:
        fundamentals = [(tone1_freq, tone1_pwr), (tone2_freq, tone2_pwr)]
    else:
        fundamentals = [(tone2_freq, tone2_pwr), (tone1_freq, tone1_pwr)]

    if im3_span is None:
        im3_span = tone_separation / 2

    sa_app.freq_span = im3_span
    sa_app.centre_freq = lower_im3
    if im3_lo_offset is not None:
        sa_app.felo_offset = im3_lo_offset
    sa_app.sweeps(1)
    sweep(sa_service, sa_app)
    sa_app.peak_search()
    im3_freq, im3_pwr = sa_app.marker_posn
    im3 = [(im3_freq, im3_pwr)]
    sa_app.centre_freq = upper_im3
    if im3_lo_offset is not None:
        sa_app.felo_offset = im3_lo_offset
    sa_app.sweeps(1)
    sweep(sa_service, sa_app)
    sa_app.peak_search()
    im3_freq, im3_pwr = sa_app.marker_posn
    im3.append((im3_freq, im3_pwr))
    ip3 = [
        fundamentals[LOWER][PWR] + ((fundamentals[UPPER][PWR] - im3[LOWER][PWR]) / 2),
        fundamentals[UPPER][PWR] + ((fundamentals[LOWER][PWR] - im3[UPPER][PWR]) / 2)
    ]

    rf1 = fundamentals[LOWER][FREQ]
    rf2 = fundamentals[UPPER][FREQ]
    im3_f1 = im3[LOWER][FREQ]
    im3_f2 = im3[UPPER][FREQ]
    if lo_freq is not None:
        if lo_freq > 0.0:
            rf1 += lo_freq
            rf2 += lo_freq
            im3_f1 += lo_freq
            im3_f2 += lo_freq
        else:
            rf1 = (-lo_freq) - rf1
            rf2 = (-lo_freq) - rf2
            im3_f1 = (-lo_freq) - im3_f1
            im3_f2 = (-lo_freq) - im3_f2

    return f"{rf1:.2f}:{fundamentals[LOWER][PWR]:.2f} {rf2:.2f}:{fundamentals[UPPER][PWR]:.2f} {im3_f1:.2f}:{im3[LOWER][PWR]:.2f} {im3_f2:.2f}:{im3[UPPER][PWR]:.2f} {ip3[LOWER]:.2f} {ip3[UPPER]:.2f}  {fe_atten:.1f}"


def main():
    default_rpyc_addr = "127.0.0.1"
    default_rpyc_port = 18865
    default_fe_atten = 0.0

    parser = ArgumentParser(description=
                            "Measure intermodulation distortion and IP3",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpyc_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpyc_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpyc_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpyc_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        help=textwrap.dedent('''\
        The target channel for the measurement.
        '''))
    parser.add_argument(
        "lower_tone",
        help="Frequency of the lower fundamental tone (in MHz)")
    parser.add_argument(
        "upper_tone",
        help="Frequency of the upper fundamental tone (in MHz)")
    parser.add_argument(
        "--fe_atten", type=float, default=default_fe_atten,
        help="Front end attenuation to use ({default_fe_atten} dB)")
    parser.add_argument(
        "--im3_span", type=float,
        help="SA frequency span when measuring IM3 amplitudes (in MHz)")
    parser.add_argument(
        "--tone_span", type=float,
        help="SA frequency span when measuring tone amplitudes (in MHz)")
    parser.add_argument(
        "--lo_freq", type=float,
        help=textwrap.dedent('''\
        LO frequency (in MHz). Used to adjust reported frequencies.
        Positive LO frequencies imply low side injection.
        Negative LO frequencies imply high side injection'''))
    parser.add_argument(
        "--lo_offset", type=float,
        help=textwrap.dedent('''\
        SA LO offset (in MHz) when using a down converter RF frontend'''))
    parser.add_argument(
        "--im3_lo_offset", type=float,
        help=textwrap.dedent('''\
        SA LO offset (in MHz) when measuring IM3 amplitudes using
        a down converter RF frontend'''))

    args = parser.parse_args()

    chan = 0
    if args.channel is not None:
        chan = int(args.channel) - 1

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app

    sa_app.channel = chan
    settings = save_settings(sa_app)
    im3_str = measure_ip3(
        sa,
        sa_app,
        float(args.lower_tone), float(args.upper_tone),
        tone_span=args.tone_span, im3_span=args.im3_span, fe_atten=args.fe_atten,
        lo_offset=args.lo_offset, im3_lo_offset=args.im3_lo_offset,
        lo_freq=args.lo_freq)

    print(f"{im3_str}")

    restore_settings(sa_app, settings)
    sa_app.sweep()


if __name__ == '__main__':
    main()

Functions from ip3.py and rfgen.py (from Measure/set signal generator output level) can be used in a script to characterise mixer IP3/IM3 over a range of frequencies.

The ip3-downconv.py is intended to characterise IP3/IM3 for mixer down conversion. See Figure 14a for the test setup. Note that the single conversion down converter is configured to make use of one channel only:

python frontend -d socket://192.168.0.155:7006 -F frontend/config/hw-single-chan-config.json

The IF output is base band and is connected directly to the RPSA.

import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np
import rpyc
from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID,
    InstrumentMgr, InstrumentInitializeException,
    UnknownInstrumentModelException
)
from ip3 import measure_ip3
from rfgen import measure_or_set_signal_level

TONE_SEPARATION = 2.0


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def read_ip3_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()

    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, tone1_pwr, tone2_pwr, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = ((tone_freq, float(tone1_pwr)),
                                   (tone_freq+TONE_SEPARATION, float(tone2_pwr)),
                                   float(lo_freq), lo_pwr_array)
    return settings


def main():

    default_im3_span = 0.1
    default_lo_offset = 12.5
    default_im3_lo_offset = 15.5

    parser = ArgumentParser(description=
                            "Measure mixer IMD and IP3",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )
    parser.add_argument(
        "--im3_span", type=float, default=default_im3_span,
        help=f"SA frequency span when measuring IM3 amplitudes (Default: {default_im3_span} in MHz)")
    parser.add_argument(
        "--lo_offset", type=float,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when using a down converter RF frontend
        (Default: {default_lo_offset})'''))
    parser.add_argument(
        "--im3_lo_offset", type=float,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when measuring IM3 amplitudes using
        a down converter RF frontend. (Default: {default_im3_lo_offset})'''))

    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect('127.0.0.1', 18864)
    lowspur_rfgen = rpyc.connect('127.0.0.1', 18867)
    rfgen_app = rfgen.root

    rfgen_chan = rfgen_app.channels['Chan 1']
    sa_tone_chan = 1
    rfgen_lo_chan = rfgen_app.channels['Chan 2']
    sa_lo_chan = 0
    lowspur_chan = lowspur_rfgen.root.plo_chan

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_ip3_settings('ip3-downconv-settings.txt')

    for tone_str in ['850']:
        tone1, tone2, lo_freq, lo_pwrs = settings[tone_str]
        lock_status = set_rf(rfgen_chan, tone1[0], tone1[1])
        lock_status = set_rf(lowspur_chan, tone2[0], tone2[1])
        for lo_pwr in lo_pwrs:
            if args.smhu_lo is True:
                smhu.output = True
                smhu.freq = float(lo_freq)
                smhu.level = float(lo_pwr)
                time.sleep(0.25)
                freq, pwr = lo_freq, lo_pwr
            else:
                _, lo_pwr = measure_or_set_signal_level(
                    sa, sa_app,
                    rfgen_lo_chan, sa_lo_chan,
                    lo_freq, float(lo_pwr), 0.1, 5)
            sa_app.channel = sa_tone_chan
            #
            # Note: currently set up to measure IP3 for down conversion
            # to base band so lower and upper tones are 30.0 and 32.0 MHz
            #
            im3_str = measure_ip3(sa, sa_app,
                                  30.0, 32.0, tone_span=None,
                                  im3_span=args.im3_span, fe_atten=15.0,
                                  lo_offset=args.lo_offset,
                                  im3_lo_offset=args.im3_lo_offset,
                                  lo_freq=lo_freq)
            print(f"{im3_str} {lo_pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()

The ip3-upconv.py is intended to characterise IP3/IM3 for mixer up conversion. See Figure 14b for the test setup. In this case the single conversion down converter front end is configured for both channels, channel 1 for the LO signal, and channel 2 for the upconverted RF signal.

import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np
import rpyc
from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID,
    InstrumentMgr, InstrumentInitializeException,
    UnknownInstrumentModelException
)
from ip3 import measure_ip3
from rfgen import measure_or_set_signal_level

TONE_SEPARATION = 2.0


def read_ip3_settings(settings_file, tone_offset):
    with open(settings_file) as fd:
        lines = fd.readlines()

    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str) + tone_offset
        settings[tone_freq_str] = (tone_freq, tone_freq+TONE_SEPARATION,
                                   float(lo_freq), lo_pwr_array)
    return settings

def main():
    default_tone_offset = 0.0

    parser = ArgumentParser(description=
                            "Measure mixer IMD and IP3",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )
    parser.add_argument(
        "--tone_offset", type=float,
        default=default_tone_offset,
        help="Offset for the specified tone frequency (in MHz)"
    )

    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    if args.smhu_lo is False:
        rfgen = rpyc.connect('127.0.0.1', 18864)
        rfgen_app = rfgen.root
        rfgen_lo_chan = rfgen_app.channels['Chan 2']
    ddsgen = rpyc.connect('127.0.0.1', 18862)
    ddsgen_app = ddsgen.root

    sa_lo_chan = 1
    sa_rf_chan = 0

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_ip3_settings('ip3-upconv-settings.txt',
                                 args.tone_offset)

    for tone_str in ['200', '210', '220', '230', '240', '250', '260',
                     '270', '280', '290']:
        lower_tone, upper_tone, lo_freq, lo_pwrs = settings[tone_str]
        for lo_pwr in lo_pwrs:
            if args.smhu_lo is True:
                smhu.output = True
                smhu.freq = float(lo_freq)
                smhu.level = float(lo_pwr)
                time.sleep(0.25)
                freq, pwr = lo_freq, lo_pwr
            else:
                freq, pwr = measure_or_set_signal_level(
                    sa, sa_app, rfgen_lo_chan, sa_lo_chan,
                    lo_freq, float(lo_pwr), 0.1, 5)
            sa_app.channel = sa_rf_chan
            im3_str = measure_ip3(
                sa, sa_app, lower_tone, upper_tone, tone_span=None,
                im3_span=0.05, fe_atten=15.0,
                lo_offset=12.5, im3_lo_offset=15.5)
            print(f"{im3_str} {pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()

Measure mixer insertion loss

IM3 test setup

Test setup for measuring mixer insertion loss.

A single channel of the spectrum analyzer down converter frontend is used with the RP spectrum analyzer to measure the RF signal power. The mixer IF signal power is measured directly by the spectrum analyzer. The down converter frontend software is therefore configured to match this set up:

python frontend -d socket://192.168.0.155:7006 -F frontend/config/hw-single-chan-config.json
usage: mixer_il.py [-h] [-a SAADDR] [-p SAPORT] [-A RFGEN_ADDR] [-P RFGEN_PORT] [-R RF_FREQ RF_FREQ RF_FREQ] [-I IF_FREQ]
                   [-L LO_PWR]

Measure mixer insertion loss or feedthrough

optional arguments:
  -h, --help            show this help message and exit
  -a SAADDR, --saaddr SAADDR
                        IP address for the RPSA RPyC service (127.0.0.1)
  -p SAPORT, --saport SAPORT
                        TCP port for the RPSA RPyC service (18865)
  -A RFGEN_ADDR, --rfgen_addr RFGEN_ADDR
                        IP address for the RFGen RPyC service (127.0.0.1)
  -P RFGEN_PORT, --rfgen_port RFGEN_PORT
                        TCP port for the RFGen RPyC service (18864)
  -R RF_FREQ RF_FREQ RF_FREQ, --rf_freq RF_FREQ RF_FREQ RF_FREQ
                        Mixer RF freq. range to cover. This is a triplet of float
                        numbers in the format: <start> <stop> <step>.
                        These values are passed to numpy.arange in order to generate
                        an array of RF freq. values.
                        Default: [200.0, 3950.0, 100.0]
  -I IF_FREQ, --if_freq IF_FREQ
                        The Mixer IF freq. to use (30.0)
  -L LO_PWR, --lo_pwr LO_PWR
                        The LO power level to use, in dBm. (0.0)
import os
import sys
import time
import rpyc
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np

from rfblocks import BridgeCoupler
from tam import (
    SIGGEN_LIMITS, SMHU58_GPIBID,
    InstrumentMgr, InstrumentInitializeException,
    UnknownInstrumentModelException
)


def read_loss_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()

    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, lo_freq, lo_pwrs = line.split()
        lo_pwr_range = [float(l) for l in lo_pwrs.split(',')]
        if len(lo_pwr_range) == 1:
            lo_pwr_array = np.array(lo_pwr_range)
        else:
            lo_pwr_range[-1] += 1.0
            lo_pwr_array = np.arange(*lo_pwr_range, 1.0)
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = (tone_freq, float(lo_freq), lo_pwr_array)
    return settings


def measure_rf(sa_app, rf_sa_chan, rf_chan, f_rf):
    sa_app.channel = rf_sa_chan
    sa_app.centre_freq = f_rf
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    mon_freq, mon_pwr = sa_app.marker_posn
    actual_rf_pwr = mon_pwr - rf_chan.coupler.coupling(mon_freq) \
        + rf_chan.coupler.insertion_loss(mon_freq)
    return actual_rf_pwr


def measure_if(sa_app, if_sa_chan, freq_if):
    sa_app.channel = if_sa_chan
    sa_app.centre_freq = freq_if
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    if_freq, if_pwr = sa_app.marker_posn
    return if_pwr


def set_lo(lo_chan, f_lo, lo_pwr):
    """Set the frequency and level of the LO source channel.
    """
    lo_chan.plo_ctl.freq = float(f_lo)
    lo_chan.level = float(lo_pwr)
    lock_status = lo_chan.configure()
    return lock_status


def set_rf(rf_chan, f_rf, rf_pwr):
    """Set the frequency and level of the RF source channel.
    """
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = float(rf_pwr)
    lock_status = rf_chan.configure()
    return lock_status


def measure_insertion_loss(sa_app, rf_chan, rf_sa_chan, if_sa_chan, freq_rf, freq_if):
    """Measure mixer insertion loss for one specific RF frequency

    :param rf_chan: RFGen source channel for the RF signal
    :type rf_chan: PLOChan
    :param rf_sa_chan: The RPSA channel used to measure the RF monitor
        port signal.
    :type rf_sa_chan: int
    :param if_sa_chan: The RPSA channel used to measure the mixer IF signal.
    :type if_sa_chan: int
    """
    actual_rf_pwr = measure_rf(sa_app, rf_sa_chan, rf_chan, freq_rf)
    if_pwr = measure_if(sa_app, if_sa_chan, abs(freq_if))
    mixer_gain = if_pwr - actual_rf_pwr
    return mixer_gain


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_rf_freq = [200.0, 3950.0, 100.0]
    default_if_freq = 30.0
    default_lo_pwr = 1.0
    default_cal_file = f"{os.environ['RFBLOCKS_REFDESIGNS']}/PowerMeter/app/cal-files/24dB-atten-48-0007-Det3-100-6000MHz.json"

    parser = ArgumentParser(description=
                            "Measure mixer insertion loss or feedthrough",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr",
        default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port",
        default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "-R", "--rf_freq", nargs=3, type=float,
        default=default_rf_freq,
        help=textwrap.dedent(f'''\
        Mixer RF freq. range to cover. This is a triplet of float
        numbers in the format: <start> <stop> <step>.
        These values are passed to numpy.arange in order to generate
        an array of RF freq. values.
        Default: {default_rf_freq}'''))
    parser.add_argument(
        "-I", "--if_freq", type=float,
        default=default_if_freq,
        help=f"The Mixer IF freq. to use ({default_if_freq})")
    parser.add_argument(
        "-L", "--lo_pwr", type=float,
        default=default_lo_pwr,
        help=f"The LO power level to use, in dBm. ({default_lo_pwr})")
    parser.add_argument(
        "--upconv", action='store_true',
        help="Measurements using the up conversion test setup"
    )
    parser.add_argument(
        "--smhu_lo", action='store_true',
        help="Use SMHU signal gen. to produce the LO signal"
    )

    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root
    rfgen_app.initialize()

    rf_pwr = -10.0
    lo_chan_id = 'Chan 1'
    rf_chan_id = 'Chan 2'
    lo_chan = rfgen_app.channels[lo_chan_id]
    rf_chan = rfgen_app.channels[rf_chan_id]
    rf_sa_chan = 0  # RPSA 'Chan 1'
    if_sa_chan = 0  # RPSA 'Chan 2'

    if args.smhu_lo is True:
        smhu_mgr = InstrumentMgr()
        smhu = smhu_mgr.open_instrument('SMHU58', SMHU58_GPIBID)

    settings = read_loss_settings('loss-settings.txt')
    for tone_str in ['3000', '3049', '3105', '3150', '3200', '3255', '3300',
                     '3355', '3400', '3450', '3500', '3550', '3600', '3650',
                     '3700', '3755', '3800', '3850', '3900', '3950']:
                     # '3105', '3155', '3200', '3250', '3300', '3350', '3400',
                     # '3450', '3500', '3550', '3600', '3650', '3700', '3750',
                     # '3800', '3850', '3900', '3950']:
                     # '1000', '1050', '1100', '1150', '1200', '1250', '1300', '1350',
                     # '1400', '1450', '1500', '1550', '1600', '1650', '1700', '1750',
                     # '1800', '1850', '1900', '1950', '2000']:
                     # '2050', '2100', '2150', '2200', '2250', '2300', '2350', '2400',
                     # '2450', '2500', '2550', '2600', '2650', '2700', '2750', '2800',
                     # '2850', '2900', '2950', '3000']:
                     # '1000', '1050', '1100', '1150', '1200', '1250', '1300', '1350',
                     # '1400', '1450', '1500', '1550', '1600', '1650', '1700', '1750',
                     # '1800', '1850', '1900', '1950', '2000']:
                     # '200', '210', '220', '230', '240', '250', '260', '270',
                     # '280', '290', '300', '350', '400', '450', '500', '550',
                     # '600', '650', '700', '750', '800', '850', '900', '950',
                     # '2900', '2950', '3000',
                     # '3105', '3155', '3200', '3250', '3300', '3350', '3400',
                     # '3450', '3500', '3550', '3600', '3650', '3700', '3750',
                     # '3800', '3850', '3900', '3950']:
        rf_freq, lo_freq, lo_pwrs = settings[tone_str]
        if_freq = abs(rf_freq - lo_freq)
        if args.upconv:
            for lo_pwr in lo_pwrs:
                if args.smhu_lo is True:
                    smhu.output = True
                    smhu.freq = float(lo_freq)
                    smhu.level = float(lo_pwr)
                    time.sleep(0.25)
                    freq, pwr = lo_freq, lo_pwr
                else:
                    if set_lo(lo_chan, lo_freq, lo_pwr) is not True:
                        print(f"Warning: Can't lock LO source for freq {lo_freq:.2f}")
                if_pwr = measure_if(sa_app, if_sa_chan, rf_freq)
                mixer_gain = if_pwr - rf_pwr
                print(f"{rf_freq:.2f}:{mixer_gain:.2f} {lo_pwr:.2f}")
        else:
            if set_rf(rf_chan, rf_freq, rf_pwr) is not True:
                print(f"Warning: Can't lock RF source for freq {rf_freq:.2f}")
            for lo_pwr in lo_pwrs:
                if args.smhu_lo is True:
                    smhu.output = True
                    smhu.freq = float(lo_freq)
                    smhu.level = float(lo_pwr)
                    time.sleep(0.25)
                    freq, pwr = lo_freq, lo_pwr
                else:
                    if set_lo(lo_chan, lo_freq, lo_pwr) is not True:
                        print(f"Warning: Can't lock LO source for freq {lo_freq:.2f}")
                mixer_gain = measure_insertion_loss(
                    sa_app, rf_chan,
                    rf_sa_chan, if_sa_chan, rf_freq, if_freq)
                print(f"{rf_freq:.2f}:{mixer_gain:.2f} {lo_pwr:.2f}")
        print("----------")

    if args.smhu_lo is True:
        smhu_mgr.close_instrument(smhu)
        smhu_mgr.close()


if __name__ == '__main__':
    main()

Measure mixer feedthrough

By default the test hardware is assumed to be set up as follows:

  • LO signal is taken from channel 2 of the RFGen signal generator.

  • Mixer RF (or IF) output is connected to the channel 1 RF input of the direct down converter.

  • RFGen channel 2 monitor output (the LO signal monitoring port) is connected to the channel 2 RF input of the direct down converter.

  • Direct down converter channel 1 IF output is connected to ADC1 input of the dual DAC/ADC.

  • Direct down converter channel 2 IF output is connected to ADC2 input of the dual DAC/ADC.

import sys
import rpyc
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np

from rfblocks import BridgeCoupler


def read_lo_settings(lo_settings_file):

    with open(lo_settings_file) as fd:
        lines = fd.readlines()

    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        lo_freq, lo_pwr = line.split()
        settings[lo_freq] = float(lo_pwr)
    return settings


def measure_rf(sa_app, rf_sa_chan, f_rf):
    sa_app.channel = rf_sa_chan
    sa_app.centre_freq = f_rf
    sa_app.freq_span = 1.0
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    rf_freq, rf_pwr = sa_app.marker_posn
    return rf_pwr


def measure_lo(sa_app, lo_sa_chan, coupler, freq_lo):
    sa_app.channel = lo_sa_chan
    sa_app.centre_freq = freq_lo
    sa_app.freq_span = 1.0
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(5)
    sa_app.marker_normal()
    sa_app.peak_search()
    lo_freq, lo_pwr = sa_app.marker_posn
    actual_lo_pwr = lo_pwr - coupler.coupling(lo_freq) \
        + coupler.insertion_loss(lo_freq)
    return actual_lo_pwr


def set_lo(lo_chan, f_lo, lo_pwr):
    lo_chan.plo_ctl.freq = float(f_lo)
    lo_chan.level = lo_pwr
    lock_status = lo_chan.configure()
    return lock_status


def measure_feedthrough(sa_app, rfgen_app, lo_settings_file):

    lo_chan = rfgen_app.channels['Chan 2']
    rf_sa_chan = 0
    lo_sa_chan = 1

    coupler = BridgeCoupler()

    lo_settings = read_lo_settings(lo_settings_file)
    for lo_freq_str, lo_pwr in lo_settings.items():
        lo_freq = float(lo_freq_str)
        lock_status = set_lo(lo_chan, lo_freq, lo_pwr)
        actual_lo_pwr = measure_lo(sa_app, lo_sa_chan, coupler, lo_freq)
        actual_rf_pw = measure_rf(sa_app, rf_sa_chan, lo_freq)
        feedthru = abs(actual_rf_pw) + actual_lo_pwr
        print(f"{lo_freq_str} {feedthru:.2f} {actual_lo_pwr:.2f}")


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_rfgen_addr = "127.0.0.1"
    default_rfgen_port = 18864
    default_lo_settings = 'lo_settings.txt'

    parser = ArgumentParser(description=
                            "Measure mixer feedthrough",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-A", "--rfgen_addr",
        default=default_rfgen_addr,
        help=f"IP address for the RFGen RPyC service ({default_rfgen_addr})")
    parser.add_argument(
        "-P", "--rfgen_port",
        default=default_rfgen_port, type=int,
        help=f"TCP port for the RFGen RPyC service ({default_rfgen_port})")
    parser.add_argument(
        "-L", "--lo_settings",
        default=default_lo_settings,
        help=textwrap.dedent(f'''\
        A file containing the settings for the LO signal.
        The file must have pairs of values on each line with the first
        being the LO freq. (in MHz) and the second being the LO power (in dBm).
        Default: {default_lo_settings}'''))

    args = parser.parse_args()

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect(args.rfgen_addr, args.rfgen_port)
    rfgen_app = rfgen.root
    measure_feedthrough(sa_app, rfgen_app, args.lo_settings)


if __name__ == '__main__':
    main()

Measure mixer modulation products

import sys
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import rpyc


FREQ_DIFF = 0.02

def freq_close_to(f1, f2):
    """Returns True if f1 and f2 are within FREQ_DIFF Mhz.
    """
    if abs(f1-f2) < FREQ_DIFF:
        return True
    else:
        return False


def measure_power(sa_app, sa_chan, freq):
    sa_app.channel = sa_chan
    sa_app.centre_freq = freq
    sa_app.freq_span = 0.5
    sa_app.trace_type_normal()
    sa_app.sweep()
    sa_app.trace_type_avg()
    sa_app.sweeps(10)
    sa_app.marker_normal()
    sa_app.peak_search()
    rf_freq, rf_pwr = sa_app.marker_posn
    return rf_freq, rf_pwr


def measure_products(sa_app, sa_chan, f1, f2, order):
    f = []
    f.append([n*f1 for n in range(1, order+1)])
    for m in range(1,order+1):
        f.append([abs(freq+(m*f2)) for freq in f[0]])
    p = []
    meas_f = []
    for n in range(order+1):
        p.append([])
        meas_f.append([])
        for m in range(order):
            p[n].append(0.0)
            meas_f[n].append(0.0)

    for n in range(order+1):
        for m in range(order):
            freq = f[n][m]
            meas_f[n][m], p[n][m] = measure_power(sa_app, sa_chan, freq)

    return f, meas_f, p


def main():
    default_rpsa_addr = "127.0.0.1"
    default_rpsa_port = 18865
    default_f1 = 500.0
    default_f2 = 30.0
    default_order = 5
    default_chan = "2"

    parser = ArgumentParser(description=
                            "Measure mixer modulation products",
                            formatter_class=RawTextHelpFormatter)

    parser.add_argument(
        "-a", "--saaddr",
        default=default_rpsa_addr,
        help=f"IP address for the RPSA RPyC service ({default_rpsa_addr})")
    parser.add_argument(
        "-p", "--saport",
        default=default_rpsa_port, type=int,
        help=f"TCP port for the RPSA RPyC service ({default_rpsa_port})")
    parser.add_argument(
        "-C", "--channel", choices=["1", "2"],
        default=default_chan,
        help=textwrap.dedent(f'''\
        The target channel for the measurement (default: {default_chan}).
        '''))
    parser.add_argument(
        "--flo", type=float,
        default=default_f1,
        help=textwrap.dedent(f'''\
        The mixer LO frequency in MHz (default: {default_f1}).
        '''))
    parser.add_argument(
        "--fsignal", type=float,
        default=default_f2,
        help=textwrap.dedent(f'''\
        The mixer signal frequency in MHz (default: {default_f2}).
        '''))
    parser.add_argument(
        "--order", type=int,
        default=default_order,
        help=textwrap.dedent(f'''\
        Measure modulation products up to this order (default: {default_order}).
        '''))

    args = parser.parse_args()

    sa_chan = 0
    if args.channel is not None:
        sa_chan = int(args.channel) - 1

    sa = rpyc.connect(args.saaddr, args.saport)
    sa_app = sa.root.sa_app
    (f, measured_f, p) = measure_products(
        sa_app, sa_chan, args.flo, args.fsignal, args.order)
    for n in range(args.order+1):
        for m in range(args.order):
            print(f"{f[n][m]:.1f} ", end='')
        print()
    print()
    for n in range(args.order+1):
        for m in range(args.order):
            if freq_close_to(f[n][m], measured_f[n][m]):
                print(f"{p[n][m]:.2f} ", end='')
            else:
                print("* ", end='')
        print()


if __name__ == '__main__':
    main()

Measure SA IP3

import time
import textwrap
from argparse import ArgumentParser, RawTextHelpFormatter
import numpy as np
import rpyc

from ip3 import measure_ip3

TONE_SEPARATION = 2.0


def set_rf(rf_chan, f_rf, rf_pwr):
    rf_chan.plo_ctl.freq = float(f_rf)
    rf_chan.level = rf_pwr
    lock_status = rf_chan.configure()
    return lock_status


def read_ip3_settings(settings_file):
    with open(settings_file) as fd:
        lines = fd.readlines()

    settings = {}
    for line in lines:
        l = line.strip()
        if len(l) == 0 or l[0] == '#':
            continue
        tone_freq_str, tone1_pwr, tone2_pwr, lo_freq = line.split()
        tone_freq = float(tone_freq_str)
        settings[tone_freq_str] = ((tone_freq, float(tone1_pwr)),
                                   (tone_freq+TONE_SEPARATION, float(tone2_pwr)),
                                   float(lo_freq))
    return settings


def main():

    default_im3_span = 0.1
    default_lo_offset = 12.5
    default_im3_lo_offset = 15.5

    parser = ArgumentParser(description=
                            "Measure mixer IMD and IP3",
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument(
        "--im3_span", type=float, default=default_im3_span,
        help=f"SA frequency span when measuring IM3 amplitudes (Default: {default_im3_span} in MHz)")
    parser.add_argument(
        "--lo_offset", type=float,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when using a down converter RF frontend
        (Default: {default_lo_offset})'''))
    parser.add_argument(
        "--im3_lo_offset", type=float,
        help=textwrap.dedent(f'''\
        SA LO offset (in MHz) when measuring IM3 amplitudes using
        a down converter RF frontend. (Default: {default_im3_lo_offset})'''))

    args = parser.parse_args()

    sa = rpyc.connect("127.0.0.1", 18865)
    sa_app = sa.root.sa_app
    rfgen = rpyc.connect('127.0.0.1', 18864)
    lowspur_rfgen = rpyc.connect('127.0.0.1', 18867)
    rfgen_app = rfgen.root

    rfgen_chan = rfgen_app.channels['Chan 1']
    sa_tone_chan = 0
    lowspur_chan = lowspur_rfgen.root.plo_chan

    settings = read_ip3_settings('ip3-sa-settings.txt')

    for tone_str in ['850']:
        tone1, tone2, lo_freq = settings[tone_str]
        lock_status = set_rf(rfgen_chan, tone1[0], tone1[1])
        lock_status = set_rf(lowspur_chan, tone2[0], tone2[1])
        sa_app.channel = sa_tone_chan

        for fe_atten in [0, 5, 7, 10, 12, 15, 17, 20, 25]:
            im3_str = measure_ip3(sa, sa_app,
                                  tone1[0], tone2[0], tone_span=None,
                                  im3_span=args.im3_span,
                                  fe_atten=fe_atten,
                                  lo_offset=args.lo_offset,
                                  im3_lo_offset=args.im3_lo_offset,
                                  lo_freq=None)
            print(f"{im3_str}")
        print("----------")


if __name__ == '__main__':
    main()

Known Issues