🗜️DH-Robotics PGC Gripper
DH-Robotics PGC series, a signature electric parallel gripper designed for wide-ranging applications in cooperative manipulators. It boasts high protection, ensuring exceptional precision across various work environments.
🕹️Control Method
The gripper officially uses a 485 interface, which Adam has converted to an Ethernet interface. This allows the gripper to function as a node device within the Adam ecosystem, enabling Ethernet operations such as TCP/UDP more easily.
- Communication process:
TCP/UDP<->485<->hand
Using the original manufacturer's protocol, Adam only performs protocol forwarding between TCP/UDP and 485 For specific protocol details, refer to the Original Product Manual.
🐍Python demo
import socket
import struct
from enum import Enum
import time
# 寄存器地址枚举类
class RegisterAddress(Enum):
INIT = 0x0100
POSITION = 0x0103
# Modbus-RTU 功能码
class ModbusFunctionCode(Enum):
READ_COILS = 0x01
READ_DISCRETE_INPUTS = 0x02
READ_HOLDING_REGISTERS = 0x03
READ_INPUT_REGISTERS = 0x04
WRITE_SINGLE_COIL = 0x05
WRITE_SINGLE_REGISTER = 0x06
WRITE_MULTIPLE_COILS = 0x0F
WRITE_MULTIPLE_REGISTERS = 0x10
class ModbusUDPClient:
def __init__(self, host='127.0.0.1', port=2562, device_id=1):
"""初始化Modbus UDP客户端"""
self.host = host
self.port = port
self.device_id = device_id
self.socket = None
self.connect()
def connect(self):
"""建立UDP连接"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(2.0) # 设置超时时间为2秒
print(f"已连接到 {self.host}:{self.port}")
except Exception as e:
print(f"连接失败: {e}")
self.socket = None
def _calculate_crc(self, data):
"""计算Modbus RTU CRC校验和
Args:
data: 要计算CRC的数据(字节序列)
Returns:
2字节的CRC校验和,小端格式
"""
# 初始化CRC值
crc = 0xFFFF
# 遍历每个字节
for byte in data:
# 与当前字节异或
crc ^= byte
# 对每个位进行处理
for _ in range(8):
# 检查最低位
if crc & 0x0001:
# 如果最低位为1,右移并与多项式异或
crc = (crc >> 1) ^ 0xA001
else:
# 否则仅右移
crc >>= 1
# 返回小端格式的CRC
return struct.pack('<H', crc)
def _verify_crc(self, data):
"""验证Modbus RTU CRC校验和
Args:
data: 包含CRC校验和的完整数据(字节序列)
Returns:
校验是否通过
"""
if len(data) < 2:
return False
# 分离数据和CRC
content = data[:-2]
received_crc = data[-2:]
# 计算数据的CRC
calculated_crc = self._calculate_crc(content)
# 比较CRC
return received_crc == calculated_crc
def _create_request(self, function_code, register_address, count=1, values=None):
"""创建Modbus-RTU请求数据
Args:
function_code: 功能码枚举值
register_address: 寄存器地址
count: 寄存器数量(读取时使用)
values: 要写入的值(写入时使用)
Returns:
完整的Modbus-RTU请求数据(包含CRC校验和)
"""
# 根据功能码构建请求数据
if function_code in [ModbusFunctionCode.READ_HOLDING_REGISTERS, ModbusFunctionCode.READ_INPUT_REGISTERS]:
# 读取寄存器请求格式:设备地址(1) + 功能码(1) + 起始地址(2) + 寄存器数量(2)
request_data = struct.pack('>BBHH', self.device_id, function_code.value, register_address, count)
elif function_code == ModbusFunctionCode.WRITE_SINGLE_REGISTER:
# 写入单个寄存器请求格式:设备地址(1) + 功能码(1) + 寄存器地址(2) + 写入值(2)
if values is None:
raise ValueError("写入寄存器必须提供values参数")
request_data = struct.pack('>BBHH', self.device_id, function_code.value, register_address, values)
else:
raise NotImplementedError(f"功能码 {function_code} 暂未实现")
# 计算并添加CRC校验和
crc = self._calculate_crc(request_data)
return request_data + crc
def _parse_response(self, response, function_code, count=1):
"""解析Modbus-RTU响应数据
Args:
response: 接收到的响应数据
function_code: 期望的功能码
count: 寄存器数量
Returns:
解析出的寄存器值
"""
# 验证CRC校验和
if not self._verify_crc(response):
raise ValueError("CRC校验失败")
# 去除CRC校验和
response_data = response[:-2]
# 检查设备地址
if response_data[0] != self.device_id:
raise ValueError(f"设备地址不匹配: 期望 {self.device_id}, 得到 {response_data[0]}")
# 检查功能码
response_function_code = response_data[1]
if response_function_code != function_code.value:
if response_function_code & 0x80:
# 错误响应
error_code = response_data[2]
raise Exception(f"Modbus错误: 功能码={function_code.value}, 错误码={error_code}")
else:
raise ValueError(f"意外的功能码: {response_function_code}")
# 根据功能码解析数据
if function_code in [ModbusFunctionCode.READ_HOLDING_REGISTERS, ModbusFunctionCode.READ_INPUT_REGISTERS]:
# 读取寄存器响应格式:设备地址(1) + 功能码(1) + 字节计数(1) + 数据(...)
byte_count = response_data[2]
if byte_count != count * 2: # 每个寄存器2字节
raise ValueError(f"字节计数不匹配: 期望 {count*2}, 得到 {byte_count}")
# 解析寄存器值
values = []
for i in range(count):
offset = 3 + i * 2
value = struct.unpack('>H', response_data[offset:offset+2])[0]
values.append(value)
return values if count > 1 else values[0]
elif function_code == ModbusFunctionCode.WRITE_SINGLE_REGISTER:
# 写入单个寄存器响应格式:设备地址(1) + 功能码(1) + 寄存器地址(2) + 写入值(2)
if len(response_data) != 6:
raise ValueError(f"响应长度不正确: 期望 6, 得到 {len(response_data)}")
written_address, written_value = struct.unpack('>HH', response_data[2:6])
return written_value
else:
raise NotImplementedError(f"功能码 {function_code} 的响应解析暂未实现")
def read_register(self, register_address, register_type="holding"):
"""读取寄存器值
Args:
register_address: 寄存器地址,可以是枚举值或整数
register_type: 寄存器类型,"holding"表示保持寄存器,"input"表示输入寄存器
Returns:
寄存器的值,如果读取失败则返回None
"""
if isinstance(register_address, RegisterAddress):
address_value = register_address.value
else:
address_value = register_address
if register_type == "holding":
function_code = ModbusFunctionCode.READ_HOLDING_REGISTERS
elif register_type == "input":
function_code = ModbusFunctionCode.READ_INPUT_REGISTERS
else:
raise ValueError("register_type必须是'holding'或'input'")
try:
# 创建请求
request = self._create_request(function_code, address_value)
print(f"发送请求: {[hex(b) for b in request]}")
# 发送请求
self.socket.sendto(request, (self.host, self.port))
# 接收响应
response, server_address = self.socket.recvfrom(1024)
print(f"从 {server_address} 接收响应: {[hex(b) for b in response]}")
# 解析响应
value = self._parse_response(response, function_code)
return value
except Exception as e:
print(f"读取寄存器错误: {e}")
return None
def write_register(self, register_address, value):
"""写入寄存器值
Args:
register_address: 寄存器地址,可以是枚举值或整数
value: 要写入的值
Returns:
写入的值,如果成功的话;否则返回None
"""
if isinstance(register_address, RegisterAddress):
address_value = register_address.value
else:
address_value = register_address
try:
# 创建请求
request = self._create_request(
ModbusFunctionCode.WRITE_SINGLE_REGISTER,
address_value,
values=value
)
print(f"发送请求: {[hex(b) for b in request]}")
# 发送请求
self.socket.sendto(request, (self.host, self.port))
# 接收响应
response, server_address = self.socket.recvfrom(1024)
print(f"从 {server_address} 接收响应: {[hex(b) for b in response]}")
# 解析响应
written_value = self._parse_response(response, ModbusFunctionCode.WRITE_SINGLE_REGISTER)
return written_value
except Exception as e:
print(f"写入寄存器错误: {e}")
return None
def close(self):
"""关闭连接"""
if self.socket:
self.socket.close()
self.socket = None
print("连接已关闭")
def hd_left_hand_test():
hd_hand_left_client = ModbusUDPClient(host='10.10.10.18', port=2562)
# init
hd_hand_left_client.write_register(RegisterAddress.INIT, 0x01)
time.sleep(3)
try:
while True:
hd_hand_left_client.write_register(RegisterAddress.POSITION, 0)
time.sleep(1)
pos_value = hd_hand_left_client.read_register(RegisterAddress.POSITION)
print(f"Position: {pos_value}")
hd_hand_left_client.write_register(RegisterAddress.POSITION, 800)
time.sleep(1)
pos_value = hd_hand_left_client.read_register(RegisterAddress.POSITION)
print(f"Position: {pos_value}")
finally:
# 确保连接关闭
hd_hand_left_client.close()
def hd_right_hand_test():
hd_hand_right_client = ModbusUDPClient(host='10.10.10.38', port=2562)
# init
hd_hand_right_client.write_register(RegisterAddress.INIT, 0x01)
time.sleep(3)
try:
while True:
hd_hand_right_client.write_register(RegisterAddress.POSITION, 0)
time.sleep(1)
pos_value = hd_hand_right_client.read_register(RegisterAddress.POSITION)
print(f"Position: {pos_value}")
hd_hand_right_client.write_register(RegisterAddress.POSITION, 800)
time.sleep(1)
pos_value = hd_hand_right_client.read_register(RegisterAddress.POSITION)
print(f"Position: {pos_value}")
finally:
# 确保连接关闭
hd_hand_right_client.close()
def hd_both_hand_test():
hd_hand_left_client = ModbusUDPClient(host='10.10.10.18', port=2562)
hd_hand_left_client.write_register(RegisterAddress.INIT, 0x01)
hd_hand_right_client = ModbusUDPClient(host='10.10.10.38', port=2562)
hd_hand_right_client.write_register(RegisterAddress.INIT, 0x01)
time.sleep(3)
try:
while True:
hd_hand_left_client.write_register(RegisterAddress.POSITION, 0)
hd_hand_right_client.write_register(RegisterAddress.POSITION, 0)
time.sleep(1)
hd_hand_left_client.write_register(RegisterAddress.POSITION, 800)
hd_hand_right_client.write_register(RegisterAddress.POSITION, 800)
time.sleep(1)
finally:
# 确保连接关闭
hd_hand_left_client.close()
hd_hand_right_client.close()
if __name__ == "__main__":
#hd_left_hand_test()
#hd_right_hand_test()
hd_both_hand_test()