'''
Flask service exposing flush / register / ban operations on pfctl anchors.

Each authenticated user operates on its own anchor ``f2b-jail/<username>``;
each jail name maps to a table ``f2b-<name>`` inside that anchor.
'''
import json
import logging
import re
from ipaddress import ip_address

from flask import Flask, request
from flask_httpauth import HTTPBasicAuth
from werkzeug.security import generate_password_hash, check_password_hash

from pfctl import pfctl_table_op, pfctl_cfg_read, pfctl_cfg_write

app = Flask(__name__)
auth = HTTPBasicAuth()
logging.basicConfig(level=logging.DEBUG)

# Basic-auth accounts: username -> password hash checked with
# werkzeug's check_password_hash.
users = {
    "erg.verweg.com": 'pbkdf2:sha256:260000$leXVKkMYNu60eQZR$0893397beb241931d33d2c996e66447a375d3b7923aa32fc4af6b80eec716fbe'
}

# Whitelists for request values that end up inside pf rule text.
PAT_PORT = r'^any(?:\s+port\s+{\w+(?:,\w+)*})?$'
PAT_PROT = r'^(?:tcp|udp)$'
PAT_NAME = r'^[\w\-]+$'


def untaint(pattern, string):
    '''
    untaint string (as perl does)

    Return ``string`` unchanged when the whole value matches ``pattern``.

    Raises:
        ValueError: if ``string`` is not a str or does not match.
    '''
    # fullmatch instead of match: with re.match a trailing "$" still
    # accepts a value ending in "\n", and the value is returned as-is, so
    # a newline could be smuggled into the generated rules.
    # The isinstance check turns non-string JSON values (numbers, null,
    # lists) into the same ValueError instead of a TypeError from re.
    if isinstance(string, str) and re.fullmatch(pattern, string):
        return string
    raise ValueError(f'"{string}" is tainted')


@auth.verify_password
def verify_password(username, password):
    '''
    Return ``username`` when it is a known user and ``password`` matches
    the stored hash; return None otherwise.
    '''
    if username in users and \
            check_password_hash(users.get(username), password):
        return username
    return None


@app.route("/flush/<name>", methods=['GET'])
@auth.login_required
def flush(name):
    '''
    Flush table ``f2b-<name>`` in the caller's anchor.

    ``name`` comes from the URL and is validated against PAT_NAME.
    Returns a JSON document with the anchor, table, operation and the
    value returned by pfctl_table_op.
    '''
    remote_user = auth.username()
    name = untaint(PAT_NAME, name)
    anchor = f'f2b-jail/{remote_user}'
    table = f'f2b-{name}'
    logging.info(f'Flushing table {table} in anchor {anchor}')
    res = pfctl_table_op(anchor, table=table, operation='flush')
    return json.dumps({'anchor': anchor,
                       'table': table,
                       'operation': 'flush',
                       'result': res})


@app.route("/register", methods=['PUT', 'DELETE'])
@auth.login_required
def register():
    '''
    Add (PUT) or remove (DELETE) the rules for one jail in the caller's
    anchor.

    Expects a JSON body with the keys ``name``, ``protocol`` and ``port``;
    all three are validated for both methods.  On DELETE the table is
    also flushed and killed after the config has been rewritten.

    Returns a JSON document echoing the user and the request data.
    '''
    remote_user = auth.username()
    # port / name / protocol
    data = request.get_json()
    name = untaint(PAT_NAME, data['name'])
    protocol = untaint(PAT_PROT, data['protocol'])
    port = untaint(PAT_PORT, data['port'])

    anchor = f'f2b-jail/{remote_user}'
    table = f'f2b-{name}'
    # Table reference as it appears in the rule text; also the marker
    # used to find this jail's lines again on DELETE.
    table_ref = f'<{table}>'

    cfg = pfctl_cfg_read(anchor) or []

    if request.method == 'PUT':
        cfg.extend([
            bytes(f'table {table_ref} persist counters', 'ascii'),
            bytes(f'block quick proto {protocol}'
                  f' from {table_ref} to {port}', 'ascii')
        ])
        pfctl_cfg_write(anchor, b'\n'.join(cfg) + b'\n')
    elif request.method == 'DELETE':
        # Drop only the lines that mention this jail's table.
        marker = bytes(table_ref, 'ascii')
        cfg = [cfg_line for cfg_line in cfg if marker not in cfg_line]
        pfctl_cfg_write(anchor, b'\n'.join(cfg) + b'\n')
        pfctl_table_op(anchor, table=table, operation='flush')
        pfctl_table_op(anchor, table=table, operation='kill')

    logging.info(f'pfctl -a {anchor} -f-')
    # NOTE(review): the value returned by pfctl_cfg_write is not part of
    # this response -- confirm whether clients need it.
    return json.dumps({'remote_user': remote_user, 'data': data})


@app.route("/ban", methods=['PUT', 'DELETE'])
@auth.login_required
def ban():
    '''
    Add (PUT) or delete (DELETE) one address in table ``f2b-<name>`` of
    the caller's anchor.

    Expects a JSON body with the keys ``name`` and ``ip``.  An invalid
    address makes ip_address raise ValueError.
    Returns a JSON document with the anchor, table, operation and the
    value returned by pfctl_table_op.
    '''
    remote_user = auth.username()
    data = request.get_json()
    # name / ip
    name = untaint(PAT_NAME, data['name'])
    ip = ip_address(data['ip'])

    anchor = f'f2b-jail/{remote_user}'
    table = f'f2b-{name}'

    # The route only admits PUT and DELETE.
    if request.method == 'PUT':
        operation = 'add'
        logging.info(f'Add {ip} to {table} in anchor {anchor}')
    else:
        operation = 'delete'
        logging.info(f'Remove {ip} from {table} in anchor {anchor}')

    res = pfctl_table_op(anchor, table=table,
                         operation=operation, value=str(ip))
    return json.dumps({'anchor': anchor,
                       'table': table,
                       'operation': operation,
                       'result': res})


@app.errorhandler(ValueError)
def permission_err(error):
    ''' Show a json parsable error if the value is illegal '''
    logging.critical(error)
    return json.dumps({'error': str(error)}), 500


@auth.error_handler
def auth_error():
    ''' Log the failed authentication and answer with a JSON 401 body '''
    logging.error('Access Denied')
    return json.dumps({'error': 'Access Denied'}), 401