#! /usr/bin/env python3
'''
Sort X509/private key material

Reads one or more PEM bundles (files or stdin), orders the certificates
leaf-first so they form a proper chain, verifies that an accompanying
private key matches the leaf certificate and prints the result.
'''
import fileinput
import logging
import re
import sys
from argparse import ArgumentParser
from datetime import datetime

import certifi.core
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto

VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\
                r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\
                r'[a-zA-Z0-9]))*$'
# PEM block matchers; all of them are used with re.DOTALL so that `.*?`
# spans the base64 payload lines.
CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})'
RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})'
PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})'
CERTINFO_TEMPLATE = '''
subject= /{subject}
issuer= /{issuer}
notBefore={notbefore!s}
notAfter={notafter}
SHA1 Fingerprint={sha1fingerprint}
'''.strip()
# Format of the (decoded) timestamps returned by X509.get_notBefore() and
# X509.get_notAfter().
ASN1TIME_FMT = '%Y%m%d%H%M%SZ'
# NOTE(review): %e and %T are not among the strftime directives Python
# documents as portable -- confirm behaviour on non-glibc platforms.
OPENSSLTIME_FMT = '%b %e %T %Y GMT'


class PkDecorator:  # pylint: disable=too-few-public-methods
    ''' Provide some information on the private key object '''
    p_key = None

    def __init__(self, p_key):
        self.p_key = p_key

    def __str__(self):
        return "Private key"


class PkDecoratorEC(PkDecorator):  # pylint: disable=too-few-public-methods
    ''' PkDecorator that knows about elliptic curves '''
    def __str__(self):
        pk_crypto = self.p_key.to_cryptography_key()
        return (f'EC Private key curve {pk_crypto.curve.name} '
                f'({pk_crypto.key_size} bits)')


class PkDecoratorRSA(PkDecorator):  # pylint: disable=too-few-public-methods
    ''' PkDecorator that knows about RSA '''
    def __str__(self):
        pk_crypto = self.p_key.to_cryptography_key()
        return (f'RSA Private key {pk_crypto.key_size} bits '
                f'(exponent {pk_crypto.private_numbers().public_numbers.e})')


class PkDecoratorDSA(PkDecorator):  # pylint: disable=too-few-public-methods
    ''' PkDecorator that knows about DSA '''
    def __str__(self):
        pk_crypto = self.p_key.to_cryptography_key()
        return f'DSA Private key {pk_crypto.key_size} bits'


class PkDecoratorDH(PkDecorator):  # pylint: disable=too-few-public-methods
    ''' PkDecorator that knows about DH '''
    def __str__(self):
        pk_crypto = self.p_key.to_cryptography_key()
        return f'DH Private key {pk_crypto.key_size} bits'


class PkDecoratorFactory:  # pylint: disable=too-few-public-methods
    ''' Pick the PkDecorator subclass that fits a private key object '''
    @staticmethod
    def create(p_key):
        '''
        Create the appropriate decorator object

        p_key is an OpenSSL.crypto.PKey; its type() selects the decorator.
        Raises UnsupportedPkEncryption for key types without a decorator.
        '''
        decorators = {
            crypto.TYPE_DH: PkDecoratorDH,
            crypto.TYPE_EC: PkDecoratorEC,
            crypto.TYPE_DSA: PkDecoratorDSA,
            crypto.TYPE_RSA: PkDecoratorRSA,
        }
        decorator = decorators.get(p_key.type())
        if decorator is None:
            raise UnsupportedPkEncryption(
                f'Unsupported private key type {p_key.type()}')
        return decorator(p_key)


class UnsupportedPkEncryption(Exception):
    ''' When we encounter unsupported encryption algorithms '''


class CertificateComponentException(Exception):
    '''
    When something is not right with the whole
    cert+intermediates+private key bundle
    '''


def load_data(filenames):
    '''
    Read all data till EOF.

    Might come from full chain (+ privkey) bundles
    Might come from stdin (e.g. as vim filter action)
    Might not be a good idea if it is being used to read thousands of files

    Returns a dict mapping each input name (as reported by fileinput) to
    the concatenation of its lines, each stripped of leading whitespace.
    '''
    chunks = {}
    current_file = None
    # The context manager closes the stream even if reading fails half way.
    with fileinput.input(filenames) as stream:
        for line in stream:
            # Only used to log once whenever the input switches files.
            if current_file != stream.filename():
                current_file = stream.filename()
                logging.debug('Reading from %s', current_file)
            chunks.setdefault(current_file, []).append(line.lstrip())
    return {name: ''.join(parts) for name, parts in chunks.items()}


def get_cert_pubkey(cert):
    ''' Get the PEM encoded (SubjectPublicKeyInfo) pubkey of a certificate '''
    pubkey = cert.to_cryptography().public_key()
    return pubkey.public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo)


def get_priv_pubkey(priv):
    ''' Get the PEM encoded (SubjectPublicKeyInfo) pubkey of a private key '''
    pubkey = priv.to_cryptography_key().public_key()
    return pubkey.public_bytes(
        serialization.Encoding.PEM,
        serialization.PublicFormat.SubjectPublicKeyInfo)


def match_cert_privkey(cert, priv):
    '''
    Return True when certificate and private key share the same public key.

    Copied from
    https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not
    and reworked
    '''  # noqa: E501 pylint: disable=line-too-long
    return get_cert_pubkey(cert) == get_priv_pubkey(priv)


def find_root(x509_objects, root_issuers):
    '''
    Look up the root certificate that issued the last certificate of an
    ordered chain.

    x509_objects is the ordered chain, root_issuers the mapping returned by
    load_root_issuers(). Raises CertificateComponentException when the
    issuer is not one of the known roots.
    '''
    issuer = str(x509_objects[-1].get_issuer())
    try:
        root_cert = root_issuers[issuer]
    except KeyError as err:
        # Report through the script's own exception so the entry point can
        # log it and exit instead of dumping a KeyError traceback.
        raise CertificateComponentException(
            f'No known root certificate found for issuer {issuer}') from err
    logging.debug('Retrieved root certificate %s', root_cert.get_subject())
    return root_cert


def find_intermediate_root(x509_objects, root_issuers):
    '''
    Find a suitable anchor by finding the intermediate that was signed by
    root

    Returns the list of certificates whose issuer is a known root issuer
    and is not excluded (see below).
    '''
    # Some intermediates have the *same* subject as some root certificates.
    # blacklist them
    # XXX better use pubkey/hash for that, but can't find the appropriate
    # interface to that at the moment
    excluded_issuers = [str(x.get_subject()) for x in x509_objects
                        if x.get_subject() != x.get_issuer()]
    logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
    logging.debug('Excluding issuers because of potential intermediates\n\t%s',
                  '\n\t'.join(excluded_issuers))
    logging.debug('issuers seen in data\n\t%s',
                  '\n\t'.join([str(x.get_issuer()) for x in x509_objects]))
    return [x for x in x509_objects
            if str(x.get_issuer()) in root_issuers
            and str(x.get_issuer()) not in excluded_issuers]


def order_x509(x509_objects, root_issuers):
    '''
    order x509 objects to ensure proper chain order

    Returns a new list starting with the certificate nobody else in the
    input issued and ending with the anchor (a self signed certificate from
    the input or, failing that, the one certificate signed by a known
    root). The passed in x509_objects list is consumed (emptied).
    Raises CertificateComponentException when no unambiguous chain can be
    built.
    '''
    bundle = []
    root_crt = [x for x in x509_objects if x.get_subject() == x.get_issuer()]
    if root_crt:
        root_crt = x509_objects.pop(x509_objects.index(root_crt[0]))
        logging.warning('Found self signed (root) certificate %s in input',
                        str(root_crt.get_subject()))
        # Double check if our self signed root certificate is not also present
        # as an intermediate:
        # - It is probably invalid input, and doesn't make sense
        # - It confuses the ordering process
        if any(x.get_subject() != x.get_issuer()
               and x.get_subject() == root_crt.get_subject()
               for x in x509_objects):
            raise CertificateComponentException(
                f'Both present as intermediate '
                f'and root certificate: {str(root_crt.get_subject())}')
    else:
        # Get intermediate cert signed by any root from bundle as anchor, and
        # make that our root
        logging.debug('No root certificate in input,'
                      ' obtain intermediate through known root issuers')
        root_crt = find_intermediate_root(x509_objects, root_issuers)
        logging.debug('intermediates seen in data signed by root \n\t%s',
                      '\n\t'.join([str(x.get_issuer()) for x in root_crt]))
        if len(root_crt) != 1:
            raise CertificateComponentException('No intermediate found')
        logging.debug('Found subject=%s,issuer=%s',
                      root_crt[0].get_subject(), root_crt[0].get_issuer())
        root_crt = x509_objects.pop(x509_objects.index(root_crt[0]))

    # Insert our anchor.
    bundle.insert(0, root_crt)

    # now work our way up by going through the list,
    # inserting the certificate where the issuer matches the current topmost
    # subject until we are empty
    while x509_objects:
        sibling = [x for x in x509_objects
                   if x.get_issuer() == bundle[0].get_subject()]
        if len(sibling) != 1:
            # Lets complain
            raise CertificateComponentException(
                f'Non matching certificates in '
                f'input: No sibling found for {bundle[0].get_subject()}')
        # insert sibling at beginning of list
        bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
    return bundle


def load_root_issuers():
    '''
    Return the known CA roots from the certifi bundle as a dict mapping
    str(subject) to the certificate object.
    '''
    mozrootbundle_location = certifi.core.where()
    with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh:
        logging.info('Using %s for root ca bundle', mozrootbundle_location)
        data = fname_fh.read()
    root_certs = [crypto.load_certificate(crypto.FILETYPE_PEM, match.group(1))
                  for match in re.finditer(CERTIFICATE_RE, data, re.DOTALL)]
    logging.debug('Loaded root certificates from bundle')
    # Serialising every public key is only worth it if it gets logged.
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        for root_cert in root_certs:
            try:
                logging.debug('subject=%s\n\tissuer=%s\n\t'
                              'expired=%s\n\tpubkey=%s',
                              root_cert.get_subject(),
                              root_cert.get_issuer(),
                              root_cert.has_expired(),
                              get_cert_pubkey(root_cert))
            except UnsupportedPkEncryption as unsupported_crypto_exception:
                logging.debug(unsupported_crypto_exception)
                continue
    return {str(root_cert.get_subject()): root_cert
            for root_cert in root_certs}


def handle_args():
    '''
    Handle tool arguments

    Returns the parsed argparse namespace.

    All output options below live in a single mutually exclusive group, so
    at most one of them can be given per invocation.
    NOTE(review): print_cert, print_chain and print_key are parsed here but
    main() in this file never reads them.
    '''
    parser = ArgumentParser(description='Reorder X509/Private key data for'
                                        ' hosting use')

    loggrp = parser.add_mutually_exclusive_group()
    loggrp.add_argument('-v', '--verbose', action='store_true',
                        help='Show verbose logging')
    loggrp.add_argument('-d', '--debug', action='store_true',
                        help='Show debug logging')
    loggrp.add_argument('-q', '--quiet', action='store_true',
                        help='Show only error logging')

    outputgrp = parser.add_mutually_exclusive_group()
    outputgrp.add_argument('-c', '--check', action='store_true',
                           help='Only check, output nothing')
    outputgrp.add_argument('--just-certificate', dest='print_cert',
                           action='store_true',
                           help='Just print certificate')
    outputgrp.add_argument('--no-certificate', dest='print_cert',
                           action='store_false',
                           help='Omit certificate from output')
    outputgrp.set_defaults(print_cert=True)
    outputgrp.add_argument('--just-chain', dest='print_chain',
                           action='store_true',
                           help='Just print chain')
    outputgrp.add_argument('--no-chain', dest='print_chain',
                           action='store_false',
                           help='Omit chain from output')
    outputgrp.add_argument('--include-root', dest='include_root',
                           action='store_true',
                           help='Also include the root certificate')
    outputgrp.set_defaults(print_chain=True)
    outputgrp.add_argument('--key', dest='print_key',
                           action='store_true', default=True,
                           help='Just print key')
    outputgrp.add_argument('--no-key', dest='print_key',
                           action='store_false',
                           help='Omit key from output')
    outputgrp.set_defaults(print_key=True)

    parser.add_argument('-i', '--informational', action='store_true',
                        help='Show some information about the PEM blocks')
    parser.add_argument('x509files', metavar='x509 file', nargs='*',
                        help='x509 fullchain (+ rsa privkey)'
                             ' bundles to be checked')
    return parser.parse_args()


def setup_logging(args):
    '''
    Set up logging

    --check is tested first and therefore always selects INFO.
    '''
    if args.verbose or args.check:
        logging.basicConfig(level=logging.INFO)
    elif args.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif args.quiet:
        logging.basicConfig(level=logging.ERROR)
    else:
        logging.basicConfig(level=logging.WARNING)


def _format_x509_name(name):
    ''' Stringify an X509Name like openssl x509 -subject / -issuer '''
    return '/'.join(f'{key.decode()}={value.decode()}'
                    for key, value in name.get_components())


def _parse_asn1_time(timestamp):
    ''' Turn the bytes of get_notBefore()/get_notAfter() into a datetime '''
    return datetime.strptime(timestamp.decode('ascii'), ASN1TIME_FMT)


def _common_name(cert):
    '''
    Return the first CN of the certificate subject, or the complete
    stringified subject when it carries no CN at all.
    '''
    subject = cert.get_subject()
    common_names = (value.decode('utf-8')
                    for key, value in subject.get_components()
                    if key.decode('utf-8') == 'CN')
    return next(common_names, None) or _format_x509_name(subject)


def main():
    '''
    main program start and argument parsing

    For every input: order the certificates, check that at most one key of
    each kind is present and that it matches the leaf certificate, then
    print certificates and key (unless --check was given).
    Raises CertificateComponentException on inconsistent input.
    '''
    args = handle_args()
    setup_logging(args)
    root_issuers = load_root_issuers()

    for fname, data in load_data(args.x509files).items():
        logging.debug('Processing %s', fname)
        x509_objects = [
            crypto.load_certificate(crypto.FILETYPE_PEM, match.group(1))
            for match in re.finditer(CERTIFICATE_RE, data, re.DOTALL)]
        rsa_objects = [
            crypto.load_privatekey(crypto.FILETYPE_PEM, match.group(1))
            for match in re.finditer(RSAPRIVKEY_RE, data, re.DOTALL)]
        pk_objects = [
            crypto.load_privatekey(crypto.FILETYPE_PEM, match.group(1))
            for match in re.finditer(PRIVKEY_RE, data, re.DOTALL)]

        x509_objects = order_x509(x509_objects, root_issuers)
        # Remember the leaf before a root certificate may get appended.
        leaf_cert = x509_objects[0]

        if len(rsa_objects) > 1:
            raise CertificateComponentException('More than one RSA private key'
                                                ' found in input.'
                                                ' Aborting')
        if len(pk_objects) > 1:
            raise CertificateComponentException('More than one private key'
                                                ' found in input.'
                                                ' Aborting')

        if rsa_objects:
            if not match_cert_privkey(leaf_cert, rsa_objects[0]):
                raise CertificateComponentException('Provided certificate'
                                                    ' and RSA private key'
                                                    ' do not match')
            logging.info('OK: Public key of provided certificate'
                         ' and RSA private key match')
        if pk_objects:
            if not match_cert_privkey(leaf_cert, pk_objects[0]):
                raise CertificateComponentException('Provided certificate'
                                                    ' and private key'
                                                    ' do not match')
            logging.info('OK: Public key of provided certificate'
                         ' and private key match')

        if args.include_root:
            logging.debug('root certificate in output requested')
            anchor = x509_objects[-1]
            if anchor.get_subject() == anchor.get_issuer():
                # The input already carried its self signed root; appending
                # the looked up root would print it twice.
                logging.debug('root certificate already present in input')
            else:
                x509_objects.append(find_root(x509_objects, root_issuers))

        logging.debug("Print certificates in order")
        logging.info('Writing bundle for Subject: %s', _common_name(leaf_cert))
        for x509_object in x509_objects:
            self_signed = \
                x509_object.get_subject() == x509_object.get_issuer()
            if self_signed and not args.include_root:
                continue
            x509_subject = _format_x509_name(x509_object.get_subject())
            x509_issuer = _format_x509_name(x509_object.get_issuer())
            x509_not_after = _parse_asn1_time(x509_object.get_notAfter())
            x509_not_before = _parse_asn1_time(x509_object.get_notBefore())
            logging.info('Subject: %s', x509_subject)
            logging.info('Issuer: %s', x509_issuer)
            if args.informational:
                print(CERTINFO_TEMPLATE.format(
                    subject=x509_subject,
                    issuer=x509_issuer,
                    notbefore=x509_not_before.strftime(OPENSSLTIME_FMT),
                    notafter=x509_not_after.strftime(OPENSSLTIME_FMT),
                    sha1fingerprint=x509_object.digest('sha1').decode()))
            if not args.check:
                print(crypto.dump_certificate(crypto.FILETYPE_PEM,
                                              x509_object).decode('ascii'),
                      end='')

        # --check promises to output nothing, so keys are only printed
        # without it.
        if not args.check:
            if rsa_objects:
                logging.info('Print RSA private keys')
                for rsa_object in rsa_objects:
                    if args.informational:
                        print(PkDecoratorFactory.create(rsa_object))
                    # Keep the traditional "RSA PRIVATE KEY" PEM framing the
                    # key came in with.
                    print(rsa_object.to_cryptography_key().private_bytes(
                        encoding=serialization.Encoding.PEM,
                        format=serialization.PrivateFormat.TraditionalOpenSSL,
                        encryption_algorithm=serialization.NoEncryption()
                    ).decode('ascii'), end='')
            elif pk_objects:
                logging.info('Print private keys')
                for pk_object in pk_objects:
                    if args.informational:
                        print(PkDecoratorFactory.create(pk_object))
                    print(crypto.dump_privatekey(crypto.FILETYPE_PEM,
                                                 pk_object).decode('ascii'),
                          end='')


if __name__ == "__main__":
    try:
        sys.exit(main())
    except CertificateComponentException as certcomponent_error:
        logging.error(certcomponent_error)
        sys.exit(1)