Skip to content

🤚OYMotion ROH-AP001

ROH-AP001

The OYMotion second-generation dexterous hand ROH-AP001 features a lightweight design of 640 g and a height of 184 mm. Combined with an industrial-grade bionic structure, it achieves dexterity and precise manipulation close to that of human fingers.

🕹️Control Method

The official protocol mainly targets bus interfaces such as RS485/serial/CAN. Adam converts it to an Ethernet interface, allowing the hand to function as a node device within the Adam ecosystem, enabling Ethernet operations such as TCP/UDP more easily.

  • Communication process: TCP/UDP <-> 485 <-> hand

The Ethernet forwarding module performs transparent forwarding between UDP and RS485 serial data, and Adam sends the original manufacturer's protocol frames over UDP.
For specific protocol details, refer to the Original Product Manual.

🐍Python demo

import socket
import threading
import time
from typing import List, Tuple

FRAME_HEAD_0 = 0x55
FRAME_HEAD_1 = 0xAA

CMD_SET_FINGER_POS = 0x4C
CMD_SET_FINGER_POS_ALL = 0x50

LEFT_IP = "10.10.10.18"  # Left hand IP address
RIGHT_IP = "10.10.10.38"  # Right hand IP address
UDP_PORT = 2562
LEFT_HAND_ID = 0x02  # Left hand ID
RIGHT_HAND_ID = 0x02  # Right hand ID
MASTER_ID = 0x01
MOVE_SPEED = 0xFF
BOOT_DELAY_S = 4.0
STEP_DELAY_S = 1.2


def lrc_xor(data: bytes) -> int:
    value = 0
    for b in data:
        value ^= b
    return value & 0xFF

def build_frame(hand_id: int, master_id: int, cmd: int, data: bytes) -> bytes:
    payload = bytes([hand_id & 0xFF, master_id & 0xFF, cmd & 0xFF, len(data) & 0xFF]) + data
    checksum = lrc_xor(payload)
    return bytes([FRAME_HEAD_0, FRAME_HEAD_1]) + payload + bytes([checksum])

def parse_frame(frame: bytes) -> Tuple[dict, str]:
    if len(frame) < 7:
        return {}, "frame_too_short"
    if frame[0] != FRAME_HEAD_0 or frame[1] != FRAME_HEAD_1:
        return {}, "bad_header"
    data_len = frame[5]
    expected_len = 7 + data_len
    if len(frame) != expected_len:
        return {}, "bad_length"
    payload = frame[2:6 + data_len]
    checksum = frame[-1]
    if lrc_xor(payload) != checksum:
        return {}, "bad_checksum"

    cmd = frame[4]
    data = frame[6:6 + data_len]
    return {
        "hand_id": frame[2],
        "master_id": frame[3],
        "cmd": cmd,
        "data_len": data_len,
        "data": data,
        "checksum": checksum,
        "is_error": (cmd & 0x80) != 0,
    }, ""

def extract_frames(buf: bytes) -> List[bytes]:
    frames = []
    i = 0
    n = len(buf)
    while i + 7 <= n:
        if buf[i] == FRAME_HEAD_0 and buf[i + 1] == FRAME_HEAD_1:
            if i + 6 >= n:
                break
            data_len = buf[i + 5]
            total_len = 7 + data_len
            if i + total_len <= n:
                frames.append(buf[i:i + total_len])
                i += total_len
                continue
        i += 1
    return frames

def send_frame(sock: socket.socket, target: Tuple[str, int], frame: bytes) -> None:
    sock.sendto(frame, target)

def recv_frames(sock: socket.socket, timeout_s: float = 0.2) -> List[bytes]:
    sock.settimeout(timeout_s)
    try:
        data = sock.recv(4096)
    except socket.timeout:
        return []
    return extract_frames(data)

def set_finger_pos(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                   finger_id: int, pos: int, speed: int) -> None:
    data = bytes([
        finger_id & 0xFF,
        pos & 0xFF,
        (pos >> 8) & 0xFF,
        speed & 0xFF,
    ])
    frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS, data)
    send_frame(sock, target, frame)

def set_finger_pos_all(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                       positions: List[int], speed: int) -> None:
    if len(positions) != 6:
        raise ValueError("positions must contain 6 values")
    data = bytearray()
    for pos in positions:
        data.append(pos & 0xFF)
        data.append((pos >> 8) & 0xFF)
        data.append(speed & 0xFF)
    frame = build_frame(hand_id, master_id, CMD_SET_FINGER_POS_ALL, bytes(data))
    send_frame(sock, target, frame)

def print_responses(frames: List[bytes], tag: str) -> None:
    for frame in frames:
        info, err = parse_frame(frame)
        if err:
            print(f"[{tag}] recv invalid frame: {err} {frame.hex(' ')}")
            continue
        if info["is_error"]:
            data = info["data"].hex(" ") if info["data"] else ""
            print(f"[{tag}] recv error cmd=0x{info['cmd']:02X} data={data}")
        else:
            print(f"[{tag}] recv ok cmd=0x{info['cmd']:02X} data_len={info['data_len']} data={info['data'].hex(' ')}")

def make_udp_socket(timeout_s: float = 0.2) -> socket.socket:
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout_s)
    return sock

def run_sequence(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                 speed: int, step_delay: float, tag: str) -> None:
    open_pos = 0
    close_pos = 65535

    close_others_first = [open_pos, close_pos, close_pos, close_pos, close_pos, open_pos]
    close_thumb_last = [close_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
    open_thumb_first = [open_pos, close_pos, close_pos, close_pos, close_pos, close_pos]
    open_others_last = [open_pos, open_pos, open_pos, open_pos, open_pos, open_pos]

    set_finger_pos_all(sock, target, hand_id, master_id, open_thumb_first, speed)
    print(f"[{tag}] send open thumb first")
    print_responses(recv_frames(sock), tag)
    time.sleep(step_delay)

    set_finger_pos_all(sock, target, hand_id, master_id, open_others_last, speed)
    print(f"[{tag}] send open others last")
    print_responses(recv_frames(sock), tag)
    time.sleep(1)

    set_finger_pos_all(sock, target, hand_id, master_id, close_others_first, speed)
    print(f"[{tag}] send close others first")
    print_responses(recv_frames(sock), tag)
    time.sleep(step_delay)

    set_finger_pos_all(sock, target, hand_id, master_id, close_thumb_last, speed)
    print(f"[{tag}] send close thumb last")
    print_responses(recv_frames(sock), tag)
    time.sleep(1)

def hand_worker(sock: socket.socket, target: Tuple[str, int], hand_id: int, master_id: int,
                speed: int, step_delay: float, tag: str, once: bool) -> None:
    while True:
        run_sequence(sock, target, hand_id, master_id, speed, step_delay, tag)
        if once:
            break

def main() -> int:
    left_target = (LEFT_IP, UDP_PORT)
    right_target = (RIGHT_IP, UDP_PORT)

    if BOOT_DELAY_S > 0:
        time.sleep(BOOT_DELAY_S)

    left_sock = make_udp_socket()
    right_sock = make_udp_socket()

    try:
        left_thread = threading.Thread(
            target=hand_worker,
            args=(left_sock, left_target, LEFT_HAND_ID, MASTER_ID, MOVE_SPEED,
                  STEP_DELAY_S, "L", False),
            daemon=True,
        )
        left_thread.start()

        right_thread = threading.Thread(
            target=hand_worker,
            args=(right_sock, right_target, RIGHT_HAND_ID, MASTER_ID, MOVE_SPEED,
                  STEP_DELAY_S, "R", False),
            daemon=True,
        )
        right_thread.start()

        while left_thread.is_alive():
            time.sleep(1)
    finally:
        left_sock.close()
        right_sock.close()

    return 0

if __name__ == "__main__":
    raise SystemExit(main())