"""Energy-monitoring NL-to-SQL assistant.

Pipeline: a PEFT-adapted Qwen model turns a natural-language question into a
SQL SELECT over the 'main_cns' table, the query runs against a PostgreSQL
connection pool, and the Groq API formats the raw rows into a human-readable
answer.
"""

import logging
import os
import re
from contextlib import nullcontext

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import PeftModel
import psycopg2
from psycopg2 import pool
from groq import Groq
from dotenv import load_dotenv

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables (.env supplies DB_* and GROQ_API_KEY).
load_dotenv()

# System prompt used to turn raw SQL rows into a user-facing answer.
# NOTE(review): wording preserved from the original; only whitespace/line
# breaks were normalized for readability.
_FORMATTING_SYSTEM_PROMPT = """Task : Your main goal is to make SQL query results easy to interpret for users who may not have technical backgrounds while ensuring all information is correct and clear.
user will give the conversation history, which contains ONLY the user's messages. Your task is to generate a response based on this history.
You have electric monitoring systems data, of 4 locations :
CNS Equipment Room
Glide Path
Localizer
DVOR
NEVER CHANGE THE ACTUAL DATA. USER PROVIDED DATA SHOULD BE AS THEY ARE DONT EVEN TRY TO CONVERT THEM, LIKE FOR ENERGY TO KWH. THEY ARE ALREADY IN KWH FORMAT.
keep this in mind while making response that you have electric data so form them correctly with their units, there will be current, voltage, energy, power factor ,etc. the r,y,b will be denoting the different phases such as red, yellow and blue phase.
There are not any phases in energy , frequency data. means they are regular data, they dont have any phases.
1. Receive SQL Query Results: When given an SQL query result, your task is to format it professionally and clearly so that it is easy to read and understand.
2. Structure the Answer:
Tables: If the SQL query result contains rows and columns, format the output as a neat table.
Bullet Points or Lists: Use bullet points or structured lists if the results are better conveyed this way.
3. Contextual Information: Add brief, clear explanations where necessary to provide context or meaning behind the data, ensuring the user understands what the result represents.
4. Formatting Example: Guidelines:
Maintain a clean and simple presentation.
When needed, include context or analysis like trends, anomalies, or insights from the data.
The final answer should only include the well-formatted result and necessary explanation—no technical jargon or SQL-specific terms.
5. NOTE : Read the fluctuations or anomaly data and notice them, if they are in percentage or the actual values for show them with units.(all data like 234,256,.. then volts, and if all data like 3.4, 6.56, 11.34, then %)
Never use this types of words in the final answer like "Based on the provided SQL query " or anything that indicates towards the sql query. just give natural answers as human can understand without any technical things like the sql related things.
NEVER CHANGE THE VALUES FOR THE ENERGY REPORT, DONT YOU DARE CHANGING THEM. KEEP IT AS THEY ARE.
"""

# System prompt for the local NL-to-SQL model.
_SQL_SYSTEM_PROMPT = """You are an expert NLP-to-SQL agent. Database table is 'main_cns' with energy monitoring data.
CRITICAL RULES:
- ONLY generate ONE real SELECT query for 'main_cns' table.
- NO examples, fictional tables (like 'energy'), multiple queries, or explanations.
- Output ONLY the SQL query inside ```sql ... ``` block.
- STRICTLY READ-ONLY SELECT statements.
No INSERT/UPDATE/DELETE."""


class EnergyIntelligenceBot:
    """Owns the PostgreSQL connection pool and Groq-based result formatting."""

    def __init__(self):
        self.db = None  # SimpleConnectionPool, or None when DB is unreachable
        self._validate_env_vars()
        self._connect_db()

    def _validate_env_vars(self):
        """Ensure all critical environment variables exist before starting.

        Raises:
            ValueError: if any required environment variable is missing/empty.
        """
        required_vars = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_PORT",
                         "DB_NAME", "GROQ_API_KEY"]
        missing = [var for var in required_vars if not os.environ.get(var)]
        if missing:
            raise ValueError(
                f"CRITICAL: Missing environment variables: {', '.join(missing)}"
            )

    def _connect_db(self):
        """Safely connect to PostgreSQL; leave self.db as None on failure."""
        try:
            self.db = psycopg2.pool.SimpleConnectionPool(
                minconn=1,
                maxconn=10,
                user=os.environ.get("DB_USER"),
                password=os.environ.get("DB_PASSWORD"),
                host=os.environ.get("DB_HOST"),
                port=os.environ.get("DB_PORT"),
                database=os.environ.get("DB_NAME"),
            )
            logger.info("PostgreSQL connection pool created successfully.")
        except Exception as e:
            # Degrade gracefully: execute_and_format_query checks self.db.
            logger.error(
                f"Database connection failed: {e}. Bot will run in text-only mode."
            )

    def call_llm(self, messages, model="llama-3.1-8b-instant"):
        """Call the Groq chat-completions API and return the raw response."""
        api_key = os.environ.get("GROQ_API_KEY")
        client = Groq(api_key=api_key)
        return client.chat.completions.create(
            messages=messages,
            model=model,
        )

    @staticmethod
    def _clean_sql(generated_sql):
        """Extract a single terminated SQL statement from raw model output.

        Prefers the content of a ```sql ... ``` fence; falls back to the text
        from the first SELECT onward, then to the raw string. A stray ';'
        immediately before UNION is joined, and everything after the first
        statement terminator is dropped.
        """
        sql_match = re.search(r"```sql\s*(.*?)\s*```", generated_sql,
                              re.DOTALL | re.IGNORECASE)
        if sql_match:
            query = sql_match.group(1).strip()
        else:
            fallback_match = re.search(r"(SELECT[\s\S]*)", generated_sql,
                                       re.IGNORECASE)
            query = fallback_match.group(1).strip() if fallback_match else generated_sql
        # Allow a UNION the model split with ';', then keep only one statement.
        query = re.sub(r';\s*(?=union)', ' ', query, flags=re.IGNORECASE)
        return query.split(';')[0].strip() + ';'

    def execute_and_format_query(self, generated_sql, user_message):
        """Clean model-generated SQL, execute it, and format rows via Groq.

        Args:
            generated_sql: raw text emitted by the NL-to-SQL model.
            user_message: original user prompt, passed through for context.

        Returns:
            A formatted natural-language answer, or an error string.
        """
        logger.info(f"Generated SQL text:\n{generated_sql}")

        query = self._clean_sql(generated_sql)
        logger.info(f"Cleaned SQL query: {query}")

        # Defense in depth: the system prompt demands read-only SQL, but the
        # model output is untrusted — refuse anything that is not SELECT/WITH.
        if not query.lstrip().upper().startswith(("SELECT", "WITH")):
            logger.error(f"Rejected non-SELECT statement: {query}")
            return "Error: Only read-only SELECT queries are allowed."

        if not self.db:
            return "Error: Database connection is not available."

        Qres = []
        conn = None
        try:
            conn = self.db.getconn()
            with conn.cursor() as cur:
                cur.execute(query)
                Qres = cur.fetchall()
            logger.info(f"Result Fetched: {str(Qres)[:100]}...")
        except Exception as e:
            logger.error(f"Error executing query: {e}")
            if conn:
                conn.rollback()
            return f"An error occurred while executing the query: {e}"
        finally:
            # Always return the connection to the pool.
            if conn:
                self.db.putconn(conn)

        messages = [
            {"role": "system", "content": _FORMATTING_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": (
                    f"{user_message} \nThis was the query : {query} "
                    f"\nAnd Here is the query result : {str(Qres)}"
                ),
            },
        ]
        try:
            ai_msg = self.call_llm(messages)
            response_content = ai_msg.choices[0].message.content
            logger.info("Formatted response generated successfully.")
            return response_content
        except Exception as e:
            logger.error(f"Error calling LLM for formatting: {e}")
            return "Failed to format response via LLM."

    def close_connections(self):
        """Close every pooled PostgreSQL connection (idempotent-safe guard)."""
        if hasattr(self, 'db') and self.db:
            self.db.closeall()
            logger.info("PostgreSQL connection pool closed.")


# --- Initialization & Safe Execution ---
if __name__ == "__main__":
    bot = None
    try:
        bot = EnergyIntelligenceBot()

        # Hardware check.
        device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Using device: {device}")
        if device == "cpu":
            logger.warning("No GPU found. Running this model on CPU will be extremely slow.")
        else:
            # GPU optimizations for inference.
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.allow_tf32 = True
            logger.info(f"GPU optimizations enabled for {torch.cuda.get_device_name()}")

        base_model_id = "Qwen/Qwen2.5-7B-Instruct"
        adapter_path = "./"

        # Warn early if the adapter directory is missing.
        # (The original guard `and adapter_path != "./"` could never fire.)
        if not os.path.exists(adapter_path):
            logger.warning(f"Adapter path {adapter_path} not found. Ensure the path is correct.")

        # VRAM check for low-memory GPUs (e.g. RTX 2050 4GB).
        if torch.cuda.is_available():
            total_vram = torch.cuda.get_device_properties(0).total_memory / 1024**3
            logger.info(f"Total VRAM: {total_vram:.1f} GB")
            if total_vram < 5:
                logger.warning("Low VRAM detected (<5GB). Using aggressive offloading.")

        if torch.cuda.is_available():
            # Optimized 4-bit configuration for low-VRAM GPUs.
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16,
            )
            logger.info("Loading base model with AGGRESSIVE GPU quantization...")
            base_model = AutoModelForCausalLM.from_pretrained(
                base_model_id,
                dtype=torch.float16,
                device_map="auto",  # "auto" handles hybrid GPU/CPU placement
                quantization_config=bnb_config,
                trust_remote_code=True,
                low_cpu_mem_usage=True,  # critical to prevent RAM spike
                max_memory={0: "3.5GiB", "cpu": "8GiB"},  # keep system stable
            )
        else:
            logger.info("Loading base model on CPU (no quantization)...")
            # float32 on CPU: many CPU kernels have no half-precision
            # implementation, so float16 here can raise at generate() time.
            base_model = AutoModelForCausalLM.from_pretrained(
                base_model_id,
                dtype=torch.float32,
                device_map="cpu",
                trust_remote_code=True,
                low_cpu_mem_usage=True,
            )

        # Verification check.
        logger.info("Model placement complete.")
        if torch.cuda.is_available():
            logger.info(f"Model is actually on: {base_model.device}")
            logger.info(f"VRAM used: {torch.cuda.memory_allocated(0)/1024**3:.2f}GB")
            if "cpu" in str(base_model.device):
                logger.warning("MODEL IS ON CPU! Bitsandbytes may be failing to find CUDA kernels.")

        tokenizer = AutoTokenizer.from_pretrained(base_model_id, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            logger.info("Set pad_token to eos_token")

        logger.info("Loading adapter...")
        model = PeftModel.from_pretrained(base_model, adapter_path)
        model.eval()

        if device == "cuda":
            logger.info("CPU offload ready (disabled due to PeftModel compatibility)")

        print("Enter 'exit' to quit.")
        while True:
            user_question = input("Enter your question: ").strip()
            # Case-insensitive exit check; the prompt itself is unchanged
            # because .capitalize() below already lowercases the remainder.
            if user_question.lower() == "exit":
                print("Exiting...")
                break
            if not user_question:
                print("Please enter a question.")
                continue

            prompt = f"generate the sql for this:{user_question.capitalize()}"
            messages = [
                {"role": "system", "content": _SQL_SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ]

            logger.info("Processing inputs...")
            text = tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            inputs = tokenizer(text, return_tensors="pt").to(model.device)

            logger.info("Generating response...")
            # torch.cuda.amp.autocast is deprecated; also skip AMP entirely
            # on CPU-only runs instead of entering a CUDA autocast context.
            amp_ctx = (
                torch.amp.autocast("cuda", dtype=torch.float16)
                if device == "cuda"
                else nullcontext()
            )
            with torch.no_grad(), amp_ctx:
                generated_ids = model.generate(
                    **inputs,
                    max_new_tokens=256,
                    do_sample=False,
                    pad_token_id=tokenizer.eos_token_id,
                    use_cache=True,
                )

            # Strip the prompt tokens so only the completion is decoded.
            generated_ids_trimmed = [
                out_ids[len(in_ids):]
                for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
            ]
            output_text = tokenizer.batch_decode(
                generated_ids_trimmed, skip_special_tokens=True
            )
            generated_sql = output_text[0]

            print("\n--- Model Response ---")
            print(generated_sql)

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                logger.info(f"Post-gen VRAM: {torch.cuda.memory_allocated(0)/1024**3:.1f}GB")

            # Final formatting and DB execution.
            print("\n--- Executing SQL and Formatting Results ---")
            formatted_response = bot.execute_and_format_query(generated_sql, prompt.capitalize())
            print("\n--- Formatted Response ---")
            print(formatted_response)
            print("\n" + "=" * 80 + "\n")

    except Exception as e:
        logger.critical(f"Application crashed: {e}")
    finally:
        # Resource cleanup.
        if bot:
            bot.close_connections()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            logger.info("GPU cache cleared.")