Open Beta — все 133 форматов бесплатно, без лимитов Подробнее

Массовый экспорт карт: автоматизация через API

2026-05-116 мин чтения
APIbatchавтоматизацияPythonREST

Экспортировать одну карту вручную — просто. Но что если нужно выгрузить 50 городов в GeoJSON, 200 кварталов в DXF или ежедневно обновлять данные по 10 регионам? Для этого существует REST API с поддержкой массового экспорта.

Обзор API

API osm2cdr.ru построен на REST-принципах. Основные эндпоинты:

Эндпоинт Метод Описание
/api/export POST Создать задачу экспорта
/api/export/{task_id} GET Статус задачи
/api/export/{task_id}/download GET Скачать результат
/api/batch-export POST Массовый экспорт (до 50 задач)
/api/formats GET Список доступных форматов

Для доступа к API нужен API-ключ, который можно получить в личном кабинете.

Одиночный экспорт: curl

Начнём с простого запроса на экспорт одной области:

curl -X POST https://osm2cdr.ru/api/export \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "format": "geojson",
    "bbox": [37.59, 55.73, 37.65, 55.76],
    "style": "osm",
    "detail_level": 4
  }'

Ответ:

{
  "task_id": "abc123-def456",
  "status": "pending",
  "format": "geojson",
  "estimated_time": 15
}

Проверка статуса и скачивание:

# Проверить статус
curl -H "Authorization: Bearer YOUR_API_KEY" \
  https://osm2cdr.ru/api/export/abc123-def456

# Скачать результат (когда status = "completed")
curl -H "Authorization: Bearer YOUR_API_KEY" \
  -o moscow_center.geojson \
  https://osm2cdr.ru/api/export/abc123-def456/download

Массовый экспорт: batch endpoint

Batch endpoint принимает массив задач и возвращает массив task_id. Это эффективнее, чем отправлять запросы по одному — сервер оптимизирует выполнение и может переиспользовать кэш PostGIS.

curl -X POST https://osm2cdr.ru/api/batch-export \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "tasks": [
      {"format": "geojson", "bbox": [37.59, 55.73, 37.65, 55.76], "name": "Moscow Center"},
      {"format": "geojson", "bbox": [30.28, 59.92, 30.36, 59.96], "name": "SPB Center"},
      {"format": "dxf", "bbox": [37.59, 55.73, 37.65, 55.76], "name": "Moscow DXF"}
    ]
  }'

Лимиты batch:

Тир Задач в batch Задач в сутки Параллельно
Free 5 10 1
Basic 20 100 3
Pro 50 500 10
Enterprise 200 Без лимита 50

Автоматизация на Python

Экспорт списка городов

import requests
import time
from pathlib import Path

API_URL = "https://osm2cdr.ru/api"
API_KEY = "YOUR_API_KEY"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}

# Список городов с bbox
cities = {
    "moscow": [37.32, 55.57, 37.95, 55.92],
    "saint_petersburg": [30.08, 59.83, 30.55, 60.09],
    "berlin": [13.08, 52.34, 13.76, 52.68],
    "paris": [2.22, 48.81, 2.47, 48.90],
    "london": [-0.35, 51.38, 0.15, 51.60],
}

def export_city(name: str, bbox: list, fmt: str = "geojson") -> str:
    """Создать задачу экспорта и дождаться результата."""
    # Создать задачу
    resp = requests.post(f"{API_URL}/export", headers=HEADERS, json={
        "format": fmt,
        "bbox": bbox,
        "style": "osm",
        "detail_level": 4
    })
    resp.raise_for_status()
    task_id = resp.json()["task_id"]
    print(f"[{name}] Task created: {task_id}")

    # Ожидание завершения
    for _ in range(120):  # Максимум 10 минут
        status = requests.get(f"{API_URL}/export/{task_id}", headers=HEADERS).json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(f"Export failed: {status.get('error')}")
        time.sleep(5)

    # Скачивание
    output = Path(f"output/{name}.{fmt}")
    output.parent.mkdir(exist_ok=True)
    dl = requests.get(f"{API_URL}/export/{task_id}/download", headers=HEADERS)
    output.write_bytes(dl.content)
    print(f"[{name}] Saved to {output} ({len(dl.content)} bytes)")
    return str(output)

# Экспорт всех городов
for city, bbox in cities.items():
    export_city(city, bbox, "geojson")

Batch-экспорт с параллельным скачиванием

from concurrent.futures import ThreadPoolExecutor

def batch_export(cities: dict, fmt: str = "shp"):
    """Массовый экспорт через batch endpoint."""
    tasks = [
        {"format": fmt, "bbox": bbox, "name": name}
        for name, bbox in cities.items()
    ]

    # Отправить batch
    resp = requests.post(f"{API_URL}/batch-export", headers=HEADERS, json={"tasks": tasks})
    resp.raise_for_status()
    task_ids = resp.json()["task_ids"]
    print(f"Batch created: {len(task_ids)} tasks")

    # Ожидание и скачивание параллельно
    def wait_and_download(task_id, name):
        for _ in range(120):
            st = requests.get(f"{API_URL}/export/{task_id}", headers=HEADERS).json()
            if st["status"] == "completed":
                break
            time.sleep(5)
        dl = requests.get(f"{API_URL}/export/{task_id}/download", headers=HEADERS)
        path = Path(f"output/{name}.{fmt}")
        path.parent.mkdir(exist_ok=True)
        path.write_bytes(dl.content)
        return path

    with ThreadPoolExecutor(max_workers=5) as pool:
        futures = [
            pool.submit(wait_and_download, tid, name)
            for tid, name in zip(task_ids, cities.keys())
        ]
        results = [f.result() for f in futures]

    print(f"Downloaded {len(results)} files")
    return results

batch_export(cities, "shp")

Параллельный поллинг множества задач

Для долгих задач (большие области, сложные форматы) и для батчей удобно отслеживать прогресс через параллельный поллинг REST-эндпоинта /api/status/{task_id}. WebSocket-эндпоинт /ws/task/{task_id} существует как заглушка (немедленно закрывается с кодом 1001) и зарезервирован для будущей реализации через Celery -> Redis Pub/Sub — не полагайтесь на него в production-скриптах.

Базовый шаблон поллинга одной задачи:

import requests
import time

def poll_until_done(task_id: str, interval: int = 5, timeout: int = 600) -> dict:
    """Поллинг GET /api/status/{task_id} до completed/failed."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        st = requests.get(f"{API_URL}/status/{task_id}", headers=HEADERS).json()
        progress = st.get("progress", 0)
        step = st.get("step", "")
        print(f"[{task_id[:8]}] {st['status']} {progress}% — {step}")
        if st["status"] == "completed":
            return st
        if st["status"] == "failed":
            raise RuntimeError(f"Export failed: {st.get('error')}")
        time.sleep(interval)
    raise TimeoutError(f"Task {task_id} did not finish in {timeout}s")

Параллельный мониторинг батча через ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor, as_completed

def poll_batch(task_ids: list[str], max_workers: int = 5) -> dict:
    """Параллельный поллинг списка task_id; возвращает {task_id: final_status}."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(poll_until_done, tid): tid for tid in task_ids}
        for fut in as_completed(futures):
            tid = futures[fut]
            try:
                results[tid] = fut.result()
            except Exception as e:
                results[tid] = {"status": "failed", "error": str(e)}
    return results

# Использование: после batch-export получаем task_ids и ждём всех параллельно
# results = poll_batch(task_ids, max_workers=5)

При HTTP 429 (rate limit) увеличьте interval или уменьшите max_workers — sliding window rate limiter учитывает все запросы по API-ключу, включая /api/status/.

Rate Limiting

API использует sliding window rate limiting. При превышении лимита возвращается HTTP 429 с заголовком Retry-After:

resp = requests.post(f"{API_URL}/export", headers=HEADERS, json=payload)
if resp.status_code == 429:
    wait = int(resp.headers.get("Retry-After", 60))
    print(f"Rate limited. Waiting {wait}s...")
    time.sleep(wait)

Форматы для автоматизации

Не все форматы одинаково подходят для batch-сценариев:

Формат Скорость Размер Лучший для
GeoJSON Быстрый Средний Веб-приложения, анализ
Shapefile Средний Компактный ГИС, ArcGIS, QGIS
GeoPackage Средний Компактный Один файл вместо SHP bundle
DXF Средний Большой AutoCAD, инженерия
CSV Быстрый Малый Анализ данных, таблицы
FlatGeobuf Быстрый Компактный Стриминг, веб-карты

Для массовых операций рекомендуем GeoJSON (универсальный) или FlatGeobuf (быстрый и компактный).

Пример: ежедневное обновление данных

import schedule

def daily_update():
    """Обновить данные по ключевым регионам."""
    regions = {
        "moscow_roads": {"bbox": [37.3, 55.5, 37.9, 55.9], "format": "geojson"},
        "spb_buildings": {"bbox": [30.1, 59.8, 30.5, 60.1], "format": "shp"},
    }
    for name, params in regions.items():
        try:
            export_city(name, params["bbox"], params["format"])
        except Exception as e:
            print(f"Error exporting {name}: {e}")

schedule.every().day.at("03:00").do(daily_update)

while True:
    schedule.run_pending()
    time.sleep(60)

Автоматизация экспорта через API экономит часы ручной работы. Batch endpoint, параллельный поллинг статусов и параллельное скачивание позволяют обрабатывать сотни областей за минуты.

← Все статьи