# Copyright (C) Internet Systems Consortium, Inc. ("ISC") # # SPDX-License-Identifier: MPL-2.0 # # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, you can obtain one at https://mozilla.org/MPL/2.0/. # # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. from datetime import timedelta import os import re import shutil import time import pytest from isctest.kasp import KeyTimingMetadata from isctest.vars.algorithms import Algorithm from rollover.common import TIMEDELTA import isctest pytestmark = pytest.mark.extra_artifacts( [ "K*", "common.test.*", "fast.test.*", "future.test.*", "in-the-middle.test.*", "ksk-roll.test.*", "last-bundle.test.*", "past.test.*", "two-tone.test.*", "unlimited.test.*", "invalid-skr.test.*", "ns1/K*", "ns1/_default.nzd", "ns1/_default.nzf", "ns1/common.test.db", "ns1/common.test.db.jbk", "ns1/common.test.db.signed", "ns1/common.test.db.signed.jnl", "ns1/common.test.skr.2", "ns1/fast.test.db", "ns1/fast.test.db.jbk", "ns1/fast.test.db.signed", "ns1/fast.test.db.signed.jnl", "ns1/fast.test.skr.1", "ns1/future.test.db", "ns1/future.test.db.jbk", "ns1/future.test.db.signed", "ns1/future.test.skr.1", "ns1/in-the-middle.test.db", "ns1/in-the-middle.test.db.jbk", "ns1/in-the-middle.test.db.signed", "ns1/in-the-middle.test.db.signed.jnl", "ns1/in-the-middle.test.skr.1", "ns1/keydir", "ns1/ksk-roll.test.db", "ns1/ksk-roll.test.db.jbk", "ns1/ksk-roll.test.db.signed", "ns1/ksk-roll.test.db.signed.jnl", "ns1/ksk-roll.test.skr.1", "ns1/last-bundle.test.db", "ns1/last-bundle.test.db.jbk", "ns1/last-bundle.test.db.signed", "ns1/last-bundle.test.db.signed.jnl", "ns1/last-bundle.test.skr.1", "ns1/offline", "ns1/past.test.db", "ns1/past.test.db.jbk", "ns1/past.test.db.signed", "ns1/past.test.skr.1", "ns1/two-tone.test.db", "ns1/two-tone.test.db.jbk", "ns1/two-tone.test.db.signed", "ns1/two-tone.test.db.signed.jnl", "ns1/two-tone.test.skr.1", "ns1/unlimited.test.db", "ns1/unlimited.test.db.jbk", "ns1/unlimited.test.db.signed", "ns1/unlimited.test.db.signed.jnl", "ns1/unlimited.test.unlimited.skr.1", "ns1/invalid-skr.test.db", "ns1/invalid-skr.test.db.jbk", "ns1/invalid-skr.test.db.signed", "ns1/invalid-skr.test.db.signed.jnl", "ns1/invalid-skr.test.skr.1", ] ) CONFIG = { "dnskey-ttl": TIMEDELTA["PT1H"], "ds-ttl": TIMEDELTA["P1D"], "max-zone-ttl": TIMEDELTA["P1D"], "parent-propagation-delay": TIMEDELTA["PT1H"], "publish-safety": TIMEDELTA["PT1H"], "retire-safety": TIMEDELTA["PT1H"], "signatures-refresh": TIMEDELTA["P5D"], "signatures-validity": TIMEDELTA["P14D"], "zone-propagation-delay": TIMEDELTA["PT5M"], } FASTCONFIG = { "dnskey-ttl": timedelta(seconds=439), "ds-ttl": TIMEDELTA["PT1H"], "max-zone-ttl": timedelta(seconds=4396), "parent-propagation-delay": timedelta(seconds=5), "publish-safety": timedelta(seconds=1), "retire-safety": timedelta(seconds=1), "signatures-refresh": timedelta(seconds=2), "signatures-validity": timedelta(seconds=6), "zone-propagation-delay": timedelta(seconds=439), } def between(value, start, end): if value is None or start is None or end is None: return False return start < value < end def ksr(zone, policy, action, options="", raise_on_exception=True, to_file=""): ksr_command = [ os.environ.get("KSR"), "-l", "ns1/named.conf", "-k", policy, *options.split(), action, zone, ] if to_file: with open(to_file, "wb") as f: cmd = isctest.run.cmd( ksr_command, raise_on_exception=raise_on_exception, stdout=f ) else: cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception) return cmd def sign_delay(config): return config["signatures-validity"] - config["signatures-refresh"] def check_keys( keys, lifetime, config, alg=None, size=None, offset=0, with_state=False, ): # Check keys that were created. if alg is None: alg = Algorithm.default().number if size is None: size = Algorithm.default().bits num = 0 now = KeyTimingMetadata.now() for key in keys: # created: from keyfile plus offset created = key.get_timing("Created") + offset # active: retired previous key active = created if num > 0 and retired is not None: active = retired # published: dnskey-ttl + publish-safety + propagation pubtime = ( config["dnskey-ttl"] + config["publish-safety"] + config["zone-propagation-delay"] ) published = active - pubtime # retired: zsk-lifetime if lifetime is not None: retired = active + lifetime if key.is_ksk(): # removed: ttlds + retire-safety + parent-propagation remtime = ( config["ds-ttl"] + config["retire-safety"] + config["parent-propagation-delay"] ) removed = retired + remtime else: # removed: ttlsig + retire-safety + sign-delay + propagation remtime = ( config["max-zone-ttl"] + config["retire-safety"] + config["zone-propagation-delay"] + sign_delay(config) ) removed = retired + remtime else: retired = None removed = None goal = "hidden" state_dnskey = "hidden" state_zrrsig = "hidden" state_krrsig = "hidden" state_ds = "hidden" if retired is None or between(now, published, retired): goal = "omnipresent" pubdelay = published + pubtime signdelay = active + sign_delay(config) if between(now, published, pubdelay): state_dnskey = "rumoured" state_krrsig = "rumoured" else: state_dnskey = "omnipresent" state_krrsig = "omnipresent" if key.is_ksk(): state_ds = "hidden" else: if between(now, active, signdelay): state_zrrsig = "rumoured" else: state_zrrsig = "omnipresent" with open(key.statefile, "r", encoding="utf-8") as file: metadata = file.read() assert f"Algorithm: {alg}" in metadata assert f"Length: {size}" in metadata if key.is_ksk(): assert "KSK: yes" in metadata else: assert "KSK: no" in metadata if key.is_zsk(): assert "ZSK: yes" in metadata else: assert "ZSK: no" in metadata assert f"Published: {published}" in metadata assert f"Active: {active}" in metadata if lifetime is not None: assert f"Retired: {retired}" in metadata assert f"Removed: {removed}" in metadata assert f"Lifetime: {int(lifetime.total_seconds())}" in metadata else: assert "Lifetime: 0" in metadata assert "Retired:" not in metadata assert "Removed:" not in metadata if with_state: assert f"GoalState: {goal}" in metadata assert f"DNSKEYState: {state_dnskey}" in metadata if key.is_ksk(): assert f"KRRSIGState: {state_krrsig}" in metadata assert f"DSState: {state_ds}" in metadata else: assert "KRRSIGState:" not in metadata assert "DSState:" not in metadata if key.is_zsk(): assert f"ZRRSIGState: {state_zrrsig}" in metadata else: assert "ZRRSIGState:" not in metadata num += 1 def check_key_bundle(bundle_keys, bundle_lines, cdnskey=False): count = 0 for key in bundle_keys: found = False for line in bundle_lines: if key.dnskey_equals(line, cdnskey): found = True count += 1 assert found assert count == len(bundle_keys) assert count == len(bundle_lines) def check_cds_bundle(bundle_keys, bundle_lines, expected_cds): count = 0 for key in bundle_keys: found = False # the cds of this ksk must be in the ksr for line in bundle_lines: for alg in expected_cds: if key.cds_equals(line, alg.strip()): found = True count += 1 assert found assert count == len(expected_cds) * len(bundle_keys) assert count == len(bundle_lines) def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart, ttl): count = 0 for key in bundle_keys: found = False alg = key.get_metadata("Algorithm") expect = f"{zone}. {ttl} IN RRSIG {rrtype} {alg} 2 {ttl} {sigend} {sigstart} {key.tag} {zone}." # there must be a signature of this ksk for line in bundle_lines: rrsig = " ".join(line.split()) if expect in rrsig: found = True count += 1 assert found, f"Expected string not found: {expect}" assert count == len(bundle_keys) assert count == len(bundle_lines) def check_keysigningrequest(path, zsks, start, end): with open(path, "r", encoding="utf-8") as f: lines = f.readlines() line_no = 0 inception = start while inception <= end: next_bundle = end + 1 # expect bundle header assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no] line_no += 1 bundle_keys = [] bundle_lines = [] # expect zsks for key in zsks: published = key.get_timing("Publish") if between(published, inception, next_bundle): next_bundle = published removed = key.get_timing("Delete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed if published > inception: continue if removed is not None and inception >= removed: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_key_bundle(bundle_keys, bundle_lines) inception = next_bundle # ksr footer assert ";; KeySigningRequest 1.0 generated at" in lines[line_no] line_no += 1 # trailing empty lines while line_no < len(lines): assert lines[line_no].rstrip() == "" line_no += 1 assert line_no == len(lines) def check_signedkeyresponse( path, config, zone, ksks, zsks, start, end, refresh, cdnskey=True, cds="SHA-256", ): with open(path, "r", encoding="utf-8") as f: lines = f.readlines() line_no = 0 next_bundle = end + 1 dnskey_ttl = int(config["dnskey-ttl"].total_seconds()) inception = start while inception <= end: # A single signed key response may consist of: # ;; SignedKeyResponse (header) # ;; DNSKEY 257 (one per published key in ksks) # ;; DNSKEY 256 (one per published key in zsks) # ;; RRSIG(DNSKEY) (one per active key in ksks) # ;; CDNSKEY (one per published key in ksks) # ;; RRSIG(CDNSKEY) (one per active key in ksks) # ;; CDS (one per published key in ksks) # ;; RRSIG(CDS) (one per active key in ksks) sigstart = inception - timedelta(hours=1) # clockskew sigend = inception + config["signatures-validity"] next_bundle = sigend + refresh # ignore empty lines while line_no < len(lines): if lines[line_no].rstrip() == "": line_no += 1 else: break # expect bundle header assert f";; SignedKeyResponse 1.0 {inception}" in lines[line_no] line_no += 1 # expect ksks bundle_keys = [] bundle_lines = [] for key in ksks: published = key.get_timing("Publish") if between(published, inception, next_bundle): next_bundle = published removed = key.get_timing("Delete", must_exist=False) if published > inception: continue if removed is not None and inception >= removed: continue if between(removed, inception, next_bundle): next_bundle = removed # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_key_bundle(bundle_keys, bundle_lines) # expect zsks bundle_keys = [] bundle_lines = [] for key in zsks: published = key.get_timing("Publish") if between(published, inception, next_bundle): next_bundle = published removed = key.get_timing("Delete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed if published > inception: continue if removed is not None and inception >= removed: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_key_bundle(bundle_keys, bundle_lines) # expect rrsig(dnskey) bundle_keys = [] bundle_lines = [] for key in ksks: active = key.get_timing("Activate") inactive = key.get_timing("Inactive", must_exist=False) if active > inception: continue if inactive is not None and inception > inactive: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_rrsig_bundle( bundle_keys, bundle_lines, zone, "DNSKEY", sigend, sigstart, dnskey_ttl ) # expect cdnskey have_cdnskey = False if cdnskey: bundle_keys = [] bundle_lines = [] for key in ksks: published = key.get_timing("SyncPublish") if between(published, inception, next_bundle): next_bundle = published removed = key.get_timing("SyncDelete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed if published > inception: continue if removed is not None and inception >= removed: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 have_cdnskey = True check_key_bundle(bundle_keys, bundle_lines, cdnskey=True) if have_cdnskey: # expect rrsig(cdnskey) bundle_keys = [] bundle_lines = [] for key in ksks: active = key.get_timing("Activate") inactive = key.get_timing("Inactive", must_exist=False) if active > inception: continue if inactive is not None and inception > inactive: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_rrsig_bundle( bundle_keys, bundle_lines, zone, "CDNSKEY", sigend, sigstart, dnskey_ttl, ) # expect cds have_cds = False if cds != "": bundle_keys = [] bundle_lines = [] expected_cds = cds.split(",") for key in ksks: published = key.get_timing("SyncPublish") if between(published, inception, next_bundle): next_bundle = published removed = key.get_timing("SyncDelete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed if published > inception: continue if removed is not None and inception >= removed: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) for _arg in expected_cds: bundle_lines.append(lines[line_no]) line_no += 1 have_cds = True check_cds_bundle(bundle_keys, bundle_lines, expected_cds) if have_cds: # expect rrsig(cds) bundle_keys = [] bundle_lines = [] for key in ksks: active = key.get_timing("Activate") inactive = key.get_timing("Inactive", must_exist=False) if active > inception: continue if inactive is not None and inception > inactive: continue # collect keys that should be in this bundle # collect lines that should be in this bundle bundle_keys.append(key) bundle_lines.append(lines[line_no]) line_no += 1 check_rrsig_bundle( bundle_keys, bundle_lines, zone, "CDS", sigend, sigstart, dnskey_ttl ) inception = next_bundle # skr footer assert ";; SignedKeyResponse 1.0 generated at" in lines[line_no] line_no += 1 # trailing empty lines while line_no < len(lines): assert lines[line_no] == "" line_no += 1 assert line_no == len(lines) def test_ksr_errors(): # check that 'dnssec-ksr' errors on unknown action cmd = ksr("common.test", "common", "foobar", raise_on_exception=False) assert "dnssec-ksr: fatal: unknown command 'foobar'" in cmd.err # check that 'dnssec-ksr keygen' errors on missing end date cmd = ksr("common.test", "common", "keygen", raise_on_exception=False) assert "dnssec-ksr: fatal: keygen requires an end date" in cmd.err # check that 'dnssec-ksr keygen' errors on zone with csk cmd = ksr( "csk.test", "csk", "keygen", options="-K ns1 -e +2y", raise_on_exception=False ) assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in cmd.err # check that 'dnssec-ksr request' errors on missing end date cmd = ksr("common.test", "common", "request", raise_on_exception=False) assert "dnssec-ksr: fatal: request requires an end date" in cmd.err # check that 'dnssec-ksr sign' errors on missing ksr file cmd = ksr( "common.test", "common", "sign", options="-K ns1/offline -i now -e +1y", raise_on_exception=False, ) assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in cmd.err def test_ksr_common(ns1): # common test cases (1) zone = "common.test" policy = "common" n = 1 # create ksk kskdir = "ns1/offline" cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, CONFIG) # check that 'dnssec-ksr keygen' pregenerates right amount of keys cmd = ksr(zone, policy, "keygen", options="-i now -e +1y") zsks = isctest.kasp.keystr_to_keylist(cmd.out) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) check_keys(zsks, lifetime, CONFIG) # check that 'dnssec-ksr keygen' pregenerates right amount of keys # in the given key directory zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) check_keys(zsks, lifetime, CONFIG) for key in zsks: privatefile = f"{key.path}.private" keyfile = f"{key.path}.key" statefile = f"{key.path}.state" shutil.copyfile(privatefile, f"{privatefile}.backup") shutil.copyfile(keyfile, f"{keyfile}.backup") shutil.copyfile(statefile, f"{statefile}.backup") # check that 'dnssec-ksr request' creates correct ksr now = zsks[0].get_timing("Created") until = now + timedelta(days=365) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh) # common test cases (2) n = 2 # check that 'dnssec-ksr keygen' selects pregenerated keys for # the same time bundle cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y") selected_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(selected_zsks) == 2 for index, key in enumerate(selected_zsks): assert zsks[index] == key isctest.check.file_contents_equal( f"{key.path}.private", f"{key.path}.private.backup" ) isctest.check.file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") isctest.check.file_contents_equal( f"{key.path}.state", f"{key.path}.state.backup" ) # check that 'dnssec-ksr keygen' generates only necessary keys for # overlapping time bundle cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1") overlapping_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(overlapping_zsks) == 4 selected = len(re.findall("Selecting key pair", cmd.err)) generated = len(re.findall("Generating key pair", cmd.err)) - len( re.findall("collide", cmd.err) ) assert selected == 2 assert generated == 2 for index, key in enumerate(overlapping_zsks): if index < 2: assert zsks[index] == key isctest.check.file_contents_equal( f"{key.path}.private", f"{key.path}.private.backup" ) isctest.check.file_contents_equal( f"{key.path}.key", f"{key.path}.key.backup" ) isctest.check.file_contents_equal( f"{key.path}.state", f"{key.path}.state.backup" ) # run 'dnssec-ksr keygen' again with verbosity 0 cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y") overlapping_zsks2 = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(overlapping_zsks2) == 4 check_keys(overlapping_zsks2, lifetime, CONFIG) for index, key in enumerate(overlapping_zsks2): assert overlapping_zsks[index] == key # check that 'dnssec-ksr request' creates correct ksr if the # interval is shorter ksr_fname = f"{zone}.ksr.{n}.shorter" ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y", to_file=ksr_fname) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr request' creates correct ksr with new interval until = now + timedelta(days=365 * 2) ksr_fname = f"{zone}.ksr.{n}" ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y", to_file=ksr_fname) check_keysigningrequest(ksr_fname, overlapping_zsks, now, until) # check that 'dnssec-ksr request' errors if there are not enough keys cmd = ksr( zone, policy, "request", options=f"-K ns1 -i {now} -e +3y", raise_on_exception=False, ) errmsg = ( f"dnssec-ksr: fatal: no {zone}/ECDSAP256SHA256 zsk key pair found for bundle" ) assert errmsg in cmd.err # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K ns1/offline -f {ksr_fname} -i {now} -e +2y", to_file=skr_fname, ) check_signedkeyresponse( skr_fname, CONFIG, zone, ksks, overlapping_zsks, now, until, refresh, ) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};" ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, overlapping_zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(overlapping_zsks, lifetime, CONFIG, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True) def test_ksr_lastbundle(ns1): zone = "last-bundle.test" policy = "common" n = 1 # create ksk kskdir = "ns1/offline" offset = -timedelta(days=365) cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, CONFIG, offset=offset) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) check_keys(zsks, lifetime, CONFIG, offset=offset) # check that 'dnssec-ksr request' creates correct ksr then = zsks[0].get_timing("Created") + offset until = then + timedelta(days=366) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, then, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1d", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, then, until, refresh) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(zsks, lifetime, CONFIG, offset=offset, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that last bundle warning is logged warning = "last bundle in skr, please import new skr file" assert f"zone {zone}/IN (signed): zone_rekey: {warning}" in ns1.log def test_ksr_inthemiddle(ns1): zone = "in-the-middle.test" policy = "common" n = 1 # create ksk kskdir = "ns1/offline" offset = -timedelta(days=365) cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, CONFIG, offset=offset) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 4 lifetime = timedelta(days=31 * 6) check_keys(zsks, lifetime, CONFIG, offset=offset) # check that 'dnssec-ksr request' creates correct ksr then = zsks[0].get_timing("Created") then = then + offset until = then + timedelta(days=365 * 2) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, then, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1y", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, then, until, refresh) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(zsks, lifetime, CONFIG, offset=offset, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) # check that no last bundle warning is logged warning = "last bundle in skr, please import new skr file" assert f"zone {zone}/IN (signed): zone_rekey: {warning}" not in ns1.log def check_ksr_rekey_logs_error(server, zone, policy, offset, end): n = 1 # create ksk kskdir = "ns1/offline" now = KeyTimingMetadata.now() then = now + offset until = now + end cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 # key generation zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 2 # create request now = zsks[0].get_timing("Created") then = now + offset until = now + end ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}", to_file=ksr_fname, ) # sign request skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {then} -e {until}", to_file=skr_fname, ) # add zone server.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") server.rndc(f"skr -import {skr_fname} {zone}") # test that rekey logs error time_remaining = 10 warning = "no available SKR bundle" line = f"zone {zone}/IN (signed): zone_rekey failure: {warning}" while time_remaining > 0: if line not in server.log: time_remaining -= 1 time.sleep(1) else: break assert line in server.log def test_ksr_rekey_logs_error(ns1): # check that an SKR that is too old logs error check_ksr_rekey_logs_error(ns1, "past.test", "common", -63072000, -31536000) # check that an SKR that is too new logs error check_ksr_rekey_logs_error(ns1, "future.test", "common", 2592000, 31536000) def test_ksr_unlimited(ns1): zone = "unlimited.test" policy = "unlimited" n = 1 # create ksk kskdir = "ns1/offline" cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, CONFIG) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 1 lifetime = None check_keys(zsks, lifetime, CONFIG) # check that 'dnssec-ksr request' creates correct ksr now = zsks[0].get_timing("Created") until = now + timedelta(days=365 * 4) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr without cdnskey refresh = -432000 # 5 days skr_fname = f"{zone}.no-cdnskey.skr.{n}" ksr( zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y", to_file=skr_fname, ) check_signedkeyresponse( skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh, cdnskey=False, cds="SHA-1, SHA-256, SHA-384", ) # check that 'dnssec-ksr sign' creates correct skr without cds refresh = -432000 # 5 days skr_fname = f"{zone}.no-cds.skr.{n}" ksr( zone, "no-cds", "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y", to_file=skr_fname, ) check_signedkeyresponse( skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh, cds="", ) # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.{policy}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(zsks, lifetime, CONFIG, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_twotone(ns1): zone = "two-tone.test" policy = "two-tone" n = 1 # create ksk kskdir = "ns1/offline" cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 2 ksks_defalg = [] ksks_altalg = [] for ksk in ksks: alg = ksk.get_metadata("Algorithm") if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"): ksks_defalg.append(ksk) elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"): ksks_altalg.append(ksk) assert len(ksks_defalg) == 1 assert len(ksks_altalg) == 1 check_keys(ksks_defalg, None, CONFIG) alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER") size = os.environ.get("ALTERNATIVE_BITS") check_keys(ksks_altalg, None, CONFIG, alg, size) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) # First algorithm keys have a lifetime of 3 months, so there should # be 4 created keys. Second algorithm keys have a lifetime of 5 # months, so there should be 3 created keys. While only two time # bundles of 5 months fit into one year, we need to create an extra # key for the remainder of the bundle. So 7 in total. assert len(zsks) == 7 zsks_defalg = [] zsks_altalg = [] for zsk in zsks: alg = zsk.get_metadata("Algorithm") if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"): zsks_defalg.append(zsk) elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"): zsks_altalg.append(zsk) assert len(zsks_defalg) == 4 assert len(zsks_altalg) == 3 lifetime = timedelta(days=31 * 3) check_keys(zsks_defalg, lifetime, CONFIG) alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER") size = os.environ.get("ALTERNATIVE_BITS") lifetime = timedelta(days=31 * 5) check_keys(zsks_altalg, lifetime, CONFIG, alg, size) # check that 'dnssec-ksr request' creates correct ksr now = zsks[0].get_timing("Created") until = now + timedelta(days=365) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -timedelta(days=5) skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys lifetime = timedelta(days=31 * 3) check_keys(zsks_defalg, lifetime, CONFIG, with_state=True) alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER") size = os.environ.get("ALTERNATIVE_BITS") lifetime = timedelta(days=31 * 5) check_keys(zsks_altalg, lifetime, CONFIG, alg, size, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_kskroll(ns1): zone = "ksk-roll.test" policy = "ksk-roll" n = 1 # create ksk kskdir = "ns1/offline" cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 2 lifetime = timedelta(days=31 * 6) check_keys(ksks, lifetime, CONFIG) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 1 check_keys(zsks, None, CONFIG) # check that 'dnssec-ksr request' creates correct ksr now = zsks[0].get_timing("Created") until = now + timedelta(days=365) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -432000 # 5 days skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y", to_file=skr_fname, ) check_signedkeyresponse(skr_fname, CONFIG, zone, ksks, zsks, now, until, refresh) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(zsks, None, CONFIG, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_fast(ns1): zone = "fast.test" policy = "fast" n = 1 # create ksk kskdir = "ns1/offline" cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1h -o") ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, FASTCONFIG) # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1h") zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir) assert len(zsks) == 1 lifetime = timedelta(seconds=8792) check_keys(zsks, lifetime, FASTCONFIG) # check that 'dnssec-ksr request' creates correct ksr now = zsks[0].get_timing("Created") until = now + timedelta(hours=1) ksr_fname = f"{zone}.ksr.{n}" ksr( zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1h", to_file=ksr_fname, ) check_keysigningrequest(ksr_fname, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr refresh = -2 skr_fname = f"{zone}.skr.{n}" ksr( zone, policy, "sign", options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1h", to_file=skr_fname, ) check_signedkeyresponse( skr_fname, FASTCONFIG, zone, ksks, zsks, now, until, refresh ) # add zone ns1.rndc( f"addzone {zone} " + "{ type primary; file " + f'"{zone}.db"; dnssec-policy {policy}; ' + "};", ) # import skr shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # test zone is correctly signed # - check rndc dnssec -status output isctest.kasp.check_dnssecstatus(ns1, zone, zsks, policy=policy) # - dnssec_verify isctest.kasp.check_dnssec_verify(ns1, zone) # - check keys check_keys(zsks, lifetime, FASTCONFIG, with_state=True) # - check apex isctest.kasp.check_apex(ns1, zone, ksks, zsks, offline_ksk=True) # - check subdomain isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True) def test_ksr_oversize(ns1): zone = "invalid-skr.test" n = 1 skr_fname = f"{zone}.skr.{n}" token_len = 5000 with open(skr_fname, "w", encoding="utf-8") as skr: huge_token = "A" * token_len skr.write(f";; SignedKeyResponse 1.0 {huge_token}\n") # - try importing invalid SKR file shutil.copyfile(skr_fname, f"ns1/{skr_fname}") ns1.rndc(f"skr -import {skr_fname} {zone}") # - check if named is still running ns1.rndc("status")