90 lines
2.9 KiB
Python
90 lines
2.9 KiB
Python
'''
|
|
Simple Geolocation with FastAPI
|
|
'''
|
|
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
|
|
from typing import Annotated, Optional, Union
|
|
|
|
import geoip2.database
|
|
from geoip2.errors import AddressNotFoundError
|
|
from fastapi import FastAPI, Path, Body, Response, status
|
|
from pydantic import BaseModel
|
|
|
|
app = FastAPI()
|
|
|
|
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
|
|
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
|
|
|
|
class IPAddressParam(BaseModel):
|
|
'''
|
|
Payload entry as used in POST
|
|
'''
|
|
ip: Union[IPv6Address,IPv4Address]
|
|
|
|
class Locality(BaseModel):
|
|
'''
|
|
Locality data
|
|
'''
|
|
city: Optional[str] = None
|
|
country: Optional[str] = None
|
|
continent: Optional[str] = None
|
|
is_eu: bool = False
|
|
|
|
class GeoLocation(BaseModel):
|
|
'''
|
|
Geolocation data model
|
|
'''
|
|
ip: Optional[Union[IPv6Address,IPv4Address]] = None
|
|
asn: Optional[int] = None
|
|
asn_org: Optional[str] = None
|
|
network: Optional[Union[IPv6Network,IPv4Network]] = None
|
|
locality: Locality = Locality()
|
|
|
|
@app.post("/")
|
|
async def root_post(ipaddresses: Annotated[list[IPAddressParam],
|
|
Body(title="The IPAddresses to geolocate")]
|
|
) -> list[GeoLocation]:
|
|
'''
|
|
Return GeoLocation item(s) for a list of IPAddressParam objects
|
|
'''
|
|
geolocations = []
|
|
with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
|
|
geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
|
|
for ipaddress in ipaddresses:
|
|
try:
|
|
asn_data = reader_asn.asn(ipaddress.ip)
|
|
city_data = reader_city.city(ipaddress.ip)
|
|
|
|
geolocations.append(GeoLocation(
|
|
ip=ipaddress.ip,
|
|
asn=asn_data.autonomous_system_number,
|
|
asn_org=asn_data.autonomous_system_organization,
|
|
network=asn_data.network,
|
|
locality=Locality(
|
|
city=city_data.city.name,
|
|
country=city_data.country.iso_code,
|
|
continent=city_data.continent.code,
|
|
is_eu=city_data.country.is_in_european_union
|
|
)
|
|
))
|
|
except AddressNotFoundError:
|
|
geolocations.append(GeoLocation(
|
|
ip=ipaddress.ip
|
|
)
|
|
)
|
|
return geolocations
|
|
|
|
|
|
@app.get("/{ipaddress}")
|
|
async def root_get(ipaddress: Annotated[Union[IPv4Address,IPv6Address],
|
|
Path(title="The IPAddress to geolocate")],
|
|
response: Response
|
|
) -> GeoLocation:
|
|
'''
|
|
Look up geolocation for ip in path parameter
|
|
'''
|
|
locations = await root_post([IPAddressParam(ip=ipaddress)])
|
|
if locations:
|
|
return locations.pop()
|
|
response.status_code = status.HTTP_404_NOT_FOUND
|
|
return GeoLocation()
|