Red Pitaya ADC/DAC Service
Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.
The Application
The adcdma.py
script provides application level access to the Zynq
FPGA CPUs and programmable logic (PL) which in turn interface to the
Red Pitaya ADC and DAC hardware. A description of and code for the
programmable logic, kernel driver, user space library and associated
Python module is provided here: DMA on the Red Pitaya/Zynq.
# Generated from rp-adc-dma.org # import faulthandler from argparse import ArgumentParser import rpyc from service import RPyCServer, ADCService def main(): faulthandler.enable() parser = ArgumentParser(description="RedPitaya ADC service.") parser.add_argument("-A", "--ipaddr", default="127.0.0.1", help="IP address for to bind the RPyC server instance") parser.add_argument("-P", "--port", default=18900, type=int, help="TCP port for the RPyC server instance") args = parser.parse_args() # Setting a large CHUNK_SIZE should improve network performance over # the default value of 64K. # For more detail see: # https://github.com/tomerfiliba-org/rpyc/issues/398 rpyc.core.SocketStream.MAX_IO_CHUNK = 35 * 1024 * 1024 rpyc.core.Channel.COMPRESSION_THRESHOLD = 64 * 1024 * 1024 server = RPyCServer(ADCService(), args.ipaddr, args.port) server.run()
The ADCService
The Python remote procedure call framework RPyC is used to create a network service which allows the Red Pitaya ADC channels to be accessed remotely.
The ADCService
implements the functionality which is accessed via
RPyC. It's a simple matter to use the standard Python REPL to interact
with the service.
>>> import rpyc >>> import numpy as np >>> >>> adcsvc = rpyc.connect("192.168.0.155", 18900) >>> adcsvc.root.initialize() True >>> chanid = 0 >>> adcchan = adcsvc.root.allocate_channel(chanid, 1, 0.0, 0, 8*1024, 8*1024, "F")
Sampled data is now read using the allocated ADC channel object. Note that the sample data is returned as Numpy array of complex64 I/Q data.
>>> data = np.array(adcchan.next_chunk()) >>> np.set_printoptions(precision=1, suppress=True, edgeitems=10) >>> data array([-7442.9+2903.1j, -7144.9+2999.5j, -6858. +3087.6j, -6555.2+3155.2j, -6225.9+3195.2j, -5841.2+3189.j , -5463.6+3166.8j, -5080.9+3121.2j, 475.7 -471.9j, 461.4 -481.7j, ..., -570.1 -667.7j, -603. -670.8j, -632.4 -668.2j, -650.7 -653.2j, -664.2 -633.6j, -695.8 -630.5j, -716.2 -616.5j, -728.7 -595.4j, -755.5 -585.8j, -754.1 -554.5j], dtype=complex64) >>> len(data) 8192
Channel attenuation is adjustable between 0.0 and 31.75 dB. Note that in the code below, after flushing the ADC channel there are still a few samples in the data which remain from the previous attenuation setting. This is due to the DMA latency and will beb of the order of 10 samples.
>>> adcsvc.root.set_channel_attenuation(chanid, 0.0) >>> adcsvc.root.flush_channel(chanid) >>> data = np.array(adcchan.next_chunk()) >>> np.max(np.abs(data)) 9443.004 >>> adcsvc.root.set_channel_attenuation(chanid, 10.0) >>> adcsvc.root.flush_channel(chanid) >>> data = np.array(adcchan.next_chunk()) >>> np.max(np.abs(data)) 9363.997 >>> np.max(np.abs(data[10:])) 3087.9922 >>> adcsvc.root.set_channel_attenuation(chanid, 20.0) >>> adcsvc.root.flush_channel(chanid) >>> data = np.array(adcchan.next_chunk()) >>> np.max(np.abs(data)) 2805.0051 >>> np.max(np.abs(data[10:])) 1082.9929 >>> adcsvc.root.free_channel(chanid)
Raw samples acquired by the ADC (and prior to processing by the digital
down conversion) can accessed by allocating the raw buffer. The acquired
samples are from both channels and are interleaved. In the code below,
samples from the first channel are accessed using data[::2]
and from the
second channel using data[1::2]
.
rawbuff = adcsvc.root.allocate_raw_buffer() >>> data = np.array(rawbuff.get_next_chunk()) >>> data[::2] array([-35, -31, -27, -34, -32, -42, -27, -35, -31, -27, ..., -32, -32, -39, -37, -27, -52, -29, -35, -34, -38], dtype=int16) >>> data[1::2] >>> array([ 229, -320, -796, -1096, -1123, -866, -391, 150, 670, 990, ..., 977, 651, 124, -428, -872, -1112, -1080, -787, -297, 250], dtype=int16) >>> adcsvc.free_raw_buffer()
A small command line script which pulls together some of the code snippets above is provided here: ADC service test script.
import rpyc from rpyc.utils.server import ThreadedServer import _axidma from buffer import ADCBuffer from channel import ADCChannel from dacchannel import DACChannel from attenuator import ADCAttenuator class InvalidChannelException(Exception): """Raised when an invalid channel access is attempted. """ pass class ADCService(rpyc.Service): ADC_CLOCK = {'122-16': 122.88, '125-14': 125.0} ADC_BITS = {'122-16': 16, '125-14': 14} DAC_BITS = {'122-16': 14, '125-14': 14} RAW_CHAN_ID = 2 def __init__(self): super().__init__() self._adc_type = None self.adc_channels = [None, None] self.adc_channel_cb = [None, None] self.dac_channels = [None, None] self._raw_buffer = None self._axidma_dev = None self._attenuator = None self._connect_count = 0 self._initialized = False def on_connect(self, conn): self._connect_count += 1 def on_disconnect(self, conn): self._connect_count -= 1 if self.connect_count == 0: self.cleanup() @property def connect_count(self): return self._connect_count def initialize(self, adc_type='122-16', cfg_device='uio0', dac_cfg_device='uio1', sw_device='uio2', sw_device_2='uio3', spi_device='spidev1.0', gpio_device='gpiochip1'): if self._initialized is False: self._adc_type = adc_type self._cfg_device = cfg_device self._dac_cfg_device = dac_cfg_device self._sw_devices = [sw_device, sw_device_2] self._axidma_dev = _axidma.lib.axidma_init() if self._axidma_dev is None: return False self._rx_chans = _axidma.lib.axidma_get_dma_rx(self._axidma_dev) self._attenuator = ADCAttenuator( '/dev/'+spi_device, '/dev/'+gpio_device) self._initialized = True return True def finalize(self): self.cleanup() def cleanup(self): self._initialized = False for index, ch in enumerate(self.adc_channels): if ch is not None: ch.close() self.adc_channels[index] = None for index, dac_ch in enumerate(self.dac_channels): if dac_ch is not None: dac_ch.close() self.dac_channels[index] = None if self._raw_buffer is not None: self.free_raw_buffer() if self._axidma_dev is not None: _axidma.lib.axidma_destroy(self._axidma_dev) self._axidma_dev = None if self._attenuator is not None: self._attenuator.close() self._attenuator = None @property def adc_clock(self): return ADCService.ADC_CLOCK[self._adc_type] @property def adc_resolution(self): return ADCService.ADC_BITS[self._adc_type] def allocate_channel( self, adc_chan, decimation_rate, lo_freq, dc_offset, buffer_size, chunk_size, double_buffer=True, prefill_buffers=True, sample_callback_fn=None): if self._axidma_dev is None: return None if self.adc_channels[adc_chan] is not None: raise InvalidChannelException( 'Channel already allocated.' ) if adc_chan < 0 or adc_chan > self._rx_chans.len: raise InvalidChannelException( f'No such channel id: {adc_chan}' ) chan_id = self._rx_chans.data[adc_chan] if sample_callback_fn is not None: self.adc_channel_cb[adc_chan] = sample_callback_fn else: self.adc_channel_cb[adc_chan] = None ch = ADCChannel( self, adc_chan, chan_id, decimation_rate, lo_freq, dc_offset, buffer_size, chunk_size, cfg_device=self._cfg_device, sw_device=self._sw_devices[adc_chan], double_buffer=double_buffer, sample_callback_fn=self.adc_channel_cb[adc_chan] ) ch_status = ch.initialize(self._axidma_dev, prefill_buffers) if ch_status is True: self.adc_channels[adc_chan] = ch return ch def allocate_dac_channel(self, dac_chan): if self.dac_channels[dac_chan] is not None: raise InvalidChannelException( 'Channel already allocated.' ) if dac_chan not in [0, 1]: raise InvalidChannelException( f'No such channel id: {adc_chan}' ) ch = DACChannel(self, dac_chan, cfg_device=self._dac_cfg_device) ch_status = ch.initialize() if ch_status is True: self.dac_channels[dac_chan] = ch return ch def flush_channel(self, adc_chan): for index, ch in enumerate(self.adc_channels): if index == adc_chan and ch is not None: ch.flush_buffers() def free_channel(self, adc_chan): for index, ch in enumerate(self.adc_channels): if index == adc_chan and ch is not None: ch.close() self.adc_channels[index] = None def free_dac_channel(self, dac_chan): for index, ch in enumerate(self.dac_channels): if index == dac_chan and ch is not None: ch.close() self.dac_channels[index] = None @property def available_channels(self): return [i for i, ch in enumerate(self.adc_channels) if ch is None] @property def available_dac_channels(self): return [i for i, ch in enumerate(self.dac_channels) if ch is None] def get_next_chunk(self, adc_chan): try: ch = self.adc_channels[adc_chan] if ch is None: raise InvalidChannelException( f'Channel {adc_chan} is uninitialized') except IndexError: raise InvalidChannelException( f'Channel {adc_chan} is an invalid channel id' ) return ch.next_chunk() def channel_attenuation(self, adc_chan): return self._attenuator.attenuation(adc_chan) def set_channel_attenuation(self, adc_chan, att): self._attenuator.set_attenuation(adc_chan, att) self._attenuator.update_attenuation(adc_chan) def allocate_raw_buffer(self, buffer_size=64*1024, chunk_size=16*1024): if self._raw_buffer is None: raw_chan_id = self._rx_chans.data[ADCService.RAW_CHAN_ID] self._raw_buffer = ADCBuffer(self, None, self._axidma_dev, raw_chan_id, buffer_size, chunk_size, '<i2') if self._raw_buffer.allocate() is False: return None self._raw_buffer.fill(wait=True) return self._raw_buffer def free_raw_buffer(self): if self._raw_buffer is not None: self._raw_buffer.close() self._raw_buffer = None class RPyCServer(): def __init__(self, serviceInst, host, port): super().__init__() self._serviceInst = serviceInst self._host = host self._port = port def run(self): print("Run ThreadedServer on {}:{}".format(self._host, self._port)) self._server = ThreadedServer( self._serviceInst, hostname=self._host, port=self._port, protocol_config={ 'allow_all_attrs': True, 'allow_setattr': True, 'allow_pickle': True}) self._server.start()
ADCAttenuator
ADC channel attenuators are controlled using the Zynq SPI and GPIO devices. Figure 1 shows the connections from the Redpitaya board to the attenuator modules. The ADCAttenuator class code makes use of the Python periphery package to control the attenuator devices.
Channel attenuators can be controlled via The ADCService using the
set_channle_attenuation
and channel_attenuation
methods.
from periphery import SPI, GPIO class ADCAttenuator: MIN_ATTENUATION = 0.0 MAX_ATTENUATION = 31.75 GPIO_0 = 0 GPIO_1 = 1 SPI_SPEED = 1000000 def __init__(self, spi_device, gpio_device): self.spi_device = spi_device self.gpio_device = gpio_device self._spi = SPI(self.spi_device, 0, ADCAttenuator.SPI_SPEED) self._le = [ GPIO(self.gpio_device, ADCAttenuator.GPIO_0, "out"), GPIO(self.gpio_device, ADCAttenuator.GPIO_1, "out")] self._attenuation = [ ADCAttenuator.MIN_ATTENUATION, ADCAttenuator.MIN_ATTENUATION] def close(self): if self._spi is not None: self._spi.close() self._spi = None for le in self._le: if le is not None: le.close() self._le = None def attenuation(self, ch): return self._attenuation[ch] def set_attenuation(self, ch, att): if att < ADCAttenuator.MIN_ATTENUATION: self._attenuation[ch] = MIN_ATTENUATION elif att > ADCAttenuator.MAX_ATTENUATION: self._attenuation[ch] = MAX_ATTENUATION else: self._attenuation[ch] = att def update_attenuation(self, ch): att = self._attenuation[ch] self._spi.transfer([int('{:08b}'.format(round(att*4))[::-1], 2) >> 1]) self._le[ch].write(True) self._le[ch].write(False)
ADCChannel
The ADC channel down converter design (DDC) is summarized in Figure 2. The DDC core configuration registers are accessed via a Linux userspace I/O (UIO) device. The stream switch configuration is also accessed as a register via separate UIO devices. (More details of the programmable hardware can be found here: ADC DMA Zynq Programmable Logic.)
Core and stream switch configuration registers are memory mapped from the
associated UIO devices using ADCChannel.map_registers
,
map_cfg_registers
, and map_sw_registers
. Table 1 lists the associated
UIO devices with Tables 2 and 3 listing the memory mapped register addresses.
Configuration |
Device |
---|---|
DDC cores |
/sys/class/uio/uio0 |
Stream Sw. 1 |
/sys/class/uio/uio2 |
Stream Sw. 2 |
/sys/class/uio/uio3 |
Register |
Ch 1 Addr |
Ch 2 Addr |
---|---|---|
LO Phase Incr. |
0:4 |
8:12 |
Decimation Rate |
4:6 |
12:14 |
DC Offset |
6:8 |
14:16 |
Register |
Addr |
---|---|
Switch config. |
0x40:0x44 |
Switch update |
0:4 |
ADCChannel class parameters are set by the hardware in use and the programmable logic being used to implement the DDC. Table 4 summarises the relevant parameters.
Parameter |
Value |
Description |
---|---|---|
ADC_MIN_FREQUENCY |
0.5 MHz |
Determined by the ADC bandwidth and any anti-alias filtering used |
ADC_MAX_FREQUENCY |
55.0 MHz |
|
MIN_DECIMATION_RATE |
4 |
Determined by the hardware CIC filter configuration |
MAX_DECIMATION_RATE |
8192 |
|
LO_DDS_PHASE_WIDTH |
30 |
Determined by the hardware DDS NCO Configuration |
DMA_LATENCY |
12 |
A characteristic of the Zynq DMA engine |
DMA_DATA_FORMAT |
'F' |
This is np.complex64 (single precision complex) |
from typing import ( Optional, List, Dict ) import os from os import O_RDWR from pathlib import Path from mmap import mmap import numpy as np from scipy import signal from data import SweepParameters from buffer import ADCBuffer from threadedbuffer import ADCThreadedBuffer class ADCChannel(object): ADC_MIN_FREQUENCY: float = 0.5 ADC_MAX_FREQUENCY: float = 55.0 MIN_DECIMATION_RATE: int = 4 MAX_DECIMATION_RATE: int = 8192 MAX_DC_OFFSET: int = 1<<13 MIN_DC_OFFSET: int = -(1<<13) LO_DDS_PHASE_WIDTH: int = 30 DMA_LATENCY: int = 12 # The DMA data format is set by the design of the programmable logic # in the down converter. DMA_DATA_FORMAT: str = 'F' _cfg_fd = None _cfg_mem = None @classmethod def map_registers(cls, uio_device: str): uio_path = Path(f'/sys/class/uio/{uio_device}') size_path = uio_path / 'maps/map0/size' dev_path = f'/dev/{uio_device}' with open(size_path) as size_fd: mem_len = int(size_fd.read(), 16) reg_fd = os.open(dev_path, O_RDWR) reg_mem = mmap(reg_fd, mem_len) return reg_fd, reg_mem @classmethod def map_cfg_registers(cls, cfg_uio_device: str): if cls._cfg_mem is None: cls._cfg_fd, cls._cfg_mem = cls.map_registers(cfg_uio_device) @classmethod def unmap_cfg_registers(cls): if cls._cfg_mem is not None: cls._cfg_mem.close() cls._cfg_mem = None os.close(cls._cfg_fd) def __init__(self, adc, adc_id: int, chan_id: int, decimation_rate: int = 1, lo_freq: float = 0.0, dc_offset: int = 0, buffer_size: int = 1024 * 128, chunk_size: int = 1024 * 8, double_buffer = True, sample_callback_fn = None, cfg_device = 'uio0', sw_device = 'uio2'): """Creates an instance of ``ADCChannel``. :param adc: a reference to the ADCService instance :type adc: ADCService :param adc_id: the ADC channel (one of ``[0, 1]``) :type adc_id: int :param chan_id: the DMA channel id as returned by ``axidma_get_dma_rx`` :type chan_id: int :param decimation_rate: the ADC channel decimation rate :type decimation_rate: int :param lo_freq: the ADC channel LO frequency setting (in MHz) :type lo_freq: float :param buffer_size: the DMA buffer size in units of complex64 :type buffer_size: int :param chunk_size: data transfer chunk size in units of complex64 :type chunk_size: int """ self._adc = adc self._lo_dds_clock = adc.adc_clock self._max_lo_freq = adc.adc_clock / 2 self._adc_id: int = adc_id self._chan_id: int = chan_id self._cfg_device = cfg_device self._sw_device = sw_device self._sw_fd = None self._sw_mem = None self.map_sw_registers() ADCChannel.map_cfg_registers(self._cfg_device) self._decimation_rate = 0 self.decimation_rate = decimation_rate self._lo_freq = 0 self.lo_freq = lo_freq self._dc_offset = 0 self.dc_offset = dc_offset self._buffer_size = buffer_size self._chunk_size = chunk_size self._dma_dev = None self._dtype = ADCChannel.DMA_DATA_FORMAT self._output_buffer = None self._output_buffer2 = None self._current_buffer = None self._double_buffer = double_buffer self._sample_cb_fn = sample_callback_fn def map_sw_registers(self) -> None: if self._sw_mem is None: self._sw_fd, self._sw_mem = ADCChannel.map_registers(self._sw_device) def unmap_sw_registers(self) -> None: if self._sw_mem is not None: self._sw_mem.close() self._sw_mem = None os.close(self._sw_fd) def initialize(self, axidma_dev, prefill_buffers=True): self._dma_dev = axidma_dev self._output_buffer = ADCThreadedBuffer( self._adc, self, self._dma_dev, self._chan_id, self.buffer_size, self.chunk_size, self._dtype) self._output_buffer.initialize(prefill_buffers) if self._double_buffer: self._output_buffer2 = ADCThreadedBuffer( self._adc, self, self._dma_dev, self._chan_id, self.buffer_size, self.chunk_size, self._dtype) self._output_buffer2.initialize(prefill_buffers) self._current_buffer = self._output_buffer return True def resize(self, new_size: int) -> None: if self.buffer_size == new_size: return if self.chunk_size > new_size: self.chunk_size = new_size self._buffer_size = new_size self._output_buffer.resize(new_size) if self._double_buffer: self._output_buffer2.resize(new_size) self._current_buffer = self._output_buffer @property def double_buffer(self) -> bool: return self._double_buffer @double_buffer.setter def double_buffer(self, double: bool) -> None: if double == self._double_buffer: return self._double_buffer = double if double: self._output_buffer2 = ADCThreadedBuffer( self._adc, self, self._dma_dev, self._chan_id, self.buffer_size, self.chunk_size, self._dtype) self._output_buffer2.initialize(prefill=False) else: if self._output_buffer2 is not None: self._output_buffer2.close() self._output_buffer2 = None def sweep(self, lo_list: List[float], rate: int, sample_len: int) -> Dict: """Collect samples over a range of LO frequencies. :param lo_list: List of LO frequencies to sweep over :type lo_list: List[float] :param rate: The decimation rate :type rate: int :param sample_len: The sample size to use :type sample_len: int """ self.decimation_rate = rate self.double_buffer = False # Allow an extra DMA_LATENCY samples at the beginning of the sample. # These will be thrown away in the returned sample arrays. self.resize(sample_len + ADCChannel.DMA_LATENCY) self.chunk_size = sample_len + ADCChannel.DMA_LATENCY samples = {} for i, lo in enumerate(lo_list): self.lo_freq = lo self.flush_buffers() samples[i] = np.array(self.next_chunk())[ADCChannel.DMA_LATENCY:] return samples def _configure_fft_sweep(self, start_freq: float, stop_freq: float, scaling: str='spectrum', rbw: Optional[float]=None, reqd_decimation_rate: Optional[int]=None): if start_freq < ADCChannel.ADC_MIN_FREQUENCY: start_freq = ADCChannel.ADC_MIN_FREQUENCY if stop_freq > ADCChannel.ADC_MAX_FREQUENCY: stop_freq = ADCChannel.ADC_MAX_FREQUENCY params = SweepParameters(start_freq, stop_freq, rbw, reqd_decimation_rate) lo_list = np.arange(params.f_start, params.f_stop, params.sample_span) return (params, lo_list) def fft_sweep(self, start_freq: float, stop_freq: float, rbw: float, adc_conv_factor: float) -> None: sweep_params, lo_list = self._configure_fft_sweep( float(start_freq), float(stop_freq), rbw=float(rbw)) self.double_buffer = False self.decimation_rate = sweep_params.decimation_rate sample_len = sweep_params.nperseg + sweep_params.lo_offset_bins self.resize(sample_len + ADCChannel.DMA_LATENCY) self.chunk_size = sample_len + ADCChannel.DMA_LATENCY if sweep_params.fft_bins >= sweep_params.sample_bins: sample_sizes = [ sweep_params.sample_bins for i in range(sweep_params.lo_count)] sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins else: sample_sizes = [sweep_params.fft_bins+2] freq = np.array([]) pwr = np.array([]) for i, lo in enumerate(lo_list): self.lo_freq = lo self.flush_buffers() sample = np.array(self.next_chunk())[ADCChannel.DMA_LATENCY:] scaled_data = (adc_conv_factor*sample.real + (1j)*adc_conv_factor*sample.imag) f, Pxx_den = signal.welch( scaled_data, sweep_params.fs, detrend='linear', noverlap=sweep_params.nperseg*0.8, average='median', window=sweep_params.window, return_onesided=False, scaling='spectrum') freq = lo + f[:sample_sizes[i]] / 1e6 offset_bins = sweep_params.lo_offset_bins pwr = Pxx_den[offset_bins:(sample_sizes[i] + offset_bins)] if self._sample_cb_fn is not None: self._sample_cb_fn(freq, pwr, i, len(lo_list)) def next_chunk(self): if not self._current_buffer.data_available(): if self._double_buffer: if self._current_buffer is self._output_buffer: other_buffer = self._output_buffer2 else: other_buffer = self._output_buffer if not other_buffer.data_available(): # The other buffer is busy being filled. # Wait for data to be available in that buffer # before switching to it. if other_buffer.status == ADCThreadedBuffer.EMPTY: other_buffer.fill() other_buffer._data_available.wait() self._current_buffer.fill() if self._current_buffer is self._output_buffer: self._current_buffer = self._output_buffer2 else: self._current_buffer = self._output_buffer else: self._current_buffer.fill(wait=True) return self._current_buffer.next_chunk() def next_chunk_timed(self): if not self._current_buffer.data_available(): data = self.next_chunk() return (self.current_buffer.buffer_start_time, 0.0, self.sample_period, data) else: return (self.current_buffer.buffer_start_time, self.current_buffer.chunk_start_time, self.sample_period, self.next_chunk()) def flush_buffers(self): #print('ADCChannel.flush_buffers') self._output_buffer.fill(wait=True) if self._double_buffer: self._output_buffer2.fill(wait=True) self._current_buffer = self._output_buffer @property def chan_id(self): return self._chan_id @property def _phase_incr(self): return int((self._lo_freq * (1 << ADCChannel.LO_DDS_PHASE_WIDTH)) / self._lo_dds_clock) @property def current_buffer(self): return self._current_buffer @property def buffer_size(self): """The current size of the sample buffer. """ return self._buffer_size @property def chunk_size(self): """The size of the sample that is returned by :py:meth:`next_chunk` """ return self._chunk_size @chunk_size.setter def chunk_size(self, size): if size == self._chunk_size: return if size <= self.buffer_size: self._chunk_size = size self._output_buffer.chunk_size = size if self._double_buffer: self._output_buffer2.chunk_size = size @property def sample_rate(self): return (self._adc.adc_clock/self.decimation_rate) * 1e6 @property def sample_period(self): return 1.0/self.sample_rate @property def decimation_rate(self): return self._decimation_rate @decimation_rate.setter def decimation_rate(self, rate): if rate == self._decimation_rate: return if rate < 0: rate *= -1 self._decimation_rate = rate if rate > ADCChannel.MAX_DECIMATION_RATE: self._decimation_rate = ADCChannel.MAX_DECIMATION_RATE if self.decimation_rate in [1, 2, 3]: # Set direct sampling chan = 0x0 update = 0x2 self._sw_mem[0x40:0x44] = chan.to_bytes(4, 'little') self._sw_mem[0x0:0x4] = update.to_bytes(4, 'little') else: # Set decimated sampling chan = 0x1 update = 0x2 self._sw_mem[0x40:0x44] = chan.to_bytes(4, 'little') self._sw_mem[0x0:0x4] = update.to_bytes(4, 'little') if self._adc_id == 0: ADCChannel._cfg_mem[4:6] = self.decimation_rate.to_bytes( 2, 'little') else: ADCChannel._cfg_mem[12:14] = self.decimation_rate.to_bytes( 2, 'little') @property def lo_freq(self): """The current LO frequency in MHz. """ return self._lo_freq @lo_freq.setter def lo_freq(self, f): if f == self._lo_freq: return if f < 0.0: f *= -1.0 self._lo_freq = f if f > self._max_lo_freq: self._lo_freq = self._max_lo_freq if self._adc_id == 0: ADCChannel._cfg_mem[0:4] = self._phase_incr.to_bytes(4, 'little') else: ADCChannel._cfg_mem[8:12] = self._phase_incr.to_bytes(4, 'little') @property def dc_offset(self): """The current DC offset in ADC units. """ return self._dc_offset @dc_offset.setter def dc_offset(self, offset): if offset == self._dc_offset: return self._dc_offset = offset if offset > ADCChannel.MAX_DC_OFFSET: self._dc_offset = ADCChannel.MAX_DC_OFFSET if offset < ADCChannel.MIN_DC_OFFSET: self._dc_offset = ADCChannel.MIN_DC_OFFSET offset_bytes = self._dc_offset.to_bytes(2, 'little', signed=True) if self._adc_id == 0: ADCChannel._cfg_mem[6:8] = offset_bytes else: ADCChannel._cfg_mem[14:16] = offset_bytes def close(self): ADCChannel.unmap_cfg_registers() self.unmap_sw_registers() if self._output_buffer is not None: self._output_buffer.close() self._output_buffer = None if self._output_buffer2 is not None: self._output_buffer2.close() self._output_buffer2 = None
DACChannel
import os from os import O_RDWR from pathlib import Path from mmap import mmap import numpy as np class DACChannel: DEFAULT_FREQ = 10.0 MIN_AMPL = 0 MAX_AMPL = (1 << 15) - 1 DEFAULT_AMPL = 8192 LO_DDS_PHASE_WIDTH = 30 _reg_fd = None _reg_mem = None @classmethod def map_registers(cls, uio_device): if cls._reg_mem is None: uio_path = Path(f'/sys/class/uio/{uio_device}') size_path = uio_path / 'maps/map0/size' dev_path = f'/dev/{uio_device}' with open(size_path) as size_fd: mem_len = int(size_fd.read(), 16) cls._reg_fd = os.open(dev_path, O_RDWR) cls._reg_mem = mmap(cls._reg_fd, mem_len) return cls._reg_mem @classmethod def unmap_registers(cls): if cls._reg_mem is not None: cls._reg_mem.close() cls._reg_mem = None os.close(cls._reg_fd) def __init__(self, srv, dac_id: int, cfg_device = 'uio1') -> None: self._srv = srv self._dds_clock = srv.adc_clock self._dac_id: int = dac_id self._cfg_device = cfg_device self._min_freq = 0.0 self._max_freq = self._dds_clock / 2 self._freq = DACChannel.DEFAULT_FREQ self._ampl = DACChannel.DEFAULT_AMPL @property def _phase_incr(self): return int((self._freq * (1 << DACChannel.LO_DDS_PHASE_WIDTH)) / self._dds_clock) def initialize(self) -> bool: return True @property def freq(self) -> float: """The current output frequency (in MHz).""" return self._freq @freq.setter def freq(self, f: float) -> None: if f < self._min_freq: f = self._min_freq elif f > self._max_freq: f = self._max_freq self._freq = f DACChannel.map_registers(self._cfg_device) if self._dac_id == 0: DACChannel._reg_mem[0:4] = self._phase_incr.to_bytes(4, 'little') else: DACChannel._reg_mem[6:10] = self._phase_incr.to_bytes(4, 'little') DACChannel.unmap_registers() @property def ampl(self) -> int: return self._ampl @ampl.setter def ampl(self, a: int) -> None: if a < DACChannel.MIN_AMPL: a = DACChannel.MIN_AMPL elif a > DACChannel.MAX_AMPL: a = DACChannel.MAX_AMPL self._ampl = a DACChannel.map_registers(self._cfg_device) if self._dac_id == 0: DACChannel._reg_mem[4:6] = self._ampl.to_bytes(2, 'little') else: DACChannel._reg_mem[10:12] = self._ampl.to_bytes(2, 'little') DACChannel.unmap_registers() def close(self): DACChannel.unmap_registers()
ADCBuffer
import time import numpy as np import _axidma from _axidma import ffi @ffi.def_extern() def axidma_callback(ch, data): adc_buffer = ffi.from_handle(data) adc_buffer._async_transfer_complete() class ADCBuffer(object): EMPTY = 0 NON_EMPTY = 1 BUSY = 2 ERROR = 3 def __init__(self, srv, chan, dma_dev, dma_chan_id, size, chunk_size, dtype='F'): self._srv = srv self._chan = chan self._dma_dev = dma_dev self._dma_chan_id = dma_chan_id self._size = size self._chunk_size = chunk_size self._output_buffer = None self._iq_buffer = None self._chunk_offset = 0 self._dtype = np.dtype(dtype) self._status = ADCBuffer.EMPTY self._start_time = 0 self._error_code = 0 @property def chunk_size(self): return self._chunk_size @chunk_size.setter def chunk_size(self, size): if size <= self._size: self._chunk_size = size @property def sample_period(self): if self._chan is not None: return self._chan.sample_period else: return 1.0 / self._srv.adc_clock def allocate(self): self._output_buffer = _axidma.lib.axidma_malloc( self._dma_dev, self._size * self._dtype.itemsize) return True if self._output_buffer is not None else False def resize(self, new_size): _axidma.lib.axidma_free(self._dma_dev, self._output_buffer, self._size * self._dtype.itemsize) self._size = new_size return self.allocate() def fill(self, wait=False): if self.status == ADCBuffer.BUSY: # Waiting for a previous fill request to complete return True if wait is True: rc = _axidma.lib.axidma_oneway_transfer( self._dma_dev, self._dma_chan_id, self._output_buffer, self._size * self._dtype.itemsize, True) if rc < 0: self.status = ADCBuffer.ERROR self._error_code = rc print(f'Sync buffer fill error: {rc=}') return False self._start_time = time.clock_gettime_ns(time.CLOCK_MONOTONIC_RAW) self._iq_buffer = np.frombuffer( ffi.buffer(self._output_buffer, self._size * self._dtype.itemsize), dtype=self._dtype) self._chunk_offset = 0 self.status = ADCBuffer.NON_EMPTY else: self.status = ADCBuffer.BUSY self._cb_data = ffi.new_handle(self) _axidma.lib.axidma_set_callback( self._dma_dev, self._dma_chan_id, _axidma.lib.axidma_callback, self._cb_data) rc = _axidma.lib.axidma_oneway_transfer( self._dma_dev, self._dma_chan_id, self._output_buffer, self._size * self._dtype.itemsize, False) #print(f'fill: async buffer fill request, {rc=}') if rc < 0: _axidma.lib.axidma_set_callback( self._dma_dev, self._dma_chan_id, None, None) self.status = ADCBuffer.ERROR self._error_code = rc print(f'Async buffer fill error: {rc=}') return False return True def _async_transfer_complete(self): self._start_time = time.clock_gettime_ns(time.CLOCK_MONOTONIC_RAW) self._iq_buffer = np.frombuffer( ffi.buffer(self._output_buffer, self._size * self._dtype.itemsize), dtype=self._dtype) _axidma.lib.axidma_set_callback( self._dma_dev, self._dma_chan_id, ffi.NULL, ffi.NULL) self._chunk_offset = 0 self.status = ADCBuffer.NON_EMPTY #print('_async_transfer_complete') def get_next_chunk(self): if self.status == ADCBuffer.EMPTY or self.status == ADCBuffer.BUSY: return None if self._chunk_offset + self._chunk_size > self._size: # potential buffer overrun. self.status = ADCBuffer.EMPTY return None elif self._chunk_offset + self._chunk_size == self._size: # this is the last chunk for this buffer # set status to EMPTY and return last chunk self.status = ADCBuffer.EMPTY return self._iq_buffer[self._chunk_offset: self._chunk_offset + self._chunk_size] else: self._chunk_offset += self._chunk_size return self._iq_buffer[self._chunk_offset - self._chunk_size: self._chunk_offset] @property def buffer_start_time(self): """Return the absolute buffer start time (in ns). """ return self._start_time @property def chunk_start_time(self): """Return the chunk start time relative to the buffer start (in ns). """ return self._chunk_offset * self.sample_period @property def output_buffer(self): return self._output_buffer @property def status(self): return self._status @status.setter def status(self, s): self._status = s def close(self): if self.status == ADCBuffer.BUSY: _axidma.lib.axidma_set_callback( self._dma_dev, self._dma_chan_id, ffi.NULL, ffi.NULL) if self._output_buffer is not None: _axidma.lib.axidma_free(self._dma_dev, self._output_buffer, self._size * self._dtype.itemsize) self._output_buffer = None
ADCThreadedBuffer
import time from threading import ( Thread, Event, Lock ) import numpy as np import _axidma from _axidma import ffi class ADCThreadedBuffer(object): EMPTY = 0 NON_EMPTY = 1 BUSY = 2 ERROR = 3 def __init__(self, srv, chan, dma_dev, dma_chan_id, size, chunk_size, dtype='F'): self._srv = srv self._chan = chan self._dma_dev = dma_dev self._dma_chan_id = dma_chan_id self._size = size self._chunk_size = chunk_size self._output_buffer = None self._iq_buffer = None self._chunk_offset = 0 self._dtype = np.dtype(dtype) self._error_code = 0 self._start_time = 0 self._sample_period = 0.0 self._fill_thread = None self._status_lock = Lock() self._status = ADCThreadedBuffer.EMPTY self._data_available = Event() def initialize(self, prefill=True): self._output_buffer = _axidma.lib.axidma_malloc( self._dma_dev, self._size * self._dtype.itemsize) if prefill is True: self.fill(wait=True) self._chunk_offset = 0 else: # No data available yet self._data_available.clear() # The following indicates that the buffer needs to be filled self._chunk_offset = self._size + self._chunk_size def close(self): if self._output_buffer is not None: _axidma.lib.axidma_free(self._dma_dev, self._output_buffer, self._size * self._dtype.itemsize) self._output_buffer = None @property def chunk_size(self) -> int: return self._chunk_size @chunk_size.setter def chunk_size(self, new_size: int) -> None: self._chunk_size = new_size @property def sample_period(self): if self._chan is not None: return self._chan.sample_period else: return 1.0 / self._srv.adc_clock def resize(self, new_size): self._data_available.clear() _axidma.lib.axidma_free(self._dma_dev, self._output_buffer, self._size * self._dtype.itemsize) self._size = new_size with self._status_lock: self._status = ADCThreadedBuffer.EMPTY self._output_buffer = _axidma.lib.axidma_malloc( self._dma_dev, self._size * self._dtype.itemsize) def next_chunk(self): if self.data_available(): self._chunk_offset += self._chunk_size if self.is_empty(): self._data_available.clear() with self._status_lock: self._status = ADCThreadedBuffer.EMPTY return self._iq_buffer[self._chunk_offset - self._chunk_size: self._chunk_offset] else: self._data_available.wait() return self._iq_buffer[self._chunk_offset: self._chunk_offset + self._chunk_size] def data_available(self): return self._data_available.isSet() def is_empty(self): return (self._chunk_offset + self._chunk_size > self._size) def fill(self, wait=False): with self._status_lock: self._status = ADCThreadedBuffer.BUSY self._fill_thread = Thread(target=self._fill_task) self._fill_thread.start() if wait is True: #print(' waiting on _fill_task') self._data_available.wait() def _fill_task(self): rc = _axidma.lib.axidma_oneway_transfer( self._dma_dev, self._dma_chan_id, self._output_buffer, self._size * self._dtype.itemsize, True) if rc < 0: self._error_code = rc print(f'{self=}: Sync buffer fill error: {rc=}') return self._start_time = time.clock_gettime_ns(time.CLOCK_MONOTONIC_RAW) self._iq_buffer = np.frombuffer( ffi.buffer(self._output_buffer, self._size * self._dtype.itemsize), dtype=self._dtype) self._chunk_offset = 0 with self._status_lock: self._status = ADCThreadedBuffer.NON_EMPTY self._data_available.set() @property def status(self): with self._status_lock: status = self._status return status @property def buffer_start_time(self): """Return the absolute buffer start time (in ns). """ return self._start_time @property def data_start_time(self): """Return the absolute buffer start time (in ns). """ return self._start_time @property def chunk_start_time(self): """Return the chunk start time relative to the buffer start (in ns). """ return self._chunk_offset * self.sample_period
Imports
Testing
ADC service test script
import sys import signal from argparse import ArgumentParser import numpy as np import rpyc class InvalidChannelException(Exception): """Raised when an invalid channel access is attempted. """ pass class TestApp: DEFAULT_DECIMATION_RATE=1 DEFAULT_LO_FREQ=0.0 DEFAULT_DC_OFFSET=0 DEFAULT_SAMPLE_LEN=8*1024 DEFAULT_ATTENUATION=0.0 MAX_ATTENUATION=31.75 def __init__(self, adc_ip: str, adc_port: int): self._adc_ip: str = adc_ip self._adc_port: int = adc_port self.adcdma_service = None self._chan_id: int = None self._adc_chan = None def initialize_adcdma_service(self) -> bool: try: self.adcdma_service = rpyc.connect(self._adc_ip, self._adc_port) except ConnectionError as ce: print(ce) print('Please check to ensure that the Red Pitaya board is') print('powered on and connected to the network.') return False status = self.adcdma_service.root.initialize() return status def alloc_adc_chan(self, chan_id: int, rate: int=DEFAULT_DECIMATION_RATE, lo_freq: float=DEFAULT_LO_FREQ, dc_offset: int=DEFAULT_DC_OFFSET, sample_len: int=DEFAULT_SAMPLE_LEN) -> bool: if self._chan_id is None: try: self._adc_chan = self.adcdma_service.root.allocate_channel( chan_id, rate, lo_freq, dc_offset, sample_len, sample_len, 'F', double_buffer=True, prefill_buffers=False) except InvalidChannelException as ice: print(ice) return False else: self._chan_id = chan_id return True else: print(f"Channel '{self.chan_id}' is currently allocated.") print("Deallocate the current channel before allocating another.") return False @property def chan_attenuation(self): return self.adcdma_service.root.channel_attenuation(self._chan_id) @chan_attenuation.setter def chan_attenuation(self, atten): self.adcdma_service.root.set_channel_attenuation(self._chan_id, atten) @property def adcchan(self): """The ADCChannel instance for the currently allocated ADC channel. """ return self._adc_chan def free_adc_chan(self) -> None: if self._chan_id is not None: self.adcdma_service.root.free_channel(self._chan_id) self._chan_id = None def main(): default_adc_svc_host = "192.168.0.155" default_adc_svc_port = 18900 default_rate = TestApp.DEFAULT_DECIMATION_RATE default_lo_freq = TestApp.DEFAULT_LO_FREQ default_dcoffset = TestApp.DEFAULT_DC_OFFSET default_samplelen = TestApp.DEFAULT_SAMPLE_LEN default_attenuation = TestApp.DEFAULT_ATTENUATION # This ensures that Cntl-C will work as expected: signal.signal(signal.SIGINT, signal.SIG_DFL) parser = ArgumentParser(description="Red Pitaya test application") parser.add_argument("-a", "--adchost", default=default_adc_svc_host, help="IP address for the ADC DMA server instance") parser.add_argument("-p", "--adcport", default=default_adc_svc_port, type=int, help="TCP port for the ADC DMA server instance") parser.add_argument("-c", "--chanid", type=int, required=True, help="ADC channel to use") parser.add_argument("-r", "--rate", type=int, default=default_rate, help=f"Set ADC decimation rate. Default: {default_rate}") parser.add_argument("-f", "--lofreq", type=float, default=default_lo_freq, help=f"Set ADC LO frequency (in MHz). Default: {default_lo_freq}") parser.add_argument("-o", "--dcoffset", type=int, default=default_dcoffset, help=f"Set ADC channel DC offset. Default: {default_dcoffset}") parser.add_argument("-l", "--samplelen", type=int, default=default_samplelen, help=f"Length of ADC sample. Default: {default_samplelen}") parser.add_argument("-z", "--summarize", action="store_true", help="Print a summary of the captured data") parser.add_argument("-S", "--savefile", help="File to save captured ADC samples") parser.add_argument("-A", "--attenuation", type=float, default=default_attenuation, help=f"Set channel attenuation, 0.0 to 31.75 dB. Default: {default_attenuation}") args = parser.parse_args() app = TestApp(args.adchost, args.adcport) if app.initialize_adcdma_service() is False: print(f"Failed to initialize ADC service at {args.adchost}:{args.adcport}") sys.exit(-1) if app.alloc_adc_chan(args.chanid, args.rate, args.lofreq, args.dcoffset, args.samplelen) is False: print(f"Failed to allocate ADC channel (chan. id: {args.chan_id})") sys.exit(-2) atten = args.attenuation if atten > TestApp.MAX_ATTENUATION: atten = TestApp.MAX_ATTENUATION app.chan_attenuation = atten data = np.array(app.adcchan.next_chunk()) if args.savefile is None: if args.summarize: np.set_printoptions(precision=1, suppress=True, edgeitems=20) else: np.set_printoptions(precision=1, suppress=True, threshold=sys.maxsize) print(data) else: with open(args.savefile, 'wb') as fd: np.save(fd, data) app.free_adc_chan() if __name__ == '__main__': main()
Testing channel code
The following test script accesses the ADC DMA hardware directly through
the _axidma
module. In order not to conflict with the
rp_adcdma.service
the script should be run after that service has been
stopped:
An example script invocation:
$ cd /opt/lib/python/adcdma $ python testing.py -R 1000.0 sweep_params.sample_span=1.7554285714285713 sweep_params.nperseg=26154 sweep_params.bin_width=223.7297763794667 sweep_params.lo_offset_bins=5 sweep_params.decimation_rate=21 sweep_params.lo_offset=0.0011186488818973335 sweep_params.sample_bins=7846 sweep_params.fft_bins=8939 sample_sizes=[7846, 1093] lo_list=array([ 9. , 10.75542857]) acquisition took 0.0301 seconds fft took 0.3203 seconds fft_sweep took 0.3568 seconds sweep took 0.0219 seconds
import sys from time import time from argparse import ArgumentParser import numpy as np from scipy import signal import _axidma from data import ( SweepParameters ) from adccal import ( AdcCalibration ) from channel import ( ADCChannel ) from attenuator import ( ADCAttenuator ) RP_ADC_MIN_FREQUENCY: float = 0.5 RP_ADC_MAX_FREQUENCY: float = 55.0 RP_ADC_MIN_SPAN: float = 0.0005 RP_ADC_MAX_SPAN: float = RP_ADC_MAX_FREQUENCY-RP_ADC_MIN_FREQUENCY class ADCTestService: ADC_CLOCK = {'122-16': 122.88, '125-14': 125.0} ADC_BITS = {'122-16': 16, '125-14': 14} DAC_BITS = {'122-16': 14, '125-14': 14} def __init__(self): self._adc_type = None def initialize(self, adc_type='122-16'): self._adc_type = adc_type @property def adc_clock(self): return ADCTestService.ADC_CLOCK[self._adc_type] class ADCTestChannel(ADCChannel): def __init__(self, adc, adc_id: int, chan_id: int, decimation_rate: int = 1, lo_freq: float = 0.0, dc_offset: int = 0, buffer_size: int = 1024 * 128, chunk_size: int = 1024 * 8, cfg_device = 'uio0', sw_device = 'uio1', double_buffer = True): super().__init__(adc, adc_id, chan_id, decimation_rate, lo_freq, dc_offset, buffer_size, chunk_size, double_buffer, cfg_device, sw_device) def fft_sweep(self, lo_list, sweep_params, adc_conv_factor, sample_cb_fn=None): self.double_buffer = False self.decimation_rate = sweep_params.decimation_rate sample_len = sweep_params.nperseg + sweep_params.lo_offset_bins self.resize(sample_len + ADCChannel.DMA_LATENCY) self.chunk_size = sample_len + ADCChannel.DMA_LATENCY if sweep_params.fft_bins >= sweep_params.sample_bins: sample_sizes = [ sweep_params.sample_bins for i in range(sweep_params.lo_count)] sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins else: sample_sizes = [sweep_params.fft_bins+2] freq = np.array([]) pwr = np.array([]) acquisition_time = 0.0 fft_time = 0.0 for i, lo in enumerate(lo_list): self.lo_freq = lo start = time() self.flush_buffers() sample = np.array(self.next_chunk())[ADCChannel.DMA_LATENCY:] scaled_data = (adc_conv_factor*sample.real + (1j)*adc_conv_factor*sample.imag) acquisition_time += time() - start start = time() f, Pxx_den = signal.welch( scaled_data, sweep_params.fs, detrend='linear', noverlap=sweep_params.nperseg*0.8, average='median', window=sweep_params.window, return_onesided=False, scaling='spectrum') freq = lo + f[:sample_sizes[i]] / 1e6 offset_bins = sweep_params.lo_offset_bins pwr = Pxx_den[offset_bins:(sample_sizes[i] + offset_bins)] if sample_cb_fn is not None: sample_cb_fn(freq, pwr) fft_time += time() - start print(f'acquisition took {acquisition_time:.4f} seconds') print(f'fft took {fft_time:.4f} seconds') if __name__ == '__main__': adc_type = '122-16' cfg_device = 'uio0' sw_device = 'uio2' sw_device_2 = 'uio3' spi_device = 'spidev1.0' gpio_device = 'gpiochip1' adc_start_freq = 9.0 adc_stop_freq = 11.0 rbw = 10000.0 adc_chan = 0 decimation_rate = 1 lo_freq = 0.0 dc_offset = 0 parser = ArgumentParser( description='Testing additions to the adcdma code.' ) parser.add_argument( "-S", "--start_freq", default=adc_start_freq, type=float, help=f"ADC sweep start freq. in MHz. (Default={adc_start_freq})" ) parser.add_argument( "-E", "--end_freq", default=adc_stop_freq, type=float, help=f"ADC sweep end freq. in MHz. (Default={adc_stop_freq})" ) parser.add_argument( "-R", "--rbw", default=rbw, type=float, help=f"Sweep resolution BW in Hz. (Default={rbw})" ) args = parser.parse_args() rbw = args.rbw / 1e6 adc_start_freq = args.start_freq adc_stop_freq = args.end_freq if adc_start_freq < RP_ADC_MIN_FREQUENCY: adc_start_freq = RP_ADC_MIN_FREQUENCY if adc_stop_freq > RP_ADC_MAX_FREQUENCY: adc_stop_freq = RP_ADC_MAX_FREQUENCY sweep_params = SweepParameters(adc_start_freq, adc_stop_freq, rbw) lo_list = np.arange( sweep_params.f_start, sweep_params.f_stop, sweep_params.sample_span) if sweep_params.fft_bins >= sweep_params.sample_bins: sample_sizes = [ sweep_params.sample_bins for i in range(sweep_params.lo_count)] sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins else: sample_sizes = [sweep_params.fft_bins+2] adc_conv_factor = 1.0 / AdcCalibration.adc_to_volts(adc_chan) print(f'{sweep_params.sample_span=}') print(f'{sweep_params.nperseg=}') print(f'{sweep_params.bin_width=}') print(f'{sweep_params.lo_offset_bins=}') print(f'{sweep_params.decimation_rate=}') print(f'{sweep_params.lo_offset=}') print(f'{sweep_params.sample_bins=}') print(f'{sweep_params.fft_bins=}') print(f"{sample_sizes=}") print(f"{lo_list=}") testService = ADCTestService() testService.initialize() axidma_dev = _axidma.lib.axidma_init() if axidma_dev is None: print("Can't initialize axidma device!") sys.exit(-1) rx_chans = _axidma.lib.axidma_get_dma_rx(axidma_dev) attenuator = ADCAttenuator('/dev/'+spi_device, '/dev/'+gpio_device) attenuator.set_attenuation(adc_chan, 0.0) attenuator.update_attenuation(adc_chan) chan_id = rx_chans.data[adc_chan] ch = ADCTestChannel(testService, adc_chan, chan_id) status = ch.initialize(axidma_dev, prefill_buffers=False) start = time() ch.fft_sweep(lo_list - sweep_params.lo_offset, sweep_params, adc_conv_factor) duration = time() - start print(f'fft_sweep took {duration:.4f} seconds') start = time() samples = ch.sweep(lo_list - sweep_params.lo_offset, sweep_params.decimation_rate, sweep_params.nperseg + sweep_params.lo_offset_bins) duration = time() - start print(f'sweep took {duration:.4f} seconds') ch.close()