| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """DNS stub resolver.""" |
|
|
| import contextlib |
| import random |
| import socket |
| import sys |
| import threading |
| import time |
| import warnings |
| from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union |
| from urllib.parse import urlparse |
|
|
| import dns._ddr |
| import dns.edns |
| import dns.exception |
| import dns.flags |
| import dns.inet |
| import dns.ipv4 |
| import dns.ipv6 |
| import dns.message |
| import dns.name |
| import dns.nameserver |
| import dns.query |
| import dns.rcode |
| import dns.rdataclass |
| import dns.rdatatype |
| import dns.rdtypes.svcbbase |
| import dns.reversename |
| import dns.tsig |
|
|
| if sys.platform == "win32": |
| import dns.win32util |
|
|
|
|
| class NXDOMAIN(dns.exception.DNSException): |
| """The DNS query name does not exist.""" |
|
|
| supp_kwargs = {"qnames", "responses"} |
| fmt = None |
|
|
| |
|
|
| |
| |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def _check_kwargs(self, qnames, responses=None): |
| if not isinstance(qnames, (list, tuple, set)): |
| raise AttributeError("qnames must be a list, tuple or set") |
| if len(qnames) == 0: |
| raise AttributeError("qnames must contain at least one element") |
| if responses is None: |
| responses = {} |
| elif not isinstance(responses, dict): |
| raise AttributeError("responses must be a dict(qname=response)") |
| kwargs = dict(qnames=qnames, responses=responses) |
| return kwargs |
|
|
| def __str__(self) -> str: |
| if "qnames" not in self.kwargs: |
| return super().__str__() |
| qnames = self.kwargs["qnames"] |
| if len(qnames) > 1: |
| msg = "None of DNS query names exist" |
| else: |
| msg = "The DNS query name does not exist" |
| qnames = ", ".join(map(str, qnames)) |
| return "{}: {}".format(msg, qnames) |
|
|
| @property |
| def canonical_name(self): |
| """Return the unresolved canonical name.""" |
| if "qnames" not in self.kwargs: |
| raise TypeError("parametrized exception required") |
| for qname in self.kwargs["qnames"]: |
| response = self.kwargs["responses"][qname] |
| try: |
| cname = response.canonical_name() |
| if cname != qname: |
| return cname |
| except Exception: |
| |
| |
| pass |
| return self.kwargs["qnames"][0] |
|
|
| def __add__(self, e_nx): |
| """Augment by results from another NXDOMAIN exception.""" |
| qnames0 = list(self.kwargs.get("qnames", [])) |
| responses0 = dict(self.kwargs.get("responses", {})) |
| responses1 = e_nx.kwargs.get("responses", {}) |
| for qname1 in e_nx.kwargs.get("qnames", []): |
| if qname1 not in qnames0: |
| qnames0.append(qname1) |
| if qname1 in responses1: |
| responses0[qname1] = responses1[qname1] |
| return NXDOMAIN(qnames=qnames0, responses=responses0) |
|
|
| def qnames(self): |
| """All of the names that were tried. |
| |
| Returns a list of ``dns.name.Name``. |
| """ |
| return self.kwargs["qnames"] |
|
|
| def responses(self): |
| """A map from queried names to their NXDOMAIN responses. |
| |
| Returns a dict mapping a ``dns.name.Name`` to a |
| ``dns.message.Message``. |
| """ |
| return self.kwargs["responses"] |
|
|
| def response(self, qname): |
| """The response for query *qname*. |
| |
| Returns a ``dns.message.Message``. |
| """ |
| return self.kwargs["responses"][qname] |
|
|
|
|
| class YXDOMAIN(dns.exception.DNSException): |
| """The DNS query name is too long after DNAME substitution.""" |
|
|
|
|
| ErrorTuple = Tuple[ |
| Optional[str], |
| bool, |
| int, |
| Union[Exception, str], |
| Optional[dns.message.Message], |
| ] |
|
|
|
|
| def _errors_to_text(errors: List[ErrorTuple]) -> List[str]: |
| """Turn a resolution errors trace into a list of text.""" |
| texts = [] |
| for err in errors: |
| texts.append("Server {} answered {}".format(err[0], err[3])) |
| return texts |
|
|
|
|
| class LifetimeTimeout(dns.exception.Timeout): |
| """The resolution lifetime expired.""" |
|
|
| msg = "The resolution lifetime expired." |
| fmt = "%s after {timeout:.3f} seconds: {errors}" % msg[:-1] |
| supp_kwargs = {"timeout", "errors"} |
|
|
| |
| |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def _fmt_kwargs(self, **kwargs): |
| srv_msgs = _errors_to_text(kwargs["errors"]) |
| return super()._fmt_kwargs( |
| timeout=kwargs["timeout"], errors="; ".join(srv_msgs) |
| ) |
|
|
|
|
| |
| |
| |
| Timeout = LifetimeTimeout |
|
|
|
|
| class NoAnswer(dns.exception.DNSException): |
| """The DNS response does not contain an answer to the question.""" |
|
|
| fmt = "The DNS response does not contain an answer to the question: {query}" |
| supp_kwargs = {"response"} |
|
|
| |
| |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def _fmt_kwargs(self, **kwargs): |
| return super()._fmt_kwargs(query=kwargs["response"].question) |
|
|
| def response(self): |
| return self.kwargs["response"] |
|
|
|
|
| class NoNameservers(dns.exception.DNSException): |
| """All nameservers failed to answer the query. |
| |
| errors: list of servers and respective errors |
| The type of errors is |
| [(server IP address, any object convertible to string)]. |
| Non-empty errors list will add explanatory message () |
| """ |
|
|
| msg = "All nameservers failed to answer the query." |
| fmt = "%s {query}: {errors}" % msg[:-1] |
| supp_kwargs = {"request", "errors"} |
|
|
| |
| |
| def __init__(self, *args, **kwargs): |
| super().__init__(*args, **kwargs) |
|
|
| def _fmt_kwargs(self, **kwargs): |
| srv_msgs = _errors_to_text(kwargs["errors"]) |
| return super()._fmt_kwargs( |
| query=kwargs["request"].question, errors="; ".join(srv_msgs) |
| ) |
|
|
|
|
| class NotAbsolute(dns.exception.DNSException): |
| """An absolute domain name is required but a relative name was provided.""" |
|
|
|
|
| class NoRootSOA(dns.exception.DNSException): |
| """There is no SOA RR at the DNS root name. This should never happen!""" |
|
|
|
|
| class NoMetaqueries(dns.exception.DNSException): |
| """DNS metaqueries are not allowed.""" |
|
|
|
|
| class NoResolverConfiguration(dns.exception.DNSException): |
| """Resolver configuration could not be read or specified no nameservers.""" |
|
|
|
|
| class Answer: |
| """DNS stub resolver answer. |
| |
| Instances of this class bundle up the result of a successful DNS |
| resolution. |
| |
| For convenience, the answer object implements much of the sequence |
| protocol, forwarding to its ``rrset`` attribute. E.g. |
| ``for a in answer`` is equivalent to ``for a in answer.rrset``. |
| ``answer[i]`` is equivalent to ``answer.rrset[i]``, and |
| ``answer[i:j]`` is equivalent to ``answer.rrset[i:j]``. |
| |
| Note that CNAMEs or DNAMEs in the response may mean that answer |
| RRset's name might not be the query name. |
| """ |
|
|
| def __init__( |
| self, |
| qname: dns.name.Name, |
| rdtype: dns.rdatatype.RdataType, |
| rdclass: dns.rdataclass.RdataClass, |
| response: dns.message.QueryMessage, |
| nameserver: Optional[str] = None, |
| port: Optional[int] = None, |
| ) -> None: |
| self.qname = qname |
| self.rdtype = rdtype |
| self.rdclass = rdclass |
| self.response = response |
| self.nameserver = nameserver |
| self.port = port |
| self.chaining_result = response.resolve_chaining() |
| |
| |
| self.canonical_name = self.chaining_result.canonical_name |
| self.rrset = self.chaining_result.answer |
| self.expiration = time.time() + self.chaining_result.minimum_ttl |
|
|
| def __getattr__(self, attr): |
| if attr == "name": |
| return self.rrset.name |
| elif attr == "ttl": |
| return self.rrset.ttl |
| elif attr == "covers": |
| return self.rrset.covers |
| elif attr == "rdclass": |
| return self.rrset.rdclass |
| elif attr == "rdtype": |
| return self.rrset.rdtype |
| else: |
| raise AttributeError(attr) |
|
|
| def __len__(self) -> int: |
| return self.rrset and len(self.rrset) or 0 |
|
|
| def __iter__(self): |
| return self.rrset and iter(self.rrset) or iter(tuple()) |
|
|
| def __getitem__(self, i): |
| if self.rrset is None: |
| raise IndexError |
| return self.rrset[i] |
|
|
| def __delitem__(self, i): |
| if self.rrset is None: |
| raise IndexError |
| del self.rrset[i] |
|
|
|
|
| class Answers(dict): |
| """A dict of DNS stub resolver answers, indexed by type.""" |
|
|
|
|
| class HostAnswers(Answers): |
| """A dict of DNS stub resolver answers to a host name lookup, indexed by |
| type. |
| """ |
|
|
| @classmethod |
| def make( |
| cls, |
| v6: Optional[Answer] = None, |
| v4: Optional[Answer] = None, |
| add_empty: bool = True, |
| ) -> "HostAnswers": |
| answers = HostAnswers() |
| if v6 is not None and (add_empty or v6.rrset): |
| answers[dns.rdatatype.AAAA] = v6 |
| if v4 is not None and (add_empty or v4.rrset): |
| answers[dns.rdatatype.A] = v4 |
| return answers |
|
|
| |
| |
| def addresses_and_families( |
| self, family: int = socket.AF_UNSPEC |
| ) -> Iterator[Tuple[str, int]]: |
| if family == socket.AF_UNSPEC: |
| yield from self.addresses_and_families(socket.AF_INET6) |
| yield from self.addresses_and_families(socket.AF_INET) |
| return |
| elif family == socket.AF_INET6: |
| answer = self.get(dns.rdatatype.AAAA) |
| elif family == socket.AF_INET: |
| answer = self.get(dns.rdatatype.A) |
| else: |
| raise NotImplementedError(f"unknown address family {family}") |
| if answer: |
| for rdata in answer: |
| yield (rdata.address, family) |
|
|
| |
| |
| def addresses(self, family: int = socket.AF_UNSPEC) -> Iterator[str]: |
| return (pair[0] for pair in self.addresses_and_families(family)) |
|
|
| |
| def canonical_name(self) -> dns.name.Name: |
| answer = self.get(dns.rdatatype.AAAA, self.get(dns.rdatatype.A)) |
| return answer.canonical_name |
|
|
|
|
| class CacheStatistics: |
| """Cache Statistics""" |
|
|
| def __init__(self, hits: int = 0, misses: int = 0) -> None: |
| self.hits = hits |
| self.misses = misses |
|
|
| def reset(self) -> None: |
| self.hits = 0 |
| self.misses = 0 |
|
|
| def clone(self) -> "CacheStatistics": |
| return CacheStatistics(self.hits, self.misses) |
|
|
|
|
| class CacheBase: |
| def __init__(self) -> None: |
| self.lock = threading.Lock() |
| self.statistics = CacheStatistics() |
|
|
| def reset_statistics(self) -> None: |
| """Reset all statistics to zero.""" |
| with self.lock: |
| self.statistics.reset() |
|
|
| def hits(self) -> int: |
| """How many hits has the cache had?""" |
| with self.lock: |
| return self.statistics.hits |
|
|
| def misses(self) -> int: |
| """How many misses has the cache had?""" |
| with self.lock: |
| return self.statistics.misses |
|
|
| def get_statistics_snapshot(self) -> CacheStatistics: |
| """Return a consistent snapshot of all the statistics. |
| |
| If running with multiple threads, it's better to take a |
| snapshot than to call statistics methods such as hits() and |
| misses() individually. |
| """ |
| with self.lock: |
| return self.statistics.clone() |
|
|
|
|
| CacheKey = Tuple[dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass] |
|
|
|
|
| class Cache(CacheBase): |
| """Simple thread-safe DNS answer cache.""" |
|
|
| def __init__(self, cleaning_interval: float = 300.0) -> None: |
| """*cleaning_interval*, a ``float`` is the number of seconds between |
| periodic cleanings. |
| """ |
|
|
| super().__init__() |
| self.data: Dict[CacheKey, Answer] = {} |
| self.cleaning_interval = cleaning_interval |
| self.next_cleaning: float = time.time() + self.cleaning_interval |
|
|
| def _maybe_clean(self) -> None: |
| """Clean the cache if it's time to do so.""" |
|
|
| now = time.time() |
| if self.next_cleaning <= now: |
| keys_to_delete = [] |
| for k, v in self.data.items(): |
| if v.expiration <= now: |
| keys_to_delete.append(k) |
| for k in keys_to_delete: |
| del self.data[k] |
| now = time.time() |
| self.next_cleaning = now + self.cleaning_interval |
|
|
| def get(self, key: CacheKey) -> Optional[Answer]: |
| """Get the answer associated with *key*. |
| |
| Returns None if no answer is cached for the key. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| |
| Returns a ``dns.resolver.Answer`` or ``None``. |
| """ |
|
|
| with self.lock: |
| self._maybe_clean() |
| v = self.data.get(key) |
| if v is None or v.expiration <= time.time(): |
| self.statistics.misses += 1 |
| return None |
| self.statistics.hits += 1 |
| return v |
|
|
| def put(self, key: CacheKey, value: Answer) -> None: |
| """Associate key and value in the cache. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| |
| *value*, a ``dns.resolver.Answer``, the answer. |
| """ |
|
|
| with self.lock: |
| self._maybe_clean() |
| self.data[key] = value |
|
|
| def flush(self, key: Optional[CacheKey] = None) -> None: |
| """Flush the cache. |
| |
| If *key* is not ``None``, only that item is flushed. Otherwise the entire cache |
| is flushed. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| """ |
|
|
| with self.lock: |
| if key is not None: |
| if key in self.data: |
| del self.data[key] |
| else: |
| self.data = {} |
| self.next_cleaning = time.time() + self.cleaning_interval |
|
|
|
|
| class LRUCacheNode: |
| """LRUCache node.""" |
|
|
| def __init__(self, key, value): |
| self.key = key |
| self.value = value |
| self.hits = 0 |
| self.prev = self |
| self.next = self |
|
|
| def link_after(self, node: "LRUCacheNode") -> None: |
| self.prev = node |
| self.next = node.next |
| node.next.prev = self |
| node.next = self |
|
|
| def unlink(self) -> None: |
| self.next.prev = self.prev |
| self.prev.next = self.next |
|
|
|
|
| class LRUCache(CacheBase): |
| """Thread-safe, bounded, least-recently-used DNS answer cache. |
| |
| This cache is better than the simple cache (above) if you're |
| running a web crawler or other process that does a lot of |
| resolutions. The LRUCache has a maximum number of nodes, and when |
| it is full, the least-recently used node is removed to make space |
| for a new one. |
| """ |
|
|
| def __init__(self, max_size: int = 100000) -> None: |
| """*max_size*, an ``int``, is the maximum number of nodes to cache; |
| it must be greater than 0. |
| """ |
|
|
| super().__init__() |
| self.data: Dict[CacheKey, LRUCacheNode] = {} |
| self.set_max_size(max_size) |
| self.sentinel: LRUCacheNode = LRUCacheNode(None, None) |
| self.sentinel.prev = self.sentinel |
| self.sentinel.next = self.sentinel |
|
|
| def set_max_size(self, max_size: int) -> None: |
| if max_size < 1: |
| max_size = 1 |
| self.max_size = max_size |
|
|
| def get(self, key: CacheKey) -> Optional[Answer]: |
| """Get the answer associated with *key*. |
| |
| Returns None if no answer is cached for the key. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| |
| Returns a ``dns.resolver.Answer`` or ``None``. |
| """ |
|
|
| with self.lock: |
| node = self.data.get(key) |
| if node is None: |
| self.statistics.misses += 1 |
| return None |
| |
| |
| node.unlink() |
| if node.value.expiration <= time.time(): |
| del self.data[node.key] |
| self.statistics.misses += 1 |
| return None |
| node.link_after(self.sentinel) |
| self.statistics.hits += 1 |
| node.hits += 1 |
| return node.value |
|
|
| def get_hits_for_key(self, key: CacheKey) -> int: |
| """Return the number of cache hits associated with the specified key.""" |
| with self.lock: |
| node = self.data.get(key) |
| if node is None or node.value.expiration <= time.time(): |
| return 0 |
| else: |
| return node.hits |
|
|
| def put(self, key: CacheKey, value: Answer) -> None: |
| """Associate key and value in the cache. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| |
| *value*, a ``dns.resolver.Answer``, the answer. |
| """ |
|
|
| with self.lock: |
| node = self.data.get(key) |
| if node is not None: |
| node.unlink() |
| del self.data[node.key] |
| while len(self.data) >= self.max_size: |
| gnode = self.sentinel.prev |
| gnode.unlink() |
| del self.data[gnode.key] |
| node = LRUCacheNode(key, value) |
| node.link_after(self.sentinel) |
| self.data[key] = node |
|
|
| def flush(self, key: Optional[CacheKey] = None) -> None: |
| """Flush the cache. |
| |
| If *key* is not ``None``, only that item is flushed. Otherwise the entire cache |
| is flushed. |
| |
| *key*, a ``(dns.name.Name, dns.rdatatype.RdataType, dns.rdataclass.RdataClass)`` |
| tuple whose values are the query name, rdtype, and rdclass respectively. |
| """ |
|
|
| with self.lock: |
| if key is not None: |
| node = self.data.get(key) |
| if node is not None: |
| node.unlink() |
| del self.data[node.key] |
| else: |
| gnode = self.sentinel.next |
| while gnode != self.sentinel: |
| next = gnode.next |
| gnode.unlink() |
| gnode = next |
| self.data = {} |
|
|
|
|
| class _Resolution: |
| """Helper class for dns.resolver.Resolver.resolve(). |
| |
| All of the "business logic" of resolution is encapsulated in this |
| class, allowing us to have multiple resolve() implementations |
| using different I/O schemes without copying all of the |
| complicated logic. |
| |
| This class is a "friend" to dns.resolver.Resolver and manipulates |
| resolver data structures directly. |
| """ |
|
|
| def __init__( |
| self, |
| resolver: "BaseResolver", |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str], |
| rdclass: Union[dns.rdataclass.RdataClass, str], |
| tcp: bool, |
| raise_on_no_answer: bool, |
| search: Optional[bool], |
| ) -> None: |
| if isinstance(qname, str): |
| qname = dns.name.from_text(qname, None) |
| rdtype = dns.rdatatype.RdataType.make(rdtype) |
| if dns.rdatatype.is_metatype(rdtype): |
| raise NoMetaqueries |
| rdclass = dns.rdataclass.RdataClass.make(rdclass) |
| if dns.rdataclass.is_metaclass(rdclass): |
| raise NoMetaqueries |
| self.resolver = resolver |
| self.qnames_to_try = resolver._get_qnames_to_try(qname, search) |
| self.qnames = self.qnames_to_try[:] |
| self.rdtype = rdtype |
| self.rdclass = rdclass |
| self.tcp = tcp |
| self.raise_on_no_answer = raise_on_no_answer |
| self.nxdomain_responses: Dict[dns.name.Name, dns.message.QueryMessage] = {} |
| |
| self.qname = dns.name.empty |
| self.nameservers: List[dns.nameserver.Nameserver] = [] |
| self.current_nameservers: List[dns.nameserver.Nameserver] = [] |
| self.errors: List[ErrorTuple] = [] |
| self.nameserver: Optional[dns.nameserver.Nameserver] = None |
| self.tcp_attempt = False |
| self.retry_with_tcp = False |
| self.request: Optional[dns.message.QueryMessage] = None |
| self.backoff = 0.0 |
|
|
| def next_request( |
| self, |
| ) -> Tuple[Optional[dns.message.QueryMessage], Optional[Answer]]: |
| """Get the next request to send, and check the cache. |
| |
| Returns a (request, answer) tuple. At most one of request or |
| answer will not be None. |
| """ |
|
|
| |
| |
|
|
| while len(self.qnames) > 0: |
| self.qname = self.qnames.pop(0) |
|
|
| |
| if self.resolver.cache: |
| answer = self.resolver.cache.get( |
| (self.qname, self.rdtype, self.rdclass) |
| ) |
| if answer is not None: |
| if answer.rrset is None and self.raise_on_no_answer: |
| raise NoAnswer(response=answer.response) |
| else: |
| return (None, answer) |
| answer = self.resolver.cache.get( |
| (self.qname, dns.rdatatype.ANY, self.rdclass) |
| ) |
| if answer is not None and answer.response.rcode() == dns.rcode.NXDOMAIN: |
| |
| |
| self.nxdomain_responses[self.qname] = answer.response |
| continue |
|
|
| |
| request = dns.message.make_query(self.qname, self.rdtype, self.rdclass) |
| if self.resolver.keyname is not None: |
| request.use_tsig( |
| self.resolver.keyring, |
| self.resolver.keyname, |
| algorithm=self.resolver.keyalgorithm, |
| ) |
| request.use_edns( |
| self.resolver.edns, |
| self.resolver.ednsflags, |
| self.resolver.payload, |
| options=self.resolver.ednsoptions, |
| ) |
| if self.resolver.flags is not None: |
| request.flags = self.resolver.flags |
|
|
| self.nameservers = self.resolver._enrich_nameservers( |
| self.resolver._nameservers, |
| self.resolver.nameserver_ports, |
| self.resolver.port, |
| ) |
| if self.resolver.rotate: |
| random.shuffle(self.nameservers) |
| self.current_nameservers = self.nameservers[:] |
| self.errors = [] |
| self.nameserver = None |
| self.tcp_attempt = False |
| self.retry_with_tcp = False |
| self.request = request |
| self.backoff = 0.10 |
|
|
| return (request, None) |
|
|
| |
| |
| |
| |
| |
| raise NXDOMAIN(qnames=self.qnames_to_try, responses=self.nxdomain_responses) |
|
|
| def next_nameserver(self) -> Tuple[dns.nameserver.Nameserver, bool, float]: |
| if self.retry_with_tcp: |
| assert self.nameserver is not None |
| assert not self.nameserver.is_always_max_size() |
| self.tcp_attempt = True |
| self.retry_with_tcp = False |
| return (self.nameserver, True, 0) |
|
|
| backoff = 0.0 |
| if not self.current_nameservers: |
| if len(self.nameservers) == 0: |
| |
| raise NoNameservers(request=self.request, errors=self.errors) |
| self.current_nameservers = self.nameservers[:] |
| backoff = self.backoff |
| self.backoff = min(self.backoff * 2, 2) |
|
|
| self.nameserver = self.current_nameservers.pop(0) |
| self.tcp_attempt = self.tcp or self.nameserver.is_always_max_size() |
| return (self.nameserver, self.tcp_attempt, backoff) |
|
|
| def query_result( |
| self, response: Optional[dns.message.Message], ex: Optional[Exception] |
| ) -> Tuple[Optional[Answer], bool]: |
| |
| |
| |
| assert self.nameserver is not None |
| if ex: |
| |
| assert response is None |
| self.errors.append( |
| ( |
| str(self.nameserver), |
| self.tcp_attempt, |
| self.nameserver.answer_port(), |
| ex, |
| response, |
| ) |
| ) |
| if ( |
| isinstance(ex, dns.exception.FormError) |
| or isinstance(ex, EOFError) |
| or isinstance(ex, OSError) |
| or isinstance(ex, NotImplementedError) |
| ): |
| |
| self.nameservers.remove(self.nameserver) |
| elif isinstance(ex, dns.message.Truncated): |
| if self.tcp_attempt: |
| |
| self.nameservers.remove(self.nameserver) |
| else: |
| self.retry_with_tcp = True |
| return (None, False) |
| |
| assert response is not None |
| assert isinstance(response, dns.message.QueryMessage) |
| rcode = response.rcode() |
| if rcode == dns.rcode.NOERROR: |
| try: |
| answer = Answer( |
| self.qname, |
| self.rdtype, |
| self.rdclass, |
| response, |
| self.nameserver.answer_nameserver(), |
| self.nameserver.answer_port(), |
| ) |
| except Exception as e: |
| self.errors.append( |
| ( |
| str(self.nameserver), |
| self.tcp_attempt, |
| self.nameserver.answer_port(), |
| e, |
| response, |
| ) |
| ) |
| |
| self.nameservers.remove(self.nameserver) |
| return (None, False) |
| if self.resolver.cache: |
| self.resolver.cache.put((self.qname, self.rdtype, self.rdclass), answer) |
| if answer.rrset is None and self.raise_on_no_answer: |
| raise NoAnswer(response=answer.response) |
| return (answer, True) |
| elif rcode == dns.rcode.NXDOMAIN: |
| |
| |
| try: |
| answer = Answer( |
| self.qname, dns.rdatatype.ANY, dns.rdataclass.IN, response |
| ) |
| except Exception as e: |
| self.errors.append( |
| ( |
| str(self.nameserver), |
| self.tcp_attempt, |
| self.nameserver.answer_port(), |
| e, |
| response, |
| ) |
| ) |
| |
| self.nameservers.remove(self.nameserver) |
| return (None, False) |
| self.nxdomain_responses[self.qname] = response |
| if self.resolver.cache: |
| self.resolver.cache.put( |
| (self.qname, dns.rdatatype.ANY, self.rdclass), answer |
| ) |
| |
| |
| return (None, True) |
| elif rcode == dns.rcode.YXDOMAIN: |
| yex = YXDOMAIN() |
| self.errors.append( |
| ( |
| str(self.nameserver), |
| self.tcp_attempt, |
| self.nameserver.answer_port(), |
| yex, |
| response, |
| ) |
| ) |
| raise yex |
| else: |
| |
| |
| |
| |
| if rcode != dns.rcode.SERVFAIL or not self.resolver.retry_servfail: |
| self.nameservers.remove(self.nameserver) |
| self.errors.append( |
| ( |
| str(self.nameserver), |
| self.tcp_attempt, |
| self.nameserver.answer_port(), |
| dns.rcode.to_text(rcode), |
| response, |
| ) |
| ) |
| return (None, False) |
|
|
|
|
| class BaseResolver: |
| """DNS stub resolver.""" |
|
|
| |
| |
| |
|
|
| domain: dns.name.Name |
| nameserver_ports: Dict[str, int] |
| port: int |
| search: List[dns.name.Name] |
| use_search_by_default: bool |
| timeout: float |
| lifetime: float |
| keyring: Optional[Any] |
| keyname: Optional[Union[dns.name.Name, str]] |
| keyalgorithm: Union[dns.name.Name, str] |
| edns: int |
| ednsflags: int |
| ednsoptions: Optional[List[dns.edns.Option]] |
| payload: int |
| cache: Any |
| flags: Optional[int] |
| retry_servfail: bool |
| rotate: bool |
| ndots: Optional[int] |
| _nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] |
|
|
| def __init__( |
| self, filename: str = "/etc/resolv.conf", configure: bool = True |
| ) -> None: |
| """*filename*, a ``str`` or file object, specifying a file |
| in standard /etc/resolv.conf format. This parameter is meaningful |
| only when *configure* is true and the platform is POSIX. |
| |
| *configure*, a ``bool``. If True (the default), the resolver |
| instance is configured in the normal fashion for the operating |
| system the resolver is running on. (I.e. by reading a |
| /etc/resolv.conf file on POSIX systems and from the registry |
| on Windows systems.) |
| """ |
|
|
| self.reset() |
| if configure: |
| if sys.platform == "win32": |
| self.read_registry() |
| elif filename: |
| self.read_resolv_conf(filename) |
|
|
| def reset(self) -> None: |
| """Reset all resolver configuration to the defaults.""" |
|
|
| self.domain = dns.name.Name(dns.name.from_text(socket.gethostname())[1:]) |
| if len(self.domain) == 0: |
| self.domain = dns.name.root |
| self._nameservers = [] |
| self.nameserver_ports = {} |
| self.port = 53 |
| self.search = [] |
| self.use_search_by_default = False |
| self.timeout = 2.0 |
| self.lifetime = 5.0 |
| self.keyring = None |
| self.keyname = None |
| self.keyalgorithm = dns.tsig.default_algorithm |
| self.edns = -1 |
| self.ednsflags = 0 |
| self.ednsoptions = None |
| self.payload = 0 |
| self.cache = None |
| self.flags = None |
| self.retry_servfail = False |
| self.rotate = False |
| self.ndots = None |
|
|
| def read_resolv_conf(self, f: Any) -> None: |
| """Process *f* as a file in the /etc/resolv.conf format. If f is |
| a ``str``, it is used as the name of the file to open; otherwise it |
| is treated as the file itself. |
| |
| Interprets the following items: |
| |
| - nameserver - name server IP address |
| |
| - domain - local domain name |
| |
| - search - search list for host-name lookup |
| |
| - options - supported options are rotate, timeout, edns0, and ndots |
| |
| """ |
|
|
| nameservers = [] |
| if isinstance(f, str): |
| try: |
| cm: contextlib.AbstractContextManager = open(f) |
| except OSError: |
| |
| raise NoResolverConfiguration(f"cannot open {f}") |
| else: |
| cm = contextlib.nullcontext(f) |
| with cm as f: |
| for l in f: |
| if len(l) == 0 or l[0] == "#" or l[0] == ";": |
| continue |
| tokens = l.split() |
|
|
| |
| if len(tokens) < 2: |
| continue |
|
|
| if tokens[0] == "nameserver": |
| nameservers.append(tokens[1]) |
| elif tokens[0] == "domain": |
| self.domain = dns.name.from_text(tokens[1]) |
| |
| self.search = [] |
| elif tokens[0] == "search": |
| |
| self.search = [] |
| for suffix in tokens[1:]: |
| self.search.append(dns.name.from_text(suffix)) |
| |
| |
| elif tokens[0] == "options": |
| for opt in tokens[1:]: |
| if opt == "rotate": |
| self.rotate = True |
| elif opt == "edns0": |
| self.use_edns() |
| elif "timeout" in opt: |
| try: |
| self.timeout = int(opt.split(":")[1]) |
| except (ValueError, IndexError): |
| pass |
| elif "ndots" in opt: |
| try: |
| self.ndots = int(opt.split(":")[1]) |
| except (ValueError, IndexError): |
| pass |
| if len(nameservers) == 0: |
| raise NoResolverConfiguration("no nameservers") |
| |
| |
| self.nameservers = nameservers |
|
|
| def read_registry(self) -> None: |
| """Extract resolver configuration from the Windows registry.""" |
| try: |
| info = dns.win32util.get_dns_info() |
| if info.domain is not None: |
| self.domain = info.domain |
| self.nameservers = info.nameservers |
| self.search = info.search |
| except AttributeError: |
| raise NotImplementedError |
|
|
| def _compute_timeout( |
| self, |
| start: float, |
| lifetime: Optional[float] = None, |
| errors: Optional[List[ErrorTuple]] = None, |
| ) -> float: |
| lifetime = self.lifetime if lifetime is None else lifetime |
| now = time.time() |
| duration = now - start |
| if errors is None: |
| errors = [] |
| if duration < 0: |
| if duration < -1: |
| |
| raise LifetimeTimeout(timeout=duration, errors=errors) |
| else: |
| |
| |
| |
| duration = 0 |
| if duration >= lifetime: |
| raise LifetimeTimeout(timeout=duration, errors=errors) |
| return min(lifetime - duration, self.timeout) |
|
|
| def _get_qnames_to_try( |
| self, qname: dns.name.Name, search: Optional[bool] |
| ) -> List[dns.name.Name]: |
| |
| |
| if search is None: |
| search = self.use_search_by_default |
| qnames_to_try = [] |
| if qname.is_absolute(): |
| qnames_to_try.append(qname) |
| else: |
| abs_qname = qname.concatenate(dns.name.root) |
| if search: |
| if len(self.search) > 0: |
| |
| search_list = self.search[:] |
| elif self.domain != dns.name.root and self.domain is not None: |
| |
| |
| search_list = [self.domain] |
| else: |
| search_list = [] |
| |
| if self.ndots is None: |
| ndots = 1 |
| else: |
| ndots = self.ndots |
| for suffix in search_list: |
| qnames_to_try.append(qname + suffix) |
| if len(qname) > ndots: |
| |
| |
| qnames_to_try.insert(0, abs_qname) |
| else: |
| |
| |
| qnames_to_try.append(abs_qname) |
| else: |
| qnames_to_try.append(abs_qname) |
| return qnames_to_try |
|
|
| def use_tsig( |
| self, |
| keyring: Any, |
| keyname: Optional[Union[dns.name.Name, str]] = None, |
| algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm, |
| ) -> None: |
| """Add a TSIG signature to each query. |
| |
| The parameters are passed to ``dns.message.Message.use_tsig()``; |
| see its documentation for details. |
| """ |
|
|
| self.keyring = keyring |
| self.keyname = keyname |
| self.keyalgorithm = algorithm |
|
|
| def use_edns( |
| self, |
| edns: Optional[Union[int, bool]] = 0, |
| ednsflags: int = 0, |
| payload: int = dns.message.DEFAULT_EDNS_PAYLOAD, |
| options: Optional[List[dns.edns.Option]] = None, |
| ) -> None: |
| """Configure EDNS behavior. |
| |
| *edns*, an ``int``, is the EDNS level to use. Specifying |
| ``None``, ``False``, or ``-1`` means "do not use EDNS", and in this case |
| the other parameters are ignored. Specifying ``True`` is |
| equivalent to specifying 0, i.e. "use EDNS0". |
| |
| *ednsflags*, an ``int``, the EDNS flag values. |
| |
| *payload*, an ``int``, is the EDNS sender's payload field, which is the |
| maximum size of UDP datagram the sender can handle. I.e. how big |
| a response to this message can be. |
| |
| *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS |
| options. |
| """ |
|
|
| if edns is None or edns is False: |
| edns = -1 |
| elif edns is True: |
| edns = 0 |
| self.edns = edns |
| self.ednsflags = ednsflags |
| self.payload = payload |
| self.ednsoptions = options |
|
|
| def set_flags(self, flags: int) -> None: |
| """Overrides the default flags with your own. |
| |
| *flags*, an ``int``, the message flags to use. |
| """ |
|
|
| self.flags = flags |
|
|
| @classmethod |
| def _enrich_nameservers( |
| cls, |
| nameservers: Sequence[Union[str, dns.nameserver.Nameserver]], |
| nameserver_ports: Dict[str, int], |
| default_port: int, |
| ) -> List[dns.nameserver.Nameserver]: |
| enriched_nameservers = [] |
| if isinstance(nameservers, list): |
| for nameserver in nameservers: |
| enriched_nameserver: dns.nameserver.Nameserver |
| if isinstance(nameserver, dns.nameserver.Nameserver): |
| enriched_nameserver = nameserver |
| elif dns.inet.is_address(nameserver): |
| port = nameserver_ports.get(nameserver, default_port) |
| enriched_nameserver = dns.nameserver.Do53Nameserver( |
| nameserver, port |
| ) |
| else: |
| try: |
| if urlparse(nameserver).scheme != "https": |
| raise NotImplementedError |
| except Exception: |
| raise ValueError( |
| f"nameserver {nameserver} is not a " |
| "dns.nameserver.Nameserver instance or text form, " |
| "IP address, nor a valid https URL" |
| ) |
| enriched_nameserver = dns.nameserver.DoHNameserver(nameserver) |
| enriched_nameservers.append(enriched_nameserver) |
| else: |
| raise ValueError( |
| "nameservers must be a list or tuple (not a {})".format( |
| type(nameservers) |
| ) |
| ) |
| return enriched_nameservers |
|
|
| @property |
| def nameservers( |
| self, |
| ) -> Sequence[Union[str, dns.nameserver.Nameserver]]: |
| return self._nameservers |
|
|
| @nameservers.setter |
| def nameservers( |
| self, nameservers: Sequence[Union[str, dns.nameserver.Nameserver]] |
| ) -> None: |
| """ |
| *nameservers*, a ``list`` of nameservers, where a nameserver is either |
| a string interpretable as a nameserver, or a ``dns.nameserver.Nameserver`` |
| instance. |
| |
| Raises ``ValueError`` if *nameservers* is not a list of nameservers. |
| """ |
| |
| self._enrich_nameservers(nameservers, self.nameserver_ports, self.port) |
| self._nameservers = nameservers |
|
|
|
|
| class Resolver(BaseResolver): |
| """DNS stub resolver.""" |
|
|
| def resolve( |
| self, |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
| tcp: bool = False, |
| source: Optional[str] = None, |
| raise_on_no_answer: bool = True, |
| source_port: int = 0, |
| lifetime: Optional[float] = None, |
| search: Optional[bool] = None, |
| ) -> Answer: |
| """Query nameservers to find the answer to the question. |
| |
| The *qname*, *rdtype*, and *rdclass* parameters may be objects |
| of the appropriate type, or strings that can be converted into objects |
| of the appropriate type. |
| |
| *qname*, a ``dns.name.Name`` or ``str``, the query name. |
| |
| *rdtype*, an ``int`` or ``str``, the query type. |
| |
| *rdclass*, an ``int`` or ``str``, the query class. |
| |
| *tcp*, a ``bool``. If ``True``, use TCP to make the query. |
| |
| *source*, a ``str`` or ``None``. If not ``None``, bind to this IP |
| address when making queries. |
| |
| *raise_on_no_answer*, a ``bool``. If ``True``, raise |
| ``dns.resolver.NoAnswer`` if there's no answer to the question. |
| |
| *source_port*, an ``int``, the port from which to send the message. |
| |
| *lifetime*, a ``float``, how many seconds a query should run |
| before timing out. |
| |
| *search*, a ``bool`` or ``None``, determines whether the |
| search list configured in the system's resolver configuration |
| are used for relative names, and whether the resolver's domain |
| may be added to relative names. The default is ``None``, |
| which causes the value of the resolver's |
| ``use_search_by_default`` attribute to be used. |
| |
| Raises ``dns.resolver.LifetimeTimeout`` if no answers could be found |
| in the specified lifetime. |
| |
| Raises ``dns.resolver.NXDOMAIN`` if the query name does not exist. |
| |
| Raises ``dns.resolver.YXDOMAIN`` if the query name is too long after |
| DNAME substitution. |
| |
| Raises ``dns.resolver.NoAnswer`` if *raise_on_no_answer* is |
| ``True`` and the query name exists but has no RRset of the |
| desired type and class. |
| |
| Raises ``dns.resolver.NoNameservers`` if no non-broken |
| nameservers are available to answer the question. |
| |
| Returns a ``dns.resolver.Answer`` instance. |
| |
| """ |
|
|
| resolution = _Resolution( |
| self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search |
| ) |
| start = time.time() |
| while True: |
| (request, answer) = resolution.next_request() |
| |
| |
| |
| |
| if answer is not None: |
| |
| return answer |
| assert request is not None |
| done = False |
| while not done: |
| (nameserver, tcp, backoff) = resolution.next_nameserver() |
| if backoff: |
| time.sleep(backoff) |
| timeout = self._compute_timeout(start, lifetime, resolution.errors) |
| try: |
| response = nameserver.query( |
| request, |
| timeout=timeout, |
| source=source, |
| source_port=source_port, |
| max_size=tcp, |
| ) |
| except Exception as ex: |
| (_, done) = resolution.query_result(None, ex) |
| continue |
| (answer, done) = resolution.query_result(response, None) |
| |
| |
| |
| |
| if answer is not None: |
| return answer |
|
|
| def query( |
| self, |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
| tcp: bool = False, |
| source: Optional[str] = None, |
| raise_on_no_answer: bool = True, |
| source_port: int = 0, |
| lifetime: Optional[float] = None, |
| ) -> Answer: |
| """Query nameservers to find the answer to the question. |
| |
| This method calls resolve() with ``search=True``, and is |
| provided for backwards compatibility with prior versions of |
| dnspython. See the documentation for the resolve() method for |
| further details. |
| """ |
| warnings.warn( |
| "please use dns.resolver.Resolver.resolve() instead", |
| DeprecationWarning, |
| stacklevel=2, |
| ) |
| return self.resolve( |
| qname, |
| rdtype, |
| rdclass, |
| tcp, |
| source, |
| raise_on_no_answer, |
| source_port, |
| lifetime, |
| True, |
| ) |
|
|
| def resolve_address(self, ipaddr: str, *args: Any, **kwargs: Any) -> Answer: |
| """Use a resolver to run a reverse query for PTR records. |
| |
| This utilizes the resolve() method to perform a PTR lookup on the |
| specified IP address. |
| |
| *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get |
| the PTR record for. |
| |
| All other arguments that can be passed to the resolve() function |
| except for rdtype and rdclass are also supported by this |
| function. |
| """ |
| |
| |
| |
| modified_kwargs: Dict[str, Any] = {} |
| modified_kwargs.update(kwargs) |
| modified_kwargs["rdtype"] = dns.rdatatype.PTR |
| modified_kwargs["rdclass"] = dns.rdataclass.IN |
| return self.resolve( |
| dns.reversename.from_address(ipaddr), *args, **modified_kwargs |
| ) |
|
|
| def resolve_name( |
| self, |
| name: Union[dns.name.Name, str], |
| family: int = socket.AF_UNSPEC, |
| **kwargs: Any, |
| ) -> HostAnswers: |
| """Use a resolver to query for address records. |
| |
| This utilizes the resolve() method to perform A and/or AAAA lookups on |
| the specified name. |
| |
| *qname*, a ``dns.name.Name`` or ``str``, the name to resolve. |
| |
| *family*, an ``int``, the address family. If socket.AF_UNSPEC |
| (the default), both A and AAAA records will be retrieved. |
| |
| All other arguments that can be passed to the resolve() function |
| except for rdtype and rdclass are also supported by this |
| function. |
| """ |
| |
| |
| |
| modified_kwargs: Dict[str, Any] = {} |
| modified_kwargs.update(kwargs) |
| modified_kwargs.pop("rdtype", None) |
| modified_kwargs["rdclass"] = dns.rdataclass.IN |
|
|
| if family == socket.AF_INET: |
| v4 = self.resolve(name, dns.rdatatype.A, **modified_kwargs) |
| return HostAnswers.make(v4=v4) |
| elif family == socket.AF_INET6: |
| v6 = self.resolve(name, dns.rdatatype.AAAA, **modified_kwargs) |
| return HostAnswers.make(v6=v6) |
| elif family != socket.AF_UNSPEC: |
| raise NotImplementedError(f"unknown address family {family}") |
|
|
| raise_on_no_answer = modified_kwargs.pop("raise_on_no_answer", True) |
| lifetime = modified_kwargs.pop("lifetime", None) |
| start = time.time() |
| v6 = self.resolve( |
| name, |
| dns.rdatatype.AAAA, |
| raise_on_no_answer=False, |
| lifetime=self._compute_timeout(start, lifetime), |
| **modified_kwargs, |
| ) |
| |
| |
| |
| |
| |
| name = v6.qname |
| v4 = self.resolve( |
| name, |
| dns.rdatatype.A, |
| raise_on_no_answer=False, |
| lifetime=self._compute_timeout(start, lifetime), |
| **modified_kwargs, |
| ) |
| answers = HostAnswers.make(v6=v6, v4=v4, add_empty=not raise_on_no_answer) |
| if not answers: |
| raise NoAnswer(response=v6.response) |
| return answers |
|
|
| |
|
|
| def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name: |
| """Determine the canonical name of *name*. |
| |
| The canonical name is the name the resolver uses for queries |
| after all CNAME and DNAME renamings have been applied. |
| |
| *name*, a ``dns.name.Name`` or ``str``, the query name. |
| |
| This method can raise any exception that ``resolve()`` can |
| raise, other than ``dns.resolver.NoAnswer`` and |
| ``dns.resolver.NXDOMAIN``. |
| |
| Returns a ``dns.name.Name``. |
| """ |
| try: |
| answer = self.resolve(name, raise_on_no_answer=False) |
| canonical_name = answer.canonical_name |
| except dns.resolver.NXDOMAIN as e: |
| canonical_name = e.canonical_name |
| return canonical_name |
|
|
| |
|
|
| def try_ddr(self, lifetime: float = 5.0) -> None: |
| """Try to update the resolver's nameservers using Discovery of Designated |
| Resolvers (DDR). If successful, the resolver will subsequently use |
| DNS-over-HTTPS or DNS-over-TLS for future queries. |
| |
| *lifetime*, a float, is the maximum time to spend attempting DDR. The default |
| is 5 seconds. |
| |
| If the SVCB query is successful and results in a non-empty list of nameservers, |
| then the resolver's nameservers are set to the returned servers in priority |
| order. |
| |
| The current implementation does not use any address hints from the SVCB record, |
| nor does it resolve addresses for the SCVB target name, rather it assumes that |
| the bootstrap nameserver will always be one of the addresses and uses it. |
| A future revision to the code may offer fuller support. The code verifies that |
| the bootstrap nameserver is in the Subject Alternative Name field of the |
| TLS certficate. |
| """ |
| try: |
| expiration = time.time() + lifetime |
| answer = self.resolve( |
| dns._ddr._local_resolver_name, "SVCB", lifetime=lifetime |
| ) |
| timeout = dns.query._remaining(expiration) |
| nameservers = dns._ddr._get_nameservers_sync(answer, timeout) |
| if len(nameservers) > 0: |
| self.nameservers = nameservers |
| except Exception: |
| pass |
|
|
|
|
| |
| default_resolver: Optional[Resolver] = None |
|
|
|
|
| def get_default_resolver() -> Resolver: |
| """Get the default resolver, initializing it if necessary.""" |
| if default_resolver is None: |
| reset_default_resolver() |
| assert default_resolver is not None |
| return default_resolver |
|
|
|
|
| def reset_default_resolver() -> None: |
| """Re-initialize default resolver. |
| |
| Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX |
| systems) will be re-read immediately. |
| """ |
|
|
| global default_resolver |
| default_resolver = Resolver() |
|
|
|
|
| def resolve( |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
| tcp: bool = False, |
| source: Optional[str] = None, |
| raise_on_no_answer: bool = True, |
| source_port: int = 0, |
| lifetime: Optional[float] = None, |
| search: Optional[bool] = None, |
| ) -> Answer: |
| """Query nameservers to find the answer to the question. |
| |
| This is a convenience function that uses the default resolver |
| object to make the query. |
| |
| See ``dns.resolver.Resolver.resolve`` for more information on the |
| parameters. |
| """ |
|
|
| return get_default_resolver().resolve( |
| qname, |
| rdtype, |
| rdclass, |
| tcp, |
| source, |
| raise_on_no_answer, |
| source_port, |
| lifetime, |
| search, |
| ) |
|
|
|
|
| def query( |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
| tcp: bool = False, |
| source: Optional[str] = None, |
| raise_on_no_answer: bool = True, |
| source_port: int = 0, |
| lifetime: Optional[float] = None, |
| ) -> Answer: |
| """Query nameservers to find the answer to the question. |
| |
| This method calls resolve() with ``search=True``, and is |
| provided for backwards compatibility with prior versions of |
| dnspython. See the documentation for the resolve() method for |
| further details. |
| """ |
| warnings.warn( |
| "please use dns.resolver.resolve() instead", DeprecationWarning, stacklevel=2 |
| ) |
| return resolve( |
| qname, |
| rdtype, |
| rdclass, |
| tcp, |
| source, |
| raise_on_no_answer, |
| source_port, |
| lifetime, |
| True, |
| ) |
|
|
|
|
| def resolve_address(ipaddr: str, *args: Any, **kwargs: Any) -> Answer: |
| """Use a resolver to run a reverse query for PTR records. |
| |
| See ``dns.resolver.Resolver.resolve_address`` for more information on the |
| parameters. |
| """ |
|
|
| return get_default_resolver().resolve_address(ipaddr, *args, **kwargs) |
|
|
|
|
| def resolve_name( |
| name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any |
| ) -> HostAnswers: |
| """Use a resolver to query for address records. |
| |
| See ``dns.resolver.Resolver.resolve_name`` for more information on the |
| parameters. |
| """ |
|
|
| return get_default_resolver().resolve_name(name, family, **kwargs) |
|
|
|
|
| def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name: |
| """Determine the canonical name of *name*. |
| |
| See ``dns.resolver.Resolver.canonical_name`` for more information on the |
| parameters and possible exceptions. |
| """ |
|
|
| return get_default_resolver().canonical_name(name) |
|
|
|
|
| def try_ddr(lifetime: float = 5.0) -> None: |
| """Try to update the default resolver's nameservers using Discovery of Designated |
| Resolvers (DDR). If successful, the resolver will subsequently use |
| DNS-over-HTTPS or DNS-over-TLS for future queries. |
| |
| See :py:func:`dns.resolver.Resolver.try_ddr` for more information. |
| """ |
| return get_default_resolver().try_ddr(lifetime) |
|
|
|
|
| def zone_for_name( |
| name: Union[dns.name.Name, str], |
| rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN, |
| tcp: bool = False, |
| resolver: Optional[Resolver] = None, |
| lifetime: Optional[float] = None, |
| ) -> dns.name.Name: |
| """Find the name of the zone which contains the specified name. |
| |
| *name*, an absolute ``dns.name.Name`` or ``str``, the query name. |
| |
| *rdclass*, an ``int``, the query class. |
| |
| *tcp*, a ``bool``. If ``True``, use TCP to make the query. |
| |
| *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use. |
| If ``None``, the default, then the default resolver is used. |
| |
| *lifetime*, a ``float``, the total time to allow for the queries needed |
| to determine the zone. If ``None``, the default, then only the individual |
| query limits of the resolver apply. |
| |
| Raises ``dns.resolver.NoRootSOA`` if there is no SOA RR at the DNS |
| root. (This is only likely to happen if you're using non-default |
| root servers in your network and they are misconfigured.) |
| |
| Raises ``dns.resolver.LifetimeTimeout`` if the answer could not be |
| found in the allotted lifetime. |
| |
| Returns a ``dns.name.Name``. |
| """ |
|
|
| if isinstance(name, str): |
| name = dns.name.from_text(name, dns.name.root) |
| if resolver is None: |
| resolver = get_default_resolver() |
| if not name.is_absolute(): |
| raise NotAbsolute(name) |
| start = time.time() |
| expiration: Optional[float] |
| if lifetime is not None: |
| expiration = start + lifetime |
| else: |
| expiration = None |
| while 1: |
| try: |
| rlifetime: Optional[float] |
| if expiration is not None: |
| rlifetime = expiration - time.time() |
| if rlifetime <= 0: |
| rlifetime = 0 |
| else: |
| rlifetime = None |
| answer = resolver.resolve( |
| name, dns.rdatatype.SOA, rdclass, tcp, lifetime=rlifetime |
| ) |
| assert answer.rrset is not None |
| if answer.rrset.name == name: |
| return name |
| |
| except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e: |
| if isinstance(e, dns.resolver.NXDOMAIN): |
| response = e.responses().get(name) |
| else: |
| response = e.response() |
| if response: |
| for rrs in response.authority: |
| if rrs.rdtype == dns.rdatatype.SOA and rrs.rdclass == rdclass: |
| (nr, _, _) = rrs.name.fullcompare(name) |
| if nr == dns.name.NAMERELN_SUPERDOMAIN: |
| |
| |
| |
| |
| |
| |
| return rrs.name |
| |
| |
| try: |
| name = name.parent() |
| except dns.name.NoParent: |
| raise NoRootSOA |
|
|
|
|
| def make_resolver_at( |
| where: Union[dns.name.Name, str], |
| port: int = 53, |
| family: int = socket.AF_UNSPEC, |
| resolver: Optional[Resolver] = None, |
| ) -> Resolver: |
| """Make a stub resolver using the specified destination as the full resolver. |
| |
| *where*, a ``dns.name.Name`` or ``str`` the domain name or IP address of the |
| full resolver. |
| |
| *port*, an ``int``, the port to use. If not specified, the default is 53. |
| |
| *family*, an ``int``, the address family to use. This parameter is used if |
| *where* is not an address. The default is ``socket.AF_UNSPEC`` in which case |
| the first address returned by ``resolve_name()`` will be used, otherwise the |
| first address of the specified family will be used. |
| |
| *resolver*, a ``dns.resolver.Resolver`` or ``None``, the resolver to use for |
| resolution of hostnames. If not specified, the default resolver will be used. |
| |
| Returns a ``dns.resolver.Resolver`` or raises an exception. |
| """ |
| if resolver is None: |
| resolver = get_default_resolver() |
| nameservers: List[Union[str, dns.nameserver.Nameserver]] = [] |
| if isinstance(where, str) and dns.inet.is_address(where): |
| nameservers.append(dns.nameserver.Do53Nameserver(where, port)) |
| else: |
| for address in resolver.resolve_name(where, family).addresses(): |
| nameservers.append(dns.nameserver.Do53Nameserver(address, port)) |
| res = dns.resolver.Resolver(configure=False) |
| res.nameservers = nameservers |
| return res |
|
|
|
|
| def resolve_at( |
| where: Union[dns.name.Name, str], |
| qname: Union[dns.name.Name, str], |
| rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A, |
| rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN, |
| tcp: bool = False, |
| source: Optional[str] = None, |
| raise_on_no_answer: bool = True, |
| source_port: int = 0, |
| lifetime: Optional[float] = None, |
| search: Optional[bool] = None, |
| port: int = 53, |
| family: int = socket.AF_UNSPEC, |
| resolver: Optional[Resolver] = None, |
| ) -> Answer: |
| """Query nameservers to find the answer to the question. |
| |
| This is a convenience function that calls ``dns.resolver.make_resolver_at()`` to |
| make a resolver, and then uses it to resolve the query. |
| |
| See ``dns.resolver.Resolver.resolve`` for more information on the resolution |
| parameters, and ``dns.resolver.make_resolver_at`` for information about the resolver |
| parameters *where*, *port*, *family*, and *resolver*. |
| |
| If making more than one query, it is more efficient to call |
| ``dns.resolver.make_resolver_at()`` and then use that resolver for the queries |
| instead of calling ``resolve_at()`` multiple times. |
| """ |
| return make_resolver_at(where, port, family, resolver).resolve( |
| qname, |
| rdtype, |
| rdclass, |
| tcp, |
| source, |
| raise_on_no_answer, |
| source_port, |
| lifetime, |
| search, |
| ) |
|
|
|
|
| |
| |
| |
| |
|
|
| _protocols_for_socktype = { |
| socket.SOCK_DGRAM: [socket.SOL_UDP], |
| socket.SOCK_STREAM: [socket.SOL_TCP], |
| } |
|
|
| _resolver = None |
| _original_getaddrinfo = socket.getaddrinfo |
| _original_getnameinfo = socket.getnameinfo |
| _original_getfqdn = socket.getfqdn |
| _original_gethostbyname = socket.gethostbyname |
| _original_gethostbyname_ex = socket.gethostbyname_ex |
| _original_gethostbyaddr = socket.gethostbyaddr |
|
|
|
|
| def _getaddrinfo( |
| host=None, service=None, family=socket.AF_UNSPEC, socktype=0, proto=0, flags=0 |
| ): |
| if flags & socket.AI_NUMERICHOST != 0: |
| |
| |
| |
| |
| |
| return _original_getaddrinfo(host, service, family, socktype, proto, flags) |
| if flags & (socket.AI_ADDRCONFIG | socket.AI_V4MAPPED) != 0: |
| |
| |
| |
| |
| |
| |
| |
| |
| raise socket.gaierror( |
| socket.EAI_FAIL, "Non-recoverable failure in name resolution" |
| ) |
| if host is None and service is None: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| addrs = [] |
| canonical_name = None |
| |
| |
| if host is None: |
| return _original_getaddrinfo(host, service, family, socktype, proto, flags) |
| try: |
| |
| |
| |
| dns.inet.af_for_address(host) |
| return _original_getaddrinfo(host, service, family, socktype, proto, flags) |
| except Exception: |
| pass |
| |
| try: |
| answers = _resolver.resolve_name(host, family) |
| addrs = answers.addresses_and_families() |
| canonical_name = answers.canonical_name().to_text(True) |
| except dns.resolver.NXDOMAIN: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| except Exception: |
| |
| |
| |
| raise socket.gaierror(socket.EAI_AGAIN, "Temporary failure in name resolution") |
| port = None |
| try: |
| |
| if service is None: |
| port = 0 |
| else: |
| port = int(service) |
| except Exception: |
| if flags & socket.AI_NUMERICSERV == 0: |
| try: |
| port = socket.getservbyname(service) |
| except Exception: |
| pass |
| if port is None: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| tuples = [] |
| if socktype == 0: |
| socktypes = [socket.SOCK_DGRAM, socket.SOCK_STREAM] |
| else: |
| socktypes = [socktype] |
| if flags & socket.AI_CANONNAME != 0: |
| cname = canonical_name |
| else: |
| cname = "" |
| for addr, af in addrs: |
| for socktype in socktypes: |
| for proto in _protocols_for_socktype[socktype]: |
| addr_tuple = dns.inet.low_level_address_tuple((addr, port), af) |
| tuples.append((af, socktype, proto, cname, addr_tuple)) |
| if len(tuples) == 0: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| return tuples |
|
|
|
|
| def _getnameinfo(sockaddr, flags=0): |
| host = sockaddr[0] |
| port = sockaddr[1] |
| if len(sockaddr) == 4: |
| scope = sockaddr[3] |
| family = socket.AF_INET6 |
| else: |
| scope = None |
| family = socket.AF_INET |
| tuples = _getaddrinfo(host, port, family, socket.SOCK_STREAM, socket.SOL_TCP, 0) |
| if len(tuples) > 1: |
| raise socket.error("sockaddr resolved to multiple addresses") |
| addr = tuples[0][4][0] |
| if flags & socket.NI_DGRAM: |
| pname = "udp" |
| else: |
| pname = "tcp" |
| qname = dns.reversename.from_address(addr) |
| if flags & socket.NI_NUMERICHOST == 0: |
| try: |
| answer = _resolver.resolve(qname, "PTR") |
| hostname = answer.rrset[0].target.to_text(True) |
| except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer): |
| if flags & socket.NI_NAMEREQD: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| hostname = addr |
| if scope is not None: |
| hostname += "%" + str(scope) |
| else: |
| hostname = addr |
| if scope is not None: |
| hostname += "%" + str(scope) |
| if flags & socket.NI_NUMERICSERV: |
| service = str(port) |
| else: |
| service = socket.getservbyport(port, pname) |
| return (hostname, service) |
|
|
|
|
| def _getfqdn(name=None): |
| if name is None: |
| name = socket.gethostname() |
| try: |
| (name, _, _) = _gethostbyaddr(name) |
| |
| |
| except Exception: |
| pass |
| return name |
|
|
|
|
| def _gethostbyname(name): |
| return _gethostbyname_ex(name)[2][0] |
|
|
|
|
| def _gethostbyname_ex(name): |
| aliases = [] |
| addresses = [] |
| tuples = _getaddrinfo( |
| name, 0, socket.AF_INET, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME |
| ) |
| canonical = tuples[0][3] |
| for item in tuples: |
| addresses.append(item[4][0]) |
| |
| return (canonical, aliases, addresses) |
|
|
|
|
| def _gethostbyaddr(ip): |
| try: |
| dns.ipv6.inet_aton(ip) |
| sockaddr = (ip, 80, 0, 0) |
| family = socket.AF_INET6 |
| except Exception: |
| try: |
| dns.ipv4.inet_aton(ip) |
| except Exception: |
| raise socket.gaierror(socket.EAI_NONAME, "Name or service not known") |
| sockaddr = (ip, 80) |
| family = socket.AF_INET |
| (name, _) = _getnameinfo(sockaddr, socket.NI_NAMEREQD) |
| aliases = [] |
| addresses = [] |
| tuples = _getaddrinfo( |
| name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP, socket.AI_CANONNAME |
| ) |
| canonical = tuples[0][3] |
| |
| |
| |
| bin_ip = dns.inet.inet_pton(family, ip) |
| for item in tuples: |
| addr = item[4][0] |
| bin_addr = dns.inet.inet_pton(family, addr) |
| if bin_ip == bin_addr: |
| addresses.append(addr) |
| |
| return (canonical, aliases, addresses) |
|
|
|
|
| def override_system_resolver(resolver: Optional[Resolver] = None) -> None: |
| """Override the system resolver routines in the socket module with |
| versions which use dnspython's resolver. |
| |
| This can be useful in testing situations where you want to control |
| the resolution behavior of python code without having to change |
| the system's resolver settings (e.g. /etc/resolv.conf). |
| |
| The resolver to use may be specified; if it's not, the default |
| resolver will be used. |
| |
| resolver, a ``dns.resolver.Resolver`` or ``None``, the resolver to use. |
| """ |
|
|
| if resolver is None: |
| resolver = get_default_resolver() |
| global _resolver |
| _resolver = resolver |
| socket.getaddrinfo = _getaddrinfo |
| socket.getnameinfo = _getnameinfo |
| socket.getfqdn = _getfqdn |
| socket.gethostbyname = _gethostbyname |
| socket.gethostbyname_ex = _gethostbyname_ex |
| socket.gethostbyaddr = _gethostbyaddr |
|
|
|
|
| def restore_system_resolver() -> None: |
| """Undo the effects of prior override_system_resolver().""" |
|
|
| global _resolver |
| _resolver = None |
| socket.getaddrinfo = _original_getaddrinfo |
| socket.getnameinfo = _original_getnameinfo |
| socket.getfqdn = _original_getfqdn |
| socket.gethostbyname = _original_gethostbyname |
| socket.gethostbyname_ex = _original_gethostbyname_ex |
| socket.gethostbyaddr = _original_gethostbyaddr |
|
|