Source code for rfblocks.log_detector

# `LogDetector` - Base class for SPI enabled log detector transducers
#
#    Copyright (C) 2021 Dyadic Pty Ltd
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published
#    by the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.

from typing import Dict, List, Tuple, Optional, Callable
import time
import struct
import serial
import numpy as np
from scipy import interpolate
from rfblocks import write_cmd, query_cmd


class CalibrationDataError(Exception):
    """Raised when there is no valid calibration data."""
class InvalidCalibrationDataError(Exception):
    """Raised when the calibration data stored on the detector is invalid."""


class DetectorReadError(Exception):
    """Raised when there is an inconsistency in reading detector data"""


def cubic_func(x: float, a: float, b: float, c: float, d: float) -> float:
    """Evaluate the cubic polynomial ``a*x**3 + b*x**2 + c*x + d``.

    The polynomial is evaluated using Horner's scheme, starting from the
    highest order coefficient.

    :param x: the point at which to evaluate the polynomial
    :type x: float
    :param a: coefficient of the cubic term
    :type a: float
    :param b: coefficient of the quadratic term
    :type b: float
    :param c: coefficient of the linear term
    :type c: float
    :param d: the constant term
    :type d: float

    :return: the value of the polynomial at ``x``.
    """
    acc = a
    for coeff in (b, c, d):
        acc = (acc * x) + coeff
    return acc
class LogDetector(object):
    """Base class for SPI enabled log detector transducers.

    The detector board is driven by sending command strings to a
    controller over a serial device (using ``write_cmd`` and
    ``query_cmd``).  Readings are converted to input power using
    per-frequency slope and log intercept calibration values.
    """

    # Detector type codes.  These are also used as indices into
    # DETECTOR_PARAMETERS.
    LTC5582 = 0
    LT5537 = 1

    # One entry per detector type: [name, min. frequency, max. frequency,
    # min. power, max. power].  See ``frequency_range`` and ``power_range``.
    DETECTOR_PARAMETERS = [
        ['ltc5582', 100.0, 6000.0, -140.0, 30.0],
        ['lt5537', 5.0, 600.0, -140.0, 30.0]
    ]

    #: The ADC is 10-bits using a ref. voltage range of 0 to 2.56
    RAW_TO_MILLIVOLTS = 2560/1024

    # Register holding the detector type code.
    DETECTOR_TYPE_REG = 20
    # First register of the 16-bit calibration record count.  The
    # calibration records follow immediately after the count.
    CAL_RECORD_COUNT_REG = 100
    # The number of calibration records a valid detector is expected to hold.
    CAL_RECORD_COUNT = 17

    def __init__(self, cs: Optional[str] = None) -> None:
        """Create an instance of ``LogDetector``.

        :param cs: The detector board chip select (`~CS`) controller pin.
        :type cs: str

        Documentation for the LTC5582 and LT5537 power detector
        rfblocks modules can be found here:
        `LTC5582 and LT5537 Power Detectors <../boards/LTC5582-Power-Detector.html>`_

        .. note::

            The default detector type is set to be the LTC5582.
            This is updated by :py:meth:`initialize` as read from the
            detector firmware register.
        """
        self.cs: Optional[str] = cs.upper() if cs else cs
        self.cal_data: Dict[str, List[float]] = {}
        self.detector_type: int = LogDetector.LTC5582
        # Interpolators are built lazily by slope() and intercept() and
        # are discarded whenever the calibration data is replaced.
        self.slope_interp_fn: Optional[Callable[[float], float]] = None
        self.intercept_interp_fn: Optional[Callable[[float], float]] = None
        self.min_limit_coeffs: List[float] = []
        self.max_limit_coeffs: List[float] = []

    def __str__(self) -> str:
        """Return a short description containing the chip select pin."""
        return """LogDetector: cs: {}""".format(self.cs)

    def __repr__(self) -> str:
        return self.__str__()

    # ------------------------------------------------------------------
    # Private helpers

    def _select(self, ser: serial.Serial) -> None:
        """Assert chip select and switch to the slow SPI clock."""
        # SPI clock speed for the attiny45 should be <500 kHz
        write_cmd(ser, 'L{}:S3:'.format(self.cs))

    def _deselect(self, ser: serial.Serial) -> None:
        """Release chip select and return the SPI clock to 4 MHz."""
        write_cmd(ser, 'H{}:S1:'.format(self.cs))

    @staticmethod
    def _parse_byte(resp) -> int:
        """Convert a ``query_cmd`` response to an integer.

        The response is treated as a sequence of ``bytes`` chunks which
        are joined and decoded.  The final character is dropped
        (presumably a response terminator - TODO confirm) and the
        remainder is parsed as hexadecimal.  A response with fewer than
        two chunks yields 0.
        """
        if len(resp) > 1:
            return int(b''.join(resp).decode('utf-8')[:-1], 16)
        return 0

    # ------------------------------------------------------------------
    # Public interface

    def pin_config(self) -> str:
        """Initialize controller pin configuration.

        :return: The command string required to configure the
            device controller pins.  This is an empty string if no
            chip select pin has been set.
        """
        if not self.cs:
            return ''
        return 'O{}:H{}:'.format(self.cs, self.cs)

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the detector type and calibration data.

        The detector type is always read.  The calibration data is only
        read from the detector if none is currently held.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError
        """
        self.read_detector_type(ser)
        if not self.cal_data:
            self.read_cal_data(ser)

    @property
    def frequency_range(self) -> Tuple[float, float]:
        """Returns the frequency range of the connected detector.

        This assumes that :py:meth:`initialize` has been called.
        """
        params = LogDetector.DETECTOR_PARAMETERS[self.detector_type]
        return tuple(params[1:3])

    @property
    def power_range(self) -> Tuple[float, float]:
        """Returns the power range of the connected detector.

        This assumes that :py:meth:`initialize` has been called.
        """
        params = LogDetector.DETECTOR_PARAMETERS[self.detector_type]
        return tuple(params[3:5])

    def set_cal_data(self, data: Dict[str, List[float]]) -> None:
        """Set the detector calibration data.

        Any cached slope and intercept interpolation functions are
        discarded so that subsequent calls to :py:meth:`slope` and
        :py:meth:`intercept` use the new data.

        :param data: a dict containing the calibration data.
            The data are the slope and log intercept of the linear
            portion of the detector response indexed with a string
            representing their frequency at which the fit was made.
        :type data: Dict[str, List[float]]

        Example cal. data:

        .. code-block::

            {'0.0': [30.6677827385478, -82.38328220135256],
             '250.0': [30.6677827385478, -82.38328220135256],
             '500.0': [30.67172619194942, -82.12740330035196],
             ...
             '3500.0': [31.015178572555065, -77.42265310955872],
             '3750.0': [31.226428574568747, -76.5568104298865],
             '6000.0': [31.96632362294561, -69.4423167171135]}
        """
        self.cal_data = data
        # The interpolators were built from the previous data set.
        self.slope_interp_fn = None
        self.intercept_interp_fn = None

    def vout(self, ser: serial.Serial, avg: int = 1) -> float:
        """Read the current detector output voltage.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param avg: the number of readings to average over.
            The default value is 1.
        :type avg: int

        :return: The current detector output voltage in millivolts.

        :raises: ValueError if ``avg`` is less than 1.
        """
        if avg < 1:
            raise ValueError(
                'avg must be a positive integer, got {!r}'.format(avg))
        total = 0.0
        for _ in range(avg):
            total += self.read_raw(ser) * LogDetector.RAW_TO_MILLIVOLTS
        return total/avg

    def power(self, ser: serial.Serial, freq: float, avg: int = 1) -> float:
        """Read the current detector power.

        This method will return the measured input signal power
        corrected using the calibration parameters stored in the
        detector's on-board EEPROM.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param freq: the frequency of the signal being measured (in MHz)
        :type freq: float
        :param avg: the number of readings to average over.
            The default value is 1.
        :type avg: int

        :return: The current detector power (in dBm) with
            calibration factor applied.
        """
        self.initialize(ser)
        vout = self.vout(ser, avg)

        # Calculate corrected RF input power from ADC Vout
        #  1. Derive frequency interpolated values of slope (A) and
        #     log intercept (B).
        #  2. RF input power = (vout/A) + B
        slope = self.slope(freq)
        log_intercept = self.intercept(freq)
        return log_intercept + (vout/slope)

    def raw_pwr(self, corrected_pwr: float, freq: float) -> float:
        """Convert a known 'corrected' input power to a measured 'raw' power.

        This is the inverse of the correction applied by :py:meth:`power`.

        :param corrected_pwr: the corrected input power
        :type corrected_pwr: float
        :param freq: frequency in MHz
        :type freq: float

        :return: the equivalent raw ADC value.  Note that the value is
            not rounded to an integer.
        """
        slope = self.slope(freq)
        log_intercept = self.intercept(freq)
        vout = (corrected_pwr - log_intercept) * slope
        return (vout / LogDetector.RAW_TO_MILLIVOLTS)

    def read_raw(self, ser: serial.Serial) -> int:
        """Read a raw 10 bit ADC value from the detector board.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :return: The raw detector ADC value.
        """
        self._select(ser)
        try:
            # Start an ADC conversion
            write_cmd(ser, 'W40:')
            # Max. ADC conversion time on the attiny is approx 0.2 millisecs
            # (125kHz ADC clock)
            time.sleep(0.00025)

            # Read the ADC low byte from register 1
            write_cmd(ser, 'W01:')
            low_byte = self._parse_byte(query_cmd(ser, 'R:'))

            # Read the ADC high byte from register 2
            write_cmd(ser, 'W02:')
            high_byte = self._parse_byte(query_cmd(ser, 'R:'))
        finally:
            # Always release the chip select, even if a response
            # could not be parsed.
            self._deselect(ser)

        return low_byte + (high_byte * 256)

    def slope(self, freq: float) -> float:
        """Return the Vout vs RFin slope for a given frequency (in MHz).

        The slope is linearly interpolated between the calibration
        frequencies.  The interpolation function is built on first use
        and cached until the calibration data is replaced.

        :param freq: frequency in MHz
        :type freq: float

        :return: the slope (in mV/dB) at the given frequency
        """
        if self.slope_interp_fn is None:
            freqs = np.array([float(f) for f in self.cal_data])
            slopes = np.array([v[0] for v in self.cal_data.values()])
            self.slope_interp_fn = interpolate.interp1d(freqs, slopes)
        return self.slope_interp_fn(freq)

    def intercept(self, freq: float) -> float:
        """Return the Vout vs RFin log intercept for a given frequency
        (in MHz).

        The intercept is linearly interpolated between the calibration
        frequencies.  The interpolation function is built on first use
        and cached until the calibration data is replaced.

        :param freq: frequency in MHz
        :type freq: float

        :return: the log intercept (in dB) at the given frequency
        """
        if self.intercept_interp_fn is None:
            freqs = np.array([float(f) for f in self.cal_data])
            intercepts = np.array([v[1] for v in self.cal_data.values()])
            self.intercept_interp_fn = interpolate.interp1d(
                freqs, intercepts)
        return self.intercept_interp_fn(freq)

    def linear_range(self, freq: float) -> Tuple[float, float]:
        """The detectors linear response range (in dBm).

        :param freq: frequency in MHz
        :type freq: float

        :return: A tuple containing the minimum and maximum input
            signal powers for which the detector provides a linear
            response.  If no calibration data is available the
            :py:exc:`rfblocks.CalibrationDataError` is raised.
        """
        have_coeffs = ((len(self.min_limit_coeffs) == 4)
                       and (len(self.max_limit_coeffs) == 4))
        if not have_coeffs:
            raise CalibrationDataError
        return (cubic_func(freq, *self.min_limit_coeffs),
                cubic_func(freq, *self.max_limit_coeffs))

    def read_reg_byte(self, ser: serial.Serial, reg: int) -> int:
        """Read an 8-bit integer from a specified register.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the register to read from
        :type reg: int

        :return: the 8-bit integer stored in reg.
        """
        write_cmd(ser, "W{:X}:".format(reg))
        return self._parse_byte(query_cmd(ser, "R:"))

    def read_reg_word(self, ser: serial.Serial, reg: int) -> int:
        """Read a 16 bit integer from the specified register pair.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the first of the pair of registers to read from
        :type reg: int

        :return: the 16 bit integer stored in ``reg``, ``reg+1``.
            The integer is calculated as ``[reg] + [reg+1]*256``
        """
        low_byte = self.read_reg_byte(ser, reg)
        high_byte = self.read_reg_byte(ser, reg+1)
        return low_byte + (high_byte * 256)

    def read_reg_float(self, ser: serial.Serial, reg: int) -> float:
        """Read a 32 bit float from the specified register quad.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the first of the quad of registers to read from
        :type reg: int

        :return: the 32 bit float stored in ``reg``, ``reg+1``,
            ``reg+2``, ``reg+3``.  The float is calculated using:

        .. code-block::

            b = [0x85, 0x4f, 0x6b, 0x2e]
            f = struct.unpack('f', bytes(b))
        """
        raw = bytes(self.read_reg_byte(ser, r) for r in range(reg, reg+4))
        # NOTE(review): 'f' uses the byte order of the host.  This
        # presumably relies on the host and the detector firmware
        # sharing the same byte order - confirm.
        return struct.unpack('f', raw)[0]

    def read_detector_type(self, ser: serial.Serial) -> None:
        """Read the detector type from the detector board.

        This sets the self.detector_type attribute.  The attribute is
        left unchanged if the value read is not a known detector type.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError if the value read is not a known
            detector type.
        """
        self._select(ser)
        try:
            detector_type = self.read_reg_byte(
                ser, LogDetector.DETECTOR_TYPE_REG)
        finally:
            self._deselect(ser)

        # Validate before storing since detector_type is used as an
        # index into DETECTOR_PARAMETERS.
        if detector_type not in [LogDetector.LT5537, LogDetector.LTC5582]:
            raise DetectorReadError
        self.detector_type = detector_type

    def is_connected(self, ser: serial.Serial) -> bool:
        """Test if there is a connected detector.

        Tests the value in the detector's CAL_RECORD_COUNT_REG.
        If it isn't the expected value of CAL_RECORD_COUNT then
        it is assumed that no detector is connected at that
        controller pin.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :return: True if a valid detector is connected, False otherwise.
        """
        self._select(ser)
        try:
            record_count = self.read_reg_word(
                ser, LogDetector.CAL_RECORD_COUNT_REG)
        finally:
            self._deselect(ser)
        return record_count == LogDetector.CAL_RECORD_COUNT

    def read_cal_data(self, ser: serial.Serial) -> None:
        """Read calibration data from the detector board.

        Initializes self.cal_data with a dictionary containing the
        calibration data values keyed with the frequencies at which
        they were measured.  This calibration data is used to calculate
        the corrected power as measured by the detector
        (see :py:meth:`power`).

        Also populates the self.min_limit_coeffs and self.max_limit_coeffs
        which are used to calculate the linear range of the detector
        (see :py:meth:`linear_range`).

        The existing calibration data and any cached interpolation
        functions are discarded before reading.  The new data is only
        stored once everything has been read successfully, so a failed
        read leaves self.cal_data empty rather than partially filled.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError if the stored record count is not
            CAL_RECORD_COUNT.
        """
        self.cal_data = {}
        self.slope_interp_fn = None
        self.intercept_interp_fn = None

        cal_data: Dict[str, List[float]] = {}
        max_coeffs: List[float] = []
        min_coeffs: List[float] = []

        self._select(ser)
        try:
            reg = LogDetector.CAL_RECORD_COUNT_REG
            record_count = self.read_reg_word(ser, reg)
            # NOTE(review): a separate check for a count of 0xffff
            # raising InvalidCalibrationDataError used to follow this
            # one but could never be reached since 0xffff already fails
            # this test.  DetectorReadError is kept for that case so
            # that existing callers are unaffected - confirm which
            # exception is actually wanted.
            if record_count != LogDetector.CAL_RECORD_COUNT:
                raise DetectorReadError
            reg += 2

            for _ in range(record_count):
                # Each records contains: frequency, slope x 100,
                # log intercept x 100
                freq = float(self.read_reg_word(ser, reg))
                rec_slope = self.read_reg_word(ser, reg + 2) / 100.0
                rec_intercept = -(self.read_reg_word(ser, reg + 4) / 100.0)
                reg += 6
                cal_data['{:.1f}'.format(freq)] = [rec_slope, rec_intercept]

            # Read the linearity limits cubic fit coeffs.  The max. limit
            # coefficients are stored ahead of the min. limit ones.
            for coeffs in (max_coeffs, min_coeffs):
                for _ in range(4):
                    coeffs.append(self.read_reg_float(ser, reg))
                    reg += 4

            self.cal_data = cal_data
            self.max_limit_coeffs = max_coeffs
            self.min_limit_coeffs = min_coeffs
        finally:
            self._deselect(ser)