Created
August 1, 2017 13:08
-
-
Save gdelpierre/eecb5d990e5f79d84760da784841f085 to your computer and use it in GitHub Desktop.
Ansible module that handles OpenSSL PKCS#12 files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# | |
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) | |
# Module maturity/support metadata, kept as a plain dict literal.
ANSIBLE_METADATA = {
    'metadata_version': '1.0',
    'status': ['preview'],
    'supported_by': 'community',
}
# YAML documentation for the module options declared in main().
# Nesting is significant here: options -> option name -> attributes.
DOCUMENTATION = '''
---
module: openssl_pkcs12
author: "Guillaume Delpierre (@gdelpierre)"
version_added: "2.4"
short_description: Generate OpenSSL pkcs12 archive.
description:
    - "This module allows one to (re-)generate PKCS#12."
requirements:
    - "python-pyOpenSSL"
options:
    ca_certificates:
        required: False
        description:
            - List of CA certificates to include.
    cert_path:
        required: False
        description:
            - The path to read certificates and private keys from.
              Must be in PEM format.
    action:
        required: False
        default: 'export'
        choices: ['parse', 'export']
        description:
            - Create (export) or parse a PKCS#12.
    src:
        required: False
        description:
            - PKCS#12 file path to parse.
    path:
        required: True
        default: null
        description:
            - Filename to write the PKCS#12 file to.
    force:
        required: False
        default: False
        description:
            - Should the file be regenerated even if it already exists.
    friendly_name:
        required: False
        default: null
        aliases: ['name']
        description:
            - Specifies the friendly name for the certificate and private key.
    iter_size:
        required: False
        default: 2048
        description:
            - Number of times to repeat the encryption step.
    maciter_size:
        required: False
        default: 1
        description:
            - Number of times to repeat the MAC step.
    mode:
        required: False
        default: '0400'
        description:
            - Default mode for the generated PKCS#12 file.
    passphrase:
        required: False
        default: null
        description:
            - The PKCS#12 password.
    privatekey_path:
        required: False
        description:
            - File to read private key from.
    privatekey_passphrase:
        required: False
        default: null
        description:
            - Passphrase source to decrypt any input private keys with.
    state:
        required: False
        default: 'present'
        choices: ['present', 'absent']
        description:
            - Whether the file should exist or not.
'''
# Playbook snippets shown in the module documentation.
# The parse example sets `action: parse` explicitly because the default
# action is `export`.
EXAMPLES = '''
- name: 'Generate PKCS#12 file'
  openssl_pkcs12:
    path: '/opt/certs/ansible.p12'
    friendly_name: 'raclette'
    privatekey_path: '/opt/certs/keys/key.pem'
    cert_path: '/opt/certs/cert.pem'
    ca_certificates: '/opt/certs/ca.pem'
    state: present

- name: 'Change PKCS#12 file permission'
  openssl_pkcs12:
    path: '/opt/certs/ansible.p12'
    friendly_name: 'raclette'
    privatekey_path: '/opt/certs/keys/key.pem'
    cert_path: '/opt/certs/cert.pem'
    ca_certificates: '/opt/certs/ca.pem'
    state: present
    mode: 0600

- name: 'Regen PKCS#12 file'
  openssl_pkcs12:
    path: '/opt/certs/ansible.p12'
    friendly_name: 'raclette'
    privatekey_path: '/opt/certs/keys/key.pem'
    cert_path: '/opt/certs/cert.pem'
    ca_certificates: '/opt/certs/ca.pem'
    state: present
    mode: 0600
    force: True

- name: 'Dump/Parse PKCS#12 file'
  openssl_pkcs12:
    action: 'parse'
    src: '/opt/certs/ansible.p12'
    path: '/opt/certs/ansible.pem'
    state: present

- name: 'Remove PKCS#12 file'
  openssl_pkcs12:
    path: '/opt/certs/ansible.p12'
    state: absent
'''
# Description of the values main() hands back through exit_json().
RETURN = '''
filename:
    description: Path to the generated PKCS#12 file.
    returned: changed or success
    type: string
    sample: /opt/certs/ansible.p12
'''
import errno | |
import os | |
try: | |
from OpenSSL import crypto | |
except ImportError: | |
pyopenssl_found = False | |
else: | |
pyopenssl_found = True | |
from ansible.module_utils.basic import AnsibleModule | |
from ansible.module_utils._text import to_native | |
class PkcsError(Exception):
    """Error raised by Pkcs when loading, writing or removing files fails."""
class Pkcs(object):
    """Export, parse or remove a PKCS#12 archive for the Ansible module.

    All settings are copied from ``module.params`` in ``__init__``; the
    public methods (``generate``, ``parse``, ``remove``, ``check``,
    ``dump``) are driven by ``main()``.
    """

    # Permissions used while the payload is being written. The archive holds
    # private key material, so it is never created group/world readable; the
    # requested mode is applied right after the write.
    _CREATE_MODE = 0o600

    def __init__(self, module):
        """Copy the module parameters onto the instance.

        :param module: the AnsibleModule whose ``params`` and ``check_mode``
            are read.
        """
        params = module.params

        self.path = params['path']
        self.force = params['force']
        self.state = params['state']
        self.action = params['action']
        self.check_mode = module.check_mode
        self.iter_size = params['iter_size']
        self.maciter_size = params['maciter_size']
        self.pkcs12 = None
        self.src = params['src']
        self.privatekey_path = params['privatekey_path']
        self.privatekey_passphrase = params['privatekey_passphrase']
        self.cert_path = params['cert_path']
        self.ca_certificates = params['ca_certificates']
        self.friendly_name = params['friendly_name']
        self.passphrase = params['passphrase']
        self.mode = params['mode']
        self.changed = False

        # No explicit mode requested: fall back to owner read-only.
        if not self.mode:
            self.mode = 0o400

    @staticmethod
    def _as_bytes(value):
        """Return *value* UTF-8 encoded, leaving ``None`` and bytes as-is.

        pyOpenSSL wants byte strings for friendly names and passphrases,
        while module parameters are text on Python 3.
        """
        if value is None or isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    @staticmethod
    def _read_file(path):
        """Return the raw content of *path*, closing the file afterwards."""
        with open(path, 'rb') as handle:
            return handle.read()

    def _write_payload(self, module, payload):
        """Write *payload* to ``self.path`` and apply ``self.mode``.

        The file is created with ``_CREATE_MODE`` so its content is never
        exposed with umask-default permissions. On failure the partial file
        is removed and PkcsError is raised.
        """
        try:
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
            descriptor = os.open(self.path, flags, self._CREATE_MODE)
            with os.fdopen(descriptor, 'wb') as handle:
                handle.write(payload)
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True
        except (IOError, OSError) as exc:
            self.remove()
            raise PkcsError(exc)

    def load_privatekey(self, path, passphrase=None):
        """Load the PEM private key stored at *path*.

        :param path: file to read the key from.
        :param passphrase: optional passphrase protecting the key.
        :raises PkcsError: if the file cannot be read or the key cannot be
            loaded.
        """
        try:
            content = self._read_file(path)
            if passphrase:
                return crypto.load_privatekey(crypto.FILETYPE_PEM,
                                              content,
                                              self._as_bytes(passphrase))
            return crypto.load_privatekey(crypto.FILETYPE_PEM, content)
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def load_certificate(self, path):
        """Load the PEM certificate stored at *path*.

        :raises PkcsError: if the file cannot be read or parsed.
        """
        try:
            return crypto.load_certificate(crypto.FILETYPE_PEM,
                                           self._read_file(path))
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def load_pkcs12(self, path, passphrase=None):
        """Load the PKCS#12 archive stored at *path*.

        :param passphrase: optional passphrase protecting the archive.
        :raises PkcsError: if the file cannot be read or parsed.
        """
        try:
            content = self._read_file(path)
            if passphrase:
                return crypto.load_pkcs12(content,
                                          self._as_bytes(passphrase))
            return crypto.load_pkcs12(content)
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def dump_privatekey(self, path):
        """Return the PEM private key held by the PKCS#12 archive at *path*.

        The archive is opened with ``self.passphrase``.

        :raises PkcsError: if the archive cannot be loaded or the key cannot
            be serialized.
        """
        archive = self.load_pkcs12(path, self.passphrase)
        try:
            return crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                          archive.get_privatekey())
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def dump_certificate(self, path):
        """Return the PEM certificate held by the PKCS#12 archive at *path*.

        The archive is opened with ``self.passphrase``.

        :raises PkcsError: if the archive cannot be loaded or the
            certificate cannot be serialized.
        """
        archive = self.load_pkcs12(path, self.passphrase)
        try:
            return crypto.dump_certificate(crypto.FILETYPE_PEM,
                                           archive.get_certificate())
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def generate(self, module):
        """Generate the PKCS#12 archive at ``self.path``.

        The archive is (re)built only when the file is missing or ``force``
        is set; file attributes are reconciled in every case.

        :raises PkcsError: if an input cannot be loaded or the archive
            cannot be written.
        """
        if not os.path.exists(self.path) or self.force:
            self.pkcs12 = crypto.PKCS12()

            if self.ca_certificates:
                authorities = [self.load_certificate(ca_cert)
                               for ca_cert in self.ca_certificates]
                self.pkcs12.set_ca_certificates(authorities)

            if self.cert_path:
                self.pkcs12.set_certificate(
                    self.load_certificate(self.cert_path))

            if self.friendly_name:
                self.pkcs12.set_friendlyname(
                    self._as_bytes(self.friendly_name))

            if self.privatekey_path:
                self.pkcs12.set_privatekey(
                    self.load_privatekey(self.privatekey_path,
                                         self.privatekey_passphrase))

            try:
                payload = self.pkcs12.export(self._as_bytes(self.passphrase),
                                             self.iter_size,
                                             self.maciter_size)
            except crypto.Error as exc:
                raise PkcsError(exc)

            # Drop the previous archive only once the new payload is ready,
            # so a bad input never costs the user the existing file. It has
            # to be removed (not truncated) because it may be read-only.
            try:
                self.remove()
            except PkcsError as exc:
                module.fail_json(msg=to_native(exc))

            self._write_payload(module, payload)

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True

    def parse(self, module):
        """Dump the key and certificate of ``self.src`` as PEM to ``self.path``.

        The output is (re)written only when the file is missing or ``force``
        is set; file attributes are reconciled in every case.

        :raises PkcsError: if the archive cannot be read or the output
            cannot be written.
        """
        if not os.path.exists(self.path) or self.force:
            # Both dumps are bytes; concatenate them as bytes so the result
            # can be written to a binary file on Python 2 and 3 alike.
            payload = (self.dump_privatekey(self.src) +
                       self.dump_certificate(self.src))
            self.remove()
            self._write_payload(module, payload)

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True

    def remove(self):
        """Remove ``self.path`` from the filesystem.

        A missing file is not an error. ``self.changed`` is set only when a
        file was actually deleted.

        :raises PkcsError: on any failure other than the file not existing.
        """
        try:
            os.remove(self.path)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise PkcsError(exc)
        else:
            self.changed = True

    def check(self, module, perms_required=True):
        """Tell whether the resource is already in its desired state.

        :returns: ``False`` when ``self.path`` does not exist, or when a
            private key passphrase was supplied and the key at
            ``self.privatekey_path`` cannot be loaded with it; ``True``
            otherwise. Always a real boolean.
        """
        if not os.path.exists(self.path):
            return False

        # The passphrase belongs to the source key, not to the archive at
        # self.path, so that is the file it is verified against.
        if self.privatekey_path and self.privatekey_passphrase:
            try:
                self.load_privatekey(self.privatekey_path,
                                     self.privatekey_passphrase)
            except (PkcsError, crypto.Error):
                return False

        return True

    def dump(self):
        """Serialize the object into the result dictionary for exit_json."""
        result = {
            'changed': self.changed,
            'filename': self.path,
        }

        if self.privatekey_path:
            result['privatekey_path'] = self.privatekey_path

        return result
def main():
    """Module entry point: validate parameters, then run the requested action.

    With ``state=present`` the archive is exported or parsed depending on
    ``action``; with ``state=absent`` the file at ``path`` is removed. In
    check mode nothing is written and only the predicted ``changed`` value
    is reported.
    """
    argument_spec = dict(
        action=dict(default='export',
                    choices=['parse', 'export'],
                    type='str'),
        ca_certificates=dict(type='list'),
        cert_path=dict(type='path'),
        force=dict(default=False, type='bool'),
        friendly_name=dict(type='str', aliases=['name']),
        iter_size=dict(default=2048, type='int'),
        maciter_size=dict(default=1, type='int'),
        passphrase=dict(type='str', no_log=True),
        path=dict(required=True, type='path'),
        privatekey_path=dict(type='path'),
        privatekey_passphrase=dict(type='str', no_log=True),
        state=dict(default='present',
                   choices=['present', 'absent'],
                   type='str'),
        src=dict(type='path'),
    )

    required_together = [
        ['privatekey_path', 'friendly_name'],
    ]

    module = AnsibleModule(
        argument_spec=argument_spec,
        add_file_common_args=True,
        required_together=required_together,
        supports_check_mode=True,
    )

    if not pyopenssl_found:
        module.fail_json(msg='The python pyOpenSSL library is required')

    params = module.params

    # The per-action requirements only matter when something is produced.
    # Expressing them through required_if would also reject a plain
    # `state: absent` task, because `action` defaults to 'export'.
    if params['state'] == 'present':
        needed = {'export': 'friendly_name', 'parse': 'src'}[params['action']]
        if params[needed] is None:
            module.fail_json(
                msg='action is %s but all of the following are missing: %s'
                    % (params['action'], needed)
            )

    # A bare file name has an empty dirname; it lives in the current
    # directory.
    base_dir = os.path.dirname(params['path']) or '.'
    if not os.path.isdir(base_dir):
        module.fail_json(
            name=base_dir,
            msg='The directory %s does not exist or '
                'the file is not a directory' % base_dir
        )

    pkcs12 = Pkcs(module)

    if params['state'] == 'present':
        if module.check_mode:
            result = pkcs12.dump()
            result['changed'] = params['force'] or not pkcs12.check(module)
            module.exit_json(**result)

        try:
            if params['action'] == 'export':
                pkcs12.generate(module)
            else:
                pkcs12.parse(module)
        except PkcsError as exc:
            module.fail_json(msg=to_native(exc))
    else:
        if module.check_mode:
            result = pkcs12.dump()
            result['changed'] = os.path.exists(params['path'])
            module.exit_json(**result)

        try:
            pkcs12.remove()
        except PkcsError as exc:
            module.fail_json(msg=to_native(exc))

    result = pkcs12.dump()
    module.exit_json(**result)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment