# Listing metadata from the code viewer: 103 lines, 4.0 KiB, Python.
# bot/recommender.py
|
|
|
|
import os
import re
from typing import Dict, List, Literal, Tuple

import openai
from dotenv import load_dotenv

from config import OPENAI_API_KEY, OPENAI_MODEL
from library_cache import LibraryCache
|
|
|
|
# Load variables from a local .env file into the process environment.
# NOTE(review): this runs after `from config import ...` above; if config
# reads environment variables at import time it will not see values loaded
# here — confirm against config.py.
load_dotenv()
|
|
|
|
# Set the module-level API key used by the `openai` calls in this file.
openai.api_key = OPENAI_API_KEY
|
|
|
|
# Media kinds accepted by the recommender's `media_type` parameter.
MediaType = Literal["movie", "tv"]
|
|
|
|
class Recommender:
    """Suggest movies or TV shows by prompting an OpenAI chat model.

    The model is shown what the user watched plus titles taken from the
    library cache. Its answer is then re-checked against the cache so the
    result can be split into titles already on the server ("available")
    and titles that are not ("requestable").
    """

    # Leading list decoration the model sometimes adds despite the prompt:
    # a bullet ("-", "•", "*") or an ordinal ("1.", "2)") followed by
    # whitespace. Digits that belong to the title itself ("1917",
    # "12 Angry Men", "2001: A Space Odyssey") deliberately do not match.
    _LIST_MARKER = re.compile(r"^\s*(?:[-•*]+(?:\s+|$)|\d+[.)]\s+)")

    def __init__(self):
        """Create the recommender backed by a new ``LibraryCache``."""
        self.cache = LibraryCache()

    def recommend(self, watched_titles: List[str], media_type: MediaType = "movie", max_recs: int = 5) -> Dict[str, List[str]]:
        """Return recommendations split by whether the library has them.

        Args:
            watched_titles: Titles the user has watched; must be non-empty.
            media_type: ``"movie"`` or ``"tv"``; filters cache entries and
                is passed to the cache search.
            max_recs: Maximum number of titles returned in each list.

        Returns:
            ``{"available": [...], "requestable": [...]}``, each list
            truncated to ``max_recs`` and kept in the model's order.

        Raises:
            ValueError: If ``watched_titles`` is empty.
        """
        if not watched_titles:
            raise ValueError("No watched titles provided.")

        # Assumes each cache entry is a dict with "title" and "type" keys.
        available_titles = [
            item["title"] for item in self.cache.data if item["type"] == media_type
        ]
        prompt = self.build_prompt(watched_titles, available_titles, media_type, max_recs)
        response = self.query_openai(prompt)

        print("🧠 Prompt:", prompt)
        print("📥 Raw response:", response)

        all_titles = self.parse_titles(response)

        # Re-verify locally instead of trusting the model's own split.
        # Each title is looked up exactly once; the result drives both the
        # debug line and the bucket the title lands in.
        available: List[str] = []
        requestable: List[str] = []
        for title in all_titles:
            match = self.cache.search(title, media_type)
            print(f"{'✅' if match else '❌'} {title}")
            if match:
                available.append(title)
            else:
                requestable.append(title)

        return {
            "available": available[:max_recs],
            "requestable": requestable[:max_recs],
        }

    def build_prompt(self, watched: List[str], available_titles: List[str], media_type: str, max_recs: int) -> str:
        """Build the user prompt sent to the chat model.

        Only the first 10 watched titles and the first 200 library titles
        are included, to bound the prompt size.

        Args:
            watched: Titles the user has watched.
            available_titles: Titles present in the library.
            media_type: ``"movie"`` selects the wording "movies"; any other
                value is worded as "TV shows".
            max_recs: Number of titles requested for each of the two
                groups (available / not available), so ``2 * max_recs``
                titles are requested in total.

        Returns:
            The prompt text.
        """
        type_text = "movies" if media_type == "movie" else "TV shows"
        total = 2 * max_recs

        return (
            "\n"
            f"The user has recently watched the following {type_text}: "
            f"{', '.join(watched[:10])}.\n"
            f"Here is a list of {type_text} available on the Plex server: "
            f"{', '.join(available_titles[:200])}.\n"
            f"Recommend {total} similar {type_text}. Select {max_recs} that the user "
            "would like that are available on the server and "
            f"{max_recs} that are not available.\n"
            "Return only a plain comma-separated list of titles.\n"
        )

    def extract_common_genres(self, watched: List[str], media_type: str) -> str:
        """Return up to three of the most frequent genres among watched titles.

        Genres are counted over cache entries whose title is in ``watched``
        and whose type equals ``media_type``. Ties keep first-seen order.

        Returns:
            A comma-separated genre string, or ``"varied genres"`` when no
            genre information is found.
        """
        watched_set = set(watched)  # O(1) membership per cache entry
        genre_counts: Dict[str, int] = {}
        for item in self.cache.data:
            if item["title"] in watched_set and item["type"] == media_type:
                for genre in item.get("genres", []):
                    genre_counts[genre] = genre_counts.get(genre, 0) + 1

        ranked = sorted(genre_counts.items(), key=lambda pair: pair[1], reverse=True)
        top_genres = [genre for genre, _ in ranked[:3]]
        return ", ".join(top_genres) if top_genres else "varied genres"

    def query_openai(self, prompt: str) -> str:
        """Send ``prompt`` to the configured chat model and return its reply.

        This is deliberately best-effort: any failure is printed and an
        empty string is returned, so callers simply get no recommendations.
        """
        try:
            response = openai.ChatCompletion.create(
                model=OPENAI_MODEL,
                messages=[
                    {"role": "system", "content": "You're a helpful and precise media recommender."},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.4,
                max_tokens=300,
            )
            # The content can be None; normalise so callers always get a str.
            return response.choices[0].message.content or ""
        except Exception as e:
            print("⚠️ OpenAI API error:", e)
            return ""

    def parse_titles(self, response: str) -> List[str]:
        """Split a model reply into a clean list of titles.

        Commas and newlines both act as separators. A leading bullet or
        ordinal marker is removed from each entry, but digits that are part
        of the title are preserved. Empty entries are dropped.

        Note: a title that itself contains a comma is split into pieces,
        because the reply format is a plain comma-separated list.
        """
        if not response:
            return []

        cleaned: List[str] = []
        for chunk in response.replace("\n", ",").split(","):
            title = self._LIST_MARKER.sub("", chunk.strip()).strip()
            if title:
                cleaned.append(title)
        return cleaned

    def parse_split_titles(self, response: str) -> Tuple[List[str], List[str]]:
        """Parse a reply that uses ``On Plex:`` / ``Not on Plex:`` sections.

        Each section is one line: the label (case-insensitive), a colon,
        then comma-separated titles. If a label occurs more than once, the
        last occurrence wins.

        Returns:
            ``(available, requestable)``; a missing section yields ``[]``.
        """

        def split_after_colon(text: str) -> List[str]:
            # Split only on the first colon so titles may contain colons.
            return [t.strip() for t in text.split(":", 1)[1].split(",") if t.strip()]

        available: List[str] = []
        requestable: List[str] = []
        for raw_line in response.splitlines():
            line = raw_line.strip()  # tolerate indented section lines
            lowered = line.lower()
            if lowered.startswith("on plex:"):
                available = split_after_colon(line)
            elif lowered.startswith("not on plex:"):
                requestable = split_after_colon(line)
        return available, requestable
|