Initial working version
This commit is contained in:
66
bot/plex_client.py
Normal file
66
bot/plex_client.py
Normal file
@ -0,0 +1,66 @@
|
||||
# bot/plex_client.py
|
||||
|
||||
from plexapi.server import PlexServer
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from config import PLEX_BASE_URL, PLEX_TOKEN
|
||||
|
||||
|
||||
class PlexClient:
|
||||
def __init__(self):
|
||||
self.server = PlexServer(PLEX_BASE_URL, PLEX_TOKEN)
|
||||
self.users = {u.title.lower(): u for u in self.server.myPlexAccount().users()}
|
||||
self.admin_username = self.server.myPlexAccount().username.lower()
|
||||
|
||||
|
||||
|
||||
|
||||
def get_user_watch_history(self, username: str, media_type: str = "movie", max_items: int = 20):
|
||||
"""Returns recent watch history within the last 14 days for a user."""
|
||||
recent_cutoff = datetime.now() - timedelta(days=14)
|
||||
|
||||
username = username.lower()
|
||||
|
||||
if username == self.admin_username:
|
||||
user_server = self.server
|
||||
else:
|
||||
user = self.users.get(username)
|
||||
if not user:
|
||||
raise ValueError(f"No Plex user found with username '{username}'")
|
||||
token = user.get_token(self.server.friendlyName)
|
||||
user_server = PlexServer(PLEX_BASE_URL, token)
|
||||
|
||||
|
||||
history = []
|
||||
sections = user_server.library.sections()
|
||||
|
||||
for section in sections:
|
||||
if section.type == media_type:
|
||||
items = section.search(sort='lastViewedAt:desc', limit=max_items * 2) # buffer for filtering
|
||||
for item in items:
|
||||
if item.lastViewedAt and item.lastViewedAt >= recent_cutoff:
|
||||
history.append({
|
||||
'title': item.title,
|
||||
'type': media_type,
|
||||
'genres': [g.tag for g in getattr(item, 'genres', [])],
|
||||
'summary': getattr(item, 'summary', ''),
|
||||
})
|
||||
if len(history) >= max_items:
|
||||
break
|
||||
print(f"📺 Watch history for {username}:")
|
||||
for item in history:
|
||||
print(f"- {item['title']} (last viewed: {item.get('lastViewedAt', 'n/a')})")
|
||||
|
||||
return history
|
||||
|
||||
|
||||
def get_all_library_titles(self):
|
||||
"""Returns all movies and shows in the Plex library."""
|
||||
media = {'movie': [], 'show': []}
|
||||
for section in self.server.library.sections():
|
||||
if section.type in media:
|
||||
for item in section.all():
|
||||
media[section.type].append(item.title)
|
||||
return media
|
Reference in New Issue
Block a user