🦞智元灵犀X1 OmniPicker
智元灵犀X1 OmniPicker是一款自适应的通用夹爪。融合了不同模态的夹爪设计的优点,只用一个主动自由度就可以实现各种不同形状物体的抓取。
🕹️控制方式
夹爪官方默认使用的是串口/485接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便得进行tpc/udp等以太网操作,更好的融入Adam机器人生态。
- 通讯流程:
udp/tcp<->串口<->485<->hand
使用原厂协议,Adam只做了udp/tcp与485的协议转发(只转发原厂协议,不是透传)。 具体协议内容参考原厂产品手册。

Pos Cmd:目标位置,数值范围0- FF,0为夹紧,FF为完全张开
🐍Python demo
import time
import socket
import threading
import signal
import sys
# ---------------- 基本配置 ----------------
port = 2562
addr_l = '10.10.10.18'
addr_r = '10.10.10.38'
# ---------------- 工具函数 ----------------
def hexdump(b: bytes) -> str:
return ' '.join(f'{x:02X}' for x in b)
def checksum(buf: bytes) -> int:
ret = 0
for i in buf:
ret += i
return (~ret) & 0xFF
def make_frame(node_id: int, pos=0x7F, force=0xFF, vel=0xFF, acc=0xFF, dec=0xFF) -> bytes:
"""帧格式: D0-D1:0x4141, D2:ID, D3:Resv, D4~D8:5个Cmd(各1字节),
D9:Resv, D10:Resv, D11:CheckSum(D2~D10)"""
def b(x): return int(x) & 0xFF
frame = bytearray()
frame += b'\x41\x41' # D0-D1: Frame Head 0x4141
frame.append(b(node_id)) # D2: ID
frame.append(0x00) # D3: Reserved
frame.append(b(pos)) # D4: Pos Cmd
frame.append(b(force)) # D5: Force Cmd
frame.append(b(vel)) # D6: Vel Cmd
frame.append(b(acc)) # D7: Acc Cmd
frame.append(b(dec)) # D8: Dec Cmd
frame.append(0x00) # D9: Reserved
frame.append(0x00) # D10: Reserved
frame.append(checksum(frame[2:])) # D11: CheckSum
return bytes(frame)
# ---------------- 状态帧解析 ----------------
def parse_status_frame(pkt: bytes):
"""
解析 12 字节状态帧:
D0-D1: 0x4141 | D2:ID | D3:Fault | D4:State | D5:Pos | D6:Vel | D7:Force
D8~D10: Reserved | D11: CheckSum = ~(D2..D10和) & 0xFF
"""
if len(pkt) != 12:
return None, "len!=12"
if pkt[0] != 0x41 or pkt[1] != 0x41:
return None, "bad head"
expect = checksum(pkt[2:11])
if pkt[11] != expect:
return None, f"bad checksum got=0x{pkt[11]:02X} expect=0x{expect:02X}"
d = {
"id": pkt[2], "fault": pkt[3], "state": pkt[4],
"pos": pkt[5], "vel": pkt[6], "force": pkt[7],
"rsv8": pkt[8], "rsv9": pkt[9], "rsv10": pkt[10],
"checksum": pkt[11],
}
return d, None
def extract_and_parse_all(data: bytes):
"""在任意数据里扫描 0x41 0x41 开头的 12 字节帧,逐帧解析。"""
results, i, n = [], 0, len(data)
while i + 12 <= n:
if data[i] == 0x41 and data[i+1] == 0x41:
pkt = data[i:i+12]
d, err = parse_status_frame(pkt)
results.append((pkt, d, err))
i += 12
else:
i += 1
return results
def pretty_status(d: dict) -> str:
return (f"ID={d['id']:02X} Fault={d['fault']:02X} State={d['state']:02X} "
f"Pos={d['pos']:02X} Vel={d['vel']:02X} Force={d['force']:02X} "
f"RSV8={d['rsv8']:02X} RSV9={d['rsv9']:02X} RSV10={d['rsv10']:02X}")
def handle_recv(dat: bytes, tag: str):
items = extract_and_parse_all(dat)
if not items:
print(f"[{tag}] recv (no frame): {hexdump(dat)}")
return
for pkt, d, err in items:
if err:
print(f"[{tag}] recv ERR {err}: {hexdump(pkt)}")
else:
print(f"[{tag}] recv OK {pretty_status(d)} | raw: {hexdump(pkt)}")
def hand_ctrl(sock: socket.socket, addr: str, port: int, tag: str, node_id: int = 1,pos: int = 0):
target = (addr, port)
data = make_frame(node_id=node_id, pos=pos)
sock.sendto(data, target)
print(f"[{tag}] send: {hexdump(data)}")
try:
recv = sock.recv(1024)
handle_recv(recv, tag)
except socket.timeout:
print(f"[{tag}] recv timeout")
def hand_l_worker(sock: socket.socket, addr: str, port: int, tag: str,
node_id: int = 1):
sock.settimeout(1.0)
while True:
hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x7F)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x00)
time.sleep(1)
def hand_r_worker(sock: socket.socket, addr: str, port: int, tag: str,
node_id: int = 1):
sock.settimeout(1.0)
while True:
hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x00)
time.sleep(1)
# ---------------- 启动与收尾 ----------------
def sig_handler(signum, frame):
print('You pressed Ctrl + C!')
sys.exit(0)
def make_udp_socket(timeout_s=1.0) -> socket.socket:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(timeout_s)
return s
signal.signal(signal.SIGINT, sig_handler)
sok_l = make_udp_socket()
sok_r = make_udp_socket()
t_l = threading.Thread(target=hand_l_worker, args=(sok_l, addr_l, port, "L"))
t_r = threading.Thread(target=hand_r_worker, args=(sok_r, addr_r, port, "R"))
t_l.daemon = True
t_r.daemon = True
t_l.start()
t_r.start()
while True:
time.sleep(1)