3 Commits
main ... 2022.1

Author SHA1 Message Date
c59ed29ec5 Merge branch 'release/2022.1' 2022-07-27 15:12:51 +02:00
31fb992d97 Merge branch 'feature/01_python39_regression' into 'develop'
Feature/01 python39 regression

See merge request ruben/sort_certificate!1
2022-07-27 13:10:08 +00:00
b89105fa67 Feature/01 python39 regression 2022-07-27 13:10:08 +00:00
3 changed files with 125 additions and 111 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.sw? *.sw?
__pycache__

View File

@ -1,3 +0,0 @@
certifi==2024.2.2
cryptography==42.0.5
PyOpenSSL==24.1.0

View File

@ -1,27 +1,23 @@
#! /usr/bin/env python3 #! /usr/bin/env python
''' '''
Sort X509/private key material Sort X509/private key material
''' '''
import fileinput from __future__ import print_function
import logging import logging
import re import re
import sys import fileinput
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import datetime from datetime import datetime
import certifi.core
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto from OpenSSL import crypto
from cryptography.hazmat.primitives import serialization
import certifi.core
VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\
r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\
r'[a-zA-Z0-9]))*$' r'[a-zA-Z0-9]))*$'
CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})'
RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})'
PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})'
CERTINFO_TEMPLATE = ''' CERTINFO_TEMPLATE = '''
subject= /{subject} subject= /{subject}
issuer= /{issuer} issuer= /{issuer}
@ -34,68 +30,56 @@ ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
OPENSSLTIME_FMT = '%b %e %T %Y GMT' OPENSSLTIME_FMT = '%b %e %T %Y GMT'
class PkDecorator: # pylint: disable=too-few-public-methods class PkDecorator(object):
''' '''
Provide some information on the private key object Provide some information on the private key object
''' '''
p_key = None pk = None
def __init__(self, p_key): def __init__(self, pk):
self.p_key = p_key self.pk = pk
def __str__(self): def __str__(self):
return "Private key" return "Private key"
class PkDecoratorEC(PkDecorator): # pylint: disable=too-few-public-methods class PkDecoratorEC(PkDecorator):
'''
PkDecorator that knowns about elliptic curves
'''
def __str__(self): def __str__(self):
pk_crypto = self.p_key.to_cryptography_key() pk_crypto = self.pk.to_cryptography_key()
return (f'EC Private key curve {pk_crypto.curve.name} ' return "EC Private key curve %s (%d bits)" % (
f'({pk_crypto.key_size} bits)') pk_crypto.curve.name, pk_crypto.key_size)
class PkDecoratorRSA(PkDecorator): # pylint: disable=too-few-public-methods class PkDecoratorRSA(PkDecorator):
'''
PkDecorator that knowns about RSA
'''
def __str__(self): def __str__(self):
pk_crypto = self.p_key.to_cryptography_key() pk_crypto = self.pk.to_cryptography_key()
return (f'RSA Private key {pk_crypto.key_size} bits ' return "RSA Private key %d bits (exponent %d)" % (
f'(exponent {pk_crypto.private_numbers().public_numbers.e})') pk_crypto.key_size,
pk_crypto.private_numbers().public_numbers.e)
class PkDecoratorDSA(PkDecorator): # pylint: disable=too-few-public-methods class PkDecoratorDSA(PkDecorator):
'''
PkDecorator that knowns about DSA
'''
def __str__(self): def __str__(self):
pk_crypto = self.p_key.to_cryptography_key() pk_crypto = self.pk.to_cryptography_key()
return f'DSA Private key {pk_crypto.key_size} bits' return "DSA Private key %d bits" % pk_crypto.key_size
class PkDecoratorDH(PkDecorator): # pylint: disable=too-few-public-methods class PkDecoratorDH(PkDecorator):
'''
PkDecorator that knowns about DH
'''
def __str__(self): def __str__(self):
pk_crypto = self.p_key.to_cryptography_key() pk_crypto = self.pk.to_cryptography_key()
return f'DH Private key {pk_crypto.key_size} bits' return "DH Private key %d bits" % pk_crypto.key_size
class PkDecoratorFactory: # pylint: disable=too-few-public-methods class PkDecoratorFactory(object):
''' '''
Provide some information on the private key object Provide some information on the private key object
''' '''
@staticmethod def create(pk):
def create(p_key):
''' '''
Create the appropriate decorater object Create the appropriate decorater object
''' '''
@ -105,16 +89,18 @@ class PkDecoratorFactory: # pylint: disable=too-few-public-methods
crypto.TYPE_DSA: PkDecoratorDSA, crypto.TYPE_DSA: PkDecoratorDSA,
crypto.TYPE_RSA: PkDecoratorRSA, crypto.TYPE_RSA: PkDecoratorRSA,
} }
if p_key.type() in decorators: if pk.type() in decorators:
return decorators[p_key.type()](p_key) return decorators[pk.type()](pk)
raise UnsupportedPkEncryption( else:
'Unsupported private key type {p_key.type()}') raise UnsupportedPkEncryption("Unsupported private key type %d"
% pk.type())
class UnsupportedPkEncryption(Exception): class UnsupportedPkEncryption(Exception):
''' '''
When we encounter unsupported encryption algorithms When we encounter unsupported encryption algorithms
''' '''
pass
class CertificateComponentException(Exception): class CertificateComponentException(Exception):
@ -122,6 +108,7 @@ class CertificateComponentException(Exception):
When something is not right with the whole cert+intermediates+private key When something is not right with the whole cert+intermediates+private key
bundle bundle
''' '''
pass
def load_data(filenames): def load_data(filenames):
@ -180,7 +167,7 @@ def match_cert_privkey(cert, priv):
''' '''
Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long
and reworked and reworked
''' # noqa: E501 '''
return get_cert_pubkey(cert) == get_priv_pubkey(priv) return get_cert_pubkey(cert) == get_priv_pubkey(priv)
@ -189,7 +176,15 @@ def find_root(x509_objects, root_issuers):
''' '''
Find a suitable anchor by finding the intermediate that was signed by root Find a suitable anchor by finding the intermediate that was signed by root
''' '''
root_cert = root_issuers[str(x509_objects[-1].get_issuer())] root_cert = None
for x509_object in reversed(x509_objects):
if str(x509_object.get_issuer()) in root_issuers:
root_cert = root_issuers[str(x509_object.get_issuer())]
break
if not root_cert:
raise CertificateComponentException('Unable to find a suitable '
'trusted root certificate '
'for bundle')
logging.debug('Retrieved root certificate %s', root_cert.get_subject()) logging.debug('Retrieved root certificate %s', root_cert.get_subject())
return root_cert return root_cert
@ -200,17 +195,21 @@ def find_intermediate_root(x509_objects, root_issuers):
''' '''
# Some intermediates have the *same* subject as some root certificates. # Some intermediates have the *same* subject as some root certificates.
# blacklist them # blacklist them if their issuer and subject name is present in the root
# XXX better use pubkey/hash for that, but can't find the appropriate # bundle
# interface to that at the moment
excluded_issuers = [str(x.get_subject()) for x in x509_objects excluded_issuers = [str(x.get_subject()) for x in x509_objects
if x.get_subject() != x.get_issuer()] if x.get_subject() != x.get_issuer()
and str(x.get_issuer()) in root_issuers
and str(x.get_subject()) in root_issuers]
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers)) logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
logging.debug('Excluding issuers because of potential intermediates\n\t%s', logging.debug('Excluding issuers because of potential intermediates\n\t%s',
'\n\t'.join(excluded_issuers)) '\n\t'.join(excluded_issuers))
logging.debug('issuers seen in data\n\t%s', logging.debug('Certificates seen in data\n\t%s',
'\n\t'.join([str(x.get_issuer()) for x in x509_objects])) '\n\t'.join([f'Subject: {x.get_subject()},'
f' Issuer: {x.get_issuer()}'
for x in x509_objects]))
return [x for x in x509_objects return [x for x in x509_objects
if str(x.get_issuer()) in root_issuers if str(x.get_issuer()) in root_issuers
and str(x.get_issuer()) not in excluded_issuers] and str(x.get_issuer()) not in excluded_issuers]
@ -227,15 +226,14 @@ def order_x509(x509_objects, root_issuers):
logging.warning('Found self signed (root) certificate %s in input', logging.warning('Found self signed (root) certificate %s in input',
str(root_crt.get_subject())) str(root_crt.get_subject()))
# Double check if our self signed root certificate is not also present # Double check if our self signed root certificate is not also present
# as an intermediate: # as an cross signed intermediate:
# - It is probably invalid input, and doesn't make sense # - It might confuse the ordering process
# - It confuses the ordering process
if next((x for x in x509_objects if next((x for x in x509_objects
if x.get_subject() != x.get_issuer() if x.get_subject() != x.get_issuer()
and x.get_subject() == root_crt.get_subject()), None): and x.get_subject() == root_crt.get_subject()), None):
raise CertificateComponentException( logging.warning('Both present as intermediate '
f'Both present as intermediate ' 'and root certificate: %s' %
f'and root certificate: {str(root_crt.get_subject())}') str(root_crt.get_subject()))
else: else:
# Get intermediate cert signed by any root from bundle as anchor, and # Get intermediate cert signed by any root from bundle as anchor, and
# make that our root # make that our root
@ -261,14 +259,28 @@ def order_x509(x509_objects, root_issuers):
while x509_objects: while x509_objects:
sibling = [x for x in x509_objects sibling = [x for x in x509_objects
if x.get_issuer() == bundle[0].get_subject()] if x.get_issuer() == bundle[0].get_subject()]
parent = [x for x in x509_objects
if x.get_subject() == bundle[-1].get_issuer()]
if sibling and len(sibling) == 1: if sibling and len(sibling) == 1:
# insert sibling at beginning of list # insert sibling at beginning of list
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
elif parent and len(parent) == 1:
# Try to place a (cross signed) intermediate at the end if it matches
bundle.append(x509_objects.pop(x509_objects.index(parent[0])))
else: else:
# Lets complain # Lets complain
raise CertificateComponentException( logging.error('Certificates remaining data\n\t%s',
f'Non matching certificates in ' '\n\t'.join([f'Subject: {x.get_subject()},'
f'input: No sibling found for {bundle[0].get_subject()}') f' Issuer: {x.get_issuer()}'
for x in x509_objects]))
logging.error('Certificates placed in bundle \n\t%s',
'\n\t'.join([f'Subject: {x.get_subject()},'
f' Issuer: {x.get_issuer()}'
for x in bundle]))
raise CertificateComponentException('Non matching certificates in '
'input:'
' No sibling found for %s'
% bundle[0].get_subject())
return bundle return bundle
@ -280,7 +292,7 @@ def load_root_issuers():
mozrootbundle_location = certifi.core.where() mozrootbundle_location = certifi.core.where()
with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh: with open(mozrootbundle_location, 'r') as fname_fh:
logging.info('Using %s for root ca bundle', mozrootbundle_location) logging.info('Using %s for root ca bundle', mozrootbundle_location)
data = fname_fh.read() data = fname_fh.read()
matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
@ -306,7 +318,8 @@ def load_root_issuers():
continue continue
root_issuers = {str(root_cert.get_subject()): root_cert root_issuers = {str(root_cert.get_subject()): root_cert
for root_cert in root_certs} for root_cert in root_certs
if not root_cert.has_expired()}
return root_issuers return root_issuers
@ -380,20 +393,6 @@ def handle_args():
return parser.parse_args() return parser.parse_args()
def setup_logging(args):
'''
Set up logging
'''
if args.verbose or args.check:
logging.basicConfig(level=logging.INFO)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
logging.basicConfig(level=logging.ERROR)
else:
logging.basicConfig(level=logging.WARNING)
def main(): def main():
''' '''
main program start and argument parsing main program start and argument parsing
@ -403,20 +402,33 @@ def main():
args = handle_args() args = handle_args()
setup_logging(args) if args.verbose or args.check:
logging.basicConfig(level=logging.INFO)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
logging.basicConfig(level=logging.ERROR)
else:
logging.basicConfig(level=logging.WARNING)
root_issuers = load_root_issuers() root_issuers = load_root_issuers()
for fname, data in list(load_data(args.x509files).items()): for fname, data in list(load_data(args.x509files).items()):
logging.debug('Processing %s', fname) logging.debug('Processing %s', fname)
x509_objects_components = None x509_objects_components = None
x509matches = re.finditer(CERTIFICATE_RE, x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
'.*?'
'-----END CERTIFICATE-----)',
data, re.DOTALL) data, re.DOTALL)
rsamatches = re.finditer(RSAPRIVKEY_RE, rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----'
'.*?'
'-----END RSA PRIVATE KEY-----)',
data, re.DOTALL) data, re.DOTALL)
pkmatches = re.finditer(PRIVKEY_RE, pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----'
'.*?'
'-----END PRIVATE KEY-----)',
data, re.DOTALL) data, re.DOTALL)
x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM,
@ -439,23 +451,24 @@ def main():
raise CertificateComponentException('More than one RSA private key' raise CertificateComponentException('More than one RSA private key'
' found in input.' ' found in input.'
' Aborting') ' Aborting')
if len(pk_objects) > 1: elif rsa_objects:
raise CertificateComponentException('More than one private key'
' found in input.'
' Aborting')
if rsa_objects:
if not match_cert_privkey(x509_objects[0], rsa_objects[0]): if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
raise CertificateComponentException('Provided certificate' raise CertificateComponentException('Provided certificate'
' and RSA private key' ' and RSA private key'
' do not match') ' do not match')
else:
logging.info('OK: Public key of provided certificate' logging.info('OK: Public key of provided certificate'
' and RSA private key match') ' and RSA private key match')
elif len(pk_objects) > 1:
if pk_objects: raise CertificateComponentException('More than one private key'
' found in input.'
' Aborting')
elif pk_objects:
if not match_cert_privkey(x509_objects[0], pk_objects[0]): if not match_cert_privkey(x509_objects[0], pk_objects[0]):
raise CertificateComponentException('Provided certificate' raise CertificateComponentException('Provided certificate'
' and private key' ' and private key'
' do not match') ' do not match')
else:
logging.info('OK: Public key of provided certificate' logging.info('OK: Public key of provided certificate'
' and private key match') ' and private key match')
@ -464,26 +477,29 @@ def main():
x509_objects.append(find_root(x509_objects, root_issuers)) x509_objects.append(find_root(x509_objects, root_issuers))
logging.debug("Print certificates in order") logging.debug("Print certificates in order")
# Need to do b'CN' to have this python3 compatible
logging.info('Writing bundle for Subject: %s', logging.info('Writing bundle for Subject: %s',
[x[1].decode('utf-8') [x[1].decode('utf-8')
for x in x509_objects_components for x in x509_objects_components
if x[0].decode('utf-8') == 'CN'][0]) if x[0] == b'CN'][0])
for x509_object in [x for x in x509_objects for x509_object in [x for x in x509_objects
if x.get_subject() != x.get_issuer() if x.get_subject() != x.get_issuer()
or args.include_root]: or args.include_root]:
# Stringify subject like openssl x509 -subject # Stringify subject like openssl x509 -subject
x509_subject = '/'.join([ x509_subject = \
f'{component[0].decode()}={component[1].decode()}' '/'.join(['{0}={1}'.format(component[0].decode(),
for component component[1].decode())
in x509_object.get_subject().get_components()]) for component in
x509_object.get_subject().get_components()])
# Stringify issuer like openssl x509 -issuer # Stringify issuer like openssl x509 -issuer
x509_issuer = '/'.join([ x509_issuer = \
f'{component[0].decode()}={component[1].decode()}' '/'.join(['{0}={1}'.format(component[0].decode(),
for component component[1].decode())
in x509_object.get_issuer().get_components()]) for component in
x509_object.get_issuer().get_components()])
x509_not_after = \ x509_not_after = \
datetime.strptime(str(x509_object.get_notAfter()), datetime.strptime(str(x509_object.get_notAfter()),
@ -518,8 +534,8 @@ def main():
print(rsa_object.to_cryptography_key().private_bytes( print(rsa_object.to_cryptography_key().private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=str( encryption_algorithm=serialization.NoEncryption()).decode(
serialization.NoEncryption()).decode('ascii')), 'ascii'),
end='') end='')
elif pk_objects: elif pk_objects:
if not args.check: if not args.check:
@ -534,7 +550,7 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
try: try:
sys.exit(main()) exit(main())
except CertificateComponentException as certcomponent_error: except CertificateComponentException as certcomponent_error:
logging.error(certcomponent_error) logging.error(certcomponent_error)
sys.exit(1) exit(1)