# `LogDetector` - Base class for SPI enabled log detector transducers
#
# Copyright (C) 2021 Dyadic Pty Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Dict, List, Tuple, Optional, Callable
import time
import struct
import serial
import numpy as np
from scipy import interpolate
from rfblocks import write_cmd, query_cmd
class CalibrationDataError(Exception):
    """Raised when there is no valid calibration data."""
class InvalidCalibrationDataError(Exception):
    """Raised when the calibration data stored on the detector is invalid.

    In this module it is associated with a calibration record count
    that reads back as ``0xffff`` (presumably an unprogrammed EEPROM -
    to be confirmed against the detector firmware).
    """
class DetectorReadError(Exception):
    """Raised when there is an inconsistency in reading detector data."""
def cubic_func(x: float,
               a: float,
               b: float,
               c: float,
               d: float) -> float:
    """Evaluate the cubic polynomial ``a*x**3 + b*x**2 + c*x + d``.

    The polynomial is evaluated using Horner's scheme.

    :param x: the point at which to evaluate the polynomial
    :type x: float
    :param a: coefficient of the cubic term
    :type a: float
    :param b: coefficient of the quadratic term
    :type b: float
    :param c: coefficient of the linear term
    :type c: float
    :param d: the constant term
    :type d: float

    :return: the value of the polynomial at ``x``.
    """
    return (((a*x) + b)*x + c)*x + d
class LogDetector(object):
    """Base class for SPI enabled log detector transducers.

    An instance drives one detector board through a serial connected
    controller.  The detector type, the calibration table (slope and
    log intercept versus frequency) and the linearity limit
    coefficients are read from registers on the detector board.
    """

    # Detector type codes as stored in DETECTOR_TYPE_REG.
    LTC5582 = 0
    LT5537 = 1

    # One row per detector type, indexed by the detector type code.
    # Each row is [name, min. frequency, max. frequency, min. power,
    # max. power] (frequencies presumably in MHz and powers in dBm -
    # to be confirmed against the board documentation).
    DETECTOR_PARAMETERS = [
        ['ltc5582', 100.0, 6000.0, -140.0, 30.0],
        ['lt5537', 5.0, 600.0, -140.0, 30.0]
    ]

    RAW_TO_MILLIVOLTS = 2560/1024
    "The ADC is 10-bits using a ref. voltage range of 0 to 2.56"

    # Register holding the detector type code.
    DETECTOR_TYPE_REG = 20
    # First register of the 16 bit calibration record count.
    CAL_RECORD_COUNT_REG = 100
    # The number of calibration records a valid detector must report.
    CAL_RECORD_COUNT = 17

    def __init__(self, cs: Optional[str] = None) -> None:
        """Create an instance of ``LogDetector``.

        :param cs: The detector board chip select (`~CS`) controller pin.
        :type cs: str

        Documentation for the LTC5582 and LT5537 power detector rfblocks
        modules can be found here: `LTC5582 and LT5537 Power Detectors
        <../boards/LTC5582-Power-Detector.html>`_

        .. note::

            The default detector type is set to be the LTC5582.  This is
            updated by :py:meth:`initialize` as read from the detector
            firmware register.
        """
        self.cs: Optional[str] = cs.upper() if cs else cs
        self.cal_data: Dict[str, List[float]] = {}
        self.detector_type: int = LogDetector.LTC5582
        # Interpolators are built lazily from cal_data and must be
        # discarded whenever cal_data is replaced.
        self.slope_interp_fn: Optional[Callable[[float], float]] = None
        self.intercept_interp_fn: Optional[Callable[[float], float]] = None
        self.min_limit_coeffs: List[float] = []
        self.max_limit_coeffs: List[float] = []

    def __str__(self) -> str:
        """Return a short description containing the chip select pin."""
        return """LogDetector: cs: {}""".format(self.cs)

    def __repr__(self) -> str:
        """Return the same text as :py:meth:`__str__`."""
        return self.__str__()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _spi_select(self, ser: serial.Serial) -> None:
        """Open an SPI transaction with the detector board.

        Sends the ``L<cs>:S3:`` command.  The SPI clock speed for the
        attiny45 should be <500 kHz.
        """
        write_cmd(ser, 'L{}:S3:'.format(self.cs))

    def _spi_release(self, ser: serial.Serial) -> None:
        """Close an SPI transaction with the detector board.

        Sends the ``H<cs>:S1:`` command.  The SPI clock speed is
        returned to 4 MHz.
        """
        write_cmd(ser, 'H{}:S1:'.format(self.cs))

    @staticmethod
    def _parse_hex_byte(resp) -> int:
        """Convert a controller read response to an integer.

        The response chunks are joined, decoded as UTF-8, stripped of
        their final character (presumably a terminator - to be confirmed
        against the controller firmware) and parsed as hexadecimal.

        :param resp: the value returned by ``query_cmd`` for an ``R:``
            command (assumed to be a sequence of ``bytes`` chunks).

        :return: the parsed value, or 0 if the response holds one chunk
            or fewer.
        """
        if len(resp) > 1:
            return int(b''.join(resp).decode('utf-8')[:-1], 16)
        return 0

    def _invalidate_interpolators(self) -> None:
        """Discard the cached slope and intercept interpolators."""
        self.slope_interp_fn = None
        self.intercept_interp_fn = None

    def _build_interpolator(self, column: int) -> Callable[[float], float]:
        """Build an interpolator over one column of the cal. data.

        :param column: 0 for the slope, 1 for the log intercept.
        :type column: int

        :return: a function of frequency interpolating the column.
        :raises: CalibrationDataError if there is no calibration data.
        """
        if not self.cal_data:
            raise CalibrationDataError('No calibration data available')
        freqs = np.array([float(f) for f in self.cal_data])
        values = np.array([v[column] for v in self.cal_data.values()])
        return interpolate.interp1d(freqs, values)

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    def pin_config(self) -> str:
        """Initialize controller pin configuration.

        :return: The command string required to configure the device
            controller pins.  This is empty if no chip select pin has
            been set.
        """
        cmd = ''
        if self.cs:
            cmd += 'O{}:H{}:'.format(self.cs, self.cs)
        return cmd

    def initialize(self, ser: serial.Serial) -> None:
        """Initialize the detector type and calibration data.

        The detector type is always read.  The calibration data is read
        only if none is currently held.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError
        """
        self.read_detector_type(ser)
        if not self.cal_data:
            self.read_cal_data(ser)

    @property
    def frequency_range(self) -> Tuple[float, float]:
        """Returns the frequency range of the connected detector.

        This assumes that :py:meth:`initialize` has been called.
        """
        params = LogDetector.DETECTOR_PARAMETERS[self.detector_type]
        return tuple(params[1:3])

    @property
    def power_range(self) -> Tuple[float, float]:
        """Returns the power range of the connected detector.

        This assumes that :py:meth:`initialize` has been called.
        """
        params = LogDetector.DETECTOR_PARAMETERS[self.detector_type]
        return tuple(params[3:5])

    def set_cal_data(self,
                     data: Dict[str, List[float]]) -> None:
        """Set the detector calibration data.

        Any previously cached slope and intercept interpolators are
        discarded so that subsequent calls to :py:meth:`slope` and
        :py:meth:`intercept` reflect the new data.

        :param data: a dict containing the calibration data.  The data are
            the slope and log intercept of the linear portion of the
            detector response indexed with a string representing the
            frequency at which the fit was made.
        :type data: Dict[str, List[float]]

        Example cal. data:

        .. code-block::

            {'0.0': [30.6677827385478, -82.38328220135256],
             '250.0': [30.6677827385478, -82.38328220135256],
             '500.0': [30.67172619194942, -82.12740330035196],
             ...
             '3500.0': [31.015178572555065, -77.42265310955872],
             '3750.0': [31.226428574568747, -76.5568104298865],
             '6000.0': [31.96632362294561, -69.4423167171135]}
        """
        self.cal_data = data
        self._invalidate_interpolators()

    # ------------------------------------------------------------------
    # Measurements
    # ------------------------------------------------------------------

    def vout(self,
             ser: serial.Serial,
             avg: int = 1) -> float:
        """Read the current detector output voltage.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param avg: the number of readings to average over.
            The default value is 1.
        :type avg: int

        :return: The current detector output voltage in millivolts.
        :raises: ValueError if ``avg`` is less than 1.
        """
        if avg < 1:
            raise ValueError('avg must be a positive integer')
        total = 0.0
        for _ in range(avg):
            total += self.read_raw(ser) * LogDetector.RAW_TO_MILLIVOLTS
        return total/avg

    def power(self,
              ser: serial.Serial,
              freq: float,
              avg: int = 1) -> float:
        """Read the current detector power.

        This method will return the measured input signal power
        corrected using the calibration parameters stored in the
        detector's on-board EEPROM.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param freq: the frequency of the signal being measured (in MHz)
        :type freq: float
        :param avg: the number of readings to average over.
            The default value is 1.
        :type avg: int

        :return: The current detector power (in dBm) with calibration
            factor applied.
        """
        self.initialize(ser)
        vout = self.vout(ser, avg)

        # Calculate corrected RF input power from ADC Vout
        #  1. Derive frequency interpolated values of slope (A) and
        #     log intercept (B).
        #  2. RF input power = (vout/A) + B
        slope = self.slope(freq)
        log_intercept = self.intercept(freq)
        return log_intercept + (vout/slope)

    def raw_pwr(self, corrected_pwr: float, freq: float) -> float:
        """Convert a known 'corrected' input power to a measured 'raw' power.

        This is the inverse of the correction applied by
        :py:meth:`power`.

        :param corrected_pwr: the corrected input power
        :type corrected_pwr: float
        :param freq: frequency in MHz
        :type freq: float

        :return: the equivalent raw ADC reading.  The value is not
            rounded to an integer.
        """
        slope = self.slope(freq)
        log_intercept = self.intercept(freq)
        vout = (corrected_pwr - log_intercept) * slope
        return (vout / LogDetector.RAW_TO_MILLIVOLTS)

    def read_raw(self, ser: serial.Serial) -> int:
        """Read a raw 10 bit ADC value from the detector board.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :return: The raw detector ADC value.
        """
        self._spi_select(ser)
        try:
            # Start an ADC conversion
            write_cmd(ser, 'W40:')
            # Max. ADC conversion time on the attiny is approx 0.2 millisecs
            # (125kHz ADC clock)
            time.sleep(0.00025)

            # Read the ADC low byte from register 1
            write_cmd(ser, 'W01:')
            low_byte = self._parse_hex_byte(query_cmd(ser, 'R:'))

            # Read the ADC high byte from register 2
            write_cmd(ser, 'W02:')
            high_byte = self._parse_hex_byte(query_cmd(ser, 'R:'))
        finally:
            # Always hand the bus back, even if a read failed part way.
            self._spi_release(ser)
        return low_byte + (high_byte * 256)

    def slope(self, freq: float) -> float:
        """Return the Vout vs RFin slope for a given frequency (in MHz).

        :param freq: frequency in MHz
        :type freq: float

        :return: the slope (in mV/dB) at the given frequency
        :raises: CalibrationDataError if there is no calibration data.
        """
        if self.slope_interp_fn is None:
            self.slope_interp_fn = self._build_interpolator(0)
        return self.slope_interp_fn(freq)

    def intercept(self, freq: float) -> float:
        """Return the Vout vs RFin log intercept for a given frequency
        (in MHz).

        :param freq: frequency in MHz
        :type freq: float

        :return: the log intercept (in dB) at the given frequency
        :raises: CalibrationDataError if there is no calibration data.
        """
        if self.intercept_interp_fn is None:
            self.intercept_interp_fn = self._build_interpolator(1)
        return self.intercept_interp_fn(freq)

    def linear_range(self, freq: float) -> Tuple[float, float]:
        """The detectors linear response range (in dBm).

        :param freq: frequency in MHz
        :type freq: float

        :return: A tuple containing the minimum and maximum input signal
            powers for which the detector provides a linear response.
            If no calibration data is available the
            :py:exc:`rfblocks.CalibrationDataError` is raised.
        """
        if ((len(self.min_limit_coeffs) != 4)
                or (len(self.max_limit_coeffs) != 4)):
            raise CalibrationDataError
        return (cubic_func(freq, *self.min_limit_coeffs),
                cubic_func(freq, *self.max_limit_coeffs))

    # ------------------------------------------------------------------
    # Register access
    # ------------------------------------------------------------------

    def read_reg_byte(self,
                      ser: serial.Serial,
                      reg: int) -> int:
        """Read an 8-bit integer from a specified register.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the register to read from
        :type reg: int

        :return: the 8-bit integer stored in reg.
        """
        write_cmd(ser, "W{:X}:".format(reg))
        return self._parse_hex_byte(query_cmd(ser, "R:"))

    def read_reg_word(self,
                      ser: serial.Serial,
                      reg: int) -> int:
        """Read a 16 bit integer from the specified register pair.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the first of the pair of registers to read from
        :type reg: int

        :return: the 16 bit integer stored in ``reg``, ``reg+1``.
            The integer is calculated as ``[reg] + [reg+1]*256``
        """
        low_byte = self.read_reg_byte(ser, reg)
        high_byte = self.read_reg_byte(ser, reg+1)
        return low_byte + (high_byte * 256)

    def read_reg_float(self,
                       ser: serial.Serial,
                       reg: int) -> float:
        """Read a 32 bit float from the specified register quad.

        Note that it is assumed that SPI is initialized.

        :param ser: serial device to write commands to
        :type ser: serial.Serial
        :param reg: the first of the quad of registers to read from
        :type reg: int

        :return: the 32 bit float stored in ``reg``, ``reg+1``, ``reg+2``,
            ``reg+3``.  The float is calculated using:

            .. code-block::

                b = [0x85, 0x4f, 0x6b, 0x2e]
                f = struct.unpack('<f', bytes(b))
        """
        raw = [self.read_reg_byte(ser, r) for r in range(reg, reg+4)]
        # The byte order is pinned to little-endian so that the result
        # does not depend on the byte order of the host.  On
        # little-endian hosts this is identical to the native 'f' format.
        return struct.unpack('<f', bytes(raw))[0]

    def read_detector_type(self,
                           ser: serial.Serial) -> None:
        """Read the detector type from the detector board.

        This sets the self.detector_type attribute.  The attribute is
        left unchanged if the value read is not a known detector type.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError if the value read is not a known
            detector type.
        """
        self._spi_select(ser)
        try:
            detector_type = self.read_reg_byte(
                ser,
                LogDetector.DETECTOR_TYPE_REG)
        finally:
            self._spi_release(ser)
        # Validate before storing: an unknown code would otherwise be
        # used to index DETECTOR_PARAMETERS.
        if detector_type not in [LogDetector.LT5537,
                                 LogDetector.LTC5582]:
            raise DetectorReadError
        self.detector_type = detector_type

    def is_connected(self,
                     ser: serial.Serial) -> bool:
        """Test if there is a connected detector.

        Tests the value in the detector's CAL_RECORD_COUNT_REG.
        If it isn't the expected value of CAL_RECORD_COUNT then it is
        assumed that no detector is connected at that controller pin.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :return: True if a valid detector is connected, False otherwise.
        """
        self._spi_select(ser)
        try:
            record_count = self.read_reg_word(
                ser, LogDetector.CAL_RECORD_COUNT_REG)
        finally:
            self._spi_release(ser)
        return record_count == LogDetector.CAL_RECORD_COUNT

    def read_cal_data(self,
                      ser: serial.Serial) -> None:
        """Read calibration data from the detector board.

        Initializes self.cal_data with a dictionary containing the
        calibration data values keyed with the frequencies at which
        they were measured.  This calibration data is used to
        calculate the corrected power as measured by the detector
        (see :py:meth:`power`).

        Also populates the self.min_limit_coeffs
        and self.max_limit_coeffs which are used to calculate the
        linear range of the detector (see :py:meth:`linear_range`).

        The register layout read here is: a 16 bit record count at
        CAL_RECORD_COUNT_REG, then three 16 bit words per record, then
        four 32 bit floats for the maximum limit followed by four 32 bit
        floats for the minimum limit.

        :param ser: serial device to write commands to
        :type ser: serial.Serial

        :raises: DetectorReadError if the record count is not
            CAL_RECORD_COUNT.
        """
        # Existing data is dropped up front; if the read fails the
        # detector is left with no calibration data rather than a
        # partially filled table.
        self.cal_data = {}
        self._invalidate_interpolators()

        cal_data: Dict[str, List[float]] = {}
        max_limit_coeffs: List[float] = []
        min_limit_coeffs: List[float] = []

        self._spi_select(ser)
        try:
            reg = LogDetector.CAL_RECORD_COUNT_REG
            record_count = self.read_reg_word(ser, reg)
            if record_count != LogDetector.CAL_RECORD_COUNT:
                raise DetectorReadError
            # NOTE(review): this check can never succeed because any
            # count other than CAL_RECORD_COUNT has already raised
            # DetectorReadError above.  The order is kept so that
            # callers continue to see the same exception type.
            if record_count == 0xffff:
                raise InvalidCalibrationDataError
            reg += 2

            for _ in range(record_count):
                # Each record contains: frequency, slope x 100,
                # log intercept x 100
                freq = float(self.read_reg_word(ser, reg))
                reg += 2
                slope = self.read_reg_word(ser, reg) / 100.0
                reg += 2
                # The intercept is stored as a magnitude and negated here.
                intercept = -(self.read_reg_word(ser, reg) / 100.0)
                reg += 2
                cal_data['{:.1f}'.format(freq)] = [slope, intercept]

            # Read the linearity limits cubic fit coeffs.  The maximum
            # limit coefficients are stored before the minimum ones.
            for _ in range(4):
                max_limit_coeffs.append(self.read_reg_float(ser, reg))
                reg += 4
            for _ in range(4):
                min_limit_coeffs.append(self.read_reg_float(ser, reg))
                reg += 4
        finally:
            # Always hand the bus back, even if a read failed part way.
            self._spi_release(ser)

        self.cal_data = cal_data
        self.min_limit_coeffs = min_limit_coeffs
        self.max_limit_coeffs = max_limit_coeffs