Created
April 23, 2019 09:24
-
-
Save rsj217/7add586c76f7c33c58071332a6b928f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
import base64 | |
import io | |
import hashlib | |
from M2Crypto import RSA | |
class RsaClient(object): | |
def __init__(self, pub_path, pri_path): | |
self.pri_key = RSA.load_key(pri_path) | |
self.pub_key = RSA.load_pub_key(pub_path) | |
def private_chunk_decrypt(self, crypto, chunk, need_base64=True): | |
if need_base64: | |
crypto = base64.b64decode(crypto) | |
output = io.BytesIO() | |
mb = io.BytesIO(crypto) | |
for out in iter(lambda: mb.read(chunk), b''): | |
output.write(self.pri_key.private_decrypt(out, RSA.pkcs1_padding)) | |
mb.close() | |
return output.getvalue() | |
def private_chunk_encrypt(self, message, chunk, need_base64=True): | |
output = io.BytesIO() | |
mb = io.BytesIO(message) | |
message_len = len(message) | |
# while message_len > 0: | |
# out = mb.read(chunk) | |
# message_len -= chunk | |
# output.write(self.pri_key.private_encrypt(out, RSA.pkcs1_padding)) | |
for out in iter(lambda: mb.read(chunk), b''): | |
output.write(self.pri_key.private_encrypt(out, RSA.pkcs1_padding)) | |
mb.close() | |
crypto = output.getvalue() | |
if need_base64: | |
crypto = base64.b64encode(crypto) | |
return crypto | |
def public_chunk_decrypt(self, crypto, chunk, need_base64=True): | |
if need_base64: | |
crypto = base64.b64decode(crypto) | |
output = io.BytesIO() | |
mb = io.BytesIO(crypto) | |
for out in iter(lambda: mb.read(chunk), b''): | |
output.write(self.pub_key.public_decrypt(out, RSA.pkcs1_padding)) | |
mb.close() | |
return output.getvalue() | |
def public_chunk_encrypt(self, message, chunk, need_base64=True): | |
output = io.BytesIO() | |
mb = io.BytesIO(message) | |
for out in iter(lambda: mb.read(chunk), b''): | |
output.write(self.pub_key.public_encrypt(out, RSA.pkcs1_padding)) | |
mb.close() | |
crypto = output.getvalue() | |
if need_base64: | |
crypto = base64.b64encode(crypto) | |
return crypto | |
def __stringfy(self, args): | |
args_str = ''.join([str(arg) for arg in args if arg is not None]) | |
return hashlib.sha1(args_str) | |
rsa_pusher = RsaClient('./push_pub.pem', './push_pri.pem') | |
if __name__ == "__main__": | |
message = "需要加密的字符" | |
data = rsa_pusher.private_chunk_encrypt(message.encode('utf-8'), 117) | |
print(data) | |
print(rsa_pusher.public_chunk_decrypt(data, 128)) | |
data = rsa_pusher.public_chunk_encrypt(message.encode('utf-8'), 117) | |
print(data) | |
print(rsa_pusher.private_chunk_decrypt(data, 128)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment