Instructions to use lytang/MiniCheck-Flan-T5-Large with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use lytang/MiniCheck-Flan-T5-Large with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-classification", model="lytang/MiniCheck-Flan-T5-Large")

# Load model directly
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

tokenizer = AutoTokenizer.from_pretrained("lytang/MiniCheck-Flan-T5-Large")
model = AutoModelForSeq2SeqLM.from_pretrained("lytang/MiniCheck-Flan-T5-Large")
- Notebooks
- Google Colab
- Kaggle
| from minicheck_web.minicheck import MiniCheck | |
| from web_retrieval import * | |
| from nltk.tokenize import sent_tokenize | |
| import evaluate | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from heapq import heappush, heappop | |
def sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk):
    """Rank all chunks by their probability of "supported", highest first.

    This function is used when a user document is provided.

    Args:
        used_chunk: Iterable of chunk sequences (one sequence per document).
        support_prob_per_chunk: Iterable of score sequences, parallel to
            ``used_chunk``.

    Returns:
        A pair ``(ranked_docs, scores)`` of equal-length tuples ordered by
        score in descending order; chunks with equal scores keep their
        original relative order. Both tuples are empty when there are no
        chunks to rank.
    """
    flattened_docs = [doc for chunk in used_chunk for doc in chunk]
    flattened_scores = [score for chunk in support_prob_per_chunk for score in chunk]

    ranked_doc_score = sorted(
        zip(flattened_docs, flattened_scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    if not ranked_doc_score:
        # zip(*[]) yields nothing, so the two-name unpacking below would
        # raise ValueError; return the empty result explicitly instead.
        return (), ()

    ranked_docs, scores = zip(*ranked_doc_score)
    return ranked_docs, scores
def rank_documents_TFIDF(claim, scraped_results):
    """Order scraped documents by TF-IDF cosine similarity to the claim.

    Args:
        claim: The claim text the documents are compared against.
        scraped_results: Sequence whose elements are tuples of
            (document, URL).

    Returns:
        A list of ``(document, URL)`` tuples sorted by similarity to
        ``claim`` in descending order (ties keep their input order).
        Returns an empty list when ``scraped_results`` is empty.
    """
    if not scraped_results:
        # With no documents the similarity call would receive an empty
        # matrix, which scikit-learn rejects; there is nothing to rank.
        return []

    documents = [result[0] for result in scraped_results]

    # The claim is row 0 of the TF-IDF matrix, the documents are rows 1..n.
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform([claim] + documents)
    similarity_scores = cosine_similarity(tfidf_matrix[0], tfidf_matrix[1:])[0]

    ranked_results = sorted(
        zip(scraped_results, similarity_scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [(result[0], result[1]) for result, _ in ranked_results]
class EndpointHandler():
    """Callable handler that fact-checks a single claim.

    The claim is checked either against one user-provided document or, when
    no usable document is supplied, against web pages found by a search for
    the claim.
    """

    def __init__(self, path="./"):
        """Create the scorer and the ROUGE metric and set default options.

        Args:
            path: Forwarded to ``MiniCheck(path=...)``.
        """
        self.scorer = MiniCheck(path=path)
        self.rouge = evaluate.load('rouge')
        self.tfidf_order = True
        self.num_highlights = 1

        self.default_chunk_size = 500
        self.chunk_size = 500

    def __call__(self, data):
        """Fact-check ``data['inputs']['claims'][0]``.

        Args:
            data: Request dict. ``data['inputs']`` must contain ``'claims'``
                and ``'docs'`` and may contain ``'chunk_size'``.

        Returns:
            A dict with the keys ``'ranked_docs'``, ``'scores'``,
            ``'span_to_highlight'``, ``'entities'`` and ``'rouge_score'``;
            the web-retrieval path additionally includes ``'ranked_urls'``.

        Raises:
            AssertionError: If web retrieval is requested with more than one
                claim.
        """
        inputs = data['inputs']

        # this is necessary for setting the chunk size for
        # retrieved docs
        if 'chunk_size' in inputs:
            self.chunk_size = int(inputs['chunk_size'])
        else:
            self.chunk_size = self.default_chunk_size

        claim = inputs['claims'][0]
        ents = extract_entities(claim)

        # Using user-provided document to do fact-checking
        if len(inputs['docs']) == 1 and inputs['docs'][0] != '':
            _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=data)
            ranked_docs, scores = sort_chunks_single_doc_claim(used_chunk, support_prob_per_chunk)
            span_to_highlight, rouge_score = self._highlight_spans(ranked_docs, claim)

            return {
                'ranked_docs': ranked_docs,
                'scores': scores,
                'span_to_highlight': span_to_highlight,
                'entities': ents,
                'rouge_score': rouge_score
            }

        # Raised explicitly (same type and message as the former ``assert``)
        # so the validation is not stripped when Python runs with -O.
        if len(inputs['claims']) != 1:
            raise AssertionError("Only one claim is allowed for web retrieval for the current version.")

        ranked_docs, scores, ranked_urls = self.search_relevant_docs(claim, tfidf_order=self.tfidf_order)
        span_to_highlight, rouge_score = self._highlight_spans(ranked_docs, claim)

        return {
            'ranked_docs': ranked_docs,
            'scores': scores,
            'ranked_urls': ranked_urls,
            'span_to_highlight': span_to_highlight,
            'entities': ents,
            'rouge_score': rouge_score
        }

    def _highlight_spans(self, ranked_docs, claim):
        """Collect the top ROUGE-1 sentences of every chunk in ``ranked_docs``.

        Returns:
            ``(span_to_highlight, rouge_score)`` where ``span_to_highlight``
            has one list of sentences per chunk.
        """
        span_to_highlight, rouge_score = [], []
        for doc_chunk in ranked_docs:
            # NOTE(review): rouge_score is overwritten on every iteration, so
            # only the scores of the last chunk are returned. Kept as-is to
            # preserve the response shape -- confirm whether that is intended.
            highest_score_sent, rouge_score = self.chunk_and_highest_rouge_score(doc_chunk, claim, k=self.num_highlights)
            span_to_highlight.append(highest_score_sent)
        return span_to_highlight, rouge_score

    def search_relevant_docs(self, claim, timeout=10, max_search_results_per_query=5, allow_duplicated_urls=False, tfidf_order=False):
        """Search the web for ``claim`` and score the retrieved pages.

        if tfidf_order == True, then display the docs in the order of TF-IDF similarity with the claim, regardless of the entailment score
        otherwise, display the docs in the order of the entailment score

        Args:
            claim: The claim to search for and to check.
            timeout: Passed to ``search_google`` and to every ``scrape_url``
                call.
            max_search_results_per_query: Number of top TF-IDF-ranked pages
                that are scored.
            allow_duplicated_urls: Forwarded to ``order_doc_score_url``.
            tfidf_order: Selects the ordering described above.

        Returns:
            ``(docs, scores, urls)``. All three are empty when no page could
            be scraped successfully.
        """
        search_results = search_google(claim, timeout=timeout)

        print('Searching webpages...')
        start = time()
        with concurrent.futures.ThreadPoolExecutor() as e:
            scraped_results = e.map(scrape_url, search_results, itertools.repeat(timeout))
        end = time()
        print(f"Finished searching in {round((end - start), 1)} seconds.\n")

        # Drop pages without text or containing the '��' marker, and keep at
        # most the first 20000 characters of each page.
        scraped_results = [(r[0][:20000], r[1]) for r in scraped_results if r[0] and '��' not in r[0]]

        # those can be ranked based on TF-IDF to be more efficient
        if scraped_results:
            scraped_results = rank_documents_TFIDF(claim, scraped_results)[:max_search_results_per_query]
        if not scraped_results:
            # Nothing usable was retrieved; zip(*[]) below would raise a
            # ValueError on unpacking, so report an empty result instead.
            return [], [], ()
        retrieved_docs, urls = zip(*scraped_results)

        print('Scoring webpages...')
        start = time()
        retrieved_data = {
            'inputs': {
                'docs': list(retrieved_docs),
                'claims': [claim]*len(retrieved_docs),
                'chunk_size': self.chunk_size
            }
        }
        _, _, used_chunk, support_prob_per_chunk = self.scorer.score(data=retrieved_data)
        end = time()
        num_chunks = len([item for items in used_chunk for item in items])
        print(f'Finished {num_chunks} entailment checks in {round((end - start), 1)} seconds ({round(num_chunks / (end - start) * 60)} Doc./min).')

        if not tfidf_order:
            ranked_docs, scores, ranked_urls = order_doc_score_url(used_chunk, support_prob_per_chunk, urls, allow_duplicated_urls=allow_duplicated_urls)
            return ranked_docs, scores, ranked_urls

        tfidf_docs, scores = [], []
        for used_c, support_prob_per_c in zip(used_chunk, support_prob_per_chunk):
            # If the doc can support the claim, find the chunk with the
            # highest entailment score; otherwise, use the first chunk
            # (assumes every scored doc has at least one chunk -- TODO confirm)
            best_prob = max(support_prob_per_c)
            if best_prob > 0.5:
                tfidf_docs.append(used_c[np.argmax(support_prob_per_c)])
                scores.append(best_prob)
            else:
                tfidf_docs.append(used_c[0])
                scores.append(support_prob_per_c[0])
        return tfidf_docs, scores, urls

    def chunk_and_highest_rouge_score(self, doc, claim, k=1):
        """Return the top ``k`` sentences of ``doc`` by ROUGE-1 against ``claim``.

        Args:
            doc: Document text; it is split into sentences with
                ``sent_tokenize``.
            claim: Reference text every sentence is compared with.
            k: Maximum number of sentences to return.

        Returns:
            ``(top_k_sentences, top_k_scores)``, two parallel lists in
            descending score order. Both are empty when ``k <= 0`` or when
            ``doc`` contains no sentences.
        """
        # Nothing to select: avoids indexing an empty heap (k <= 0) and
        # calling the metric with no predictions (empty document).
        if k <= 0 or not doc:
            return [], []

        doc_sentences = sent_tokenize(doc)
        if not doc_sentences:
            return [], []

        results = self.rouge.compute(
            predictions=doc_sentences,
            references=[claim] * len(doc_sentences),
            use_aggregator=False)

        # Min heap holding the best k (score, sentence) pairs seen so far;
        # its root is the weakest of the current top k.
        top_k_heap = []
        for score, sentence in zip(results['rouge1'], doc_sentences):
            if len(top_k_heap) < k:
                heappush(top_k_heap, (score, sentence))
            elif score > top_k_heap[0][0]:
                # Strictly better than the weakest kept entry: replace it.
                heappop(top_k_heap)
                heappush(top_k_heap, (score, sentence))

        # Highest score first.
        ranked = sorted(top_k_heap, reverse=True)
        top_k_sentences = [sentence for _, sentence in ranked]
        top_k_scores = [score for score, _ in ranked]
        return top_k_sentences, top_k_scores