| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """DNS rdata.""" |
|
|
| import base64 |
| import binascii |
| import inspect |
| import io |
| import itertools |
| import random |
| from importlib import import_module |
| from typing import Any, Dict, Optional, Tuple, Union |
|
|
| import dns.exception |
| import dns.immutable |
| import dns.ipv4 |
| import dns.ipv6 |
| import dns.name |
| import dns.rdataclass |
| import dns.rdatatype |
| import dns.tokenizer |
| import dns.ttl |
| import dns.wire |
|
|
| _chunksize = 32 |
|
|
| |
| |
| |
| |
| |
| |
| _allow_relative_comparisons = True |
|
|
|
|
| class NoRelativeRdataOrdering(dns.exception.DNSException): |
| """An attempt was made to do an ordered comparison of one or more |
| rdata with relative names. The only reliable way of sorting rdata |
| is to use non-relativized rdata. |
| |
| """ |
|
|
|
|
| def _wordbreak(data, chunksize=_chunksize, separator=b" "): |
| """Break a binary string into chunks of chunksize characters separated by |
| a space. |
| """ |
|
|
| if not chunksize: |
| return data.decode() |
| return separator.join( |
| [data[i : i + chunksize] for i in range(0, len(data), chunksize)] |
| ).decode() |
|
|
|
|
| |
|
|
|
|
| def _hexify(data, chunksize=_chunksize, separator=b" ", **kw): |
| """Convert a binary string into its hex encoding, broken up into chunks |
| of chunksize characters separated by a separator. |
| """ |
|
|
| return _wordbreak(binascii.hexlify(data), chunksize, separator) |
|
|
|
|
| def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw): |
| """Convert a binary string into its base64 encoding, broken up into chunks |
| of chunksize characters separated by a separator. |
| """ |
|
|
| return _wordbreak(base64.b64encode(data), chunksize, separator) |
|
|
|
|
| |
|
|
| __escaped = b'"\\' |
|
|
|
|
| def _escapify(qstring): |
| """Escape the characters in a quoted string which need it.""" |
|
|
| if isinstance(qstring, str): |
| qstring = qstring.encode() |
| if not isinstance(qstring, bytearray): |
| qstring = bytearray(qstring) |
|
|
| text = "" |
| for c in qstring: |
| if c in __escaped: |
| text += "\\" + chr(c) |
| elif c >= 0x20 and c < 0x7F: |
| text += chr(c) |
| else: |
| text += "\\%03d" % c |
| return text |
|
|
|
|
| def _truncate_bitmap(what): |
| """Determine the index of greatest byte that isn't all zeros, and |
| return the bitmap that contains all the bytes less than that index. |
| """ |
|
|
| for i in range(len(what) - 1, -1, -1): |
| if what[i] != 0: |
| return what[0 : i + 1] |
| return what[0:1] |
|
|
|
|
| |
| _constify = dns.immutable.constify |
|
|
|
|
| @dns.immutable.immutable |
| class Rdata: |
| """Base class for all DNS rdata types.""" |
|
|
| __slots__ = ["rdclass", "rdtype", "rdcomment"] |
|
|
| def __init__(self, rdclass, rdtype): |
| """Initialize an rdata. |
| |
| *rdclass*, an ``int`` is the rdataclass of the Rdata. |
| |
| *rdtype*, an ``int`` is the rdatatype of the Rdata. |
| """ |
|
|
| self.rdclass = self._as_rdataclass(rdclass) |
| self.rdtype = self._as_rdatatype(rdtype) |
| self.rdcomment = None |
|
|
| def _get_all_slots(self): |
| return itertools.chain.from_iterable( |
| getattr(cls, "__slots__", []) for cls in self.__class__.__mro__ |
| ) |
|
|
| def __getstate__(self): |
| |
| |
| |
| |
| |
| |
| |
| |
| state = {} |
| for slot in self._get_all_slots(): |
| state[slot] = getattr(self, slot) |
| return state |
|
|
| def __setstate__(self, state): |
| for slot, val in state.items(): |
| object.__setattr__(self, slot, val) |
| if not hasattr(self, "rdcomment"): |
| |
| |
| object.__setattr__(self, "rdcomment", None) |
|
|
| def covers(self) -> dns.rdatatype.RdataType: |
| """Return the type a Rdata covers. |
| |
| DNS SIG/RRSIG rdatas apply to a specific type; this type is |
| returned by the covers() function. If the rdata type is not |
| SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when |
| creating rdatasets, allowing the rdataset to contain only RRSIGs |
| of a particular type, e.g. RRSIG(NS). |
| |
| Returns a ``dns.rdatatype.RdataType``. |
| """ |
|
|
| return dns.rdatatype.NONE |
|
|
| def extended_rdatatype(self) -> int: |
| """Return a 32-bit type value, the least significant 16 bits of |
| which are the ordinary DNS type, and the upper 16 bits of which are |
| the "covered" type, if any. |
| |
| Returns an ``int``. |
| """ |
|
|
| return self.covers() << 16 | self.rdtype |
|
|
| def to_text( |
| self, |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = True, |
| **kw: Dict[str, Any], |
| ) -> str: |
| """Convert an rdata to text format. |
| |
| Returns a ``str``. |
| """ |
|
|
| raise NotImplementedError |
|
|
| def _to_wire( |
| self, |
| file: Optional[Any], |
| compress: Optional[dns.name.CompressType] = None, |
| origin: Optional[dns.name.Name] = None, |
| canonicalize: bool = False, |
| ) -> bytes: |
| raise NotImplementedError |
|
|
| def to_wire( |
| self, |
| file: Optional[Any] = None, |
| compress: Optional[dns.name.CompressType] = None, |
| origin: Optional[dns.name.Name] = None, |
| canonicalize: bool = False, |
| ) -> bytes: |
| """Convert an rdata to wire format. |
| |
| Returns a ``bytes`` or ``None``. |
| """ |
|
|
| if file: |
| return self._to_wire(file, compress, origin, canonicalize) |
| else: |
| f = io.BytesIO() |
| self._to_wire(f, compress, origin, canonicalize) |
| return f.getvalue() |
|
|
| def to_generic( |
| self, origin: Optional[dns.name.Name] = None |
| ) -> "dns.rdata.GenericRdata": |
| """Creates a dns.rdata.GenericRdata equivalent of this rdata. |
| |
| Returns a ``dns.rdata.GenericRdata``. |
| """ |
| return dns.rdata.GenericRdata( |
| self.rdclass, self.rdtype, self.to_wire(origin=origin) |
| ) |
|
|
| def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes: |
| """Convert rdata to a format suitable for digesting in hashes. This |
| is also the DNSSEC canonical form. |
| |
| Returns a ``bytes``. |
| """ |
|
|
| return self.to_wire(origin=origin, canonicalize=True) |
|
|
| def __repr__(self): |
| covers = self.covers() |
| if covers == dns.rdatatype.NONE: |
| ctext = "" |
| else: |
| ctext = "(" + dns.rdatatype.to_text(covers) + ")" |
| return ( |
| "<DNS " |
| + dns.rdataclass.to_text(self.rdclass) |
| + " " |
| + dns.rdatatype.to_text(self.rdtype) |
| + ctext |
| + " rdata: " |
| + str(self) |
| + ">" |
| ) |
|
|
| def __str__(self): |
| return self.to_text() |
|
|
| def _cmp(self, other): |
| """Compare an rdata with another rdata of the same rdtype and |
| rdclass. |
| |
| For rdata with only absolute names: |
| Return < 0 if self < other in the DNSSEC ordering, 0 if self |
| == other, and > 0 if self > other. |
| For rdata with at least one relative names: |
| The rdata sorts before any rdata with only absolute names. |
| When compared with another relative rdata, all names are |
| made absolute as if they were relative to the root, as the |
| proper origin is not available. While this creates a stable |
| ordering, it is NOT guaranteed to be the DNSSEC ordering. |
| In the future, all ordering comparisons for rdata with |
| relative names will be disallowed. |
| """ |
| try: |
| our = self.to_digestable() |
| our_relative = False |
| except dns.name.NeedAbsoluteNameOrOrigin: |
| if _allow_relative_comparisons: |
| our = self.to_digestable(dns.name.root) |
| our_relative = True |
| try: |
| their = other.to_digestable() |
| their_relative = False |
| except dns.name.NeedAbsoluteNameOrOrigin: |
| if _allow_relative_comparisons: |
| their = other.to_digestable(dns.name.root) |
| their_relative = True |
| if _allow_relative_comparisons: |
| if our_relative != their_relative: |
| |
| |
| if our_relative: |
| return -1 |
| else: |
| return 1 |
| elif our_relative or their_relative: |
| raise NoRelativeRdataOrdering |
| if our == their: |
| return 0 |
| elif our > their: |
| return 1 |
| else: |
| return -1 |
|
|
| def __eq__(self, other): |
| if not isinstance(other, Rdata): |
| return False |
| if self.rdclass != other.rdclass or self.rdtype != other.rdtype: |
| return False |
| our_relative = False |
| their_relative = False |
| try: |
| our = self.to_digestable() |
| except dns.name.NeedAbsoluteNameOrOrigin: |
| our = self.to_digestable(dns.name.root) |
| our_relative = True |
| try: |
| their = other.to_digestable() |
| except dns.name.NeedAbsoluteNameOrOrigin: |
| their = other.to_digestable(dns.name.root) |
| their_relative = True |
| if our_relative != their_relative: |
| return False |
| return our == their |
|
|
| def __ne__(self, other): |
| if not isinstance(other, Rdata): |
| return True |
| if self.rdclass != other.rdclass or self.rdtype != other.rdtype: |
| return True |
| return not self.__eq__(other) |
|
|
| def __lt__(self, other): |
| if ( |
| not isinstance(other, Rdata) |
| or self.rdclass != other.rdclass |
| or self.rdtype != other.rdtype |
| ): |
| return NotImplemented |
| return self._cmp(other) < 0 |
|
|
| def __le__(self, other): |
| if ( |
| not isinstance(other, Rdata) |
| or self.rdclass != other.rdclass |
| or self.rdtype != other.rdtype |
| ): |
| return NotImplemented |
| return self._cmp(other) <= 0 |
|
|
| def __ge__(self, other): |
| if ( |
| not isinstance(other, Rdata) |
| or self.rdclass != other.rdclass |
| or self.rdtype != other.rdtype |
| ): |
| return NotImplemented |
| return self._cmp(other) >= 0 |
|
|
| def __gt__(self, other): |
| if ( |
| not isinstance(other, Rdata) |
| or self.rdclass != other.rdclass |
| or self.rdtype != other.rdtype |
| ): |
| return NotImplemented |
| return self._cmp(other) > 0 |
|
|
| def __hash__(self): |
| return hash(self.to_digestable(dns.name.root)) |
|
|
| @classmethod |
| def from_text( |
| cls, |
| rdclass: dns.rdataclass.RdataClass, |
| rdtype: dns.rdatatype.RdataType, |
| tok: dns.tokenizer.Tokenizer, |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = True, |
| relativize_to: Optional[dns.name.Name] = None, |
| ) -> "Rdata": |
| raise NotImplementedError |
|
|
| @classmethod |
| def from_wire_parser( |
| cls, |
| rdclass: dns.rdataclass.RdataClass, |
| rdtype: dns.rdatatype.RdataType, |
| parser: dns.wire.Parser, |
| origin: Optional[dns.name.Name] = None, |
| ) -> "Rdata": |
| raise NotImplementedError |
|
|
| def replace(self, **kwargs: Any) -> "Rdata": |
| """ |
| Create a new Rdata instance based on the instance replace was |
| invoked on. It is possible to pass different parameters to |
| override the corresponding properties of the base Rdata. |
| |
| Any field specific to the Rdata type can be replaced, but the |
| *rdtype* and *rdclass* fields cannot. |
| |
| Returns an instance of the same Rdata subclass as *self*. |
| """ |
|
|
| |
| parameters = inspect.signature(self.__init__).parameters |
|
|
| |
| |
| for key in kwargs: |
| if key == "rdcomment": |
| continue |
| if key not in parameters: |
| raise AttributeError( |
| "'{}' object has no attribute '{}'".format( |
| self.__class__.__name__, key |
| ) |
| ) |
| if key in ("rdclass", "rdtype"): |
| raise AttributeError( |
| "Cannot overwrite '{}' attribute '{}'".format( |
| self.__class__.__name__, key |
| ) |
| ) |
|
|
| |
| |
| args = (kwargs.get(key, getattr(self, key)) for key in parameters) |
|
|
| |
| rd = self.__class__(*args) |
| |
| |
| rdcomment = kwargs.get("rdcomment", self.rdcomment) |
| if rdcomment is not None: |
| object.__setattr__(rd, "rdcomment", rdcomment) |
| return rd |
|
|
| |
| |
|
|
| @classmethod |
| def _as_rdataclass(cls, value): |
| return dns.rdataclass.RdataClass.make(value) |
|
|
| @classmethod |
| def _as_rdatatype(cls, value): |
| return dns.rdatatype.RdataType.make(value) |
|
|
| @classmethod |
| def _as_bytes( |
| cls, |
| value: Any, |
| encode: bool = False, |
| max_length: Optional[int] = None, |
| empty_ok: bool = True, |
| ) -> bytes: |
| if encode and isinstance(value, str): |
| bvalue = value.encode() |
| elif isinstance(value, bytearray): |
| bvalue = bytes(value) |
| elif isinstance(value, bytes): |
| bvalue = value |
| else: |
| raise ValueError("not bytes") |
| if max_length is not None and len(bvalue) > max_length: |
| raise ValueError("too long") |
| if not empty_ok and len(bvalue) == 0: |
| raise ValueError("empty bytes not allowed") |
| return bvalue |
|
|
| @classmethod |
| def _as_name(cls, value): |
| |
| |
| |
| if isinstance(value, str): |
| return dns.name.from_text(value) |
| elif not isinstance(value, dns.name.Name): |
| raise ValueError("not a name") |
| return value |
|
|
| @classmethod |
| def _as_uint8(cls, value): |
| if not isinstance(value, int): |
| raise ValueError("not an integer") |
| if value < 0 or value > 255: |
| raise ValueError("not a uint8") |
| return value |
|
|
| @classmethod |
| def _as_uint16(cls, value): |
| if not isinstance(value, int): |
| raise ValueError("not an integer") |
| if value < 0 or value > 65535: |
| raise ValueError("not a uint16") |
| return value |
|
|
| @classmethod |
| def _as_uint32(cls, value): |
| if not isinstance(value, int): |
| raise ValueError("not an integer") |
| if value < 0 or value > 4294967295: |
| raise ValueError("not a uint32") |
| return value |
|
|
| @classmethod |
| def _as_uint48(cls, value): |
| if not isinstance(value, int): |
| raise ValueError("not an integer") |
| if value < 0 or value > 281474976710655: |
| raise ValueError("not a uint48") |
| return value |
|
|
| @classmethod |
| def _as_int(cls, value, low=None, high=None): |
| if not isinstance(value, int): |
| raise ValueError("not an integer") |
| if low is not None and value < low: |
| raise ValueError("value too small") |
| if high is not None and value > high: |
| raise ValueError("value too large") |
| return value |
|
|
| @classmethod |
| def _as_ipv4_address(cls, value): |
| if isinstance(value, str): |
| return dns.ipv4.canonicalize(value) |
| elif isinstance(value, bytes): |
| return dns.ipv4.inet_ntoa(value) |
| else: |
| raise ValueError("not an IPv4 address") |
|
|
| @classmethod |
| def _as_ipv6_address(cls, value): |
| if isinstance(value, str): |
| return dns.ipv6.canonicalize(value) |
| elif isinstance(value, bytes): |
| return dns.ipv6.inet_ntoa(value) |
| else: |
| raise ValueError("not an IPv6 address") |
|
|
| @classmethod |
| def _as_bool(cls, value): |
| if isinstance(value, bool): |
| return value |
| else: |
| raise ValueError("not a boolean") |
|
|
| @classmethod |
| def _as_ttl(cls, value): |
| if isinstance(value, int): |
| return cls._as_int(value, 0, dns.ttl.MAX_TTL) |
| elif isinstance(value, str): |
| return dns.ttl.from_text(value) |
| else: |
| raise ValueError("not a TTL") |
|
|
| @classmethod |
| def _as_tuple(cls, value, as_value): |
| try: |
| |
| |
| return (as_value(value),) |
| except Exception: |
| |
| |
| return tuple(as_value(v) for v in value) |
|
|
| |
|
|
| @classmethod |
| def _processing_order(cls, iterable): |
| items = list(iterable) |
| random.shuffle(items) |
| return items |
|
|
|
|
| @dns.immutable.immutable |
| class GenericRdata(Rdata): |
| """Generic Rdata Class |
| |
| This class is used for rdata types for which we have no better |
| implementation. It implements the DNS "unknown RRs" scheme. |
| """ |
|
|
| __slots__ = ["data"] |
|
|
| def __init__(self, rdclass, rdtype, data): |
| super().__init__(rdclass, rdtype) |
| self.data = data |
|
|
| def to_text( |
| self, |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = True, |
| **kw: Dict[str, Any], |
| ) -> str: |
| return r"\# %d " % len(self.data) + _hexify(self.data, **kw) |
|
|
| @classmethod |
| def from_text( |
| cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None |
| ): |
| token = tok.get() |
| if not token.is_identifier() or token.value != r"\#": |
| raise dns.exception.SyntaxError(r"generic rdata does not start with \#") |
| length = tok.get_int() |
| hex = tok.concatenate_remaining_identifiers(True).encode() |
| data = binascii.unhexlify(hex) |
| if len(data) != length: |
| raise dns.exception.SyntaxError("generic rdata hex data has wrong length") |
| return cls(rdclass, rdtype, data) |
|
|
| def _to_wire(self, file, compress=None, origin=None, canonicalize=False): |
| file.write(self.data) |
|
|
| @classmethod |
| def from_wire_parser(cls, rdclass, rdtype, parser, origin=None): |
| return cls(rdclass, rdtype, parser.get_remaining()) |
|
|
|
|
| _rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = ( |
| {} |
| ) |
| _module_prefix = "dns.rdtypes" |
|
|
|
|
| def get_rdata_class(rdclass, rdtype): |
| cls = _rdata_classes.get((rdclass, rdtype)) |
| if not cls: |
| cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype)) |
| if not cls: |
| rdclass_text = dns.rdataclass.to_text(rdclass) |
| rdtype_text = dns.rdatatype.to_text(rdtype) |
| rdtype_text = rdtype_text.replace("-", "_") |
| try: |
| mod = import_module( |
| ".".join([_module_prefix, rdclass_text, rdtype_text]) |
| ) |
| cls = getattr(mod, rdtype_text) |
| _rdata_classes[(rdclass, rdtype)] = cls |
| except ImportError: |
| try: |
| mod = import_module(".".join([_module_prefix, "ANY", rdtype_text])) |
| cls = getattr(mod, rdtype_text) |
| _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls |
| _rdata_classes[(rdclass, rdtype)] = cls |
| except ImportError: |
| pass |
| if not cls: |
| cls = GenericRdata |
| _rdata_classes[(rdclass, rdtype)] = cls |
| return cls |
|
|
|
|
| def from_text( |
| rdclass: Union[dns.rdataclass.RdataClass, str], |
| rdtype: Union[dns.rdatatype.RdataType, str], |
| tok: Union[dns.tokenizer.Tokenizer, str], |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = True, |
| relativize_to: Optional[dns.name.Name] = None, |
| idna_codec: Optional[dns.name.IDNACodec] = None, |
| ) -> Rdata: |
| """Build an rdata object from text format. |
| |
| This function attempts to dynamically load a class which |
| implements the specified rdata class and type. If there is no |
| class-and-type-specific implementation, the GenericRdata class |
| is used. |
| |
| Once a class is chosen, its from_text() class method is called |
| with the parameters to this function. |
| |
| If *tok* is a ``str``, then a tokenizer is created and the string |
| is used as its input. |
| |
| *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. |
| |
| *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. |
| |
| *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``. |
| |
| *origin*, a ``dns.name.Name`` (or ``None``), the |
| origin to use for relative names. |
| |
| *relativize*, a ``bool``. If true, name will be relativized. |
| |
| *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use |
| when relativizing names. If not set, the *origin* value will be used. |
| |
| *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA |
| encoder/decoder to use if a tokenizer needs to be created. If |
| ``None``, the default IDNA 2003 encoder/decoder is used. If a |
| tokenizer is not created, then the codec associated with the tokenizer |
| is the one that is used. |
| |
| Returns an instance of the chosen Rdata subclass. |
| |
| """ |
| if isinstance(tok, str): |
| tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec) |
| rdclass = dns.rdataclass.RdataClass.make(rdclass) |
| rdtype = dns.rdatatype.RdataType.make(rdtype) |
| cls = get_rdata_class(rdclass, rdtype) |
| with dns.exception.ExceptionWrapper(dns.exception.SyntaxError): |
| rdata = None |
| if cls != GenericRdata: |
| |
| token = tok.get() |
| tok.unget(token) |
| if token.is_identifier() and token.value == r"\#": |
| |
| |
| |
| |
| |
| grdata = GenericRdata.from_text( |
| rdclass, rdtype, tok, origin, relativize, relativize_to |
| ) |
| rdata = from_wire( |
| rdclass, rdtype, grdata.data, 0, len(grdata.data), origin |
| ) |
| |
| |
| |
| |
| |
| rwire = rdata.to_wire() |
| if rwire != grdata.data: |
| raise dns.exception.SyntaxError( |
| "compressed data in " |
| "generic syntax form " |
| "of known rdatatype" |
| ) |
| if rdata is None: |
| rdata = cls.from_text( |
| rdclass, rdtype, tok, origin, relativize, relativize_to |
| ) |
| token = tok.get_eol_as_token() |
| if token.comment is not None: |
| object.__setattr__(rdata, "rdcomment", token.comment) |
| return rdata |
|
|
|
|
| def from_wire_parser( |
| rdclass: Union[dns.rdataclass.RdataClass, str], |
| rdtype: Union[dns.rdatatype.RdataType, str], |
| parser: dns.wire.Parser, |
| origin: Optional[dns.name.Name] = None, |
| ) -> Rdata: |
| """Build an rdata object from wire format |
| |
| This function attempts to dynamically load a class which |
| implements the specified rdata class and type. If there is no |
| class-and-type-specific implementation, the GenericRdata class |
| is used. |
| |
| Once a class is chosen, its from_wire() class method is called |
| with the parameters to this function. |
| |
| *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass. |
| |
| *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype. |
| |
| *parser*, a ``dns.wire.Parser``, the parser, which should be |
| restricted to the rdata length. |
| |
| *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, |
| then names will be relativized to this origin. |
| |
| Returns an instance of the chosen Rdata subclass. |
| """ |
|
|
| rdclass = dns.rdataclass.RdataClass.make(rdclass) |
| rdtype = dns.rdatatype.RdataType.make(rdtype) |
| cls = get_rdata_class(rdclass, rdtype) |
| with dns.exception.ExceptionWrapper(dns.exception.FormError): |
| return cls.from_wire_parser(rdclass, rdtype, parser, origin) |
|
|
|
|
| def from_wire( |
| rdclass: Union[dns.rdataclass.RdataClass, str], |
| rdtype: Union[dns.rdatatype.RdataType, str], |
| wire: bytes, |
| current: int, |
| rdlen: int, |
| origin: Optional[dns.name.Name] = None, |
| ) -> Rdata: |
| """Build an rdata object from wire format |
| |
| This function attempts to dynamically load a class which |
| implements the specified rdata class and type. If there is no |
| class-and-type-specific implementation, the GenericRdata class |
| is used. |
| |
| Once a class is chosen, its from_wire() class method is called |
| with the parameters to this function. |
| |
| *rdclass*, an ``int``, the rdataclass. |
| |
| *rdtype*, an ``int``, the rdatatype. |
| |
| *wire*, a ``bytes``, the wire-format message. |
| |
| *current*, an ``int``, the offset in wire of the beginning of |
| the rdata. |
| |
| *rdlen*, an ``int``, the length of the wire-format rdata |
| |
| *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, |
| then names will be relativized to this origin. |
| |
| Returns an instance of the chosen Rdata subclass. |
| """ |
| parser = dns.wire.Parser(wire, current) |
| with parser.restrict_to(rdlen): |
| return from_wire_parser(rdclass, rdtype, parser, origin) |
|
|
|
|
| class RdatatypeExists(dns.exception.DNSException): |
| """DNS rdatatype already exists.""" |
|
|
| supp_kwargs = {"rdclass", "rdtype"} |
| fmt = ( |
| "The rdata type with class {rdclass:d} and rdtype {rdtype:d} " |
| + "already exists." |
| ) |
|
|
|
|
| def register_type( |
| implementation: Any, |
| rdtype: int, |
| rdtype_text: str, |
| is_singleton: bool = False, |
| rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, |
| ) -> None: |
| """Dynamically register a module to handle an rdatatype. |
| |
| *implementation*, a module implementing the type in the usual dnspython |
| way. |
| |
| *rdtype*, an ``int``, the rdatatype to register. |
| |
| *rdtype_text*, a ``str``, the textual form of the rdatatype. |
| |
| *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e. |
| RRsets of the type can have only one member.) |
| |
| *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if |
| it applies to all classes. |
| """ |
|
|
| rdtype = dns.rdatatype.RdataType.make(rdtype) |
| existing_cls = get_rdata_class(rdclass, rdtype) |
| if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype): |
| raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype) |
| _rdata_classes[(rdclass, rdtype)] = getattr( |
| implementation, rdtype_text.replace("-", "_") |
| ) |
| dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton) |
|
|