| | """Markdown parser. |
| | |
| | Contains parser for md files. |
| | |
| | """ |
| | import re |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional, Tuple, Union, cast |
| |
|
| | import tiktoken |
| | from application.parser.file.base_parser import BaseParser |
| |
|
| |
|
class MarkdownParser(BaseParser):
    """Markdown parser.

    Extract text from markdown files.
    Produces a list of ``(header, text)`` tuples, one per section of the
    document; a section whose token count exceeds ``max_tokens`` is split
    into several tuples that share the same header.

    """

    # Compiled once at class-creation time; they are applied per line or per
    # section, so there is no reason to rebuild them on every call.
    _HEADER_LINE_RE = re.compile(r"^#+\s")
    _HEADER_MARKER_RE = re.compile(r"^#+\s*")
    _HTML_TAG_RE = re.compile(r"<.*?>")
    _WIKI_IMAGE_RE = re.compile(r"!\[\[.*?\]\]")
    _INLINE_IMAGE_RE = re.compile(r"!\[[^\]]*\]\([^)]*\)")
    _HYPERLINK_RE = re.compile(r"\[(.*?)\]\((.*?)\)")

    def __init__(
        self,
        *args: Any,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        max_tokens: int = 2048,
        **kwargs: Any,
    ) -> None:
        """Init params.

        Args:
            *args: Forwarded unchanged to ``BaseParser.__init__``.
            remove_hyperlinks: When true, ``parse_tups`` replaces
                ``[text](url)`` links with their text.
            remove_images: When true, ``parse_tups`` strips image embeds.
            max_tokens: Token threshold above which a section is split
                into several chunks (see ``tups_chunk_append``).
            **kwargs: Forwarded unchanged to ``BaseParser.__init__``.
        """
        super().__init__(*args, **kwargs)
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images
        self._max_tokens = max_tokens

    def tups_chunk_append(
        self,
        tups: List[Tuple[Optional[str], str]],
        current_header: Optional[str],
        current_text: str,
    ) -> List[Tuple[Optional[str], str]]:
        """Append one section to ``tups``, splitting it when it is too long.

        The section length is measured in ``cl100k_base`` tokens. When it
        exceeds ``max_tokens`` the text is cut into consecutive slices and
        each slice is appended under the same header; otherwise the text is
        appended as a single tuple.

        Args:
            tups: List to extend. It is mutated in place.
            current_header: Header of the section, or ``None``.
            current_text: Body text of the section.

        Returns:
            The same ``tups`` list that was passed in.
        """
        num_tokens = len(tiktoken.get_encoding("cl100k_base").encode(current_text))
        if num_tokens <= self._max_tokens:
            tups.append((current_header, current_text))
            return tups

        # NOTE(review): the threshold is checked in tokens, but the slices
        # below are ``max_tokens`` *characters* long. Kept as-is because
        # callers may depend on the resulting chunk boundaries.
        step = self._max_tokens
        tups.extend(
            (current_header, current_text[start:start + step])
            for start in range(0, len(current_text), step)
        )
        return tups

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Convert markdown text to a list of ``(header, text)`` tuples.

        A line starting with one or more ``#`` followed by whitespace opens
        a new section; every other line is added to the current section's
        text. Sections with no body lines are skipped.

        When the document contains at least one header, the leading ``#``
        marker is removed from each header and ``<...>`` tags are removed
        from each body. When it contains no header at all, a single
        ``(None, text)`` entry (possibly chunked) is produced with every
        newline removed.

        Args:
            markdown_text: Raw markdown content.

        Returns:
            The list of ``(header, text)`` tuples described above.
        """
        # NOTE(review): header detection is applied to every line, so a
        # ``# comment`` inside a fenced code block also opens a section.
        sections: List[Tuple[Optional[str], str]] = []
        current_header: Optional[str] = None
        body_lines: List[str] = []

        for line in markdown_text.split("\n"):
            if self._HEADER_LINE_RE.match(line):
                # Flush the previous section only if it has a body, but
                # ALWAYS move on to the new header. (Previously an empty
                # section hit ``continue``, which dropped the new header and
                # filed its text under the old one.)
                if current_header is not None and body_lines:
                    sections = self.tups_chunk_append(
                        sections, current_header, "".join(body_lines)
                    )
                # NOTE(review): text that precedes the first header is
                # discarded here, as it always has been -- confirm intended.
                current_header = line
                body_lines = []
            else:
                body_lines.append(line + "\n")

        # The trailing section is appended unconditionally (even when empty),
        # which is what yields the single entry for header-less documents.
        sections = self.tups_chunk_append(
            sections, current_header, "".join(body_lines)
        )

        if current_header is not None:
            # Every stored header is a real header line at this point; the
            # cast only satisfies the type checker. Strip just the leading
            # marker so a '#' inside the title (e.g. "C#") survives.
            return [
                (
                    self._HEADER_MARKER_RE.sub("", cast(str, header)).strip(),
                    self._HTML_TAG_RE.sub("", text),
                )
                for header, text in sections
            ]

        # NOTE(review): newlines are removed without inserting a space, so
        # adjacent lines are fused together. Preserved from the original.
        return [(header, text.replace("\n", "")) for header, text in sections]

    def remove_images(self, content: str) -> str:
        """Remove image embeds from markdown content.

        Handles both wiki-style embeds (``![[file.png]]``) and standard
        inline images (``![alt](url)``). Matching is non-greedy so text
        between two embeds on the same line is kept.

        Args:
            content: Markdown text.

        Returns:
            ``content`` with image embeds removed.
        """
        content = self._WIKI_IMAGE_RE.sub("", content)
        return self._INLINE_IMAGE_RE.sub("", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Replace ``[text](url)`` hyperlinks with just ``text``.

        Args:
            content: Markdown text.

        Returns:
            ``content`` with each inline link reduced to its link text.
        """
        return self._HYPERLINK_RE.sub(r"\1", content)

    def _init_parser(self) -> Dict:
        """Initialize the parser with the config.

        Returns:
            An empty dict.
        """
        return {}

    def parse_tups(
        self, filepath: Path, errors: str = "ignore"
    ) -> List[Tuple[Optional[str], str]]:
        """Parse file into tuples.

        Args:
            filepath: Path of the markdown file to read.
            errors: Decoding error policy passed to ``open`` (for example
                ``"ignore"``, ``"replace"`` or ``"strict"``).

        Returns:
            The ``(header, text)`` tuples produced by ``markdown_to_tups``.
        """
        # Explicit UTF-8 keeps results independent of the platform locale,
        # and ``errors`` is now actually honoured instead of being ignored.
        with open(filepath, "r", encoding="utf-8", errors=errors) as f:
            content = f.read()

        # Images go first: the hyperlink pattern also matches the
        # ``[alt](url)`` tail of an image and would leave a stray ``!alt``.
        if self._remove_images:
            content = self.remove_images(content)
        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)

        return self.markdown_to_tups(content)

    def parse_file(
        self, filepath: Path, errors: str = "ignore"
    ) -> Union[str, List[str]]:
        """Parse file into a list of strings, one per section chunk.

        Args:
            filepath: Path of the markdown file to read.
            errors: Decoding error policy, forwarded to ``parse_tups``.

        Returns:
            A list in which each entry is the chunk text alone when it has
            no header, or ``"\\n\\n{header}\\n{text}"`` when it has one.
        """
        return [
            value if header is None else f"\n\n{header}\n{value}"
            for header, value in self.parse_tups(filepath, errors=errors)
        ]
| |
|