import json
import random
import time
from copy import deepcopy

import openai
import pandas as pd
from openai import OpenAI

from Messaging_system.Homepage_Recommender import DefaultRec
from Messaging_system.LLM import LLM
from Messaging_system.LLMR import LLMR
from Messaging_system.PromptEng import PromptEngine
from Messaging_system.PromptGenerator import PromptGenerator
from Messaging_system.protection_layer import ProtectionLayer


class MultiMessage:
    """Generates a sequence of push-notification messages (multi-step) per user,
    each new message building on the previously generated ones."""

    def __init__(self, CoreConfig):
        """
        :param CoreConfig: shared configuration/object holding users_df, content_info,
                           config_file, feature flags (involve_recsys_result, ...).
        """
        self.Core = CoreConfig
        self.llm = LLM(CoreConfig)
        self.defaultRec = DefaultRec(CoreConfig)
        self.promptGen = PromptGenerator(self.Core)
        # LLMR is only needed when recsys recommendations are woven into messages.
        if self.Core.involve_recsys_result:
            self.llmr = LLMR(CoreConfig, random=True)

    # ==============================================================
    def generate_multi_messages(self, user):
        """
        Generates multiple messages per user, storing them in a single JSON structure.

        The first message is assumed to already exist in user["message"]; subsequent
        messages are generated by referencing the previously generated ones.

        :param user: user row (dict-like / pandas Series) with at least "message".
        :return: JSON string {"messages_sequence": [...]} or None on failure.
        """
        first_message_str = user.get("message", None)
        if first_message_str is None:
            print("No initial message found; cannot build a multi-message sequence.")
            return None

        try:
            first_message_dict = json.loads(first_message_str)
        except (json.JSONDecodeError, TypeError):
            print("Could not parse the first message as JSON. Returning None.")
            return None

        message_sequence = [first_message_dict]

        # How many total messages are configured (subsequent_examples is a dict,
        # one entry per follow-up step); the first message is already present.
        total_configured = len(self.Core.subsequent_examples) + 1  # includes the first
        to_generate = max(0, total_configured - 1)

        # Resolve the DataFrame index for this user once, up front.
        idx = self._get_user_idx(user)

        for i in range(to_generate):
            # Ordinal number of the next message in the sequence (first was #1).
            msg_number = i + 2

            # ---- (A) pick the next recommendation BEFORE generating the text ----
            recommendation_info = content_info = recsys_json = None
            if getattr(self.Core, "involve_recsys_result", False):
                rec_info, cinfo, rjson = self.select_next_recommendation(user)
                recommendation_info, content_info, recsys_json = rec_info, cinfo, rjson
                if recommendation_info is None:
                    # Fallback to the default ("for you") recommendation.
                    # BUG FIX: the original assigned this to an unused local
                    # `content_id`, leaving recommendation_info as None and later
                    # crashing fetch_recommendation_data on None["content_id"].
                    recommendation_info = self.defaultRec.recommendation
                    content_info = self.defaultRec.recommendation_info
                    recsys_json = self.defaultRec.for_you_url

                # Update DF and local user snapshot.
                # NOTE(review): placed inside the recsys branch — updating with
                # all-None fields when recsys is disabled would clobber the DF;
                # confirm against the original (indentation was ambiguous).
                user = self._update_user_fields(idx, user, {
                    "recommendation": recommendation_info,
                    "recommendation_info": content_info,
                    "recsys_result": recsys_json,
                })

            # ---- (B) generate the next message; hand it the UPDATED user ----
            next_msg_raw = self.generate_next_messages(message_sequence, msg_number, user)
            if next_msg_raw is None:
                print(f"Could not generate the message for step {msg_number}. Stopping.")
                break

            # If you have a protection layer, call it here (omitted for brevity).
            criticized_msg = next_msg_raw

            # ---- (C) Parse & validate ----
            parsed_output_str = self.parsing_output_message(criticized_msg, user)
            if not parsed_output_str:
                print(f"Parsing output failed for step {msg_number}. Stopping.")
                break
            try:
                parsed_output_dict = json.loads(parsed_output_str)
            except json.JSONDecodeError:
                print(f"Could not parse the new message as JSON for step {msg_number}. Stopping.")
                break

            message_sequence.append(parsed_output_dict)

        final_structure = {"messages_sequence": message_sequence}
        return json.dumps(final_structure, ensure_ascii=False)

    # --------------------------------------------------------------
    def generate_next_messages(self, previous_messages, step, user):
        """
        Uses only the last two previously generated messages to produce the next one.

        :param previous_messages: list of dicts, each with at least "header" and "message".
        :param step: the 1-based index of the message we're about to generate.
        :param user: the (possibly updated) user row.
        :return: a raw dict from the LLM (with 'header' and 'message'), or None on failure.
        """
        # Only keep up to the last two messages as context.
        context = previous_messages[-2:] if len(previous_messages) > 2 else previous_messages

        # Build a prompt that includes only those last messages, then call the LLM.
        prompt = self.generate_prompt(context, step, user)
        response_dict = self.llm.get_response(prompt=prompt,
                                              instructions=self.llm_instructions())
        return response_dict

    # ===============================================================
    def get_examples(self, step):
        """
        Provide the configured example options for the given step.

        :param step: sequence step number; used as a key into subsequent_examples
                     (raises KeyError if the step is not configured — TODO confirm
                     that is the intended behavior).
        :return: instructions string, or "" when no examples are configured.
        """
        if self.Core.subsequent_examples is not None:
            instructions = f"""
        Below are the available options to select the best header and message:

        ### **Available options:**
        {self.Core.subsequent_examples[step]}
        """
            return instructions
        else:
            return ""

    # --------------------------------------------------------------
    def generate_prompt(self, previous_messages, step, user):
        """
        Creates a prompt for the LLM, incorporating up to 3 previously generated messages.

        :param previous_messages: list of dicts, each with 'header' and 'message'.
        :param step: sequence step number (forwarded to get_examples).
        :param user: user row used to build profile / recommendation instructions.
        :return: a user-facing prompt string instructing the model to produce a new message.
        """
        # Build a textual summary of the most recent (up to three) messages.
        recent_messages = previous_messages[-3:]
        previous_text = []
        for i, m in enumerate(recent_messages, start=1):
            header = m.get("header", "").strip()
            body = m.get("message", "").strip()
            previous_text.append(f"Message {i}: (Header) {header}\n (Body) {body}")

        # Combine into a single string.
        previous_text_str = "\n\n".join(previous_text)

        user_info = self.promptGen.get_user_profile(user=user)
        input_context = self.promptGen.input_context()
        recommendation_instructions = self.promptGen.recommendations_instructions(user)
        output_instructions = self.promptGen.output_instruction()
        examples = self.get_examples(step)

        # Craft the prompt.
        prompt = f"""
        We have previously sent these push notifications to the user and The user has not re-engaged yet:

        ** Previous messages **
        {previous_text_str}

        {input_context}
        - The final header and message should be different from previous headers and messages and we should not have similar words and phrases from previous sends.

        {examples}

        {user_info}

        {recommendation_instructions}

        {output_instructions}
        """
        return prompt

    # =============================================================================
    def parsing_output_message(self, message, user):
        """
        Parses the output dict from the LLM and enriches it with additional content
        information if needed (e.g., from recsys). Re-uses the logic from the
        single-message pipeline to keep the results consistent.

        :param message: output dict from the LLM (with at least "message" and "header").
        :param user: the user row dictionary.
        :return: a valid JSON string, or None if the structure is invalid.
        """
        if self.Core.involve_recsys_result:
            # If recsys is used, fetch recommendation data.
            output_message = self.fetch_recommendation_data(user, message)
        elif self.Core.messaging_mode == "recommend_playlist":
            # If recommending a playlist, add the relevant fields.
            if "playlist_id" in message and "message" in message:
                playlist_id = str(message["playlist_id"])
                web_url_path = f"https://www.musora.com/{self.Core.brand}/playlist/{playlist_id}"
                output_message = {
                    "header": message.get("header", ""),
                    "message": message.get("message", ""),
                    "playlist_id": int(message["playlist_id"]),
                    "web_url_path": web_url_path,
                }
            else:
                print("LLM output is missing either 'playlist_id' or 'message'.")
                return None
        else:
            # Basic scenario: only 'header' and 'message' expected.
            if "message" not in message or "header" not in message:
                print("LLM output is missing 'header' or 'message'.")
                return None
            output_message = {
                "header": message["header"],
                "message": message["message"],
            }

        return json.dumps(output_message, ensure_ascii=False)

    # --------------------------------------------------------------
    def fetch_recommendation_data(self, user, message):
        """
        Merge the user's current recommendation into the LLM output message.

        :param user: user row with "recommendation" (either the literal string
                     "for_you" or a dict with content_id/web_url_path/title/
                     thumbnail_url) and "recsys_result".
        :param message: LLM output dict with "header"/"message".
        :return: enriched output-message dict.
        """
        if user["recommendation"] == "for_you":
            # Generic "for you" page recommendation: no specific content item.
            output_message = {
                "header": message.get("header"),
                "message": message.get("message"),
                "content_id": None,
                "web_url_path": user["recsys_result"],
                "title": user["recommendation"],
                "thumbnail_url": None,
            }
        else:
            recommendation_dict = user["recommendation"]
            content_id = int(recommendation_dict["content_id"])

            # Extract required fields from the recommended item.
            web_url_path = recommendation_dict["web_url_path"]
            title = recommendation_dict["title"]
            thumbnail_url = recommendation_dict["thumbnail_url"]

            # Strip escapes/quotes the LLM may have injected into the body text.
            msg = message.get("message")
            if isinstance(msg, str):
                msg = msg.replace('\\', '').replace('"', '')
            else:
                msg = str(msg)  # or handle it differently if this shouldn't happen
            message["message"] = msg

            # Add the recommendation fields to the message dict.
            output_message = {
                "header": message.get("header"),
                "message": message.get("message"),
                "content_id": content_id,
                "web_url_path": web_url_path,
                "title": title,
                "thumbnail_url": thumbnail_url,
            }
        return output_message

    # ===============================================================
    def _remove_from_all(self, recsys_dict, cid):
        """Remove every record with content_id == cid from each list-valued section."""
        for sec, recs in list(recsys_dict.items()):
            if isinstance(recs, list):
                recsys_dict[sec] = [r for r in recs if r.get("content_id") != cid]
        return recsys_dict

    # ===============================================================
    def _lookup_content_info(self, cid):
        """Return the content_info value for a content_id, or None when not found."""
        row = self.Core.content_info[self.Core.content_info["content_id"] == cid]
        return row["content_info"].iloc[0] if not row.empty else None

    # ===============================================================
    def select_next_recommendation(self, user):
        """
        Select the next recommendation from the user's current recsys_result.

        :return: (content_id, content_info, updated_recsys_json)
        """
        self.llmr.user = user  # _get_recommendation expects self.user to be set
        cid, cinfo, updated_json, _ = self.llmr._get_recommendation()
        return cid, cinfo, updated_json

    # ==============================================================
    def _get_user_idx(self, u):
        """Best-effort resolution of the users_df index label for this user row."""
        # If it's a Series, its name is usually the row index label.
        if isinstance(u, pd.Series) and u.name in self.Core.users_df.index:
            return u.name
        # Otherwise try a stable key like user_id (change if your key is different).
        key_col = "user_id" if "user_id" in self.Core.users_df.columns else None
        if key_col and key_col in u:
            matches = self.Core.users_df.index[self.Core.users_df[key_col] == u[key_col]]
            if len(matches):
                return matches[0]
        # Fallback: try exact row equality (last resort; slower).
        try:
            return self.Core.users_df.index[self.Core.users_df.eq(pd.Series(u)).all(1)][0]
        except Exception:
            return None

    # =============================================================
    def _update_user_fields(self, idx, user, fields: dict):
        """Update the DF row and return a fresh copy of the user row (Series)
        with those fields reflected. Falls back to mutating the local row when
        no DF index could be resolved."""
        if idx is None:
            # No index? Just mutate the local dict/Series.
            for k, v in fields.items():
                user[k] = v
            return user
        for k, v in fields.items():
            self.Core.users_df.at[idx, k] = v
        return self.Core.users_df.loc[idx]

    # --------------------------------------------------------------
    def llm_instructions(self):
        """
        Build the system instructions for the LLM.

        :return: instructions as string (personalized variant allows minimal edits;
                 otherwise the selected option must be used verbatim).
        """
        banned_phrases = "\n".join(f"- {word}" for word in self.Core.config_file["AI_Jargon"])
        jargon_list = "\n".join(
            f"- {word}" for word in self.Core.config_file[f"AI_phrases_{self.Core.brand}"]
        )
        if self.Core.personalization:
            instructions = f"""
        Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification.
        Based on the user instructions, you might need to **modify the selected option** very minimal and slightly to improve personalization if capable while preserving the original brand voice, tone, rhythm, and structure.

        **Important Note**: header < {self.Core.config_file["header_limit"]} and message < {self.Core.config_file["message_limit"]} characters.
        **Important Note**: NEVER use time-related words (“new,” “recent,” “latest,” etc.) and NEVER imply recency in any way.

        ### Don't use below phrases, words, or similar variations of them:
        {banned_phrases}
        {jargon_list}
        """
        else:
            instructions = f"""
        Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification.
        DO NOT **change** or **modify** or **add to** the selected option in any shape or form.
        **Use the exact original selected header and message without ANY change**
        """
        return instructions