🤚傲意 ROH-A002

傲意灵巧手 ROH-A002,是模拟人体手部运动的高灵敏度末端执行装置,由手掌、手腕、五根手指、电机、传动部件等部分组成。
🕹️控制方式
官方协议主要面向 RS485/串口/CAN 等总线接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便进行tpc/udp等以太网操作,更好的融入Adam机器人生态。
- 通讯流程:
tcp/udp<->485<->hand
以太网转发模块做 UDP 与 RS485 串口数据的透明转发,Adam通过 UDP 发原厂协议帧。 具体协议内容参考原厂产品手册。
🐍Python demo
import socket
import threading
import time
from typing import List, Tuple
FRAME_HEAD_0 = 0x55
FRAME_HEAD_1 = 0xAA
CMD_SET_FINGER_POS = 0x4C
CMD_SET_FINGER_POS_ALL = 0x50
LEFT_IP = "10.10.10.18" # 左手IP地址
RIGHT_IP = "10.10.10.38" # 右手IP地址
UDP_PORT = 2562
LEFT_HAND_ID = 0x02 # 左手ID
RIGHT_HAND_ID = 0x02 # 右手ID
MASTER_ID = 0x01
MOVE_SPEED = 0xFF
BOOT_DELAY_S = 4.0
STEP_DELAY_S = 1.2
def lrc_xor(data: bytes) -> int:
value = 0
for b in data:
value ^= b
return value & 0xFF
def build_frame(hand_id: int, master_id: int, cmd: int, data: bytes) -> bytes:
payload = bytes([hand_id & 0xFF, master_id & 0xFF, cmd & 0xFF, len(data) & 0xFF]) + data
checksum = lrc_xor(payload)
return bytes([FRAME_HEAD_0, FRAME_HEAD_1]) + payload + bytes([checksum])
def parse_frame(frame: bytes) -> Tuple[dict, str]:
if len(frame) < 7:
return {}, "frame_too_short"
if frame[0] != FRAME_HEAD_0 or frame[1] != FRAME_HEAD_1:
return {}, "bad_header"
data_len = frame[5]
expected_len = 7 + data_len
if len(frame) != expected_len:
return {}, "bad_length"
payload = frame[2:6 + data_len]
checksum = frame[-1]
if lrc_xor(payload) != checksum:
return {}, "bad_checksum"
cmd = frame[4]
data = frame[6:6 + data_len]
return {
"hand_id": frame[2],
"master_id": frame[3],
"cmd": cmd,
"data_len": data_len,
"data": data,
"checksum": checksum,
"is_error": (cmd & 0x80) != 0,
}, ""
def extract_frames(buf: bytes) -> List[bytes]:
frames = []
i = 0
n = len(buf)
while i + 7 <= n:
if buf[i] == FRAME_HEAD_0 and buf[i + 1] == FRAME_HEAD_1:
if i + 6 >= n:
break
data_len = buf[i + 5]
total_len = 7 + data_len
if i + total_len <= n:
frames.append(buf[i:i + total_len])
i += total_len
continue
i += 1
return frames
def send_frame(sock: socket.socket, target: Tuple[str, int], frame: bytes) -> None:
sock.sendto(frame, target)
def recv_frames(sock: socket.socket, timeout_s: float = 0.2) -> List[bytes]:
sock.settimeout(timeout_s)
try:
data = sock.recv(4096)
except socket.timeout:
return []
return extract_frames(data)
def set_finger_pos(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
finger_id: int, pos: int, speed: int) -> None:
data = bytes([
finger_id & 0xFF,
pos & 0xFF,
(pos >> 8) & 0xFF,
speed & 0xFF,
])
frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS, data)
send_frame(sock, target, frame)
def set_finger_pos_all(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
positions: List[int], speed: int) -> None:
if len(positions) != 6:
raise ValueError("positions must contain 6 values")
data = bytearray()
for pos in positions:
data.append(pos & 0xFF)
data.append((pos >> 8) & 0xFF)
data.append(speed & 0xFF)
frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS_ALL, bytes(data))
send_frame(sock, target, frame)
def print_responses(frames: List[bytes], tag: str) -> None:
for frame in frames:
info, err = parse_frame(frame)
if err:
print(f"[{tag}] recv invalid frame: {err} {frame.hex(' ')}")
continue
if info["is_error"]:
data = info["data"].hex(" ") if info["data"] else ""
print(f"[{tag}] recv error cmd=0x{info['cmd']:02X} data={data}")
else:
print(f"[{tag}] recv ok cmd=0x{info['cmd']:02X} data_len={info['data_len']} data={info['data'].hex(' ')}")
def make_udp_socket(timeout_s: float = 0.2) -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout_s)
return sock
def run_sequence(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
speed: int, step_delay: float, tag: str) -> None:
open_pos = 0
close_pos = 65535
close_others_first = [open_pos, close_pos, close_pos, close_pos, close_pos, open_pos]
close_thumb_last = [close_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
open_thumb_first = [open_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
open_others_last = [open_pos, open_pos, open_pos, open_pos, open_pos, open_pos]
set_finger_pos_all(sock, target, hand_id, master_id, open_thumb_first, speed)
print(f"[{tag}] send open thumb first")
print_responses(recv_frames(sock), tag)
time.sleep(step_delay)
set_finger_pos_all(sock, target, hand_id, master_id, open_others_last, speed)
print(f"[{tag}] send open others last")
print_responses(recv_frames(sock), tag)
time.sleep(1)
set_finger_pos_all(sock, target, hand_id, master_id, close_others_first, speed)
print(f"[{tag}] send close others first")
print_responses(recv_frames(sock), tag)
time.sleep(step_delay)
set_finger_pos_all(sock, target, hand_id, master_id, close_thumb_last, speed)
print(f"[{tag}] send close thumb last")
print_responses(recv_frames(sock), tag)
time.sleep(1)
def hand_worker(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
speed: int, step_delay: float, tag: str, once: bool) -> None:
while True:
run_sequence(sock, target, hand_id, master_id, speed, step_delay, tag)
if once:
break
def main() -> int:
left_target = (LEFT_IP, UDP_PORT)
right_target = (RIGHT_IP, UDP_PORT)
if BOOT_DELAY_S > 0:
time.sleep(BOOT_DELAY_S)
left_sock = make_udp_socket()
right_sock = make_udp_socket()
try:
left_thread = threading.Thread(
target=hand_worker,
args=(left_sock, left_target, LEFT_HAND_ID, MASTER_ID, MOVE_SPEED,
STEP_DELAY_S, "L", False),
daemon=True,
)
left_thread.start()
right_thread = threading.Thread(
target=hand_worker,
args=(right_sock, right_target, RIGHT_HAND_ID, MASTER_ID, MOVE_SPEED,
STEP_DELAY_S, "R", False),
daemon=True,
)
right_thread.start()
while left_thread.is_alive():
time.sleep(1)
finally:
left_sock.close()
right_sock.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())