🤚 星动X-Hand 1
星动X-Hand 1是一款具备高自由度、高性能、高智能性的五指机器人灵巧手。
🕹️控制方式
官方默认使用的是485接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便进行tpc/udp等以太网操作,更好的融入Adam机器人生态。
- 通讯流程:
tcp/udp<->485<->hand
使用原厂协议,Adam只做了tcp/udp与485的协议转发。 具体协议内容参考原厂产品手册。
🐍Python demo
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import socket
import struct
import signal
import threading
from dataclasses import dataclass
from typing import Optional, List, Dict, Any
# ================== 配置 ==================
DEVICE_PORT = 2562
DEVICES = ["10.10.10.18", "10.10.10.38"]
SRC_ID = 0xFE
DST_ID = 0xFF
# 控制哪个设备:
# "all" / None -> 两台都控
# "10.10.10.18" -> 单控
# "10.10.10.38" -> 单控
CONTROL_ONLY = "all"
# 固定周期发送(秒)
SEND_PERIOD = 0.02
RT_WAIT_TIMEOUT = 0.05
HEAD = b"\x55\xAA"
MIN_FRAME_LEN = 9 # 2(head)+1+1+1+2(len)+2(crc)
CMD_GET_VERSION = 0x13
CMD_RT = 0x02
RT_EXPECT_PAYLOAD_LEN = 2208
stop_event = threading.Event()
# ================== CRC16 ==================
crc16tab = [
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
]
def crc16(buf: bytes) -> int:
crc = 0x0000
for b in buf:
crc = ((crc << 8) & 0xFFFF) ^ crc16tab[((crc >> 8) ^ b) & 0xFF]
return crc
def pack_frame(src: int, dst: int, cmd: int, payload: bytes = b"") -> bytes:
frame = bytearray()
frame += HEAD
frame += struct.pack("<BBB", src, dst, cmd)
frame += struct.pack("<H", len(payload))
frame += payload
frame += struct.pack("<H", crc16(frame))
return bytes(frame)
# ================== 拼帧解码器 ==================
class FrameDecoder:
def __init__(self, crc16_func, max_buf=1024 * 1024):
self.buf = bytearray()
self.crc16 = crc16_func
self.max_buf = max_buf
def feed(self, data: bytes):
if not data:
return []
self.buf += data
if len(self.buf) > self.max_buf:
self.buf = self.buf[-4096:]
out = []
while True:
idx = self.buf.find(HEAD)
if idx < 0:
if len(self.buf) > 1:
self.buf = self.buf[-1:]
break
if idx > 0:
del self.buf[:idx]
if len(self.buf) < MIN_FRAME_LEN:
break
data_len = struct.unpack_from("<H", self.buf, 5)[0]
need_len = 7 + data_len + 2
if len(self.buf) < need_len:
break
frame = bytes(self.buf[:need_len])
del self.buf[:need_len]
src, dst, cmd = frame[2], frame[3], frame[4]
payload = frame[7:7 + data_len]
recv_crc = struct.unpack_from("<H", frame, 7 + data_len)[0]
calc_crc = self.crc16(frame[:7 + data_len])
out.append({
"src": src,
"dst": dst,
"cmd": cmd,
"data_len": data_len,
"payload": payload,
"crc_ok": (recv_crc == calc_crc),
})
return out
# ================== 版本解析 ==================
def decode_ver_u32(v: int):
major = (v >> 24) & 0xFF
minor = (v >> 16) & 0xFF
rel = v & 0xFFFF
return major, minor, rel
def parse_versions_payload(payload: bytes) -> dict:
if len(payload) != 8:
raise ValueError(f"invalid version payload len: {len(payload)}")
sw_u32 = struct.unpack_from("<I", payload, 0)[0]
hw_u32 = struct.unpack_from("<I", payload, 4)[0]
sw = decode_ver_u32(sw_u32)
hw = decode_ver_u32(hw_u32)
return {
"sw_u32": sw_u32,
"hw_u32": hw_u32,
"sw_str": f"{sw[0]}.{sw[1]}.{sw[2]}",
"hw_str": f"{hw[0]}.{hw[1]}.{hw[2]}",
}
# ================== RT_REPORT 解析 ==================
JOINT_STATE_SIZE = 24
RT_JOINT_COUNT = 12
def parse_rt_report_2208(payload: bytes) -> dict:
if len(payload) != RT_EXPECT_PAYLOAD_LEN:
raise ValueError("rt payload length error")
joints = []
off = 0
for i in range(RT_JOINT_COUNT):
joint_id, pos, torque, *d = struct.unpack_from("<HfH8H", payload, off)
joints.append({
"id": i,
"pos": pos,
"torque": torque,
"defaults": d,
})
off += JOINT_STATE_SIZE
tips = []
for i in range(5):
fx, fy, fz = struct.unpack_from("<bbB", payload, off)
temp_sum = payload[off + 383]
tips.append({"idx": i, "fx": fx, "fy": fy, "fz": fz, "temp_sum": temp_sum})
off += 384
return {"joints": joints, "tips": tips}
# ===== 你要求新增/使用的 dump_rt(保持原样) =====
def dump_rt(rt: dict) -> str:
lines = ["JOINTS:"]
for j in rt["joints"]:
lines.append(f" J{j['id']:02d} | pos={j['pos']:+7.3f} | tq={j['torque']:5d}")
return "\n".join(lines)
# ================== RT_CTRL 高效打包 ==================
JOINT_FMT = "<H h h h f H H H H H H"
JOINT_SIZE = struct.calcsize(JOINT_FMT) # 24
assert JOINT_SIZE == 24
def build_rt_ctrl_template_payload(kp=225, ki=0, kd=12000, mode=3, init_pos=0.0, init_force=0) -> bytearray:
payload = bytearray(JOINT_SIZE * 12)
for jid in range(12):
struct.pack_into(
JOINT_FMT, payload, jid * JOINT_SIZE,
jid, kp, ki, kd, float(init_pos), int(init_force), int(mode),
0, 0, 0, 0
)
return payload
def apply_ops_to_payload(payload: bytearray, ops, kp=225, ki=0, kd=12000, mode=3):
for joint_id, position, force_limit in ops:
jid = int(joint_id)
if not (0 <= jid < 12):
continue
struct.pack_into(
JOINT_FMT, payload, jid * JOINT_SIZE,
jid, kp, ki, kd, float(position), int(force_limit), int(mode),
0, 0, 0, 0
)
@dataclass
class RtLast:
t: float = 0.0
data_len: int = 0
rt: Optional[dict] = None
@dataclass
class Stats:
sec_t0: float
ok: int = 0
timeout: int = 0
def make_periodic_checker(period_s: float, fire_immediately: bool = True):
last_fire = None
def check() -> bool:
nonlocal last_fire
now = time.monotonic()
if last_fire is None:
last_fire = now
return bool(fire_immediately)
if now - last_fire >= period_s:
last_fire = now
return True
return False
return check
class DeviceClient:
"""
每个设备:
- 独立 socket
- 独立 rx 线程
- 独立 tx 线程(固定周期发送,互不影响)
- 发送后RT_WAIT_TIMEOUT窗口统计 ok/timeout(不丢包:Condition+seq)
"""
def __init__(self, ip: str, dev_port: int):
self.ip = ip
self.target = (ip, dev_port)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(("0.0.0.0", 0))
self.sock.settimeout(0.5)
self.decoder = FrameDecoder(crc16)
self.rx_th = threading.Thread(target=self._recv_loop, daemon=True)
self.tx_th = threading.Thread(target=self._send_loop, daemon=True)
# 版本等待
self.ver_event = threading.Event()
self.ver_last: Optional[dict] = None
# RT: Condition + seq
self.rt_cv = threading.Condition()
self.rt_seq = 0
self.rt_last = RtLast()
# 统计(每IP独立)
self.stats = Stats(sec_t0=time.time())
# 发送内容由主线程配置
self._template_payload: Optional[bytearray] = None
self._ops_list: Optional[List[Any]] = None
self._ops_cnt = 0
self._step_check = make_periodic_checker(0.5, fire_immediately=False)
self._show_rt_dump = False
def start(self):
self.rx_th.start()
def start_tx(self, template_payload: bytearray, ops_list: List[Any], show_rt_dump: bool = False):
self._template_payload = template_payload
self._ops_list = ops_list
self._show_rt_dump = show_rt_dump
self.tx_th.start()
def close(self):
try:
self.sock.close()
except Exception:
pass
# ---------- RX ----------
def _recv_loop(self):
while not stop_event.is_set():
try:
data, peer = self.sock.recvfrom(8192)
if peer[0] != self.ip:
continue
for msg in self.decoder.feed(data):
self._on_frame(msg)
except socket.timeout:
continue
except OSError:
break
def _on_frame(self, msg: dict):
if not msg.get("crc_ok"):
return
if msg["cmd"] == CMD_GET_VERSION:
try:
ver = parse_versions_payload(msg["payload"])
self.ver_last = ver
print(f"[VER {self.ip}] SW={ver['sw_str']} (0x{ver['sw_u32']:08X}) HW={ver['hw_str']} (0x{ver['hw_u32']:08X})")
self.ver_event.set()
except Exception as e:
print(f"[VER {self.ip}] parse error:", e)
elif msg["cmd"] == CMD_RT:
if msg["data_len"] != RT_EXPECT_PAYLOAD_LEN:
return
try:
rt = parse_rt_report_2208(msg["payload"])
with self.rt_cv:
self.rt_last.t = time.time()
self.rt_last.data_len = msg["data_len"]
self.rt_last.rt = rt
self.rt_seq += 1
self.rt_cv.notify_all()
except Exception as e:
print(f"[RT {self.ip}] parse error:", e)
# ---------- GET_VERSION ----------
def send_get_version(self):
frame = pack_frame(SRC_ID, DST_ID, CMD_GET_VERSION, b"")
self.sock.sendto(frame, self.target)
def send_get_version_and_wait(self, timeout=1.0, retry=5, interval=0.2) -> dict:
self.ver_event.clear()
for _ in range(retry):
self.send_get_version()
if self.ver_event.wait(timeout=timeout) and self.ver_last:
return self.ver_last
time.sleep(interval)
raise TimeoutError(f"GET_VERSION timeout for {self.ip} after {retry} retries")
# ---------- RT_CTRL send ----------
def send_rt_ctrl_ops_fast(self, template_payload: bytearray, ops, kp=225, ki=0, kd=12000, mode=3):
payload = bytearray(template_payload)
apply_ops_to_payload(payload, ops, kp=kp, ki=ki, kd=kd, mode=mode)
frame = pack_frame(SRC_ID, DST_ID, CMD_RT, payload)
self.sock.sendto(frame, self.target)
# ---------- wait+count ----------
def wait_rt_and_count(self, timeout=0.2) -> bool:
deadline = time.monotonic() + timeout
with self.rt_cv:
start_seq = self.rt_seq
while self.rt_seq == start_seq and not stop_event.is_set():
remain = deadline - time.monotonic()
if remain <= 0:
self.stats.timeout += 1
return False
self.rt_cv.wait(timeout=remain)
if self.rt_seq != start_seq:
self.stats.ok += 1
return True
self.stats.timeout += 1
return False
def maybe_print_stats_each_second(self):
now = time.time()
if now - self.stats.sec_t0 >= 1.0:
dt = now - self.stats.sec_t0
hz = self.stats.ok / dt if dt > 0 else 0.0
age = (now - self.rt_last.t) if self.rt_last.t else 999.0
print(f"[STAT {self.ip}] ok={self.stats.ok}/s ({hz:.1f}Hz) timeout={self.stats.timeout}/s last_rt: len={self.rt_last.data_len} age={age:.3f}s")
# === 这里是你要的 dump_rt 打印 ===
if self._show_rt_dump and self.rt_last.rt:
# 缩进一下,方便阅读
print(" [RT]")
print(" " + dump_rt(self.rt_last.rt).replace("\n", "\n "))
self.stats.sec_t0 = now
self.stats.ok = 0
self.stats.timeout = 0
# ---------- TX loop (固定周期,独立线程) ----------
def _send_loop(self):
assert self._template_payload is not None
assert self._ops_list is not None
next_tick = time.monotonic() # 立即发第一帧
while not stop_event.is_set():
now_mono = time.monotonic()
# 睡到下一拍
if now_mono < next_tick:
time.sleep(next_tick - now_mono)
now_mono = time.monotonic()
# 推进下一拍(绝对时间,避免漂移)
next_tick += SEND_PERIOD
# 如果严重落后,跳帧对齐,避免积压
late = time.monotonic() - next_tick
if late > 2 * SEND_PERIOD:
next_tick = time.monotonic() + SEND_PERIOD
# 每秒切换动作序列(每设备独立)
if self._step_check():
self._ops_cnt = (self._ops_cnt + 1) % len(self._ops_list)
# 发送
ops = self._ops_list[self._ops_cnt]
self.send_rt_ctrl_ops_fast(self._template_payload, ops)
# 发送后窗口统计(每设备独立线程)
self.wait_rt_and_count(timeout=RT_WAIT_TIMEOUT)
# 每秒输出统计(每设备独立)
self.maybe_print_stats_each_second()
def select_targets(clients: Dict[str, DeviceClient]) -> List[DeviceClient]:
if CONTROL_ONLY is None or CONTROL_ONLY == "all":
return [clients[ip] for ip in DEVICES]
return [clients[CONTROL_ONLY]]
# ================== 主程序 ==================
def main():
clients: Dict[str, DeviceClient] = {ip: DeviceClient(ip, DEVICE_PORT) for ip in DEVICES}
for c in clients.values():
c.start()
targets = select_targets(clients)
print("=== TARGETS ===", [c.ip for c in targets])
# 先拿版本:目标设备都成功才继续
print("=== GET VERSION ===")
try:
for c in targets:
ver = c.send_get_version_and_wait(timeout=1.0, retry=5, interval=0.2)
print(f"[INFO] {c.ip} Version OK: SW={ver['sw_str']} HW={ver['hw_str']}")
except TimeoutError as e:
print("[ERROR]", e)
stop_event.set()
for c in clients.values():
c.close()
return
print("=== Start RT loop (per-device TX threads) ===")
ops_list = [
[(i, 0, 150) for i in range(12)],
[(10, 1, 150)], [(11, 1, 150)], [(11, 0, 150)], [(10, 0, 150)],
[(8, 1, 150)], [(9, 1, 150)], [(9, 0, 150)], [(8, 0, 150)],
[(6, 1, 150)], [(7, 1, 150)], [(7, 0, 150)], [(6, 0, 150)],
[(3, 1, 150)], [(4, 1, 150)], [(5, 1, 150)], [(5, 0, 150)], [(4, 0, 150)], [(3, 0, 150)],
[(0, 1, 150)], [(0, 0, 150)], [(1, 1, 150)], [(2, 1, 150)], [(2, 0, 150)], [(1, 0, 150)],
[(i, 0, 150) for i in range(12)],
[(0, 0, 150),
(1, 1, 150),
(2, 1, 150),
(3, 0, 150),
(4, 1, 150),
(5, 1, 150),
(6, 1, 150),
(7, 1, 150),
(8, 1, 150),
(9, 1, 150),
(10, 1, 150),
(11, 1, 150)],
]
template_payload = build_rt_ctrl_template_payload(kp=225, ki=0, kd=12000, mode=3)
# === 这里控制是否每秒打印 joints ===
SHOW_RT_DUMP = True
# 每个目标设备开启独立发送线程
for c in targets:
c.start_tx(template_payload=template_payload, ops_list=ops_list, show_rt_dump=SHOW_RT_DUMP)
# 主线程只负责等退出
try:
while not stop_event.is_set():
time.sleep(0.2)
finally:
stop_event.set()
for c in clients.values():
c.close()
# ================== 退出处理 ==================
def sig_handler(sig, frame):
stop_event.set()
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
if __name__ == "__main__":
main()