Skip to content

🤚 ROBOTERA X-Hand 1

The XHAND1 by ROBOTERA is a full direct-drive humanoid five-finger dexterous hand developed in-house.

🕹️Control Method

The gripper officially uses a 485 interface, which Adam has converted to an Ethernet interface. This allows the gripper to function as a node device within the Adam ecosystem, enabling Ethernet operations such as TCP/UDP more easily.

  • Communication process: TCP/UDP <-> 485 <-> hand

Using the original manufacturer's protocol, Adam only performs protocol forwarding between TCP/UDP and 485
For specific protocol details, refer to the Original Product Manual

🐍Python demo

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import time
import socket
import struct
import signal
import threading
from dataclasses import dataclass
from typing import Optional, List, Dict, Any

# ================== 配置 ==================
DEVICE_PORT = 2562
DEVICES = ["10.10.10.18", "10.10.10.38"]

SRC_ID = 0xFE
DST_ID = 0xFF

# 控制哪个设备:
#   "all" / None  -> 两台都控
#   "10.10.10.18" -> 单控
#   "10.10.10.38" -> 单控
CONTROL_ONLY = "all"

# 固定周期发送(秒)
SEND_PERIOD = 0.02
RT_WAIT_TIMEOUT = 0.05

HEAD = b"\x55\xAA"
MIN_FRAME_LEN = 9  # 2(head)+1+1+1+2(len)+2(crc)

CMD_GET_VERSION = 0x13
CMD_RT = 0x02

RT_EXPECT_PAYLOAD_LEN = 2208

stop_event = threading.Event()

# ================== CRC16 ==================
crc16tab = [
    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
    0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
    0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
    0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
    0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
    0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
    0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
    0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
    0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
    0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
    0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
    0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
    0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
    0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
    0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
    0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
    0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
    0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
    0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
    0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
    0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
    0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
    0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
    0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
    0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
    0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
    0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
    0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
    0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
    0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
    0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
    0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
]

def crc16(buf: bytes) -> int:
    crc = 0x0000
    for b in buf:
        crc = ((crc << 8) & 0xFFFF) ^ crc16tab[((crc >> 8) ^ b) & 0xFF]
    return crc

def pack_frame(src: int, dst: int, cmd: int, payload: bytes = b"") -> bytes:
    frame = bytearray()
    frame += HEAD
    frame += struct.pack("<BBB", src, dst, cmd)
    frame += struct.pack("<H", len(payload))
    frame += payload
    frame += struct.pack("<H", crc16(frame))
    return bytes(frame)

# ================== 拼帧解码器 ==================
class FrameDecoder:
    def __init__(self, crc16_func, max_buf=1024 * 1024):
        self.buf = bytearray()
        self.crc16 = crc16_func
        self.max_buf = max_buf

    def feed(self, data: bytes):
        if not data:
            return []

        self.buf += data
        if len(self.buf) > self.max_buf:
            self.buf = self.buf[-4096:]

        out = []
        while True:
            idx = self.buf.find(HEAD)
            if idx < 0:
                if len(self.buf) > 1:
                    self.buf = self.buf[-1:]
                break

            if idx > 0:
                del self.buf[:idx]

            if len(self.buf) < MIN_FRAME_LEN:
                break

            data_len = struct.unpack_from("<H", self.buf, 5)[0]
            need_len = 7 + data_len + 2
            if len(self.buf) < need_len:
                break

            frame = bytes(self.buf[:need_len])
            del self.buf[:need_len]

            src, dst, cmd = frame[2], frame[3], frame[4]
            payload = frame[7:7 + data_len]
            recv_crc = struct.unpack_from("<H", frame, 7 + data_len)[0]
            calc_crc = self.crc16(frame[:7 + data_len])

            out.append({
                "src": src,
                "dst": dst,
                "cmd": cmd,
                "data_len": data_len,
                "payload": payload,
                "crc_ok": (recv_crc == calc_crc),
            })
        return out

# ================== 版本解析 ==================
def decode_ver_u32(v: int):
    major = (v >> 24) & 0xFF
    minor = (v >> 16) & 0xFF
    rel = v & 0xFFFF
    return major, minor, rel

def parse_versions_payload(payload: bytes) -> dict:
    if len(payload) != 8:
        raise ValueError(f"invalid version payload len: {len(payload)}")
    sw_u32 = struct.unpack_from("<I", payload, 0)[0]
    hw_u32 = struct.unpack_from("<I", payload, 4)[0]
    sw = decode_ver_u32(sw_u32)
    hw = decode_ver_u32(hw_u32)
    return {
        "sw_u32": sw_u32,
        "hw_u32": hw_u32,
        "sw_str": f"{sw[0]}.{sw[1]}.{sw[2]}",
        "hw_str": f"{hw[0]}.{hw[1]}.{hw[2]}",
    }

# ================== RT_REPORT 解析 ==================
JOINT_STATE_SIZE = 24
RT_JOINT_COUNT = 12

def parse_rt_report_2208(payload: bytes) -> dict:
    if len(payload) != RT_EXPECT_PAYLOAD_LEN:
        raise ValueError("rt payload length error")

    joints = []
    off = 0
    for i in range(RT_JOINT_COUNT):
        joint_id, pos, torque, *d = struct.unpack_from("<HfH8H", payload, off)
        joints.append({
            "id": i,
            "pos": pos,
            "torque": torque,
            "defaults": d,
        })
        off += JOINT_STATE_SIZE

    tips = []
    for i in range(5):
        fx, fy, fz = struct.unpack_from("<bbB", payload, off)
        temp_sum = payload[off + 383]
        tips.append({"idx": i, "fx": fx, "fy": fy, "fz": fz, "temp_sum": temp_sum})
        off += 384

    return {"joints": joints, "tips": tips}

# ===== 你要求新增/使用的 dump_rt(保持原样) =====
def dump_rt(rt: dict) -> str:
    lines = ["JOINTS:"]
    for j in rt["joints"]:
        lines.append(f"  J{j['id']:02d} | pos={j['pos']:+7.3f} | tq={j['torque']:5d}")
    return "\n".join(lines)

# ================== RT_CTRL 高效打包 ==================
JOINT_FMT = "<H h h h f H H H H H H"
JOINT_SIZE = struct.calcsize(JOINT_FMT)  # 24
assert JOINT_SIZE == 24

def build_rt_ctrl_template_payload(kp=225, ki=0, kd=12000, mode=3, init_pos=0.0, init_force=0) -> bytearray:
    payload = bytearray(JOINT_SIZE * 12)
    for jid in range(12):
        struct.pack_into(
            JOINT_FMT, payload, jid * JOINT_SIZE,
            jid, kp, ki, kd, float(init_pos), int(init_force), int(mode),
            0, 0, 0, 0
        )
    return payload

def apply_ops_to_payload(payload: bytearray, ops, kp=225, ki=0, kd=12000, mode=3):
    for joint_id, position, force_limit in ops:
        jid = int(joint_id)
        if not (0 <= jid < 12):
            continue
        struct.pack_into(
            JOINT_FMT, payload, jid * JOINT_SIZE,
            jid, kp, ki, kd, float(position), int(force_limit), int(mode),
            0, 0, 0, 0
        )

@dataclass
class RtLast:
    t: float = 0.0
    data_len: int = 0
    rt: Optional[dict] = None

@dataclass
class Stats:
    sec_t0: float
    ok: int = 0
    timeout: int = 0

def make_periodic_checker(period_s: float, fire_immediately: bool = True):
    last_fire = None
    def check() -> bool:
        nonlocal last_fire
        now = time.monotonic()
        if last_fire is None:
            last_fire = now
            return bool(fire_immediately)
        if now - last_fire >= period_s:
            last_fire = now
            return True
        return False
    return check

class DeviceClient:
    """
    每个设备:
      - 独立 socket
      - 独立 rx 线程
      - 独立 tx 线程(固定周期发送,互不影响)
      - 发送后RT_WAIT_TIMEOUT窗口统计 ok/timeout(不丢包:Condition+seq)
    """
    def __init__(self, ip: str, dev_port: int):
        self.ip = ip
        self.target = (ip, dev_port)

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind(("0.0.0.0", 0))
        self.sock.settimeout(0.5)

        self.decoder = FrameDecoder(crc16)

        self.rx_th = threading.Thread(target=self._recv_loop, daemon=True)
        self.tx_th = threading.Thread(target=self._send_loop, daemon=True)

        # 版本等待
        self.ver_event = threading.Event()
        self.ver_last: Optional[dict] = None

        # RT: Condition + seq
        self.rt_cv = threading.Condition()
        self.rt_seq = 0
        self.rt_last = RtLast()

        # 统计(每IP独立)
        self.stats = Stats(sec_t0=time.time())

        # 发送内容由主线程配置
        self._template_payload: Optional[bytearray] = None
        self._ops_list: Optional[List[Any]] = None
        self._ops_cnt = 0
        self._step_check = make_periodic_checker(0.5, fire_immediately=False)

        self._show_rt_dump = False

    def start(self):
        self.rx_th.start()

    def start_tx(self, template_payload: bytearray, ops_list: List[Any], show_rt_dump: bool = False):
        self._template_payload = template_payload
        self._ops_list = ops_list
        self._show_rt_dump = show_rt_dump
        self.tx_th.start()

    def close(self):
        try:
            self.sock.close()
        except Exception:
            pass

    # ---------- RX ----------
    def _recv_loop(self):
        while not stop_event.is_set():
            try:
                data, peer = self.sock.recvfrom(8192)
                if peer[0] != self.ip:
                    continue
                for msg in self.decoder.feed(data):
                    self._on_frame(msg)
            except socket.timeout:
                continue
            except OSError:
                break

    def _on_frame(self, msg: dict):
        if not msg.get("crc_ok"):
            return

        if msg["cmd"] == CMD_GET_VERSION:
            try:
                ver = parse_versions_payload(msg["payload"])
                self.ver_last = ver
                print(f"[VER {self.ip}] SW={ver['sw_str']} (0x{ver['sw_u32']:08X})  HW={ver['hw_str']} (0x{ver['hw_u32']:08X})")
                self.ver_event.set()
            except Exception as e:
                print(f"[VER {self.ip}] parse error:", e)

        elif msg["cmd"] == CMD_RT:
            if msg["data_len"] != RT_EXPECT_PAYLOAD_LEN:
                return
            try:
                rt = parse_rt_report_2208(msg["payload"])
                with self.rt_cv:
                    self.rt_last.t = time.time()
                    self.rt_last.data_len = msg["data_len"]
                    self.rt_last.rt = rt
                    self.rt_seq += 1
                    self.rt_cv.notify_all()
            except Exception as e:
                print(f"[RT {self.ip}] parse error:", e)

    # ---------- GET_VERSION ----------
    def send_get_version(self):
        frame = pack_frame(SRC_ID, DST_ID, CMD_GET_VERSION, b"")
        self.sock.sendto(frame, self.target)

    def send_get_version_and_wait(self, timeout=1.0, retry=5, interval=0.2) -> dict:
        self.ver_event.clear()
        for _ in range(retry):
            self.send_get_version()
            if self.ver_event.wait(timeout=timeout) and self.ver_last:
                return self.ver_last
            time.sleep(interval)
        raise TimeoutError(f"GET_VERSION timeout for {self.ip} after {retry} retries")

    # ---------- RT_CTRL send ----------
    def send_rt_ctrl_ops_fast(self, template_payload: bytearray, ops, kp=225, ki=0, kd=12000, mode=3):
        payload = bytearray(template_payload)
        apply_ops_to_payload(payload, ops, kp=kp, ki=ki, kd=kd, mode=mode)
        frame = pack_frame(SRC_ID, DST_ID, CMD_RT, payload)
        self.sock.sendto(frame, self.target)

    # ---------- wait+count ----------
    def wait_rt_and_count(self, timeout=0.2) -> bool:
        deadline = time.monotonic() + timeout
        with self.rt_cv:
            start_seq = self.rt_seq
            while self.rt_seq == start_seq and not stop_event.is_set():
                remain = deadline - time.monotonic()
                if remain <= 0:
                    self.stats.timeout += 1
                    return False
                self.rt_cv.wait(timeout=remain)

            if self.rt_seq != start_seq:
                self.stats.ok += 1
                return True

            self.stats.timeout += 1
            return False

    def maybe_print_stats_each_second(self):
        now = time.time()
        if now - self.stats.sec_t0 >= 1.0:
            dt = now - self.stats.sec_t0
            hz = self.stats.ok / dt if dt > 0 else 0.0
            age = (now - self.rt_last.t) if self.rt_last.t else 999.0
            print(f"[STAT {self.ip}] ok={self.stats.ok}/s ({hz:.1f}Hz) timeout={self.stats.timeout}/s  last_rt: len={self.rt_last.data_len} age={age:.3f}s")

            # === 这里是你要的 dump_rt 打印 ===
            if self._show_rt_dump and self.rt_last.rt:
                # 缩进一下,方便阅读
                print("  [RT]")
                print("    " + dump_rt(self.rt_last.rt).replace("\n", "\n    "))

            self.stats.sec_t0 = now
            self.stats.ok = 0
            self.stats.timeout = 0

    # ---------- TX loop (固定周期,独立线程) ----------
    def _send_loop(self):
        assert self._template_payload is not None
        assert self._ops_list is not None

        next_tick = time.monotonic()  # 立即发第一帧

        while not stop_event.is_set():
            now_mono = time.monotonic()

            # 睡到下一拍
            if now_mono < next_tick:
                time.sleep(next_tick - now_mono)
                now_mono = time.monotonic()

            # 推进下一拍(绝对时间,避免漂移)
            next_tick += SEND_PERIOD

            # 如果严重落后,跳帧对齐,避免积压
            late = time.monotonic() - next_tick
            if late > 2 * SEND_PERIOD:
                next_tick = time.monotonic() + SEND_PERIOD

            # 每秒切换动作序列(每设备独立)
            if self._step_check():
                self._ops_cnt = (self._ops_cnt + 1) % len(self._ops_list)

            # 发送
            ops = self._ops_list[self._ops_cnt]
            self.send_rt_ctrl_ops_fast(self._template_payload, ops)

            # 发送后窗口统计(每设备独立线程)
            self.wait_rt_and_count(timeout=RT_WAIT_TIMEOUT)

            # 每秒输出统计(每设备独立)
            self.maybe_print_stats_each_second()

def select_targets(clients: Dict[str, DeviceClient]) -> List[DeviceClient]:
    if CONTROL_ONLY is None or CONTROL_ONLY == "all":
        return [clients[ip] for ip in DEVICES]
    return [clients[CONTROL_ONLY]]

# ================== 主程序 ==================
def main():
    clients: Dict[str, DeviceClient] = {ip: DeviceClient(ip, DEVICE_PORT) for ip in DEVICES}
    for c in clients.values():
        c.start()

    targets = select_targets(clients)
    print("=== TARGETS ===", [c.ip for c in targets])

    # 先拿版本:目标设备都成功才继续
    print("=== GET VERSION ===")
    try:
        for c in targets:
            ver = c.send_get_version_and_wait(timeout=1.0, retry=5, interval=0.2)
            print(f"[INFO] {c.ip} Version OK: SW={ver['sw_str']} HW={ver['hw_str']}")
    except TimeoutError as e:
        print("[ERROR]", e)
        stop_event.set()
        for c in clients.values():
            c.close()
        return

    print("=== Start RT loop (per-device TX threads) ===")

    ops_list = [
        [(i, 0, 150) for i in range(12)],

        [(10, 1, 150)], [(11, 1, 150)], [(11, 0, 150)], [(10, 0, 150)],
        [(8, 1, 150)], [(9, 1, 150)], [(9, 0, 150)], [(8, 0, 150)],
        [(6, 1, 150)], [(7, 1, 150)], [(7, 0, 150)], [(6, 0, 150)],

        [(3, 1, 150)], [(4, 1, 150)], [(5, 1, 150)], [(5, 0, 150)], [(4, 0, 150)], [(3, 0, 150)],

        [(0, 1, 150)], [(0, 0, 150)], [(1, 1, 150)], [(2, 1, 150)], [(2, 0, 150)], [(1, 0, 150)],

        [(i, 0, 150) for i in range(12)],

        [(0, 0, 150),
         (1, 1, 150),
         (2, 1, 150),
         (3, 0, 150),
         (4, 1, 150),
         (5, 1, 150),
         (6, 1, 150),
         (7, 1, 150),
         (8, 1, 150),
         (9, 1, 150),
         (10, 1, 150),
         (11, 1, 150)],
    ]

    template_payload = build_rt_ctrl_template_payload(kp=225, ki=0, kd=12000, mode=3)

    # === 这里控制是否每秒打印 joints ===
    SHOW_RT_DUMP = True

    # 每个目标设备开启独立发送线程
    for c in targets:
        c.start_tx(template_payload=template_payload, ops_list=ops_list, show_rt_dump=SHOW_RT_DUMP)

    # 主线程只负责等退出
    try:
        while not stop_event.is_set():
            time.sleep(0.2)
    finally:
        stop_event.set()
        for c in clients.values():
            c.close()

# ================== 退出处理 ==================
def sig_handler(sig, frame):
    stop_event.set()

signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)

if __name__ == "__main__":
    main()