from src.services.utils import load_data, stem, set_gemini
import json
import nltk
import itertools
import numpy as np
import requests
from datasets import concatenate_datasets

from sentence_transformers import SentenceTransformer

# Embedding model used for all constraint/technology similarity computations.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

def retrieve_constraints(prompt):
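    """
    Ask the hosted LLM endpoint to extract constraints from the prompt and
    return the first JSON object found in its reply as a dict.
    """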
    request_input = {
        "models": ["meta-llama/llama-4-scout-17b-16e-instruct"],
        "messages": [{"role": "user", "content": prompt}],
    }
    response = requests.post("https://organizedprogrammers-bettergroqinterface.hf.space/chat", json=request_input)
    decoded_content = response.json()
    llm_response = decoded_content["content"]
    print(f"llm response: {llm_response}")

    # Slice from the first '{' to the last '}' so nested braces inside the
    # JSON object do not truncate the payload at the first closing brace.
    start_index = llm_response.find('{')
    end_index = llm_response.rfind('}')
    json_str = llm_response[start_index:end_index + 1].strip()

    constraints_json = json.loads(json_str)

    return constraints_json

def remove_over_repeated_technologies(result):
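    """
    Remove technologies that appear in more than 30% of the problem lists.
    """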
    total_lists = len(result)
    tech_counts = {}

    # Count how many lists each technology name appears in (the original
    # assignment started the count at 0, undercounting by one).
    for item in result:
        for tech in item['technologies']:
            name = tech[0]['name']
            tech_counts[name] = tech_counts.get(name, 0) + 1

    threshold = total_lists * 0.3
    print(threshold)
    print(tech_counts)
    to_delete = []
    for tech, count in tech_counts.items():
        if count > threshold:
            print(f"This technology has been found over-repeated: {tech}")
            to_delete.append(tech)

    for idx, item in enumerate(result):
        result[idx]['technologies'] = [tech for tech in item['technologies'] if tech[0]['name'] not in to_delete]

    return result

def get_contrastive_similarities(constraints, dataset):
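    """
    Compute the similarity between each constraint description and each
    technology row's precomputed "embeddings". Returns flat (constraint,
    row index, similarity) records plus the full similarity matrix.
    """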
    selected_pairs = []
    matrix = []

    constraint_descriptions = [c["description"] for c in constraints]
    constraint_embeddings = model.encode(constraint_descriptions, show_progress_bar=False)

    for i, constraint in enumerate(constraints):
        constraint_embedding = constraint_embeddings[i]
        constraint_matrix = []
        for j, row in enumerate(dataset):
            tech_embedding = row["embeddings"]

            purpose_sim = model.similarity(constraint_embedding, tech_embedding)

            if np.isnan(purpose_sim):
                purpose_sim = 0.0

            selected_pairs.append({
                "constraint": constraint,
                "id2": j,
                "similarity": purpose_sim
            })
            constraint_matrix.append(purpose_sim)
        matrix.append(constraint_matrix)
    return selected_pairs, matrix

def find_best_list_combinations(list1: list[str], list2: list[str], matrix) -> list[dict]:
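    """
    For each element of list1 (constraints), keep the list2 rows whose
    similarity score lies within [MIN_SIMILARITY, MAX_SIMILARITY], retain
    the top 5 matches per element, then drop over-repeated technologies.
    """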
    if not list1 or not list2:
        print("Warning: One or both input lists are empty. Returning an empty list.")
        return []

    MIN_SIMILARITY = 0.3
    MAX_SIMILARITY = 0.8

    possible_matches_for_each_l1 = []
    for i, row_i in enumerate(list1):
        valid_matches_for_l1_element = []
        for j, row_j in enumerate(list2):
            score = matrix[i][j]

            if MIN_SIMILARITY <= score <= MAX_SIMILARITY:
                # Copy the row before dropping the embeddings so the shared
                # dataset row is not mutated (deleting in place would also
                # raise a KeyError the second time the same row matches).
                data = dict(row_j)
                data.pop("embeddings", None)
                data["id"] = j
                valid_matches_for_l1_element.append((data, score))

        if not valid_matches_for_l1_element:
            print(f"No valid matches found in list2 for '{row_i}' from list1 "
                  f"(score between {MIN_SIMILARITY} and {MAX_SIMILARITY}); skipping it.")
        else:
            possible_matches_for_each_l1.append((valid_matches_for_l1_element, row_i))

    result = []
    for tech_list, problem in possible_matches_for_each_l1:
        # Scores may be 0-d tensors, so unwrap them before sorting.
        sorted_list = sorted(
            tech_list,
            key=lambda x: x[1].item() if hasattr(x[1], 'item') else float(x[1]),
            reverse=True
        )
        top5 = sorted_list[:5]
        result.append({
            'technologies': top5,
            'problem': problem
        })

    result = remove_over_repeated_technologies(result)
    return result

def search_technology_by_name(user_input, dataset):
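    """
    Query the search-technologies API for each title in user_input and
    return the matching dataset rows as a single concatenated Dataset.
    """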
| | url = "https://heymenn-search-technologies-api.hf.space/search-technologies" |
| |
|
| | headers = { |
| | "accept": "application/json", |
| | "Content-Type": "application/json" |
| | } |
| |
|
| | results = [] |
| | for input in user_input: |
| | payload = { |
| | "title": input, |
| | "type": "title" |
| | } |
| | response = requests.post(url, headers=headers, json=payload) |
| | print(response.json()) |
| | results.append(response.json()) |
| | |
| | technologies = [] |
| | for result in results: |
| | technology = dataset.filter(lambda row: row["name"] == result["title"]) |
| | technologies.append(technology) |
| |
|
| | combined_dataset = concatenate_datasets(technologies) |
| | return combined_dataset |
| |
|
| |
|
def select_technologies(problem_technology_list, forced_technology_list=None):
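    """
    Select the smallest set of technologies covering every problem, breaking
    ties by the highest average similarity (exhaustive set-cover search).
    """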
    if forced_technology_list is None:  # avoid a mutable default argument
        forced_technology_list = []

    distinct_techs = set()
    candidate_map = []

    # The original guard (`== 0`) made this loop dead code; it is assumed the
    # forced technologies are meant to join the candidate pool when provided.
    if len(forced_technology_list) > 0:
        for problem_data in forced_technology_list:
            for tech_info, sim in problem_data['technologies']:
                distinct_techs.add(tech_info['id'])

    for problem_data in problem_technology_list:
        cand_dict = {}
        for tech_info, sim in problem_data['technologies']:
            tech_id = tech_info['id']
            distinct_techs.add(tech_id)
            cand_dict[tech_id] = float(sim)
        if cand_dict not in candidate_map:
            candidate_map.append(cand_dict)

    distinct_techs = sorted(list(distinct_techs))
    # Use the deduplicated candidate list so candidate_map[i] stays in range
    # even when several problems share an identical candidate dict.
    n = len(candidate_map)

    if n == 0:
        return set()

    min_k = None
    best_set = None
    best_avg = -1

    print(f"Distinct technologies: {distinct_techs}")
    print(f"Candidate map: {candidate_map}")
    print(f"Number of problems: {n}")

    # Try increasingly large technology sets: the first size k that covers
    # every problem is minimal, so stop once that size has been fully scanned.
    for k in range(1, len(distinct_techs) + 1):
        if min_k is not None and k > min_k:
            break

        for T in itertools.combinations(distinct_techs, k):
            total_sim = 0.0
            covered = True
            for i in range(n):
                max_sim = -1.0
                found = False
                for tech in T:
                    if tech in candidate_map[i]:
                        found = True
                        sim_val = candidate_map[i][tech]
                        if sim_val > max_sim:
                            max_sim = sim_val
                if not found:
                    covered = False
                    break
                else:
                    total_sim += max_sim

            if covered:
                avg_sim = total_sim / n
                if min_k is None or k < min_k:
                    min_k = k
                    best_set = T
                    best_avg = avg_sim
                elif k == min_k and avg_sim > best_avg:
                    best_set = T
                    best_avg = avg_sim

        if min_k is not None and k == min_k:
            break

    if best_set is None:
        return set()
    return set(best_set)

def load_titles(techno, data_type):
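    """
    Extract technology names from either a list of pydantic objects
    ("pydantic") or a dict with a "technologies" key.
    """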
    if data_type == "pydantic":
        technology_titles = [tech.name for tech in techno]
    else:
        technologies = techno["technologies"]
        technology_titles = [tech["name"] for tech in technologies]
    return technology_titles

def search_prior_art(technologies_input: list, data: str, data_type: str, techno_type: str):
| | """ |
| | Searches for prior art patents online that solve a given technical problem |
| | using a set of specified technologies, leveraging the Gemini model's search capabilities. |
| | """ |
| |
|
    technology_titles = load_titles(technologies_input, techno_type)

    if data_type == "problem":
        prompt = f"Find prior art patents or research papers online that address the technical problem: '{data}'. "
    elif data_type == "constraints":
        prompt = f"Find prior art patents or research papers online that address these constraints: '{data}'. "
    else:
        raise ValueError(f"Unsupported data_type: {data_type}")

    prompt += f"Using any combination of the following technologies: {', '.join(technology_titles)}. " \
              "Specifically look for patents that integrate multiple of these technologies. " \
              "Indicate for each document found which technologies from the provided list are used inside it. " \
              "Indicate for each document the solution, then the twist of this solution, " \
              "what makes it different from all the other existing solutions. " \
              "Output only one sentence for the solution and the twist."

    client, config = set_gemini()

    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=prompt,
        config=config,
    )

    return response

def add_citations_and_collect_uris(response):
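    """
    Insert markdown citation links into the Gemini response text using its
    grounding metadata, and collect the distinct source URIs.
    """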
    try:
        print(response)
        text = response.text
        supports = response.candidates[0].grounding_metadata.grounding_supports
        chunks = response.candidates[0].grounding_metadata.grounding_chunks

        # Walk supports from the end of the text backwards so earlier
        # insertion offsets remain valid as citations are spliced in.
        sorted_supports = sorted(supports, key=lambda s: s.segment.end_index, reverse=True)

        uris_added = set()

        for support in sorted_supports:
            end_index = support.segment.end_index
            if support.grounding_chunk_indices:
                citation_links = []
                for i in support.grounding_chunk_indices:
                    if i < len(chunks):
                        uri = chunks[i].web.uri
                        if uri not in text and uri not in uris_added:
                            citation_links.append(f"[{i + 1}]({uri})")
                            uris_added.add(uri)
                if citation_links:
                    citation_string = ", ".join(citation_links)
                    text = text[:end_index] + citation_string + text[end_index:]
        return {"content": text, "uris": list(uris_added)}
    except Exception as e:
        print(f"Error: {e}")
        return {"content": str(e), "uris": []}