跳转至

🤚傲意 ROH-AP001

ROH-AP001

傲意第二代灵巧手 ROH-AP001,以 640 克轻量化设计,184 毫米高度结合工业级仿生结构,实现了接近人类手指的灵活度与精准操控能力。

🕹️控制方式

官方协议主要面向 RS485/串口/CAN 等总线接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便进行tpc/udp等以太网操作,更好的融入Adam机器人生态。

  • 通讯流程:tcp/udp <-> 485 <-> hand

以太网转发模块做 UDP 与 RS485 串口数据的透明转发,Adam通过 UDP 发原厂协议帧。
具体协议内容参考原厂产品手册

🐍Python demo

import socket
import threading
import time
from typing import List, Tuple

FRAME_HEAD_0 = 0x55
FRAME_HEAD_1 = 0xAA

CMD_SET_FINGER_POS = 0x4C
CMD_SET_FINGER_POS_ALL = 0x50

LEFT_IP = "10.10.10.18"  # 左手IP地址
RIGHT_IP = "10.10.10.38"  # 右手IP地址
UDP_PORT = 2562
LEFT_HAND_ID = 0x02  # 左手ID
RIGHT_HAND_ID = 0x02  # 右手ID
MASTER_ID = 0x01
MOVE_SPEED = 0xFF
BOOT_DELAY_S = 4.0
STEP_DELAY_S = 1.2


def lrc_xor(data: bytes) -> int:
    value = 0
    for b in data:
        value ^= b
    return value & 0xFF

def build_frame(hand_id: int, master_id: int, cmd: int, data: bytes) -> bytes:
    payload = bytes([hand_id & 0xFF, master_id & 0xFF, cmd & 0xFF, len(data) & 0xFF]) + data
    checksum = lrc_xor(payload)
    return bytes([FRAME_HEAD_0, FRAME_HEAD_1]) + payload + bytes([checksum])

def parse_frame(frame: bytes) -> Tuple[dict, str]:
    if len(frame) < 7:
        return {}, "frame_too_short"
    if frame[0] != FRAME_HEAD_0 or frame[1] != FRAME_HEAD_1:
        return {}, "bad_header"
    data_len = frame[5]
    expected_len = 7 + data_len
    if len(frame) != expected_len:
        return {}, "bad_length"
    payload = frame[2:6 + data_len]
    checksum = frame[-1]
    if lrc_xor(payload) != checksum:
        return {}, "bad_checksum"

    cmd = frame[4]
    data = frame[6:6 + data_len]
    return {
        "hand_id": frame[2],
        "master_id": frame[3],
        "cmd": cmd,
        "data_len": data_len,
        "data": data,
        "checksum": checksum,
        "is_error": (cmd & 0x80) != 0,
    }, ""

def extract_frames(buf: bytes) -> List[bytes]:
    frames = []
    i = 0
    n = len(buf)
    while i + 7 <= n:
        if buf[i] == FRAME_HEAD_0 and buf[i + 1] == FRAME_HEAD_1:
            if i + 6 >= n:
                break
            data_len = buf[i + 5]
            total_len = 7 + data_len
            if i + total_len <= n:
                frames.append(buf[i:i + total_len])
                i += total_len
                continue
        i += 1
    return frames

def send_frame(sock: socket.socket, target: Tuple[str, int], frame: bytes) -> None:
    sock.sendto(frame, target)

def recv_frames(sock: socket.socket, timeout_s: float = 0.2) -> List[bytes]:
    sock.settimeout(timeout_s)
    try:
        data = sock.recv(4096)
    except socket.timeout:
        return []
    return extract_frames(data)

def set_finger_pos(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                   finger_id: int, pos: int, speed: int) -> None:
    data = bytes([
        finger_id & 0xFF,
        pos & 0xFF,
        (pos >> 8) & 0xFF,
        speed & 0xFF,
    ])
    frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS, data)
    send_frame(sock, target, frame)

def set_finger_pos_all(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                       positions: List[int], speed: int) -> None:
    if len(positions) != 6:
        raise ValueError("positions must contain 6 values")
    data = bytearray()
    for pos in positions:
        data.append(pos & 0xFF)
        data.append((pos >> 8) & 0xFF)
        data.append(speed & 0xFF)
    frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS_ALL, bytes(data))
    send_frame(sock, target, frame)

def print_responses(frames: List[bytes], tag: str) -> None:
    for frame in frames:
        info, err = parse_frame(frame)
        if err:
            print(f"[{tag}] recv invalid frame: {err} {frame.hex(' ')}")
            continue
        if info["is_error"]:
            data = info["data"].hex(" ") if info["data"] else ""
            print(f"[{tag}] recv error cmd=0x{info['cmd']:02X} data={data}")
        else:
            print(f"[{tag}] recv ok cmd=0x{info['cmd']:02X} data_len={info['data_len']} data={info['data'].hex(' ')}")

def make_udp_socket(timeout_s: float = 0.2) -> socket.socket:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout_s)
    return sock

def run_sequence(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                 speed: int, step_delay: float, tag: str) -> None:
    open_pos = 0
    close_pos = 65535

    close_others_first = [open_pos, close_pos, close_pos, close_pos, close_pos, open_pos]
    close_thumb_last = [close_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
    open_thumb_first = [open_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
    open_others_last = [open_pos, open_pos, open_pos, open_pos, open_pos, open_pos]

    set_finger_pos_all(sock, target, hand_id, master_id, open_thumb_first, speed)
    print(f"[{tag}] send open thumb first")
    print_responses(recv_frames(sock), tag)
    time.sleep(step_delay)

    set_finger_pos_all(sock, target, hand_id, master_id, open_others_last, speed)
    print(f"[{tag}] send open others last")
    print_responses(recv_frames(sock), tag)
    time.sleep(1)

    set_finger_pos_all(sock, target, hand_id, master_id, close_others_first, speed)
    print(f"[{tag}] send close others first")
    print_responses(recv_frames(sock), tag)
    time.sleep(step_delay)

    set_finger_pos_all(sock, target, hand_id, master_id, close_thumb_last, speed)
    print(f"[{tag}] send close thumb last")
    print_responses(recv_frames(sock), tag)
    time.sleep(1)

def hand_worker(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                speed: int, step_delay: float, tag: str, once: bool) -> None:
    while True:
        run_sequence(sock, target, hand_id, master_id, speed, step_delay, tag)
        if once:
            break

def main() -> int:
    left_target = (LEFT_IP, UDP_PORT)
    right_target = (RIGHT_IP, UDP_PORT)

    if BOOT_DELAY_S > 0:
        time.sleep(BOOT_DELAY_S)

    left_sock = make_udp_socket()
    right_sock = make_udp_socket()

    try:
        left_thread = threading.Thread(
            target=hand_worker,
            args=(left_sock, left_target, LEFT_HAND_ID, MASTER_ID, MOVE_SPEED,
                  STEP_DELAY_S, "L", False),
            daemon=True,
        )
        left_thread.start()

        right_thread = threading.Thread(
            target=hand_worker,
            args=(right_sock, right_target, RIGHT_HAND_ID, MASTER_ID, MOVE_SPEED,
                  STEP_DELAY_S, "R", False),
            daemon=True,
        )
        right_thread.start()

        while left_thread.is_alive():
            time.sleep(1)
    finally:
        left_sock.close()
        right_sock.close()

    return 0

if __name__ == "__main__":
    raise SystemExit(main())