Source code for cracknuts.cracker.cracker_basic

# Copyright 2024 CrackNuts. All rights reserved.

import abc
import functools
import importlib.util
import json
import logging
import os
import socket
import struct
import threading
from abc import ABC
from dataclasses import dataclass
from enum import Enum

import numpy as np

import cracknuts.utils.hex_util as hex_util
from cracknuts import logger
from cracknuts.cracker import protocol
from cracknuts.cracker.cracker_manager import CrackerManager
from cracknuts.cracker.ssh_client import SSHClient, _SSHCracker


[docs] class ConfigBasic:
[docs] def __init__(self): self.osc_channel_0_enable = False self.osc_channel_1_enable = True self.osc_channel_0_gain = 10 self.osc_channel_1_gain = 10 self.osc_sample_length = 1024 self.osc_sample_delay = 0 self.osc_sample_clock = 48000 self.osc_sample_phase = 0 self.osc_trigger_source = 0 self.osc_trigger_mode = 0 self.osc_trigger_edge = 0 self.osc_trigger_edge_level = 1
def __str__(self): return f"Config({', '.join([f'{k}: {v}' for k, v in self.__dict__.items() if not k.startswith('_')])})" def __repr__(self): return self.__str__()
[docs] def dump_to_json(self) -> str: """ Dump the configuration to a JSON string. :return: JSON string representation of the configuration. :rtype: str """ def enum_converter(obj): if isinstance(obj, Enum): return obj.value raise TypeError(f"Type {type(obj)} not serializable") return json.dumps( {k: v for k, v in self.__dict__.items() if not k.startswith("_")}, indent=4, default=enum_converter )
[docs] def load_from_json(self, json_str: str) -> "ConfigBasic": """ Load configuration from a JSON string. If a value in the JSON string is null, it will be skipped and the default configuration will be used. :param json_str: JSON string to load configuration from. :type json_str: str :return: This configuration instance with values updated from the JSON string. :rtype: ConfigBasic """ for k, v in json.loads(json_str).items(): if v is not None: self.__dict__[k] = v return self
# === Since the device does not support the channel enable function, # the information is temporarily saved to the host software. === @dataclass class _ChannelConfig: osc_channel_0_enable: bool = False osc_channel_1_enable: bool = True # === end ===
[docs] def connection_status_check(func): """ This is a decorator to check the connection status of the cracker device. user should use this directly. """ @functools.wraps(func) def wrapper(self: "CrackerBasic", *args, **kwargs): if not self._connection_status: print("Error: Cracker not connected") sig_result = func.__annotations__.get("return", None) if sig_result is tuple: return self.DISCONNECTED, None else: return None return func(self, *args, **kwargs) return wrapper
[docs] class CrackerBasic[T: ConfigBasic](ABC): NON_PROTOCOL_ERROR = -1 DISCONNECTED = -2 _cracker_manager: CrackerManager | None = None _BASE_ADDRESS = 0x43C10000 _OFFSET_GPIO_DATA = 0x0C00 _OFFSET_GPIO_DIR = 0x0C04 """ The basic device class, provides support for the `CNP` protocol, configuration management, firmware maintenance, and other basic operations. """
[docs] def __init__( self, address: tuple | str | None = None, bin_server_path: str | None = None, bin_bitstream_path: str | None = None, ): """ :param address: Cracker device address (ip, port) or "[cnp://]<ip>[:port]", If no configuration is provided here, it needs to be configured later by calling `set_address`, `set_ip_port`, or `set_uri`. :type address: str | tuple | None :param bin_server_path: The bin_server (firmware) file for updates; normally, the user should not specify this. :type bin_server_path: str | None :param bin_bitstream_path: The bin_bitstream (firmware) file for updates; normally, the user should not specify this. :type bin_bitstream_path: str | None """ self._logger_debug_payload_max_length = 512 self._logger_info_payload_max_length = 16 self._command_lock = threading.Lock() self._logger = logger.get_logger(self) self._socket: socket.socket | None = None self._connection_status = False self._bin_server_path = bin_server_path self._bin_bitstream_path = bin_bitstream_path self._server_address: tuple[str, int] | None = None self._ssh_cracker: _SSHCracker | None = None self.set_address(address) self._config = self.get_default_config() self._hardware_model = None self._installed_bin_server_path = None self._installed_bin_bitstream_path = None # === Since the device does not support the channel enable function, # the information is temporarily saved to the host software. === self._channel_enable = _ChannelConfig() # === end === # Cracker only supports sampling length in multiples of 1024, # record actual length to truncate waveform data later. self._osc_sample_length: int | None = None
@property def shell(self) -> SSHClient | None: """ SSH connection to the device. Use this to run commands or upload files on the device without configuring credentials yourself. Only generic SSH operations (``exec``, ``upload``, etc.) are exposed through this interface. """ return self._ssh_cracker
[docs] def change_ip(self, new_ip: str, new_mask: str = None, new_gateway: str = None) -> bool: """ Change the network address of the current target device. This instance method uses the IP currently configured on this object as the target device IP, then delegates the network change to the shared ``CrackerManager`` instance. If the device is currently connected and the IP change succeeds, the object updates its own target address and reconnects when possible. :param new_ip: New device IP address, or ``"dhcp"`` to enable DHCP. :type new_ip: str :param new_mask: Subnet mask or prefix length for a static IP change. If omitted, the device's current mask is reused. :type new_mask: str | None :param new_gateway: Default gateway for a static IP change. If omitted, the device's current gateway is reused. :type new_gateway: str | None :return: ``True`` if the device accepted the change request, otherwise ``False``. :rtype: bool """ if self._server_address is None: self._logger.error("Change ip failed: cracker address is not configured.") return False target_ip = self._server_address[0] was_connected = self._connection_status if was_connected: self.disconnect() ack = type(self).set_device_ip( target_ip=target_ip, new_ip=new_ip, mask=new_mask, gateway=new_gateway, ) if ack is None or ack.get("status") != 0: if was_connected: self.connect() return False if new_ip != "dhcp": self.set_ip_port(new_ip, self._server_address[1]) if was_connected: self.connect() else: self._logger.info("Device switched to DHCP. Re-discover the device before reconnecting.") return True
@classmethod def _create_device_manager( cls, broadcast: str = "255.255.255.255", scan_interval: float = 3.0, offline_grace: float = 15.0, device_timeout: float = 60.0, continuous: bool = False, ) -> CrackerManager: """ Return the shared ``CrackerManager`` instance for this class. The manager is created lazily on first use and reused by subsequent class-level device discovery and IP update calls. Each call refreshes the cached manager configuration to match the provided arguments. :param broadcast: Broadcast address for discovery. :type broadcast: str :param scan_interval: Seconds between discovery scans. :type scan_interval: float :param offline_grace: Seconds without response before marking offline. :type offline_grace: float :param device_timeout: Seconds without response before removing a device. :type device_timeout: float :param continuous: Whether the shared manager should keep background scanning enabled. :type continuous: bool :return: Shared manager instance. :rtype: CrackerManager """ if cls._cracker_manager is None: cls._cracker_manager = CrackerManager( broadcast=broadcast, scan_interval=scan_interval, offline_grace=offline_grace, device_timeout=device_timeout, continuous=continuous, ) else: cls._cracker_manager._broadcast = broadcast cls._cracker_manager._scan_interval = scan_interval cls._cracker_manager._offline_grace = offline_grace cls._cracker_manager._device_timeout = device_timeout if continuous and not cls._cracker_manager._running: cls._cracker_manager.start_discovery() elif not continuous and cls._cracker_manager._running: cls._cracker_manager.stop_discovery() cls._cracker_manager._continuous = continuous return cls._cracker_manager
[docs] @classmethod def discover_devices( cls, broadcast: str = "255.255.255.255", scan_interval: float = 3.0, offline_grace: float = 15.0, device_timeout: float = 60.0, ) -> list[dict]: """ Discover devices on the local network once. This class method reuses the shared ``CrackerManager`` instance and performs a single discovery scan without starting continuous background monitoring. :param broadcast: Broadcast address for discovery. :type broadcast: str :param scan_interval: Cached scan interval for the shared manager. :type scan_interval: float :param offline_grace: Cached offline grace period for the shared manager. :type offline_grace: float :param device_timeout: Cached device timeout for the shared manager. :type device_timeout: float :return: List of discovered device information dictionaries. :rtype: list[dict] """ manager = cls._create_device_manager( broadcast=broadcast, scan_interval=scan_interval, offline_grace=offline_grace, device_timeout=device_timeout, continuous=False, ) return manager.discover_once()
[docs] @classmethod def set_device_ip( cls, target_ip: str, new_ip: str, mask: str = "", gateway: str = "", delay_ms: int = 200, broadcast: str = "255.255.255.255", ) -> dict | None: """ Change the network address of a device identified by its current IP. This class method reuses the shared ``CrackerManager`` instance and sends a single IP change request to the target device. When ``mask`` and ``gateway`` are omitted the device-side manager will apply its own defaults. :param target_ip: Current IP address of the target device. :type target_ip: str :param new_ip: New device IP address, or ``"dhcp"`` to enable DHCP. :type new_ip: str :param mask: Subnet mask. If omitted, the device-side manager handles it. :type mask: str :param gateway: Default gateway. If omitted, the device-side manager handles it. :type gateway: str :param delay_ms: Delay before the device applies the change. :type delay_ms: int :param broadcast: Broadcast address cached on the shared manager. :type broadcast: str :return: ACK information from the device, or ``None`` on timeout. :rtype: dict | None """ manager = cls._create_device_manager( broadcast=broadcast, continuous=False, ) return manager.set_ip( target_ip=target_ip, new_ip=new_ip, mask=mask, gateway=gateway, delay_ms=delay_ms, )
def _sync_inner_obj_ip(self) -> None: """ Synchronize inner helper objects to the current target IP. This updates the cached ``SSHCracker`` instance so it points at the address currently stored in ``self._server_address``. """ if self._server_address is None: self._ssh_cracker = None return if self._ssh_cracker is None or self._ssh_cracker.ip != self._server_address[0]: self._ssh_cracker = _SSHCracker(ip=self._server_address[0])
[docs] def set_address(self, address: tuple[str, int] | str | None) -> None: """ Update the target cracker address stored in this instance. This method updates the local target address metadata used by this object and synchronizes the inner ``SSHCracker`` target IP settings. It does not communicate with the device, so it does not modify the real device network configuration. :param address: Device address as ``(ip, port)``, URI-like string, or ``None`` to clear the current target. :type address: tuple[str, int] | str | None :return: None """ if isinstance(address, tuple): self._server_address = address self._sync_inner_obj_ip() elif address is None: self._server_address = None self._sync_inner_obj_ip() elif isinstance(address, str): self.set_uri(address)
[docs] def get_address(self) -> tuple[str, int] | None: """ Get the device address in tuple format. :return: address in tuple format: (ip, port). :rtype: tuple[str, int] """ return self._server_address
[docs] def set_ip_port(self, ip, port) -> None: """ Update the target cracker IP and port stored in this instance. This method updates the local target address used by subsequent operations and synchronizes the inner ``SSHCracker`` object to the same target. It does not modify the real device network configuration. :param ip: IP address. :type ip: str :param port: Port. :type port: int :return: None """ self._server_address = ip, port self._sync_inner_obj_ip()
[docs] def set_uri(self, uri: str) -> None: """ Update the target cracker address stored in this instance from a URI. This method parses and stores the local target address, then synchronizes the inner ``SSHCracker`` object to that target. It does not communicate with the device and does not modify the device IP. :param uri: Device URI in the form ``cnp://<ip>[:port]`` or ``<ip>[:port]``. :type uri: str :return: None """ if not uri.startswith("cnp://") and uri.count(":") < 2: uri = "cnp://" + uri uri = uri.replace("cnp://", "", 1) if ":" in uri: host, port = uri.split(":") else: host, port = uri, protocol.DEFAULT_PORT # type: ignore self._server_address = host, int(port) self._sync_inner_obj_ip()
[docs] @connection_status_check def set_logging_level(self, level: str) -> None: """ Set the Cracker OS logging level. :param level: The logging level: ``debug``, ``info``, ``warning``, or ``error``. :type level: str :return: None """ if level.lower() not in ("debug", "info", "warning", "error"): self._logger.error("Invalid logging level.") self.send_and_receive( protocol.build_send_message(protocol.Command.SET_LOGGING_LEVEL, payload=level.encode("ascii")) )
[docs] def get_uri(self) -> str | None: """ Get the device address in URI format. :return: URI. if cracker address is not specified, None is returned. :rtype: str | None """ if self._server_address is None: return None else: port = self._server_address[1] if port == protocol.DEFAULT_PORT: port = None return f"cnp://{self._server_address[0]}{'' if port is None else f':{port}'}"
[docs] def connect( self, update_bin: bool = True, force_update_bin: bool = False, bin_server_path: str | None = None, bin_bitstream_path: str | None = None, force_write_default_config: bool = False, ) -> None: """ Connect to cracker device. :param update_bin: Whether to update the firmware. :type update_bin: bool :param force_update_bin: Whether to force update the firmware while the device is running normally (by default, firmware updates are not performed when the device is running normally). :type force_update_bin: bool :param bin_server_path: The bin_server (firmware) file for updates. :type bin_server_path: str | None :param bin_bitstream_path: The bin_bitstream (firmware) file for updates. :type bin_bitstream_path: str | None :param force_write_default_config: Whether to force update the configuration while the device is running normally (by default, configuration updates are only performed when updating the firmware). :type force_write_default_config: bool :return: None """ self._sync_inner_obj_ip() if bin_server_path is None: bin_server_path = self._bin_server_path if bin_bitstream_path is None: bin_bitstream_path = self._bin_bitstream_path bin_updated = False if update_bin: success, bin_updated = self._update_cracker_bin(force_update_bin, bin_server_path, bin_bitstream_path) if not success: self._logger.error("Failed to update cracker firmware.") return if force_update_bin and self._socket and self._connection_status: # Reset the connection if forcing a bin update when it was previously connected. self._socket = None self._connection_status = False if self._socket and self._connection_status: self._logger.debug("Already connected, reuse.") return try: if not self._socket: self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout( 60 ) # todo 由于在IO通信时,delay的存在,可能超过这个超时时间,这里暂时设置为60秒的超时 self._socket.connect(self._server_address) self._connection_status = True self._logger.info(f"Connected to cracker: {self._server_address}") if (update_bin and bin_updated) or force_write_default_config: # Sync the default configuration to the cracker when updating its firmware. self.write_config_to_cracker(self.get_default_config()) self._logger.info("Write default configuration to Cracker.") self._config = self.get_current_config() self._logger.info("Synchronize the configuration from Cracker.") self._sync_inner_obj_ip() if self._ssh_cracker is not None: try: if not self._ssh_cracker.is_connected(): self._ssh_cracker.connect() self._logger.info(f"Connected to SSH cracker: {self._ssh_cracker.ip}") except Exception as e: self._logger.warning(f"Failed to connect SSH cracker: {e}") except OSError as e: self._logger.error("Connection failed: %s", e) self._socket = None self._connection_status = False
def _update_cracker_bin( self, force_update: bool = False, bin_server_path: str | None = None, bin_bitstream_path: str | None = None, ) -> tuple[bool, bool]: """ Update cracker's firmwares: server.bin and bitstream.bin. :param force_update: Whether to force update the firmware if the device already has the firmware installed. :type force_update: bool :param bin_server_path: The bin_server file path for updates, will use the embedded firmware if not specified. :type bin_server_path: str | None :param bin_bitstream_path: The bin_bitstream file path for updates, will use the embedded firmware if not specified. :type bin_bitstream_path: str | None """ if self._ssh_cracker is None: self._logger.error("Update cracker bin failed: SSH client is not initialized.") return False, False try: self._ssh_cracker._ensure_connected() except Exception as e: self._logger.error(f"Update cracker bin failed: SSH connection error: {e}") return False, False if not force_update and self._ssh_cracker.get_server_status(): return True, False self._hardware_model = self._ssh_cracker.get_hardware_model() if bin_server_path is None or bin_bitstream_path is None: if self._hardware_model is None or self._hardware_model == "unknown": self._logger.error( "The hardware model is unknown, and the Cracker bin cannot be updated. Alternatively, " "you can specify the bin_server_path and bin_bitstream_path in the connect API." ) return False, False _bin_server_path, _bin_bitstream_path = self._get_bin_file_path(self._hardware_model) if bin_server_path is None: bin_server_path = _bin_server_path if bin_bitstream_path is None: bin_bitstream_path = _bin_bitstream_path if bin_server_path is None: self._logger.error(f"Can't find bin_server file for hardware model: {self._hardware_model}.") return False, False if bin_bitstream_path is None: self._logger.error(f"Can't find bin_bitstream file for hardware model: {self._hardware_model}.") return False, False try: success = ( self._ssh_cracker.update_server(bin_server_path) and self._ssh_cracker.update_bitstream(bin_bitstream_path) and self._ssh_cracker.get_server_status() ) if success: self._installed_bin_server_path = bin_server_path self._installed_bin_bitstream_path = bin_bitstream_path return success, success except OSError as e: self._logger.error(f"Update cracker bin failed: {e.args}") return False, False
[docs] def get_firmware_info(self): if self._installed_bin_server_path is None or self._installed_bin_bitstream_path is None: self._logger.warning("The Cracker has not successfully installed any firmware.") return ( f"hardware model: {self._hardware_model}, " f"bin server: {self._installed_bin_server_path}, " f"bin_bitstream: {self._installed_bin_bitstream_path}" )
def _get_bin_file_path(self, model: str): firmware_path = os.path.join(os.path.dirname(importlib.util.find_spec("cracknuts").origin), "firmware") map_json_path = os.path.join(firmware_path, "map.json") map_json = json.load(open(map_json_path)) if model not in map_json: return None, None bin_server_path = map_json[model]["server"] if bin_server_path is not None: bin_server_path = os.path.join(firmware_path, bin_server_path) bin_bitstream_path = map_json[model]["bitstream"] if bin_bitstream_path is not None: bin_bitstream_path = os.path.join(firmware_path, bin_bitstream_path) if not os.path.exists(bin_server_path) or not os.path.isfile(bin_server_path): self._logger.error(f"Find bin server path: {bin_server_path}, but it is not exist or not a file.") bin_server_path = None if not os.path.exists(bin_bitstream_path) or not os.path.isfile(bin_bitstream_path): self._logger.error(f"Find bin_bitstream path: {bin_bitstream_path}, but it is not exist or not a file.") bin_bitstream_path = None return bin_server_path, bin_bitstream_path
[docs] def disconnect(self) -> None: """ Disconnect from cracker device. :return: None """ if not self._connection_status: return try: if self._socket: self._socket.close() self._logger.info(f"Disconnect from {self._server_address}") except OSError as e: self._logger.error("Disconnection failed: %s", e) finally: self._connection_status = False self._socket = None self._hardware_model = None self._installed_bin_server_path = None self._installed_bin_bitstream_path = None
[docs] def reconnect(self): """ Reconnect to cracker device. :return: None """ self.disconnect() self.connect()
[docs] def get_connection_status(self) -> bool: """ Get connection status. :return: True for connected and False for disconnected. :rtype: bool """ return self._connection_status
[docs] def send_and_receive(self, message: bytes) -> tuple[int, bytes | None]: """ Send message to cracker device. :param message: The byte message to send. :type message: bytes :return: Received message in format: (status, message). :rtype: tuple[int, bytes | None] """ if self._socket is None: self._logger.error("Cracker not connected") return protocol.STATUS_ERROR, None try: self._command_lock.acquire() if not self.get_connection_status(): self._logger.error("Cracker is not connected.") return protocol.STATUS_ERROR, None if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug( f"Send message to {self._server_address}: \n{ hex_util.get_bytes_matrix(message, max_bytes_count=self._logger_debug_payload_max_length) }" ) self._socket.sendall(message) resp_header = self._socket.recv(protocol.RES_HEADER_SIZE) if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug( "Get response header from %s: \n%s", self._server_address, hex_util.get_bytes_matrix(resp_header), ) try: magic, version, direction, status, length = struct.unpack(protocol.RES_HEADER_FORMAT, resp_header) except Exception as e: self._logger.error("Get response header failed: %s", e) self._logger.error(f"The request is {hex_util.get_bytes_matrix(message)}") self._logger.error(f"The header is [{hex_util.get_hex(resp_header)}]") # import traceback # # traceback.print_stack() return protocol.STATUS_ERROR, None if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug( f"Receive header from {self._server_address}: " f"{magic}, {version}, {direction}, 0x{status:04X}, {length}" ) if length == 0: resp_payload = None else: resp_payload = self._recv(length) if status != protocol.STATUS_OK: try: resp_payload_str = resp_payload.decode("utf-8") if resp_payload is not None else "" except UnicodeDecodeError: resp_payload_str = hex_util.get_hex(resp_payload, max_len=len(resp_payload)) req_command, req_payload = protocol.unpack_send_message(message) if req_command is None: self._logger.warning("Request message format error, cannot unpack command.") self._logger.warning( f"Command {f'0x{req_command:04X}' if req_command is not None else 'None'} " f"with payload {hex_util.get_hex(req_payload, max_len=len(req_payload))} " f"received a non-OK response with status code 0x{status:04X}, " f"and payload [{resp_payload_str}]." ) else: if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug( f"Receive payload from {self._server_address}: \n" f"{ hex_util.get_bytes_matrix( resp_payload, max_bytes_count=self._logger_debug_payload_max_length ) }" ) return status, resp_payload except OSError as e: self._logger.error( f"Send message failed: {e}, and msg, command :0x{message[7:9].hex()}" f", payload: {hex_util.get_hex(message[10:])}" ) return protocol.STATUS_ERROR, None finally: self._command_lock.release()
def _recv(self, length): resp_payload = b"" while (received_len := len(resp_payload)) < length: for_receive_len = length - received_len resp_payload += self._socket.recv(for_receive_len) return resp_payload
[docs] def send_with_command( self, command: int, rfu: int = 0, payload: str | bytes | None = None ) -> tuple[int, bytes | None]: if isinstance(payload, str): payload = bytes.fromhex(payload) if self._logger.isEnabledFor(logging.INFO): self._logger.info( f"Send command [0x{command:04x}] with payload: " f"{None if payload is None else hex_util.get_hex(payload, self._logger_info_payload_max_length)}] " f"to {self._server_address}" ) status, payload = self.send_and_receive(protocol.build_send_message(command, rfu, payload)) if self._logger.isEnabledFor(logging.INFO): self._logger.info( f"Receive response for command: [0x{command:04x}] from {self._server_address}, " f"status: 0x{status:04X}, payload: length: {0 if payload is None else len(payload)}, content: " f"{None if payload is None else hex_util.get_hex(payload, self._logger_info_payload_max_length)}" ) return status, payload
[docs] def set_logger_info_payload_max_length(self, length): self._logger_info_payload_max_length = length
[docs] def set_logger_debug_payload_max_length(self, length): self._logger_debug_payload_max_length = length
[docs] @abc.abstractmethod def get_default_config(self) -> T: """ Get the default configuration. This method needs to be implemented by the specific device class, as different devices have different default configurations. :return: The default config object(The specific subclass of CommonConfig). :rtype: ConfigBasic """ ...
[docs] @connection_status_check def get_current_config(self) -> T: """ Get current configuration of `Cracker`. Note: Currently, the configuration returned is recorded on the host computer, not the ACTUAL configuration of the device. In the future, it should be synchronized from the device to the host computer. :return: Current configuration of `Cracker`. :rtype: ConfigBasic """ return self._config
[docs] @connection_status_check def write_config_to_cracker(self, config: T): """ Sync config to cracker. To prevent configuration inconsistencies between the host and the device, so all configuration information needs to be written to the device. User should call this function before get data from device. NOTE: This function is currently ignored and will be resumed after all Cracker functions are completed. """ ...
[docs] @connection_status_check def dump_config(self, path=None) -> str | None: """ Dump the current config to a JSON file if a path is specified, or to a JSON string if no path is specified. :param path: the path to the JSON file :type path: str | None :return: the content of JSON string or None if no path is specified. :rtype: str | None """ config_json = self.get_current_config().dump_to_json() if path is None: return config_json else: with open(path, "w") as f: f.write(config_json)
[docs] @connection_status_check def load_config_from_file(self, path: str) -> None: """ Load config from a JSON file. :param path: the path to the JSON file :type path: str :return: None """ with open(path) as f: content = f.readlines() config_json = json.loads("".join(content)) if "cracker" in config_json: content = config_json["cracker"] self.load_config_from_str(content)
[docs] @connection_status_check def load_config_from_str(self, json_str: str) -> None: """ Load config from a JSON string. :param json_str: the JSON string :type json_str: str :return: None """ self._config.load_from_json(json_str) self.write_config_to_cracker(self._config)
[docs] @connection_status_check def get_id(self) -> tuple[int, str | None]: """ Get the ID of the equipment. :return: The equipment response status code and the ID of the equipment. :rtype: tuple[int, str | None] """ return protocol.STATUS_OK, self._ssh_cracker.get_sn() if self._ssh_cracker else None
[docs] @connection_status_check def get_hardware_model(self) -> tuple[int, str | None]: """ Get the name of the equipment. :return: The equipment response status code and the name of the equipment. :rtype: tuple[int, str | None] """ return protocol.STATUS_OK, self._ssh_cracker.get_hardware_model() if self._ssh_cracker else None
[docs] @connection_status_check def get_bitstream_version(self): return self.send_with_command(protocol.Command.GET_BITSTREAM_VERSION)
[docs] @connection_status_check def get_firmware_version(self) -> tuple[int, str | None]: """ Get the version of the equipment. :return: The equipment response status code and the version of the equipment. :rtype: tuple[int, str | None] """ bitstream_status, bitstream_version = self.get_bitstream_version() server_version = self._ssh_cracker.get_server_version() if self._ssh_cracker else None return ( protocol.STATUS_OK, f"server_version: {server_version}, bitstream_version: {bitstream_version}", )
[docs] @connection_status_check def osc_single(self) -> tuple[int, None]: payload = None self._logger.debug("scrat_sample_len payload: %s", payload) status, res = self.send_with_command(protocol.Command.OSC_SINGLE, payload=payload) return status, None
[docs] @connection_status_check def osc_force(self) -> tuple[int, None]: """ Force produce a wave data. :return: The device response status :rtype: tuple[int, None] """ payload = None self._logger.debug(f"scrat_force payload: {payload}") return self.send_with_command(protocol.Command.OSC_FORCE, payload=payload)
[docs] @connection_status_check def osc_is_triggered(self) -> tuple[int, bool]: payload = None self._logger.debug(f"scrat_is_triggered payload: {payload}") status, res = self.send_with_command(protocol.Command.OSC_IS_TRIGGERED, payload=payload) if status != protocol.STATUS_OK: self._logger.error(f"is_triggered receive status code error [0x{status:04X}]") return status, False else: if res is None: self._logger.error("is_triggered get empty payload.") return status, False else: res_code = int.from_bytes(res, "big") return status, res_code == 4
[docs] def osc_get_wave(self, channel: int | str, offset: int, sample_count: int) -> tuple[int, np.ndarray | None]: return self.osc_get_analog_wave(channel, offset, sample_count)
[docs] @connection_status_check def osc_get_analog_wave(self, channel: int, offset: int, sample_count: int) -> tuple[int, np.ndarray | None]: """ Get the analog wave. :param channel: The channel of the analog wave. It can be either 0, 1, or 'A', 'B'. :type channel: int|str :param offset: the offset of the analog wave. :type offset: int :param sample_count: the sample count of the analog wave. :type sample_count: int :return: the analog wave. :rtype: tuple[int, np.ndarray] """ if isinstance(channel, str): channels = ("A", "B") if channel not in channels: self._logger.error(f"Invalid channel: {channel}. it must be one of {channels}.") return self.NON_PROTOCOL_ERROR, None channel = channels.index(channel) payload = struct.pack(">BII", channel, offset, sample_count) self._logger.debug(f"scrat_get_analog_wave payload: {payload.hex()}") status, wave_bytes = self.send_with_command(protocol.Command.OSC_GET_ANALOG_WAVES, payload=payload) if status != protocol.STATUS_OK: return status, np.array([]) else: if wave_bytes is None: return status, np.array([]) else: wbl = len(wave_bytes) expect_wbl = sample_count * 2 if wbl != expect_wbl: self._logger.error( f"Wave bytes length error: require {expect_wbl} but get {wbl}:\n{hex_util.get_hex(wave_bytes)}" ) if wbl != 0: self._logger.error("Wave bytes length is not expected, will get actually length wave.") if wbl % 2 != 0: self._logger.error("Wave bytes length is a odd number, will get a even length wave.") wave = struct.unpack(f"{wbl // 2}h", wave_bytes) return status, np.array(wave, dtype=np.int16) else: return status, np.array([]) else: wave = struct.unpack(f"{sample_count}h", wave_bytes) return status, np.array(wave, dtype=np.int16)
[docs] def osc_get_digital_wave(self, channel: int, offset: int, sample_count: int) -> tuple[int, np.ndarray]: payload = struct.pack(">BII", channel, offset, sample_count) self._logger.debug(f"scrat_get_digital_wave payload: {payload.hex()}") status, wave_bytes = self.send_with_command(protocol.Command.OSC_GET_ANALOG_WAVES, payload=payload) if status != protocol.STATUS_OK: return status, np.array([]) else: if wave_bytes is None: return status, np.array([]) else: wave = struct.unpack(f"{sample_count}h", wave_bytes) return status, np.array(wave, dtype=np.int16)