Compare commits
9 Commits
main
...
feature/01
Author | SHA1 | Date | |
---|---|---|---|
d614c1ff06 | |||
497c230394 | |||
4f2119d2a5 | |||
0ec7c8b62c | |||
6dda760ffd | |||
5cce5722c5 | |||
5c4f6d2c67 | |||
2931f4809e | |||
ba94ceb9cc |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +1,2 @@
|
||||
*.sw?
|
||||
__pycache__
|
||||
|
@ -1,3 +0,0 @@
|
||||
certifi==2024.2.2
|
||||
cryptography==42.0.5
|
||||
PyOpenSSL==24.1.0
|
@ -1,27 +1,23 @@
|
||||
#! /usr/bin/env python3
|
||||
#! /usr/bin/env python
|
||||
'''
|
||||
Sort X509/private key material
|
||||
'''
|
||||
|
||||
import fileinput
|
||||
from __future__ import print_function
|
||||
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import fileinput
|
||||
from argparse import ArgumentParser
|
||||
from datetime import datetime
|
||||
|
||||
import certifi.core
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from OpenSSL import crypto
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import certifi.core
|
||||
|
||||
VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\
|
||||
r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\
|
||||
r'[a-zA-Z0-9]))*$'
|
||||
|
||||
CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})'
|
||||
RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})'
|
||||
PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})'
|
||||
|
||||
CERTINFO_TEMPLATE = '''
|
||||
subject= /{subject}
|
||||
issuer= /{issuer}
|
||||
@ -34,68 +30,56 @@ ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
|
||||
OPENSSLTIME_FMT = '%b %e %T %Y GMT'
|
||||
|
||||
|
||||
class PkDecorator: # pylint: disable=too-few-public-methods
|
||||
class PkDecorator(object):
|
||||
'''
|
||||
Provide some information on the private key object
|
||||
'''
|
||||
p_key = None
|
||||
pk = None
|
||||
|
||||
def __init__(self, p_key):
|
||||
self.p_key = p_key
|
||||
def __init__(self, pk):
|
||||
self.pk = pk
|
||||
|
||||
def __str__(self):
|
||||
return "Private key"
|
||||
|
||||
|
||||
class PkDecoratorEC(PkDecorator): # pylint: disable=too-few-public-methods
|
||||
'''
|
||||
PkDecorator that knowns about elliptic curves
|
||||
'''
|
||||
class PkDecoratorEC(PkDecorator):
|
||||
|
||||
def __str__(self):
|
||||
pk_crypto = self.p_key.to_cryptography_key()
|
||||
return (f'EC Private key curve {pk_crypto.curve.name} '
|
||||
f'({pk_crypto.key_size} bits)')
|
||||
pk_crypto = self.pk.to_cryptography_key()
|
||||
return "EC Private key curve %s (%d bits)" % (
|
||||
pk_crypto.curve.name, pk_crypto.key_size)
|
||||
|
||||
|
||||
class PkDecoratorRSA(PkDecorator): # pylint: disable=too-few-public-methods
|
||||
'''
|
||||
PkDecorator that knowns about RSA
|
||||
'''
|
||||
class PkDecoratorRSA(PkDecorator):
|
||||
|
||||
def __str__(self):
|
||||
pk_crypto = self.p_key.to_cryptography_key()
|
||||
return (f'RSA Private key {pk_crypto.key_size} bits '
|
||||
f'(exponent {pk_crypto.private_numbers().public_numbers.e})')
|
||||
pk_crypto = self.pk.to_cryptography_key()
|
||||
return "RSA Private key %d bits (exponent %d)" % (
|
||||
pk_crypto.key_size,
|
||||
pk_crypto.private_numbers().public_numbers.e)
|
||||
|
||||
|
||||
class PkDecoratorDSA(PkDecorator): # pylint: disable=too-few-public-methods
|
||||
'''
|
||||
PkDecorator that knowns about DSA
|
||||
'''
|
||||
class PkDecoratorDSA(PkDecorator):
|
||||
|
||||
def __str__(self):
|
||||
pk_crypto = self.p_key.to_cryptography_key()
|
||||
return f'DSA Private key {pk_crypto.key_size} bits'
|
||||
pk_crypto = self.pk.to_cryptography_key()
|
||||
return "DSA Private key %d bits" % pk_crypto.key_size
|
||||
|
||||
|
||||
class PkDecoratorDH(PkDecorator): # pylint: disable=too-few-public-methods
|
||||
'''
|
||||
PkDecorator that knowns about DH
|
||||
'''
|
||||
class PkDecoratorDH(PkDecorator):
|
||||
|
||||
def __str__(self):
|
||||
pk_crypto = self.p_key.to_cryptography_key()
|
||||
return f'DH Private key {pk_crypto.key_size} bits'
|
||||
pk_crypto = self.pk.to_cryptography_key()
|
||||
return "DH Private key %d bits" % pk_crypto.key_size
|
||||
|
||||
|
||||
class PkDecoratorFactory: # pylint: disable=too-few-public-methods
|
||||
class PkDecoratorFactory(object):
|
||||
'''
|
||||
Provide some information on the private key object
|
||||
'''
|
||||
|
||||
@staticmethod
|
||||
def create(p_key):
|
||||
def create(pk):
|
||||
'''
|
||||
Create the appropriate decorater object
|
||||
'''
|
||||
@ -105,16 +89,18 @@ class PkDecoratorFactory: # pylint: disable=too-few-public-methods
|
||||
crypto.TYPE_DSA: PkDecoratorDSA,
|
||||
crypto.TYPE_RSA: PkDecoratorRSA,
|
||||
}
|
||||
if p_key.type() in decorators:
|
||||
return decorators[p_key.type()](p_key)
|
||||
raise UnsupportedPkEncryption(
|
||||
'Unsupported private key type {p_key.type()}')
|
||||
if pk.type() in decorators:
|
||||
return decorators[pk.type()](pk)
|
||||
else:
|
||||
raise UnsupportedPkEncryption("Unsupported private key type %d"
|
||||
% pk.type())
|
||||
|
||||
|
||||
class UnsupportedPkEncryption(Exception):
|
||||
'''
|
||||
When we encounter unsupported encryption algorithms
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
class CertificateComponentException(Exception):
|
||||
@ -122,6 +108,7 @@ class CertificateComponentException(Exception):
|
||||
When something is not right with the whole cert+intermediates+private key
|
||||
bundle
|
||||
'''
|
||||
pass
|
||||
|
||||
|
||||
def load_data(filenames):
|
||||
@ -180,7 +167,7 @@ def match_cert_privkey(cert, priv):
|
||||
'''
|
||||
Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long
|
||||
and reworked
|
||||
''' # noqa: E501
|
||||
'''
|
||||
|
||||
return get_cert_pubkey(cert) == get_priv_pubkey(priv)
|
||||
|
||||
@ -189,7 +176,15 @@ def find_root(x509_objects, root_issuers):
|
||||
'''
|
||||
Find a suitable anchor by finding the intermediate that was signed by root
|
||||
'''
|
||||
root_cert = root_issuers[str(x509_objects[-1].get_issuer())]
|
||||
root_cert = None
|
||||
for x509_object in reversed(x509_objects):
|
||||
if str(x509_object.get_issuer()) in root_issuers:
|
||||
root_cert = root_issuers[str(x509_object.get_issuer())]
|
||||
break
|
||||
if not root_cert:
|
||||
raise CertificateComponentException('Unable to find a suitable '
|
||||
'trusted root certificate '
|
||||
'for bundle')
|
||||
logging.debug('Retrieved root certificate %s', root_cert.get_subject())
|
||||
return root_cert
|
||||
|
||||
@ -200,17 +195,21 @@ def find_intermediate_root(x509_objects, root_issuers):
|
||||
'''
|
||||
|
||||
# Some intermediates have the *same* subject as some root certificates.
|
||||
# blacklist them
|
||||
# XXX better use pubkey/hash for that, but can't find the appropriate
|
||||
# interface to that at the moment
|
||||
# blacklist them if their issuer and subject name is present in the root
|
||||
# bundle
|
||||
excluded_issuers = [str(x.get_subject()) for x in x509_objects
|
||||
if x.get_subject() != x.get_issuer()]
|
||||
if x.get_subject() != x.get_issuer()
|
||||
and str(x.get_issuer()) in root_issuers
|
||||
and str(x.get_subject()) in root_issuers]
|
||||
|
||||
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
|
||||
logging.debug('Excluding issuers because of potential intermediates\n\t%s',
|
||||
'\n\t'.join(excluded_issuers))
|
||||
logging.debug('issuers seen in data\n\t%s',
|
||||
'\n\t'.join([str(x.get_issuer()) for x in x509_objects]))
|
||||
logging.debug('Certificates seen in data\n\t%s',
|
||||
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||
f' Issuer: {x.get_issuer()}'
|
||||
for x in x509_objects]))
|
||||
|
||||
return [x for x in x509_objects
|
||||
if str(x.get_issuer()) in root_issuers
|
||||
and str(x.get_issuer()) not in excluded_issuers]
|
||||
@ -227,15 +226,14 @@ def order_x509(x509_objects, root_issuers):
|
||||
logging.warning('Found self signed (root) certificate %s in input',
|
||||
str(root_crt.get_subject()))
|
||||
# Double check if our self signed root certificate is not also present
|
||||
# as an intermediate:
|
||||
# - It is probably invalid input, and doesn't make sense
|
||||
# - It confuses the ordering process
|
||||
# as an cross signed intermediate:
|
||||
# - It might confuse the ordering process
|
||||
if next((x for x in x509_objects
|
||||
if x.get_subject() != x.get_issuer()
|
||||
and x.get_subject() == root_crt.get_subject()), None):
|
||||
raise CertificateComponentException(
|
||||
f'Both present as intermediate '
|
||||
f'and root certificate: {str(root_crt.get_subject())}')
|
||||
logging.warning('Both present as intermediate '
|
||||
'and root certificate: %s' %
|
||||
str(root_crt.get_subject()))
|
||||
else:
|
||||
# Get intermediate cert signed by any root from bundle as anchor, and
|
||||
# make that our root
|
||||
@ -261,14 +259,28 @@ def order_x509(x509_objects, root_issuers):
|
||||
while x509_objects:
|
||||
sibling = [x for x in x509_objects
|
||||
if x.get_issuer() == bundle[0].get_subject()]
|
||||
parent = [x for x in x509_objects
|
||||
if x.get_subject() == bundle[-1].get_issuer()]
|
||||
if sibling and len(sibling) == 1:
|
||||
# insert sibling at beginning of list
|
||||
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
|
||||
elif parent and len(parent) == 1:
|
||||
# Try to place a (cross signed) intermediate at the end if it matches
|
||||
bundle.append(x509_objects.pop(x509_objects.index(parent[0])))
|
||||
else:
|
||||
# Lets complain
|
||||
raise CertificateComponentException(
|
||||
f'Non matching certificates in '
|
||||
f'input: No sibling found for {bundle[0].get_subject()}')
|
||||
logging.error('Certificates remaining data\n\t%s',
|
||||
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||
f' Issuer: {x.get_issuer()}'
|
||||
for x in x509_objects]))
|
||||
logging.error('Certificates placed in bundle \n\t%s',
|
||||
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||
f' Issuer: {x.get_issuer()}'
|
||||
for x in bundle]))
|
||||
raise CertificateComponentException('Non matching certificates in '
|
||||
'input:'
|
||||
' No sibling found for %s'
|
||||
% bundle[0].get_subject())
|
||||
return bundle
|
||||
|
||||
|
||||
@ -280,7 +292,7 @@ def load_root_issuers():
|
||||
|
||||
mozrootbundle_location = certifi.core.where()
|
||||
|
||||
with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh:
|
||||
with open(mozrootbundle_location, 'r') as fname_fh:
|
||||
logging.info('Using %s for root ca bundle', mozrootbundle_location)
|
||||
data = fname_fh.read()
|
||||
matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
|
||||
@ -306,7 +318,8 @@ def load_root_issuers():
|
||||
continue
|
||||
|
||||
root_issuers = {str(root_cert.get_subject()): root_cert
|
||||
for root_cert in root_certs}
|
||||
for root_cert in root_certs
|
||||
if not root_cert.has_expired()}
|
||||
return root_issuers
|
||||
|
||||
|
||||
@ -380,20 +393,6 @@ def handle_args():
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def setup_logging(args):
|
||||
'''
|
||||
Set up logging
|
||||
'''
|
||||
if args.verbose or args.check:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
elif args.debug:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
elif args.quiet:
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
else:
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
|
||||
|
||||
def main():
|
||||
'''
|
||||
main program start and argument parsing
|
||||
@ -403,20 +402,33 @@ def main():
|
||||
|
||||
args = handle_args()
|
||||
|
||||
setup_logging(args)
|
||||
if args.verbose or args.check:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
elif args.debug:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
elif args.quiet:
|
||||
logging.basicConfig(level=logging.ERROR)
|
||||
else:
|
||||
logging.basicConfig(level=logging.WARNING)
|
||||
|
||||
root_issuers = load_root_issuers()
|
||||
|
||||
for fname, data in list(load_data(args.x509files).items()):
|
||||
logging.debug('Processing %s', fname)
|
||||
x509_objects_components = None
|
||||
x509matches = re.finditer(CERTIFICATE_RE,
|
||||
x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
|
||||
'.*?'
|
||||
'-----END CERTIFICATE-----)',
|
||||
data, re.DOTALL)
|
||||
|
||||
rsamatches = re.finditer(RSAPRIVKEY_RE,
|
||||
rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----'
|
||||
'.*?'
|
||||
'-----END RSA PRIVATE KEY-----)',
|
||||
data, re.DOTALL)
|
||||
|
||||
pkmatches = re.finditer(PRIVKEY_RE,
|
||||
pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----'
|
||||
'.*?'
|
||||
'-----END PRIVATE KEY-----)',
|
||||
data, re.DOTALL)
|
||||
|
||||
x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM,
|
||||
@ -439,51 +451,55 @@ def main():
|
||||
raise CertificateComponentException('More than one RSA private key'
|
||||
' found in input.'
|
||||
' Aborting')
|
||||
if len(pk_objects) > 1:
|
||||
raise CertificateComponentException('More than one private key'
|
||||
' found in input.'
|
||||
' Aborting')
|
||||
if rsa_objects:
|
||||
elif rsa_objects:
|
||||
if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
|
||||
raise CertificateComponentException('Provided certificate'
|
||||
' and RSA private key'
|
||||
' do not match')
|
||||
logging.info('OK: Public key of provided certificate'
|
||||
' and RSA private key match')
|
||||
|
||||
if pk_objects:
|
||||
else:
|
||||
logging.info('OK: Public key of provided certificate'
|
||||
' and RSA private key match')
|
||||
elif len(pk_objects) > 1:
|
||||
raise CertificateComponentException('More than one private key'
|
||||
' found in input.'
|
||||
' Aborting')
|
||||
elif pk_objects:
|
||||
if not match_cert_privkey(x509_objects[0], pk_objects[0]):
|
||||
raise CertificateComponentException('Provided certificate'
|
||||
' and private key'
|
||||
' do not match')
|
||||
logging.info('OK: Public key of provided certificate'
|
||||
' and private key match')
|
||||
else:
|
||||
logging.info('OK: Public key of provided certificate'
|
||||
' and private key match')
|
||||
|
||||
if args.include_root:
|
||||
logging.debug('root certificate in output requested')
|
||||
x509_objects.append(find_root(x509_objects, root_issuers))
|
||||
|
||||
logging.debug("Print certificates in order")
|
||||
# Need to do b'CN' to have this python3 compatible
|
||||
logging.info('Writing bundle for Subject: %s',
|
||||
[x[1].decode('utf-8')
|
||||
for x in x509_objects_components
|
||||
if x[0].decode('utf-8') == 'CN'][0])
|
||||
if x[0] == b'CN'][0])
|
||||
|
||||
for x509_object in [x for x in x509_objects
|
||||
if x.get_subject() != x.get_issuer()
|
||||
or args.include_root]:
|
||||
|
||||
# Stringify subject like openssl x509 -subject
|
||||
x509_subject = '/'.join([
|
||||
f'{component[0].decode()}={component[1].decode()}'
|
||||
for component
|
||||
in x509_object.get_subject().get_components()])
|
||||
x509_subject = \
|
||||
'/'.join(['{0}={1}'.format(component[0].decode(),
|
||||
component[1].decode())
|
||||
for component in
|
||||
x509_object.get_subject().get_components()])
|
||||
|
||||
# Stringify issuer like openssl x509 -issuer
|
||||
x509_issuer = '/'.join([
|
||||
f'{component[0].decode()}={component[1].decode()}'
|
||||
for component
|
||||
in x509_object.get_issuer().get_components()])
|
||||
x509_issuer = \
|
||||
'/'.join(['{0}={1}'.format(component[0].decode(),
|
||||
component[1].decode())
|
||||
for component in
|
||||
x509_object.get_issuer().get_components()])
|
||||
|
||||
x509_not_after = \
|
||||
datetime.strptime(str(x509_object.get_notAfter()),
|
||||
@ -518,8 +534,8 @@ def main():
|
||||
print(rsa_object.to_cryptography_key().private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=str(
|
||||
serialization.NoEncryption()).decode('ascii')),
|
||||
encryption_algorithm=serialization.NoEncryption()).decode(
|
||||
'ascii'),
|
||||
end='')
|
||||
elif pk_objects:
|
||||
if not args.check:
|
||||
@ -534,7 +550,7 @@ def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
sys.exit(main())
|
||||
exit(main())
|
||||
except CertificateComponentException as certcomponent_error:
|
||||
logging.error(certcomponent_error)
|
||||
sys.exit(1)
|
||||
exit(1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user