Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| a3e60ea0bf | |||
| 220f1eb394 | 
							
								
								
									
										3
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										3
									
								
								requirements.txt
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,3 @@ | |||||||
|  | certifi==2024.2.2 | ||||||
|  | cryptography==42.0.5 | ||||||
|  | PyOpenSSL==24.1.0 | ||||||
| @ -1,23 +1,35 @@ | |||||||
| #! /usr/bin/env python | #! /usr/bin/env -S uv run --script | ||||||
|  | # /// script | ||||||
|  | # requires-python = ">=3.11" | ||||||
|  | # dependencies = [ | ||||||
|  | #     "certifi", | ||||||
|  | #     "cryptography", | ||||||
|  | #     "pyopenssl", | ||||||
|  | # ] | ||||||
|  | # /// | ||||||
| ''' | ''' | ||||||
| Sort X509/private key material | Sort X509/private key material | ||||||
| ''' | ''' | ||||||
|  |  | ||||||
| from __future__ import print_function | import fileinput | ||||||
|  |  | ||||||
| import logging | import logging | ||||||
| import re | import re | ||||||
| import fileinput | import sys | ||||||
| from argparse import ArgumentParser | from argparse import ArgumentParser | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
| from OpenSSL import crypto |  | ||||||
| from cryptography.hazmat.primitives import serialization |  | ||||||
| import certifi.core | import certifi.core | ||||||
|  | from cryptography.hazmat.primitives import serialization | ||||||
|  | from OpenSSL import crypto | ||||||
|  |  | ||||||
| VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ | VALID_FQDN_RE = r'^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])'\ | ||||||
|                 r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ |                 r'(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}'\ | ||||||
|                 r'[a-zA-Z0-9]))*$' |                 r'[a-zA-Z0-9]))*$' | ||||||
|  |  | ||||||
|  | CERTIFICATE_RE = '(-{5}BEGIN CERTIFICATE-{5}.*?-{5}END CERTIFICATE-{5})' | ||||||
|  | RSAPRIVKEY_RE = '(-{5}BEGIN RSA PRIVATE KEY-{5}.*?-{5}END RSA PRIVATE KEY-{5})' | ||||||
|  | PRIVKEY_RE = '(-{5}BEGIN PRIVATE KEY-{5}.*?-{5}END PRIVATE KEY-{5})' | ||||||
|  |  | ||||||
| CERTINFO_TEMPLATE = ''' | CERTINFO_TEMPLATE = ''' | ||||||
| subject= /{subject} | subject= /{subject} | ||||||
| issuer= /{issuer} | issuer= /{issuer} | ||||||
| @ -30,56 +42,68 @@ ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8')) | |||||||
| OPENSSLTIME_FMT = '%b %e %T %Y GMT' | OPENSSLTIME_FMT = '%b %e %T %Y GMT' | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecorator(object): | class PkDecorator:  # pylint: disable=too-few-public-methods | ||||||
|     ''' |     ''' | ||||||
|     Provide some information on the private key object |     Provide some information on the private key object | ||||||
|     ''' |     ''' | ||||||
|     pk = None |     p_key = None | ||||||
|  |  | ||||||
|     def __init__(self, pk): |     def __init__(self, p_key): | ||||||
|         self.pk = pk |         self.p_key = p_key | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         return "Private key" |         return "Private key" | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecoratorEC(PkDecorator): | class PkDecoratorEC(PkDecorator):  # pylint: disable=too-few-public-methods | ||||||
|  |     ''' | ||||||
|  |     PkDecorator that knowns about elliptic curves | ||||||
|  |     ''' | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         pk_crypto = self.pk.to_cryptography_key() |         pk_crypto = self.p_key.to_cryptography_key() | ||||||
|         return "EC Private key curve %s (%d bits)" % ( |         return (f'EC Private key curve {pk_crypto.curve.name} ' | ||||||
|                 pk_crypto.curve.name, pk_crypto.key_size) |                 f'({pk_crypto.key_size} bits)') | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecoratorRSA(PkDecorator): | class PkDecoratorRSA(PkDecorator):  # pylint: disable=too-few-public-methods | ||||||
|  |     ''' | ||||||
|  |     PkDecorator that knowns about RSA | ||||||
|  |     ''' | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         pk_crypto = self.pk.to_cryptography_key() |         pk_crypto = self.p_key.to_cryptography_key() | ||||||
|         return "RSA Private key %d bits (exponent %d)" % ( |         return (f'RSA Private key {pk_crypto.key_size} bits ' | ||||||
|                 pk_crypto.key_size, |                 f'(exponent {pk_crypto.private_numbers().public_numbers.e})') | ||||||
|                 pk_crypto.private_numbers().public_numbers.e) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecoratorDSA(PkDecorator): | class PkDecoratorDSA(PkDecorator):  # pylint: disable=too-few-public-methods | ||||||
|  |     ''' | ||||||
|  |     PkDecorator that knowns about DSA | ||||||
|  |     ''' | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         pk_crypto = self.pk.to_cryptography_key() |         pk_crypto = self.p_key.to_cryptography_key() | ||||||
|         return "DSA Private key %d bits" % pk_crypto.key_size |         return f'DSA Private key {pk_crypto.key_size} bits' | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecoratorDH(PkDecorator): | class PkDecoratorDH(PkDecorator):  # pylint: disable=too-few-public-methods | ||||||
|  |     ''' | ||||||
|  |     PkDecorator that knowns about DH | ||||||
|  |     ''' | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         pk_crypto = self.pk.to_cryptography_key() |         pk_crypto = self.p_key.to_cryptography_key() | ||||||
|         return "DH Private key %d bits" % pk_crypto.key_size |         return f'DH Private key {pk_crypto.key_size} bits' | ||||||
|  |  | ||||||
|  |  | ||||||
| class PkDecoratorFactory(object): | class PkDecoratorFactory:  # pylint: disable=too-few-public-methods | ||||||
|     ''' |     ''' | ||||||
|     Provide some information on the private key object |     Provide some information on the private key object | ||||||
|     ''' |     ''' | ||||||
|  |  | ||||||
|     def create(pk): |     @staticmethod | ||||||
|  |     def create(p_key): | ||||||
|         ''' |         ''' | ||||||
|         Create the appropriate decorater object |         Create the appropriate decorater object | ||||||
|         ''' |         ''' | ||||||
| @ -89,18 +113,16 @@ class PkDecoratorFactory(object): | |||||||
|                 crypto.TYPE_DSA: PkDecoratorDSA, |                 crypto.TYPE_DSA: PkDecoratorDSA, | ||||||
|                 crypto.TYPE_RSA: PkDecoratorRSA, |                 crypto.TYPE_RSA: PkDecoratorRSA, | ||||||
|         } |         } | ||||||
|         if pk.type() in decorators: |         if p_key.type() in decorators: | ||||||
|             return decorators[pk.type()](pk) |             return decorators[p_key.type()](p_key) | ||||||
|         else: |         raise UnsupportedPkEncryption( | ||||||
|             raise UnsupportedPkEncryption("Unsupported private key type %d" |                 'Unsupported private key type {p_key.type()}') | ||||||
|                                           % pk.type()) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class UnsupportedPkEncryption(Exception): | class UnsupportedPkEncryption(Exception): | ||||||
|     ''' |     ''' | ||||||
|     When we encounter unsupported encryption algorithms |     When we encounter unsupported encryption algorithms | ||||||
|     ''' |     ''' | ||||||
|     pass |  | ||||||
|  |  | ||||||
|  |  | ||||||
| class CertificateComponentException(Exception): | class CertificateComponentException(Exception): | ||||||
| @ -108,7 +130,6 @@ class CertificateComponentException(Exception): | |||||||
|     When something is not right with the whole cert+intermediates+private key |     When something is not right with the whole cert+intermediates+private key | ||||||
|     bundle |     bundle | ||||||
|     ''' |     ''' | ||||||
|     pass |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def load_data(filenames): | def load_data(filenames): | ||||||
| @ -167,7 +188,7 @@ def match_cert_privkey(cert, priv): | |||||||
|     ''' |     ''' | ||||||
|     Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long |     Copied from https://stackoverflow.com/questions/19922790/how-to-check-for-python-the-key-associated-with-the-certificate-or-not # noqa pylint: disable=line-too-long | ||||||
|     and reworked |     and reworked | ||||||
|     ''' |     ''' # noqa: E501 | ||||||
|  |  | ||||||
|     return get_cert_pubkey(cert) == get_priv_pubkey(priv) |     return get_cert_pubkey(cert) == get_priv_pubkey(priv) | ||||||
|  |  | ||||||
| @ -220,9 +241,9 @@ def order_x509(x509_objects, root_issuers): | |||||||
|         if next((x for x in x509_objects |         if next((x for x in x509_objects | ||||||
|                  if x.get_subject() != x.get_issuer() |                  if x.get_subject() != x.get_issuer() | ||||||
|                  and x.get_subject() == root_crt.get_subject()), None): |                  and x.get_subject() == root_crt.get_subject()), None): | ||||||
|             raise CertificateComponentException('Both present as intermediate ' |             raise CertificateComponentException( | ||||||
|                                                 'and root certificate: %s' % |                     f'Both present as intermediate ' | ||||||
|                                                 str(root_crt.get_subject())) |                     f'and root certificate: {str(root_crt.get_subject())}') | ||||||
|     else: |     else: | ||||||
|         # Get intermediate cert signed by any root from bundle as anchor, and |         # Get intermediate cert signed by any root from bundle as anchor, and | ||||||
|         # make that our root |         # make that our root | ||||||
| @ -253,10 +274,9 @@ def order_x509(x509_objects, root_issuers): | |||||||
|             bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) |             bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0]))) | ||||||
|         else: |         else: | ||||||
|             # Lets complain |             # Lets complain | ||||||
|             raise CertificateComponentException('Non matching certificates in ' |             raise CertificateComponentException( | ||||||
|                                                 'input:' |                     f'Non matching certificates in ' | ||||||
|                                                 ' No sibling found for %s' |                     f'input: No sibling found for {bundle[0].get_subject()}') | ||||||
|                                                 % bundle[0].get_subject()) |  | ||||||
|     return bundle |     return bundle | ||||||
|  |  | ||||||
|  |  | ||||||
| @ -268,7 +288,7 @@ def load_root_issuers(): | |||||||
|  |  | ||||||
|     mozrootbundle_location = certifi.core.where() |     mozrootbundle_location = certifi.core.where() | ||||||
|  |  | ||||||
|     with open(mozrootbundle_location, 'r') as fname_fh: |     with open(mozrootbundle_location, 'r', encoding='utf-8') as fname_fh: | ||||||
|         logging.info('Using %s for root ca bundle', mozrootbundle_location) |         logging.info('Using %s for root ca bundle', mozrootbundle_location) | ||||||
|         data = fname_fh.read() |         data = fname_fh.read() | ||||||
|         matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' |         matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' | ||||||
| @ -368,15 +388,10 @@ def handle_args(): | |||||||
|     return parser.parse_args() |     return parser.parse_args() | ||||||
|  |  | ||||||
|  |  | ||||||
| def main(): | def setup_logging(args): | ||||||
|     ''' |     ''' | ||||||
|     main program start and argument parsing |     Set up logging | ||||||
|     ''' |     ''' | ||||||
|  |  | ||||||
|     root_issuers = None |  | ||||||
|  |  | ||||||
|     args = handle_args() |  | ||||||
|  |  | ||||||
|     if args.verbose or args.check: |     if args.verbose or args.check: | ||||||
|         logging.basicConfig(level=logging.INFO) |         logging.basicConfig(level=logging.INFO) | ||||||
|     elif args.debug: |     elif args.debug: | ||||||
| @ -386,24 +401,30 @@ def main(): | |||||||
|     else: |     else: | ||||||
|         logging.basicConfig(level=logging.WARNING) |         logging.basicConfig(level=logging.WARNING) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def main(): | ||||||
|  |     ''' | ||||||
|  |     main program start and argument parsing | ||||||
|  |     ''' | ||||||
|  |  | ||||||
|  |     root_issuers = None | ||||||
|  |  | ||||||
|  |     args = handle_args() | ||||||
|  |  | ||||||
|  |     setup_logging(args) | ||||||
|  |  | ||||||
|     root_issuers = load_root_issuers() |     root_issuers = load_root_issuers() | ||||||
|  |  | ||||||
|     for fname, data in list(load_data(args.x509files).items()): |     for fname, data in list(load_data(args.x509files).items()): | ||||||
|         logging.debug('Processing %s', fname) |         logging.debug('Processing %s', fname) | ||||||
|         x509_objects_components = None |         x509_objects_components = None | ||||||
|         x509matches = re.finditer(r'(-----BEGIN CERTIFICATE-----' |         x509matches = re.finditer(CERTIFICATE_RE, | ||||||
|                                   '.*?' |  | ||||||
|                                   '-----END CERTIFICATE-----)', |  | ||||||
|                                   data, re.DOTALL) |                                   data, re.DOTALL) | ||||||
|  |  | ||||||
|         rsamatches = re.finditer(r'(-----BEGIN RSA PRIVATE KEY-----' |         rsamatches = re.finditer(RSAPRIVKEY_RE, | ||||||
|                                  '.*?' |  | ||||||
|                                  '-----END RSA PRIVATE KEY-----)', |  | ||||||
|                                  data, re.DOTALL) |                                  data, re.DOTALL) | ||||||
|  |  | ||||||
|         pkmatches = re.finditer(r'(-----BEGIN PRIVATE KEY-----' |         pkmatches = re.finditer(PRIVKEY_RE, | ||||||
|                                 '.*?' |  | ||||||
|                                 '-----END PRIVATE KEY-----)', |  | ||||||
|                                 data, re.DOTALL) |                                 data, re.DOTALL) | ||||||
|  |  | ||||||
|         x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, |         x509_objects = [crypto.load_certificate(crypto.FILETYPE_PEM, | ||||||
| @ -426,24 +447,23 @@ def main(): | |||||||
|             raise CertificateComponentException('More than one RSA private key' |             raise CertificateComponentException('More than one RSA private key' | ||||||
|                                                 ' found in input.' |                                                 ' found in input.' | ||||||
|                                                 ' Aborting') |                                                 ' Aborting') | ||||||
|         elif rsa_objects: |         if len(pk_objects) > 1: | ||||||
|  |             raise CertificateComponentException('More than one private key' | ||||||
|  |                                                 ' found in input.' | ||||||
|  |                                                 ' Aborting') | ||||||
|  |         if rsa_objects: | ||||||
|             if not match_cert_privkey(x509_objects[0], rsa_objects[0]): |             if not match_cert_privkey(x509_objects[0], rsa_objects[0]): | ||||||
|                 raise CertificateComponentException('Provided certificate' |                 raise CertificateComponentException('Provided certificate' | ||||||
|                                                     ' and RSA private key' |                                                     ' and RSA private key' | ||||||
|                                                     ' do not match') |                                                     ' do not match') | ||||||
|             else: |  | ||||||
|             logging.info('OK: Public key of provided certificate' |             logging.info('OK: Public key of provided certificate' | ||||||
|                          ' and RSA private key match') |                          ' and RSA private key match') | ||||||
|         elif len(pk_objects) > 1: |  | ||||||
|             raise CertificateComponentException('More than one private key' |         if pk_objects: | ||||||
|                                                 ' found in input.' |  | ||||||
|                                                 ' Aborting') |  | ||||||
|         elif pk_objects: |  | ||||||
|             if not match_cert_privkey(x509_objects[0], pk_objects[0]): |             if not match_cert_privkey(x509_objects[0], pk_objects[0]): | ||||||
|                 raise CertificateComponentException('Provided certificate' |                 raise CertificateComponentException('Provided certificate' | ||||||
|                                                     ' and private key' |                                                     ' and private key' | ||||||
|                                                     ' do not match') |                                                     ' do not match') | ||||||
|             else: |  | ||||||
|             logging.info('OK: Public key of provided certificate' |             logging.info('OK: Public key of provided certificate' | ||||||
|                          ' and private key match') |                          ' and private key match') | ||||||
|  |  | ||||||
| @ -452,29 +472,26 @@ def main(): | |||||||
|             x509_objects.append(find_root(x509_objects, root_issuers)) |             x509_objects.append(find_root(x509_objects, root_issuers)) | ||||||
|  |  | ||||||
|         logging.debug("Print certificates in order") |         logging.debug("Print certificates in order") | ||||||
|         # Need to do b'CN' to have this python3 compatible |  | ||||||
|         logging.info('Writing bundle for Subject: %s', |         logging.info('Writing bundle for Subject: %s', | ||||||
|                      [x[1].decode('utf-8') |                      [x[1].decode('utf-8') | ||||||
|                       for x in x509_objects_components |                       for x in x509_objects_components | ||||||
|                       if x[0] == b'CN'][0]) |                       if x[0].decode('utf-8') == 'CN'][0]) | ||||||
|  |  | ||||||
|         for x509_object in [x for x in x509_objects |         for x509_object in [x for x in x509_objects | ||||||
|                             if x.get_subject() != x.get_issuer() |                             if x.get_subject() != x.get_issuer() | ||||||
|                             or args.include_root]: |                             or args.include_root]: | ||||||
|  |  | ||||||
|             # Stringify subject like openssl x509 -subject |             # Stringify subject like openssl x509 -subject | ||||||
|             x509_subject = \ |             x509_subject = '/'.join([ | ||||||
|                     '/'.join(['{0}={1}'.format(component[0].decode(), |                 f'{component[0].decode()}={component[1].decode()}' | ||||||
|                                                component[1].decode()) |                 for component | ||||||
|                               for component in |                 in x509_object.get_subject().get_components()]) | ||||||
|                               x509_object.get_subject().get_components()]) |  | ||||||
|  |  | ||||||
|             # Stringify issuer like openssl x509 -issuer |             # Stringify issuer like openssl x509 -issuer | ||||||
|             x509_issuer = \ |             x509_issuer = '/'.join([ | ||||||
|                 '/'.join(['{0}={1}'.format(component[0].decode(), |                 f'{component[0].decode()}={component[1].decode()}' | ||||||
|                                            component[1].decode()) |                 for component | ||||||
|                           for component in |                 in x509_object.get_issuer().get_components()]) | ||||||
|                           x509_object.get_issuer().get_components()]) |  | ||||||
|  |  | ||||||
|             x509_not_after = \ |             x509_not_after = \ | ||||||
|                 datetime.strptime(str(x509_object.get_notAfter()), |                 datetime.strptime(str(x509_object.get_notAfter()), | ||||||
| @ -509,8 +526,8 @@ def main(): | |||||||
|                     print(rsa_object.to_cryptography_key().private_bytes( |                     print(rsa_object.to_cryptography_key().private_bytes( | ||||||
|                         encoding=serialization.Encoding.PEM, |                         encoding=serialization.Encoding.PEM, | ||||||
|                         format=serialization.PrivateFormat.TraditionalOpenSSL, |                         format=serialization.PrivateFormat.TraditionalOpenSSL, | ||||||
|                         encryption_algorithm=serialization.NoEncryption()).decode( |                         encryption_algorithm=str( | ||||||
|                             'ascii'), |                             serialization.NoEncryption()).decode('ascii')), | ||||||
|                           end='') |                           end='') | ||||||
|         elif pk_objects: |         elif pk_objects: | ||||||
|             if not args.check: |             if not args.check: | ||||||
| @ -525,7 +542,7 @@ def main(): | |||||||
|  |  | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     try: |     try: | ||||||
|         exit(main()) |         sys.exit(main()) | ||||||
|     except CertificateComponentException as certcomponent_error: |     except CertificateComponentException as certcomponent_error: | ||||||
|         logging.error(certcomponent_error) |         logging.error(certcomponent_error) | ||||||
|         exit(1) |         sys.exit(1) | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user