鉴权签名
鉴权签名数据通过请求头X-UPA-SIGN进行传输。
签名数据规则
METHOD|URI|TIMESTAMP|REQUEST_ID|SIGN_PARAM
请求参数为空时SIGN_PARAM为空,签名字符串以 | 结尾
签名数据规则说明
METHOD:API请求方式(GET/POST)
URI:API地址(如:/api/v1/customer/add)
TIMESTAMP:请求时间戳(毫秒)60秒请求超时。和请求头X-UPAY-TIMESTAMP一致
REQUEST_ID:请求唯一ID,和请求头X-UPA-REQUESTID一致
SIGN_PARAM:签名参数(参考签名参数说明)
签名参数说明
1. 将请求参数按KEY进行升序排序
2. 使用 key=value 形式拼接
3. 多个字段之间用 & 连接
示例:
{
"groupNo": "101680210001",
"email": "[email protected]",
"globalCode": "+1",
"phone": "127897890",
"givenName": "Demo given name",
"lastName": "Demo last name",
"company": "Demo company",
"occupation": "Demo occupation",
"position": "Demo position",
"annualIncome": 100.5,
"workYear": "1",
"address": {
"name": "Demo given name",
"countryId": 12,
"provinceId": 475,
"postalCode": "94105",
"address": "535 Mission St, Floor 12"
},
"remark": "This is sample data for adding customers"
}
排序后的数据:
{
"address": {
"address": "535 Mission St, Floor 12",
"countryId": 12,
"name": "Demo given name",
"postalCode": "94105",
"provinceId": 475
},
"annualIncome": 100.5,
"company": "Demo company",
"email": "[email protected]",
"givenName": "Demo given name",
"globalCode": "+1",
"groupNo": "101680210001",
"lastName": "Demo last name",
"occupation": "Demo occupation",
"phone": "127897890",
"position": "Demo position",
"remark": "This is sample data for adding customers",
"workYear": "1"
}
参与签名的数据:
address={"address":"535 Mission St, Floor 12","cityId":2001,"countryId":840,"name":"Alex Wilson","postalCode":"94105","provinceId":1001}&annualIncome=12.5&company=Open Commerce Inc&[email protected]&givenName=Alex&globalCode=+1&groupNo=GRP202603260001&lastName=Wilson&occupation=Software Engineer&phone=4155550123&position=Senior Engineer&remark=customer created from open api sandbox&workYear=8
数据签名方式
签名时需使用UPay提供的Secretkey,可从商户平台获取.

1. 获取签名数据
2. 使用 HMAC-SHA256 算法计算签名数据的哈希值
3. 将哈希值进行 Base64 编码,得到最终签名
鉴权签名示例
| 参数 | 示例 |
|---|---|
| SecretKey | g3)&~8P9S+ZhqO@G8b9Ate%xa5Jh-.E% |
| METHOD | POST |
| URI | /api/v1/customer/add |
| TIMESTAMP | 1774573955994 |
| REQUEST_ID | 366dde77c57a4e7e988c074a21a4f04c |
| SIGN_PARAM | address={"address":"535 Mission St, Floor 12","countryId":12,"name":"Demo given name","postalCode":"94105","provinceId":475}&annualIncome=100.5&company=Demo company&email=[email protected] &givenName=Demo given name&globalCode=+1&groupNo=101680210001&lastName=Demo last name&occupation=Demo occupation&phone=127897890&position=Demo position&remark=This is sample data for adding customers&workYear=1 |
签名原始字符
POST|/api/v1/customer/add|1774573955994|366dde77c57a4e7e988c074a21a4f04c|address={"address":"535 Mission St, Floor 12","countryId":12,"name":"Demo given name","postalCode":"94105","provinceId":475}&annualIncome=100.5&company=Demo company&[email protected]&givenName=Demo given name&globalCode=+1&groupNo=101680210001&lastName=Demo last name&occupation=Demo occupation&phone=127897890&position=Demo position&remark=This is sample data for adding customers&workYear=1
签名后数据
签名后的值通过请求头X-UPA-SIGN进行传输。
pRbX0ikM/4j/E3tBxNOO/m3Hra+4d2/r+xtRTJ6N5pk=
签名示例代码
SecretKey: 通过商户后台获取
/**
* 签名
* @param data 签名字符串
* @param secretKey 签名秘钥
* @throws AppServerException
*/
public static String generateHmacSha256(String data, String secretKey) throws Exception {
// 获取 Mac 实例
Mac mac = Mac.getInstance("HmacSHA256");
// 创建 SecretKeySpec 对象
byte[] keyBytes = secretKey.getBytes();
SecretKeySpec signingKey = new SecretKeySpec(keyBytes, "HmacSHA256");
// 初始化 Mac 实例
mac.init(signingKey);
// 执行 HMAC-SHA256
byte[] rawHmac = mac.doFinal(data.getBytes());
// 将结果转换为 Base64 字符串
return Base64.getEncoder().encodeToString(rawHmac);
}
import hmac
import hashlib
import base64
def generate_hmac_sha256(data: str, secret_key: str) -> str:
"""
生成 HMAC-SHA256 签名,返回 Base64 编码的字符串
Args:
data: 待签名的数据
secretKey: 密钥
Returns:
Base64 编码的 HMAC-SHA256 签名
"""
# 将密钥和数据转换为字节
key_bytes = secretKey.encode('utf-8')
data_bytes = data.encode('utf-8')
# 执行 HMAC-SHA256
raw_hmac = hmac.new(key_bytes, data_bytes, hashlib.sha256).digest()
# 将结果转换为 Base64 字符串
return base64.b64encode(raw_hmac).decode('utf-8')
测试网站
https://www.devglan.com/online-tools/hmac-sha256-online
数据加/解密
加密过程
加密分为两对秘钥(公钥/私钥),一对是平台密钥,一对是商户密钥
平台秘钥
-
平台公钥:商户请求API通过平台公钥将参数进行加密

-
平台私钥:UPay获取到API的参数,通过平台私钥进行解密获取原始参数
商户秘钥
- 生成公私钥命令示例:
openssl genrsa -out private.pem 2048 && openssl rsa -in private.pem -pubout -out public.pem
- 公钥示例
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2r69mrDofiNPIAKNRV5v
BeVBjEnrYPgKf8LKDK+gLhLrepuTc+gxlJurH/gYwgRwD8DL5rmdcXRZMpX+L8gx
yszzD0qFWnBoumN1RHx2kkok1vgLr/dBlJRwVPZbZT69m/B8kGWR8JboBVOWJwxn
DX6WaRDyWVa2qu1KogQq4m028gJWZlVzs0bSExQqSW5ZiUFGyJygjTa2XAKqceSe
xHov8jR66mpnv0vJy6+4YJK34t2VzyNdEqseTE3spZkSJCzzxNV7xIs63u/WTenK
J1BWar+/rA63A+/0Lt2BB/Uf/iq0OwGm4t2d0Pf1O7hylXXP65BGunWvu0QWCsNo
xQIDAQAB
-----END PUBLIC KEY-----
- 私钥示例
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDavr2asOh+I08g
Ao1FXm8F5UGMSetg+Ap/wsoMr6AuEut6m5Nz6DGUm6sf+BjCBHAPwMvmuZ1xdFky
lf4vyDHKzPMPSoVacGi6Y3VEfHaSSiTW+Auv90GUlHBU9ltlPr2b8HyQZZHwlugF
U5YnDGcNfpZpEPJZVraq7UqiBCribTbyAlZmVXOzRtITFCpJblmJQUbInKCNNrZc
Aqpx5J7Eei/yNHrqame/S8nLr7hgkrfi3ZXPI10Sqx5MTeylmRIkLPPE1XvEizre
79ZN6conUFZqv7+sDrcD7/Qu3YEH9R/+KrQ7Aabi3Z3Q9/U7uHKVdc/rkEa6da+7
RBYKw2jFAgMBAAECggEANwQQJe7mmosA5JkftNm6bK4rXUBeLeZUpat1K9mkHNJv
XUfxvw4gIjNAx+qbN3jsQloILoByo81SfdGRu6zLMSl43Fiuz39EJ9TJ8q6nF8YE
G/kI33n9iYQH+KZ5eC5ee/DxM1QIb41Uz7olIq8Q4Cj9ZXF4spWHndfOlI8dxhKg
3HLx90yW6/2D8teI2KoyDVd3FH/3dMsibAFwYbAlvfZzFuH0stOUyDOxt1CdWl/N
l4zTouGz82A+x8OPKf52E9W0iw+PnX/XOjDVpU39d21zhSmN3IjgfrW0vbbixD8F
1Xeusy9DYwNY60q2bo0xckdz8cisr59IT9iT+xfiBQKBgQDuM59iG56EljyEtwZR
YMkD3Tf2apzSl39+i6s1Sna96e+WCH1INqJofY24ZVRfcwID2jEzurrMlGtxTjM6
UwfWIhbyS+ZoMbpri13Hz3lN8lvKPiXoiJqceXSzfW0vJS7RJFUDwE1yLaNXrwko
jr2fRKnNS0wwUWptfGiPMtFVlwKBgQDrFvK8OHjuJDlwr/oYtSjPbYvoqo0zab1h
a2wfozRMNAtifh12MEJTmCaALvO6ip0k7KIVRGT6taE5eMOZVv/gkle6S/g2vC7+
uh8s2urjxxLZgoSj1nRCLtA/aby5xYxcK0Wmnnz/0e7u8j4LMzSxzr2YCDTUImCy
WO+gcajYAwKBgBsmiy41k7XtIezGp9OywnbMSkquED34wrF73gHvAOXYulRRl2YZ
xB1A4lx4QEu44ivqPN12lUAoUq7RiQlG2YfQzujDOfn6YRNNCV1zCpKV41yEBPIi
T+0x8tlanI1ZIaL5Dy+kRa+UACBIdTTIQFjdDLW3tXF0djsQiSJ5Wl71AoGBANNy
Zo1ItexsBIYIoggLGVPIkiiJDkuJ9d5jwnKKVoWb4gmKqXEeYunRVf/BO1MzTbhi
Zj5+r9yX9RU+O5/2ElupBOL5ZZ3FkPdn7JZpqQ+KhLfCnw7F2veUJ5aBwk/NETvt
Z84/iuqFpkSg+ZEVU3YCH6FY8DtFWHfRamaDFHyFAoGAIaACJa4qbSKPtK+jj8Sh
EpBAAQS9DIrOitS9+Fq7QgrMK9OvEuuO35+0E92EOb11exBgFk/3mkojseUhOHoH
qSKzafmQlW9BTV6uUtanRIXXkzV2yTipWOUMcQ9pHXJTOJ5wEEbKQN7t8NCO7KLE
2LnP0jnUNuNxPQbjkr4Vw0s=
-----END PRIVATE KEY-----
-
商户公钥
商户将生成的公钥配置到商户后台

API如果是加密请求,响应数据一定是加密的。UPay会将响应数据通过商户公钥进行加密.
-
商户私钥:商户获取到API响应数据,通过商户私钥进行解密后获取到原始的响应数据
加密算法
{
"alg": "RSA-OAEP-256",
"enc": "A128GCM",
"kid": kid,
"typ": "JOSE",
"zip": "DEF"
}
参数作用及说明
| 参数 | 算法 | 备注 |
|---|---|---|
| alg | RSA-OAEP-256 | 密钥管理算法 |
| enc | A128GCM | 加密算法 |
| typ | JOSE | 加密类型 |
| zip | DEF | 压缩类型 增快传输速度,降低传输内容大小 |
| kid | sha256(public_pem).hexdigest() | 对公钥进行 hash 指纹签名 kid生成可参考<KID生成示例代码> |
KID生成示例代码
/**
* 生成KID
* @param publicKey 商户公钥
*/
public String generateKid(RSAPublicKey publicKey) throws NoSuchAlgorithmException {
byte[] publicKeyBytes = publicKey.getEncoded();
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(publicKeyBytes);
return Base64.getUrlEncoder().withoutPadding().encodeToString(hash).toUpperCase();
}
import hashlib
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
def generate_kid(public_key):
"""
从 RSA 公钥生成 KID (Key ID)
Args:
public_key: 可以是 RSAPublicKey 对象、PEM 字符串或字节
Returns:
str: Base64URL 编码的 SHA-256 哈希值(大写)
"""
# 获取公钥的编码字节
if isinstance(public_key, str):
# 如果是 PEM 字符串
public_key_bytes = public_key.encode('utf-8')
elif isinstance(public_key, bytes):
# 如果是字节
public_key_bytes = public_key
elif hasattr(public_key, 'public_bytes'):
# 如果是 cryptography 的 PublicKey 对象
public_key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
else:
raise ValueError("Unsupported public key type")
# 计算 SHA-256 哈希
digest = hashlib.sha256(public_key_bytes)
hash_bytes = digest.digest()
# Base64URL 编码(无填充)并转换为大写
kid = base64.urlsafe_b64encode(hash_bytes).decode('utf-8').rstrip('=').upper()
return kid
加/解密示例数据
加密示例数据
- 平台公钥
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn7okKBd9BHTV48GGFbfdJYC53nkQcvBeEGG1HFL3pA43D2n0nbpFd3lppZfMz9ZFmSYu02mtbF0IA8qbxe15ia0ffTRFK6K1jwnluiDADBtUdG+n/aPWnmBhpG6fdHehQ7LKMwq9gYMyqCV5QOZ3f/kIlQT3/NUGpKfxRzGz7vVV86ibEwnI+62F1qAX5UN1e5AX9WhsqjaI+lB9OMmG7va1xnDhGTbTou/sEGRKKYSsZrYxC2H1pzeJG4puV8C+HVWYIBnIM1UtpROw8NTfdMotShkYZqKwBrX84Z4GZqGbh+h0BnzRC7g7Yon0iYv3F9Ee9SRLcNKOrNYdqK3aHQIDAQAB -----END PUBLIC KEY----- - KID
通过商户公钥生成的KID为Y7UBL8TJQJY2YR9CDAM6UNXPKPLSM6QD4GMSA3MEPCQ - 原始请求报文
{ "groupNo": "101680210001", "email": "[email protected]", "globalCode": "+1", "phone": "127897890", "givenName": "Demo given name", "lastName": "Demo last name", "company": "Demo company", "occupation": "Demo occupation", "position": "Demo position", "annualIncome": 100.5, "workYear": "1", "address": { "name": "Demo given name", "countryId": 12, "provinceId": 475, "postalCode": "94105", "address": "535 Mission St, Floor 12" }, "remark": "This is sample data for adding customers" } - 请求报文
payload中的数据是通过平台公钥将原始请求报文加密后的数据{ "payload": "eyJ6aXAiOiJERUYiLCJraWQiOiJZN1VCTDhUSlFKWTJZUjlDREFNNlVOWFBLUExTTTZRRDRHTVNBM01FUENRIiwidHlwIjoiSk9TRSIsImVuYyI6IkExMjhHQ00iLCJhbGciOiJSU0EtT0FFUC0yNTYifQ.iThR8f9aBxnvq8PR-ta-AkD6w-5USqrCrTkISbz8ovTurgCmHasM0NmmlV5N8_Kn5qxJWCqIVd3EkmU9nkjsiO5uLSHPxL_CLmQD1gV79m1AxC41GMpHxGtiXkg-LUCwxE2TM-yLa5xKCjNJRUFPRShkObIq1rWiz_HIppYbw853AIa9PS9vt2p0LWpfthQ5UBaiCbHI5i2pIzaTAff3I8spgz2kJaZamnGEPH3v93rqpv2e2ICL0aYt6rp0wsAIe4dHR1tPwdqQKQA1VFo1VMSNhfxQmJqLZ7WsEJTh-zFO82Og2SYNp_Kop6lsSsPkoIo62p-qC9dgNU75zWe_5w.pVaqC8cS3mgKZ1z7.25Dz8A5LmYMqP_ujf20ZcVSdIf4etmd7Ws2BW6Dd1LvYigkXtEat4VrOhHyJEJEj4ksBuyQRS-uNrYy75MdZAkPSYC9xw4bpgQAgjYoZAurvX8Yy6mTlRV5IS4VcLTP3b7rsMVZOohYdWwVwl35yqGRv-5u6NRiFOKx0jyIvje0sLhae9dicJXrPe7xRaMLcKUCklpSAYQs8X6dk_ZJ7WgEkdhO_JOwyGLu7AtjCnvXRRAQtH9jM0ioXOrCad1d99bsZyuJ5ndF_54z3Dbiv4XPHKX4Z8Rm2Uhs_T84nOEXDwUP0ZUD6XLTgYj685o3qHIfLSmdgmgC-MinFvWYWFCasEIGLS8R7hv4gNPH_X3ulvRZg8kJsfpdaHRr_.lQsBYAPQAWf3AdOraThn5w" }
解密示例数据
- 原始响应数据
{ "code": 0, "data": { "payload": "eyJ6aXAiOiJERUYiLCJraWQiOiI3MzlTTFlCVUZNTVZWTFlIRktKSkIyN0EyTEJWWFhPVUpEU19FT0tXSjVVIiwidHlwIjoiSk9TRSIsImVuYyI6IkExMjhHQ00iLCJhbGciOiJSU0EtT0FFUC0yNTYifQ.ZIpTCVNg8G8IZKdvDOolnRr49oZZjO1WtFAtM3iW8yLy16KZQI8Ete_iksoHF4m8BCXcV5VIe6i2rv-8xHCFKXCk_rl3MwaM1MXPZL7Fm0hN_sZRYEbdWC5kGzIVl8t5mOnwsIIctPgvIa7ZQoLj4fNcMcXXdSpXIm2xtvxDSAeQUXnaP6pgP0-3Y0PG_ULYFDAxiQHYAp-EN5GpPIqDLdIAdyjYmdIoyEDcBrMfu0sbc1O7XZ6TMr0kK5r--CiHdsl36czPoQhDXsP1L4Bmbo0CsRNRLGcwSmZH8xD43BX4OOTubF4TTBDjx08hnE7wxjtElCxhvT2ibg7To9jVyA.zVwjezyC0K3FbJ_J.sm8AzVy7OjAhB0k3HMc.9MQzaH0aX_I4JEYzFrN1aw" }, "msg": "Success", "success": true, "ts": 1774589644 } - 商户私钥
-----BEGIN PRIVATE KEY----- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDavr2asOh+I08g Ao1FXm8F5UGMSetg+Ap/wsoMr6AuEut6m5Nz6DGUm6sf+BjCBHAPwMvmuZ1xdFky lf4vyDHKzPMPSoVacGi6Y3VEfHaSSiTW+Auv90GUlHBU9ltlPr2b8HyQZZHwlugF U5YnDGcNfpZpEPJZVraq7UqiBCribTbyAlZmVXOzRtITFCpJblmJQUbInKCNNrZc Aqpx5J7Eei/yNHrqame/S8nLr7hgkrfi3ZXPI10Sqx5MTeylmRIkLPPE1XvEizre 79ZN6conUFZqv7+sDrcD7/Qu3YEH9R/+KrQ7Aabi3Z3Q9/U7uHKVdc/rkEa6da+7 RBYKw2jFAgMBAAECggEANwQQJe7mmosA5JkftNm6bK4rXUBeLeZUpat1K9mkHNJv XUfxvw4gIjNAx+qbN3jsQloILoByo81SfdGRu6zLMSl43Fiuz39EJ9TJ8q6nF8YE G/kI33n9iYQH+KZ5eC5ee/DxM1QIb41Uz7olIq8Q4Cj9ZXF4spWHndfOlI8dxhKg 3HLx90yW6/2D8teI2KoyDVd3FH/3dMsibAFwYbAlvfZzFuH0stOUyDOxt1CdWl/N l4zTouGz82A+x8OPKf52E9W0iw+PnX/XOjDVpU39d21zhSmN3IjgfrW0vbbixD8F 1Xeusy9DYwNY60q2bo0xckdz8cisr59IT9iT+xfiBQKBgQDuM59iG56EljyEtwZR YMkD3Tf2apzSl39+i6s1Sna96e+WCH1INqJofY24ZVRfcwID2jEzurrMlGtxTjM6 UwfWIhbyS+ZoMbpri13Hz3lN8lvKPiXoiJqceXSzfW0vJS7RJFUDwE1yLaNXrwko jr2fRKnNS0wwUWptfGiPMtFVlwKBgQDrFvK8OHjuJDlwr/oYtSjPbYvoqo0zab1h a2wfozRMNAtifh12MEJTmCaALvO6ip0k7KIVRGT6taE5eMOZVv/gkle6S/g2vC7+ uh8s2urjxxLZgoSj1nRCLtA/aby5xYxcK0Wmnnz/0e7u8j4LMzSxzr2YCDTUImCy WO+gcajYAwKBgBsmiy41k7XtIezGp9OywnbMSkquED34wrF73gHvAOXYulRRl2YZ xB1A4lx4QEu44ivqPN12lUAoUq7RiQlG2YfQzujDOfn6YRNNCV1zCpKV41yEBPIi T+0x8tlanI1ZIaL5Dy+kRa+UACBIdTTIQFjdDLW3tXF0djsQiSJ5Wl71AoGBANNy Zo1ItexsBIYIoggLGVPIkiiJDkuJ9d5jwnKKVoWb4gmKqXEeYunRVf/BO1MzTbhi Zj5+r9yX9RU+O5/2ElupBOL5ZZ3FkPdn7JZpqQ+KhLfCnw7F2veUJ5aBwk/NETvt Z84/iuqFpkSg+ZEVU3YCH6FY8DtFWHfRamaDFHyFAoGAIaACJa4qbSKPtK+jj8Sh EpBAAQS9DIrOitS9+Fq7QgrMK9OvEuuO35+0E92EOb11exBgFk/3mkojseUhOHoH qSKzafmQlW9BTV6uUtanRIXXkzV2yTipWOUMcQ9pHXJTOJ5wEEbKQN7t8NCO7KLE 2LnP0jnUNuNxPQbjkr4Vw0s= -----END PRIVATE KEY----- - 将原始响应数据中的payload通商户私钥进行解密结果
101680110004
加/解密示例代码
import com.nimbusds.jose.*;
import com.nimbusds.jose.crypto.RSADecrypter;
import com.nimbusds.jose.crypto.RSAEncrypter;
import java.security.KeyFactory;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;
public class JWECrypto {
private static final JWEAlgorithm JWE_ALG = JWEAlgorithm.RSA_OAEP_256;
private static final EncryptionMethod ENC_METHOD = EncryptionMethod.A128GCM;
private RSAPublicKey publicKey;
private RSAPrivateKey privateKey;
public JWECrypto(String publicKeyPEM , String privateKeyPEM) throws Exception {
if (publicKeyPEM != null) {
this.publicKey = loadPublicKey(publicKeyPEM);
}
if (privateKeyPEM != null) {
this.privateKey = loadPrivateKey(privateKeyPEM);
}
}
/**
* 加密
*/
public String encrypt(String payload) throws Exception {
String kid = generateKid(publicKey);
JWEHeader header = new JWEHeader.Builder(JWE_ALG , ENC_METHOD)
.type(JOSEObjectType.JOSE)
.keyID(kid)
.compressionAlgorithm(CompressionAlgorithm.DEF)
.build();
JWEObject jweObject = new JWEObject(header , new Payload(payload));
RSAEncrypter rsaEncrypter = new RSAEncrypter(publicKey);
jweObject.encrypt(rsaEncrypter);
return jweObject.serialize();
}
/**
* 解密
*/
public String decrypt(String jweString) throws Exception {
JWEObject jweObject = JWEObject.parse(jweString);
RSADecrypter rsaDecrypter = new RSADecrypter(privateKey);
jweObject.decrypt(rsaDecrypter);
return jweObject.getPayload().toString();
}
private RSAPublicKey loadPublicKey(String publicKeyPEM) throws Exception {
String publicKeyPEMFormatted = publicKeyPEM.replace("-----BEGIN PUBLIC KEY-----" , "")
.replace("-----END PUBLIC KEY-----" , "")
.replaceAll("\\s" , "");
byte[] encoded = Base64.getDecoder().decode(publicKeyPEMFormatted);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
X509EncodedKeySpec keySpec = new X509EncodedKeySpec(encoded);
return (RSAPublicKey) keyFactory.generatePublic(keySpec);
}
private RSAPrivateKey loadPrivateKey(String privateKeyPEM) throws Exception {
String privateKeyPEMFormatted = privateKeyPEM.replace("-----BEGIN PRIVATE KEY-----" , "")
.replace("-----END PRIVATE KEY-----" , "")
.replaceAll("\\s" , "");
byte[] encoded = Base64.getDecoder().decode(privateKeyPEMFormatted);
KeyFactory keyFactory = KeyFactory.getInstance("RSA");
PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
return (RSAPrivateKey) keyFactory.generatePrivate(keySpec);
}
/**
* 生成KID
* @param publicKey 商户公钥
*/
public String generateKid(RSAPublicKey publicKey) throws NoSuchAlgorithmException {
byte[] publicKeyBytes = publicKey.getEncoded();
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(publicKeyBytes);
return Base64.getUrlEncoder().withoutPadding().encodeToString(hash).toUpperCase();
}
}
import base64
import hashlib
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from jwcrypto import jwe, jwk
from jwcrypto.common import json_encode
import json
class JWECrypto:
JWE_ALG = "RSA-OAEP-256"
ENC_METHOD = "A128GCM"
def __init__(self, public_key_pem=None, private_key_pem=None):
"""
初始化JWE加密解密器
Args:
public_key_pem: PEM格式的公钥字符串
private_key_pem: PEM格式的私钥字符串
"""
self.public_key = None
self.private_key = None
if public_key_pem:
self.public_key = self._load_public_key(public_key_pem)
if private_key_pem:
self.private_key = self._load_private_key(private_key_pem)
def _load_public_key(self, public_key_pem):
"""加载公钥"""
try:
# 清理PEM格式
public_key_pem = public_key_pem.strip()
if not public_key_pem.startswith('-----BEGIN PUBLIC KEY-----'):
# 如果已经是纯base64格式,添加PEM头尾
public_key_pem = self._format_pem(public_key_pem, "PUBLIC KEY")
# 加载公钥
public_key = serialization.load_pem_public_key(
public_key_pem.encode('utf-8'),
backend=default_backend()
)
return public_key
except Exception as e:
raise Exception(f"加载公钥失败: {e}")
def _load_private_key(self, private_key_pem):
"""加载私钥"""
try:
# 清理PEM格式
private_key_pem = private_key_pem.strip()
if not private_key_pem.startswith('-----BEGIN PRIVATE KEY-----'):
# 如果已经是纯base64格式,添加PEM头尾
private_key_pem = self._format_pem(private_key_pem, "PRIVATE KEY")
# 加载私钥
private_key = serialization.load_pem_private_key(
private_key_pem.encode('utf-8'),
password=None,
backend=default_backend()
)
return private_key
except Exception as e:
raise Exception(f"加载私钥失败: {e}")
def _format_pem(self, key_str, key_type):
"""格式化PEM字符串"""
# 移除所有空白字符
key_str = ''.join(key_str.split())
# 添加PEM头尾
pem = f"-----BEGIN {key_type}-----\n"
# 每64个字符换行
for i in range(0, len(key_str), 64):
pem += key_str[i:i+64] + "\n"
pem += f"-----END {key_type}-----"
return pem
def _create_jwk_from_public_key(self):
"""从公钥创建JWK对象"""
# 导出公钥为PEM格式
public_pem = self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# 创建JWK
jwk_obj = jwk.JWK.from_pem(public_pem)
return jwk_obj
def _create_jwk_from_private_key(self):
"""从私钥创建JWK对象"""
# 导出私钥为PEM格式
private_pem = self.private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# 创建JWK
jwk_obj = jwk.JWK.from_pem(private_pem)
return jwk_obj
def encrypt(self, payload):
"""
加密数据
Args:
payload: 待加密的字符串
Returns:
JWE字符串
"""
try:
# 生成KID
kid = self.generate_kid(self.public_key)
# 创建JWK
public_jwk = self._create_jwk_from_public_key()
# 创建JWE对象
jwe_token = jwe.JWE(
json.dumps(payload) if isinstance(payload, dict) else payload,
json_encode({
"alg": self.JWE_ALG,
"enc": self.ENC_METHOD,
"kid": kid,
"typ": "JOSE",
"zip": "DEF" # 压缩算法
})
)
# 加密
jwe_token.add_recipient(public_jwk)
encrypted = jwe_token.serialize(compact=True)
return encrypted
except Exception as e:
raise Exception(f"加密失败: {e}")
def decrypt(self, jwe_string):
"""
解密数据
Args:
jwe_string: JWE字符串
Returns:
解密后的字符串
"""
try:
# 创建JWK
private_jwk = self._create_jwk_from_private_key()
# 解析并解密JWE
jwe_token = jwe.JWE()
jwe_token.deserialize(jwe_string, key=private_jwk)
# 获取payload
payload = jwe_token.payload
return payload.decode('utf-8') if isinstance(payload, bytes) else str(payload)
except Exception as e:
raise Exception(f"解密失败: {e}")
def generate_kid(self, public_key):
"""
生成KID (Key ID)
Args:
public_key: RSA公钥对象
Returns:
KID字符串
"""
# 获取公钥的DER编码
public_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# 计算SHA-256哈希
sha256_hash = hashlib.sha256(public_der).digest()
# Base64 URL编码(无填充)
kid = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')
return kid.upper()
# 使用示例
if __name__ == "__main__":
# 示例密钥(请使用您自己的密钥)
# 注意:这里的密钥仅用于演示,实际使用时请使用真实的密钥对
# 生成测试密钥对
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
# 生成RSA密钥对
private_key_obj = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key_obj = private_key_obj.public_key()
# 导出为PEM格式
private_pem = private_key_obj.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
public_pem = public_key_obj.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
print("生成的公钥PEM:")
print(public_pem)
print("\n生成的私钥PEM:")
print(private_pem)
# 创建加密解密器
jwe_crypto = JWECrypto(public_pem, private_pem)
# 测试加密解密
original_payload = "Hello, this is a test message!"
print(f"\n原始数据: {original_payload}")
# 加密
encrypted = jwe_crypto.encrypt(original_payload)
print(f"加密结果: {encrypted}")
# 解密
decrypted = jwe_crypto.decrypt(encrypted)
print(f"解密结果: {decrypted}")
# 验证
assert original_payload == decrypted, "加密解密验证失败"
print("\n加密解密验证成功!")
