鉴权签名 & 数据加密

鉴权签名

鉴权签名数据通过请求头X-UPA-SIGN进行传输。

签名数据规则

METHOD|URI|TIMESTAMP|REQUEST_ID|SIGN_PARAM

请求参数为空时SIGN_PARAM为空,签名字符串以 | 结尾

签名数据规则说明

METHOD:API请求方式(GET/POST)

URI:API地址(如:/api/v1/customer/add)

TIMESTAMP:请求时间戳(毫秒)60秒请求超时。和请求头X-UPAY-TIMESTAMP一致

REQUEST_ID:请求唯一ID,和请求头X-UPA-REQUESTID一致

SIGN_PARAM:签名参数(参考签名参数说明)

签名参数说明

1. 将请求参数按KEY进行升序排序

2. 使用 key=value 形式拼接

3. 多个字段之间用 & 连接

示例:

{
  "groupNo": "101680210001",
  "email": "[email protected]",
  "globalCode": "+1",
  "phone": "127897890",
  "givenName": "Demo given name",
  "lastName": "Demo last name",
  "company": "Demo company",
  "occupation": "Demo occupation",
  "position": "Demo position",
  "annualIncome": 100.5,
  "workYear": "1",
  "address": {
    "name": "Demo given name",
    "countryId": 12,
    "provinceId": 475,
    "postalCode": "94105",
    "address": "535 Mission St, Floor 12"
  },
  "remark": "This is sample data for adding customers"
}

排序后的数据:

{
  "address": {
    "address": "535 Mission St, Floor 12",
    "countryId": 12,
    "name": "Demo given name",
    "postalCode": "94105",
    "provinceId": 475
  },
  "annualIncome": 100.5,
  "company": "Demo company",
  "email": "[email protected]",
  "givenName": "Demo given name",
  "globalCode": "+1",
  "groupNo": "101680210001",
  "lastName": "Demo last name",
  "occupation": "Demo occupation",
  "phone": "127897890",
  "position": "Demo position",
  "remark": "This is sample data for adding customers",
  "workYear": "1"
}

参与签名的数据:

address={"address":"535 Mission St, Floor 12","cityId":2001,"countryId":840,"name":"Alex Wilson","postalCode":"94105","provinceId":1001}&annualIncome=12.5&company=Open Commerce Inc&[email protected]&givenName=Alex&globalCode=+1&groupNo=GRP202603260001&lastName=Wilson&occupation=Software Engineer&phone=4155550123&position=Senior Engineer&remark=customer created from open api sandbox&workYear=8

数据签名方式

签名时需使用UPay提供的Secretkey,可从商户平台获取.

1. 获取签名数据

2. 使用 HMAC-SHA256 算法计算签名数据的哈希值

3. 将哈希值进行 Base64 编码,得到最终签名

鉴权签名示例

参数示例
SecretKeyg3)&~8P9S+ZhqO@G8b9Ate%xa5Jh-.E%
METHODPOST
URI/api/v1/customer/add
TIMESTAMP1774573955994
REQUEST_ID366dde77c57a4e7e988c074a21a4f04c
SIGN_PARAMaddress={"address":"535 Mission St, Floor 12","countryId":12,"name":"Demo given name","postalCode":"94105","provinceId":475}&annualIncome=100.5&company=Demo company&email=[email protected] &givenName=Demo given name&globalCode=+1&groupNo=101680210001&lastName=Demo last name&occupation=Demo occupation&phone=127897890&position=Demo position&remark=This is sample data for adding customers&workYear=1

签名原始字符

POST|/api/v1/customer/add|1774573955994|366dde77c57a4e7e988c074a21a4f04c|address={"address":"535 Mission St, Floor 12","countryId":12,"name":"Demo given name","postalCode":"94105","provinceId":475}&annualIncome=100.5&company=Demo company&[email protected]&givenName=Demo given name&globalCode=+1&groupNo=101680210001&lastName=Demo last name&occupation=Demo occupation&phone=127897890&position=Demo position&remark=This is sample data for adding customers&workYear=1

签名后数据

签名后的值通过请求头X-UPA-SIGN进行传输。

pRbX0ikM/4j/E3tBxNOO/m3Hra+4d2/r+xtRTJ6N5pk=

签名示例代码

SecretKey: 通过商户后台获取

/**
  * 签名
  * @param data 签名字符串
  * @param secretKey 签名秘钥
  * @throws AppServerException
*/
 public static String generateHmacSha256(String data, String secretKey) throws Exception {
        // 获取 Mac 实例
        Mac mac = Mac.getInstance("HmacSHA256");
        // 创建 SecretKeySpec 对象
        byte[] keyBytes = secretKey.getBytes();
        SecretKeySpec signingKey = new SecretKeySpec(keyBytes, "HmacSHA256");
        // 初始化 Mac 实例
        mac.init(signingKey);
        // 执行 HMAC-SHA256
        byte[] rawHmac = mac.doFinal(data.getBytes());
        // 将结果转换为 Base64 字符串
        return Base64.getEncoder().encodeToString(rawHmac);
    }
import hmac
import hashlib
import base64

def generate_hmac_sha256(data: str, secret_key: str) -> str:
    """
    生成 HMAC-SHA256 签名,返回 Base64 编码的字符串
    
    Args:
        data: 待签名的数据
        secretKey: 密钥
    
    Returns:
        Base64 编码的 HMAC-SHA256 签名
    """
    # 将密钥和数据转换为字节
    key_bytes = secretKey.encode('utf-8')
    data_bytes = data.encode('utf-8')
    
    # 执行 HMAC-SHA256
    raw_hmac = hmac.new(key_bytes, data_bytes, hashlib.sha256).digest()
    
    # 将结果转换为 Base64 字符串
    return base64.b64encode(raw_hmac).decode('utf-8')

测试网站

https://www.devglan.com/online-tools/hmac-sha256-online

数据加/解密

加密过程

加密分为两对秘钥(公钥/私钥),一对是平台密钥,一对是商户密钥

平台秘钥

  • 平台公钥:商户请求API通过平台公钥将参数进行加密

  • 平台私钥:UPay获取到API的参数,通过平台私钥进行解密获取原始参数

商户秘钥

  • 生成公私钥命令示例:
openssl genrsa -out private.pem 2048 && openssl rsa -in private.pem -pubout -out public.pem
  • 公钥示例
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2r69mrDofiNPIAKNRV5v
BeVBjEnrYPgKf8LKDK+gLhLrepuTc+gxlJurH/gYwgRwD8DL5rmdcXRZMpX+L8gx
yszzD0qFWnBoumN1RHx2kkok1vgLr/dBlJRwVPZbZT69m/B8kGWR8JboBVOWJwxn
DX6WaRDyWVa2qu1KogQq4m028gJWZlVzs0bSExQqSW5ZiUFGyJygjTa2XAKqceSe
xHov8jR66mpnv0vJy6+4YJK34t2VzyNdEqseTE3spZkSJCzzxNV7xIs63u/WTenK
J1BWar+/rA63A+/0Lt2BB/Uf/iq0OwGm4t2d0Pf1O7hylXXP65BGunWvu0QWCsNo
xQIDAQAB
-----END PUBLIC KEY-----
  • 私钥示例
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDavr2asOh+I08g
Ao1FXm8F5UGMSetg+Ap/wsoMr6AuEut6m5Nz6DGUm6sf+BjCBHAPwMvmuZ1xdFky
lf4vyDHKzPMPSoVacGi6Y3VEfHaSSiTW+Auv90GUlHBU9ltlPr2b8HyQZZHwlugF
U5YnDGcNfpZpEPJZVraq7UqiBCribTbyAlZmVXOzRtITFCpJblmJQUbInKCNNrZc
Aqpx5J7Eei/yNHrqame/S8nLr7hgkrfi3ZXPI10Sqx5MTeylmRIkLPPE1XvEizre
79ZN6conUFZqv7+sDrcD7/Qu3YEH9R/+KrQ7Aabi3Z3Q9/U7uHKVdc/rkEa6da+7
RBYKw2jFAgMBAAECggEANwQQJe7mmosA5JkftNm6bK4rXUBeLeZUpat1K9mkHNJv
XUfxvw4gIjNAx+qbN3jsQloILoByo81SfdGRu6zLMSl43Fiuz39EJ9TJ8q6nF8YE
G/kI33n9iYQH+KZ5eC5ee/DxM1QIb41Uz7olIq8Q4Cj9ZXF4spWHndfOlI8dxhKg
3HLx90yW6/2D8teI2KoyDVd3FH/3dMsibAFwYbAlvfZzFuH0stOUyDOxt1CdWl/N
l4zTouGz82A+x8OPKf52E9W0iw+PnX/XOjDVpU39d21zhSmN3IjgfrW0vbbixD8F
1Xeusy9DYwNY60q2bo0xckdz8cisr59IT9iT+xfiBQKBgQDuM59iG56EljyEtwZR
YMkD3Tf2apzSl39+i6s1Sna96e+WCH1INqJofY24ZVRfcwID2jEzurrMlGtxTjM6
UwfWIhbyS+ZoMbpri13Hz3lN8lvKPiXoiJqceXSzfW0vJS7RJFUDwE1yLaNXrwko
jr2fRKnNS0wwUWptfGiPMtFVlwKBgQDrFvK8OHjuJDlwr/oYtSjPbYvoqo0zab1h
a2wfozRMNAtifh12MEJTmCaALvO6ip0k7KIVRGT6taE5eMOZVv/gkle6S/g2vC7+
uh8s2urjxxLZgoSj1nRCLtA/aby5xYxcK0Wmnnz/0e7u8j4LMzSxzr2YCDTUImCy
WO+gcajYAwKBgBsmiy41k7XtIezGp9OywnbMSkquED34wrF73gHvAOXYulRRl2YZ
xB1A4lx4QEu44ivqPN12lUAoUq7RiQlG2YfQzujDOfn6YRNNCV1zCpKV41yEBPIi
T+0x8tlanI1ZIaL5Dy+kRa+UACBIdTTIQFjdDLW3tXF0djsQiSJ5Wl71AoGBANNy
Zo1ItexsBIYIoggLGVPIkiiJDkuJ9d5jwnKKVoWb4gmKqXEeYunRVf/BO1MzTbhi
Zj5+r9yX9RU+O5/2ElupBOL5ZZ3FkPdn7JZpqQ+KhLfCnw7F2veUJ5aBwk/NETvt
Z84/iuqFpkSg+ZEVU3YCH6FY8DtFWHfRamaDFHyFAoGAIaACJa4qbSKPtK+jj8Sh
EpBAAQS9DIrOitS9+Fq7QgrMK9OvEuuO35+0E92EOb11exBgFk/3mkojseUhOHoH
qSKzafmQlW9BTV6uUtanRIXXkzV2yTipWOUMcQ9pHXJTOJ5wEEbKQN7t8NCO7KLE
2LnP0jnUNuNxPQbjkr4Vw0s=
-----END PRIVATE KEY-----
  • 商户公钥

    商户将生成的公钥配置到商户后台

    API如果是加密请求,响应数据一定是加密的。UPay会将响应数据通过商户公钥进行加密.

  • 商户私钥:商户获取到API响应数据,通过商户私钥进行解密后获取到原始的响应数据

加密算法

{
  "alg": "RSA-OAEP-256",
  "enc": "A128GCM",
  "kid": kid,
  "typ": "JOSE",
  "zip": "DEF"
}

参数作用及说明

参数算法备注
algRSA-OAEP-256密钥管理算法
encA128GCM加密算法
typJOSE加密类型
zipDEF压缩类型
增快传输速度,降低传输内容大小
kidsha256(public_pem).hexdigest()对公钥进行 hash 指纹签名
kid生成可参考<KID生成示例代码>

KID生成示例代码

  /**
     * 生成KID
     * @param publicKey 商户公钥
     */
    public String generateKid(RSAPublicKey publicKey) throws NoSuchAlgorithmException {
        byte[] publicKeyBytes = publicKey.getEncoded();
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(publicKeyBytes);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(hash).toUpperCase();
    }
import hashlib
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

def generate_kid(public_key):
    """
    从 RSA 公钥生成 KID (Key ID)
    
    Args:
        public_key: 可以是 RSAPublicKey 对象、PEM 字符串或字节
        
    Returns:
        str: Base64URL 编码的 SHA-256 哈希值(大写)
    """
    # 获取公钥的编码字节
    if isinstance(public_key, str):
        # 如果是 PEM 字符串
        public_key_bytes = public_key.encode('utf-8')
    elif isinstance(public_key, bytes):
        # 如果是字节
        public_key_bytes = public_key
    elif hasattr(public_key, 'public_bytes'):
        # 如果是 cryptography 的 PublicKey 对象
        public_key_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    else:
        raise ValueError("Unsupported public key type")
    
    # 计算 SHA-256 哈希
    digest = hashlib.sha256(public_key_bytes)
    hash_bytes = digest.digest()
    
    # Base64URL 编码(无填充)并转换为大写
    kid = base64.urlsafe_b64encode(hash_bytes).decode('utf-8').rstrip('=').upper()
    
    return kid

加/解密示例数据

加密示例数据

  • 平台公钥
    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAn7okKBd9BHTV48GGFbfdJYC53nkQcvBeEGG1HFL3pA43D2n0nbpFd3lppZfMz9ZFmSYu02mtbF0IA8qbxe15ia0ffTRFK6K1jwnluiDADBtUdG+n/aPWnmBhpG6fdHehQ7LKMwq9gYMyqCV5QOZ3f/kIlQT3/NUGpKfxRzGz7vVV86ibEwnI+62F1qAX5UN1e5AX9WhsqjaI+lB9OMmG7va1xnDhGTbTou/sEGRKKYSsZrYxC2H1pzeJG4puV8C+HVWYIBnIM1UtpROw8NTfdMotShkYZqKwBrX84Z4GZqGbh+h0BnzRC7g7Yon0iYv3F9Ee9SRLcNKOrNYdqK3aHQIDAQAB
    -----END PUBLIC KEY-----
    
  • KID
    通过商户公钥生成的KID为
    Y7UBL8TJQJY2YR9CDAM6UNXPKPLSM6QD4GMSA3MEPCQ
    
  • 原始请求报文
    {
      "groupNo": "101680210001",
      "email": "[email protected]",
      "globalCode": "+1",
      "phone": "127897890",
      "givenName": "Demo given name",
      "lastName": "Demo last name",
      "company": "Demo company",
      "occupation": "Demo occupation",
      "position": "Demo position",
      "annualIncome": 100.5,
      "workYear": "1",
      "address": {
        "name": "Demo given name",
        "countryId": 12,
        "provinceId": 475,
        "postalCode": "94105",
        "address": "535 Mission St, Floor 12"
      },
      "remark": "This is sample data for adding customers"
    }
    
  • 请求报文
    payload中的数据是通过平台公钥将原始请求报文加密后的数据
    {
        "payload": "eyJ6aXAiOiJERUYiLCJraWQiOiJZN1VCTDhUSlFKWTJZUjlDREFNNlVOWFBLUExTTTZRRDRHTVNBM01FUENRIiwidHlwIjoiSk9TRSIsImVuYyI6IkExMjhHQ00iLCJhbGciOiJSU0EtT0FFUC0yNTYifQ.iThR8f9aBxnvq8PR-ta-AkD6w-5USqrCrTkISbz8ovTurgCmHasM0NmmlV5N8_Kn5qxJWCqIVd3EkmU9nkjsiO5uLSHPxL_CLmQD1gV79m1AxC41GMpHxGtiXkg-LUCwxE2TM-yLa5xKCjNJRUFPRShkObIq1rWiz_HIppYbw853AIa9PS9vt2p0LWpfthQ5UBaiCbHI5i2pIzaTAff3I8spgz2kJaZamnGEPH3v93rqpv2e2ICL0aYt6rp0wsAIe4dHR1tPwdqQKQA1VFo1VMSNhfxQmJqLZ7WsEJTh-zFO82Og2SYNp_Kop6lsSsPkoIo62p-qC9dgNU75zWe_5w.pVaqC8cS3mgKZ1z7.25Dz8A5LmYMqP_ujf20ZcVSdIf4etmd7Ws2BW6Dd1LvYigkXtEat4VrOhHyJEJEj4ksBuyQRS-uNrYy75MdZAkPSYC9xw4bpgQAgjYoZAurvX8Yy6mTlRV5IS4VcLTP3b7rsMVZOohYdWwVwl35yqGRv-5u6NRiFOKx0jyIvje0sLhae9dicJXrPe7xRaMLcKUCklpSAYQs8X6dk_ZJ7WgEkdhO_JOwyGLu7AtjCnvXRRAQtH9jM0ioXOrCad1d99bsZyuJ5ndF_54z3Dbiv4XPHKX4Z8Rm2Uhs_T84nOEXDwUP0ZUD6XLTgYj685o3qHIfLSmdgmgC-MinFvWYWFCasEIGLS8R7hv4gNPH_X3ulvRZg8kJsfpdaHRr_.lQsBYAPQAWf3AdOraThn5w"
    }
    

解密示例数据

  • 原始响应数据
    {
        "code": 0,
        "data": {
            "payload": "eyJ6aXAiOiJERUYiLCJraWQiOiI3MzlTTFlCVUZNTVZWTFlIRktKSkIyN0EyTEJWWFhPVUpEU19FT0tXSjVVIiwidHlwIjoiSk9TRSIsImVuYyI6IkExMjhHQ00iLCJhbGciOiJSU0EtT0FFUC0yNTYifQ.ZIpTCVNg8G8IZKdvDOolnRr49oZZjO1WtFAtM3iW8yLy16KZQI8Ete_iksoHF4m8BCXcV5VIe6i2rv-8xHCFKXCk_rl3MwaM1MXPZL7Fm0hN_sZRYEbdWC5kGzIVl8t5mOnwsIIctPgvIa7ZQoLj4fNcMcXXdSpXIm2xtvxDSAeQUXnaP6pgP0-3Y0PG_ULYFDAxiQHYAp-EN5GpPIqDLdIAdyjYmdIoyEDcBrMfu0sbc1O7XZ6TMr0kK5r--CiHdsl36czPoQhDXsP1L4Bmbo0CsRNRLGcwSmZH8xD43BX4OOTubF4TTBDjx08hnE7wxjtElCxhvT2ibg7To9jVyA.zVwjezyC0K3FbJ_J.sm8AzVy7OjAhB0k3HMc.9MQzaH0aX_I4JEYzFrN1aw"
        },
        "msg": "Success",
        "success": true,
        "ts": 1774589644
    }
    
  • 商户私钥
    -----BEGIN PRIVATE KEY-----
    MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDavr2asOh+I08g
    Ao1FXm8F5UGMSetg+Ap/wsoMr6AuEut6m5Nz6DGUm6sf+BjCBHAPwMvmuZ1xdFky
    lf4vyDHKzPMPSoVacGi6Y3VEfHaSSiTW+Auv90GUlHBU9ltlPr2b8HyQZZHwlugF
    U5YnDGcNfpZpEPJZVraq7UqiBCribTbyAlZmVXOzRtITFCpJblmJQUbInKCNNrZc
    Aqpx5J7Eei/yNHrqame/S8nLr7hgkrfi3ZXPI10Sqx5MTeylmRIkLPPE1XvEizre
    79ZN6conUFZqv7+sDrcD7/Qu3YEH9R/+KrQ7Aabi3Z3Q9/U7uHKVdc/rkEa6da+7
    RBYKw2jFAgMBAAECggEANwQQJe7mmosA5JkftNm6bK4rXUBeLeZUpat1K9mkHNJv
    XUfxvw4gIjNAx+qbN3jsQloILoByo81SfdGRu6zLMSl43Fiuz39EJ9TJ8q6nF8YE
    G/kI33n9iYQH+KZ5eC5ee/DxM1QIb41Uz7olIq8Q4Cj9ZXF4spWHndfOlI8dxhKg
    3HLx90yW6/2D8teI2KoyDVd3FH/3dMsibAFwYbAlvfZzFuH0stOUyDOxt1CdWl/N
    l4zTouGz82A+x8OPKf52E9W0iw+PnX/XOjDVpU39d21zhSmN3IjgfrW0vbbixD8F
    1Xeusy9DYwNY60q2bo0xckdz8cisr59IT9iT+xfiBQKBgQDuM59iG56EljyEtwZR
    YMkD3Tf2apzSl39+i6s1Sna96e+WCH1INqJofY24ZVRfcwID2jEzurrMlGtxTjM6
    UwfWIhbyS+ZoMbpri13Hz3lN8lvKPiXoiJqceXSzfW0vJS7RJFUDwE1yLaNXrwko
    jr2fRKnNS0wwUWptfGiPMtFVlwKBgQDrFvK8OHjuJDlwr/oYtSjPbYvoqo0zab1h
    a2wfozRMNAtifh12MEJTmCaALvO6ip0k7KIVRGT6taE5eMOZVv/gkle6S/g2vC7+
    uh8s2urjxxLZgoSj1nRCLtA/aby5xYxcK0Wmnnz/0e7u8j4LMzSxzr2YCDTUImCy
    WO+gcajYAwKBgBsmiy41k7XtIezGp9OywnbMSkquED34wrF73gHvAOXYulRRl2YZ
    xB1A4lx4QEu44ivqPN12lUAoUq7RiQlG2YfQzujDOfn6YRNNCV1zCpKV41yEBPIi
    T+0x8tlanI1ZIaL5Dy+kRa+UACBIdTTIQFjdDLW3tXF0djsQiSJ5Wl71AoGBANNy
    Zo1ItexsBIYIoggLGVPIkiiJDkuJ9d5jwnKKVoWb4gmKqXEeYunRVf/BO1MzTbhi
    Zj5+r9yX9RU+O5/2ElupBOL5ZZ3FkPdn7JZpqQ+KhLfCnw7F2veUJ5aBwk/NETvt
    Z84/iuqFpkSg+ZEVU3YCH6FY8DtFWHfRamaDFHyFAoGAIaACJa4qbSKPtK+jj8Sh
    EpBAAQS9DIrOitS9+Fq7QgrMK9OvEuuO35+0E92EOb11exBgFk/3mkojseUhOHoH
    qSKzafmQlW9BTV6uUtanRIXXkzV2yTipWOUMcQ9pHXJTOJ5wEEbKQN7t8NCO7KLE
    2LnP0jnUNuNxPQbjkr4Vw0s=
    -----END PRIVATE KEY-----
    
  • 将原始响应数据中的payload通商户私钥进行解密结果
    101680110004
    

加/解密示例代码

import com.nimbusds.jose.*;
import com.nimbusds.jose.crypto.RSADecrypter;
import com.nimbusds.jose.crypto.RSAEncrypter;

import java.security.KeyFactory;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;

public class JWECrypto {

    private static final JWEAlgorithm JWE_ALG = JWEAlgorithm.RSA_OAEP_256;
    private static final EncryptionMethod ENC_METHOD = EncryptionMethod.A128GCM;

    private RSAPublicKey publicKey;
    private RSAPrivateKey privateKey;

    public JWECrypto(String publicKeyPEM , String privateKeyPEM) throws Exception {
        if (publicKeyPEM != null) {
            this.publicKey = loadPublicKey(publicKeyPEM);
        }
        if (privateKeyPEM != null) {
            this.privateKey = loadPrivateKey(privateKeyPEM);
        }
    }

    /**
     * 加密
     */
    public String encrypt(String payload) throws Exception {
        String kid = generateKid(publicKey);
        JWEHeader header = new JWEHeader.Builder(JWE_ALG , ENC_METHOD)
                .type(JOSEObjectType.JOSE)
                .keyID(kid)
                .compressionAlgorithm(CompressionAlgorithm.DEF)
                .build();
        JWEObject jweObject = new JWEObject(header , new Payload(payload));
        RSAEncrypter rsaEncrypter = new RSAEncrypter(publicKey);
        jweObject.encrypt(rsaEncrypter);
        return jweObject.serialize();
    }

    /**
     * 解密
     */
    public String decrypt(String jweString) throws Exception {
        JWEObject jweObject = JWEObject.parse(jweString);
        RSADecrypter rsaDecrypter = new RSADecrypter(privateKey);
        jweObject.decrypt(rsaDecrypter);
        return jweObject.getPayload().toString();
    }

    private RSAPublicKey loadPublicKey(String publicKeyPEM) throws Exception {
        String publicKeyPEMFormatted = publicKeyPEM.replace("-----BEGIN PUBLIC KEY-----" , "")
                .replace("-----END PUBLIC KEY-----" , "")
                .replaceAll("\\s" , "");

        byte[] encoded = Base64.getDecoder().decode(publicKeyPEMFormatted);
        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
        X509EncodedKeySpec keySpec = new X509EncodedKeySpec(encoded);
        return (RSAPublicKey) keyFactory.generatePublic(keySpec);
    }

    private RSAPrivateKey loadPrivateKey(String privateKeyPEM) throws Exception {
        String privateKeyPEMFormatted = privateKeyPEM.replace("-----BEGIN PRIVATE KEY-----" , "")
                .replace("-----END PRIVATE KEY-----" , "")
                .replaceAll("\\s" , "");

        byte[] encoded = Base64.getDecoder().decode(privateKeyPEMFormatted);
        KeyFactory keyFactory = KeyFactory.getInstance("RSA");
        PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
        return (RSAPrivateKey) keyFactory.generatePrivate(keySpec);
    }

    /**
     * 生成KID
     * @param publicKey 商户公钥
     */
    public String generateKid(RSAPublicKey publicKey) throws NoSuchAlgorithmException {
        byte[] publicKeyBytes = publicKey.getEncoded();
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(publicKeyBytes);
        return Base64.getUrlEncoder().withoutPadding().encodeToString(hash).toUpperCase();
    }
     
}

import base64
import hashlib
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from jwcrypto import jwe, jwk
from jwcrypto.common import json_encode
import json


class JWECrypto:
    JWE_ALG = "RSA-OAEP-256"
    ENC_METHOD = "A128GCM"

    def __init__(self, public_key_pem=None, private_key_pem=None):
        """
        初始化JWE加密解密器
        
        Args:
            public_key_pem: PEM格式的公钥字符串
            private_key_pem: PEM格式的私钥字符串
        """
        self.public_key = None
        self.private_key = None
        
        if public_key_pem:
            self.public_key = self._load_public_key(public_key_pem)
        if private_key_pem:
            self.private_key = self._load_private_key(private_key_pem)

    def _load_public_key(self, public_key_pem):
        """加载公钥"""
        try:
            # 清理PEM格式
            public_key_pem = public_key_pem.strip()
            if not public_key_pem.startswith('-----BEGIN PUBLIC KEY-----'):
                # 如果已经是纯base64格式,添加PEM头尾
                public_key_pem = self._format_pem(public_key_pem, "PUBLIC KEY")
            
            # 加载公钥
            public_key = serialization.load_pem_public_key(
                public_key_pem.encode('utf-8'),
                backend=default_backend()
            )
            return public_key
        except Exception as e:
            raise Exception(f"加载公钥失败: {e}")

    def _load_private_key(self, private_key_pem):
        """加载私钥"""
        try:
            # 清理PEM格式
            private_key_pem = private_key_pem.strip()
            if not private_key_pem.startswith('-----BEGIN PRIVATE KEY-----'):
                # 如果已经是纯base64格式,添加PEM头尾
                private_key_pem = self._format_pem(private_key_pem, "PRIVATE KEY")
            
            # 加载私钥
            private_key = serialization.load_pem_private_key(
                private_key_pem.encode('utf-8'),
                password=None,
                backend=default_backend()
            )
            return private_key
        except Exception as e:
            raise Exception(f"加载私钥失败: {e}")

    def _format_pem(self, key_str, key_type):
        """格式化PEM字符串"""
        # 移除所有空白字符
        key_str = ''.join(key_str.split())
        # 添加PEM头尾
        pem = f"-----BEGIN {key_type}-----\n"
        # 每64个字符换行
        for i in range(0, len(key_str), 64):
            pem += key_str[i:i+64] + "\n"
        pem += f"-----END {key_type}-----"
        return pem

    def _create_jwk_from_public_key(self):
        """从公钥创建JWK对象"""
        # 导出公钥为PEM格式
        public_pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        
        # 创建JWK
        jwk_obj = jwk.JWK.from_pem(public_pem)
        return jwk_obj

    def _create_jwk_from_private_key(self):
        """从私钥创建JWK对象"""
        # 导出私钥为PEM格式
        private_pem = self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        
        # 创建JWK
        jwk_obj = jwk.JWK.from_pem(private_pem)
        return jwk_obj

    def encrypt(self, payload):
        """
        加密数据
        
        Args:
            payload: 待加密的字符串
            
        Returns:
            JWE字符串
        """
        try:
            # 生成KID
            kid = self.generate_kid(self.public_key)
            
            # 创建JWK
            public_jwk = self._create_jwk_from_public_key()
            
            # 创建JWE对象
            jwe_token = jwe.JWE(
                json.dumps(payload) if isinstance(payload, dict) else payload,
                json_encode({
                    "alg": self.JWE_ALG,
                    "enc": self.ENC_METHOD,
                    "kid": kid,
                    "typ": "JOSE",
                    "zip": "DEF"  # 压缩算法
                })
            )
            
            # 加密
            jwe_token.add_recipient(public_jwk)
            encrypted = jwe_token.serialize(compact=True)
            
            return encrypted
            
        except Exception as e:
            raise Exception(f"加密失败: {e}")

    def decrypt(self, jwe_string):
        """
        解密数据
        
        Args:
            jwe_string: JWE字符串
            
        Returns:
            解密后的字符串
        """
        try:
            # 创建JWK
            private_jwk = self._create_jwk_from_private_key()
            
            # 解析并解密JWE
            jwe_token = jwe.JWE()
            jwe_token.deserialize(jwe_string, key=private_jwk)
            
            # 获取payload
            payload = jwe_token.payload
            return payload.decode('utf-8') if isinstance(payload, bytes) else str(payload)
            
        except Exception as e:
            raise Exception(f"解密失败: {e}")

    def generate_kid(self, public_key):
        """
        生成KID (Key ID)
        
        Args:
            public_key: RSA公钥对象
            
        Returns:
            KID字符串
        """
        # 获取公钥的DER编码
        public_der = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        
        # 计算SHA-256哈希
        sha256_hash = hashlib.sha256(public_der).digest()
        
        # Base64 URL编码(无填充)
        kid = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')
        
        return kid.upper()


# 使用示例
if __name__ == "__main__":
    # 示例密钥(请使用您自己的密钥)
    # 注意:这里的密钥仅用于演示,实际使用时请使用真实的密钥对
    
    # 生成测试密钥对
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import serialization
    
    # 生成RSA密钥对
    private_key_obj = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    public_key_obj = private_key_obj.public_key()
    
    # 导出为PEM格式
    private_pem = private_key_obj.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ).decode('utf-8')
    
    public_pem = public_key_obj.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    ).decode('utf-8')
    
    print("生成的公钥PEM:")
    print(public_pem)
    print("\n生成的私钥PEM:")
    print(private_pem)
    
    # 创建加密解密器
    jwe_crypto = JWECrypto(public_pem, private_pem)
    
    # 测试加密解密
    original_payload = "Hello, this is a test message!"
    print(f"\n原始数据: {original_payload}")
    
    # 加密
    encrypted = jwe_crypto.encrypt(original_payload)
    print(f"加密结果: {encrypted}")
    
    # 解密
    decrypted = jwe_crypto.decrypt(encrypted)
    print(f"解密结果: {decrypted}")
    
    # 验证
    assert original_payload == decrypted, "加密解密验证失败"
    print("\n加密解密验证成功!")

测试网站

https://dinochiesa.github.io/jwt/