From 2ba60dded4c8416669b487a6041edc0a81bceeb7 Mon Sep 17 00:00:00 2001 From: Ruben van Staveren Date: Mon, 6 May 2019 12:13:04 +0200 Subject: [PATCH] Import into VCS --- sort_certificate.py | 339 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100755 sort_certificate.py diff --git a/sort_certificate.py b/sort_certificate.py new file mode 100755 index 0000000..a2e5468 --- /dev/null +++ b/sort_certificate.py @@ -0,0 +1,339 @@ +#! /usr/bin/env python +''' +Sort X509/RSA key material +''' + +from __future__ import print_function + +import logging +import re +import os +import fileinput +from argparse import ArgumentParser +from OpenSSL import crypto +from Crypto.Util import asn1 +from cryptography.hazmat.primitives import serialization + + +# XXX Scan a couple of known locations to get this file +MOZROOTBUNDLE_LOCATIONS = [ + '/etc/ssl/cert.pem', + '/etc/pki/tls/cert.pem', +] + +VALID_HOSTNAME_RE = '^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ + '(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ + '[a-zA-Z0-9]))*$' + + +def load_data(filenames): + ''' + Read all data till EOF. + Might come from full chain (+ privkey) bundles + Might come from stdin (e.g. as vim filter action) + Might not be a good idea if it is being used to read thousands of files + ''' + datas = {} + current_file = None + + # iterate over all inputs + for line in fileinput.input(filenames): + # switch files if the filename changes, automatically switches slots + if current_file is not fileinput.filename(): + current_file = fileinput.filename() + logging.debug('Reading from %s', current_file) + if current_file in datas: + datas[current_file] += line.lstrip() + else: + datas[current_file] = line.lstrip() + return datas + + +def get_pub_modulus(cert): + ''' + Get the modulus of a certificate + ''' + pub = cert.get_pubkey() + + # Only works for RSA (I think) + if pub.type() != crypto.TYPE_RSA: + logging.debug('Can only handle RSA crypto:' + '\n\tsubject=%s\n\tissuer%s\n\texpired=%s\n\ttype=%s', + cert.get_subject(), + cert.get_subject(), + cert.has_expired(), + pub.type()) + raise Exception('Can only handle RSA crypto') + + pub_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, pub) + pub_der = asn1.DerSequence() + pub_der.decode(pub_asn1) + pub_modulus = pub_der[1] + + return pub_modulus + + +def get_priv_modulus(priv): + ''' + Get the modulus of a RSA private key + ''' + + # Only works for RSA (I think) + if priv.type() != crypto.TYPE_RSA: + raise Exception('Can only handle RSA crypto') + + priv_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, priv) + priv_der = asn1.DerSequence() + priv_der.decode(priv_asn1) + priv_modulus = priv_der[1] + + return priv_modulus + + +def match_cert_privkey(cert, priv): + ''' + Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not + and reworked + ''' + + return get_pub_modulus(cert) == get_priv_modulus(priv) + + +def find_intermediate_root(x509_objects, root_issuers): + ''' + Find a suitable anchor by finding the intermediate that was signed by root + ''' + + # Some intermediates have the *same* subject as some root certificates. + # blacklist them + # XXX better use modulus/hash for that, but can't find the appropriate + # interface to that at the moment + excluded_issuers = [str(x.get_subject()) for x in x509_objects + if x.get_subject() != x.get_issuer()] + + logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers)) + logging.debug('Excluding issuers because of potential intermediates\n\t%s', + '\n\t'.join(excluded_issuers)) + logging.debug('issuers seen in data\n\t%s', + '\n\t'.join([str(x.get_issuer()) for x in x509_objects])) + return [x for x in x509_objects + if str(x.get_issuer()) in root_issuers + and str(x.get_issuer()) not in excluded_issuers] + + +def order_x509(x509_objects, root_issuers): + ''' + order x509 objects to ensure proper chain order + ''' + bundle = [] + root_crt = [x for x in x509_objects if x.get_subject() == x.get_issuer()] + if root_crt: + root_crt = x509_objects.pop(x509_objects.index(root_crt[0])) + else: + # Get intermediate cert signed by any root from bundle as anchor, and + # make that our root + logging.debug('No root certificate in input,' + ' obtain intermediate through known root issuers') + root_crt = find_intermediate_root(x509_objects, root_issuers) + logging.debug('intermediates seen in data signed by root \n\t%s', + '\n\t'.join([str(x.get_issuer()) for x in root_crt])) + + if root_crt and len(root_crt) == 1: + logging.debug('Found subject=%s,issuer=%s', + root_crt[0].get_subject(), root_crt[0].get_issuer()) + root_crt = x509_objects.pop(x509_objects.index(root_crt[0])) + else: + raise Exception('No intermediate found') + + # Insert our anchor. + bundle.insert(0, root_crt) + + # now work our way up by going through the list, + # inserting the certificate where the issuer matches the current topmost + # subject until we are empty + while x509_objects: + sibling = [x for x in x509_objects + if x.get_issuer() == bundle[0].get_subject()] + if sibling and len(sibling) == 1: + # insert sibling at beginning of list + bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) + else: + # Lets complain + raise Exception('Non matching certificates in input:' + ' No sibling found for %s' + % bundle[0].get_subject()) + return bundle + + +def main(): + ''' + main program start and argument parsing + ''' + + mozrootbundle_location = [fname for fname in MOZROOTBUNDLE_LOCATIONS + if os.path.exists(fname)].pop(0) + + parser = ArgumentParser(description='Reorder X509/RSA data for' + ' hosting use') + + loggrp = parser.add_mutually_exclusive_group() + loggrp.add_argument('-v', '--verbose', + action='store_true', + help='Show verbose logging') + loggrp.add_argument('-d', '--debug', + action='store_true', + help='Show debug logging') + loggrp.add_argument('-q', '--quiet', + action='store_true', + help='Show only error logging') + + outputgrp = parser.add_mutually_exclusive_group() + + outputgrp.add_argument('--just-certificate', dest='print_cert', + action='store_true', help='Just print certificate') + outputgrp.add_argument('--no-certificate', dest='print_cert', + action='store_false', + help='Omit certificate from output') + outputgrp.set_defaults(print_cert=True) + + outputgrp.add_argument('--just-chain', dest='print_chain', + action='store_true', help='Just print chain') + outputgrp.add_argument('--no-chain', dest='print_chain', + action='store_false', help='Omit chain from output') + outputgrp.set_defaults(print_chain=True) + + outputgrp.add_argument('--key', dest='print_key', + action='store_true', default=True, + help='Just print key') + outputgrp.add_argument('--no-key', dest='print_key', + action='store_false', help='Omit key from output') + outputgrp.set_defaults(print_key=True) + + parser.add_argument('x509files', metavar='x509 file', nargs='*', + help='x509 fullchain (+ rsa privkey)' + ' bundles to be checked') + + args = parser.parse_args() + + root_issuers = None + + if args.verbose: + logging.basicConfig(level=logging.INFO) + elif args.debug: + logging.basicConfig(level=logging.DEBUG) + elif args.quiet: + logging.basicConfig(level=logging.ERROR) + else: + logging.basicConfig(level=logging.WARNING) + + with open(mozrootbundle_location, 'r') as fname_fh: + logging.info('Using %s for root ca bundle', mozrootbundle_location) + data = fname_fh.read() + matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' + '.*?' + '-----END CERTIFICATE-----)', + data, re.DOTALL) + root_certs = [crypto.load_certificate(crypto.FILETYPE_PEM, + match.group(1)) + for match in matches] + logging.debug('Loaded root certificates from bundle') + for root_cert in root_certs: + try: + logging.debug('subject=%s\n\tissuer%s\n\texpired=%s\n\tmodulus=%s', + root_cert.get_subject(), + root_cert.get_subject(), + root_cert.has_expired(), + get_pub_modulus(root_cert)) + except Exception: + continue + root_issuers = [str(root_cert.get_subject()) + for root_cert in root_certs] + + for fname, data in list(load_data(args.x509files).items()): + logging.debug('Processing %s', fname) + x509_objects_components = None + x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' + '.*?' + '-----END CERTIFICATE-----)', + data, re.DOTALL) + + rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----' + '.*?' + '-----END RSA PRIVATE KEY-----)', + data, re.DOTALL) + + pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----' + '.*?' + '-----END PRIVATE KEY-----)', + data, re.DOTALL) + + x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, + x509match.group(1)) + for x509match in x509matches] + + rsa_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM, + rsamatch.group(1)) + for rsamatch in rsamatches] + + pk_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM, + pkmatch.group(1)) + for pkmatch in pkmatches] + + x509_objects = order_x509(x509_objects, root_issuers) + x509_objects_components = x509_objects[0].get_subject().\ + get_components() + + if len(rsa_objects) > 1: + raise Exception('More than one RSA private key found in input.' + ' Aborting') + elif rsa_objects: + if not match_cert_privkey(x509_objects[0], rsa_objects[0]): + raise Exception('Provided certificate' + ' and RSA private key do not match') + else: + logging.info('OK: Modulus of provided certificate' + ' and RSA private key match') + elif len(pk_objects) > 1: + raise Exception('More than one RSA private key found in input.' + ' Aborting') + elif pk_objects: + if not match_cert_privkey(x509_objects[0], pk_objects[0]): + raise Exception('Provided certificate' + ' and private key do not match') + else: + logging.info('OK: Modulus of provided certificate' + ' and private key match') + + logging.debug("Print certificates in order") + # XXX Need to do b'CN' to have this python3 compatible + logging.info('Writing bundle for Subject: %s', [x[1] + for x in x509_objects_components + if x[0] == b'CN'][0]) + + for x509_object in [x for x in x509_objects + if x.get_subject() != x.get_issuer()]: + logging.info('Subject: %s', x509_object.get_subject()) + logging.info('Issuer: %s', x509_object.get_issuer()) + print(crypto.dump_certificate(crypto.FILETYPE_PEM, + x509_object).decode('ascii'), + end='') + + if rsa_objects: + logging.info('Print RSA private keys') + for rsa_object in rsa_objects: + print(rsa_object.to_cryptography_key().private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption()).decode( + 'ascii'), + end='') + elif pk_objects: + logging.info('Print private keys') + for pk_object in pk_objects: + print(crypto.dump_privatekey(crypto.FILETYPE_PEM, + pk_object).decode('ascii'), + end='') + + +if __name__ == "__main__": + exit(main())