| | import json |
| | import random |
| | import os |
| | from openai import OpenAI |
| | from tqdm import tqdm |
| | import concurrent.futures |
| | from typing import List, Dict, Optional |
| | from datetime import datetime |
| | from threading import Lock |
| | import time |
| | from prompt import R1_SYS_PROMPT |
| | |
# OpenAI-compatible client pointed at SiliconFlow's API gateway.
client = OpenAI(
    # API key read from the SL_KEY env var; the fallback is a placeholder.
    api_key=os.environ.get("SL_KEY", "YOUR_SILCONFLOW_KEY"),
    # SiliconFlow's OpenAI-compatible endpoint.
    base_url="https://api.siliconflow.cn/v1",
)


# Serializes appends to the shared output JSONL file across worker threads.
file_lock = Lock()
| |
|
def format_query(qa_dict: Dict, v2=False) -> str:
    """Build the user prompt for one QA pair.

    Args:
        qa_dict: dict with at least 'description' (scene text) and 'q' (question).
        v2: when True, append explicit answer-format instructions.

    Returns:
        The assembled prompt string.
    """
    parts = [
        "Answer the question according to scene description.\n\n",
        qa_dict["description"],
        f"\nQuestion:\n{qa_dict['q']}",
    ]
    if v2:
        parts.append(
            "\nInstructions:\n"
            "1. Carefully analyze the scene description\n"
            "2. Provide your reasoning if necessary\n"
            "3. For the final answer, start a new line with '**The answer is: **' followed by your answer\n"
        )
    return "".join(parts)
| |
|
def write_to_jsonl(result: Dict, filename: str):
    """Append *result* as one JSON line to *filename* (thread-safe).

    The module-level file_lock serializes concurrent appends so lines written
    by different worker threads never interleave.
    """
    with file_lock:
        # Explicit utf-8: the default text encoding is platform-dependent.
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(json.dumps(result) + '\n')
| |
|
def query_r1(qa_pair: Dict, output_file: str, model: str = "deepseek-ai/DeepSeek-R1", v2=False) -> Optional[Dict]:
    """Send one QA pair to the model and append the reply to output_file.

    Args:
        qa_pair: dict with at least 'description' and 'q' (see format_query).
        output_file: JSONL file that successful results are appended to.
        model: model identifier passed to the chat-completions API.
        v2: when True, the prompt includes explicit answer-format instructions.

    Returns:
        The result dict (qa_pair plus 'r1_response' and 'timestamp') on
        success, or None if the API call raised; the error record is then
        written to an "errors_"-prefixed sibling of output_file.
    """
    query = format_query(qa_pair, v2=v2)
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": R1_SYS_PROMPT},
                {"role": "user", "content": query}],
            stream=False,
            max_tokens=4096
        )
        result = {
            **qa_pair,
            "r1_response": response.choices[0].message.content,
            "timestamp": datetime.now().isoformat()
        }

        write_to_jsonl(result, output_file)
        time.sleep(4)  # crude client-side rate limiting between successful calls
        return result
    except Exception as e:
        print(f"Error processing query: {e}")
        error_result = {
            **qa_pair,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
        # Prefix only the basename so a directory component in output_file
        # still resolves: f"errors_{output_file}" would mangle "out/r.jsonl"
        # into the nonexistent "errors_out/r.jsonl". For bare filenames this
        # produces the same "errors_<name>" path as before.
        head, tail = os.path.split(output_file)
        write_to_jsonl(error_result, os.path.join(head, f"errors_{tail}"))
        time.sleep(10)  # back off longer after a failure
        return None
| |
|
def process_qa_pairs_parallel(qa_pairs: List[Dict], output_file: str, max_workers: int = 10) -> List[Dict]:
    """Fan QA pairs out to query_r1 across a thread pool.

    v2 prompting is enabled when the output filename contains "v2".
    Each worker writes its own result to output_file as it finishes
    (query_r1 handles the thread-safe append).

    Args:
        qa_pairs: QA dicts to process.
        output_file: JSONL path passed through to query_r1.
        max_workers: thread-pool size.

    Returns:
        The successful result dicts (query_r1 returns None on failure,
        and those are skipped).
    """
    use_v2 = "v2" in output_file
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(query_r1, qa_pair, output_file, v2=use_v2)
                   for qa_pair in qa_pairs]

        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            try:
                result = future.result()
            except Exception as e:
                # query_r1 already catches API errors internally; this only
                # guards against unexpected failures in the worker itself.
                print(f"Failed to process query: {e}")
                continue
            if result is not None:
                results.append(result)

    return results
| |
|
if __name__ == "__main__":

    # Fixed seed so the 10k-sample subset is reproducible across runs,
    # which makes the resume logic below meaningful.
    random.seed(1234)
    with open("/home/lilei/Visual-R1/data/clever_counting_problems_clevr_cogent_v1.0_trainA.json") as f:
        qa_pairs = json.load(f)
    random.shuffle(qa_pairs)
    qa_pairs = qa_pairs[:10000]

    output_file = "r1_results_clevr_cogent_v1.0_trainA_v2.jsonl"

    def _qa_key(ins: Dict) -> str:
        """Identity of a QA pair, used to deduplicate on resume."""
        return ins["img_filename"] + "-" + ins["q"] + "-" + str(ins["a"])

    # Resume support: skip pairs already answered in a previous run.
    # Guarded so a first run (no results file yet) doesn't crash.
    finished = set()
    if os.path.exists(output_file):
        with open(output_file, 'r') as f:
            for line in f:
                finished.add(_qa_key(json.loads(line)))
    qa_pairs = [ins for ins in qa_pairs if _qa_key(ins) not in finished]
    print("Finished: ", len(finished))
    print("Remaining: ", len(qa_pairs))

    r1_results = process_qa_pairs_parallel(qa_pairs, output_file)

    print(f"Successfully processed {len(r1_results)} out of {len(qa_pairs)} queries")
    print(f"Results saved to {output_file}")
    print(f"Any errors were saved to errors_{output_file}")