上层运动控制及开发
上层运动控制开发基于PNDbotics机器人控制系统开发,通过调用底层运动控制接口可实现上层运动控制功能,也可基于开放接口及内置运动规划控制算法进行个性化开发。 上层运动控制实现方式为ROS2,用户可根据例程开发控制上肢部分。
v1.0.0版本
启动adam_demo
- 启动adam_demo脚本时需要在root用户下,在
adam_demo/bin/
目录下执行
工程编译
- 启动adam_demo的脚本中使用了sudo权限,所以需要在编译前先切换到root用户下,编译包括
robot_state_publisher
和robot_state_subscriber
两个包
-
使用如下指令编译ros2包
数据接收节点
-
adam_demo启动后即可运行下面的指令接收上肢数据
数据发布节点
-
当机器人处于站立状态时,xbox手柄中
xx
的正按键按下,终端打印real time retarget start
表示机器人进入接收外部数据状态,此时执行如下命令 -
使用方法参考
ros2_test/src/robot_state_publisher/robot_state_publisher/robot_state_publisher_node.py
中描述,文件的63-68
行为控制姿态和高度和上肢的示例,文件的43-60
行为控制手指的示例 -
机器人停止接收外部数据时,xbox手柄中
xx
的负按键按下,终端打印real time retarget stop
表示机器人停止接收外部数据状态
代码下载
- 代码仓库地址:pnd_adam_ros2_publish
代码解析
robot_state_publisher_node.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState # 修改导入的消息类型
import numpy as np
class JointStatePublisher(Node):
def __init__(self):
super().__init__('joint_state_publisher')
self.publisher = self.create_publisher(
JointState, # 修改消息类型
'joint_states',
10)
self.timer = self.create_timer(0.01, self.timer_callback) # 100Hz
self.get_logger().info('发布者已启动,正在向/joint_states主题发送关节状态数据...') # 更新日志信息
self.counter = 0
# 定义关节名称,与retarget.cpp中的joint_name_publisher_保持一致
self.joint_names = [
"dof_pos/waistRoll", "dof_pos/waistPitch", "dof_pos/waistYaw",
"dof_pos/shoulderPitch_Left", "dof_pos/shoulderRoll_Left", "dof_pos/shoulderYaw_Left",
"dof_pos/elbow_Left", "dof_pos/wristYaw_Left", "dof_pos/wristPitch_Left",
"dof_pos/wristRoll_Left", "dof_pos/shoulderPitch_Right", "dof_pos/shoulderRoll_Right",
"dof_pos/shoulderYaw_Right", "dof_pos/elbow_Right", "dof_pos/wristYaw_Right",
"dof_pos/wristPitch_Right", "dof_pos/wristRoll_Right", "root_pos/z",
"dof_pos/hand_pinky_Left", "dof_pos/hand_ring_Left", "dof_pos/hand_middle_Left", "dof_pos/hand_index_Left", "dof_pos/hand_thumb_1_Left", "dof_pos/hand_thumb_2_Left",
"dof_pos/hand_pinky_Right", "dof_pos/hand_ring_Right", "dof_pos/hand_middle_Right", "dof_pos/hand_index_Right", "dof_pos/hand_thumb_1_Right", "dof_pos/hand_thumb_2_Right"
]
def timer_callback(self):
msg = JointState() # 创建JointState消息
msg.header.stamp = self.get_clock().now().to_msg() # 设置时间戳
msg.name = self.joint_names # 设置关节名称
# 初始化position数组
position_array = np.zeros(len(self.joint_names), dtype=np.float64)
position_array[:3] = 0.0 # 腰部姿态 复合角范围建议从小到大尝试
position_array[3:10] = 0.0 # 左臂关节 根据实际情况调整,单位为弧度
position_array[10:17] = 0.0 # 右臂关节 根据实际情况调整,单位为弧度
position_array[17] = 1.0 # base高度 范围[0.6m 1.0m],站立时为1.0m,调整为0.6m时会有下蹲动作
# 手指范围[0 1000],1000表示手指完全伸直,0表示手指完全弯曲
hand_control_left = np.zeros(6, dtype=np.float64)
hand_control_left[0] = 500.0
hand_control_left[1] = 500.0
hand_control_left[2] = 500.0
hand_control_left[3] = 500.0
hand_control_left[4] = 1000.0
hand_control_left[5] = 1000.0
hand_control_right = np.zeros(6, dtype=np.float64)
hand_control_right[0] = 500.0
hand_control_right[1] = 500.0
hand_control_right[2] = 500.0
hand_control_right[3] = 500.0
hand_control_right[4] = 1000.0
hand_control_right[5] = 1000.0
position_array[18:24] = hand_control_left
position_array[24:30] = hand_control_right
# 测试用,根据需要调整
# position_array[0] = 0.2*np.sin(2*np.pi*self.counter/100.0/5.0) # 测试腰部
# position_array[1] = 0.2*np.sin(2*np.pi*self.counter/100.0/6.0) # 测试腰部
# position_array[2] = 0.2*np.sin(2*np.pi*self.counter/100.0/7.0) # 测试腰部
# position_array[7] = 0.5*np.sin(2*np.pi*self.counter/100.0) # 测试左胳膊小臂
# position_array[14] = 0.5*np.sin(2*np.pi*self.counter/100.0) # 测试右胳膊小臂
# position_array[17] = 1.0 - (0.05*np.sin(2*np.pi*self.counter/100.0/5.0)+0.05) # 测试base高度
# 将numpy数组转换为list类型
msg.position = position_array.tolist()
msg.velocity = [0.0] * len(self.joint_names) # 设置速度为零
msg.effort = [0.0] * len(self.joint_names) # 设置力矩为零
self.publisher.publish(msg)
self.get_logger().info(f'发布关节状态数据 #{self.counter}')
self.counter += 1
def main(args=None):
rclpy.init(args=args)
publisher = JointStatePublisher()
rclpy.spin(publisher)
publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
robot_state_subscriber_node.py
import rclpy
from rclpy.node import Node
# 导入 JointState 消息类型
from sensor_msgs.msg import JointState
class RobotStateSubscriber(Node):
def __init__(self):
super().__init__('robot_state_subscriber')
self.subscription = self.create_subscription(
JointState,
'robot_states',
self.listener_callback,
10)
self.get_logger().info('订阅器已启动,正在等待/robot_states主题的数据...')
self.counter = 0 # 添加计数器
def listener_callback(self, msg):
self.counter += 1
if self.counter % 5 == 0:
print("接收到消息:")
max_name_length = max(len(name) for name in msg.name) if msg.name else 20
for i in range(len(msg.name)):
position = msg.position[i] if i < len(msg.position) else "N/A"
velocity = msg.velocity[i] if i < len(msg.velocity) else "N/A"
effort = msg.effort[i] if i < len(msg.effort) else "N/A"
print(f"关节名称: {msg.name[i]:<{max_name_length}} 位置: {str(position):<8} 速度: {str(velocity):<8} 加速度: {str(effort):<8}")
print("------------------------------")
def main(args=None):
rclpy.init(args=args)
subscriber = RobotStateSubscriber()
rclpy.spin(subscriber)
subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()