Skip to content

Instantly share code, notes, and snippets.

@gdelpierre
Created August 1, 2017 13:08
Show Gist options
  • Save gdelpierre/eecb5d990e5f79d84760da784841f085 to your computer and use it in GitHub Desktop.
Save gdelpierre/eecb5d990e5f79d84760da784841f085 to your computer and use it in GitHub Desktop.
Ansible module that handle openssl PKCS#12 file
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
ANSIBLE_METADATA = {'metadata_version': '1.0',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: openssl_pkcs12
author: "Guillaume Delpierre (@gdelpierre)"
version_added: "2.4"
short_description: Generate OpenSSL pkcs12 archive.
description:
- "This module allows one to (re-)generate PKCS#12."
requirements:
- "python-pyOpenSSL"
options:
ca_certificates:
required: False
description:
- List of CA certificate to include.
cert_path:
required: False
description:
- The path to read certificates and private keys from.
Must be in PEM format.
action:
required: False
default: 'export'
choices: ['parse', 'export']
description:
- Create (export) or parse a PKCS#12.
src:
required: False
description:
- PKCS#12 file path to parse.
path:
required: True
default: null
description:
- Filename to write the PKCS#12 file to.
force:
required: False
default: False
description:
- Should the file be regenerated even it it already exists.
friendly_name:
required: False
default: null
aliases: 'name'
description:
- Specifies the friendly name for the certificate and private key.
iter_size:
required: False
default: 2048
description:
- Number of times to repeat the encryption step.
maciter_size:
required: False
default: 1
description:
- Number of times to repeat the MAC step.
mode:
required: False
default: 0400
description:
- Default mode for the generated PKCS#12 file.
passphrase:
required: False
default: null
description:
- The PKCS#12 password.
privatekey_path:
required: False
description:
- File to read private key from.
privatekey_passphrase:
required: False
default: null
description:
- Passphrase source to decrypt any input private keys with.
state:
required: False
default: 'present'
choices: ['present', 'absent']
description:
- Whether the file should exist or not.
'''
EXAMPLES = '''
- name: 'Generate PKCS#12 file'
openssl_pkcs12:
path: '/opt/certs/ansible.p12'
friendly_name: 'raclette'
privatekey_path: '/opt/certs/keys/key.pem'
cert_path: '/opt/certs/cert.pem'
ca_certificates: '/opt/certs/ca.pem'
state: present
- name: 'Change PKCS#12 file permission'
openssl_pkcs12:
path: '/opt/certs/ansible.p12'
friendly_name: 'raclette'
privatekey_path: '/opt/certs/keys/key.pem'
cert_path: '/opt/certs/cert.pem'
ca_certificates: '/opt/certs/ca.pem'
state: present
mode: 0600
- name: 'Regen PKCS#12 file'
openssl_pkcs12:
path: '/opt/certs/ansible.p12'
friendly_name: 'raclette'
privatekey_path: '/opt/certs/keys/key.pem'
cert_path: '/opt/certs/cert.pem'
ca_certificates: '/opt/certs/ca.pem'
state: present
mode: 0600
force: True
- name: 'Dump/Parse PKCS#12 file'
openssl_pkcs12:
src: '/opt/certs/ansible.p12'
path: '/opt/certs/ansible.pem'
state: present
- name: 'Remove PKCS#12 file'
openssl_pkcs12:
path: '/opt/certs/ansible.p12'
state: absent
'''
RETURN = '''
filename:
description: Path to the generate PKCS#12 file.
returned: changed or success
type: string
sample: /opt/certs/ansible.p12
'''
import errno
import os
try:
from OpenSSL import crypto
except ImportError:
pyopenssl_found = False
else:
pyopenssl_found = True
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native
class PkcsError(Exception):
pass
class Pkcs(object):
def __init__(self, module):
self.path = module.params['path']
self.force = module.params['force']
self.state = module.params['state']
self.action = module.params['action']
self.check_mode = module.check_mode
self.iter_size = module.params['iter_size']
self.maciter_size = module.params['maciter_size']
self.pkcs12 = None
self.src = module.params['src']
self.privatekey_path = module.params['privatekey_path']
self.privatekey_passphrase = module.params['privatekey_passphrase']
self.cert_path = module.params['cert_path']
self.ca_certificates = module.params['ca_certificates']
self.friendly_name = module.params['friendly_name']
self.passphrase = module.params['passphrase']
self.mode = module.params['mode']
self.changed = False
if not self.mode:
self.mode = int('0400', 8)
def load_privatekey(self, path, passphrase=None):
''' Load the specified OpenSSL private key. '''
try:
if passphrase:
privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM,
open(path, 'rb').read(),
passphrase)
else:
privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM,
open(path, 'rb').read())
return privatekey
except (IOError, OSError) as exc:
raise PkcsError(exc)
def load_certificate(self, path):
''' Load the specified certificate. '''
try:
cert_content = open(path, 'rb').read()
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_content)
return cert
except (IOError, OSError) as exc:
raise PkcsError(exc)
def load_pkcs12(self, path, passphrase=None):
''' Load pkcs12 file. '''
try:
if passphrase:
return crypto.load_pkcs12(open(path, 'rb').read(),
passphrase)
else:
return crypto.load_pkcs12(open(path, 'rb').read())
except (IOError, OSError) as exc:
raise PkcsError(exc)
def dump_privatekey(self, path):
''' Dump the specified OpenSSL private key. '''
try:
return crypto.dump_privatekey(crypto.FILETYPE_PEM,
self.load_pkcs12(path).get_privatekey())
except (IOError, OSError) as exc:
raise PkcsError(exc)
def dump_certificate(self, path):
''' Dump the specified certificate. '''
try:
return crypto.dump_certificate(crypto.FILETYPE_PEM,
self.load_pkcs12(path).get_certificate())
except (IOError, OSError) as exc:
raise PkcsError(exc)
def generate(self, module):
''' Generate PKCS#12 file archive. '''
if not os.path.exists(self.path) or self.force:
self.pkcs12 = crypto.PKCS12()
try:
self.remove()
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
if self.ca_certificates:
ca_certs = [self.load_certificate(ca_cert) for ca_cert
in self.ca_certificates]
self.pkcs12.set_ca_certificates(ca_certs)
if self.cert_path:
self.pkcs12.set_certificate(self.load_certificate(
self.cert_path))
if self.friendly_name:
self.pkcs12.set_friendlyname(self.friendly_name)
if self.privatekey_path:
self.pkcs12.set_privatekey(self.load_privatekey(
self.privatekey_path,
self.privatekey_passphrase)
)
try:
with open(self.path, 'wb', self.mode) as archive:
archive.write(
self.pkcs12.export(
self.passphrase,
self.iter_size,
self.maciter_size
)
)
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
except (IOError, OSError) as exc:
self.remove()
raise PkcsError(exc)
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
def parse(self, module):
''' Read PKCS#12 file. '''
if not os.path.exists(self.path) or self.force:
try:
self.remove()
with open(self.path, 'wb') as content:
content.write("%s%s" % (self.dump_privatekey(self.src),
self.dump_certificate(self.src)))
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
except IOError as exc:
raise PkcsError(exc)
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
def remove(self):
''' Remove the PKCS#12 file archive from the filesystem. '''
try:
os.remove(self.path)
self.changed = True
except OSError as exc:
if exc.errno != errno.ENOENT:
raise PkcsError(exc)
else:
pass
def check(self, module, perms_required=True):
''' Ensure the resource is in its desired state. '''
def _check_pkey_passphrase():
if self.privatekey_passphrase:
try:
self.load_privatekey(self.path,
self.privatekey_passphrase)
return True
except crypto.Error:
return False
return True
if not os.path.exists(self.path):
return os.path.exists(self.path)
return _check_pkey_passphrase
def dump(self):
''' Serialize the object into a dictionary. '''
result = {
'changed': self.changed,
'filename': self.path,
}
if self.privatekey_path:
result['privatekey_path'] = self.privatekey_path
return result
def main():
argument_spec = dict(
action=dict(default='export',
choices=['parse', 'export'],
type='str'),
ca_certificates=dict(type='list'),
cert_path=dict(type='path'),
force=dict(default=False, type='bool'),
friendly_name=dict(type='str', aliases=['name']),
iter_size=dict(default=2048, type='int'),
maciter_size=dict(default=1, type='int'),
passphrase=dict(type='str', no_log=True),
path=dict(required=True, type='path'),
privatekey_path=dict(type='path'),
privatekey_passphrase=dict(type='str', no_log=True),
state=dict(default='present',
choices=['present', 'absent'],
type='str'),
src=dict(type='path'),
)
required_if = [
['action', 'export', ['friendly_name']],
['action', 'parse', ['src']],
]
required_together = [
['privatekey_path', 'friendly_name'],
]
module = AnsibleModule(
argument_spec=argument_spec,
add_file_common_args=True,
required_if=required_if,
required_together=required_together,
supports_check_mode=True,
)
if not pyopenssl_found:
module.fail_json(msg='The python pyOpenSSL library is required')
base_dir = os.path.dirname(module.params['path'])
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or '
'the file is not a directory' % base_dir
)
pkcs12 = Pkcs(module)
if module.params['state'] == 'present':
if module.check_mode:
result = pkcs12.dump()
result['changed'] = module.params['force'] or not pkcs12.check(module)
module.exit_json(**result)
try:
if module.params['action'] == 'export':
pkcs12.generate(module)
else:
pkcs12.parse(module)
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
else:
if module.check_mode:
result = pkcs12.dump()
result['changed'] = os.path.exists(module.params['path'])
module.exit_json(**result)
try:
pkcs12.remove()
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
result = pkcs12.dump()
module.exit_json(**result)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment