Compare commits
	
		
			15 Commits
		
	
	
		
			main
			...
			542718b956
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 542718b956 | |||
| 34f871ae75 | |||
| f3f8bd5dc6 | |||
| 5971c27a8f | |||
| 47e8208ce2 | |||
| af1fef189c | |||
| c33e63978e | |||
| 963f7e5702 | |||
| 450792b2d2 | |||
| ffe144f6b5 | |||
| a20145b447 | |||
| b5f16f161c | |||
| 7c7734c996 | |||
| bd12b2a18a | |||
| 64523ae8b5 | 
| @ -1,17 +0,0 @@ | |||||||
| --- |  | ||||||
| name: Flake8 |  | ||||||
| on: [push] |  | ||||||
|  |  | ||||||
|  |  | ||||||
| # XXX need to do stuff with uv |  | ||||||
| jobs: |  | ||||||
|   build: |  | ||||||
|     runs-on: freebsd |  | ||||||
|     strategy: |  | ||||||
|       matrix: |  | ||||||
|         python-version: ["3.11"] |  | ||||||
|     steps: |  | ||||||
|       - uses: actions/checkout@v4 |  | ||||||
|       - name: Analyse code with Flake8 |  | ||||||
|         run: | |  | ||||||
|           flake8 $(git ls-files '*.py') |  | ||||||
| @ -1,17 +0,0 @@ | |||||||
| --- |  | ||||||
| name: Mypy |  | ||||||
| on: [push] |  | ||||||
|  |  | ||||||
|  |  | ||||||
| # XXX need to do stuff with uv |  | ||||||
| jobs: |  | ||||||
|   build: |  | ||||||
|     runs-on: freebsd |  | ||||||
|     strategy: |  | ||||||
|       matrix: |  | ||||||
|         python-version: ["3.11"] |  | ||||||
|     steps: |  | ||||||
|       - uses: actions/checkout@v4 |  | ||||||
|       - name: Analyse code with Mypy |  | ||||||
|         run: | |  | ||||||
|           mypy --install-types --non-interactive $(git ls-files '*.py') |  | ||||||
| @ -1,17 +0,0 @@ | |||||||
| --- |  | ||||||
| name: Pylint |  | ||||||
| on: [push] |  | ||||||
|  |  | ||||||
|  |  | ||||||
| # XXX need to do stuff with uv |  | ||||||
| jobs: |  | ||||||
|   build: |  | ||||||
|     runs-on: freebsd |  | ||||||
|     strategy: |  | ||||||
|       matrix: |  | ||||||
|         python-version: ["3.11"] |  | ||||||
|     steps: |  | ||||||
|       - uses: actions/checkout@v4 |  | ||||||
|       - name: Analyse code with Pylint |  | ||||||
|         run: | |  | ||||||
|           pylint $(git ls-files '*.py') |  | ||||||
							
								
								
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -12,6 +12,3 @@ htmlcov/ | |||||||
| dist/ | dist/ | ||||||
| build/ | build/ | ||||||
| *.egg-info/ | *.egg-info/ | ||||||
|  |  | ||||||
| coverage.xml |  | ||||||
| report.xml |  | ||||||
|  | |||||||
| @ -1,34 +1,6 @@ | |||||||
| --- |  | ||||||
| run pylint: |  | ||||||
|   stage: test |  | ||||||
|   image: python:3.11 |  | ||||||
|   script: |  | ||||||
|     - pip install --upgrade pylint |  | ||||||
|     - pylint $(git ls-files '*.py') |  | ||||||
|   tags: |  | ||||||
|     - docker |  | ||||||
|  |  | ||||||
| run flake8: |  | ||||||
|   stage: test |  | ||||||
|   image: python:3.11 |  | ||||||
|   script: |  | ||||||
|     - pip install --upgrade flake8 |  | ||||||
|     - flake8 $(git ls-files '*.py') |  | ||||||
|   tags: |  | ||||||
|     - docker |  | ||||||
|  |  | ||||||
| run mypy: |  | ||||||
|   stage: test |  | ||||||
|   image: python:3.11 |  | ||||||
|   script: |  | ||||||
|     - pip install --upgrade mypy |  | ||||||
|     - mypy --install-types --non-interactive $(git ls-files '*.py') |  | ||||||
|   tags: |  | ||||||
|     - docker |  | ||||||
|  |  | ||||||
| run tests: | run tests: | ||||||
|   stage: test |   stage: test | ||||||
|   image: python:3.11 |   image: python:3.8 | ||||||
|   script: |   script: | ||||||
|     - pip install pytest pytest-cov pytest-mock pytest-flask |     - pip install pytest pytest-cov pytest-mock pytest-flask | ||||||
|     - pip install Flask-HTTPAuth |     - pip install Flask-HTTPAuth | ||||||
| @ -39,9 +11,7 @@ run tests: | |||||||
|   artifacts: |   artifacts: | ||||||
|     when: always |     when: always | ||||||
|     reports: |     reports: | ||||||
|       coverage_report: |       cobertura: coverage.xml | ||||||
|         coverage_format: cobertura |  | ||||||
|         path: coverage.xml |  | ||||||
|       junit: report.xml |       junit: report.xml | ||||||
|   tags: |   tags: | ||||||
|     - docker |     - docker | ||||||
|  | |||||||
							
								
								
									
										131
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										131
									
								
								README.md
									
									
									
									
									
								
							| @ -1,131 +0,0 @@ | |||||||
| [](https://gitlab.niet.verweg.com/ruben/jail2ban-pf/-/commits/main) |  | ||||||
| [](https://gitlab.niet.verweg.com/ruben/jail2ban-pf/-/commits/main) |  | ||||||
|  |  | ||||||
| An API to remotely control a pf based fail2ban |  | ||||||
|  |  | ||||||
| ## Installation |  | ||||||
|  |  | ||||||
|  |  | ||||||
| * Install uwsgi  |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| sudo pkg install www/uwsgi |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| * Clone this repository |  | ||||||
|  |  | ||||||
| ## Configuration |  | ||||||
|  |  | ||||||
| ### rc.conf |  | ||||||
|  |  | ||||||
| * Use the following for configuring uwsgi in rc.conf |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| sudo sysrc uwsgi\_enable="YES" |  | ||||||
| sudo sysrc uwsgi\_profiles="jail2ban\_pf" |  | ||||||
| sudo sysrc uwsgi\_jail2ban\_pf\_flags="-L -M --uid \_jail2ban --python-path /opt/jail2ban-pf --wsgi-file /opt/jail2ban-pf/wsgi.py --stats 127.0.0.1:9191 --socket 127.0.0.1:3031 --chdir /var/empty --callable app --manage-script-name" |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ### jail2ban |  | ||||||
|  |  | ||||||
| * Configure <installation root>/instance/config.py |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| SECRET\_KEY = os.urandom(32).hex() |  | ||||||
| AUTHFILE = '/usr/local/etc/jail2ban-pf-users.txt' |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ### nginx |  | ||||||
|  |  | ||||||
| * Configure a nginx upstream and vhost |  | ||||||
|  |  | ||||||
| _Of course you can listen on ipv4/ipv6 but you want to protect these addresses from inadvertent or malicious probes_ |  | ||||||
|  |  | ||||||
|     upstream uwsgi_pf_jail2ban { |  | ||||||
|         server 127.0.0.1:3031; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     server { |  | ||||||
|         listen       unix:/path/to/jail_1/var/run/pf2ban/pf_jail2ban.sock; |  | ||||||
|         listen       unix:/path/to/jail_2/var/run/pf2ban/pf_jail2ban.sock; |  | ||||||
|         listen       unix:/path/to/jail_3/var/run/pf2ban/pf_jail2ban.sock; |  | ||||||
|         server_name  _; |  | ||||||
|  |  | ||||||
|         location / { |  | ||||||
|             index     index.html index.htm index.php; |  | ||||||
|             allow all; |  | ||||||
|             include /usr/local/etc/nginx/uwsgi_params-dist; |  | ||||||
|             uwsgi_pass uwsgi_pf_jail2ban; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
| ### /etc/pf.conf |  | ||||||
|  |  | ||||||
| * Place anchors in pf for jail2ban to use. You probably want to place the early in your existing pf configuration |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| anchor "f2b/*" |  | ||||||
| anchor f2b-jail { |  | ||||||
|     anchor "jail1_fqdn" to { <addr_jail1>, <addr_extra_jail1>, <addr_extra6_jail1> } |  | ||||||
|     anchor "jail2_fqdn" to { <addr_jail2>, <addr_extra_jail2>, <addr_extra6_jail2> } |  | ||||||
|     anchor "jail3_fqdn" to { <addr_jail3>, <addr_extra_jail3>, <addr_extra6_jail3> } |  | ||||||
| } |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| Having seperate anchors per jail makes it possible to have fine grained |  | ||||||
| blocking: Something that is harmful to jail2 might be perfectly legit for jail2. |  | ||||||
|  |  | ||||||
| #### Checking rules/tables made with fail2ban/jail2ban |  | ||||||
| Fail2ban will (re)create the per anchor rules on startup, and populate the designated address tables with offenders, e.g.: |  | ||||||
|  |  | ||||||
|     sudo pfctl -a f2b-jail/jail1\_fqdn -T show -t f2b-recidive |  | ||||||
|     192.0.2.66 |  | ||||||
|     2001:db8:abad:cafe:0bad:f00d |  | ||||||
|  |  | ||||||
| And the rules referencing these tables |  | ||||||
|  |  | ||||||
|     sudo pfctl -a 'f2b-jail/jail1\_fqdn' -s rules |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = pop3 |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = pop3s |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = imap |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = imaps |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = submission |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = smtps |  | ||||||
|     block drop quick proto tcp from <f2b-dovecot> to any port = sieve |  | ||||||
|     block drop quick proto tcp from <f2b-sendmail-auth> to any port = submission |  | ||||||
|     block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtps |  | ||||||
|     block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtp |  | ||||||
|     block drop quick proto tcp from <f2b-sshd> to any port = ssh |  | ||||||
|     block drop quick proto tcp from <f2b-recidive> to any |  | ||||||
|      |  | ||||||
| ### fail2ban |  | ||||||
|  |  | ||||||
| * Create the following action plugin for fail2ban on the jail desiring to use fail2ban/jail2ban |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| cat <<'EOT' | tee /usr/local/etc/fail2ban/action.d/jail2ban-pf.conf > /dev/null |  | ||||||
| Definition] |  | ||||||
| actionstart = curl --unix-socket <jail2ban_sock> --basic -u '<jail2ban_user>:<jail2ban_pass>' -XPUT -H 'Content-Type: application/json' -d '{"port":"<actiontype>","name":"<name>","protocol":"<protocol>"}' http://localhost/register |  | ||||||
| actionstart_on_demand = false |  | ||||||
| actionstop = curl --unix-socket <jail2ban_sock> --basic -u '<jail2ban_user>:<jail2ban_pass>' -XDELETE -H 'Content-Type: application/json' -d '{"port":"<actiontype>","name":"<name>","protocol":"<protocol>"}' http://localhost/register |  | ||||||
| actionflush = curl --unix-socket <jail2ban_sock> --basic -u '<jail2ban_user>:<jail2ban_pass>' -X GET http://localhost/flush/<name> |  | ||||||
| actioncheck =  |  | ||||||
| actionban = curl --unix-socket <jail2ban_sock> --basic -u '<jail2ban_user>:<jail2ban_pass>' -X PUT -H 'Content-Type: application/json' -d '{"name":"<name>","ip":"<ip>"}' http://localhost/ban |  | ||||||
| actionunban = curl --unix-socket <jail2ban_sock> --basic -u '<jail2ban_user>:<jail2ban_pass>' -X DELETE -H 'Content-Type: application/json' -d '{"name":"<name>","ip":"<ip>"}' http://localhost/ban |  | ||||||
| [Init] |  | ||||||
| protocol = tcp |  | ||||||
| actiontype = <multiport> |  | ||||||
| allports = any |  | ||||||
| multiport = any port {<port>} |  | ||||||
| jail2ban_sock = /var/run/pf2ban/jail2ban.sock |  | ||||||
| jail2ban_user = login as set in password file for jail2ban |  | ||||||
| jail2ban_pass = password as set in password file for jail2ban |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| * Configure jail.local |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| cat <<'EOT' | tee /usr/local/etc/fail2ban/jail.local > /dev/null |  | ||||||
| [DEFAULT] |  | ||||||
| banaction = jail2ban-pf |  | ||||||
| ``` |  | ||||||
| @ -1,17 +1,11 @@ | |||||||
| ''' | from flask import Flask, request, jsonify, current_app | ||||||
| An API to remotely control a pf based fail2ban | from flask_httpauth import HTTPBasicAuth | ||||||
| ''' |  | ||||||
|  |  | ||||||
| import re |  | ||||||
| from ipaddress import ip_address |  | ||||||
| from subprocess import CalledProcessError |  | ||||||
|  |  | ||||||
| from flask import Flask, current_app, jsonify, request |  | ||||||
| from flask_httpauth import HTTPBasicAuth  # type: ignore |  | ||||||
| from werkzeug.security import check_password_hash | from werkzeug.security import check_password_hash | ||||||
|  | from ipaddress import ip_address | ||||||
|  | import re | ||||||
|  | from jail2ban.pfctl import pfctl_table_op, pfctl_cfg_read, pfctl_cfg_write | ||||||
| from jail2ban.auth import get_users | from jail2ban.auth import get_users | ||||||
| from jail2ban.pfctl import pfctl_cfg_read, pfctl_cfg_write, pfctl_table_op | from subprocess import CalledProcessError | ||||||
|  |  | ||||||
|  |  | ||||||
| auth = HTTPBasicAuth() | auth = HTTPBasicAuth() | ||||||
| @ -22,52 +16,39 @@ PAT_PROT = r'^(?:tcp|udp)$' | |||||||
| PAT_NAME = r'^[\w\-]+$' | PAT_NAME = r'^[\w\-]+$' | ||||||
|  |  | ||||||
|  |  | ||||||
| def untaint(pattern: str, string: str) -> str: | def untaint(pattern, string): | ||||||
|     ''' |     ''' | ||||||
|     untaint string (as perl does) |     untaint string (as perl does) | ||||||
|     ''' |     ''' | ||||||
|     match = re.match(pattern, string) |     match = re.match(pattern, string) | ||||||
|     if match: |     if match: | ||||||
|         return match.string |         return match.string | ||||||
|     raise ValueError(f'"{string}" is tainted') |     else: | ||||||
|  |         raise ValueError(f'"{string}" is tainted') | ||||||
|  |  | ||||||
|  |  | ||||||
| def create_app(): | def create_app(): | ||||||
|     ''' |  | ||||||
|     Create the flask application |  | ||||||
|     ''' |  | ||||||
|     app = Flask(__name__, instance_relative_config=True) |     app = Flask(__name__, instance_relative_config=True) | ||||||
|  |  | ||||||
|     # load the instance config, if it exists, when not testing |     # load the instance config, if it exists, when not testing | ||||||
|     app.config.from_pyfile('config.py', silent=True) |     app.config.from_pyfile('config.py', silent=True) | ||||||
|  |  | ||||||
|     @auth.verify_password |     @auth.verify_password | ||||||
|     def verify_password(username: str, password: str) -> str | None: |     def verify_password(username, password): | ||||||
|         users = get_users() |         users = get_users() | ||||||
|         current_app.logger.debug(users) |         current_app.logger.debug(users) | ||||||
|         current_app.logger.debug('Checking password of %s', username) |         current_app.logger.debug('Checking password of %s', username) | ||||||
|         if username in users and \ |         if username in users and \ | ||||||
|                 check_password_hash(users.get(username), password): |                 check_password_hash(users.get(username), password): | ||||||
|             return username |             return username | ||||||
|         return None |  | ||||||
|  |  | ||||||
|     @app.route("/ping", methods=['GET']) |  | ||||||
|     @auth.login_required |  | ||||||
|     def ping(): |  | ||||||
|         remote_user = auth.username() |  | ||||||
|         app.logger.info('Received ping for' |  | ||||||
|                         ' anchor f2b-jail/%s', remote_user) |  | ||||||
|         return jsonify({'anchor': f'f2b-jail/{remote_user}', |  | ||||||
|                         'operation': 'ping', |  | ||||||
|                         'result': 'pong'}) |  | ||||||
|  |  | ||||||
|     @app.route("/flush/<name>", methods=['GET']) |     @app.route("/flush/<name>", methods=['GET']) | ||||||
|     @auth.login_required |     @auth.login_required | ||||||
|     def flush(name): |     def flush(name): | ||||||
|         remote_user = auth.username() |         remote_user = auth.username() | ||||||
|         name = untaint(PAT_NAME, name) |         name = untaint(PAT_NAME, name) | ||||||
|         app.logger.info('Flushing table f2b-%s' |         app.logger.info(f'Flushing table f2b-{name}' | ||||||
|                         ' in anchor f2b-jail/%s', name, remote_user) |                         f' in anchor f2b-jail/{remote_user}') | ||||||
|         res = pfctl_table_op('f2b-jail/{remote_user}', |         res = pfctl_table_op('f2b-jail/{remote_user}', | ||||||
|                              table='f2b-{name}', |                              table='f2b-{name}', | ||||||
|                              operation='flush') |                              operation='flush') | ||||||
| @ -96,7 +77,7 @@ def create_app(): | |||||||
|                 ]) |                 ]) | ||||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', |             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||||
|                                   b'\n'.join(cfg) + b'\n') |                                   b'\n'.join(cfg) + b'\n') | ||||||
|         else:  # 'DELETE': |         elif request.method == 'DELETE': | ||||||
|             cfg = [cfg_line for cfg_line in cfg |             cfg = [cfg_line for cfg_line in cfg | ||||||
|                    if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] |                    if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] | ||||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', |             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||||
| @ -107,7 +88,7 @@ def create_app(): | |||||||
|             pfctl_table_op(f'f2b-jail/{remote_user}', |             pfctl_table_op(f'f2b-jail/{remote_user}', | ||||||
|                            table=f'f2b-{name}', |                            table=f'f2b-{name}', | ||||||
|                            operation='kill') |                            operation='kill') | ||||||
|         app.logger.info('pfctl -a f2b-jail/%s -f-', remote_user) |         app.logger.info(f'pfctl -a f2b-jail/{remote_user} -f-') | ||||||
|         return jsonify({'anchor': f'f2b-jail/{remote_user}', |         return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||||
|                         'table': f'f2b-{name}', |                         'table': f'f2b-{name}', | ||||||
|                         'action': 'start' if request.method == 'PUT' |                         'action': 'start' if request.method == 'PUT' | ||||||
| @ -123,15 +104,15 @@ def create_app(): | |||||||
|         name = untaint(PAT_NAME, data['name']) |         name = untaint(PAT_NAME, data['name']) | ||||||
|         ip = ip_address(data['ip']) |         ip = ip_address(data['ip']) | ||||||
|         if request.method == 'PUT': |         if request.method == 'PUT': | ||||||
|             app.logger.info('Add %s to f2b-%s' |             app.logger.info(f'Add {ip} to f2b-{name}' | ||||||
|                             ' in anchor f2b-jail/%s', ip, name, remote_user) |                             f' in anchor f2b-jail/{remote_user}') | ||||||
|             res = pfctl_table_op(f'f2b-jail/{remote_user}', |             res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||||
|                                  table=f'f2b-{name}', |                                  table=f'f2b-{name}', | ||||||
|                                  operation='add', |                                  operation='add', | ||||||
|                                  value=str(ip)) |                                  value=str(ip)) | ||||||
|         else:  # 'DELETE': |         elif request.method == 'DELETE': | ||||||
|             app.logger.info('Remove %s from f2b-%s' |             app.logger.info(f'Remove {ip} from f2b-{name}' | ||||||
|                             ' in anchor f2b-jail/%s', ip, name, remote_user) |                             f' in anchor f2b-jail/{remote_user}') | ||||||
|             res = pfctl_table_op(f'f2b-jail/{remote_user}', |             res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||||
|                                  table=f'f2b-{name}', |                                  table=f'f2b-{name}', | ||||||
|                                  operation='delete', |                                  operation='delete', | ||||||
| @ -158,14 +139,6 @@ def create_app(): | |||||||
|         app.logger.fatal(error) |         app.logger.fatal(error) | ||||||
|         return jsonify({'error': str(error)}), 500 |         return jsonify({'error': str(error)}), 500 | ||||||
|  |  | ||||||
|     @app.errorhandler(FileNotFoundError) |  | ||||||
|     def filenotfound_err(error): |  | ||||||
|         ''' |  | ||||||
|         Show a json parsable error if the value is illegal |  | ||||||
|         ''' |  | ||||||
|         app.logger.fatal(error) |  | ||||||
|         return jsonify({'error': str(error)}), 500 |  | ||||||
|  |  | ||||||
|     @auth.error_handler |     @auth.error_handler | ||||||
|     def auth_error(): |     def auth_error(): | ||||||
|         app.logger.error('Access Denied') |         app.logger.error('Access Denied') | ||||||
|  | |||||||
| @ -1,21 +1,18 @@ | |||||||
| ''' | from flask import current_app, g | ||||||
| Authentication backend | import os | ||||||
| ''' |  | ||||||
| from flask import current_app |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_users(): | def get_users(): | ||||||
|     ''' |     if 'users' not in g: | ||||||
|     Load users from password file (AUTHFILE) |         users = {} | ||||||
|     ''' |         authfile = current_app.config['AUTHFILE'] | ||||||
|     users = {} |  | ||||||
|     authfile = current_app.config['AUTHFILE'] |  | ||||||
|  |  | ||||||
|     current_app.logger.debug('Reading %s for users', authfile) |         current_app.logger.debug('Reading %s for users', authfile) | ||||||
|  |  | ||||||
|     with current_app.open_resource(authfile) as f: |         with current_app.open_resource(os.path.join("..", | ||||||
|         for entry in f: |                                                     authfile)) as f: | ||||||
|             users.update({ |             for entry in f: | ||||||
|                 tuple(entry.decode('ascii').strip().split(':', 1))}) |                 users.update({tuple(entry.decode('ascii').strip().split(':', 1))}) | ||||||
|     current_app.logger.debug(users) |             g.users = users | ||||||
|     return users |     current_app.logger.debug(g.users) | ||||||
|  |     return g.users | ||||||
|  | |||||||
| @ -1,6 +1,3 @@ | |||||||
| ''' |  | ||||||
| pf table/anchor operations |  | ||||||
| ''' |  | ||||||
| import logging | import logging | ||||||
| from subprocess import run | from subprocess import run | ||||||
|  |  | ||||||
| @ -9,9 +6,6 @@ _PFCTL = '/sbin/pfctl' | |||||||
|  |  | ||||||
|  |  | ||||||
| def pfctl_cfg_read(anchor): | def pfctl_cfg_read(anchor): | ||||||
|     ''' |  | ||||||
|     Read from pf anchor |  | ||||||
|     ''' |  | ||||||
|     cmd = [_SUDO, _PFCTL, '-a', anchor, '-sr'] |     cmd = [_SUDO, _PFCTL, '-a', anchor, '-sr'] | ||||||
|     logging.info('Running %s', cmd) |     logging.info('Running %s', cmd) | ||||||
|  |  | ||||||
| @ -22,9 +16,6 @@ def pfctl_cfg_read(anchor): | |||||||
|  |  | ||||||
|  |  | ||||||
| def pfctl_cfg_write(anchor, cfg): | def pfctl_cfg_write(anchor, cfg): | ||||||
|     ''' |  | ||||||
|     Apply configuration to pf anchor |  | ||||||
|     ''' |  | ||||||
|     cmd = [_SUDO, _PFCTL, '-a', anchor, '-f-'] |     cmd = [_SUDO, _PFCTL, '-a', anchor, '-f-'] | ||||||
|     logging.info('Running %s', cmd) |     logging.info('Running %s', cmd) | ||||||
|     logging.info('Config %s', cfg) |     logging.info('Config %s', cfg) | ||||||
| @ -39,13 +30,6 @@ def pfctl_cfg_write(anchor, cfg): | |||||||
|  |  | ||||||
|  |  | ||||||
| def pfctl_table_op(anchor, **kwargs): | def pfctl_table_op(anchor, **kwargs): | ||||||
|     ''' |  | ||||||
|     pf table operation |  | ||||||
|     Parameters: |  | ||||||
|     * table: which table to work on |  | ||||||
|     * operation: operation used on the table |  | ||||||
|     * value (optional): value used for the operation |  | ||||||
|     ''' |  | ||||||
|     table = kwargs['table'] |     table = kwargs['table'] | ||||||
|     operation = kwargs['operation'] |     operation = kwargs['operation'] | ||||||
|     value = kwargs['value'] if 'value' in kwargs else None |     value = kwargs['value'] if 'value' in kwargs else None | ||||||
|  | |||||||
| @ -1,10 +1,4 @@ | |||||||
| ''' |  | ||||||
| Test fixtures |  | ||||||
| ''' |  | ||||||
| import base64 |  | ||||||
|  |  | ||||||
| import pytest | import pytest | ||||||
|  |  | ||||||
| from jail2ban import create_app | from jail2ban import create_app | ||||||
|  |  | ||||||
|  |  | ||||||
| @ -14,7 +8,7 @@ def app(): | |||||||
|     app.config.update({ |     app.config.update({ | ||||||
|         "TESTING": True, |         "TESTING": True, | ||||||
|         "SECRET_KEY": 'Testing', |         "SECRET_KEY": 'Testing', | ||||||
|         "AUTHFILE": '../tests/users-test.txt' |         "AUTHFILE": 'tests/users-test.txt' | ||||||
|     }) |     }) | ||||||
|  |  | ||||||
|     # other setup can go here |     # other setup can go here | ||||||
| @ -26,23 +20,9 @@ def app(): | |||||||
|  |  | ||||||
| @pytest.fixture() | @pytest.fixture() | ||||||
| def client(app): | def client(app): | ||||||
|     ''' |  | ||||||
|     Create a synthetic client |  | ||||||
|     ''' |  | ||||||
|     return app.test_client() |     return app.test_client() | ||||||
|  |  | ||||||
|  |  | ||||||
| @pytest.fixture() | @pytest.fixture() | ||||||
| def runner(app): | def runner(app): | ||||||
|     ''' |  | ||||||
|     Create a synthetic runner |  | ||||||
|     ''' |  | ||||||
|     return app.test_cli_runner() |     return app.test_cli_runner() | ||||||
|  |  | ||||||
|  |  | ||||||
| @pytest.fixture() |  | ||||||
| def valid_credentials(): |  | ||||||
|     ''' |  | ||||||
|     Mock authentication for the test |  | ||||||
|     ''' |  | ||||||
|     return base64.b64encode(b"test.example.com:testpassword").decode("utf-8") |  | ||||||
|  | |||||||
| @ -1,13 +1,8 @@ | |||||||
| ''' | import base64 | ||||||
| Test banning |  | ||||||
| ''' |  | ||||||
| from types import SimpleNamespace | from types import SimpleNamespace | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_ban_ipv6(client, mocker, valid_credentials): | def test_ban_ipv6(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test ban an IPv6 address |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -18,19 +13,16 @@ def test_ban_ipv6(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} |     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'add' |     assert response.json['operation'] == 'add' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_ban_ipv4(client, mocker, valid_credentials): | def test_ban_ipv4(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test ban an IPv4 address |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -41,19 +33,15 @@ def test_ban_ipv4(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "192.0.2.42"} |     json_payload = {"name": "sshd", "ip": "192.0.2.42"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'add' |     assert response.json['operation'] == 'add' | ||||||
|  |  | ||||||
|  | def test_ban_invalid(client, mocker): | ||||||
| def test_ban_invalid(client, mocker, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test ban an invalid address |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -64,20 +52,16 @@ def test_ban_invalid(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "not:an::addr:ess"} |     json_payload = {"name": "sshd", "ip": "not:an::addr:ess"} | ||||||
|     response = client.put("/ban", |     response = client.put("/ban", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['error'] == "'not:an::addr:ess' does not " \ |     assert response.json['error'] == "'not:an::addr:ess' does not appear to be an IPv4 or IPv6 address" | ||||||
|                                      "appear to be an IPv4 or IPv6 address" |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unban_ipv6(client, mocker, valid_credentials): | def test_unban_ipv6(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test unbanning an IPv6 address |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -88,19 +72,16 @@ def test_unban_ipv6(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} |     json_payload = {"name": "sshd", "ip": "2001:db8::abad:cafe"} | ||||||
|     response = client.delete("/ban", |     response = client.delete("/ban", | ||||||
|                              json=json_payload, |                              json=json_payload, | ||||||
|                              headers={"Authorization": |                              headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'delete' |     assert response.json['operation'] == 'delete' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unban_ipv4(client, mocker, valid_credentials): | def test_unban_ipv4(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test unbanning an IPv4 address |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -111,10 +92,10 @@ def test_unban_ipv4(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"name": "sshd", "ip": "192.0.2.42"} |     json_payload = {"name": "sshd", "ip": "192.0.2.42"} | ||||||
|     response = client.delete("/ban", |     response = client.delete("/ban", | ||||||
|                              json=json_payload, |                           json=json_payload, | ||||||
|                              headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'delete' |     assert response.json['operation'] == 'delete' | ||||||
|  | |||||||
| @ -1,14 +1,9 @@ | |||||||
| ''' | import base64 | ||||||
| Test flushing pf tables |  | ||||||
| ''' |  | ||||||
| from types import SimpleNamespace | from types import SimpleNamespace | ||||||
| from subprocess import CalledProcessError | from subprocess import CalledProcessError | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_flush(client, mocker, valid_credentials): | def test_flush(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test flushing existing entry |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = SimpleNamespace() |     run_res = SimpleNamespace() | ||||||
| @ -19,74 +14,25 @@ def test_flush(client, mocker, valid_credentials): | |||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     name = 'sshd' |     name = 'sshd' | ||||||
|     response = client.get(f"/flush/{name}", |     response = client.get(f"/flush/{name}", | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'flush' |     assert response.json['operation'] == 'flush' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_flush_nonexistent(client, mocker, valid_credentials): | def test_flush_nonexistent(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test flushing non existing entry |  | ||||||
|     ''' |  | ||||||
|  |  | ||||||
|     cmd = ['/usr/local/bin/sudo', |     cmd = ['/usr/local/bin/sudo', '/sbin/pfctl', '-a', 'some/anchor', '-t', 'nonexistent', '-T', 'flush'] | ||||||
|            '/sbin/pfctl', '-a', 'some/anchor', |  | ||||||
|            '-t', 'nonexistent', '-T', 'flush'] |  | ||||||
|  |  | ||||||
|     side_effect = CalledProcessError(255, cmd, output=b'', |  | ||||||
|                                      stderr=b'pfctl: Table does not exist') |  | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', |     mocker.patch('jail2ban.pfctl.run', | ||||||
|                  side_effect=side_effect) |                  side_effect=CalledProcessError(255, cmd, output=b'', | ||||||
|  |                                                 stderr=b'pfctl: Table does not exist')) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     name = 'nonexistent' |     name = 'nonexistent' | ||||||
|     response = client.get(f"/flush/{name}", |     response = client.get(f"/flush/{name}", | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert 'error' in response.json |     assert 'error' in response.json | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_wrong_method(client, mocker, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test invalid method |  | ||||||
|     ''' |  | ||||||
|  |  | ||||||
|     cmd = ['/usr/local/bin/sudo', |  | ||||||
|            '/sbin/pfctl', '-a', 'some/anchor', |  | ||||||
|            '-t', 'nonexistent', '-T', 'flush'] |  | ||||||
|  |  | ||||||
|     side_effect = CalledProcessError(255, cmd, output=b'', |  | ||||||
|                                      stderr=b'pfctl: Table does not exist') |  | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', |  | ||||||
|                  side_effect=side_effect) |  | ||||||
|  |  | ||||||
|     name = 'nonexistent' |  | ||||||
|     response = client.put(f"/flush/{name}", |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.status_code == 405 |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_filenotfound(app, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test for when AUTHFILE cannot be found |  | ||||||
|     ''' |  | ||||||
|  |  | ||||||
|     app.config.update({ |  | ||||||
|         "AUTHFILE": '../tests/nonexistent-users-test.txt' |  | ||||||
|     }) |  | ||||||
|  |  | ||||||
|     client = app.test_client() |  | ||||||
|  |  | ||||||
|     name = 'nonexistent' |  | ||||||
|     response = client.get(f"/flush/{name}", |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.status_code == 500 |  | ||||||
|  | |||||||
| @ -1,15 +0,0 @@ | |||||||
| ''' |  | ||||||
| Test application health check |  | ||||||
| ''' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_ping(client, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test application health check |  | ||||||
|     ''' |  | ||||||
|  |  | ||||||
|     response = client.get("/ping", |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['operation'] == 'ping' |  | ||||||
| @ -1,26 +1,16 @@ | |||||||
| ''' | import base64 | ||||||
| Test various registration scenarios |  | ||||||
| ''' |  | ||||||
| from subprocess import CompletedProcess | from subprocess import CompletedProcess | ||||||
|  |  | ||||||
| PFCTL_STDOUT_LINES = b''' | pfctl_stdout_lines = b''' | ||||||
| block drop quick proto tcp from <f2b-sendmail-auth> to any port = submission | block drop quick proto tcp from <f2b-sendmail-auth> to any port = submission | ||||||
| block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtps | block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtps | ||||||
| block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtp | block drop quick proto tcp from <f2b-sendmail-auth> to any port = smtp | ||||||
| block drop quick proto tcp from <f2b-sshd> to any port = ssh | block drop quick proto tcp from <f2b-sshd> to any port = ssh | ||||||
| block drop quick proto tcp from <f2b-recidive> to any | block drop quick proto tcp from <f2b-recidive> to any | ||||||
| '''.strip() + b'\n' | ''' | ||||||
|  |  | ||||||
| PFCTL_STDOUT_LINES_SCRATCH = b'table <f2b-dovecot> persist counters\n' \ |  | ||||||
|                              b'block quick proto tcp from <f2b-dovecot>' \ |  | ||||||
|                              b' to any port ' \ |  | ||||||
|                              b'{pop3,pop3s,imap,imaps,submission,465,sieve}\n' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_unauth(client): | def test_register_unauth(client): | ||||||
|     ''' |  | ||||||
|     Test a registration without being authorized |  | ||||||
|     ''' |  | ||||||
|     json_payload = {"port": |     json_payload = {"port": | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |                     "name": "dovecot", "protocol": "tcp"} | ||||||
| @ -29,103 +19,64 @@ def test_register_unauth(client): | |||||||
|     assert response.json['error'] == 'Access Denied' |     assert response.json['error'] == 'Access Denied' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_unregister_valid(client, mocker, valid_credentials): | def test_register_valid(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test unregistration |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
|     run_res.stdout = PFCTL_STDOUT_LINES |     run_res.stdout = pfctl_stdout_lines | ||||||
|     run_res.check_returncode = noop |     run_res.check_returncode = noop | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"port": |     json_payload = {"port": | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|     response = client.delete("/register", |     response = client.delete("/register", | ||||||
|                              json=json_payload, |                              json=json_payload, | ||||||
|                              headers={"Authorization": |                              headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                       "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['action'] == 'stop' |     assert response.json['action'] == 'stop' | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_valid(client, mocker, valid_credentials): | def test_unregister_valid(client, mocker): | ||||||
|     ''' |  | ||||||
|     Test a registration of a rule |  | ||||||
|     ''' |  | ||||||
|     def noop(): |     def noop(): | ||||||
|         pass |         pass | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
|     run_res.stdout = PFCTL_STDOUT_LINES |     run_res.stdout = pfctl_stdout_lines | ||||||
|     run_res.check_returncode = noop |  | ||||||
|  |  | ||||||
|     pfctl_run = mocker.patch('jail2ban.pfctl.run', return_value=run_res) |  | ||||||
|  |  | ||||||
|     json_payload = {"port": |  | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |  | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |  | ||||||
|  |  | ||||||
|     response = client.put("/register", |  | ||||||
|                           json=json_payload, |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     pfctl_run_input_arg = pfctl_run.call_args_list[1][1]['input'] |  | ||||||
|     for existing_line in PFCTL_STDOUT_LINES.splitlines(): |  | ||||||
|         assert existing_line in pfctl_run_input_arg.splitlines() |  | ||||||
|  |  | ||||||
|     assert response.json['action'] == 'start' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_valid_from_scratch(client, mocker, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test from scratch point of view |  | ||||||
|     ''' |  | ||||||
|     def noop(): |  | ||||||
|         pass |  | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |  | ||||||
|     run_res.stdout = b'' |  | ||||||
|     run_res.check_returncode = noop |  | ||||||
|  |  | ||||||
|     pfctl_run = mocker.patch('jail2ban.pfctl.run', return_value=run_res) |  | ||||||
|  |  | ||||||
|     json_payload = {"port": |  | ||||||
|                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", |  | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |  | ||||||
|  |  | ||||||
|     response = client.put("/register", |  | ||||||
|                           json=json_payload, |  | ||||||
|                           headers={"Authorization": |  | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     pfctl_run_input_arg = pfctl_run.call_args_list[1][1]['input'] |  | ||||||
|     assert pfctl_run_input_arg == PFCTL_STDOUT_LINES_SCRATCH |  | ||||||
|     assert response.json['action'] == 'start' |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def test_register_invalid(client, mocker, valid_credentials): |  | ||||||
|     ''' |  | ||||||
|     Test a bogus pf command |  | ||||||
|     ''' |  | ||||||
|     def noop(): |  | ||||||
|         pass |  | ||||||
|     run_res = CompletedProcess(args=['true'], returncode=0) |  | ||||||
|     run_res.stdout = PFCTL_STDOUT_LINES |  | ||||||
|     run_res.check_returncode = noop |     run_res.check_returncode = noop | ||||||
|  |  | ||||||
|     mocker.patch('jail2ban.pfctl.run', return_value=run_res) |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|  |     json_payload = {"port": | ||||||
|  |                     "any port {pop3,pop3s,imap,imaps,submission,465,sieve}", | ||||||
|  |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|  |     response = client.put("/register", | ||||||
|  |                           json=json_payload, | ||||||
|  |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|  |  | ||||||
|  |     assert response.json['action'] == 'start' | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def test_register_invalid(client, mocker): | ||||||
|  |     def noop(): | ||||||
|  |         pass | ||||||
|  |     run_res = CompletedProcess(args=['true'], returncode=0) | ||||||
|  |     run_res.stdout = pfctl_stdout_lines | ||||||
|  |     run_res.check_returncode = noop | ||||||
|  |  | ||||||
|  |     mocker.patch('jail2ban.pfctl.run', return_value=run_res) | ||||||
|  |  | ||||||
|  |     valid_credentials = base64.b64encode(b"test.example.com:testpassword").decode("utf-8") | ||||||
|     json_payload = {"port": |     json_payload = {"port": | ||||||
|                     "not a pf statement", |                     "not a pf statement", | ||||||
|                     "name": "dovecot", "protocol": "tcp"} |                     "name": "dovecot", "protocol": "tcp"} | ||||||
|  |  | ||||||
|     response = client.put("/register", |     response = client.put("/register", | ||||||
|                           json=json_payload, |                           json=json_payload, | ||||||
|                           headers={"Authorization": |                           headers={"Authorization": "Basic " + valid_credentials}) | ||||||
|                                    "Basic " + valid_credentials}) |  | ||||||
|  |  | ||||||
|     assert response.json['error'] == '"not a pf statement" is tainted' |     assert response.json['error'] == '"not a pf statement" is tainted' | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user