语音控制示例
约 1798 字大约 6 分钟
2025-05-29
该示例展示了如何使用 Magicbot-Gen1 SDK 进行初始化、连接机器人、音频控制(获取音量、设置音量、TTS 播放、音频流订阅)等基本操作。
C++
示例文件:audio_example.cpp
参考文档:audio_reference.md
#include "magic_robot.h"
#include <termios.h>
#include <unistd.h>
#include <csignal>
#include <iostream>
using namespace magic::gen1;
magic::gen1::MagicRobot robot;
void signalHandler(int signum) {
std::cout << "Interrupt signal (" << signum << ") received.\n";
robot.Shutdown();
// Exit process
exit(signum);
}
void print_help(const char* prog_name) {
std::cout << "Key Function Demo Program\n\n";
std::cout << "Usage: " << prog_name << "\n";
std::cout << "Key Function Description:\n";
std::cout << " ESC Exit program\n";
std::cout << " 1 Function 1: Get volume\n";
std::cout << " 2 Function 2: Set volume\n";
std::cout << " 3 Function 3: Play TTS\n";
std::cout << " 4 Function 4: Stop playback\n";
std::cout << " 5 Function 5: Open audio stream\n";
std::cout << " 6 Function 6: Close audio stream\n";
std::cout << " 7 Function 7: Subscribe to audio stream\n";
}
int getch() {
struct termios oldt, newt;
int ch;
tcgetattr(STDIN_FILENO, &oldt); // Get current terminal settings
newt = oldt;
newt.c_lflag &= ~(ICANON | ECHO); // Disable buffering and echo
tcsetattr(STDIN_FILENO, TCSANOW, &newt);
ch = getchar(); // Read key press
tcsetattr(STDIN_FILENO, TCSANOW, &oldt); // Restore settings
return ch;
}
void GetVolume() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Get volume
int get_volume = 0;
auto status = controller.GetVolume(get_volume);
if (status.code != ErrorCode::OK) {
std::cerr << "get volume failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "get volume success, volume: " << std::to_string(get_volume) << std::endl;
}
void SetVolume() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Set volume
auto status = controller.SetVolume(7);
if (status.code != ErrorCode::OK) {
std::cerr << "set volume failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "set volume success" << std::endl;
}
void PlayTts() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Play speech
TtsCommand tts;
tts.id = "100000000001";
tts.content = "How's the weather today!";
tts.priority = TtsPriority::HIGH;
tts.mode = TtsMode::CLEARTOP;
auto status = controller.Play(tts);
if (status.code != ErrorCode::OK) {
std::cerr << "play tts failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "play tts success" << std::endl;
}
void StopTts() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Stop speech playback
auto status = controller.Stop();
if (status.code != ErrorCode::OK) {
std::cerr << "stop tts failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "stop tts success" << std::endl;
}
void OpenAudioStream() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Open audio stream
auto status = controller.OpenAudioStream();
if (status.code != ErrorCode::OK) {
std::cerr << "open audio stream failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "open audio stream success" << std::endl;
}
void CloseAudioStream() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Close audio stream
auto status = controller.CloseAudioStream();
if (status.code != ErrorCode::OK) {
std::cerr << "close audio stream failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
return;
}
std::cout << "close audio stream success" << std::endl;
}
void SubscribeAudioStream() {
// Get audio controller
auto& controller = robot.GetAudioController();
// Subscribe to audio streams
controller.SubscribeOriginAudioStream([](const std::shared_ptr<AudioStream> data) {
static int32_t counter = 0;
if (counter++ % 30 == 0) {
std::cout << "Received origin audio stream data, size: " << data->data_length << std::endl;
}
});
controller.SubscribeBfAudioStream([](const std::shared_ptr<AudioStream> data) {
static int32_t counter = 0;
if (counter++ % 30 == 0) {
std::cout << "Received bf audio stream data, size: " << data->data_length << std::endl;
}
});
std::cout << "Subscribed to audio streams" << std::endl;
}
int main(int argc, char* argv[]) {
// Bind SIGINT (Ctrl+C)
signal(SIGINT, signalHandler);
print_help(argv[0]);
std::string local_ip = "192.168.54.111";
// Configure local IP address for direct network connection and initialize SDK
if (!robot.Initialize(local_ip)) {
std::cerr << "robot sdk initialize failed." << std::endl;
robot.Shutdown();
return -1;
}
// Set RPC timeout to 5s
robot.SetTimeout(5000);
// Connect to robot
auto status = robot.Connect();
if (status.code != ErrorCode::OK) {
std::cerr << "connect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
std::cout << "Press any key to continue (ESC to exit)..."
<< std::endl;
// Wait for user input
while (1) {
int key = getch();
if (key == 27)
break; // ESC key ASCII code is 27
std::cout << "Key ASCII: " << key << ", Character: " << static_cast<char>(key) << std::endl;
switch (key) {
case '1': {
// Get volume
GetVolume();
break;
}
case '2': {
// Set volume
SetVolume();
break;
}
case '3': {
PlayTts();
break;
}
case '4': {
StopTts();
break;
}
case '5': {
OpenAudioStream();
break;
}
case '6': {
CloseAudioStream();
break;
}
case '7': {
SubscribeAudioStream();
break;
}
default:
std::cout << "Unknown key: " << key << std::endl;
break;
}
usleep(10000);
}
// Disconnect from robot
status = robot.Disconnect();
if (status.code != ErrorCode::OK) {
std::cerr << "disconnect robot failed"
<< ", code: " << status.code
<< ", message: " << status.message << std::endl;
robot.Shutdown();
return -1;
}
robot.Shutdown();
return 0;
}
Python
示例文件:audio_example.py
参考文档:audio_reference.md
完整代码示例
#!/usr/bin/env python3
import sys
import time
import signal
import threading
import logging
from typing import Optional
import magicbot_gen1_python as magicbot
# Configure logging format and level
logging.basicConfig(
level=logging.INFO, # Minimum log level
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Global variables
robot: Optional[magicbot.MagicRobot] = None
running = True
def signal_handler(signum, frame):
"""Signal handler function for graceful exit"""
global running, robot
logging.info("Received interrupt signal (%s), exiting...", signum)
running = False
if robot:
robot.disconnect()
logging.info("Robot disconnected")
robot.shutdown()
logging.info("Robot shutdown")
exit(-1)
def print_help():
"""Print help information"""
logging.info("Key Function Demo Program")
logging.info("")
logging.info("Key Function Description:")
logging.info(" ESC Exit program")
logging.info(" 1 Function 1: Get volume")
logging.info(" 2 Function 2: Set volume")
logging.info(" 3 Function 3: Play TTS")
logging.info(" 4 Function 4: Stop playback")
logging.info(" 5 Function 5: Open audio stream")
logging.info(" 6 Function 6: Close audio stream")
logging.info(" 7 Function 7: Subscribe to audio stream")
def get_volume():
"""Get volume"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Get volume
status, volume = controller.get_volume()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to get volume, code: %s, message: %s",
status.code,
status.message,
)
return
logging.info("Successfully got volume, volume: %s", volume)
except Exception as e:
logging.error("Exception occurred while getting volume: %s", e)
def set_volume():
"""Set volume"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Set volume to 7
status = controller.set_volume(7)
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to set volume, code: %s, message: %s",
status.code,
status.message,
)
return
logging.info("Successfully set volume")
except Exception as e:
logging.error("Exception occurred while setting volume: %s", e)
def play_tts():
"""Play TTS speech"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Create TTS command
tts = magicbot.TtsCommand()
tts.id = "100000000001"
tts.content = "How's the weather today!"
tts.priority = magicbot.TtsPriority.HIGH
tts.mode = magicbot.TtsMode.CLEARTOP
# Play speech
status = controller.play(tts)
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to play TTS, code: %s, message: %s", status.code, status.message
)
return
logging.info("Successfully played TTS")
except Exception as e:
logging.error("Exception occurred while playing TTS: %s", e)
def stop_tts():
"""Stop TTS playback"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Stop speech playback
status = controller.stop()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to stop TTS, code: %s, message: %s", status.code, status.message
)
return
logging.info("Successfully stopped TTS")
except Exception as e:
logging.error("Exception occurred while stopping TTS: %s", e)
def open_audio_stream():
"""Open audio stream"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Open audio stream
status = controller.open_audio_stream()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to open audio stream, code: %s, message: %s",
status.code,
status.message,
)
return
logging.info("Successfully opened audio stream")
except Exception as e:
logging.error("Exception occurred while opening audio stream: %s", e)
def close_audio_stream():
"""Close audio stream"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Close audio stream
status = controller.close_audio_stream()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to close audio stream, code: %s, message: %s",
status.code,
status.message,
)
return
logging.info("Successfully closed audio stream")
except Exception as e:
logging.error("Exception occurred while closing audio stream: %s", e)
def subscribe_audio_stream():
"""Subscribe to audio stream"""
global robot
try:
# Get audio controller
controller = robot.get_audio_controller()
# Audio stream counters
origin_counter = 0
bf_counter = 0
def origin_audio_callback(audio_stream):
"""Original audio stream callback function"""
nonlocal origin_counter
if origin_counter % 30 == 0:
logging.info(
"Received original audio stream data, size: %d",
audio_stream.data_length,
)
origin_counter += 1
def bf_audio_callback(audio_stream):
"""BF audio stream callback function"""
nonlocal bf_counter
if bf_counter % 30 == 0:
logging.info(
"Received BF audio stream data, size: %d", audio_stream.data_length
)
bf_counter += 1
# Subscribe to audio streams
controller.subscribe_origin_audio_stream(origin_audio_callback)
controller.subscribe_bf_audio_stream(bf_audio_callback)
logging.info("Subscribed to audio streams")
except Exception as e:
logging.error("Exception occurred while subscribing to audio stream: %s", e)
def get_user_input():
"""Get user input"""
try:
# Python implementation of getch() on Linux systems
import tty
import termios
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
except ImportError:
# If termios is not available, use simple input
return input("Please press a key: ").strip()
def main():
"""Main function"""
global robot, running
# Bind signal handler
signal.signal(signal.SIGINT, signal_handler)
logging.info("Robot model: %s", magicbot.get_robot_model())
# Create robot instance
robot = magicbot.MagicRobot()
try:
# Configure local IP address for direct network connection and initialize SDK
local_ip = "192.168.54.111"
if not robot.initialize(local_ip):
logging.error("Failed to initialize robot SDK")
robot.shutdown()
return -1
# Set RPC timeout to 5 seconds
robot.set_timeout(5000)
# Connect to robot
status = robot.connect()
if status.code != magicbot.ErrorCode.OK:
logging.error(
"Failed to connect to robot, code: %s, message: %s",
status.code,
status.message,
)
robot.shutdown()
return -1
logging.info("Successfully connected to robot")
# Initialize audio controller
audio_controller = robot.get_audio_controller()
if not audio_controller.initialize():
logging.error("Failed to initialize audio controller")
robot.disconnect()
robot.shutdown()
return -1
logging.info("Successfully initialized audio controller")
print_help()
logging.info("Press any key to continue (ESC to exit)...")
# Main loop
while running:
try:
key = get_user_input()
if key == "\x1b": # ESC key
break
logging.info("Key pressed: %s", key)
if key == "1":
get_volume()
elif key == "2":
set_volume()
elif key == "3":
play_tts()
elif key == "4":
stop_tts()
elif key == "5":
open_audio_stream()
elif key == "6":
close_audio_stream()
elif key == "7":
subscribe_audio_stream()
else:
logging.warning("Unknown key: %s", key)
time.sleep(0.01) # Brief delay
except KeyboardInterrupt:
break
except Exception as e:
logging.error("Exception occurred while processing user input: %s", e)
except Exception as e:
logging.error("Exception occurred during program execution: %s", e)
return -1
finally:
# Clean up resources
try:
logging.info("Clean up resources")
# Close sensor controller
audio_controller = robot.get_audio_controller()
audio_controller.shutdown()
logging.info("Audio controller closed")
# Disconnect
robot.disconnect()
logging.info("Robot connection disconnected")
# Shutdown robot
robot.shutdown()
logging.info("Robot shutdown")
except Exception as e:
logging.error("Exception occurred while cleaning up resources: %s", e)
if __name__ == "__main__":
sys.exit(main())
运行说明
- 环境准备:
# 设置环境变量
export PYTHONPATH=/opt/magic_robotics/magicbot_gen1_sdk/lib:$PYTHONPATH
export LD_LIBRARY_PATH=/opt/magic_robotics/magicbot_gen1_sdk/lib:$LD_LIBRARY_PATH
- 运行示例:
# C++
./audio_example
# Python
python3 audio_example.py
- 控制说明:
ESC
:退出程序1
:获取当前音量2
:设置音量为73
:播放TTS语音4
:停止TTS播放5
:打开音频流6
:关闭音频流7
:订阅音频流数据
- 停止程序:
- 按
Ctrl+C
可以安全停止程序 - 程序会自动清理所有资源