🦞AgiBot X1 OmniPicker
AgiBot X1 OmniPicker is an adaptive universal gripper. It combines the advantages of gripper designs across different modalities, enabling the grasping of objects of various shapes with just one active degree of freedom.
🕹️ Control Method
The OmniPicker officially defaults to using a serial/485 interface. Adam converts it into an Ethernet interface, making it a node sub-device of Adam for convenient Ethernet operations such as TCP/UDP, and better integration into the Adam robot ecosystem.
- Communication process:
UDP/TCP<->Serial Port<->485<->Hand
Using the original manufacturer's protocol, Adam only performs protocol forwarding between UDP/TCP and 485 (forwarding the original protocol, not passthrough).
For specific protocol details, refer to the Original Product Manual.

Pos Cmd: Target position, value range 0-FF, where 0 is fully closed and FF is fully open.
🐍Python demo
import time
import socket
import threading
import signal
import sys
# ---------------- 基本配置 ----------------
port = 2562
addr_l = '10.10.10.18'
addr_r = '10.10.10.38'
# ---------------- 工具函数 ----------------
def hexdump(b: bytes) -> str:
return ' '.join(f'{x:02X}' for x in b)
def checksum(buf: bytes) -> int:
ret = 0
for i in buf:
ret += i
return (~ret) & 0xFF
def make_frame(node_id: int, pos=0x7F, force=0xFF, vel=0xFF, acc=0xFF, dec=0xFF) -> bytes:
"""帧格式: D0-D1:0x4141, D2:ID, D3:Resv, D4~D8:5个Cmd(各1字节),
D9:Resv, D10:Resv, D11:CheckSum(D2~D10)"""
def b(x): return int(x) & 0xFF
frame = bytearray()
frame += b'\x41\x41' # D0-D1: Frame Head 0x4141
frame.append(b(node_id)) # D2: ID
frame.append(0x00) # D3: Reserved
frame.append(b(pos)) # D4: Pos Cmd
frame.append(b(force)) # D5: Force Cmd
frame.append(b(vel)) # D6: Vel Cmd
frame.append(b(acc)) # D7: Acc Cmd
frame.append(b(dec)) # D8: Dec Cmd
frame.append(0x00) # D9: Reserved
frame.append(0x00) # D10: Reserved
frame.append(checksum(frame[2:])) # D11: CheckSum
return bytes(frame)
# ---------------- 状态帧解析 ----------------
def parse_status_frame(pkt: bytes):
"""
解析 12 字节状态帧:
D0-D1: 0x4141 | D2:ID | D3:Fault | D4:State | D5:Pos | D6:Vel | D7:Force
D8~D10: Reserved | D11: CheckSum = ~(D2..D10和) & 0xFF
"""
if len(pkt) != 12:
return None, "len!=12"
if pkt[0] != 0x41 or pkt[1] != 0x41:
return None, "bad head"
expect = checksum(pkt[2:11])
if pkt[11] != expect:
return None, f"bad checksum got=0x{pkt[11]:02X} expect=0x{expect:02X}"
d = {
"id": pkt[2], "fault": pkt[3], "state": pkt[4],
"pos": pkt[5], "vel": pkt[6], "force": pkt[7],
"rsv8": pkt[8], "rsv9": pkt[9], "rsv10": pkt[10],
"checksum": pkt[11],
}
return d, None
def extract_and_parse_all(data: bytes):
"""在任意数据里扫描 0x41 0x41 开头的 12 字节帧,逐帧解析。"""
results, i, n = [], 0, len(data)
while i + 12 <= n:
if data[i] == 0x41 and data[i+1] == 0x41:
pkt = data[i:i+12]
d, err = parse_status_frame(pkt)
results.append((pkt, d, err))
i += 12
else:
i += 1
return results
def pretty_status(d: dict) -> str:
return (f"ID={d['id']:02X} Fault={d['fault']:02X} State={d['state']:02X} "
f"Pos={d['pos']:02X} Vel={d['vel']:02X} Force={d['force']:02X} "
f"RSV8={d['rsv8']:02X} RSV9={d['rsv9']:02X} RSV10={d['rsv10']:02X}")
def handle_recv(dat: bytes, tag: str):
items = extract_and_parse_all(dat)
if not items:
print(f"[{tag}] recv (no frame): {hexdump(dat)}")
return
for pkt, d, err in items:
if err:
print(f"[{tag}] recv ERR {err}: {hexdump(pkt)}")
else:
print(f"[{tag}] recv OK {pretty_status(d)} | raw: {hexdump(pkt)}")
def hand_ctrl(sock: socket.socket, addr: str, port: int, tag: str, node_id: int = 1,pos: int = 0):
target = (addr, port)
data = make_frame(node_id=node_id, pos=pos)
sock.sendto(data, target)
print(f"[{tag}] send: {hexdump(data)}")
try:
recv = sock.recv(1024)
handle_recv(recv, tag)
except socket.timeout:
print(f"[{tag}] recv timeout")
def hand_l_worker(sock: socket.socket, addr: str, port: int, tag: str,
node_id: int = 1):
sock.settimeout(1.0)
while True:
hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x7F)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x00)
time.sleep(1)
def hand_r_worker(sock: socket.socket, addr: str, port: int, tag: str,
node_id: int = 1):
sock.settimeout(1.0)
while True:
hand_ctrl(sock, addr, port, tag, node_id, 0xFF)
time.sleep(1)
hand_ctrl(sock, addr, port, tag, node_id, 0x00)
time.sleep(1)
# ---------------- 启动与收尾 ----------------
def sig_handler(signum, frame):
print('You pressed Ctrl + C!')
sys.exit(0)
def make_udp_socket(timeout_s=1.0) -> socket.socket:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(timeout_s)
return s
signal.signal(signal.SIGINT, sig_handler)
sok_l = make_udp_socket()
sok_r = make_udp_socket()
t_l = threading.Thread(target=hand_l_worker, args=(sok_l, addr_l, port, "L"))
t_r = threading.Thread(target=hand_r_worker, args=(sok_r, addr_r, port, "R"))
t_l.daemon = True
t_r.daemon = True
t_l.start()
t_r.start()
while True:
time.sleep(1)