Source code for cracknuts.cracker.cracker_o1
# Copyright 2024 CrackNuts. All rights reserved.
import os
import re
import struct
import math
from PIL import Image
from cracknuts.cracker import protocol
from cracknuts.cracker.cracker_g1 import ConfigG1, CrackerG1, wave_8m, wave_4m, _build_interp_func
import numpy as np
import importlib.util
_wave_interp_func_cv, _wave_interp_func_vc = _build_interp_func(
os.path.join(
os.path.join(os.path.dirname(importlib.util.find_spec("cracknuts").origin), "cracker"),
"o1_wave_voltage_map.csv",
)
)
[docs]
class ConfigO1(ConfigG1):
[docs]
def __init__(self):
super().__init__()
self.glitch_clock_arm: bool = False
self.glitch_clock_len_normal: int = len(wave_8m)
self.glitch_clock_wave_normal: list[float] = wave_8m # 默认时钟8mhz
self.glitch_clock_config_len_glitch: int = len(wave_4m)
self.glitch_clock_config_wave_glitch: list[float] = wave_4m # 默认glitch示例时钟4mhz
self.glitch_clock_config_wait: int = 1
self.glitch_clock_config_delay: int = 1
self.glitch_clock_config_repeat: int = 1
self.glitch_clock_enable: bool = True
[docs]
class CrackerO1(CrackerG1):
[docs]
def __init__(
self,
address: tuple | str | None = None,
bin_server_path: str | None = None,
bin_bitstream_path: str | None = None,
):
"""
Initialize the CrackNuts O1 device interface.
:param address: Device address as ``(ip, port)``, URI string, or ``None``.
:type address: tuple | str | None
:param bin_server_path: Path to the server firmware file for updates; normally not specified.
:type bin_server_path: str | None
:param bin_bitstream_path: Path to the bitstream firmware file for updates; normally not specified.
:type bin_bitstream_path: str | None
"""
super().__init__(address, bin_server_path, bin_bitstream_path)
self._config: ConfigG1 = self._config
self._gpio_map = {
"r": {
"mode": 0x194C,
"output": 0x1950,
"input": 0x1954,
"index": {
"GP7": 7,
"GP0": 0,
"GP1": 1,
"GP2": 2,
"GP3": 3,
"GP4": 4,
"GP5": 5,
"GP6": 6,
"GP21": 21,
"GP22": 22,
"GP26": 26,
"GP23": 23,
"GP24": 24,
"GP27": 27,
"GP25": 25,
"GP28": 28,
},
},
"a": {
"mode": 0x1940,
"output": 0x1944,
"input": 0x1948,
"index": {
"A2": 19,
"A3": 18,
"A4": 17,
"A5": 16,
"IO2": 2,
"IO3": 3,
"IO4": 4,
"IO5": 5,
"IO6": 6,
"IO7": 7,
"IO8": 8,
"IO9": 9,
"A": 10,
},
},
}
self._wave_fs = 10_000_000 # DAC采样率
self._buffer_depth = 2048 # 任意波形缓冲区深度(采样点数量)
self._wave_clk_div_max = 256
[docs]
def set_waveform_arbitrary(self, wave: list[float], wave_clk_div: int = 1) -> tuple[int, None | bytes]:
"""
Set the waveform generator output waveform.
:param wave: A sequence of voltage sample points for a single waveform period (unit: Volts).
Each element represents the output voltage of one discrete sample point.
The time interval per sample is 100 ns (corresponding to a 10 MHz sample rate).
Samples are output in array order; one array represents one complete waveform period.
:type wave: list[float]
:param wave_clk_div: Waveform clock divider (integer, default 1).
Actual output frequency = DAC sample rate / (len(wave) * wave_clk_div).
:type wave_clk_div: int
:return: Execution status code and device response data.
:rtype: tuple[int, None | bytes]
"""
if wave_clk_div < 1 or wave_clk_div > self._wave_clk_div_max:
self._logger.error(f"wave_clk_div must be between 1 and {self._wave_clk_div_max}")
return self.NON_PROTOCOL_ERROR, None
status, res = self.register_write(
base_address=0x43C10000, offset=0x182C, data=wave_clk_div - 1
) # 寄存器值为分频系数减1
if status != protocol.STATUS_OK:
return status, res
status, res = self.register_write(base_address=0x43C10000, offset=0x1810, data=len(wave))
if status != protocol.STATUS_OK:
return status, res
for voltage in wave:
status, res = self.register_write(
base_address=0x43C10000,
offset=0x1814,
data=self._get_dac_code_from_voltage(voltage, _wave_interp_func_vc),
)
if status != protocol.STATUS_OK:
return status, res
return protocol.STATUS_OK, None
@staticmethod
def _parse_frequency(frequency) -> float:
"""
Convert a frequency value to Hz.
Supported formats: ``1e6``, ``1000000``, ``"1M"``, ``"1MHz"``, ``"500k"``,
``"500kHz"``, ``"2.5G"``, ``"10"`` (plain number defaults to Hz).
:param frequency: Frequency value as a number or a string with optional unit suffix.
:type frequency: int | float | str
:return: Frequency in Hz.
:rtype: float
"""
if isinstance(frequency, int | float):
return float(frequency)
if not isinstance(frequency, str):
raise TypeError("frequency must be float | int | str")
s = frequency.strip().lower()
units = {
"hz": 1,
"k": 1e3,
"khz": 1e3,
"m": 1e6,
"mhz": 1e6,
"g": 1e9,
"ghz": 1e9,
}
m = re.fullmatch(r"([0-9]*\.?[0-9]+)\s*([a-z]*)", s)
if not m:
raise ValueError(f"Invalid frequency format: {frequency}")
value = float(m.group(1))
unit = m.group(2)
if unit == "":
multiplier = 1
else:
if unit not in units:
raise ValueError(f"Unknown frequency unit: {unit}")
multiplier = units[unit]
return value * multiplier
[docs]
def set_waveform_standard(
self,
waveform: str,
vpp,
*,
frequency: str | float | None = None,
offset: float | None = None,
duty: float = 0.5,
phase: float = 0.0,
) -> tuple[int, None | bytes]:
"""
Set standard waveform output.
This function generates a single-period waveform dataset in software according to the specified
parameters, writes it into the device arbitrary waveform buffer, and the DAC loops over it.
Supported waveform types: ``"dc"`` (DC voltage), ``"sine"`` (sine wave),
``"square"`` (square wave), ``"triangle"`` (triangle wave), ``"sawtooth"`` (sawtooth wave).
:param waveform: Waveform type name (case-insensitive).
:type waveform: str
:param vpp: Peak-to-peak voltage (Volts). For non-DC waveforms, amplitude = vpp / 2.
:type vpp: float
:param frequency: Output frequency (Hz). May be omitted for ``"dc"``; required for all others.
Accepts a numeric value or a string with unit suffix (e.g. ``1e6``, ``"1MHz"``, ``"100kHz"``).
:type frequency: float, str, or None
:param offset: DC offset voltage (Volts). If ``None``, defaults to ``vpp / 2`` to keep the
waveform non-negative.
:type offset: float or None
:param duty: Duty cycle or slope parameter between 0 and 1.
For ``square``, this is the fraction of the period at high level.
For ``sawtooth``, this is the fraction of the period occupied by the rising edge
(near 1.0 means slow rise and fast fall; 0.5 gives a symmetric triangle;
near 0.0 means fast rise and slow fall). Has no effect on ``sine`` or ``triangle``.
:type duty: float
:param phase: Initial phase in radians (2π rad = one full period).
:type phase: float
:return: Execution status code and device response data.
:rtype: tuple[int, None | bytes]
"""
waveform = waveform.lower()
if waveform == "dc":
wave = np.full(1, vpp, dtype=float)
return self.set_waveform_arbitrary(wave.tolist())
if frequency is None:
self._logger.error("frequency must be specified for non-DC waveform")
return self.NON_PROTOCOL_ERROR, None
frequency = self._parse_frequency(frequency)
if frequency > self._wave_fs:
self._logger.error(
"The frequency is too high to generate a valid waveform. " f"Maximum frequency is {self._wave_fs} Hz."
)
return self.NON_PROTOCOL_ERROR, None
wave_clk_div = 1
base_n_samples = max(1, int(self._wave_fs * 1.0 / frequency))
if base_n_samples > self._buffer_depth:
wave_clk_div = int(math.ceil(base_n_samples / self._buffer_depth))
if wave_clk_div > self._wave_clk_div_max:
min_freq = self._wave_fs / (self._buffer_depth * self._wave_clk_div_max)
self._logger.error(
"The frequency is too low to generate a valid waveform. " f"Minimum frequency is {min_freq} Hz."
)
return self.NON_PROTOCOL_ERROR, None
effective_fs = self._wave_fs / wave_clk_div
n_samples = max(1, int(effective_fs * 1.0 / frequency))
if n_samples > self._buffer_depth:
self._logger.error(
"The waveform is too long to fit in the buffer after applying wave_clk_div. "
f"Length is {n_samples}, max is {self._buffer_depth}."
)
return self.NON_PROTOCOL_ERROR, None
amplitude = vpp / 2
if offset is None:
offset = amplitude
t = np.arange(n_samples) / effective_fs
# Use sample-center time for phase-based waveforms to avoid low-sample aliasing.
t_center = t + (0.5 / effective_fs)
if waveform == "sine":
wave = amplitude * np.sin(2 * np.pi * frequency * t + phase)
elif waveform == "square":
phase_t = (frequency * t_center + phase / (2 * np.pi)) % 1.0
wave = np.where(phase_t < duty, amplitude, -amplitude)
elif waveform == "triangle":
phase_t = (frequency * t_center + phase / (2 * np.pi)) % 1.0
wave = 4 * amplitude * np.abs(phase_t - 0.5) - amplitude
elif waveform == "sawtooth":
slope = np.clip(duty, 1e-6, 1 - 1e-6)
phase_t = (frequency * t_center + phase / (2 * np.pi)) % 1.0
wave = np.where(
phase_t < slope,
-amplitude + (2 * amplitude / slope) * phase_t,
amplitude - (2 * amplitude / (1 - slope)) * (phase_t - slope),
)
else:
raise ValueError(f"Unsupported waveform: {waveform}")
wave = wave + offset
v_min = float(np.min(wave))
if v_min < 0:
raise ValueError("Generated waveform contains negative voltage. " "Increase offset or reduce amplitude.")
return self.set_waveform_arbitrary(wave.tolist(), wave_clk_div=wave_clk_div)
[docs]
def set_waveform_sine(
self,
frequency: float | str,
*,
vpp: float = 1.0,
phase: float = 0.0,
offset: float | None = None,
):
"""
Set sine wave output.
:param frequency: Output frequency (Hz). Accepts a numeric value or a string with unit suffix,
e.g. ``1e6``, ``"1m"``, ``"1MHz"``, ``"10kHz"``.
:type frequency: float or str
:param vpp: Peak-to-peak voltage (Volts). Default is 1.0.
:type vpp: float
:param phase: Initial phase in radians. Default is 0.0.
:type phase: float
:param offset: DC offset voltage (Volts). If ``None``, automatically set to ``vpp / 2``
to avoid negative voltage.
:type offset: float or None
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
return self.set_waveform_standard(
"sine",
vpp=vpp,
frequency=frequency,
phase=phase,
offset=offset,
)
[docs]
def set_waveform_square(
self,
frequency: float,
*,
duty: float = 0.5,
vpp: float = 1.0,
phase: float = 0.0,
offset: float | None = None,
):
"""
Set square wave output.
:param frequency: Output frequency (Hz).
:type frequency: float
:param duty: Duty cycle (0–1). 0.5 means a standard 50% square wave;
0.2 means high level occupies 20% of the period. Default is 0.5.
:type duty: float
:param vpp: Peak-to-peak voltage (Volts). Default is 1.0.
:type vpp: float
:param phase: Initial phase in radians. Default is 0.0.
:type phase: float
:param offset: DC offset voltage (Volts).
:type offset: float or None
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
return self.set_waveform_standard(
"square",
frequency=frequency,
vpp=vpp,
duty=duty,
phase=phase,
offset=offset,
)
[docs]
def set_waveform_triangle(
self,
frequency: float,
*,
vpp: float = 1.0,
phase: float = 0.0,
offset: float | None = None,
):
"""
Set triangle wave output.
:param frequency: Output frequency (Hz).
:type frequency: float
:param vpp: Peak-to-peak voltage (Volts). Default is 1.0.
:type vpp: float
:param phase: Initial phase in radians. Default is 0.0.
:type phase: float
:param offset: DC offset voltage (Volts).
:type offset: float or None
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
return self.set_waveform_standard(
"triangle",
frequency=frequency,
vpp=vpp,
phase=phase,
offset=offset,
)
[docs]
def set_waveform_sawtooth(
self,
frequency: float,
*,
vpp: float = 1.0,
slope: float = 1.0,
phase: float = 0.0,
offset: float | None = None,
):
"""
Set sawtooth wave output.
:param frequency: Output frequency (Hz).
:type frequency: float
:param vpp: Peak-to-peak voltage (Volts). Default is 1.0.
:type vpp: float
:param slope: Fraction of the period occupied by the rising edge (0–1).
1.0 = standard sawtooth (slow rise, fast fall); 0.5 = symmetric triangle;
near 0 = fast rise, slow fall. Default is 1.0.
:type slope: float
:param phase: Initial phase in radians. Default is 0.0.
:type phase: float
:param offset: DC offset voltage (Volts).
:type offset: float or None
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
return self.set_waveform_standard(
"sawtooth",
frequency=frequency,
vpp=vpp,
duty=slope,
phase=phase,
offset=offset,
)
[docs]
def set_waveform_dc(self, voltage: float):
"""
Set DC voltage output.
:param voltage: Output DC voltage (Volts). Must be within the device's allowed output range
and must not be negative.
:type voltage: float
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
return self.set_waveform_standard(
"dc",
frequency=1.0,
vpp=voltage,
offset=voltage,
)
[docs]
def set_waveform_from_file(self, file_path: str) -> tuple[int, None | bytes]:
"""
Load waveform data from a file and set the output waveform.
The file should contain a sequence of voltage sample points (unit: Volts),
supporting comma-separated or newline-separated numeric formats.
The maximum number of sample points is 2048.
:param file_path: Path to a text file containing waveform data.
Each numeric value in the file represents a voltage sample point (unit: V).
:type file_path: str
:return: Device response status and response data.
:rtype: tuple[int, None | bytes]
"""
if not os.path.exists(file_path):
self._logger.error(f"Waveform file not found: {file_path}")
return self.NON_PROTOCOL_ERROR, None
try:
with open(file_path, encoding="utf-8") as f:
content = f.read()
except Exception as e:
self._logger.error(f"Failed to read waveform file: {e}")
return self.NON_PROTOCOL_ERROR, None
# ---------- 解析数据 ----------
# 支持:
# 1,2,3
# 1 2 3
# 1\n2\n3
tokens = content.replace(",", " ").split()
if not tokens:
self._logger.error("Waveform file is empty.")
return self.NON_PROTOCOL_ERROR, None
try:
wave = np.array([float(v) for v in tokens], dtype=float)
except ValueError:
self._logger.error("Waveform file contains non-numeric values.")
return self.NON_PROTOCOL_ERROR, None
if len(wave) > self._buffer_depth:
self._logger.error(f"Waveform exceeds maximum length ({self._buffer_depth} samples).")
return self.NON_PROTOCOL_ERROR, None
if not np.all(np.isfinite(wave)):
self._logger.error("Waveform contains NaN or Inf.")
return self.NON_PROTOCOL_ERROR, None
v_min = float(np.min(wave))
if v_min < 0:
self._logger.error("Waveform contains negative voltage. " "All samples must be >= 0 V.")
return self.NON_PROTOCOL_ERROR, None
return self.set_waveform_arbitrary(wave.tolist())
[docs]
def get_voltage_a0(self):
"""
Get the voltage at measurement point A0.
:return: Device response status and measured voltage in Volts, or None on error.
:rtype: tuple[int, float] or None
"""
status, res = self.register_read(base_address=0x43C10000, offset=0x1E70)
if status != protocol.STATUS_OK:
return None
return status, round(int.from_bytes(res, byteorder="big") / 16 / 4096 * 3.33, 2)
[docs]
def get_voltage_a1(self):
"""
Get the voltage at measurement point A1.
:return: Device response status and measured voltage in Volts, or None on error.
:rtype: tuple[int, float] or None
"""
status, res = self.register_read(base_address=0x43C10000, offset=0x1E40)
if status != protocol.STATUS_OK:
return None
return status, round(int.from_bytes(res, byteorder="big") / 16 / 4096 * 3.33, 2)
[docs]
def set_pwm(self, freq, duty_cycle):
"""
Set the PWM output on pin GP29.
:param freq: PWM frequency in Hz.
:type freq: float | int
:param duty_cycle: PWM duty cycle as a fraction between 0 and 1.
:type duty_cycle: float
:return: Device response status and received data: (status, response).
:rtype: tuple[int, None | bytes]
"""
period = struct.pack(">I", round((freq * 2**32) / 100_000_000))
duty = struct.pack(">I", round((1 - duty_cycle) * (2**32) - 1))
status, res = self.register_write(base_address=0x43C10000, offset=0x1838, data=period)
if status != protocol.STATUS_OK:
return status, res
status, res = self.register_write(base_address=0x43C10000, offset=0x183C, data=duty)
if status != protocol.STATUS_OK:
return status, res
return protocol.STATUS_OK, None
[docs]
def get_switch_status_pl(self) -> tuple[int, None | tuple[int, int]]:
"""
Get the PL switch status.
:return: Status tuple ``(status, (sw1, sw2))``, where ``status`` is the protocol status code
and ``sw1`` / ``sw2`` are the two PL switch state bits (0 = open, 1 = closed).
:rtype: tuple[int, None | tuple[int, int]]
"""
status, res = self.register_read(base_address=0x43C10000, offset=0x193C)
if status != protocol.STATUS_OK:
return status, res
res = struct.unpack(">I", res)[0]
sw1, sw2 = ((res >> i) & 1 for i in (0, 1))
return status, (sw1, sw2)
[docs]
def get_switch_status_ps(self) -> tuple[int, None | tuple[int, int]]:
"""
Get the PS switch status.
:return: Status tuple ``(status, (sw1, sw2))``, where ``status`` is the protocol status code
and ``sw1`` / ``sw2`` are the two PS switch state bits (0 = open, 1 = closed).
:rtype: tuple[int, None | tuple[int, int]]
"""
res = self._ssh_cracker.exec("gpioget -c 0 9 14", print_output=False)
if res is not None and res["exit_code"] == 0:
output = res["stdout"].strip()
if output is not None and len(output) > 0:
gpio_states = []
for item in output.split():
_, state = item.split("=")
gpio_states.append(1 if state == "active" else 0)
return protocol.STATUS_OK, tuple(gpio_states)
return self.NON_PROTOCOL_ERROR, None
def _load_image(self, image_path: str, fit: bool = True) -> np.ndarray | None:
"""
Load an image and convert it to an RGB888 array.
:param image_path: Path to the image file.
:type image_path: str
:param fit: Whether to resize the image to 64x64 pixels.
:type fit: bool
:return: Converted RGB888 array. Shape is ``(64, 64, 3)`` when ``fit=True``, or
``(H, W, 3)`` depending on the original image when ``fit=False``. Returns None on error.
:rtype: np.ndarray or None
"""
target_size = 64
try:
with Image.open(image_path) as img:
# 1. 强制转换为 RGB 模式 (处理 RGBA, 灰度等)
rgb_img = img.convert("RGB")
width, height = rgb_img.size
processing_img = rgb_img
if fit:
# 无论原图大小,统一缩放至 64x64
# LANCZOS 算法在缩小和放大时都能提供较好的质量
processing_img = rgb_img.resize((target_size, target_size), Image.Resampling.LANCZOS)
# print(f"Image resized from {width}x{height} to {target_size}x{target_size}")
else:
# 保持原图
# print(f"Resize disabled, kept original size {width}x{height}.")
pass
# 2. 转换为 NumPy 数组
rgb_array = np.array(processing_img, dtype=np.uint8)
# 3. 验证形状 (仅在开启缩放时验证)
if fit:
if rgb_array.shape != (64, 64, 3):
self._logger.warning(f"Expected shape (64, 64, 3), got {rgb_array.shape}")
return rgb_array
except FileNotFoundError:
self._logger.error(f"Image {image_path} not exist")
return None
except Exception as e:
self._logger.error(f"Load image failed: {e}")
return None
[docs]
def set_led_content(self, t: int, x: int, y: int, c: bytes, w: int = None) -> None:
"""
Set the LED display content.
:param t: Content type: 0 for text, 1 for image.
:type t: int
:param x: X coordinate of the content in pixels.
:type x: int
:param y: Y coordinate of the content in pixels.
:type y: int
:param c: Content data: UTF-8 encoded string for text, or RGB888 byte array for images.
:type c: bytes
:param w: Width of the content in pixels. For text, this is the pixel width of the content;
for images, this is the image width.
:type w: int | None
"""
if t == 0:
payload = struct.pack(">Bii", t, x, y)
else:
payload = struct.pack(">BiiI", t, x, y, w)
payload += c
self.send_with_command(command=0x400, payload=payload)
[docs]
def set_led_text(self, text: str, x: int = 0, y: int = 0, auto_wrap: bool = True) -> None:
"""
Set the LED display text. Only ASCII characters are supported. Each character is 5 pixels
wide and 6 pixels tall with 1 pixel spacing. The screen resolution is 64x64 pixels with
the origin at the top-left corner; x increases rightward and y increases downward.
:param text: Text content to display.
:type text: str
:param x: X coordinate of the text in pixels.
:type x: int
:param y: Y coordinate of the text baseline in pixels. Note that the coordinate represents
the baseline (bottom) of the text, so set y to 6 to display a complete first line.
:type y: int
:param auto_wrap: Whether to automatically wrap text when it exceeds the screen width.
:type auto_wrap: bool
"""
if auto_wrap:
max_chars_per_line = (64 - x) // 6 # 每行最多显示的字符数(5像素字符宽度 + 1像素间距)
lines = []
current_line = ""
for char in text:
if len(current_line) < max_chars_per_line:
current_line += char
else:
lines.append(current_line)
current_line = char
if current_line:
lines.append(current_line)
text = "\n".join(lines)
self.set_led_content(0, x, y, text.encode("utf-8"))
[docs]
def set_led_image(self, image_path: str, x: int = 0, y: int = 0, fit: bool = True) -> None:
"""
Set the LED display image.
:param image_path: Path to the image file; common formats such as PNG and JPEG are supported.
:type image_path: str
:param x: X coordinate of the image in pixels.
:type x: int
:param y: Y coordinate of the image in pixels.
:type y: int
:param fit: Whether to force-resize the image to 64x64 pixels.
True: always resize to 64x64 regardless of original size.
False: preserve the original image dimensions.
:type fit: bool
"""
img_array = self._load_image(image_path, fit)
if img_array is not None:
h, w, _ = img_array.shape
self.set_led_content(1, x, y, img_array.tobytes(), w)
def _get_gpio_offset_and_index(self, pin_id: str):
pin_id = pin_id.upper()
if pin_id.startswith("GP"):
pin_index = self._gpio_map["r"]["index"].get(pin_id, None)
output_offset = self._gpio_map["r"]["output"]
input_offset = self._gpio_map["r"]["input"]
mode = self._gpio_map["r"]["mode"]
elif pin_id.startswith("A") or pin_id.startswith("IO"):
pin_index = self._gpio_map["a"]["index"].get(pin_id, None)
output_offset = self._gpio_map["a"]["output"]
input_offset = self._gpio_map["a"]["input"]
mode = self._gpio_map["a"]["mode"]
else:
self._logger.error(f"pin_id {pin_id} not supported")
return self.NON_PROTOCOL_ERROR, None
return pin_index, output_offset, input_offset, mode
[docs]
def digital_read(self, pin_id: str):
"""
Read the level state of a digital IO pin. A high level requires more than 1.4 V.
:param pin_id: Pin identifier. Supported pins: GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9.
:type pin_id: str
:return: Device response status and pin level: (status, level).
:rtype: tuple[int, bytes | None | int]
"""
pin_index, _, offset, _ = self._get_gpio_offset_and_index(pin_id)
if pin_index is None or offset is None:
self._logger.error(f"pin_id {pin_id} not supported")
return self.NON_PROTOCOL_ERROR, None
s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset)
if s != protocol.STATUS_OK:
self._logger.error(f"Get GPIO data failed, status: {s}")
return s, r
else:
return s, self._get_bit_stream_lsb(r, pin_index)
[docs]
def digital_write(self, pin_id: str, value: int):
"""
Set the level state of a digital IO pin.
:param pin_id: Pin identifier. Supported pins: GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9.
:type pin_id: str
:param value: Pin level state: 1 for high, 0 for low.
:type value: int
:return: Device response status and received data: (status, response).
:rtype: tuple[int, bytes | None]
"""
pin_index, offset, _, _ = self._get_gpio_offset_and_index(pin_id)
s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset)
if s != protocol.STATUS_OK:
self._logger.error(f"Get old GPIO data failed, status: {s}")
return s, r
else:
gpio_data = int.from_bytes(r, byteorder="big")
if value:
gpio_data |= 1 << pin_index
else:
gpio_data &= ~(1 << pin_index)
gpio_data_bytes = gpio_data.to_bytes(len(r), byteorder="big")
return self.register_write(base_address=self._BASE_ADDRESS, offset=offset, data=gpio_data_bytes)
[docs]
def digital_pin_mode(self, pin_id: str, mode: int | str):
"""
Set the operating mode of a digital IO pin.
:param pin_id: Pin identifier. Supported pins: GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9.
:type pin_id: str
:param mode: Pin operating mode: 1 for input, 0 for output, or ``"INPUT"`` / ``"OUTPUT"``.
:type mode: int | str
:return: Device response status and received data: (status, response).
:rtype: tuple[int, bytes | None]
"""
if isinstance(mode, str):
if mode.upper() == "INPUT":
mode = 1
elif mode.upper() == "OUTPUT":
mode = 0
else:
raise ValueError("Invalid mode string, must be 'INPUT' or 'OUTPUT'")
pin_index, _, _, offset = self._get_gpio_offset_and_index(pin_id)
s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset)
if s != protocol.STATUS_OK:
self._logger.error(f"Get old GPIO mode failed, status: {s}")
return s, r
else:
gpio_dir = int.from_bytes(r, byteorder="big")
if mode:
gpio_dir |= 1 << pin_index
else:
gpio_dir &= ~(1 << pin_index)
gpio_dir_bytes = gpio_dir.to_bytes(len(r), byteorder="big")
return self.register_write(base_address=self._BASE_ADDRESS, offset=offset, data=gpio_dir_bytes)