jail2ban/jail2ban/__init__.py

165 lines
6.0 KiB
Python

from flask import Flask, request, jsonify, current_app
from flask_httpauth import HTTPBasicAuth
from werkzeug.security import check_password_hash
from ipaddress import ip_address
import re
from jail2ban.pfctl import pfctl_table_op, pfctl_cfg_read, pfctl_cfg_write
from jail2ban.auth import get_users
from subprocess import CalledProcessError
# HTTP basic-auth helper shared by the whole module; create_app() attaches
# the password check and the error callback to it.
auth = HTTPBasicAuth()
# Accepted "port" values: the word "any", optionally followed by
# "port {a,b,...}" (a brace-enclosed, comma-separated list of word tokens).
# NOTE(review): \s also matches newline characters - confirm this is wanted,
# since the validated value is written into a pf rule.
PAT_PORT = r'^any(?:\s+port\s+{\w+(?:,\w+)*})?$'
# Accepted "protocol" values: tcp or udp.
PAT_PROT = r'^(?:tcp|udp)$'
# Accepted jail/table names: one or more word characters or hyphens.
PAT_NAME = r'^[\w\-]+$'
def untaint(pattern, string):
    '''
    untaint string (as perl does)

    Return ``string`` unchanged when the *whole* of it matches ``pattern``,
    otherwise raise ValueError.

    re.fullmatch is used on purpose: with re.match a "$" anchor also matches
    just before a trailing newline, so a value such as "ssh\\n" would be
    accepted and returned with the newline still in it.

    :param pattern: regular expression the value has to satisfy
    :param string: untrusted value to check
    :returns: ``string`` itself, once validated
    :raises ValueError: if ``string`` does not match or is not matchable
                        at all (e.g. None or a number)
    '''
    try:
        match = re.fullmatch(pattern, string)
    except TypeError as err:
        # Non-string input (None, int, ...) is reported like any other
        # tainted value so callers only have to handle ValueError.
        raise ValueError(f'"{string}" is tainted') from err
    if match is None:
        raise ValueError(f'"{string}" is tainted')
    return string
def _as_text(value):
    '''
    Make a pfctl result JSON serialisable.

    bytes are decoded as ASCII, lists and tuples are converted item by item
    and anything else is returned unchanged.
    '''
    if isinstance(value, (bytes, bytearray)):
        return value.decode('ascii')
    if isinstance(value, (list, tuple)):
        return [_as_text(item) for item in value]
    return value


def _required(data, key):
    '''
    Return ``data[key]`` from a decoded JSON body.

    Raises ValueError (which the application reports as a JSON error)
    when the body is not an object or the key is absent.
    '''
    if not isinstance(data, dict) or key not in data:
        raise ValueError(f'"{key}" is missing from the request body')
    return data[key]


def create_app():
    '''
    Build the Flask application.

    Every route requires HTTP basic authentication; the authenticated user
    name selects the pf anchor ``f2b-jail/<user>`` the request operates on.

    Routes:
      GET        /ping          authentication check
      GET        /flush/<name>  flush table f2b-<name>
      PUT/DELETE /register      add / remove the table and block rule
      PUT/DELETE /ban           add / remove an address in a table

    :returns: the configured Flask application
    '''
    app = Flask(__name__, instance_relative_config=True)
    # load the instance config if it exists (silent=True ignores a
    # missing file)
    app.config.from_pyfile('config.py', silent=True)

    @auth.verify_password
    def verify_password(username, password):
        '''
        Return the user name when the password matches the stored hash,
        None otherwise.
        '''
        users = get_users()
        # The user table holds password hashes, so it is deliberately
        # not written to the log.
        current_app.logger.debug('Checking password of %s', username)
        if username in users and \
                check_password_hash(users.get(username), password):
            return username
        return None

    @app.route("/ping", methods=['GET'])
    @auth.login_required
    def ping():
        '''
        Confirm that the caller is authenticated.
        '''
        remote_user = auth.username()
        app.logger.info('Received ping for'
                        f' anchor f2b-jail/{remote_user}')
        return jsonify({'anchor': f'f2b-jail/{remote_user}',
                        'operation': 'ping',
                        'result': 'pong'})

    @app.route("/flush/<name>", methods=['GET'])
    @auth.login_required
    def flush(name):
        '''
        Flush table f2b-<name> in the caller's anchor.
        '''
        remote_user = auth.username()
        name = untaint(PAT_NAME, name)
        anchor = f'f2b-jail/{remote_user}'
        table = f'f2b-{name}'
        app.logger.info(f'Flushing table f2b-{name}'
                        f' in anchor f2b-jail/{remote_user}')
        # anchor and table must be interpolated: passing the literal
        # '{remote_user}' / '{name}' placeholders targets nothing real.
        res = pfctl_table_op(anchor,
                             table=table,
                             operation='flush')
        return jsonify({'anchor': anchor,
                        'table': table,
                        'operation': 'flush',
                        'result': [_as_text(x) for x in res]})

    @app.route("/register", methods=['PUT', 'DELETE'])
    @auth.login_required
    def register():
        '''
        PUT appends the table definition and block rule for a jail to the
        anchor configuration; DELETE removes every line mentioning the
        table, then flushes and kills the table.
        '''
        remote_user = auth.username()
        # port / name / protocol
        data = request.get_json()
        name = untaint(PAT_NAME, _required(data, 'name'))
        protocol = untaint(PAT_PROT, _required(data, 'protocol'))
        port = untaint(PAT_PORT, _required(data, 'port'))
        anchor = f'f2b-jail/{remote_user}'
        table = f'f2b-{name}'
        cfg = pfctl_cfg_read(anchor)
        if not cfg:
            cfg = []
        if request.method == 'PUT':
            cfg.extend([
                bytes(f'table <f2b-{name}> persist counters', 'ascii'),
                bytes(f'block quick proto {protocol}'
                      f' from <f2b-{name}> to {port}', 'ascii')
            ])
            res = pfctl_cfg_write(anchor, b'\n'.join(cfg) + b'\n')
        else:  # 'DELETE':
            needle = bytes(f'<f2b-{name}>', 'ascii')
            cfg = [cfg_line for cfg_line in cfg
                   if needle not in cfg_line]
            res = pfctl_cfg_write(anchor, b'\n'.join(cfg) + b'\n')
            # The rules are gone; now drop the table contents and the
            # table itself.
            pfctl_table_op(anchor, table=table, operation='flush')
            pfctl_table_op(anchor, table=table, operation='kill')
        app.logger.info(f'pfctl -a f2b-jail/{remote_user} -f-')
        return jsonify({'anchor': anchor,
                        'table': table,
                        'action': 'start' if request.method == 'PUT'
                        else 'stop',
                        'result': [_as_text(x) for x in res]})

    @app.route("/ban", methods=['PUT', 'DELETE'])
    @auth.login_required
    def ban():
        '''
        PUT adds an address to table f2b-<name>, DELETE removes it.
        '''
        remote_user = auth.username()
        data = request.get_json()
        # name / ip
        name = untaint(PAT_NAME, _required(data, 'name'))
        # ip_address() raises ValueError for anything that is not an IP
        ip = ip_address(_required(data, 'ip'))
        anchor = f'f2b-jail/{remote_user}'
        table = f'f2b-{name}'
        if request.method == 'PUT':
            operation = 'add'
            app.logger.info(f'Add {ip} to f2b-{name}'
                            f' in anchor f2b-jail/{remote_user}')
        else:  # 'DELETE':
            operation = 'delete'
            app.logger.info(f'Remove {ip} from f2b-{name}'
                            f' in anchor f2b-jail/{remote_user}')
        res = pfctl_table_op(anchor,
                             table=table,
                             operation=operation,
                             value=str(ip))
        # Same normalisation as flush/register so that a bytes result
        # does not break JSON serialisation.
        return jsonify({'anchor': anchor,
                        'table': table,
                        'operation': operation,
                        'result': _as_text(res)})

    @app.errorhandler(ValueError)
    def permission_err(error):
        '''
        Show a json parsable error if the value is illegal
        '''
        app.logger.fatal(error)
        return jsonify({'error': str(error)}), 500

    @app.errorhandler(CalledProcessError)
    def subprocess_err(error):
        '''
        Show a json parsable error if a subprocess call failed
        '''
        app.logger.fatal(error)
        return jsonify({'error': str(error)}), 500

    @app.errorhandler(FileNotFoundError)
    def filenotfound_err(error):
        '''
        Show a json parsable error if a file was not found
        '''
        app.logger.fatal(error)
        return jsonify({'error': str(error)}), 500

    @auth.error_handler
    def auth_error():
        '''
        Show a json parsable error if authentication failed
        '''
        app.logger.error('Access Denied')
        return jsonify({'error': 'Access Denied'}), 401

    return app