Source code for octodns.processor.filter

#
#
#

from ipaddress import ip_address, ip_network
from itertools import product
from logging import getLogger
from re import compile as re_compile

from ..record.exception import ValidationError
from .base import BaseProcessor


[docs] class _FilterProcessor(BaseProcessor):
[docs] def __init__(self, name, include_target=True, **kwargs): super().__init__(name, **kwargs) self.include_target = include_target
[docs] def process_source_zone(self, *args, **kwargs): return self._process(*args, **kwargs)
[docs] def process_target_zone(self, existing, *args, **kwargs): if self.include_target: return self._process(existing, *args, **kwargs) return existing
[docs] class AllowsMixin:
[docs] def matches(self, zone, record): pass
[docs] def doesnt_match(self, zone, record): zone.remove_record(record)
[docs] class RejectsMixin:
[docs] def matches(self, zone, record): zone.remove_record(record)
[docs] def doesnt_match(self, zone, record): pass
[docs] class _TypeBaseFilter(_FilterProcessor):
[docs] def __init__(self, name, _list, **kwargs): super().__init__(name, **kwargs) self._list = set(_list)
[docs] def _process(self, zone, *args, **kwargs): for record in zone.records: if record._type in self._list: self.matches(zone, record) else: self.doesnt_match(zone, record) return zone
[docs] class TypeAllowlistFilter(_TypeBaseFilter, AllowsMixin): '''Only manage records of the specified type(s). Example usage: processors: only-a-and-aaaa: class: octodns.processor.filter.TypeAllowlistFilter allowlist: - A - AAAA # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - only-a-and-aaaa targets: - ns1 '''
[docs] def __init__(self, name, allowlist, **kwargs): super().__init__(name, allowlist, **kwargs)
[docs] class TypeRejectlistFilter(_TypeBaseFilter, RejectsMixin): '''Ignore records of the specified type(s). Example usage: processors: ignore-cnames: class: octodns.processor.filter.TypeRejectlistFilter rejectlist: - CNAME # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - ignore-cnames targets: - route53 '''
[docs] def __init__(self, name, rejectlist, **kwargs): super().__init__(name, rejectlist, **kwargs)
[docs] class _NameBaseFilter(_FilterProcessor):
[docs] def __init__(self, name, _list, **kwargs): super().__init__(name, **kwargs) exact = set() regex = [] for pattern in _list: if pattern.startswith('/'): regex.append(re_compile(pattern[1:-1])) else: exact.add(pattern) self.exact = exact self.regex = regex
[docs] def _process(self, zone, *args, **kwargs): for record in zone.records: name = record.name if name in self.exact: self.matches(zone, record) continue elif any(r.search(name) for r in self.regex): self.matches(zone, record) continue self.doesnt_match(zone, record) return zone
[docs] class NameAllowlistFilter(_NameBaseFilter, AllowsMixin): '''Only manage records with names that match the provider patterns Example usage: processors: only-these: class: octodns.processor.filter.NameAllowlistFilter allowlist: # exact string match - www # contains/substring match - /substring/ # regex pattern match - /some-pattern-\\d\\+/ # regex - anchored so has to match start to end - /^start-.+-end$/ # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - only-these targets: - route53 '''
[docs] def __init__(self, name, allowlist, **kwargs): super().__init__(name, allowlist, **kwargs)
[docs] class NameRejectlistFilter(_NameBaseFilter, RejectsMixin): '''Reject managing records with names that match the provider patterns Example usage: processors: not-these: class: octodns.processor.filter.NameRejectlistFilter rejectlist: # exact string match - www # contains/substring match - /substring/ # regex pattern match - /some-pattern-\\d\\+/ # regex - anchored so has to match start to end - /^start-.+-end$/ # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - not-these targets: - route53 '''
[docs] def __init__(self, name, rejectlist, **kwargs): super().__init__(name, rejectlist, **kwargs)
[docs] class _ValueBaseFilter(_FilterProcessor):
[docs] def __init__(self, name, _list, **kwargs): super().__init__(name, **kwargs) exact = set() regex = [] for pattern in _list: if pattern.startswith('/'): regex.append(re_compile(pattern[1:-1])) else: exact.add(pattern) self.exact = exact self.regex = regex
[docs] def _process(self, zone, *args, **kwargs): for record in zone.records: values = [] if hasattr(record, 'values'): values = [value.rdata_text for value in record.values] elif record.value is not None: values = [record.value.rdata_text] else: self.log.warning( 'value for %s is NoneType, ignoring', record.fqdn ) if any(value in self.exact for value in values): self.matches(zone, record) continue elif any(r.search(value) for r in self.regex for value in values): self.matches(zone, record) continue self.doesnt_match(zone, record) return zone
[docs] class ValueAllowlistFilter(_ValueBaseFilter, AllowsMixin): '''Only manage records with values that match the provider patterns Example usage: processors: only-these: class: octodns.processor.filter.ValueAllowlistFilter allowlist: # exact string match - www # contains/substring match - /substring/ # regex pattern match - /some-pattern-\\d\\+/ # regex - anchored so has to match start to end - /^start-.+-end$/ # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - only-these targets: - route53 '''
[docs] def __init__(self, name, allowlist, **kwargs): self.log = getLogger(f'ValueAllowlistFilter[{name}]') super().__init__(name, allowlist, **kwargs)
[docs] class ValueRejectlistFilter(_ValueBaseFilter, RejectsMixin): '''Reject managing records with names that match the provider patterns Example usage: processors: not-these: class: octodns.processor.filter.ValueRejectlistFilter rejectlist: # exact string match - www # contains/substring match - /substring/ # regex pattern match - /some-pattern-\\d\\+/ # regex - anchored so has to match start to end - /^start-.+-end$/ # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - not-these targets: - route53 '''
[docs] def __init__(self, name, rejectlist, **kwargs): self.log = getLogger(f'ValueRejectlistFilter[{name}]') super().__init__(name, rejectlist, **kwargs)
[docs] class _NetworkValueBaseFilter(BaseProcessor):
[docs] def __init__(self, name, _list): super().__init__(name) self.networks = [] for value in _list: try: self.networks.append(ip_network(value)) except ValueError: raise ValueError(f'{value} is not a valid CIDR to use')
[docs] def _process(self, zone, *args, **kwargs): for record in zone.records: if record._type not in ['A', 'AAAA']: continue ips = [ip_address(value) for value in record.values] if any( ip in network for ip, network in product(ips, self.networks) ): self.matches(zone, record) else: self.doesnt_match(zone, record) return zone
process_source_zone = _process process_target_zone = _process
[docs] class NetworkValueAllowlistFilter(_NetworkValueBaseFilter, AllowsMixin): '''Only manage A and AAAA records with values that match the provider patterns All other types will be left as-is. Example usage: processors: only-these: class: octodns.processor.filter.NetworkValueAllowlistFilter allowlist: - 127.0.0.1/32 - 192.168.0.0/16 - fd00::/8 zones: exxampled.com.: sources: - config processors: - only-these targets: - route53 '''
[docs] def __init__(self, name, allowlist): super().__init__(name, allowlist)
[docs] class NetworkValueRejectlistFilter(_NetworkValueBaseFilter, RejectsMixin): '''Reject managing A and AAAA records with value matching a that match the provider patterns All other types will be left as-is. Example usage: processors: not-these: class: octodns.processor.filter.NetworkValueRejectlistFilter rejectlist: - 127.0.0.1/32 - 192.168.0.0/16 - fd00::/8 zones: exxampled.com.: sources: - config processors: - not-these targets: - route53 '''
[docs] def __init__(self, name, rejectlist): super().__init__(name, rejectlist)
[docs] class IgnoreRootNsFilter(BaseProcessor): '''Do not manage Root NS Records. Example usage: processors: no-root-ns: class: octodns.processor.filter.IgnoreRootNsFilter zones: exxampled.com.: sources: - config processors: - no-root-ns targets: - ns1 '''
[docs] def _process(self, zone, *args, **kwargs): for record in zone.records: if record._type == 'NS' and not record.name: zone.remove_record(record) return zone
process_source_zone = _process process_target_zone = _process
[docs] class ExcludeRootNsChanges(BaseProcessor): '''Do not allow root NS record changes Example usage: processors: exclude-root-ns-changes: class: octodns.processor.filter.ExcludeRootNsChanges # If true an a change for a root NS is seen an error will be thrown. If # false a warning will be printed and the change will be removed from # the plan. # (default: true) error: true zones: exxampled.com.: sources: - config processors: - exclude-root-ns-changes targets: - ns1 '''
[docs] def __init__(self, name, error=True): self.log = getLogger(f'ExcludeRootNsChanges[{name}]') super().__init__(name) self.error = error
[docs] def process_plan(self, plan, sources, target): if plan: for change in list(plan.changes): record = change.record if record._type == 'NS' and record.name == '': self.log.warning( 'root NS changes are disallowed, fqdn=%s', record.fqdn ) if self.error: raise ValidationError( record.fqdn, ['root NS changes are disallowed'], record.context, ) plan.changes.remove(change) return plan
[docs] class ZoneNameFilter(_FilterProcessor): '''Filter or error on record names that contain the zone name Example usage: processors: zone-name: class: octodns.processor.filter.ZoneNameFilter # If true a ValidationError will be throw when such records are # encouterd, if false the records will just be ignored/omitted. # (default: true) # error: True # Optional param that can be set to False to leave the target zone # alone, thus allowing deletion of existing records # (default: true) # include_target: True zones: exxampled.com.: sources: - config processors: - zone-name targets: - azure '''
[docs] def __init__(self, name, error=True, **kwargs): super().__init__(name, **kwargs) self.error = error
[docs] def _process(self, zone, *args, **kwargs): zone_name_with_dot = zone.name zone_name_without_dot = zone_name_with_dot[:-1] for record in zone.records: name = record.name if name.endswith(zone_name_with_dot) or name.endswith( zone_name_without_dot ): if self.error: raise ValidationError( record.fqdn, ['record name ends with zone name'], record.context, ) else: # just remove it zone.remove_record(record) return zone