Skip to content

Instantly share code, notes, and snippets.

@rsj217
Created April 23, 2019 09:24
Show Gist options
  • Save rsj217/7add586c76f7c33c58071332a6b928f9 to your computer and use it in GitHub Desktop.
Save rsj217/7add586c76f7c33c58071332a6b928f9 to your computer and use it in GitHub Desktop.
# coding=utf8
import base64
import io
import hashlib
from M2Crypto import RSA
class RsaClient(object):
def __init__(self, pub_path, pri_path):
self.pri_key = RSA.load_key(pri_path)
self.pub_key = RSA.load_pub_key(pub_path)
def private_chunk_decrypt(self, crypto, chunk, need_base64=True):
if need_base64:
crypto = base64.b64decode(crypto)
output = io.BytesIO()
mb = io.BytesIO(crypto)
for out in iter(lambda: mb.read(chunk), b''):
output.write(self.pri_key.private_decrypt(out, RSA.pkcs1_padding))
mb.close()
return output.getvalue()
def private_chunk_encrypt(self, message, chunk, need_base64=True):
output = io.BytesIO()
mb = io.BytesIO(message)
message_len = len(message)
# while message_len > 0:
# out = mb.read(chunk)
# message_len -= chunk
# output.write(self.pri_key.private_encrypt(out, RSA.pkcs1_padding))
for out in iter(lambda: mb.read(chunk), b''):
output.write(self.pri_key.private_encrypt(out, RSA.pkcs1_padding))
mb.close()
crypto = output.getvalue()
if need_base64:
crypto = base64.b64encode(crypto)
return crypto
def public_chunk_decrypt(self, crypto, chunk, need_base64=True):
if need_base64:
crypto = base64.b64decode(crypto)
output = io.BytesIO()
mb = io.BytesIO(crypto)
for out in iter(lambda: mb.read(chunk), b''):
output.write(self.pub_key.public_decrypt(out, RSA.pkcs1_padding))
mb.close()
return output.getvalue()
def public_chunk_encrypt(self, message, chunk, need_base64=True):
output = io.BytesIO()
mb = io.BytesIO(message)
for out in iter(lambda: mb.read(chunk), b''):
output.write(self.pub_key.public_encrypt(out, RSA.pkcs1_padding))
mb.close()
crypto = output.getvalue()
if need_base64:
crypto = base64.b64encode(crypto)
return crypto
def __stringfy(self, args):
args_str = ''.join([str(arg) for arg in args if arg is not None])
return hashlib.sha1(args_str)
rsa_pusher = RsaClient('./push_pub.pem', './push_pri.pem')
if __name__ == "__main__":
message = "需要加密的字符"
data = rsa_pusher.private_chunk_encrypt(message.encode('utf-8'), 117)
print(data)
print(rsa_pusher.public_chunk_decrypt(data, 128))
data = rsa_pusher.public_chunk_encrypt(message.encode('utf-8'), 117)
print(data)
print(rsa_pusher.private_chunk_decrypt(data, 128))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment