| |
| |
| |
|
|
| import socket |
| import time |
| from urllib.parse import urlparse |
|
|
| import dns.asyncbackend |
| import dns.inet |
| import dns.name |
| import dns.nameserver |
| import dns.query |
| import dns.rdtypes.svcbbase |
|
|
| |
# Parsed dns.name.Name for "_dns.resolver.arpa".  It is not referenced in the
# visible part of this file -- presumably the DDR (RFC 9462) discovery name
# that callers query for SVCB records; confirm against callers.
_local_resolver_name = dns.name.from_text("_dns.resolver.arpa")
|
|
|
|
| |
| |
| |
| |
|
|
|
|
| class _SVCBInfo: |
| def __init__(self, bootstrap_address, port, hostname, nameservers): |
| self.bootstrap_address = bootstrap_address |
| self.port = port |
| self.hostname = hostname |
| self.nameservers = nameservers |
|
|
| def ddr_check_certificate(self, cert): |
| """Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)""" |
| for name, value in cert["subjectAltName"]: |
| if name == "IP Address" and value == self.bootstrap_address: |
| return True |
| return False |
|
|
| def make_tls_context(self): |
| ssl = dns.query.ssl |
| ctx = ssl.create_default_context() |
| ctx.minimum_version = ssl.TLSVersion.TLSv1_2 |
| return ctx |
|
|
| def ddr_tls_check_sync(self, lifetime): |
| ctx = self.make_tls_context() |
| expiration = time.time() + lifetime |
| with socket.create_connection( |
| (self.bootstrap_address, self.port), lifetime |
| ) as s: |
| with ctx.wrap_socket(s, server_hostname=self.hostname) as ts: |
| ts.settimeout(dns.query._remaining(expiration)) |
| ts.do_handshake() |
| cert = ts.getpeercert() |
| return self.ddr_check_certificate(cert) |
|
|
| async def ddr_tls_check_async(self, lifetime, backend=None): |
| if backend is None: |
| backend = dns.asyncbackend.get_default_backend() |
| ctx = self.make_tls_context() |
| expiration = time.time() + lifetime |
| async with await backend.make_socket( |
| dns.inet.af_for_address(self.bootstrap_address), |
| socket.SOCK_STREAM, |
| 0, |
| None, |
| (self.bootstrap_address, self.port), |
| lifetime, |
| ctx, |
| self.hostname, |
| ) as ts: |
| cert = await ts.getpeercert(dns.query._remaining(expiration)) |
| return self.ddr_check_certificate(cert) |
|
|
|
|
def _extract_nameservers_from_svcb(answer):
    """Build candidate encrypted nameservers from an SVCB answer.

    *answer* must expose ``nameserver`` (the address the answer came from)
    and ``rrset`` (the SVCB records).  Records are visited in
    ``processing_order()``.  For each record with an ALPN parameter, a
    nameserver object is created for every supported ALPN id (``h2``,
    ``dot``, ``doq``).

    Returns a list of ``_SVCBInfo`` objects, one per record that yielded at
    least one nameserver.  The list is empty when ``answer.nameserver`` is
    not an IP address.
    """
    bootstrap_address = answer.nameserver
    if not dns.inet.is_address(bootstrap_address):
        return []
    infos = []
    for rr in answer.rrset.processing_order():
        nameservers = []
        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.ALPN)
        if param is None:
            continue
        alpns = set(param.ids)
        host = rr.target.to_text(omit_final_dot=True)
        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.PORT)
        explicit_port = param.port if param is not None else None
        # Port given to _SVCBInfo for the TLS check: the explicit port, or
        # else the default of the first transport handled below.
        check_port = explicit_port

        if b"h2" in alpns:
            param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.DOHPATH)
            if param is None or not param.value.endswith(b"{?dns}"):
                continue
            try:
                # Drop the trailing "{?dns}" template variable (6 bytes).
                path = param.value[:-6].decode()
            except UnicodeDecodeError:
                # Skip an undecodable dohpath like any other invalid one,
                # rather than aborting the whole extraction.
                continue
            if not path.startswith("/"):
                path = "/" + path
            # Default each transport separately so the DoH default of 443
            # does not leak into DoT/DoQ below.
            doh_port = 443 if explicit_port is None else explicit_port
            if check_port is None:
                check_port = doh_port
            url = f"https://{host}:{doh_port}{path}"
            try:
                urlparse(url)
                nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address))
            except Exception:
                # Best effort: a URL that cannot be used yields no DoH entry.
                pass
        if b"dot" in alpns:
            dot_port = 853 if explicit_port is None else explicit_port
            if check_port is None:
                check_port = dot_port
            nameservers.append(
                dns.nameserver.DoTNameserver(bootstrap_address, dot_port, host)
            )
        if b"doq" in alpns:
            doq_port = 853 if explicit_port is None else explicit_port
            if check_port is None:
                check_port = doq_port
            nameservers.append(
                dns.nameserver.DoQNameserver(bootstrap_address, doq_port, True, host)
            )
        if nameservers:
            infos.append(_SVCBInfo(bootstrap_address, check_port, host, nameservers))
    return infos
|
|
|
|
def _get_nameservers_sync(answer, lifetime):
    """Collect the nameservers from an SVCB answer whose TLS check succeeds.

    Each candidate extracted from *answer* is checked with
    ``ddr_tls_check_sync(lifetime)``.  Candidates whose check returns a
    false value or raises are left out.
    """
    validated = []
    for candidate in _extract_nameservers_from_svcb(answer):
        try:
            if not candidate.ddr_tls_check_sync(lifetime):
                continue
            validated.extend(candidate.nameservers)
        except Exception:
            # Best effort: a candidate that cannot be checked is dropped.
            continue
    return validated
|
|
|
|
async def _get_nameservers_async(answer, lifetime):
    """Collect the nameservers from an SVCB answer whose TLS check succeeds.

    Each candidate extracted from *answer* is checked by awaiting
    ``ddr_tls_check_async(lifetime)``.  Candidates whose check returns a
    false value or raises are left out.
    """
    validated = []
    for candidate in _extract_nameservers_from_svcb(answer):
        try:
            if not await candidate.ddr_tls_check_async(lifetime):
                continue
            validated.extend(candidate.nameservers)
        except Exception:
            # Best effort: a candidate that cannot be checked is dropped.
            continue
    return validated
|
|