import json
import logging
import zipfile
from typing import Dict, Optional, Tuple
import pooch
import pycountry
from pydantic_geojson import FeatureModel, FeatureCollectionModel
from unidecode import unidecode
from .api_types import (
FeatureCollectionModel as DFeatureCollectionModel,
FeatureModel as DFeatureModel,
)
logger = logging.getLogger(__name__)


class PFeatureModel(FeatureModel):
    properties: dict


class PFeatureCollectionModel(FeatureCollectionModel):
    features: list[PFeatureModel]


data_path = "https://geodata.ucdavis.edu/gadm/gadm4.1/json/gadm41_{country_code}_{level}.json.zip"
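# e.g. country_code="BRA", level=1 resolves to .../json/gadm41_BRA_1.json.zip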
# country_codes= {'vietnam': 'VNM', 'laos': 'LAO', 'cambodia': 'KHM', 'thailand': 'THA', 'myanmar': 'MMR', 'brazil': 'BRA', 'colombia': 'COL', 'peru': 'PER', 'ecuador': 'ECU', 'bolivia': 'BOL', 'paraguay': 'PRY'}
country_names = [
"brazil",
"mexico",
"el salvador",
"paraguay",
"peru",
"colombia",
"ecuador",
"nicaragua",
"panama",
"argentina",
"indonesia",
"philippines",
"thailand",
"vietnam",
"laos",
"malaysia",
"cambodia",
"singapore",
]
country_codes_l = [
"BRA",
"MEX",
"SLV",
"PRY",
"PER",
"COL",
"ECU",
"NIC",
"PAN",
"ARG",
"IDN",
"PHL",
"THA",
"VNM",
"LAO",
"MYS",
"KHM",
"SGP",
]
country_codes = dict(zip(country_names, country_codes_l))
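# e.g. country_codes["brazil"] == "BRA"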


def normalize_name(name: str) -> str:
    """Normalize a region name for lookups: strip spaces, lowercase, and transliterate accents."""
    return unidecode(name.replace(" ", "").lower())


def add_id(feature, admin_level=1, lookup_dict=None):
    """Copy a feature into a DFeatureModel, using its NAME_{admin_level} property
    (optionally mapped through lookup_dict) as the feature id."""
    feature_id = feature.properties[f"NAME_{admin_level}"]
    if lookup_dict:
        feature_id = lookup_dict[normalize_name(feature_id)]
    return DFeatureModel(**feature.model_dump(), id=feature_id)
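
# e.g. a feature with properties {"NAME_1": "São Paulo"} gets id "São Paulo", or, when a
# lookup_dict is given, the value stored under the normalized key "saopaulo".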


def get_area_polygons(country: str, regions: Optional[list[str]] = None, admin_level: int = 1) -> DFeatureCollectionModel:
    """
    Get the polygons for the specified regions in the specified country (only ADMIN1 supported).
    Only the regions that are found in the data are returned, and each region name is put in
    the id field of its feature.

    Parameters
    ----------
    country : str
        The country name
    regions : list[str], optional
        If given, only return polygons for the regions with these names
    admin_level : int
        Which administrative level to get polygons for (0 for country, 1 for admin-1, etc.)

    Returns
    -------
    FeatureCollectionModel
        The polygons for the specified regions
    """
    # TODO: switch feature_dict to use ID_{admin_level} instead of name (there can be duplicate names)
    data = get_country_data(country, admin_level=admin_level)
    feature_dict = {normalize_name(feature.properties[f"NAME_{admin_level}"]): feature for feature in data.features}
    for feature in feature_dict.values():
        logger.debug(feature.properties)
    logger.info(f"Polygon data available for regions: {list(feature_dict.keys())}")
    if regions:
        logger.info(f"Filtering to requested regions: {[normalize_name(region) for region in regions]}")
        normalized_to_original = {normalize_name(region): region for region in regions}
        features = [
            add_id(feature_dict[normalize_name(region)], admin_level, normalized_to_original)
            for region in regions
            if normalize_name(region) in feature_dict
        ]
    else:
        features = [add_id(feature, admin_level) for feature in feature_dict.values()]
    return DFeatureCollectionModel(
        type="FeatureCollection",
        features=features,
    )
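
# Usage sketch (illustrative; the first call downloads GADM data over the network):
#     polygons = get_area_polygons("brazil", regions=["Bahia", "São Paulo"])
#     region_ids = [feature.id for feature in polygons.features]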


def get_country_data_file(country: str, level=1):
    real_name = country.title()
    match = pycountry.countries.get(name=real_name)
    if match is not None:
        country_code = match.alpha_3
    else:
        # fall back to the hand-maintained mapping for names pycountry does not know
        # (e.g. "vietnam" vs pycountry's "Viet Nam")
        country_code = country_codes[country.lower()]
    return get_data_file(country_code, level)


def get_data_file(country_code: str, level=1):
    data_url = data_path.format(country_code=country_code, level=level)
    # known_hash=None skips checksum verification, since the upstream file is not pinned
    return pooch.retrieve(data_url, known_hash=None)


def get_country_data(country, admin_level=1) -> PFeatureCollectionModel:
    zip_filename = get_country_data_file(country, admin_level)
    # the GADM download is a zip archive wrapping a single GeoJSON file
    with zipfile.ZipFile(zip_filename) as z:
        filename = z.namelist()[0]
        with z.open(filename) as f:
            return PFeatureCollectionModel.model_validate_json(f.read())
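
# e.g. get_country_data("thailand") fetches and parses the admin-1 boundaries for THA
# (the archive is assumed to contain a single GeoJSON member).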


def get_all_data():
    return ((country, get_country_data(country, admin_level=1)) for country in country_codes)


class Polygons:
    def __init__(self, polygons):
        self._polygons = polygons

    def __eq__(self, other):
        return self._polygons == other._polygons

    def __len__(self):
        return len(self._polygons.features)

    def __iter__(self):
        yield from self._polygons.features

    @property
    def __geo_interface__(self):
        return self.to_geojson()

    @property
    def data(self) -> FeatureCollectionModel:
        return self._polygons

    @property
    def bbox(self):
        from .geoutils import feature_bbox

        # combine per-feature bounding boxes into one (xmin, ymin, xmax, ymax) tuple
        boxes = (feature_bbox(feat) for feat in self)
        xmins, ymins, xmaxs, ymaxs = zip(*boxes)
        return min(xmins), min(ymins), max(xmaxs), max(ymaxs)

    @classmethod
    def from_file(cls, filename, id_property="id"):
        with open(filename) as f:
            return cls.from_geojson(json.load(f), id_property=id_property)

    def to_file(self, filename):
        with open(filename, "w") as f:
            json.dump(self.to_geojson(), f)

    def id_to_name_tuple_dict(self) -> Dict[str, Tuple[str, str]]:
        """Return a dictionary with the id as key and a (name, parent) tuple as value."""
        lookup = {}
        for feat in self.feature_collection().features:
            if feat.properties is None:
                lookup[feat.id] = (feat.id, "-")
                continue
            lookup[feat.id] = (feat.properties.get("name", feat.id), feat.properties.get("parent", "-"))
        return lookup

    def get_parent_dict(self):
        return {
            feature.id: feature.properties.get("parent", "-") if feature.properties else "-"
            for feature in self._polygons.features
        }

    def get_predecessors_map(self, predecessors: list[str]):
        """Map each feature id to the single entry of predecessors found in its parentGraph."""
        predecessors = set(predecessors)
        predecessor_map = {}
        for feature in self._polygons.features:
            if feature.properties is None:
                raise ValueError(f"Feature {feature.id} has no properties")
            if "parentGraph" not in feature.properties:
                raise ValueError(f"Feature {feature.id} has no parentGraph property")
            possible_parents = [p for p in feature.properties["parentGraph"].split("/") if p in predecessors]
            if len(possible_parents) == 0:
                raise ValueError(f"Feature {feature.id} has no parent in predecessors")
            if len(possible_parents) > 1:
                raise ValueError(f"Feature {feature.id} has multiple parents in predecessors")
            predecessor_map[feature.id] = possible_parents[0]
        return predecessor_map
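
    # parentGraph is assumed to be a "/"-separated lineage string such as "vietnam/hanoi/badinh",
    # so get_predecessors_map(["hanoi"]) would map each such feature id to "hanoi".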

    def filter_locations(self, locations: list[str]):
        """Filter the polygons to only include the specified locations."""
        filtered = [feature for feature in self._polygons.features if feature.id in locations]
        return Polygons(DFeatureCollectionModel(features=filtered))

    @classmethod
    def _add_ids(cls, features: DFeatureCollectionModel, id_property: str):
        for feature in features.features:
            try:
                feature.id = feature.id or unidecode(str(feature.properties[id_property]))
            except AttributeError:
                logger.error(f"Could not set id from property {id_property!r}: {feature.properties[id_property]}")
                raise
        return features

    @classmethod
    def from_geojson(cls, geojson: dict, id_property: str = "id"):
        # TODO: check or skip non-Polygon features
        # ...
        # validate features one-by-one
        errors = 0
        features = []
        for feat in geojson["features"]:
            # skip invalid features
            try:
                feat = DFeatureModel.model_validate(feat)
                features.append(feat)
            except Exception:
                errors += 1
        # warn user about skipped features
        if errors:
            logger.warning(f"Skipped {errors} geojson features due to failed validation")
        # create pydantic feature collection and add ids
        features = DFeatureCollectionModel(features=features)
        features = cls._add_ids(features, id_property)
        # init class object
        return cls(features)
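
    # Usage sketch (the file name and id property are illustrative):
    #     polygons = Polygons.from_file("regions.geojson", id_property="NAME_1")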

    def to_geojson(self):
        return self._polygons.model_dump()

    def feature_collection(self):
        return self._polygons
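

# Minimal demo sketch (illustrative: requires network access for the GADM download,
# and must run inside the package because of the relative imports above):
if __name__ == "__main__":
    vietnam = Polygons(get_area_polygons("vietnam", regions=["Hà Nội"]))
    print(f"{len(vietnam)} polygon(s), bbox: {vietnam.bbox}")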