#! /usr/bin/env python ''' Sort X509/private key material ''' from __future__ import print_function import logging import re import fileinput from argparse import ArgumentParser from datetime import datetime from OpenSSL import crypto from cryptography.hazmat.primitives import serialization import certifi.core VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ r'[a-zA-Z0-9]))*$' CERTINFO_TEMPLATE = ''' subject= /{subject} issuer= /{issuer} notBefore={notbefore!s} notAfter={notafter} SHA1 Fingerprint={sha1fingerprint} '''.strip() ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8')) OPENSSLTIME_FMT = '%b %e %T %Y GMT' class PkDecorator(object): ''' Provide some information on the private key object ''' pk = None def __init__(self, pk): self.pk = pk def __str__(self): return "Private key" class PkDecoratorEC(PkDecorator): def __str__(self): pk_crypto = self.pk.to_cryptography_key() return "EC Private key curve %s (%d bits)" % ( pk_crypto.curve.name, pk_crypto.key_size) class PkDecoratorRSA(PkDecorator): def __str__(self): pk_crypto = self.pk.to_cryptography_key() return "RSA Private key %d bits (exponent %d)" % ( pk_crypto.key_size, pk_crypto.private_numbers().public_numbers.e) class PkDecoratorDSA(PkDecorator): def __str__(self): pk_crypto = self.pk.to_cryptography_key() return "DSA Private key %d bits" % pk_crypto.key_size class PkDecoratorDH(PkDecorator): def __str__(self): pk_crypto = self.pk.to_cryptography_key() return "DH Private key %d bits" % pk_crypto.key_size class PkDecoratorFactory(object): ''' Provide some information on the private key object ''' def create(pk): ''' Create the appropriate decorater object ''' decorators = { crypto.TYPE_DH: PkDecoratorDH, crypto.TYPE_EC: PkDecoratorEC, crypto.TYPE_DSA: PkDecoratorDSA, crypto.TYPE_RSA: PkDecoratorRSA, } if pk.type() in decorators: return decorators[pk.type()](pk) else: raise UnsupportedPkEncryption("Unsupported private key type %d" % pk.type()) class UnsupportedPkEncryption(Exception): ''' When we encounter unsupported encryption algorithms ''' pass class CertificateComponentException(Exception): ''' When something is not right with the whole cert+intermediates+private key bundle ''' pass def load_data(filenames): ''' Read all data till EOF. Might come from full chain (+ privkey) bundles Might come from stdin (e.g. as vim filter action) Might not be a good idea if it is being used to read thousands of files ''' datas = {} current_file = None # iterate over all inputs for line in fileinput.input(filenames): # switch files if the filename changes, automatically switches slots if current_file is not fileinput.filename(): current_file = fileinput.filename() logging.debug('Reading from %s', current_file) if current_file in datas: datas[current_file] += line.lstrip() else: datas[current_file] = line.lstrip() return datas def get_cert_pubkey(cert): ''' Get the pubkey of a certificate ''' cert_crypto = cert.to_cryptography() pubkey = cert_crypto.public_key() pub_bytes = pubkey.public_bytes( serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo) return pub_bytes def get_priv_pubkey(priv): ''' Get the pubkey of a private key ''' priv_crypto = priv.to_cryptography_key() pubkey = priv_crypto.public_key() pub_bytes = pubkey.public_bytes( serialization.Encoding.PEM, serialization.PublicFormat.SubjectPublicKeyInfo) return pub_bytes def match_cert_privkey(cert, priv): ''' Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long and reworked ''' return get_cert_pubkey(cert) == get_priv_pubkey(priv) def find_root(x509_objects, root_issuers): ''' Find a suitable anchor by finding the intermediate that was signed by root ''' root_cert = root_issuers[str(x509_objects[-1].get_issuer())] logging.debug('Retrieved root certificate %s', root_cert.get_subject()) return root_cert def find_intermediate_root(x509_objects, root_issuers): ''' Find a suitable anchor by finding the intermediate that was signed by root ''' # Some intermediates have the *same* subject as some root certificates. # blacklist them if their issuer and subject name is present in the root # bundle excluded_issuers = [str(x.get_subject()) for x in x509_objects if x.get_subject() != x.get_issuer() and str(x.get_issuer()) in root_issuers and str(x.get_subject()) in root_issuers] logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers)) logging.debug('Excluding issuers because of potential intermediates\n\t%s', '\n\t'.join(excluded_issuers)) logging.debug('Certificates seen in data\n\t%s', '\n\t'.join([f'Subject: {x.get_subject()},' f' Issuer: {x.get_issuer()}' for x in x509_objects])) return [x for x in x509_objects if str(x.get_issuer()) in root_issuers and str(x.get_issuer()) not in excluded_issuers] def order_x509(x509_objects, root_issuers): ''' order x509 objects to ensure proper chain order ''' bundle = [] root_crt = [x for x in x509_objects if x.get_subject() == x.get_issuer()] if root_crt: root_crt = x509_objects.pop(x509_objects.index(root_crt[0])) logging.warning('Found self signed (root) certificate %s in input', str(root_crt.get_subject())) # Double check if our self signed root certificate is not also present # as an intermediate: # - It is probably invalid input, and doesn't make sense # - It confuses the ordering process if next((x for x in x509_objects if x.get_subject() != x.get_issuer() and x.get_subject() == root_crt.get_subject()), None): raise CertificateComponentException('Both present as intermediate ' 'and root certificate: %s' % str(root_crt.get_subject())) else: # Get intermediate cert signed by any root from bundle as anchor, and # make that our root logging.debug('No root certificate in input,' ' obtain intermediate through known root issuers') root_crt = find_intermediate_root(x509_objects, root_issuers) logging.debug('intermediates seen in data signed by root \n\t%s', '\n\t'.join([str(x.get_issuer()) for x in root_crt])) if root_crt and len(root_crt) == 1: logging.debug('Found subject=%s,issuer=%s', root_crt[0].get_subject(), root_crt[0].get_issuer()) root_crt = x509_objects.pop(x509_objects.index(root_crt[0])) else: raise CertificateComponentException('No intermediate found') # Insert our anchor. bundle.insert(0, root_crt) # now work our way up by going through the list, # inserting the certificate where the issuer matches the current topmost # subject until we are empty while x509_objects: sibling = [x for x in x509_objects if x.get_issuer() == bundle[0].get_subject()] if sibling and len(sibling) == 1: # insert sibling at beginning of list bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) else: # Lets complain logging.error('Certificates remaining data\n\t%s', '\n\t'.join([f'Subject: {x.get_subject()},' f' Issuer: {x.get_issuer()}' for x in x509_objects])) logging.error('Certificates placed in bundle \n\t%s', '\n\t'.join([f'Subject: {x.get_subject()},' f' Issuer: {x.get_issuer()}' for x in bundle])) raise CertificateComponentException('Non matching certificates in ' 'input:' ' No sibling found for %s' % bundle[0].get_subject()) return bundle def load_root_issuers(): ''' Return the list of CA roots ''' root_issuers = None mozrootbundle_location = certifi.core.where() with open(mozrootbundle_location, 'r') as fname_fh: logging.info('Using %s for root ca bundle', mozrootbundle_location) data = fname_fh.read() matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' '.*?' '-----END CERTIFICATE-----)', data, re.DOTALL) root_certs = [crypto.load_certificate(crypto.FILETYPE_PEM, match.group(1)) for match in matches] logging.debug('Loaded root certificates from bundle') for root_cert in root_certs: try: logging.debug('subject=%s\n\tissuer%s\n\t' 'expired=%s\n\tpubkey=%s', root_cert.get_subject(), root_cert.get_issuer(), root_cert.has_expired(), get_cert_pubkey(root_cert)) except UnsupportedPkEncryption as unsupported_crypto_exception: logging.debug(unsupported_crypto_exception) continue root_issuers = {str(root_cert.get_subject()): root_cert for root_cert in root_certs} return root_issuers def handle_args(): ''' Handle tool arguments ''' parser = ArgumentParser(description='Reorder X509/Private key data for' ' hosting use') loggrp = parser.add_mutually_exclusive_group() loggrp.add_argument('-v', '--verbose', action='store_true', help='Show verbose logging') loggrp.add_argument('-d', '--debug', action='store_true', help='Show debug logging') loggrp.add_argument('-q', '--quiet', action='store_true', help='Show only error logging') outputgrp = parser.add_mutually_exclusive_group() outputgrp.add_argument('-c', '--check', action='store_true', help='Only check, output nothing') outputgrp.add_argument('--just-certificate', dest='print_cert', action='store_true', help='Just print certificate') outputgrp.add_argument('--no-certificate', dest='print_cert', action='store_false', help='Omit certificate from output') outputgrp.set_defaults(print_cert=True) outputgrp.add_argument('--just-chain', dest='print_chain', action='store_true', help='Just print chain') outputgrp.add_argument('--no-chain', dest='print_chain', action='store_false', help='Omit chain from output') outputgrp.add_argument('--include-root', dest='include_root', action='store_true', help='Also include the root certificate') outputgrp.set_defaults(print_chain=True) outputgrp.add_argument('--key', dest='print_key', action='store_true', default=True, help='Just print key') outputgrp.add_argument('--no-key', dest='print_key', action='store_false', help='Omit key from output') outputgrp.set_defaults(print_key=True) parser.add_argument('-i', '--informational', action='store_true', help='Show some information about the PEM blocks') parser.add_argument('x509files', metavar='x509 file', nargs='*', help='x509 fullchain (+ rsa privkey)' ' bundles to be checked') return parser.parse_args() def main(): ''' main program start and argument parsing ''' root_issuers = None args = handle_args() if args.verbose or args.check: logging.basicConfig(level=logging.INFO) elif args.debug: logging.basicConfig(level=logging.DEBUG) elif args.quiet: logging.basicConfig(level=logging.ERROR) else: logging.basicConfig(level=logging.WARNING) root_issuers = load_root_issuers() for fname, data in list(load_data(args.x509files).items()): logging.debug('Processing %s', fname) x509_objects_components = None x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' '.*?' '-----END CERTIFICATE-----)', data, re.DOTALL) rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----' '.*?' '-----END RSA PRIVATE KEY-----)', data, re.DOTALL) pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----' '.*?' '-----END PRIVATE KEY-----)', data, re.DOTALL) x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, x509match.group(1)) for x509match in x509matches] rsa_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM, rsamatch.group(1)) for rsamatch in rsamatches] pk_objects = [crypto.load_privatekey(crypto.FILETYPE_PEM, pkmatch.group(1)) for pkmatch in pkmatches] x509_objects = order_x509(x509_objects, root_issuers) x509_objects_components = x509_objects[0].get_subject().\ get_components() if len(rsa_objects) > 1: raise CertificateComponentException('More than one RSA private key' ' found in input.' ' Aborting') elif rsa_objects: if not match_cert_privkey(x509_objects[0], rsa_objects[0]): raise CertificateComponentException('Provided certificate' ' and RSA private key' ' do not match') else: logging.info('OK: Public key of provided certificate' ' and RSA private key match') elif len(pk_objects) > 1: raise CertificateComponentException('More than one private key' ' found in input.' ' Aborting') elif pk_objects: if not match_cert_privkey(x509_objects[0], pk_objects[0]): raise CertificateComponentException('Provided certificate' ' and private key' ' do not match') else: logging.info('OK: Public key of provided certificate' ' and private key match') if args.include_root: logging.debug('root certificate in output requested') x509_objects.append(find_root(x509_objects, root_issuers)) logging.debug("Print certificates in order") # Need to do b'CN' to have this python3 compatible logging.info('Writing bundle for Subject: %s', [x[1].decode('utf-8') for x in x509_objects_components if x[0] == b'CN'][0]) for x509_object in [x for x in x509_objects if x.get_subject() != x.get_issuer() or args.include_root]: # Stringify subject like openssl x509 -subject x509_subject = \ '/'.join(['{0}={1}'.format(component[0].decode(), component[1].decode()) for component in x509_object.get_subject().get_components()]) # Stringify issuer like openssl x509 -issuer x509_issuer = \ '/'.join(['{0}={1}'.format(component[0].decode(), component[1].decode()) for component in x509_object.get_issuer().get_components()]) x509_not_after = \ datetime.strptime(str(x509_object.get_notAfter()), ASN1TIME_FMT) x509_not_before = \ datetime.strptime(str(x509_object.get_notBefore()), ASN1TIME_FMT) logging.info('Subject: %s', x509_subject) logging.info('Issuer: %s', x509_issuer) if args.informational: print(CERTINFO_TEMPLATE.format( subject=x509_subject, issuer=x509_issuer, notbefore=x509_not_before.strftime(OPENSSLTIME_FMT), notafter=x509_not_after.strftime(OPENSSLTIME_FMT), sha1fingerprint=x509_object.digest('sha1').decode())) if not args.check: print(crypto.dump_certificate(crypto.FILETYPE_PEM, x509_object).decode('ascii'), end='') if rsa_objects: if not args.check: logging.info('Print RSA private keys') for rsa_object in rsa_objects: if args.informational: print(PkDecoratorFactory.create(rsa_object)) print(rsa_object.to_cryptography_key().private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()).decode( 'ascii'), end='') elif pk_objects: if not args.check: logging.info('Print private keys') for pk_object in pk_objects: if args.informational: print(PkDecoratorFactory.create(pk_object)) print(crypto.dump_privatekey(crypto.FILETYPE_PEM, pk_object).decode('ascii'), end='') if __name__ == "__main__": try: exit(main()) except CertificateComponentException as certcomponent_error: logging.error(certcomponent_error) exit(1)