Compare commits
9 Commits
ebe467260a
...
d614c1ff06
Author | SHA1 | Date | |
---|---|---|---|
d614c1ff06 | |||
497c230394 | |||
4f2119d2a5 | |||
0ec7c8b62c | |||
6dda760ffd | |||
5cce5722c5 | |||
5c4f6d2c67 | |||
2931f4809e | |||
ba94ceb9cc |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1 +1,2 @@
|
|||||||
*.sw?
|
*.sw?
|
||||||
|
__pycache__
|
||||||
|
@ -176,7 +176,15 @@ def find_root(x509_objects, root_issuers):
|
|||||||
'''
|
'''
|
||||||
Find a suitable anchor by finding the intermediate that was signed by root
|
Find a suitable anchor by finding the intermediate that was signed by root
|
||||||
'''
|
'''
|
||||||
root_cert = root_issuers[str(x509_objects[-1].get_issuer())]
|
root_cert = None
|
||||||
|
for x509_object in reversed(x509_objects):
|
||||||
|
if str(x509_object.get_issuer()) in root_issuers:
|
||||||
|
root_cert = root_issuers[str(x509_object.get_issuer())]
|
||||||
|
break
|
||||||
|
if not root_cert:
|
||||||
|
raise CertificateComponentException('Unable to find a suitable '
|
||||||
|
'trusted root certificate '
|
||||||
|
'for bundle')
|
||||||
logging.debug('Retrieved root certificate %s', root_cert.get_subject())
|
logging.debug('Retrieved root certificate %s', root_cert.get_subject())
|
||||||
return root_cert
|
return root_cert
|
||||||
|
|
||||||
@ -187,17 +195,21 @@ def find_intermediate_root(x509_objects, root_issuers):
|
|||||||
'''
|
'''
|
||||||
|
|
||||||
# Some intermediates have the *same* subject as some root certificates.
|
# Some intermediates have the *same* subject as some root certificates.
|
||||||
# blacklist them
|
# blacklist them if their issuer and subject name is present in the root
|
||||||
# XXX better use pubkey/hash for that, but can't find the appropriate
|
# bundle
|
||||||
# interface to that at the moment
|
|
||||||
excluded_issuers = [str(x.get_subject()) for x in x509_objects
|
excluded_issuers = [str(x.get_subject()) for x in x509_objects
|
||||||
if x.get_subject() != x.get_issuer()]
|
if x.get_subject() != x.get_issuer()
|
||||||
|
and str(x.get_issuer()) in root_issuers
|
||||||
|
and str(x.get_subject()) in root_issuers]
|
||||||
|
|
||||||
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
|
logging.debug('Known root issuers\n\t%s', '\n\t'.join(root_issuers))
|
||||||
logging.debug('Excluding issuers because of potential intermediates\n\t%s',
|
logging.debug('Excluding issuers because of potential intermediates\n\t%s',
|
||||||
'\n\t'.join(excluded_issuers))
|
'\n\t'.join(excluded_issuers))
|
||||||
logging.debug('issuers seen in data\n\t%s',
|
logging.debug('Certificates seen in data\n\t%s',
|
||||||
'\n\t'.join([str(x.get_issuer()) for x in x509_objects]))
|
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||||
|
f' Issuer: {x.get_issuer()}'
|
||||||
|
for x in x509_objects]))
|
||||||
|
|
||||||
return [x for x in x509_objects
|
return [x for x in x509_objects
|
||||||
if str(x.get_issuer()) in root_issuers
|
if str(x.get_issuer()) in root_issuers
|
||||||
and str(x.get_issuer()) not in excluded_issuers]
|
and str(x.get_issuer()) not in excluded_issuers]
|
||||||
@ -214,15 +226,14 @@ def order_x509(x509_objects, root_issuers):
|
|||||||
logging.warning('Found self signed (root) certificate %s in input',
|
logging.warning('Found self signed (root) certificate %s in input',
|
||||||
str(root_crt.get_subject()))
|
str(root_crt.get_subject()))
|
||||||
# Double check if our self signed root certificate is not also present
|
# Double check if our self signed root certificate is not also present
|
||||||
# as an intermediate:
|
# as an cross signed intermediate:
|
||||||
# - It is probably invalid input, and doesn't make sense
|
# - It might confuse the ordering process
|
||||||
# - It confuses the ordering process
|
|
||||||
if next((x for x in x509_objects
|
if next((x for x in x509_objects
|
||||||
if x.get_subject() != x.get_issuer()
|
if x.get_subject() != x.get_issuer()
|
||||||
and x.get_subject() == root_crt.get_subject()), None):
|
and x.get_subject() == root_crt.get_subject()), None):
|
||||||
raise CertificateComponentException('Both present as intermediate '
|
logging.warning('Both present as intermediate '
|
||||||
'and root certificate: %s' %
|
'and root certificate: %s' %
|
||||||
str(root_crt.get_subject()))
|
str(root_crt.get_subject()))
|
||||||
else:
|
else:
|
||||||
# Get intermediate cert signed by any root from bundle as anchor, and
|
# Get intermediate cert signed by any root from bundle as anchor, and
|
||||||
# make that our root
|
# make that our root
|
||||||
@ -248,11 +259,24 @@ def order_x509(x509_objects, root_issuers):
|
|||||||
while x509_objects:
|
while x509_objects:
|
||||||
sibling = [x for x in x509_objects
|
sibling = [x for x in x509_objects
|
||||||
if x.get_issuer() == bundle[0].get_subject()]
|
if x.get_issuer() == bundle[0].get_subject()]
|
||||||
|
parent = [x for x in x509_objects
|
||||||
|
if x.get_subject() == bundle[-1].get_issuer()]
|
||||||
if sibling and len(sibling) == 1:
|
if sibling and len(sibling) == 1:
|
||||||
# insert sibling at beginning of list
|
# insert sibling at beginning of list
|
||||||
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
|
bundle.insert(0, x509_objects.pop(x509_objects.index(sibling[0])))
|
||||||
|
elif parent and len(parent) == 1:
|
||||||
|
# Try to place a (cross signed) intermediate at the end if it matches
|
||||||
|
bundle.append(x509_objects.pop(x509_objects.index(parent[0])))
|
||||||
else:
|
else:
|
||||||
# Lets complain
|
# Lets complain
|
||||||
|
logging.error('Certificates remaining data\n\t%s',
|
||||||
|
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||||
|
f' Issuer: {x.get_issuer()}'
|
||||||
|
for x in x509_objects]))
|
||||||
|
logging.error('Certificates placed in bundle \n\t%s',
|
||||||
|
'\n\t'.join([f'Subject: {x.get_subject()},'
|
||||||
|
f' Issuer: {x.get_issuer()}'
|
||||||
|
for x in bundle]))
|
||||||
raise CertificateComponentException('Non matching certificates in '
|
raise CertificateComponentException('Non matching certificates in '
|
||||||
'input:'
|
'input:'
|
||||||
' No sibling found for %s'
|
' No sibling found for %s'
|
||||||
@ -294,7 +318,8 @@ def load_root_issuers():
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
root_issuers = {str(root_cert.get_subject()): root_cert
|
root_issuers = {str(root_cert.get_subject()): root_cert
|
||||||
for root_cert in root_certs}
|
for root_cert in root_certs
|
||||||
|
if not root_cert.has_expired()}
|
||||||
return root_issuers
|
return root_issuers
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user