| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """Tokenize DNS zone file format""" |
|
|
| import io |
| import sys |
| from typing import Any, List, Optional, Tuple |
|
|
| import dns.exception |
| import dns.name |
| import dns.ttl |
|
|
| _DELIMITERS = {" ", "\t", "\n", ";", "(", ")", '"'} |
| _QUOTING_DELIMITERS = {'"'} |
|
|
| EOF = 0 |
| EOL = 1 |
| WHITESPACE = 2 |
| IDENTIFIER = 3 |
| QUOTED_STRING = 4 |
| COMMENT = 5 |
| DELIMITER = 6 |
|
|
|
|
| class UngetBufferFull(dns.exception.DNSException): |
| """An attempt was made to unget a token when the unget buffer was full.""" |
|
|
|
|
| class Token: |
| """A DNS zone file format token. |
| |
| ttype: The token type |
| value: The token value |
| has_escape: Does the token value contain escapes? |
| """ |
|
|
| def __init__( |
| self, |
| ttype: int, |
| value: Any = "", |
| has_escape: bool = False, |
| comment: Optional[str] = None, |
| ): |
| """Initialize a token instance.""" |
|
|
| self.ttype = ttype |
| self.value = value |
| self.has_escape = has_escape |
| self.comment = comment |
|
|
| def is_eof(self) -> bool: |
| return self.ttype == EOF |
|
|
| def is_eol(self) -> bool: |
| return self.ttype == EOL |
|
|
| def is_whitespace(self) -> bool: |
| return self.ttype == WHITESPACE |
|
|
| def is_identifier(self) -> bool: |
| return self.ttype == IDENTIFIER |
|
|
| def is_quoted_string(self) -> bool: |
| return self.ttype == QUOTED_STRING |
|
|
| def is_comment(self) -> bool: |
| return self.ttype == COMMENT |
|
|
| def is_delimiter(self) -> bool: |
| return self.ttype == DELIMITER |
|
|
| def is_eol_or_eof(self) -> bool: |
| return self.ttype == EOL or self.ttype == EOF |
|
|
| def __eq__(self, other): |
| if not isinstance(other, Token): |
| return False |
| return self.ttype == other.ttype and self.value == other.value |
|
|
| def __ne__(self, other): |
| if not isinstance(other, Token): |
| return True |
| return self.ttype != other.ttype or self.value != other.value |
|
|
| def __str__(self): |
| return '%d "%s"' % (self.ttype, self.value) |
|
|
| def unescape(self) -> "Token": |
| if not self.has_escape: |
| return self |
| unescaped = "" |
| l = len(self.value) |
| i = 0 |
| while i < l: |
| c = self.value[i] |
| i += 1 |
| if c == "\\": |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c = self.value[i] |
| i += 1 |
| if c.isdigit(): |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c2 = self.value[i] |
| i += 1 |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c3 = self.value[i] |
| i += 1 |
| if not (c2.isdigit() and c3.isdigit()): |
| raise dns.exception.SyntaxError |
| codepoint = int(c) * 100 + int(c2) * 10 + int(c3) |
| if codepoint > 255: |
| raise dns.exception.SyntaxError |
| c = chr(codepoint) |
| unescaped += c |
| return Token(self.ttype, unescaped) |
|
|
| def unescape_to_bytes(self) -> "Token": |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| unescaped = b"" |
| l = len(self.value) |
| i = 0 |
| while i < l: |
| c = self.value[i] |
| i += 1 |
| if c == "\\": |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c = self.value[i] |
| i += 1 |
| if c.isdigit(): |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c2 = self.value[i] |
| i += 1 |
| if i >= l: |
| raise dns.exception.UnexpectedEnd |
| c3 = self.value[i] |
| i += 1 |
| if not (c2.isdigit() and c3.isdigit()): |
| raise dns.exception.SyntaxError |
| codepoint = int(c) * 100 + int(c2) * 10 + int(c3) |
| if codepoint > 255: |
| raise dns.exception.SyntaxError |
| unescaped += b"%c" % (codepoint) |
| else: |
| |
| |
| |
| |
| |
| unescaped += c.encode() |
| else: |
| unescaped += c.encode() |
| return Token(self.ttype, bytes(unescaped)) |
|
|
|
|
| class Tokenizer: |
| """A DNS zone file format tokenizer. |
| |
| A token object is basically a (type, value) tuple. The valid |
| types are EOF, EOL, WHITESPACE, IDENTIFIER, QUOTED_STRING, |
| COMMENT, and DELIMITER. |
| |
| file: The file to tokenize |
| |
| ungotten_char: The most recently ungotten character, or None. |
| |
| ungotten_token: The most recently ungotten token, or None. |
| |
| multiline: The current multiline level. This value is increased |
| by one every time a '(' delimiter is read, and decreased by one every time |
| a ')' delimiter is read. |
| |
| quoting: This variable is true if the tokenizer is currently |
| reading a quoted string. |
| |
| eof: This variable is true if the tokenizer has encountered EOF. |
| |
| delimiters: The current delimiter dictionary. |
| |
| line_number: The current line number |
| |
| filename: A filename that will be returned by the where() method. |
| |
| idna_codec: A dns.name.IDNACodec, specifies the IDNA |
| encoder/decoder. If None, the default IDNA 2003 |
| encoder/decoder is used. |
| """ |
|
|
| def __init__( |
| self, |
| f: Any = sys.stdin, |
| filename: Optional[str] = None, |
| idna_codec: Optional[dns.name.IDNACodec] = None, |
| ): |
| """Initialize a tokenizer instance. |
| |
| f: The file to tokenize. The default is sys.stdin. |
| This parameter may also be a string, in which case the tokenizer |
| will take its input from the contents of the string. |
| |
| filename: the name of the filename that the where() method |
| will return. |
| |
| idna_codec: A dns.name.IDNACodec, specifies the IDNA |
| encoder/decoder. If None, the default IDNA 2003 |
| encoder/decoder is used. |
| """ |
|
|
| if isinstance(f, str): |
| f = io.StringIO(f) |
| if filename is None: |
| filename = "<string>" |
| elif isinstance(f, bytes): |
| f = io.StringIO(f.decode()) |
| if filename is None: |
| filename = "<string>" |
| else: |
| if filename is None: |
| if f is sys.stdin: |
| filename = "<stdin>" |
| else: |
| filename = "<file>" |
| self.file = f |
| self.ungotten_char: Optional[str] = None |
| self.ungotten_token: Optional[Token] = None |
| self.multiline = 0 |
| self.quoting = False |
| self.eof = False |
| self.delimiters = _DELIMITERS |
| self.line_number = 1 |
| assert filename is not None |
| self.filename = filename |
| if idna_codec is None: |
| self.idna_codec: dns.name.IDNACodec = dns.name.IDNA_2003 |
| else: |
| self.idna_codec = idna_codec |
|
|
| def _get_char(self) -> str: |
| """Read a character from input.""" |
|
|
| if self.ungotten_char is None: |
| if self.eof: |
| c = "" |
| else: |
| c = self.file.read(1) |
| if c == "": |
| self.eof = True |
| elif c == "\n": |
| self.line_number += 1 |
| else: |
| c = self.ungotten_char |
| self.ungotten_char = None |
| return c |
|
|
| def where(self) -> Tuple[str, int]: |
| """Return the current location in the input. |
| |
| Returns a (string, int) tuple. The first item is the filename of |
| the input, the second is the current line number. |
| """ |
|
|
| return (self.filename, self.line_number) |
|
|
| def _unget_char(self, c: str) -> None: |
| """Unget a character. |
| |
| The unget buffer for characters is only one character large; it is |
| an error to try to unget a character when the unget buffer is not |
| empty. |
| |
| c: the character to unget |
| raises UngetBufferFull: there is already an ungotten char |
| """ |
|
|
| if self.ungotten_char is not None: |
| |
| raise UngetBufferFull |
| self.ungotten_char = c |
|
|
| def skip_whitespace(self) -> int: |
| """Consume input until a non-whitespace character is encountered. |
| |
| The non-whitespace character is then ungotten, and the number of |
| whitespace characters consumed is returned. |
| |
| If the tokenizer is in multiline mode, then newlines are whitespace. |
| |
| Returns the number of characters skipped. |
| """ |
|
|
| skipped = 0 |
| while True: |
| c = self._get_char() |
| if c != " " and c != "\t": |
| if (c != "\n") or not self.multiline: |
| self._unget_char(c) |
| return skipped |
| skipped += 1 |
|
|
| def get(self, want_leading: bool = False, want_comment: bool = False) -> Token: |
| """Get the next token. |
| |
| want_leading: If True, return a WHITESPACE token if the |
| first character read is whitespace. The default is False. |
| |
| want_comment: If True, return a COMMENT token if the |
| first token read is a comment. The default is False. |
| |
| Raises dns.exception.UnexpectedEnd: input ended prematurely |
| |
| Raises dns.exception.SyntaxError: input was badly formed |
| |
| Returns a Token. |
| """ |
|
|
| if self.ungotten_token is not None: |
| utoken = self.ungotten_token |
| self.ungotten_token = None |
| if utoken.is_whitespace(): |
| if want_leading: |
| return utoken |
| elif utoken.is_comment(): |
| if want_comment: |
| return utoken |
| else: |
| return utoken |
| skipped = self.skip_whitespace() |
| if want_leading and skipped > 0: |
| return Token(WHITESPACE, " ") |
| token = "" |
| ttype = IDENTIFIER |
| has_escape = False |
| while True: |
| c = self._get_char() |
| if c == "" or c in self.delimiters: |
| if c == "" and self.quoting: |
| raise dns.exception.UnexpectedEnd |
| if token == "" and ttype != QUOTED_STRING: |
| if c == "(": |
| self.multiline += 1 |
| self.skip_whitespace() |
| continue |
| elif c == ")": |
| if self.multiline <= 0: |
| raise dns.exception.SyntaxError |
| self.multiline -= 1 |
| self.skip_whitespace() |
| continue |
| elif c == '"': |
| if not self.quoting: |
| self.quoting = True |
| self.delimiters = _QUOTING_DELIMITERS |
| ttype = QUOTED_STRING |
| continue |
| else: |
| self.quoting = False |
| self.delimiters = _DELIMITERS |
| self.skip_whitespace() |
| continue |
| elif c == "\n": |
| return Token(EOL, "\n") |
| elif c == ";": |
| while 1: |
| c = self._get_char() |
| if c == "\n" or c == "": |
| break |
| token += c |
| if want_comment: |
| self._unget_char(c) |
| return Token(COMMENT, token) |
| elif c == "": |
| if self.multiline: |
| raise dns.exception.SyntaxError( |
| "unbalanced parentheses" |
| ) |
| return Token(EOF, comment=token) |
| elif self.multiline: |
| self.skip_whitespace() |
| token = "" |
| continue |
| else: |
| return Token(EOL, "\n", comment=token) |
| else: |
| |
| |
| |
| token = c |
| ttype = DELIMITER |
| else: |
| self._unget_char(c) |
| break |
| elif self.quoting and c == "\n": |
| raise dns.exception.SyntaxError("newline in quoted string") |
| elif c == "\\": |
| |
| |
| |
| |
| token += c |
| has_escape = True |
| c = self._get_char() |
| if c == "" or (c == "\n" and not self.quoting): |
| raise dns.exception.UnexpectedEnd |
| token += c |
| if token == "" and ttype != QUOTED_STRING: |
| if self.multiline: |
| raise dns.exception.SyntaxError("unbalanced parentheses") |
| ttype = EOF |
| return Token(ttype, token, has_escape) |
|
|
| def unget(self, token: Token) -> None: |
| """Unget a token. |
| |
| The unget buffer for tokens is only one token large; it is |
| an error to try to unget a token when the unget buffer is not |
| empty. |
| |
| token: the token to unget |
| |
| Raises UngetBufferFull: there is already an ungotten token |
| """ |
|
|
| if self.ungotten_token is not None: |
| raise UngetBufferFull |
| self.ungotten_token = token |
|
|
| def next(self): |
| """Return the next item in an iteration. |
| |
| Returns a Token. |
| """ |
|
|
| token = self.get() |
| if token.is_eof(): |
| raise StopIteration |
| return token |
|
|
| __next__ = next |
|
|
| def __iter__(self): |
| return self |
|
|
| |
|
|
| def get_int(self, base: int = 10) -> int: |
| """Read the next token and interpret it as an unsigned integer. |
| |
| Raises dns.exception.SyntaxError if not an unsigned integer. |
| |
| Returns an int. |
| """ |
|
|
| token = self.get().unescape() |
| if not token.is_identifier(): |
| raise dns.exception.SyntaxError("expecting an identifier") |
| if not token.value.isdigit(): |
| raise dns.exception.SyntaxError("expecting an integer") |
| return int(token.value, base) |
|
|
| def get_uint8(self) -> int: |
| """Read the next token and interpret it as an 8-bit unsigned |
| integer. |
| |
| Raises dns.exception.SyntaxError if not an 8-bit unsigned integer. |
| |
| Returns an int. |
| """ |
|
|
| value = self.get_int() |
| if value < 0 or value > 255: |
| raise dns.exception.SyntaxError( |
| "%d is not an unsigned 8-bit integer" % value |
| ) |
| return value |
|
|
| def get_uint16(self, base: int = 10) -> int: |
| """Read the next token and interpret it as a 16-bit unsigned |
| integer. |
| |
| Raises dns.exception.SyntaxError if not a 16-bit unsigned integer. |
| |
| Returns an int. |
| """ |
|
|
| value = self.get_int(base=base) |
| if value < 0 or value > 65535: |
| if base == 8: |
| raise dns.exception.SyntaxError( |
| "%o is not an octal unsigned 16-bit integer" % value |
| ) |
| else: |
| raise dns.exception.SyntaxError( |
| "%d is not an unsigned 16-bit integer" % value |
| ) |
| return value |
|
|
| def get_uint32(self, base: int = 10) -> int: |
| """Read the next token and interpret it as a 32-bit unsigned |
| integer. |
| |
| Raises dns.exception.SyntaxError if not a 32-bit unsigned integer. |
| |
| Returns an int. |
| """ |
|
|
| value = self.get_int(base=base) |
| if value < 0 or value > 4294967295: |
| raise dns.exception.SyntaxError( |
| "%d is not an unsigned 32-bit integer" % value |
| ) |
| return value |
|
|
| def get_uint48(self, base: int = 10) -> int: |
| """Read the next token and interpret it as a 48-bit unsigned |
| integer. |
| |
| Raises dns.exception.SyntaxError if not a 48-bit unsigned integer. |
| |
| Returns an int. |
| """ |
|
|
| value = self.get_int(base=base) |
| if value < 0 or value > 281474976710655: |
| raise dns.exception.SyntaxError( |
| "%d is not an unsigned 48-bit integer" % value |
| ) |
| return value |
|
|
| def get_string(self, max_length: Optional[int] = None) -> str: |
| """Read the next token and interpret it as a string. |
| |
| Raises dns.exception.SyntaxError if not a string. |
| Raises dns.exception.SyntaxError if token value length |
| exceeds max_length (if specified). |
| |
| Returns a string. |
| """ |
|
|
| token = self.get().unescape() |
| if not (token.is_identifier() or token.is_quoted_string()): |
| raise dns.exception.SyntaxError("expecting a string") |
| if max_length and len(token.value) > max_length: |
| raise dns.exception.SyntaxError("string too long") |
| return token.value |
|
|
| def get_identifier(self) -> str: |
| """Read the next token, which should be an identifier. |
| |
| Raises dns.exception.SyntaxError if not an identifier. |
| |
| Returns a string. |
| """ |
|
|
| token = self.get().unescape() |
| if not token.is_identifier(): |
| raise dns.exception.SyntaxError("expecting an identifier") |
| return token.value |
|
|
| def get_remaining(self, max_tokens: Optional[int] = None) -> List[Token]: |
| """Return the remaining tokens on the line, until an EOL or EOF is seen. |
| |
| max_tokens: If not None, stop after this number of tokens. |
| |
| Returns a list of tokens. |
| """ |
|
|
| tokens = [] |
| while True: |
| token = self.get() |
| if token.is_eol_or_eof(): |
| self.unget(token) |
| break |
| tokens.append(token) |
| if len(tokens) == max_tokens: |
| break |
| return tokens |
|
|
| def concatenate_remaining_identifiers(self, allow_empty: bool = False) -> str: |
| """Read the remaining tokens on the line, which should be identifiers. |
| |
| Raises dns.exception.SyntaxError if there are no remaining tokens, |
| unless `allow_empty=True` is given. |
| |
| Raises dns.exception.SyntaxError if a token is seen that is not an |
| identifier. |
| |
| Returns a string containing a concatenation of the remaining |
| identifiers. |
| """ |
| s = "" |
| while True: |
| token = self.get().unescape() |
| if token.is_eol_or_eof(): |
| self.unget(token) |
| break |
| if not token.is_identifier(): |
| raise dns.exception.SyntaxError |
| s += token.value |
| if not (allow_empty or s): |
| raise dns.exception.SyntaxError("expecting another identifier") |
| return s |
|
|
| def as_name( |
| self, |
| token: Token, |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = False, |
| relativize_to: Optional[dns.name.Name] = None, |
| ) -> dns.name.Name: |
| """Try to interpret the token as a DNS name. |
| |
| Raises dns.exception.SyntaxError if not a name. |
| |
| Returns a dns.name.Name. |
| """ |
| if not token.is_identifier(): |
| raise dns.exception.SyntaxError("expecting an identifier") |
| name = dns.name.from_text(token.value, origin, self.idna_codec) |
| return name.choose_relativity(relativize_to or origin, relativize) |
|
|
| def get_name( |
| self, |
| origin: Optional[dns.name.Name] = None, |
| relativize: bool = False, |
| relativize_to: Optional[dns.name.Name] = None, |
| ) -> dns.name.Name: |
| """Read the next token and interpret it as a DNS name. |
| |
| Raises dns.exception.SyntaxError if not a name. |
| |
| Returns a dns.name.Name. |
| """ |
|
|
| token = self.get() |
| return self.as_name(token, origin, relativize, relativize_to) |
|
|
| def get_eol_as_token(self) -> Token: |
| """Read the next token and raise an exception if it isn't EOL or |
| EOF. |
| |
| Returns a string. |
| """ |
|
|
| token = self.get() |
| if not token.is_eol_or_eof(): |
| raise dns.exception.SyntaxError( |
| 'expected EOL or EOF, got %d "%s"' % (token.ttype, token.value) |
| ) |
| return token |
|
|
| def get_eol(self) -> str: |
| return self.get_eol_as_token().value |
|
|
| def get_ttl(self) -> int: |
| """Read the next token and interpret it as a DNS TTL. |
| |
| Raises dns.exception.SyntaxError or dns.ttl.BadTTL if not an |
| identifier or badly formed. |
| |
| Returns an int. |
| """ |
|
|
| token = self.get().unescape() |
| if not token.is_identifier(): |
| raise dns.exception.SyntaxError("expecting an identifier") |
| return dns.ttl.from_text(token.value) |
|
|