Mock up geoip2.database asn + city methods in contextmanager mode
Some checks failed
Flake8 / audit (3.11) (pull_request) Failing after 5m19s
Mypy / audit-typing (3.11) (pull_request) Successful in 10m50s
Pylint / audit-runtime-security (3.11) (pull_request) Successful in 10m39s
Python Coverage / test-and-coverage (3.11) (pull_request) Successful in 10m25s
Bandit / audit-runtime-security (3.11) (pull_request) Successful in 9m57s
pip-audit / audit-dependency-security (3.11) (pull_request) Successful in 11m20s

This commit is contained in:
2026-03-17 13:18:53 +01:00
parent bfbfc13dda
commit 72cbe73cce

View File

@ -4,11 +4,13 @@ Test ismijnverweg geolookup api
import logging
import random
import re
from ipaddress import ip_network
from operator import itemgetter
from unittest.mock import MagicMock, patch
import geoip2.database
from faker import Faker
from fastapi.testclient import TestClient
from main import app # type: ignore
# Set up logging
@ -23,63 +25,212 @@ fake_ipv6 = fake.ipv6()
client = TestClient(app, client=(fake_ipv6, 31337))
def gen_testdata():
'''
Generate some mocked up GeoIP2 City/ASN entries
'''
continents = ('EU', 'NA', 'SA', 'AS', 'AU')
asns = {}
cities = {}
# get me max 10 networks to create mocked up entries
networks = list(filter(lambda network: (network.version == 4
and network.prefixlen < 32
and network.prefixlen >= 8)
or (network.version == 6
and network.prefixlen <= 64
and network.prefixlen >= 56),
(ip_network(fake.unique.ipv4_public(network=True)
if random.random() < 0.25
else fake.unique.ipv6(network=True))
for _ in range(50))))[0:10]
for network in networks:
hostaddr = next(network.hosts())
logging.info('Using %s from %s', hostaddr, network)
asns[hostaddr] = geoip2.models.ASN(
hostaddr,
network=network,
autonomous_system_organization=fake.company(),
autonomous_system_number=fake.random_number(5))
cities[hostaddr] = geoip2.models.City(
locales=['en'],
city={'names': {'en': fake.city()}},
country={'iso_code': fake.country_code(),
'names': {'en': fake.country()}},
continent={'code': random.choice(continents)})
return asns, cities
def get_mock_reader(test_data):
'''
Mock the geoip2.database.Reader
'''
def _asn_lookup(ip):
try:
logging.info('Looking up ASN info for %s', ip)
return test_data[0][ip]
except KeyError as exc:
raise geoip2.errors.AddressNotFoundError(
f'{ip} not in test database') from exc
def _city_lookup(ip):
try:
logging.info('Looking up City info for %s', ip)
return test_data[1][ip]
except KeyError as exc:
raise geoip2.errors.AddressNotFoundError(
f'{ip} not in test database') from exc
mock_reader = MagicMock()
mock_reader_ctx = MagicMock()
mock_reader_ctx.test_data = test_data
mock_reader_ctx.asn = _asn_lookup
mock_reader_ctx.city = _city_lookup
mock_reader.__enter__ = lambda _: mock_reader_ctx
return mock_reader
def mock_reader_city(ip):
'''
Must throw geoip2.errors.AddressNotFoundError when not in our mocked data
Must return
geoip2.models.City(['en'], continent={'code': 'EU', 'geoname_id': 6255148,
'names': {'de': 'Europa', 'en': 'Europe', 'es': 'Europa', 'fr': 'Europe', 'ja':
'ヨーロッパ', 'pt-BR': 'Europa', 'ru': 'Европа', 'zh-CN': '欧洲'}},
country={'geoname_id': 2750405, 'is_in_european_union': True, 'iso_code': 'NL',
'names': {'de': 'Niederlande', 'en': 'The Netherlands', 'es': 'Holanda', 'fr':
'Pays-Bas', 'ja': 'オランダ王国', 'pt-BR': 'Holanda', 'ru': 'Нидерланды',
'zh-CN': '荷兰'}}, registered_country={'geoname_id': 2750405,
'is_in_european_union': True, 'iso_code': 'NL', 'names': {'de': 'Niederlande',
'en': 'The Netherlands', 'es': 'Holanda', 'fr': 'Pays-Bas', 'ja':
'オランダ王国', 'pt-BR': 'Holanda', 'ru': 'Нидерланды', 'zh-CN': '荷兰'}},
traits={'ip_address': '2a02:898:96::5e8e:f508', 'network': '2a02:898::/32'},
city={'geoname_id': 2759794, 'names': {'de': 'Amsterdam', 'en': 'Amsterdam',
'es': 'Ámsterdam', 'fr': 'Amsterdam', 'ja': 'Amusuterudamu', 'pt-BR':
'Amesterdã', 'ru': 'Амстердам', 'zh-CN': '阿姆斯特丹'}},
location={'accuracy_radius': 20, 'latitude': 52.3759, 'longitude': 4.8975,
'time_zone': 'Europe/Amsterdam'}, postal={'code': '1012'},
subdivisions=[{'geoname_id': 2749879, 'iso_code': 'NH', 'names': {'de':
'Nordholland', 'en': 'North Holland', 'es': 'Holanda Septentrional', 'fr':
'Hollande-Septentrionale', 'ja': '
北ホラント州', 'pt-BR': 'Holanda do Norte', 'ru': 'Северная Голландия', 'zh-CN': '北荷兰省'}}])
geoip2.models.City(
locales: Optional[collections.abc.Sequence[str]],
*,
city: Optional[dict] = None,
continent: Optional[dict] = None,
country: Optional[dict] = None,
location: Optional[dict] = None,
ip_address: Union[str, ipaddress.IPv6Address, ipaddress.IPv4Address, NoneType] = None,
maxmind: Optional[dict] = None,
postal: Optional[dict] = None,
prefix_len: Optional[int] = None,
registered_country: Optional[dict] = None,
represented_country: Optional[dict] = None,
subdivisions: Optional[list[dict]] = None,
traits: Optional[dict] = None,
**_,
) -> None
Docstring: Model for the City Plus web service and the City database.
File: /usr/local/lib/python3.11/site-packages/geoip2/models.py
Type: ABCMeta
Subclasses: Insights, Enterprise
'''
pass
def mock_reader_asn():
'''
Must throw geoip2.errors.AddressNotFoundError when not in our mocked data
Must return
geoip2.models.ASN('2a02:898:96::5e8e:f508', autonomous_system_number=8283, autonomous_system_organization='Netwerkvereniging Coloclue', network='2a02:898::/32')
Init signature:
geoip2.models.ASN(
ip_address: Union[str, ipaddress.IPv6Address, ipaddress.IPv4Address],
*,
autonomous_system_number: Optional[int] = None,
autonomous_system_organization: Optional[str] = None,
network: Optional[str] = None,
prefix_len: Optional[int] = None,
**_,
) -> None
Docstring: Model class for the GeoLite2 ASN.
File: /usr/local/lib/python3.11/site-packages/geoip2/models.py
Type: ABCMeta
Subclasses: ISP
'''
pass
def test_no_query():
"""Test searching without a query parameter"""
test_data = gen_testdata()
response = client.get("/")
with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)):
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv6
assert len(results) > 0
response = client.get("/")
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv6
assert len(results) > 0
def test_single_query():
"""Test searching with an ip address"""
fake_ipv4 = fake.ipv4_public()
test_data = gen_testdata()
response = client.get(f"/{fake_ipv4}")
with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)):
fake_ipv4 = fake.ipv4_public()
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv4
assert len(results) > 0
response = client.get(f"/{fake_ipv4}")
assert response.status_code == 200
results = response.json()
logging.info(results)
assert results['ip'] == fake_ipv4
assert len(results) > 0
def test_multi_query():
"""Test searching with an ip address"""
fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5 else fake.ipv4()}
for _ in range(16)]
test_data = gen_testdata()
response = client.post("/", json=fake_ips)
with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)):
fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5 else fake.ipv4()}
for _ in range(16)]
assert response.status_code == 200
results = response.json()
logging.info(results)
response = client.post("/", json=fake_ips)
for ip in map(itemgetter('ip'), results):
assert ip in map(itemgetter('ip'), fake_ips)
assert response.status_code == 200
results = response.json()
logging.info(results)
assert len(results) > 0
for ip in map(itemgetter('ip'), results):
assert ip in map(itemgetter('ip'), fake_ips)
assert len(results) > 0
def test_invalid_query():
"""Test searching with an invalid ip address"""
invalid_ip = '500.312.77.31337'
test_pattern = 'Input is not a valid IPv[46] address'
test_data = gen_testdata()
response = client.get(f"/{invalid_ip}")
with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)):
invalid_ip = '500.312.77.31337'
test_pattern = 'Input is not a valid IPv[46] address'
assert response.status_code == 422
results = response.json()
logging.info(results)
assert all(map(lambda x: x == invalid_ip, (
map(itemgetter('input'), results['detail']))))
assert all(map(lambda x: re.match(test_pattern, x), (
map(itemgetter('msg'), results['detail']))))
assert len(results) > 0
response = client.get(f"/{invalid_ip}")
assert response.status_code == 422
results = response.json()
logging.info(results)
assert all(map(lambda x: x == invalid_ip, (
map(itemgetter('input'), results['detail']))))
assert all(map(lambda x: re.match(test_pattern, x), (
map(itemgetter('msg'), results['detail']))))
assert len(results) > 0
if __name__ == "__main__":