# NixOS VM test "fail2ban-vaultwarden".
#
# Boots a `server` node running Vaultwarden and fail2ban plus a `client` node,
# sends repeated bad logins from the client straight to the Vaultwarden port,
# and then checks that the vaultwarden jail reports a banned IP and that the
# client can no longer connect.
#
# Arguments: `lib` and `pkgs` are used to build the test; `config` is accepted
# for call-site compatibility (the inner module shadows it).
{
  config,
  lib,
  pkgs,
  ...
}:
let
  # Stand-in for the `service_configs` attrset handed to ../services/bitwarden.nix.
  testServiceConfigs = {
    zpool_ssds = "";
    https = {
      domain = "test.local";
    };
    ports = {
      vaultwarden = 8222;
    };
    vaultwarden = {
      path = "/var/lib/vaultwarden";
    };
  };

  # Single source of truth for the port used by the jail override, the
  # firewall rule and the test script below.
  vaultwardenPort = testServiceConfigs.ports.vaultwarden;

  # `lib` with `serviceMountWithZpool` replaced by a stub that ignores its
  # arguments and yields an empty module.
  # NOTE(review): presumably this keeps the imported service from pulling in
  # real ZFS mount configuration inside the VM - confirm against bitwarden.nix.
  testLib = lib.extend (
    final: prev: {
      serviceMountWithZpool =
        serviceName: zpool: dirs:
        { ... }:
        { };
    }
  );

  # Wraps ../services/bitwarden.nix so it receives the test lib and the test
  # service configuration instead of the real ones.
  vaultwardenModule =
    { config, pkgs, ... }:
    {
      imports = [
        (import ../services/bitwarden.nix {
          inherit config pkgs;
          lib = testLib;
          service_configs = testServiceConfigs;
        })
      ];
    };
in
pkgs.testers.runNixOSTest {
  name = "fail2ban-vaultwarden";

  nodes = {
    server =
      {
        config,
        lib,
        pkgs,
        ...
      }:
      {
        imports = [
          ../modules/security.nix
          vaultwardenModule
        ];

        # Disable ZFS mount dependencies
        systemd.services."vaultwarden-mounts".enable = lib.mkForce false;
        systemd.services."backup-vaultwarden-mounts".enable = lib.mkForce false;
        systemd.services.vaultwarden = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };
        systemd.services.backup-vaultwarden = {
          wants = lib.mkForce [ ];
          after = lib.mkForce [ ];
          requires = lib.mkForce [ ];
        };

        # Override Vaultwarden settings for testing
        # - Listen on all interfaces (not just localhost)
        # - Enable logging at info level to capture failed login attempts
        services.vaultwarden.config = {
          ROCKET_ADDRESS = lib.mkForce "0.0.0.0";
          ROCKET_LOG = lib.mkForce "info";
        };

        # Override for faster testing and correct port
        services.fail2ban.jails.vaultwarden.settings = {
          maxretry = lib.mkForce 3;
          # In test, we connect directly to Vaultwarden port, not via Caddy
          port = lib.mkForce (toString vaultwardenPort);
        };

        networking.firewall.allowedTCPPorts = [ vaultwardenPort ];
      };

    client = {
      environment.systemPackages = [ pkgs.curl ];
    };
  };

  # Python test-driver script. `\n` and backslash line continuations are passed
  # through the Nix indented string unchanged and interpreted by Python.
  testScript = ''
    import re
    import time

    start_all()
    server.wait_for_unit("vaultwarden.service")
    server.wait_for_unit("fail2ban.service")
    server.wait_for_open_port(${toString vaultwardenPort})
    # Poll until fail2ban answers for the jail instead of sleeping a fixed time.
    server.wait_until_succeeds("fail2ban-client status vaultwarden", timeout=60)

    with subtest("Verify vaultwarden jail is active"):
        status = server.succeed("fail2ban-client status")
        assert "vaultwarden" in status, f"vaultwarden jail not found in: \n{status}"

    with subtest("Generate failed login attempts"):
        # Use -4 to force IPv4 for consistent IP tracking
        # Four attempts: one more than the maxretry of 3 configured above.
        for i in range(4):
            client.execute("""
                curl -4 -s -X POST 'http://server:${toString vaultwardenPort}/identity/connect/token' \
                    -H 'Content-Type: application/x-www-form-urlencoded' \
                    -H 'Bitwarden-Client-Name: web' \
                    -H 'Bitwarden-Client-Version: 2024.1.0' \
                    -d 'grant_type=password&username=bad@user.com&password=badpass&scope=api+offline_access&client_id=web&deviceType=10&deviceIdentifier=test&deviceName=test' \
                    || true
            """)
            time.sleep(0.5)

    with subtest("Verify IP is banned"):
        # Poll for the ban rather than relying on a fixed delay, so slow
        # builders do not produce spurious failures.
        server.wait_until_succeeds(
            "fail2ban-client status vaultwarden | grep -E 'Currently banned:[[:space:]]*[1-9]'",
            timeout=60,
        )
        status = server.succeed("fail2ban-client status vaultwarden")
        print(f"vaultwarden jail status: {status}")
        # Check that at least 1 IP is banned
        match = re.search(r"Currently banned:\s*(\d+)", status)
        assert match and int(match.group(1)) >= 1, f"Expected at least 1 banned IP, got: {status}"

    with subtest("Verify banned client cannot connect"):
        # Use -4 to test with same IP that was banned
        exit_code = client.execute("curl -4 -s --max-time 3 http://server:${toString vaultwardenPort}/ 2>&1")[0]
        assert exit_code != 0, "Connection should be blocked"
  '';
}