# Multi-message push-notification generator (Messaging_system)
# Commit d3ae060 — "adjusting prompts" (author: Danialebrat)
import json
import random
import time
import pandas as pd
from openai import OpenAI
from Messaging_system.LLMR import LLMR
from Messaging_system.PromptGenerator import PromptGenerator
from Messaging_system.PromptEng import PromptEngine
from Messaging_system.protection_layer import ProtectionLayer
import openai
from Messaging_system.LLM import LLM
from copy import deepcopy
from Messaging_system.Homepage_Recommender import DefaultRec
class MultiMessage:
def __init__(self, CoreConfig):
"""
Class that generates a sequence of messages (multi-step push notifications)
for each user, building on previously generated messages.
"""
self.Core = CoreConfig
self.llm = LLM(CoreConfig)
self.defaultRec = DefaultRec(CoreConfig)
self.promptGen=PromptGenerator(self.Core)
if self.Core.involve_recsys_result:
self.llmr = LLMR(CoreConfig, random=True)
# ==============================================================
def generate_multi_messages(self, user):
"""
Generates multiple messages per user, storing them in a single JSON structure.
The first message is assumed to already exist in user["message"].
Subsequent messages are generated by referencing all previously generated ones.
"""
first_message_str = user.get("message", None)
if first_message_str is None:
print("No initial message found; cannot build a multi-message sequence.")
return None
try:
first_message_dict = json.loads(first_message_str)
except (json.JSONDecodeError, TypeError):
print("Could not parse the first message as JSON. Returning None.")
return None
message_sequence = [first_message_dict]
# how many total messages you want (self.Core.subsequence_messages is a dict)
total_configured = len(self.Core.subsequent_examples) + 1 # includes the first
to_generate = max(0, total_configured - 1)
# figure out DF index once
idx = self._get_user_idx(user)
for i in range(to_generate):
# The ordinal number of the next message in the sequence (first was #1)
msg_number = i + 2
# ---- (A) pick the next recommendation BEFORE generating the text if required ----
recommendation_info = content_info = recsys_json = None
zero_tokens = {"prompt_tokens": 0, "completion_tokens": 0}
if getattr(self.Core, "involve_recsys_result", False):
rec_info, cinfo, rjson = self.select_next_recommendation(user)
recommendation_info, content_info, recsys_json = rec_info, cinfo, rjson
if recommendation_info is None:
# fallback
content_id = self.defaultRec.recommendation
content_info = self.defaultRec.recommendation_info
recsys_json = self.defaultRec.for_you_url
# Update DF and local user snapshot
user = self._update_user_fields(idx, user,{
"recommendation": recommendation_info,
"recommendation_info": content_info,
"recsys_result": recsys_json
})
# ---- (B) actually generate the next message; hand it the UPDATED user ----
next_msg_raw = self.generate_next_messages(message_sequence, msg_number, user)
if next_msg_raw is None:
print(f"Could not generate the message for step {msg_number}. Stopping.")
break
# If you have a protection layer, call it here (omitted for brevity)
criticized_msg = next_msg_raw
# ---- (C) Parse & validate ----
parsed_output_str = self.parsing_output_message(criticized_msg, user)
if not parsed_output_str:
print(f"Parsing output failed for step {msg_number}. Stopping.")
break
try:
parsed_output_dict = json.loads(parsed_output_str)
except json.JSONDecodeError:
print(f"Could not parse the new message as JSON for step {msg_number}. Stopping.")
break
message_sequence.append(parsed_output_dict)
final_structure = {"messages_sequence": message_sequence}
return json.dumps(final_structure, ensure_ascii=False)
# --------------------------------------------------------------
def generate_next_messages(self, previous_messages, step, user):
"""
Uses only the last two previously generated messages to produce the next message.
Returns a *raw* dictionary (header, message, etc.) from the LLM.
:param previous_messages: A list of dicts, each containing at least "header" and "message".
:param step: The 1-based index of the message we’re about to generate.
:return: A dictionary from LLM (with 'header' and 'message'), or None if generation fails.
"""
# Only keep up to the last two messages
if len(previous_messages) > 2:
context = previous_messages[-2:]
else:
context = previous_messages
# 1) Build a prompt that includes only those last two messages
prompt = self.generate_prompt(context, step, user)
# new_prompt = self.engine.prompt_engineering(prompt)
# 2) Call our existing LLM routine
response_dict = self.llm.get_response(prompt=prompt, instructions=self.llm_instructions())
return response_dict
# ===============================================================
def get_examples(self, step):
"""
providing examples and instructions
:return:
"""
if self.Core.subsequent_examples is not None:
instructions = f"""
Below are the available options to select the best header and message:
### **Available options:**
{self.Core.subsequent_examples[step]}
"""
return instructions
else:
return ""
# --------------------------------------------------------------
def generate_prompt(self, previous_messages, step, user):
"""
Creates a prompt to feed to the LLM, incorporating 3 previously generated messages.
:param previous_messages: A list of dicts, each containing 'header' and 'message'.
:return: A user-facing prompt string instructing the model to produce a new message.
"""
# Build a textual summary of previous messages - last three
recent_messages = previous_messages[-3:]
previous_text = []
for i, m in enumerate(recent_messages, start=1):
header = m.get("header", "").strip()
body = m.get("message", "").strip()
previous_text.append(f"Message {i}: (Header) {header}\n (Body) {body}")
# Combine into a single string
previous_text_str = "\n\n".join(previous_text)
user_info = self.promptGen.get_user_profile(user=user)
input_context = self.promptGen.input_context()
recommendation_instructions = self.promptGen.recommendations_instructions(user)
output_instructions = self.promptGen.output_instruction()
examples = self.get_examples(step)
# Craft the prompt
prompt = f"""
We have previously sent these push notifications to the user and The user has not re-engaged yet:
** Previous messages **
{previous_text_str}
{input_context}
- The final header and message should be different from previous headers and messages and we should not have similar words and phrases from previous sends.
{examples}
{user_info}
{recommendation_instructions}
{output_instructions}
"""
return prompt
# =============================================================================
def parsing_output_message(self, message, user):
"""
Parses the output JSON from the LLM and enriches it with additional content
information if needed (e.g., from recsys). Re-uses the logic from the single-message
pipeline to keep the results consistent.
:param message: Output JSON *dictionary* from the LLM (with at least "message" and "header").
:param user: The user row dictionary.
:return: A valid JSON string or None if the structure is invalid.
"""
if self.Core.involve_recsys_result:
# If recsys is used, fetch recommendation data
output_message = self.fetch_recommendation_data(user, message)
elif self.Core.messaging_mode == "recommend_playlist":
# If recommending a playlist, add the relevant fields
if "playlist_id" in message and "message" in message:
playlist_id = str(message["playlist_id"])
web_url_path = f"https://www.musora.com/{self.Core.brand}/playlist/{playlist_id}"
output_message = {
"header": message.get("header", ""),
"message": message.get("message", ""),
"playlist_id": int(message["playlist_id"]),
"web_url_path": web_url_path,
}
else:
print("LLM output is missing either 'playlist_id' or 'message'.")
return None
else:
# Basic scenario: Only 'header' and 'message' expected
if "message" not in message or "header" not in message:
print("LLM output is missing 'header' or 'message'.")
return None
output_message = {
"header": message["header"],
"message": message["message"]
}
return json.dumps(output_message, ensure_ascii=False)
# --------------------------------------------------------------
def fetch_recommendation_data(self, user, message):
if user["recommendation"] == "for_you":
output_message = {
"header": message.get("header"),
"message": message.get("message"),
"content_id": None,
"web_url_path": user["recsys_result"],
"title": user["recommendation"],
"thumbnail_url": None
}
else:
recommendation_dict = user["recommendation"]
content_id = int(recommendation_dict["content_id"])
# Extract required fields from found_item
web_url_path = recommendation_dict["web_url_path"]
title = recommendation_dict["title"]
thumbnail_url = recommendation_dict["thumbnail_url"]
msg = message.get("message")
if isinstance(msg, str):
msg = msg.replace('\\', '').replace('"', '')
else:
msg = str(msg) # or handle it differently if this shouldn't happen
message["message"] = msg
# message["message"].replace('\\', '').replace('"', '')
# Add these to the message dict
output_message = {
"header": message.get("header"),
"message": message.get("message"),
"content_id": content_id,
"web_url_path": web_url_path,
"title": title,
"thumbnail_url": thumbnail_url
}
return output_message
# ===============================================================
def _remove_from_all(self, recsys_dict, cid):
for sec, recs in list(recsys_dict.items()):
if isinstance(recs, list):
recsys_dict[sec] = [r for r in recs if r.get("content_id") != cid]
return recsys_dict
# ===============================================================
def _lookup_content_info(self, cid):
row = self.Core.content_info[self.Core.content_info["content_id"] == cid]
return row["content_info"].iloc[0] if not row.empty else None
# ===============================================================
def select_next_recommendation(self, user):
"""
Select next recommendation from the user's current recsys_result.
Returns: content_id, content_info, updated_recsys_json
"""
self.llmr.user = user # _get_recommendation expects self.user to be set
cid, cinfo, updated_json, _ = self.llmr._get_recommendation()
return cid, cinfo, updated_json
# ==============================================================
def _get_user_idx(self, u):
# If it's a Series, its index label is usually the row index
if isinstance(u, pd.Series) and u.name in self.Core.users_df.index:
return u.name
# Otherwise try a stable key like user_id (change if your key is different)
key_col = "user_id" if "user_id" in self.Core.users_df.columns else None
if key_col and key_col in u:
matches = self.Core.users_df.index[self.Core.users_df[key_col] == u[key_col]]
if len(matches):
return matches[0]
# Fallback: try exact row equality (last resort; slower)
try:
return self.Core.users_df.index[self.Core.users_df.eq(pd.Series(u)).all(1)][0]
except Exception:
return None
# =============================================================
def _update_user_fields(self, idx, user, fields: dict):
"""Update DF row and return a fresh copy of the user row (Series) with those fields reflected."""
if idx is None:
# no index? just mutate the local dict/Series
for k, v in fields.items():
user[k] = v
return user
for k, v in fields.items():
self.Core.users_df.at[idx, k] = v
return self.Core.users_df.loc[idx]
# --------------------------------------------------------------
# --------------------------------------------------------------
def llm_instructions(self):
"""
Setting instructions for llm
:return: instructions as string
"""
banned_phrases = "\n".join(f"- {word}" for word in self.Core.config_file["AI_Jargon"])
jargon_list = "\n".join(f"- {word}" for word in self.Core.config_file[f"AI_phrases_{self.Core.brand}"])
if self.Core.personalization:
instructions = f"""
Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification.
Based on the user instructions, you might need to **modify the selected option** very minimal and slightly to improve personalization if capable while preserving the original brand voice, tone, rhythm, and structure.
**Important Note**: header < {self.Core.config_file["header_limit"]} and message < {self.Core.config_file["message_limit"]} characters.
**Important Note**: NEVER use time-related words (“new,” “recent,” “latest,” etc.) and NEVER imply recency in any way.
### Don't use below phrases, words, or similar variations of them:
{banned_phrases}
{jargon_list}
"""
else:
instructions = f"""
Your task is to select the best 'header' and a 'message' for a {self.Core.get_instrument()} student as a push notification.
DO NOT **change** or **modify** or **add to** the selected option in any shape or form. **Use the exact original selected header and message without ANY change**
"""
return instructions