Compare commits

...

12 Commits

7 changed files with 146 additions and 31 deletions

View File

@ -0,0 +1,17 @@
---
name: Flake8
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Flake8
run: |
flake8 $(git ls-files '*.py')

17
.gitea/workflows/mypy.yml Normal file
View File

@ -0,0 +1,17 @@
---
name: Mypy
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Mypy
run: |
mypy --install-types --non-interactive $(git ls-files '*.py')

View File

@ -0,0 +1,17 @@
---
name: Pylint
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Pylint
run: |
pylint $(git ls-files '*.py')

View File

@ -5,7 +5,9 @@ from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated, Optional, Union from typing import Annotated, Optional, Union
import geoip2.database import geoip2.database
from fastapi import FastAPI, Path, Body from geoip2.errors import AddressNotFoundError
from fastapi import FastAPI, Path, Body, Request, Response, status
from fastapi.responses import RedirectResponse
from pydantic import BaseModel from pydantic import BaseModel
app = FastAPI() app = FastAPI()
@ -13,38 +15,52 @@ app = FastAPI()
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb' GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb' GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
class IPAddressParam(BaseModel): class IPAddressParam(BaseModel):
ip: Union[IPv6Address,IPv4Address] '''
Payload entry as used in POST
'''
ip: Union[IPv6Address, IPv4Address]
class Locality(BaseModel): class Locality(BaseModel):
''' '''
Locality data Locality data
''' '''
city: Optional[str] city: Optional[str] = None
country: Optional[str] country: Optional[str] = None
continent: Optional[str] continent: Optional[str] = None
is_eu: bool is_eu: bool = False
class GeoLocation(BaseModel): class GeoLocation(BaseModel):
''' '''
Geolocation data model Geolocation data model
''' '''
ip: Union[IPv6Address,IPv4Address] ip: Optional[Union[IPv6Address, IPv4Address]] = None
asn: Optional[int] asn: Optional[int] = None
asn_org: Optional[str] asn_org: Optional[str] = None
network: Union[IPv6Network,IPv4Network,None] network: Optional[Union[IPv6Network, IPv4Network]] = None
locality: Locality locality: Locality = Locality()
@app.post("/") @app.post("/")
async def root_post(ipaddresses: Annotated[list[IPAddressParam], async def root_post(ipaddresses: Annotated[
Body(title="The IPAddresses to geolocate")] list[IPAddressParam],
Body(title="The IPAddresses to geolocate")],
response: Response
) -> list[GeoLocation]: ) -> list[GeoLocation]:
'''
Return GeoLocation item(s) for a list of IPAddressParam objects
'''
geolocations = [] geolocations = []
with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn, with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city): geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
for ipaddress in ipaddresses: for ipaddress in ipaddresses:
try:
asn_data = reader_asn.asn(ipaddress.ip) asn_data = reader_asn.asn(ipaddress.ip)
city_data = reader_city.city(ipaddress.ip) city_data = reader_city.city(ipaddress.ip)
geolocations.append(GeoLocation( geolocations.append(GeoLocation(
ip=ipaddress.ip, ip=ipaddress.ip,
asn=asn_data.autonomous_system_number, asn=asn_data.autonomous_system_number,
@ -57,14 +73,36 @@ async def root_post(ipaddresses: Annotated[list[IPAddressParam],
is_eu=city_data.country.is_in_european_union is_eu=city_data.country.is_in_european_union
) )
)) ))
except AddressNotFoundError:
geolocations.append(GeoLocation(
ip=ipaddress.ip
)
)
if geolocations:
response.headers['Cache-Control'] = 'private, max-age=604800'
return geolocations return geolocations
@app.get("/{ipaddress}") @app.get("/{ipaddress}")
async def root_get(ipaddress: Annotated[Union[IPv4Address,IPv6Address], async def root_get(ipaddress: Annotated[
Path(title="The IPAddress to geolocate")] Union[IPv4Address, IPv6Address],
Path(title="The IPAddress to geolocate")],
response: Response
) -> GeoLocation: ) -> GeoLocation:
''' '''
Look up geolocation for ip in path parameter Look up geolocation for ip in path parameter
''' '''
return (await root_post([IPAddressParam(ip=ipaddress)])).pop() locations = await root_post([IPAddressParam(ip=ipaddress)], response)
if locations:
response.headers['Cache-Control'] = 'private, max-age=604800'
return locations.pop()
response.status_code = status.HTTP_404_NOT_FOUND
return GeoLocation()
@app.get("/")
def root_redirect(req: Request) -> RedirectResponse:
'''
Redirect empty request using REMOTE_ADDR
'''
return RedirectResponse(url=str(req.url) + str(req.client.host))

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
geoip2==4.7.0
fastapi==0.115.6

0
run/.gitkeep Normal file
View File

24
start.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/sh
NAME=ismijnipverweg
DIR=/opt/apps/ismijnipverweg/app
USER=www
GROUP=nobody
WORKERS=3
WORKER_CLASS=uvicorn.workers.UvicornWorker
#VENV=$DIR/.venv/bin/activate
BIND=unix:$DIR/../run/gunicorn.sock
LOG_LEVEL=error
cd $DIR
# source $VENV
exec /usr/local/bin/gunicorn main:app \
--name $NAME \
--workers $WORKERS \
--worker-class $WORKER_CLASS \
--user=$USER \
--group=$GROUP \
--bind=$BIND \
--log-level=$LOG_LEVEL \
--log-file=-