13 Commits

2 changed files with 258 additions and 143 deletions

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
certifi==2024.2.2
cryptography==42.0.5
PyOpenSSL==24.1.0

View File

@ -1,24 +1,35 @@
#! /usr/bin/env python
#! /usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "certifi",
# "cryptography",
# "pyopenssl",
# ]
# ///
'''
Sort X509/RSA key material
Sort X509/private key material
'''
from __future__ import print_function
import fileinput
import logging
import re
import fileinput
import sys
from argparse import ArgumentParser
from datetime import datetime
from OpenSSL import crypto
from Crypto.Util import asn1
from cryptography.hazmat.primitives import serialization
import certifi.core
from cryptography.hazmat.primitives import serialization
from OpenSSL import crypto
VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\
r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\
r'[a-zA-Z0-9]))*$'
CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})'
RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})'
PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})'
CERTINFO_TEMPLATE = '''
subject= /{subject}
issuer= /{issuer}
@ -30,17 +41,95 @@ SHA1 Fingerprint={sha1fingerprint}
ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
OPENSSLTIME_FMT = '%b %e %T %Y GMT'
class OnlyRSAKeyException(Exception):
class PkDecorator: # pylint: disable=too-few-public-methods
'''
When we encounter other than RSA crypto material
Provide some information on the private key object
'''
pass
p_key = None
def __init__(self, p_key):
self.p_key = p_key
def __str__(self):
return "Private key"
class PkDecoratorEC(PkDecorator): # pylint: disable=too-few-public-methods
'''
PkDecorator that knowns about elliptic curves
'''
def __str__(self):
pk_crypto = self.p_key.to_cryptography_key()
return (f'EC Private key curve {pk_crypto.curve.name} '
f'({pk_crypto.key_size} bits)')
class PkDecoratorRSA(PkDecorator): # pylint: disable=too-few-public-methods
'''
PkDecorator that knowns about RSA
'''
def __str__(self):
pk_crypto = self.p_key.to_cryptography_key()
return (f'RSA Private key {pk_crypto.key_size} bits '
f'(exponent {pk_crypto.private_numbers().public_numbers.e})')
class PkDecoratorDSA(PkDecorator): # pylint: disable=too-few-public-methods
'''
PkDecorator that knowns about DSA
'''
def __str__(self):
pk_crypto = self.p_key.to_cryptography_key()
return f'DSA Private key {pk_crypto.key_size} bits'
class PkDecoratorDH(PkDecorator): # pylint: disable=too-few-public-methods
'''
PkDecorator that knowns about DH
'''
def __str__(self):
pk_crypto = self.p_key.to_cryptography_key()
return f'DH Private key {pk_crypto.key_size} bits'
class PkDecoratorFactory: # pylint: disable=too-few-public-methods
'''
Provide some information on the private key object
'''
@staticmethod
def create(p_key):
'''
Create the appropriate decorater object
'''
decorators = {
crypto.TYPE_DH: PkDecoratorDH,
crypto.TYPE_EC: PkDecoratorEC,
crypto.TYPE_DSA: PkDecoratorDSA,
crypto.TYPE_RSA: PkDecoratorRSA,
}
if p_key.type() in decorators:
return decorators[p_key.type()](p_key)
raise UnsupportedPkEncryption(
'Unsupported private key type {p_key.type()}')
class UnsupportedPkEncryption(Exception):
'''
When we encounter unsupported encryption algorithms
'''
class CertificateComponentException(Exception):
'''
When something is not right with the whole cert+intermediates+private key bundle
When something is not right with the whole cert+intermediates+private key
bundle
'''
pass
def load_data(filenames):
@ -66,54 +155,42 @@ def load_data(filenames):
return datas
def get_pub_modulus(cert):
def get_cert_pubkey(cert):
'''
Get the modulus of a certificate
'''
pub = cert.get_pubkey()
# Only works for RSA (I think)
if pub.type() != crypto.TYPE_RSA:
logging.debug('Can only handle RSA crypto:'
'\n\tsubject=%s\n\tissuer%s\n\texpired=%s\n\ttype=%s',
cert.get_subject(),
cert.get_subject(),
cert.has_expired(),
pub.type())
raise OnlyRSAKeyException('Can only handle RSA crypto')
pub_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, pub)
pub_der = asn1.DerSequence()
pub_der.decode(pub_asn1)
pub_modulus = pub_der[1]
return pub_modulus
def get_priv_modulus(priv):
'''
Get the modulus of a RSA private key
Get the pubkey of a certificate
'''
# Only works for RSA (I think)
if priv.type() != crypto.TYPE_RSA:
raise OnlyRSAKeyException('Can only handle RSA crypto')
cert_crypto = cert.to_cryptography()
pubkey = cert_crypto.public_key()
pub_bytes = pubkey.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo)
priv_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, priv)
priv_der = asn1.DerSequence()
priv_der.decode(priv_asn1)
priv_modulus = priv_der[1]
return pub_bytes
return priv_modulus
def get_priv_pubkey(priv):
'''
Get the pubkey of a private key
'''
priv_crypto = priv.to_cryptography_key()
pubkey = priv_crypto.public_key()
pub_bytes = pubkey.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo)
return pub_bytes
def match_cert_privkey(cert, priv):
'''
Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long
and reworked
'''
''' # noqa: E501
return get_pub_modulus(cert) == get_priv_modulus(priv)
return get_cert_pubkey(cert) == get_priv_pubkey(priv)
def find_root(x509_objects, root_issuers):
@ -124,6 +201,7 @@ def find_root(x509_objects, root_issuers):
logging.debug('Retrieved root certificate %s', root_cert.get_subject())
return root_cert
def find_intermediate_root(x509_objects, root_issuers):
'''
Find a suitable anchor by finding the intermediate that was signed by root
@ -131,7 +209,7 @@ def find_intermediate_root(x509_objects, root_issuers):
# Some intermediates have the *same* subject as some root certificates.
# blacklist them
# XXX better use modulus/hash for that, but can't find the appropriate
# XXX better use pubkey/hash for that, but can't find the appropriate
# interface to that at the moment
excluded_issuers = [str(x.get_subject()) for x in x509_objects
if x.get_subject() != x.get_issuer()]
@ -163,9 +241,9 @@ def order_x509(x509_objects, root_issuers):
if next((x for x in x509_objects
if x.get_subject() != x.get_issuer()
and x.get_subject() == root_crt.get_subject()), None):
raise CertificateComponentException('Both present as intermediate '
'and root certificate: %s' %
str(root_crt.get_subject()))
raise CertificateComponentException(
f'Both present as intermediate '
f'and root certificate: {str(root_crt.get_subject())}')
else:
# Get intermediate cert signed by any root from bundle as anchor, and
# make that our root
@ -196,20 +274,21 @@ def order_x509(x509_objects, root_issuers):
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
else:
# Lets complain
raise CertificateComponentException('Non matching certificates in input:'
' No sibling found for %s'
% bundle[0].get_subject())
raise CertificateComponentException(
f'Non matching certificates in '
f'input: No sibling found for {bundle[0].get_subject()}')
return bundle
def load_root_issuers():
'''
Return the list of CA roots (RSA only)
Return the list of CA roots
'''
root_issuers = None
mozrootbundle_location = certifi.core.where()
with open(mozrootbundle_location, 'r') as fname_fh:
with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh:
logging.info('Using %s for root ca bundle', mozrootbundle_location)
data = fname_fh.read()
matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
@ -225,13 +304,13 @@ def load_root_issuers():
for root_cert in root_certs:
try:
logging.debug('subject=%s\n\tissuer%s\n\t'
'expired=%s\n\tmodulus=%s',
'expired=%s\n\tpubkey=%s',
root_cert.get_subject(),
root_cert.get_issuer(),
root_cert.has_expired(),
get_pub_modulus(root_cert))
except OnlyRSAKeyException as onlyrsa_exception:
logging.debug(onlyrsa_exception)
get_cert_pubkey(root_cert))
except UnsupportedPkEncryption as unsupported_crypto_exception:
logging.debug(unsupported_crypto_exception)
continue
root_issuers = {str(root_cert.get_subject()): root_cert
@ -243,7 +322,7 @@ def handle_args():
'''
Handle tool arguments
'''
parser = ArgumentParser(description='Reorder X509/RSA data for'
parser = ArgumentParser(description='Reorder X509/Private key data for'
' hosting use')
loggrp = parser.add_mutually_exclusive_group()
@ -259,46 +338,61 @@ def handle_args():
outputgrp = parser.add_mutually_exclusive_group()
outputgrp.add_argument('--just-certificate', dest='print_cert',
action='store_true', help='Just print certificate')
outputgrp.add_argument('--no-certificate', dest='print_cert',
outputgrp.add_argument('-c', '--check',
action='store_true',
help='Only check, output nothing')
outputgrp.add_argument('--just-certificate',
dest='print_cert',
action='store_true',
help='Just print certificate')
outputgrp.add_argument('--no-certificate',
dest='print_cert',
action='store_false',
help='Omit certificate from output')
outputgrp.set_defaults(print_cert=True)
outputgrp.add_argument('--just-chain', dest='print_chain',
action='store_true', help='Just print chain')
outputgrp.add_argument('--no-chain', dest='print_chain',
action='store_false', help='Omit chain from output')
outputgrp.add_argument('--include-root', dest='include_root',
action='store_true', help='Also include the root certificate')
outputgrp.add_argument('--just-chain',
dest='print_chain',
action='store_true',
help='Just print chain')
outputgrp.add_argument('--no-chain',
dest='print_chain',
action='store_false',
help='Omit chain from output')
outputgrp.add_argument('--include-root',
dest='include_root',
action='store_true',
help='Also include the root certificate')
outputgrp.set_defaults(print_chain=True)
outputgrp.add_argument('--key', dest='print_key',
outputgrp.add_argument('--key',
dest='print_key',
action='store_true', default=True,
help='Just print key')
outputgrp.add_argument('--no-key', dest='print_key',
action='store_false', help='Omit key from output')
outputgrp.add_argument('--no-key',
dest='print_key',
action='store_false',
help='Omit key from output')
outputgrp.set_defaults(print_key=True)
parser.add_argument('x509files', metavar='x509 file', nargs='*',
parser.add_argument('-i', '--informational',
action='store_true',
help='Show some information about the PEM blocks')
parser.add_argument('x509files',
metavar='x509 file',
nargs='*',
help='x509 fullchain (+ rsa privkey)'
' bundles to be checked')
return parser.parse_args()
def main():
def setup_logging(args):
'''
main program start and argument parsing
Set up logging
'''
root_issuers = None
args = handle_args()
if args.verbose:
if args.verbose or args.check:
logging.basicConfig(level=logging.INFO)
elif args.debug:
logging.basicConfig(level=logging.DEBUG)
@ -307,24 +401,30 @@ def main():
else:
logging.basicConfig(level=logging.WARNING)
def main():
'''
main program start and argument parsing
'''
root_issuers = None
args = handle_args()
setup_logging(args)
root_issuers = load_root_issuers()
for fname, data in list(load_data(args.x509files).items()):
logging.debug('Processing %s', fname)
x509_objects_components = None
x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----'
'.*?'
'-----END CERTIFICATE-----)',
x509matches = re.finditer(CERTIFICATE_RE,
data, re.DOTALL)
rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----'
'.*?'
'-----END RSA PRIVATE KEY-----)',
rsamatches = re.finditer(RSAPRIVKEY_RE,
data, re.DOTALL)
pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----'
'.*?'
'-----END PRIVATE KEY-----)',
pkmatches = re.finditer(PRIVKEY_RE,
data, re.DOTALL)
x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM,
@ -344,54 +444,54 @@ def main():
get_components()
if len(rsa_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.'
raise CertificateComponentException('More than one RSA private key'
' found in input.'
' Aborting')
elif rsa_objects:
if len(pk_objects) > 1:
raise CertificateComponentException('More than one private key'
' found in input.'
' Aborting')
if rsa_objects:
if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
raise CertificateComponentException('Provided certificate'
' and RSA private key do not match')
else:
logging.info('OK: Modulus of provided certificate'
' and RSA private key match')
elif len(pk_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.'
' Aborting')
elif pk_objects:
' and RSA private key'
' do not match')
logging.info('OK: Public key of provided certificate'
' and RSA private key match')
if pk_objects:
if not match_cert_privkey(x509_objects[0], pk_objects[0]):
raise CertificateComponentException('Provided certificate'
' and private key do not match')
else:
logging.info('OK: Modulus of provided certificate'
' and private key match')
' and private key'
' do not match')
logging.info('OK: Public key of provided certificate'
' and private key match')
if args.include_root:
logging.debug('root certificate in output requested')
x509_objects.append(find_root(x509_objects, root_issuers))
logging.debug("Print certificates in order")
# Need to do b'CN' to have this python3 compatible
logging.info('Writing bundle for Subject: %s',
[x[1].decode('utf-8')
for x in x509_objects_components
if x[0] == b'CN'][0])
if x[0].decode('utf-8') == 'CN'][0])
for x509_object in [x for x in x509_objects
if x.get_subject() != x.get_issuer()
or args.include_root]:
# Stringify subject like openssl x509 -subject
x509_subject = \
'/'.join(['{0}={1}'.format(component[0].decode(),
component[1].decode())
for component in
x509_object.get_subject().get_components()])
x509_subject = '/'.join([
f'{component[0].decode()}={component[1].decode()}'
for component
in x509_object.get_subject().get_components()])
# Stringify issuer like openssl x509 -issuer
x509_issuer = \
'/'.join(['{0}={1}'.format(component[0].decode(),
component[1].decode())
for component in
x509_object.get_issuer().get_components()])
x509_issuer = '/'.join([
f'{component[0].decode()}={component[1].decode()}'
for component
in x509_object.get_issuer().get_components()])
x509_not_after = \
datetime.strptime(str(x509_object.get_notAfter()),
@ -404,33 +504,45 @@ def main():
logging.info('Subject: %s', x509_subject)
logging.info('Issuer: %s', x509_issuer)
print(CERTINFO_TEMPLATE.format(
subject=x509_subject,
issuer=x509_issuer,
notbefore=x509_not_before.strftime(OPENSSLTIME_FMT),
notafter=x509_not_after.strftime(OPENSSLTIME_FMT),
sha1fingerprint=x509_object.digest('sha1').decode()))
if args.informational:
print(CERTINFO_TEMPLATE.format(
subject=x509_subject,
issuer=x509_issuer,
notbefore=x509_not_before.strftime(OPENSSLTIME_FMT),
notafter=x509_not_after.strftime(OPENSSLTIME_FMT),
sha1fingerprint=x509_object.digest('sha1').decode()))
print(crypto.dump_certificate(crypto.FILETYPE_PEM,
x509_object).decode('ascii'),
end='')
if not args.check:
print(crypto.dump_certificate(crypto.FILETYPE_PEM,
x509_object).decode('ascii'),
end='')
if rsa_objects:
logging.info('Print RSA private keys')
for rsa_object in rsa_objects:
print(rsa_object.to_cryptography_key().private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()).decode(
'ascii'),
end='')
if not args.check:
logging.info('Print RSA private keys')
for rsa_object in rsa_objects:
if args.informational:
print(PkDecoratorFactory.create(rsa_object))
print(rsa_object.to_cryptography_key().private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=str(
serialization.NoEncryption()).decode('ascii')),
end='')
elif pk_objects:
logging.info('Print private keys')
for pk_object in pk_objects:
print(crypto.dump_privatekey(crypto.FILETYPE_PEM,
pk_object).decode('ascii'),
end='')
if not args.check:
logging.info('Print private keys')
for pk_object in pk_objects:
if args.informational:
print(PkDecoratorFactory.create(pk_object))
print(crypto.dump_privatekey(crypto.FILETYPE_PEM,
pk_object).decode('ascii'),
end='')
if __name__ == "__main__":
exit(main())
try:
sys.exit(main())
except CertificateComponentException as certcomponent_error:
logging.error(certcomponent_error)
sys.exit(1)