Compare commits

..

1 Commits

Author SHA1 Message Date
b100d24273 This decorator messes up pydantics BaseModel 2023-12-12 10:50:16 +01:00
10 changed files with 32 additions and 221 deletions

View File

@ -1,17 +0,0 @@
---
name: Bandit
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Bandit
run: |
bandit -r .

View File

@ -1,17 +0,0 @@
---
name: Flake8
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Flake8
run: |
flake8 $(git ls-files '*.py')

View File

@ -1,17 +0,0 @@
---
name: Mypy
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Mypy
run: |
mypy --install-types --non-interactive $(git ls-files '*.py')

View File

@ -1,23 +0,0 @@
---
name: pip-audit
on:
push:
branches: [main]
pull_request:
branches: [main]
schedule:
- cron: '0 0 * * 0' # Weekly on Sunday
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Check vulnerable components with pip-audit
run: |
pip-audit .

View File

@ -1,17 +0,0 @@
---
name: Pylint
on: [push]
# XXX need to do stuff with uv
jobs:
build:
runs-on: freebsd
strategy:
matrix:
python-version: ["3.11"]
steps:
- uses: actions/checkout@v4
- name: Analyse code with Pylint
run: |
pylint $(git ls-files '*.py')

2
.gitignore vendored
View File

@ -160,5 +160,3 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Vim swap files
*.sw?

View File

@ -1,89 +1,52 @@
'''
Simple Geolocation with FastAPI
'''
import os
import dataclasses
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import Annotated, Optional, Union
import geoip2.database
from dotenv import load_dotenv
from fastapi import Body, FastAPI, Path, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from geoip2.errors import AddressNotFoundError
from fastapi import FastAPI, Path
from pydantic import BaseModel
# Load environment variables
load_dotenv()
app = FastAPI()
# Configure CORS from environment variables
cors_origins = os.getenv('CORS_ALLOW_ORIGINS', 'http://localhost')
allow_origins = [origin.strip() for origin in cors_origins.split(',')
if origin.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=allow_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["*"],
)
GEOLITE2_ASN_DB = os.getenv('GEOLITE2_ASN_DB',
'/usr/local/share/GeoIP/GeoLite2-ASN.mmdb')
GEOLITE2_CITY_DB = os.getenv('GEOLITE2_CITY_DB',
'/usr/local/share/GeoIP/GeoLite2-City.mmdb')
class IPAddressParam(BaseModel):
'''
Payload entry as used in POST
'''
ip: Union[IPv6Address, IPv4Address]
GEOLITE2_ASN_DB = '/usr/local/share/GeoIP/GeoLite2-ASN.mmdb'
GEOLITE2_CITY_DB = '/usr/local/share/GeoIP/GeoLite2-City.mmdb'
class Locality(BaseModel):
'''
Locality data
'''
city: Optional[str] = None
country: Optional[str] = None
continent: Optional[str] = None
is_eu: bool = False
city: Optional[str]
country: Optional[str]
continent: Optional[str]
is_eu: bool
class GeoLocation(BaseModel):
'''
Geolocation data model
'''
ip: Optional[Union[IPv6Address, IPv4Address]] = None
asn: Optional[int] = None
asn_org: Optional[str] = None
network: Optional[Union[IPv6Network, IPv4Network]] = None
locality: Locality = Locality()
ip: Union[IPv6Address,IPv4Address]
asn: Optional[int]
asn_org: Optional[str]
network: Union[IPv6Network,IPv4Network,None]
locality: Locality
@app.post("/")
async def root_post(ipaddresses: Annotated[
list[IPAddressParam],
Body(title="The IPAddresses to geolocate")],
response: Response
) -> list[GeoLocation]:
@app.get("/{ipaddress}")
async def root(ipaddress: Annotated[Union[IPv4Address,IPv6Address],
Path(title="The IPAddress to geolocate")]
) -> GeoLocation:
'''
Return GeoLocation item(s) for a list of IPAddressParam objects
Look up geolocation for ip in path parameter
'''
geolocations = []
with (geoip2.database.Reader(GEOLITE2_ASN_DB) as reader_asn,
geoip2.database.Reader(GEOLITE2_CITY_DB) as reader_city):
for ipaddress in ipaddresses:
try:
asn_data = reader_asn.asn(ipaddress.ip)
city_data = reader_city.city(ipaddress.ip)
asn_data = reader_asn.asn(ipaddress)
city_data = reader_city.city(ipaddress)
geolocations.append(GeoLocation(
ip=ipaddress.ip,
return GeoLocation(
ip=ipaddress,
asn=asn_data.autonomous_system_number,
asn_org=asn_data.autonomous_system_organization,
network=asn_data.network,
@ -93,37 +56,4 @@ async def root_post(ipaddresses: Annotated[
continent=city_data.continent.code,
is_eu=city_data.country.is_in_european_union
)
))
except AddressNotFoundError:
geolocations.append(GeoLocation(
ip=ipaddress.ip
)
)
if geolocations:
response.headers['Cache-Control'] = 'private, max-age=604800'
return geolocations
@app.get("/{ipaddress}")
async def root_get(ipaddress: Annotated[
Union[IPv4Address, IPv6Address],
Path(title="The IPAddress to geolocate")],
response: Response
) -> GeoLocation:
'''
Look up geolocation for ip in path parameter
'''
locations = await root_post([IPAddressParam(ip=ipaddress)], response)
if locations:
response.headers['Cache-Control'] = 'private, max-age=604800'
return locations.pop()
response.status_code = status.HTTP_404_NOT_FOUND
return GeoLocation()
@app.get("/")
def root_redirect(req: Request) -> RedirectResponse:
'''
Redirect empty request using REMOTE_ADDR
'''
return RedirectResponse(url=str(req.url) + str(req.client.host))

View File

@ -1,2 +0,0 @@
geoip2==4.7.0
fastapi==0.115.6

View File

View File

@ -1,24 +0,0 @@
#!/bin/sh
NAME=ismijnipverweg
DIR=/opt/apps/ismijnipverweg/app
USER=www
GROUP=nobody
WORKERS=3
WORKER_CLASS=uvicorn.workers.UvicornWorker
#VENV=$DIR/.venv/bin/activate
BIND=unix:$DIR/../run/gunicorn.sock
LOG_LEVEL=error
cd $DIR
# source $VENV
exec /usr/local/bin/gunicorn main:app \
--name $NAME \
--workers $WORKERS \
--worker-class $WORKER_CLASS \
--user=$USER \
--group=$GROUP \
--bind=$BIND \
--log-level=$LOG_LEVEL \
--log-file=-