API Path
签名认证方式
本鉴权目的在于为星河系统的鉴权做增强鉴权认证体系,为了系统提供更加高效安全的通信方式。
公网域名:openapi.teleagi.cn
端口:443
接口地址:查看每个接口文档的头部连接地址
请求地址:
https://openapi.teleagi.cn:443/{接口地址}
wss://openapi.teleagi.cn:443/{接口地址}
请求头Header传入如下字段,字段说明见下表格
| 字段名 | 是否必填 | 字段说明 |
|---|---|---|
| Authorization | 是 | 签名 |
| X-APP-ID | 是 | 用户申请的账号ID |
| OrderNum | 否 | 订单号 |
使用快速生成工具 : https://www.teleai.com.cn/signature
Authorization={originName}/{X-APP-ID}/{region}/{timestamp}/{expirationPeriodInSeconds}/{signedHeaders}/{signature}
其中
HTTP请求头header根据自身情况进行赋值(推荐给默认值x-app-id)
比如当前请求的header中包含
A:a
A1:a
A2:a
A3:a
X-APP-ID:XXX #这个header的值是必填的 所以singnedHeaders 可以直接默认给x-app-id
以上边的hader为基础 signedHeaders 可以是 key的任意组合(推荐 x-app-id)
eg: a;a1;a2
eg: a1;a2;x-app-id
eg: x-app-id



| 编码 | 省市 | 缩写 |
|---|---|---|
| 000000 | 全国 | QG |
| 110000 | 北京市 | BJ |
| 120000 | 天津市 | TJ |
| 130000 | 河北省 | HE |
| 140000 | 山西省 | SX |
| 150000 | 内蒙古 | NM |
| 210000 | 辽宁省 | LN |
| 220000 | 吉林省 | JL |
| 230000 | 黑龙江 | HL |
| 310000 | 上海市 | SH |
| 320000 | 江苏省 | JS |
| 330000 | 浙江省 | ZJ |
| 340000 | 安徽省 | AH |
| 350000 | 福建省 | FJ |
| 360000 | 江西省 | JX |
| 370000 | 山东省 | SD |
| 410000 | 河南省 | HA |
| 420000 | 湖北省 | HB |
| 430000 | 湖南省 | HN |
| 440000 | 广东省 | GD |
| 450000 | 广西省 | GX |
| 460000 | 海南省 | HI |
| 500000 | 重庆市 | CQ |
| 510000 | 四川省 | SC |
| 520000 | 贵州省 | GZ |
| 530000 | 云南省 | YN |
| 540000 | 西藏 | XZ |
| 610000 | 陕西省 | SN |
| 620000 | 甘肃省 | GS |
| 630000 | 青海省 | QH |
| 640000 | 宁夏 | NX |
| 650000 | 新疆 | XJ |
| 710000 | 台湾省 | TW |
| 810000 | 香港 | HK |
| 820000 | 澳门 | MO |
package cn.com.utils;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.*;
@Slf4j
public class SignUtils {
String str ;
// 授权字符串字段数
public static final int AUTHORIZATION_ITEM_NUM = 7;
// HMAC SHA256 算法常量
private static final String HMAC_SHA256_ALGORITHM = "HmacSHA256";
public static final List DEFAULT_HEADERS = Arrays.asList(
HttpHeaders.HOST,
HttpHeaders.CONTENT_TYPE,
HttpHeaders.CONTENT_LENGTH,
"Content-Md5");
/**
* 生成签名
* @param signPrefix 签名前缀
* @param appKey 应用密钥
* @param method HTTP请求方法
* @param path HTTP请求uri
* @param params HTTP请求参数
* @param headers HTTP请求headers
* @return 生成的签名
*/
public static String genSignature(String signPrefix,String appKey, HttpMethod method, String path,
Map params, Map headers,String signedHeaders) {
// 格式化基础签名信息
String signingKey;
try {
// 生成签名密钥
signingKey = generateHmacSHA256Hex(signPrefix, appKey);
} catch (Exception e) {
log.error("{signPrefix}/{X-APP-ID}/{region}/{timestamp}/{expirationPeriodInSeconds}签名错误", e);
throw new RuntimeException(e);
}
log.info("appKey:[{}] signingKey:[{}]",appKey,signingKey);
// 获取规范化的请求字符串
String canonicalRequest = getCanonicalRequest(method, path, params, headers,signedHeaders);
log.info("请求内容签名信息 canonicalRequest:\n{}",canonicalRequest);
String signature;
try {
// 生成最终签名
signature = generateHmacSHA256Hex(canonicalRequest, signingKey);
} catch (Exception e) {
log.error("canonicalRequest 签名错误", e);
throw new RuntimeException(e);
}
return signature;
}
/**
* 获取规范化的请求字符串
*
* @param method HTTP请求方法
* @param path HTTP请求uri
* @param params HTTP请求参数
* @param headers HTTP请求headers
* @param signedHeaders 签名头
* @return 规范化的请求字符串
*/
private static String getCanonicalRequest(HttpMethod method, String path, Map params,
Map headers, String signedHeaders) {
// 构建规范化的请求字符串
StringBuilder canonicalRequest = new StringBuilder();
canonicalRequest.append(method).append("\n");
String canonicalURI;
try {
// 规范化请求URI
canonicalURI = URLEncoder.encode(path, "utf-8").replaceAll("%2F", "/");
} catch (UnsupportedEncodingException e) {
log.error("url encode error", e);
throw new RuntimeException(e);
}
canonicalRequest.append(canonicalURI).append("\n");
// 获取规范化查询字符串
canonicalRequest.append(getCanonicalQueryString(params)).append("\n");
// 获取规范化请求头
String canonicalHeaders = getCanonicalHeaders(headers,signedHeaders);
canonicalRequest.append(canonicalHeaders);
return canonicalRequest.toString();
}
/**
* 获取规范化的查询字符串
*
* @param paramMap 查询参数映射
* @return 规范化的查询字符串
*/
private static String getCanonicalQueryString(Map paramMap) {
List params = new ArrayList();
for (Map.Entry param : paramMap.entrySet()) {
String key = param.getKey();
String value = param.getValue();
try {
params.add(URLEncoder.encode(key, "UTF-8") + "=" + (StrUtil.isNotBlank(value) ? URLEncoder.encode(value, "UTF-8") : ""));
} catch (UnsupportedEncodingException e) {
log.error("params encode error", e);
throw new RuntimeException(e);
}
}
// 按字母顺序排序查询参数
// params.sort(String.CASE_INSENSITIVE_ORDER);
Collections.sort(params);
return StrUtil.join( "&",params);
}
/**
* 获取规范化的请求头
* @param headerMap 请求头映射
* @param signedHeaders 签名头
* @return 规范化的请求头字符串
*/
private static String getCanonicalHeaders(Map headerMap, String signedHeaders) {
if (headerMap.isEmpty()) {
return "";
}
// 将请求头键名转换为小写
Map lowerCaseHeaderMap = new HashMap();
for (Map.Entry entry : headerMap.entrySet()) {
lowerCaseHeaderMap.put(entry.getKey().toLowerCase(), entry.getValue());
}
List headerList = new ArrayList();
List headerNames;
if (StrUtil.isNotBlank(signedHeaders)) {
headerNames = StrUtil.split(signedHeaders,";");
} else {
headerNames = DEFAULT_HEADERS;
}
for (String headerName : headerNames) {
String lowerCaseHeaderName = headerName.trim().toLowerCase();
String headerValue = lowerCaseHeaderMap.get(lowerCaseHeaderName);
if (StrUtil.isNotBlank(headerValue)) {
try {
headerList.add(lowerCaseHeaderName + ":" + StrUtil.trim(URLEncoder.encode(headerValue, "UTF-8")));
} catch (UnsupportedEncodingException e) {
log.error("headers encode error", e);
throw new RuntimeException(e);
}
}
}
// 按字母顺序排序请求头
// headerList.sort(String.CASE_INSENSITIVE_ORDER);
Collections.sort(headerList);
return StrUtil.join("\n",headerList );
}
/**
* 生成一个 HmacSHA256 哈希值。
*
* @param data 要进行哈希处理的数据。
* @param key 用于哈希处理的密钥。
* @return 生成的 HmacSHA256 哈希值,以十六进制编码表示。
* @throws Exception 如果哈希处理失败,则抛出异常。
*/
private static String generateHmacSHA256Hex(String data, String key) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(key.getBytes(), HMAC_SHA256_ALGORITHM);
Mac mac = Mac.getInstance(HMAC_SHA256_ALGORITHM);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(data.getBytes());
return bytesToHex(rawHmac);
}
/**
* 将字节数组转换为十六进制字符串。
*
* @param bytes 要转换的字节数组。
* @return 字节数组的十六进制表示。
*/
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1) {
hexString.append('0');
}
hexString.append(hex);
}
return hexString.toString();
}
public static void main(String[] args) {
//特此声明里边的值需要根据需求进行替换
//时间戳
long l = System.currentTimeMillis() / 1000L;
//这里因为没有引入Request 所以模拟header的map
Map headerMap = new HashMap();
//这里根据文档里的描述,这里需要根据实际需求进行修改(推荐使用 x-app-id)
String signedheaders = "x-app-id";
//如果signedheaders 有多个那么这里需要都塞进去 同样的 当你使用request的请求的时候 你的signedheaders里的key在header都需要具备
headerMap.put("X-APP-ID", "用户申请的账号ID");
//根据文档进行替换,所请求服务资源所在的区域,可使用所在省份缩写(见本文后面的省份列表)
String region="HN";
String appId="cf83cd826a1449e0954c26198155a8ed";
String basic = "teleai-cloud-auth-v1/"+appId+"/"+region+"/"+l+"/180000";
//这里是接口的param param指的是 路径后缀 使用?后面的参数 同样的如果使用request的话 使用了url拼接?key=value 那么加密必须使用
//这里模拟的是不具备?的情况所以塞了一个空的map
Map params = new HashMap();
String sign = SignUtils.genSignature(
basic,
//X-APP-KEY,对应账号信息中的key值
"fa120879155b4ca3a3a9c63955dac602",
//请求的发送方式 POST GET 等 websocket 是GET,需要与算法的请求方式保持一致
HttpMethod.POST,
//请求路径的path
"/aipaas/cv/v1/image/cyclistsHelmetDetect",
params,
headerMap,
signedheaders);
System.out.println(sign);
//生成的就是 Authorization 注意生成的有斜杠进行拼接
System.out.println(String.format("Authorization: %s/%s/%s",basic,signedheaders,sign));
}
}
#!/bin/bash
# Function to URL encode a string
url_encode() {
local string="${1}"
local strlen=${#string}
local encoded=""
for (( pos=0; pos<strlen; pos++ )); do
c=${string:pos:1}
case "$c" in
[a-zA-Z0-9.~_-])
encoded+="$c"
;;
*)
printf -v encoded '%s%%%02X' "$encoded" "'$c"
;;
esac
done
echo "$encoded"
}
# Function to generate HMAC SHA256 signature
generate_hmac_sha256() {
local data="${1}"
local key="${2}"
echo -n "$data" | openssl dgst -sha256 -hmac "$key" | sed 's/^.* //'
}
# Function to get canonical query string
get_canonical_query_string() {
local param_string="$1"
local param_list=()
IFS='&' read -ra param_pairs <<< "$param_string"
for param_pair in "${param_pairs[@]}"; do
IFS='=' read -ra param <<< "$param_pair"
key=${param[0]}
value=${param[1]}
encoded_key=$(url_encode "$key")
encoded_value=$(url_encode "$value")
param_list+=("${encoded_key}=${encoded_value}")
done
#echo "param_list: ${param_list[@]}"
sort_params=($(for i in "${param_list[@]}"; do echo $i; done | sort))
#echo "sort_params: ${sort_params[@]}"
query_string=$(IFS="&"; echo "${sort_params[*]}")
echo "$query_string"
}
# Function to get canonical headers
get_canonical_headers() {
local header_string="$1"
local signed_headers="$2"
local header_list=()
IFS=';' read -ra headers <<< "$signed_headers"
for header_name in "${headers[@]}"; do
IFS=';' read -ra header_pairs <<< "$header_string"
for header_pair in "${header_pairs[@]}"; do
IFS='=' read -ra kv < str:
"""
将字节数组转换为十六进制字符串
Args:
byte_array: 要转换的字节数组
Returns:
字节数组的十六进制表示
"""
return byte_array.hex()
def main():
"""
演示 SignUtils 使用方法的主函数
"""
timestamp = int(time.time())
app_id = "cf83cd826a1449e0954c26198155a8ed"
app_key = "fa120879155b4ca3a3a9c63955dac602"
# 这里因为没有引入Request 所以模拟header的map
header_map = {}
# 这里根据文档里的描述,这里需要根据实际需求进行修改(推荐使用 x-app-id)
signed_headers = "x-app-id"
# 如果signedheaders 有多个那么这里需要都塞进去 同样的 当你使用request的请求的时候 你的signedheaders里的key在header都需要具备
header_map["X-APP-ID"] = app_id
# 根据文档进行替换
basic = f"teleai-cloud-auth-v1/{app_id}/QG/{timestamp}/1800"
# 这里是接口的param param指的是 路径后缀 使用?后面的参数 同样的如果使用request的话 使用了url拼接?key=value 那么加密必须使用
# 这里模拟的是不具备?的情况所以塞了一个空的map
params = {}
params["q"] = "123"
sign = SignUtils.gen_signature(
basic,
# X-APP-KEY
app_key,
# 请求的发送方式 POST GET 等 websocket 是GET
"POST",
# 请求路径的path
"/aipaas/cv/v1/image/cyclistsHelmetDetect",
params,
header_map,
signed_headers
)
print(sign)
# 生成的就是 Authorization 注意生成的有斜杠进行拼接
print(f"Authorization: {basic}/{signed_headers}/{sign}")
if __name__ == "__main__":
main()
import json
import hashlib
import hmac
import time
import urllib.parse
# 配置环境变量(密钥、区域等))X-APP-ID对应账号信息中的ID值,X-APP-KEY对应账号信息中的KEY值
X_APP_ID = 'yourXAppId'
X_APP_KEY = 'yourXAppKey'
# URL 和请求数据
REGION = 'QG'
EXPIRATION_IN_SECONDS = '43200' # 默认过期时间,这里配置12小时
TIMESTAMP = str(int(time.time())) # 当前时间戳
SIGNED_HEADERS = 'x-app-id' # 默认签名头
PUBLIC_NETWORK_SING_HEADER = 'teleai-cloud-auth-v1' # 公网鉴权头部,按照实际调用情况选择
EMS_SING_HEADER = 'eop-auth-v1' # 内网鉴权头部,按照实际调用情况选择
# 构建请求头
headers = {
'Content-Type': 'application/json',
'X-APP-ID': X_APP_ID,
}
class HttpAuth(object):
# 编码函数
def normalize(self, string, encoding_slash=True):
quoted_string = urllib.parse.quote(string, safe='~()*!\'/' if encoding_slash else '~()*!\'')
return quoted_string
# 生成Canonical URI
def generate_canonical_uri(self, url):
parsed_url = urllib.parse.urlparse(url)
return '/'.join(self.normalize(segment, False) for segment in parsed_url.path.split('/'))
# 生成Canonical Headers
@staticmethod
def generate_canonical_headers(input_params):
signed_headers_list = SIGNED_HEADERS.split(';')
signed_headers_set = set(signed_headers_list)
sorted_headers = sorted((k.lower(), urllib.parse.quote(v.strip(), safe='')) for k, v in input_params.items() if
k.lower() in signed_headers_set)
canonical_headers = ''.join(f"{k}:{v}" for k, v in sorted_headers)
signed_headers_str = ';'.join(sorted(signed_headers_set))
print("canonical_headers:", canonical_headers)
return canonical_headers, signed_headers_str
# 生成签名
def generate_signature(self, x_app_id, x_app_key, region, timestamp, expiration_in_seconds, method,
canonical_uri,
canonical_headers, signed_headers_str, canonical_query_string):
# 在此处选择对应的鉴权头部
signing_key_str = f"{PUBLIC_NETWORK_SING_HEADER}/{x_app_id}/{region}/{timestamp}/{expiration_in_seconds}"
signing_key = hmac.new(bytes(x_app_key, 'utf-8'), bytes(signing_key_str, 'utf-8'), hashlib.sha256).hexdigest()
# print(f"signing_key = {signing_key}")
canonical_request = f"{method.upper()}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}"
print("canonical_request", canonical_request)
signature = hmac.new(bytes(signing_key, 'utf-8'), bytes(canonical_request, 'utf-8'), hashlib.sha256).hexdigest()
authorization = f"{signing_key_str}/{signed_headers_str}/{signature}"
# print("authorization", authorization)
return authorization
# 获取Content-Length
def get_content_length(self, data):
body = json.dumps(data)
return str(len(body))
# 发送请求并获取响应
def get_auth(self, url, method, canonical_query_string):
"""
主逻辑
:param url: 完整的请求url
:param method: GET/POST,根据实际情况传入,websocket请求传GET
:param canonical_query_string: params参数,GET请求需要传入,格式为key=value,其他请求请传入''
"""
canonical_uri = self.generate_canonical_uri(url)
canonical_headers, signed_headers_str = self.generate_canonical_headers(headers)
authorization = self.generate_signature(X_APP_ID, X_APP_KEY, REGION, TIMESTAMP, EXPIRATION_IN_SECONDS,
method, canonical_uri, canonical_headers, signed_headers_str, canonical_query_string)
headers['Authorization'] = authorization
print(f"authorization = {authorization}")
return headers
getAuth = HttpAuth()