Move application into package and use factory
This commit is contained in:
		
							
								
								
									
										137
									
								
								app.py
									
									
									
									
									
								
							
							
						
						
									
										137
									
								
								app.py
									
									
									
									
									
								
							| @ -1,137 +0,0 @@ | ||||
| from flask import Flask, request, jsonify | ||||
| from flask_httpauth import HTTPBasicAuth | ||||
| from werkzeug.security import check_password_hash | ||||
| from ipaddress import ip_address | ||||
| import re | ||||
| from pfctl import pfctl_table_op, pfctl_cfg_read, pfctl_cfg_write | ||||
|  | ||||
|  | ||||
| app = Flask(__name__) | ||||
|  | ||||
| auth = HTTPBasicAuth() | ||||
|  | ||||
| users = { | ||||
|     "erg.verweg.com": 'pbkdf2:sha256:260000$leXVKkMYNu60eQZR$0893397beb241931d33d2c996e66447a375d3b7923aa32fc4af6b80eec716fbe' | ||||
| } | ||||
|  | ||||
| PAT_PORT = r'^any(?:\s+port\s+{\w+(?:,\w+)*})?$' | ||||
| PAT_PROT = r'^(?:tcp|udp)$' | ||||
| PAT_NAME = r'^[\w\-]+$' | ||||
|  | ||||
|  | ||||
| def untaint(pattern, string): | ||||
|     ''' | ||||
|     untaint string (as perl does) | ||||
|     ''' | ||||
|     match = re.match(pattern, string) | ||||
|     if match: | ||||
|         return match.string | ||||
|     else: | ||||
|         raise ValueError(f'"{string}" is tainted') | ||||
|  | ||||
|  | ||||
| @auth.verify_password | ||||
| def verify_password(username, password): | ||||
|     if username in users and \ | ||||
|             check_password_hash(users.get(username), password): | ||||
|         return username | ||||
|  | ||||
|  | ||||
| @app.route("/flush/<name>", methods=['GET']) | ||||
| @auth.login_required | ||||
| def flush(name): | ||||
|     remote_user = auth.username() | ||||
|     name = untaint(PAT_NAME, name) | ||||
|     app.logger.info(f'Flushing table f2b-{name}' | ||||
|                  ' in anchor f2b-jail/{remote_user}') | ||||
|     res = pfctl_table_op('f2b-jail/{remote_user}', | ||||
|                          table='f2b-{name}', | ||||
|                          operation='flush') | ||||
|     return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                     'table': f'f2b-{name}', | ||||
|                     'operation': 'flush', | ||||
|                     'result': res}) | ||||
|  | ||||
|  | ||||
| @app.route("/register", methods=['PUT', 'DELETE']) | ||||
| @auth.login_required | ||||
| def register(): | ||||
|     remote_user = auth.username() | ||||
|     # port / name / protocol | ||||
|     data = request.get_json() | ||||
|     name = untaint(PAT_NAME, data['name']) | ||||
|     protocol = untaint(PAT_PROT, data['protocol']) | ||||
|     port = untaint(PAT_PORT, data['port']) | ||||
|     cfg = pfctl_cfg_read(f'f2b-jail/{remote_user}') | ||||
|     if not cfg: | ||||
|         cfg = [] | ||||
|     if request.method == 'PUT': | ||||
|         cfg.extend([ | ||||
|             bytes(f'table <f2b-{name}> persist counters', 'ascii'), | ||||
|             bytes(f'block quick proto {protocol}' | ||||
|                   f' from <f2b-{name}> to {port}', 'ascii') | ||||
|             ]) | ||||
|         res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||
|                               b'\n'.join(cfg) + b'\n') | ||||
|     elif request.method == 'DELETE': | ||||
|         cfg = [cfg_line for cfg_line in cfg | ||||
|                if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] | ||||
|         res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||
|                               b'\n'.join(cfg) + b'\n') | ||||
|         pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                        table=f'f2b-{name}', | ||||
|                        operation='flush') | ||||
|         pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                        table=f'f2b-{name}', | ||||
|                        operation='kill') | ||||
|     app.logger.info(f'pfctl -a f2b-jail/{remote_user} -f-') | ||||
|     return jsonify({'remote_user': remote_user, 'data': data}) | ||||
|     return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                     'table': f'f2b-{name}', | ||||
|                     'action': 'start' if request.method == 'PUT' | ||||
|                     else 'stop', | ||||
|                     'result': res}) | ||||
|  | ||||
|  | ||||
| @app.route("/ban", methods=['PUT', 'DELETE']) | ||||
| @auth.login_required | ||||
| def ban(): | ||||
|     remote_user = auth.username() | ||||
|     data = request.get_json() | ||||
|     # name / ip | ||||
|     name = untaint(PAT_NAME, data['name']) | ||||
|     ip = ip_address(data['ip']) | ||||
|     if request.method == 'PUT': | ||||
|         app.logger.info(f'Add {ip} to f2b-{name}' | ||||
|                      f' in anchor f2b-jail/{remote_user}') | ||||
|         res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                              table=f'f2b-{name}', | ||||
|                              operation='add', | ||||
|                              value=str(ip)) | ||||
|     elif request.method == 'DELETE': | ||||
|         app.logger.info(f'Remove {ip} from f2b-{name}' | ||||
|                      f' in anchor f2b-jail/{remote_user}') | ||||
|         res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                              table=f'f2b-{name}', | ||||
|                              operation='delete', | ||||
|                              value=str(ip)) | ||||
|     return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                     'table': f'f2b-{name}', | ||||
|                     'operation': 'add' if request.method == 'PUT' | ||||
|                     else 'delete', | ||||
|                     'result': res}) | ||||
|  | ||||
|  | ||||
| @app.errorhandler(ValueError) | ||||
| def permission_err(error): | ||||
|     ''' | ||||
|     Show a json parsable error if the value is illegal | ||||
|     ''' | ||||
|     app.logger.fatal(error) | ||||
|     return jsonify({'error': str(error)}), 500 | ||||
|  | ||||
|  | ||||
| @auth.error_handler | ||||
| def auth_error(): | ||||
|     app.logger.error('Access Denied') | ||||
|     return jsonify({'error': 'Access Denied'}), 401 | ||||
							
								
								
									
										135
									
								
								jail2ban/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										135
									
								
								jail2ban/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,135 @@ | ||||
| from flask import Flask, request, jsonify | ||||
| from flask_httpauth import HTTPBasicAuth | ||||
| from werkzeug.security import check_password_hash | ||||
| from ipaddress import ip_address | ||||
| import re | ||||
| from pfctl import pfctl_table_op, pfctl_cfg_read, pfctl_cfg_write | ||||
|  | ||||
|  | ||||
| auth = HTTPBasicAuth() | ||||
|  | ||||
| users = { | ||||
|     "erg.verweg.com": 'pbkdf2:sha256:260000$leXVKkMYNu60eQZR$0893397beb241931d33d2c996e66447a375d3b7923aa32fc4af6b80eec716fbe' | ||||
| } | ||||
|  | ||||
| PAT_PORT = r'^any(?:\s+port\s+{\w+(?:,\w+)*})?$' | ||||
| PAT_PROT = r'^(?:tcp|udp)$' | ||||
| PAT_NAME = r'^[\w\-]+$' | ||||
|  | ||||
|  | ||||
| def untaint(pattern, string): | ||||
|     ''' | ||||
|     untaint string (as perl does) | ||||
|     ''' | ||||
|     match = re.match(pattern, string) | ||||
|     if match: | ||||
|         return match.string | ||||
|     else: | ||||
|         raise ValueError(f'"{string}" is tainted') | ||||
|  | ||||
|  | ||||
| def create_app(): | ||||
|     app = Flask(__name__, instance_relative_config=True) | ||||
|  | ||||
|     @auth.verify_password | ||||
|     def verify_password(username, password): | ||||
|         if username in users and \ | ||||
|                 check_password_hash(users.get(username), password): | ||||
|             return username | ||||
|  | ||||
|     @app.route("/flush/<name>", methods=['GET']) | ||||
|     @auth.login_required | ||||
|     def flush(name): | ||||
|         remote_user = auth.username() | ||||
|         name = untaint(PAT_NAME, name) | ||||
|         app.logger.info(f'Flushing table f2b-{name}' | ||||
|                         f' in anchor f2b-jail/{remote_user}') | ||||
|         res = pfctl_table_op('f2b-jail/{remote_user}', | ||||
|                              table='f2b-{name}', | ||||
|                              operation='flush') | ||||
|         return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                         'table': f'f2b-{name}', | ||||
|                         'operation': 'flush', | ||||
|                         'result': res}) | ||||
|  | ||||
|     @app.route("/register", methods=['PUT', 'DELETE']) | ||||
|     @auth.login_required | ||||
|     def register(): | ||||
|         remote_user = auth.username() | ||||
|         # port / name / protocol | ||||
|         data = request.get_json() | ||||
|         name = untaint(PAT_NAME, data['name']) | ||||
|         protocol = untaint(PAT_PROT, data['protocol']) | ||||
|         port = untaint(PAT_PORT, data['port']) | ||||
|         cfg = pfctl_cfg_read(f'f2b-jail/{remote_user}') | ||||
|         if not cfg: | ||||
|             cfg = [] | ||||
|         if request.method == 'PUT': | ||||
|             cfg.extend([ | ||||
|                 bytes(f'table <f2b-{name}> persist counters', 'ascii'), | ||||
|                 bytes(f'block quick proto {protocol}' | ||||
|                       f' from <f2b-{name}> to {port}', 'ascii') | ||||
|                 ]) | ||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||
|                                   b'\n'.join(cfg) + b'\n') | ||||
|         elif request.method == 'DELETE': | ||||
|             cfg = [cfg_line for cfg_line in cfg | ||||
|                    if cfg_line.find(bytes(f'<f2b-{name}>', 'ascii')) == -1] | ||||
|             res = pfctl_cfg_write(f'f2b-jail/{remote_user}', | ||||
|                                   b'\n'.join(cfg) + b'\n') | ||||
|             pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                            table=f'f2b-{name}', | ||||
|                            operation='flush') | ||||
|             pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                            table=f'f2b-{name}', | ||||
|                            operation='kill') | ||||
|         app.logger.info(f'pfctl -a f2b-jail/{remote_user} -f-') | ||||
|         return jsonify({'remote_user': remote_user, 'data': data}) | ||||
|         return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                         'table': f'f2b-{name}', | ||||
|                         'action': 'start' if request.method == 'PUT' | ||||
|                         else 'stop', | ||||
|                         'result': res}) | ||||
|  | ||||
|     @app.route("/ban", methods=['PUT', 'DELETE']) | ||||
|     @auth.login_required | ||||
|     def ban(): | ||||
|         remote_user = auth.username() | ||||
|         data = request.get_json() | ||||
|         # name / ip | ||||
|         name = untaint(PAT_NAME, data['name']) | ||||
|         ip = ip_address(data['ip']) | ||||
|         if request.method == 'PUT': | ||||
|             app.logger.info(f'Add {ip} to f2b-{name}' | ||||
|                             f' in anchor f2b-jail/{remote_user}') | ||||
|             res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                                  table=f'f2b-{name}', | ||||
|                                  operation='add', | ||||
|                                  value=str(ip)) | ||||
|         elif request.method == 'DELETE': | ||||
|             app.logger.info(f'Remove {ip} from f2b-{name}' | ||||
|                             f' in anchor f2b-jail/{remote_user}') | ||||
|             res = pfctl_table_op(f'f2b-jail/{remote_user}', | ||||
|                                  table=f'f2b-{name}', | ||||
|                                  operation='delete', | ||||
|                                  value=str(ip)) | ||||
|         return jsonify({'anchor': f'f2b-jail/{remote_user}', | ||||
|                         'table': f'f2b-{name}', | ||||
|                         'operation': 'add' if request.method == 'PUT' | ||||
|                         else 'delete', | ||||
|                         'result': res}) | ||||
|  | ||||
|     @app.errorhandler(ValueError) | ||||
|     def permission_err(error): | ||||
|         ''' | ||||
|         Show a json parsable error if the value is illegal | ||||
|         ''' | ||||
|         app.logger.fatal(error) | ||||
|         return jsonify({'error': str(error)}), 500 | ||||
|  | ||||
|     @auth.error_handler | ||||
|     def auth_error(): | ||||
|         app.logger.error('Access Denied') | ||||
|         return jsonify({'error': 'Access Denied'}), 401 | ||||
|  | ||||
|     return app | ||||
		Reference in New Issue
	
	Block a user