Open RF Prototyping

Red Pitaya Super Heterodyne Front End

Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.

The Script

"""Package for the sh.py application."""

# Package metadata: release version and application name.
__version__ = '0.1.0'
__app_name__ = 'rpsh'
"""rprf application entry point."""

import rpsh


# Start the application only when this module is executed as a script;
# a plain import must not launch it.
if __name__ == '__main__':
    rpsh.main()
# Generated from rp-rf.org
#

<<imports>>


<<rpsh-service>>


<<main-func>>

The main Function

The asyncio framework is used to coordinate communications over the serial device without blocking user interface events. For this purpose we make use of QEventLoop and QThreadExecutor from the qasync package.

The event loop is equipped with an instance of QThreadExecutor which runs a single worker thread. The idea here is to ensure that requests to the ECApp serial device are executed one at a time, in a manner which does not block the operation of the Qt event loop.

def main():

    <<process-cmdline-args>>

    # If specified, read HW config from file
    hw_config = None
    if args.sh_config is not None:
        try:
            with open(args.sh_config) as fd:
                hw_config = json.load(fd)
        except FileNotFoundError:
            print(f'No such file "{args.sh_config}" for super het HW config.')
            print('Using default super het HW config. instead.')

    # This ensures that Cntl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    app = QCoreApplication(sys.argv)
    loop = QEventLoop(app)
    loop.set_default_executor(QThreadExecutor(1))
    asyncio.set_event_loop(loop)

    if hw_config is not None:
        sh_app = SuperHetApp(
            hw_config=hw_config['hw_config'],
            app_config=hw_config['app_config']
        )
    else:
        sh_app = SuperHetApp()

    if hw_config is not None and 'capabilities' in hw_config:
        front_end = SuperHetService(
            sh_app,
            serial_device=args.device,
            baudrate=args.baudrate,
            capabilities=hw_config['capabilities']
        )
    else:
        front_end = SuperHetService(
            sh_app,
            serial_device=args.device,
            baudrate=args.baudrate
        )

    server_thread = QThread()
    server = RPyCServer(
        front_end,
        args.ipaddr,
        args.port)
    server.moveToThread(server_thread)
    server_thread.started.connect(server.run)
    server.finished.connect(server_thread.quit)
    server.finished.connect(server.deleteLater)
    server_thread.finished.connect(server_thread.deleteLater)
    server_thread.start()

    try:
        loop.set_debug(True)
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    loop.close()

Process command line arguments

# A baud rate of 0 is a sentinel: SuperHetService replaces it with
# DEFAULT_BAUDRATE.
defaultBaud = 0

parser = ArgumentParser(description="Red Pitaya RF Front End")

# One (flags, keyword options) entry per command line option, listed in
# the order in which the options are registered with the parser.
cmdline_options = (
    (("-d", "--device"),
     {"default": None, "required": True,
      "help": "The front end hardware serial device"}),
    (("-b", "--baudrate"),
     {"default": defaultBaud, "type": int,
      "help": "Baud rate (default: {})".format(defaultBaud)}),
    (("-A", "--ipaddr"),
     {"default": "127.0.0.1",
      "help": "IP address for the RPyC server instance"}),
    (("-P", "--port"),
     {"default": 18866, "type": int,
      "help": "TCP port for the RPyC server instance"}),
    (("-F", "--sh_config"),
     {"default": None,
      "help": "Frontend hardware configuration."}),
)
for flags, options in cmdline_options:
    parser.add_argument(*flags, **options)

args = parser.parse_args()

ADC Channel Class

from typing import (
    List, Dict
)
import serial

from rfblocks import (
    pe43711, PE43711Controller
)

from firstlo import FirstLO_Synth
from secondlo import SecondLO_Synth


class ADCChan:
    """A single ADC channel of the super heterodyne front end.

    The channel bundles a first LO synthesizer, a second LO synthesizer
    and a PE43711 step attenuator controller, and holds the frequency
    response and noise floor calibration tables for the channel.
    """

    def __init__(self,
                 chan_id: str,
                 hw_config: Dict,
                 chan_config: Dict) -> None:
        """Create the device controllers for the channel.

        :param chan_id: Channel identifier.  It is passed to both LO
            synthesizers and its last character is used in the name of
            the step attenuator controller.
        :param hw_config: Hardware description with ``'lo1'``, ``'lo2'``
            and ``'stepatten'`` entries.
        :param chan_config: Initial settings with ``'lo1'``, ``'lo2'``
            and ``'stepatten'`` entries;
            ``chan_config['stepatten']['atten']`` is the initial
            attenuation.
        """
        self._chan_id: str = chan_id

        # Calibration tables start out empty and are filled in through
        # the corresponding property setters.
        self._noise_floor_cal_data: Dict[float, float] = {}
        self._freq_resp_cal_data: Dict[float, float] = {}

        self._lo1 = FirstLO_Synth(
            chan_id, hw_config['lo1'], chan_config['lo1'])
        self._lo2 = SecondLO_Synth(
            chan_id, hw_config['lo2'], chan_config['lo2'])

        attenuator = pe43711(**hw_config['stepatten'])
        self._stepatten_ctl = PE43711Controller(
            f'stepatten_{chan_id[-1]}', attenuator)
        self._stepatten_ctl.attenuation = chan_config['stepatten']['atten']

    @property
    def stepatten_ctl(self) -> PE43711Controller:
        """The controller for the channel's front end step attenuator."""
        return self._stepatten_ctl

    @property
    def lo1_synth(self) -> FirstLO_Synth:
        """The channel's LO1 synthesizer instance."""
        return self._lo1

    @property
    def lo2_synth(self) -> SecondLO_Synth:
        """The channel's LO2 synthesizer instance."""
        return self._lo2

    @property
    def freq_resp_cal_data(self) -> Dict[float, float]:
        """The frequency response calibration table.

        Assigning to this property stores a shallow copy of the value.
        """
        return self._freq_resp_cal_data

    @freq_resp_cal_data.setter
    def freq_resp_cal_data(self, fr: Dict[float, float]) -> None:
        self._freq_resp_cal_data = dict(fr)

    @property
    def noise_floor_cal_data(self) -> Dict[float, float]:
        """The noise floor calibration table.

        Assigning to this property stores a shallow copy of the value.
        """
        return self._noise_floor_cal_data

    @noise_floor_cal_data.setter
    def noise_floor_cal_data(self, nf: Dict[float, float]) -> None:
        self._noise_floor_cal_data = dict(nf)

    def initialize_hw(self, ser: serial.Serial) -> None:
        """Initialize the step attenuator, LO1 and LO2, in that order."""
        for device in (self.stepatten_ctl, self.lo1_synth, self.lo2_synth):
            device.initialize(ser)

    def initialize(self, ser: serial.Serial):
        """Initialize the channel hardware, then configure the step
        attenuator with its current attenuation setting.
        """
        self.initialize_hw(ser)
        self.stepatten_ctl.configure(ser)

    def configure_stepatten(self, ser, att) -> None:
        """Store ``att`` as the attenuation and send it to the hardware."""
        attenuator = self.stepatten_ctl
        attenuator.attenuation = att
        attenuator.configure(ser)

    def configure_lo1_freq(self, ser, freq) -> None:
        """Store ``freq`` as the LO1 frequency and configure the synth."""
        synth = self.lo1_synth
        synth.freq = freq
        synth.configure_freq(ser)

    def configure_lo2_freq(self, ser, freq) -> None:
        """Store ``freq`` as the LO2 frequency and configure the synth."""
        synth = self.lo2_synth
        synth.freq = freq
        synth.configure_freq(ser)

App Class

It's possible to implement different types of front end depending on the hardware. Initially, the front end type will be frequency conversion (more properly, frequency down conversion). This may be done using direct conversion via a single LO and mixer stage or using a super heterodyne approach. The baseband signal must lie somewhere within the ADC sampling range, which is approximately 0.5 to 55 MHz.

from typing import (
    List, Dict
)
from rfblocks import (
    hmc833, pe42420
)

from adcchan import ADCChan


class SuperHetApp:

    <<superhet-config>>

    NOISE_FLOOR: Dict = {
        'channels': {
            ADCCH1_NAME: {
                400000: (-91.238, 5.062), 200000: (-97.886, 5.586),
                100000: (-100.887, 5.566), 50000: (-104.222, 5.531),
                20000: (-106.397, 5.562), 10000: (-110.599, 5.494),
                5000: (-114.466, 5.605), 2000: (-118.074, 5.581),
                1000: (-120.604, 5.547), 500: (-123.662, 5.647),
                200: (-127.920, 5.609), 100: (-130.131, 5.466), 50: (-134.579, 5.560),
                20: (-137.322, 5.585), 10: (-140.600, 5.524), 5: (-144.054, 5.567),
                2: (-148.833, 5.588), 1: (-149.271, 5.558)
            }
        }
    }

    FREQ_RESPONSE: Dict = {
        'channels': {
            ADCCH1_NAME: {
                4.9: 2.083,
                54.90: 2.083, 104.90: 1.571, 154.90: 1.569, 204.90: 1.587,
                254.90: 1.473, 304.90: 1.568, 354.90: 1.499, 404.90: 1.558,
                454.90: 1.667, 504.90: 1.604, 554.90: 1.596, 604.90: 1.629,
                654.90: 1.647, 704.90: 1.718, 754.90: 1.767, 804.90: 1.729,
                854.90: 1.575, 904.90: 1.653, 954.90: 1.614, 1004.90: 1.723,
                1054.90: 1.863, 1104.90: 1.686, 1154.90: 1.743, 1204.90: 1.777,
                1254.90: 1.724, 1304.90: 1.804, 1354.90: 1.905, 1404.90: 1.826,
                1454.90: 1.865, 1504.90: 1.898, 1554.90: 2.013, 1604.90: 2.069,
                1654.90: 1.991, 1704.90: 2.091, 1754.90: 2.122, 1804.91: 2.275,
                1854.91: 2.407, 1904.91: 2.286, 1954.91: 2.005, 2004.91: 2.128,
                2054.91: 2.316, 2104.91: 2.654, 2154.91: 2.628, 2204.91: 2.457,
                2254.91: 2.424, 2304.91: 2.471, 2354.91: 2.619, 2404.91: 2.668,
                2454.91: 2.936, 2504.91: 2.841, 2554.91: 2.912, 2604.91: 3.341,
                2654.91: 3.317, 2704.91: 3.565, 2754.91: 3.584, 2804.91: 3.355,
                2854.91: 3.127, 2904.91: 2.965, 2954.91: 2.903, 3004.91: 2.897,
                3054.91: 3.119, 3104.91: 3.231, 3154.91: 3.020, 3204.91: 2.970,
                3254.91: 3.002, 3304.91: 3.280, 3354.91: 3.434, 3404.91: 3.331,
                3454.91: 3.277, 3504.91: 3.489, 3554.91: 4.341, 3604.91: 4.161,
                3654.91: 4.148, 3704.91: 4.320, 3754.91: 5.010, 3804.91: 5.177,
                3854.91: 4.701, 3904.91: 5.072, 3954.91: 5.529, 4004.91: 6.070,
                4054.91: 6.081
            }
        }
    }

    def __init__(self,
                 hw_config: Dict = SUPERHET_APP_HWCONF,
                 app_config: Dict = SUPERHET_APP_CONFIG) -> None:

        d: Dict = app_config['channels']
        z = zip(d.keys(), hw_config, d.values())
        self._channels: Dict = {
            chan_id: ADCChan(chan_id, hw_conf, chan_config)
            for chan_id, hw_conf, chan_config in z}
        for chan_id, nf in SuperHetApp.NOISE_FLOOR['channels'].items():
            self._channels[chan_id].noise_floor_cal_data = nf
        for chan_id, fr in SuperHetApp.FREQ_RESPONSE['channels'].items():
            self._channels[chan_id].freq_resp_cal_data = fr

    <<initialize>>

    @property
    def channels(self):
        return self._channels

    def configure_lo_freq(self, ser, chan_id, lo_freq):
        lo1_freq = SuperHetApp.SUPERHET_BANDPASS_CENTRE + lo_freq
        self._channels[chan_id].configure_lo1_freq(ser, lo1_freq)

Hardware configuration

# Channel names.  These are used as keys for the channel configuration
# and for the calibration tables.
ADCCH1_NAME: str = 'Ch 1'
ADCCH2_NAME: str = 'Ch 2'

# First LO hardware for channel 1: 'firstlo_plo' holds the keyword
# arguments for the hmc833 synthesizer and 'firstlo_sw' those for the
# pe42420 switch.
# NOTE(review): the string values ('D0', 'C4', ...) are presumably
# controller pin identifiers -- confirm against the rfblocks docs.
FIRSTLO_CH1_HWCONF: Dict = {
    'firstlo_plo': {'sen': 'D0', 'ld_sdo': 'C4', 'ref': 'D2'},
    'firstlo_sw': {'c1': 'D4', 'c2': 'D5'}
}
# Second LO hardware for channel 1: keyword arguments for its hmc833.
# Note that 'ref' has the same value ('D2') as for the first LO.
SECONDLO_CH1_HWCONF: Dict = {
    'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2'
}
# Keyword arguments for the channel 1 pe43711 step attenuator.
STEPATTEN_CH1_HWCONF: Dict = {'le': 'D3'}

# The channel hardware is configured to match the hardware
# currently in place.
SUPERHET_CH1_HWCONF: Dict = {
    'lo1': FIRSTLO_CH1_HWCONF,
    'lo2': SECONDLO_CH1_HWCONF,
    'stepatten': STEPATTEN_CH1_HWCONF
}

# One hardware description per channel.  Entries are matched by position
# with the channels listed in the application configuration.
SUPERHET_APP_HWCONF: List = [SUPERHET_CH1_HWCONF]

# Default tuning values.
# NOTE(review): frequencies are presumably in MHz and the attenuation
# in dB -- confirm.
SUPERHET_DEFAULT_FREQ = 1000.0
SUPERHET_BANDPASS_CENTRE = 1575.0
SUPERHET_ADC_CENTRE_FREQ = 35.0
SUPERHET_DEFAULT_ATTEN = 10.0

# Both LO defaults are offsets above the bandpass centre frequency.
SUPERHET_LO1_FREQ: float = SUPERHET_BANDPASS_CENTRE + SUPERHET_DEFAULT_FREQ
SUPERHET_LO2_FREQ: float = SUPERHET_BANDPASS_CENTRE + SUPERHET_ADC_CENTRE_FREQ

# Initial settings for a channel.  The 'lo1' entry additionally carries
# the initial state of the first LO switch.
SUPERHET_CHAN_CONFIG: Dict = {
    'lo1': {
        'freq': SUPERHET_LO1_FREQ,
        'ref_freq': hmc833.DEFAULT_REF_FREQ,
        'refsrc': hmc833.DEFAULT_REFSRC,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False,
        'initial_sw_state': pe42420.State.RF1
    },
    'lo2': {
        'freq': SUPERHET_LO2_FREQ,
        'ref_freq': hmc833.DEFAULT_REF_FREQ,
        'refsrc': hmc833.DEFAULT_REFSRC,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False
    },
    'stepatten': {
        'atten': SUPERHET_DEFAULT_ATTEN
    }
}

# Default application configuration: only channel 1 is defined.
SUPERHET_APP_CONFIG: Dict = {
    'channels': {
        ADCCH1_NAME: SUPERHET_CHAN_CONFIG
    }
}
def initialize_hw(self, ser) -> None:
    """Application level hardware initialization hook.

    Currently a no-op: ``ser`` is accepted but not used.
    """
    return None

def initialize(self, ser) -> None:
    """Initialize every configured channel over the connection ``ser``."""
    for chan_id in self._channels:
        self._channels[chan_id].initialize(ser)

def cleanup(self) -> None:
    """Application level cleanup hook; currently a no-op."""
    return None

The SuperHet Service

# Limits of the front end step attenuator, advertised to clients as the
# "fe_attenuation" capability.
# NOTE(review): values are presumably in dB -- confirm.
MIN_ATT = 0.0
MAX_ATT = 31.75
ATT_STEP = 0.25


class SuperHetService(rpyc.Service):
    """RPyC service exposing the super heterodyne front end to clients.

    The service owns the serial connection to the front end hardware.
    The connection is opened, and the hardware initialized, when the
    first client connects; it is closed again when the last client
    disconnects.
    """

    # Capabilities advertised when none are supplied to the constructor.
    DEFAULT_CAPABILITIES = {
        "channels": [SuperHetApp.ADCCH1_NAME],
        SuperHetApp.ADCCH1_NAME: {
            "adcdma_channel": 0,
            "adc_attenuation": {
                "min": RP_ADC_MIN_ATT,
                "max": RP_ADC_MAX_ATT,
                "step": RP_ADC_ATT_STEP
            },
            "fe_attenuation": {
                "min": MIN_ATT,
                "max": MAX_ATT,
                "step": ATT_STEP
            },
            "freq": {
                "min": 5.0,
                "max": 1500.0,
                "step": 0.0
            },
            "cal_freq": {
                "min": 3.0,
                "max": 1510.0,
                "step": 50.0
            },
            "reflevel": {
                "min": RP_ADC_MIN_REF_LEVEL,
                "max": RP_ADC_MAX_REF_LEVEL,
                "step": 1.0
            },
            "min_span": RP_ADC_MIN_SPAN,
            "chan_type": ChannelCapabilities.SUPERHET,
            "bandwidth": 20.0,
            "adc_centre_freq": 35.0
        }
    }

    def __init__(self,
                 app,
                 serial_device: str,
                 baudrate: int = 0,
                 capabilities: Dict = DEFAULT_CAPABILITIES) -> None:
        """Create the service.

        :param app: The application object (a ``SuperHetApp``) that
            drives the channel hardware.
        :param serial_device: Name of the serial device used to reach
            the front end hardware.
        :param baudrate: Serial baud rate; ``0`` selects
            ``DEFAULT_BAUDRATE``.
        :param capabilities: Mapping with a ``'channels'`` list of
            channel names plus one capability dictionary per channel
            name.
        """
        super().__init__()
        self._app = app
        self._ctl_device: str = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE
        # Per-channel capability dictionaries are converted into
        # ChannelCapabilities instances; the channel list is kept as is.
        self._capabilities = {
            'channels': capabilities['channels']
        }
        for chan_name in capabilities['channels']:
            self._capabilities[chan_name] = ChannelCapabilities.from_dict(
                capabilities[chan_name])
        self._ser = None
        self._client_count = 0

    def on_connect(self, conn):
        """Open the serial device and initialize the hardware for the
        first client, then count the new client.
        """
        if self._ser is None:
            ser = create_serial(self._ctl_device,
                                self._baudrate)
            self._ser = ser
            try:
                self.initialize()
            except Exception:
                # Do not keep a half initialized connection around: a
                # later client would otherwise skip initialization
                # because the serial device already looks open.
                ser.close()
                self._ser = None
                raise
        self._client_count += 1

    def on_disconnect(self, conn):
        """Count the departing client and release the serial device
        once no clients remain.
        """
        # Guard against an unmatched disconnect (for example when
        # on_connect raised before the client was counted).
        if self._client_count > 0:
            self._client_count -= 1
        if self._client_count == 0 and self._ser is not None:
            try:
                self._app.cleanup()
            finally:
                # Close the device even if the cleanup hook fails.
                self._ser.close()
                self._ser = None

    @property
    def capabilities(self):
        """The capabilities mapping built in the constructor."""
        return self._capabilities

    @property
    def adc_channels(self):
        """The application's mapping of channel name to channel."""
        return self._app.channels

    def initialize(self):
        """Initialize the application over the open serial connection."""
        self._app.initialize(self._ser)

    def configure_stepatten(self, chan_id, att):
        """Set the step attenuation of channel ``chan_id`` to ``att``."""
        ch = self.adc_channels[chan_id]
        ch.configure_stepatten(self._ser, att)

    def configure_lo1(self, chan_id):
        """Send the current LO1 frequency of ``chan_id`` to the hardware."""
        ch = self.adc_channels[chan_id]
        # The channel class has no ``configure_lo1`` method; go through
        # its LO1 synthesizer, as ``configure_lo1_freq`` does after
        # storing a new frequency.
        ch.lo1_synth.configure_freq(self._ser)

    def configure_lo1_freq(self, chan_id, lo_freq):
        """Set LO1 of channel ``chan_id`` to ``lo_freq``."""
        ch = self.adc_channels[chan_id]
        ch.configure_lo1_freq(self._ser, lo_freq)

    def configure_lo2(self, chan_id):
        """Send the current LO2 frequency of ``chan_id`` to the hardware."""
        ch = self.adc_channels[chan_id]
        # See configure_lo1: the channel class has no ``configure_lo2``.
        ch.lo2_synth.configure_freq(self._ser)

    def configure_lo2_freq(self, chan_id, lo_freq):
        """Set LO2 of channel ``chan_id`` to ``lo_freq``."""
        ch = self.adc_channels[chan_id]
        ch.configure_lo2_freq(self._ser, lo_freq)

    def configure_lo_freq(self, chan_id, lo_freq):
        """Tune channel ``chan_id`` via the application object."""
        self._app.configure_lo_freq(self._ser, chan_id, lo_freq)

    @classmethod
    def save_config(cls):
        """Print the default application configuration."""
        # ``FrontEndApp`` is not defined anywhere in this module; the
        # default configuration is the one held by SuperHetApp.
        print(SuperHetApp.SUPERHET_APP_CONFIG)


class RPyCServer(QObject):
    """Qt worker object which runs an RPyC ``ThreadedServer``.

    ``run`` is meant to be invoked from the thread the object has been
    moved to.  The ``finished`` signal is emitted once the server's
    ``start()`` call has returned.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """Remember the service instance and the address to serve on."""
        super().__init__()
        self._host, self._port = host, port
        self._serviceInst = serviceInst

    def run(self):
        """Create the server, serve requests, then emit ``finished``."""
        print("RPRF rpyc service on {}:{}".format(self._host, self._port))

        # NOTE(review): these settings let any client that can reach the
        # port read and set arbitrary attributes and exchange pickled
        # objects.  That is only appropriate on a trusted network --
        # confirm this is intended before binding to a public address.
        protocol_config = {
            'allow_all_attrs': True,
            'allow_setattr': True,
            'allow_pickle': True}

        server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            auto_register=False,
            protocol_config=protocol_config)
        self._server = server
        server.start()
        self.finished.emit()

First LO Controller

from typing import (
    Dict
)
import serial

import numpy as np

from rfblocks import (
    pe42420, PE42420Controller, hmc833, HMC833Controller)


class FirstLO_Synth:
    """First LO of a channel: an HMC833 synthesizer followed by a
    PE42420 switch.
    """

    # Output buffer gain setting per frequency.  See ``_buffer_gain_for``
    # for how an entry is selected.
    # NOTE(review): keys are presumably frequencies in MHz -- confirm.
    GAIN_TABLE = {
        25.0: hmc833.OutputBufferGain.MAXGAIN,
        50.0: hmc833.OutputBufferGain.MAXGAIN,
        100.0: hmc833.OutputBufferGain.MAXGAIN,
        200.0: hmc833.OutputBufferGain.MAXGAIN,
        300.0: hmc833.OutputBufferGain.MAXGAIN,
        400.0: hmc833.OutputBufferGain.MAXGAIN,
        500.0: hmc833.OutputBufferGain.MAXGAIN,
        600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        2000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        2100.0: hmc833.OutputBufferGain.MAXGAIN,
        2200.0: hmc833.OutputBufferGain.MAXGAIN,
        2300.0: hmc833.OutputBufferGain.MAXGAIN,
        2400.0: hmc833.OutputBufferGain.MAXGAIN,
        2500.0: hmc833.OutputBufferGain.MAXGAIN,
        2600.0: hmc833.OutputBufferGain.MAXGAIN,
        2700.0: hmc833.OutputBufferGain.MAXGAIN,
        2800.0: hmc833.OutputBufferGain.MAXGAIN,
        2900.0: hmc833.OutputBufferGain.MAXGAIN,
        3000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        4900.0: hmc833.OutputBufferGain.MAXGAIN,
        5000.0: hmc833.OutputBufferGain.MAXGAIN,
        5100.0: hmc833.OutputBufferGain.MAXGAIN,
        5200.0: hmc833.OutputBufferGain.MAXGAIN,
        5300.0: hmc833.OutputBufferGain.MAXGAIN,
        5400.0: hmc833.OutputBufferGain.MAXGAIN,
        5500.0: hmc833.OutputBufferGain.MAXGAIN,
        5600.0: hmc833.OutputBufferGain.MAXGAIN,
        5700.0: hmc833.OutputBufferGain.MAXGAIN,
        5800.0: hmc833.OutputBufferGain.MAXGAIN,
        5900.0: hmc833.OutputBufferGain.MAXGAIN,
        6000.0: hmc833.OutputBufferGain.MAXGAIN,
    }

    def __init__(self,
                 chan_id: str,
                 synth_hw: Dict,
                 synth_config: Dict) -> None:
        """Create the synthesizer and switch devices and controllers.

        :param chan_id: Channel identifier, used as the prefix of the
            controller names.
        :param synth_hw: Hardware description; ``'firstlo_plo'`` holds
            the keyword arguments for the hmc833 and ``'firstlo_sw'``
            those for the pe42420.
        :param synth_config: Synthesizer settings.  ``'freq'`` is the
            initial frequency and ``'initial_sw_state'`` the initial
            switch state.
        """
        self._chan_id = chan_id
        self._synth = hmc833(**synth_hw['firstlo_plo'])
        self._sw = pe42420(**synth_hw['firstlo_sw'])

        self._synth_ctl = HMC833Controller(
            f'{chan_id}_lo1',
            self._synth,
            synth_config)
        self._sw_ctl = PE42420Controller(
            f'{chan_id}_lo1_sw',
            self._sw,
            synth_config['initial_sw_state'])
        self._freq = synth_config['freq']

    @property
    def synth_ctl(self) -> HMC833Controller:
        """The controller of the LO1 synthesizer."""
        return self._synth_ctl

    @property
    def sw_ctl(self) -> PE42420Controller:
        """The controller of the LO1 output switch."""
        return self._sw_ctl

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the synthesizer and the switch, then configure the
        switch with its current state.
        """
        self.synth_ctl.initialize(ser)
        self.sw_ctl.initialize(ser)
        self.sw_ctl.configure(ser)

    @property
    def freq(self) -> float:
        """The LO1 frequency to be used by ``configure_freq``."""
        return self._freq

    @freq.setter
    def freq(self, f: float) -> None:
        self._freq = f

    @classmethod
    def _buffer_gain_for(cls, f: float):
        """Return the gain setting of the lowest table frequency that is
        strictly greater than ``f``.

        When ``f`` is at or above the highest table frequency the entry
        for the highest frequency is used.  The lookup does not depend
        on the order in which the table entries are written.
        """
        table = cls.GAIN_TABLE
        above = [key for key in table if key > f]
        return table[min(above) if above else max(table)]

    def configure_freq(self, ser: serial.Serial) -> None:
        """Send the current frequency setting to the hardware.

        The switch is set to ``RF2`` for frequencies of 3000.0 and above
        and to ``RF1`` otherwise; the output buffer gain is taken from
        ``GAIN_TABLE`` and the synthesizer is then configured.
        """
        target = self.freq
        if target >= 3000.0:
            self.sw_ctl.state = pe42420.State.RF2
        else:
            self.sw_ctl.state = pe42420.State.RF1
        self.sw_ctl.configure(ser)
        self.synth_ctl.buffer_gain = self._buffer_gain_for(target)
        self.synth_ctl.freq = target
        self.synth_ctl.configure(ser)

Second LO Controller

from typing import (
    Dict
)
import serial

from rfblocks import (
    hmc833, HMC833Controller)


class SecondLO_Synth:
    """Second LO of a channel: a single HMC833 synthesizer."""

    # Default keyword arguments for the hmc833 device.
    HMC833_SECONDLO_HWCONF: Dict = {'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2'}

    # NOTE(review): presumably MHz -- confirm.
    DEFAULT_SECONDLO_FREQ = 1605.0

    # Default synthesizer settings, used when no configuration is given.
    DEFAULT_SYNTH_CONFIG: Dict = {
        'freq': DEFAULT_SECONDLO_FREQ,
        'ref_freq': 50.0,
        'refsrc': hmc833.ReferenceSource.INTERNAL,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False
    }

    def __init__(self,
                 chan_id: str,
                 synth_hw: Dict = HMC833_SECONDLO_HWCONF,
                 synth_config: Dict = DEFAULT_SYNTH_CONFIG) -> None:
        """Create the synthesizer device and its controller.

        :param chan_id: Channel identifier, used as the prefix of the
            controller name.
        :param synth_hw: Keyword arguments for the hmc833 device.
        :param synth_config: Synthesizer settings; ``'freq'`` is the
            initial frequency.
        """
        self._chan_id = chan_id
        self._synth = hmc833(**synth_hw)
        self._synth_ctl = HMC833Controller(f'{chan_id}_lo2',
                                           self._synth,
                                           synth_config)
        self._freq = synth_config['freq']

    @property
    def synth_ctl(self) -> HMC833Controller:
        """The controller of the LO2 synthesizer."""
        return self._synth_ctl

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the synthesizer over the connection ``ser``."""
        self.synth_ctl.initialize(ser)

    @property
    def freq(self) -> float:
        """The LO2 frequency to be used by ``configure_freq``."""
        return self._freq

    @freq.setter
    def freq(self, f: float) -> None:
        self._freq = f

    def configure_freq(self, ser: serial.Serial) -> None:
        """Send the current frequency setting to the synthesizer."""
        self.synth_ctl.freq = self.freq
        self.synth_ctl.configure_freq(ser)

Imports

from typing import (
    Optional, List, Dict
)
import sys
from time import sleep
import signal
import json
from argparse import ArgumentParser
import asyncio
import serial

from rfblocks import (
    hmc833, HMC833Controller, pe42420, PE42420Controller,
    DEFAULT_BAUDRATE, create_serial, write_cmd
)
from tam import (
    RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY, RP_ADC_MIN_SPAN,
    RP_ADC_MAX_REF_LEVEL, RP_ADC_MIN_REF_LEVEL, RP_ADC_REF_LEVEL_STEP,
    RP_ADC_MIN_ATT, RP_ADC_MAX_ATT, RP_ADC_ATT_STEP,
    Capability, ChannelCapabilities
)
import rpyc
from rpyc.utils.server import ThreadedServer
from qasync import (
    QEventLoop, QThreadExecutor, asyncSlot, asyncClose
)
from PyQt5.QtCore import (
    Qt, QObject, QCoreApplication, QThread, QTimer,
    pyqtSignal, pyqtSlot
)
import numpy as np

from adcchan import ADCChan
from superhet import SuperHetApp