67 lines
2.4 KiB
Python
67 lines
2.4 KiB
Python
# bot/plex_client.py
|
|
|
|
from plexapi.server import PlexServer
|
|
from dotenv import load_dotenv
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
|
|
from config import PLEX_BASE_URL, PLEX_TOKEN
|
|
|
|
|
|
class PlexClient:
|
|
def __init__(self):
|
|
self.server = PlexServer(PLEX_BASE_URL, PLEX_TOKEN)
|
|
self.users = {u.title.lower(): u for u in self.server.myPlexAccount().users()}
|
|
self.admin_username = self.server.myPlexAccount().username.lower()
|
|
|
|
|
|
|
|
|
|
def get_user_watch_history(self, username: str, media_type: str = "movie", max_items: int = 20):
|
|
"""Returns recent watch history within the last 14 days for a user."""
|
|
recent_cutoff = datetime.now() - timedelta(days=14)
|
|
|
|
username = username.lower()
|
|
|
|
if username == self.admin_username:
|
|
user_server = self.server
|
|
else:
|
|
user = self.users.get(username)
|
|
if not user:
|
|
raise ValueError(f"No Plex user found with username '{username}'")
|
|
token = user.get_token(self.server.friendlyName)
|
|
user_server = PlexServer(PLEX_BASE_URL, token)
|
|
|
|
|
|
history = []
|
|
sections = user_server.library.sections()
|
|
|
|
for section in sections:
|
|
if section.type == media_type:
|
|
items = section.search(sort='lastViewedAt:desc', limit=max_items * 2) # buffer for filtering
|
|
for item in items:
|
|
if item.lastViewedAt and item.lastViewedAt >= recent_cutoff:
|
|
history.append({
|
|
'title': item.title,
|
|
'type': media_type,
|
|
'genres': [g.tag for g in getattr(item, 'genres', [])],
|
|
'summary': getattr(item, 'summary', ''),
|
|
})
|
|
if len(history) >= max_items:
|
|
break
|
|
print(f"📺 Watch history for {username}:")
|
|
for item in history:
|
|
print(f"- {item['title']} (last viewed: {item.get('lastViewedAt', 'n/a')})")
|
|
|
|
return history
|
|
|
|
|
|
def get_all_library_titles(self):
|
|
"""Returns all movies and shows in the Plex library."""
|
|
media = {'movie': [], 'show': []}
|
|
for section in self.server.library.sections():
|
|
if section.type in media:
|
|
for item in section.all():
|
|
media[section.type].append(item.title)
|
|
return media
|