Compare commits
	
		
			15 Commits
		
	
	
		
			2022.1
			...
			542718b956
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 542718b956 | |||
| 34f871ae75 | |||
| f3f8bd5dc6 | |||
| 5971c27a8f | |||
| 47e8208ce2 | |||
| af1fef189c | |||
| c33e63978e | |||
| 963f7e5702 | |||
| 450792b2d2 | |||
| ffe144f6b5 | |||
| a20145b447 | |||
| b5f16f161c | |||
| 7c7734c996 | |||
| bd12b2a18a | |||
| 64523ae8b5 | 
| @ -1,3 +0,0 @@ | |||||||
| [](https://gitlab.niet.verweg.com/ruben/jail2ban-pf/-/commits/main) |  | ||||||
| [](https://gitlab.niet.verweg.com/ruben/jail2ban-pf/-/commits/main) |  | ||||||
|  |  | ||||||
| @ -77,7 +77,7 @@ def create_app(): | |||||||
|                 ]) |                 ]) | ||||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', |             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||||
|                                   b'\n'.join(cfg) + b'\n') |                                   b'\n'.join(cfg) + b'\n') | ||||||
|         else:  # 'DELETE': |         elif request.method == 'DELETE': | ||||||
|             cfg = [cfg_line for cfg_line in cfg |             cfg = [cfg_line for cfg_line in cfg | ||||||
|                    if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] |                    if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] | ||||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', |             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||||
| @ -110,7 +110,7 @@ def create_app(): | |||||||
|                                  table=f'f2b-{name}', |                                  table=f'f2b-{name}', | ||||||
|                                  operation='add', |                                  operation='add', | ||||||
|                                  value=str(ip)) |                                  value=str(ip)) | ||||||
|         else:  # 'DELETE': |         elif request.method == 'DELETE': | ||||||
|             app.logger.info(f'Remove {ip} from f2b-{name}' |             app.logger.info(f'Remove {ip} from f2b-{name}' | ||||||
|                             f' in anchor f2b-jail/{remote_user}') |                             f' in anchor f2b-jail/{remote_user}') | ||||||
|             res = pfctl_table_op(f'f2b-jail/{remote_user}', |             res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||||
| @ -139,14 +139,6 @@ def create_app(): | |||||||
|         app.logger.fatal(error) |         app.logger.fatal(error) | ||||||
|         return jsonify({'error': str(error)}), 500 |         return jsonify({'error': str(error)}), 500 | ||||||
|  |  | ||||||
|     @app.errorhandler(FileNotFoundError) |  | ||||||
|     def filenotfound_err(error): |  | ||||||
|         ''' |  | ||||||
|         Show a json parsable error if the value is illegal |  | ||||||
|         ''' |  | ||||||
|         app.logger.fatal(error) |  | ||||||
|         return jsonify({'error': str(error)}), 500 |  | ||||||
|  |  | ||||||
|     @auth.error_handler |     @auth.error_handler | ||||||
|     def auth_error(): |     def auth_error(): | ||||||
|         app.logger.error('Access Denied') |         app.logger.error('Access Denied') | ||||||
|  | |||||||
| @ -1,15 +1,18 @@ | |||||||
| from flask import current_app | from flask import current_app, g | ||||||
|  | import os | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_users(): | def get_users(): | ||||||
|  |     if 'users' not in g: | ||||||
|         users = {} |         users = {} | ||||||
|         authfile = current_app.config['AUTHFILE'] |         authfile = current_app.config['AUTHFILE'] | ||||||
|  |  | ||||||
|         current_app.logger.debug('Reading %s for users', authfile) |         current_app.logger.debug('Reading %s for users', authfile) | ||||||
|  |  | ||||||
|     with current_app.open_resource(authfile) as f: |         with current_app.open_resource(os.path.join("..", | ||||||
|  |                                                     authfile)) as f: | ||||||
|             for entry in f: |             for entry in f: | ||||||
|             users.update({ |                 users.update({tuple(entry.decode('ascii').strip().split(':', 1))}) | ||||||
|                 tuple(entry.decode('ascii').strip().split(':', 1))}) |             g.users = users | ||||||
|     current_app.logger.debug(users) |     current_app.logger.debug(g.users) | ||||||
|     return users |     return g.users | ||||||
|  | |||||||
| @ -1,5 +1,4 @@ | |||||||
| import pytest | import pytest | ||||||
| import base64 |  | ||||||
| from jail2ban import create_app | from jail2ban import create_app | ||||||
|  |  | ||||||
|  |  | ||||||
| @ -9,7 +8,7 @@ def app(): | |||||||
|     app.config.update({ |     app.config.update({ | ||||||
|         "TESTING": True, |         "TESTING": True, | ||||||
|         "SECRET_KEY": 'Testing', |         "SECRET_KEY": 'Testing', | ||||||
|         "AUTHFILE": '../tests/users-test.txt' |         "AUTHFILE": 'tests/users-test.txt' | ||||||
|     }) |     }) | ||||||
|  |  | ||||||
|     # other setup can go here |     # other setup can go here | ||||||
| @ -27,8 +26,3 @@ def client(app): | |||||||
| @pytest.fixture() | @pytest.fixture() | ||||||
| def runner(app): | def runner(app): | ||||||
|     return app.test_cli_runner() |     return app.test_cli_runner() | ||||||
|  |  | ||||||
|  |  | ||||||
| @pytest.fixture() |  | ||||||
| def valid_credentials(): |  | ||||||
|     return base64.b64encode(b"test.example.com:testpassword").decode("utf-8") |  | ||||||
|  | |||||||
| @ -1,7 +1,8 @@ | |||||||
|  | import base64 | ||||||
| from types import SimpleNamespace | from types import SimpleNamespace | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_ban_ipv6(client, mocker, valid_credentials): | def test_ban_ipv6(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -12,16 +13,16 @@ def test_ban_ipv6(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} |     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'add' |     assert response.json['operation'] == 'add' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_ban_ipv4(client, mocker, valid_credentials): | def test_ban_ipv4(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -32,16 +33,15 @@ def test_ban_ipv4(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "192.0.2.42"} |     json_payload = {"name": "sshd", "ip": "192.0.2.42"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'add' |     assert response.json['operation'] == 'add' | ||||||
|  |  | ||||||
|  | def test_ban_invalid(client, mocker): | ||||||
| def test_ban_invalid(client, mocker, valid_credentials): |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -52,17 +52,16 @@ def test_ban_invalid(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "not:an::addr:ess"} |     json_payload = {"name": "sshd", "ip": "not:an::addr:ess"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['error'] == "'not:an::addr:ess' does not " \ |     assert response.json['error'] == "'not:an::addr:ess' does not appear to be an IPv4 or IPv6 address" | ||||||
|                                      "appear to be an IPv4 or IPv6 address" |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unban_ipv6(client, mocker, valid_credentials): | def test_unban_ipv6(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -73,16 +72,16 @@ def test_unban_ipv6(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} |     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} | ||||||
|     response = client.delete("/ban", |     response = client.delete("/ban", | ||||||
|                              json=json_payload, |                              json=json_payload, | ||||||
|                              headers={"Authorization": |                              headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'delete' |     assert response.json['operation'] == 'delete' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unban_ipv4(client, mocker, valid_credentials): | def test_unban_ipv4(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -93,10 +92,10 @@ def test_unban_ipv4(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "192.0.2.42"} |     json_payload = {"name": "sshd", "ip": "192.0.2.42"} | ||||||
|     response = client.delete("/ban", |     response = client.delete("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                              headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'delete' |     assert response.json['operation'] == 'delete' | ||||||
|  | |||||||
| @ -1,8 +1,9 @@ | |||||||
|  | import base64 | ||||||
| from types import SimpleNamespace | from types import SimpleNamespace | ||||||
| from subprocess import CalledProcessError | from subprocess import CalledProcessError | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_flush(client, mocker, valid_credentials): | def test_flush(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -13,65 +14,25 @@ def test_flush(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     name = 'sshd' |     name = 'sshd' | ||||||
|     response = client.get(f"/flush/{name}", |     response = client.get(f"/flush/{name}", | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'flush' |     assert response.json['operation'] == 'flush' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_flush_nonexistent(client, mocker, valid_credentials): | def test_flush_nonexistent(client, mocker): | ||||||
|  |  | ||||||
|     cmd = ['/usr/local/bin/sudo', |     cmd = ['/usr/local/bin/sudo', '/sbin/pfctl', '-a', 'some/anchor', '-t', 'nonexistent', '-T', 'flush'] | ||||||
|            '/sbin/pfctl', '-a', 'some/anchor', |  | ||||||
|            '-t', 'nonexistent', '-T', 'flush'] |  | ||||||
|  |  | ||||||
|     side_effect = CalledProcessError(255, cmd, output=b'', |  | ||||||
|                                      stderr=b'pfctl: Table does not exist') |  | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', |     mocker.patch('jail2ban.pfctl.run', | ||||||
|                  side_effect=side_effect) |                  side_effect=CalledProcessError(255, cmd, output=b'', | ||||||
|  |                                                 stderr=b'pfctl: Table does not exist')) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     name = 'nonexistent' |     name = 'nonexistent' | ||||||
|     response = client.get(f"/flush/{name}", |     response = client.get(f"/flush/{name}", | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert 'error' in response.json |     assert 'error' in response.json | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_wrong_method(client, mocker, valid_credentials): |  | ||||||
|  |  | ||||||
|     cmd = ['/usr/local/bin/sudo', |  | ||||||
|            '/sbin/pfctl', '-a', 'some/anchor', |  | ||||||
|            '-t', 'nonexistent', '-T', 'flush'] |  | ||||||
|  |  | ||||||
|     side_effect = CalledProcessError(255, cmd, output=b'', |  | ||||||
|                                      stderr=b'pfctl: Table does not exist') |  | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', |  | ||||||
|                  side_effect=side_effect) |  | ||||||
|  |  | ||||||
|     name = 'nonexistent' |  | ||||||
|     response = client.put(f"/flush/{name}", |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.status_code == 405 |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_filenotfound(app, mocker, valid_credentials): |  | ||||||
|  |  | ||||||
|     app.config.update({ |  | ||||||
|         "AUTHFILE": '../tests/nonexistent-users-test.txt' |  | ||||||
|     }) |  | ||||||
|  |  | ||||||
|     client = app.test_client() |  | ||||||
|  |  | ||||||
|     name = 'nonexistent' |  | ||||||
|     response = client.get(f"/flush/{name}", |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.status_code == 500 |  | ||||||
|  | |||||||
| @ -1,3 +1,4 @@ | |||||||
|  | import base64 | ||||||
| from subprocess import CompletedProcess | from subprocess import CompletedProcess | ||||||
|  |  | ||||||
| pfctl_stdout_lines = b''' | pfctl_stdout_lines = b''' | ||||||
| @ -6,12 +7,7 @@ block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtps | |||||||
| block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtp | block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtp | ||||||
| block drop quick proto tcp from <f2b-sshd> to any port = ssh | block drop quick proto tcp from <f2b-sshd> to any port = ssh | ||||||
| block drop quick proto tcp from <f2b-recidive> to any | block drop quick proto tcp from <f2b-recidive> to any | ||||||
| '''.strip() + b'\n' | ''' | ||||||
|  |  | ||||||
| pfctl_stdout_lines_scratch = b'table <f2b-dovecot> persist counters\n' \ |  | ||||||
|                              b'block quick proto tcp from <f2b-dovecot>' \ |  | ||||||
|                              b' to any port ' \ |  | ||||||
|                              b'{pop3,pop3s,imap,imaps,submission,465,sieve}\n' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_unauth(client): | def test_register_unauth(client): | ||||||
| @ -23,7 +19,7 @@ def test_register_unauth(client): | |||||||
|     assert response.json['error'] == 'Access Denied' |     assert response.json['error'] == 'Access Denied' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unregister_valid(client, mocker, valid_credentials): | def test_register_valid(client, mocker): | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
| @ -32,67 +28,19 @@ def test_unregister_valid(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"port": |     json_payload = {"port": | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|     response = client.delete("/register", |     response = client.delete("/register", | ||||||
|                              json=json_payload, |                              json=json_payload, | ||||||
|                              headers={"Authorization": |                              headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['action'] == 'stop' |     assert response.json['action'] == 'stop' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_valid(client, mocker, valid_credentials): | def test_unregister_valid(client, mocker): | ||||||
|     def noop(): |  | ||||||
|         pass |  | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |  | ||||||
|     run_res.stdout = pfctl_stdout_lines |  | ||||||
|     run_res.check_returncode = noop |  | ||||||
|  |  | ||||||
|     pfctl_run = mocker.patch('jail2ban.pfctl.run', return_value=run_res) |  | ||||||
|  |  | ||||||
|     json_payload = {"port": |  | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |  | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |  | ||||||
|  |  | ||||||
|     response = client.put("/register", |  | ||||||
|                           json=json_payload, |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     pfctl_run_input_arg = pfctl_run.call_args_list[1][1]['input'] |  | ||||||
|     for existing_line in pfctl_stdout_lines.splitlines(): |  | ||||||
|         assert existing_line in pfctl_run_input_arg.splitlines() |  | ||||||
|  |  | ||||||
|     assert response.json['action'] == 'start' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_valid_from_scratch(client, mocker, valid_credentials): |  | ||||||
|     def noop(): |  | ||||||
|         pass |  | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |  | ||||||
|     run_res.stdout = b'' |  | ||||||
|     run_res.check_returncode = noop |  | ||||||
|  |  | ||||||
|     pfctl_run = mocker.patch('jail2ban.pfctl.run', return_value=run_res) |  | ||||||
|  |  | ||||||
|     json_payload = {"port": |  | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |  | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |  | ||||||
|  |  | ||||||
|     response = client.put("/register", |  | ||||||
|                           json=json_payload, |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     pfctl_run_input_arg = pfctl_run.call_args_list[1][1]['input'] |  | ||||||
|     assert pfctl_run_input_arg == pfctl_stdout_lines_scratch |  | ||||||
|     assert response.json['action'] == 'start' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_invalid(client, mocker, valid_credentials): |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
| @ -101,13 +49,34 @@ def test_register_invalid(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|  |     json_payload = {"port": | ||||||
|  |                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", | ||||||
|  |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|  |     response = client.put("/register", | ||||||
|  |                           json=json_payload, | ||||||
|  |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|  |  | ||||||
|  |     assert response.json['action'] == 'start' | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_register_invalid(client, mocker): | ||||||
|  |     def noop(): | ||||||
|  |         pass | ||||||
|  |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
|  |     run_res.stdout = pfctl_stdout_lines | ||||||
|  |     run_res.check_returncode = noop | ||||||
|  |  | ||||||
|  |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"port": |     json_payload = {"port": | ||||||
|                     "not a pf statement", |                     "not a pf statement", | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|     response = client.put("/register", |     response = client.put("/register", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['error'] == '"not a pf statement" is tainted' |     assert response.json['error'] == '"not a pf statement" is tainted' | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user