#!/usr/bin/env python3 import requests import time import logging import sys import signal import json import ipaddress logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) class ServiceUnavailable(Exception): """Raised when a monitored service is temporarily unavailable.""" pass class JellyfinQBittorrentMonitor: def __init__( self, jellyfin_url="http://localhost:8096", qbittorrent_url="http://localhost:8080", check_interval=30, jellyfin_api_key=None, streaming_start_delay=10, streaming_stop_delay=60, ): self.jellyfin_url = jellyfin_url self.qbittorrent_url = qbittorrent_url self.check_interval = check_interval self.jellyfin_api_key = jellyfin_api_key self.last_streaming_state = None self.throttle_active = False self.running = True self.session = requests.Session() # Use session for cookies self.last_active_streams = [] # Hysteresis settings to prevent rapid switching self.streaming_start_delay = streaming_start_delay self.streaming_stop_delay = streaming_stop_delay self.last_state_change = 0 # Local network ranges (RFC 1918 private networks + localhost) self.local_networks = [ ipaddress.ip_network("10.0.0.0/8"), ipaddress.ip_network("172.16.0.0/12"), ipaddress.ip_network("192.168.0.0/16"), ipaddress.ip_network("127.0.0.0/8"), ipaddress.ip_network("::1/128"), # IPv6 localhost ipaddress.ip_network("fe80::/10"), # IPv6 link-local ] def is_local_ip(self, ip_address: str) -> bool: """Check if an IP address is from a local network""" try: ip = ipaddress.ip_address(ip_address) return any(ip in network for network in self.local_networks) except ValueError: logger.warning(f"Invalid IP address format: {ip_address}") return True # Treat invalid IPs as local for safety def signal_handler(self, signum, frame): logger.info("Received shutdown signal, cleaning up...") self.running = False self.restore_normal_limits() sys.exit(0) def check_jellyfin_sessions(self) -> list[str]: headers = {"X-Emby-Token": self.jellyfin_api_key} if self.jellyfin_api_key else {} try: response = requests.get( f"{self.jellyfin_url}/Sessions", headers=headers, timeout=10 ) response.raise_for_status() except requests.exceptions.RequestException as e: logger.error(f"Failed to check Jellyfin sessions: {e}") raise ServiceUnavailable(f"Jellyfin unavailable: {e}") from e try: sessions = response.json() except json.JSONDecodeError as e: logger.error(f"Failed to parse Jellyfin response: {e}") raise ServiceUnavailable(f"Jellyfin returned invalid JSON: {e}") from e active_streams = [] for session in sessions: if ( "NowPlayingItem" in session and not session.get("PlayState", {}).get("IsPaused", True) and not self.is_local_ip(session.get("RemoteEndPoint", "")) ): item = session["NowPlayingItem"] item_type = item.get("Type", "").lower() if item_type in ["movie", "episode", "video"]: user = session.get("UserName", "Unknown") active_streams.append(f"{user}: {item.get('Name', 'Unknown')}") return active_streams def check_qbittorrent_alternate_limits(self) -> bool: try: response = self.session.get( f"{self.qbittorrent_url}/api/v2/transfer/speedLimitsMode", timeout=10 ) if response.status_code == 200: return response.text.strip() == "1" else: logger.warning( f"SpeedLimitsMode endpoint returned HTTP {response.status_code}" ) raise ServiceUnavailable(f"qBittorrent returned HTTP {response.status_code}") except requests.exceptions.RequestException as e: logger.error(f"SpeedLimitsMode endpoint failed: {e}") raise ServiceUnavailable(f"qBittorrent unavailable: {e}") from e def use_alt_limits(self, enable: bool) -> None: action = "enabled" if enable else "disabled" try: current_throttle = self.check_qbittorrent_alternate_limits() if current_throttle == enable: logger.debug(f"Alternate speed limits already {action}, no action needed") return response = self.session.post( f"{self.qbittorrent_url}/api/v2/transfer/toggleSpeedLimitsMode", timeout=10, ) response.raise_for_status() self.throttle_active = enable new_state = self.check_qbittorrent_alternate_limits() if new_state == enable: logger.info(f"Alternate speed limits {action}") else: logger.warning(f"Toggle may have failed: expected {enable}, got {new_state}") except ServiceUnavailable: logger.warning(f"qBittorrent unavailable, cannot {action} alternate speed limits") except requests.exceptions.RequestException as e: logger.error(f"Failed to {action} alternate speed limits: {e}") def restore_normal_limits(self) -> None: if self.throttle_active: logger.info("Restoring normal speed limits before shutdown...") self.use_alt_limits(False) def sync_qbittorrent_state(self) -> None: try: actual_state = self.check_qbittorrent_alternate_limits() if actual_state != self.throttle_active: logger.warning( f"qBittorrent state mismatch detected: expected {self.throttle_active}, got {actual_state}. Re-syncing..." ) self.use_alt_limits(self.throttle_active) except ServiceUnavailable: pass def should_change_state(self, new_streaming_state: bool) -> bool: """Apply hysteresis to prevent rapid state changes""" now = time.time() if new_streaming_state == self.last_streaming_state: return False time_since_change = now - self.last_state_change # Start throttling (streaming started) if new_streaming_state and not self.last_streaming_state: if time_since_change >= self.streaming_start_delay: self.last_state_change = now return True else: remaining = self.streaming_start_delay - time_since_change logger.info( f"Streaming started - waiting {remaining:.1f}s before enabling throttling" ) # Stop throttling (streaming stopped) elif not new_streaming_state and self.last_streaming_state: if time_since_change >= self.streaming_stop_delay: self.last_state_change = now return True else: remaining = self.streaming_stop_delay - time_since_change logger.info( f"Streaming stopped - waiting {remaining:.1f}s before disabling throttling" ) return False def run(self): logger.info("Starting Jellyfin-qBittorrent monitor") logger.info(f"Jellyfin URL: {self.jellyfin_url}") logger.info(f"qBittorrent URL: {self.qbittorrent_url}") logger.info(f"Check interval: {self.check_interval}s") signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) while self.running: try: self.sync_qbittorrent_state() try: active_streams = self.check_jellyfin_sessions() except ServiceUnavailable: logger.warning("Jellyfin unavailable, maintaining current throttle state") time.sleep(self.check_interval) continue streaming_active = len(active_streams) > 0 if active_streams != self.last_active_streams: if streaming_active: logger.info( f"Active streams ({len(active_streams)}): {', '.join(active_streams)}" ) elif len(active_streams) == 0 and self.last_streaming_state: logger.info("No active streaming sessions") if self.should_change_state(streaming_active): self.last_streaming_state = streaming_active self.use_alt_limits(streaming_active) self.last_active_streams = active_streams time.sleep(self.check_interval) except KeyboardInterrupt: break except Exception as e: logger.error(f"Unexpected error in monitoring loop: {e}") time.sleep(self.check_interval) self.restore_normal_limits() logger.info("Monitor stopped") if __name__ == "__main__": import os # Configuration from environment variables jellyfin_url = os.getenv("JELLYFIN_URL", "http://localhost:8096") qbittorrent_url = os.getenv("QBITTORRENT_URL", "http://localhost:8080") check_interval = int(os.getenv("CHECK_INTERVAL", "30")) jellyfin_api_key = os.getenv("JELLYFIN_API_KEY") streaming_start_delay = int(os.getenv("STREAMING_START_DELAY", "10")) streaming_stop_delay = int(os.getenv("STREAMING_STOP_DELAY", "60")) monitor = JellyfinQBittorrentMonitor( jellyfin_url=jellyfin_url, qbittorrent_url=qbittorrent_url, check_interval=check_interval, jellyfin_api_key=jellyfin_api_key, streaming_start_delay=streaming_start_delay, streaming_stop_delay=streaming_stop_delay, ) monitor.run()