diff --git a/.gitea/workflows/bandit.yml b/.gitea/workflows/bandit.yml new file mode 100644 index 0000000..e70599d --- /dev/null +++ b/.gitea/workflows/bandit.yml @@ -0,0 +1,36 @@ +--- +name: Bandit +on: + push: + branches: [main] + pull_request: + branches: [main] + +# XXX need to do stuff with uv +jobs: + audit-runtime-security: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: + - "3.11" + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + + - name: Analyse code with Bandit + run: | + bandit -x '**/test_*.py,./.venv/**' -r . diff --git a/.gitea/workflows/flake8.yml b/.gitea/workflows/flake8.yml index a50707e..69ec686 100644 --- a/.gitea/workflows/flake8.yml +++ b/.gitea/workflows/flake8.yml @@ -1,17 +1,35 @@ --- name: Flake8 -on: [push] - +on: + push: + branches: [main] + pull_request: + branches: [main] # XXX need to do stuff with uv jobs: - build: - runs-on: freebsd + audit: + runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.11"] + python-version: + - "3.11" steps: - - uses: actions/checkout@v4 + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt - name: Analyse code with Flake8 run: | flake8 $(git ls-files '*.py') diff --git a/.gitea/workflows/mypy.yml b/.gitea/workflows/mypy.yml index 94c12ea..0409561 100644 --- a/.gitea/workflows/mypy.yml +++ 
b/.gitea/workflows/mypy.yml @@ -1,17 +1,36 @@ --- name: Mypy -on: [push] - +on: + push: + branches: [main] + pull_request: + branches: [main] # XXX need to do stuff with uv jobs: - build: - runs-on: freebsd + audit-typing: + runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.11"] + python-version: + - "3.11" steps: - - uses: actions/checkout@v4 + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + - name: Analyse code with Mypy run: | mypy --install-types --non-interactive $(git ls-files '*.py') diff --git a/.gitea/workflows/pip-audit.yml b/.gitea/workflows/pip-audit.yml new file mode 100644 index 0000000..7f41fb1 --- /dev/null +++ b/.gitea/workflows/pip-audit.yml @@ -0,0 +1,38 @@ +--- +name: pip-audit +on: + push: + branches: [main] + pull_request: + branches: [main] + schedule: + - cron: '0 0 * * 0' # Weekly on Sunday + +# XXX need to do stuff with uv +jobs: + audit-dependency-security: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: + - "3.11" + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + + - name: Check vulnerable components with pip-audit + run: | + pip-audit -r requirements.txt diff --git a/.gitea/workflows/pylint.yml b/.gitea/workflows/pylint.yml index d101f6a..e43b69c 100644 --- a/.gitea/workflows/pylint.yml +++ b/.gitea/workflows/pylint.yml @@ -1,17 +1,35 @@ 
--- name: Pylint -on: [push] - +on: + push: + branches: [main] + pull_request: + branches: [main] # XXX need to do stuff with uv jobs: - build: - runs-on: freebsd + audit-lint: + runs-on: ubuntu-latest strategy: matrix: - python-version: ["3.11"] + python-version: + - "3.11" steps: - - uses: actions/checkout@v4 + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt - name: Analyse code with Pylint run: | pylint $(git ls-files '*.py') diff --git a/.gitea/workflows/python-coverage.yml b/.gitea/workflows/python-coverage.yml new file mode 100644 index 0000000..565d8df --- /dev/null +++ b/.gitea/workflows/python-coverage.yml @@ -0,0 +1,50 @@ +name: Python Coverage + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test-and-coverage: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: + - "3.11" + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '${{ matrix.python-version }}' + cache: 'pip' + cache-dependency-path: 'requirements.txt' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r requirements-dev.txt + + - name: Run tests with coverage + run: | + pytest --cov=./ --cov-report=term --cov-report=xml --cov-report=html --junitxml=report.xml tests + + - name: Upload coverage artifacts + uses: actions/upload-artifact@v3 + with: + name: coverage-reports + path: | + coverage.xml + htmlcov/ + + - name: Upload test results + uses: actions/upload-artifact@v3 + with: + name: test-results + path: report.xml diff --git 
a/.gitignore b/.gitignore index 5d381cc..35995e6 100644 --- a/.gitignore +++ b/.gitignore @@ -51,6 +51,7 @@ coverage.xml .hypothesis/ .pytest_cache/ cover/ +report.xml # Translations *.mo @@ -160,3 +161,5 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +# Vim swap files +*.sw? diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..243a383 --- /dev/null +++ b/.pylintrc @@ -0,0 +1,33 @@ +[MASTER] +init-hook="import os, sys; sys.path.append('app')" + +# Enable only error and fatal messages initially +disable=all +enable=E,F + +[FORMAT] +max-line-length=120 + +[BASIC] +# Good variable names which should always be accepted +good-names=i,j,k,ex,Run,_,id + +[TYPECHECK] +# Don't check for missing member access +ignore-mixin-members=yes + +[VARIABLES] +# Don't check for unused arguments in overridden methods +dummy-variables-rgx=_|dummy|^ignored_|^unused_ + +[DESIGN] +# Maximum number of arguments for function / method +max-args=10 + +[SIMILARITIES] +# Minimum lines number of a similarity +min-similarity-lines=4 + +[MISCELLANEOUS] +# List of note tags to take in consideration +notes=FIXME,XXX,TODO diff --git a/app/main.py b/app/main.py index 6396921..b9306ae 100644 --- a/app/main.py +++ b/app/main.py @@ -1,19 +1,41 @@ ''' Simple Geolocation with FastAPI ''' +import os from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network from typing import Annotated, Optional, Union import geoip2.database -from geoip2.errors import AddressNotFoundError -from fastapi import FastAPI, Path, Body, Request, Response, status +from dotenv import load_dotenv +from fastapi import (Body, FastAPI, HTTPException, Path, Request, Response, + status) +from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse +from geoip2.errors import AddressNotFoundError from pydantic import BaseModel +# Load environment variables +load_dotenv() + app = FastAPI() 
-GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb' -GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb' +# Configure CORS from environment variables +cors_origins = os.getenv('CORS_ALLOW_ORIGINS', 'http://localhost') +allow_origins = [origin.strip() for origin in cors_origins.split(',') + if origin.strip()] + +app.add_middleware( + CORSMiddleware, + allow_origins=allow_origins, + allow_credentials=True, + allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], + allow_headers=["*"], +) + +GEOLITE2_ASN_DB = os.getenv('GEOLITE2_ASN_DB', + '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb') +GEOLITE2_CITY_DB = os.getenv('GEOLITE2_CITY_DB', + '/usr/local/share/GeoIP/GeoLite2-City.mmdb') class IPAddressParam(BaseModel): @@ -105,4 +127,6 @@ def root_redirect(req: Request) -> RedirectResponse: ''' Redirect empty request using REMOTE_ADDR ''' + if not req.client: + raise HTTPException(status_code=404, detail="Item not found") return RedirectResponse(url=str(req.url) + str(req.client.host)) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4252fcc --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,9 @@ +faker==40.11.0 +flake8==7.3.0 +mypy==1.19.1 +pylint==4.0.5 +pytest-cov==7.0.0 +bandit==1.7.10 +httpx==0.28.1 +pip-audit==2.10.0 +starlette==0.50.0 diff --git a/requirements.txt b/requirements.txt index 900a300..59b1eae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ -geoip2==4.7.0 -fastapi==0.115.6 +geoip2==5.1.0 +fastapi==0.128.0 +python-dotenv==1.2.1 diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 0000000..c6f165f --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +pythonpath = ../app diff --git a/tests/test_iplookup.py b/tests/test_iplookup.py new file mode 100644 index 0000000..fae1619 --- /dev/null +++ b/tests/test_iplookup.py @@ -0,0 +1,175 @@ +''' +Test ismijnverweg geolookup api +''' +import logging +import random +import re +from ipaddress import 
ip_network +from operator import itemgetter +from unittest.mock import MagicMock, patch + +import geoip2.database +from faker import Faker +from fastapi.testclient import TestClient +from main import app # type: ignore + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize Faker for generating test data +fake = Faker() + +# Create test client +fake_ipv6 = fake.ipv6() +client = TestClient(app, client=(fake_ipv6, 31337)) + + +def gen_testdata(): + ''' + Generate some mocked up GeoIP2 City/ASN entries + ''' + continents = ('EU', 'NA', 'SA', 'AS', 'AU') + asns = {} + cities = {} + # get me max 10 networks to create mocked up entries + networks = list(filter(lambda network: (network.version == 4 + and network.prefixlen < 32 + and network.prefixlen >= 8) + or (network.version == 6 + and network.prefixlen <= 64 + and network.prefixlen >= 56), + (ip_network(fake.unique.ipv4_public(network=True) + if random.random() < 0.25 + else fake.unique.ipv6(network=True)) + for _ in range(50))))[0:10] + for network in networks: + hostaddr = next(network.hosts()) + logging.info('Using %s from %s', hostaddr, network) + asns[hostaddr] = geoip2.models.ASN( + hostaddr, + network=network, + autonomous_system_organization=fake.company(), + autonomous_system_number=fake.random_number(5)) + cities[hostaddr] = geoip2.models.City( + locales=['en'], + city={'names': {'en': fake.city()}}, + country={'iso_code': fake.country_code(), + 'names': {'en': fake.country()}}, + continent={'code': random.choice(continents)}) + return asns, cities + + +def get_mock_reader(test_data): + ''' + Mock the geoip2.database.Reader + ''' + + def _asn_lookup(ip): + try: + logging.info('Looking up ASN info for %s', ip) + return test_data[0][ip] + except KeyError as exc: + raise geoip2.errors.AddressNotFoundError( + f'{ip} not in test database') from exc + + def _city_lookup(ip): + try: + logging.info('Looking up City info for %s', ip) + return test_data[1][ip] 
+ except KeyError as exc: + raise geoip2.errors.AddressNotFoundError( + f'{ip} not in test database') from exc + + mock_reader = MagicMock() + mock_reader_ctx = MagicMock() + mock_reader_ctx.test_data = test_data + mock_reader_ctx.asn = _asn_lookup + mock_reader_ctx.city = _city_lookup + mock_reader.__enter__ = lambda _: mock_reader_ctx + return mock_reader + + +def test_no_query(): + """Test searching without a query parameter""" + test_data = gen_testdata() + + with patch('geoip2.database.Reader', + return_value=get_mock_reader(test_data)): + + response = client.get("/") + + assert response.status_code == 200 + results = response.json() + logging.info(results) + assert results['ip'] == fake_ipv6 + assert len(results) > 0 + + +def test_single_query(): + """Test searching with an ip address""" + test_data = gen_testdata() + + with patch('geoip2.database.Reader', + return_value=get_mock_reader(test_data)): + fake_ipv4 = fake.ipv4_public() + + response = client.get(f"/{fake_ipv4}") + + assert response.status_code == 200 + results = response.json() + logging.info(results) + assert results['ip'] == fake_ipv4 + assert len(results) > 0 + + +def test_multi_query(): + """Test searching with an ip address""" + test_data = gen_testdata() + + with patch('geoip2.database.Reader', + return_value=get_mock_reader(test_data)): + fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5 + else fake.ipv4()} + for _ in range(16)] + + response = client.post("/", json=fake_ips) + + assert response.status_code == 200 + results = response.json() + logging.info(results) + + for ip in map(itemgetter('ip'), results): + assert ip in map(itemgetter('ip'), fake_ips) + + assert len(results) > 0 + + +def test_invalid_query(): + """Test searching with an invalid ip address""" + test_data = gen_testdata() + + with patch('geoip2.database.Reader', + return_value=get_mock_reader(test_data)): + invalid_ip = '500.312.77.31337' + test_pattern = 'Input is not a valid IPv[46] address' + + response = 
client.get(f"/{invalid_ip}") + + assert response.status_code == 422 + results = response.json() + logging.info(results) + assert all(map(lambda x: x == invalid_ip, ( + map(itemgetter('input'), results['detail'])))) + assert all(map(lambda x: re.match(test_pattern, x), ( + map(itemgetter('msg'), results['detail'])))) + assert len(results) > 0 + + +if __name__ == "__main__": + # Run tests + test_no_query() + test_single_query() + test_invalid_query() + test_multi_query() + print("All tests passed!")