# Copyright 2024 CrackNuts. All rights reserved.
# Copyright 2024 CrackNuts. All rights reserved.
import os
import re
import struct
import typing
import pandas as pd
from scipy.interpolate import interp1d
import importlib.util
from cracknuts.cracker import protocol
from cracknuts.cracker.cracker_basic import ConfigBasic, connection_status_check
from cracknuts.cracker.cracker_s1 import ConfigS1, CrackerS1
from cracknuts.cracker.protocol import Command
wave_80m = [
3.2,
0.0
]
wave_40m = [
3.2, 3.2,
0.0, 0.0
]
wave_20m = [
3.2, 3.2, 3.2, 3.2,
0.0, 0.0, 0.0, 0.0,
]
wave_16m = [
3.2, 3.2, 3.2, 3.2, 3.2,
0.0, 0.0, 0.0, 0.0, 0.0
]
wave_10m = [
3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
]
wave_8m = [
3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
]
wave_4m = [
3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2, 3.2,
0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
]
[docs]
class ConfigG1(ConfigS1):
[docs]
def __init__(self):
super().__init__()
self.glitch_vcc_arm = False
self.glitch_vcc_normal = 3.3
self.glitch_vcc_config_wait = 1
self.glitch_vcc_config_level = self.glitch_vcc_normal
self.glitch_vcc_config_count = 1
self.glitch_vcc_config_delay = 0
self.glitch_vcc_config_repeat = 1
self.glitch_gnd_arm = False
self.glitch_gnd_normal = 0
self.glitch_gnd_config_wait = 1
self.glitch_gnd_config_level = self.glitch_gnd_normal
self.glitch_gnd_config_count = 1
self.glitch_gnd_config_delay = 0
self.glitch_gnd_config_repeat = 1
self.glitch_clock_arm: bool = False
self.glitch_clock_len_normal: int = len(wave_8m)
self.glitch_clock_wave_normal: list[float] = wave_8m # 默认时钟8mhz
self.glitch_clock_config_len_glitch: int = len(wave_4m)
self.glitch_clock_config_wave_glitch: list[float] = wave_4m # 默认glitch示例时钟4mhz
self.glitch_clock_config_wait: int = 1
self.glitch_clock_config_delay: int = 1
self.glitch_clock_config_repeat: int = 1
self.glitch_clock_enable: bool = True
class CommandG1(Command):
GLITCH_VCC_ARM = 0x0310
GLITCH_VCC_RESET = 0x0311
GLITCH_VCC_FORCE = 0x0312
GLITCH_VCC_CONFIG = 0x0313
GLITCH_VCC_NORMAL = 0x0314
GLITCH_GND_ARM = 0x0320
GLITCH_GND_RESET = 0x0321
GLITCH_GND_FORCE = 0x0322
GLITCH_GND_CONFIG = 0x0323
GLITCH_GND_NORMAL = 0x0324
GLITCH_CLK_ARM = 0x0330
GLITCH_CLK_RESET = 0x0331
GLITCH_CLK_FORCE = 0x0332
GLITCH_CLK_CONFIG = 0x0333
GLITCH_CLK_LEN_NORMAL = 0x0334
GLITCH_CLK_WAVE_NORMAL = 0x0335
GLITCH_CLK_LEN_GLITCH = 0x0336
GLITCH_CLK_WAVE_GLITCH = 0x0337
def _build_interp_func(map_path) -> typing.Tuple[interp1d, interp1d]:
df = pd.read_csv(map_path)
x = df["code"].map(lambda v: int(v, 16)).values
y = df["voltage"].values
return interp1d(x, y, kind="linear", fill_value="extrapolate"), interp1d(y, x, kind="linear", fill_value="extrapolate")
_voltage_map_dir = os.path.join(os.path.dirname(importlib.util.find_spec("cracknuts").origin), "glitch", "voltage_map")
_vcc_interp_func_cv, _vcc_interp_func_vc = _build_interp_func(os.path.join(_voltage_map_dir, "vcc.csv"))
_gnd_interp_func_cv, _gnd_interp_func_vc = _build_interp_func(os.path.join(_voltage_map_dir, "gnd.csv"))
_clk_interp_func_cv, _clk_interp_func_vc = _build_interp_func(os.path.join(_voltage_map_dir, "clk.csv"))
SYS_CLK_PERIOD_S = 6.25e-9 # g1 sys clock period in seconds
def get_clock_from_wave_length(wave_len: int) -> float:
return 1 / (wave_len * SYS_CLK_PERIOD_S) / 1000
[docs]
class CrackerG1(CrackerS1):
[docs]
def __init__(
self,
address: tuple | str | None = None,
bin_server_path: str | None = None,
bin_bitstream_path: str | None = None,
operator_port: int = None,
):
super().__init__(address, bin_server_path, bin_bitstream_path, operator_port)
self._config: ConfigG1 = self._config
@staticmethod
def _get_dac_code_from_voltage(voltage: float, interp_func: interp1d) -> int:
dac_code = int(round(interp_func(voltage * 1000).item()))
return 0 if dac_code < 0 else 0x03FF if dac_code > 0x03FF else dac_code
@staticmethod
def _get_voltage_from_dac_code(dac_code: int, interp_func: interp1d) -> float:
voltage = interp_func(dac_code).item() / 1000
return voltage
[docs]
def glitch_vcc_arm(self):
"""
设置glitch VCC 为armed状态。
"""
self._glitch_vcc_arm(True)
# def glitch_vcc_disable(self):
# self._glitch_vcc_enable(False)
def _glitch_vcc_arm(self, enable: bool):
payload = struct.pack(">?", enable)
self._logger.debug(f"glitch_vcc_enable payload: {payload.hex()}")
status, res = self.send_with_command(CommandG1.GLITCH_VCC_ARM, payload=payload)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def glitch_vcc_config(self, wait: int, level: int | float | str, length: int, delay: int, repeat: int):
"""
配置glitch ::
v_normal────┬───────────────────┐ ┌────────────┐ ┌───────
| wait | count | delay | count |
| | | | |
| └───────┘ └───────┘ <--------- level voltage
| └────────────────────┘
Trigger repeat
:param wait: glitch产生前等待时间(时钟个数)
:type wait: int
:param level: Glitch DAC电压值
:type level: int
:param length: Glitch持续时间, 单位 10ns
:type delay: int
:param repeat: Glitch重复次数
:type repeat: int
:return: Cracker设备响应状态和接收到的数据:(status, response)。
:rtype: tuple[int, bytes | None]
"""
level = self._parse_voltage(level)
level_code = self._get_dac_code_from_voltage(level, _vcc_interp_func_vc)
payload = struct.pack(">IIIII", wait, level_code, length, delay, repeat)
self._logger.debug(f"glitch_vcc_config payload: {payload.hex()}")
status, res = self.send_with_command(CommandG1.GLITCH_VCC_CONFIG, payload=payload)
if status == protocol.STATUS_OK:
self._config.glitch_vcc_config_wait = wait
self._config.glitch_vcc_config_level = level
self._config.glitch_vcc_config_count = length
self._config.glitch_vcc_config_delay = delay
self._config.glitch_vcc_config_repeat = repeat
return status, res
[docs]
def glitch_vcc_reset(self):
"""
重置glitch VCC配置。
"""
self._logger.debug(f"glitch_vcc_reset payload: {None}")
status, res = self.send_with_command(CommandG1.GLITCH_VCC_RESET, payload=None)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def glitch_vcc_force(self):
"""
强制触发glitch VCC。
"""
self._logger.debug(f"glitch_vcc_force payload: {None}")
status, res = self.send_with_command(CommandG1.GLITCH_VCC_FORCE, payload=None)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def nut_voltage(self, voltage: float | str | int) -> tuple[int, None]:
return self.glitch_vcc_normal(voltage)
[docs]
def glitch_vcc_normal(self, voltage: float | str | int) -> tuple[int, None]:
"""
设置glitch VCC正常电压值。
"""
voltage = self._parse_voltage(voltage)
dac_code = self._get_dac_code_from_voltage(voltage, _vcc_interp_func_vc)
payload = struct.pack(">I", dac_code)
self._logger.debug(f"glitch_vcc_normal payload: {payload}")
status, res = self.send_with_command(CommandG1.GLITCH_VCC_NORMAL, payload=payload)
if status == protocol.STATUS_OK:
self._config.glitch_vcc_normal = voltage
self._config.nut_voltage = voltage
return status, res
def _parse_voltage(self, voltage: float | str | int):
if isinstance(voltage, str):
m = re.match(r"^(\d+(?:\.\d+)?)([mM]?[vV])?$", voltage)
if not m:
self._logger.error(
"Input format error: " "it should be a number or a string and end with a unit in V or mV."
)
return self.NON_PROTOCOL_ERROR, None
voltage = float(m.group(1))
if m.group(2):
unit = m.group(2)
else:
unit = "v"
if unit.lower() == "mv":
voltage = voltage / 1000
else:
voltage = float(voltage)
return voltage
[docs]
def glitch_gnd_arm(self):
self._glitch_gnd_arm(True)
# def glitch_gnd_disable(self):
# self._glitch_gnd_arm(False)
def _glitch_gnd_arm(self, enable: bool):
payload = struct.pack(">?", enable)
self._logger.debug(f"glitch_gnd_enable payload: {payload.hex()}")
status, res = self.send_with_command(CommandG1.GLITCH_GND_ARM, payload=payload)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def glitch_gnd_config(self, wait: int, level: int, length: int, delay: int, repeat: int):
level = self._parse_voltage(level)
level_code = self._get_dac_code_from_voltage(level, _gnd_interp_func_vc)
payload = struct.pack(">IIIII", wait, level_code, length, delay, repeat)
self._logger.debug(f"glitch_gnd_config payload: {payload.hex()}")
status, res = self.send_with_command(CommandG1.GLITCH_GND_CONFIG, payload=payload)
if status == protocol.STATUS_OK:
self._config.glitch_gnd_config_wait = wait
self._config.glitch_gnd_config_level = level
self._config.glitch_gnd_config_count = length
self._config.glitch_gnd_config_delay = delay
self._config.glitch_gnd_config_repeat = repeat
return status, res
[docs]
def glitch_gnd_reset(self):
self._logger.debug(f"glitch_vcc_reset payload: {None}")
status, res = self.send_with_command(CommandG1.GLITCH_GND_RESET, payload=None)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def glitch_gnd_force(self):
self._logger.debug(f"glitch_vcc_force payload: {None}")
status, res = self.send_with_command(CommandG1.GLITCH_GND_FORCE, payload=None)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
[docs]
def glitch_gnd_normal(self, voltage: float | str | int) -> tuple[int, None]:
dac_code = self._get_dac_code_from_voltage(self._parse_voltage(voltage), _gnd_interp_func_vc)
payload = struct.pack(">I", dac_code)
self._logger.debug(f"glitch_gnd_normal payload: {payload}")
status, res = self.send_with_command(CommandG1.GLITCH_GND_NORMAL, payload=payload)
if status != protocol.STATUS_OK:
return status, None
else:
return status, res
def _get_config_bytes_format(self) -> tuple[dict[str, str], ConfigBasic]:
bytes_format, config = super()._get_config_bytes_format()
bytes_format.update(
{
"glitch_vcc_arm": "?",
"glitch_vcc_normal": "I",
"glitch_vcc_config_wait": "I",
"glitch_vcc_config_level": "I",
"glitch_vcc_config_count": "I",
"glitch_vcc_config_delay": "I",
"glitch_vcc_config_repeat": "I",
"glitch_gnd_arm": "?",
"glitch_gnd_normal": "I",
"glitch_gnd_config_wait": "I",
"glitch_gnd_config_level": "I",
"glitch_gnd_config_count": "I",
"glitch_gnd_config_delay": "I",
"glitch_gnd_config_repeat": "I",
"glitch_clock_arm": "?",
"glitch_clock_len_normal": "I",
"glitch_clock_wave_normal_fake": "I", # todo 这里无效,后续在server端删除
"glitch_clock_config_len_glitch": "I",
"glitch_clock_config_wave_glitch_fake": "I", # todo 这里无效,后续在server端删除
"glitch_clock_config_wait": "I", # todo 这里临时注释,后续在server端添加
"glitch_clock_config_delay": "I",
"glitch_clock_config_repeat": "I",
# "glitch_clock_enable": "?", # todo 这里临时注释,后续在server端添加
}
)
config = ConfigG1()
return bytes_format, config
def _parse_config_special_case(self, k, v):
if k in ("glitch_vcc_normal", "glitch_vcc_config_level"):
v = round(self._get_voltage_from_dac_code(v, _vcc_interp_func_cv), 1)
elif k in ("glitch_gnd_normal", "glitch_gnd_config_level"):
v = round(self._get_voltage_from_dac_code(v, _gnd_interp_func_cv), 1)
return v
[docs]
@connection_status_check
def get_current_config(self) -> ConfigG1 | None:
cur_cfg = typing.cast(ConfigG1, super().get_current_config())
# vcc
cur_cfg.nut_voltage = cur_cfg.glitch_vcc_normal
# clk normal
clk_rate, clk_wave = self._get_clock_from_wave_len(cur_cfg.glitch_clock_len_normal)
cur_cfg.nut_clock = round(clk_rate)
cur_cfg.glitch_clock_wave_normal = clk_wave
# clk normal enable
cur_cfg.nut_clock_enable = self._get_cracker_nut_clock_enabled()
cur_cfg.glitch_clock_enable = cur_cfg.nut_clock_enable
# clk glitch
glitch_clk_rate, glitch_clk_wave = self._get_clock_from_wave_len(cur_cfg.glitch_clock_config_len_glitch)
cur_cfg.glitch_clock_config_wave_glitch = glitch_clk_wave
return cur_cfg
# def _get_glitch_normal_clock(self) -> tuple[float, list[float]]:
# s, r = self.register_read(base_address=0x43c10000, offset=0x1D0)
#
# if s != protocol.STATUS_OK:
# return 0.0, []
# try:
# wave_len = struct.unpack('>I', r)[0]
# return self._get_clock_from_wave_len(wave_len)
# except struct.error as e:
# self._logger.error(f"Unpack nut clock failed: {e}")
# return 0.0, []
#
# def _get_glitch_clock(self) -> tuple[float, list[float]]:
# s, r = self.register_read(base_address=0x43c10000, offset=0x1DC)
#
# if s != protocol.STATUS_OK:
# return 0.0, []
# try:
# wave_len = struct.unpack('>I', r)[0]
# return self._get_clock_from_wave_len(wave_len)
# except struct.error as e:
# self._logger.error(f"Unpack nut clock failed: {e}")
# return 0.0, []
@staticmethod
def _get_clock_from_wave_len(wave_len: int) -> tuple[float, list[float]]:
"""
获取cracker当前nut时钟频率,单位kHz。
:return: nut时钟频率,单位kHz
:rtype: float
"""
# 这里在cracker中只能拿到寄存器中的波形长度信息,没有波形信息,所有只根据对半将波形分为高低电平来计算频率
return get_clock_from_wave_length(wave_len), [3.2 if i < wave_len / 2 else 0.0 for i in range(wave_len)]
def _get_cracker_nut_vcc_voltage(self) -> float:
"""
获取cracker当前nut电压,单位V。
:return: nut电压,单位V
:rtype: float
"""
s, r = self.register_read(base_address=0x43c10000, offset=0x150)
if s != protocol.STATUS_OK:
return 0.0
try:
dac_code = struct.unpack('>I', r)[0]
return self._get_voltage_from_dac_code(dac_code, _vcc_interp_func_cv)
except struct.error as e:
self._logger.error(f"Unpack nut voltage failed: {e}")
return 0.0
def _get_cracker_nut_gnd_voltage(self) -> float:
"""
获取cracker当前nut电压,单位V。
:return: nut电压,单位V
:rtype: float
"""
s, r = self.register_read(base_address=0x43c10000, offset=0x190)
if s != protocol.STATUS_OK:
return 0.0
try:
dac_code = struct.unpack('>I', r)[0]
return self._get_voltage_from_dac_code(dac_code, _vcc_interp_func_cv)
except struct.error as e:
self._logger.error(f"Unpack nut voltage failed: {e}")
return 0.0
def _get_cracker_nut_clock_enabled(self) -> bool:
"""
获取cracker当前nut时钟是否enabled。
:return: nut时钟是否enabled
:rtype: bool
"""
s, r = self.register_read(base_address=0x43c10000, offset=0x1C4)
if s != protocol.STATUS_OK:
return False
try:
enabled = struct.unpack('>I', r)[0]
return enabled == 0
except struct.error as e:
self._logger.error(f"Unpack nut clock enabled failed: {e}")
return False
[docs]
def get_default_config(self) -> ConfigS1:
return ConfigG1()
[docs]
def write_config_to_cracker(self, config: ConfigG1):
self._osc_set_analog_all_channel_enable({0: config.osc_channel_0_enable, 1: config.osc_channel_1_enable})
self.osc_analog_gain(0, config.osc_channel_0_gain)
self.osc_analog_gain(1, config.osc_channel_1_gain)
self.osc_sample_length(config.osc_sample_length)
self.osc_sample_delay(config.osc_sample_delay)
self.osc_sample_clock(config.osc_sample_clock)
self.osc_sample_clock_phase(config.osc_sample_phase)
self.osc_trigger_source(config.osc_trigger_source)
self.osc_trigger_mode(config.osc_trigger_mode)
self.osc_trigger_edge(config.osc_trigger_edge)
self.osc_trigger_level(config.osc_trigger_edge_level)
self.spi_config(
config.nut_spi_speed,
config.nut_spi_cpol,
config.nut_spi_cpha,
config.nut_spi_csn_auto,
config.nut_spi_csn_delay,
)
self._spi_enable(config.nut_spi_enable)
self.uart_config(
config.nut_uart_baudrate,
config.nut_uart_bytesize,
config.nut_uart_parity,
config.nut_uart_stopbits,
)
self._uart_io_enable(config.nut_uart_enable)
self.i2c_config(config.nut_i2c_dev_addr, config.nut_i2c_speed)
self._i2c_enable(config.nut_i2c_enable)
## Clock glitching settings ...
# 要先设置时钟(或者说是 glitch clock normal )
# 然后设置电压(或者说是 glitch vcc normal)因为nut需要先有时钟后上电才能正常工作
self.glitch_clock_normal(config.glitch_clock_wave_normal)
self.glitch_clock_config(
config.glitch_clock_config_wave_glitch,
config.glitch_clock_config_wait,
config.glitch_clock_config_repeat,
config.glitch_clock_config_delay
)
self._nut_set_clock_enable(config.nut_clock_enable)
self.glitch_vcc_normal(config.glitch_vcc_normal)
self.glitch_vcc_config(
config.glitch_gnd_config_wait,
config.glitch_vcc_config_level,
config.glitch_vcc_config_count,
config.glitch_vcc_config_delay,
config.glitch_vcc_config_repeat,
)
self.glitch_gnd_normal(config.glitch_gnd_normal)
self.glitch_gnd_config(
config.glitch_gnd_config_wait,
config.glitch_gnd_config_level,
config.glitch_gnd_config_count,
config.glitch_gnd_config_delay,
config.glitch_gnd_config_repeat,
)
self._nut_set_enable(config.nut_enable)
# def set_glitch_test_params(self, param):
# self._glitch_test_params = param
#
# def get_glitch_test_params(self):
# return self._glitch_test_params
[docs]
def glitch_clock_normal(self, wave: list[float]) -> tuple[int, None|bytes]:
status, res = self.register_write(base_address=0x43c10000, offset=0x1D0, data=len(wave))
if status != protocol.STATUS_OK:
return status, res
for voltage in wave:
status, res = self.register_write(base_address=0x43c10000, offset=0x1D4, data=self._get_dac_code_from_voltage(voltage, _clk_interp_func_vc))
if status != protocol.STATUS_OK:
return status, res
self._config.glitch_clock_wave_normal = wave
return protocol.STATUS_OK, None
def _glitch_clock_normal_enable(self, enable: bool):
return self.register_write(base_address=0x43c10000, offset=0x1C4, data=0 if enable else 1)
[docs]
def glitch_clock_normal_enable(self):
return self._glitch_clock_normal_enable(True)
[docs]
def glitch_clock_normal_disable(self):
return self._glitch_clock_normal_enable(False)
[docs]
def glitch_clock_config(self, wave: list[float], wait: int, delay: int, repeat: int) -> tuple[int, None|bytes]:
status, res = self.register_write(base_address=0x43c10000, offset=0x1DC, data=len(wave))
if status != protocol.STATUS_OK:
return status, res
for voltage in wave:
status, res = self.register_write(base_address=0x43c10000, offset=0x1E0, data=self._get_dac_code_from_voltage(voltage, _clk_interp_func_vc))
if status != protocol.STATUS_OK:
return status, res
status, res = self.register_write(base_address=0x43c10000, offset=0x1D8, data=wait)
if status != protocol.STATUS_OK:
return status, res
self.register_write(base_address=0x43c10000, offset=0x1E4, data=delay)
if status != protocol.STATUS_OK:
return status, res
self.register_write(base_address=0x43c10000, offset=0x1E8, data=repeat)
if status != protocol.STATUS_OK:
return status, res
self._config.glitch_clock_config_wave_glitch = wave
self._config.glitch_clock_config_len_glitch = len(wave)
self._config.glitch_clock_config_wait = wait
self._config.glitch_clock_config_repeat = repeat
self._config.glitch_clock_config_delay = delay
return protocol.STATUS_OK, None
[docs]
def glitch_clock_arm(self):
return self.register_write(base_address=0x43c10000, offset=0X1C8, data=1)
[docs]
def glitch_clock_reset(self):
return self._glitch_clock_normal_enable(False)
[docs]
def glitch_clock_force(self):
return self.register_write(base_address=0x43c10000, offset=0x1CC, data=1)
def _nut_set_clock_enable(self, enable: bool) -> tuple[int, None]:
status, res = self._glitch_clock_normal_enable(enable)
if status == protocol.STATUS_OK:
self._config.nut_clock_enable = enable
return status, res
[docs]
def nut_clock_freq(self, clock: int | str) -> tuple[int, None]:
"""
Set nut clock.
:param clock: Clock frequency in kHz or string with unit 'M', e.g., '8M' for 8000 kHz
:type clock: int | str
:return: The device response status
:rtype: tuple[int, None]
"""
if isinstance(clock, str):
m = re.match(r"^(\d+)[mM]$", clock)
if m:
clock = int(m.group(1)) * 1000
else:
self._logger.error("Input format error: it should be an integer or a string end with 'M'.")
return protocol.STATUS_ERROR, None
if clock == 80000:
wave = wave_80m
elif clock == 40000:
wave = wave_40m
elif clock == 20000:
wave = wave_20m
elif clock == 16000:
wave = wave_16m
elif clock == 10000:
wave = wave_10m
elif clock == 8000:
wave = wave_8m
elif clock == 4000:
wave = wave_4m
else:
self._logger.error(f"Unknown clock type: {clock}, only support 80M, 40M, 20M, 16M, 10M, 8M, 4M")
return protocol.STATUS_ERROR, None
status, res = self.glitch_clock_normal(wave)
if status == protocol.STATUS_OK:
self._config.nut_clock = clock
return status, res