API Path
/aipaas/voice/v1/full/duplex
请求协议
WS
建立连接时的请求参数(websocket open) :
请求头部 :
| 头部标签 | 必填 | 说明 | 类型 | 数据字典 | 限制 | 头部内容 | 示例 |
|---|---|---|---|---|---|---|---|
| Content-Type | 是 | application/json | [string] | application/json | application/json | ||
| X-APP-ID | 是 | 控制台-应用管理-创建应用-AppID | [string] | ||||
| Device-Uuid | 否 | 设备管理-设备uuid | [string] | ||||
| Authorization | 是 | 鉴权信息 | [string] |
请求参数 Json:
| 参数名 | 说明 | 必填 | 类型 | 数据字典 | 限制 | 示例 |
|---|---|---|---|---|---|---|
| req_id | 请求 id,记录该值便于排查问题 | 是 | [string] | |||
| req_status | 当前的请求状态。0:开始对话;1:发送音频数据;2:结束发送音频数据;3:手动控制打断 | 是 | [int] | |||
| sampling_rate | 音频采样率,默认值16000Hz | 否 | [int] | |||
| num_channels | 输入音频数据的通道数,默认值是 1 | 否 | [int] | |||
| interrupt_sensitivity | 打断敏感度,默认值是 0.5 | 否 | [float] | |||
| enable_dialog_output | 是否开启对话文本输出,默认值是 true | 否 | [boolean] | |||
| enable_button_interrupt | 是否开启按键打断,默认值是 true | 否 | [boolean] | |||
| audio_stream | 音频数据,经过 base64 编码 | 否 | [string] |
返回结果
> 成功 (200)
> Json
> Object
| 参数名 | 说明 | 必填 | 类型 | 数据字典 | 限制 | 示例 |
|---|---|---|---|---|---|---|
| code | 返回码 | 是 | [int] | |||
| message | 返回码说明 | 是 | [string] | |||
| sid | 会话id,用于记录本次会话 | 是 | [string] | |||
| res_status | 当前服务端状态。0:服务端对话功能就绪;1:返回服务端对话结果;2:音频数据发送完成,服务端对话结束; | 是 | [int] | |||
| result | 识别或合成结果 | 是 | [object] | |||
| result>>type | 本次结果的类型:0:ASR;1:TTS;2:NLP 回答 ;3:Control Signal | 是 | [int] | |||
| result>>recog_status | 当前识别状态。1:服务端检测到有效语音开始时间点;2:返回中间转写结果;3:服务端检测到一句话结束,返回该 句的转写结果;4:服务端收到音频流结束的信号,返回识别结束信号; | 是 | [int] | |||
| result>>text | type 为 0 时为语音识别结果,type 为 2 时,为 NLP 结果。 | 是 | [string] | |||
| result>>audio | 合成音频数据,经过 base64 编码 | 是 | [string] | |||
| result>>is_end | 标志位。true:最后一个合成片段;false:中间合成片段 | 是 | [boolean] | |||
| result>>command | 当前控制信号表示的命令。Interrupt:打断信号 | 是 | [string] |
语音对话机器人可以与用户开展实时语音对话。支持输入格式:PCM 编码;支持单声道输入;支持 8000/16000 Hz 输入采样率;合成音频采样率 22050 Hz。
服务接口调用时需要严格遵循服务鉴权规则,服务调用鉴权规则请参见:开发指南 - 接口签名认证。
| 返回码 | 解释 | 说明 | 解决方法 |
|---|---|---|---|
| 10301 | Required parameter miss | 参数错误 | 检查请求体是否符合接口协议 |
| 10304 | Parse request body fail | 请求格式错误 | 查看请求的 URL body 格式是否正确,参考接口文档 |
| 10503 | Server connection time out | 服务连接超时 | 联系技术人员 |
| 10903 | Process failed | 处理失败 | 联系技术人员 |
| 10000 | Success | 成功 | 执行下一步操作 |
通用状态码请参考【状态码】中的【网关认证】
| 名称 | 类型 | 必需 | 说明 |
|---|---|---|---|
| req_id | string | 是 | 请求全局唯一 |
| req_status | int | 是 | 当前的请求状态。0:开始对话送音频数据 |
| sampling_rate | int | 否 | 音频采样率,默认值16000Hz |
| num_channels | int | 否 | 输入音频数据的通道数,默认值是 1 |
| interrupt_sensitivity | float | 否 | 打断敏感度,默认值是 0.5 |
| enable_dialog_output | bool | 否 | 是否开启对话文本输出,默认值是 true |
| enable_button_interrupt | bool | 否 | 是否开启按键打断,默认值是 true |
{
"req_id": "test001",
"req_status": 0,
"sampling_rate": 16000,
"num_channels": 1
}
| 名称 | 类型 | 必需 | 说明 |
|---|---|---|---|
| req_status | int | 是 | 当前的请求状态。1:发送音频数据 |
| audio_stream | string | 是 | 音频数据,经过 base64 编码 |
{
"req_status": 1,
"audio_stream": "yourBase64"
}
| 名称 | 类型 | 必需 | 说明 |
|---|---|---|---|
| req_status | int | 是 | 当前的请求状态。2:结束发送语音数据 |
{
"req_status": 2
}
import com.alibaba.fastjson.JSONObject;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
/**
* 为保证示例可以正常运行,请确认maven中是否存在以下依赖:
*
* org.java-websocket
* Java-WebSocket
* 1.5.3
*
*
* com.alibaba
* fastjson
* 2.0.49
*
*/
public class WebSocketExample extends WebSocketClient {
public WebSocketExample(URI serverUri, Map headers) {
super(serverUri, headers);
}
public static void main(String[] args) {
try {
// 创建WebSocket客户端
URI uri = new URI("算法调用地址");
Map headers = new HashMap();
//调用鉴权
headers.put("Content-Type", "application/json");
headers.put("X-APP-ID", "yourAppId");
headers.put("Authorization", "yourAuthorization");
WebSocketExample client = new WebSocketExample(uri, headers);
client.connect();
// 等待一段时间后关闭连接
Thread.sleep(5000);
client.close();
} catch (URISyntaxException | InterruptedException e) {
e.printStackTrace();
System.out.println("WebSocket error: " + e.getMessage());
}
}
@Override
public void onOpen(ServerHandshake handshakedata) {
System.out.println("WebSocket connection opened");
// 发送消息
try {
// 初始化连接消息
JSONObject initSend = new JSONObject();
initSend.put("req_id", "test001");
initSend.put("req_status", 0);
initSend.put("sampling_rate", 16000);
initSend.put("num_channels", 1);
send(initSend.toJSONString());
// 发送音频文件base64
JSONObject audioSend = new JSONObject();
String audioFilePath = "音频文件路径";
// 读取音频文件的字节
byte[] audioBytes = Files.readAllBytes(Paths.get(audioFilePath));
List chunkedList = chunkByteArray(audioBytes, 400);
// 将字节转换为Base64编码的字符串
for (byte[] chunked : chunkedList) {
String base64 = Base64.getEncoder().encodeToString(chunked);
audioSend.put("req_status", 1);
audioSend.put("audio_stream", base64);
send(audioSend.toJSONString());
}
// 发送结束消息
JSONObject endSend = new JSONObject();
endSend.put("req_status", 2);
send(endSend.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("WebSocket connection closed: " + reason);
}
@Override
public void onError(Exception ex) {
System.err.println("WebSocket error: " + ex.getMessage());
}
/**
* 将byte数组按照指定长度截断并存放在list中
*
* @param originalArray 原始byte数组
* @param chunkSize 每个截断部分的长度
* @return 包含截断后byte数组的list
*/
public static List chunkByteArray(byte[] originalArray, int chunkSize) {
List chunkedList = new ArrayList();
if (originalArray == null || chunkSize <= 0) {
throw new IllegalArgumentException("Invalid input parameters");
}
for (int i = 0; i < originalArray.length; i += chunkSize) {
int remainingLength = originalArray.length - i;
int lengthToCopy = Math.min(chunkSize, remainingLength);
byte[] chunk = new byte[lengthToCopy];
System.arraycopy(originalArray, i, chunk, 0, lengthToCopy);
chunkedList.add(chunk);
}
return chunkedList;
}
}
import logging
import os
import json
import time
import base64
import asyncio
import argparse
import websockets
import multiprocessing as mp
from scipy.io import wavfile
# 配置日志记录
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# 定义音频流的最大切分长度
CHUNK_SIZE = 6400
async def uws_loop_send(args):
"""
发送音频流的异步函数
:param args: 包含 websocket, idx, wav_id, audio_file, task_id, t_slp 的元组
"""
websocket, idx, wav_id, audio_file, task_id, t_slp = args
try:
logger.info(f"pid: {os.getpid()} idx: {idx:3}, task_id: {task_id:36} send_info: start loop send 1")
while True:
is_end = False
# 读取音频文件的一段数据
audio_stream = audio_file.read(CHUNK_SIZE)
# 将音频数据编码为 base64 格式
audio_stream = base64.b64encode(audio_stream).decode("utf-8")
# 检查是否到达文件末尾
try_stream = audio_file.read(1)
if not try_stream:
is_end = True
else:
audio_file.seek(-1, 1)
# 构建发送的消息
mid_json = {
"audio_stream": audio_stream,
"req_status": 1
}
# 发送消息
await websocket.send(json.dumps(mid_json))
await asyncio.sleep(t_slp)
if is_end:
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} send_info: end loop send 1")
break
except Exception as e:
logger.error(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} send_info: send 1 fail: {e}")
return 1
return 0
async def uws_recv(args):
"""
接收服务器响应的异步函数
:param args: 包含 websocket, idx, wav_id, task_id 的元组
"""
websocket, idx, wav_id, task_id = args
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} recv_info: start recv")
total_text = ""
while True:
try:
# 接收服务器返回的消息
message = await websocket.recv()
message = json.loads(message)
# 处理不同的响应状态
if 'res_status' in message:
if message['res_status'] in [2, 3]:
logger.info(f"pid: {os.getpid()} task_id: {task_id} recv: {message['data']['results'][0]['text']}")
if message['res_status'] in [3, 4]:
total_text += message['data']['results'][0]['text']
if message['res_status'] == 4:
logger.info(f"pid: {os.getpid()} task_id: {task_id} end recv: {total_text}")
break
except Exception as e:
logger.error(f"pid: {os.getpid()} task_id: {task_id} recv fail: {e}")
return
async def uws_call(args):
"""
主调用函数,负责连接 WebSocket 并管理发送和接收任务
:param args: 命令行参数
"""
idx = 0
url = args.url
wav_id = args.audio_path.split("/")[-1].split(".")[0]
fs, wav = wavfile.read(args.audio_path)
num_samples = wav.shape[0]
audio_file = open(args.audio_path, 'rb')
audio_file.seek(0)
audio_file.read(44) # 跳过 WAV 文件头
header = {
"Content-Type": "application/json",
'X-APP-ID': args.X_APP_ID,
'Authorization': args.Authorization
}
try:
async with websockets.connect(url, extra_headers=header) as websocket:
# 发送初始请求
start_json = {
"req_id": "test001",
"req_status": 0,
"sampling_rate": 16000,
"num_channels": 1
}
logger.info(f"===========================start_json:{start_json}================================")
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {wav_id:10} send_info: send 0")
await websocket.send(json.dumps(start_json))
message = await websocket.recv()
message = json.loads(message)
if message['code'] == 10000:
logger.info(
f"pid: {os.getpid()} idx: {idx:3} task_id: {message['sid']} recv_info: {message['code']:5} {message['res_status']}")
# 循环发送音频流
send_time = time.perf_counter()
loop_send_task = asyncio.ensure_future(
uws_loop_send([websocket, idx, wav_id, audio_file, message['sid'], CHUNK_SIZE / 2 / fs]))
recv_task = asyncio.ensure_future(uws_recv([websocket, idx, wav_id, message['sid']]))
await loop_send_task
ret = loop_send_task.result()
if ret:
return 1
# 发送停止请求
stop_json = {
"req_status": 2
}
logger.info(f"===========================stop_json:{stop_json}================================")
await websocket.send(json.dumps(stop_json))
await recv_task
cost = time.perf_counter() - send_time
audio_len_in_ms = num_samples * 1000 / fs
rtf = float(cost * 1000) / float(audio_len_in_ms)
logger.info(
f"pid: {os.getpid()} idx: {idx:3} task_id: {message['sid']} info: dur: {audio_len_in_ms} cost: {cost * 1000} rtf: {rtf}")
ret = recv_task.result()
if ret == 1:
return 1
else:
logger.info(f"pid: {os.getpid()} idx: {idx:3} message: {message}")
await websocket.close()
except Exception as e:
logger.info(f"pid: {os.getpid()} idx: {idx:3} info: {e}")
audio_file.close()
return 1
audio_file.close()
return 0
def func(args):
"""
运行 uws_call 函数的包装函数
:param args: 命令行参数
"""
if args.forever:
while 1:
asyncio.get_event_loop().run_until_complete(uws_call(args))
else:
asyncio.get_event_loop().run_until_complete(uws_call(args))
def main(args):
"""
主函数,负责启动多个进程
:param args: 命令行参数
"""
process_list = []
for i in range(args.num_processes):
p = mp.Process(target=func, args=(args,))
p.start()
process_list.append(p)
for p in process_list:
p.join()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--url', type=str, default='算法调用地址')
parser.add_argument('--X_APP_ID', type=str, default='yourAppId')
parser.add_argument('--Authorization', type=str, default='yourAuthorization')
parser.add_argument("--num_processes", type=int, default=1)
parser.add_argument("--audio_path", type=str, default="yourAudioPath")
parser.add_argument("--forever", action="store_true")
args = parser.parse_args()
main(args)