Source code for geo_sampling.data.osm
"""OpenStreetMap data provider via BBBike.org."""
import os
import time
import zipfile
from typing import List, Optional
from urllib.parse import urlencode
import requests
from bs4 import BeautifulSoup
import shapefile
from shapely.geometry import Polygon, LineString
import pyproj
import utm
from .._types import BoundingBox, RoadSegment
class OSMProvider:
"""Provider for OpenStreetMap data via BBBike.org extract service."""
BBBIKE_BASE_URL = "http://extract.bbbike.org"
MAX_EXTRACT_POINTS = 300
MAX_WAIT_TIME = 50 # status-poll attempts, roughly 1 second apart
def __init__(self, data_dir: str = "data"):
"""Initialize OSM provider.
Args:
data_dir: Directory to store downloaded data
"""
self.data_dir = data_dir
os.makedirs(data_dir, exist_ok=True)
def generate_extract_url(
self, region_polygon: Polygon, region_name: str, bbox: BoundingBox
) -> str:
"""Generate BBBike extract URL for a polygon region.
Args:
region_polygon: Shapely polygon defining the region
region_name: Name for the extract
bbox: Bounding box coordinates
Returns:
BBBike extract URL
"""
# Simplify polygon to reduce points if needed
simplified_polygon = self._simplify_polygon_for_bbbike(region_polygon)
# Convert polygon to coordinate string format
coords = []
for x, y in simplified_polygon.exterior.coords[:-1]:  # skip the closing duplicate point
coords.append(f"{x:.3f},{y:.3f}")
# Build URL parameters
params = {
"sw_lng": bbox.min_long,
"sw_lat": bbox.min_lat,
"ne_lng": bbox.max_long,
"ne_lat": bbox.max_lat,
"coords": "|".join(coords),
"format": "shp.zip",
"city": region_name.lower().replace(" ", "_"),
}
return f"{self.BBBIKE_BASE_URL}/?{urlencode(params)}"
def submit_extract_request(self, extract_url: str) -> str:
"""Submit extract request to BBBike and get job ID.
Args:
extract_url: BBBike extract URL
Returns:
Extract job ID for checking status
"""
print("Submitting extract request...")
response = requests.get(extract_url, timeout=30)
response.raise_for_status()
# Parse response to get extract ID
soup = BeautifulSoup(response.text, "html.parser")
# Look for extract ID in various possible locations
extract_id = None
# Check for redirect or confirmation page
for link in soup.find_all("a", href=True):
href = link["href"]
if "extract" in href and "id=" in href:
# Extract ID from URL parameter
extract_id = href.split("id=")[1].split("&")[0]
break
if not extract_id:
# Look for extract ID in text
text = response.text
if "extract/" in text:
for line in text.split("\n"):
if "extract/" in line:
# Try to extract ID from line
parts = line.split("extract/")
if len(parts) > 1:
extract_id = parts[1].split("/")[0].split('"')[0]
break
if not extract_id:
raise RuntimeError("Could not determine extract ID from BBBike response")
print(f"Extract job submitted with ID: {extract_id}")
return extract_id
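# Illustration of the ID parsing above on a synthetic anchor (not BBBike's
# actual markup): an href such as
#   /cgi/extract.cgi?id=abc123&format=shp.zip
# contains both "extract" and "id=", so
#   href.split("id=")[1].split("&")[0]  ->  "abc123"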
def check_extract_status(self, extract_id: str) -> Optional[str]:
"""Check status of BBBike extract job.
Args:
extract_id: Job ID from submit_extract_request
Returns:
Download URL if ready, None if still processing
"""
status_url = f"http://download.bbbike.org/osm/extract/{extract_id}/"
try:
response = requests.get(status_url, timeout=10)
response.raise_for_status()
# Look for download links
soup = BeautifulSoup(response.text, "html.parser")
for link in soup.find_all("a", href=True):
href = link["href"]
if href.endswith(".zip") and "shp" in href:
# Found download link
download_url = (
f"http://download.bbbike.org{href}"
if href.startswith("/")
else href
)
return download_url
return None # Still processing
except Exception as e:
print(f"Warning: Could not check extract status: {e}")
return None
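# Example of the URL handling above (the file name here is made up): a relative
# link such as
#   /osm/extract/abc123/planet_77.55,12.9_77.65,13.0.osm.shp.zip
# starts with "/", so it is returned prefixed with http://download.bbbike.org;
# an already absolute link is returned unchanged.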
def download_osm_data(
self, region_polygon: Polygon, region_name: str, bbox: BoundingBox
) -> str:
"""Download OSM data for a region and return path to road shapefile.
Args:
region_polygon: Shapely polygon defining region
region_name: Name for the extract
bbox: Bounding box coordinates
Returns:
Path to the roads shapefile (without .shp extension)
"""
# Generate extract URL
extract_url = self.generate_extract_url(region_polygon, region_name, bbox)
print(f"Extract URL: {extract_url}")
# Submit extract request
extract_id = self.submit_extract_request(extract_url)
# Wait for extract to complete
print("Waiting for extract to complete...")
download_url = None
for attempt in range(self.MAX_WAIT_TIME):
download_url = self.check_extract_status(extract_id)
if download_url:
break
time.sleep(1)
if not download_url:
raise TimeoutError(
f"Extract did not complete within {self.MAX_WAIT_TIME} status checks"
)
# Download and extract data
print(f"Downloading from: {download_url}")
zip_filename = f"{region_name.replace(' ', '_')}_osm.zip"
zip_path = os.path.join(self.data_dir, zip_filename)
self._download_file(download_url, zip_path)
# Extract zip file
extract_dir = os.path.join(
self.data_dir, f"{region_name.replace(' ', '_')}_osm"
)
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_dir)
# Find roads shapefile
roads_shapefile = os.path.join(extract_dir, "roads")
if not os.path.exists(f"{roads_shapefile}.shp"):
raise FileNotFoundError("Roads shapefile not found in extracted data")
return roads_shapefile
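# End-to-end sketch (requires network access; the BoundingBox construction is
# an assumption, as noted in the example for generate_extract_url):
#
#   roads_path = provider.download_osm_data(poly, "Bengaluru Test", bbox)
#   # -> data/Bengaluru_Test_osm/roads  (pass this to extract_road_segments)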
def extract_road_segments(
self,
shapefile_path: str,
road_types: Optional[List[str]] = None,
segment_length: int = 500,
) -> List[RoadSegment]:
"""Extract road segments from OSM shapefile.
Args:
shapefile_path: Path to roads shapefile (without .shp)
road_types: List of road types to include (None = all)
segment_length: Target segment length in meters
Returns:
List of RoadSegment objects
"""
sf = shapefile.Reader(shapefile_path)
# Get field names
field_names = [field[0] for field in sf.fields[1:]]
segments = []
segment_id = 1
for record in sf.iterShapeRecords():
shape = record.shape
rec = record.record
# Extract road metadata
osm_id = str(
rec[field_names.index("osm_id")] if "osm_id" in field_names else ""
)
osm_name = str(
rec[field_names.index("name")] if "name" in field_names else ""
)
osm_type = str(
rec[field_names.index("type")] if "type" in field_names else "unknown"
)
# Filter by road type if specified
if road_types and osm_type not in road_types:
continue
# Skip non-linestring shapes
if shape.shapeType not in [3, 13]: # Polyline, PolylineZ
continue
# Convert to LineString and split into segments
# pyshp stores XY pairs in shape.points for both Polyline and PolylineZ
# (Z values, when present, live in shape.z), so XY coordinates suffice here
line = LineString(shape.points)
# Split line into segments of specified length
split_segments = self._split_linestring(line, segment_length)
for segment_line in split_segments:
coords = list(segment_line.coords)
start_point = coords[0]
end_point = coords[-1]
segments.append(
RoadSegment(
segment_id=segment_id,
osm_id=osm_id,
osm_name=osm_name,
osm_type=osm_type,
start_lat=start_point[1],
start_long=start_point[0],
end_lat=end_point[1],
end_long=end_point[0],
)
)
segment_id += 1
return segments
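# Usage sketch (the road-type names are OSM highway values commonly found in
# BBBike's roads layer; adjust them to the data actually downloaded, and note
# that attribute access below assumes RoadSegment exposes its constructor fields):
#
#   segments = provider.extract_road_segments(
#       roads_path, road_types=["primary", "secondary"], segment_length=500
#   )
#   for seg in segments[:3]:
#       print(seg.segment_id, seg.osm_type, seg.start_lat, seg.start_long)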
def _simplify_polygon_for_bbbike(self, polygon: Polygon) -> Polygon:
"""Simplify polygon to meet BBBike point limit."""
if len(polygon.exterior.coords) <= self.MAX_EXTRACT_POINTS:
return polygon
# Progressively increase tolerance until we're under the limit
tolerance = 0.001
simplified = polygon
while (
len(simplified.exterior.coords) > self.MAX_EXTRACT_POINTS
and tolerance < 0.1
):
simplified = polygon.simplify(tolerance)
tolerance *= 1.5
return simplified
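# Worked example of the tolerance schedule above: starting at 0.001 degrees and
# multiplying by 1.5 each pass gives roughly
#   0.001, 0.0015, 0.00225, 0.003375, ... up to the 0.1 cap,
# i.e. about a dozen simplification attempts before the loop gives up and
# returns the last (possibly still too detailed) result.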
def _split_linestring(
self, line: LineString, segment_length: int
) -> List[LineString]:
"""Split a LineString into segments of specified length."""
if line.length == 0:
return []
# Convert to UTM for accurate distance calculations,
# using the line's centroid to pick the UTM zone
centroid = line.centroid
_, _, zone_number, zone_letter = utm.from_latlon(centroid.y, centroid.x)
northern = zone_letter >= "N"  # zone letters N-X lie in the northern hemisphere
# Build a transformer from WGS84 (lon/lat) to that UTM zone
utm_crs = f"+proj=utm +zone={zone_number} +datum=WGS84"
if not northern:
utm_crs += " +south"
transformer = pyproj.Transformer.from_crs("EPSG:4326", utm_crs, always_xy=True)
# Transform the line to UTM
utm_coords = [transformer.transform(x, y) for x, y in line.coords]
utm_line = LineString(utm_coords)
segments = []
current_distance = 0
total_length = utm_line.length
while current_distance < total_length:
# Get start point
start_point = utm_line.interpolate(current_distance)
# Get end point (or line end if remaining distance < segment_length)
end_distance = min(current_distance + segment_length, total_length)
end_point = utm_line.interpolate(end_distance)
# Create segment
segment = LineString([start_point.coords[0], end_point.coords[0]])
# Transform back to WGS84
wgs84_coords = []
for x, y in segment.coords:
lat, lon = utm.to_latlon(x, y, zone_number, zone_letter)
wgs84_coords.append((lon, lat))
segments.append(LineString(wgs84_coords))
current_distance = end_distance
return segments
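# Worked example of the interpolation loop above: for a projected line of
# 1200 m and segment_length=500, the cut distances are 0->500, 500->1000 and
# 1000->1200, i.e. two full 500 m segments plus a 200 m remainder. Each segment
# is a straight chord between its two interpolated end points, not the original
# curved geometry between them.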
def _download_file(self, url: str, local_path: str) -> None:
"""Download a file from URL to local path."""
response = requests.get(url, timeout=60, stream=True)
response.raise_for_status()
with open(local_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
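# Minimal end-to-end sketch, guarded so importing the module stays side-effect
# free. It assumes BoundingBox accepts the keyword fields read elsewhere in this
# module and that the BBBike extract finishes within MAX_WAIT_TIME; running it
# directly performs real network requests against extract.bbbike.org.
if __name__ == "__main__":
    _bbox = BoundingBox(
        min_long=77.55, min_lat=12.90, max_long=77.65, max_lat=13.00
    )
    _poly = Polygon(
        [(77.55, 12.90), (77.65, 12.90), (77.65, 13.00), (77.55, 13.00)]
    )
    _provider = OSMProvider(data_dir="data")
    _roads = _provider.download_osm_data(_poly, "Bengaluru Test", _bbox)
    _segments = _provider.extract_road_segments(_roads, segment_length=500)
    print(f"Extracted {len(_segments)} road segments")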