API Path
/aipaas/lm/v1/stream/cnAsr
请求协议
WSS
请求头部 :
| 头部标签 | 必填 | 说明 | 类型 | 数据字典 | 限制 | 头部内容 | 示例 |
|---|---|---|---|---|---|---|---|
| X-APP-ID | 是 | 控制台-应用管理-创建应用-AppID,公网鉴权,公网调用时必传 | [string] | ||||
| Authorization | 是 | 公网鉴权,公网调用时必传 | [string] | ||||
| Device-Uuid | 否 | 设备管理-设备uuid | [string] |
请求参数 :
| 参数名 | 说明 | 必填 | 类型 | 数据字典 | 限制 | 示例 |
|---|---|---|---|---|---|---|
| option | 语音识别配置可选项,客户端发送开始识别请求时,根据具体需求,配置该字段 | 否 | [object] | |||
| option>>sample_rate | 音频采样率,默认值16000Hz | 否 | [int] | |||
| option>>enable_punctuation | 是否加标点,默认:是 | 否 | [boolean] | |||
| option>>enable_inverse_text_normalization | 是否开启 ITN,默认:是 | 否 | [boolean] | |||
| option>>enable_emendation | 是否开启校勘(仅对多方言服务有效),默认:否 | 否 | [boolean] | |||
| option>>hotwords | 热词列表 | 否 | [array] | |||
| req_id | 请求全局唯一id,记录该值便于排查问题 | 是 | [string] | |||
| rec_status | 当前是开始识别状态,设置rec_status=0,识别状态详解 0:开始识别 1:发送语音流 2: 结束语音流 | 是 | [int] |
返回结果 :
| 参数名 | 说明 | 必填 | 类型 | 数据字典 | 限制 | 示例 |
|---|---|---|---|---|---|---|
| code | 返回码 | 是 | [int] | 10000 | ||
| message | 返回码描述 | 是 | [string] | success | ||
| sid | 会话全局唯一 id,用于记录本次会话 | 是 | [string] | aae36140-bc13-441f-81f9-6700fe7a5e96 | ||
| res_status | 响应状态 0:识别就绪; 1:识别到有效语音开始; 2:如果开启了返回中间结果, 则返回中间识别结果; 3:检测到一段有效语音结束,返回该段语音的识别结果; 4:处理完所有的音频数据,返回尚未返回的识别结果(如果有); | 是 | [int] | 2 | ||
| data | 识别结果,服务端接收到语音流后返回 | 否 | [object] | |||
| data>>sn | 句子编号,从 1 开始 | 否 | [int] | |||
| data>>results | 当前句子识别结果,如果开启 object.nbest ,则返回多个结果 | 否 | [array] | |||
| data>>results>>text | 句子识别结果 | 否 | [string] | |||
| data>>results>>begin_time | 句子开始时间,单位是毫秒 | 否 | [int] | |||
| data>>results>>end_time | 句子结束时间,单位是毫秒 | 否 | [int] |
实时语音识别用于实时音频流转换为文字场景,支持一边上传音频流,一边实时返回转写结果。支持语言(language参数):sichuan-四川话,eng-英语,chn-普通话,wuhan-武汉话,yueyu-粤语,支持单通道 pcm_s16le编码的音频识别。
服务接口调用时需要严格遵循服务鉴权规则
公网服务调用鉴权规则请参见:开发指南 - 接口签名认证
通用状态码请参见【状态码】中的【网关认证】,其余状态码如下:
| code | 说明 | 错误描述信息 | 解决方法 |
|---|---|---|---|
| 101 | 成功 | {"message":"success"} | 成功,开始语音识别 |
| 4002 | 并发超过限制 | {"message":"worker size overflow"} | 联系商务,增加并发 |
| 返回码 | 描述 | 说明 | 处理方式 |
|---|---|---|---|
| 10000 | success | 成功 | 执行下一步操作 |
| 10001 | parse request body failurl body | URL body 格式不对 | 查看请求的url body格式是否正确,参考接口文档 |
| 10002 | session not found | 会话id查询失败 | 检查客户端发送的请求,通常是因为没有发送开始识别请求 |
| 10003 | required parameter miss | 参数缺失 | 检查接口文档,补全入参 |
| 10004 | duplicated session id | 会话id重复 | 检查客户端发送的请求,通常是因为重复发送开始识别请求 |
| 10005 | engine overflow | 引擎超并发 | 联系研发人员进行排查 |
| 10006 | unknown error | 未知错误 | 联系研发人员进行排查 |
客户端发送开始识别请求,需要通过请求body带语音识别过程中的可选配置参数,示例:
{
"option": {
"sample_rate": 8000,
"enable_intermediate_result": false,
"enable_punctuation": true,
"enable_inverse_text_normalization": true
},
"req_id": "aae36140-bc13-441f-81f9-6700fe7a5e96",
"rec_status": 0
}
客户端接收到服务端发送的确认识别请求有效的响应后,开始发送语音流数据,请求body各参数含义如下:
| 参数 | 类型 | 必须 | 说明 |
|---|---|---|---|
| rec_status | Integer | 是 | 识别状态0:开始识别;1:发送语音流;2:结束语音流; |
| audio_stream | String | 是 | 语音流,采用base64编码 |
示例:
{
"rec_status":1,
"audio_stream":"000asraae361406700fe7a5e9681f956210b5f1270"
}
客户端语音流发送完成,结束语音流,请求body各参数含义如下:
| 参数 | 类型 | 必须 | 说明 |
|---|---|---|---|
| rec_status | Integer | 是 | rec_status=2 结束语音流 |
示例:
{
"rec_status":2
}
客户端如果不需要继续进行语音识别,则立即关闭websocket 连接(避免占用资源),如果需要继续进行语音识别(多轮对话场景),需要从开始识别状态开始,按照上述步骤依次执行
import com.alibaba.fastjson.JSONObject;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 为保证示例可以正常运行,请确认maven中是否存在以下依赖:
*
* org.java-websocket
* Java-WebSocket
* 1.5.3
*
*
* com.alibaba
* fastjson
* 2.0.49
*
*/
public class WebSocketExample extends WebSocketClient {
private final AtomicBoolean gotEnd = new AtomicBoolean(false);
public WebSocketExample(URI serverUri, Map headers) {
super(serverUri, headers);
this.setConnectionLostTimeout(0);
}
public static void main(String[] args) {
try {
// 创建WebSocket客户端
URI uri = new URI("wss://xxx/aipaas/lm/v1/stream/cnAsr");
Map headers = new HashMap();
//调用鉴权
headers.put("Content-Type", "application/json");
headers.put("X-APP-ID", "xxx");
headers.put("Authorization", "xxx");
WebSocketExample client = new WebSocketExample(uri, headers);
client.connect();
// 循环检查 gotEnd
while (!client.gotEnd.get()) {
Thread.sleep(100); // 100ms 轮询一次
}
client.close();
} catch (URISyntaxException | InterruptedException e) {
e.printStackTrace();
System.out.println("WebSocket error: " + e.getMessage());
}
}
@Override
public void onOpen(ServerHandshake handshakedata) {
System.out.println("WebSocket connection opened");
// 发送消息
try {
// 初始化连接消息
JSONObject initSend = new JSONObject();
initSend.put("req_id", "test001");
initSend.put("rec_status", 0);
JSONObject option = new JSONObject();
option.put("sample_rate", 8000);
option.put("enable_punctuation", true);
option.put("enable_inverse_text_normalization", true);
initSend.put("option", option);
send(initSend.toJSONString());
// 发送音频文件base64
JSONObject audioSend = new JSONObject();
String audioFilePath = "your_path";
// 读取音频文件的字节
byte[] audioBytes = Files.readAllBytes(Paths.get(audioFilePath));
List chunkedList = chunkByteArray(audioBytes, 400);
// 将字节转换为Base64编码的字符串
for (Object chunkObj : chunkedList) {
byte[] chunked = (byte[]) chunkObj;
String base64 = Base64.getEncoder().encodeToString(chunked);
audioSend.put("rec_status", 1);
audioSend.put("audio_stream", base64);
send(audioSend.toJSONString());
}
// 发送结束消息
JSONObject endSend = new JSONObject();
endSend.put("rec_status", 2);
send(endSend.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onMessage(String message) {
System.out.println("Received message: " + message);
JSONObject obj = JSONObject.parseObject(message);
Integer resStatus = obj.getInteger("res_status");
if (resStatus == 4) {
gotEnd.set(true);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("WebSocket connection closed: " + reason);
}
@Override
public void onError(Exception ex) {
System.err.println("WebSocket error: " + ex.getMessage());
}
/**
* 将byte数组按照指定长度截断并存放在list中
*
* @param originalArray 原始byte数组
* @param chunkSize 每个截断部分的长度
* @return 包含截断后byte数组的list
*/
public static List chunkByteArray(byte[] originalArray, int chunkSize) {
List chunkedList = new ArrayList();
if (originalArray == null || chunkSize <= 0) {
throw new IllegalArgumentException("Invalid input parameters");
}
for (int i = 0; i < originalArray.length; i += chunkSize) {
int remainingLength = originalArray.length - i;
int lengthToCopy = Math.min(chunkSize, remainingLength);
byte[] chunk = new byte[lengthToCopy];
System.arraycopy(originalArray, i, chunk, 0, lengthToCopy);
chunkedList.add(chunk);
}
return chunkedList;
}
}
import logging
import os
import json
import time
import base64
import asyncio
import argparse
import websockets
import multiprocessing as mp
from scipy.io import wavfile
# 配置日志记录
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# 定义音频流的最大切分长度
CHUNK_SIZE = 6400
async def uws_loop_send(args):
"""
发送音频流的异步函数
:param args: 包含 websocket, idx, wav_id, audio_file, task_id, t_slp 的元组
"""
websocket, idx, wav_id, audio_file, task_id, t_slp = args
try:
logger.info(f"pid: {os.getpid()} idx: {idx:3}, task_id: {task_id:36} send_info: start loop send 1")
while True:
is_end = False
# 读取音频文件的一段数据
audio_stream = audio_file.read(CHUNK_SIZE)
# 将音频数据编码为 base64 格式
audio_stream = base64.b64encode(audio_stream).decode("utf-8")
# 检查是否到达文件末尾
try_stream = audio_file.read(1)
if not try_stream:
is_end = True
else:
audio_file.seek(-1, 1)
# 构建发送的消息
mid_json = {
"audio_stream": audio_stream,
"rec_status": 1
}
# 发送消息
await websocket.send(json.dumps(mid_json))
await asyncio.sleep(t_slp)
if is_end:
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} send_info: end loop send 1")
break
except Exception as e:
logger.error(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} send_info: send 1 fail: {e}")
return 1
return 0
async def uws_recv(args):
"""
接收服务器响应的异步函数
:param args: 包含 websocket, idx, wav_id, task_id 的元组
"""
websocket, idx, wav_id, task_id = args
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {task_id:36} recv_info: start recv")
total_text = ""
while True:
try:
# 接收服务器返回的消息
message = await websocket.recv()
message = json.loads(message)
# 处理不同的响应状态
if 'res_status' in message:
if message['res_status'] in [2, 3]:
logger.info(f"pid: {os.getpid()} task_id: {task_id} recv: {message['data']['results'][0]['text']}")
if message['res_status'] in [3, 4]:
total_text += message['data']['results'][0]['text']
if message['res_status'] == 4:
logger.info(f"pid: {os.getpid()} task_id: {task_id} end recv: {total_text}")
break
except Exception as e:
logger.error(f"pid: {os.getpid()} task_id: {task_id} recv fail: {e}")
return
async def uws_call(args):
"""
主调用函数,负责连接 WebSocket 并管理发送和接收任务
:param args: 命令行参数
"""
idx = 0
url = args.url
wav_id = args.audio_path.split("/")[-1].split(".")[0]
fs, wav = wavfile.read(args.audio_path)
num_samples = wav.shape[0]
audio_file = open(args.audio_path, 'rb')
audio_file.seek(0)
audio_file.read(44) # 跳过 WAV 文件头
header = {
# 公网调用鉴权,与内网鉴权必传其一
'X-APP-ID': args.X_APP_ID,
'Authorization': args.Authorization
}
try:
async with websockets.connect(url, extra_headers=header) as websocket:
# 发送初始请求
start_json = {
'option': {
'sample_rate': fs
},
'req_id': "test121801",
'rec_status': 0
}
logger.info(f"===========================start_json:{start_json}================================")
logger.info(f"pid: {os.getpid()} idx: {idx:3} task_id: {wav_id:10} send_info: send 0")
await websocket.send(json.dumps(start_json))
message = await websocket.recv()
message = json.loads(message)
if message['code'] == 10000:
logger.info(
f"pid: {os.getpid()} idx: {idx:3} task_id: {message['sid']} recv_info: {message['code']:5} {message['res_status']}")
# 循环发送音频流
send_time = time.perf_counter()
loop_send_task = asyncio.ensure_future(
uws_loop_send([websocket, idx, wav_id, audio_file, message['sid'], CHUNK_SIZE / 2 / fs]))
recv_task = asyncio.ensure_future(uws_recv([websocket, idx, wav_id, message['sid']]))
await loop_send_task
ret = loop_send_task.result()
if ret:
return 1
# 发送停止请求
stop_json = {
"rec_status": 2
}
logger.info(f"===========================stop_json:{stop_json}================================")
await websocket.send(json.dumps(stop_json))
await recv_task
cost = time.perf_counter() - send_time
audio_len_in_ms = num_samples * 1000 / fs
rtf = float(cost * 1000) / float(audio_len_in_ms)
logger.info(
f"pid: {os.getpid()} idx: {idx:3} task_id: {message['sid']} info: dur: {audio_len_in_ms} cost: {cost * 1000} rtf: {rtf}")
ret = recv_task.result()
if ret == 1:
return 1
else:
logger.info(f"pid: {os.getpid()} idx: {idx:3} message: {message}")
await websocket.close()
except Exception as e:
logger.info(f"pid: {os.getpid()} idx: {idx:3} info: {e}")
audio_file.close()
return 1
audio_file.close()
return 0
def func(args):
"""
运行 uws_call 函数的包装函数
:param args: 命令行参数
"""
if args.forever:
while 1:
asyncio.get_event_loop().run_until_complete(uws_call(args))
else:
asyncio.get_event_loop().run_until_complete(uws_call(args))
def main(args):
"""
主函数,负责启动多个进程
:param args: 命令行参数
"""
process_list = []
for i in range(args.num_processes):
p = mp.Process(target=func, args=(args,))
p.start()
process_list.append(p)
for p in process_list:
p.join()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--url', type=str, default='算法调用地址')
parser.add_argument('--X_APP_ID', type=str, default='yourAppId')
parser.add_argument('--Authorization', type=str, default='yourAuthorization')
parser.add_argument("--num_processes", type=int, default=1)
parser.add_argument("--audio_path", type=str, default="yourAudioPath")
parser.add_argument("--forever", action="store_true")
args = parser.parse_args()
main(args)