Skip to content

Instantly share code, notes, and snippets.

@atdt
Last active December 22, 2020 12:53
Show Gist options
  • Save atdt/bac712b0a45ebe06b614 to your computer and use it in GitHub Desktop.
Save atdt/bac712b0a45ebe06b614 to your computer and use it in GitHub Desktop.
A minimal ssh-agent for using a smartcard with PIV for public-key authentication with OpenSSH through PKCS11.
org.wikimedia.ssh-pkcs11-agent.plist -> ~/Library/LaunchAgents
OPENSC_LIBS=/usr/local/Cellar/opensc/0.15.0/lib
ssh-add -s $OPENSC_LIBS/opensc-pkcs11.so
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>Label</key>
<string>org.wikimedia.ssh-pkcs11-agent</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/opt/ssh-pkcs11-agent</string>
<string>-a=/Users/ori/tmp/ssh-agent-proxy.sock</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>WorkingDirectory</key>
<string>/usr/local/var</string>
<key>StandardErrorPath</key>
<string>/usr/local/var/log/ssh-pkcs11-agent.log</string>
<key>StandardOutPath</key>
<string>/usr/local/var/log/ssh-pkcs11-agent.log</string>
</dict>
</plist>
#!/usr/local/bin/python2
# -*- coding: utf-8 -*-
"""
ssh-pkcs11-agent
~~~~~~~~~~~~~~~~
A minimal ssh-agent for using a smartcard with PIV for public-key
authentication with OpenSSH through PKCS11. Only three operations
are supported:
1) Adding a smartcard identity to the agent (`ssh-add -s`)
2) Listing the agent's identities (`ssh-add -l`)
3) Signing requests.
Usage: ssh-pkcs11-agent [-a bind_address]
-a BIND_ADDRESS Bind the agent to the UNIX-domain socket at path
BIND_ADDRESS. Default: $TMPDIR/ssh-pkcs11-agent.sock
Once you have started the agent, add your key with ssh-add:
ssh-add -s /usr/lib/opensc-pkcs11.so
Copyright 2016 Ori Livneh <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import SocketServer
import argparse
import binascii
import errno
import hashlib
import os
import socket
import struct
import sys
import tempfile
try:
import PyKCS11
except ImportError:
sys.exit('ssh-pkcs11-agent requires PyKCS11: '
'https://pypi.python.org/pypi/pykcs11')
# To have all pkcs11 library calls logged, uncomment the following
# two lines, and run (e.g.) 'ssh-add -s /path/to/pkcs11-spy.so':
# os.environ['PKCS11SPY'] = '/path/to/opensc-pkcs11.so'
# os.environ['PKCS11SPY_OUTPUT'] = '/path/to/debug.log'
# Magic numbers from the ssh-agent protocol specification.
# <https://github.com/openssh/openssh-portable/blob/4e636cf/PROTOCOL.agent>
SSH_AGENTC_REQUEST_RSA_IDENTITIES = 1
SSH_AGENT_RSA_IDENTITIES_ANSWER = 2
SSH_AGENT_FAILURE = 5
SSH_AGENT_SUCCESS = 6
SSH2_AGENTC_REQUEST_IDENTITIES = 11
SSH2_AGENT_IDENTITIES_ANSWER = 12
SSH2_AGENTC_SIGN_REQUEST = 13
SSH2_AGENT_SIGN_RESPONSE = 14
SSH_AGENTC_ADD_SMARTCARD_KEY = 20
# The OID for SHA1 (1.3.14.3.2.26) in ASN1-DER format.
SHA1_OID = b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14'
def rsa_to_blob(e, n, comment=''):
"""Convert a partially-specified RSA private key to a binary blob."""
key_data = pack_netstring('ssh-rsa') + pack_mpint(e) + pack_mpint(n)
return pack_netstring(key_data) + pack_netstring(comment)
def pack_uint32(n):
return struct.pack('!L', n)
def unpack_uint32(buffer, offset=0):
n, = struct.unpack_from('!L', buffer, offset)
return n, offset + 4
def pack_netstring(s):
return pack_uint32(len(s)) + s
def unpack_netstring(buffer, offset=0):
fmt = 'xxxx%ds' % struct.unpack_from('!L', buffer, offset)
string, = struct.unpack_from(fmt, buffer, offset)
return string, offset + struct.calcsize(fmt)
def bytes_to_long(s):
acc = 0L
unpack = struct.unpack
length = len(s)
if length % 4:
extra = (4 - length % 4)
s = b'\000' * extra + s
length = length + extra
for i in range(0, length, 4):
acc = (acc << 32) + unpack('>I', s[i:i+4])[0]
return acc
def long_to_bytes(val):
width = len(bin(val).lstrip('-0b'))
while width % 8:
width += 1
fmt = '%%0%dx' % (width // 4)
bytes = binascii.unhexlify(fmt % val)
return bytes
def pack_mpint(n):
if n == 0:
return b'\x00' * 4
bn = long_to_bytes(n)
if ord(bn[0]) & 128:
bn = b'\x00' + bn
return pack_netstring(bn)
def unpack_mpint(buffer, offset=0):
bytes, new_offset = unpack_netstring(buffer, offset)
return bytes_to_long(bytes), new_offset
def rm_f(path):
"""Remove a file if it exists."""
try:
os.unlink(path)
except OSError as e:
if e.errno != errno.ENOENT:
raise
class PKCS11Session(object):
# Criteria to search for when iterating on session objects.
# We want an RSA private key that can be used to sign data.
USABLE_KEY_TEMPLATE = (
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
(PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_RSA),
(PyKCS11.CKA_SIGN, True),
)
def __init__(self, reader_id, pin):
self.reader_id = reader_id
self.pin = pin
def __enter__(self):
self.pkcs11 = PyKCS11.PyKCS11Lib()
self.pkcs11.load(self.reader_id)
slots = self.pkcs11.getSlotList()
self._session = self.pkcs11.openSession(slots[1])
self._session.login(self.pin)
return self
def __exit__(self, exc_type, exc_value, traceback):
try:
self._session.logout()
except PyKCS11.PyKCS11Error:
pass
try:
self._session.closeSession()
except PyKCS11.PyKCS11Error:
pass
def get_keys(self):
"""Get a list of private keys that can be used for signing.
Each key is represented as a tuple of (exponent, modulus)."""
keys = []
for key in self._session.findObjects(self.USABLE_KEY_TEMPLATE):
KEY_ATTRIBUTES = (PyKCS11.CKA_MODULUS, PyKCS11.CKA_PUBLIC_EXPONENT)
n, e = self._session.getAttributeValue(key, KEY_ATTRIBUTES)
n = bytes_to_long(bytearray(n))
e = bytes_to_long(bytearray(e))
keys.append((e, n))
return keys
def sign(self, key, data):
"""Sign `data` using `key`."""
digest = hashlib.sha1(data).digest()
signature = self._session.sign(key, SHA1_OID + digest)
return pack_netstring('ssh-rsa') + pack_netstring(bytearray(signature))
def get_key_object(self, e, n):
"""Search for a key with exponent `e` and modulus `n`."""
n = [long(ord(byte)) for byte in long_to_bytes(n)]
e = [long(ord(byte)) for byte in long_to_bytes(e)]
key = self._session.findObjects((
(PyKCS11.CKA_MODULUS, n),
(PyKCS11.CKA_PUBLIC_EXPONENT, e),
) + self.USABLE_KEY_TEMPLATE)[0]
return key
class PrivateUnixSocketServer(SocketServer.ThreadingUnixStreamServer):
"""A socket server that listens on a UNIX socket that is readable
and writable only by the current user."""
pkcs11_session = None
def server_bind(self):
rm_f(self.server_address)
os.umask(0o077)
SocketServer.ThreadingUnixStreamServer.server_bind(self)
os.chmod(self.server_address, 0o600)
class SshAgentProxyHandler(SocketServer.BaseRequestHandler):
S_HEADER = struct.Struct('!LB')
timeout = 1
def send_message(self, code, message=b''):
header = self.S_HEADER.pack(len(message) + 1, code)
self.request.sendall(header + message)
def recv_message(self):
try:
header = self.request.recv(self.S_HEADER.size, socket.MSG_WAITALL)
size, code = self.S_HEADER.unpack(header)
message = self.request.recv(size - 1, socket.MSG_WAITALL)
except (socket.error, struct.error):
return None, b''
return code, message
def add_smartcard_key(self, message):
"""Handle SSH_AGENTC_ADD_SMARTCARD_KEY requests."""
reader_id, offset = unpack_netstring(message)
pin, _ = unpack_netstring(message, offset)
self.server.pkcs11_session = PKCS11Session(reader_id, pin)
self.send_message(SSH_AGENT_SUCCESS)
def sign_request(self, message):
"""Handle an SSH2_AGENTC_SIGN_REQUEST request."""
blob, offset = unpack_netstring(message)
data, offset = unpack_netstring(message, offset)
flags, _ = unpack_uint32(message, offset)
key_type, offset = unpack_netstring(blob)
assert key_type == 'ssh-rsa'
e, offset = unpack_mpint(blob, offset)
n, _ = unpack_mpint(blob, offset)
if self.server.pkcs11_session is not None:
with self.server.pkcs11_session as session:
key = session.get_key_object(e, n)
signature = session.sign(key, data)
self.send_message(SSH2_AGENT_SIGN_RESPONSE, pack_netstring(signature))
def provide_identities(self):
"""Handle an SSH2_AGENTC_REQUEST_IDENTITIES request."""
keys = ()
if self.server.pkcs11_session is not None:
try:
with self.server.pkcs11_session as session:
comment = session.reader_id
keys = session.get_keys()
except (IndexError, PyKCS11.PyKCS11Error):
pass
response = pack_uint32(len(keys))
for e, n in keys:
response += rsa_to_blob(e, n, comment)
self.send_message(SSH2_AGENT_IDENTITIES_ANSWER, response)
def handle(self):
while 1:
code, message = self.recv_message()
if code is None:
return
elif code == SSH_AGENTC_ADD_SMARTCARD_KEY:
self.add_smartcard_key(message)
elif code == SSH2_AGENTC_REQUEST_IDENTITIES:
self.provide_identities()
elif code == SSH_AGENTC_REQUEST_RSA_IDENTITIES:
self.send_message(SSH_AGENT_RSA_IDENTITIES_ANSWER,
struct.pack('!L', 0))
elif code == SSH2_AGENTC_SIGN_REQUEST:
self.sign_request(message)
else:
self.send_message(SSH_AGENT_FAILURE)
arg_parser = argparse.ArgumentParser(description='ssh-agent proxy')
arg_parser.add_argument(
'-a', metavar='bind_address', dest='bind_address',
default=(tempfile.gettempdir() + '/ssh-pkcs11-agent.sock'),
help='Bind the agent to the UNIX-domain socket bind_address'
)
args = arg_parser.parse_args()
print('SSH_AUTH_SOCK=%s ; export SSH_AUTH_SOCK' % args.bind_address)
server = PrivateUnixSocketServer(args.bind_address, SshAgentProxyHandler)
server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment