Add support for sha256 and md5 ssh keys
authorJan Dittberner <jandd@cacert.org>
Wed, 14 Feb 2018 21:16:41 +0000 (22:16 +0100)
committerJan Dittberner <jandd@cacert.org>
Wed, 14 Feb 2018 21:16:41 +0000 (22:16 +0100)
docs/sphinxext/cacert.py

index f605bb8..fe8e7ff 100644 (file)
@@ -6,6 +6,7 @@
 # sshkeys
 # sshkeylist
 
+import binascii
 import re
 import os.path
 from ipaddress import ip_address
@@ -19,11 +20,13 @@ from sphinx.errors import SphinxError
 from sphinx.util.nodes import set_source_info, make_refnode, traverse_parent
 
 from dateutil.parser import parse as date_parse
+from base64 import b64decode
 from validate_email import validate_email
 
 __version__ = '0.1.0'
 
 SUPPORTED_SSH_KEYTYPES = ('RSA', 'DSA', 'ECDSA', 'ED25519')
+SSH_MD5_RE = r'^([0-9a-f]{2}:){15}[0-9a-f]{2}$'
 
 
 class sslcert_node(nodes.General, nodes.Element):
@@ -49,11 +52,33 @@ def hex_int(argument):
     return value
 
 
-def md5_fingerprint(argument):
-    value = argument.strip().lower()
-    if not re.match(r'^([0-9a-f]{2}:){15}[0-9a-f]{2}$', value):
-        raise ValueError('no correctly formatted SHA1 fingerprint')
-    return value
+def ssh_fingerprint(argument):
+    value = argument.strip().split(" ")
+    result = {}
+    for k in value:
+        if k.startswith('SHA256:'):
+            sha256_encoded = k[len('SHA256:'):]
+            try:
+                sha256_decoded = b64decode(sha256_encoded + "=", validate=True)
+                if len(sha256_decoded) != 32:
+                    raise ValueError(
+                        '{} is no correctly formatted SHA256 fingerprint'.format(
+                            k))
+            except binascii.Error:
+                raise ValueError(
+                    '{} is no correctly formatted SHA256 fingerprint'.format(k))
+            result['sha256'] = sha256_encoded
+        elif k.startswith('MD5:'):
+            if not re.match(SSH_MD5_RE, k[len('MD5:'):].lower()):
+                raise ValueError(
+                    '{} is no correctly formatted MD5 fingerprint'.format(k))
+            result['md5'] = k[len('MD5:'):]
+        else:
+            if not re.match(SSH_MD5_RE, k.lower()):
+                raise ValueError(
+                    '{} is no correctly formatted MD5 fingerprint'.format(k))
+            result['md5'] = k.lower()
+    return result
 
 
 def sha1_fingerprint(argument):
@@ -158,6 +183,7 @@ class CAcertSSLCertList(Directive):
     """
     The sslcertlist directive implementation
     """
+
     def run(self):
         return [sslcertlist_node()]
 
@@ -168,8 +194,9 @@ class CAcertSSHKeys(Directive):
     host keys for a host.
     """
     option_spec = {
-        keytype.lower(): md5_fingerprint for keytype in SUPPORTED_SSH_KEYTYPES
+        keytype.lower(): ssh_fingerprint for keytype in SUPPORTED_SSH_KEYTYPES
     }
+
     def run(self):
         if len(self.options) == 0:
             raise self.error(
@@ -193,6 +220,7 @@ class CAcertSSHKeyList(Directive):
     """
     The sshkeylist directive implementation
     """
+
     def run(self):
         return [sshkeylist_node()]
 
@@ -271,12 +299,12 @@ def _get_cert_index_text(cert_info):
     return "Certificate; %s" % cert_info['cn']
 
 
-def _get_formatted_keyentry(keys_info, algorithm):
+def _get_formatted_keyentry(keys_info, algorithm, fptype):
     entry = nodes.entry()
     algkey = algorithm.lower()
-    if algkey in keys_info:
+    if algkey in keys_info and fptype in keys_info[algkey]:
         para = nodes.paragraph()
-        keyfp = nodes.literal(text=keys_info[algkey])
+        keyfp = nodes.literal(text=keys_info[algkey][fptype])
         para += keyfp
     else:
         para = nodes.paragraph(text="-")
@@ -359,18 +387,19 @@ def process_sslcerts(app, doctree):
         item = nodes.list_item()
         subbullets += item
         certfile = nodes.paragraph(text="certificate in file ")
-        certfile += addnodes.literal_emphasis(text=certdata['certfile']) #, node.line)
+        certfile += addnodes.literal_emphasis(
+            text=certdata['certfile'])  # , node.line)
         item += certfile
         item = nodes.list_item()
         subbullets += item
         keyfile = nodes.paragraph(text="private key in file ")
         keyfile += addnodes.literal_emphasis(text=certdata['keyfile'])
-        #keyfile += _create_interpreted_file_node(
+        # keyfile += _create_interpreted_file_node(
         #    certdata['keyfile'], node.line)
         item += keyfile
 
         node.parent.replace_self([targetnode, indexnode, bullets])
-        #env.note_indexentries_from(env.docname, doctree)
+        # env.note_indexentries_from(env.docname, doctree)
 
 
 def process_sshkeys(app, doctree):
@@ -388,16 +417,16 @@ def process_sshkeys(app, doctree):
     for node in doctree.traverse(sshkeys_node):
         # find section
         section = [s for s in traverse_parent(node, nodes.section)][0]
-        dockeys = {'docname': env.docname, 'secid': section['ids'][0]}
-        dockeys.update(node['keys'])
-        env.cacert_sshkeys.append(dockeys)
+        doc_keys = {'docname': env.docname, 'secid': section['ids'][0]}
+        doc_keys.update(node['keys'])
+        env.cacert_sshkeys.append(doc_keys)
 
         secparent = section.parent
         pos = secparent.index(section)
         # add index node for section
         indextitle = 'SSH host key; %s' % (
-            env.docname in env.titles and env.titles[env.docname].astext()
-            or os.path.basename(env.docname)
+                env.docname in env.titles and env.titles[env.docname].astext()
+                or os.path.basename(env.docname)
         )
         secparent.insert(pos, addnodes.index(entries=[
             ('pair', indextitle, section['ids'][0], '', None),
@@ -416,14 +445,23 @@ def process_sshkeys(app, doctree):
         tgroup += thead
         thead += create_table_row([
             nodes.paragraph(text='Algorithm'),
-            nodes.paragraph(text='Fingerprint'),
+            nodes.paragraph(text='Fingerprints'),
         ])
         tbody = nodes.tbody()
         tgroup += tbody
         for alg in SUPPORTED_SSH_KEYTYPES:
-            if alg.lower() in dockeys:
+            alg_key = alg.lower()
+            if alg_key in doc_keys:
+                result = []
                 fpparagraph = nodes.paragraph()
-                fpparagraph += nodes.literal(text=dockeys[alg.lower()])
+                for ktype in ('sha256', 'md5'):
+                    if ktype in doc_keys[alg_key]:
+                        result.append("{}:{}".format(
+                            ktype.upper(), doc_keys[alg_key][ktype]))
+                for idx in range(len(result)):
+                    fpparagraph += nodes.literal(text=result[idx])
+                    if idx < len(result) - 1:
+                        fpparagraph += nodes.inline(text=", ")
             else:
                 fpparagraph = nodes.paragraph(text='-')
             tbody += create_table_row([
@@ -521,7 +559,7 @@ def process_sslcert_nodes(app, doctree, docname):
             content.append(cert_sec)
 
         node.replace_self(content)
-        #env.note_indexentries_from(docname, doctree)
+        # env.note_indexentries_from(docname, doctree)
 
 
 def process_sshkeys_nodes(app, doctree, docname):
@@ -531,16 +569,16 @@ def process_sshkeys_nodes(app, doctree, docname):
         env.cacert_sslcerts = []
 
     for node in doctree.traverse(sshkeylist_node):
-        content = []
-        content.append(nodes.target(ids=['sshkeylist']))
+        content = [nodes.target(ids=['sshkeylist'])]
 
         if len(env.cacert_sshkeys) > 0:
             table = nodes.table()
             content.append(table)
-            tgroup = nodes.tgroup(cols=3)
+            tgroup = nodes.tgroup(cols=4)
             tgroup += nodes.colspec(colwidth=1)
             tgroup += nodes.colspec(colwidth=1)
-            tgroup += nodes.colspec(colwidth=4)
+            tgroup += nodes.colspec(colwidth=2)
+            tgroup += nodes.colspec(colwidth=2)
             table += tgroup
 
             thead = nodes.thead()
@@ -548,7 +586,7 @@ def process_sshkeys_nodes(app, doctree, docname):
             entry = nodes.entry()
             entry += nodes.paragraph(text="Host")
             row += entry
-            entry = nodes.entry(morecols=1)
+            entry = nodes.entry(morecols=2)
             entry += nodes.paragraph(text="SSH Host Keys")
             row += entry
             thead += row
@@ -577,7 +615,13 @@ def process_sshkeys_nodes(app, doctree, docname):
 
                 entry = nodes.entry()
                 para = nodes.paragraph()
-                para += nodes.strong(text='SSH host key MD5 fingerprint')
+                para += nodes.strong(text='MD5 fingerprint')
+                entry += para
+                trow += entry
+
+                entry = nodes.entry()
+                para = nodes.paragraph()
+                para += nodes.strong(text='SHA256 fingerprint')
                 entry += para
                 trow += entry
 
@@ -590,7 +634,9 @@ def process_sshkeys_nodes(app, doctree, docname):
                     entry += nodes.paragraph(text=algorithm)
                     trow += entry
 
-                    trow += _get_formatted_keyentry(keys_info, algorithm)
+                    trow += _get_formatted_keyentry(keys_info, algorithm, 'md5')
+                    trow += _get_formatted_keyentry(keys_info, algorithm,
+                                                    'sha256')
                     tbody += trow
         else:
             content.append(nodes.paragraph(
@@ -607,7 +653,7 @@ def resolve_missing_reference(app, env, node, contnode):
                 app.builder, node['refdoc'], env.cacert_certlistdoc,
                 node['refid'], contnode)
         raise SphinxError('No certlist directive found in the document tree')
-    if node['reftype'] == 'sshkeyref' :
+    if node['reftype'] == 'sshkeyref':
         if hasattr(env, 'cacert_sshkeylistdoc'):
             return make_refnode(
                 app.builder, node['refdoc'], env.cacert_sshkeylistdoc,
@@ -617,8 +663,8 @@ def resolve_missing_reference(app, env, node, contnode):
 
 def purge_sslcerts(app, env, docname):
     if (
-        hasattr(env, 'cacert_certlistdoc') and
-        env.cacert_certlistdoc == docname
+            hasattr(env, 'cacert_certlistdoc') and
+            env.cacert_certlistdoc == docname
     ):
         delattr(env, 'cacert_certlistdoc')
     if not hasattr(env, 'cacert_sslcerts'):
@@ -632,8 +678,8 @@ def purge_sslcerts(app, env, docname):
 
 def purge_sshkeys(app, env, docname):
     if (
-        hasattr(env, 'cacert_sshkeylistdoc') and
-        env.cacert_sshkeylistdoc == docname
+            hasattr(env, 'cacert_sshkeylistdoc') and
+            env.cacert_sshkeylistdoc == docname
     ):
         delattr(env, 'cacert_sshkeylistdoc')
     if not hasattr(env, 'cacert_sshkeys'):