from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
SentenceTransformersTokenTextSplitter,
NLTKTextSplitter
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any, Optional
import nltk
nltk.download('punkt_tab', quiet=True)
import pandas as pd
import re
class MarkdownTextSplitter:
    """
    Custom markdown header chunking strategy.

    Splits text by headers in a hierarchical manner:
    - First checks h1 (#) headers
    - If h1 content <= max_chars, accepts it as a chunk
    - If h1 content > max_chars, splits into h2 headers
    - If any h2 > max_chars, splits into h3, and so on
    """

    def __init__(self, max_chars: int = 4000):
        # Largest chunk (in characters) emitted before trying a deeper header split.
        self.max_chars = max_chars
        # Header markers ordered from coarsest (h1) to finest (h4).
        self.headers = ["\n# ", "\n## ", "\n### ", "\n#### "]

    def split_text(self, text: str) -> List[str]:
        """Split text using the markdown header hierarchy, starting at h1."""
        return self._split_by_header(text, 0)

    def _split_by_header(self, content: str, header_level: int) -> List[str]:
        """
        Recursively split content by header levels.

        Args:
            content: The text content to split
            header_level: Current header level (0=h1, 1=h2, etc.)
        Returns:
            List of text chunks
        """
        # Already small enough, or no finer header level left to try:
        # return the content unchanged as a single chunk.
        if len(content) <= self.max_chars or header_level >= len(self.headers):
            return [content]

        marker = self.headers[header_level]
        # Lookahead split keeps each header marker attached to its own section.
        sections = re.split(f'(?={re.escape(marker)})', content)

        # No header at this level anywhere in the content: descend to the
        # next finer level instead.
        if len(sections) == 1:
            return self._split_by_header(content, header_level + 1)

        chunks: List[str] = []
        pending = ""
        for section in sections:
            if len(section) > self.max_chars:
                # Oversized section: emit whatever is buffered, then break
                # the section apart using the finer header levels.
                if pending:
                    chunks.append(pending)
                    pending = ""
                chunks.extend(self._split_by_header(section, header_level + 1))
            elif pending and len(pending) + len(section) > self.max_chars:
                # Buffer would overflow: flush it and start a fresh one.
                chunks.append(pending)
                pending = section
            else:
                # Section fits alongside what is already buffered.
                pending += section
        # Flush the trailing buffer, if any.
        if pending:
            chunks.append(pending)
        return chunks
class ChunkProcessor:
    """
    Turns a DataFrame of documents into embedded, vector-store-ready chunks.

    Wraps a SentenceTransformer encoder plus a set of LangChain (and custom)
    text splitters selected by name via get_splitter().
    """

    def __init__(self, model_name='all-MiniLM-L6-v2', verbose: bool = True, load_hf_embeddings: bool = False):
        """
        Args:
            model_name: SentenceTransformer / HuggingFace model id used for embeddings.
            verbose: Default verbosity for process().
            load_hf_embeddings: Eagerly build the LangChain HuggingFaceEmbeddings
                wrapper (only needed by the "semantic" technique); otherwise it
                is created lazily on first use.
        """
        self.model_name = model_name
        # Some hubs (e.g. jinaai/*) ship custom modeling code that must be
        # explicitly trusted before it can be loaded.
        self._use_remote_code = self._requires_remote_code(model_name)
        st_kwargs = {"trust_remote_code": True} if self._use_remote_code else {}
        self.encoder = SentenceTransformer(model_name, **st_kwargs)
        self.verbose = verbose
        hf_kwargs = {"model_kwargs": {"trust_remote_code": True}} if self._use_remote_code else {}
        self.hf_embeddings = HuggingFaceEmbeddings(model_name=model_name, **hf_kwargs) if load_hf_embeddings else None

    def _requires_remote_code(self, model_name: str) -> bool:
        """Return True if the model id is known to need trust_remote_code."""
        normalized = (model_name or "").strip().lower()
        return normalized.startswith("jinaai/")

    def _get_hf_embeddings(self):
        """Lazily create (and cache) the LangChain embeddings wrapper."""
        if self.hf_embeddings is None:
            hf_kwargs = {"model_kwargs": {"trust_remote_code": True}} if self._use_remote_code else {}
            self.hf_embeddings = HuggingFaceEmbeddings(model_name=self.model_name, **hf_kwargs)
        return self.hf_embeddings

    # ------------------------------------------------------------------
    # Splitters
    # ------------------------------------------------------------------
    def get_splitter(self, technique: str, chunk_size: int = 500, chunk_overlap: int = 50, **kwargs):
        """
        Factory method to return different chunking strategies.

        Strategies:
        - "fixed": Character-based, may split mid-sentence
        - "recursive": Recursive character splitting with hierarchical separators
        - "character": Character-based splitting on paragraph boundaries
        - "paragraph": Paragraph-level splitting on \\n\\n boundaries
        - "sentence": Sliding window over NLTK sentences
        - "semantic": Embedding-based semantic chunking
        - "page": Page-level splitting on page markers
        - "markdown": Hierarchical markdown header splitting

        Args:
            technique: One of the strategy names above.
            chunk_size: Maximum chunk size in characters (max_chars for "markdown").
            chunk_overlap: Overlap between consecutive chunks (ignored by "markdown").
            **kwargs: Strategy-specific overrides (separator, separators,
                breakpoint_threshold_type, breakpoint_threshold_amount, ...).

        Raises:
            ValueError: If the technique name is not recognized.
        """
        if technique == "fixed":
            return CharacterTextSplitter(
                separator=kwargs.get('separator', ""),
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                is_separator_regex=False
            )
        elif technique == "recursive":
            return RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separators=kwargs.get('separators', ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " ", ""]),
                length_function=len,
                keep_separator=kwargs.get('keep_separator', True)
            )
        elif technique == "character":
            return CharacterTextSplitter(
                separator=kwargs.get('separator', "\n\n"),
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                is_separator_regex=False
            )
        elif technique == "paragraph":
            # Paragraph-level chunking using paragraph breaks
            return CharacterTextSplitter(
                separator=kwargs.get('separator', "\n\n"),
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                is_separator_regex=False
            )
        elif technique == "sentence":
            # sentence-level chunking using NLTK
            return NLTKTextSplitter(
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                separator="\n"
            )
        elif technique == "semantic":
            return SemanticChunker(
                self._get_hf_embeddings(),
                breakpoint_threshold_type=kwargs.get('breakpoint_threshold_type', "percentile"),
                # Using 70 because 95 was giving way too big chunks
                breakpoint_threshold_amount=kwargs.get('breakpoint_threshold_amount', 70)
            )
        elif technique == "page":
            # Page-level chunking using page markers
            return CharacterTextSplitter(
                separator=kwargs.get('separator', "--- Page"),
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                length_function=len,
                is_separator_regex=False
            )
        elif technique == "markdown":
            # Markdown header chunking - splits by headers with max char limit
            return MarkdownTextSplitter(max_chars=chunk_size)
        else:
            raise ValueError(f"Technique '{technique}' is not supported. Choose from: fixed, recursive, character, paragraph, sentence, semantic, page, markdown")

    # ------------------------------------------------------------------
    # Processing
    # ------------------------------------------------------------------
    def process(self, df: pd.DataFrame, technique: str = "recursive", chunk_size: int = 500,
                chunk_overlap: int = 50, max_docs: Optional[int] = 5,
                verbose: Optional[bool] = None, **kwargs) -> List[Dict[str, Any]]:
        """
        Processes a DataFrame into vector-ready chunks.

        Args:
            df: DataFrame with columns: id, title, url, full_text
            technique: Chunking strategy to use
            chunk_size: Maximum size of each chunk in characters
            chunk_overlap: Overlap between consecutive chunks
            max_docs: Number of documents to process (None for all)
            verbose: Override instance verbose setting
            **kwargs: Additional arguments passed to the splitter

        Returns:
            List of chunk dicts with embeddings and metadata

        Raises:
            ValueError: If any required column is missing from df.
        """
        should_print = verbose if verbose is not None else self.verbose
        required_cols = ['id', 'title', 'url', 'full_text']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"DataFrame missing required columns: {missing_cols}")
        splitter = self.get_splitter(technique, chunk_size, chunk_overlap, **kwargs)
        # Bug fix: compare against None explicitly — the old truthiness test
        # made max_docs=0 process ALL documents instead of none.
        subset_df = df.head(max_docs) if max_docs is not None else df
        processed_chunks = []
        for _, row in subset_df.iterrows():
            if should_print:
                self._print_document_header(row['title'], row['url'], technique, chunk_size, chunk_overlap)
            raw_chunks = splitter.split_text(row['full_text'])
            for i, text in enumerate(raw_chunks):
                # Some splitters return Documents, others plain strings.
                content = text.page_content if hasattr(text, 'page_content') else text
                if should_print:
                    self._print_chunk(i, content)
                processed_chunks.append({
                    "id": f"{row['id']}-chunk-{i}",
                    "values": self.encoder.encode(content).tolist(),
                    "metadata": {
                        "title": row['title'],
                        "text": content,
                        "url": row['url'],
                        "chunk_index": i,
                        "technique": technique,
                        "chunk_size": len(content),
                        "total_chunks": len(raw_chunks)
                    }
                })
            if should_print:
                self._print_document_summary(len(raw_chunks))
        if should_print:
            self._print_processing_summary(len(subset_df), processed_chunks)
        return processed_chunks

    # ------------------------------------------------------------------
    # Printing
    # ------------------------------------------------------------------
    def _print_document_header(self, title, url, technique, chunk_size, chunk_overlap):
        """Print a banner for the document about to be chunked."""
        print("\n" + "="*80)
        print(f"DOCUMENT: {title}")
        print(f"URL: {url}")
        print(f"Technique: {technique.upper()} | Chunk Size: {chunk_size} | Overlap: {chunk_overlap}")
        print("-" * 80)

    def _print_chunk(self, index, content):
        """Print a single chunk with its index and length."""
        print(f"\n[Chunk {index}] ({len(content)} chars):")
        print(f"  {content}")

    def _print_document_summary(self, num_chunks):
        """Print the per-document chunk count footer."""
        print(f"Total Chunks Generated: {num_chunks}")
        print("="*80)

    def _print_processing_summary(self, num_docs, processed_chunks):
        """Print overall totals and the average chunk size."""
        print(f"\nFinished processing {num_docs} documents into {len(processed_chunks)} chunks.")
        if processed_chunks:
            avg = sum(c['metadata']['chunk_size'] for c in processed_chunks) / len(processed_chunks)
            print(f"Average chunk size: {avg:.0f} chars")