Update certificates for funding and jenkins
[cacert-infradocs.git] / docs / sphinxext / cacert.py
1 # -*- python -*-
2 # This module provides the following CAcert specific sphinx directives
3 #
4 # sslcert
5 # sslcertlist
6 # sshkeys
7 # sshkeylist
8
9 import binascii
10 import re
11 import os.path
12 from ipaddress import ip_address
13
14 from docutils import nodes
15 from docutils.parsers.rst import Directive
16 from docutils.parsers.rst import directives
17
18 from sphinx import addnodes
19 from sphinx.errors import SphinxError
20 from sphinx.util.nodes import set_source_info, make_refnode, traverse_parent
21
22 from dateutil.parser import parse as date_parse
23 from base64 import b64decode
24 from validate_email import validate_email
25
# Version of this extension as reported to Sphinx by setup().
__version__ = '0.1.0'

# Key algorithms that may be documented with the sshkeys directive; the
# lower-cased names are used as the directive's option names.
SUPPORTED_SSH_KEYTYPES = ('RSA', 'DSA', 'ECDSA', 'ED25519')
# Colon separated MD5 fingerprint: sixteen lower case hexadecimal byte pairs.
SSH_MD5_RE = r'^([0-9a-f]{2}:){15}[0-9a-f]{2}$'
30
31
class sslcert_node(nodes.General, nodes.Element):
    """Placeholder node that carries the data of one sslcert directive."""
34
35
class sslcertlist_node(nodes.General, nodes.Element):
    """Placeholder node marking where the certificate list is rendered."""
38
39
class sshkeys_node(nodes.General, nodes.Element):
    """Placeholder node that carries the data of one sshkeys directive."""
42
43
class sshkeylist_node(nodes.General, nodes.Element):
    """Placeholder node marking where the ssh host key list is rendered."""
46
47
48 # mapping and validation functions for directive options
49
def hex_int(argument):
    """
    Convert a hexadecimal option value into an integer.

    :param argument: text containing a base 16 number
    :return: the parsed integer
    :raises ValueError: if the text is not a valid base 16 number
    """
    return int(argument, base=16)
53
54
def ssh_fingerprint(argument):
    """
    Parse and validate the fingerprints given for one ssh host key.

    The option value is a whitespace separated list of fingerprints. Each
    item is either ``SHA256:<base64 digest without padding>``,
    ``MD5:<colon separated hex pairs>`` or a bare colon separated MD5
    fingerprint.

    :param argument: raw option value
    :return: dictionary with the keys ``sha256`` and/or ``md5``; MD5
        fingerprints are always stored in lower case
    :raises ValueError: if no fingerprint is given or an item is malformed
    """
    if argument is None or not argument.strip():
        raise ValueError('no fingerprint specified')

    result = {}
    # split() without a separator tolerates repeated spaces, tabs and line
    # breaks between the individual fingerprints
    for item in argument.split():
        if item.startswith('SHA256:'):
            sha256_encoded = item[len('SHA256:'):]
            try:
                # the value is expected without base64 padding; exactly one
                # '=' completes the encoding of a 32 byte digest
                sha256_decoded = b64decode(
                    sha256_encoded + "=", validate=True)
            except binascii.Error:
                raise ValueError(
                    '{} is no correctly formatted SHA256 fingerprint'.format(
                        item))
            if len(sha256_decoded) != 32:
                raise ValueError(
                    '{} is no correctly formatted SHA256 fingerprint'.format(
                        item))
            result['sha256'] = sha256_encoded
            continue

        # everything else is an MD5 fingerprint with an optional prefix
        if item.startswith('MD5:'):
            md5_value = item[len('MD5:'):].lower()
        else:
            md5_value = item.lower()
        if not re.match(SSH_MD5_RE, md5_value):
            raise ValueError(
                '{} is no correctly formatted MD5 fingerprint'.format(item))
        result['md5'] = md5_value
    return result
82
83
def sha1_fingerprint(argument):
    """
    Validate a colon separated SHA1 fingerprint.

    :param argument: raw option value
    :return: the fingerprint without surrounding whitespace, in lower case
    :raises ValueError: if the value is not twenty colon separated hex pairs
    """
    normalized = argument.strip().lower()
    if re.match(r'^([0-9a-f]{2}:){19}[0-9a-f]{2}$', normalized) is None:
        raise ValueError('no correctly formatted SHA1 fingerprint')
    return normalized
89
90
def is_valid_hostname(hostname):
    """
    Check whether a string is a syntactically valid DNS host name.

    A single trailing dot is accepted. Every label must consist of 1 to 63
    letters, digits or hyphens and must neither start nor end with a hyphen.

    :param hostname: host name to check
    :return: ``True`` if the name is valid, ``False`` otherwise (including
        empty names and names longer than 255 characters)
    """
    # an empty name has no last character to inspect and is never valid
    if not hostname or len(hostname) > 255:
        return False
    if hostname.endswith("."):
        # strip exactly one dot from the right, if present
        hostname = hostname[:-1]
    allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
    return all(allowed.match(label) for label in hostname.split("."))
98
99
def is_valid_ipaddress(content):
    """
    Check whether a string is a valid IPv4 or IPv6 address.

    :param content: text to check
    :return: ``True`` if :func:`ipaddress.ip_address` accepts the text,
        ``False`` if it rejects it with a :class:`ValueError`
    """
    try:
        ip_address(content)
        return True
    except ValueError:
        return False
106
107
def subject_alternative_names(argument=""):
    """
    Parse and validate a comma separated list of subject alternative names.

    Every entry has the form ``TYPE:value`` where TYPE is one of ``DNS``,
    ``EMAIL`` or ``IP``.

    :param argument: raw option value, may be empty
    :return: list of ``[type, value]`` pairs in the order they were given;
        an empty list if no value was given
    :raises ValueError: if an entry has no ``TYPE:`` prefix, uses an
        unsupported type or carries an invalid value
    """
    if not argument:
        return []

    value = []
    for san in argument.split(','):
        entry = san.strip()
        # only split on the first colon, IPv6 addresses contain more of them
        typ, separator, content = entry.partition(':')
        if not separator:
            raise ValueError(
                "'%s' is no valid subject alternative name, expected "
                "TYPE:value" % entry)
        if typ == 'DNS':
            if not is_valid_hostname(content):
                raise ValueError("%s is no valid DNS name" % content)
        elif typ == 'EMAIL':
            if not validate_email(content):
                raise ValueError(
                    "%s is not a valid email address" % content)
        elif typ == 'IP':
            if not is_valid_ipaddress(content):
                raise ValueError("%s is not a valid IP address" % content)
        else:
            raise ValueError(
                "handling of %s subject alternative names (%s) has not been "
                "implemented" % (typ, content))
        value.append([typ, content])
    return value
127
128
def expiration_date(argument):
    """
    Convert the expiration option into a date value.

    The value is first passed through ``directives.unchanged_required`` and
    the result is then handed to dateutil's parser.

    :param argument: raw option value
    :return: the result of parsing the value with ``dateutil.parser.parse``
    """
    raw_value = directives.unchanged_required(argument)
    return date_parse(raw_value)
131
132
class CAcertSSLCert(Directive):
    """
    The sslcert directive implementation.

    The directive argument is stored as the certificate's CN, the options
    describe the certificate and the files it is kept in.

    There must only be one instance of a certificate with the same CN and
    serial number that is not flagged as secondary
    """
    final_argument_whitespace = True
    required_arguments = 1
    option_spec = {
        'certfile': directives.path,
        'keyfile': directives.path,
        'serial': hex_int,
        'expiration': expiration_date,
        'sha1fp': sha1_fingerprint,
        'altnames': subject_alternative_names,
        'issuer': directives.unchanged_required,
        'secondary': directives.flag
    }

    def run(self):
        """
        Build a paragraph holding a link target and the sslcert node.

        :return: list with the single paragraph node
        :raises: the directive error created by ``self.error`` if a
            required option is missing
        """
        # secondary instances only have to name the files and the serial
        # number, the primary instance has to describe the certificate too
        required_options = ('certfile', 'keyfile', 'serial')
        if 'secondary' not in self.options:
            required_options += ('expiration', 'sha1fp', 'issuer')
        missing = [
            name for name in required_options if name not in self.options
        ]
        if missing:
            raise self.error(
                "required option(s) '%s' is/are not set for %s." % (
                    "', '".join(missing), self.name))

        certdata = self.options.copy()
        certdata['cn'] = self.arguments[0]
        sslcert = sslcert_node()
        sslcert.attributes['certdata'] = certdata
        set_source_info(self, sslcert)

        env = self.state.document.settings.env
        targetnode = nodes.target(
            '', '', ids=['sslcert-%s' % env.new_serialno('sslcert')])

        # the target has to directly precede the sslcert node
        para = nodes.paragraph()
        para.append(targetnode)
        para.append(sslcert)
        return [para]
182
183
class CAcertSSLCertList(Directive):
    """
    The sslcertlist directive implementation
    """

    def run(self):
        """Return a single sslcertlist placeholder node."""
        placeholder = sslcertlist_node()
        return [placeholder]
191
192
class CAcertSSHKeys(Directive):
    """
    The sshkeys directive implementation that can be used to specify the ssh
    host keys for a host.
    """
    # one option per supported key algorithm, named by its lower case name
    option_spec = dict.fromkeys(
        (keytype.lower() for keytype in SUPPORTED_SSH_KEYTYPES),
        ssh_fingerprint)

    def run(self):
        """
        Build a "SSH host keys" section containing the sshkeys node.

        :return: list with the single section node
        :raises: the directive error created by ``self.error`` if no
            fingerprint option is given
        """
        if not self.options:
            raise self.error(
                "at least one ssh key fingerprint must be specified. The "
                "following formats are supported: %s" % ", ".join(
                    SUPPORTED_SSH_KEYTYPES))

        sshkeys = sshkeys_node()
        sshkeys.attributes['keys'] = self.options.copy()
        set_source_info(self, sshkeys)

        env = self.state.document.settings.env
        section = nodes.section(
            ids=['sshkeys-%s' % env.new_serialno('sshkeys')])
        section += nodes.title(text='SSH host keys')
        section += sshkeys
        return [section]
219
220
class CAcertSSHKeyList(Directive):
    """
    The sshkeylist directive implementation
    """

    def run(self):
        """Return a single sshkeylist placeholder node."""
        placeholder = sshkeylist_node()
        return [placeholder]
228
229
def create_table_row(rowdata):
    """
    Build a table row from the given cell contents.

    :param rowdata: iterable with the content of each cell
    :return: a row node with one entry node per item of ``rowdata``
    """
    table_row = nodes.row()
    for cell_content in rowdata:
        cell = nodes.entry()
        cell += cell_content
        table_row += cell
    return table_row
237
238
239 def _sslcert_item_key(item):
240 return "%s-%d" % (item['cn'], item['serial'])
241
242
243 def _sshkeys_item_key(item):
244 return "%s" % os.path.basename(item['docname'])
245
246
247 def _build_cert_anchor_name(cn, serial):
248 return 'cert_%s_%d' % (cn.replace('.', '_'), serial)
249
250
def _format_subject_alternative_names(altnames):
    """
    Render subject alternative names as a comma separated paragraph.

    :param altnames: sequence of (type, value) pairs; only the values are
        rendered
    :return: paragraph node containing the joined values
    """
    names = [content for _, content in altnames]
    joined = ", ".join(names)
    return nodes.paragraph(text=joined)
255
256
257 def _place_sort_key(place):
258 return "%s-%d" % (place['docname'], place['lineno'])
259
260
def _file_ref_paragraph(cert_info, filekey, app, env, docname):
    """
    Build the paragraph listing where a certificate's files are kept.

    Primary places come first, followed by the secondary places sorted with
    ``_place_sort_key``. Every place is rendered as a reference to the
    documenting page followed by ":" and the file name.

    :param cert_info: certificate entry with a ``places`` list
    :param filekey: key of the file name inside a place (the callers in this
        module pass ``keyfile`` or ``certfile``)
    :param app: the Sphinx application
    :param env: the build environment, used to look up document titles
    :param docname: name of the document the references are created in
    :return: paragraph node
    """
    all_places = cert_info['places']
    primaries = [place for place in all_places if place['primary']]
    secondaries = [place for place in all_places if not place['primary']]
    secondaries.sort(key=_place_sort_key)
    ordered = primaries + secondaries
    last_pos = len(ordered) - 1

    para = nodes.paragraph()
    for pos, place in enumerate(ordered):
        title = env.titles[place['docname']].astext().lower()
        # highlight the primary place only if there are other places too
        if place['primary'] and len(ordered) > 1:
            reftext = nodes.strong(text=title)
        else:
            reftext = nodes.Text(title)
        para += make_refnode(
            app.builder, docname, place['docname'],
            place['target']['ids'][0], reftext)
        para += nodes.Text(":")
        para += addnodes.literal_emphasis(text=place[filekey])
        if pos < last_pos:
            para += nodes.Text(", ")
    return para
284
285
def _format_serial_number(serial):
    """Return a paragraph showing the serial number as decimal and hex."""
    label = "%d (0x%0x)" % (serial, serial)
    return nodes.paragraph(text=label)
288
289
def _format_expiration_date(expiration):
    """Return a paragraph node whose text is the given expiration value."""
    para = nodes.paragraph(text=expiration)
    return para
292
293
def _format_fingerprint(fingerprint):
    """
    Return a paragraph with the fingerprint as literal text.

    The literal node gets the class ``fingerprint``.
    """
    literal = nodes.literal(text=fingerprint, classes=['fingerprint'])
    para = nodes.paragraph()
    para += literal
    return para
298
299
300 def _get_cert_index_text(cert_info):
301 return "Certificate; %s" % cert_info['cn']
302
303
def _get_formatted_keyentry(keys_info, algorithm, fptype):
    """
    Build the table cell for one fingerprint of one key algorithm.

    :param keys_info: ssh key entry of a host, keyed by lower case algorithm
    :param algorithm: algorithm name, looked up in lower case
    :param fptype: fingerprint type key inside the algorithm's dictionary
    :return: entry node holding the fingerprint as literal text, or "-" if
        the host has no such fingerprint
    """
    key = algorithm.lower()
    if key not in keys_info or fptype not in keys_info[key]:
        para = nodes.paragraph(text="-")
    else:
        para = nodes.paragraph()
        para += nodes.literal(text=keys_info[key][fptype])
    cell = nodes.entry()
    cell += para
    return cell
315
316
def process_sslcerts(app, doctree):
    """
    Collect the certificates declared in a document that has just been read.

    Registered for the ``doctree-read`` event. Every sslcert node is recorded
    in ``env.cacert_sslcerts`` and its enclosing paragraph is replaced with
    the link target, an index entry and a bullet list describing the files.
    The document containing the sslcertlist directive is remembered in
    ``env.cacert_certlistdoc``.

    :param app: the Sphinx application
    :param doctree: the document tree of the document that was read
    :raises SphinxError: if a second sslcertlist directive is found
    """
    env = app.builder.env
    if not hasattr(env, 'cacert_sslcerts'):
        env.cacert_sslcerts = []

    for _ in doctree.traverse(sslcertlist_node):
        if hasattr(env, 'cacert_certlistdoc'):
            raise SphinxError(
                "There must be one sslcertlist directive present in "
                "the document tree only.")
        env.cacert_certlistdoc = env.docname

    for node in doctree.traverse(sslcert_node):
        wrapper = node.parent
        # the directive places the link target directly before the node
        try:
            targetnode = wrapper[wrapper.index(node) - 1]
        except IndexError:
            targetnode = None
        if not isinstance(targetnode, nodes.target):
            # NOTE(review): the code below subscripts targetnode, so a
            # missing target would fail there - confirm this cannot happen
            targetnode = None

        certdata = node.attributes['certdata'].copy()
        cert_cn, cert_serial = certdata['cn'], certdata['serial']
        info = next(
            (known for known in env.cacert_sslcerts
             if (known['cn'], known['serial']) == (cert_cn, cert_serial)),
            None)
        place_info = {
            'docname': env.docname,
            'lineno': node.line,
            'certfile': certdata['certfile'],
            'keyfile': certdata['keyfile'],
            'primary': 'secondary' not in certdata,
            'target': targetnode,
        }
        if info is None:
            info = {'cn': cert_cn, 'serial': cert_serial, 'places': []}
            env.cacert_sslcerts.append(info)
        info['places'].append(place_info)

        # descriptive options are optional for secondary instances
        for optional in ('sha1fp', 'issuer', 'expiration'):
            if optional in certdata:
                info[optional] = certdata[optional]
        if 'altnames' in certdata:
            info['altnames'] = certdata['altnames'].copy()

        indexnode = addnodes.index(entries=[
            ('pair', _get_cert_index_text(info), targetnode['ids'][0],
             '', None)
        ])

        # first level: certificate with a link into the certificate list
        detailref = addnodes.pending_xref(
            reftype='certlistref', refdoc=env.docname,
            refid=_build_cert_anchor_name(cert_cn, cert_serial),
            reftarget='certlist'
        )
        detailref += nodes.Text("details in the certificate list")
        certpara = nodes.paragraph()
        certpara += nodes.Text('Certificate for CN %s, see ' % cert_cn)
        certpara += detailref
        certitem = nodes.list_item()
        certitem += certpara

        # second level: the files holding certificate and private key
        certfile = nodes.paragraph(text="certificate in file ")
        certfile += addnodes.literal_emphasis(text=certdata['certfile'])
        certfile_item = nodes.list_item()
        certfile_item += certfile

        keyfile = nodes.paragraph(text="private key in file ")
        keyfile += addnodes.literal_emphasis(text=certdata['keyfile'])
        keyfile_item = nodes.list_item()
        keyfile_item += keyfile

        subbullets = nodes.bullet_list()
        subbullets += certfile_item
        subbullets += keyfile_item

        bullets = nodes.bullet_list()
        bullets += certitem
        bullets += subbullets

        wrapper.replace_self([targetnode, indexnode, bullets])
405
406
def process_sshkeys(app, doctree):
    """
    Collect the ssh host keys declared in a document that has just been read.

    Registered for the ``doctree-read`` event. Every sshkeys node is recorded
    in ``env.cacert_sshkeys`` and replaced with a fingerprint table and a
    "see also" reference to the ssh host key list. An index entry is
    inserted in front of the section containing the node. The document
    containing the sshkeylist directive is remembered in
    ``env.cacert_sshkeylistdoc``.

    :param app: the Sphinx application
    :param doctree: the document tree of the document that was read
    :raises SphinxError: if a second sshkeylist directive is found
    """
    env = app.builder.env
    if not hasattr(env, 'cacert_sshkeys'):
        env.cacert_sshkeys = []

    for _ in doctree.traverse(sshkeylist_node):
        if hasattr(env, 'cacert_sshkeylistdoc'):
            raise SphinxError(
                "There must be one sshkeylist directive present in "
                "the document tree only.")
        env.cacert_sshkeylistdoc = env.docname

    for node in doctree.traverse(sshkeys_node):
        # the first section found among the node's parents
        section = list(traverse_parent(node, nodes.section))[0]
        section_id = section['ids'][0]
        doc_keys = {'docname': env.docname, 'secid': section_id}
        doc_keys.update(node['keys'])
        env.cacert_sshkeys.append(doc_keys)

        # index entry in front of the section; fall back to the document's
        # base name if there is no (non-empty) title
        doc_title = ''
        if env.docname in env.titles:
            doc_title = env.titles[env.docname].astext()
        indextitle = 'SSH host key; %s' % (
            doc_title or os.path.basename(env.docname))
        secparent = section.parent
        secparent.insert(secparent.index(section), addnodes.index(entries=[
            ('pair', indextitle, section_id, '', None),
        ]))

        # fingerprint table with one row per supported algorithm
        colwidths = (1, 4)
        table = nodes.table()
        tgroup = nodes.tgroup(cols=len(colwidths))
        table += tgroup
        for width in colwidths:
            tgroup += nodes.colspec(colwidth=width)
        thead = nodes.thead()
        tgroup += thead
        thead += create_table_row([
            nodes.paragraph(text='Algorithm'),
            nodes.paragraph(text='Fingerprints'),
        ])
        tbody = nodes.tbody()
        tgroup += tbody
        for alg in SUPPORTED_SSH_KEYTYPES:
            alg_key = alg.lower()
            if alg_key not in doc_keys:
                fpparagraph = nodes.paragraph(text='-')
            else:
                fingerprints = [
                    "{}:{}".format(ktype.upper(), doc_keys[alg_key][ktype])
                    for ktype in ('sha256', 'md5')
                    if ktype in doc_keys[alg_key]
                ]
                fpparagraph = nodes.paragraph()
                for idx, fingerprint in enumerate(fingerprints):
                    fpparagraph += nodes.literal(text=fingerprint)
                    if idx < len(fingerprints) - 1:
                        fpparagraph += nodes.inline(text=", ")
            tbody += create_table_row([
                nodes.paragraph(text=alg),
                fpparagraph,
            ])

        # pending_xref for the link to the ssh key list
        detailref = addnodes.pending_xref(
            reftype='sshkeyref', refdoc=env.docname, refid='sshkeylist',
            reftarget='sshkeylist'
        )
        detailref += nodes.Text("SSH host key list")
        seepara = nodes.paragraph()
        seepara += detailref
        seealso = addnodes.seealso()
        seealso += seepara

        node.replace_self([table, seealso])
486
487
def process_sslcert_nodes(app, doctree, docname):
    """
    Replace sslcertlist nodes with the list of all collected certificates.

    Registered for the ``doctree-resolved`` event. For every certificate in
    ``env.cacert_sslcerts`` an index entry and a section with a table of the
    certificate's details are generated.

    :param app: the Sphinx application
    :param doctree: the resolved document tree
    :param docname: name of the document the tree belongs to
    :raises SphinxError: if a certificate does not have exactly one primary
        place
    """
    env = app.builder.env

    if not hasattr(env, 'cacert_sslcerts'):
        env.cacert_sslcerts = []

    for node in doctree.traverse(sslcertlist_node):
        content = []

        for cert_info in sorted(env.cacert_sslcerts, key=_sslcert_item_key):
            primarycount = sum(
                1 for place in cert_info['places'] if place['primary'])
            if primarycount != 1:
                raise SphinxError(
                    "There must be exactly one primary place for a "
                    "certificate, but the certificate for CN %s with "
                    "serial number %d has %d" %
                    (cert_info['cn'], cert_info['serial'], primarycount)
                )

            anchor = _build_cert_anchor_name(
                cert_info['cn'], cert_info['serial'])
            cert_sec = nodes.section()
            cert_sec['ids'].append(anchor)
            cert_sec += nodes.title(text=cert_info['cn'])
            content.append(addnodes.index(entries=[
                ('pair', _get_cert_index_text(cert_info),
                 cert_sec['ids'][0], '', None),
            ]))

            # label/value pairs in the order they are shown in the table
            rows = [
                ('Common Name', nodes.paragraph(text=cert_info['cn'])),
            ]
            if 'altnames' in cert_info:
                rows.append((
                    'Subject Alternative Names',
                    _format_subject_alternative_names(
                        cert_info['altnames'])))
            rows.extend([
                ('Key kept at',
                 _file_ref_paragraph(
                     cert_info, 'keyfile', app, env, docname)),
                ('Cert kept at',
                 _file_ref_paragraph(
                     cert_info, 'certfile', app, env, docname)),
                ('Serial number',
                 _format_serial_number(cert_info['serial'])),
                ('Expiration date',
                 _format_expiration_date(cert_info['expiration'])),
                ('Issuer', nodes.paragraph(text=cert_info['issuer'])),
                ('SHA1 fingerprint',
                 _format_fingerprint(cert_info['sha1fp'])),
            ])

            table = nodes.table()
            cert_sec += table
            tgroup = nodes.tgroup(cols=2)
            table += tgroup
            tgroup += nodes.colspec(colwidth=1)
            tgroup += nodes.colspec(colwidth=5)
            tbody = nodes.tbody()
            tgroup += tbody
            for label, cell in rows:
                tbody += create_table_row(
                    [nodes.paragraph(text=label), cell])
            content.append(cert_sec)

        node.replace_self(content)
565
566
def process_sshkeys_nodes(app, doctree, docname):
    """
    Replace sshkeylist nodes with a table of all collected ssh host keys.

    Registered for the ``doctree-resolved`` event. Every host recorded in
    ``env.cacert_sshkeys`` gets a heading row followed by one row per
    supported key algorithm with its MD5 and SHA256 fingerprints. If no keys
    have been recorded a short notice is rendered instead.

    :param app: the Sphinx application
    :param doctree: the resolved document tree
    :param docname: name of the document the tree belongs to
    """
    env = app.builder.env

    # make sure the ssh key list exists even if no document declared any
    # keys; the certificate list must not be touched here
    if not hasattr(env, 'cacert_sshkeys'):
        env.cacert_sshkeys = []

    for node in doctree.traverse(sshkeylist_node):
        content = [nodes.target(ids=['sshkeylist'])]

        if not env.cacert_sshkeys:
            content.append(nodes.paragraph(
                text="No ssh keys have been documented.")
            )
            node.replace_self(content)
            continue

        table = nodes.table()
        content.append(table)
        tgroup = nodes.tgroup(cols=4)
        for colwidth in (1, 1, 2, 2):
            tgroup += nodes.colspec(colwidth=colwidth)
        table += tgroup

        # table head: host column plus one cell spanning the key columns
        thead = nodes.thead()
        row = nodes.row()
        entry = nodes.entry()
        entry += nodes.paragraph(text="Host")
        row += entry
        entry = nodes.entry(morecols=2)
        entry += nodes.paragraph(text="SSH Host Keys")
        row += entry
        thead += row
        tgroup += thead

        tbody = nodes.tbody()
        tgroup += tbody

        for keys_info in sorted(env.cacert_sshkeys, key=_sshkeys_item_key):
            # heading row of the host; the host cell spans this row and
            # the rows of all supported algorithms
            trow = nodes.row()
            entry = nodes.entry(morerows=len(SUPPORTED_SSH_KEYTYPES))
            para = nodes.paragraph()
            para += make_refnode(
                app.builder, docname, keys_info['docname'],
                keys_info['secid'],
                nodes.Text(env.titles[keys_info['docname']].astext())
            )
            entry += para
            trow += entry

            for heading in (
                    'Algorithm', 'MD5 fingerprint', 'SHA256 fingerprint'):
                entry = nodes.entry()
                para = nodes.paragraph()
                para += nodes.strong(text=heading)
                entry += para
                trow += entry
            tbody += trow

            for algorithm in SUPPORTED_SSH_KEYTYPES:
                trow = nodes.row()

                entry = nodes.entry()
                entry += nodes.paragraph(text=algorithm)
                trow += entry

                trow += _get_formatted_keyentry(keys_info, algorithm, 'md5')
                trow += _get_formatted_keyentry(
                    keys_info, algorithm, 'sha256')
                tbody += trow

        node.replace_self(content)
649
650
def resolve_missing_reference(app, env, node, contnode):
    """
    Resolve the references to the certificate list and the ssh key list.

    Registered for the ``missing-reference`` event.

    :param app: the Sphinx application
    :param env: the build environment
    :param node: the pending reference node
    :param contnode: the node to use as content of the reference
    :return: a reference node for the reference types ``certlistref`` and
        ``sshkeyref``, ``None`` for every other reference type
    :raises SphinxError: if the list directive a reference points to has not
        been recorded in the environment
    """
    reftype = node['reftype']
    if reftype == 'certlistref':
        if not hasattr(env, 'cacert_certlistdoc'):
            raise SphinxError(
                'No certlist directive found in the document tree')
        return make_refnode(
            app.builder, node['refdoc'], env.cacert_certlistdoc,
            node['refid'], contnode)
    if reftype == 'sshkeyref':
        if not hasattr(env, 'cacert_sshkeylistdoc'):
            raise SphinxError(
                'No sshkeylist directive found in the document tree')
        return make_refnode(
            app.builder, node['refdoc'], env.cacert_sshkeylistdoc,
            node['refid'], contnode)
    return None
664
665
def purge_sslcerts(app, env, docname):
    """
    Remove everything recorded for a document from the certificate data.

    Registered for the ``env-purge-doc`` event. All places located in the
    document are dropped. Certificates that are left without any place are
    removed as well; otherwise a certificate that was deleted from the
    document (or whose serial number changed) would stay behind with no
    primary place and break the certificate list.

    :param app: the Sphinx application (unused)
    :param env: the build environment holding the collected data
    :param docname: name of the document that is purged
    """
    if (
        hasattr(env, 'cacert_certlistdoc') and
        env.cacert_certlistdoc == docname
    ):
        delattr(env, 'cacert_certlistdoc')
    if not hasattr(env, 'cacert_sslcerts'):
        return

    remaining = []
    for cert_info in env.cacert_sslcerts:
        cert_info['places'] = [
            place for place in cert_info['places']
            if place['docname'] != docname
        ]
        if cert_info['places']:
            remaining.append(cert_info)
    # update in place so that the list object itself stays the same
    env.cacert_sslcerts[:] = remaining
679
680
def purge_sshkeys(app, env, docname):
    """
    Remove everything recorded for a document from the ssh key data.

    Registered for the ``env-purge-doc`` event.

    :param app: the Sphinx application (unused)
    :param env: the build environment holding the collected data
    :param docname: name of the document that is purged
    """
    if hasattr(env, 'cacert_sshkeylistdoc'):
        if env.cacert_sshkeylistdoc == docname:
            delattr(env, 'cacert_sshkeylistdoc')
    if hasattr(env, 'cacert_sshkeys'):
        kept = []
        for keys in env.cacert_sshkeys:
            if keys['docname'] != docname:
                kept.append(keys)
        env.cacert_sshkeys = kept
692
693
def setup(app):
    """
    Register the nodes, directives and event handlers of this extension.

    :param app: the Sphinx application
    :return: extension metadata containing the extension version
    """
    for node_class in (
            sslcertlist_node, sslcert_node, sshkeylist_node, sshkeys_node):
        app.add_node(node_class)

    for directive_name, directive_class in (
            ('sslcert', CAcertSSLCert),
            ('sslcertlist', CAcertSSLCertList),
            ('sshkeys', CAcertSSHKeys),
            ('sshkeylist', CAcertSSHKeyList)):
        app.add_directive(directive_name, directive_class)

    # handlers are connected in this order on purpose: collecting happens
    # on doctree-read, rendering of the lists on doctree-resolved
    for event, handler in (
            ('doctree-read', process_sslcerts),
            ('doctree-read', process_sshkeys),
            ('doctree-resolved', process_sslcert_nodes),
            ('doctree-resolved', process_sshkeys_nodes),
            ('missing-reference', resolve_missing_reference),
            ('env-purge-doc', purge_sslcerts),
            ('env-purge-doc', purge_sshkeys)):
        app.connect(event, handler)
    return {'version': __version__}