| | import os |
| |
|
| | import hydra |
| |
|
| | import aiflows |
| | from aiflows.backends.api_info import ApiInfo |
| | from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys |
| |
|
| | from aiflows import logging |
| | from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache |
| |
|
| | from aiflows.utils import serving |
| | from aiflows.workers import run_dispatch_worker_thread |
| | from aiflows.messages import FlowMessage |
| | from aiflows.interfaces import KeyInterface |
| | from aiflows.utils.colink_utils import start_colink_server |
| | from aiflows import flow_verse |
| | import pandas as pd |
| | import sys |
| | from copy import deepcopy |
| | import requests |
| | import time |
| |
|
# Sync the FunSearch flow module before importing from it. The "revision"
# here is a relative filesystem path to a sibling checkout rather than a
# git revision, so the module is pulled from the local directory.
dependencies = [
    {
        "url": "aiflows/FunSearchFlowModule",
        "revision": "../FunSearchFlowModule"
    }
]
flow_verse.sync_dependencies(dependencies)
# Must come after sync_dependencies: flow_modules is materialized by the sync.
from flow_modules.aiflows.FunSearchFlowModule.Loader import Loader

logging.set_verbosity_debug()
| |
|
| |
|
def load_problem(id, ds_location="./data/codeforces.jsonl.gz"):
    """Load one codeforces problem from a gzipped JSONL dataset.

    :param id: problem identifier matched against the dataset's ``id`` column
    :param ds_location: path to the gzipped JSONL codeforces dataset
    :return: tuple ``(tests, problem_description)`` where ``tests`` maps
        ``"test_<k>"`` to ``{"tests_inputs": ..., "expected_outputs": ...}``
        (public tests first, then hidden tests, numbered consecutively) and
        ``problem_description`` is a formatted problem statement string.
    :raises AssertionError: if the problem is flagged as having non-unique output
    :raises IndexError: if no row with the given id exists in the dataset
    """

    def make_problem_descriptions_str(row):
        """Format a dataset row into a human-readable problem description."""

        def write_public_tests_individual_io_str(row):
            # One "Test i" paragraph per public test case; join once instead
            # of quadratic string concatenation in the loop.
            paragraphs = [
                f"Test {i}:\n Input: {test_input}\n Output: '{test_output}'\n"
                for i, (test_input, test_output) in enumerate(
                    row.public_tests_individual_io, start=1
                )
            ]
            return "".join(paragraphs)

        return (
            f"Problem Description:\n{row.problem_description}\n\n"
            f"Input Description:\n{row.input_description}\n\n"
            f"Output Description:\n{row.output_description}\n\n"
            f"Public Tests:\n{write_public_tests_individual_io_str(row)}\n"
        )

    df = pd.read_json(ds_location, lines=True, compression="gzip")
    row = df[df.id == id].iloc[0]

    # Problems whose expected output is not unique cannot be checked by plain
    # string comparison, so they are rejected up front.
    assert row.non_unique_output == False, "Problem has non unique output. Not supported yet"

    problem_description = make_problem_descriptions_str(row)

    # Public tests first, then hidden tests, with a single consecutive counter.
    tests = {}
    all_tests = list(row.public_tests_individual_io) + list(row.hidden_tests_io)
    for test_counter, (test_inputs, expected_outputs) in enumerate(all_tests, start=1):
        tests[f"test_{test_counter}"] = {
            "tests_inputs": test_inputs,
            "expected_outputs": expected_outputs,
        }

    return tests, problem_description
| |
|
def download_codeforces_data(data_folder_path, file_name):
    """Download the codeforces dataset into ``data_folder_path/file_name``.

    Streams the gzipped JSONL file from the cc_flows repository. Failures are
    reported by printing (not raising), matching the script's best-effort style.

    :param data_folder_path: directory to place the file in (created if missing)
    :param file_name: name of the file to write inside ``data_folder_path``
    """
    print("Downloading data....")
    os.makedirs(data_folder_path, exist_ok=True)
    url = "https://github.com/epfl-dlab/cc_flows/raw/main/data/codeforces/codeforces.jsonl.gz"
    # Timeout so a hung connection cannot block the script forever; stream with
    # an explicit chunk size (iterating the response object directly yields
    # tiny default-sized chunks).
    response = requests.get(url, stream=True, timeout=60)

    if response.status_code == 200:
        with open(os.path.join(data_folder_path, file_name), 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        print("Download complete")
    else:
        print("Failed to download data", response.status_code)
| | |
| |
|
def get_configs(problem_id, ds_location="./data/codeforces.jsonl.gz"):
    """Build the FunSearch flow config and a dummy seed solution for a problem.

    :param problem_id: codeforces problem id to load
    :param ds_location: path to the gzipped JSONL codeforces dataset
    :return: tuple ``(funsearch_cfg, dummy_solution)`` — the populated flow
        configuration dict and the source code of a trivial seed
        ``solve_function`` used to bootstrap the program database.
    """
    tests, problem_description = load_problem(problem_id, ds_location)

    path = os.path.join(".", "demo.yaml")
    funsearch_cfg = read_yaml_file(path)

    evaluate_function_file_path: str = "./cf_functions.py"
    evaluate_function_name: str = "evaluate"
    evolve_function_name: str = "solve_function"

    # Load the scoring function (and the full file it lives in) so the
    # ProgramDBFlow can evaluate candidate programs.
    loader = Loader(file_path=evaluate_function_file_path, target_name=evaluate_function_name)
    evaluate_function: str = loader.load_target()
    evaluate_file_full_content = loader.load_full_file()

    # Prepend the problem statement as a module docstring so the model sees it.
    evaluate_file_full_content = f"\"\"\"{problem_description}\"\"\"\n\n" + evaluate_file_full_content

    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_function"] = evaluate_function
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["evaluate_file_full_content"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["ProgramDBFlow"]["artifact_to_evolve_name"] = evolve_function_name

    # Seed solution: always answer with the first public test's expected output
    # when one exists, otherwise a placeholder constant.
    # BUGFIX: the original fallback branch generated `return 0.0""`, which is
    # not valid Python in the emitted program.
    if len(tests) > 0:
        dummy_return_value = tests["test_1"]["expected_outputs"]
    else:
        dummy_return_value = "0.0"

    dummy_solution = (
        f"def {evolve_function_name}(input) -> str:"
        + "\n    \"\"\"Attempt at solving the problem given the input input and returns the predicted output (see the top of the file for problem description)\"\"\""
        + f"\n    return \'{dummy_return_value}\'\n"
    )

    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["py_file"] = evaluate_file_full_content
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["run_error_score"] = -1
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["function_to_run_name"] = evaluate_function_name
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["test_inputs"] = tests
    funsearch_cfg["subflows_config"]["EvaluatorFlow"]["use_test_input_as_key"] = False

    funsearch_cfg["subflows_config"]["SamplerFlow"]["system_message_prompt_template"]["partial_variables"] = \
        {
            "evaluate_name": evaluate_function_name,
            "evolve_name": evolve_function_name,
            "artifacts_per_prompt": 2
        }

    return funsearch_cfg, dummy_solution
| |
|
| |
|
| | FLOW_MODULES_PATH = "./" |
| |
|
| |
|
if __name__ == "__main__":

    # Start a local CoLink server; `cl` is the handle used for serving flows
    # and dispatching workers below.
    cl = start_colink_server()

    problem_id = "1789B"

    # Fetch the codeforces dataset on first run only.
    if not os.path.exists("./data/codeforces.jsonl.gz"):
        download_codeforces_data("./data", "codeforces.jsonl.gz")

    funsearch_cfg, dummy_solution = get_configs(problem_id)

    api_information = [ApiInfo(backend_used="openai",
                               api_key = os.getenv("OPENAI_API_KEY"))]

    # Serve the FunSearch flow (and, recursively, its subflows) on this server.
    serving.recursive_serve_flow(
        cl=cl,
        flow_class_name="flow_modules.aiflows.FunSearchFlowModule.FunSearch",
        flow_endpoint="FunSearch",
    )

    # Spin up worker threads that will pick up and execute flow messages.
    n_workers = 10
    for i in range(n_workers):
        run_dispatch_worker_thread(cl)

    # Inject the API keys into the config before instantiating the flow.
    quick_load_api_keys(funsearch_cfg, api_information, key="api_infos")
    # NOTE(review): unused — get_flow_instance below passes funsearch_cfg
    # directly as config_overrides; confirm this variable can be removed.
    config_overrides = None

    funsearch_proxy = serving.get_flow_instance(
        cl=cl,
        flow_endpoint="FunSearch",
        config_overrides=funsearch_cfg,
    )

    # Seed the program database with the dummy solution produced by
    # get_configs (pretending it came from the SamplerFlow).
    data = {
        "from": "SamplerFlow",
        "operation": "register_program",
        "api_output": dummy_solution
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # Kick off the search with 5 samplers.
    data = {
        "from": "FunSearch",
        "operation": "start",
        "content": {"num_samplers": 5},
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # NOTE(review): "stop" is sent immediately after "start" with no delay —
    # presumably the flow treats stop as "finish current sampling round";
    # confirm the intended semantics.
    data = {
        "from": "FunSearch",
        "operation": "stop",
        "content": {},
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    funsearch_proxy.send_message(input_message)

    # Give the search time to run before asking for results.
    wait_time = 1000
    print(f"Waiting {wait_time} seconds before requesting result...")
    time.sleep(wait_time)

    # Request the best program found on each island; this operation yields a
    # reply, so block on the future until the response arrives.
    data = {
        "from": "FunSearch",
        "operation": "get_best_programs_per_island",
        "content": {}
    }

    input_message = funsearch_proxy.package_input_message(data = data)

    future = funsearch_proxy.get_reply_future(input_message)
    print("waiting for response....")
    response = future.get_data()
    print(response)
|