rsa加解密实现

RSA是一种非对称加密,是由一对密钥来进行加解密的过程,分别称为公钥和私钥。通常个人保存私钥,公钥是公开的。在不直接传递密钥的情况下,完成解密,能够确保信息的安全性,避免了直接传递密钥所造成的被破解的风险

目前项目做漏洞扫描,如果你用了是之前的填充方式,那么一定会扫描出来漏洞安全风险,并会建议你使用oaep填充方式。

什么是OAEP

RSA OAEPRSA加密机制的一种方案,全称是Optimal Asymmetric Encryption Padding(最优非对称加密填充),其安全性更高

python实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64

# pip install pycryptodomex
from Crypto.Cipher import PKCS1_OAEP
from Cryptodome.PublicKey import RSA
from django.conf import settings
from django.core.cache import caches


class RsaClient:
"""RSA 加密解密"""

def __init__(self, pub=None, pri=None):
"""
私钥生成:openssl genrsa -out private.pem 2048
公钥生成:openssl rsa -in private.pem -pubout -out public.pem
"""
self.pub = pub
self.pri = pri

def pub_key(self):
"""获取公钥"""
pub = self.pub
if not pub:
raise Exception("获取公钥失败")
return RSA.import_key(pub)

def pri_key(self):
"""获取私钥"""
pri = self.pri
# pri = getattr(self.key_vault_client.get_secret("Staging-RSA256-PRI-CA173-pem"), "value", None)
if not pri:
raise Exception("获取私钥失败")
return RSA.import_key(pri)

@property
def get_pub_cipher(self):
"""创建 公钥 密码器"""
return PKCS1_OAEP.new(self.pub_key())

@property
def get_pri_cipher(self):
"""创建 私钥 密码器"""
return PKCS1_OAEP.new(self.pri_key())

def encrypt(self, param: str) -> str:
"""
加密
@param param: 待加密字符串
@return: 加密结果
"""
# 加密内容(bytes), 返回加密后的密文(bytes类型)
ciphertext = self.get_pub_cipher.encrypt(param.encode())
# 方便api传输 进行bs64处理
return str(base64.b64encode(ciphertext), encoding='utf-8')

def decrypt(self, bs64_ciphertext):
"""解密"""
# 解密密文(bytes), 返回解密后的明文(bytes类型)
plaintext = self.get_pri_cipher.decrypt(base64.b64decode(bs64_ciphertext))
# 输出解密后的明文

return plaintext.decode()


def get_pri_key():
"""获取私钥"""
pri_body = caches["user_auth"].get("iep:token:pri")
# 判断缓存中是否存在私钥
if pri_body is None:
with open(settings.PRIVATE_KEY_PATH, 'r') as pri:
pri_body = pri.read()
caches["user_auth"].set("iep:token:pri", pri_body)
return pri_body


if __name__ == '__main__':
pub_body = ""
pri_body = ""
with open("../cert/public.pem") as pub, open("../cert/private.pem") as pri:
pub_body += pub.read()
pri_body += pri.read()

print(pub_body)
# print(pri_body)

print("长度:", len(pub_body))
print("私钥长度:", len(pri_body))

test = RsaClient(pub=pub_body, pri=pri_body)

# a = test.encrypt('{"app_id":"xxxxxxxxxxxx", "app_secret":"xxxxxxxxxxx","timestamp":1698891718441}')
a = test.encrypt('{"app_id":"123456789", "app_secret":"abcdefg%*sdfsdfsfd","timestamp":1698893284836}')
#
print(a)
# #
# res = test.decrypt(a)
# print(res)
#
# c = test.decrypt('PyiN+iWp3Fvlwr9cffnjRwjhJgnjBFX4PIGh0V+9QZHMm4NTSnscqWuDkBcjU2UUqzNm9TEjRwbVfspYXBGn2Pr8jhyae8/NF8b+99XIB+GmgIyWpe/rSpfJHorgGSKzve88JLxeDGB6tEmEAmCISnMwYsyk2rnAZH1ohtAy/1ch/bx/2qdGZC2dAaOrRlafv4oMMTN2Rq/+0ykIM9bCcuaz5scNZ/Ap+Psk296BBLutZqsq0dc/52MKf93Ut11HpggGycSDuPgxZDrDqI00PY5UxnymAlsm/eZUrHZCtJn56PXNeR/sABPlaYxV+jB4pmdmG35QtITyksNKV1uQ4Q==')
#
#
# print('ccc:', c)

d = test.decrypt(
'cXeyOelc+5GKj36oycqD3N2gGVcbS8EEga0kklVa3cv0q4UUi0swrKhcT/Xexgg2vFFkE+x16FX3MztKSJSuhAD/ONEwJz9eN4iUmSiNMZ/1osPDsqUvgx9eJH+nV0GBm17Ep06qJVnlsj2c5I+lLeqri33nGek+rBcaXjCGNtrEsV2jqJXkH/PNygR1IRNIj2rHvaJ3UhRE/M3gnnc5bXpbDry3PSYU7UblXZUBjhe+fzuDR8D3px1S4VKA+GVmw0lXoNpAZt4Pt3XSEFaMyk1yFTAgPLxyVpNKwFiEMjnCZwhyRoPYo8lBJJQiV0Uy7fr+ze8EddmlWO+5wSmIWw==')
print('ddd:', d)

# e = test.encrypt('{"app_id":"123456789", "app_secret":"abcdefg%*sdfsdfsfd","timestamp":1698893284836}')
# print('eee:', e)
#
# f = test.decrypt(e)
# print('fff:', f)

# test = RsaClient()
# print("*****************************")
# us = test.encrypt("")
# print(f'u = {us}')
# ps = test.encrypt("xxx")
# print(f'p = {ps}')
# # print("*****************************")
# print(f"den:{test.decrypt(us)}")
# print(f"den:{test.decrypt(ps)}")
# pub_a = test.encrypt("hello world")
# print(f"pub_a={pub_a}")
# plain_text = test.decrypt(pub_a)
# print(f"plain_text={plain_text}")
pass

go实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
@Author : Uyynot
@Email : uyynot@qq.com
@Time : 2023/11/20 10:07
@File : test.go
@Project : test
@Desc :
*/
package main

import (
"crypto/rand"
"crypto/rsa"
"crypto/sha1"
"crypto/x509"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"os"
)

// 解析公钥
func ReadParsePublicKey(filename string) (*rsa.PublicKey, error) {
//获取公钥字节
publicKeyBytes, err := os.ReadFile(filename)
//fmt.Println("publicKeyBytes:", publicKeyBytes)
//fmt.Println("err:", err)
if err != nil {
return nil, err
}

// 2、解码公钥字节,生成加密块对象
block, _ := pem.Decode(publicKeyBytes)
//fmt.Println("block:", block)
//fmt.Println("rest:", rest)
if block == nil {
return nil, errors.New("公钥信息有误")
}

//3、解析DER编码的公钥,生成公钥接口
publicKeyInterface, err := x509.ParsePKIXPublicKey(block.Bytes)
//fmt.Println("publicKeyInterface:", publicKeyInterface)
//fmt.Println("err:", err)
if err != nil {
return nil, err
}

// 4、公钥接口转型成公钥对象
publicKey := publicKeyInterface.(*rsa.PublicKey)
//fmt.Println("publicKey:", publicKey)
return publicKey, nil
}

// 读取私钥文件,解析出私钥对象
func ReadParsePrivaterKey(filename string) (*rsa.PrivateKey, error) {
// 1、读取私钥文件,获取私钥字节
privateKeyBytes, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
// 2、对私钥文件进行编码,生成加密块对象
block, _ := pem.Decode(privateKeyBytes)
if block == nil {
return nil, errors.New("私钥信息错误!")
}
// 3、解析DER编码的私钥,生成私钥对象
privateKey, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
return privateKey, nil
}

func main() {
//abced := base64.StdEncoding.EncodeToString([]byte("abcdefg"))
//fmt.Println("abced:", abced)

pub_key, err := ReadParsePublicKey("public.pem")
pri_key, err := ReadParsePrivaterKey("private.pem")
fmt.Println("err:", err)
fmt.Println("pub_key:", pub_key)
fmt.Println("pri_key:", pri_key)
// 要加密的明文
//plainText := []byte("abcdh1234中国")
plainText := []byte("{\"app_id\":\"123456789\", \"app_secret\":\"abcdefg%*sdfsdfsfd\",\"timestamp\":1698893284836}")

// 使用公钥加密明文
cipherText, err := rsa.EncryptOAEP(sha1.New(), rand.Reader, pub_key, plainText, nil)
if err != nil {
panic(err)
}
fmt.Printf("Cipher text: %x\n", cipherText)
b64CipherTest := base64.StdEncoding.EncodeToString(cipherText)
fmt.Println("b64CipherTest:", b64CipherTest)

// 使用私钥解密密文
decryptedPlainText, err := rsa.DecryptOAEP(sha1.New(), rand.Reader, pri_key, cipherText, nil)
if err != nil {
panic(err)
}
fmt.Printf("Decrypted plain text: %s\n", decryptedPlainText)

//// 生成一对RSA公私钥
//privKey, err := rsa.GenerateKey(rand.Reader, 2048)
//if err != nil {
// panic(err)
//}
//// 要加密的明文
//plainText := []byte("Hello RSA!")
//// 使用公钥加密明文
//cipherText, err := rsa.EncryptOAEP(sha256.New(), rand.Reader, &privKey.PublicKey, plainText, nil)
//if err != nil {
// panic(err)
//}
//fmt.Printf("Cipher text: %x\n", cipherText)
//// 使用私钥解密密文
//decryptedPlainText, err := rsa.DecryptOAEP(sha256.New(), rand.Reader, privKey, cipherText, nil)
//if err != nil {
// panic(err)
//}
//fmt.Printf("Decrypted plain text: %s\n", decryptedPlainText)
//// 签名消息
//hashed := sha256.Sum256([]byte("message to be signed"))
//signature, err := rsa.SignPKCS1v15(rand.Reader, privKey, crypto.SHA256, hashed[:])
//if err != nil {
// panic(err)
//}
//fmt.Printf("Signature: %x\n", signature)
//// 验证签名
//err = rsa.VerifyPKCS1v15(&privKey.PublicKey, crypto.SHA256, hashed[:], signature)
//if err != nil {
// panic(err)
//}

}