14 Commits

Author SHA1 Message Date
c59ed29ec5 Merge branch 'release/2022.1' 2022-07-27 15:12:51 +02:00
31fb992d97 Merge branch 'feature/01_python39_regression' into 'develop'
Feature/01 python39 regression

See merge request ruben/sort_certificate!1
2022-07-27 13:10:08 +00:00
b89105fa67 Feature/01 python39 regression 2022-07-27 13:10:08 +00:00
ebe467260a Also show exponent 2020-09-10 17:44:59 +02:00
b0562e1497 Report some information on the private key 2020-09-10 16:43:32 +02:00
4cffe77fa0 Remove more 'RSA Only' references 2020-09-10 11:15:26 +02:00
66f45feea5 Show some information about the PEM blocks with -i 2020-09-09 15:39:45 +02:00
fe5286b8e5 The --check option is actually part of the "output group" set of switches 2020-09-09 15:38:54 +02:00
e4ca594165 Add a check only mode 2020-09-09 15:31:31 +02:00
c866d219cb Note that we handle non RSA as well now 2020-09-09 15:30:49 +02:00
e22223ba66 Please pylint/pep8 etc 2020-08-17 12:19:21 +02:00
1e5aa31eb8 Instead of RSA exclusive moduled, compare public key output from provided certificate and private key. This also enables the use of non RSA cryptography (tested with EC) 2020-08-11 13:25:55 +02:00
02a83a72a2 Merge branch 'master' of gitlab.niet.verweg.com:ruben/sort_certificate 2020-04-07 00:22:40 +02:00
fc3e220d61 Process certificate component errors in a more tidy fashion 2020-04-06 12:31:42 +02:00
2 changed files with 226 additions and 105 deletions

1
.gitignore vendored
View File

@ -1 +1,2 @@
*.sw? *.sw?
__pycache__

View File

@ -1,6 +1,6 @@
#! /usr/bin/env python #! /usr/bin/env python
''' '''
Sort X509/RSA key material Sort X509/private key material
''' '''
from __future__ import print_function from __future__ import print_function
@ -11,7 +11,6 @@ import fileinput
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import datetime from datetime import datetime
from OpenSSL import crypto from OpenSSL import crypto
from Crypto.Util import asn1
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
import certifi.core import certifi.core
@ -30,15 +29,84 @@ SHA1 Fingerprint={sha1fingerprint}
ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8')) ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
OPENSSLTIME_FMT = '%b %e %T %Y GMT' OPENSSLTIME_FMT = '%b %e %T %Y GMT'
class OnlyRSAKeyException(Exception):
class PkDecorator(object):
''' '''
When we encounter other than RSA crypto material Provide some information on the private key object
'''
pk = None
def __init__(self, pk):
self.pk = pk
def __str__(self):
return "Private key"
class PkDecoratorEC(PkDecorator):
def __str__(self):
pk_crypto = self.pk.to_cryptography_key()
return "EC Private key curve %s (%d bits)" % (
pk_crypto.curve.name, pk_crypto.key_size)
class PkDecoratorRSA(PkDecorator):
def __str__(self):
pk_crypto = self.pk.to_cryptography_key()
return "RSA Private key %d bits (exponent %d)" % (
pk_crypto.key_size,
pk_crypto.private_numbers().public_numbers.e)
class PkDecoratorDSA(PkDecorator):
def __str__(self):
pk_crypto = self.pk.to_cryptography_key()
return "DSA Private key %d bits" % pk_crypto.key_size
class PkDecoratorDH(PkDecorator):
def __str__(self):
pk_crypto = self.pk.to_cryptography_key()
return "DH Private key %d bits" % pk_crypto.key_size
class PkDecoratorFactory(object):
'''
Provide some information on the private key object
'''
def create(pk):
'''
Create the appropriate decorater object
'''
decorators = {
crypto.TYPE_DH: PkDecoratorDH,
crypto.TYPE_EC: PkDecoratorEC,
crypto.TYPE_DSA: PkDecoratorDSA,
crypto.TYPE_RSA: PkDecoratorRSA,
}
if pk.type() in decorators:
return decorators[pk.type()](pk)
else:
raise UnsupportedPkEncryption("Unsupported private key type %d"
% pk.type())
class UnsupportedPkEncryption(Exception):
'''
When we encounter unsupported encryption algorithms
''' '''
pass pass
class CertificateComponentException(Exception): class CertificateComponentException(Exception):
''' '''
When something is not right with the whole cert+intermediates+private key bundle When something is not right with the whole cert+intermediates+private key
bundle
''' '''
pass pass
@ -66,45 +134,33 @@ def load_data(filenames):
return datas return datas
def get_pub_modulus(cert): def get_cert_pubkey(cert):
''' '''
Get the modulus of a certificate Get the pubkey of a certificate
'''
pub = cert.get_pubkey()
# Only works for RSA (I think)
if pub.type() != crypto.TYPE_RSA:
logging.debug('Can only handle RSA crypto:'
'\n\tsubject=%s\n\tissuer%s\n\texpired=%s\n\ttype=%s',
cert.get_subject(),
cert.get_subject(),
cert.has_expired(),
pub.type())
raise OnlyRSAKeyException('Can only handle RSA crypto')
pub_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, pub)
pub_der = asn1.DerSequence()
pub_der.decode(pub_asn1)
pub_modulus = pub_der[1]
return pub_modulus
def get_priv_modulus(priv):
'''
Get the modulus of a RSA private key
''' '''
# Only works for RSA (I think) cert_crypto = cert.to_cryptography()
if priv.type() != crypto.TYPE_RSA: pubkey = cert_crypto.public_key()
raise OnlyRSAKeyException('Can only handle RSA crypto') pub_bytes = pubkey.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo)
priv_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, priv) return pub_bytes
priv_der = asn1.DerSequence()
priv_der.decode(priv_asn1)
priv_modulus = priv_der[1]
return priv_modulus
def get_priv_pubkey(priv):
'''
Get the pubkey of a private key
'''
priv_crypto = priv.to_cryptography_key()
pubkey = priv_crypto.public_key()
pub_bytes = pubkey.public_bytes(
serialization.Encoding.PEM,
serialization.PublicFormat.SubjectPublicKeyInfo)
return pub_bytes
def match_cert_privkey(cert, priv): def match_cert_privkey(cert, priv):
@ -113,34 +169,47 @@ def match_cert_privkey(cert, priv):
and reworked and reworked
''' '''
return get_pub_modulus(cert) == get_priv_modulus(priv) return get_cert_pubkey(cert) == get_priv_pubkey(priv)
def find_root(x509_objects, root_issuers): def find_root(x509_objects, root_issuers):
''' '''
Find a suitable anchor by finding the intermediate that was signed by root Find a suitable anchor by finding the intermediate that was signed by root
''' '''
root_cert = root_issuers[str(x509_objects[-1].get_issuer())] root_cert = None
for x509_object in reversed(x509_objects):
if str(x509_object.get_issuer()) in root_issuers:
root_cert = root_issuers[str(x509_object.get_issuer())]
break
if not root_cert:
raise CertificateComponentException('Unable to find a suitable '
'trusted root certificate '
'for bundle')
logging.debug('Retrieved root certificate %s', root_cert.get_subject()) logging.debug('Retrieved root certificate %s', root_cert.get_subject())
return root_cert return root_cert
def find_intermediate_root(x509_objects, root_issuers): def find_intermediate_root(x509_objects, root_issuers):
''' '''
Find a suitable anchor by finding the intermediate that was signed by root Find a suitable anchor by finding the intermediate that was signed by root
''' '''
# Some intermediates have the *same* subject as some root certificates. # Some intermediates have the *same* subject as some root certificates.
# blacklist them # blacklist them if their issuer and subject name is present in the root
# XXX better use modulus/hash for that, but can't find the appropriate # bundle
# interface to that at the moment
excluded_issuers = [str(x.get_subject()) for x in x509_objects excluded_issuers = [str(x.get_subject()) for x in x509_objects
if x.get_subject() != x.get_issuer()] if x.get_subject() != x.get_issuer()
and str(x.get_issuer()) in root_issuers
and str(x.get_subject()) in root_issuers]
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers)) logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
logging.debug('Excluding issuers because of potential intermediates\n\t%s', logging.debug('Excluding issuers because of potential intermediates\n\t%s',
'\n\t'.join(excluded_issuers)) '\n\t'.join(excluded_issuers))
logging.debug('issuers seen in data\n\t%s', logging.debug('Certificates seen in data\n\t%s',
'\n\t'.join([str(x.get_issuer()) for x in x509_objects])) '\n\t'.join([f'Subject: {x.get_subject()},'
f' Issuer: {x.get_issuer()}'
for x in x509_objects]))
return [x for x in x509_objects return [x for x in x509_objects
if str(x.get_issuer()) in root_issuers if str(x.get_issuer()) in root_issuers
and str(x.get_issuer()) not in excluded_issuers] and str(x.get_issuer()) not in excluded_issuers]
@ -157,13 +226,12 @@ def order_x509(x509_objects, root_issuers):
logging.warning('Found self signed (root) certificate %s in input', logging.warning('Found self signed (root) certificate %s in input',
str(root_crt.get_subject())) str(root_crt.get_subject()))
# Double check if our self signed root certificate is not also present # Double check if our self signed root certificate is not also present
# as an intermediate: # as an cross signed intermediate:
# - It is probably invalid input, and doesn't make sense # - It might confuse the ordering process
# - It confuses the ordering process
if next((x for x in x509_objects if next((x for x in x509_objects
if x.get_subject() != x.get_issuer() if x.get_subject() != x.get_issuer()
and x.get_subject() == root_crt.get_subject()), None): and x.get_subject() == root_crt.get_subject()), None):
raise CertificateComponentException('Both present as intermediate ' logging.warning('Both present as intermediate '
'and root certificate: %s' % 'and root certificate: %s' %
str(root_crt.get_subject())) str(root_crt.get_subject()))
else: else:
@ -191,19 +259,34 @@ def order_x509(x509_objects, root_issuers):
while x509_objects: while x509_objects:
sibling = [x for x in x509_objects sibling = [x for x in x509_objects
if x.get_issuer() == bundle[0].get_subject()] if x.get_issuer() == bundle[0].get_subject()]
parent = [x for x in x509_objects
if x.get_subject() == bundle[-1].get_issuer()]
if sibling and len(sibling) == 1: if sibling and len(sibling) == 1:
# insert sibling at beginning of list # insert sibling at beginning of list
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
elif parent and len(parent) == 1:
# Try to place a (cross signed) intermediate at the end if it matches
bundle.append(x509_objects.pop(x509_objects.index(parent[0])))
else: else:
# Lets complain # Lets complain
raise CertificateComponentException('Non matching certificates in input:' logging.error('Certificates remaining data\n\t%s',
'\n\t'.join([f'Subject: {x.get_subject()},'
f' Issuer: {x.get_issuer()}'
for x in x509_objects]))
logging.error('Certificates placed in bundle \n\t%s',
'\n\t'.join([f'Subject: {x.get_subject()},'
f' Issuer: {x.get_issuer()}'
for x in bundle]))
raise CertificateComponentException('Non matching certificates in '
'input:'
' No sibling found for %s' ' No sibling found for %s'
% bundle[0].get_subject()) % bundle[0].get_subject())
return bundle return bundle
def load_root_issuers(): def load_root_issuers():
''' '''
Return the list of CA roots (RSA only) Return the list of CA roots
''' '''
root_issuers = None root_issuers = None
@ -225,17 +308,18 @@ def load_root_issuers():
for root_cert in root_certs: for root_cert in root_certs:
try: try:
logging.debug('subject=%s\n\tissuer%s\n\t' logging.debug('subject=%s\n\tissuer%s\n\t'
'expired=%s\n\tmodulus=%s', 'expired=%s\n\tpubkey=%s',
root_cert.get_subject(), root_cert.get_subject(),
root_cert.get_issuer(), root_cert.get_issuer(),
root_cert.has_expired(), root_cert.has_expired(),
get_pub_modulus(root_cert)) get_cert_pubkey(root_cert))
except OnlyRSAKeyException as onlyrsa_exception: except UnsupportedPkEncryption as unsupported_crypto_exception:
logging.debug(onlyrsa_exception) logging.debug(unsupported_crypto_exception)
continue continue
root_issuers = {str(root_cert.get_subject()): root_cert root_issuers = {str(root_cert.get_subject()): root_cert
for root_cert in root_certs} for root_cert in root_certs
if not root_cert.has_expired()}
return root_issuers return root_issuers
@ -243,7 +327,7 @@ def handle_args():
''' '''
Handle tool arguments Handle tool arguments
''' '''
parser = ArgumentParser(description='Reorder X509/RSA data for' parser = ArgumentParser(description='Reorder X509/Private key data for'
' hosting use') ' hosting use')
loggrp = parser.add_mutually_exclusive_group() loggrp = parser.add_mutually_exclusive_group()
@ -259,29 +343,50 @@ def handle_args():
outputgrp = parser.add_mutually_exclusive_group() outputgrp = parser.add_mutually_exclusive_group()
outputgrp.add_argument('--just-certificate', dest='print_cert', outputgrp.add_argument('-c', '--check',
action='store_true', help='Just print certificate') action='store_true',
outputgrp.add_argument('--no-certificate', dest='print_cert', help='Only check, output nothing')
outputgrp.add_argument('--just-certificate',
dest='print_cert',
action='store_true',
help='Just print certificate')
outputgrp.add_argument('--no-certificate',
dest='print_cert',
action='store_false', action='store_false',
help='Omit certificate from output') help='Omit certificate from output')
outputgrp.set_defaults(print_cert=True) outputgrp.set_defaults(print_cert=True)
outputgrp.add_argument('--just-chain', dest='print_chain', outputgrp.add_argument('--just-chain',
action='store_true', help='Just print chain') dest='print_chain',
outputgrp.add_argument('--no-chain', dest='print_chain', action='store_true',
action='store_false', help='Omit chain from output') help='Just print chain')
outputgrp.add_argument('--include-root', dest='include_root', outputgrp.add_argument('--no-chain',
action='store_true', help='Also include the root certificate') dest='print_chain',
action='store_false',
help='Omit chain from output')
outputgrp.add_argument('--include-root',
dest='include_root',
action='store_true',
help='Also include the root certificate')
outputgrp.set_defaults(print_chain=True) outputgrp.set_defaults(print_chain=True)
outputgrp.add_argument('--key', dest='print_key', outputgrp.add_argument('--key',
dest='print_key',
action='store_true', default=True, action='store_true', default=True,
help='Just print key') help='Just print key')
outputgrp.add_argument('--no-key', dest='print_key', outputgrp.add_argument('--no-key',
action='store_false', help='Omit key from output') dest='print_key',
action='store_false',
help='Omit key from output')
outputgrp.set_defaults(print_key=True) outputgrp.set_defaults(print_key=True)
parser.add_argument('x509files', metavar='x509 file', nargs='*', parser.add_argument('-i', '--informational',
action='store_true',
help='Show some information about the PEM blocks')
parser.add_argument('x509files',
metavar='x509 file',
nargs='*',
help='x509 fullchain (+ rsa privkey)' help='x509 fullchain (+ rsa privkey)'
' bundles to be checked') ' bundles to be checked')
@ -293,12 +398,11 @@ def main():
main program start and argument parsing main program start and argument parsing
''' '''
root_issuers = None root_issuers = None
args = handle_args() args = handle_args()
if args.verbose: if args.verbose or args.check:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
elif args.debug: elif args.debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
@ -344,24 +448,28 @@ def main():
get_components() get_components()
if len(rsa_objects) > 1: if len(rsa_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.' raise CertificateComponentException('More than one RSA private key'
' found in input.'
' Aborting') ' Aborting')
elif rsa_objects: elif rsa_objects:
if not match_cert_privkey(x509_objects[0], rsa_objects[0]): if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
raise CertificateComponentException('Provided certificate' raise CertificateComponentException('Provided certificate'
' and RSA private key do not match') ' and RSA private key'
' do not match')
else: else:
logging.info('OK: Modulus of provided certificate' logging.info('OK: Public key of provided certificate'
' and RSA private key match') ' and RSA private key match')
elif len(pk_objects) > 1: elif len(pk_objects) > 1:
raise CertificateComponentException('More than one RSA private key found in input.' raise CertificateComponentException('More than one private key'
' found in input.'
' Aborting') ' Aborting')
elif pk_objects: elif pk_objects:
if not match_cert_privkey(x509_objects[0], pk_objects[0]): if not match_cert_privkey(x509_objects[0], pk_objects[0]):
raise CertificateComponentException('Provided certificate' raise CertificateComponentException('Provided certificate'
' and private key do not match') ' and private key'
' do not match')
else: else:
logging.info('OK: Modulus of provided certificate' logging.info('OK: Public key of provided certificate'
' and private key match') ' and private key match')
if args.include_root: if args.include_root:
@ -404,6 +512,7 @@ def main():
logging.info('Subject: %s', x509_subject) logging.info('Subject: %s', x509_subject)
logging.info('Issuer: %s', x509_issuer) logging.info('Issuer: %s', x509_issuer)
if args.informational:
print(CERTINFO_TEMPLATE.format( print(CERTINFO_TEMPLATE.format(
subject=x509_subject, subject=x509_subject,
issuer=x509_issuer, issuer=x509_issuer,
@ -411,13 +520,17 @@ def main():
notafter=x509_not_after.strftime(OPENSSLTIME_FMT), notafter=x509_not_after.strftime(OPENSSLTIME_FMT),
sha1fingerprint=x509_object.digest('sha1').decode())) sha1fingerprint=x509_object.digest('sha1').decode()))
if not args.check:
print(crypto.dump_certificate(crypto.FILETYPE_PEM, print(crypto.dump_certificate(crypto.FILETYPE_PEM,
x509_object).decode('ascii'), x509_object).decode('ascii'),
end='') end='')
if rsa_objects: if rsa_objects:
if not args.check:
logging.info('Print RSA private keys') logging.info('Print RSA private keys')
for rsa_object in rsa_objects: for rsa_object in rsa_objects:
if args.informational:
print(PkDecoratorFactory.create(rsa_object))
print(rsa_object.to_cryptography_key().private_bytes( print(rsa_object.to_cryptography_key().private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
@ -425,12 +538,19 @@ def main():
'ascii'), 'ascii'),
end='') end='')
elif pk_objects: elif pk_objects:
if not args.check:
logging.info('Print private keys') logging.info('Print private keys')
for pk_object in pk_objects: for pk_object in pk_objects:
if args.informational:
print(PkDecoratorFactory.create(pk_object))
print(crypto.dump_privatekey(crypto.FILETYPE_PEM, print(crypto.dump_privatekey(crypto.FILETYPE_PEM,
pk_object).decode('ascii'), pk_object).decode('ascii'),
end='') end='')
if __name__ == "__main__": if __name__ == "__main__":
try:
exit(main()) exit(main())
except CertificateComponentException as certcomponent_error:
logging.error(certcomponent_error)
exit(1)