71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
'''
Simple Geolocation with FastAPI
'''
|
|
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
|
|
from typing import Annotated, Optional, Union
|
|
|
|
import geoip2.database
|
|
from fastapi import FastAPI, Path, Body
|
|
from pydantic import BaseModel
|
|
|
|
# ASGI application object; the route decorators in this file register on it.
app = FastAPI()

# Filesystem locations of the MaxMind GeoLite2 databases opened by the handlers.
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
|
|
|
|
class IPAddressParam(BaseModel):
    '''
    Request item wrapping one IP address to geolocate
    '''
    # A single IPv6 or IPv4 address.
    ip: Union[IPv6Address, IPv4Address]
|
|
|
|
class Locality(BaseModel):
    '''
    Where an address is located: city, country, continent and EU membership
    '''
    # City name; may be None.
    city: Optional[str]
    # Country ISO code; may be None.
    country: Optional[str]
    # Continent code; may be None.
    continent: Optional[str]
    # Whether the country is flagged as being in the European Union.
    is_eu: bool
|
|
|
|
class GeoLocation(BaseModel):
    '''
    Response model: one looked-up address with its ASN and locality data
    '''
    # The address that was looked up.
    ip: Union[IPv6Address, IPv4Address]
    # Autonomous system number; may be None.
    asn: Optional[int]
    # Autonomous system organization name; may be None.
    asn_org: Optional[str]
    # Network reported by the ASN lookup; may be None.
    network: Union[IPv6Network, IPv4Network, None]
    # City / country / continent details.
    locality: Locality
|
|
|
|
@app.post("/")
async def root_post(ipaddresses: Annotated[list[IPAddressParam],
                                           Body(title="The IPAddresses to geolocate")]
                    ) -> list[GeoLocation]:
    '''
    Look up geolocation for every ip in the request body

    Returns one GeoLocation per input address, in input order. An address
    that is missing from a database (for example a private or reserved
    range) does not abort the request: the fields sourced from that
    database are returned as None, and is_eu as False.
    '''
    # Imported locally so this handler does not rely on the module-level
    # imports exposing the geoip2.errors submodule by name.
    from geoip2.errors import AddressNotFoundError

    geolocations = []
    with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
          geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
        for ipaddress in ipaddresses:
            # Each reader raises AddressNotFoundError when the address is
            # not in its database; treat that as "no data", not a failure.
            try:
                asn_data = reader_asn.asn(ipaddress.ip)
            except AddressNotFoundError:
                asn_data = None
            try:
                city_data = reader_city.city(ipaddress.ip)
            except AddressNotFoundError:
                city_data = None

            if city_data is None:
                locality = Locality(
                    city=None,
                    country=None,
                    continent=None,
                    is_eu=False,
                )
            else:
                locality = Locality(
                    city=city_data.city.name,
                    country=city_data.country.iso_code,
                    continent=city_data.continent.code,
                    is_eu=city_data.country.is_in_european_union,
                )

            has_asn = asn_data is not None
            geolocations.append(GeoLocation(
                ip=ipaddress.ip,
                asn=asn_data.autonomous_system_number if has_asn else None,
                asn_org=(asn_data.autonomous_system_organization
                         if has_asn else None),
                network=asn_data.network if has_asn else None,
                locality=locality,
            ))
    return geolocations
|
|
|
|
|
|
@app.get("/{ipaddress}")
async def root_get(ipaddress: Annotated[Union[IPv4Address, IPv6Address],
                                        Path(title="The IPAddress to geolocate")]
                   ) -> GeoLocation:
    '''
    Geolocate the single ip given as the path parameter
    '''
    # Reuse the bulk handler with a one-element batch and take its only result.
    batch = [IPAddressParam(ip=ipaddress)]
    results = await root_post(batch)
    return results.pop()
|