传感器控制示例
约 826 字大约 3 分钟
2025-05-29
该示例展示了如何使用 MagicDog SDK 进行初始化、连接机器人、机器人传感器控制(雷达、RGBD 相机、双目相机)等基本操作。
C++
示例文件:sensor_example.cpp
参考文档:C++ API/sensor_reference.md
#include "magic_robot.h"
#include "magic_type.h"
#include <unistd.h>
#include <csignal>
#include <iostream>
#include <memory>
using namespace magic::dog;
magic::dog::MagicRobot robot;
void signalHandler(int signum) {
std::cout << "Interrupt signal (" << signum << ") received.\n";
robot.Shutdown();
// Exit process
exit(signum);
}
int main() {
// 绑定 SIGINT(Ctrl+C)
signal(SIGINT, signalHandler);
std::string local_ip = "192.168.55.10";
// Configure local IP address for direct network connection to machine and initialize SDK
if (!robot.Initialize(local_ip)) {
std::cerr << "Robot SDK initialization failed." << std::endl;
robot.Shutdown();
return -1;
}
// Set RPC timeout to 5s
robot.SetTimeout(5000);
// Connect to robot
auto status = robot.Connect();
if (status.code != ErrorCode::OK) {
std::cerr << "Connect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
auto& controller = robot.GetSensorController();
status = controller.OpenChannelSwith();
if (status.code != ErrorCode::OK) {
std::cerr << "Open channel failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
controller.SubscribeLeftBinocularHighImg([](const std::shared_ptr<CompressedImage> msg) {
std::cout << "Received left binocular high image." << std::endl;
});
controller.SubscribeLeftBinocularLowImg([](const std::shared_ptr<CompressedImage> msg) {
std::cout << "Received left binocular low image." << std::endl;
});
controller.SubscribeRightBinocularLowImg([](const std::shared_ptr<CompressedImage> msg) {
std::cout << "Received right binocular low image." << std::endl;
});
controller.SubscribeDepthImage([](const std::shared_ptr<Image> msg) {
std::cout << "Received depth image." << std::endl;
});
status = controller.OpenBinocularCamera();
if (status.code != ErrorCode::OK) {
std::cerr << "Open binocular camera failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
usleep(50000000);
// Close binocular camera
status = controller.CloseBinocularCamera();
if (status.code != ErrorCode::OK) {
std::cerr << "Close binocular camera failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
}
status = controller.CloseChannelSwith();
if (status.code != ErrorCode::OK) {
std::cerr << "Close channel failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
// Disconnect from robot
status = robot.Disconnect();
if (status.code != ErrorCode::OK) {
std::cerr << "Disconnect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
robot.Shutdown();
return 0;
}
Python
示例文件:sensor_example.py
示例文档:Python API/sensor_reference.md
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MagicDog SDK Python Usage Example
This file demonstrates how to use MagicDog SDK Python bindings to control the robot.
"""
import sys
import time
import logging
from typing import Optional
logging.basicConfig(
level=logging.INFO, # Minimum log level
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
try:
import magicdog_python as magicdog
from magicdog_python import TtsCommand, TtsPriority, TtsMode, GetSpeechConfig, ErrorCode
logging.info("Successfully imported MagicDog Python module!")
logging.info(f"Imported magicdog_python path: {sys.path}")
except ImportError as e:
logging.error(f"Import failed: {e}")
logging.error("Please run build_python.sh to build Python bindings first")
sys.exit(1)
def main():
"""Main function"""
logging.info("MagicDog SDK Audio Python Example Program")
local_ip = "192.168.55.10"
robot = magicdog.MagicRobot()
if not robot.initialize(local_ip):
logging.error("Robot initialization failed")
return
robot.set_timeout(5000)
if not robot.connect():
logging.error("Robot connection failed")
robot.shutdown()
return
logging.info("Robot connected successfully")
sensor_controller = robot.get_sensor_controller()
# sensor_controller.subscribe_tof(lambda tof: logging.info(f"TOF: {len(tof.data)}"))
sensor_controller.subscribe_ultra(lambda ultra: logging.info(f"Ultra: {len(ultra.data)}"))
sensor_controller.subscribe_laser_scan(lambda laser_scan: logging.info(f"Laser Scan: {len(laser_scan.ranges)}"))
# sensor_controller.subscribe_imu(lambda imu: logging.info(f"IMU: {imu.orientation}"))
sensor_controller.subscribe_rgbd_color_camera_info(lambda camera_info: logging.info(f"RGBD Color Camera Info: {camera_info.K}"))
sensor_controller.subscribe_rgbd_depth_image(lambda depth_image: logging.info(f"RGBD Depth Image: {len(depth_image.data)}"))
sensor_controller.subscribe_rgbd_color_image(lambda color_image: logging.info(f"RGBD Color Image: {len(color_image.data)}"))
sensor_controller.subscribe_rgb_depth_camera_info(lambda camera_info: logging.info(f"RGB Depth Camera Info: {camera_info.K}"))
sensor_controller.subscribe_left_binocular_high_img(lambda img: logging.info(f"Left Binocular High Image: {len(img.data)}"))
sensor_controller.subscribe_left_binocular_low_img(lambda img: logging.info(f"Left Binocular Low Image: {len(img.data)}"))
sensor_controller.subscribe_right_binocular_low_img(lambda img: logging.info(f"Right Binocular Low Image: {len(img.data)}"))
status = sensor_controller.open_channel_switch()
if status.code != ErrorCode.OK:
logging.error(f"Failed to open channel: {status.message}")
robot.shutdown()
return
status = sensor_controller.open_laser_scan()
if status.code != ErrorCode.OK:
logging.error(f"Failed to open laser scan: {status.message}")
robot.shutdown()
return
status = sensor_controller.open_rgbd_camera()
if status.code != ErrorCode.OK:
logging.error(f"Failed to open RGBD camera: {status.message}")
robot.shutdown()
return
status = sensor_controller.open_binocular_camera()
if status.code != ErrorCode.OK:
logging.error(f"Failed to open binocular camera: {status.message}")
robot.shutdown()
return
time.sleep(10)
status = sensor_controller.close_laser_scan()
if status.code != ErrorCode.OK:
logging.error(f"Failed to close laser scan: {status.message}")
robot.shutdown()
return
status = sensor_controller.close_rgbd_camera()
if status.code != ErrorCode.OK:
logging.error(f"Failed to close RGBD camera: {status.message}")
robot.shutdown()
return
status = sensor_controller.close_binocular_camera()
if status.code != ErrorCode.OK:
logging.error(f"Failed to close binocular camera: {status.message}")
robot.shutdown()
return
time.sleep(10)
status = sensor_controller.close_channel_switch()
if status.code != ErrorCode.OK:
logging.error(f"Failed to close channel: {status.message}")
robot.shutdown()
return
# Avoid unprocessed buffered data in lcm
time.sleep(10)
robot.disconnect()
robot.shutdown()
logging.info("\nExample program execution completed!")
if __name__ == "__main__":
main()
运行说明
- 环境准备:
# 设置环境变量
export PYTHONPATH=/opt/magic_robotics/magicdog_sdk/lib:$PYTHONPATH
export LD_LIBRARY_PATH=/opt/magic_robotics/magicdog_sdk/lib:$LD_LIBRARY_PATH
- 运行示例:
# C++
./sensor_example
# Python
python3 sensor_example.py
- 停止程序:
- 按
Ctrl+C
可以安全停止程序 - 程序会自动清理所有资源