Red Pitaya Super Heterodyne Front End
Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.
The Script
The main Function
The asyncio framework is used to coordinate communications over the
serial device without blocking user interface events. For this
purpose we make use of QEventLoop and QThreadExecutor from the
qasync package.
The event loop is equipped with an instance of QThreadExecutor which
runs a single worker thread. The idea here is to ensure that requests to
the ECApp serial device will execute one at a time, in a manner which will
not block the operation of the Qt event loop.
def main(): <<process-cmdline-args>> # If specified, read HW config from file hw_config = None if args.sh_config is not None: try: with open(args.sh_config) as fd: hw_config = json.load(fd) except FileNotFoundError: print(f'No such file "{args.sh_config}" for super het HW config.') print('Using default super het HW config. instead.') # This ensures that Cntl-C will work as expected: signal.signal(signal.SIGINT, signal.SIG_DFL) app = QCoreApplication(sys.argv) loop = QEventLoop(app) loop.set_default_executor(QThreadExecutor(1)) asyncio.set_event_loop(loop) if hw_config is not None: sh_app = SuperHetApp( hw_config=hw_config['hw_config'], app_config=hw_config['app_config'] ) else: sh_app = SuperHetApp() if hw_config is not None and 'capabilities' in hw_config: front_end = SuperHetService( sh_app, serial_device=args.device, baudrate=args.baudrate, capabilities=hw_config['capabilities'] ) else: front_end = SuperHetService( sh_app, serial_device=args.device, baudrate=args.baudrate ) server_thread = QThread() server = RPyCServer( front_end, args.ipaddr, args.port) server.moveToThread(server_thread) server_thread.started.connect(server.run) server.finished.connect(server_thread.quit) server.finished.connect(server.deleteLater) server_thread.finished.connect(server_thread.deleteLater) server_thread.start() try: loop.set_debug(True) loop.run_forever() except KeyboardInterrupt: pass loop.close()
Process command line arguments
# Command line interface.  This fragment binds ``args`` for the code
# which follows it.  A baud rate of 0 is the "not specified" value.
default_baud = 0

parser = ArgumentParser(description="Red Pitaya RF Front End")

# Serial device of the front end hardware (mandatory).
parser.add_argument(
    "-d", "--device",
    default=None,
    required=True,
    help="The front end hardware serial device")
# Serial baud rate.
parser.add_argument(
    "-b", "--baudrate",
    default=default_baud,
    type=int,
    help=f"Baud rate (default: {default_baud})")
# Network endpoint for the RPyC server.
parser.add_argument(
    "-A", "--ipaddr",
    default="127.0.0.1",
    help="IP address for the RPyC server instance")
parser.add_argument(
    "-P", "--port",
    default=18866,
    type=int,
    help="TCP port for the RPyC server instance")
# Optional hardware configuration file.
parser.add_argument(
    "-F", "--sh_config",
    default=None,
    help="Frontend hardware configuration.")

args = parser.parse_args()
ADC Channel Class
from typing import (
    List, Dict
)
import serial

from rfblocks import (
    pe43711, PE43711Controller
)
from firstlo import FirstLO_Synth
from secondlo import SecondLO_Synth


class ADCChan(object):
    """One ADC channel of the front end.

    A channel owns a first LO synthesizer, a second LO synthesizer and
    a PE43711 step attenuator controller, together with two tables of
    calibration data (frequency response and noise floor).
    """

    def __init__(self,
                 chan_id: str,
                 hw_config: Dict,
                 chan_config: Dict) -> None:
        """Build the channel's device controllers.

        :param chan_id: The channel identifier.  Its last character is
            used to name the step attenuator controller.
        :param hw_config: Hardware settings with ``'lo1'``, ``'lo2'``
            and ``'stepatten'`` entries.
        :param chan_config: Initial settings with ``'lo1'``, ``'lo2'``
            and ``'stepatten'`` entries; ``'stepatten'`` must provide
            an ``'atten'`` value.
        """
        self._chan_id: str = chan_id

        # Calibration tables start out empty and are filled in by the
        # owner of the channel via the property setters.
        self._freq_resp_cal_data: Dict[float, float] = {}
        self._noise_floor_cal_data: Dict[float, float] = {}

        lo1_hw, lo1_config = hw_config['lo1'], chan_config['lo1']
        self._lo1 = FirstLO_Synth(chan_id, lo1_hw, lo1_config)

        lo2_hw, lo2_config = hw_config['lo2'], chan_config['lo2']
        self._lo2 = SecondLO_Synth(chan_id, lo2_hw, lo2_config)

        atten_name = f'stepatten_{chan_id[-1]}'
        atten_device = pe43711(**hw_config['stepatten'])
        self._stepatten_ctl = PE43711Controller(atten_name, atten_device)
        self._stepatten_ctl.attenuation = chan_config['stepatten']['atten']

    @property
    def stepatten_ctl(self) -> PE43711Controller:
        """The controller for this channel's front end step attenuator."""
        return self._stepatten_ctl

    @property
    def lo1_synth(self) -> FirstLO_Synth:
        """The LO1 synthesizer instance belonging to this channel."""
        return self._lo1

    @property
    def lo2_synth(self) -> SecondLO_Synth:
        """The LO2 synthesizer instance belonging to this channel."""
        return self._lo2

    @property
    def freq_resp_cal_data(self) -> Dict[float, float]:
        """The frequency response calibration table.

        Assigning to this property stores a shallow copy of the
        mapping which is passed in.
        """
        return self._freq_resp_cal_data

    @freq_resp_cal_data.setter
    def freq_resp_cal_data(self, fr: Dict[float, float]) -> None:
        self._freq_resp_cal_data = dict(fr)

    @property
    def noise_floor_cal_data(self) -> Dict[float, float]:
        """The noise floor calibration table.

        Assigning to this property stores a shallow copy of the
        mapping which is passed in.
        """
        return self._noise_floor_cal_data

    @noise_floor_cal_data.setter
    def noise_floor_cal_data(self, nf: Dict[float, float]) -> None:
        self._noise_floor_cal_data = dict(nf)

    def initialize_hw(self, ser: serial.Serial) -> None:
        """Initialize the channel hardware.

        The step attenuator, LO1 and LO2 are initialized in that order.

        :param ser: The serial connection to the hardware controller.
        """
        devices = (self.stepatten_ctl, self.lo1_synth, self.lo2_synth)
        for device in devices:
            device.initialize(ser)

    def initialize(self, ser: serial.Serial):
        """Initialize the hardware and then configure the step attenuator.

        :param ser: The serial connection to the hardware controller.
        """
        self.initialize_hw(ser)
        self.stepatten_ctl.configure(ser)

    def configure_stepatten(self, ser, att) -> None:
        """Set the step attenuator to ``att`` and update the hardware."""
        attenuator = self.stepatten_ctl
        attenuator.attenuation = att
        attenuator.configure(ser)

    def configure_lo1_freq(self, ser, freq) -> None:
        """Set the LO1 frequency to ``freq`` and update the hardware."""
        synth = self.lo1_synth
        synth.freq = freq
        synth.configure_freq(ser)

    def configure_lo2_freq(self, ser, freq) -> None:
        """Set the LO2 frequency to ``freq`` and update the hardware."""
        synth = self.lo2_synth
        synth.freq = freq
        synth.configure_freq(ser)
App Class
It's possible to implement different types of front end depending on the hardware. Initially, the front end type will be frequency conversion (more properly, frequency down conversion). This may be done using direct conversion via a single LO and mixer stage or using a super heterodyne approach. Base band must be somewhere within the ADC sampling range which is approximately 0.5 to 55 MHz.
from typing import ( List, Dict ) from rfblocks import ( hmc833, pe42420 ) from adcchan import ADCChan class SuperHetApp: <<superhet-config>> NOISE_FLOOR: Dict = { 'channels': { ADCCH1_NAME: { 400000: (-91.238, 5.062), 200000: (-97.886, 5.586), 100000: (-100.887, 5.566), 50000: (-104.222, 5.531), 20000: (-106.397, 5.562), 10000: (-110.599, 5.494), 5000: (-114.466, 5.605), 2000: (-118.074, 5.581), 1000: (-120.604, 5.547), 500: (-123.662, 5.647), 200: (-127.920, 5.609), 100: (-130.131, 5.466), 50: (-134.579, 5.560), 20: (-137.322, 5.585), 10: (-140.600, 5.524), 5: (-144.054, 5.567), 2: (-148.833, 5.588), 1: (-149.271, 5.558) } } } FREQ_RESPONSE: Dict = { 'channels': { ADCCH1_NAME: { 4.9: 2.083, 54.90: 2.083, 104.90: 1.571, 154.90: 1.569, 204.90: 1.587, 254.90: 1.473, 304.90: 1.568, 354.90: 1.499, 404.90: 1.558, 454.90: 1.667, 504.90: 1.604, 554.90: 1.596, 604.90: 1.629, 654.90: 1.647, 704.90: 1.718, 754.90: 1.767, 804.90: 1.729, 854.90: 1.575, 904.90: 1.653, 954.90: 1.614, 1004.90: 1.723, 1054.90: 1.863, 1104.90: 1.686, 1154.90: 1.743, 1204.90: 1.777, 1254.90: 1.724, 1304.90: 1.804, 1354.90: 1.905, 1404.90: 1.826, 1454.90: 1.865, 1504.90: 1.898, 1554.90: 2.013, 1604.90: 2.069, 1654.90: 1.991, 1704.90: 2.091, 1754.90: 2.122, 1804.91: 2.275, 1854.91: 2.407, 1904.91: 2.286, 1954.91: 2.005, 2004.91: 2.128, 2054.91: 2.316, 2104.91: 2.654, 2154.91: 2.628, 2204.91: 2.457, 2254.91: 2.424, 2304.91: 2.471, 2354.91: 2.619, 2404.91: 2.668, 2454.91: 2.936, 2504.91: 2.841, 2554.91: 2.912, 2604.91: 3.341, 2654.91: 3.317, 2704.91: 3.565, 2754.91: 3.584, 2804.91: 3.355, 2854.91: 3.127, 2904.91: 2.965, 2954.91: 2.903, 3004.91: 2.897, 3054.91: 3.119, 3104.91: 3.231, 3154.91: 3.020, 3204.91: 2.970, 3254.91: 3.002, 3304.91: 3.280, 3354.91: 3.434, 3404.91: 3.331, 3454.91: 3.277, 3504.91: 3.489, 3554.91: 4.341, 3604.91: 4.161, 3654.91: 4.148, 3704.91: 4.320, 3754.91: 5.010, 3804.91: 5.177, 3854.91: 4.701, 3904.91: 5.072, 3954.91: 5.529, 4004.91: 6.070, 4054.91: 6.081 } } } def 
__init__(self, hw_config: Dict = SUPERHET_APP_HWCONF, app_config: Dict = SUPERHET_APP_CONFIG) -> None: d: Dict = app_config['channels'] z = zip(d.keys(), hw_config, d.values()) self._channels: Dict = { chan_id: ADCChan(chan_id, hw_conf, chan_config) for chan_id, hw_conf, chan_config in z} for chan_id, nf in SuperHetApp.NOISE_FLOOR['channels'].items(): self._channels[chan_id].noise_floor_cal_data = nf for chan_id, fr in SuperHetApp.FREQ_RESPONSE['channels'].items(): self._channels[chan_id].freq_resp_cal_data = fr <<initialize>> @property def channels(self): return self._channels def configure_lo_freq(self, ser, chan_id, lo_freq): lo1_freq = SuperHetApp.SUPERHET_BANDPASS_CENTRE + lo_freq self._channels[chan_id].configure_lo1_freq(ser, lo1_freq)
Hardware configuration
# Channel identifiers.  ADCCH1_NAME is used below as the key of the
# channel entry in SUPERHET_APP_CONFIG.
ADCCH1_NAME: str = 'Ch 1'
ADCCH2_NAME: str = 'Ch 2'

# Channel 1 first LO hardware: keyword arguments for the synthesizer
# ('firstlo_plo') and for its output switch ('firstlo_sw').
# NOTE(review): the values are presumably controller pin identifiers -
# confirm against the rfblocks device classes.
FIRSTLO_CH1_HWCONF: Dict = {
    'firstlo_plo': {'sen': 'D0', 'ld_sdo': 'C4', 'ref': 'D2'},
    'firstlo_sw': {'c1': 'D4', 'c2': 'D5'}
}

# Channel 1 second LO synthesizer hardware.
SECONDLO_CH1_HWCONF: Dict = {
    'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2'
}

# Channel 1 step attenuator hardware.
STEPATTEN_CH1_HWCONF: Dict = {'le': 'D3'}

# The channel hardware is configured to match the hardware
# currently in place.
SUPERHET_CH1_HWCONF: Dict = {
    'lo1': FIRSTLO_CH1_HWCONF,
    'lo2': SECONDLO_CH1_HWCONF,
    'stepatten': STEPATTEN_CH1_HWCONF
}

# One hardware entry per channel, in channel order.
SUPERHET_APP_HWCONF: List = [SUPERHET_CH1_HWCONF]

# Default tuning and attenuation.
# NOTE(review): frequencies are presumably in MHz and the attenuation
# in dB - confirm.
SUPERHET_DEFAULT_FREQ = 1000.0
SUPERHET_BANDPASS_CENTRE = 1575.0
SUPERHET_ADC_CENTRE_FREQ = 35.0
SUPERHET_DEFAULT_ATTEN = 10.0

# Both LO defaults are offsets above the bandpass centre: LO1 by the
# default input frequency, LO2 by the ADC centre frequency.
SUPERHET_LO1_FREQ: float = SUPERHET_BANDPASS_CENTRE + SUPERHET_DEFAULT_FREQ
SUPERHET_LO2_FREQ: float = SUPERHET_BANDPASS_CENTRE + SUPERHET_ADC_CENTRE_FREQ

# Initial settings for a channel's two synthesizers and its step
# attenuator.  Only LO1 has an 'initial_sw_state' entry.
SUPERHET_CHAN_CONFIG: Dict = {
    'lo1': {
        'freq': SUPERHET_LO1_FREQ,
        'ref_freq': hmc833.DEFAULT_REF_FREQ,
        'refsrc': hmc833.DEFAULT_REFSRC,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False,
        'initial_sw_state': pe42420.State.RF1
    },
    'lo2': {
        'freq': SUPERHET_LO2_FREQ,
        'ref_freq': hmc833.DEFAULT_REF_FREQ,
        'refsrc': hmc833.DEFAULT_REFSRC,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False
    },
    'stepatten': {
        'atten': SUPERHET_DEFAULT_ATTEN
    }
}

# Application configuration: maps each channel identifier to that
# channel's initial settings.
SUPERHET_APP_CONFIG: Dict = {
    'channels': {
        ADCCH1_NAME: SUPERHET_CHAN_CONFIG
    }
}
The SuperHet Service
# Range and step size of the front end step attenuator as advertised
# in the default capabilities.
MIN_ATT = 0.0
MAX_ATT = 31.75
ATT_STEP = 0.25


class SuperHetService(rpyc.Service):
    """RPyC service exposing the super heterodyne front end.

    The serial connection to the hardware is opened when the first
    client connects and closed again when the last client disconnects.
    """

    # Capabilities advertised when none are passed to the constructor.
    DEFAULT_CAPABILITIES = {
        "channels": [SuperHetApp.ADCCH1_NAME],
        SuperHetApp.ADCCH1_NAME: {
            "adcdma_channel": 0,
            "adc_attenuation": {
                "min": RP_ADC_MIN_ATT,
                "max": RP_ADC_MAX_ATT,
                "step": RP_ADC_ATT_STEP
            },
            "fe_attenuation": {
                "min": MIN_ATT,
                "max": MAX_ATT,
                "step": ATT_STEP
            },
            "freq": {
                "min": 5.0,
                "max": 1500.0,
                "step": 0.0
            },
            "cal_freq": {
                "min": 3.0,
                "max": 1510.0,
                "step": 50.0
            },
            "reflevel": {
                "min": RP_ADC_MIN_REF_LEVEL,
                "max": RP_ADC_MAX_REF_LEVEL,
                "step": 1.0
            },
            "min_span": RP_ADC_MIN_SPAN,
            "chan_type": ChannelCapabilities.SUPERHET,
            "bandwidth": 20.0,
            "adc_centre_freq": 35.0
        }
    }

    def __init__(self,
                 app,
                 serial_device: str,
                 baudrate: int = 0,
                 capabilities: Dict = DEFAULT_CAPABILITIES) -> None:
        """Create the service.

        :param app: The application model.  This class uses its
            ``channels``, ``initialize``, ``cleanup`` and
            ``configure_lo_freq`` members.
        :param serial_device: The serial device of the hardware
            controller.
        :param baudrate: The serial baud rate.  A value of 0 selects
            ``DEFAULT_BAUDRATE``.
        :param capabilities: A dict with a ``'channels'`` list and, for
            each listed channel, a dict of that channel's capabilities.
            It is read but not modified.
        """
        super().__init__()
        self._app = app
        self._ctl_device: str = serial_device
        self._baudrate: int = baudrate
        if baudrate == 0:
            self._baudrate = DEFAULT_BAUDRATE
        self._capabilities = {
            'channels': capabilities['channels']
        }
        for chan_name in capabilities['channels']:
            self._capabilities[chan_name] = ChannelCapabilities.from_dict(
                capabilities[chan_name])
        self._ser = None
        self._client_count = 0

    def on_connect(self, conn):
        """Open the serial device and initialize the hardware for the
        first client, then count the new client."""
        if self._ser is None:
            self._ser = create_serial(self._ctl_device, self._baudrate)
            self.initialize()
        self._client_count += 1

    def on_disconnect(self, conn):
        """Count the departing client and, when it was the last one,
        clean up the application and close the serial device."""
        self._client_count -= 1
        if self._client_count == 0:
            # NOTE(review): cleanup() is not defined in the visible part
            # of SuperHetApp; presumably it comes from the `initialize`
            # noweb block - confirm.
            self._app.cleanup()
            self._ser.close()
            self._ser = None

    @property
    def capabilities(self):
        """The capabilities dict built by the constructor."""
        return self._capabilities

    @property
    def adc_channels(self):
        """The application's mapping of channel id to channel object."""
        return self._app.channels

    def initialize(self):
        """Initialize the application using the open serial device."""
        self._app.initialize(self._ser)

    def configure_stepatten(self, chan_id, att):
        """Set the step attenuation of channel ``chan_id`` to ``att``."""
        ch = self.adc_channels[chan_id]
        ch.configure_stepatten(self._ser, att)

    def configure_lo1(self, chan_id):
        """Program LO1 of channel ``chan_id`` with its current frequency.

        The channel class provides no ``configure_lo1`` method, so the
        synthesizer is driven through its ``configure_freq`` method,
        which applies the frequency currently held by the synthesizer.
        """
        ch = self.adc_channels[chan_id]
        ch.lo1_synth.configure_freq(self._ser)

    def configure_lo1_freq(self, chan_id, lo_freq):
        """Set LO1 of channel ``chan_id`` to ``lo_freq``."""
        ch = self.adc_channels[chan_id]
        ch.configure_lo1_freq(self._ser, lo_freq)

    def configure_lo2(self, chan_id):
        """Program LO2 of channel ``chan_id`` with its current frequency.

        The channel class provides no ``configure_lo2`` method, so the
        synthesizer is driven through its ``configure_freq`` method,
        which applies the frequency currently held by the synthesizer.
        """
        ch = self.adc_channels[chan_id]
        ch.lo2_synth.configure_freq(self._ser)

    def configure_lo2_freq(self, chan_id, lo_freq):
        """Set LO2 of channel ``chan_id`` to ``lo_freq``."""
        ch = self.adc_channels[chan_id]
        ch.configure_lo2_freq(self._ser, lo_freq)

    def configure_lo_freq(self, chan_id, lo_freq):
        """Tune channel ``chan_id`` via the application, which derives
        the LO1 frequency from ``lo_freq``."""
        self._app.configure_lo_freq(self._ser, chan_id, lo_freq)

    @classmethod
    def save_config(cls):
        """Print the default application configuration.

        The configuration lives on ``SuperHetApp``; the previously
        referenced ``FrontEndApp`` name does not exist in this module.
        """
        print(SuperHetApp.SUPERHET_APP_CONFIG)


class RPyCServer(QObject):
    """Qt object which runs an RPyC ``ThreadedServer`` for a service.

    ``run`` blocks while the server is running and emits ``finished``
    once the server has stopped.
    """

    finished = pyqtSignal()

    def __init__(self, serviceInst, host, port):
        """Remember the service instance and the endpoint to serve on.

        :param serviceInst: The RPyC service instance to expose.
        :param host: The host name or IP address to bind to.
        :param port: The TCP port to listen on.
        """
        super().__init__()
        self._serviceInst = serviceInst
        self._host = host
        self._port = port

    def run(self):
        """Create and start the server, then signal completion."""
        print("RPRF rpyc service on {}:{}".format(self._host, self._port))
        # NOTE(review): this protocol configuration lets clients reach
        # all attributes, set attributes and exchange pickled objects.
        # That is only appropriate for trusted clients; review before
        # binding to anything other than a trusted interface.
        self._server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            auto_register=False,
            protocol_config={
                'allow_all_attrs': True,
                'allow_setattr': True,
                'allow_pickle': True})
        self._server.start()
        self.finished.emit()
First LO Controller
from typing import (
    Dict
)
import serial
import numpy as np

from rfblocks import (
    pe42420, PE42420Controller, hmc833, HMC833Controller)


class FirstLO_Synth:
    """First LO: an HMC833 synthesizer followed by a PE42420 switch.

    When the frequency is configured the switch position and the
    synthesizer output buffer gain are selected from the frequency.
    """

    # Output buffer gain by frequency.  The entries must stay in
    # ascending key order: the lookup returns the gain of the first key
    # which is strictly greater than the requested frequency.
    # NOTE(review): keys are presumably frequencies in MHz - confirm.
    GAIN_TABLE = {
        25.0: hmc833.OutputBufferGain.MAXGAIN,
        50.0: hmc833.OutputBufferGain.MAXGAIN,
        100.0: hmc833.OutputBufferGain.MAXGAIN,
        200.0: hmc833.OutputBufferGain.MAXGAIN,
        300.0: hmc833.OutputBufferGain.MAXGAIN,
        400.0: hmc833.OutputBufferGain.MAXGAIN,
        500.0: hmc833.OutputBufferGain.MAXGAIN,
        600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        1500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        1900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        2000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        2100.0: hmc833.OutputBufferGain.MAXGAIN,
        2200.0: hmc833.OutputBufferGain.MAXGAIN,
        2300.0: hmc833.OutputBufferGain.MAXGAIN,
        2400.0: hmc833.OutputBufferGain.MAXGAIN,
        2500.0: hmc833.OutputBufferGain.MAXGAIN,
        2600.0: hmc833.OutputBufferGain.MAXGAIN,
        2700.0: hmc833.OutputBufferGain.MAXGAIN,
        2800.0: hmc833.OutputBufferGain.MAXGAIN,
        2900.0: hmc833.OutputBufferGain.MAXGAIN,
        3000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_3DB,
        3800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        3900.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4000.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4100.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4200.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4300.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4400.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4500.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4600.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4700.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_9DB,
        4800.0: hmc833.OutputBufferGain.MAXGAIN_MINUS_6DB,
        4900.0: hmc833.OutputBufferGain.MAXGAIN,
        5000.0: hmc833.OutputBufferGain.MAXGAIN,
        5100.0: hmc833.OutputBufferGain.MAXGAIN,
        5200.0: hmc833.OutputBufferGain.MAXGAIN,
        5300.0: hmc833.OutputBufferGain.MAXGAIN,
        5400.0: hmc833.OutputBufferGain.MAXGAIN,
        5500.0: hmc833.OutputBufferGain.MAXGAIN,
        5600.0: hmc833.OutputBufferGain.MAXGAIN,
        5700.0: hmc833.OutputBufferGain.MAXGAIN,
        5800.0: hmc833.OutputBufferGain.MAXGAIN,
        5900.0: hmc833.OutputBufferGain.MAXGAIN,
        6000.0: hmc833.OutputBufferGain.MAXGAIN,
    }

    def __init__(self,
                 chan_id: str,
                 synth_hw: Dict,
                 synth_config: Dict) -> None:
        """Create the synthesizer and switch devices and controllers.

        :param chan_id: The channel identifier, used to name the
            controllers.
        :param synth_hw: Hardware settings with ``'firstlo_plo'``
            (synthesizer) and ``'firstlo_sw'`` (switch) entries, each a
            dict of keyword arguments for the device class.
        :param synth_config: Initial settings.  ``'freq'`` and
            ``'initial_sw_state'`` are read here; the whole dict is
            also passed to the synthesizer controller.
        """
        self._chan_id = chan_id
        self._synth = hmc833(**synth_hw['firstlo_plo'])
        self._sw = pe42420(**synth_hw['firstlo_sw'])
        self._synth_ctl = HMC833Controller(
            f'{chan_id}_lo1', self._synth, synth_config)
        self._sw_ctl = PE42420Controller(
            f'{chan_id}_lo1_sw', self._sw,
            synth_config['initial_sw_state'])
        self._freq = synth_config['freq']

    @property
    def synth_ctl(self) -> HMC833Controller:
        """The controller for the LO1 synthesizer."""
        return self._synth_ctl

    @property
    def sw_ctl(self) -> PE42420Controller:
        """The controller for the LO1 output switch."""
        return self._sw_ctl

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the synthesizer and the switch, then apply the
        switch's current state to the hardware.

        :param ser: The serial connection to the hardware controller.
        """
        self.synth_ctl.initialize(ser)
        self.sw_ctl.initialize(ser)
        self.sw_ctl.configure(ser)

    @property
    def freq(self) -> float:
        """The LO1 frequency to be applied by ``configure_freq``."""
        return self._freq

    @freq.setter
    def freq(self, f: float) -> None:
        self._freq = f

    @staticmethod
    def _buffer_gain(f: float):
        """Return the GAIN_TABLE gain for frequency ``f``.

        This is the gain stored under the first table key which is
        strictly greater than ``f``; when no key is greater the gain of
        the last entry is returned.
        """
        table = FirstLO_Synth.GAIN_TABLE
        upper = None
        for upper in table:
            if upper > f:
                break
        return table[upper]

    def configure_freq(self, ser: serial.Serial) -> None:
        """Apply the current frequency to the hardware.

        The switch is set to ``RF2`` for frequencies of 3000.0 and
        above and to ``RF1`` otherwise.  The synthesizer buffer gain is
        then looked up for the frequency and the synthesizer is
        configured.

        :param ser: The serial connection to the hardware controller.
        """
        if self.freq >= 3000.0:
            self.sw_ctl.state = pe42420.State.RF2
        else:
            self.sw_ctl.state = pe42420.State.RF1
        self.sw_ctl.configure(ser)

        self.synth_ctl.buffer_gain = FirstLO_Synth._buffer_gain(self.freq)
        self.synth_ctl.freq = self.freq
        self.synth_ctl.configure(ser)
Second LO Controller
from typing import (
    Dict
)
import serial

from rfblocks import (
    hmc833, HMC833Controller)


class SecondLO_Synth:
    """Second LO: a single HMC833 synthesizer and its controller."""

    # Hardware settings used when none are passed to the constructor.
    HMC833_SECONDLO_HWCONF: Dict = {'sen': 'D1', 'ld_sdo': 'C5', 'ref': 'D2'}

    # NOTE(review): frequencies are presumably in MHz - confirm.
    DEFAULT_SECONDLO_FREQ = 1605.0

    # Synthesizer settings used when none are passed to the constructor.
    DEFAULT_SYNTH_CONFIG: Dict = {
        'freq': DEFAULT_SECONDLO_FREQ,
        'ref_freq': 50.0,
        'refsrc': hmc833.ReferenceSource.INTERNAL,
        'bufgain': hmc833.OutputBufferGain.MAXGAIN,
        'divgain': hmc833.DividerGain.MAXGAIN_MINUS_3DB,
        'vco_mute': False
    }

    def __init__(self,
                 chan_id: str,
                 synth_hw: Dict = HMC833_SECONDLO_HWCONF,
                 synth_config: Dict = DEFAULT_SYNTH_CONFIG) -> None:
        """Create the synthesizer device and its controller.

        :param chan_id: The channel identifier, used to name the
            controller.
        :param synth_hw: Keyword arguments for the ``hmc833`` device
            class.  It is read but not modified.
        :param synth_config: Initial settings.  ``'freq'`` is read
            here; the whole dict is also passed to the controller.
        """
        self._chan_id = chan_id
        self._synth = hmc833(**synth_hw)
        self._synth_ctl = HMC833Controller(
            f'{chan_id}_lo2', self._synth, synth_config)
        self._freq = synth_config['freq']

    @property
    def synth_ctl(self) -> HMC833Controller:
        """The controller for the LO2 synthesizer."""
        return self._synth_ctl

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the synthesizer.

        :param ser: The serial connection to the hardware controller.
        """
        self.synth_ctl.initialize(ser)

    @property
    def freq(self) -> float:
        """The LO2 frequency to be applied by ``configure_freq``."""
        return self._freq

    @freq.setter
    def freq(self, f: float) -> None:
        self._freq = f

    def configure_freq(self, ser: serial.Serial) -> None:
        """Apply the current frequency to the synthesizer hardware.

        :param ser: The serial connection to the hardware controller.
        """
        self.synth_ctl.freq = self.freq
        self.synth_ctl.configure_freq(ser)
Imports
from typing import ( Optional, List, Dict ) import sys from time import sleep import signal import json from argparse import ArgumentParser import asyncio import serial from rfblocks import ( hmc833, HMC833Controller, pe42420, PE42420Controller, DEFAULT_BAUDRATE, create_serial, write_cmd ) from tam import ( RP_ADC_MIN_FREQUENCY, RP_ADC_MAX_FREQUENCY, RP_ADC_MIN_SPAN, RP_ADC_MAX_REF_LEVEL, RP_ADC_MIN_REF_LEVEL, RP_ADC_REF_LEVEL_STEP, RP_ADC_MIN_ATT, RP_ADC_MAX_ATT, RP_ADC_ATT_STEP, Capability, ChannelCapabilities ) import rpyc from rpyc.utils.server import ThreadedServer from qasync import ( QEventLoop, QThreadExecutor, asyncSlot, asyncClose ) from PyQt5.QtCore import ( Qt, QObject, QCoreApplication, QThread, QTimer, pyqtSignal, pyqtSlot ) import numpy as np from adcchan import ADCChan from superhet import SuperHetApp