Add support for sha256 and md5 ssh keys
[cacert-infradocs.git] / docs / sphinxext / cacert.py
1 # -*- python -*-
2 # This module provides the following CAcert specific sphinx directives
3 #
4 # sslcert
5 # sslcertlist
6 # sshkeys
7 # sshkeylist
8
9 import binascii
10 import re
11 import os.path
12 from ipaddress import ip_address
13
14 from docutils import nodes
15 from docutils.parsers.rst import Directive
16 from docutils.parsers.rst import directives
17
18 from sphinx import addnodes
19 from sphinx.errors import SphinxError
20 from sphinx.util.nodes import set_source_info, make_refnode, traverse_parent
21
22 from dateutil.parser import parse as date_parse
23 from base64 import b64decode
24 from validate_email import validate_email
25
# Version of this extension; reported to Sphinx by setup().
__version__ = '0.1.0'

# SSH host key algorithms. The lower case names are the option names of the
# sshkeys directive and the names are used as row labels in generated tables.
SUPPORTED_SSH_KEYTYPES = ('RSA', 'DSA', 'ECDSA', 'ED25519')
# An MD5 fingerprint: 16 lower case hex byte pairs separated by colons.
SSH_MD5_RE = r'^([0-9a-f]{2}:){15}[0-9a-f]{2}$'
30
31
class sslcert_node(nodes.General, nodes.Element):
    """Placeholder node created by the sslcert directive."""
34
35
class sslcertlist_node(nodes.General, nodes.Element):
    """Placeholder node created by the sslcertlist directive."""
38
39
class sshkeys_node(nodes.General, nodes.Element):
    """Placeholder node created by the sshkeys directive."""
42
43
class sshkeylist_node(nodes.General, nodes.Element):
    """Placeholder node created by the sshkeylist directive."""
46
47
48 # mapping and validation functions for directive options
49
def hex_int(argument):
    """
    Convert a hexadecimal directive option value into an integer.

    Raises ValueError (from int()) if argument is no valid base 16 number.
    """
    return int(argument, 16)
53
54
def ssh_fingerprint(argument):
    """
    Parse the value of an ssh key option into its fingerprints.

    The value is a whitespace separated list of fingerprints. Each item is
    either ``SHA256:<unpadded base64 digest>``, ``MD5:<colon separated hex>``
    or a bare colon separated hex MD5 fingerprint.

    Returns a dictionary that maps ``sha256`` and/or ``md5`` to the
    fingerprint without its prefix. MD5 fingerprints are always stored in
    lower case, regardless of whether they carried the ``MD5:`` prefix.

    Raises ValueError if no fingerprint is given or an item is malformed.
    """
    if argument is None or not argument.strip():
        raise ValueError('at least one fingerprint must be specified')
    result = {}
    # split() without a separator tolerates repeated blanks, tabs and
    # line breaks between the fingerprints
    for item in argument.split():
        if item.startswith('SHA256:'):
            encoded = item[len('SHA256:'):]
            try:
                # the digest is written without base64 padding, a 32 byte
                # digest needs exactly one "=" to be decodable
                decoded = b64decode(encoded + "=", validate=True)
            except binascii.Error:
                raise ValueError(
                    '{} is no correctly formatted SHA256 fingerprint'.format(
                        item))
            if len(decoded) != 32:
                raise ValueError(
                    '{} is no correctly formatted SHA256 fingerprint'.format(
                        item))
            result['sha256'] = encoded
        else:
            if item.startswith('MD5:'):
                candidate = item[len('MD5:'):].lower()
            else:
                candidate = item.lower()
            if not re.match(SSH_MD5_RE, candidate):
                raise ValueError(
                    '{} is no correctly formatted MD5 fingerprint'.format(
                        item))
            # store the normalized value for both notations
            result['md5'] = candidate
    return result
82
83
def sha1_fingerprint(argument):
    """
    Validate a SHA1 fingerprint given as 20 colon separated hex byte pairs.

    Returns the fingerprint stripped and converted to lower case, raises
    ValueError if it does not have the expected format.
    """
    candidate = argument.strip().lower()
    if re.match(r'^([0-9a-f]{2}:){19}[0-9a-f]{2}$', candidate):
        return candidate
    raise ValueError('no correctly formatted SHA1 fingerprint')
89
90
def is_valid_hostname(hostname):
    """
    Check whether hostname is a syntactically valid DNS host name.

    A valid name is at most 255 characters long and consists of dot
    separated labels of 1 to 63 ASCII letters, digits or hyphens that
    neither start nor end with a hyphen. One trailing dot is accepted.

    Returns True or False; an empty name is invalid.
    """
    if not hostname or len(hostname) > 255:
        return False
    if hostname.endswith("."):  # strip exactly one dot from the right
        hostname = hostname[:-1]
    # re.ASCII keeps \d and the case insensitive A-Z range from matching
    # non ASCII characters; fullmatch() rejects a trailing line break that
    # "$" would silently accept
    allowed = re.compile(
        r"(?!-)[A-Z\d-]{1,63}(?<!-)", re.IGNORECASE | re.ASCII)
    return all(allowed.fullmatch(label) for label in hostname.split("."))
98
99
def is_valid_ipaddress(content):
    """Return True if ip_address() accepts content, False otherwise."""
    try:
        ip_address(content)
    except ValueError:
        return False
    else:
        return True
106
107
def subject_alternative_names(argument):
    """
    Parse and validate a comma separated list of subject alternative names.

    Each entry has the form ``TYPE:value`` where TYPE is one of ``DNS``,
    ``EMAIL`` or ``IP``.

    Returns a list of ``[type, value]`` lists in the order of the entries.
    Raises ValueError for an entry without a type prefix, with an
    unsupported type or with a value that is invalid for its type.
    """
    value = []
    for san in argument.split(','):
        entry = san.strip()
        typ, separator, content = entry.partition(':')
        if not separator:
            # report the offending entry instead of an unpacking error
            raise ValueError(
                "%s is no valid subject alternative name, expected "
                "TYPE:value" % entry)
        if typ == 'DNS':
            if not is_valid_hostname(content):
                raise ValueError("%s is no valid DNS name" % content)
        elif typ == 'EMAIL':
            if not validate_email(content):
                raise ValueError("%s is not a valid email address" % content)
        elif typ == 'IP':
            if not is_valid_ipaddress(content):
                raise ValueError("%s is not a valid IP address" % content)
        else:
            raise ValueError(
                "handling of %s subject alternative names (%s) has not been "
                "implemented" % (typ, content))
        value.append([typ, content])
    return value
125
126
def expiration_date(argument):
    """
    Parse the mandatory expiration option value with dateutil's parser.

    directives.unchanged_required rejects a missing value before parsing.
    """
    required_value = directives.unchanged_required(argument)
    return date_parse(required_value)
129
130
class CAcertSSLCert(Directive):
    """
    Implementation of the sslcert directive.

    The directive argument is the common name (CN) of the certificate. For
    each combination of CN and serial number there must be exactly one
    instance that is not flagged as secondary.
    """
    final_argument_whitespace = True
    required_arguments = 1
    option_spec = {
        'certfile': directives.path,
        'keyfile': directives.path,
        'serial': hex_int,
        'expiration': expiration_date,
        'sha1fp': sha1_fingerprint,
        'altnames': subject_alternative_names,
        'issuer': directives.unchanged_required,
        'secondary': directives.flag
    }

    def run(self):
        """
        Check the required options and return a paragraph that contains a
        target node followed by an sslcert_node carrying the options.
        """
        # secondary instances only need the file locations and the serial
        required_options = ['certfile', 'keyfile', 'serial']
        if 'secondary' not in self.options:
            required_options += ['expiration', 'sha1fp', 'issuer']
        missing = [
            name for name in required_options if name not in self.options
        ]
        if missing:
            raise self.error(
                "required option(s) '%s' is/are not set for %s." % (
                    "', '".join(missing), self.name))

        certdata = self.options.copy()
        certdata['cn'] = self.arguments[0]
        sslcert = sslcert_node()
        sslcert.attributes['certdata'] = certdata
        set_source_info(self, sslcert)

        env = self.state.document.settings.env
        targetid = 'sslcert-%s' % env.new_serialno('sslcert')
        para = nodes.paragraph()
        para.extend([nodes.target('', '', ids=[targetid]), sslcert])
        return [para]
180
181
class CAcertSSLCertList(Directive):
    """
    Implementation of the sslcertlist directive.
    """

    def run(self):
        """Return a single sslcertlist_node placeholder."""
        placeholder = sslcertlist_node()
        return [placeholder]
189
190
class CAcertSSHKeys(Directive):
    """
    Implementation of the sshkeys directive, which documents the ssh host
    keys of a host. There is one option per supported key type.
    """
    option_spec = dict.fromkeys(
        (keytype.lower() for keytype in SUPPORTED_SSH_KEYTYPES),
        ssh_fingerprint)

    def run(self):
        """
        Return an 'SSH host keys' section that contains an sshkeys_node
        carrying the given fingerprints.
        """
        if not self.options:
            raise self.error(
                "at least one ssh key fingerprint must be specified. The "
                "following formats are supported: %s" % ", ".join(
                    SUPPORTED_SSH_KEYTYPES))
        sshkeys = sshkeys_node()
        sshkeys.attributes['keys'] = self.options.copy()
        set_source_info(self, sshkeys)

        env = self.state.document.settings.env
        section = nodes.section(
            ids=['sshkeys-%s' % env.new_serialno('sshkeys')])
        section += nodes.title(text='SSH host keys')
        section += sshkeys
        return [section]
217
218
class CAcertSSHKeyList(Directive):
    """
    Implementation of the sshkeylist directive.
    """

    def run(self):
        """Return a single sshkeylist_node placeholder."""
        placeholder = sshkeylist_node()
        return [placeholder]
226
227
def create_table_row(rowdata):
    """Build a table row with one entry per node in rowdata."""
    row = nodes.row()
    for cell in rowdata:
        entry = nodes.entry()
        entry += cell
        row += entry
    return row
235
236
237 def _sslcert_item_key(item):
238 return "%s-%d" % (item['cn'], item['serial'])
239
240
241 def _sshkeys_item_key(item):
242 return "%s" % os.path.basename(item['docname'])
243
244
245 def _build_cert_anchor_name(cn, serial):
246 return 'cert_%s_%d' % (cn.replace('.', '_'), serial)
247
248
def _format_subject_alternative_names(altnames):
    """Return a paragraph with the comma separated values of altnames."""
    names = ", ".join(content for _, content in altnames)
    return nodes.paragraph(text=names)
253
254
255 def _place_sort_key(place):
256 return "%s-%d" % (place['docname'], place['lineno'])
257
258
def _file_ref_paragraph(cert_info, filekey, app, env, docname):
    """
    Build a paragraph listing where a certificate's file is kept.

    For every place of cert_info a reference to the place (labelled with the
    lower cased title of its document) is followed by the file name stored
    under filekey. Primary places come first, the other places follow
    sorted by document name and line number. The primary place is rendered
    in bold if there is more than one place.
    """
    primary = [place for place in cert_info['places'] if place['primary']]
    secondary = sorted(
        (place for place in cert_info['places'] if not place['primary']),
        key=_place_sort_key)
    places = primary + secondary
    last = len(places) - 1

    para = nodes.paragraph()
    for pos, place in enumerate(places):
        title = env.titles[place['docname']].astext().lower()
        if place['primary'] and len(places) > 1:
            reftext = nodes.strong(text=title)
        else:
            reftext = nodes.Text(title)
        para += make_refnode(
            app.builder, docname, place['docname'], place['target']['ids'][0],
            reftext)
        para += nodes.Text(":")
        para += addnodes.literal_emphasis(text=place[filekey])
        if pos < last:
            para += nodes.Text(", ")
    return para
282
283
def _format_serial_number(serial):
    """Return a paragraph with the serial number in decimal and hex."""
    text = "%d (0x%0x)" % (serial, serial)
    return nodes.paragraph(text=text)
286
287
def _format_expiration_date(expiration):
    """Return a paragraph whose text is the given expiration value."""
    para = nodes.paragraph(text=expiration)
    return para
290
291
def _format_fingerprint(fingerprint):
    """Return a paragraph with the fingerprint as literal text."""
    literal = nodes.literal(text=fingerprint, classes=['fingerprint'])
    para = nodes.paragraph()
    para += literal
    return para
296
297
298 def _get_cert_index_text(cert_info):
299 return "Certificate; %s" % cert_info['cn']
300
301
def _get_formatted_keyentry(keys_info, algorithm, fptype):
    """
    Build a table entry for one fingerprint of one key algorithm.

    The entry shows the fptype fingerprint stored for algorithm in
    keys_info as literal text, or "-" if there is none.
    """
    algkey = algorithm.lower()
    if algkey not in keys_info or fptype not in keys_info[algkey]:
        para = nodes.paragraph(text="-")
    else:
        para = nodes.paragraph()
        para += nodes.literal(text=keys_info[algkey][fptype])
    entry = nodes.entry()
    entry += para
    return entry
313
314
def process_sslcerts(app, doctree):
    """
    Collect the certificate information of a document that has been read.

    Connected to the doctree-read event by setup(). The function

    * remembers the document that contains the sslcertlist directive in
      ``env.cacert_certlistdoc`` and raises SphinxError if a list document
      is already recorded,
    * records every sslcert node in ``env.cacert_sslcerts``, one entry per
      CN and serial number with one place per occurrence,
    * replaces the paragraph around every sslcert node with its target, an
      index entry and a bullet list that refers to the certificate list.

    Raises SphinxError if an sslcert node is not directly preceded by a
    target node.
    """
    env = app.builder.env
    if not hasattr(env, 'cacert_sslcerts'):
        env.cacert_sslcerts = []

    for _ in doctree.traverse(sslcertlist_node):
        if hasattr(env, 'cacert_certlistdoc'):
            raise SphinxError(
                "There must be one sslcertlist directive present in "
                "the document tree only.")
        env.cacert_certlistdoc = env.docname

    for node in doctree.traverse(sslcert_node):
        # The sslcert directive places a target node directly before the
        # sslcert node. The target is dereferenced below, so its absence is
        # reported here; position 0 must not wrap around to the last sibling.
        position = node.parent.index(node)
        targetnode = node.parent[position - 1] if position > 0 else None
        if not isinstance(targetnode, nodes.target):
            raise SphinxError(
                "The sslcert node in %s (line %s) is not preceded by a "
                "target node." % (env.docname, node.line))
        certdata = node.attributes['certdata'].copy()
        existing = [
            cert_info for cert_info in env.cacert_sslcerts
            if (cert_info['cn'], cert_info['serial']) ==
            (certdata['cn'], certdata['serial'])
        ]
        place_info = {
            'docname': env.docname,
            'lineno': node.line,
            'certfile': certdata['certfile'],
            'keyfile': certdata['keyfile'],
            'primary': 'secondary' not in certdata,
            'target': targetnode,
        }
        if existing:
            info = existing[0]
        else:
            info = {
                'cn': certdata['cn'],
                'serial': certdata['serial'],
                'places': [],
            }
            env.cacert_sslcerts.append(info)
        info['places'].append(place_info)
        # details are optional for secondary places, take what is given
        if 'sha1fp' in certdata:
            info['sha1fp'] = certdata['sha1fp']
        if 'issuer' in certdata:
            info['issuer'] = certdata['issuer']
        if 'expiration' in certdata:
            info['expiration'] = certdata['expiration']
        if 'altnames' in certdata:
            info['altnames'] = certdata['altnames'].copy()
        indexnode = addnodes.index(entries=[
            ('pair', _get_cert_index_text(info), targetnode['ids'][0],
             '', None)
        ])

        bullets = nodes.bullet_list()
        certitem = nodes.list_item()
        bullets += certitem
        certpara = nodes.paragraph()
        certpara += nodes.Text('Certificate for CN %s, see ' % certdata['cn'])
        refid = _build_cert_anchor_name(certdata['cn'], certdata['serial'])
        # resolved by resolve_missing_reference once the list is known
        detailref = addnodes.pending_xref(
            reftype='certlistref', refdoc=env.docname, refid=refid,
            reftarget='certlist'
        )
        detailref += nodes.Text("details in the certificate list")
        certpara += detailref
        certitem += certpara

        subbullets = nodes.bullet_list()
        bullets += subbullets
        item = nodes.list_item()
        subbullets += item
        certfile = nodes.paragraph(text="certificate in file ")
        certfile += addnodes.literal_emphasis(text=certdata['certfile'])
        item += certfile
        item = nodes.list_item()
        subbullets += item
        keyfile = nodes.paragraph(text="private key in file ")
        keyfile += addnodes.literal_emphasis(text=certdata['keyfile'])
        item += keyfile

        node.parent.replace_self([targetnode, indexnode, bullets])
403
404
def process_sshkeys(app, doctree):
    """
    Collect the ssh host keys of a document that has been read.

    Connected to the doctree-read event by setup(). The document containing
    the sshkeylist directive is remembered in ``env.cacert_sshkeylistdoc``.
    The keys of every sshkeys node are recorded in ``env.cacert_sshkeys``,
    an index entry is inserted before the surrounding section, and the node
    is replaced with a fingerprint table and a reference to the ssh key
    list.
    """
    env = app.builder.env
    if not hasattr(env, 'cacert_sshkeys'):
        env.cacert_sshkeys = []

    for _ in doctree.traverse(sshkeylist_node):
        if hasattr(env, 'cacert_sshkeylistdoc'):
            raise SphinxError(
                "There must be one sshkeylist directive present in "
                "the document tree only.")
        env.cacert_sshkeylistdoc = env.docname

    for node in doctree.traverse(sshkeys_node):
        # the first section found by traverse_parent is used
        section = list(traverse_parent(node, nodes.section))[0]
        secid = section['ids'][0]
        doc_keys = {'docname': env.docname, 'secid': secid}
        doc_keys.update(node['keys'])
        env.cacert_sshkeys.append(doc_keys)

        # index entry for the section, labelled with the document title or,
        # if that is unknown or empty, with the base name of the document
        doc_title = ''
        if env.docname in env.titles:
            doc_title = env.titles[env.docname].astext()
        indextitle = 'SSH host key; %s' % (
            doc_title or os.path.basename(env.docname))
        secparent = section.parent
        secparent.insert(secparent.index(section), addnodes.index(entries=[
            ('pair', indextitle, section['ids'][0], '', None),
        ]))

        # table with one row per supported algorithm
        column_widths = (1, 4)
        table = nodes.table()
        tgroup = nodes.tgroup(cols=len(column_widths))
        table += tgroup
        for width in column_widths:
            tgroup += nodes.colspec(colwidth=width)
        thead = nodes.thead()
        tgroup += thead
        thead += create_table_row([
            nodes.paragraph(text='Algorithm'),
            nodes.paragraph(text='Fingerprints'),
        ])
        tbody = nodes.tbody()
        tgroup += tbody
        for alg in SUPPORTED_SSH_KEYTYPES:
            alg_key = alg.lower()
            if alg_key not in doc_keys:
                fpparagraph = nodes.paragraph(text='-')
            else:
                labelled = [
                    "{}:{}".format(ktype.upper(), doc_keys[alg_key][ktype])
                    for ktype in ('sha256', 'md5')
                    if ktype in doc_keys[alg_key]
                ]
                fpparagraph = nodes.paragraph()
                for idx, fingerprint in enumerate(labelled):
                    if idx > 0:
                        fpparagraph += nodes.inline(text=", ")
                    fpparagraph += nodes.literal(text=fingerprint)
            tbody += create_table_row([
                nodes.paragraph(text=alg),
                fpparagraph,
            ])

        # pending_xref that links to the ssh key list
        detailref = addnodes.pending_xref(
            reftype='sshkeyref', refdoc=env.docname, refid='sshkeylist',
            reftarget='sshkeylist'
        )
        detailref += nodes.Text("SSH host key list")
        seepara = nodes.paragraph()
        seepara += detailref
        seealso = addnodes.seealso()
        seealso += seepara

        node.replace_self([table, seealso])
484
485
def process_sslcert_nodes(app, doctree, docname):
    """
    Replace every sslcertlist node with the list of known certificates.

    Connected to the doctree-resolved event by setup(). For each entry of
    ``env.cacert_sslcerts`` an index entry and a section with a two column
    table of the certificate details is generated. Raises SphinxError if a
    certificate does not have exactly one primary place.
    """
    env = app.builder.env

    if not hasattr(env, 'cacert_sslcerts'):
        env.cacert_sslcerts = []

    for node in doctree.traverse(sslcertlist_node):
        content = []

        for cert_info in sorted(env.cacert_sslcerts, key=_sslcert_item_key):
            primarycount = sum(
                1 for place in cert_info['places'] if place['primary'])
            if primarycount != 1:
                raise SphinxError(
                    "There must be exactly one primary place for a "
                    "certificate, but the certificate for CN %s with "
                    "serial number %d has %d" %
                    (cert_info['cn'], cert_info['serial'], primarycount)
                )

            anchor = _build_cert_anchor_name(
                cert_info['cn'], cert_info['serial'])
            cert_sec = nodes.section()
            cert_sec['ids'].append(anchor)
            cert_sec += nodes.title(text=cert_info['cn'])
            content.append(addnodes.index(entries=[
                ('pair', _get_cert_index_text(cert_info),
                 cert_sec['ids'][0], '', None),
            ]))

            table = nodes.table()
            cert_sec += table
            tgroup = nodes.tgroup(cols=2)
            table += tgroup
            for width in (1, 5):
                tgroup += nodes.colspec(colwidth=width)
            tbody = nodes.tbody()
            tgroup += tbody

            # label and value node of every table row, in display order
            rows = [('Common Name', nodes.paragraph(text=cert_info['cn']))]
            if 'altnames' in cert_info:
                rows.append((
                    'Subject Alternative Names',
                    _format_subject_alternative_names(cert_info['altnames'])
                ))
            rows.append((
                'Key kept at',
                _file_ref_paragraph(cert_info, 'keyfile', app, env, docname)
            ))
            rows.append((
                'Cert kept at',
                _file_ref_paragraph(cert_info, 'certfile', app, env, docname)
            ))
            rows.append((
                'Serial number',
                _format_serial_number(cert_info['serial'])
            ))
            rows.append((
                'Expiration date',
                _format_expiration_date(cert_info['expiration'])
            ))
            rows.append((
                'Issuer',
                nodes.paragraph(text=cert_info['issuer'])
            ))
            rows.append((
                'SHA1 fingerprint',
                _format_fingerprint(cert_info['sha1fp'])
            ))
            for label, value in rows:
                tbody += create_table_row(
                    [nodes.paragraph(text=label), value])
            content.append(cert_sec)

        node.replace_self(content)
563
564
def process_sshkeys_nodes(app, doctree, docname):
    """
    Replace every sshkeylist node with the table of all known ssh host keys.

    Connected to the doctree-resolved event by setup(). The replacement
    starts with a target with the id ``sshkeylist``. For every entry of
    ``env.cacert_sshkeys`` the table contains a reference to the
    documenting section, a heading row and one row per supported key
    algorithm with its MD5 and SHA256 fingerprints. A note is emitted
    instead of the table if no keys have been documented.
    """
    env = app.builder.env

    # the attribute is missing if no document has been processed by
    # process_sshkeys; it must be cacert_sshkeys because that is read below
    if not hasattr(env, 'cacert_sshkeys'):
        env.cacert_sshkeys = []

    for node in doctree.traverse(sshkeylist_node):
        content = [nodes.target(ids=['sshkeylist'])]

        if len(env.cacert_sshkeys) > 0:
            table = nodes.table()
            content.append(table)
            tgroup = nodes.tgroup(cols=4)
            for width in (1, 1, 2, 2):
                tgroup += nodes.colspec(colwidth=width)
            table += tgroup

            thead = nodes.thead()
            row = nodes.row()
            entry = nodes.entry()
            entry += nodes.paragraph(text="Host")
            row += entry
            # spans the algorithm and both fingerprint columns
            entry = nodes.entry(morecols=2)
            entry += nodes.paragraph(text="SSH Host Keys")
            row += entry
            thead += row
            tgroup += thead

            tbody = nodes.tbody()
            tgroup += tbody

            for keys_info in sorted(env.cacert_sshkeys, key=_sshkeys_item_key):
                trow = nodes.row()
                # the host cell spans the heading row and all algorithm rows
                entry = nodes.entry(morerows=len(SUPPORTED_SSH_KEYTYPES))
                para = nodes.paragraph()
                para += make_refnode(
                    app.builder, docname, keys_info['docname'],
                    keys_info['secid'],
                    nodes.Text(env.titles[keys_info['docname']].astext())
                )
                entry += para
                trow += entry

                for heading in ('Algorithm', 'MD5 fingerprint',
                                'SHA256 fingerprint'):
                    entry = nodes.entry()
                    para = nodes.paragraph()
                    para += nodes.strong(text=heading)
                    entry += para
                    trow += entry

                tbody += trow

                for algorithm in SUPPORTED_SSH_KEYTYPES:
                    trow = nodes.row()

                    entry = nodes.entry()
                    entry += nodes.paragraph(text=algorithm)
                    trow += entry

                    trow += _get_formatted_keyentry(keys_info, algorithm, 'md5')
                    trow += _get_formatted_keyentry(keys_info, algorithm,
                                                    'sha256')
                    tbody += trow
        else:
            content.append(nodes.paragraph(
                text="No ssh keys have been documented.")
            )

        node.replace_self(content)
647
648
def resolve_missing_reference(app, env, node, contnode):
    """
    Resolve the references to the certificate list and the ssh key list.

    Connected to the missing-reference event by setup(). Returns a
    reference node for the reference types ``certlistref`` and
    ``sshkeyref`` and None for every other type. Raises SphinxError if the
    document with the referenced list is not known.
    """
    # reference type -> (env attribute naming the list document, error text)
    list_documents = {
        'certlistref': (
            'cacert_certlistdoc',
            'No certlist directive found in the document tree'),
        'sshkeyref': (
            'cacert_sshkeylistdoc',
            'No sshkeylist directive found in the document tree'),
    }
    reftype = node['reftype']
    if reftype not in list_documents:
        return None
    attribute, message = list_documents[reftype]
    if not hasattr(env, attribute):
        raise SphinxError(message)
    return make_refnode(
        app.builder, node['refdoc'], getattr(env, attribute),
        node['refid'], contnode)
662
663
def purge_sslcerts(app, env, docname):
    """
    Remove the certificate information that originates from docname.

    Connected to the env-purge-doc event by setup(). Forgets the
    certificate list document if it is docname and drops all places
    recorded for docname. Certificates without any remaining place are
    removed too; otherwise a certificate that is no longer documented
    would stay in the list with zero primary places and make
    process_sslcert_nodes fail on the next build.
    """
    if (
        hasattr(env, 'cacert_certlistdoc') and
        env.cacert_certlistdoc == docname
    ):
        delattr(env, 'cacert_certlistdoc')
    if not hasattr(env, 'cacert_sslcerts'):
        return
    remaining = []
    for cert_info in env.cacert_sslcerts:
        cert_info['places'] = [
            place for place in cert_info['places']
            if place['docname'] != docname
        ]
        if cert_info['places']:
            remaining.append(cert_info)
    env.cacert_sslcerts = remaining
677
678
def purge_sshkeys(app, env, docname):
    """
    Remove the ssh key information that originates from docname.

    Connected to the env-purge-doc event by setup(). Forgets the ssh key
    list document if it is docname and drops the keys recorded for docname.
    """
    listdoc_purged = (
        hasattr(env, 'cacert_sshkeylistdoc') and
        env.cacert_sshkeylistdoc == docname
    )
    if listdoc_purged:
        del env.cacert_sshkeylistdoc
    if hasattr(env, 'cacert_sshkeys'):
        env.cacert_sshkeys = [
            keys for keys in env.cacert_sshkeys if keys['docname'] != docname
        ]
690
691
def setup(app):
    """
    Register the nodes, directives and event handlers of this extension.

    Returns the extension metadata with the version of this module.
    """
    for node_class in (sslcertlist_node, sslcert_node,
                       sshkeylist_node, sshkeys_node):
        app.add_node(node_class)

    for name, directive in (
        ('sslcert', CAcertSSLCert),
        ('sslcertlist', CAcertSSLCertList),
        ('sshkeys', CAcertSSHKeys),
        ('sshkeylist', CAcertSSHKeyList),
    ):
        app.add_directive(name, directive)

    for event, handler in (
        ('doctree-read', process_sslcerts),
        ('doctree-read', process_sshkeys),
        ('doctree-resolved', process_sslcert_nodes),
        ('doctree-resolved', process_sshkeys_nodes),
        ('missing-reference', resolve_missing_reference),
        ('env-purge-doc', purge_sslcerts),
        ('env-purge-doc', purge_sshkeys),
    ):
        app.connect(event, handler)
    return {'version': __version__}