from copy import deepcopy
import json
import logging
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
TYPE_CHECKING,
Union,
)
from urllib.parse import urlparse
import re
from requests import Request, Session
import pystac
from pystac.link import Link
from pystac.serialization import (
merge_common_properties,
identify_stac_object_type,
identify_stac_object,
migrate_to_latest,
)
from pystac.stac_io import DefaultStacIO
import pystac_client
from .exceptions import APIError
from .conformance import ConformanceClasses, CONFORMANCE_URIS
if TYPE_CHECKING:
from pystac.stac_object import STACObject as STACObject_Type
from pystac.catalog import Catalog as Catalog_Type
logger = logging.getLogger(__name__)
[docs]class StacApiIO(DefaultStacIO):
def __init__(
self,
headers: Optional[Dict] = None,
conformance: Optional[List[str]] = None,
parameters: Optional[Dict] = None,
):
"""Initialize class for API IO
Args:
headers : Optional dictionary of headers to include in all requests
conformance : Optional list of `Conformance Classes
<https://github.com/radiantearth/stac-api-spec/blob/master/overview.md#conformance-classes>`__.
parameters: Optional dictionary of query string parameters to include in all requests.
Return:
StacApiIO : StacApiIO instance
"""
# TODO - this should super() to parent class
self.session = Session()
self.session.headers.update(headers or {})
self.session.params.update(parameters or {})
self._conformance = conformance
[docs] def read_text(self,
source: Union[str, Link],
*args: Any,
parameters: Optional[dict] = {},
**kwargs: Any) -> str:
"""Read text from the given URI.
Overwrites the default method for reading text from a URL or file to allow :class:`urllib.request.Request`
instances as input. This method also raises any :exc:`urllib.error.HTTPError` exceptions rather than catching
them to allow us to handle different response status codes as needed.
"""
if isinstance(source, str):
href = source
if bool(urlparse(href).scheme):
return self.request(href, *args, parameters=parameters, **kwargs)
else:
with open(href) as f:
href_contents = f.read()
return href_contents
elif isinstance(source, Link):
link = source.to_dict()
href = link['href']
# get headers and body from Link and add to request from simple stac resolver
merge = bool(link.get('merge', False))
# If the link object includes a "method" property, use that. If not fall back to 'GET'.
method = link.get('method', 'GET')
# If the link object includes a "headers" property, use that and respect the "merge" property.
headers = link.get('headers', None)
# If "POST" use the body object that and respect the "merge" property.
link_body = link.get('body', {})
if method == 'POST':
parameters = {**parameters, **link_body} if merge else link_body
else:
# parameters are already in the link href
parameters = {}
return self.request(href, *args, method=method, headers=headers, parameters=parameters)
[docs] def request(self,
href: str,
method: Optional[str] = 'GET',
headers: Optional[dict] = {},
parameters: Optional[dict] = {}) -> str:
"""Makes a request to an http endpoint
Args:
href (str): The request URL
method (Optional[str], optional): The http method to use, 'GET' or 'POST'. Defaults to 'GET'.
headers (Optional[dict], optional): Additional headers to include in request. Defaults to {}.
parameters (Optional[dict], optional): parameters to send with request. Defaults to {}.
Raises:
APIError: raised if the server returns an error response
Return:
str: The decoded response from the endpoint
"""
if method == 'POST':
request = Request(method=method, url=href, headers=headers, json=parameters)
else:
params = deepcopy(parameters)
if 'intersects' in params:
params['intersects'] = json.dumps(params['intersects'])
request = Request(method=method, url=href, headers=headers, params=params)
try:
prepped = self.session.prepare_request(request)
msg = f"{prepped.method} {prepped.url} Headers: {prepped.headers}"
if method == 'POST':
msg += f" Payload: {json.dumps(request.json)}"
logger.debug(msg)
resp = self.session.send(prepped)
if resp.status_code != 200:
raise APIError(resp.text)
return resp.content.decode("utf-8")
except Exception as err:
raise APIError(str(err))
[docs] def write_text_to_href(self, href: str, *args: Any, **kwargs: Any) -> None:
if bool(urlparse(href).scheme):
raise APIError("Transactions not supported")
else:
return super().write_text_to_href(*args, **kwargs)
[docs] def stac_object_from_dict(
self,
d: Dict[str, Any],
href: Optional[str] = None,
root: Optional["Catalog_Type"] = None,
preserve_dict: bool = True,
) -> "STACObject_Type":
"""Deserializes a :class:`~pystac.STACObject` sub-class instance from a dictionary.
Args:
d : The dictionary to deserialize
href : Optional href to associate with the STAC object
root : Optional root :class:`~pystac.Catalog` to associate with the
STAC object.
preserve_dict: If ``False``, the dict parameter ``d`` may be modified
during this method call. Otherwise the dict is not mutated.
Defaults to ``True``, which results results in a deepcopy of the
parameter. Set to ``False`` when possible to avoid the performance
hit of a deepcopy.
"""
if identify_stac_object_type(d) == pystac.STACObjectType.ITEM:
collection_cache = None
if root is not None:
collection_cache = root._resolved_objects.as_collection_cache()
# Merge common properties in case this is an older STAC object.
merge_common_properties(d, json_href=href, collection_cache=collection_cache)
info = identify_stac_object(d)
d = migrate_to_latest(d, info)
if info.object_type == pystac.STACObjectType.CATALOG:
result = pystac_client.Client.from_dict(d,
href=href,
root=root,
migrate=False,
preserve_dict=preserve_dict)
result._stac_io = self
return result
if info.object_type == pystac.STACObjectType.COLLECTION:
return pystac_client.CollectionClient.from_dict(d,
href=href,
root=root,
migrate=False,
preserve_dict=preserve_dict)
if info.object_type == pystac.STACObjectType.ITEM:
return pystac.Item.from_dict(d,
href=href,
root=root,
migrate=False,
preserve_dict=preserve_dict)
raise ValueError(f"Unknown STAC object type {info.object_type}")
[docs] def get_pages(self, url, method='GET', parameters={}) -> Iterator[Dict]:
"""Iterator that yields dictionaries for each page at a STAC paging endpoint, e.g., /collections, /search
Return:
Dict : JSON content from a single page
"""
page = self.read_json(url, method=method, parameters=parameters)
yield page
next_link = next((link for link in page.get('links', []) if link['rel'] == 'next'), None)
while next_link:
link = Link.from_dict(next_link)
page = self.read_json(link, parameters=parameters)
yield page
# get the next link and make the next request
next_link = next((link for link in page.get('links', []) if link['rel'] == 'next'),
None)