| | import json |
| | import random |
| | import time |
| | import pandas as pd |
| | from openai import OpenAI |
| | from Messaging_system.LLMR import LLMR |
| | from Messaging_system.PromptGenerator import PromptGenerator |
| | from Messaging_system.PromptEng import PromptEngine |
| | from Messaging_system.protection_layer import ProtectionLayer |
| | import openai |
| | from Messaging_system.LLM import LLM |
| | from copy import deepcopy |
| | from Messaging_system.Homepage_Recommender import DefaultRec |
| |
|
| | class MultiMessage: |
| | def __init__(self, CoreConfig): |
| | """ |
| | Class that generates a sequence of messages (multi-step push notifications) |
| | for each user, building on previously generated messages. |
| | """ |
| | self.Core = CoreConfig |
| | self.llm = LLM(CoreConfig) |
| | self.defaultRec = DefaultRec(CoreConfig) |
| | self.promptGen=PromptGenerator(self.Core) |
| |
|
| | if self.Core.involve_recsys_result: |
| | self.llmr = LLMR(CoreConfig, random=True) |
| |
|
| | |
| | def generate_multi_messages(self, user): |
| | """ |
| | Generates multiple messages per user, storing them in a single JSON structure. |
| | The first message is assumed to already exist in user["message"]. |
| | Subsequent messages are generated by referencing all previously generated ones. |
| | """ |
| | first_message_str = user.get("message", None) |
| | if first_message_str is None: |
| | print("No initial message found; cannot build a multi-message sequence.") |
| | return None |
| |
|
| | try: |
| | first_message_dict = json.loads(first_message_str) |
| | except (json.JSONDecodeError, TypeError): |
| | print("Could not parse the first message as JSON. Returning None.") |
| | return None |
| |
|
| | message_sequence = [first_message_dict] |
| |
|
| | |
| | total_configured = len(self.Core.subsequent_examples) + 1 |
| | to_generate = max(0, total_configured - 1) |
| |
|
| | |
| | idx = self._get_user_idx(user) |
| |
|
| | for i in range(to_generate): |
| | |
| | msg_number = i + 2 |
| |
|
| | |
| | recommendation_info = content_info = recsys_json = None |
| | zero_tokens = {"prompt_tokens": 0, "completion_tokens": 0} |
| |
|
| | if getattr(self.Core, "involve_recsys_result", False): |
| | rec_info, cinfo, rjson = self.select_next_recommendation(user) |
| | recommendation_info, content_info, recsys_json = rec_info, cinfo, rjson |
| |
|
| | if recommendation_info is None: |
| | |
| | content_id = self.defaultRec.recommendation |
| | content_info = self.defaultRec.recommendation_info |
| | recsys_json = self.defaultRec.for_you_url |
| | |
| | user = self._update_user_fields(idx, user,{ |
| | "recommendation": recommendation_info, |
| | "recommendation_info": content_info, |
| | "recsys_result": recsys_json |
| | }) |
| |
|
| | |
| | next_msg_raw = self.generate_next_messages(message_sequence, msg_number, user) |
| | if next_msg_raw is None: |
| | print(f"Could not generate the message for step {msg_number}. Stopping.") |
| | break |
| |
|
| | |
| | criticized_msg = next_msg_raw |
| |
|
| | |
| | parsed_output_str = self.parsing_output_message(criticized_msg, user) |
| | if not parsed_output_str: |
| | print(f"Parsing output failed for step {msg_number}. Stopping.") |
| | break |
| |
|
| | try: |
| | parsed_output_dict = json.loads(parsed_output_str) |
| | except json.JSONDecodeError: |
| | print(f"Could not parse the new message as JSON for step {msg_number}. Stopping.") |
| | break |
| |
|
| | message_sequence.append(parsed_output_dict) |
| |
|
| | final_structure = {"messages_sequence": message_sequence} |
| | return json.dumps(final_structure, ensure_ascii=False) |
| |
|
| | |
| | def generate_next_messages(self, previous_messages, step, user): |
| | """ |
| | Uses only the last two previously generated messages to produce the next message. |
| | Returns a *raw* dictionary (header, message, etc.) from the LLM. |
| | |
| | :param previous_messages: A list of dicts, each containing at least "header" and "message". |
| | :param step: The 1-based index of the message we’re about to generate. |
| | :return: A dictionary from LLM (with 'header' and 'message'), or None if generation fails. |
| | """ |
| | |
| | if len(previous_messages) > 2: |
| | context = previous_messages[-2:] |
| | else: |
| | context = previous_messages |
| |
|
| | |
| | prompt = self.generate_prompt(context, step, user) |
| |
|
| | |
| |
|
| | |
| | response_dict = self.llm.get_response(prompt=prompt, instructions=self.llm_instructions()) |
| |
|
| | return response_dict |
| |
|
| | |
| | def get_examples(self, step): |
| | """ |
| | providing examples and instructions |
| | :return: |
| | """ |
| |
|
| | if self.Core.subsequent_examples is not None: |
| |
|
| |
|
| | instructions = f""" |
| | Below are the available options to select the best header and message: |
| | ### **Available options:** |
| | |
| | {self.Core.subsequent_examples[step]} |
| | """ |
| | return instructions |
| | else: |
| | return "" |
| |
|
| |
|
| | |
| | def generate_prompt(self, previous_messages, step, user): |
| | """ |
| | Creates a prompt to feed to the LLM, incorporating 3 previously generated messages. |
| | |
| | :param previous_messages: A list of dicts, each containing 'header' and 'message'. |
| | :return: A user-facing prompt string instructing the model to produce a new message. |
| | """ |
| | |
| | recent_messages = previous_messages[-3:] |
| |
|
| | previous_text = [] |
| | for i, m in enumerate(recent_messages, start=1): |
| | header = m.get("header", "").strip() |
| | body = m.get("message", "").strip() |
| | previous_text.append(f"Message {i}: (Header) {header}\n (Body) {body}") |
| |
|
| | |
| | previous_text_str = "\n\n".join(previous_text) |
| |
|
| | user_info = self.promptGen.get_user_profile(user=user) |
| | input_context = self.promptGen.input_context() |
| | recommendation_instructions = self.promptGen.recommendations_instructions(user) |
| | output_instructions = self.promptGen.output_instruction() |
| | examples = self.get_examples(step) |
| |
|
| |
|
| | |
| | prompt = f""" |
| | We have previously sent these push notifications to the user and The user has not re-engaged yet: |
| | |
| | ** Previous messages ** |
| | {previous_text_str} |
| | |
| | {input_context} |
| | - The final header and message should be different from previous headers and messages and we should not have similar words and phrases from previous sends. |
| | |
| | {examples} |
| | |
| | {user_info} |
| | |
| | {recommendation_instructions} |
| | |
| | {output_instructions} |
| | """ |
| |
|
| | return prompt |
| |
|
| | |
| | def parsing_output_message(self, message, user): |
| | """ |
| | Parses the output JSON from the LLM and enriches it with additional content |
| | information if needed (e.g., from recsys). Re-uses the logic from the single-message |
| | pipeline to keep the results consistent. |
| | |
| | :param message: Output JSON *dictionary* from the LLM (with at least "message" and "header"). |
| | :param user: The user row dictionary. |
| | :return: A valid JSON string or None if the structure is invalid. |
| | """ |
| | if self.Core.involve_recsys_result: |
| | |
| | output_message = self.fetch_recommendation_data(user, message) |
| | elif self.Core.messaging_mode == "recommend_playlist": |
| | |
| | if "playlist_id" in message and "message" in message: |
| | playlist_id = str(message["playlist_id"]) |
| | web_url_path = f"https://www.musora.com/{self.Core.brand}/playlist/{playlist_id}" |
| | output_message = { |
| | "header": message.get("header", ""), |
| | "message": message.get("message", ""), |
| | "playlist_id": int(message["playlist_id"]), |
| | "web_url_path": web_url_path, |
| | } |
| | else: |
| | print("LLM output is missing either 'playlist_id' or 'message'.") |
| | return None |
| | else: |
| | |
| | if "message" not in message or "header" not in message: |
| | print("LLM output is missing 'header' or 'message'.") |
| | return None |
| | output_message = { |
| | "header": message["header"], |
| | "message": message["message"] |
| | } |
| |
|
| | return json.dumps(output_message, ensure_ascii=False) |
| |
|
| | |
| | def fetch_recommendation_data(self, user, message): |
| |
|
| | if user["recommendation"] == "for_you": |
| | output_message = { |
| | "header": message.get("header"), |
| | "message": message.get("message"), |
| | "content_id": None, |
| | "web_url_path": user["recsys_result"], |
| | "title": user["recommendation"], |
| | "thumbnail_url": None |
| | } |
| | else: |
| | recommendation_dict = user["recommendation"] |
| | content_id = int(recommendation_dict["content_id"]) |
| |
|
| | |
| | web_url_path = recommendation_dict["web_url_path"] |
| | title = recommendation_dict["title"] |
| | thumbnail_url = recommendation_dict["thumbnail_url"] |
| |
|
| | msg = message.get("message") |
| | if isinstance(msg, str): |
| | msg = msg.replace('\\', '').replace('"', '') |
| | else: |
| | msg = str(msg) |
| |
|
| | message["message"] = msg |
| |
|
| | |
| |
|
| | |
| | output_message = { |
| | "header": message.get("header"), |
| | "message": message.get("message"), |
| | "content_id": content_id, |
| | "web_url_path": web_url_path, |
| | "title": title, |
| | "thumbnail_url": thumbnail_url |
| | } |
| | return output_message |
| |
|
| | |
| | def _remove_from_all(self, recsys_dict, cid): |
| | for sec, recs in list(recsys_dict.items()): |
| | if isinstance(recs, list): |
| | recsys_dict[sec] = [r for r in recs if r.get("content_id") != cid] |
| | return recsys_dict |
| |
|
| | |
| | def _lookup_content_info(self, cid): |
| | row = self.Core.content_info[self.Core.content_info["content_id"] == cid] |
| | return row["content_info"].iloc[0] if not row.empty else None |
| |
|
| | |
| |
|
| | def select_next_recommendation(self, user): |
| | """ |
| | Select next recommendation from the user's current recsys_result. |
| | Returns: content_id, content_info, updated_recsys_json |
| | """ |
| | self.llmr.user = user |
| | cid, cinfo, updated_json, _ = self.llmr._get_recommendation() |
| | return cid, cinfo, updated_json |
| |
|
| | |
| | def _get_user_idx(self, u): |
| | |
| | if isinstance(u, pd.Series) and u.name in self.Core.users_df.index: |
| | return u.name |
| | |
| | key_col = "user_id" if "user_id" in self.Core.users_df.columns else None |
| | if key_col and key_col in u: |
| | matches = self.Core.users_df.index[self.Core.users_df[key_col] == u[key_col]] |
| | if len(matches): |
| | return matches[0] |
| | |
| | try: |
| | return self.Core.users_df.index[self.Core.users_df.eq(pd.Series(u)).all(1)][0] |
| | except Exception: |
| | return None |
| | |
| | def _update_user_fields(self, idx, user, fields: dict): |
| | """Update DF row and return a fresh copy of the user row (Series) with those fields reflected.""" |
| | if idx is None: |
| | |
| | for k, v in fields.items(): |
| | user[k] = v |
| | return user |
| | for k, v in fields.items(): |
| | self.Core.users_df.at[idx, k] = v |
| | return self.Core.users_df.loc[idx] |
| |
|
| | |
| | |
| |
|
| | def llm_instructions(self): |
| | """ |
| | Setting instructions for llm |
| | :return: instructions as string |
| | """ |
| | banned_phrases = "\n".join(f"- {word}" for word in self.Core.config_file["AI_Jargon"]) |
| | jargon_list = "\n".join(f"- {word}" for word in self.Core.config_file[f"AI_phrases_{self.Core.brand}"]) |
| |
|
| | if self.Core.personalization: |
| | instructions = f""" |
| | Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification. |
| | Based on the user instructions, you might need to **modify the selected option** very minimal and slightly to improve personalization if capable while preserving the original brand voice, tone, rhythm, and structure. |
| | |
| | **Important Note**: header < {self.Core.config_file["header_limit"]} and message < {self.Core.config_file["message_limit"]} characters. |
| | **Important Note**: NEVER use time-related words (“new,” “recent,” “latest,” etc.) and NEVER imply recency in any way. |
| | |
| | ### Don't use below phrases, words, or similar variations of them: |
| | {banned_phrases} |
| | {jargon_list} |
| | """ |
| |
|
| | else: |
| | instructions = f""" |
| | Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification. |
| | DO NOT **change** or **modify** or **add to** the selected option in any shape or form. **Use the exact original selected header and message without ANY change** |
| | |
| | """ |
| |
|
| |
|
| | return instructions |
| |
|
| |
|