#!/usr/bin/env python3
"""Throttle or pause qBittorrent while remote Jellyfin streams are playing.

The monitor polls Jellyfin's ``/Sessions`` endpoint, estimates the bandwidth
used by non-local, non-paused video streams, and drives qBittorrent through
its Web API into one of three states:

* ``unlimited`` - alternate speed limits disabled, torrents running.
* ``throttled`` - alternate speed limits enabled and set to the bandwidth left
  over after streams and the service buffer are subtracted from the budget.
* ``paused``    - all torrents stopped because the leftover bandwidth is below
  the configured minimum torrent speed.
"""

import ipaddress
import json
import logging
import os
import signal
import sys
import time
from typing import Optional

import requests

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Upper bound applied to any single stream's reported bitrate, so one bogus
# value from Jellyfin cannot dominate the bandwidth calculation.
MAX_STREAM_BITRATE = 100_000_000

# Jellyfin item types (lower-cased) that count as bandwidth-relevant streams.
STREAMING_ITEM_TYPES = frozenset({"movie", "episode", "video"})


class ServiceUnavailable(Exception):
    """Raised when a monitored service is temporarily unavailable."""


class JellyfinQBittorrentMonitor:
    """Poll Jellyfin and adjust qBittorrent speed limits accordingly."""

    def __init__(
        self,
        jellyfin_url: str = "http://localhost:8096",
        qbittorrent_url: str = "http://localhost:8080",
        check_interval: int = 30,
        jellyfin_api_key: Optional[str] = None,
        streaming_start_delay: int = 10,
        streaming_stop_delay: int = 60,
        total_bandwidth_budget: int = 30000000,
        service_buffer: int = 5000000,
        default_stream_bitrate: int = 10000000,
        min_torrent_speed: int = 100,
    ):
        """Store configuration and initialise the runtime state.

        Args:
            jellyfin_url: Base URL of the Jellyfin server.
            qbittorrent_url: Base URL of the qBittorrent Web UI.
            check_interval: Seconds to sleep between monitoring cycles.
            jellyfin_api_key: Token sent as ``X-Emby-Token``; no header is
                sent when it is falsy.
            streaming_start_delay: Minimum seconds since the last state change
                before a "streaming started" transition is accepted.
            streaming_stop_delay: Minimum seconds since the last state change
                before a "streaming stopped" transition is accepted.
            total_bandwidth_budget: Total bandwidth budget in bits per second.
            service_buffer: Bandwidth reserved for other services, in bits
                per second.
            default_stream_bitrate: Bitrate (bits per second) assumed for a
                stream for which Jellyfin reports none.
            min_torrent_speed: Leftover bandwidth (KB/s) below which torrents
                are paused instead of throttled.
        """
        self.jellyfin_url = jellyfin_url
        self.qbittorrent_url = qbittorrent_url
        self.check_interval = check_interval
        self.jellyfin_api_key = jellyfin_api_key
        self.total_bandwidth_budget = total_bandwidth_budget
        self.service_buffer = service_buffer
        self.default_stream_bitrate = default_stream_bitrate
        self.min_torrent_speed = min_torrent_speed

        # Debounced streaming flag (None until the first accepted transition).
        self.last_streaming_state = None
        # One of "unlimited", "throttled", "paused".
        self.current_state = "unlimited"
        self.torrents_paused = False
        # (dl_kbs, ul_kbs) last successfully pushed to qBittorrent, or None.
        self.last_alt_limits = None
        self.running = True
        self.session = requests.Session()  # Use session for cookies
        self.last_active_streams = []

        # Hysteresis settings to prevent rapid switching
        self.streaming_start_delay = streaming_start_delay
        self.streaming_stop_delay = streaming_stop_delay
        self.last_state_change = 0

        # Local network ranges (RFC 1918 private networks + localhost)
        self.local_networks = [
            ipaddress.ip_network("10.0.0.0/8"),
            ipaddress.ip_network("172.16.0.0/12"),
            ipaddress.ip_network("192.168.0.0/16"),
            ipaddress.ip_network("127.0.0.0/8"),
            ipaddress.ip_network("::1/128"),  # IPv6 localhost
            ipaddress.ip_network("fe80::/10"),  # IPv6 link-local
        ]

    def is_local_ip(self, ip_address: str) -> bool:
        """Check if an IP address is from a local network.

        IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped first so
        that an IPv4 LAN client reported in IPv6 notation is still recognised
        as local. Unparseable input is treated as local.
        """
        try:
            ip = ipaddress.ip_address(ip_address)
        except ValueError:
            logger.warning(f"Invalid IP address format: {ip_address}")
            return True  # Treat invalid IPs as local for safety

        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            ip = mapped
        return any(ip in network for network in self.local_networks)

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM: restore qBittorrent and exit the process."""
        logger.info("Received shutdown signal, cleaning up...")
        self.running = False
        self.restore_normal_limits()
        sys.exit(0)

    def check_jellyfin_sessions(self) -> list[dict]:
        """Return the remote, actively playing video streams.

        Returns:
            A list of ``{"name": str, "bitrate_bps": int}`` dicts, one per
            session that is playing (not paused) a movie/episode/video from a
            non-local address.

        Raises:
            ServiceUnavailable: If the request fails, returns an HTTP error,
                or the body is not a JSON list.
        """
        headers = (
            {"X-Emby-Token": self.jellyfin_api_key} if self.jellyfin_api_key else {}
        )
        try:
            response = requests.get(
                f"{self.jellyfin_url}/Sessions", headers=headers, timeout=10
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to check Jellyfin sessions: {e}")
            raise ServiceUnavailable(f"Jellyfin unavailable: {e}") from e

        try:
            sessions = response.json()
        except ValueError as e:
            # ValueError covers json.JSONDecodeError as well as the decode
            # errors requests raises when a different JSON backend is in use.
            logger.error(f"Failed to parse Jellyfin response: {e}")
            raise ServiceUnavailable(f"Jellyfin returned invalid JSON: {e}") from e

        if not isinstance(sessions, list):
            logger.error(
                f"Unexpected Jellyfin response type: {type(sessions).__name__}"
            )
            raise ServiceUnavailable("Jellyfin returned an unexpected payload")

        active_streams = []
        for session in sessions:
            if not isinstance(session, dict):
                continue
            stream = self._stream_from_session(session)
            if stream is not None:
                active_streams.append(stream)
        return active_streams

    def _stream_from_session(self, session: dict) -> Optional[dict]:
        """Convert one Jellyfin session into a stream record, or ``None``."""
        item = session.get("NowPlayingItem")
        if not item:
            return None
        # A missing or null PlayState is treated as paused, i.e. not streaming.
        play_state = session.get("PlayState") or {}
        if play_state.get("IsPaused", True):
            return None
        if self.is_local_ip(session.get("RemoteEndPoint", "")):
            return None
        if (item.get("Type") or "").lower() not in STREAMING_ITEM_TYPES:
            return None

        user = session.get("UserName", "Unknown")
        return {
            "name": f"{user}: {item.get('Name', 'Unknown')}",
            "bitrate_bps": self._session_bitrate(session, item),
        }

    def _session_bitrate(self, session: dict, item: dict) -> int:
        """Pick the best available bitrate for a session, capped at the maximum.

        Preference order: transcoding bitrate, item bitrate, first media
        source bitrate, then the configured default.
        """
        transcoding = session.get("TranscodingInfo") or {}
        # "or [{}]" also guards against an empty list, which would otherwise
        # raise IndexError on the [0] lookup.
        media_sources = item.get("MediaSources") or [{}]
        first_source = media_sources[0] or {}
        bitrate = (
            transcoding.get("Bitrate")
            or item.get("Bitrate")
            or first_source.get("Bitrate")
            or self.default_stream_bitrate
        )
        return min(int(bitrate), MAX_STREAM_BITRATE)

    def check_qbittorrent_alternate_limits(self) -> bool:
        """Return True when qBittorrent's alternate speed limits are enabled.

        Raises:
            ServiceUnavailable: If the request fails or returns a non-200
                status.
        """
        try:
            response = self.session.get(
                f"{self.qbittorrent_url}/api/v2/transfer/speedLimitsMode",
                timeout=10,
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"SpeedLimitsMode endpoint failed: {e}")
            raise ServiceUnavailable(f"qBittorrent unavailable: {e}") from e

        if response.status_code != 200:
            logger.warning(
                f"SpeedLimitsMode endpoint returned HTTP {response.status_code}"
            )
            raise ServiceUnavailable(
                f"qBittorrent returned HTTP {response.status_code}"
            )
        return response.text.strip() == "1"

    def use_alt_limits(self, enable: bool) -> None:
        """Enable or disable alternate speed limits, toggling only if needed.

        The API only offers a toggle, so the current mode is read first and
        verified again afterwards. Failures are logged, never raised.
        """
        action = "enabled" if enable else "disabled"
        try:
            if self.check_qbittorrent_alternate_limits() == enable:
                logger.debug(
                    f"Alternate speed limits already {action}, no action needed"
                )
                return

            response = self.session.post(
                f"{self.qbittorrent_url}/api/v2/transfer/toggleSpeedLimitsMode",
                timeout=10,
            )
            response.raise_for_status()

            new_state = self.check_qbittorrent_alternate_limits()
            if new_state == enable:
                logger.info(f"Alternate speed limits {action}")
            else:
                logger.warning(
                    f"Toggle may have failed: expected {enable}, got {new_state}"
                )
        except ServiceUnavailable:
            logger.warning(
                f"qBittorrent unavailable, cannot {action} alternate speed limits"
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to {action} alternate speed limits: {e}")

    def pause_all_torrents(self) -> None:
        """Ask qBittorrent to stop all torrents; failures are logged only."""
        try:
            response = self.session.post(
                f"{self.qbittorrent_url}/api/v2/torrents/stop",
                data={"hashes": "all"},
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to pause torrents: {e}")

    def resume_all_torrents(self) -> None:
        """Ask qBittorrent to start all torrents; failures are logged only."""
        try:
            response = self.session.post(
                f"{self.qbittorrent_url}/api/v2/torrents/start",
                data={"hashes": "all"},
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to resume torrents: {e}")

    def set_alt_speed_limits(self, dl_kbs: float, ul_kbs: float) -> None:
        """Push new alternate download/upload limits to qBittorrent.

        Args:
            dl_kbs: Download limit in KB/s.
            ul_kbs: Upload limit in KB/s.

        ``last_alt_limits`` is updated only when the request succeeds, so a
        failed update is retried by later cycles. Failures are logged only.
        """
        # Values are multiplied by 1024 before sending - presumably the API
        # expects bytes per second; verify against the qBittorrent version.
        payload = {
            "alt_dl_limit": int(dl_kbs * 1024),
            "alt_up_limit": int(ul_kbs * 1024),
        }
        try:
            response = self.session.post(
                f"{self.qbittorrent_url}/api/v2/app/setPreferences",
                data={"json": json.dumps(payload)},
                timeout=10,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to set alternate speed limits: {e}")
        else:
            self.last_alt_limits = (dl_kbs, ul_kbs)

    def restore_normal_limits(self) -> None:
        """Return qBittorrent to the unlimited state (used on shutdown)."""
        if self.torrents_paused:
            logger.info("Resuming all torrents before shutdown...")
            self.resume_all_torrents()
            self.torrents_paused = False

        if self.current_state != "unlimited":
            logger.info("Restoring normal speed limits before shutdown...")
            self.use_alt_limits(False)
            self.current_state = "unlimited"

    def sync_qbittorrent_state(self) -> None:
        """Re-assert the expected state in case qBittorrent drifted from it."""
        try:
            if self.current_state == "unlimited":
                if self.check_qbittorrent_alternate_limits():
                    logger.warning(
                        "qBittorrent state mismatch detected: expected alt speed OFF, got ON. Re-syncing..."
                    )
                    self.use_alt_limits(False)
            elif self.current_state == "throttled":
                if self.last_alt_limits:
                    self.set_alt_speed_limits(*self.last_alt_limits)
                if not self.check_qbittorrent_alternate_limits():
                    logger.warning(
                        "qBittorrent state mismatch detected: expected alt speed ON, got OFF. Re-syncing..."
                    )
                    self.use_alt_limits(True)
            elif self.current_state == "paused":
                self.pause_all_torrents()
                self.torrents_paused = True
        except ServiceUnavailable:
            # Best effort: the failure was already logged where it was
            # raised, and the next cycle will try again.
            logger.debug("qBittorrent unavailable, skipping state sync")

    def should_change_state(self, new_streaming_state: bool) -> bool:
        """Apply hysteresis to prevent rapid state changes.

        Returns True (and records the change time) only when the observed
        streaming flag differs from the accepted one and at least the
        relevant delay has elapsed since the last accepted change.
        """
        now = time.time()

        if new_streaming_state == self.last_streaming_state:
            return False

        time_since_change = now - self.last_state_change

        if new_streaming_state and not self.last_streaming_state:
            if time_since_change >= self.streaming_start_delay:
                self.last_state_change = now
                return True
            remaining = self.streaming_start_delay - time_since_change
            logger.info(
                f"Streaming started - waiting {remaining:.1f}s before enforcing limits"
            )
        elif not new_streaming_state and self.last_streaming_state:
            if time_since_change >= self.streaming_stop_delay:
                self.last_state_change = now
                return True
            remaining = self.streaming_stop_delay - time_since_change
            logger.info(
                f"Streaming stopped - waiting {remaining:.1f}s before restoring unlimited mode"
            )

        return False

    def _log_configuration(self) -> None:
        """Log the effective configuration once at start-up."""
        logger.info("Starting Jellyfin-qBittorrent monitor")
        logger.info(f"Jellyfin URL: {self.jellyfin_url}")
        logger.info(f"qBittorrent URL: {self.qbittorrent_url}")
        logger.info(f"Check interval: {self.check_interval}s")
        logger.info(f"Streaming start delay: {self.streaming_start_delay}s")
        logger.info(f"Streaming stop delay: {self.streaming_stop_delay}s")
        logger.info(f"Total bandwidth budget: {self.total_bandwidth_budget} bps")
        logger.info(f"Service buffer: {self.service_buffer} bps")
        logger.info(f"Default stream bitrate: {self.default_stream_bitrate} bps")
        logger.info(f"Minimum torrent speed: {self.min_torrent_speed} KB/s")

    def _apply_state(
        self,
        desired_state: str,
        active_streams: list,
        total_streaming_bps: int,
        remaining_bps: int,
        remaining_kbs: float,
    ) -> None:
        """Log and perform the transition from the current to the desired state."""
        if desired_state == "unlimited":
            action = "resume torrents, disable alt speed"
        elif desired_state == "throttled":
            action = (
                "set alt limits "
                f"dl={int(remaining_kbs)}KB/s ul={int(remaining_kbs)}KB/s, enable alt speed"
            )
        else:
            action = "pause torrents"

        logger.info(
            "State change %s -> %s | streams=%d total_bps=%d remaining_bps=%d action=%s",
            self.current_state,
            desired_state,
            len(active_streams),
            total_streaming_bps,
            remaining_bps,
            action,
        )

        if desired_state in ("unlimited", "throttled"):
            if self.torrents_paused:
                self.resume_all_torrents()
                self.torrents_paused = False
            if desired_state == "throttled":
                self.set_alt_speed_limits(remaining_kbs, remaining_kbs)
                self.use_alt_limits(True)
            else:
                self.use_alt_limits(False)
        elif not self.torrents_paused:
            self.pause_all_torrents()
            self.torrents_paused = True

        self.current_state = desired_state

    def _run_cycle(self) -> None:
        """Run one monitoring iteration (without the trailing sleep)."""
        self.sync_qbittorrent_state()

        try:
            active_streams = self.check_jellyfin_sessions()
        except ServiceUnavailable:
            logger.warning("Jellyfin unavailable, maintaining current state")
            return

        streaming_active = bool(active_streams)

        for stream in active_streams:
            logger.debug(
                f"Active stream: {stream['name']} ({stream['bitrate_bps']} bps)"
            )

        if active_streams != self.last_active_streams:
            if streaming_active:
                stream_names = ", ".join(stream["name"] for stream in active_streams)
                logger.info(f"Active streams ({len(active_streams)}): {stream_names}")
            elif self.last_streaming_state:
                logger.info("No active streaming sessions")

        if self.should_change_state(streaming_active):
            self.last_streaming_state = streaming_active

        streaming_state = bool(self.last_streaming_state)
        total_streaming_bps = sum(stream["bitrate_bps"] for stream in active_streams)
        remaining_bps = (
            self.total_bandwidth_budget - self.service_buffer - total_streaming_bps
        )
        # bits/s -> bytes/s -> KB/s, never negative.
        remaining_kbs = max(0, remaining_bps) / 8 / 1024

        if not streaming_state:
            desired_state = "unlimited"
        elif streaming_active:
            if remaining_kbs >= self.min_torrent_speed:
                desired_state = "throttled"
            else:
                desired_state = "paused"
        else:
            # Streams just ended but the stop delay has not elapsed yet.
            desired_state = self.current_state

        if desired_state != self.current_state:
            self._apply_state(
                desired_state,
                active_streams,
                total_streaming_bps,
                remaining_bps,
                remaining_kbs,
            )
        elif (
            desired_state == "throttled"
            and streaming_active
            and self.last_alt_limits != (remaining_kbs, remaining_kbs)
        ):
            # Still throttled, but the stream set (or an earlier failed
            # update) left the applied limits out of date - refresh them.
            logger.info(
                f"Updating alt limits to dl={int(remaining_kbs)}KB/s ul={int(remaining_kbs)}KB/s"
            )
            self.set_alt_speed_limits(remaining_kbs, remaining_kbs)

        self.last_active_streams = active_streams

    def run(self):
        """Run the monitoring loop until stopped, then restore qBittorrent."""
        self._log_configuration()

        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        while self.running:
            try:
                try:
                    self._run_cycle()
                except Exception as e:
                    # Top-level boundary: keep the monitor alive, but record
                    # the traceback so the failure can be diagnosed.
                    logger.exception(f"Unexpected error in monitoring loop: {e}")
                time.sleep(self.check_interval)
            except KeyboardInterrupt:
                break

        self.restore_normal_limits()
        logger.info("Monitor stopped")


def main() -> None:
    """Build a monitor from environment variables and run it."""
    # Configuration from environment variables
    monitor = JellyfinQBittorrentMonitor(
        jellyfin_url=os.getenv("JELLYFIN_URL", "http://localhost:8096"),
        qbittorrent_url=os.getenv("QBITTORRENT_URL", "http://localhost:8080"),
        check_interval=int(os.getenv("CHECK_INTERVAL", "30")),
        jellyfin_api_key=os.getenv("JELLYFIN_API_KEY"),
        streaming_start_delay=int(os.getenv("STREAMING_START_DELAY", "10")),
        streaming_stop_delay=int(os.getenv("STREAMING_STOP_DELAY", "60")),
        total_bandwidth_budget=int(os.getenv("TOTAL_BANDWIDTH_BUDGET", "30000000")),
        service_buffer=int(os.getenv("SERVICE_BUFFER", "5000000")),
        default_stream_bitrate=int(os.getenv("DEFAULT_STREAM_BITRATE", "10000000")),
        min_torrent_speed=int(os.getenv("MIN_TORRENT_SPEED", "100")),
    )
    monitor.run()


if __name__ == "__main__":
    main()