{ lib, pkgs, ... }: let mockQBittorrent = pkgs.writers.writePython3Bin "mock-qbt" { flakeIgnore = [ "E501" ]; } '' import http.server import socketserver class Handler(http.server.BaseHTTPRequestHandler): def log_message(self, fmt, *args): print(f"qbt: {fmt % args}") def do_GET(self): if self.path == "/api/v2/transfer/speedLimitsMode": self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() self.wfile.write(("1" if getattr(self.server, "alt_speed", False) else "0").encode()) else: self.send_response(404) self.end_headers() def do_POST(self): if self.path == "/api/v2/transfer/toggleSpeedLimitsMode": self.server.alt_speed = not getattr(self.server, "alt_speed", False) self.send_response(200) self.end_headers() else: self.send_response(404) self.end_headers() with socketserver.TCPServer(("127.0.0.1", 8080), Handler) as s: print("Mock qBittorrent on port 8080") s.serve_forever() ''; mockJellyfin = pkgs.writers.writePython3Bin "mock-jellyfin" { flakeIgnore = [ "E501" ]; } '' import http.server import socketserver import json DEFAULT = {"streaming": False, "paused": False, "local": False, "media_type": "Movie"} class Handler(http.server.BaseHTTPRequestHandler): def log_message(self, fmt, *args): print(f"jellyfin: {fmt % args}") def do_GET(self): if self.path == "/Sessions": self.send_response(200) self.send_header("Content-type", "application/json") self.end_headers() state = getattr(self.server, "state", DEFAULT.copy()) sessions = [] if state["streaming"]: sessions = [{ "Id": "test-1", "UserName": "User", "RemoteEndPoint": "192.168.1.100" if state["local"] else "203.0.113.42", "NowPlayingItem": {"Name": f"Test {state['media_type']}", "Type": state["media_type"]}, "PlayState": {"IsPaused": state["paused"]} }] self.wfile.write(json.dumps(sessions).encode()) else: self.send_response(404) self.end_headers() def do_POST(self): if self.path == "/control/state": data = json.loads(self.rfile.read(int(self.headers.get("Content-Length", 0))).decode() or "{}") if not hasattr(self.server, "state"): self.server.state = DEFAULT.copy() self.server.state.update(data) self.send_response(200) self.end_headers() else: self.send_response(404) self.end_headers() with socketserver.TCPServer(("127.0.0.1", 8096), Handler) as s: print("Mock Jellyfin on port 8096") s.serve_forever() ''; in pkgs.testers.runNixOSTest { name = "jellyfin-qbittorrent-monitor"; nodes.server = { systemd.services.mock-qbittorrent = { wantedBy = [ "multi-user.target" ]; serviceConfig.ExecStart = lib.getExe mockQBittorrent; }; systemd.services.mock-jellyfin = { wantedBy = [ "multi-user.target" ]; serviceConfig.ExecStart = lib.getExe mockJellyfin; }; environment.systemPackages = [ pkgs.curl ]; networking.firewall.allowedTCPPorts = [ 8096 8080 ]; }; testScript = '' import time, json start_all() server.wait_for_unit("multi-user.target") for svc in ["mock-jellyfin", "mock-qbittorrent"]: server.wait_for_unit(f"{svc}.service") for port in [8096, 8080]: server.wait_for_open_port(port) time.sleep(2) def set_state(**kwargs): server.succeed(f"curl -sX POST -H 'Content-Type: application/json' -d '{json.dumps(kwargs)}' http://localhost:8096/control/state") def is_throttled(): return server.succeed("curl -s http://localhost:8080/api/v2/transfer/speedLimitsMode").strip() == "1" # Verify initial state assert not is_throttled(), "Should start unthrottled" assert "[]" in server.succeed("curl -s http://localhost:8096/Sessions"), "No initial sessions" # Start monitor with fast intervals python = "${pkgs.python3.withPackages (ps: [ ps.requests ])}/bin/python" monitor = "${../services/jellyfin-qbittorrent-monitor.py}" server.succeed(f""" systemd-run --unit=monitor-test \ --setenv=JELLYFIN_URL=http://localhost:8096 \ --setenv=QBITTORRENT_URL=http://localhost:8080 \ --setenv=CHECK_INTERVAL=1 \ --setenv=STREAMING_START_DELAY=1 \ --setenv=STREAMING_STOP_DELAY=1 \ {python} {monitor} """) time.sleep(2) # Test cases: (streaming, media_type, paused, local, expected_throttle) # Throttle only when: external + playing + video content (not audio) test_cases: list[tuple[bool, str, bool, bool, bool]] = [ # Video types, external, playing -> THROTTLE (True, "Movie", False, False, True), (True, "Episode", False, False, True), (True, "Video", False, False, True), # Audio external playing -> NO throttle (True, "Audio", False, False, False), # Paused -> NO throttle (True, "Movie", True, False, False), # Local -> NO throttle (True, "Movie", False, True, False), # Local + paused -> NO throttle (True, "Movie", True, True, False), # No streaming -> NO throttle (False, "Movie", False, False, False), ] for i, (streaming, media_type, paused, local, expected) in enumerate(test_cases, 1): desc = f"Test {i}: streaming={streaming}, type={media_type}, paused={paused}, local={local}" print(f"\n{desc}") set_state(streaming=streaming, media_type=media_type, paused=paused, local=local) time.sleep(1.5) actual = is_throttled() assert actual == expected, f"FAIL {desc}: got {actual}, expected {expected}" # Transition tests: pause/unpause while streaming print("\nTransition: external movie playing -> paused -> playing") set_state(streaming=True, media_type="Movie", paused=False, local=False) time.sleep(1.5) assert is_throttled(), "Should throttle external movie" set_state(streaming=True, media_type="Movie", paused=True, local=False) time.sleep(1.5) assert not is_throttled(), "Should unthrottle when paused" set_state(streaming=True, media_type="Movie", paused=False, local=False) time.sleep(1.5) assert is_throttled(), "Should re-throttle when unpaused" ''; }