Source code for octodns.record.svcb

#
# This file describes the SVCB records as defined in RFC 9460
# It also supports the 'ech' SvcParam as defined in draft-ietf-tls-svcb-ech-02
#

from base64 import b64decode
from binascii import Error as binascii_error
from ipaddress import AddressValueError, IPv4Address, IPv6Address

from fqdn import FQDN

from ..equality import EqualityTupleMixin
from ..idna import idna_encode
from .base import Record, ValuesMixin, unquote
from .chunked import _ChunkedValue
from .rr import RrParseError

# Placeholder binding. The populated table is assigned to this same name
# further down in this module, after the validator and parser functions that
# the table references have been defined.
SUPPORTED_PARAMS = {}


def validate_svcparam_port(svcparamvalue):
    """Validate the value of a ``port`` SvcParam.

    Args:
        svcparamvalue: The configured port; anything ``int()`` can convert.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    try:
        port = int(svcparamvalue)
    except (TypeError, ValueError):
        # ValueError: non-numeric text. TypeError: no value at all (None) or
        # another type int() refuses; report it rather than crash validation.
        return ['port is not a number']

    # A port is an unsigned 16-bit integer. The earlier chained comparison
    # ``0 < port > 65535`` is evaluated as ``0 < port and port > 65535`` and
    # therefore never rejected negative numbers.
    if not 0 <= port <= 65535:
        return [f'port {port} is not a valid number']
    return []
def validate_list(svcparamkey, svcparamvalue):
    """Check that a SvcParam value is a ``list``.

    Args:
        svcparamkey: Name of the SvcParam, used only in the error text.
        svcparamvalue: The value to check.

    Returns:
        list[str]: A one-element list with the reason when the value is not a
        list, otherwise a new empty list.
    """
    if isinstance(svcparamvalue, list):
        return []
    return [f'{svcparamkey} is not a list']
def validate_svcparam_alpn(svcparamvalue):
    """Validate the value of an ``alpn`` SvcParam.

    The value must be a list; each entry is then handed to
    ``_ChunkedValue.validate`` with the record type ``'SVCB'`` and whatever
    reasons that returns are collected.

    Args:
        svcparamvalue: The configured ``alpn`` value.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    problems = validate_list('alpn', svcparamvalue)
    if problems:
        # Not a list, so there are no entries to inspect.
        return problems

    for protocol_id in svcparamvalue:
        problems.extend(_ChunkedValue.validate(protocol_id, 'SVCB'))
    return problems
def validate_svcparam_iphint(ip_version, svcparamvalue):
    """Validate an ``ipv4hint`` / ``ipv6hint`` SvcParam value.

    Args:
        ip_version: ``4`` or ``6``; selects the address class used for
            parsing and appears in the error text. Any other value results in
            no per-address checking.
        svcparamvalue: The configured hint value, expected to be a list of
            addresses.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    problems = validate_list(f'ipv{ip_version}hint', svcparamvalue)
    if problems:
        return problems

    # Choose the parser once instead of re-testing the version per address.
    if ip_version == 4:
        address_cls = IPv4Address
    elif ip_version == 6:
        address_cls = IPv6Address
    else:
        address_cls = None

    for candidate in svcparamvalue:
        if address_cls is None:
            continue
        try:
            address_cls(candidate)
        except AddressValueError:
            problems.append(
                f'ipv{ip_version}hint "{candidate}" is not a valid '
                f'IPv{ip_version} address'
            )
    return problems
def validate_svcparam_ipv4hint(svcparamvalue):
    """Validate an ``ipv4hint`` SvcParam value.

    Delegates to ``validate_svcparam_iphint`` with IP version 4.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    return validate_svcparam_iphint(ip_version=4, svcparamvalue=svcparamvalue)
def validate_svcparam_ipv6hint(svcparamvalue):
    """Validate an ``ipv6hint`` SvcParam value.

    Delegates to ``validate_svcparam_iphint`` with IP version 6.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    return validate_svcparam_iphint(ip_version=6, svcparamvalue=svcparamvalue)
def validate_svcparam_mandatory(svcparamvalue):
    """Validate the value of a ``mandatory`` SvcParam.

    The value must be a list. Each entry has to be either a key present in
    ``SUPPORTED_PARAMS`` or a generic ``key<number>`` name, in which case the
    number is checked by ``validate_svckey_number``.

    Args:
        svcparamvalue: The configured ``mandatory`` value.

    Returns:
        list[str]: Reasons the value is invalid; empty when it is acceptable.
    """
    reasons = validate_list('mandatory', svcparamvalue)
    if reasons:
        return reasons

    for mandatory in svcparamvalue:
        if not isinstance(mandatory, str):
            # A non-string entry (e.g. a bare integer coming from YAML) used
            # to raise AttributeError on ``.startswith``; report it as an
            # unsupported key instead of aborting validation.
            reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory')
            continue
        if mandatory.startswith('key'):
            reasons += validate_svckey_number(mandatory)
        elif mandatory not in SUPPORTED_PARAMS:
            reasons.append(f'unsupported SvcParam "{mandatory}" in mandatory')
    return reasons
def validate_svcparam_ech(svcparamvalue):
    """Validate the value of an ``ech`` SvcParam as strict Base64.

    Args:
        svcparamvalue: The configured ``ech`` value.

    Returns:
        list[str]: A one-element list with the reason when the value cannot
        be decoded as Base64, otherwise an empty list.
    """
    try:
        b64decode(svcparamvalue, validate=True)
    except (binascii_error, ValueError, TypeError):
        # binascii.Error: malformed Base64. ValueError: non-ASCII text.
        # TypeError: a value that is neither str nor bytes-like (e.g. None).
        return ['ech SvcParam is invalid Base64']
    # Always return a list: the caller extends its reasons with the result,
    # which fails on the implicit None the success path used to return.
    return []
def validate_svckey_number(paramkey):
    """Validate a generic ``key<number>`` SvcParamKey.

    Args:
        paramkey: The key text; the caller has already established that it
            starts with ``key``. Everything after those three characters is
            parsed as the key number.

    Returns:
        list[str]: A one-element list with the reason when the suffix is not
        an integer or is outside 0..65535, otherwise an empty list.
    """
    try:
        paramkeynum = int(paramkey[3:])
    except ValueError:
        return [f'SvcParam key "{paramkey}" has wrong format']

    # SvcParamKeys are unsigned 16-bit numbers. The earlier chained
    # comparison ``7 < n > 65535`` reduces to ``n > 65535``, so negative
    # numbers such as "key-1" were accepted.
    # NOTE(review): the literal 7 suggests key0..key6 (which have registered
    # names) may have been meant to be rejected too. They were accepted in
    # practice and still are, so existing configurations keep validating.
    if not 0 <= paramkeynum <= 65535:
        return [f'SvcParam key "{paramkey}" has wrong key number']
    return []
def parse_rdata_text_svcparamvalue_list(svcparamvalue):
    """Split a comma-separated SvcParam value from rdata text into a list.

    When the text starts with a double quote, its first and last characters
    are dropped before splitting.

    Args:
        svcparamvalue (str): The raw value text.

    Returns:
        list[str]: The comma-separated items.
    """
    is_quoted = svcparamvalue.startswith('"')
    unwrapped = svcparamvalue[1:-1] if is_quoted else svcparamvalue
    return unwrapped.split(',')
def svcparamkeysort(svcparamkey):
    """Return the numeric sort key for a SvcParamKey.

    Generic ``key<number>`` names sort by their number; every other name
    sorts by the ``key_num`` recorded for it in ``SUPPORTED_PARAMS``.

    Args:
        svcparamkey (str): The SvcParamKey name.

    Returns:
        int: The number to order the key by.
    """
    if not svcparamkey.startswith('key'):
        return SUPPORTED_PARAMS[svcparamkey]['key_num']
    numeric_suffix = svcparamkey[3:]
    return int(numeric_suffix)
# cc https://datatracker.ietf.org/doc/html/rfc9460#keys SUPPORTED_PARAMS = { 'no-default-alpn': {'key_num': 2, 'has_arg': False}, 'alpn': { 'key_num': 1, 'validate': validate_svcparam_alpn, 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, }, 'port': {'key_num': 3, 'validate': validate_svcparam_port}, 'ipv4hint': { 'key_num': 4, 'validate': validate_svcparam_ipv4hint, 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, }, 'ipv6hint': { 'key_num': 6, 'validate': validate_svcparam_ipv6hint, 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, }, 'mandatory': { 'key_num': 0, 'validate': validate_svcparam_mandatory, 'parse_rdata_text': parse_rdata_text_svcparamvalue_list, }, 'ech': {'key_num': 5, 'validate': validate_svcparam_ech}, }
class SvcbValue(EqualityTupleMixin, dict):
    """One SVCB value: a priority, a target name and a set of SvcParams.

    The data is stored in the underlying dict under the keys ``svcpriority``,
    ``targetname`` and ``svcparams``; the properties of the same names are
    thin accessors for those entries.
    """

    @classmethod
    def parse_rdata_text(cls, value):
        """Parse presentation-format rdata text into a plain dict.

        Args:
            value (str): Space-separated text of the form
                ``<priority> <target> [<key>[=<value>] ...]``.

        Returns:
            dict: ``svcpriority`` (an int when the text is numeric, otherwise
            the original text), ``targetname`` (passed through ``unquote``)
            and ``svcparams`` (a dict; params without ``=`` map to ``None``).

        Raises:
            RrParseError: When fewer than two fields are present or a
                SvcParamKey occurs more than once.
        """
        try:
            (svcpriority, targetname, *svcparams) = value.split(' ')
        except ValueError:
            raise RrParseError()
        try:
            svcpriority = int(svcpriority)
        except ValueError:
            # Keep the original text; validate() reports a non-numeric
            # priority as invalid.
            pass
        targetname = unquote(targetname)
        params = dict()
        for svcparam in svcparams:
            # Split on the first '=' only. Values may legitimately contain
            # '=' themselves (e.g. Base64 padding in 'ech'); splitting on
            # every '=' and keeping one piece silently truncated them.
            paramkey, has_value, paramvalue = svcparam.partition('=')
            if paramkey in params:
                raise RrParseError(f'{paramkey} is specified twice')
            if not has_value:
                params[paramkey] = None
                continue
            parser = SUPPORTED_PARAMS.get(paramkey, {}).get(
                'parse_rdata_text', None
            )
            if parser is None:
                # No dedicated parser: only strip the surrounding quotes.
                if paramvalue.startswith('"'):
                    paramvalue = paramvalue[1:-1]
                params[paramkey] = paramvalue
            else:
                params[paramkey] = parser(paramvalue)
        return {
            'svcpriority': svcpriority,
            'targetname': targetname,
            'svcparams': params,
        }

    @classmethod
    def validate(cls, data, _):
        """Validate a sequence of SVCB value mappings.

        Args:
            data: Iterable of mappings, each expected to provide
                ``svcpriority`` and ``targetname`` and optionally
                ``svcparams``.
            _: Unused.

        Returns:
            list[str]: Every reason found across all values; empty when all
            values are acceptable.
        """
        reasons = []
        for value in data:
            # -1 never equals 0, so a missing or invalid priority is not
            # treated as AliasMode in the svcparams check below.
            svcpriority = -1
            if 'svcpriority' not in value:
                reasons.append('missing svcpriority')
            else:
                try:
                    svcpriority = int(value['svcpriority'])
                    if svcpriority < 0 or svcpriority > 65535:
                        reasons.append(f'invalid priority "{svcpriority}"')
                except ValueError:
                    reasons.append(f'invalid priority "{value["svcpriority"]}"')

            if 'targetname' not in value or value['targetname'] == '':
                reasons.append('missing targetname')
            else:
                targetname = idna_encode(str(value['targetname']))
                if not targetname.endswith('.'):
                    reasons.append(
                        f'SVCB value "{targetname}" missing trailing .'
                    )
                # A bare "." target is accepted without the FQDN check.
                if targetname != '.' and not FQDN(targetname).is_valid:
                    reasons.append(
                        f'Invalid SVCB target "{targetname}" is not a valid FQDN.'
                    )

            if 'svcparams' in value:
                svcparams = value['svcparams']
                if svcpriority == 0 and len(svcparams) != 0:
                    reasons.append('svcparams set on AliasMode SVCB record')
                for svcparamkey, svcparamvalue in svcparams.items():
                    # XXX: Should we test for keys existing when set in 'mandatory'?
                    if svcparamkey.startswith('key'):
                        # Generic keys: only the number is checked, the value
                        # is not inspected.
                        reasons += validate_svckey_number(svcparamkey)
                        continue
                    if svcparamkey not in SUPPORTED_PARAMS:
                        reasons.append(f'Unknown SvcParam {svcparamkey}')
                        continue
                    param_spec = SUPPORTED_PARAMS[svcparamkey]
                    if param_spec.get('has_arg', True):
                        reasons += param_spec['validate'](svcparamvalue)
                    elif svcparamvalue is not None:
                        reasons.append(
                            f'SvcParam {svcparamkey} has value when it should not'
                        )
        return reasons

    @classmethod
    def process(cls, values):
        """Wrap each raw value mapping in an instance of this class."""
        return [cls(v) for v in values]

    def __init__(self, value):
        """Build the value from a mapping.

        Args:
            value: Mapping with ``svcpriority`` (converted with ``int``),
                ``targetname`` (passed through ``idna_encode``) and an
                optional ``svcparams`` mapping. The ``svcparams`` object is
                stored as given, not copied; an empty dict is used when the
                key is absent.
        """
        super().__init__(
            {
                'svcpriority': int(value['svcpriority']),
                'targetname': idna_encode(value['targetname']),
                'svcparams': value.get('svcparams', dict()),
            }
        )

    @property
    def svcpriority(self):
        return self['svcpriority']

    @svcpriority.setter
    def svcpriority(self, value):
        self['svcpriority'] = value

    @property
    def targetname(self):
        return self['targetname']

    @targetname.setter
    def targetname(self, value):
        self['targetname'] = value

    @property
    def svcparams(self):
        return self['svcparams']

    @svcparams.setter
    def svcparams(self, value):
        self['svcparams'] = value

    @property
    def rdata_text(self):
        """Presentation-format text: priority, target, then the params.

        Params are emitted in the order given by ``svcparamkeysort``; list
        values are joined with commas and ``None`` values are emitted as the
        bare key.
        """
        pieces = []
        for svcparamkey in sorted(self.svcparams, key=svcparamkeysort):
            svcparamvalue = self.svcparams.get(svcparamkey, None)
            if svcparamvalue is None:
                pieces.append(f' {svcparamkey}')
            elif isinstance(svcparamvalue, list):
                pieces.append(f' {svcparamkey}={",".join(svcparamvalue)}')
            else:
                pieces.append(f' {svcparamkey}={svcparamvalue}')
        params = ''.join(pieces)
        return f'{self.svcpriority} {self.targetname}{params}'

    def template(self, params):
        """Return a copy with ``targetname`` formatted using ``params``.

        When ``targetname`` contains no ``{`` the instance itself is
        returned unchanged.
        """
        if '{' not in self.targetname:
            return self
        new = self.__class__(self)
        new.targetname = new.targetname.format(**params)
        # TODO: what, if any of the svcparams should be templated
        return new

    def __hash__(self):
        return hash(self.__repr__())

    def _equality_tuple(self):
        """Return ``(svcpriority, targetname, params)`` for comparisons.

        ``params`` is a set of ``key`` / ``key=value`` strings, so the order
        of the SvcParams does not affect equality.
        """
        # NOTE(review): if the mixin also derives ordering from this tuple,
        # ``<`` between two sets is a subset test rather than a total order -
        # confirm that sorting values that differ only in params is stable.
        params = set()
        for svcparamkey, svcparamvalue in self.svcparams.items():
            if svcparamvalue is None:
                params.add(f'{svcparamkey}')
            elif isinstance(svcparamvalue, list):
                params.add(f'{svcparamkey}={",".join(svcparamvalue)}')
            else:
                params.add(f'{svcparamkey}={svcparamvalue}')
        return (self.svcpriority, self.targetname, params)

    def __repr__(self):
        return f"'{self.rdata_text}'"
class SvcbRecord(ValuesMixin, Record):
    """Record class for the ``SVCB`` type; its values are ``SvcbValue``."""

    # Record type name.
    _type = 'SVCB'
    # Class used for this record's values.
    _value_type = SvcbValue
# Hand the SVCB record class to Record.register_type when this module is
# imported (presumably so that records of type 'SVCB' resolve to SvcbRecord -
# confirm against Record.register_type).
Record.register_type(SvcbRecord)