Source code for dtmapi.api

import logging
import os
import time
from typing import Any, Dict, Optional, Union

import pandas as pd
import requests

from dtmapi.config import (
    COUNTRY_LIST_API,
    COUNTRY_LIST_API_V2,
    IDP_ADMIN_0_API,
    IDP_ADMIN_0_API_V2,
    IDP_ADMIN_1_API,
    IDP_ADMIN_1_API_V2,
    IDP_ADMIN_2_API,
    IDP_ADMIN_2_API_V2,
    OPERATION_LIST_API,
    OPERATION_LIST_API_V2,
)
from dtmapi.exceptions import (
    DTMApiRequestError,
    DTMApiResponseError,
    DTMApiTimeoutError,
    DTMAuthenticationError,
    DTMApiVersionError,
)
from dtmapi.validators import (
    ValidationError,
    validate_date_format,
    validate_date_range,
    validate_required_params,
    validate_round_number_range,
)

# Configure logger
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


[docs] class DTMApi: """ Python interface to DTM API endpoints. Supports both v2 (legacy) and v3 (current). Requires Ocp-Apim-Subscription-Key for authentication. **API Version Differences:** +-------------------------------+----------+----------+ | Feature | v2 | v3 | +===============================+==========+==========+ | IDP Admin 0, 1, 2 Data | Yes | Yes | +-------------------------------+----------+----------+ | Country List | Yes | Yes | +-------------------------------+----------+----------+ | Operation List | Yes | Yes | +-------------------------------+----------+----------+ | Gender/Sex Disaggregation | No | **Yes** | +-------------------------------+----------+----------+ | Origin of Displacement | No | **Yes** | +-------------------------------+----------+----------+ | Displacement Reason | No | **Yes** | +-------------------------------+----------+----------+ | Status | Legacy | Current | +-------------------------------+----------+----------+ **Usage Examples:** v3 API (default - recommended for new projects):: api = DTMApi(subscription_key="YOUR-KEY") data = api.get_idp_admin0_data(CountryName="Sudan") # Returns IDP data WITH gender, origin, and reason fields v2 API (legacy - for historical data compatibility):: api = DTMApi(subscription_key="YOUR-KEY", api_version="v2") data = api.get_idp_admin0_data(CountryName="Sudan") # Returns IDP data WITHOUT demographic disaggregation """ DEFAULT_TIMEOUT = 30 # seconds DEFAULT_MAX_RETRIES = 3 DEFAULT_RETRY_DELAY = 1 # seconds def __init__( self, subscription_key: Optional[str] = None, api_version: str = "v3", timeout: Optional[int] = None, max_retries: Optional[int] = None, retry_delay: Optional[float] = None, ): """ Initialize the DTM API client. :param subscription_key: DTM API subscription key. Can also be set via DTMAPI_SUBSCRIPTION_KEY env var. :type subscription_key: Optional[str] :param api_version: API version to use: "v2" (legacy) or "v3" (current, default). :type api_version: str :param timeout: Request timeout in seconds (default: 30). :type timeout: Optional[int] :param max_retries: Maximum number of retry attempts for failed requests (default: 3). :type max_retries: Optional[int] :param retry_delay: Base delay in seconds between retries (default: 1). Uses exponential backoff. :type retry_delay: Optional[float] :raises DTMAuthenticationError: If no subscription key is provided. :raises DTMApiVersionError: If api_version is not "v2" or "v3". """ logger.debug("Initializing DTMApi client") # Validate API version if api_version not in ["v2", "v3"]: raise DTMApiVersionError( f"api_version must be 'v2' or 'v3', got: '{api_version}'" ) self.api_version = api_version # Validate and set subscription key self.subscription_key = subscription_key or os.getenv("DTMAPI_SUBSCRIPTION_KEY") if not self.subscription_key: logger.error("No subscription key provided") raise DTMAuthenticationError( "A DTM API Subscription Key is required. " "Provide it as an argument or set the DTMAPI_SUBSCRIPTION_KEY environment variable." ) # Set timeout and retry parameters self.timeout = timeout or self.DEFAULT_TIMEOUT self.max_retries = max_retries if max_retries is not None else self.DEFAULT_MAX_RETRIES self.retry_delay = retry_delay if retry_delay is not None else self.DEFAULT_RETRY_DELAY logger.info( f"DTMApi client initialized with api_version={self.api_version}, " f"timeout={self.timeout}s, max_retries={self.max_retries}, " f"retry_delay={self.retry_delay}s" ) def _headers(self) -> Dict[str, str]: return { "User-Agent": "Mozilla/5.0 (compatible; DTMClient/2.0)", "Ocp-Apim-Subscription-Key": self.subscription_key, } def _get_endpoint(self, endpoint_type: str) -> str: """ Get the appropriate API endpoint URL based on the API version. :param endpoint_type: Type of endpoint ("admin0", "admin1", "admin2", "countries", "operations"). :type endpoint_type: str :return: The full API endpoint URL. :rtype: str """ endpoint_map = { "v3": { "admin0": IDP_ADMIN_0_API, "admin1": IDP_ADMIN_1_API, "admin2": IDP_ADMIN_2_API, "countries": COUNTRY_LIST_API, "operations": OPERATION_LIST_API, }, "v2": { "admin0": IDP_ADMIN_0_API_V2, "admin1": IDP_ADMIN_1_API_V2, "admin2": IDP_ADMIN_2_API_V2, "countries": COUNTRY_LIST_API_V2, "operations": OPERATION_LIST_API_V2, }, } return endpoint_map[self.api_version][endpoint_type] def _is_retryable_error(self, exception: Exception) -> bool: """ Determine if an error should trigger a retry. :param exception: The exception to check. :type exception: Exception :return: True if the error is retryable, False otherwise. :rtype: bool """ # Retry on timeout if isinstance(exception, requests.Timeout): return True # Retry on connection errors if isinstance(exception, requests.ConnectionError): return True # Retry on specific HTTP status codes (429, 500, 502, 503, 504) if isinstance(exception, requests.HTTPError): if exception.response is not None: status_code = exception.response.status_code # Rate limiting or server errors if status_code in [429, 500, 502, 503, 504]: return True return False def _fetch_data( self, api_url: str, params: Optional[Dict[str, Any]] = None, to_pandas: bool = True, ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Helper function to fetch data from the specified API URL with given parameters. Implements retry logic with exponential backoff for transient errors. :param api_url: The API endpoint URL. :type api_url: str :param params: The query parameters for the API request. :type params: Dict[str, Any] :param to_pandas: If True, the data will be returned as a pandas DataFrame. Otherwise, it will be returned as a JSON object. :type to_pandas: bool :return: The data matching the specified criteria, either as a DataFrame or a JSON object. :rtype: Union[pd.DataFrame, Dict[str, Any]] :raises DTMApiTimeoutError: If the request times out. :raises DTMApiAuthenticationError: If authentication fails. :raises DTMApiResponseError: If the API returns an error response. :raises DTMApiRequestError: If the request fails for other reasons. """ logger.debug(f"Fetching data from {api_url} with params={params}") last_exception = None for attempt in range(self.max_retries + 1): try: if attempt > 0: # Exponential backoff: delay * (2 ^ (attempt - 1)) delay = self.retry_delay * (2 ** (attempt - 1)) logger.info(f"Retrying request (attempt {attempt + 1}/{self.max_retries + 1}) after {delay}s delay") time.sleep(delay) response = requests.get( api_url, params=params, headers=self._headers(), timeout=self.timeout ) logger.debug(f"Received response: status={response.status_code}") response.raise_for_status() data = response.json() # For endpoints that wrap result in 'isSuccess' if not data.get("isSuccess"): error_messages = data.get("errorMessages", []) error_message = ( error_messages[0] if error_messages else "Unknown API error" ) logger.error(f"API returned error: {error_message}") raise DTMApiResponseError(error_message, error_messages) result = data["result"] result_count = len(result) if isinstance(result, list) else 1 logger.info(f"Successfully fetched {result_count} records from {api_url}") return pd.DataFrame(result) if to_pandas else result except requests.Timeout as e: last_exception = e logger.warning(f"Request timed out after {self.timeout} seconds: {api_url}") if not self._is_retryable_error(e) or attempt >= self.max_retries: logger.error(f"Request timed out after {self.timeout} seconds (no more retries)") raise DTMApiTimeoutError( f"Request timed out after {self.timeout} seconds" ) from e except requests.HTTPError as e: last_exception = e if e.response.status_code == 401 or e.response.status_code == 403: logger.error(f"Authentication failed: {e.response.status_code}") raise DTMAuthenticationError( f"Authentication failed: {e.response.status_code} {e.response.reason}" ) from e logger.warning(f"HTTP error: {e.response.status_code} {e.response.reason}") if not self._is_retryable_error(e) or attempt >= self.max_retries: logger.error(f"HTTP error occurred (no more retries): {e.response.status_code}") raise DTMApiRequestError( f"HTTP error occurred: {e.response.status_code} {e.response.reason}" ) from e except requests.RequestException as e: last_exception = e logger.warning(f"Request failed: {e}") if not self._is_retryable_error(e) or attempt >= self.max_retries: logger.error(f"Request failed (no more retries): {e}") raise DTMApiRequestError(f"API request failed: {e}") from e # This should not be reached, but just in case if last_exception: raise DTMApiRequestError(f"API request failed after {self.max_retries} retries") from last_exception # ----------- Public API Methods -----------
[docs] def get_all_countries( self, to_pandas: bool = True ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Retrieve all countries for which DTM data is publicly available through the API. :return: All countries for which DTM data is publicly available through the API. :rtype: Union[pd.DataFrame, Dict[str, Any]] """ return self._fetch_data(self._get_endpoint("countries"), to_pandas=to_pandas)
[docs] def get_all_operations( self, to_pandas: bool = True ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Retrieve all operations for which DTM data is publicly available through the API. :return: All operations for which DTM data is publicly available through the API. :rtype: Union[pd.DataFrame, Dict[str, Any]] """ return self._fetch_data(self._get_endpoint("operations"), to_pandas=to_pandas)
[docs] def get_idp_admin0_data( self, Operation: Optional[str] = None, CountryName: Optional[str] = None, Admin0Pcode: Optional[str] = None, FromReportingDate: Optional[str] = None, ToReportingDate: Optional[str] = None, FromRoundNumber: Optional[int] = None, ToRoundNumber: Optional[int] = None, to_pandas: bool = True, ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Retrieve IDP data at Admin 0 level based on specified parameters. At least one of the following parameters must be provided: Operation, CountryName, or Admin0Pcode. :param Operation: Name of the DTM operation for which the data was collected. :type Operation: Optional[str] :param CountryName: Name of the country where the data was collected. :type CountryName: Optional[str] :param Admin0Pcode: Country code (ISO 3166-1 alpha-3). :type Admin0Pcode: Optional[str] :param FromReportingDate: Start date for the reporting period (format: 'YYYY-MM-DD'). :type FromReportingDate: Optional[str] :param ToReportingDate: End date for the reporting period (format: 'YYYY-MM-DD'). :type ToReportingDate: Optional[str] :param FromRoundNumber: Starting round number for the data collection range. :type FromRoundNumber: Optional[int] :param ToRoundNumber: Ending round number for the data collection range. :type ToRoundNumber: Optional[int] :param to_pandas: If True, the data will be returned as a pandas DataFrame. Otherwise, it will be returned as a JSON object. :type to_pandas: bool :return: The IDP Admin0 data matching the specified criteria, either as a DataFrame or a JSON object. :rtype: Union[pd.DataFrame, Dict[str, Any]] :raises ValidationError: If parameter validation fails. """ # Validate required parameters validate_required_params( {"Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode}, ["Operation", "CountryName", "Admin0Pcode"] ) # Validate date formats if FromReportingDate: validate_date_format(FromReportingDate, "FromReportingDate") if ToReportingDate: validate_date_format(ToReportingDate, "ToReportingDate") validate_date_range(FromReportingDate, ToReportingDate, "FromReportingDate", "ToReportingDate") # Validate round numbers validate_round_number_range(FromRoundNumber, ToRoundNumber) params = { "Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode, "FromReportingDate": FromReportingDate, "ToReportingDate": ToReportingDate, "FromRoundNumber": FromRoundNumber, "ToRoundNumber": ToRoundNumber, } # Remove None values params = {k: v for k, v in params.items() if v is not None} return self._fetch_data(self._get_endpoint("admin0"), params, to_pandas)
[docs] def get_idp_admin1_data( self, Operation: Optional[str] = None, CountryName: Optional[str] = None, Admin0Pcode: Optional[str] = None, Admin1Name: Optional[str] = None, Admin1Pcode: Optional[str] = None, FromReportingDate: Optional[str] = None, ToReportingDate: Optional[str] = None, FromRoundNumber: Optional[int] = None, ToRoundNumber: Optional[int] = None, to_pandas: bool = True, ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Retrieve IDP data at Admin 1 level based on specified parameters. At least one of the following parameters must be provided: Operation, CountryName, or Admin0Pcode. :param Operation: Name of the DTM operation for which the data was collected. :type Operation: Optional[str] :param CountryName: Name of the country where the data was collected. :type CountryName: Optional[str] :param Admin0Pcode: Country code (ISO 3166-1 alpha-3). :type Admin0Pcode: Optional[str] :param Admin1Name: Name of level 1 administrative boundaries. :type Admin1Name: Optional[str] :param Admin1Pcode: Place code of level 1 administrative boundaries. :type Admin1Pcode: Optional[str] :param FromReportingDate: Start date for the reporting period (format: 'YYYY-MM-DD'). :type FromReportingDate: Optional[str] :param ToReportingDate: End date for the reporting period (format: 'YYYY-MM-DD'). :type ToReportingDate: Optional[str] :param FromRoundNumber: Starting round number for the data collection range. :type FromRoundNumber: Optional[int] :param ToRoundNumber: Ending round number for the data collection range. :type ToRoundNumber: Optional[int] :param to_pandas: If True, the data will be returned as a pandas DataFrame. Otherwise, it will be returned as a JSON object. :type to_pandas: bool :return: The IDP Admin1 data matching the specified criteria, either as a DataFrame or a JSON object. :rtype: Union[pd.DataFrame, Dict[str, Any]] :raises ValidationError: If parameter validation fails. """ # Validate required parameters validate_required_params( {"Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode}, ["Operation", "CountryName", "Admin0Pcode"] ) # Validate date formats if FromReportingDate: validate_date_format(FromReportingDate, "FromReportingDate") if ToReportingDate: validate_date_format(ToReportingDate, "ToReportingDate") validate_date_range(FromReportingDate, ToReportingDate, "FromReportingDate", "ToReportingDate") # Validate round numbers validate_round_number_range(FromRoundNumber, ToRoundNumber) params = { "Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode, "Admin1Name": Admin1Name, "Admin1Pcode": Admin1Pcode, "FromReportingDate": FromReportingDate, "ToReportingDate": ToReportingDate, "FromRoundNumber": FromRoundNumber, "ToRoundNumber": ToRoundNumber, } params = {k: v for k, v in params.items() if v is not None} return self._fetch_data(self._get_endpoint("admin1"), params, to_pandas)
[docs] def get_idp_admin2_data( self, Operation: Optional[str] = None, CountryName: Optional[str] = None, Admin0Pcode: Optional[str] = None, Admin1Name: Optional[str] = None, Admin1Pcode: Optional[str] = None, Admin2Name: Optional[str] = None, Admin2Pcode: Optional[str] = None, FromReportingDate: Optional[str] = None, ToReportingDate: Optional[str] = None, FromRoundNumber: Optional[int] = None, ToRoundNumber: Optional[int] = None, to_pandas: bool = True, ) -> Union[pd.DataFrame, Dict[str, Any]]: """ Retrieve IDP data at Admin 2 level based on specified parameters. At least one of the following parameters must be provided: Operation, CountryName, or Admin0Pcode. :param Operation: Name of the DTM operation for which the data was collected. :type Operation: Optional[str] :param CountryName: Name of the country where the data was collected. :type CountryName: Optional[str] :param Admin0Pcode: Country code (ISO 3166-1 alpha-3). :type Admin0Pcode: Optional[str] :param Admin1Name: Name of level 1 administrative boundaries. :type Admin1Name: Optional[str] :param Admin1Pcode: Place code of level 1 administrative boundaries. :type Admin1Pcode: Optional[str] :param Admin2Name: Name of level 2 administrative boundaries. :type Admin2Name: Optional[str] :param Admin2Pcode: Place code of level 2 administrative boundaries. :type Admin2Pcode: Optional[str] :param FromReportingDate: Start date for the reporting period (format: 'YYYY-MM-DD'). :type FromReportingDate: Optional[str] :param ToReportingDate: End date for the reporting period (format: 'YYYY-MM-DD'). :type ToReportingDate: Optional[str] :param FromRoundNumber: Starting round number for the data collection range. :type FromRoundNumber: Optional[int] :param ToRoundNumber: Ending round number for the data collection range. :type ToRoundNumber: Optional[int] :param to_pandas: If True, the data will be returned as a pandas DataFrame. Otherwise, it will be returned as a JSON object. :type to_pandas: bool :returns: The IDP Admin2 data matching the specified criteria, either as a DataFrame or a JSON object. :rtype: Union[pd.DataFrame, Dict[str, Any]] :raises ValidationError: If parameter validation fails. """ # Validate required parameters validate_required_params( {"Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode}, ["Operation", "CountryName", "Admin0Pcode"] ) # Validate date formats if FromReportingDate: validate_date_format(FromReportingDate, "FromReportingDate") if ToReportingDate: validate_date_format(ToReportingDate, "ToReportingDate") validate_date_range(FromReportingDate, ToReportingDate, "FromReportingDate", "ToReportingDate") # Validate round numbers validate_round_number_range(FromRoundNumber, ToRoundNumber) params = { "Operation": Operation, "CountryName": CountryName, "Admin0Pcode": Admin0Pcode, "Admin1Name": Admin1Name, "Admin1Pcode": Admin1Pcode, "Admin2Name": Admin2Name, "Admin2Pcode": Admin2Pcode, "FromReportingDate": FromReportingDate, "ToReportingDate": ToReportingDate, "FromRoundNumber": FromRoundNumber, "ToRoundNumber": ToRoundNumber, } params = {k: v for k, v in params.items() if v is not None} return self._fetch_data(self._get_endpoint("admin2"), params, to_pandas)