Files
ismijnipverweg/app/main.py

130 lines
3.9 KiB
Python
Raw Permalink Normal View History

2023-12-12 10:43:09 +01:00
'''
Simple Geolocation with FastAPI
'''
import os
2023-12-12 10:43:09 +01:00
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated, Optional, Union
import geoip2.database
from dotenv import load_dotenv
from fastapi import Body, FastAPI, Path, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from geoip2.errors import AddressNotFoundError
2023-12-12 10:43:09 +01:00
from pydantic import BaseModel
# Load environment variables (python-dotenv) before any os.getenv() call
load_dotenv()

app = FastAPI()

# CORS origins come from CORS_ALLOW_ORIGINS as a comma-separated list;
# surrounding whitespace is stripped and empty entries are dropped.
cors_origins = os.getenv('CORS_ALLOW_ORIGINS', 'http://localhost')
allow_origins = list(filter(None, map(str.strip, cors_origins.split(','))))

app.add_middleware(
    CORSMiddleware,
    allow_origins=allow_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

# Locations of the GeoLite2 database files, overridable via the environment
GEOLITE2_ASN_DB = os.getenv(
    'GEOLITE2_ASN_DB',
    '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb',
)
GEOLITE2_CITY_DB = os.getenv(
    'GEOLITE2_CITY_DB',
    '/usr/local/share/GeoIP/GeoLite2-City.mmdb',
)
2023-12-12 10:43:09 +01:00
2024-05-07 17:12:23 +02:00
class IPAddressParam(BaseModel):
    '''
    Single entry of the POST payload: one IP address to geolocate
    '''
    # Either an IPv6 or an IPv4 address
    ip: Union[IPv6Address, IPv4Address]
2023-12-12 10:43:09 +01:00
class Locality(BaseModel):
    '''
    Where an address is located; every field is optional
    '''
    # City name, when known
    city: Optional[str] = None
    # Country and continent codes, when known
    country: Optional[str] = None
    continent: Optional[str] = None
    # Whether the country is in the European Union (defaults to False)
    is_eu: bool = False
2023-12-12 10:43:09 +01:00
2024-05-07 17:12:23 +02:00
2023-12-12 10:43:09 +01:00
class GeoLocation(BaseModel):
    '''
    Result of a geolocation lookup for one address

    All fields default to "unknown" (None / an empty Locality), so an
    instance can be built from the IP alone when nothing else is found.
    '''
    # The address that was looked up
    ip: Optional[Union[IPv6Address, IPv4Address]] = None
    # Autonomous system number and organisation name
    asn: Optional[int] = None
    asn_org: Optional[str] = None
    # Network the address belongs to
    network: Optional[Union[IPv6Network, IPv4Network]] = None
    # City / country / continent details
    locality: Locality = Locality()
2023-12-12 10:43:09 +01:00
2024-05-07 17:12:23 +02:00
def _lookup_asn(reader, address):
    '''
    Return the ASN record for address, or None if the database has no entry
    '''
    try:
        return reader.asn(address)
    except AddressNotFoundError:
        return None


def _lookup_city(reader, address):
    '''
    Return the City record for address, or None if the database has no entry
    '''
    try:
        return reader.city(address)
    except AddressNotFoundError:
        return None


@app.post("/")
async def root_post(ipaddresses: Annotated[
                        list[IPAddressParam],
                        Body(title="The IPAddresses to geolocate")],
                    response: Response
                    ) -> list[GeoLocation]:
    '''
    Return GeoLocation item(s) for a list of IPAddressParam objects

    One GeoLocation is returned per input entry, in input order. The ASN
    and City databases are queried independently, so an address missing
    from one database still gets the data found in the other. An address
    missing from both yields a GeoLocation carrying only the ip.

    A Cache-Control header is set on the response when the result is
    non-empty.
    '''
    geolocations = []
    # Both database readers are opened per request and closed on exit
    with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
          geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
        for ipaddress in ipaddresses:
            asn_data = _lookup_asn(reader_asn, ipaddress.ip)
            city_data = _lookup_city(reader_city, ipaddress.ip)

            # Only pass what was actually found; the model defaults
            # cover everything else.
            fields = {'ip': ipaddress.ip}
            if asn_data is not None:
                fields['asn'] = asn_data.autonomous_system_number
                fields['asn_org'] = asn_data.autonomous_system_organization
                fields['network'] = asn_data.network
            if city_data is not None:
                fields['locality'] = Locality(
                    city=city_data.city.name,
                    country=city_data.country.iso_code,
                    continent=city_data.continent.code,
                    is_eu=city_data.country.is_in_european_union
                )
            geolocations.append(GeoLocation(**fields))

    if geolocations:
        response.headers['Cache-Control'] = 'private, max-age=604800'
    return geolocations
2023-12-12 10:43:09 +01:00
@app.get("/{ipaddress}")
async def root_get(ipaddress: Annotated[
                       Union[IPv4Address, IPv6Address],
                       Path(title="The IPAddress to geolocate")],
                   response: Response
                   ) -> GeoLocation:
    '''
    Geolocate the single address given as path parameter

    Delegates to root_post with a one-element payload and returns the
    resulting GeoLocation. If root_post returns nothing, the response
    status is set to 404 and an empty GeoLocation is returned.
    '''
    payload = [IPAddressParam(ip=ipaddress)]
    locations = await root_post(payload, response)

    # NOTE(review): root_post appends one entry per input address even
    # when the address is not found, so this branch looks unreachable
    # for a one-element payload - confirm whether 404 was intended for
    # unknown addresses.
    if not locations:
        response.status_code = status.HTTP_404_NOT_FOUND
        return GeoLocation()

    response.headers['Cache-Control'] = 'private, max-age=604800'
    return locations[-1]
2024-05-07 17:12:23 +02:00
@app.get("/")
def root_redirect(req: Request) -> RedirectResponse:
    '''
    Redirect empty request using REMOTE_ADDR

    Sends the caller to "/<client address>" so the lookup is done for the
    address the request came from.

    Raises an HTTPException (400) when the client address is not
    available on the request.
    '''
    # Imported locally so this block does not depend on the file header
    from fastapi import HTTPException

    # req.client may be None, in which case there is nothing to redirect to
    if req.client is None or not req.client.host:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Client address unavailable; request /{ipaddress} instead'
        )

    # Build the target from the URL path instead of appending to the full
    # URL string: plain concatenation would place the address after any
    # query string (e.g. "/?x=1" + "1.2.3.4").
    path = req.url.path
    if not path.endswith('/'):
        path += '/'
    target = req.url.replace(path=path + str(req.client.host), query='')
    return RedirectResponse(url=str(target))