|
# MIT No Attribution |
|
# |
|
# Copyright 2023 Ben Kehoe |
|
# |
|
# Permission is hereby granted, free of charge, to any person obtaining a copy of this |
|
# software and associated documentation files (the "Software"), to deal in the Software |
|
# without restriction, including without limitation the rights to use, copy, modify, |
|
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to |
|
# permit persons to whom the Software is furnished to do so. |
|
# |
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, |
|
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A |
|
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
|
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
|
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
|
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
|
|
import io |
|
import random |
|
|
|
import boto3 |
|
|
|
from stream_random import BytestreamRandom |
|
|
|
__all__ = ["get_kms_random"] |
|
|
|
def get_kms_random(session: boto3.Session, buffer_size=None) -> random.Random:
    """Build a random number generator fed by KMS.GenerateRandom.

    Args:
        session: boto3 session handed to the underlying ``KMSByteStream``.
        buffer_size: Buffer size forwarded to ``KMSByteStream``; ``None``
            lets the stream pick its own default.

    Returns:
        A ``BytestreamRandom`` wrapping a freshly created ``KMSByteStream``.
    """
    return BytestreamRandom(KMSByteStream(session, buffer_size=buffer_size))
|
|
|
class KMSRawByteStream(io.RawIOBase): |
|
"""Raw bytestream from KMS.GenerateRandom""" |
|
|
|
def __init__(self, session: boto3.Session) -> None: |
|
super().__init__() |
|
self.session = session |
|
self.kms_client = session.client("kms") |
|
|
|
def _get_bytes(self, num_bytes: int) -> bytes: |
|
response = self.kms_client.generate_random(NumberOfBytes=num_bytes) |
|
return response["Plaintext"] |
|
|
|
def _check_closed(self): |
|
if self.closed: |
|
raise ValueError("Closed") |
|
|
|
def readable(self) -> bool: |
|
return True |
|
|
|
def readall(self) -> bytes: |
|
raise io.UnsupportedOperation("readall") |
|
|
|
def readinto(self, __buffer) -> int | None: |
|
self._check_closed() |
|
num_bytes = len(__buffer) |
|
bytes = self._get_bytes(num_bytes) |
|
__buffer[:len(bytes)] = bytes |
|
return len(bytes) |
|
|
|
# seek |
|
|
|
def seekable(self) -> bool: |
|
return False |
|
|
|
# tell, truncate |
|
|
|
def writable(self) -> bool: |
|
return False |
|
|
|
def write(self, __b) -> int | None: |
|
raise io.UnsupportedOperation("write") |
|
|
|
class KMSByteStream(io.BufferedReader):
    """Buffered reader layered over a ``KMSRawByteStream``."""

    def __init__(self, session: boto3.Session, buffer_size=None) -> None:
        """Wrap a new raw KMS stream created from ``session``.

        Args:
            session: boto3 session passed to ``KMSRawByteStream``.
            buffer_size: Size of the read buffer; ``None`` selects 16 bytes.
        """
        source = KMSRawByteStream(session)
        # 16 bytes is UUID-sized.
        effective_size = 16 if buffer_size is None else buffer_size
        super().__init__(source, buffer_size=effective_size)