Массовый экспорт карт: автоматизация через API
Экспортировать одну карту вручную — просто. Но что если нужно выгрузить 50 городов в GeoJSON, 200 кварталов в DXF или ежедневно обновлять данные по 10 регионам? Для этого существует REST API с поддержкой массового экспорта.
Обзор API
API osm2cdr.ru построен на REST-принципах. Основные эндпоинты:
| Эндпоинт | Метод | Описание |
|---|---|---|
/api/export |
POST | Создать задачу экспорта |
/api/export/{task_id} |
GET | Статус задачи |
/api/export/{task_id}/download |
GET | Скачать результат |
/api/batch-export |
POST | Массовый экспорт (до 50 задач) |
/api/formats |
GET | Список доступных форматов |
Для доступа к API нужен API-ключ, который можно получить в личном кабинете.
Одиночный экспорт: curl
Начнём с простого запроса на экспорт одной области:
curl -X POST https://osm2cdr.ru/api/export \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"format": "geojson",
"bbox": [37.59, 55.73, 37.65, 55.76],
"style": "osm",
"detail_level": 4
}'
Ответ:
{
"task_id": "abc123-def456",
"status": "pending",
"format": "geojson",
"estimated_time": 15
}
Проверка статуса и скачивание:
# Проверить статус
curl -H "Authorization: Bearer YOUR_API_KEY" \
https://osm2cdr.ru/api/export/abc123-def456
# Скачать результат (когда status = "completed")
curl -H "Authorization: Bearer YOUR_API_KEY" \
-o moscow_center.geojson \
https://osm2cdr.ru/api/export/abc123-def456/download
Массовый экспорт: batch endpoint
Batch endpoint принимает массив задач и возвращает массив task_id. Это эффективнее, чем отправлять запросы по одному — сервер оптимизирует выполнение и может переиспользовать кэш PostGIS.
curl -X POST https://osm2cdr.ru/api/batch-export \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"tasks": [
{"format": "geojson", "bbox": [37.59, 55.73, 37.65, 55.76], "name": "Moscow Center"},
{"format": "geojson", "bbox": [30.28, 59.92, 30.36, 59.96], "name": "SPB Center"},
{"format": "dxf", "bbox": [37.59, 55.73, 37.65, 55.76], "name": "Moscow DXF"}
]
}'
Лимиты batch:
| Тир | Задач в batch | Задач в сутки | Параллельно |
|---|---|---|---|
| Free | 5 | 10 | 1 |
| Basic | 20 | 100 | 3 |
| Pro | 50 | 500 | 10 |
| Enterprise | 200 | Без лимита | 50 |
Автоматизация на Python
Экспорт списка городов
import requests
import time
from pathlib import Path
API_URL = "https://osm2cdr.ru/api"
API_KEY = "YOUR_API_KEY"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
# Список городов с bbox
cities = {
"moscow": [37.32, 55.57, 37.95, 55.92],
"saint_petersburg": [30.08, 59.83, 30.55, 60.09],
"berlin": [13.08, 52.34, 13.76, 52.68],
"paris": [2.22, 48.81, 2.47, 48.90],
"london": [-0.35, 51.38, 0.15, 51.60],
}
def export_city(name: str, bbox: list, fmt: str = "geojson") -> str:
"""Создать задачу экспорта и дождаться результата."""
# Создать задачу
resp = requests.post(f"{API_URL}/export", headers=HEADERS, json={
"format": fmt,
"bbox": bbox,
"style": "osm",
"detail_level": 4
})
resp.raise_for_status()
task_id = resp.json()["task_id"]
print(f"[{name}] Task created: {task_id}")
# Ожидание завершения
for _ in range(120): # Максимум 10 минут
status = requests.get(f"{API_URL}/export/{task_id}", headers=HEADERS).json()
if status["status"] == "completed":
break
if status["status"] == "failed":
raise RuntimeError(f"Export failed: {status.get('error')}")
time.sleep(5)
# Скачивание
output = Path(f"output/{name}.{fmt}")
output.parent.mkdir(exist_ok=True)
dl = requests.get(f"{API_URL}/export/{task_id}/download", headers=HEADERS)
output.write_bytes(dl.content)
print(f"[{name}] Saved to {output} ({len(dl.content)} bytes)")
return str(output)
# Экспорт всех городов
for city, bbox in cities.items():
export_city(city, bbox, "geojson")
Batch-экспорт с параллельным скачиванием
from concurrent.futures import ThreadPoolExecutor
def batch_export(cities: dict, fmt: str = "shp"):
"""Массовый экспорт через batch endpoint."""
tasks = [
{"format": fmt, "bbox": bbox, "name": name}
for name, bbox in cities.items()
]
# Отправить batch
resp = requests.post(f"{API_URL}/batch-export", headers=HEADERS, json={"tasks": tasks})
resp.raise_for_status()
task_ids = resp.json()["task_ids"]
print(f"Batch created: {len(task_ids)} tasks")
# Ожидание и скачивание параллельно
def wait_and_download(task_id, name):
for _ in range(120):
st = requests.get(f"{API_URL}/export/{task_id}", headers=HEADERS).json()
if st["status"] == "completed":
break
time.sleep(5)
dl = requests.get(f"{API_URL}/export/{task_id}/download", headers=HEADERS)
path = Path(f"output/{name}.{fmt}")
path.parent.mkdir(exist_ok=True)
path.write_bytes(dl.content)
return path
with ThreadPoolExecutor(max_workers=5) as pool:
futures = [
pool.submit(wait_and_download, tid, name)
for tid, name in zip(task_ids, cities.keys())
]
results = [f.result() for f in futures]
print(f"Downloaded {len(results)} files")
return results
batch_export(cities, "shp")
Параллельный поллинг множества задач
Для долгих задач (большие области, сложные форматы) и для батчей удобно отслеживать прогресс через параллельный поллинг REST-эндпоинта /api/status/{task_id}. WebSocket-эндпоинт /ws/task/{task_id} существует как заглушка (немедленно закрывается с кодом 1001) и зарезервирован для будущей реализации через Celery -> Redis Pub/Sub — не полагайтесь на него в production-скриптах.
Базовый шаблон поллинга одной задачи:
import requests
import time
def poll_until_done(task_id: str, interval: int = 5, timeout: int = 600) -> dict:
"""Поллинг GET /api/status/{task_id} до completed/failed."""
deadline = time.time() + timeout
while time.time() < deadline:
st = requests.get(f"{API_URL}/status/{task_id}", headers=HEADERS).json()
progress = st.get("progress", 0)
step = st.get("step", "")
print(f"[{task_id[:8]}] {st['status']} {progress}% — {step}")
if st["status"] == "completed":
return st
if st["status"] == "failed":
raise RuntimeError(f"Export failed: {st.get('error')}")
time.sleep(interval)
raise TimeoutError(f"Task {task_id} did not finish in {timeout}s")
Параллельный мониторинг батча через ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor, as_completed
def poll_batch(task_ids: list[str], max_workers: int = 5) -> dict:
"""Параллельный поллинг списка task_id; возвращает {task_id: final_status}."""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(poll_until_done, tid): tid for tid in task_ids}
for fut in as_completed(futures):
tid = futures[fut]
try:
results[tid] = fut.result()
except Exception as e:
results[tid] = {"status": "failed", "error": str(e)}
return results
# Использование: после batch-export получаем task_ids и ждём всех параллельно
# results = poll_batch(task_ids, max_workers=5)
При HTTP 429 (rate limit) увеличьте interval или уменьшите max_workers — sliding window rate limiter учитывает все запросы по API-ключу, включая /api/status/.
Rate Limiting
API использует sliding window rate limiting. При превышении лимита возвращается HTTP 429 с заголовком Retry-After:
resp = requests.post(f"{API_URL}/export", headers=HEADERS, json=payload)
if resp.status_code == 429:
wait = int(resp.headers.get("Retry-After", 60))
print(f"Rate limited. Waiting {wait}s...")
time.sleep(wait)
Форматы для автоматизации
Не все форматы одинаково подходят для batch-сценариев:
| Формат | Скорость | Размер | Лучший для |
|---|---|---|---|
| GeoJSON | Быстрый | Средний | Веб-приложения, анализ |
| Shapefile | Средний | Компактный | ГИС, ArcGIS, QGIS |
| GeoPackage | Средний | Компактный | Один файл вместо SHP bundle |
| DXF | Средний | Большой | AutoCAD, инженерия |
| CSV | Быстрый | Малый | Анализ данных, таблицы |
| FlatGeobuf | Быстрый | Компактный | Стриминг, веб-карты |
Для массовых операций рекомендуем GeoJSON (универсальный) или FlatGeobuf (быстрый и компактный).
Пример: ежедневное обновление данных
import schedule
def daily_update():
"""Обновить данные по ключевым регионам."""
regions = {
"moscow_roads": {"bbox": [37.3, 55.5, 37.9, 55.9], "format": "geojson"},
"spb_buildings": {"bbox": [30.1, 59.8, 30.5, 60.1], "format": "shp"},
}
for name, params in regions.items():
try:
export_city(name, params["bbox"], params["format"])
except Exception as e:
print(f"Error exporting {name}: {e}")
schedule.every().day.at("03:00").do(daily_update)
while True:
schedule.run_pending()
time.sleep(60)
Автоматизация экспорта через API экономит часы ручной работы. Batch endpoint, параллельный поллинг статусов и параллельное скачивание позволяют обрабатывать сотни областей за минуты.