diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..43d85be --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +certifi==2024.2.2 +cryptography==42.0.5 +PyOpenSSL==24.1.0 diff --git a/sort_certificate.py b/sort_certificate.py index db26b60..6fc2371 100755 --- a/sort_certificate.py +++ b/sort_certificate.py @@ -1,23 +1,27 @@ -#! /usr/bin/env python +#! /usr/bin/env python3 ''' Sort X509/private key material ''' -from __future__ import print_function - +import fileinput import logging import re -import fileinput +import sys from argparse import ArgumentParser from datetime import datetime -from OpenSSL import crypto -from cryptography.hazmat.primitives import serialization + import certifi.core +from cryptography.hazmat.primitives import serialization +from OpenSSL import crypto VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ r'[a-zA-Z0-9]))*$' +CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})' +RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})' +PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})' + CERTINFO_TEMPLATE = ''' subject= /{subject} issuer= /{issuer} @@ -30,56 +34,68 @@ ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8')) OPENSSLTIME_FMT = '%b %e %T %Y GMT' -class PkDecorator(object): +class PkDecorator: # pylint: disable=too-few-public-methods ''' Provide some information on the private key object ''' - pk = None + p_key = None - def __init__(self, pk): - self.pk = pk + def __init__(self, p_key): + self.p_key = p_key def __str__(self): return "Private key" -class PkDecoratorEC(PkDecorator): +class PkDecoratorEC(PkDecorator): # pylint: disable=too-few-public-methods + ''' + PkDecorator that knowns about elliptic curves + ''' def __str__(self): - pk_crypto = self.pk.to_cryptography_key() - return "EC Private key curve %s (%d bits)" % ( - pk_crypto.curve.name, pk_crypto.key_size) + pk_crypto = self.p_key.to_cryptography_key() + return (f'EC Private key curve {pk_crypto.curve.name} ' + f'({pk_crypto.key_size} bits)') -class PkDecoratorRSA(PkDecorator): +class PkDecoratorRSA(PkDecorator): # pylint: disable=too-few-public-methods + ''' + PkDecorator that knowns about RSA + ''' def __str__(self): - pk_crypto = self.pk.to_cryptography_key() - return "RSA Private key %d bits (exponent %d)" % ( - pk_crypto.key_size, - pk_crypto.private_numbers().public_numbers.e) + pk_crypto = self.p_key.to_cryptography_key() + return (f'RSA Private key {pk_crypto.key_size} bits ' + f'(exponent {pk_crypto.private_numbers().public_numbers.e})') -class PkDecoratorDSA(PkDecorator): +class PkDecoratorDSA(PkDecorator): # pylint: disable=too-few-public-methods + ''' + PkDecorator that knowns about DSA + ''' def __str__(self): - pk_crypto = self.pk.to_cryptography_key() - return "DSA Private key %d bits" % pk_crypto.key_size + pk_crypto = self.p_key.to_cryptography_key() + return f'DSA Private key {pk_crypto.key_size} bits' -class PkDecoratorDH(PkDecorator): +class PkDecoratorDH(PkDecorator): # pylint: disable=too-few-public-methods + ''' + PkDecorator that knowns about DH + ''' def __str__(self): - pk_crypto = self.pk.to_cryptography_key() - return "DH Private key %d bits" % pk_crypto.key_size + pk_crypto = self.p_key.to_cryptography_key() + return f'DH Private key {pk_crypto.key_size} bits' -class PkDecoratorFactory(object): +class PkDecoratorFactory: # pylint: disable=too-few-public-methods ''' Provide some information on the private key object ''' - def create(pk): + @staticmethod + def create(p_key): ''' Create the appropriate decorater object ''' @@ -89,18 +105,16 @@ class PkDecoratorFactory(object): crypto.TYPE_DSA: PkDecoratorDSA, crypto.TYPE_RSA: PkDecoratorRSA, } - if pk.type() in decorators: - return decorators[pk.type()](pk) - else: - raise UnsupportedPkEncryption("Unsupported private key type %d" - % pk.type()) + if p_key.type() in decorators: + return decorators[p_key.type()](p_key) + raise UnsupportedPkEncryption( + 'Unsupported private key type {p_key.type()}') class UnsupportedPkEncryption(Exception): ''' When we encounter unsupported encryption algorithms ''' - pass class CertificateComponentException(Exception): @@ -108,7 +122,6 @@ class CertificateComponentException(Exception): When something is not right with the whole cert+intermediates+private key bundle ''' - pass def load_data(filenames): @@ -167,7 +180,7 @@ def match_cert_privkey(cert, priv): ''' Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long and reworked - ''' + ''' # noqa: E501 return get_cert_pubkey(cert) == get_priv_pubkey(priv) @@ -220,9 +233,9 @@ def order_x509(x509_objects, root_issuers): if next((x for x in x509_objects if x.get_subject() != x.get_issuer() and x.get_subject() == root_crt.get_subject()), None): - raise CertificateComponentException('Both present as intermediate ' - 'and root certificate: %s' % - str(root_crt.get_subject())) + raise CertificateComponentException( + f'Both present as intermediate ' + f'and root certificate: {str(root_crt.get_subject())}') else: # Get intermediate cert signed by any root from bundle as anchor, and # make that our root @@ -253,10 +266,9 @@ def order_x509(x509_objects, root_issuers): bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) else: # Lets complain - raise CertificateComponentException('Non matching certificates in ' - 'input:' - ' No sibling found for %s' - % bundle[0].get_subject()) + raise CertificateComponentException( + f'Non matching certificates in ' + f'input: No sibling found for {bundle[0].get_subject()}') return bundle @@ -268,7 +280,7 @@ def load_root_issuers(): mozrootbundle_location = certifi.core.where() - with open(mozrootbundle_location, 'r') as fname_fh: + with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh: logging.info('Using %s for root ca bundle', mozrootbundle_location) data = fname_fh.read() matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' @@ -368,15 +380,10 @@ def handle_args(): return parser.parse_args() -def main(): +def setup_logging(args): ''' - main program start and argument parsing + Set up logging ''' - - root_issuers = None - - args = handle_args() - if args.verbose or args.check: logging.basicConfig(level=logging.INFO) elif args.debug: @@ -386,24 +393,30 @@ def main(): else: logging.basicConfig(level=logging.WARNING) + +def main(): + ''' + main program start and argument parsing + ''' + + root_issuers = None + + args = handle_args() + + setup_logging(args) + root_issuers = load_root_issuers() for fname, data in list(load_data(args.x509files).items()): logging.debug('Processing %s', fname) x509_objects_components = None - x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' - '.*?' - '-----END CERTIFICATE-----)', + x509matches = re.finditer(CERTIFICATE_RE, data, re.DOTALL) - rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----' - '.*?' - '-----END RSA PRIVATE KEY-----)', + rsamatches = re.finditer(RSAPRIVKEY_RE, data, re.DOTALL) - pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----' - '.*?' - '-----END PRIVATE KEY-----)', + pkmatches = re.finditer(PRIVKEY_RE, data, re.DOTALL) x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, @@ -426,55 +439,51 @@ def main(): raise CertificateComponentException('More than one RSA private key' ' found in input.' ' Aborting') - elif rsa_objects: + if len(pk_objects) > 1: + raise CertificateComponentException('More than one private key' + ' found in input.' + ' Aborting') + if rsa_objects: if not match_cert_privkey(x509_objects[0], rsa_objects[0]): raise CertificateComponentException('Provided certificate' ' and RSA private key' ' do not match') - else: - logging.info('OK: Public key of provided certificate' - ' and RSA private key match') - elif len(pk_objects) > 1: - raise CertificateComponentException('More than one private key' - ' found in input.' - ' Aborting') - elif pk_objects: + logging.info('OK: Public key of provided certificate' + ' and RSA private key match') + + if pk_objects: if not match_cert_privkey(x509_objects[0], pk_objects[0]): raise CertificateComponentException('Provided certificate' ' and private key' ' do not match') - else: - logging.info('OK: Public key of provided certificate' - ' and private key match') + logging.info('OK: Public key of provided certificate' + ' and private key match') if args.include_root: logging.debug('root certificate in output requested') x509_objects.append(find_root(x509_objects, root_issuers)) logging.debug("Print certificates in order") - # Need to do b'CN' to have this python3 compatible logging.info('Writing bundle for Subject: %s', [x[1].decode('utf-8') for x in x509_objects_components - if x[0] == b'CN'][0]) + if x[0].decode('utf-8') == 'CN'][0]) for x509_object in [x for x in x509_objects if x.get_subject() != x.get_issuer() or args.include_root]: # Stringify subject like openssl x509 -subject - x509_subject = \ - '/'.join(['{0}={1}'.format(component[0].decode(), - component[1].decode()) - for component in - x509_object.get_subject().get_components()]) + x509_subject = '/'.join([ + f'{component[0].decode()}={component[1].decode()}' + for component + in x509_object.get_subject().get_components()]) # Stringify issuer like openssl x509 -issuer - x509_issuer = \ - '/'.join(['{0}={1}'.format(component[0].decode(), - component[1].decode()) - for component in - x509_object.get_issuer().get_components()]) + x509_issuer = '/'.join([ + f'{component[0].decode()}={component[1].decode()}' + for component + in x509_object.get_issuer().get_components()]) x509_not_after = \ datetime.strptime(str(x509_object.get_notAfter()), @@ -509,8 +518,8 @@ def main(): print(rsa_object.to_cryptography_key().private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption()).decode( - 'ascii'), + encryption_algorithm=str( + serialization.NoEncryption()).decode('ascii')), end='') elif pk_objects: if not args.check: @@ -525,7 +534,7 @@ def main(): if __name__ == "__main__": try: - exit(main()) + sys.exit(main()) except CertificateComponentException as certcomponent_error: logging.error(certcomponent_error) - exit(1) + sys.exit(1)