"""
Semantic Chunking Utilities

Strategies for splitting and merging document content
into semantically meaningful chunks.
"""

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from ..chunks.models import (
    BoundingBox,
    ChunkType,
    DocumentChunk,
)


@dataclass
class ChunkingConfig:
    """Configuration for semantic chunking."""

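    # Size bounds (in characters) for emitted chunks.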
    min_chunk_chars: int = 50
    max_chunk_chars: int = 2000
    target_chunk_chars: int = 500

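    # Character overlap carried between adjacent pieces when long text is split by length.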
    overlap_chars: int = 100

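    # Structural boundaries to respect while splitting.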
    split_on_headings: bool = True
    split_on_paragraphs: bool = True
    preserve_sentences: bool = True

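    # Post-processing: merge chunks below the threshold into the following chunk.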
    merge_small_chunks: bool = True
    merge_threshold_chars: int = 100


class SemanticChunker:
    """
    Semantic chunking engine.

    Splits text into meaningful chunks based on document structure,
    headings, paragraphs, and sentence boundaries.
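
    Example (illustrative sketch; report_text stands in for any plain-text
    document, and the extra keys in each chunk dict come from ``metadata``):

        config = ChunkingConfig(target_chunk_chars=300, overlap_chars=50)
        chunker = SemanticChunker(config)
        chunks = chunker.chunk_text(report_text, metadata={"doc_id": "doc-1"})
        for chunk in chunks:
            print(chunk["heading"], len(chunk["text"]))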
    """

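    # Heuristic boundary patterns: "#"-style or numbered/lettered headings,
    # blank-line paragraph breaks, and sentence ends (. ! ?) followed by a capital.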
    HEADING_PATTERN = re.compile(r'^(?:#{1,6}\s+|[A-Z0-9][\.\)]\s+|\d+[\.\)]\s+)', re.MULTILINE)
    PARAGRAPH_PATTERN = re.compile(r'\n\s*\n')
    SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

    def __init__(self, config: Optional[ChunkingConfig] = None):
        self.config = config or ChunkingConfig()

    def chunk_text(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Split text into semantic chunks.

        Args:
            text: Input text to chunk
            metadata: Optional metadata to include with each chunk

        Returns:
            List of chunk dictionaries with text and metadata
        """
        if not text or not text.strip():
            return []

        metadata = metadata or {}
        chunks: List[Dict[str, Any]] = []

        if self.config.split_on_headings:
            sections = self._split_by_headings(text)
        else:
            sections = [{"heading": None, "text": text}]

        for section in sections:
            section_chunks = self._chunk_section(
                section["text"],
                section.get("heading"),
            )
            for piece in section_chunks:
                if len(piece.strip()) >= self.config.min_chunk_chars:
                    chunks.append({
                        "text": piece.strip(),
                        "heading": section.get("heading"),
                        **metadata,
                    })

        if self.config.merge_small_chunks:
            chunks = self._merge_small_chunks(chunks)

        return chunks

    def _split_by_headings(self, text: str) -> List[Dict[str, Any]]:
        """Split text by heading patterns."""
        sections = []
        current_heading = None
        current_text = []

        lines = text.split("\n")

        for line in lines:
            if self.HEADING_PATTERN.match(line):
                if current_text:
                    sections.append({
                        "heading": current_heading,
                        "text": "\n".join(current_text),
                    })
                current_heading = line.strip()
                current_text = []
            else:
                current_text.append(line)

        if current_text:
            sections.append({
                "heading": current_heading,
                "text": "\n".join(current_text),
            })

        return sections if sections else [{"heading": None, "text": text}]

    def _chunk_section(
        self,
        text: str,
        heading: Optional[str],
    ) -> List[str]:
        """Chunk a single section."""
        if len(text) <= self.config.max_chunk_chars:
            return [text]

        if self.config.split_on_paragraphs:
            paragraphs = self.PARAGRAPH_PATTERN.split(text)
        else:
            paragraphs = [text]

        chunks = []
        current_chunk = ""

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            if len(current_chunk) + len(para) + 1 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += "\n\n" + para
                else:
                    current_chunk = para
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                if len(para) > self.config.max_chunk_chars:
                    sub_chunks = self._split_long_text(para)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = para

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_long_text(self, text: str) -> List[str]:
        """Split long text by sentences."""
        if not self.config.preserve_sentences:
            return self._split_by_chars(text)

        sentences = self.SENTENCE_PATTERN.split(text)
        chunks = []
        current_chunk = ""

        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            if len(current_chunk) + len(sentence) + 1 <= self.config.target_chunk_chars:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
            else:
                if current_chunk:
                    chunks.append(current_chunk)

                if len(sentence) > self.config.max_chunk_chars:
                    sub_chunks = self._split_by_chars(sentence)
                    chunks.extend(sub_chunks[:-1])
                    current_chunk = sub_chunks[-1] if sub_chunks else ""
                else:
                    current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _split_by_chars(self, text: str) -> List[str]:
        """Split text by character count with overlap."""
        chunks = []
        start = 0
        text_len = len(text)

        while start < text_len:
            end = min(start + self.config.target_chunk_chars, text_len)

            # Prefer to break on a space rather than mid-word.
            if end < text_len:
                space_idx = text.rfind(" ", start, end)
                if space_idx > start:
                    end = space_idx

            chunks.append(text[start:end].strip())

            if end >= text_len:
                break

            # Step back by the overlap, but always make forward progress so the
            # loop cannot repeat the same window indefinitely.
            start = max(end - self.config.overlap_chars, start + 1)

        return chunks

    def _merge_small_chunks(
        self,
        chunks: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Merge chunks smaller than threshold."""
        if not chunks:
            return chunks

        merged = []
        current = None

        for chunk in chunks:
            text = chunk["text"]

            if current is None:
                current = chunk.copy()
                continue

            current_len = len(current["text"])
            new_len = len(text)

            if (current_len < self.config.merge_threshold_chars and
                    current_len + new_len <= self.config.max_chunk_chars and
                    current.get("heading") == chunk.get("heading")):
                current["text"] = current["text"] + "\n\n" + text
            else:
                merged.append(current)
                current = chunk.copy()

        if current:
            merged.append(current)

        return merged


class DocumentChunkBuilder:
    """
    Builder for creating DocumentChunk objects.

    Provides a fluent interface for chunk construction with
    automatic ID generation and validation.
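
    Example (illustrative; title_box and body_box are assumed to be
    BoundingBox instances from ..chunks.models):

        builder = DocumentChunkBuilder(doc_id="doc-1", page=1)
        chunks = (
            builder
            .add_title("Quarterly Report", bbox=title_box)
            .add_paragraph("Revenue grew in the second quarter.", bbox=body_box)
            .build()
        )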
    """

    def __init__(
        self,
        doc_id: str,
        page: int,
    ):
        self.doc_id = doc_id
        self.page = page
        self._chunks: List[DocumentChunk] = []
        self._sequence_index = 0

    def add_chunk(
        self,
        text: str,
        chunk_type: ChunkType,
        bbox: BoundingBox,
        confidence: float = 1.0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> "DocumentChunkBuilder":
        """Add a chunk."""
        chunk_id = DocumentChunk.generate_chunk_id(
            doc_id=self.doc_id,
            page=self.page,
            bbox=bbox,
            chunk_type_str=chunk_type.value,
        )

        chunk = DocumentChunk(
            chunk_id=chunk_id,
            doc_id=self.doc_id,
            chunk_type=chunk_type,
            text=text,
            page=self.page,
            bbox=bbox,
            confidence=confidence,
            sequence_index=self._sequence_index,
            metadata=metadata or {},
        )

        self._chunks.append(chunk)
        self._sequence_index += 1
        return self

    def add_text(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Add a text chunk."""
        return self.add_chunk(text, ChunkType.TEXT, bbox, confidence)

    def add_title(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Add a title chunk."""
        return self.add_chunk(text, ChunkType.TITLE, bbox, confidence)

    def add_heading(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Add a heading chunk."""
        return self.add_chunk(text, ChunkType.HEADING, bbox, confidence)

    def add_paragraph(
        self,
        text: str,
        bbox: BoundingBox,
        confidence: float = 1.0,
    ) -> "DocumentChunkBuilder":
        """Add a paragraph chunk."""
        return self.add_chunk(text, ChunkType.PARAGRAPH, bbox, confidence)

    def build(self) -> List[DocumentChunk]:
        """Build and return the list of chunks."""
        return self._chunks.copy()

    def reset(self) -> "DocumentChunkBuilder":
        """Reset the builder."""
        self._chunks = []
        self._sequence_index = 0
        return self


def estimate_tokens(text: str) -> int:
    """
    Estimate token count for text.

    Uses a simple heuristic: ~4 characters per token.
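
    Example: a 400-character paragraph is estimated at 100 tokens.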
    """
    return len(text) // 4


def split_for_embedding(
    text: str,
    max_tokens: int = 512,
    overlap_tokens: int = 50,
) -> List[str]:
    """
    Split text for embedding model input.

    Args:
        text: Text to split
        max_tokens: Maximum tokens per chunk
        overlap_tokens: Overlap between chunks

    Returns:
        List of text chunks
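
    Example (illustrative; long_text stands in for any string longer than the
    embedding model's input window):

        pieces = split_for_embedding(long_text, max_tokens=256, overlap_tokens=25)
        # embed() is a hypothetical embedding call, not part of this module.
        vectors = [embed(p) for p in pieces]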
    """
    max_chars = max_tokens * 4
    overlap_chars = overlap_tokens * 4

    config = ChunkingConfig(
        max_chunk_chars=max_chars,
        # Aim a little below the hard cap; for very small token budgets fall
        # back to half the cap so the target never reaches zero.
        target_chunk_chars=max(max_chars - 100, max_chars // 2),
        overlap_chars=overlap_chars,
    )

    chunker = SemanticChunker(config)
    chunks = chunker.chunk_text(text)

    return [c["text"] for c in chunks]