Last active
December 22, 2020 12:53
-
-
Save atdt/bac712b0a45ebe06b614 to your computer and use it in GitHub Desktop.
A minimal ssh-agent for using a smartcard with PIV for public-key authentication with OpenSSH through PKCS11.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
org.wikimedia.ssh-pkcs11-agent.plist -> ~/Library/LaunchAgents | |
OPENSC_LIBS=/usr/local/Cellar/opensc/0.15.0/lib | |
ssh-add -s $OPENSC_LIBS/opensc-pkcs11.so |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> | |
<plist version="1.0"> | |
<dict> | |
<key>KeepAlive</key> | |
<dict> | |
<key>SuccessfulExit</key> | |
<false/> | |
</dict> | |
<key>Label</key> | |
<string>org.wikimedia.ssh-pkcs11-agent</string> | |
<key>ProgramArguments</key> | |
<array> | |
<string>/usr/local/opt/ssh-pkcs11-agent</string> | |
<string>-a=/Users/ori/tmp/ssh-agent-proxy.sock</string> | |
</array> | |
<key>RunAtLoad</key> | |
<true/> | |
<key>WorkingDirectory</key> | |
<string>/usr/local/var</string> | |
<key>StandardErrorPath</key> | |
<string>/usr/local/var/log/ssh-pkcs11-agent.log</string> | |
<key>StandardOutPath</key> | |
<string>/usr/local/var/log/ssh-pkcs11-agent.log</string> | |
</dict> | |
</plist> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python2 | |
# -*- coding: utf-8 -*- | |
""" | |
ssh-pkcs11-agent | |
~~~~~~~~~~~~~~~~ | |
A minimal ssh-agent for using a smartcard with PIV for public-key | |
authentication with OpenSSH through PKCS11. Only three operations | |
are supported: | |
1) Adding a smartcard identity to the agent (`ssh-add -s`) | |
2) Listing the agent's identities (`ssh-add -l`) | |
3) Signing requests. | |
Usage: ssh-pkcs11-agent [-a bind_address] | |
-a BIND_ADDRESS Bind the agent to the UNIX-domain socket at path | |
BIND_ADDRESS. Default: $TMPDIR/ssh-pkcs11-agent.sock | |
Once you have started the agent, add your key with ssh-add: | |
ssh-add -s /usr/lib/opensc-pkcs11.so | |
Copyright 2016 Ori Livneh <[email protected]> | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
""" | |
from __future__ import print_function | |
import SocketServer | |
import argparse | |
import binascii | |
import errno | |
import hashlib | |
import os | |
import socket | |
import struct | |
import sys | |
import tempfile | |
try: | |
import PyKCS11 | |
except ImportError: | |
sys.exit('ssh-pkcs11-agent requires PyKCS11: ' | |
'https://pypi.python.org/pypi/pykcs11') | |
# To have all pkcs11 library calls logged, uncomment the following | |
# two lines, and run (e.g.) 'ssh-add -s /path/to/pkcs11-spy.so': | |
# os.environ['PKCS11SPY'] = '/path/to/opensc-pkcs11.so' | |
# os.environ['PKCS11SPY_OUTPUT'] = '/path/to/debug.log' | |
# Magic numbers from the ssh-agent protocol specification. | |
# <https://github.com/openssh/openssh-portable/blob/4e636cf/PROTOCOL.agent> | |
SSH_AGENTC_REQUEST_RSA_IDENTITIES = 1 | |
SSH_AGENT_RSA_IDENTITIES_ANSWER = 2 | |
SSH_AGENT_FAILURE = 5 | |
SSH_AGENT_SUCCESS = 6 | |
SSH2_AGENTC_REQUEST_IDENTITIES = 11 | |
SSH2_AGENT_IDENTITIES_ANSWER = 12 | |
SSH2_AGENTC_SIGN_REQUEST = 13 | |
SSH2_AGENT_SIGN_RESPONSE = 14 | |
SSH_AGENTC_ADD_SMARTCARD_KEY = 20 | |
# The OID for SHA1 (1.3.14.3.2.26) in ASN1-DER format. | |
SHA1_OID = b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14' | |
def rsa_to_blob(e, n, comment=''): | |
"""Convert a partially-specified RSA private key to a binary blob.""" | |
key_data = pack_netstring('ssh-rsa') + pack_mpint(e) + pack_mpint(n) | |
return pack_netstring(key_data) + pack_netstring(comment) | |
def pack_uint32(n): | |
return struct.pack('!L', n) | |
def unpack_uint32(buffer, offset=0): | |
n, = struct.unpack_from('!L', buffer, offset) | |
return n, offset + 4 | |
def pack_netstring(s): | |
return pack_uint32(len(s)) + s | |
def unpack_netstring(buffer, offset=0): | |
fmt = 'xxxx%ds' % struct.unpack_from('!L', buffer, offset) | |
string, = struct.unpack_from(fmt, buffer, offset) | |
return string, offset + struct.calcsize(fmt) | |
def bytes_to_long(s): | |
acc = 0L | |
unpack = struct.unpack | |
length = len(s) | |
if length % 4: | |
extra = (4 - length % 4) | |
s = b'\000' * extra + s | |
length = length + extra | |
for i in range(0, length, 4): | |
acc = (acc << 32) + unpack('>I', s[i:i+4])[0] | |
return acc | |
def long_to_bytes(val): | |
width = len(bin(val).lstrip('-0b')) | |
while width % 8: | |
width += 1 | |
fmt = '%%0%dx' % (width // 4) | |
bytes = binascii.unhexlify(fmt % val) | |
return bytes | |
def pack_mpint(n): | |
if n == 0: | |
return b'\x00' * 4 | |
bn = long_to_bytes(n) | |
if ord(bn[0]) & 128: | |
bn = b'\x00' + bn | |
return pack_netstring(bn) | |
def unpack_mpint(buffer, offset=0): | |
bytes, new_offset = unpack_netstring(buffer, offset) | |
return bytes_to_long(bytes), new_offset | |
def rm_f(path): | |
"""Remove a file if it exists.""" | |
try: | |
os.unlink(path) | |
except OSError as e: | |
if e.errno != errno.ENOENT: | |
raise | |
class PKCS11Session(object): | |
# Criteria to search for when iterating on session objects. | |
# We want an RSA private key that can be used to sign data. | |
USABLE_KEY_TEMPLATE = ( | |
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), | |
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_RSA), | |
(PyKCS11.CKA_SIGN, True), | |
) | |
def __init__(self, reader_id, pin): | |
self.reader_id = reader_id | |
self.pin = pin | |
def __enter__(self): | |
self.pkcs11 = PyKCS11.PyKCS11Lib() | |
self.pkcs11.load(self.reader_id) | |
slots = self.pkcs11.getSlotList() | |
self._session = self.pkcs11.openSession(slots[1]) | |
self._session.login(self.pin) | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
try: | |
self._session.logout() | |
except PyKCS11.PyKCS11Error: | |
pass | |
try: | |
self._session.closeSession() | |
except PyKCS11.PyKCS11Error: | |
pass | |
def get_keys(self): | |
"""Get a list of private keys that can be used for signing. | |
Each key is represented as a tuple of (exponent, modulus).""" | |
keys = [] | |
for key in self._session.findObjects(self.USABLE_KEY_TEMPLATE): | |
KEY_ATTRIBUTES = (PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT) | |
n, e = self._session.getAttributeValue(key, KEY_ATTRIBUTES) | |
n = bytes_to_long(bytearray(n)) | |
e = bytes_to_long(bytearray(e)) | |
keys.append((e, n)) | |
return keys | |
def sign(self, key, data): | |
"""Sign `data` using `key`.""" | |
digest = hashlib.sha1(data).digest() | |
signature = self._session.sign(key, SHA1_OID + digest) | |
return pack_netstring('ssh-rsa') + pack_netstring(bytearray(signature)) | |
def get_key_object(self, e, n): | |
"""Search for a key with exponent `e` and modulus `n`.""" | |
n = [long(ord(byte)) for byte in long_to_bytes(n)] | |
e = [long(ord(byte)) for byte in long_to_bytes(e)] | |
key = self._session.findObjects(( | |
(PyKCS11.CKA_MODULUS, n), | |
(PyKCS11.CKA_PUBLIC_EXPONENT, e), | |
) + self.USABLE_KEY_TEMPLATE)[0] | |
return key | |
class PrivateUnixSocketServer(SocketServer.ThreadingUnixStreamServer): | |
"""A socket server that listens on a UNIX socket that is readable | |
and writable only by the current user.""" | |
pkcs11_session = None | |
def server_bind(self): | |
rm_f(self.server_address) | |
os.umask(0o077) | |
SocketServer.ThreadingUnixStreamServer.server_bind(self) | |
os.chmod(self.server_address, 0o600) | |
class SshAgentProxyHandler(SocketServer.BaseRequestHandler): | |
S_HEADER = struct.Struct('!LB') | |
timeout = 1 | |
def send_message(self, code, message=b''): | |
header = self.S_HEADER.pack(len(message) + 1, code) | |
self.request.sendall(header + message) | |
def recv_message(self): | |
try: | |
header = self.request.recv(self.S_HEADER.size, socket.MSG_WAITALL) | |
size, code = self.S_HEADER.unpack(header) | |
message = self.request.recv(size - 1, socket.MSG_WAITALL) | |
except (socket.error, struct.error): | |
return None, b'' | |
return code, message | |
def add_smartcard_key(self, message): | |
"""Handle SSH_AGENTC_ADD_SMARTCARD_KEY requests.""" | |
reader_id, offset = unpack_netstring(message) | |
pin, _ = unpack_netstring(message, offset) | |
self.server.pkcs11_session = PKCS11Session(reader_id, pin) | |
self.send_message(SSH_AGENT_SUCCESS) | |
def sign_request(self, message): | |
"""Handle an SSH2_AGENTC_SIGN_REQUEST request.""" | |
blob, offset = unpack_netstring(message) | |
data, offset = unpack_netstring(message, offset) | |
flags, _ = unpack_uint32(message, offset) | |
key_type, offset = unpack_netstring(blob) | |
assert key_type == 'ssh-rsa' | |
e, offset = unpack_mpint(blob, offset) | |
n, _ = unpack_mpint(blob, offset) | |
if self.server.pkcs11_session is not None: | |
with self.server.pkcs11_session as session: | |
key = session.get_key_object(e, n) | |
signature = session.sign(key, data) | |
self.send_message(SSH2_AGENT_SIGN_RESPONSE, pack_netstring(signature)) | |
def provide_identities(self): | |
"""Handle an SSH2_AGENTC_REQUEST_IDENTITIES request.""" | |
keys = () | |
if self.server.pkcs11_session is not None: | |
try: | |
with self.server.pkcs11_session as session: | |
comment = session.reader_id | |
keys = session.get_keys() | |
except (IndexError, PyKCS11.PyKCS11Error): | |
pass | |
response = pack_uint32(len(keys)) | |
for e, n in keys: | |
response += rsa_to_blob(e, n, comment) | |
self.send_message(SSH2_AGENT_IDENTITIES_ANSWER, response) | |
def handle(self): | |
while 1: | |
code, message = self.recv_message() | |
if code is None: | |
return | |
elif code == SSH_AGENTC_ADD_SMARTCARD_KEY: | |
self.add_smartcard_key(message) | |
elif code == SSH2_AGENTC_REQUEST_IDENTITIES: | |
self.provide_identities() | |
elif code == SSH_AGENTC_REQUEST_RSA_IDENTITIES: | |
self.send_message(SSH_AGENT_RSA_IDENTITIES_ANSWER, | |
struct.pack('!L', 0)) | |
elif code == SSH2_AGENTC_SIGN_REQUEST: | |
self.sign_request(message) | |
else: | |
self.send_message(SSH_AGENT_FAILURE) | |
arg_parser = argparse.ArgumentParser(description='ssh-agent proxy') | |
arg_parser.add_argument( | |
'-a', metavar='bind_address', dest='bind_address', | |
default=(tempfile.gettempdir() + '/ssh-pkcs11-agent.sock'), | |
help='Bind the agent to the UNIX-domain socket bind_address' | |
) | |
args = arg_parser.parse_args() | |
print('SSH_AUTH_SOCK=%s ; export SSH_AUTH_SOCK' % args.bind_address) | |
server = PrivateUnixSocketServer(args.bind_address, SshAgentProxyHandler) | |
server.serve_forever() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment