Merge pull request 'handle_cors' (#2) from handle_cors into main
All checks were successful
pip-audit / audit-dependency-security (3.11) (push) Successful in 11m16s
Pylint / audit-runtime-security (3.11) (push) Successful in 10m36s
Bandit / audit-runtime-security (3.11) (push) Successful in 10m6s
Flake8 / audit (3.11) (push) Successful in 10m1s
Mypy / audit-typing (3.11) (push) Successful in 10m55s
Python Coverage / test-and-coverage (3.11) (push) Successful in 10m26s

Reviewed-on: #2
This commit is contained in:
2026-03-17 16:23:28 +00:00
13 changed files with 450 additions and 24 deletions

View File

@ -0,0 +1,36 @@
---
name: Bandit
on:
push:
branches: [main]
pull_request:
branches: [main]
# XXX need to do stuff with uv
jobs:
audit-runtime-security:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.11"
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Analyse code with Bandit
run: |
bandit -x '**/test_*.py,./.venv/**' -r .

View File

@ -1,17 +1,35 @@
---
name: Flake8
on: [push]
on:
push:
branches: [main]
pull_request:
branches: [main]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
audit:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
python-version:
- "3.11"
steps:
- uses: actions/checkout@v4
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Analyse code with Flake8
run: |
flake8 $(git ls-files '*.py')

View File

@ -1,17 +1,36 @@
---
name: Mypy
on: [push]
on:
push:
branches: [main]
pull_request:
branches: [main]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
audit-typing:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
python-version:
- "3.11"
steps:
- uses: actions/checkout@v4
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Analyse code with Mypy
run: |
mypy --install-types --non-interactive $(git ls-files '*.py')

View File

@ -0,0 +1,38 @@
---
name: pip-audit
on:
push:
branches: [main]
pull_request:
branches: [main]
schedule:
- cron: '0 0 * * 0' # Weekly on Sunday
# XXX need to do stuff with uv
jobs:
audit-dependency-security:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.11"
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Check vulnerable components with pip-audit
run: |
pip-audit -r requirements.txt

View File

@ -1,17 +1,35 @@
---
name: Pylint
on: [push]
on:
push:
branches: [main]
pull_request:
branches: [main]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
audit-runtime-security:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]
python-version:
- "3.11"
steps:
- uses: actions/checkout@v4
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Analyse code with Pylint
run: |
pylint $(git ls-files '*.py')

View File

@ -0,0 +1,50 @@
name: Python Coverage
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test-and-coverage:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.11"
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '${{ matrix.python-version }}'
cache: 'pip'
cache-dependency-path: 'requirements.txt'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run tests with coverage
run: |
pytest --cov=./ --cov-report=term --cov-report=xml --cov-report=html --junitxml=report.xml tests
- name: Upload coverage artifacts
uses: actions/upload-artifact@v3
with:
name: coverage-reports
path: |
coverage.xml
htmlcov/
- name: Upload test results
uses: actions/upload-artifact@v3
with:
name: test-results
path: report.xml

3
.gitignore vendored
View File

@ -51,6 +51,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
cover/
report.xml
# Translations
*.mo
@ -160,3 +161,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Vim swap files
*.sw?

33
.pylintrc Normal file
View File

@ -0,0 +1,33 @@
[MASTER]
init-hook="import os, sys; sys.path.append('app')"
# Enable only error and fatal messages initially
disable=all
enable=E,F
[FORMAT]
max-line-length=120
[BASIC]
# Good variable names which should always be accepted
good-names=i,j,k,ex,Run,_,id
[TYPECHECK]
# Don't check for missing member access
ignore-mixin-members=yes
[VARIABLES]
# Don't check for unused arguments in overridden methods
dummy-variables-rgx=_|dummy|^ignored_|^unused_
[DESIGN]
# Maximum number of arguments for function / method
max-args=10
[SIMILARITIES]
# Minimum lines number of a similarity
min-similarity-lines=4
[MISCELLANEOUS]
# List of note tags to take in consideration
notes=FIXME,XXX,TODO

View File

@ -1,19 +1,41 @@
'''
Simple Geolocation with FastAPI
'''
import os
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated, Optional, Union
import geoip2.database
from geoip2.errors import AddressNotFoundError
from fastapi import FastAPI, Path, Body, Request, Response, status
from dotenv import load_dotenv
from fastapi import (Body, FastAPI, HTTPException, Path, Request, Response,
status)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from geoip2.errors import AddressNotFoundError
from pydantic import BaseModel
# Load environment variables
load_dotenv()
app = FastAPI()
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
# Configure CORS from environment variables
cors_origins = os.getenv('CORS_ALLOW_ORIGINS', 'http://localhost')
allow_origins = [origin.strip() for origin in cors_origins.split(',')
if origin.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=allow_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["*"],
)
GEOLITE2_ASN_DB = os.getenv('GEOLITE2_ASN_DB',
'/usr/local/share/GeoIP/GeoLite2-ASN.mmdb')
GEOLITE2_CITY_DB = os.getenv('GEOLITE2_CITY_DB',
'/usr/local/share/GeoIP/GeoLite2-City.mmdb')
class IPAddressParam(BaseModel):
@ -105,4 +127,6 @@ def root_redirect(req: Request) -> RedirectResponse:
'''
Redirect empty request using REMOTE_ADDR
'''
if not req.client:
raise HTTPException(status_code=404, detail="Item not found")
return RedirectResponse(url=str(req.url) + str(req.client.host))

9
requirements-dev.txt Normal file
View File

@ -0,0 +1,9 @@
faker==40.11.0
flake8==7.3.0
mypy==1.19.1
pylint==4.0.5
pytest-cov==7.0.0
bandit==1.7.10
httpx==0.28.1
pip-audit==2.10.0
starlette==0.50.0

View File

@ -1,2 +1,3 @@
geoip2==4.7.0
fastapi==0.115.6
geoip2==5.1.0
fastapi==0.128.0
python-dotenv==1.2.1

2
tests/pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
pythonpath = ../app

175
tests/test_iplookup.py Normal file
View File

@ -0,0 +1,175 @@
'''
Test ismijnverweg geolookup api
'''
import logging
import random
import re
from ipaddress import ip_network
from operator import itemgetter
from unittest.mock import MagicMock, patch
import geoip2.database
from faker import Faker
from fastapi.testclient import TestClient
from main import app # type: ignore
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Faker for generating test data
fake = Faker()
# Create test client
fake_ipv6 = fake.ipv6()
client = TestClient(app, client=(fake_ipv6, 31337))
def gen_testdata():
'''
Generate some mocked up GeoIP2 City/ASN entries
'''
continents = ('EU', 'NA', 'SA', 'AS', 'AU')
asns = {}
cities = {}
# get me max 10 networks to create mocked up entries
networks = list(filter(lambda network: (network.version == 4
and network.prefixlen < 32
and network.prefixlen >= 8)
or (network.version == 6
and network.prefixlen <= 64
and network.prefixlen >= 56),
(ip_network(fake.unique.ipv4_public(network=True)
if random.random() < 0.25
else fake.unique.ipv6(network=True))
for _ in range(50))))[0:10]
for network in networks:
hostaddr = next(network.hosts())
logging.info('Using %s from %s', hostaddr, network)
asns[hostaddr] = geoip2.models.ASN(
hostaddr,
network=network,
autonomous_system_organization=fake.company(),
autonomous_system_number=fake.random_number(5))
cities[hostaddr] = geoip2.models.City(
locales=['en'],
city={'names': {'en': fake.city()}},
country={'iso_code': fake.country_code(),
'names': {'en': fake.country()}},
continent={'code': random.choice(continents)})
return asns, cities
def get_mock_reader(test_data):
'''
Mock the geoip2.database.Reader
'''
def _asn_lookup(ip):
try:
logging.info('Looking up ASN info for %s', ip)
return test_data[0][ip]
except KeyError as exc:
raise geoip2.errors.AddressNotFoundError(
f'{ip} not in test database') from exc
def _city_lookup(ip):
try:
logging.info('Looking up City info for %s', ip)
return test_data[1][ip]
except KeyError as exc:
raise geoip2.errors.AddressNotFoundError(
f'{ip} not in test database') from exc
mock_reader = MagicMock()
mock_reader_ctx = MagicMock()
mock_reader_ctx.test_data = test_data
mock_reader_ctx.asn = _asn_lookup
mock_reader_ctx.city = _city_lookup
mock_reader.__enter__ = lambda _: mock_reader_ctx
return mock_reader
def test_no_query():
"""Test searching without a query parameter"""
test_data = gen_testdata()
with patch('geoip2.database.Reader',
return_value=get_mock_reader(test_data)):
response = client.get("/")
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv6
assert len(results) > 0
def test_single_query():
"""Test searching with an ip address"""
test_data = gen_testdata()
with patch('geoip2.database.Reader',
return_value=get_mock_reader(test_data)):
fake_ipv4 = fake.ipv4_public()
response = client.get(f"/{fake_ipv4}")
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv4
assert len(results) > 0
def test_multi_query():
"""Test searching with an ip address"""
test_data = gen_testdata()
with patch('geoip2.database.Reader',
return_value=get_mock_reader(test_data)):
fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5
else fake.ipv4()}
for _ in range(16)]
response = client.post("/", json=fake_ips)
assert response.status_code == 200
results = response.json()
logging.info(results)
for ip in map(itemgetter('ip'), results):
assert ip in map(itemgetter('ip'), fake_ips)
assert len(results) > 0
def test_invalid_query():
"""Test searching with an invalid ip address"""
test_data = gen_testdata()
with patch('geoip2.database.Reader',
return_value=get_mock_reader(test_data)):
invalid_ip = '500.312.77.31337'
test_pattern = 'Input is not a valid IPv[46] address'
response = client.get(f"/{invalid_ip}")
assert response.status_code == 422
results = response.json()
logging.info(results)
assert all(map(lambda x: x == invalid_ip, (
map(itemgetter('input'), results['detail']))))
assert all(map(lambda x: re.match(test_pattern, x), (
map(itemgetter('msg'), results['detail']))))
assert len(results) > 0
if __name__ == "__main__":
# Run tests
test_no_query()
test_single_query()
test_invalid_query()
test_multi_query()
print("All tests passed!")