|
|
import os
|
|
|
import json
|
|
|
import numpy as np
|
|
|
from PIL import Image
|
|
|
import pandas as pd
|
|
|
from IPython.display import Image
|
|
|
from ultralytics import YOLO
|
|
|
import torch
|
|
|
from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments
|
|
|
from datasets import load_dataset
|
|
|
import cv2
|
|
|
import pytesseract
|
|
|
from PIL import Image, ImageEnhance
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
|
# Location of the Tesseract binary used by the original (Windows) setup.
_WINDOWS_TESSERACT_CMD = r'C:/Program Files/Tesseract-OCR/tesseract.exe'

# Only point pytesseract at that binary when it is actually present; on any
# other machine pytesseract keeps its own default command lookup instead of
# being forced onto a path that does not exist.
if os.path.isfile(_WINDOWS_TESSERACT_CMD):
    pytesseract.pytesseract.tesseract_cmd = _WINDOWS_TESSERACT_CMD
|
|
|
|
|
|
def ocr_core(image):
    """Run Tesseract on ``image`` and flatten the recognised words into one string.

    The word table comes from ``pytesseract.image_to_data``. Rows whose
    confidence equals ``-1`` are discarded (presumably Tesseract's non-word
    layout rows -- confirm against the Tesseract TSV documentation). Two
    markers are then injected into the remaining words:

    * a newline before every word with ``word_num == 1`` that is not in
      block 1;
    * a comma before every word whose ``spacing`` (change in ``left`` within
      the same block minus the previous row's ``width``) is greater than 100.

    Args:
        image: Whatever ``pytesseract.image_to_data`` accepts; the caller in
            this file passes the array returned by ``improve_ocr_accuracy``.

    Returns:
        str: The words in table order, each followed by a single space, or an
        empty string when no word rows remain after filtering.
    """
    data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
    frame = pd.DataFrame(data)

    # Compare numerically so a confidence reported as the string '-1' is
    # filtered just like the integer/float form.
    confidence = pd.to_numeric(frame['conf'], errors='coerce')
    # Explicit copy: the columns assigned below must land on an independent
    # frame, not on a slice of ``frame``.
    words = frame[confidence != -1].copy()
    if words.empty:
        # Nothing recognised; row-wise processing of an empty frame is not
        # well defined, so return early.
        return ""

    # Horizontal step between consecutive words of the same block.
    left_step = words.groupby('block_num')['left'].diff().fillna(0).astype(int)
    # Width of the preceding row (not grouped by block, as in the original).
    previous_width = words['width'].shift(1).fillna(0).astype(int)
    words['spacing'] = left_step - previous_width

    # Newline marker first, then the comma marker, so a word matching both
    # conditions ends up prefixed with ",\n" exactly as before.
    starts_row = (words['word_num'] == 1) & (words['block_num'] != 1)
    words.loc[starts_row, 'text'] = '\n' + words.loc[starts_row, 'text']

    wide_gap = words['spacing'] > 100
    words.loc[wide_gap, 'text'] = ',' + words.loc[wide_gap, 'text']

    # Every word is followed by one space, including the last one.
    return ' '.join(words['text']) + ' '
|
|
|
|
|
|
def improve_ocr_accuracy(img):
    """Load an image and preprocess it for OCR.

    Steps, in order: open the file, upscale it by a factor of 4 in both
    dimensions, apply a contrast enhancement factor of 2, then apply an
    inverted binary threshold at 127 (max value 255) with OpenCV.

    Args:
        img: Path (or other object accepted by ``Image.open``) of the image
            to load.

    Returns:
        The thresholded array returned by ``cv2.threshold``.
    """
    # ``resize`` returns a new, fully loaded image, so the source file can be
    # closed as soon as the enlarged copy exists.
    with Image.open(img) as source:
        enlarged = source.resize((source.width * 4, source.height * 4))

    boosted = ImageEnhance.Contrast(enlarged).enhance(2)

    # THRESH_BINARY_INV: values above 127 become 0, everything else 255.
    _, thresh = cv2.threshold(np.array(boosted), 127, 255, cv2.THRESH_BINARY_INV)
    return thresh
|
|
|
|
|
|
|
|
|
def create_ocr_outputs():
    """OCR every file under the hand-labelled tables directory.

    Walks ``<cwd>/data/processed/hand_labeled_tables/hand_labeled_tables``,
    prints the directories and files it encounters, runs each file through
    ``improve_ocr_accuracy`` and ``ocr_core``, and writes the resulting text
    as UTF-8 to ``<cwd>/data/processed/annotations/<name>.txt``, where
    ``<name>`` is the part of the file name before its first ``.``.
    """
    base_dir = os.getcwd()
    directory_path = base_dir + '/data/processed/hand_labeled_tables/hand_labeled_tables'
    annotations_dir = base_dir + '/data/processed/annotations'

    # The output files are opened for writing below; make sure their parent
    # directory exists instead of failing on the first file.
    os.makedirs(annotations_dir, exist_ok=True)

    for root, dirs, files in os.walk(directory_path):
        print(f"Current directory: {root}")

        print("Subdirectories:")
        for subdirectory in dirs:
            print(f"- {subdirectory}")

        print("Files:")
        for image_path in files:
            print(f"- {image_path}")
            full_path = os.path.join(root, image_path)

            preprocessed_image = improve_ocr_accuracy(full_path)
            ocr_text = ocr_core(preprocessed_image)

            # Name is cut at the FIRST dot (kept as-is so annotation names
            # stay aligned with whatever downstream pairing expects).
            stem = image_path.split('.')[0]
            # Binary mode keeps the '\n' markers untouched on every platform.
            with open(f"{annotations_dir}/{stem}.txt", 'wb') as f:
                f.write(ocr_text.encode('utf-8'))

        print("\n")
|
|
|
|
|
|
|
|
|
def prepare_dataset(ocr_dir, csv_dir, output_file):
    """Pair OCR text files with their same-named counterparts into a JSONL file.

    For every ``*.txt`` file in ``ocr_dir`` the file with the same name in
    ``csv_dir`` is read as well, and one JSON object per pair is written to
    ``output_file`` as ``{"prompt": <ocr text>, "completion": <csv text>}``.
    Files are processed in sorted name order so the output is reproducible
    (``os.listdir`` makes no ordering guarantee).

    Args:
        ocr_dir: Directory containing the OCR ``.txt`` files.
        csv_dir: Directory containing the same-named target files.
        output_file: Path of the JSONL file to (over)write.

    Raises:
        FileNotFoundError: If a ``.txt`` file in ``ocr_dir`` has no
            counterpart in ``csv_dir`` (or a directory is missing).
    """
    with open(output_file, 'w', encoding='utf-8') as jsonl_file:
        for filename in sorted(os.listdir(ocr_dir)):
            if not filename.endswith('.txt'):
                continue

            ocr_path = os.path.join(ocr_dir, filename)
            csv_path = os.path.join(csv_dir, filename)
            print(csv_path)

            with open(ocr_path, 'r', encoding='utf-8') as ocr_file:
                ocr_text = ocr_file.read()

            with open(csv_path, 'r', encoding='utf-8') as csv_file:
                csv_text = csv_file.read()

            record = {"prompt": ocr_text, "completion": csv_text}
            jsonl_file.write(json.dumps(record) + '\n')
|
|
|
|
|
|
def tokenize_function(examples):
    """Tokenize the ``prompt`` field and attach language-model labels.

    Uses the module-level ``tokenizer`` (created by the script entry point),
    truncating and padding every example to 1012 tokens. Labels mirror
    ``input_ids`` except at padded positions, which are set to ``-100`` so
    the loss ignores them instead of training the model to emit padding.

    NOTE(review): only ``examples['prompt']`` is tokenized; the ``completion``
    field of the dataset is never used here -- confirm whether the training
    text should include it.

    Args:
        examples: A mapping with a ``'prompt'`` entry; either a single string
            or a list of strings (as supplied by ``Dataset.map`` with
            ``batched=True``).

    Returns:
        The tokenizer output with an added ``'labels'`` entry.
    """
    inputs = tokenizer(examples['prompt'], truncation=True, padding='max_length', max_length=1012)

    def _mask_padding(token_ids, attention):
        # attention == 0 marks padding added by the tokenizer.
        return [token if keep else -100 for token, keep in zip(token_ids, attention)]

    input_ids = inputs['input_ids']
    attention_mask = inputs['attention_mask']

    if input_ids and isinstance(input_ids[0], (list, tuple)):
        # Batched call: one id list per example.
        inputs['labels'] = [
            _mask_padding(token_ids, attention)
            for token_ids, attention in zip(input_ids, attention_mask)
        ]
    else:
        # Single example: a flat list of ids.
        inputs['labels'] = _mask_padding(input_ids, attention_mask)

    return inputs
|
|
|
|
|
|
|
|
|
# Script entry point. The guard must compare against '__main__'; the previous
# comparison against '__name__' was never true, so nothing below ever ran.
#
# The statements stay at module level (not inside a main() function) because
# tokenize_function reads ``tokenizer`` as a module-level name.
if __name__ == '__main__':

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # --- Stage 1: train and validate the YOLO detector -------------------
    model = YOLO('yolov8l.pt')

    results = model.train(
        data='config.yaml',
        epochs=10,
        imgsz=640,
        batch=8,
        name='yolov8l_custom',
        device=device,
    )

    metrics = model.val()
    print(metrics.box.map)

    # torch.save does not create missing parent directories.
    os.makedirs(os.getcwd() + '/models', exist_ok=True)
    torch.save(model, os.getcwd() + '/models/trained_yolov8.pt')

    # --- Stage 2: OCR the table images and build the JSONL dataset -------
    create_ocr_outputs()

    ocr_dir = os.getcwd() + '/data/processed/annotations'
    csv_dir = os.getcwd() + '/data/processed/hand_labeled_tables'
    output_file = 'dataset.jsonl'
    prepare_dataset(ocr_dir, csv_dir, output_file)

    # --- Stage 3: fine-tune GPT-2 on the prompt/completion records -------
    # Load the file that was just written and hold out 10% for evaluation.
    dataset = load_dataset('json', data_files={'train': output_file})
    dataset = dataset['train'].train_test_split(test_size=0.1)

    model_name = 'gpt2'
    tokenizer = GPT2Tokenizer.from_pretrained(model_name)

    # A dedicated padding token is registered so padding='max_length' works
    # in tokenize_function.
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})

    tokenized_dataset = dataset.map(tokenize_function, batched=True)

    model = GPT2LMHeadModel.from_pretrained(model_name)

    # The vocabulary grew by the added [PAD] token; resize the embeddings
    # to match.
    model.resize_token_embeddings(len(tokenizer))

    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=3,
        per_device_train_batch_size=2,
        per_device_eval_batch_size=2,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        # NOTE(review): newer transformers releases spell this argument
        # ``eval_strategy`` -- confirm against the installed version.
        evaluation_strategy="epoch",
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset['train'],
        eval_dataset=tokenized_dataset['test'],
    )

    trainer.train()

    eval_results = trainer.evaluate()
    print(f"Evaluation results: {eval_results}")

    # Persist the fine-tuned model together with its (extended) tokenizer.
    model.save_pretrained(os.getcwd() + '/models/gpt')
    tokenizer.save_pretrained(os.getcwd() + '/models/gpt')