109 lines
3.3 KiB
Python
Raw Normal View History

2023-12-12 10:43:09 +01:00
'''
Simple Geolocation with FastAPI
'''
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated, Optional, Union
import geoip2.database
from geoip2.errors import AddressNotFoundError
from fastapi import FastAPI, Path, Body, Request, Response, status
from fastapi.responses import RedirectResponse
2023-12-12 10:43:09 +01:00
from pydantic import BaseModel
# FastAPI application instance; the route decorators in this module register on it.
app = FastAPI()
# Filesystem paths of the GeoLite2 database files opened for each lookup request.
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
2024-05-07 17:12:23 +02:00
class IPAddressParam(BaseModel):
    '''
    Payload entry as used in POST

    Wraps a single IP address so that a POST body can carry a list of
    objects of the form {"ip": "<address>"}.
    '''
    # The address to geolocate; IPv6 or IPv4.
    ip: Union[IPv6Address, IPv4Address]
2023-12-12 10:43:09 +01:00
class Locality(BaseModel):
    '''
    Where an address is located: city, country, continent and EU membership.

    Every field has a default, so an instance can be built without data.
    '''
    # City name; None when not set.
    city: Optional[str] = None
    # Country identifier; None when not set.
    country: Optional[str] = None
    # Continent identifier; None when not set.
    continent: Optional[str] = None
    # Whether the country belongs to the European Union; defaults to False.
    is_eu: bool = False
2023-12-12 10:43:09 +01:00
2024-05-07 17:12:23 +02:00
2023-12-12 10:43:09 +01:00
class GeoLocation(BaseModel):
    '''
    Geolocation data model

    Every field is optional, so an instance without lookup data can
    still be returned for an address.
    '''
    # The address this record describes.
    ip: Optional[Union[IPv6Address, IPv4Address]] = None
    # Autonomous system number.
    asn: Optional[int] = None
    # Name of the organization operating the autonomous system.
    asn_org: Optional[str] = None
    # Network associated with the ASN record.
    network: Optional[Union[IPv6Network, IPv4Network]] = None
    # City/country/continent details; an all-default Locality when not set.
    locality: Locality = Locality()
2023-12-12 10:43:09 +01:00
2024-05-07 17:12:23 +02:00
@app.post("/")
async def root_post(ipaddresses: Annotated[
                        list[IPAddressParam],
                        Body(title="The IPAddresses to geolocate")],
                    response: Response
                    ) -> list[GeoLocation]:
    '''
    Return GeoLocation item(s) for a list of IPAddressParam objects

    Both GeoLite2 databases are opened for the duration of the call and
    each address is looked up in the ASN and the City database
    independently, so an address missing from one database still gets
    the data found in the other. An address unknown to both yields a
    GeoLocation carrying only its ip.

    The result has one entry per input entry, in input order. When it
    is not empty, a Cache-Control header is set on the response.
    '''
    geolocations = []
    with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
          geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
        for entry in ipaddresses:
            fields = {'ip': entry.ip}

            try:
                asn_data = reader_asn.asn(entry.ip)
            except AddressNotFoundError:
                # Not in the ASN database: leave the ASN fields at their
                # defaults and still try the City database below.
                pass
            else:
                fields['asn'] = asn_data.autonomous_system_number
                fields['asn_org'] = asn_data.autonomous_system_organization
                fields['network'] = asn_data.network

            try:
                city_data = reader_city.city(entry.ip)
            except AddressNotFoundError:
                # Not in the City database: keep the default Locality.
                pass
            else:
                fields['locality'] = Locality(
                    city=city_data.city.name,
                    country=city_data.country.iso_code,
                    continent=city_data.continent.code,
                    is_eu=city_data.country.is_in_european_union
                )

            geolocations.append(GeoLocation(**fields))

    if geolocations:
        response.headers['Cache-Control'] = 'private, max-age=604800'
    return geolocations
2023-12-12 10:43:09 +01:00
@app.get("/{ipaddress}")
async def root_get(ipaddress: Annotated[
                       Union[IPv4Address, IPv6Address],
                       Path(title="The IPAddress to geolocate")],
                   response: Response
                   ) -> GeoLocation:
    '''
    Look up geolocation for ip in path parameter

    Delegates to root_post with a one-element list and returns its
    single result. root_post sets the Cache-Control header on the
    shared response object whenever it returns a non-empty list, so
    the header is not set again here.
    '''
    locations = await root_post([IPAddressParam(ip=ipaddress)], response)
    if not locations:
        # Guard only: root_post returns one entry per input entry, so an
        # empty list is not expected for a one-element request.
        response.status_code = status.HTTP_404_NOT_FOUND
        return GeoLocation()
    return locations[0]
2024-05-07 17:12:23 +02:00
@app.get("/")
def root_redirect(req: Request) -> Response:
    '''
    Redirect empty request using REMOTE_ADDR

    Appends the client's address to the request URL and redirects
    there. Answers 400 Bad Request with an empty body when the client
    address is not available on the request.
    '''
    if req.client is None:
        # Without a client address there is nothing to redirect to.
        return Response(status_code=status.HTTP_400_BAD_REQUEST)
    # Drop the query string before appending: otherwise the address would be
    # glued onto the query and the redirect would target this same route.
    base_url = req.url.replace(query='')
    return RedirectResponse(url=str(base_url) + str(req.client.host))