Working version

This commit is contained in:
2025-06-03 13:12:58 -04:00
parent e991862a01
commit 24dbe252de
3 changed files with 71 additions and 38 deletions

View File

@ -5,6 +5,7 @@ import os
import time
from typing import List
from plex_client import PlexClient
from difflib import SequenceMatcher
CACHE_FILE = 'bot/library_cache.json'
MAX_CACHE_AGE_HOURS = 6
@ -30,11 +31,16 @@ class LibraryCache:
new_data = []
for section in media:
if section.type in ['movie', 'show']:
section_type = section.type.lower()
# Normalize to 'show'
if section_type == 'show':
section_type = 'tv'
if section_type in ['movie', 'tv']:
for item in section.all():
new_data.append({
'title': item.title.strip(),
'type': section.type,
'type': section_type,
'genres': [g.tag for g in getattr(item, 'genres', [])],
'year': getattr(item, 'year', None)
})
@ -42,6 +48,7 @@ class LibraryCache:
self.data = new_data
self.save_cache()
def save_cache(self):
with open(CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(self.data, f, ensure_ascii=False, indent=2)
@ -53,9 +60,16 @@ class LibraryCache:
def get_titles_by_type(self, media_type: str) -> List[str]:
return [item['title'] for item in self.data if item['type'] == media_type]
def search(self, title: str, media_type: str = None) -> bool:
"""Check if title exists in library (case-insensitive, optional type filter)."""
def search(self, title: str, media_type: str) -> bool:
def is_match(a: str, b: str) -> bool:
ratio = SequenceMatcher(None, a.lower(), b.lower()).ratio()
return ratio > 0.8 # tweak as needed
for item in self.data:
if item['title'].lower() == title.lower() and (media_type is None or item['type'] == media_type):
if item["type"] != media_type:
continue
if is_match(title, item["title"]):
return True
return False

View File

@ -29,33 +29,39 @@ async def recommend(interaction: discord.Interaction, username: str, media_type:
await interaction.response.defer() # Acknowledge the request
try:
#users = tautulli._request("get_users")
#print("👤 Tautulli Users:")
#for u in users:
# print(f"{u.get('username')}")
# Convert Discord input to Plex-friendly type
history = tautulli.get_user_watch_history(username=username, media_type=media_type.lower())
if not history:
await interaction.followup.send(f"⚠️ No recent {media_type} history found for `{username}`.")
return
watched_titles = [entry["title"] for entry in history]
result = recommender.recommend(watched_titles, media_type=media_type.lower())
# Print cached Plex titles for this media_type
print(f"🗂️ Cached Plex {media_type}s:")
for item in recommender.cache.data:
if item["type"] == media_type.lower():
print(f"- {item['title']}")
response = f"🎬 **{media_type.title()} Recommendations for `{username}`:**\n\n"
if result["available"]:
response += "✅ **Available on Plex:**\n" + "\n".join(f"- {title}" for title in result["available"]) + "\n\n"
if result["requestable"]:
response += "🛒 **Requestable:**\n" + "\n".join(f"- {title}" for title in result["requestable"])
if not result["available"] and not result["requestable"]:
response += "❌ No recommendations found."
results = recommender.recommend(watched_titles, media_type=media_type.lower())
available = results["available"]
requestable = results["requestable"]
def format_recs(title_list):
return "\n".join(f"{title}" for title in title_list)
# Final Discord message
response = (
f"🎬 **Recommendations for `{username}` ({media_type}s)**\n\n"
f"**✅ Available on Plex:**\n"
f"{format_recs(available) or 'None found.'}\n\n"
f"**🛒 Requestable:**\n"
f"{format_recs(requestable) or 'None found.'}"
)
await interaction.followup.send(response)
except Exception as e:
await interaction.followup.send(f"Error: {str(e)}")
print("Error in /recommend:", e)
await interaction.followup.send(f"An error occurred while generating recommendations: {e}")
if __name__ == "__main__":
client.run(DISCORD_BOT_TOKEN)

View File

@ -11,7 +11,7 @@ load_dotenv()
openai.api_key = OPENAI_API_KEY
MediaType = Literal["movie", "show"]
MediaType = Literal["movie", "tv"]
class Recommender:
def __init__(self):
@ -21,34 +21,39 @@ class Recommender:
if not watched_titles:
raise ValueError("No watched titles provided.")
prompt = self.build_prompt(watched_titles, media_type, max_recs)
available_titles = [item["title"] for item in self.cache.data if item["type"] == media_type]
prompt = self.build_prompt(watched_titles, available_titles, media_type, max_recs)
response = self.query_openai(prompt)
print("🧠 Prompt:", prompt)
print("📥 Raw response:", response)
all_titles = self.parse_titles(response)
print("📦 Parsed titles:", all_titles)
all_titles = self.parse_titles(response) # ✅ Bring this back
# Re-verify locally
available = [title for title in all_titles if self.cache.search(title, media_type)]
requestable = [title for title in all_titles if title not in available]
for title in all_titles:
match = self.cache.search(title, media_type)
print(f"{'' if match else ''} {title}")
return {
"available": available,
"requestable": requestable
"available": available[:max_recs],
"requestable": requestable[:max_recs]
}
def build_prompt(self, watched: List[str], media_type: str, max_recs: int) -> str:
def build_prompt(self, watched: List[str], available_titles: List[str], media_type: str, max_recs: int) -> str:
type_text = "movies" if media_type == "movie" else "TV shows"
# You could optionally summarize genres here
genre_summary = self.extract_common_genres(watched, media_type)
return (
f"A user has watched the following {type_text}: {', '.join(watched[:20])}. "
f"These shows are mostly {genre_summary}. "
f"Recommend {max_recs} similar {type_text} based on theme and tone. "
f"Return only a plain comma-separated list of titles — no numbers, no explanations."
f"""
The user has recently watched the following {type_text}: {', '.join(watched[:10])}.
Here is a list of {type_text} available on the Plex server: {', '.join(available_titles[:200])}.
Recommend 10 similar {type_text}. Select 5 that the user would like that are available on the server and 5 that are not available.
Return only a plain comma-separated list of titles.
"""
)
def extract_common_genres(self, watched: List[str], media_type: str) -> str:
@ -61,7 +66,6 @@ class Recommender:
top_genres = [g for g, _ in sorted_genres[:3]]
return ", ".join(top_genres) if top_genres else "varied genres"
def query_openai(self, prompt: str) -> str:
try:
response = openai.ChatCompletion.create(
@ -71,13 +75,12 @@ class Recommender:
{"role": "user", "content": prompt}
],
temperature=0.4,
max_tokens=150
max_tokens=300
)
return response.choices[0].message.content
except Exception as e:
print("⚠️ OpenAI API error:", e)
return ""
def parse_titles(self, response: str) -> List[str]:
lines = response.replace("\n", ",").split(",")
cleaned = []
@ -87,3 +90,13 @@ class Recommender:
if item:
cleaned.append(item)
return cleaned
def parse_split_titles(self, response: str) -> (List[str], List[str]):
available = []
requestable = []
for line in response.splitlines():
if line.lower().startswith("on plex:"):
available = [t.strip() for t in line.split(":", 1)[1].split(",") if t.strip()]
elif line.lower().startswith("not on plex:"):
requestable = [t.strip() for t in line.split(":", 1)[1].split(",") if t.strip()]
return available, requestable