Source code for cracknuts.cracker.cracker_o1

# Copyright 2024 CrackNuts. All rights reserved.

import os
import re
import struct

from PIL import Image

from cracknuts.cracker import protocol
from cracknuts.cracker.cracker_g1 import ConfigG1, CrackerG1, wave_8m, wave_4m, _build_interp_func
import numpy as np
import importlib.util

_wave_interp_func_cv, _wave_interp_func_vc = _build_interp_func(
    os.path.join(
        os.path.join(os.path.dirname(importlib.util.find_spec("cracknuts").origin), "cracker"),
        "o1_wave_voltage_map.csv",
    )
)


[docs] class ConfigO1(ConfigG1):
[docs] def __init__(self): super().__init__() self.glitch_clock_arm: bool = False self.glitch_clock_len_normal: int = len(wave_8m) self.glitch_clock_wave_normal: list[float] = wave_8m # 默认时钟8mhz self.glitch_clock_config_len_glitch: int = len(wave_4m) self.glitch_clock_config_wave_glitch: list[float] = wave_4m # 默认glitch示例时钟4mhz self.glitch_clock_config_wait: int = 1 self.glitch_clock_config_delay: int = 1 self.glitch_clock_config_repeat: int = 1 self.glitch_clock_enable: bool = True
[docs] class CrackerO1(CrackerG1):
[docs] def __init__( self, address: tuple | str | None = None, bin_server_path: str | None = None, bin_bitstream_path: str | None = None, operator_port: int = None, ): """ Cracker O1 设备接口类。 """ super().__init__(address, bin_server_path, bin_bitstream_path, operator_port) self._config: ConfigG1 = self._config self._gpio_map = { "r": { "mode": 0x194C, "output": 0x1950, "input": 0x1954, "index": { "GP7": 7, "GP0": 0, "GP1": 1, "GP2": 2, "GP3": 3, "GP4": 4, "GP5": 5, "GP6": 6, "GP21": 21, "GP22": 22, "GP26": 26, "GP23": 23, "GP24": 24, "GP27": 27, "GP25": 25, "GP28": 28, }, }, "a": { "mode": 0x1940, "output": 0x1944, "input": 0x1948, "index": { "A2": 19, "A3": 18, "A4": 17, "A5": 16, "IO2": 2, "IO3": 3, "IO4": 4, "IO5": 5, "IO6": 6, "IO7": 7, "IO8": 8, "IO9": 9, "A": 10, }, }, } self._wave_fs = 10_000_000 # DAC采样率 self._buffer_depth = 160 # 160
[docs] def set_waveform_arbitrary(self, wave: list[float]) -> tuple[int, None | bytes]: """ 设置波形发生器输出波形。 :param wave: 单组波形的电压采样点序列(单位:伏特 V)。列表中每个元素表示一个离散采样点的输出电压值。 - 单个采样点时间间隔: 100 ns(对应 10 MHz 的采样率) - 波形按照数组顺序依次输出 - 一个数组表示一个完整周期的波形 :type wave: list[float] :return: 执行状态码与设备返回数据。 :rtype: tuple[int, None | bytes] """ status, res = self.register_write(base_address=0x43C10000, offset=0x1810, data=len(wave)) if status != protocol.STATUS_OK: return status, res for voltage in wave: status, res = self.register_write( base_address=0x43C10000, offset=0x1814, data=self._get_dac_code_from_voltage(voltage, _wave_interp_func_vc), ) if status != protocol.STATUS_OK: return status, res return protocol.STATUS_OK, None
@staticmethod def _parse_frequency(frequency) -> float: """ 将 frequency 转为 Hz 支持: 1e6 1000000 "1M" "1MHz" "500k" "500kHz" "2.5G" "10" -> 默认 MHz """ if isinstance(frequency, int | float): return float(frequency) if not isinstance(frequency, str): raise TypeError("frequency must be float | int | str") s = frequency.strip().lower() units = { "hz": 1, "k": 1e3, "khz": 1e3, "m": 1e6, "mhz": 1e6, "g": 1e9, "ghz": 1e9, } m = re.fullmatch(r"([0-9]*\.?[0-9]+)\s*([a-z]*)", s) if not m: raise ValueError(f"Invalid frequency format: {frequency}") value = float(m.group(1)) unit = m.group(2) if unit == "": multiplier = 1e6 else: if unit not in units: raise ValueError(f"Unknown frequency unit: {unit}") multiplier = units[unit] return value * multiplier
[docs] def set_waveform_standard( self, waveform: str, vpp, *, frequency: str | float | None = None, offset: float | None = None, duty: float = 0.5, phase: float = 0.0, ) -> tuple[int, None | bytes]: """ 设置标准波形输出。 本函数根据指定参数在软件中生成 **单周期波形数据**,并写入设备任意波形缓冲区,由 DAC 循环播放该波形。 支持波形类型: - ``"dc"`` :直流电压 - ``"sine"`` :正弦波 - ``"square"`` :方波 - ``"triangle"`` :三角波 - ``"sawtooth"`` :锯齿波(可调坡度) :param waveform: 波形类型名称(大小写不敏感)。 :type waveform: str :param vpp: 峰峰值电压(Volt)。对于非 DC 波形,振幅 `amplitude = vpp / 2`。 :type vpp: float :param frequency: 输出频率(Hz)。对 ``"dc"`` 可省略,其他波形必须指定。 支持数值或带单位字符串(例如 `1e6`、`"1MHz"`、`"100kHz"`)。 :type frequency: float, str, or None, optional :param offset: 直流偏置电压(Volt)。若为 `None`,默认 `offset = vpp / 2`,保证波形非负。 :type offset: float or None, optional :param duty: 波形占空比 / 坡度参数(0~1)。 - ``square``:高电平占周期比例 - ``sawtooth``:上升段占周期比例 - 接近 1 → 慢上升 + 快下降(标准锯齿波) - 0.5 → 对称三角波 - 接近 0 → 快速上升 + 慢下降 - 对 ``sine`` 与 ``triangle`` 无作用 :type duty: float, optional :default duty: 0.5 :param phase: 初始相位(弧度 rad)。数学定义:`2π rad = 1 个周期` :type phase: float, optional :default phase: 0.0 :return: 执行状态码与设备返回数据。 :rtype: tuple[int, None | bytes] """ waveform = waveform.lower() if waveform == "dc": wave = np.full(1, vpp, dtype=float) return self.set_waveform_arbitrary(wave.tolist()) if frequency is None: self._logger.error("frequency must be specified for non-DC waveform") return self.NON_PROTOCOL_ERROR, None frequency = self._parse_frequency(frequency) if frequency < self._wave_fs / self._buffer_depth: self._logger.error( "The frequency is too low to generate a valid waveform. " f"Minimum frequency is {self._wave_fs / self._buffer_depth} Hz." ) return self.NON_PROTOCOL_ERROR, None elif frequency > self._wave_fs: self._logger.error( "The frequency is too high to generate a valid waveform. " f"Maximum frequency is {self._wave_fs} Hz." ) return self.NON_PROTOCOL_ERROR, None amplitude = vpp / 2 if offset is None: offset = amplitude n_samples = max(1, int(self._wave_fs * 1.0 / frequency)) t = np.arange(n_samples) / self._wave_fs if waveform == "sine": wave = amplitude * np.sin(2 * np.pi * frequency * t + phase) elif waveform == "square": phase_t = (frequency * t + phase / (2 * np.pi)) % 1.0 wave = np.where(phase_t < duty, amplitude, -amplitude) elif waveform == "triangle": phase_t = (frequency * t + phase / (2 * np.pi)) % 1.0 wave = 4 * amplitude * np.abs(phase_t - 0.5) - amplitude elif waveform == "sawtooth": slope = np.clip(duty, 1e-6, 1 - 1e-6) phase_t = (frequency * t + phase / (2 * np.pi)) % 1.0 wave = np.where( phase_t < slope, -amplitude + (2 * amplitude / slope) * phase_t, amplitude - (2 * amplitude / (1 - slope)) * (phase_t - slope), ) else: raise ValueError(f"Unsupported waveform: {waveform}") wave = wave + offset v_min = float(np.min(wave)) if v_min < 0: raise ValueError("Generated waveform contains negative voltage. " "Increase offset or reduce amplitude.") return self.set_waveform_arbitrary(wave.tolist())
[docs] def set_waveform_sine( self, frequency: float | str, *, vpp: float = 1.0, phase: float = 0.0, offset: float | None = None, ): """ 设置正弦波输出。 :param frequency: 输出频率(Hz)。支持数值或带单位的字符串形式,例如 `1e6`、`1m`、`"1MHz"`、`"10kHz"`。 :type frequency: float or str :param vpp: 峰峰值电压(Volt)。 :type vpp: float, optional :default vpp: 1.0 :param phase: 初始相位(弧度 rad)。 :type phase: float, optional :default phase: 0.0 :param offset: 直流偏置电压(Volt)。若为 `None`,默认自动设置为 `vpp / 2` 以避免负电压。 :type offset: float or None, optional :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ return self.set_waveform_standard( "sine", vpp=vpp, frequency=frequency, phase=phase, offset=offset, )
[docs] def set_waveform_square( self, frequency: float, *, duty: float = 0.5, vpp: float = 1.0, phase: float = 0.0, offset: float | None = None, ): """ 设置方波输出。 :param frequency: 输出频率(Hz)。 :type frequency: float :param duty: 占空比(0~1)。 - 0.5 表示标准 50% 方波 - 0.2 表示高电平占 20% 周期 :type duty: float, optional :default duty: 0.5 :param vpp: 峰峰值电压(Volt)。 :type vpp: float, optional :default vpp: 1.0 :param phase: 初始相位(弧度 rad)。 :type phase: float, optional :default phase: 0.0 :param offset: 直流偏置电压(Volt)。 :type offset: float or None, optional :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ return self.set_waveform_standard( "square", frequency=frequency, vpp=vpp, duty=duty, phase=phase, offset=offset, )
[docs] def set_waveform_triangle( self, frequency: float, *, vpp: float = 1.0, phase: float = 0.0, offset: float | None = None, ): """ 设置三角波输出。 :param frequency: 输出频率(Hz)。 :type frequency: float :param vpp: 峰峰值电压(Volt)。 :type vpp: float, optional :default vpp: 1.0 :param phase: 初始相位(弧度 rad)。 :type phase: float, optional :default phase: 0.0 :param offset: 直流偏置电压(Volt)。 :type offset: float or None, optional :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ return self.set_waveform_standard( "triangle", frequency=frequency, vpp=vpp, phase=phase, offset=offset, )
[docs] def set_waveform_sawtooth( self, frequency: float, *, vpp: float = 1.0, slope: float = 1.0, phase: float = 0.0, offset: float | None = None, ): """ 设置锯齿波输出。 :param frequency: 输出频率(Hz)。 :type frequency: float :param vpp: 峰峰值电压(Volt)。 :type vpp: float, optional :default vpp: 1.0 :param slope: 上升段占整个周期的比例(0~1)。 - 1.0 → 标准锯齿波(慢上升,快下降) - 0.5 → 对称三角波 - 接近 0 → 快速上升,慢下降 :type slope: float, optional :default slope: 1.0 :param phase: 初始相位(弧度 rad)。 :type phase: float, optional :default phase: 0.0 :param offset: 直流偏置电压(Volt)。 :type offset: float or None, optional :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ return self.set_waveform_standard( "sawtooth", frequency=frequency, vpp=vpp, duty=slope, phase=phase, offset=offset, )
[docs] def set_waveform_dc(self, voltage: float): """ 设置直流电压输出。 :param voltage: 输出直流电压(Volt)。电压值必须在设备允许的输出范围内,且不得为负值。 :type voltage: float :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ return self.set_waveform_standard( "dc", frequency=1.0, vpp=voltage, offset=voltage, )
[docs] def set_waveform_from_file(self, file_path: str) -> tuple[int, None | bytes]: """ 从文件加载波形数据并设置输出波形。 文件内容应为电压采样点序列(单位:伏特 V), 支持逗号分隔或逐行排列的数值格式。 采样点数量最大不得超过 160 个。 :param file_path: 包含波形数据的文本文件路径。 文件中每个数值表示一个电压采样点(单位 V)。 :type file_path: str :return: 设备返回状态与响应数据。 :rtype: tuple[int, None | bytes] """ import os import numpy as np # ---------- 文件检查 ---------- if not os.path.exists(file_path): self._logger.error(f"Waveform file not found: {file_path}") return self.NON_PROTOCOL_ERROR, None try: with open(file_path, encoding="utf-8") as f: content = f.read() except Exception as e: self._logger.error(f"Failed to read waveform file: {e}") return self.NON_PROTOCOL_ERROR, None # ---------- 解析数据 ---------- # 支持: # 1,2,3 # 1 2 3 # 1\n2\n3 tokens = content.replace(",", " ").split() if not tokens: self._logger.error("Waveform file is empty.") return self.NON_PROTOCOL_ERROR, None try: wave = np.array([float(v) for v in tokens], dtype=float) except ValueError: self._logger.error("Waveform file contains non-numeric values.") return self.NON_PROTOCOL_ERROR, None max_points = 160 if len(wave) > max_points: self._logger.error(f"Waveform exceeds maximum length ({max_points} samples).") return self.NON_PROTOCOL_ERROR, None if not np.all(np.isfinite(wave)): self._logger.error("Waveform contains NaN or Inf.") return self.NON_PROTOCOL_ERROR, None v_min = float(np.min(wave)) if v_min < 0: self._logger.error("Waveform contains negative voltage. " "All samples must be >= 0 V.") return self.NON_PROTOCOL_ERROR, None return self.set_waveform_arbitrary(wave.tolist())
[docs] def get_voltage_a0(self): """ 获取测量点a0电压 """ status, res = self.register_read(base_address=0x43C10000, offset=0x1E70) if status != protocol.STATUS_OK: return None return status, round(int.from_bytes(res, byteorder="big") / 16 / 4096 * 3.33, 2)
[docs] def get_voltage_a1(self): """ 获取测量点a1电压 """ status, res = self.register_read(base_address=0x43C10000, offset=0x1E40) if status != protocol.STATUS_OK: return None return status, round(int.from_bytes(res, byteorder="big") / 16 / 4096 * 3.33, 2)
[docs] def set_pwm(self, freq, duty_cycle): """ 设置PWM输出, GP29引脚 :param freq: PWM频率,单位Hz :param duty_cycle: PWM占空比,0-1之间的小数 """ period = struct.pack(">I", round((freq * 2**32) / 100_000_000)) duty = struct.pack(">I", round((1 - duty_cycle) * (2**32) - 1)) status, res = self.register_write(base_address=0x43C10000, offset=0x1838, data=period) if status != protocol.STATUS_OK: return status, res status, res = self.register_write(base_address=0x43C10000, offset=0x183C, data=duty) if status != protocol.STATUS_OK: return status, res return protocol.STATUS_OK, None
[docs] def get_switch_status(self, switch_id: str) -> tuple[int, None | tuple[int, int]]: """ 获取开关状态 :param switch_id: 开关ID,'pl'或'ps' :type switch_id: str :return: (status, (sw1, sw2)),其中status为协议状态码, sw1和sw2分别为开关的两个状态位, 0表示开关关闭,1表示开关打开 """ switch_id = switch_id.lower() if switch_id == "pl": offset = 0x193C elif switch_id == "ps": offset = 0x193C else: self._logger.error(f"switch_id {switch_id} not supported") return self.NON_PROTOCOL_ERROR, None status, res = self.register_read(base_address=0x43C10000, offset=offset) if status != protocol.STATUS_OK: return status, res res = struct.unpack(">I", res)[0] sw1, sw2 = ((res >> i) & 1 for i in (0, 1)) return status, (sw1, sw2)
[docs] def get_switch_status_pl(self) -> tuple[int, None | tuple[int, int]]: """ 获取PL开关状态 :return: (status, (sw1, sw2)),其中status为协议状态码, sw1和sw2分别为PL开关的两个状态位, 0表示开关关闭,1表示开关打开 """ return self.get_switch_status("pl")
[docs] def get_switch_status_ps(self) -> tuple[int, None | tuple[int, int]]: """ 获取PS开关状态 :return: (status, (sw1, sw2)),其中status为协议状态码, sw1和sw2分别为PS开关的两个状态位, 0表示开关关闭,1表示开关打开 """ return self.get_switch_status("ps")
def _load_image(self, image_path: str, fit: bool = True) -> np.ndarray | None: """ 读取图片并转换为 RGB888 数组。 :param image_path: 图片路径。 :type image_path: str :return: 转换后的 RGB888 数组。 - 若 `should_resize=True`:形状为 (64, 64, 3) - 若 `should_resize=False`:形状为 (H, W, 3),取决于原图 - 出错时返回 None :rtype: np.ndarray or None """ target_size = 64 try: with Image.open(image_path) as img: # 1. 强制转换为 RGB 模式 (处理 RGBA, 灰度等) rgb_img = img.convert("RGB") width, height = rgb_img.size processing_img = rgb_img if fit: # 无论原图大小,统一缩放至 64x64 # LANCZOS 算法在缩小和放大时都能提供较好的质量 processing_img = rgb_img.resize((target_size, target_size), Image.Resampling.LANCZOS) # print(f"Image resized from {width}x{height} to {target_size}x{target_size}") else: # 保持原图 # print(f"Resize disabled, kept original size {width}x{height}.") pass # 2. 转换为 NumPy 数组 rgb_array = np.array(processing_img, dtype=np.uint8) # 3. 验证形状 (仅在开启缩放时验证) if fit: if rgb_array.shape != (64, 64, 3): self._logger.warning(f"Expected shape (64, 64, 3), got {rgb_array.shape}") return rgb_array except FileNotFoundError: self._logger.error(f"Image {image_path} not exist") return None except Exception as e: self._logger.error(f"Load image failed: {e}") return None
[docs] def set_led_content(self, t: int, x: int, y: int, c: bytes, w: int = None) -> None: """ 设置LED显示内容 :param t: 显示内容类型,0表示文本,1表示图片 :param x: 显示内容的x坐标,单位为像素 :param y: 显示内容的y坐标,单位为像素 :param c: 显示内容,文本类型为UTF-8编码的字符串,图片类型为RGB888格式的字节数组 :param w: 显示内容的宽度,单位为像素,文本类型为内容的像素宽度,图片类型为图片的宽度 """ if t == 0: payload = struct.pack(">Bii", t, x, y) else: payload = struct.pack(">BiiI", t, x, y, w) payload += c self.send_with_command(command=0x400, payload=payload)
[docs] def set_led_text(self, text: str, x: int = 0, y: int = 0, auto_wrap: bool = True) -> None: """ 设置LED显示文本,仅支持英文字符显示。字符宽度为5像素,高度为6像素,字符间距为1像素。 屏幕分辨率为64x64像素,坐标原点在屏幕左上角,x坐标向右增加,y坐标向下增加。 :param text: 显示文本内容 :param x: 显示文本的x坐标,单位为像素 :param y: 显示文本的y坐标,单位为像素。 注意,坐标表示文本基线的位置,即文本的底部位置。所以如果要显示完整的一行文字,则要设置y为6(字符高度为6) """ if auto_wrap: max_chars_per_line = (64 - x) // 6 # 每行最多显示的字符数(5像素字符宽度 + 1像素间距) lines = [] current_line = "" for char in text: if len(current_line) < max_chars_per_line: current_line += char else: lines.append(current_line) current_line = char if current_line: lines.append(current_line) text = "\n".join(lines) self.set_led_content(0, x, y, text.encode("utf-8"))
[docs] def set_led_image(self, image_path: str, x: int = 0, y: int = 0, fit: bool = True) -> None: """ 设置LED显示图片 :param image_path: 图片文件路径,支持常见格式如PNG、JPEG等 :param x: 显示图片的x坐标,单位为像素 :param y: 显示图片的y坐标,单位为像素 :param fit: 是否强制缩放图片至64x64像素 - True: 无论原图大小,统一缩放至64x64。 - False: 保持原图尺寸,不进行缩放。 """ img_array = self._load_image(image_path, fit) if img_array is not None: h, w, _ = img_array.shape self.set_led_content(1, x, y, img_array.tobytes(), w)
def _get_gpio_offset_and_index(self, pin_id: str): pin_id = pin_id.upper() if pin_id.startswith("GP"): pin_index = self._gpio_map["r"]["index"].get(pin_id, None) output_offset = self._gpio_map["r"]["output"] input_offset = self._gpio_map["r"]["input"] mode = self._gpio_map["r"]["mode"] elif pin_id.startswith("A") or pin_id.startswith("IO"): pin_index = self._gpio_map["a"]["index"].get(pin_id, None) output_offset = self._gpio_map["a"]["output"] input_offset = self._gpio_map["a"]["input"] mode = self._gpio_map["a"]["mode"] else: self._logger.error(f"pin_id {pin_id} not supported") return self.NON_PROTOCOL_ERROR, None return pin_index, output_offset, input_offset, mode
[docs] def digital_read(self, pin_id: str): """ 读取数字IO引脚电平状态,高电平需要大于 1.4v :param pin_id: 引脚ID, 支持 GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9 :type pin_id: str :return: Cracker设备响应状态和接收到的数据:(status, response)。 :rtype: tuple[int, bytes | None | int] """ pin_index, _, offset, _ = self._get_gpio_offset_and_index(pin_id) if pin_index is None or offset is None: self._logger.error(f"pin_id {pin_id} not supported") return self.NON_PROTOCOL_ERROR, None s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset) if s != protocol.STATUS_OK: self._logger.error(f"Get GPIO data failed, status: {s}") return s, r else: return s, self._get_bit_stream_lsb(r, pin_index)
[docs] def digital_write(self, pin_id: str, value: int): """ 设置数字IO引脚电平状态 :param pin_id: 引脚ID, 支持 GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9 :type pin_id: str :param value: 引脚电平状态,1:高电平,0: :type value: int :return: Cracker设备响应状态和接收到的数据:(status, response)。 :rtype: tuple[int, bytes | None] """ pin_index, offset, _, _ = self._get_gpio_offset_and_index(pin_id) s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset) if s != protocol.STATUS_OK: self._logger.error(f"Get old GPIO data failed, status: {s}") return s, r else: gpio_data = int.from_bytes(r, byteorder="big") if value: gpio_data |= 1 << pin_index else: gpio_data &= ~(1 << pin_index) gpio_data_bytes = gpio_data.to_bytes(len(r), byteorder="big") return self.register_write(base_address=self._BASE_ADDRESS, offset=offset, data=gpio_data_bytes)
[docs] def digital_pin_mode(self, pin_id: str, mode: int | str): """ 设置数字IO引脚工作模式 :param pin_id: 引脚ID, 支持 GP0-GP7, GP21-GP27, A, A2-A5, IO2-IO9 :type pin_id: str :param mode: 引脚工作模式,1:输入模式,0:输出模式,或者 "INPUT"、"OUTPUT" :type mode: int | str :return: Cracker设备响应状态和接收到的数据:(status, response)。 :rtype: tuple[int, bytes | None] """ if isinstance(mode, str): if mode.upper() == "INPUT": mode = 1 elif mode.upper() == "OUTPUT": mode = 0 else: raise ValueError("Invalid mode string, must be 'INPUT' or 'OUTPUT'") pin_index, _, _, offset = self._get_gpio_offset_and_index(pin_id) s, r = self.register_read(base_address=self._BASE_ADDRESS, offset=offset) if s != protocol.STATUS_OK: self._logger.error(f"Get old GPIO mode failed, status: {s}") return s, r else: gpio_dir = int.from_bytes(r, byteorder="big") if mode: gpio_dir |= 1 << pin_index else: gpio_dir &= ~(1 << pin_index) gpio_dir_bytes = gpio_dir.to_bytes(len(r), byteorder="big") return self.register_write(base_address=self._BASE_ADDRESS, offset=offset, data=gpio_dir_bytes)