跳转至

🗜️大寰 PGC 夹爪

大寰机器人 PGC 系列协作型平行电爪,是一款主要应用于协作机械臂的电动夹爪,具有防护等级高、即插即用、大负载等优势。

🕹️控制方式

夹爪官方默认使用的是485接口,Adam将其转换为以太网接口,使其作为Adam的一个节点子设备方便进行tpc/udp等以太网操作,更好的融入Adam机器人生态。

  • 通讯流程:tcp/udp <-> 485 <-> hand

使用原厂协议,Adam只做了tcp/udp与485的协议转发。
具体协议内容参考原厂产品手册

🐍Python demo

import socket
import struct
from enum import Enum
import time
# 寄存器地址枚举类
class RegisterAddress(Enum):
    INIT = 0x0100
    POSITION = 0x0103

# Modbus-RTU 功能码
class ModbusFunctionCode(Enum):
    READ_COILS = 0x01
    READ_DISCRETE_INPUTS = 0x02
    READ_HOLDING_REGISTERS = 0x03
    READ_INPUT_REGISTERS = 0x04
    WRITE_SINGLE_COIL = 0x05
    WRITE_SINGLE_REGISTER = 0x06
    WRITE_MULTIPLE_COILS = 0x0F
    WRITE_MULTIPLE_REGISTERS = 0x10

class ModbusUDPClient:
    def __init__(self, host='127.0.0.1', port=2562, device_id=1):
        """初始化Modbus UDP客户端"""
        self.host = host
        self.port = port
        self.device_id = device_id
        self.socket = None
        self.connect()

    def connect(self):
        """建立UDP连接"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.socket.settimeout(2.0)  # 设置超时时间为2秒
            print(f"已连接到 {self.host}:{self.port}")
        except Exception as e:
            print(f"连接失败: {e}")
            self.socket = None

    def _calculate_crc(self, data):
        """计算Modbus RTU CRC校验和

        Args:
            data: 要计算CRC的数据(字节序列)

        Returns:
            2字节的CRC校验和,小端格式
        """
        # 初始化CRC值
        crc = 0xFFFF

        # 遍历每个字节
        for byte in data:
            # 与当前字节异或
            crc ^= byte

            # 对每个位进行处理
            for _ in range(8):
                # 检查最低位
                if crc & 0x0001:
                    # 如果最低位为1,右移并与多项式异或
                    crc = (crc >> 1) ^ 0xA001
                else:
                    # 否则仅右移
                    crc >>= 1

        # 返回小端格式的CRC
        return struct.pack('<H', crc)

    def _verify_crc(self, data):
        """验证Modbus RTU CRC校验和

        Args:
            data: 包含CRC校验和的完整数据(字节序列)

        Returns:
            校验是否通过
        """
        if len(data) < 2:
            return False

        # 分离数据和CRC
        content = data[:-2]
        received_crc = data[-2:]

        # 计算数据的CRC
        calculated_crc = self._calculate_crc(content)

        # 比较CRC
        return received_crc == calculated_crc

    def _create_request(self, function_code, register_address, count=1, values=None):
        """创建Modbus-RTU请求数据

        Args:
            function_code: 功能码枚举值
            register_address: 寄存器地址
            count: 寄存器数量(读取时使用)
            values: 要写入的值(写入时使用)

        Returns:
            完整的Modbus-RTU请求数据(包含CRC校验和)
        """
        # 根据功能码构建请求数据
        if function_code in [ModbusFunctionCode.READ_HOLDING_REGISTERS, ModbusFunctionCode.READ_INPUT_REGISTERS]:
            # 读取寄存器请求格式:设备地址(1) + 功能码(1) + 起始地址(2) + 寄存器数量(2)
            request_data = struct.pack('>BBHH', self.device_id, function_code.value, register_address, count)
        elif function_code == ModbusFunctionCode.WRITE_SINGLE_REGISTER:
            # 写入单个寄存器请求格式:设备地址(1) + 功能码(1) + 寄存器地址(2) + 写入值(2)
            if values is None:
                raise ValueError("写入寄存器必须提供values参数")
            request_data = struct.pack('>BBHH', self.device_id, function_code.value, register_address, values)
        else:
            raise NotImplementedError(f"功能码 {function_code} 暂未实现")

        # 计算并添加CRC校验和
        crc = self._calculate_crc(request_data)

        return request_data + crc

    def _parse_response(self, response, function_code, count=1):
        """解析Modbus-RTU响应数据

        Args:
            response: 接收到的响应数据
            function_code: 期望的功能码
            count: 寄存器数量

        Returns:
            解析出的寄存器值
        """
        # 验证CRC校验和
        if not self._verify_crc(response):
            raise ValueError("CRC校验失败")

        # 去除CRC校验和
        response_data = response[:-2]

        # 检查设备地址
        if response_data[0] != self.device_id:
            raise ValueError(f"设备地址不匹配: 期望 {self.device_id}, 得到 {response_data[0]}")

        # 检查功能码
        response_function_code = response_data[1]
        if response_function_code != function_code.value:
            if response_function_code & 0x80:
                # 错误响应
                error_code = response_data[2]
                raise Exception(f"Modbus错误: 功能码={function_code.value}, 错误码={error_code}")
            else:
                raise ValueError(f"意外的功能码: {response_function_code}")

        # 根据功能码解析数据
        if function_code in [ModbusFunctionCode.READ_HOLDING_REGISTERS, ModbusFunctionCode.READ_INPUT_REGISTERS]:
            # 读取寄存器响应格式:设备地址(1) + 功能码(1) + 字节计数(1) + 数据(...)
            byte_count = response_data[2]
            if byte_count != count * 2:  # 每个寄存器2字节
                raise ValueError(f"字节计数不匹配: 期望 {count*2}, 得到 {byte_count}")

            # 解析寄存器值
            values = []
            for i in range(count):
                offset = 3 + i * 2
                value = struct.unpack('>H', response_data[offset:offset+2])[0]
                values.append(value)

            return values if count > 1 else values[0]
        elif function_code == ModbusFunctionCode.WRITE_SINGLE_REGISTER:
            # 写入单个寄存器响应格式:设备地址(1) + 功能码(1) + 寄存器地址(2) + 写入值(2)
            if len(response_data) != 6:
                raise ValueError(f"响应长度不正确: 期望 6, 得到 {len(response_data)}")

            written_address, written_value = struct.unpack('>HH', response_data[2:6])
            return written_value
        else:
            raise NotImplementedError(f"功能码 {function_code} 的响应解析暂未实现")

    def read_register(self, register_address, register_type="holding"):
        """读取寄存器值

        Args:
            register_address: 寄存器地址,可以是枚举值或整数
            register_type: 寄存器类型,"holding"表示保持寄存器,"input"表示输入寄存器

        Returns:
            寄存器的值,如果读取失败则返回None
        """
        if isinstance(register_address, RegisterAddress):
            address_value = register_address.value
        else:
            address_value = register_address

        if register_type == "holding":
            function_code = ModbusFunctionCode.READ_HOLDING_REGISTERS
        elif register_type == "input":
            function_code = ModbusFunctionCode.READ_INPUT_REGISTERS
        else:
            raise ValueError("register_type必须是'holding'或'input'")

        try:
            # 创建请求
            request = self._create_request(function_code, address_value)
            print(f"发送请求: {[hex(b) for b in request]}")

            # 发送请求
            self.socket.sendto(request, (self.host, self.port))

            # 接收响应
            response, server_address = self.socket.recvfrom(1024)
            print(f"从 {server_address} 接收响应: {[hex(b) for b in response]}")

            # 解析响应
            value = self._parse_response(response, function_code)

            return value
        except Exception as e:
            print(f"读取寄存器错误: {e}")
            return None

    def write_register(self, register_address, value):
        """写入寄存器值

        Args:
            register_address: 寄存器地址,可以是枚举值或整数
            value: 要写入的值

        Returns:
            写入的值,如果成功的话;否则返回None
        """
        if isinstance(register_address, RegisterAddress):
            address_value = register_address.value
        else:
            address_value = register_address

        try:
            # 创建请求
            request = self._create_request(
                ModbusFunctionCode.WRITE_SINGLE_REGISTER, 
                address_value, 
                values=value
            )
            print(f"发送请求: {[hex(b) for b in request]}")

            # 发送请求
            self.socket.sendto(request, (self.host, self.port))

            # 接收响应
            response, server_address = self.socket.recvfrom(1024)
            print(f"从 {server_address} 接收响应: {[hex(b) for b in response]}")

            # 解析响应
            written_value = self._parse_response(response, ModbusFunctionCode.WRITE_SINGLE_REGISTER)

            return written_value
        except Exception as e:
            print(f"写入寄存器错误: {e}")
            return None

    def close(self):
        """关闭连接"""
        if self.socket:
            self.socket.close()
            self.socket = None
            print("连接已关闭")

def hd_left_hand_test():
    hd_hand_left_client = ModbusUDPClient(host='10.10.10.18', port=2562)
    # init
    hd_hand_left_client.write_register(RegisterAddress.INIT, 0x01)
    time.sleep(3)
    try:
        while True:
            hd_hand_left_client.write_register(RegisterAddress.POSITION, 0)
            time.sleep(1)
            pos_value = hd_hand_left_client.read_register(RegisterAddress.POSITION)
            print(f"Position: {pos_value}")
            hd_hand_left_client.write_register(RegisterAddress.POSITION, 800)
            time.sleep(1)
            pos_value = hd_hand_left_client.read_register(RegisterAddress.POSITION)
            print(f"Position: {pos_value}")
    finally:
        # 确保连接关闭
        hd_hand_left_client.close()

def hd_right_hand_test():
    hd_hand_right_client = ModbusUDPClient(host='10.10.10.38', port=2562)
    # init
    hd_hand_right_client.write_register(RegisterAddress.INIT, 0x01)
    time.sleep(3)
    try:
        while True:
            hd_hand_right_client.write_register(RegisterAddress.POSITION, 0)
            time.sleep(1)
            pos_value = hd_hand_right_client.read_register(RegisterAddress.POSITION)
            print(f"Position: {pos_value}")
            hd_hand_right_client.write_register(RegisterAddress.POSITION, 800)
            time.sleep(1)
            pos_value = hd_hand_right_client.read_register(RegisterAddress.POSITION)
            print(f"Position: {pos_value}")
    finally:
        # 确保连接关闭
        hd_hand_right_client.close()

def hd_both_hand_test():
    hd_hand_left_client = ModbusUDPClient(host='10.10.10.18', port=2562)
    hd_hand_left_client.write_register(RegisterAddress.INIT, 0x01)
    hd_hand_right_client = ModbusUDPClient(host='10.10.10.38', port=2562)
    hd_hand_right_client.write_register(RegisterAddress.INIT, 0x01)
    time.sleep(3)
    try:
        while True:
            hd_hand_left_client.write_register(RegisterAddress.POSITION, 0)
            hd_hand_right_client.write_register(RegisterAddress.POSITION, 0)
            time.sleep(1)
            hd_hand_left_client.write_register(RegisterAddress.POSITION, 800)
            hd_hand_right_client.write_register(RegisterAddress.POSITION, 800)
            time.sleep(1)
    finally:
        # 确保连接关闭
        hd_hand_left_client.close()
        hd_hand_right_client.close()

if __name__ == "__main__":
    #hd_left_hand_test()
    #hd_right_hand_test()
    hd_both_hand_test()