| from __future__ import annotations |
|
|
| from langchain_core.output_parsers import StrOutputParser |
| from langchain_core.prompts import PromptTemplate |
|
|
| import ast |
| import atexit |
| import os |
| import re |
| import sys |
|
|
# Matches the first markdown code fence (``` or ```python, case-insensitive)
# and captures its inner text; used to strip fences from LLM replies.
FENCE_RE = re.compile(r"```(?:python)?\s*([\s\S]*?)\s*```", flags=re.IGNORECASE)
# Heuristic for a common LLM typo: a doubled closing parenthesis at end of line.
TRAILING_PARENS_RE = re.compile(r"\)\)\s*$", flags=re.MULTILINE)
|
|
| |
| |
| |
|
|
|
|
| def _force_utf8_stdio() -> None: |
| try: |
| if hasattr(sys.stdout, "reconfigure"): |
| sys.stdout.reconfigure(encoding="utf-8") |
| if hasattr(sys.stderr, "reconfigure"): |
| sys.stderr.reconfigure(encoding="utf-8") |
| except Exception: |
| pass |
|
|
| |
| |
| |
# Local GGUF model file expected to sit next to this script.
MODEL_FILE = "Cube-Python_v2.gguf"
# NOTE(review): N_CTX and N_GPU_LAYERS are defined here but never passed to the
# GPT4All constructor in this file -- confirm whether they should be wired through.
N_CTX = 4096
TEMPERATURE = 0.1
N_GPU_LAYERS = -1


# Maximum number of "fix your syntax" retries in generate_code().
MAX_FIX_ATTEMPTS = 2
|
|
def load_llm():
    """Load the local GGUF model next to this script as a LangChain GPT4All LLM.

    Returns:
        A configured ``GPT4All`` LLM instance.

    Raises:
        FileNotFoundError: if the model file is not found next to this script.
        RuntimeError: if the langchain-community GPT4All integration is missing,
            chained to the original import error.
    """
    base_path = os.path.dirname(os.path.abspath(__file__))
    model_path = os.path.join(base_path, MODEL_FILE)

    if not os.path.exists(model_path):
        # User-facing message intentionally kept in Vietnamese.
        raise FileNotFoundError(f"Không tìm thấy file model tại: {model_path}")

    try:
        # Imported lazily so the module can be inspected without the optional dependency.
        from langchain_community.llms import GPT4All
    except Exception as e:
        # Chain the cause explicitly so the install hint keeps the real traceback.
        raise RuntimeError(
            "Chưa cài GPT4All cho LangChain. Cài bằng:\n"
            " pip install gpt4all langchain-community\n"
            f"Chi tiết: {e}"
        ) from e

    # NOTE(review): module-level N_CTX / N_GPU_LAYERS are not forwarded here --
    # confirm whether the GPT4All wrapper should receive them.
    return GPT4All(model=model_path, temp=TEMPERATURE, verbose=False)
|
|
def close_llm_safely(llm):
    """Release the LLM's underlying client handle, ignoring every failure.

    Tolerates ``llm`` being None or lacking a ``client``/``close`` attribute;
    never raises, so it is safe as both an atexit hook and a finally-block call.
    """
    try:
        backend = getattr(llm, "client", None)
        closer = getattr(backend, "close", None)
        if not callable(closer):
            return
        closer()
    except Exception:
        # Best-effort cleanup: a failed close must not mask the real exit path.
        pass
|
|
def extract_python_code(text: str) -> str:
    """Pull Python source out of an LLM reply.

    Returns the contents of the first markdown code fence when one is present,
    otherwise the whole reply; the result is always stripped. Empty or falsy
    input yields "".
    """
    if not text:
        return ""

    fenced = FENCE_RE.search(text)
    return (fenced.group(1) if fenced else text).strip()
|
|
| def _syntax_error_message(code: str) -> str | None: |
| try: |
| ast.parse(code) |
| return None |
| except SyntaxError: |
| |
| try: |
| ast.parse(code) |
| return None |
| except SyntaxError as e: |
| line = (e.text or "").strip() |
| where = f"line {e.lineno}, col {e.offset}" if e.lineno and e.offset else "unknown location" |
| return f"{e.msg} ({where}). Offending line: {line}" |
|
|
|
|
def is_valid_python(code: str) -> bool:
    """True when *code* parses as Python source, False on a SyntaxError."""
    try:
        ast.parse(code)
    except SyntaxError:
        return False
    return True
|
|
|
|
def generate_code(chain, question: str) -> str:
    """Generate syntactically valid Python code for *question* via *chain*.

    Invokes the chain once, extracts code from the reply, and retries up to
    MAX_FIX_ATTEMPTS times, feeding the syntax error back to the model when
    parsing fails. A last-resort heuristic collapses doubled trailing
    parentheses, a common model typo.

    Fix: the output of the final retry was never syntax-checked before the
    heuristic ran, so valid code legitimately ending in "))" could be mangled.
    It is now returned untouched when it already parses.
    """
    raw = chain.invoke({"question": question})
    code = extract_python_code(raw)

    for _ in range(MAX_FIX_ATTEMPTS):
        err = _syntax_error_message(code)
        if err is None:
            return code

        # Feed the exact parse error back so the model can self-correct.
        raw = chain.invoke(
            {
                "question": (
                    "Output trước bị sai cú pháp Python.\n"
                    f"Lỗi: {err}\n\n"
                    f"Output trước:\n{raw}\n\n"
                    "Hãy trả lại code Python ĐÚNG cú pháp, chỉ code, không markdown."
                )
            }
        )
        code = extract_python_code(raw)

    # The last retry's candidate was produced after the final loop check:
    # return it as-is when valid so the heuristic below cannot corrupt it.
    if is_valid_python(code):
        return code

    # Heuristic last resort: strip one ")" from "))" at end of each line.
    code2 = TRAILING_PARENS_RE.sub(")", code)
    return code2 if is_valid_python(code2) else code
|
|
# Prompt template (Vietnamese, runtime string -- kept verbatim): asks for pure
# Python code with no markdown and no explanation, in [INST]...[/INST] format.
template = """[INST] Bạn là một trợ lý AI chuyên nghiệp về lập trình Python.
Hãy viết code Python chất lượng cao để giải quyết yêu cầu sau.
Chỉ trả lời bằng code Python thuần (KHÔNG markdown, KHÔNG giải thích).
Yêu cầu: {question} [/INST]"""


prompt = PromptTemplate(input_variables=["question"], template=template)


# Module-level wiring (order matters): UTF-8 console first, then model load,
# then a cleanup hook so the model client is released even on abnormal exit.
_force_utf8_stdio()
llm = load_llm()
atexit.register(close_llm_safely, llm)
# LCEL pipeline: prompt -> local LLM -> plain string output.
chain = prompt | llm | StrOutputParser()
|
|
# Demo task fed to the model (runtime string -- kept verbatim).
question = '''
Write a Python program that extracts all email addresses from a given text.
Input:
A text: "Contact us at support@nlp.com or info@textprocessing.ai for more details."
Desired Output:
['support@nlp.com', 'info@textprocessing.ai']'''


try:
    print(generate_code(chain, question))
finally:
    # Explicit close here; the atexit hook then runs as a harmless second call
    # because close_llm_safely() swallows all exceptions.
    close_llm_safely(llm)