底层运动控制示例
约 1705 字大约 6 分钟
2025-05-29
该示例展示了如何使用 Magicbot-Gen1 SDK 进行初始化、连接机器人、底层运动控制(手、臂、腿、腰、头)、Imu传感器数据读取等基本操作。
C++
示例文件:low_level_motion_example.cpp
参考文档:low_level_motion_reference.md
#include "magic_robot.h"
#include <unistd.h>
#include <atomic>
#include <csignal>
#include <iostream>
using namespace magic::gen1;
magic::gen1::MagicRobot robot;
std::atomic<bool> running(true);
void signalHandler(int signum) {
std::cout << "Interrupt signal (" << signum << ") received.\n";
running = false;
robot.Shutdown();
// Exit process
exit(signum);
}
int main() {
// Bind SIGINT (Ctrl+C)
signal(SIGINT, signalHandler);
std::string local_ip = "192.168.54.111";
// Configure local IP address for direct network connection and initialize SDK
if (!robot.Initialize(local_ip)) {
std::cerr << "robot sdk initialize failed." << std::endl;
robot.Shutdown();
return -1;
}
// Connect to robot
auto status = robot.Connect();
if (status.code != ErrorCode::OK) {
std::cerr << "connect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
// Switch motion control controller to low-level controller, default is high-level controller
status = robot.SetMotionControlLevel(ControllerLevel::LowLevel);
if (status.code != ErrorCode::OK) {
std::cerr << "switch robot motion control level failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
// Get low-level controller
auto& controller = robot.GetLowLevelMotionController();
// Set control command sending period to 2ms, 500Hz
controller.SetPeriodMs(2);
// 订阅imu数据
controller.SubscribeBodyImu([](const std::shared_ptr<Imu> msg) {
static int32_t count = 0;
if (count++ % 1000 == 1) {
std::cout << "+++++++++++ receive imu data." << std::endl;
std::cout << "timestamp: " << msg->timestamp << std::endl;
std::cout << "temperature: " << msg->temperature << std::endl;
std::cout << "orientation: " << msg->orientation[0] << ", " << msg->orientation[1] << ", " << msg->orientation[2] << ", " << msg->orientation[3] << std::endl;
std::cout << "angular_velocity: " << msg->angular_velocity[0] << ", " << msg->angular_velocity[1] << ", " << msg->angular_velocity[2] << std::endl;
std::cout << "linear_acceleration: " << msg->linear_acceleration[0] << ", " << msg->linear_acceleration[1] << ", " << msg->linear_acceleration[2] << std::endl;
}
// TODO: handle imu data
});
// 订阅手部数据
controller.SubscribeArmState([](const std::shared_ptr<JointState> msg) {
static int32_t count = 0;
if (count++ % 1000 == 1) {
std::cout << "+++++++++++ receive arm joint data." << std::endl;
std::cout << "timestamp: " << msg->timestamp << std::endl;
std::cout << "pos: " << msg->joints[0].posH << ", " << msg->joints[0].posL << std::endl;
std::cout << "vel: " << msg->joints[0].vel << std::endl;
std::cout << "toq: " << msg->joints[0].toq << std::endl;
std::cout << "current: " << msg->joints[0].current << std::endl;
std::cout << "error_code: " << msg->joints[0].err_code << std::endl;
}
// TODO: handle arm joint data
});
// 以上臂关节控制为例:
// 后续关节控制指令,关节的操作模式为1,表示关节处于位置控制模式
while (running.load()) {
// 左臂关节,参考文档:
// 左臂或者右臂1-5关节operation_mode需要从模式:200切换到模式:4(串联PID模式)进行指令下发;
JointCommand arm_command;
arm_command.joints.resize(kArmJointNum);
for (int ii = 0; ii < kArmJointNum; ii++) {
// 设置关节处于准备状态
arm_command.joints[ii].operation_mode = 200;
// TODO:设置目标位置、速度、力矩和增益
arm_command.joints[ii].pos = 0.0;
arm_command.joints[ii].vel = 0.0;
arm_command.joints[ii].toq = 0.0;
arm_command.joints[ii].kp = 0.0;
arm_command.joints[ii].kd = 0.0;
}
// 发布控制指令
controller.PublishArmCommand(arm_command);
// 500HZ的频率(2ms)下发控制指令
usleep(2000);
}
// Disconnect from robot
status = robot.Disconnect();
if (status.code != ErrorCode::OK) {
std::cerr << "disconnect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
robot.Shutdown();
return 0;
}
Python
示例文件:low_level_motion_example.py
参考文档:low_level_motion_reference.md
完整代码示例
#!/usr/bin/env python3
import sys
import time
import signal
import logging
from typing import Optional
import magicbot_gen1_python as magicbot
# Configure logging format and level
logging.basicConfig(
level=logging.INFO, # Minimum log level
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Global variables
robot: Optional[magicbot.MagicRobot] = None
running = True
body_imu_counter = 0
arm_state_counter = 0
leg_state_counter = 0
head_state_counter = 0
waist_state_counter = 0
def signal_handler(signum, frame):
"""Signal handler function for graceful exit"""
global robot, running
running = False
logging.info("Received interrupt signal (%s), exiting...", signum)
if robot:
robot.disconnect()
logging.info("Robot disconnected")
robot.shutdown()
logging.info("Robot shutdown")
exit(-1)
def body_imu_callback(imu_data):
"""Body IMU data callback function"""
global body_imu_counter
if body_imu_counter % 1000 == 0:
logging.info("+++++++++++++ Received body IMU data")
# Print IMU data
logging.info("Received body IMU data, timestamp: %d", imu_data.timestamp)
logging.info("Received body IMU data, orientation: %s", imu_data.orientation)
logging.info(
"Received body IMU data, angular_velocity: %s",
imu_data.angular_velocity,
)
logging.info(
"Received body IMU data, linear_acceleration: %s",
imu_data.linear_acceleration,
)
logging.info("Received body IMU data, temperature: %s", imu_data.temperature)
body_imu_counter += 1
def arm_state_callback(joint_state):
"""Arm joint state callback function"""
global arm_state_counter
if arm_state_counter % 1000 == 0:
logging.info("+++++++++++++ Received arm joint state data")
# Print joint state data
logging.info(
"Received arm joint state data, status_word: %d",
joint_state.joints[0].status_word,
)
logging.info(
"Received arm joint state data, posH: %s", joint_state.joints[0].posH
)
logging.info(
"Received arm joint state data, posL: %s", joint_state.joints[0].posL
)
logging.info(
"Received arm joint state data, vel: %s", joint_state.joints[0].vel
)
logging.info(
"Received arm joint state data, toq: %s", joint_state.joints[0].toq
)
logging.info(
"Received arm joint state data, current: %s", joint_state.joints[0].current
)
logging.info(
"Received arm joint state data, error_code: %s",
joint_state.joints[0].err_code,
)
arm_state_counter += 1
def leg_state_callback(joint_state):
"""Leg joint state callback function"""
global leg_state_counter
if leg_state_counter % 1000 == 0:
logging.info("+++++++++++++ Received leg joint state data")
# Print joint state data
logging.info(
"Received leg joint state data, status_word: %d",
joint_state.joints[0].status_word,
)
logging.info(
"Received leg joint state data, posH: %s", joint_state.joints[0].posH
)
logging.info(
"Received leg joint state data, posL: %s", joint_state.joints[0].posL
)
logging.info(
"Received leg joint state data, vel: %s", joint_state.joints[0].vel
)
logging.info(
"Received leg joint state data, toq: %s", joint_state.joints[0].toq
)
logging.info(
"Received leg joint state data, current: %s", joint_state.joints[0].current
)
logging.info(
"Received leg joint state data, error_code: %s",
joint_state.joints[0].err_code,
)
leg_state_counter += 1
def head_state_callback(joint_state):
"""Head joint state callback function"""
global head_state_counter
if head_state_counter % 1000 == 0:
logging.info("+++++++++++++ Received head joint state data")
# Print joint state data
logging.info(
"Received head joint state data, status_word: %d",
joint_state.joints[0].status_word,
)
logging.info(
"Received head joint state data, posH: %s", joint_state.joints[0].posH
)
logging.info(
"Received head joint state data, posL: %s", joint_state.joints[0].posL
)
logging.info(
"Received head joint state data, vel: %s", joint_state.joints[0].vel
)
logging.info(
"Received head joint state data, toq: %s", joint_state.joints[0].toq
)
logging.info(
"Received head joint state data, current: %s", joint_state.joints[0].current
)
logging.info(
"Received head joint state data, error_code: %s",
joint_state.joints[0].err_code,
)
head_state_counter += 1
def waist_state_callback(joint_state):
"""Waist joint state callback function"""
global waist_state_counter
if waist_state_counter % 1000 == 0:
logging.info("+++++++++++++ Received waist joint state data")
# Print joint state data
logging.info(
"Received waist joint state data, status_word: %d",
joint_state.joints[0].status_word,
)
logging.info(
"Received waist joint state data, posH: %s", joint_state.joints[0].posH
)
logging.info(
"Received waist joint state data, posL: %s", joint_state.joints[0].posL
)
logging.info(
"Received waist joint state data, vel: %s", joint_state.joints[0].vel
)
logging.info(
"Received waist joint state data, toq: %s", joint_state.joints[0].toq
)
logging.info(
"Received waist joint state data, current: %s",
joint_state.joints[0].current,
)
logging.info(
"Received waist joint state data, error_code: %s",
joint_state.joints[0].err_code,
)
waist_state_counter += 1
def main():
"""Main function"""
global robot
# Bind signal handler
signal.signal(signal.SIGINT, signal_handler)
logging.info("Robot model: %s", magicbot.get_robot_model())
# Create robot instance
robot = magicbot.MagicRobot()
try:
# Configure local IP address for direct network connection and initialize SDK
local_ip = "192.168.54.111"
if not robot.initialize(local_ip):
logging.error("Failed to initialize robot SDK")
robot.shutdown()
return -1
# Connect to robot
status = robot.connect()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to connect to robot, code: %s, message: %s",
status.code,
status.message,
)
robot.shutdown()
return -1
logging.info("Successfully connected to robot")
# Switch motion control controller to low-level controller, default is high-level controller
status = robot.set_motion_control_level(magicbot.ControllerLevel.LowLevel)
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to switch robot motion control level, code: %s, message: %s",
status.code,
status.message,
)
robot.shutdown()
return -1
logging.info("Switched to low-level motion controller")
# Get low-level motion controller
controller = robot.get_low_level_motion_controller()
# Set control command sending period to 2ms, 500Hz
controller.set_period_ms(2)
logging.info("Set control period to 2ms (500Hz)")
# Subscribe to body IMU data
controller.subscribe_body_imu(body_imu_callback)
logging.info("Subscribed to body IMU data")
# Subscribe to arm joint state
controller.subscribe_arm_state(arm_state_callback)
logging.info("Subscribed to arm joint state")
# Subscribe to leg joint state
controller.subscribe_leg_state(leg_state_callback)
logging.info("Subscribed to leg joint state")
# Subscribe to head joint state
controller.subscribe_head_state(head_state_callback)
logging.info("Subscribed to head joint state")
# Subscribe to waist joint state
controller.subscribe_waist_state(waist_state_callback)
logging.info("Subscribed to waist joint state")
# Main loop
global running
while running:
# Create arm joint control command
arm_command = magicbot.JointCommand()
leg_command = magicbot.JointCommand()
waist_command = magicbot.JointCommand()
head_command = magicbot.JointCommand()
# Set all joints to preparation state (operation mode 200)
for i in range(magicbot.ARM_JOINT_NUM):
joint = magicbot.SingleJointCommand()
joint.operation_mode = 200 # Preparation state
joint.pos = 0.0
joint.vel = 0.0
joint.toq = 0.0
joint.kp = 0.0
joint.kd = 0.0
arm_command.joints.append(joint)
for i in range(magicbot.LEG_JOINT_NUM):
joint = magicbot.SingleJointCommand()
joint.operation_mode = 200 # Preparation state
joint.pos = 0.0
joint.vel = 0.0
joint.toq = 0.0
joint.kp = 0.0
joint.kd = 0.0
leg_command.joints.append(joint)
for i in range(magicbot.HEAD_JOINT_NUM):
joint = magicbot.SingleJointCommand()
joint.operation_mode = 200 # Preparation state
joint.pos = 0.0
joint.vel = 0.0
joint.toq = 0.0
joint.kp = 0.0
joint.kd = 0.0
head_command.joints.append(joint)
for i in range(magicbot.WAIST_JOINT_NUM):
joint = magicbot.SingleJointCommand()
joint.operation_mode = 200 # Preparation state
joint.pos = 0.0
joint.vel = 0.0
joint.toq = 0.0
joint.kp = 0.0
joint.kd = 0.0
waist_command.joints.append(joint)
# Publish arm joint control command
controller.publish_arm_command(arm_command)
# Publish leg joint control command
controller.publish_leg_command(leg_command)
# Publish waist joint control command
controller.publish_waist_command(waist_command)
# Publish head joint control command
controller.publish_head_command(head_command)
time.sleep(0.002)
except Exception as e:
logging.error("Exception occurred during program execution: %s", e)
return -1
finally:
# Clean up resources
try:
logging.info("Clean up resources")
# Close low-level motion controller
controller = robot.get_low_level_motion_controller()
controller.shutdown()
logging.info("Low-level motion controller closed")
# Disconnect
robot.disconnect()
logging.info("Robot connection disconnected")
# Shutdown robot
robot.shutdown()
logging.info("Robot shutdown")
except Exception as e:
logging.error("Exception occurred while cleaning up resources: %s", e)
if __name__ == "__main__":
sys.exit(main())
运行说明
- 环境准备:
# 设置环境变量
export PYTHONPATH=/opt/magic_robotics/magicbot_gen1_sdk/lib:$PYTHONPATH
export LD_LIBRARY_PATH=/opt/magic_robotics/magicbot_gen1_sdk/lib:$LD_LIBRARY_PATH
- 运行示例:
# C++
./low_level_motion_example
# Python
python3 low_level_motion_example.py
- 停止程序:
- 按
Ctrl+C
可以安全停止程序 - 程序会自动清理所有资源