From 72cbe73cce66db2b2ef39565979e648225b6b89c Mon Sep 17 00:00:00 2001 From: Ruben van Staveren Date: Tue, 17 Mar 2026 13:18:53 +0100 Subject: [PATCH] Mock up geoip2.database asn + city methods in contextmanager mode --- tests/test_iplookup.py | 219 ++++++++++++++++++++++++++++++++++------- 1 file changed, 185 insertions(+), 34 deletions(-) diff --git a/tests/test_iplookup.py b/tests/test_iplookup.py index 55f3956..8bdf797 100644 --- a/tests/test_iplookup.py +++ b/tests/test_iplookup.py @@ -4,11 +4,13 @@ Test ismijnverweg geolookup api import logging import random import re +from ipaddress import ip_network from operator import itemgetter +from unittest.mock import MagicMock, patch +import geoip2.database from faker import Faker from fastapi.testclient import TestClient - from main import app # type: ignore # Set up logging @@ -23,63 +25,212 @@ fake_ipv6 = fake.ipv6() client = TestClient(app, client=(fake_ipv6, 31337)) +def gen_testdata(): + ''' + Generate some mocked up GeoIP2 City/ASN entries + ''' + continents = ('EU', 'NA', 'SA', 'AS', 'AU') + asns = {} + cities = {} + # get me max 10 networks to create mocked up entries + networks = list(filter(lambda network: (network.version == 4 + and network.prefixlen < 32 + and network.prefixlen >= 8) + or (network.version == 6 + and network.prefixlen <= 64 + and network.prefixlen >= 56), + (ip_network(fake.unique.ipv4_public(network=True) + if random.random() < 0.25 + else fake.unique.ipv6(network=True)) + for _ in range(50))))[0:10] + for network in networks: + hostaddr = next(network.hosts()) + logging.info('Using %s from %s', hostaddr, network) + asns[hostaddr] = geoip2.models.ASN( + hostaddr, + network=network, + autonomous_system_organization=fake.company(), + autonomous_system_number=fake.random_number(5)) + cities[hostaddr] = geoip2.models.City( + locales=['en'], + city={'names': {'en': fake.city()}}, + country={'iso_code': fake.country_code(), + 'names': {'en': fake.country()}}, + continent={'code': random.choice(continents)}) + return asns, cities + + +def get_mock_reader(test_data): + ''' + Mock the geoip2.database.Reader + ''' + + def _asn_lookup(ip): + try: + logging.info('Looking up ASN info for %s', ip) + return test_data[0][ip] + except KeyError as exc: + raise geoip2.errors.AddressNotFoundError( + f'{ip} not in test database') from exc + + def _city_lookup(ip): + try: + logging.info('Looking up City info for %s', ip) + return test_data[1][ip] + except KeyError as exc: + raise geoip2.errors.AddressNotFoundError( + f'{ip} not in test database') from exc + + mock_reader = MagicMock() + mock_reader_ctx = MagicMock() + mock_reader_ctx.test_data = test_data + mock_reader_ctx.asn = _asn_lookup + mock_reader_ctx.city = _city_lookup + mock_reader.__enter__ = lambda _: mock_reader_ctx + return mock_reader + + +def mock_reader_city(ip): + ''' + Must throw geoip2.errors.AddressNotFoundError when not in our mocked data + Must return + geoip2.models.City(['en'], continent={'code': 'EU', 'geoname_id': 6255148, + 'names': {'de': 'Europa', 'en': 'Europe', 'es': 'Europa', 'fr': 'Europe', 'ja': + 'ヨーロッパ', 'pt-BR': 'Europa', 'ru': 'Европа', 'zh-CN': '欧洲'}}, + country={'geoname_id': 2750405, 'is_in_european_union': True, 'iso_code': 'NL', + 'names': {'de': 'Niederlande', 'en': 'The Netherlands', 'es': 'Holanda', 'fr': + 'Pays-Bas', 'ja': 'オランダ王国', 'pt-BR': 'Holanda', 'ru': 'Нидерланды', + 'zh-CN': '荷兰'}}, registered_country={'geoname_id': 2750405, + 'is_in_european_union': True, 'iso_code': 'NL', 'names': {'de': 'Niederlande', + 'en': 'The Netherlands', 'es': 'Holanda', 'fr': 'Pays-Bas', 'ja': + 'オランダ王国', 'pt-BR': 'Holanda', 'ru': 'Нидерланды', 'zh-CN': '荷兰'}}, + traits={'ip_address': '2a02:898:96::5e8e:f508', 'network': '2a02:898::/32'}, + city={'geoname_id': 2759794, 'names': {'de': 'Amsterdam', 'en': 'Amsterdam', + 'es': 'Ámsterdam', 'fr': 'Amsterdam', 'ja': 'Amusuterudamu', 'pt-BR': + 'Amesterdã', 'ru': 'Амстердам', 'zh-CN': '阿姆斯特丹'}}, + location={'accuracy_radius': 20, 'latitude': 52.3759, 'longitude': 4.8975, + 'time_zone': 'Europe/Amsterdam'}, postal={'code': '1012'}, + subdivisions=[{'geoname_id': 2749879, 'iso_code': 'NH', 'names': {'de': + 'Nordholland', 'en': 'North Holland', 'es': 'Holanda Septentrional', 'fr': + 'Hollande-Septentrionale', 'ja': ' + 北ホラント州', 'pt-BR': 'Holanda do Norte', 'ru': 'Северная Голландия', 'zh-CN': '北荷兰省'}}]) + geoip2.models.City( + locales: Optional[collections.abc.Sequence[str]], + *, + city: Optional[dict] = None, + continent: Optional[dict] = None, + country: Optional[dict] = None, + location: Optional[dict] = None, + ip_address: Union[str, ipaddress.IPv6Address, ipaddress.IPv4Address, NoneType] = None, + maxmind: Optional[dict] = None, + postal: Optional[dict] = None, + prefix_len: Optional[int] = None, + registered_country: Optional[dict] = None, + represented_country: Optional[dict] = None, + subdivisions: Optional[list[dict]] = None, + traits: Optional[dict] = None, + **_, + ) -> None + Docstring: Model for the City Plus web service and the City database. + File: /usr/local/lib/python3.11/site-packages/geoip2/models.py + Type: ABCMeta + Subclasses: Insights, Enterprise + ''' + pass + +def mock_reader_asn(): + ''' + Must throw geoip2.errors.AddressNotFoundError when not in our mocked data + Must return + geoip2.models.ASN('2a02:898:96::5e8e:f508', autonomous_system_number=8283, autonomous_system_organization='Netwerkvereniging Coloclue', network='2a02:898::/32') + Init signature: + geoip2.models.ASN( + ip_address: Union[str, ipaddress.IPv6Address, ipaddress.IPv4Address], + *, + autonomous_system_number: Optional[int] = None, + autonomous_system_organization: Optional[str] = None, + network: Optional[str] = None, + prefix_len: Optional[int] = None, + **_, + ) -> None + Docstring: Model class for the GeoLite2 ASN. + File: /usr/local/lib/python3.11/site-packages/geoip2/models.py + Type: ABCMeta + Subclasses: ISP + + ''' + pass + + def test_no_query(): """Test searching without a query parameter""" + test_data = gen_testdata() - response = client.get("/") + with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)): - assert response.status_code == 200 - results = response.json() - logging.info(results) - assert results['ip'] == fake_ipv6 - assert len(results) > 0 + response = client.get("/") + + assert response.status_code == 200 + results = response.json() + logging.info(results) + assert results['ip'] == fake_ipv6 + assert len(results) > 0 def test_single_query(): """Test searching with an ip address""" - fake_ipv4 = fake.ipv4_public() + test_data = gen_testdata() - response = client.get(f"/{fake_ipv4}") + with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)): + fake_ipv4 = fake.ipv4_public() - assert response.status_code == 200 - results = response.json() - logging.info(results) - assert results['ip'] == fake_ipv4 - assert len(results) > 0 + response = client.get(f"/{fake_ipv4}") + + assert response.status_code == 200 + results = response.json() + logging.info(results) + assert results['ip'] == fake_ipv4 + assert len(results) > 0 def test_multi_query(): """Test searching with an ip address""" - fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5 else fake.ipv4()} - for _ in range(16)] + test_data = gen_testdata() - response = client.post("/", json=fake_ips) + with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)): + fake_ips = [{'ip': fake.ipv6() if random.random() > 0.5 else fake.ipv4()} + for _ in range(16)] - assert response.status_code == 200 - results = response.json() - logging.info(results) + response = client.post("/", json=fake_ips) - for ip in map(itemgetter('ip'), results): - assert ip in map(itemgetter('ip'), fake_ips) + assert response.status_code == 200 + results = response.json() + logging.info(results) - assert len(results) > 0 + for ip in map(itemgetter('ip'), results): + assert ip in map(itemgetter('ip'), fake_ips) + + assert len(results) > 0 def test_invalid_query(): """Test searching with an invalid ip address""" - invalid_ip = '500.312.77.31337' - test_pattern = 'Input is not a valid IPv[46] address' + test_data = gen_testdata() - response = client.get(f"/{invalid_ip}") + with patch('geoip2.database.Reader', return_value=get_mock_reader(test_data)): + invalid_ip = '500.312.77.31337' + test_pattern = 'Input is not a valid IPv[46] address' - assert response.status_code == 422 - results = response.json() - logging.info(results) - assert all(map(lambda x: x == invalid_ip, ( - map(itemgetter('input'), results['detail'])))) - assert all(map(lambda x: re.match(test_pattern, x), ( - map(itemgetter('msg'), results['detail'])))) - assert len(results) > 0 + response = client.get(f"/{invalid_ip}") + + assert response.status_code == 422 + results = response.json() + logging.info(results) + assert all(map(lambda x: x == invalid_ip, ( + map(itemgetter('input'), results['detail'])))) + assert all(map(lambda x: re.match(test_pattern, x), ( + map(itemgetter('msg'), results['detail'])))) + assert len(results) > 0 if __name__ == "__main__":