import os
import shutil
import subprocess
import time
from collections import deque
from copy import deepcopy
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List

import json
import torch

from swift.llm import ExportArguments
from swift.utils import find_free_port, get_device_count, get_logger

logger = get_logger()


@dataclass
class Experiment:
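    """A single benchmark experiment: the swift command to run, its arguments,
    resource requirements and the result record collected after it finishes."""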

    name: str

    cmd: str

    group: str

    requirements: Dict = field(default_factory=dict)

    eval_requirements: Dict = field(default_factory=dict)

    eval_dataset: List = field(default_factory=list)

    args: Dict = field(default_factory=dict)

    env: Dict = field(default_factory=dict)

    record: Dict = field(default_factory=dict)

    create_time: float = None

    runtime: Dict = field(default_factory=dict)

    input_args: Any = None

    do_eval = False

    def __init__(self,
                 name,
                 cmd,
                 group,
                 requirements=None,
                 eval_requirements=None,
                 eval_dataset=None,
                 args=None,
                 env=None,
                 input_args=None,
                 **kwargs):
        self.name = name
        self.cmd = cmd
        self.group = group
        self.requirements = requirements or {}
        self.args = args or {}
        self.record = {}
        self.env = env or {}
        self.runtime = {}
        self.input_args = input_args
        self.eval_requirements = eval_requirements or {}
        self.eval_dataset = eval_dataset or []
        if self.cmd == 'eval':
            self.do_eval = True

    def load(self, _json):
        self.name = _json['name']
        self.cmd = _json['cmd']
        self.requirements = _json['requirements']
        self.args = _json['args']
        self.record = _json['record']
        self.env = _json['env']
        self.create_time = _json['create_time']

    @property
    def priority(self):
        return self.requirements.get('gpu', 0)

    def to_dict(self):
        _dict = asdict(self)
        _dict.pop('runtime')
        _dict.pop('input_args')
        return _dict


class ExpManager:
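    """Schedules experiments onto free GPUs, launches them as shell subprocesses
    and collects their results when they finish."""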

    RESULT_FILE = 'result.jsonl'

    def __init__(self):
        self.exps = []

    def assert_gpu_not_overlap(self):
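        """Sanity check: no two running experiments may share a GPU."""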
        all_gpus = set()
        for exp in self.exps:
            gpus = exp.runtime['env']['CUDA_VISIBLE_DEVICES'].split(',')
            if all_gpus & set(gpus):
                raise ValueError(f'GPU overlap: {self.exps}!')
            all_gpus.update(gpus)

    def run(self, exp: Experiment):
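        """Launch one experiment (or its pending eval) as a subprocess if it has not
        already been completed, and register it as running."""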
        if os.path.exists(os.path.join(exp.input_args.save_dir, exp.name + '.json')):
            with open(os.path.join(exp.input_args.save_dir, exp.name + '.json'), 'r', encoding='utf-8') as f:
                _json = json.load(f)
            if exp.eval_dataset and 'eval_result' not in _json['record']:
                if not exp.do_eval:
                    logger.info(f'Experiment {exp.name} needs eval, loading from file.')
                    exp.load(_json)
                    exp.do_eval = True
            else:
                logger.warning(f'Experiment {exp.name} already done, skipping.')
                return

        if exp.do_eval:
            runtime = self._build_eval_cmd(exp)
            exp.runtime = runtime
            envs = deepcopy(runtime.get('env', {}))
            envs.update(os.environ)
            logger.info(f'Running cmd: {runtime["running_cmd"]}, env: {runtime.get("env", {})}')
            os.makedirs('exp', exist_ok=True)
            log_file = os.path.join('exp', f'{exp.name}.eval.log')
            exp.handler = subprocess.Popen(runtime['running_cmd'] + f' > {log_file} 2>&1', env=envs, shell=True)
            self.exps.append(exp)
            self.assert_gpu_not_overlap()
            return

        if any([exp.name == e.name for e in self.exps]):
            raise ValueError(f'Duplicate experiment name: {exp.name}')
        elif exp.cmd == 'export' and any([e.cmd == 'export' for e in self.exps]):
            raise AssertionError('Cannot run parallel export tasks.')
        else:
            exp.create_time = time.time()
            runtime = self._build_cmd(exp)
            exp.runtime = runtime
            envs = deepcopy(runtime.get('env', {}))
            envs.update(os.environ)
            logger.info(f'Running cmd: {runtime["running_cmd"]}, env: {runtime.get("env", {})}')
            os.makedirs('exp', exist_ok=True)
            log_file = os.path.join('exp', f'{exp.name}.{exp.cmd}.log')
            exp.handler = subprocess.Popen(runtime['running_cmd'] + f' > {log_file} 2>&1', env=envs, shell=True)
            self.exps.append(exp)
            self.assert_gpu_not_overlap()

    def _build_eval_cmd(self, exp: Experiment):
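        """Build the `swift eval` command line, GPU allocation and env for an experiment."""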
        gpu = exp.eval_requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(gpu) for gpu in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)

        best_model_checkpoint = exp.record.get('best_model_checkpoint')
        eval_dataset = exp.eval_dataset
        if best_model_checkpoint is not None and not os.path.exists(os.path.join(best_model_checkpoint, 'args.json')):
            cmd = f'swift eval --ckpt_dir {best_model_checkpoint} ' \
                  + f'--infer_backend pt --train_type full --eval_dataset {" ".join(eval_dataset)}'
        else:
            cmd = f'swift eval --model {exp.args.get("model")} --infer_backend pt ' \
                  f'--eval_dataset {" ".join(eval_dataset)}'

        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
        }

    def _build_cmd(self, exp: Experiment):
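        """Build the training or export command line, GPU allocation and env for an experiment."""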
        gpu = exp.requirements.get('gpu', None)
        env = {}
        allocated = []
        if gpu:
            allocated = self._find_free_gpu(int(gpu))
            assert allocated, 'No free gpu for now!'
            allocated = [str(gpu) for gpu in allocated]
            env['CUDA_VISIBLE_DEVICES'] = ','.join(allocated)
        if int(exp.requirements.get('ddp', 1)) > 1:
            env['NPROC_PER_NODE'] = str(exp.requirements.get('ddp'))
            env['MASTER_PORT'] = str(find_free_port())

        if exp.cmd == 'sft':
            from swift.llm import TrainArguments
            args = exp.args
            sft_args = TrainArguments(**args)
            args['output_dir'] = sft_args.output_dir
            args['logging_dir'] = sft_args.logging_dir
            args['add_version'] = False
            os.makedirs(sft_args.output_dir, exist_ok=True)
            os.makedirs(sft_args.logging_dir, exist_ok=True)
            cmd = 'swift sft '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        elif exp.cmd == 'rlhf':
            from swift.llm import RLHFArguments
            args = exp.args
            rlhf_args = RLHFArguments(**args)
            args['output_dir'] = rlhf_args.output_dir
            args['logging_dir'] = rlhf_args.logging_dir
            args['add_version'] = False
            os.makedirs(rlhf_args.output_dir, exist_ok=True)
            os.makedirs(rlhf_args.logging_dir, exist_ok=True)
            cmd = 'swift rlhf '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        elif exp.cmd == 'export':
            args = exp.args
            cmd = 'swift export '
            for key, value in args.items():
                cmd += f' --{key} {value}'
        else:
            raise ValueError(f'Unsupported cmd type: {exp.cmd}')
        return {
            'running_cmd': cmd,
            'gpu': allocated,
            'env': env,
            'logging_dir': args.get('logging_dir'),
            'output_dir': args.get('output_dir', args.get('ckpt_dir'))
        }

    def _find_free_gpu(self, n):
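        """Return `n` GPU ids not used by any running experiment, or None if fewer are free."""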
        all_gpus = set()
        for exp in self.exps:
            all_gpus.update(exp.runtime.get('gpu', set()))
        all_gpus = {int(g) for g in all_gpus}
        free_gpu = set(range(get_device_count())) - all_gpus
        if len(free_gpu) < n:
            return None
        return list(free_gpu)[:n]

    def prepare_experiments(self, args: Any):
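        """Build Experiment objects from the json config files listed in `args.config`.

        Each config roughly looks like the following (illustrative sketch based on
        the keys read below, not a normative schema)::

            {
                "cmd": "sft",
                "requirements": {"gpu": 1, "ddp": 1},
                "eval_requirements": {"gpu": 1},
                "eval_dataset": ["..."],
                "args": {"model": "..."},
                "experiment": [
                    {"name": "exp1", "args": {"learning_rate": "1e-4"}}
                ]
            }

        Per-experiment "args", "requirements" and "env" entries override the
        top-level values.
        """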
        experiments = []
        for config_file in args.config:
            with open(config_file, 'r', encoding='utf-8') as f:
                group = os.path.basename(config_file)
                group = group[:-5]  # strip the '.json' suffix
                content = json.load(f)
                exps = content['experiment']
                for exp in exps:
                    main_cfg = deepcopy(content)
                    name = exp['name']
                    cmd = main_cfg['cmd']
                    run_args = main_cfg['args']
                    env = main_cfg.get('env', {})
                    requirements = main_cfg.get('requirements', {})
                    eval_requirements = main_cfg.get('eval_requirements', {})
                    eval_dataset = main_cfg.get('eval_dataset', [])
                    if 'args' in exp:
                        run_args.update(exp['args'])
                    if 'requirements' in exp:
                        requirements.update(exp['requirements'])
                    if 'env' in exp:
                        env.update(exp['env'])
                    experiments.append(
                        Experiment(
                            group=group,
                            name=name,
                            cmd=cmd,
                            args=run_args,
                            env=env,
                            requirements=requirements,
                            eval_requirements=eval_requirements,
                            eval_dataset=eval_dataset,
                            input_args=args))
        return experiments

    @staticmethod
    def _get_metric(exp: Experiment):
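        """Collect the result of a finished experiment: the eval report for eval runs,
        the produced checkpoint path for export runs, or the training log record otherwise."""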
        if exp.do_eval:
            if os.path.isfile(os.path.join('exp', f'{exp.name}.eval.log')):
                with open(os.path.join('exp', f'{exp.name}.eval.log'), 'r', encoding='utf-8') as f:
                    for line in f.readlines():
                        if 'Final report:' in line:
                            return json.loads(line.split('Final report:')[1].replace('\'', '"'))
        elif exp.cmd == 'export':
            exp_args = ExportArguments(**exp.args)
            if exp_args.quant_bits > 0:
                if exp_args.ckpt_dir is None:
                    path = f'{exp_args.model_type}-{exp_args.quant_method}-int{exp_args.quant_bits}'
                else:
                    ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                    path = os.path.join(ckpt_dir, f'{ckpt_name}-{exp_args.quant_method}-int{exp_args.quant_bits}')
            else:
                ckpt_dir, ckpt_name = os.path.split(exp_args.ckpt_dir)
                path = os.path.join(ckpt_dir, f'{ckpt_name}-merged')
            if os.path.exists(path):
                shutil.rmtree(exp.name, ignore_errors=True)
                os.makedirs(exp.name, exist_ok=True)
                shutil.move(path, os.path.join(exp.name, path))
                return {
                    'best_model_checkpoint': os.path.join(exp.name, path),
                }
        else:
            logging_dir = exp.runtime.get('logging_dir')
            logging_file = os.path.join(logging_dir, '..', 'logging.jsonl')
            if os.path.isfile(logging_file):
                with open(logging_file, 'r', encoding='utf-8') as f:
                    for line in f.readlines():
                        if 'model_info' in line:
                            return json.loads(line)
        return None

    @staticmethod
    def write_record(exp: Experiment):
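        """Persist the experiment record to <save_dir>/<name>.json."""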
        target_dir = exp.input_args.save_dir
        file = os.path.join(target_dir, exp.name + '.json')
        with open(file, 'w', encoding='utf-8') as f:
            f.write(json.dumps(exp.to_dict()) + '\n')

    def _poll(self):
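        """Block until at least one running experiment finishes, then harvest its
        metrics, write its record and drop it from the running list."""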
        while True:
            time.sleep(5)

            has_finished = False
            for exp in self.exps:
                rt = exp.handler.poll()
                if rt is None:
                    continue

                has_finished = True
                if rt == 0:
                    if not exp.do_eval:
                        all_metric = self._get_metric(exp)
                        if all_metric:
                            exp.record.update(all_metric)
                            if exp.eval_dataset:
                                exp.do_eval = True
                                self.exp_queue.appendleft(exp)
                            self.write_record(exp)
                        else:
                            logger.error(f'Running {exp.name} task, but no result found')
                    else:
                        all_metric = self._get_metric(exp)
                        exp.record['eval_result'] = all_metric
                        if all_metric:
                            self.write_record(exp)
                        else:
                            logger.error(f'Running {exp.name} eval task, but no eval result found')
                logger.info(f'Running {exp.name} finished with return code: {rt}')

            if has_finished:
                self.exps = [exp for exp in self.exps if exp.handler.poll() is None]
                break

    def begin(self, args: Any):
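        """Main entry: queue all experiments sorted by their GPU requirement and run
        them until both the queue and the running list are empty."""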
        exps = self.prepare_experiments(args)
        logger.info(f'all exps: {exps}')
        exps.sort(key=lambda e: e.priority)
        self.exp_queue = deque()
        for exp in exps:
            self.exp_queue.append(exp)

        while len(self.exp_queue) or len(self.exps) > 0:
            while len(self.exp_queue):
                try:
                    logger.info(f'Running exp: {self.exp_queue[0].name}')
                    self.run(self.exp_queue[0])
                except Exception as e:
                    if not isinstance(e, AssertionError):
                        logger.error(f'Adding exp {self.exp_queue[0].name} failed because of:')
                        logger.error(e)
                        self.exp_queue.popleft()
                    else:
                        logger.info(f'Deferring exp {self.exp_queue[0].name} because of: {e}')
                        if 'no free gpu' in str(e).lower():
                            break
                        else:
                            continue
                else:
                    self.exp_queue.popleft()
            self._poll()
        logger.info(f'All tasks finished. exp queue: {self.exp_queue}, exps: {self.exps}')


def find_all_config(dir_or_file: str):
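    """Return `dir_or_file` itself if it is a file, otherwise all .json files under it
    (skipping any path that contains 'ipynb')."""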
    if os.path.isfile(dir_or_file):
        return [dir_or_file]
    else:
        configs = []
        for dirpath, dirnames, filenames in os.walk(dir_or_file):
            for name in filenames:
                if name.endswith('.json') and 'ipynb' not in dirpath:
                    configs.append(os.path.join(dirpath, name))
        return configs
|