跳转至

🦞智元灵犀X1 OmniPicker

智元灵犀X1 OmniPicker是一款自适应的通用夹爪。融合了不同模态的夹爪设计的优点,只用一个主动自由度就可以实现各种不同形状物体的抓取。

🕹️控制方式

夹爪官方默认使用的是串口/485接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便得进行tpc/udp等以太网操作,更好的融入Adam机器人生态。

  • 通讯流程:udp/tcp <-> 串口 <-> 485 <-> hand

使用原厂协议,Adam只做了udp/tcp与485的协议转发(只转发原厂协议,不是透传)。
具体协议内容参考原厂产品手册

pos_cmd

Pos Cmd:目标位置,数值范围0- FF,0为夹紧,FF为完全张开

🐍Python demo

import time
import socket
import threading
import signal
import sys

# ---------------- 基本配置 ----------------
port   = 2562
addr_l = '10.10.10.18'
addr_r = '10.10.10.38'

# ---------------- 工具函数 ----------------
def hexdump(b: bytes) -> str:
    return ' '.join(f'{x:02X}' for x in b)

def checksum(buf: bytes) -> int:
    ret = 0
    for i in buf:
        ret += i
    return (~ret) & 0xFF

def make_frame(node_id: int, pos=0x7F, force=0xFF, vel=0xFF, acc=0xFF, dec=0xFF) -> bytes:
    """帧格式: D0-D1:0x4141, D2:ID, D3:Resv, D4~D8:5个Cmd(各1字节),
       D9:Resv, D10:Resv, D11:CheckSum(D2~D10)"""
    def b(x): return int(x) & 0xFF
    frame = bytearray()
    frame += b'\x41\x41'          # D0-D1: Frame Head 0x4141
    frame.append(b(node_id))      # D2: ID
    frame.append(0x00)            # D3: Reserved
    frame.append(b(pos))          # D4: Pos Cmd
    frame.append(b(force))        # D5: Force Cmd
    frame.append(b(vel))          # D6: Vel Cmd
    frame.append(b(acc))          # D7: Acc Cmd
    frame.append(b(dec))          # D8: Dec Cmd
    frame.append(0x00)            # D9: Reserved
    frame.append(0x00)            # D10: Reserved
    frame.append(checksum(frame[2:]))  # D11: CheckSum
    return bytes(frame)

# ---------------- 状态帧解析 ----------------
def parse_status_frame(pkt: bytes):
    """
    解析 12 字节状态帧:
    D0-D1: 0x4141 | D2:ID | D3:Fault | D4:State | D5:Pos | D6:Vel | D7:Force
    D8~D10: Reserved | D11: CheckSum = ~(D2..D10和) & 0xFF
    """
    if len(pkt) != 12:
        return None, "len!=12"
    if pkt[0] != 0x41 or pkt[1] != 0x41:
        return None, "bad head"
    expect = checksum(pkt[2:11])
    if pkt[11] != expect:
        return None, f"bad checksum got=0x{pkt[11]:02X} expect=0x{expect:02X}"

    d = {
        "id": pkt[2], "fault": pkt[3], "state": pkt[4],
        "pos": pkt[5], "vel": pkt[6], "force": pkt[7],
        "rsv8": pkt[8], "rsv9": pkt[9], "rsv10": pkt[10],
        "checksum": pkt[11],
    }
    return d, None

def extract_and_parse_all(data: bytes):
    """在任意数据里扫描 0x41 0x41 开头的 12 字节帧,逐帧解析。"""
    results, i, n = [], 0, len(data)
    while i + 12 <= n:
        if data[i] == 0x41 and data[i+1] == 0x41:
            pkt = data[i:i+12]
            d, err = parse_status_frame(pkt)
            results.append((pkt, d, err))
            i += 12
        else:
            i += 1
    return results

def pretty_status(d: dict) -> str:
    return (f"ID={d['id']:02X} Fault={d['fault']:02X} State={d['state']:02X} "
            f"Pos={d['pos']:02X} Vel={d['vel']:02X} Force={d['force']:02X} "
            f"RSV8={d['rsv8']:02X} RSV9={d['rsv9']:02X} RSV10={d['rsv10']:02X}")

def handle_recv(dat: bytes, tag: str):
    items = extract_and_parse_all(dat)
    if not items:
        print(f"[{tag}] recv (no frame): {hexdump(dat)}")
        return
    for pkt, d, err in items:
        if err:
            print(f"[{tag}] recv ERR {err}: {hexdump(pkt)}")
        else:
            print(f"[{tag}] recv OK  {pretty_status(d)} | raw: {hexdump(pkt)}")

def hand_ctrl(sock: socket.socket, addr: str, port: int, tag: str, node_id: int = 1,pos: int = 0):
    target = (addr, port)
    data = make_frame(node_id=node_id, pos=pos)
    sock.sendto(data, target)
    print(f"[{tag}] send: {hexdump(data)}")
    try:
        recv = sock.recv(1024)
        handle_recv(recv, tag)
    except socket.timeout:
        print(f"[{tag}] recv timeout") 

def hand_l_worker(sock: socket.socket, addr: str, port: int, tag: str,
                node_id: int = 1):

    sock.settimeout(1.0)

    while True:
        hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
        time.sleep(1)
        hand_ctrl(sock, addr, port, tag, node_id, 0x7F)
        time.sleep(1)
        hand_ctrl(sock, addr, port, tag, node_id, 0x00)
        time.sleep(1)

def hand_r_worker(sock: socket.socket, addr: str, port: int, tag: str,
                node_id: int = 1):

    sock.settimeout(1.0)

    while True:
        hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
        time.sleep(1)
        hand_ctrl(sock, addr, port, tag, node_id, 0x00)
        time.sleep(1)

# ---------------- 启动与收尾 ----------------
def sig_handler(signum, frame):
    print('You pressed Ctrl + C!')
    sys.exit(0)

def make_udp_socket(timeout_s=1.0) -> socket.socket:
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.settimeout(timeout_s)
    return s

signal.signal(signal.SIGINT, sig_handler)

sok_l = make_udp_socket()
sok_r = make_udp_socket()

t_l = threading.Thread(target=hand_l_worker, args=(sok_l, addr_l, port, "L"))
t_r = threading.Thread(target=hand_r_worker, args=(sok_r, addr_r, port, "R"))

t_l.daemon = True
t_r.daemon = True
t_l.start()
t_r.start()

while True:
    time.sleep(1)