sort_certificate/sort_certificate.py

437 lines
16 KiB
Python
Executable File

#! /usr/bin/env python
'''
Sort X509/RSA key material
'''
from __future__ import print_function
import logging
import re
import fileinput
from argparse import ArgumentParser
from datetime import datetime
from OpenSSL import crypto
from Crypto.Util import asn1
from cryptography.hazmat.primitives import serialization
import certifi.core
VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\
r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\
r'[a-zA-Z0-9]))*$'
CERTINFO_TEMPLATE = '''
subject= /{subject}
issuer= /{issuer}
notBefore={notbefore!s}
notAfter={notafter}
SHA1 Fingerprint={sha1fingerprint}
'''.strip()
ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
OPENSSLTIME_FMT = '%b %e %T %Y GMT'
class OnlyRSAKeyException(Exception):
'''
When we encounter other than RSA crypto material
'''
pass
class CertificateComponentException(Exception):
'''
When something is not right with the whole cert+intermediates+private key bundle
'''
pass
def load_data(filenames):
'''
Read all data till EOF.
Might come from full chain (+ privkey) bundles
Might come from stdin (e.g. as vim filter action)
Might not be a good idea if it is being used to read thousands of files
'''
datas = {}
current_file = None
# iterate over all inputs
for line in fileinput.input(filenames):
# switch files if the filename changes, automatically switches slots
if current_file is not fileinput.filename():
current_file = fileinput.filename()
logging.debug('Reading from %s', current_file)
if current_file in datas:
datas[current_file] += line.lstrip()
else:
datas[current_file] = line.lstrip()
return datas
def get_pub_modulus(cert):
'''
Get the modulus of a certificate
'''
pub = cert.get_pubkey()
# Only works for RSA (I think)
if pub.type() != crypto.TYPE_RSA:
logging.debug('Can only handle RSA crypto:'
'\n\tsubject=%s\n\tissuer%s\n\texpired=%s\n\ttype=%s',
cert.get_subject(),
cert.get_subject(),
cert.has_expired(),
pub.type())
raise OnlyRSAKeyException('Can only handle RSA crypto')
pub_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, pub)
pub_der = asn1.DerSequence()
pub_der.decode(pub_asn1)
pub_modulus = pub_der[1]
return pub_modulus
def get_priv_modulus(priv):
'''
Get the modulus of a RSA private key
'''
# Only works for RSA (I think)
if priv.type() != crypto.TYPE_RSA:
raise OnlyRSAKeyException('Can only handle RSA crypto')
priv_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, priv)
priv_der = asn1.DerSequence()
priv_der.decode(priv_asn1)
priv_modulus = priv_der[1]
return priv_modulus
def match_cert_privkey(cert, priv):
'''
Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long
and reworked
'''
return get_pub_modulus(cert) == get_priv_modulus(priv)
def find_root(x509_objects, root_issuers):
'''
Find a suitable anchor by finding the intermediate that was signed by root
'''
root_cert = root_issuers[str(x509_objects[-1].get_issuer())]
logging.debug('Retrieved root certificate %s', root_cert.get_subject())
return root_cert
def find_intermediate_root(x509_objects, root_issuers):
'''
Find a suitable anchor by finding the intermediate that was signed by root
'''
# Some intermediates have the *same* subject as some root certificates.
# blacklist them
# XXX better use modulus/hash for that, but can't find the appropriate
# interface to that at the moment
excluded_issuers = [str(x.get_subject()) for x in x509_objects
if x.get_subject() != x.get_issuer()]
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
logging.debug('Excluding issuers because of potential intermediates\n\t%s',
'\n\t'.join(excluded_issuers))
logging.debug('issuers seen in data\n\t%s',
'\n\t'.join([str(x.get_issuer()) for x in x509_objects]))
return [x for x in x509_objects
if str(x.get_issuer()) in root_issuers
and str(x.get_issuer()) not in excluded_issuers]
def order_x509(x509_objects, root_issuers):
'''
order x509 objects to ensure proper chain order
'''
bundle = []
root_crt = [x for x in x509_objects if x.get_subject() == x.get_issuer()]
if root_crt:
root_crt = x509_objects.pop(x509_objects.index(root_crt[0]))
logging.warning('Found self signed (root) certificate %s in input',
str(root_crt.get_subject()))
# Double check if our self signed root certificate is not also present
# as an intermediate:
# - It is probably invalid input, and doesn't make sense
# - It confuses the ordering process
if next((x for x in x509_objects
if x.get_subject() != x.get_issuer()
and x.get_subject() == root_crt.get_subject()), None):
raise CertificateComponentException('Both present as intermediate '
'and root certificate: %s' %
str(root_crt.get_subject()))
else:
# Get intermediate cert signed by any root from bundle as anchor, and
# make that our root
logging.debug('No root certificate in input,'
' obtain intermediate through known root issuers')
root_crt = find_intermediate_root(x509_objects, root_issuers)
logging.debug('intermediates seen in data signed by root \n\t%s',
'\n\t'.join([str(x.get_issuer()) for x in root_crt]))
if root_crt and len(root_crt) == 1:
logging.debug('Found subject=%s,issuer=%s',
root_crt[0].get_subject(), root_crt[0].get_issuer())
root_crt = x509_objects.pop(x509_objects.index(root_crt[0]))
else:
raise CertificateComponentException('No intermediate found')
# Insert our anchor.
bundle.insert(0, root_crt)
# now work our way up by going through the list,
# inserting the certificate where the issuer matches the current topmost
# subject until we are empty
while x509_objects:
sibling = [x for x in x509_objects
if x.get_issuer() == bundle[0].get_subject()]
if sibling and len(sibling) == 1:
# insert sibling at beginning of list
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
else:
# Lets complain
raise CertificateComponentException('Non matching certificates in input:'
' No sibling found for %s'
% bundle[0].get_subject())
return bundle
def load_root_issuers():
'''
Return the list of CA roots (RSA only)
'''
root_issuers = None
mozrootbundle_location = certifi.core.where()
with open(mozrootbundle_location, 'r') as fname_fh:
logging.info('Using %s for root ca bundle', mozrootbundle_location)
data = fname_fh.read()
matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
'.*?'
'-----END CERTIFICATE-----)',
data, re.DOTALL)
root_certs = [crypto.load_certificate(crypto.FILETYPE_PEM,
match.group(1))
for match in matches]
logging.debug('Loaded root certificates from bundle')
for root_cert in root_certs:
try:
logging.debug('subject=%s\n\tissuer%s\n\t'
'expired=%s\n\tmodulus=%s',
root_cert.get_subject(),
root_cert.get_issuer(),
root_cert.has_expired(),
get_pub_modulus(root_cert))
except OnlyRSAKeyException as onlyrsa_exception:
logging.debug(onlyrsa_exception)
continue
root_issuers = {str(root_cert.get_subject()): root_cert
for root_cert in root_certs}
return root_issuers
def handle_args():
'''
Handle tool arguments
'''
parser = ArgumentParser(description='Reorder X509/RSA data for'
' hosting use')
loggrp = parser.add_mutually_exclusive_group()
loggrp.add_argument('-v', '--verbose',
action='store_true',
help='Show verbose logging')
loggrp.add_argument('-d', '--debug',
action='store_true',
help='Show debug logging')
loggrp.add_argument('-q', '--quiet',
action='store_true',
help='Show only error logging')
outputgrp = parser.add_mutually_exclusive_group()
outputgrp.add_argument('--just-certificate', dest='print_cert',
action='store_true', help='Just print certificate')
outputgrp.add_argument('--no-certificate', dest='print_cert',
action='store_false',
help='Omit certificate from output')
outputgrp.set_defaults(print_cert=True)
outputgrp.add_argument('--just-chain', dest='print_chain',
action='store_true', help='Just print chain')
outputgrp.add_argument('--no-chain', dest='print_chain',
action='store_false', help='Omit chain from output')
outputgrp.add_argument('--include-root', dest='include_root',
action='store_true', help='Also include the root certificate')
outputgrp.set_defaults(print_chain=True)
outputgrp.add_argument('--key', dest='print_key',
action='store_true', default=True,
help='Just print key')
outputgrp.add_argument('--no-key', dest='print_key',
action='store_false', help='Omit key from output')
outputgrp.set_defaults(print_key=True)
parser.add_argument('x509files', metavar='x509 file', nargs='*',
help='x509 fullchain (+ rsa privkey)'
' bundles to be checked')
return parser.parse_args()
def main():
'''
main program start and argument parsing
'''
root_issuers = None
args = handle_args()
if args.verbose:
logging.basicConfig(level=logging.INFO)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
logging.basicConfig(level=logging.ERROR)
else:
logging.basicConfig(level=logging.WARNING)
root_issuers = load_root_issuers()
for fname, data in list(load_data(args.x509files).items()):
logging.debug('Processing %s', fname)
x509_objects_components = None
x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
'.*?'
'-----END CERTIFICATE-----)',
data, re.DOTALL)
rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----'
'.*?'
'-----END RSA PRIVATE KEY-----)',
data, re.DOTALL)
pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----'
'.*?'
'-----END PRIVATE KEY-----)',
data, re.DOTALL)
x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM,
x509match.group(1))
for x509match in x509matches]
rsa_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM,
rsamatch.group(1))
for rsamatch in rsamatches]
pk_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM,
pkmatch.group(1))
for pkmatch in pkmatches]
x509_objects = order_x509(x509_objects, root_issuers)
x509_objects_components = x509_objects[0].get_subject().\
get_components()
if len(rsa_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.'
' Aborting')
elif rsa_objects:
if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
raise CertificateComponentException('Provided certificate'
' and RSA private key do not match')
else:
logging.info('OK: Modulus of provided certificate'
' and RSA private key match')
elif len(pk_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.'
' Aborting')
elif pk_objects:
if not match_cert_privkey(x509_objects[0], pk_objects[0]):
raise CertificateComponentException('Provided certificate'
' and private key do not match')
else:
logging.info('OK: Modulus of provided certificate'
' and private key match')
if args.include_root:
logging.debug('root certificate in output requested')
x509_objects.append(find_root(x509_objects, root_issuers))
logging.debug("Print certificates in order")
# Need to do b'CN' to have this python3 compatible
logging.info('Writing bundle for Subject: %s',
[x[1].decode('utf-8')
for x in x509_objects_components
if x[0] == b'CN'][0])
for x509_object in [x for x in x509_objects
if x.get_subject() != x.get_issuer()
or args.include_root]:
# Stringify subject like openssl x509 -subject
x509_subject = \
'/'.join(['{0}={1}'.format(component[0].decode(),
component[1].decode())
for component in
x509_object.get_subject().get_components()])
# Stringify issuer like openssl x509 -issuer
x509_issuer = \
'/'.join(['{0}={1}'.format(component[0].decode(),
component[1].decode())
for component in
x509_object.get_issuer().get_components()])
x509_not_after = \
datetime.strptime(str(x509_object.get_notAfter()),
ASN1TIME_FMT)
x509_not_before = \
datetime.strptime(str(x509_object.get_notBefore()),
ASN1TIME_FMT)
logging.info('Subject: %s', x509_subject)
logging.info('Issuer: %s', x509_issuer)
print(CERTINFO_TEMPLATE.format(
subject=x509_subject,
issuer=x509_issuer,
notbefore=x509_not_before.strftime(OPENSSLTIME_FMT),
notafter=x509_not_after.strftime(OPENSSLTIME_FMT),
sha1fingerprint=x509_object.digest('sha1').decode()))
print(crypto.dump_certificate(crypto.FILETYPE_PEM,
x509_object).decode('ascii'),
end='')
if rsa_objects:
logging.info('Print RSA private keys')
for rsa_object in rsa_objects:
print(rsa_object.to_cryptography_key().private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()).decode(
'ascii'),
end='')
elif pk_objects:
logging.info('Print private keys')
for pk_object in pk_objects:
print(crypto.dump_privatekey(crypto.FILETYPE_PEM,
pk_object).decode('ascii'),
end='')
if __name__ == "__main__":
exit(main())