Compare commits

...

3 Commits

Author SHA1 Message Date
87e90e2178 Updated README 2025-06-03 13:24:21 -04:00
24dbe252de Working version 2025-06-03 13:12:58 -04:00
e991862a01 Initial working version 2025-05-29 15:04:36 -04:00
14 changed files with 451 additions and 1 deletions

3
.dockerignore Normal file
View File

@ -0,0 +1,3 @@
__pycache__/
*.pyc
.env

18
Dockerfile Normal file
View File

@ -0,0 +1,18 @@
# Dockerfile
FROM python:3.11-slim
ENV PYTHONUNBUFFERED=1
# Set working directory
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy bot source code
COPY bot/ ./bot
# Set default command
CMD ["python", "bot/main.py"]

View File

@ -1,3 +1,14 @@
# discord-plex-recommender
This is a discord bot to review a Plex user's viewing history and make recommendations.
This is a discord bot to review a Plex user's viewing history and make recommendations. This is a very basic framework and is used primarily as a foundation for a larger bot and project. The highlights here are that it connects to a Plex user's viewing history and references this history when connecting to an OpenAI API. Future versions will provide more depth, such as elaborting on recommendations, providing different recommendations by genre, and also accepting additional user arguments, such as what an argument is "in the mood for watching".
## The Bot in Action
![Screenshot 1](screenshots/discord_view.png)
*Calling the Bot in a Discord Chat*
![Screenshot 2](screenshots/tv.png)
*Response after requesting tv recommendations*
![Screenshot 3](screenshots/movies.png)
*Response after requesting movie recommendations*

21
bot/config.py Normal file
View File

@ -0,0 +1,21 @@
# bot/config.py
import os
from dotenv import load_dotenv
load_dotenv()
# --- Plex ---
PLEX_BASE_URL = os.getenv("PLEX_BASE_URL")
PLEX_TOKEN = os.getenv("PLEX_TOKEN")
# --- OpenAI ---
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4")
# --- Caching ---
CACHE_FILE = os.getenv("CACHE_FILE", "bot/library_cache.json")
MAX_CACHE_AGE_HOURS = int(os.getenv("MAX_CACHE_AGE_HOURS", 6))
# --- Discord Bot ---
DISCORD_BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")

75
bot/library_cache.py Normal file
View File

@ -0,0 +1,75 @@
# bot/library_cache.py
import json
import os
import time
from typing import List
from plex_client import PlexClient
from difflib import SequenceMatcher
CACHE_FILE = 'bot/library_cache.json'
MAX_CACHE_AGE_HOURS = 6
class LibraryCache:
def __init__(self):
if self.is_cache_stale():
print("📦 Plex cache is stale or missing — refreshing...")
self.refresh_cache()
else:
print("✅ Using cached Plex library data.")
self.load_cache()
def is_cache_stale(self) -> bool:
if not os.path.exists(CACHE_FILE):
return True
age_hours = (time.time() - os.path.getmtime(CACHE_FILE)) / 3600
return age_hours > MAX_CACHE_AGE_HOURS
def refresh_cache(self):
plex = PlexClient()
media = plex.server.library.sections()
new_data = []
for section in media:
section_type = section.type.lower()
# Normalize to 'show'
if section_type == 'show':
section_type = 'tv'
if section_type in ['movie', 'tv']:
for item in section.all():
new_data.append({
'title': item.title.strip(),
'type': section_type,
'genres': [g.tag for g in getattr(item, 'genres', [])],
'year': getattr(item, 'year', None)
})
self.data = new_data
self.save_cache()
def save_cache(self):
with open(CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(self.data, f, ensure_ascii=False, indent=2)
def load_cache(self):
with open(CACHE_FILE, 'r', encoding='utf-8') as f:
self.data = json.load(f)
def get_titles_by_type(self, media_type: str) -> List[str]:
return [item['title'] for item in self.data if item['type'] == media_type]
def search(self, title: str, media_type: str) -> bool:
def is_match(a: str, b: str) -> bool:
ratio = SequenceMatcher(None, a.lower(), b.lower()).ratio()
return ratio > 0.8 # tweak as needed
for item in self.data:
if item["type"] != media_type:
continue
if is_match(title, item["title"]):
return True
return False

67
bot/main.py Normal file
View File

@ -0,0 +1,67 @@
# bot/main.py
import discord
from discord.ext import commands
from discord import app_commands
from tautulli_client import TautulliClient
from recommender import Recommender
from config import DISCORD_BOT_TOKEN
intents = discord.Intents.default()
client = commands.Bot(command_prefix="!", intents=intents)
tree = client.tree
tautulli = TautulliClient()
recommender = Recommender()
@client.event
async def on_ready():
print(f"🤖 Logged in as {client.user} (ID: {client.user.id})")
await tree.sync()
print("✅ Slash commands synced.")
@tree.command(name="recommend", description="Recommend movies or TV shows for a Plex user")
@app_commands.describe(
username="Plex username (case-sensitive)",
media_type="Choose 'movie' or 'tv'"
)
async def recommend(interaction: discord.Interaction, username: str, media_type: str):
await interaction.response.defer() # Acknowledge the request
try:
history = tautulli.get_user_watch_history(username=username, media_type=media_type.lower())
if not history:
await interaction.followup.send(f"⚠️ No recent {media_type} history found for `{username}`.")
return
watched_titles = [entry["title"] for entry in history]
# Print cached Plex titles for this media_type
print(f"🗂️ Cached Plex {media_type}s:")
for item in recommender.cache.data:
if item["type"] == media_type.lower():
print(f"- {item['title']}")
results = recommender.recommend(watched_titles, media_type=media_type.lower())
available = results["available"]
requestable = results["requestable"]
def format_recs(title_list):
return "\n".join(f"{title}" for title in title_list)
# Final Discord message
response = (
f"🎬 **Recommendations for `{username}` ({media_type}s)**\n\n"
f"**✅ Available on Plex:**\n"
f"{format_recs(available) or 'None found.'}\n\n"
f"**🛒 Requestable:**\n"
f"{format_recs(requestable) or 'None found.'}"
)
await interaction.followup.send(response)
except Exception as e:
await interaction.followup.send(f"❌ An error occurred while generating recommendations: {e}")
if __name__ == "__main__":
client.run(DISCORD_BOT_TOKEN)

66
bot/plex_client.py Normal file
View File

@ -0,0 +1,66 @@
# bot/plex_client.py
from plexapi.server import PlexServer
from dotenv import load_dotenv
import os
from datetime import datetime, timedelta
from config import PLEX_BASE_URL, PLEX_TOKEN
class PlexClient:
def __init__(self):
self.server = PlexServer(PLEX_BASE_URL, PLEX_TOKEN)
self.users = {u.title.lower(): u for u in self.server.myPlexAccount().users()}
self.admin_username = self.server.myPlexAccount().username.lower()
def get_user_watch_history(self, username: str, media_type: str = "movie", max_items: int = 20):
"""Returns recent watch history within the last 14 days for a user."""
recent_cutoff = datetime.now() - timedelta(days=14)
username = username.lower()
if username == self.admin_username:
user_server = self.server
else:
user = self.users.get(username)
if not user:
raise ValueError(f"No Plex user found with username '{username}'")
token = user.get_token(self.server.friendlyName)
user_server = PlexServer(PLEX_BASE_URL, token)
history = []
sections = user_server.library.sections()
for section in sections:
if section.type == media_type:
items = section.search(sort='lastViewedAt:desc', limit=max_items * 2) # buffer for filtering
for item in items:
if item.lastViewedAt and item.lastViewedAt >= recent_cutoff:
history.append({
'title': item.title,
'type': media_type,
'genres': [g.tag for g in getattr(item, 'genres', [])],
'summary': getattr(item, 'summary', ''),
})
if len(history) >= max_items:
break
print(f"📺 Watch history for {username}:")
for item in history:
print(f"- {item['title']} (last viewed: {item.get('lastViewedAt', 'n/a')})")
return history
def get_all_library_titles(self):
"""Returns all movies and shows in the Plex library."""
media = {'movie': [], 'show': []}
for section in self.server.library.sections():
if section.type in media:
for item in section.all():
media[section.type].append(item.title)
return media

102
bot/recommender.py Normal file
View File

@ -0,0 +1,102 @@
# bot/recommender.py
import openai
import os
from dotenv import load_dotenv
from typing import List, Dict, Literal
from library_cache import LibraryCache
from config import OPENAI_API_KEY, OPENAI_MODEL
load_dotenv()
openai.api_key = OPENAI_API_KEY
MediaType = Literal["movie", "tv"]
class Recommender:
def __init__(self):
self.cache = LibraryCache()
def recommend(self, watched_titles: List[str], media_type: MediaType = "movie", max_recs: int = 5) -> Dict[str, List[str]]:
if not watched_titles:
raise ValueError("No watched titles provided.")
available_titles = [item["title"] for item in self.cache.data if item["type"] == media_type]
prompt = self.build_prompt(watched_titles, available_titles, media_type, max_recs)
response = self.query_openai(prompt)
print("🧠 Prompt:", prompt)
print("📥 Raw response:", response)
all_titles = self.parse_titles(response) # ✅ Bring this back
# Re-verify locally
available = [title for title in all_titles if self.cache.search(title, media_type)]
requestable = [title for title in all_titles if title not in available]
for title in all_titles:
match = self.cache.search(title, media_type)
print(f"{'' if match else ''} {title}")
return {
"available": available[:max_recs],
"requestable": requestable[:max_recs]
}
def build_prompt(self, watched: List[str], available_titles: List[str], media_type: str, max_recs: int) -> str:
type_text = "movies" if media_type == "movie" else "TV shows"
return (
f"""
The user has recently watched the following {type_text}: {', '.join(watched[:10])}.
Here is a list of {type_text} available on the Plex server: {', '.join(available_titles[:200])}.
Recommend 10 similar {type_text}. Select 5 that the user would like that are available on the server and 5 that are not available.
Return only a plain comma-separated list of titles.
"""
)
def extract_common_genres(self, watched: List[str], media_type: str) -> str:
genre_counts = {}
for item in self.cache.data:
if item["title"] in watched and item["type"] == media_type:
for genre in item.get("genres", []):
genre_counts[genre] = genre_counts.get(genre, 0) + 1
sorted_genres = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)
top_genres = [g for g, _ in sorted_genres[:3]]
return ", ".join(top_genres) if top_genres else "varied genres"
def query_openai(self, prompt: str) -> str:
try:
response = openai.ChatCompletion.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": "You're a helpful and precise media recommender."},
{"role": "user", "content": prompt}
],
temperature=0.4,
max_tokens=300
)
return response.choices[0].message.content
except Exception as e:
print("⚠️ OpenAI API error:", e)
return ""
def parse_titles(self, response: str) -> List[str]:
lines = response.replace("\n", ",").split(",")
cleaned = []
for item in lines:
item = item.strip()
item = item.lstrip("-•*0123456789. ").strip()
if item:
cleaned.append(item)
return cleaned
def parse_split_titles(self, response: str) -> (List[str], List[str]):
available = []
requestable = []
for line in response.splitlines():
if line.lower().startswith("on plex:"):
available = [t.strip() for t in line.split(":", 1)[1].split(",") if t.strip()]
elif line.lower().startswith("not on plex:"):
requestable = [t.strip() for t in line.split(":", 1)[1].split(",") if t.strip()]
return available, requestable

73
bot/tautulli_client.py Normal file
View File

@ -0,0 +1,73 @@
# bot/tautulli_client.py
import os
import requests
from datetime import datetime, timedelta
from dotenv import load_dotenv
load_dotenv()
TAUTULLI_URL = os.getenv("TAUTULLI_URL")
TAUTULLI_API_KEY = os.getenv("TAUTULLI_API_KEY")
class TautulliClient:
def __init__(self):
self.base_url = f"{TAUTULLI_URL}/api/v2"
self.api_key = TAUTULLI_API_KEY
def _request(self, cmd: str, params: dict = {}):
payload = {
"apikey": self.api_key,
"cmd": cmd,
**params
}
try:
res = requests.get(self.base_url, params=payload)
res.raise_for_status()
return res.json().get("response", {}).get("data", [])
except Exception as e:
print(f"❌ Tautulli API error for {cmd}: {e}")
return []
def get_user_watch_history(self, username: str, media_type: str = "movie", days: int = 14, max_items: int = 20):
"""Get a list of recently watched movies or shows by username."""
cutoff = datetime.now() - timedelta(days=days)
print(media_type)
if media_type=='movie':
tautulli_media_type='movie'
elif media_type=='tv':
tautulli_media_type='episode'
history = self._request("get_history", {
"user": username,
"media_type":tautulli_media_type,
"order_column": "date",
"order_dir": "desc",
"length": 100 # buffer to allow filtering
})
history = history['data']
#print(history)
filtered = []
print('FULL HISTORY')
for item in history:
print(item['user'],item['media_type'],item['title'],item['grandparent_title'],username,cutoff,datetime.fromtimestamp(item["date"]))
try:
watched_date = datetime.fromtimestamp(item["date"])
if watched_date < cutoff:
continue
filtered.append({
"title": item["title"] if media_type == "movie" else item["grandparent_title"],
"type": media_type,
"genres": item.get("genre", "").split(", "),
"summary": item.get("summary", "")
})
if len(filtered) >= max_items:
break
except Exception:
continue
print(filtered)
return filtered

9
docker-compose.yml Normal file
View File

@ -0,0 +1,9 @@
version: '3.8'
services:
plex-discord-bot:
build: .
env_file: .env
restart: unless-stopped
volumes:
- ./bot:/app/bot

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
discord.py
openai==0.28
plexapi
python-dotenv
requests

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.8 KiB

BIN
screenshots/movies.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

BIN
screenshots/tv.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB