Open RF Prototyping

Red Pitaya ADC/DAC Service

Note that this page is automatically generated from an org-mode source file. The Python code for this reference design is also automatically generated from this file using org-babel-tangle.

The Application

The adcdma.py script provides application level access to the Zynq FPGA CPUs and programmable logic (PL) which in turn interface to the Red Pitaya ADC and DAC hardware. A description of and code for the programmable logic, kernel driver, user space library and associated Python module is provided here: DMA on the Red Pitaya/Zynq.

"""Package for the adcdma.py application."""

__app_name__ = 'adcdma'
__version__ = '0.1.0'
"""adcdma application entry point."""

import adcdma


# Start the application only when this file is executed as a script;
# importing it has no side effects beyond importing ``adcdma``.
if __name__ == '__main__':
    adcdma.main()
# Generated from rp-adc-dma.org
#
import faulthandler
from argparse import ArgumentParser
import rpyc

from service import RPyCServer, ADCService


def main():
    """Parse the command line and start the ADC RPyC server.

    Command line options:

    * ``-A`` / ``--ipaddr``: address the server binds to
      (default ``127.0.0.1``).
    * ``-P`` / ``--port``: TCP port the server listens on
      (default ``18900``).
    """
    # Dump Python tracebacks on fatal signals (e.g. a segfault).
    faulthandler.enable()
    parser = ArgumentParser(description="RedPitaya ADC service.")

    parser.add_argument("-A", "--ipaddr", default="127.0.0.1",
                        help="IP address to bind the RPyC server instance to")
    parser.add_argument("-P", "--port", default=18900, type=int,
                        help="TCP port for the RPyC server instance")
    args = parser.parse_args()

    # Setting a large MAX_IO_CHUNK should improve network performance over
    # the default value of 64K.
    # For more detail see:
    #    https://github.com/tomerfiliba-org/rpyc/issues/398
    rpyc.core.SocketStream.MAX_IO_CHUNK = 35 * 1024 * 1024
    # The compression threshold is set above MAX_IO_CHUNK.
    # NOTE(review): presumably this keeps sample payloads from being
    # compressed on the wire -- confirm against the RPyC Channel docs.
    rpyc.core.Channel.COMPRESSION_THRESHOLD = 64 * 1024 * 1024
    server = RPyCServer(ADCService(), args.ipaddr, args.port)
    server.run()

The ADCService

The Python remote procedure call framework RPyC is used to create a network service which allows the Red Pitaya ADC channels to be accessed remotely.

The ADCService implements the functionality which is accessed via RPyC. It's a simple matter to use the standard Python REPL to interact with the service.

>>> import rpyc
>>> import numpy as np
>>>
>>> adcsvc = rpyc.connect("192.168.0.155", 18900)
>>> adcsvc.root.initialize()
True
>>> chanid = 0
>>> adcchan = adcsvc.root.allocate_channel(chanid, 1, 0.0, 0, 8*1024, 8*1024, "F")

Sampled data is now read using the allocated ADC channel object. Note that the sample data is returned as a NumPy array of complex64 I/Q data.

>>> data = np.array(adcchan.next_chunk())
>>> np.set_printoptions(precision=1, suppress=True, edgeitems=10)
>>> data
array([-7442.9+2903.1j, -7144.9+2999.5j, -6858. +3087.6j, -6555.2+3155.2j,
       -6225.9+3195.2j, -5841.2+3189.j , -5463.6+3166.8j, -5080.9+3121.2j,
         475.7 -471.9j,   461.4 -481.7j, ...,  -570.1 -667.7j,
        -603.  -670.8j,  -632.4 -668.2j,  -650.7 -653.2j,  -664.2 -633.6j,
        -695.8 -630.5j,  -716.2 -616.5j,  -728.7 -595.4j,  -755.5 -585.8j,
        -754.1 -554.5j], dtype=complex64)
>>> len(data)
8192

Channel attenuation is adjustable between 0.0 and 31.75 dB. Note that in the code below, after flushing the ADC channel there are still a few samples in the data which remain from the previous attenuation setting. This is due to the DMA latency and will be of the order of 10 samples.

>>> adcsvc.root.set_channel_attenuation(chanid, 0.0)
>>> adcsvc.root.flush_channel(chanid)
>>> data = np.array(adcchan.next_chunk())
>>> np.max(np.abs(data))
9443.004

>>> adcsvc.root.set_channel_attenuation(chanid, 10.0)
>>> adcsvc.root.flush_channel(chanid)
>>> data = np.array(adcchan.next_chunk())
>>> np.max(np.abs(data))
9363.997
>>> np.max(np.abs(data[10:]))
3087.9922

>>> adcsvc.root.set_channel_attenuation(chanid, 20.0)
>>> adcsvc.root.flush_channel(chanid)
>>> data = np.array(adcchan.next_chunk())
>>> np.max(np.abs(data))
2805.0051
>>> np.max(np.abs(data[10:]))
1082.9929

>>> adcsvc.root.free_channel(chanid)

Raw samples acquired by the ADC (and prior to processing by the digital down conversion) can be accessed by allocating the raw buffer. The acquired samples are from both channels and are interleaved. In the code below, samples from the first channel are accessed using data[::2] and from the second channel using data[1::2].

>>> rawbuff = adcsvc.root.allocate_raw_buffer()
>>> data = np.array(rawbuff.get_next_chunk())
>>> data[::2]
array([-35, -31, -27, -34, -32, -42, -27, -35, -31, -27, ..., -32, -32,
       -39, -37, -27, -52, -29, -35, -34, -38], dtype=int16)
>>> data[1::2]
array([  229,  -320,  -796, -1096, -1123,  -866,  -391,   150,   670,
         990, ...,   977,   651,   124,  -428,  -872, -1112, -1080,  -787,
        -297,   250], dtype=int16)
>>> adcsvc.root.free_raw_buffer()

A small command line script which pulls together some of the code snippets above is provided here: ADC service test script.

import rpyc
from rpyc.utils.server import ThreadedServer

import _axidma
from buffer import ADCBuffer
from channel import ADCChannel
from dacchannel import DACChannel
from attenuator import ADCAttenuator


class InvalidChannelException(Exception):
    """Error signalling an attempt to access an invalid channel."""


class ADCService(rpyc.Service):
    """RPyC service giving remote access to the ADC and DAC channels.

    The service owns the AXI DMA device handle, the per-channel
    ``ADCChannel`` / ``DACChannel`` objects, the optional raw sample
    buffer and the channel attenuator.  Everything is released by
    :meth:`cleanup`, which is also run when the last client disconnects.
    """

    # ADC clock per board type.  ``ADCChannel.sample_rate`` multiplies
    # this value by 1e6, so the unit is MHz.
    ADC_CLOCK = {'122-16': 122.88,
                 '125-14': 125.0}
    # ADC / DAC resolution in bits per board type.
    ADC_BITS = {'122-16': 16,
                '125-14': 14}
    DAC_BITS = {'122-16': 14,
                '125-14': 14}

    # Index into the DMA receive channel list used for the raw buffer.
    RAW_CHAN_ID = 2

    def __init__(self):
        super().__init__()
        self._adc_type = None
        self.adc_channels = [None, None]
        self.adc_channel_cb = [None, None]
        self.dac_channels = [None, None]
        self._raw_buffer = None
        self._axidma_dev = None
        # Populated by initialize(); defined here so that methods called
        # before initialize() see ``None`` rather than a missing attribute.
        self._rx_chans = None
        self._attenuator = None
        self._connect_count = 0
        self._initialized = False

    def on_connect(self, conn):
        """Count a newly connected client."""
        self._connect_count += 1

    def on_disconnect(self, conn):
        """Count a client leaving; release all resources after the last one."""
        self._connect_count -= 1
        if self.connect_count == 0:
            self.cleanup()

    @property
    def connect_count(self):
        """Number of currently connected clients."""
        return self._connect_count

    def initialize(self, adc_type='122-16',
                   cfg_device='uio0',
                   dac_cfg_device='uio1',
                   sw_device='uio2',
                   sw_device_2='uio3',
                   spi_device='spidev1.0',
                   gpio_device='gpiochip1'):
        """Open the DMA device and the attenuator.

        Calling this on an already initialized service is a no-op.

        :param adc_type: key into ``ADC_CLOCK`` / ``ADC_BITS``
        :param cfg_device: UIO device passed to ``ADCChannel`` as its
            configuration device
        :param dac_cfg_device: UIO device passed to ``DACChannel``
        :param sw_device: stream switch UIO device for ADC channel 0
        :param sw_device_2: stream switch UIO device for ADC channel 1
        :param spi_device: SPI device name (under ``/dev``) for the
            attenuator
        :param gpio_device: GPIO device name (under ``/dev``) for the
            attenuator
        :return: ``False`` if ``axidma_init`` returned ``None``,
            ``True`` otherwise
        """
        if self._initialized is False:
            self._adc_type = adc_type
            self._cfg_device = cfg_device
            self._dac_cfg_device = dac_cfg_device
            self._sw_devices = [sw_device, sw_device_2]
            self._axidma_dev = _axidma.lib.axidma_init()
            # NOTE(review): if ``_axidma`` is a CFFI module a failed init
            # may be reported as a NULL pointer rather than ``None`` --
            # confirm against the binding.
            if self._axidma_dev is None:
                return False
            self._rx_chans = _axidma.lib.axidma_get_dma_rx(self._axidma_dev)
            self._attenuator = ADCAttenuator(
                '/dev/'+spi_device, '/dev/'+gpio_device)
            self._initialized = True
        return True

    def finalize(self):
        """Release all resources (alias for :meth:`cleanup`)."""
        self.cleanup()

    def cleanup(self):
        """Close every channel, the raw buffer, the DMA device and the
        attenuator, returning the service to its uninitialized state."""
        self._initialized = False
        for index, ch in enumerate(self.adc_channels):
            if ch is not None:
                ch.close()
                self.adc_channels[index] = None
        for index, dac_ch in enumerate(self.dac_channels):
            if dac_ch is not None:
                dac_ch.close()
                self.dac_channels[index] = None
        if self._raw_buffer is not None:
            self.free_raw_buffer()
        if self._axidma_dev is not None:
            _axidma.lib.axidma_destroy(self._axidma_dev)
            self._axidma_dev = None
            # The channel list belongs to the destroyed device.
            self._rx_chans = None
        if self._attenuator is not None:
            self._attenuator.close()
            self._attenuator = None

    @property
    def adc_clock(self):
        """ADC clock for the configured board type (see ``ADC_CLOCK``)."""
        return ADCService.ADC_CLOCK[self._adc_type]

    @property
    def adc_resolution(self):
        """ADC resolution in bits for the configured board type."""
        return ADCService.ADC_BITS[self._adc_type]

    def allocate_channel(
            self, adc_chan, decimation_rate, lo_freq, dc_offset,
            buffer_size, chunk_size, double_buffer=True,
            prefill_buffers=True, sample_callback_fn=None):
        """Create and initialize the ``ADCChannel`` for ``adc_chan``.

        :param adc_chan: index of the ADC channel to allocate
        :param decimation_rate: passed through to ``ADCChannel``
        :param lo_freq: passed through to ``ADCChannel``
        :param dc_offset: passed through to ``ADCChannel``
        :param buffer_size: passed through to ``ADCChannel``
        :param chunk_size: passed through to ``ADCChannel``
        :param double_buffer: passed through to ``ADCChannel``
        :param prefill_buffers: passed to ``ADCChannel.initialize``
        :param sample_callback_fn: optional callback stored for the
            channel and passed through to ``ADCChannel``
        :return: the new channel, or ``None`` if the service has not
            been initialized
        :raises InvalidChannelException: if ``adc_chan`` is out of range
            or the channel is already allocated
        """
        if self._axidma_dev is None:
            return None
        # Validate the id *before* indexing with it: an out-of-range id
        # would otherwise raise IndexError and a negative id would wrap
        # around to another channel.
        if (adc_chan < 0
                or adc_chan >= len(self.adc_channels)
                or adc_chan >= self._rx_chans.len):
            raise InvalidChannelException(
                f'No such channel id: {adc_chan}'
            )
        if self.adc_channels[adc_chan] is not None:
            raise InvalidChannelException(
                'Channel already allocated.'
            )
        chan_id = self._rx_chans.data[adc_chan]
        self.adc_channel_cb[adc_chan] = sample_callback_fn
        ch = ADCChannel(
            self, adc_chan, chan_id, decimation_rate, lo_freq, dc_offset,
            buffer_size, chunk_size,
            cfg_device=self._cfg_device,
            sw_device=self._sw_devices[adc_chan],
            double_buffer=double_buffer,
            sample_callback_fn=self.adc_channel_cb[adc_chan]
        )
        ch_status = ch.initialize(self._axidma_dev, prefill_buffers)
        # Only a successfully initialized channel is registered; the
        # channel object is returned to the caller either way.
        if ch_status is True:
            self.adc_channels[adc_chan] = ch
        return ch

    def allocate_dac_channel(self, dac_chan):
        """Create and initialize the ``DACChannel`` for ``dac_chan``.

        :param dac_chan: DAC channel index, one of ``[0, 1]``
        :return: the new channel
        :raises InvalidChannelException: if ``dac_chan`` is not a valid
            id or the channel is already allocated
        """
        # Validate first so that a bad id is reported as such instead of
        # surfacing as an IndexError from the list lookup below.
        if dac_chan not in [0, 1]:
            raise InvalidChannelException(
                f'No such channel id: {dac_chan}'
            )
        if self.dac_channels[dac_chan] is not None:
            raise InvalidChannelException(
                'Channel already allocated.'
            )
        ch = DACChannel(self, dac_chan, cfg_device=self._dac_cfg_device)
        ch_status = ch.initialize()
        if ch_status is True:
            self.dac_channels[dac_chan] = ch
        return ch

    def flush_channel(self, adc_chan):
        """Flush the buffers of ``adc_chan``; unknown or unallocated
        channels are ignored."""
        for index, ch in enumerate(self.adc_channels):
            if index == adc_chan and ch is not None:
                ch.flush_buffers()

    def free_channel(self, adc_chan):
        """Close and release ``adc_chan``; unknown or unallocated
        channels are ignored."""
        for index, ch in enumerate(self.adc_channels):
            if index == adc_chan and ch is not None:
                ch.close()
                self.adc_channels[index] = None

    def free_dac_channel(self, dac_chan):
        """Close and release DAC channel ``dac_chan``; unknown or
        unallocated channels are ignored."""
        for index, ch in enumerate(self.dac_channels):
            if index == dac_chan and ch is not None:
                ch.close()
                self.dac_channels[index] = None

    @property
    def available_channels(self):
        """Indices of ADC channels that are not currently allocated."""
        return [i for i, ch in enumerate(self.adc_channels) if ch is None]

    @property
    def available_dac_channels(self):
        """Indices of DAC channels that are not currently allocated."""
        return [i for i, ch in enumerate(self.dac_channels) if ch is None]

    def get_next_chunk(self, adc_chan):
        """Return the next chunk of samples from ``adc_chan``.

        :raises InvalidChannelException: if ``adc_chan`` is not a valid
            index or the channel has not been allocated
        """
        try:
            ch = self.adc_channels[adc_chan]
        except IndexError:
            raise InvalidChannelException(
                f'Channel {adc_chan} is an invalid channel id'
            )
        if ch is None:
            raise InvalidChannelException(
                f'Channel {adc_chan} is uninitialized')
        return ch.next_chunk()

    def channel_attenuation(self, adc_chan):
        """Return the attenuation currently stored for ``adc_chan``."""
        return self._attenuator.attenuation(adc_chan)

    def set_channel_attenuation(self, adc_chan, att):
        """Store ``att`` for ``adc_chan`` and write it to the hardware."""
        self._attenuator.set_attenuation(adc_chan, att)
        self._attenuator.update_attenuation(adc_chan)

    def allocate_raw_buffer(self, buffer_size=64*1024, chunk_size=16*1024):
        """Return the raw sample buffer, creating and filling it on
        first use.

        :param buffer_size: passed through to ``ADCBuffer``
        :param chunk_size: passed through to ``ADCBuffer``
        :return: the buffer, or ``None`` if the service has not been
            initialized or the buffer could not be allocated
        :raises InvalidChannelException: if the DMA device does not
            provide the raw receive channel
        """
        if self._raw_buffer is None:
            if self._axidma_dev is None or self._rx_chans is None:
                return None
            if ADCService.RAW_CHAN_ID >= self._rx_chans.len:
                raise InvalidChannelException(
                    f'No such channel id: {ADCService.RAW_CHAN_ID}'
                )
            raw_chan_id = self._rx_chans.data[ADCService.RAW_CHAN_ID]
            raw_buffer = ADCBuffer(self, None, self._axidma_dev,
                                   raw_chan_id, buffer_size,
                                   chunk_size, '<i2')
            # Do not remember a buffer whose allocation failed, otherwise
            # the next call would hand out the unusable buffer.
            if raw_buffer.allocate() is False:
                return None
            self._raw_buffer = raw_buffer
            self._raw_buffer.fill(wait=True)
        return self._raw_buffer

    def free_raw_buffer(self):
        """Close and forget the raw sample buffer, if one exists."""
        if self._raw_buffer is not None:
            self._raw_buffer.close()
            self._raw_buffer = None


class RPyCServer():
    """Thin wrapper that serves one service instance with RPyC's
    ``ThreadedServer``."""

    def __init__(self, serviceInst, host, port):
        """Remember the service instance and the address to serve it on."""
        super().__init__()
        self._host, self._port = host, port
        self._serviceInst = serviceInst

    def run(self):
        """Announce the listening address, then create and start the
        server."""
        banner = "Run ThreadedServer on {}:{}".format(self._host, self._port)
        print(banner)
        # NOTE(review): these options expose all attributes, attribute
        # assignment and pickling to every client -- only appropriate on
        # a trusted network.
        protocol_options = {
            'allow_all_attrs': True,
            'allow_setattr': True,
            'allow_pickle': True,
        }
        threaded_server = ThreadedServer(
            self._serviceInst,
            hostname=self._host,
            port=self._port,
            protocol_config=protocol_options)
        self._server = threaded_server
        self._server.start()

ADCAttenuator

ADC channel attenuators are controlled using the Zynq SPI and GPIO devices. Figure 1 shows the connections from the Red Pitaya board to the attenuator modules. The ADCAttenuator class code makes use of the Python periphery package to control the attenuator devices.

Channel attenuators can be controlled via the ADCService using the set_channel_attenuation and channel_attenuation methods.

./static/DualDacAdc-board.svg

Figure 1: Base band ADC/DAC front end

from periphery import SPI, GPIO

class ADCAttenuator:
    """Controls the two ADC channel attenuators over SPI and GPIO.

    The attenuation word is sent over a shared SPI device and the
    per-channel GPIO line is then pulsed (set high, then low).
    """

    # Limits applied by set_attenuation (dB, per the surrounding docs).
    MIN_ATTENUATION = 0.0
    MAX_ATTENUATION = 31.75

    # GPIO line numbers for channel 0 and channel 1.
    GPIO_0 = 0
    GPIO_1 = 1
    SPI_SPEED = 1000000

    def __init__(self, spi_device, gpio_device):
        """Open the SPI device and one GPIO output line per channel.

        :param spi_device: path of the SPI device
        :param gpio_device: path of the GPIO device
        """
        self.spi_device = spi_device
        self.gpio_device = gpio_device
        self._spi = None
        self._le = []
        self._attenuation = [
            ADCAttenuator.MIN_ATTENUATION,
            ADCAttenuator.MIN_ATTENUATION]
        try:
            self._spi = SPI(self.spi_device, 0, ADCAttenuator.SPI_SPEED)
            for line in (ADCAttenuator.GPIO_0, ADCAttenuator.GPIO_1):
                self._le.append(GPIO(self.gpio_device, line, "out"))
        except Exception:
            # Release whatever was opened before the failure so that no
            # device handle is leaked.
            self.close()
            raise

    def close(self):
        """Close the SPI and GPIO handles.  Safe to call more than once."""
        if self._spi is not None:
            self._spi.close()
            self._spi = None
        if self._le is not None:
            for le in self._le:
                if le is not None:
                    le.close()
            self._le = None

    def attenuation(self, ch):
        """Return the attenuation stored for channel ``ch``."""
        return self._attenuation[ch]

    def set_attenuation(self, ch, att):
        """Store ``att`` for channel ``ch``, clamped to
        ``[MIN_ATTENUATION, MAX_ATTENUATION]``.

        The hardware is not touched until :meth:`update_attenuation`.
        """
        if att < ADCAttenuator.MIN_ATTENUATION:
            self._attenuation[ch] = ADCAttenuator.MIN_ATTENUATION
        elif att > ADCAttenuator.MAX_ATTENUATION:
            self._attenuation[ch] = ADCAttenuator.MAX_ATTENUATION
        else:
            self._attenuation[ch] = att

    def update_attenuation(self, ch):
        """Write the stored attenuation of channel ``ch`` to the hardware."""
        att = self._attenuation[ch]
        # The value is scaled by 4 and rounded, formatted as an 8-bit
        # binary string, the bit order is reversed and the result shifted
        # right by one before being sent as a single SPI byte.
        # NOTE(review): presumably the device expects a 7-bit word in
        # reversed bit order -- confirm against the attenuator datasheet.
        self._spi.transfer([int('{:08b}'.format(round(att*4))[::-1], 2) >> 1])
        # Pulse this channel's GPIO line after the transfer.
        self._le[ch].write(True)
        self._le[ch].write(False)

ADCChannel

The ADC channel down converter design (DDC) is summarized in Figure 2. The DDC core configuration registers are accessed via a Linux userspace I/O (UIO) device. The stream switch configuration is also accessed as a register via separate UIO devices. (More details of the programmable hardware can be found here: ADC DMA Zynq Programmable Logic.)

ADC DMA DDC design

Figure 2: ADC channel down converter design

Core and stream switch configuration registers are memory mapped from the associated UIO devices using ADCChannel.map_registers, map_cfg_registers, and map_sw_registers. Table 1 lists the associated UIO devices with Tables 2 and 3 listing the memory mapped register addresses.

Table 1: UIO core and stream switch configuration devices.

Configuration

Device

DDC cores

/sys/class/uio/uio0

Stream Sw. 1

/sys/class/uio/uio2

Stream Sw. 2

/sys/class/uio/uio3

Table 2: DDC core configuration registers (/sys/class/uio/uio0)

Register

Ch 1 Addr

Ch 2 Addr

LO Phase Incr.

0:4

8:12

Decimation Rate

4:6

12:14

DC Offset

6:8

14:16

Table 3: Stream switch configuration registers. (/sys/class/uio/uio2 for channel 1, /sys/class/uio/uio3 for channel 2.)

Register

Addr

Switch config.

0x40:0x44

Switch update

0:4

ADCChannel class parameters are set by the hardware in use and the programmable logic being used to implement the DDC. Table 4 summarises the relevant parameters.

Table 4: Hardware related channel parameters

Parameter

Value

Description

ADC_MIN_FREQUENCY

0.5 MHz

Determined by the ADC bandwidth and any anti-alias filtering used

ADC_MAX_FREQUENCY

55.0 MHz

MIN_DECIMATION_RATE

4

Determined by the hardware CIC filter configuration

MAX_DECIMATION_RATE

8192

LO_DDS_PHASE_WIDTH

30

Determined by the hardware DDS NCO Configuration

DMA_LATENCY

12

A characteristic of the Zynq DMA engine

DMA_DATA_FORMAT

'F'

This is np.complex64 (single precision complex)

from typing import (
    Optional, List, Dict
)
import os
from os import O_RDWR
from pathlib import Path
from mmap import mmap
import numpy as np
from scipy import signal

from data import SweepParameters
from buffer import ADCBuffer
from threadedbuffer import ADCThreadedBuffer


class ADCChannel(object):
    """One ADC channel with its down-converter configuration and buffers.

    Configuration registers are written through memory mapped UIO
    devices: a class-level mapping shared by all channels for the core
    configuration and a per-instance mapping for the stream switch.
    Samples are delivered through one or two ``ADCThreadedBuffer``
    instances.
    """

    ADC_MIN_FREQUENCY: float = 0.5
    ADC_MAX_FREQUENCY: float = 55.0
    MIN_DECIMATION_RATE: int = 4
    MAX_DECIMATION_RATE: int = 8192
    MAX_DC_OFFSET: int = 1<<13
    MIN_DC_OFFSET: int = -(1<<13)
    LO_DDS_PHASE_WIDTH: int = 30
    DMA_LATENCY: int = 12

    # The DMA data format is set by the design of the programmable logic
    # in the down converter.
    DMA_DATA_FORMAT: str = 'F'

    # Core configuration mapping shared by every channel instance.
    _cfg_fd = None
    _cfg_mem = None
    # Number of outstanding map_cfg_registers() calls.  The shared
    # mapping is only released when the last user unmaps it, so closing
    # one channel does not pull the registers out from under the other.
    _cfg_refcount = 0

    @classmethod
    def map_registers(cls, uio_device: str):
        """Memory map the registers of ``uio_device``.

        The mapping length is read (as hexadecimal) from
        ``/sys/class/uio/<uio_device>/maps/map0/size``.

        :return: tuple of the open file descriptor and the ``mmap``
        """
        uio_path = Path(f'/sys/class/uio/{uio_device}')
        size_path = uio_path / 'maps/map0/size'
        dev_path = f'/dev/{uio_device}'
        with open(size_path) as size_fd:
            mem_len = int(size_fd.read(), 16)
        reg_fd = os.open(dev_path, O_RDWR)
        try:
            reg_mem = mmap(reg_fd, mem_len)
        except Exception:
            # Do not leak the descriptor when the mapping fails.
            os.close(reg_fd)
            raise
        return reg_fd, reg_mem

    @classmethod
    def map_cfg_registers(cls, cfg_uio_device: str):
        """Map the shared core configuration registers if necessary and
        register one more user of the mapping."""
        if cls._cfg_mem is None:
            cls._cfg_fd, cls._cfg_mem = cls.map_registers(cfg_uio_device)
            cls._cfg_refcount = 0
        cls._cfg_refcount += 1

    @classmethod
    def unmap_cfg_registers(cls):
        """Drop one user of the shared core configuration mapping and
        release the mapping when no users remain."""
        if cls._cfg_mem is None:
            return
        if cls._cfg_refcount > 1:
            cls._cfg_refcount -= 1
            return
        cls._cfg_refcount = 0
        cls._cfg_mem.close()
        cls._cfg_mem = None
        os.close(cls._cfg_fd)
        cls._cfg_fd = None

    def __init__(self,
                 adc,
                 adc_id: int,
                 chan_id: int,
                 decimation_rate: int = 1,
                 lo_freq: float = 0.0,
                 dc_offset: int = 0,
                 buffer_size: int = 1024 * 128,
                 chunk_size: int = 1024 * 8,
                 double_buffer = True,
                 sample_callback_fn = None,
                 cfg_device = 'uio0',
                 sw_device = 'uio2'):
        """Creates an instance of ``ADCChannel``.

        :param adc: a reference to the ADCService instance
        :type adc: ADCService
        :param adc_id: the ADC channel (one of ``[0, 1]``)
        :type adc_id: int
        :param chan_id: the DMA channel id as returned by ``axidma_get_dma_rx``
        :type chan_id: int
        :param decimation_rate: the ADC channel decimation rate
        :type decimation_rate: int
        :param lo_freq: the ADC channel LO frequency setting (in MHz)
        :type lo_freq: float
        :param dc_offset: the DC offset in ADC units
        :type dc_offset: int
        :param buffer_size: the DMA buffer size in units of complex64
        :type buffer_size: int
        :param chunk_size: data transfer chunk size in units of complex64
        :type chunk_size: int
        :param double_buffer: whether a second sample buffer is used
        :param sample_callback_fn: optional callback invoked by
            :meth:`fft_sweep` for every LO step
        :param cfg_device: UIO device of the core configuration registers
        :param sw_device: UIO device of this channel's stream switch
        """
        self._adc = adc
        self._lo_dds_clock = adc.adc_clock
        self._max_lo_freq = adc.adc_clock / 2
        self._adc_id: int = adc_id
        self._chan_id: int = chan_id
        self._cfg_device = cfg_device
        self._sw_device = sw_device
        self._sw_fd = None
        self._sw_mem = None
        self._cfg_mapped = False
        self.map_sw_registers()
        ADCChannel.map_cfg_registers(self._cfg_device)
        # Remember that this instance holds a reference on the shared
        # mapping so that close() releases it exactly once.
        self._cfg_mapped = True
        # Each setting is primed with a placeholder and then assigned
        # through its property so that the setter writes the registers.
        # NOTE(review): the setters return early when the new value
        # equals the placeholder, so e.g. lo_freq=0.0 is not written to
        # the hardware here -- confirm this is intended.
        self._decimation_rate = 0
        self.decimation_rate = decimation_rate
        self._lo_freq = 0
        self.lo_freq = lo_freq
        self._dc_offset = 0
        self.dc_offset = dc_offset
        self._buffer_size = buffer_size
        self._chunk_size = chunk_size
        self._dma_dev = None
        self._dtype = ADCChannel.DMA_DATA_FORMAT
        self._output_buffer = None
        self._output_buffer2 = None
        self._current_buffer = None
        self._double_buffer = double_buffer
        self._sample_cb_fn = sample_callback_fn

    def map_sw_registers(self) -> None:
        """Map this channel's stream switch registers if not yet mapped."""
        if self._sw_mem is None:
            self._sw_fd, self._sw_mem = ADCChannel.map_registers(self._sw_device)

    def unmap_sw_registers(self) -> None:
        """Release this channel's stream switch mapping, if any."""
        if self._sw_mem is not None:
            self._sw_mem.close()
            self._sw_mem = None
            os.close(self._sw_fd)
            self._sw_fd = None

    def initialize(self, axidma_dev, prefill_buffers=True):
        """Create and initialize the sample buffer(s).

        :param axidma_dev: the DMA device handle used by the buffers
        :param prefill_buffers: passed to ``ADCThreadedBuffer.initialize``
        :return: always ``True``
        """
        self._dma_dev = axidma_dev

        self._output_buffer = ADCThreadedBuffer(
            self._adc,
            self,
            self._dma_dev,
            self._chan_id,
            self.buffer_size,
            self.chunk_size,
            self._dtype)
        self._output_buffer.initialize(prefill_buffers)

        if self._double_buffer:
            self._output_buffer2 = ADCThreadedBuffer(
                self._adc,
                self,
                self._dma_dev,
                self._chan_id,
                self.buffer_size,
                self.chunk_size,
                self._dtype)
            self._output_buffer2.initialize(prefill_buffers)

        self._current_buffer = self._output_buffer
        return True

    def resize(self, new_size: int) -> None:
        """Resize the sample buffer(s) to ``new_size``.

        The chunk size is reduced to ``new_size`` if it would otherwise
        exceed it, and the first buffer becomes the current buffer.
        """
        if self.buffer_size == new_size:
            return
        if self.chunk_size > new_size:
            self.chunk_size = new_size
        self._buffer_size = new_size
        # The buffers do not exist before initialize() has been called.
        if self._output_buffer is not None:
            self._output_buffer.resize(new_size)
        if self._double_buffer and self._output_buffer2 is not None:
            self._output_buffer2.resize(new_size)
        self._current_buffer = self._output_buffer

    @property
    def double_buffer(self) -> bool:
        """Whether a second sample buffer is in use."""
        return self._double_buffer

    @double_buffer.setter
    def double_buffer(self, double: bool) -> None:
        if double == self._double_buffer:
            return
        self._double_buffer = double
        if double:
            self._output_buffer2 = ADCThreadedBuffer(
                self._adc,
                self,
                self._dma_dev,
                self._chan_id,
                self.buffer_size,
                self.chunk_size,
                self._dtype)
            self._output_buffer2.initialize(prefill=False)
        else:
            if self._output_buffer2 is not None:
                # Never leave the current buffer pointing at the buffer
                # that is about to be closed.
                if self._current_buffer is self._output_buffer2:
                    self._current_buffer = self._output_buffer
                self._output_buffer2.close()
                self._output_buffer2 = None

    def sweep(self,
              lo_list: List[float],
              rate: int,
              sample_len: int) -> Dict:
        """Collect samples over a range of LO frequencies.

        :param lo_list: List of LO frequencies to sweep over
        :type lo_list: List[float]
        :param rate: The decimation rate
        :type rate: int
        :param sample_len: The sample size to use
        :type sample_len: int
        :return: dict mapping the index of each LO frequency in
            ``lo_list`` to the array of samples collected at it
        """
        self.decimation_rate = rate
        self.double_buffer = False
        # Allow an extra DMA_LATENCY samples at the beginning of the sample.
        # These will be thrown away in the returned sample arrays.
        self.resize(sample_len + ADCChannel.DMA_LATENCY)
        self.chunk_size = sample_len + ADCChannel.DMA_LATENCY
        samples = {}
        for i, lo in enumerate(lo_list):
            self.lo_freq = lo
            self.flush_buffers()
            samples[i] = np.array(self.next_chunk())[ADCChannel.DMA_LATENCY:]
        return samples

    def _configure_fft_sweep(self,
                             start_freq: float,
                             stop_freq: float,
                             scaling: str='spectrum',
                             rbw: Optional[float]=None,
                             reqd_decimation_rate: Optional[int]=None):
        """Build the sweep parameters and LO frequency list for an FFT
        sweep.

        ``start_freq`` / ``stop_freq`` are limited to
        ``ADC_MIN_FREQUENCY`` / ``ADC_MAX_FREQUENCY``.  ``scaling`` is
        accepted but not used here.

        :return: tuple of the ``SweepParameters`` and the LO list
        """
        if start_freq < ADCChannel.ADC_MIN_FREQUENCY:
            start_freq = ADCChannel.ADC_MIN_FREQUENCY
        if stop_freq > ADCChannel.ADC_MAX_FREQUENCY:
            stop_freq = ADCChannel.ADC_MAX_FREQUENCY
        params = SweepParameters(start_freq, stop_freq,
                                 rbw, reqd_decimation_rate)
        lo_list = np.arange(params.f_start, params.f_stop, params.sample_span)
        return (params, lo_list)

    def fft_sweep(self,
                  start_freq: float,
                  stop_freq: float,
                  rbw: float,
                  adc_conv_factor: float) -> None:
        """Sweep the LO across a frequency range, computing a Welch
        spectrum at each step.

        Results are not returned; for each LO step the sample callback
        (if one was supplied) is called with the frequency array, the
        power array, the step index and the total number of steps.

        :param start_freq: start of the sweep range
        :param stop_freq: end of the sweep range
        :param rbw: resolution bandwidth passed to ``SweepParameters``
        :param adc_conv_factor: factor applied to the real and
            imaginary parts of every sample before the FFT
        """
        sweep_params, lo_list = self._configure_fft_sweep(
            float(start_freq), float(stop_freq), rbw=float(rbw))
        self.double_buffer = False
        self.decimation_rate = sweep_params.decimation_rate
        sample_len = sweep_params.nperseg + sweep_params.lo_offset_bins
        # As in sweep(), the first DMA_LATENCY samples are discarded.
        self.resize(sample_len + ADCChannel.DMA_LATENCY)
        self.chunk_size = sample_len + ADCChannel.DMA_LATENCY
        if sweep_params.fft_bins >= sweep_params.sample_bins:
            sample_sizes = [
                sweep_params.sample_bins for i in range(sweep_params.lo_count)]
            sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins
        else:
            sample_sizes = [sweep_params.fft_bins+2]

        for i, lo in enumerate(lo_list):
            self.lo_freq = lo
            self.flush_buffers()
            sample = np.array(self.next_chunk())[ADCChannel.DMA_LATENCY:]
            scaled_data = (adc_conv_factor*sample.real +
                           (1j)*adc_conv_factor*sample.imag)

            f, Pxx_den = signal.welch(
                scaled_data,
                sweep_params.fs,
                detrend='linear',
                noverlap=sweep_params.nperseg*0.8,
                average='median',
                window=sweep_params.window,
                return_onesided=False,
                scaling='spectrum')
            freq = lo + f[:sample_sizes[i]] / 1e6
            offset_bins = sweep_params.lo_offset_bins
            pwr = Pxx_den[offset_bins:(sample_sizes[i] + offset_bins)]
            if self._sample_cb_fn is not None:
                self._sample_cb_fn(freq, pwr, i, len(lo_list))

    def next_chunk(self):
        """Return the next chunk of samples from the current buffer.

        When the current buffer has no data available it is refilled:
        with double buffering the other buffer becomes current while
        the exhausted one is refilled, otherwise the call waits for the
        single buffer to be filled.
        """
        if not self._current_buffer.data_available():
            if self._double_buffer:
                if self._current_buffer is self._output_buffer:
                    other_buffer = self._output_buffer2
                else:
                    other_buffer = self._output_buffer
                if not other_buffer.data_available():
                    # The other buffer is busy being filled.
                    # Wait for data to be available in that buffer
                    # before switching to it.
                    if other_buffer.status == ADCThreadedBuffer.EMPTY:
                        other_buffer.fill()
                    other_buffer._data_available.wait()
                self._current_buffer.fill()
                if self._current_buffer is self._output_buffer:
                    self._current_buffer = self._output_buffer2
                else:
                    self._current_buffer = self._output_buffer
            else:
                self._current_buffer.fill(wait=True)
        return self._current_buffer.next_chunk()

    def next_chunk_timed(self):
        """Return the next chunk together with its timing information.

        :return: tuple ``(buffer_start_time, chunk_start_time,
            sample_period, data)``; ``chunk_start_time`` is ``0.0`` when
            the chunk is the first one read after a buffer refill
        """
        if not self._current_buffer.data_available():
            data = self.next_chunk()
            return (self.current_buffer.buffer_start_time,
                    0.0,
                    self.sample_period,
                    data)
        else:
            return (self.current_buffer.buffer_start_time,
                    self.current_buffer.chunk_start_time,
                    self.sample_period,
                    self.next_chunk())

    def flush_buffers(self):
        """Refill the buffer(s), waiting for completion, and make the
        first buffer current."""
        self._output_buffer.fill(wait=True)
        if self._double_buffer:
            self._output_buffer2.fill(wait=True)
        self._current_buffer = self._output_buffer

    @property
    def chan_id(self):
        """The DMA channel id of this channel."""
        return self._chan_id

    @property
    def _phase_incr(self):
        """LO phase increment register value for the current LO
        frequency: ``lo_freq * 2**LO_DDS_PHASE_WIDTH / clock``, truncated
        to an integer."""
        return int((self._lo_freq * (1 << ADCChannel.LO_DDS_PHASE_WIDTH))
                   / self._lo_dds_clock)

    @property
    def current_buffer(self):
        """The buffer that :meth:`next_chunk` currently reads from."""
        return self._current_buffer

    @property
    def buffer_size(self):
        """The current size of the sample buffer.
        """
        return self._buffer_size

    @property
    def chunk_size(self):
        """The size of the sample that is returned by :py:meth:`next_chunk`
        """
        return self._chunk_size

    @chunk_size.setter
    def chunk_size(self, size):
        if size == self._chunk_size:
            return
        # Sizes larger than the buffer are silently ignored.
        if size <= self.buffer_size:
            self._chunk_size = size
            # The buffers do not exist before initialize() has been called.
            if self._output_buffer is not None:
                self._output_buffer.chunk_size = size
            if self._double_buffer and self._output_buffer2 is not None:
                self._output_buffer2.chunk_size = size

    @property
    def sample_rate(self):
        """ADC clock divided by the decimation rate, scaled by 1e6."""
        return (self._adc.adc_clock/self.decimation_rate) * 1e6

    @property
    def sample_period(self):
        """Reciprocal of :attr:`sample_rate`."""
        return 1.0/self.sample_rate

    @property
    def decimation_rate(self):
        """The current decimation rate."""
        return self._decimation_rate

    @decimation_rate.setter
    def decimation_rate(self, rate):
        if rate == self._decimation_rate:
            return
        # Negative rates are folded to their magnitude and the result is
        # capped at MAX_DECIMATION_RATE.
        if rate < 0:
            rate *= -1
        self._decimation_rate = rate
        if rate > ADCChannel.MAX_DECIMATION_RATE:
            self._decimation_rate = ADCChannel.MAX_DECIMATION_RATE

        if self.decimation_rate in [1, 2, 3]:
            # Set direct sampling
            chan = 0x0
            update = 0x2
            self._sw_mem[0x40:0x44] = chan.to_bytes(4, 'little')
            self._sw_mem[0x0:0x4] = update.to_bytes(4, 'little')
        else:
            # Set decimated sampling
            chan = 0x1
            update = 0x2
            self._sw_mem[0x40:0x44] = chan.to_bytes(4, 'little')
            self._sw_mem[0x0:0x4] = update.to_bytes(4, 'little')
            if self._adc_id == 0:
                ADCChannel._cfg_mem[4:6] = self.decimation_rate.to_bytes(
                    2, 'little')
            else:
                ADCChannel._cfg_mem[12:14] = self.decimation_rate.to_bytes(
                    2, 'little')

    @property
    def lo_freq(self):
        """The current LO frequency in MHz.
        """
        return self._lo_freq

    @lo_freq.setter
    def lo_freq(self, f):
        if f == self._lo_freq:
            return
        # Negative frequencies are folded to their magnitude and the
        # result is capped at half the ADC clock.
        if f < 0.0:
            f *= -1.0
        self._lo_freq = f
        if f > self._max_lo_freq:
            self._lo_freq = self._max_lo_freq
        if self._adc_id == 0:
            ADCChannel._cfg_mem[0:4] = self._phase_incr.to_bytes(4, 'little')
        else:
            ADCChannel._cfg_mem[8:12] = self._phase_incr.to_bytes(4, 'little')

    @property
    def dc_offset(self):
        """The current DC offset in ADC units.
        """
        return self._dc_offset

    @dc_offset.setter
    def dc_offset(self, offset):
        if offset == self._dc_offset:
            return
        # The offset is limited to [MIN_DC_OFFSET, MAX_DC_OFFSET].
        self._dc_offset = offset
        if offset > ADCChannel.MAX_DC_OFFSET:
            self._dc_offset = ADCChannel.MAX_DC_OFFSET
        if offset < ADCChannel.MIN_DC_OFFSET:
            self._dc_offset = ADCChannel.MIN_DC_OFFSET
        offset_bytes = self._dc_offset.to_bytes(2, 'little', signed=True)
        if self._adc_id == 0:
            ADCChannel._cfg_mem[6:8] = offset_bytes
        else:
            ADCChannel._cfg_mem[14:16] = offset_bytes

    def close(self):
        """Release the register mappings and close the sample buffers."""
        # Give up this instance's reference on the shared mapping once
        # only, even if close() is called repeatedly.
        if self._cfg_mapped:
            ADCChannel.unmap_cfg_registers()
            self._cfg_mapped = False
        self.unmap_sw_registers()
        if self._output_buffer is not None:
            self._output_buffer.close()
            self._output_buffer = None
        if self._output_buffer2 is not None:
            self._output_buffer2.close()
            self._output_buffer2 = None
        self._current_buffer = None

DACChannel

import os
from os import O_RDWR
from pathlib import Path
from mmap import mmap
import numpy as np


class DACChannel:
    """Frequency and amplitude control for one DAC output.

    Both DAC outputs share one UIO register block, which is mapped only
    for the duration of each register write.  Register layout used by
    this class (byte offsets, little-endian):

    * DAC 0: phase increment at 0-3, amplitude at 4-5
    * any other DAC id: phase increment at 6-9, amplitude at 10-11
    """

    DEFAULT_FREQ = 10.0
    MIN_AMPL = 0
    MAX_AMPL = (1 << 15) - 1
    DEFAULT_AMPL = 8192

    # Width (in bits) used to scale a frequency to a DDS phase increment.
    LO_DDS_PHASE_WIDTH = 30

    # Shared by all instances: file descriptor and mapping of the UIO
    # register block; both are None while the block is not mapped.
    _reg_fd = None
    _reg_mem = None

    @classmethod
    def map_registers(cls, uio_device):
        """Map the register block of ``uio_device`` (e.g. ``'uio1'``).

        The mapping length is read from sysfs.  An existing mapping is
        reused.  Returns the ``mmap`` object.
        """
        if cls._reg_mem is None:
            uio_path = Path(f'/sys/class/uio/{uio_device}')
            size_path = uio_path / 'maps/map0/size'
            dev_path = f'/dev/{uio_device}'
            with open(size_path) as size_fd:
                mem_len = int(size_fd.read(), 16)
            fd = os.open(dev_path, O_RDWR)
            try:
                mem = mmap(fd, mem_len)
            except Exception:
                # Do not leak the descriptor when the mapping fails.
                os.close(fd)
                raise
            cls._reg_fd = fd
            cls._reg_mem = mem
        return cls._reg_mem

    @classmethod
    def unmap_registers(cls):
        """Release the register mapping and its file descriptor, if any."""
        mem, cls._reg_mem = cls._reg_mem, None
        fd, cls._reg_fd = cls._reg_fd, None
        try:
            if mem is not None:
                mem.close()
        finally:
            if fd is not None:
                os.close(fd)

    def __init__(self,
                 srv,
                 dac_id: int,
                 cfg_device = 'uio1') -> None:
        """Create the channel; no hardware access happens here.

        :param srv: owning service; its ``adc_clock`` is used as the DDS
            clock and determines the maximum frequency (half the clock).
        :param dac_id: 0 selects the first register set, any other value
            the second one.
        :param cfg_device: name of the UIO device holding the registers.
        """
        self._srv = srv
        self._dds_clock = srv.adc_clock
        self._dac_id: int = dac_id
        self._cfg_device = cfg_device
        self._min_freq = 0.0
        self._max_freq = self._dds_clock / 2
        self._freq = DACChannel.DEFAULT_FREQ
        self._ampl = DACChannel.DEFAULT_AMPL

    def _phase_incr_for(self, f):
        """Return the integer DDS phase increment for frequency ``f``."""
        return int((f * (1 << DACChannel.LO_DDS_PHASE_WIDTH))
                   / self._dds_clock)

    @property
    def _phase_incr(self):
        return self._phase_incr_for(self._freq)

    def _write_register(self, offset, payload):
        """Map the registers, store ``payload`` at ``offset``, then unmap.

        The mapping is released even when the write itself fails.
        """
        regs = DACChannel.map_registers(self._cfg_device)
        try:
            regs[offset:offset + len(payload)] = payload
        finally:
            DACChannel.unmap_registers()

    def initialize(self) -> bool:
        """Nothing to set up for a DAC channel; always returns True."""
        return True

    @property
    def freq(self) -> float:
        """The current output frequency (in MHz)."""
        return self._freq

    @freq.setter
    def freq(self, f: float) -> None:
        if f < self._min_freq:
            f = self._min_freq
        elif f > self._max_freq:
            f = self._max_freq
        # Encode before committing so a bad value leaves the stored
        # frequency unchanged.
        payload = self._phase_incr_for(f).to_bytes(4, 'little')
        self._freq = f
        self._write_register(0 if self._dac_id == 0 else 6, payload)

    @property
    def ampl(self) -> int:
        """The current output amplitude (MIN_AMPL..MAX_AMPL)."""
        return self._ampl

    @ampl.setter
    def ampl(self, a: int) -> None:
        if a < DACChannel.MIN_AMPL:
            a = DACChannel.MIN_AMPL
        elif a > DACChannel.MAX_AMPL:
            a = DACChannel.MAX_AMPL
        # Encode before committing so a non-integer value raises without
        # changing the stored amplitude or touching the registers.
        payload = a.to_bytes(2, 'little')
        self._ampl = a
        self._write_register(4 if self._dac_id == 0 else 10, payload)

    def close(self):
        """Release the shared register mapping if it is still open."""
        DACChannel.unmap_registers()

ADCBuffer

import time
import numpy as np

import _axidma
from _axidma import ffi


@ffi.def_extern()
def axidma_callback(ch, data):
    """Forward a DMA completion to the buffer whose handle is ``data``.

    ``data`` is the handle created with ``ffi.new_handle`` when the
    asynchronous fill was requested; ``ch`` is not used.
    """
    owner = ffi.from_handle(data)
    owner._async_transfer_complete()


class ADCBuffer(object):
    """One DMA capture buffer that is handed out in fixed-size chunks.

    A buffer of ``size`` samples of ``dtype`` is allocated through the
    ``_axidma`` library, filled by a one-way DMA transfer (blocking or
    asynchronous) and consumed through :meth:`get_next_chunk`.

    ``status`` is one of ``EMPTY``, ``NON_EMPTY``, ``BUSY`` (asynchronous
    transfer pending) or ``ERROR`` (the last transfer request failed; the
    library return code is kept in ``_error_code``).
    """

    EMPTY = 0
    NON_EMPTY = 1
    BUSY = 2
    ERROR = 3

    def __init__(self, srv, chan, dma_dev, dma_chan_id,
                 size, chunk_size, dtype='F'):
        """Store the configuration; no memory is allocated here.

        :param srv: service object; its ``adc_clock`` is used for the
            sample period when ``chan`` is None.
        :param chan: owning channel providing ``sample_period``, or None.
        :param dma_dev: device handle passed to the ``_axidma`` calls.
        :param dma_chan_id: DMA channel id passed to the ``_axidma`` calls.
        :param size: buffer length in samples.
        :param chunk_size: number of samples per returned chunk.
        :param dtype: numpy dtype (or dtype code) of one sample.
        """
        self._srv = srv
        self._chan = chan
        self._dma_dev = dma_dev
        self._dma_chan_id = dma_chan_id
        self._size = size
        self._chunk_size = chunk_size
        self._output_buffer = None
        self._iq_buffer = None
        self._chunk_offset = 0
        self._dtype = np.dtype(dtype)
        self._status = ADCBuffer.EMPTY
        self._start_time = 0
        self._error_code = 0
        # Keeps the cffi handle of a pending asynchronous fill alive.
        self._cb_data = None

    @property
    def _byte_len(self):
        """Size of the DMA buffer in bytes."""
        return self._size * self._dtype.itemsize

    @property
    def chunk_size(self):
        """Number of samples returned by each :meth:`get_next_chunk`."""
        return self._chunk_size

    @chunk_size.setter
    def chunk_size(self, size):
        # A chunk larger than the buffer is silently ignored.
        if size <= self._size:
            self._chunk_size = size

    @property
    def sample_period(self):
        """Sample period of the owning channel, else ``1 / adc_clock``."""
        if self._chan is not None:
            return self._chan.sample_period
        return 1.0 / self._srv.adc_clock

    def allocate(self):
        """Allocate the DMA buffer; return True on success, else False."""
        buf = _axidma.lib.axidma_malloc(self._dma_dev, self._byte_len)
        # A failed allocation comes back from cffi as a NULL pointer
        # object, which is never ``None``.
        if buf is None or buf == ffi.NULL:
            self._output_buffer = None
            return False
        self._output_buffer = buf
        return True

    def resize(self, new_size):
        """Free the buffer and allocate one of ``new_size`` samples.

        Any previously captured data is discarded.  Returns the result of
        :meth:`allocate`.
        """
        if self._output_buffer is not None:
            _axidma.lib.axidma_free(self._dma_dev,
                                    self._output_buffer,
                                    self._byte_len)
            self._output_buffer = None
        # The numpy view referred to the memory that was just freed.
        self._iq_buffer = None
        self._chunk_offset = 0
        self.status = ADCBuffer.EMPTY
        self._size = new_size
        return self.allocate()

    def _publish(self):
        """Expose the freshly filled DMA memory as a numpy array."""
        self._start_time = time.clock_gettime_ns(time.CLOCK_MONOTONIC_RAW)
        self._iq_buffer = np.frombuffer(
            ffi.buffer(self._output_buffer, self._byte_len),
            dtype=self._dtype)
        self._chunk_offset = 0
        self.status = ADCBuffer.NON_EMPTY

    def fill(self, wait=False):
        """Request a DMA transfer into the buffer.

        :param wait: when True the call blocks until the transfer is done;
            otherwise the transfer is started and completion is reported
            through ``axidma_callback``.
        :return: True when the buffer was filled, the request was queued,
            or a previous request is still pending; False when the
            transfer request failed (status becomes ``ERROR``).
        """
        if self.status == ADCBuffer.BUSY:
            # Waiting for a previous fill request to complete
            return True
        if wait is True:
            rc = _axidma.lib.axidma_oneway_transfer(
                self._dma_dev, self._dma_chan_id,
                self._output_buffer, self._byte_len, True)
            if rc < 0:
                self.status = ADCBuffer.ERROR
                self._error_code = rc
                print(f'Sync buffer fill error: {rc=}')
                return False
            self._publish()
        else:
            self.status = ADCBuffer.BUSY
            self._cb_data = ffi.new_handle(self)
            _axidma.lib.axidma_set_callback(
                self._dma_dev, self._dma_chan_id,
                _axidma.lib.axidma_callback,
                self._cb_data)
            rc = _axidma.lib.axidma_oneway_transfer(
                self._dma_dev, self._dma_chan_id,
                self._output_buffer, self._byte_len, False)
            if rc < 0:
                # cffi does not accept None for pointer arguments; the
                # callback has to be cleared with ffi.NULL.
                _axidma.lib.axidma_set_callback(
                    self._dma_dev, self._dma_chan_id, ffi.NULL, ffi.NULL)
                self.status = ADCBuffer.ERROR
                self._error_code = rc
                print(f'Async buffer fill error: {rc=}')
                return False

        return True

    def _async_transfer_complete(self):
        """Completion handler invoked through ``axidma_callback``."""
        _axidma.lib.axidma_set_callback(
            self._dma_dev, self._dma_chan_id, ffi.NULL, ffi.NULL)
        self._publish()

    def get_next_chunk(self):
        """Return the next chunk of captured samples, or None.

        None is returned unless the buffer holds data (status
        ``NON_EMPTY``), and also when the next chunk would run past the
        end of the buffer.  Returning the final chunk sets the status to
        ``EMPTY``.
        """
        if self.status != ADCBuffer.NON_EMPTY or self._iq_buffer is None:
            # EMPTY/BUSY: nothing to deliver.  ERROR: the buffer content
            # is not the result of a completed transfer.
            return None
        start = self._chunk_offset
        stop = start + self._chunk_size
        if stop > self._size:
            # potential buffer overrun.
            self.status = ADCBuffer.EMPTY
            return None
        if stop == self._size:
            # last chunk of this buffer
            self.status = ADCBuffer.EMPTY
        else:
            self._chunk_offset = stop
        return self._iq_buffer[start:stop]

    @property
    def buffer_start_time(self):
        """Return the absolute buffer start time (in ns).

        The value is the monotonic raw clock reading taken when the
        transfer completed.
        """
        return self._start_time

    @property
    def chunk_start_time(self):
        """Return the current chunk offset multiplied by the sample period.

        NOTE(review): originally documented as nanoseconds; the actual
        unit is that of ``sample_period`` -- confirm.
        """
        return self._chunk_offset * self.sample_period

    @property
    def output_buffer(self):
        """The raw DMA buffer pointer, or None when not allocated."""
        return self._output_buffer

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, s):
        self._status = s

    def close(self):
        """Clear a pending callback and free the DMA buffer."""
        if self.status == ADCBuffer.BUSY:
            _axidma.lib.axidma_set_callback(
                self._dma_dev, self._dma_chan_id, ffi.NULL, ffi.NULL)
        if self._output_buffer is not None:
            _axidma.lib.axidma_free(self._dma_dev,
                                    self._output_buffer,
                                    self._byte_len)
            self._output_buffer = None
            # Drop the view onto the memory that was just freed.
            self._iq_buffer = None

ADCThreadedBuffer

import time
from threading import (
    Thread, Event, Lock
)
import numpy as np

import _axidma
from _axidma import ffi


class ADCThreadedBuffer(object):
    """DMA capture buffer that is filled by a worker thread.

    :meth:`fill` starts a thread that performs a blocking one-way DMA
    transfer into the buffer; an ``Event`` signals when data is available
    and :meth:`next_chunk` hands the data out in ``chunk_size`` pieces.

    ``status`` is one of ``EMPTY``, ``NON_EMPTY``, ``BUSY`` or ``ERROR``.
    """

    EMPTY = 0
    NON_EMPTY = 1
    BUSY = 2
    ERROR = 3

    def __init__(self, srv, chan, dma_dev, dma_chan_id,
                 size, chunk_size, dtype='F'):
        """Store the configuration; memory is allocated by ``initialize``.

        :param srv: service object; its ``adc_clock`` is used for the
            sample period when ``chan`` is None.
        :param chan: owning channel providing ``sample_period``, or None.
        :param dma_dev: device handle passed to the ``_axidma`` calls.
        :param dma_chan_id: DMA channel id passed to the ``_axidma`` calls.
        :param size: buffer length in samples.
        :param chunk_size: number of samples per returned chunk.
        :param dtype: numpy dtype (or dtype code) of one sample.
        """
        self._srv = srv
        self._chan = chan
        self._dma_dev = dma_dev
        self._dma_chan_id = dma_chan_id
        self._size = size
        self._chunk_size = chunk_size
        self._output_buffer = None
        self._iq_buffer = None
        self._chunk_offset = 0
        self._dtype = np.dtype(dtype)
        self._error_code = 0
        self._start_time = 0
        self._sample_period = 0.0
        self._fill_thread = None
        self._status_lock = Lock()
        self._status = ADCThreadedBuffer.EMPTY
        self._data_available = Event()

    def initialize(self, prefill=True):
        """Allocate the DMA buffer and optionally fill it (blocking)."""
        self._output_buffer = _axidma.lib.axidma_malloc(
            self._dma_dev, self._size * self._dtype.itemsize)
        if prefill is True:
            self.fill(wait=True)
            self._chunk_offset = 0
        else:
            # No data available yet
            self._data_available.clear()
            # The following indicates that the buffer needs to be filled
            self._chunk_offset = self._size + self._chunk_size

    def close(self):
        """Free the DMA buffer.

        NOTE(review): a fill thread that is still running is not waited
        for before the memory is released -- confirm callers ensure that.
        """
        if self._output_buffer is not None:
            _axidma.lib.axidma_free(self._dma_dev,
                                    self._output_buffer,
                                    self._size * self._dtype.itemsize)
            self._output_buffer = None
            # Drop the view onto the memory that was just freed.
            self._iq_buffer = None

    @property
    def chunk_size(self) -> int:
        """Number of samples returned by each :meth:`next_chunk`."""
        return self._chunk_size

    @chunk_size.setter
    def chunk_size(self, new_size: int) -> None:
        self._chunk_size = new_size

    @property
    def sample_period(self):
        """Sample period of the owning channel, else ``1 / adc_clock``."""
        if self._chan is not None:
            return self._chan.sample_period
        return 1.0 / self._srv.adc_clock

    def resize(self, new_size):
        """Replace the buffer with one of ``new_size`` samples.

        The data-available flag is cleared and the status becomes
        ``EMPTY``; the new buffer has to be filled before use.
        """
        self._data_available.clear()
        if self._output_buffer is not None:
            _axidma.lib.axidma_free(self._dma_dev,
                                    self._output_buffer,
                                    self._size * self._dtype.itemsize)
            # The numpy view referred to the memory that was just freed.
            self._iq_buffer = None
        self._size = new_size
        with self._status_lock:
            self._status = ADCThreadedBuffer.EMPTY
        self._output_buffer = _axidma.lib.axidma_malloc(
            self._dma_dev, self._size * self._dtype.itemsize)

    def next_chunk(self):
        """Return the next chunk, waiting for a fill if none is available.

        When data is available the chunk offset is advanced and, once the
        following chunk would no longer fit, the data-available flag is
        cleared and the status set to ``EMPTY``.
        """
        if self.data_available():
            self._chunk_offset += self._chunk_size
            if self.is_empty():
                self._data_available.clear()
                with self._status_lock:
                    self._status = ADCThreadedBuffer.EMPTY
            return self._iq_buffer[self._chunk_offset - self._chunk_size:
                                   self._chunk_offset]
        # NOTE(review): this path blocks until a fill completes and does
        # not advance the chunk offset, so the following call returns the
        # same samples again -- confirm this is what callers expect.
        self._data_available.wait()
        return self._iq_buffer[self._chunk_offset:
                               self._chunk_offset + self._chunk_size]

    def data_available(self):
        """True while the data-available event is set."""
        return self._data_available.is_set()

    def is_empty(self):
        """True when another full chunk does not fit in the buffer."""
        return (self._chunk_offset + self._chunk_size > self._size)

    def fill(self, wait=False):
        """Start a worker thread that fills the buffer.

        :param wait: when True, block until the worker has finished,
            whether the transfer succeeded or failed (check ``status``).
        """
        with self._status_lock:
            self._status = ADCThreadedBuffer.BUSY
        worker = Thread(target=self._fill_task)
        self._fill_thread = worker
        worker.start()
        if wait is True:
            # Joining (rather than waiting on the event) also returns when
            # the transfer failed and the event is therefore never set.
            worker.join()

    def _fill_task(self):
        """Worker body: blocking DMA transfer, then publish the data."""
        rc = _axidma.lib.axidma_oneway_transfer(
            self._dma_dev, self._dma_chan_id,
            self._output_buffer,
            self._size * self._dtype.itemsize, True)
        if rc < 0:
            self._error_code = rc
            with self._status_lock:
                self._status = ADCThreadedBuffer.ERROR
            print(f'{self=}: Sync buffer fill error: {rc=}')
            return
        self._start_time = time.clock_gettime_ns(time.CLOCK_MONOTONIC_RAW)
        self._iq_buffer = np.frombuffer(
            ffi.buffer(self._output_buffer, self._size * self._dtype.itemsize),
            dtype=self._dtype)
        self._chunk_offset = 0
        with self._status_lock:
            self._status = ADCThreadedBuffer.NON_EMPTY
        self._data_available.set()

    @property
    def status(self):
        """Current buffer status, read under the status lock."""
        with self._status_lock:
            status = self._status
        return status

    @property
    def buffer_start_time(self):
        """Return the absolute buffer start time (in ns).

        The value is the monotonic raw clock reading taken when the
        transfer completed.
        """
        return self._start_time

    @property
    def data_start_time(self):
        """Alias of :attr:`buffer_start_time`; returns the same value."""
        return self._start_time

    @property
    def chunk_start_time(self):
        """Return the current chunk offset multiplied by the sample period.

        NOTE(review): originally documented as nanoseconds; the actual
        unit is that of ``sample_period`` -- confirm.
        """
        return self._chunk_offset * self.sample_period

Imports

import os
from os import O_RDWR
import time
from datetime import datetime
from pathlib import Path
from mmap import mmap
from argparse import ArgumentParser
import numpy as np

import rpyc
from rpyc.utils.server import ThreadedServer

import _axidma
from _axidma import ffi

Testing

ADC service test script

import sys
import signal
from argparse import ArgumentParser
import numpy as np
import rpyc

class InvalidChannelException(Exception):
    """Signals an attempt to access a channel that is not valid."""


class TestApp:
    """Minimal RPyC client for the ADC DMA service.

    At most one ADC channel is allocated at a time.
    """

    DEFAULT_DECIMATION_RATE=1
    DEFAULT_LO_FREQ=0.0
    DEFAULT_DC_OFFSET=0
    DEFAULT_SAMPLE_LEN=8*1024
    DEFAULT_ATTENUATION=0.0
    MAX_ATTENUATION=31.75

    def __init__(self, adc_ip: str, adc_port: int):
        """Remember the service address; no connection is made here."""
        self._adc_ip: str = adc_ip
        self._adc_port: int = adc_port
        self.adcdma_service = None
        # Id of the allocated channel, or None when none is allocated.
        self._chan_id: int = None
        self._adc_chan = None

    def initialize_adcdma_service(self) -> bool:
        """Connect to the service and call its ``initialize``.

        Returns False when the connection fails, otherwise the value
        returned by the remote ``initialize`` call.
        """
        try:
            self.adcdma_service = rpyc.connect(self._adc_ip, self._adc_port)
        except ConnectionError as ce:
            print(ce)
            print('Please check to ensure that the Red Pitaya board is')
            print('powered on and connected to the network.')
            return False
        status = self.adcdma_service.root.initialize()
        return status

    def alloc_adc_chan(self, chan_id: int,
                       rate: int=DEFAULT_DECIMATION_RATE,
                       lo_freq: float=DEFAULT_LO_FREQ,
                       dc_offset: int=DEFAULT_DC_OFFSET,
                       sample_len: int=DEFAULT_SAMPLE_LEN) -> bool:
        """Allocate ADC channel ``chan_id`` on the service.

        ``sample_len`` is passed as both buffer size and chunk size.
        Returns True on success; False when a channel is already
        allocated or the service rejects the channel.
        """
        if self._chan_id is None:
            try:
                self._adc_chan = self.adcdma_service.root.allocate_channel(
                    chan_id,
                    rate, lo_freq, dc_offset,
                    sample_len, sample_len, 'F',
                    double_buffer=True, prefill_buffers=False)
            except InvalidChannelException as ice:
                print(ice)
                return False
            else:
                self._chan_id = chan_id
                return True
        else:
            print(f"Channel '{self._chan_id}' is currently allocated.")
            print("Deallocate the current channel before allocating another.")
            return False

    @property
    def chan_attenuation(self):
        """Attenuation of the allocated channel as reported by the service."""
        return self.adcdma_service.root.channel_attenuation(self._chan_id)

    @chan_attenuation.setter
    def chan_attenuation(self, atten):
        self.adcdma_service.root.set_channel_attenuation(self._chan_id, atten)

    @property
    def adcchan(self):
        """The ADCChannel instance for the currently allocated ADC channel.
        """
        return self._adc_chan

    def free_adc_chan(self) -> None:
        """Release the allocated channel on the service, if any."""
        if self._chan_id is not None:
            self.adcdma_service.root.free_channel(self._chan_id)
            self._chan_id = None
            # The proxy refers to a channel that no longer exists.
            self._adc_chan = None


def main():
    """Capture one chunk from an ADC channel and print or save it.

    Connects to the ADC DMA service, allocates the requested channel,
    sets its attenuation, fetches one chunk and either prints it or
    stores it with ``numpy.save``.  Exits with -1 when the service cannot
    be initialized and -2 when the channel cannot be allocated.
    """
    default_adc_svc_host = "192.168.0.155"
    default_adc_svc_port = 18900
    default_rate = TestApp.DEFAULT_DECIMATION_RATE
    default_lo_freq = TestApp.DEFAULT_LO_FREQ
    default_dcoffset = TestApp.DEFAULT_DC_OFFSET
    default_samplelen = TestApp.DEFAULT_SAMPLE_LEN
    default_attenuation = TestApp.DEFAULT_ATTENUATION

    # This ensures that Ctrl-C will work as expected:
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    parser = ArgumentParser(description="Red Pitaya test application")

    parser.add_argument("-a", "--adchost", default=default_adc_svc_host,
                        help="IP address for the ADC DMA server instance")
    parser.add_argument("-p", "--adcport", default=default_adc_svc_port,
                        type=int,
                        help="TCP port for the ADC DMA server instance")
    parser.add_argument("-c", "--chanid", type=int, required=True,
                        help="ADC channel to use")
    parser.add_argument("-r", "--rate", type=int, default=default_rate,
                        help=f"Set ADC decimation rate. Default: {default_rate}")
    parser.add_argument("-f", "--lofreq", type=float, default=default_lo_freq,
                        help=f"Set ADC LO frequency (in MHz). Default: {default_lo_freq}")
    parser.add_argument("-o", "--dcoffset", type=int, default=default_dcoffset,
                        help=f"Set ADC channel DC offset. Default: {default_dcoffset}")
    parser.add_argument("-l", "--samplelen", type=int, default=default_samplelen,
                        help=f"Length of ADC sample. Default: {default_samplelen}")
    parser.add_argument("-z", "--summarize", action="store_true",
                        help="Print a summary of the captured data")
    parser.add_argument("-S", "--savefile",
                        help="File to save captured ADC samples")
    parser.add_argument("-A", "--attenuation", type=float, default=default_attenuation,
                        help=f"Set channel attenuation, 0.0 to 31.75 dB. Default: {default_attenuation}")

    args = parser.parse_args()

    app = TestApp(args.adchost, args.adcport)
    if app.initialize_adcdma_service() is False:
        print(f"Failed to initialize ADC service at {args.adchost}:{args.adcport}")
        sys.exit(-1)
    if app.alloc_adc_chan(args.chanid,
                          args.rate,
                          args.lofreq,
                          args.dcoffset,
                          args.samplelen) is False:
        # argparse stores "--chanid" as ``args.chanid``.
        print(f"Failed to allocate ADC channel (chan. id: {args.chanid})")
        sys.exit(-2)

    try:
        # Keep the attenuation inside the documented 0.0 .. 31.75 dB range.
        atten = max(0.0, min(args.attenuation, TestApp.MAX_ATTENUATION))
        app.chan_attenuation = atten

        data = np.array(app.adcchan.next_chunk())
        if args.savefile is None:
            if args.summarize:
                np.set_printoptions(precision=1, suppress=True, edgeitems=20)
            else:
                np.set_printoptions(precision=1, suppress=True, threshold=sys.maxsize)
            print(data)
        else:
            with open(args.savefile, 'wb') as fd:
                np.save(fd, data)
    finally:
        # Release the channel on the service even when the capture fails.
        app.free_adc_chan()


# Run the test client only when this file is executed as a script.
if __name__ == '__main__':
    main()

Testing channel code

The following test script accesses the ADC DMA hardware directly through the _axidma module. To avoid conflicting with rp_adcdma.service, run the script only after that service has been stopped:

sudo systemctl stop rp_adcdma.service

An example script invocation:

$ cd /opt/lib/python/adcdma
$ python testing.py -R 1000.0
sweep_params.sample_span=1.7554285714285713
sweep_params.nperseg=26154
sweep_params.bin_width=223.7297763794667
sweep_params.lo_offset_bins=5
sweep_params.decimation_rate=21
sweep_params.lo_offset=0.0011186488818973335
sweep_params.sample_bins=7846
sweep_params.fft_bins=8939
sample_sizes=[7846, 1093]
lo_list=array([ 9.        , 10.75542857])
acquisition took 0.0301 seconds
fft took 0.3203 seconds
fft_sweep took 0.3568 seconds
sweep took 0.0219 seconds
import sys
from time import time
from argparse import ArgumentParser
import numpy as np
from scipy import signal
import _axidma

from data import (
    SweepParameters
)
from adccal import (
    AdcCalibration
)

from channel import (
    ADCChannel
)
from attenuator import (
    ADCAttenuator
)


# Lower/upper limits that the sweep start and stop frequencies are clamped
# to by the script section of this file (those values are given in MHz).
RP_ADC_MIN_FREQUENCY: float = 0.5
RP_ADC_MAX_FREQUENCY: float = 55.0
# Smallest span, and the largest span (the full frequency range above);
# presumably in MHz like the frequency limits -- TODO confirm.
RP_ADC_MIN_SPAN: float = 0.0005
RP_ADC_MAX_SPAN: float = RP_ADC_MAX_FREQUENCY-RP_ADC_MIN_FREQUENCY


class ADCTestService:
    """Stand-in service object exposing the ADC clock for a board type.

    The tables below are keyed by the board type string that is passed to
    :meth:`initialize`.
    """

    ADC_CLOCK = {
        '122-16': 122.88,
        '125-14': 125.0,
    }
    ADC_BITS = {
        '122-16': 16,
        '125-14': 14,
    }
    DAC_BITS = {
        '122-16': 14,
        '125-14': 14,
    }

    def __init__(self):
        # No board type is selected until initialize() is called.
        self._adc_type = None

    def initialize(self, adc_type='122-16'):
        """Select the board type used for subsequent lookups."""
        self._adc_type = adc_type

    @property
    def adc_clock(self):
        """ADC clock looked up in ``ADC_CLOCK`` for the selected board."""
        clock_table = ADCTestService.ADC_CLOCK
        return clock_table[self._adc_type]


class ADCTestChannel(ADCChannel):
    """ADCChannel extended with an experimental FFT-based sweep."""

    def __init__(self,
                 adc,
                 adc_id: int,
                 chan_id: int,
                 decimation_rate: int = 1,
                 lo_freq: float = 0.0,
                 dc_offset: int = 0,
                 buffer_size: int = 1024 * 128,
                 chunk_size: int = 1024 * 8,
                 cfg_device = 'uio0',
                 sw_device = 'uio1',
                 double_buffer = True):
        """Forward all arguments, positionally, to ``ADCChannel``."""
        super().__init__(
            adc, adc_id, chan_id,
            decimation_rate, lo_freq, dc_offset,
            buffer_size, chunk_size, double_buffer,
            cfg_device, sw_device)

    def fft_sweep(self, lo_list, sweep_params, adc_conv_factor,
                  sample_cb_fn=None):
        """Capture one chunk per LO frequency and compute its spectrum.

        For every entry of ``lo_list`` the LO is retuned, the buffers are
        flushed and one chunk is captured, scaled by ``adc_conv_factor``
        and passed to ``scipy.signal.welch``.  The selected frequency and
        power bins are handed to ``sample_cb_fn(freq, pwr)`` when a
        callback is given.  Accumulated acquisition and FFT times are
        printed at the end.
        """
        self.double_buffer = False
        self.decimation_rate = sweep_params.decimation_rate
        # One capture holds the segment, the LO offset bins and the
        # leading DMA latency samples that are discarded below.
        capture_len = (sweep_params.nperseg + sweep_params.lo_offset_bins
                       + ADCChannel.DMA_LATENCY)
        self.resize(capture_len)
        self.chunk_size = capture_len

        # Number of spectrum bins kept for each LO step.
        if sweep_params.fft_bins < sweep_params.sample_bins:
            sample_sizes = [sweep_params.fft_bins+2]
        else:
            sample_sizes = [sweep_params.sample_bins] * sweep_params.lo_count
            sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins

        acquisition_time = 0.0
        fft_time = 0.0
        for step, lo in enumerate(lo_list):
            self.lo_freq = lo

            t_acq = time()
            self.flush_buffers()
            chunk = np.array(self.next_chunk())
            sample = chunk[ADCChannel.DMA_LATENCY:]
            scaled_data = (adc_conv_factor*sample.real +
                           (1j)*adc_conv_factor*sample.imag)
            acquisition_time += time() - t_acq

            t_fft = time()
            f, Pxx_den = signal.welch(
                scaled_data,
                sweep_params.fs,
                detrend='linear',
                noverlap=sweep_params.nperseg*0.8,
                average='median',
                window=sweep_params.window,
                return_onesided=False,
                scaling='spectrum')
            keep = sample_sizes[step]
            freq = lo + f[:keep] / 1e6
            first_bin = sweep_params.lo_offset_bins
            pwr = Pxx_den[first_bin:(sample_sizes[step] + first_bin)]
            if sample_cb_fn is not None:
                sample_cb_fn(freq, pwr)
            fft_time += time() - t_fft

        print(f'acquisition took {acquisition_time:.4f} seconds')
        print(f'fft took {fft_time:.4f} seconds')


if __name__ == '__main__':
    # Stand-alone hardware test: computes the sweep parameters for the
    # requested frequency range, initializes the DMA device, attenuator
    # and one ADC channel, then times fft_sweep() and sweep().

    adc_type = '122-16'
    # NOTE(review): the device names and channel settings below (except
    # spi_device, gpio_device and adc_chan) are not passed on anywhere;
    # ADCTestChannel is created with its own defaults -- confirm intent.
    cfg_device = 'uio0'
    sw_device = 'uio2'
    sw_device_2 = 'uio3'
    spi_device = 'spidev1.0'
    gpio_device = 'gpiochip1'

    adc_start_freq = 9.0
    adc_stop_freq = 11.0
    rbw = 10000.0
    adc_chan = 0
    decimation_rate = 1
    lo_freq = 0.0
    dc_offset = 0

    parser = ArgumentParser(
        description='Testing additions to the adcdma code.'
    )

    parser.add_argument(
        "-S", "--start_freq", default=adc_start_freq, type=float,
        help=f"ADC sweep start freq. in MHz. (Default={adc_start_freq})"
    )
    parser.add_argument(
        "-E", "--end_freq", default=adc_stop_freq, type=float,
        help=f"ADC sweep end freq. in MHz. (Default={adc_stop_freq})"
    )
    parser.add_argument(
        "-R", "--rbw", default=rbw, type=float,
        help=f"Sweep resolution BW in Hz. (Default={rbw})"
    )
    args = parser.parse_args()

    # The resolution bandwidth is given in Hz; the sweep works in MHz.
    rbw = args.rbw / 1e6
    adc_start_freq = args.start_freq
    adc_stop_freq = args.end_freq

    if adc_start_freq < RP_ADC_MIN_FREQUENCY:
        adc_start_freq = RP_ADC_MIN_FREQUENCY
    if adc_stop_freq > RP_ADC_MAX_FREQUENCY:
        adc_stop_freq = RP_ADC_MAX_FREQUENCY

    sweep_params = SweepParameters(adc_start_freq, adc_stop_freq, rbw)
    lo_list = np.arange(
        sweep_params.f_start, sweep_params.f_stop, sweep_params.sample_span)
    # Only printed here; fft_sweep() derives the same list itself.
    if sweep_params.fft_bins >= sweep_params.sample_bins:
        sample_sizes = [sweep_params.sample_bins] * sweep_params.lo_count
        sample_sizes[-1] = sweep_params.fft_bins % sweep_params.sample_bins
    else:
        sample_sizes = [sweep_params.fft_bins+2]
    adc_conv_factor = 1.0 / AdcCalibration.adc_to_volts(adc_chan)

    print(f'{sweep_params.sample_span=}')
    print(f'{sweep_params.nperseg=}')
    print(f'{sweep_params.bin_width=}')
    print(f'{sweep_params.lo_offset_bins=}')
    print(f'{sweep_params.decimation_rate=}')
    print(f'{sweep_params.lo_offset=}')
    print(f'{sweep_params.sample_bins=}')
    print(f'{sweep_params.fft_bins=}')
    print(f"{sample_sizes=}")
    print(f"{lo_list=}")

    testService = ADCTestService()
    testService.initialize()

    axidma_dev = _axidma.lib.axidma_init()
    # A failed init is returned by cffi as a NULL pointer object, which
    # is never ``None``; check for both.
    if axidma_dev is None or axidma_dev == _axidma.ffi.NULL:
        print("Can't initialize axidma device!")
        sys.exit(-1)
    rx_chans = _axidma.lib.axidma_get_dma_rx(axidma_dev)

    attenuator = ADCAttenuator('/dev/'+spi_device, '/dev/'+gpio_device)
    attenuator.set_attenuation(adc_chan, 0.0)
    attenuator.update_attenuation(adc_chan)

    chan_id = rx_chans.data[adc_chan]
    ch = ADCTestChannel(testService, adc_chan, chan_id)
    status = ch.initialize(axidma_dev, prefill_buffers=False)

    try:
        start = time()
        ch.fft_sweep(lo_list - sweep_params.lo_offset, sweep_params,
                     adc_conv_factor)
        duration = time() - start
        print(f'fft_sweep took {duration:.4f} seconds')

        start = time()
        samples = ch.sweep(lo_list - sweep_params.lo_offset,
                           sweep_params.decimation_rate,
                           sweep_params.nperseg + sweep_params.lo_offset_bins)
        duration = time() - start
        print(f'sweep took {duration:.4f} seconds')
    finally:
        # Release registers and DMA buffers even when a sweep fails.
        ch.close()