Better granularity with exception handling
This commit is contained in:
		@ -30,6 +30,18 @@ SHA1 Fingerprint={sha1fingerprint}
 | 
			
		||||
ASN1TIME_FMT = str('%Y%m%d%H%M%SZ'.encode('utf8'))
 | 
			
		||||
OPENSSLTIME_FMT = '%b %e %T %Y GMT'
 | 
			
		||||
 | 
			
		||||
class OnlyRSAKeyException(Exception):
 | 
			
		||||
    '''
 | 
			
		||||
    When we encounter other than RSA crypto material
 | 
			
		||||
    '''
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
class CertificateComponentException(Exception):
 | 
			
		||||
    '''
 | 
			
		||||
    When something is not right with the whole cert+intermediates+private key bundle
 | 
			
		||||
    '''
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_data(filenames):
 | 
			
		||||
    '''
 | 
			
		||||
@ -68,7 +80,7 @@ def get_pub_modulus(cert):
 | 
			
		||||
                      cert.get_subject(),
 | 
			
		||||
                      cert.has_expired(),
 | 
			
		||||
                      pub.type())
 | 
			
		||||
        raise Exception('Can only handle RSA crypto')
 | 
			
		||||
        raise OnlyRSAKeyException('Can only handle RSA crypto')
 | 
			
		||||
 | 
			
		||||
    pub_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, pub)
 | 
			
		||||
    pub_der = asn1.DerSequence()
 | 
			
		||||
@ -85,7 +97,7 @@ def get_priv_modulus(priv):
 | 
			
		||||
 | 
			
		||||
    # Only works for RSA (I think)
 | 
			
		||||
    if priv.type() != crypto.TYPE_RSA:
 | 
			
		||||
        raise Exception('Can only handle RSA crypto')
 | 
			
		||||
        raise OnlyRSAKeyException('Can only handle RSA crypto')
 | 
			
		||||
 | 
			
		||||
    priv_asn1 = crypto.dump_privatekey(crypto.FILETYPE_ASN1, priv)
 | 
			
		||||
    priv_der = asn1.DerSequence()
 | 
			
		||||
@ -143,9 +155,9 @@ def order_x509(x509_objects, root_issuers):
 | 
			
		||||
        if next((x for x in x509_objects
 | 
			
		||||
                 if x.get_subject() != x.get_issuer()
 | 
			
		||||
                 and x.get_subject() == root_crt.get_subject()), None):
 | 
			
		||||
            raise Exception('Both present as intermediate '
 | 
			
		||||
                            'and root certificate: %s' %
 | 
			
		||||
                            str(root_crt.get_subject()))
 | 
			
		||||
            raise CertificateComponentException('Both present as intermediate '
 | 
			
		||||
                                                'and root certificate: %s' %
 | 
			
		||||
                                                str(root_crt.get_subject()))
 | 
			
		||||
    else:
 | 
			
		||||
        # Get intermediate cert signed by any root from bundle as anchor, and
 | 
			
		||||
        # make that our root
 | 
			
		||||
@ -160,7 +172,7 @@ def order_x509(x509_objects, root_issuers):
 | 
			
		||||
                          root_crt[0].get_subject(), root_crt[0].get_issuer())
 | 
			
		||||
            root_crt = x509_objects.pop(x509_objects.index(root_crt[0]))
 | 
			
		||||
        else:
 | 
			
		||||
            raise Exception('No intermediate found')
 | 
			
		||||
            raise CertificateComponentException('No intermediate found')
 | 
			
		||||
 | 
			
		||||
    # Insert our anchor.
 | 
			
		||||
    bundle.insert(0, root_crt)
 | 
			
		||||
@ -176,9 +188,9 @@ def order_x509(x509_objects, root_issuers):
 | 
			
		||||
            bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
 | 
			
		||||
        else:
 | 
			
		||||
            # Lets complain
 | 
			
		||||
            raise Exception('Non matching certificates in input:'
 | 
			
		||||
                            ' No sibling found for %s'
 | 
			
		||||
                            % bundle[0].get_subject())
 | 
			
		||||
            raise CertificateComponentException('Non matching certificates in input:'
 | 
			
		||||
                                                ' No sibling found for %s'
 | 
			
		||||
                                                % bundle[0].get_subject())
 | 
			
		||||
    return bundle
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -260,7 +272,8 @@ def main():
 | 
			
		||||
                              root_cert.get_issuer(),
 | 
			
		||||
                              root_cert.has_expired(),
 | 
			
		||||
                              get_pub_modulus(root_cert))
 | 
			
		||||
            except Exception:
 | 
			
		||||
            except OnlyRSAKeyException as onlyrsa_exception:
 | 
			
		||||
                logging.debug(onlyrsa_exception)
 | 
			
		||||
                continue
 | 
			
		||||
        root_issuers = [str(root_cert.get_subject())
 | 
			
		||||
                        for root_cert in root_certs]
 | 
			
		||||
@ -300,22 +313,22 @@ def main():
 | 
			
		||||
            get_components()
 | 
			
		||||
 | 
			
		||||
        if len(rsa_objects) > 1:
 | 
			
		||||
            raise Exception('More than one RSA private key found in input.'
 | 
			
		||||
                            ' Aborting')
 | 
			
		||||
            raise CertificateComponentException('More than one RSA private key found in input.'
 | 
			
		||||
                                                ' Aborting')
 | 
			
		||||
        elif rsa_objects:
 | 
			
		||||
            if not match_cert_privkey(x509_objects[0], rsa_objects[0]):
 | 
			
		||||
                raise Exception('Provided certificate'
 | 
			
		||||
                                ' and RSA private key do not match')
 | 
			
		||||
                raise CertificateComponentException('Provided certificate'
 | 
			
		||||
                                                    ' and RSA private key do not match')
 | 
			
		||||
            else:
 | 
			
		||||
                logging.info('OK: Modulus of provided certificate'
 | 
			
		||||
                             ' and RSA private key match')
 | 
			
		||||
        elif len(pk_objects) > 1:
 | 
			
		||||
            raise Exception('More than one RSA private key found in input.'
 | 
			
		||||
                            ' Aborting')
 | 
			
		||||
            raise CertificateComponentException('More than one RSA private key found in input.'
 | 
			
		||||
                                                ' Aborting')
 | 
			
		||||
        elif pk_objects:
 | 
			
		||||
            if not match_cert_privkey(x509_objects[0], pk_objects[0]):
 | 
			
		||||
                raise Exception('Provided certificate'
 | 
			
		||||
                                ' and private key do not match')
 | 
			
		||||
                raise CertificateComponentException('Provided certificate'
 | 
			
		||||
                                                    ' and private key do not match')
 | 
			
		||||
            else:
 | 
			
		||||
                logging.info('OK: Modulus of provided certificate'
 | 
			
		||||
                             ' and private key match')
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user