# NixOS VM test "fail2ban-immich".
#
# Boots a `server` node (Immich + PostgreSQL + the fail2ban configuration from
# ../modules/security.nix) and a `client` node, sends repeated bad logins from
# the client to the Immich API, and then checks that the fail2ban `immich`
# jail reports a banned IP and that the client can no longer connect.
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Minimal stand-in for the service configuration that
  # ../services/immich.nix receives as `service_configs`.
  testServiceConfigs = {
    zpool_ssds = "";
    https = {
      domain = "test.local";
    };
    ports = {
      immich = 2283;
    };
    immich = {
      dir = "/var/lib/immich";
    };
  };

  # Single source for the port used by the jail override, the firewall rule
  # and the test script, so they cannot drift apart.
  immichPort = testServiceConfigs.ports.immich;
  immichUrl = "http://server:${toString immichPort}";

  # `lib` with `serviceMountWithZpool` replaced by a stub that returns an
  # empty module, so the test does not need the real helper.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
    }
  );

  # Wraps the real Immich service module, feeding it the stubbed lib and the
  # test service configuration.
  immichModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../services/immich.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-immich";

  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../modules/security.nix
          immichModule
        ];

        # Immich needs postgres
        services.postgresql.enable = true;

        # Let immich create its own DB for testing
        services.immich.database.createDB = lib.mkForce true;

        # Disable ZFS mount dependencies
        systemd.services."immich-server-mounts".enable = lib.mkForce false;
        systemd.services."immich-machine-learning-mounts".enable = lib.mkForce false;
        systemd.services.immich-server = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ "postgresql.service" ];
          requires = lib.mkForce [ ];
        };
        systemd.services.immich-machine-learning = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };

        # Override for faster testing and correct port
        services.fail2ban.jails.immich.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Immich port, not via Caddy
          port = lib.mkForce (toString immichPort);
        };

        networking.firewall.allowedTCPPorts = [ immichPort ];

        # Immich needs more resources
        virtualisation.diskSize = 4 * 1024;
        virtualisation.memorySize = 4 * 1024; # 4GB RAM for Immich
      };

    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };

  # Python test script. `${...}` below is Nix interpolation and is resolved
  # before the Python code is run.
  testScript = ''
    import time
    import re

    start_all()
    server.wait_for_unit("postgresql.service")
    server.wait_for_unit("immich-server.service", timeout=120)
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(${toString immichPort}, timeout=60)
    # Short settle delay kept from the original test; presumably gives Immich
    # and fail2ban time to finish initialising before traffic is generated.
    time.sleep(3)

    with subtest("Verify immich jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "immich" in status, f"immich jail not found in: {status}"

    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        for _ in range(4):
            client.execute(
                "curl -4 -s -X POST ${immichUrl}/api/auth/login -H 'Content-Type: application/json' -d '{\"email\":\"bad@user.com\",\"password\":\"badpass\"}' || true"
            )
            time.sleep(0.5)

    with subtest("Verify IP is banned"):
        # Poll until the jail reports a ban instead of sleeping for a fixed
        # time, so the check does not depend on how fast the VM is.
        server.wait_until_succeeds(
            "fail2ban-client status immich | grep -E 'Currently banned:[[:space:]]*[1-9]'",
            timeout=30,
        )
        status = server.succeed("fail2ban-client status immich")
        print(f"immich jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"

    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 ${immichUrl}/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}