# Source code for geo_sampling.data.gadm
"""GADM (Global Administrative Areas) data provider."""
import html
import os
import re
import shutil
import zipfile
from typing import List, Tuple, Optional
from urllib.parse import urljoin

import requests
import shapefile
from shapely.geometry import Polygon
from shapely.geometry import shape as shapely_shape
from shapely.ops import unary_union

from .._types import BoundingBox
# [docs]
class GADMProvider:
    """Provider for GADM administrative boundary data.

    Country archives are fetched from ``GADM_BASE_URL``, cached as zip files
    inside ``data_dir`` and extracted into one sub-directory per country code.
    Both steps are skipped when their output already exists on disk.
    """

    GADM_BASE_URL = "https://geodata.ucdavis.edu/gadm/gadm4.1/shp/"
    GADM_URL_FORMAT = "gadm41_{0}_shp.zip"

    # One ``<option ...>label`` pair: group 1 is the raw attribute text,
    # group 2 the visible label.
    _OPTION_PATTERN = re.compile(r"<option\b([^>]*)>([^<]*)", re.IGNORECASE)

    # Shapefile shape-type codes for Polygon, PolygonZ and PolygonM.
    _POLYGON_SHAPE_TYPES = frozenset({5, 15, 25})

    def __init__(self, data_dir: str = "data"):
        """Initialize GADM provider.

        Args:
            data_dir: Directory to store downloaded data. It is created if
                it does not exist yet.
        """
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)

    def get_country_list(self) -> List[str]:
        """Get list of available countries from GADM.

        The names are scraped from the ``<option>`` elements of the GADM
        country download page. This is best-effort: any failure is reported
        with a printed warning and an empty list is returned.

        Returns:
            Sorted list of country names, or ``[]`` if the page could not
            be fetched.
        """
        try:
            response = requests.get(
                "https://gadm.org/download_country.html", timeout=30
            )
            response.raise_for_status()
            return sorted(self._parse_country_names(response.text))
        except Exception as e:
            print(f"Warning: Could not fetch country list: {e}")
            return []

    @classmethod
    def _parse_country_names(cls, page: str) -> List[str]:
        """Extract country labels from the ``<option>`` tags of an HTML page.

        Options without a ``value`` attribute, options marked ``selected``
        and the "Select country" placeholder are skipped. HTML entities in
        the labels are decoded.

        Args:
            page: HTML text to scan.

        Returns:
            Country names in document order.
        """
        names = []
        for attributes, label in cls._OPTION_PATTERN.findall(page):
            attributes = attributes.lower()
            if "value=" not in attributes or "selected" in attributes:
                continue
            name = html.unescape(label).strip()
            if name and name != "Select country":
                names.append(name)
        return names

    def download_country_data(self, country_code: str) -> str:
        """Download GADM shapefile data for a country.

        Args:
            country_code: Three-letter country code (e.g., 'IND')

        Returns:
            Path to the downloaded and extracted directory
        """
        filename = self.GADM_URL_FORMAT.format(country_code)
        url = urljoin(self.GADM_BASE_URL, filename)
        local_zip_path = os.path.join(self.data_dir, filename)
        extract_dir = os.path.join(self.data_dir, country_code)

        # Download if not already present
        if not os.path.exists(local_zip_path):
            print(f"Downloading {url}...")
            self._download_file(url, local_zip_path)

        # Extract if not already done
        if not os.path.exists(extract_dir):
            print(f"Extracting {filename}...")
            self._extract_archive(local_zip_path, extract_dir)

        return extract_dir

    def load_boundaries(
        self, country_code: str, admin_level: int, region_name: Optional[str] = None
    ) -> "Tuple[List[str], Polygon, BoundingBox]":
        """Load administrative boundaries for a country/region.

        Args:
            country_code: Three-letter country code
            admin_level: Administrative level (1-4)
            region_name: Specific region name to filter by. The match is a
                case-insensitive substring test against each record's name.

        Returns:
            Tuple of (region_names, combined_polygon, bounding_box)

        Raises:
            FileNotFoundError: If the shapefile for the requested level is
                missing from the extracted archive.
            ValueError: If no polygon geometry matched the request.
        """
        extract_dir = self.download_country_data(country_code)
        shapefile_path = os.path.join(
            extract_dir, f"gadm41_{country_code}_{admin_level}"
        )
        if not os.path.exists(f"{shapefile_path}.shp"):
            raise FileNotFoundError(f"Shapefile not found: {shapefile_path}.shp")

        wanted = region_name.lower() if region_name else None
        region_names = []
        polygons = []

        # The reader holds open file handles; the context manager releases them.
        with shapefile.Reader(shapefile_path) as sf:
            field_names = [field[0] for field in sf.fields[1:]]  # Skip DeletionFlag
            # The name column is the same for every record, so resolve it once.
            name_index = self._find_name_index(field_names, admin_level)

            for record in sf.iterShapeRecords():
                shape = record.shape
                rec = record.record

                current_name = "Unknown"
                if name_index is not None and name_index < len(rec):
                    current_name = rec[name_index]

                # Filter by region name if specified
                if wanted and wanted not in current_name.lower():
                    continue
                region_names.append(current_name)

                # Convert through the GeoJSON-style mapping so that the
                # record's separate parts (islands, holes) become separate
                # rings instead of one ring built from the flat point list.
                if shape.shapeType in self._POLYGON_SHAPE_TYPES and shape.points:
                    polygons.append(shapely_shape(shape.__geo_interface__))

        if not polygons:
            if region_name:
                raise ValueError(
                    f"Region '{region_name}' not found in {country_code} level {admin_level}"
                )
            raise ValueError(
                f"No boundaries found for {country_code} level {admin_level}"
            )

        # Combine all polygons
        combined_polygon = unary_union(polygons)

        # Calculate bounding box; bounds is (minx, miny, maxx, maxy)
        bounds = combined_polygon.bounds
        bbox = BoundingBox(
            min_long=bounds[0], min_lat=bounds[1], max_long=bounds[2], max_lat=bounds[3]
        )
        return region_names, combined_polygon, bbox

    @staticmethod
    def _find_name_index(field_names: List[str], admin_level: int) -> Optional[int]:
        """Return the position of the region-name column, or None if absent.

        ``NAME_<admin_level>`` is preferred; otherwise the first of
        ``NAME_EN``, ``NAME``, ``NAME_1``, ``NAME_2`` that exists is used.
        """
        candidates = [f"NAME_{admin_level}", "NAME_EN", "NAME", "NAME_1", "NAME_2"]
        for candidate in candidates:
            if candidate in field_names:
                return field_names.index(candidate)
        return None

    @staticmethod
    def _extract_archive(zip_path: str, extract_dir: str) -> None:
        """Extract a zip archive so that ``extract_dir`` appears only when complete.

        Files are unpacked into a sibling ``.partial`` directory which is
        renamed on success. A failed extraction therefore never leaves a
        half-filled ``extract_dir`` that later calls would mistake for a
        finished one.
        """
        staging_dir = extract_dir + ".partial"
        # Clear leftovers of a previously interrupted extraction.
        shutil.rmtree(staging_dir, ignore_errors=True)
        try:
            os.makedirs(staging_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(staging_dir)
            os.replace(staging_dir, extract_dir)
        finally:
            # No-op after a successful rename; cleans up after a failure.
            shutil.rmtree(staging_dir, ignore_errors=True)

    def _download_file(self, url: str, local_path: str) -> None:
        """Download a file from URL to local path.

        The body is streamed into ``<local_path>.part`` and renamed once the
        transfer has finished, so an interrupted download cannot leave a
        truncated file at ``local_path``.
        """
        partial_path = local_path + ".part"
        try:
            # Using the response as a context manager releases the connection.
            with requests.get(url, timeout=30, stream=True) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(partial_path, local_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)