Source code for tes.client

"""TES access methods and helper functions."""

import re
import requests
import time

from attr import attrs, attrib
from attr.validators import instance_of, optional
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional

from tes.models import (Task, ListTasksRequest, ListTasksResponse, ServiceInfo,
                        GetTaskRequest, CancelTaskRequest, CreateTaskResponse,
                        strconv)
from tes.utils import unmarshal, TimeoutError


[docs] def append_suffixes_to_url( urls: List[str], suffixes: List[str] ) -> List[str]: """Compile all combinations of full paths from paths and suffixes. Args: urls: List of URL paths. prefixes: List of suffixes to be appended to `urls`. Returns: List of full path combinations, in the provided order of `paths` and `suffixes`, starting with all suffix combinations for the first path, then those for the second path, and so on. Paths are stripped of any trailing slashes. Examples: >>> client = tes.HTTPClient.append_suffixes_to_url(['https://funnel.exa mple.com'], ['ga4gh/tes/v1', 'v1', '']) ['https://funnel.example.com/ga4gh/tes/v1', 'https://funnel.example.com /v1', 'https://funnel.example.com'] """ compiled_paths: List[str] = [] for url in urls: for suffix in suffixes: compiled_paths.append( f"{url.rstrip('/')}/{suffix.strip('/')}".rstrip('/')) return compiled_paths
[docs] def send_request( paths: List[str], method: str = 'get', kwargs_requests: Optional[Dict[str, Any]] = None, **kwargs: Any ) -> requests.Response: """Send request to a list of URLs, returning the first valid response. Args: paths: List of fully qualified URLs. method: HTTP method to use for the request; one of 'get' (default), 'post', 'put', and 'delete'. kwargs_requests: Keyword arguments to pass to the :mod:`requests` call. **kwargs: Keyword arguments for path parameter substition. Returns: The first successful response from the list of endpoints. Raises: requests.exceptions.HTTPError: If no response is received from any path. requests.exceptions.HTTPError: As soon as the first 4xx or 5xx status code is received. requests.exceptions.HTTPError: If, after trying all paths, at least one 404 status code and no other 4xx or 5xx status codes are received. ValueError: If an unsupported HTTP method is provided. """ if kwargs_requests is None: kwargs_requests = {} if method not in ('get', 'post', 'put', 'delete'): raise ValueError(f"Unsupported HTTP method: {method}") response: requests.Response = requests.Response() http_exceptions: Dict[str, Exception] = {} for path in paths: try: response = getattr(requests, method)( path.format(**kwargs), **kwargs_requests) except requests.exceptions.RequestException as exc: http_exceptions[path] = exc continue if response.status_code != 404: break if response.status_code is None: raise requests.exceptions.HTTPError( f"No response received; HTTP Exceptions: {http_exceptions}") response.raise_for_status() return response
[docs] def process_url(value): return re.sub("[/]+$", "", value)
[docs] @attrs class HTTPClient(object): """HTTP client class for interacting with the TES API.""" url: str = attrib(converter=process_url, validator=instance_of(str)) timeout: int = attrib(default=10, validator=instance_of(int)) user: Optional[str] = attrib( default=None, converter=strconv, validator=optional(instance_of(str))) password: Optional[str] = attrib( default=None, converter=strconv, validator=optional(instance_of(str))) token: Optional[str] = attrib( default=None, converter=strconv, validator=optional(instance_of(str))) def __attrs_post_init__(self): # for backward compatibility self.urls: List[str] = append_suffixes_to_url( [self.url], ["/ga4gh/tes/v1", "/v1", "/"] ) @url.validator # type: ignore def __check_url(self, attribute, value): """Validate URL scheme of TES instance. `attrs` validator function for `HTTPClient.url`. Args: attribute: Attribute being validated. value: Attribute value. Raises: ValueError: If URL scheme is unsupported. """ u = urlparse(value) if u.scheme not in ["http", "https"]: raise ValueError( "Unsupported URL scheme - must be one of [%s,%s]" % ("http", "https") )
[docs] def get_service_info(self) -> ServiceInfo: """Access method for `GET /service-info`. Returns: `tes.models.ServiceInfo` instance. """ kwargs: Dict[str, Any] = self._request_params() paths = append_suffixes_to_url( self.urls, ["service-info", "tasks/service-info"] ) response = send_request(paths=paths, kwargs_requests=kwargs) return unmarshal(response.json(), ServiceInfo)
[docs] def create_task(self, task: Task) -> CreateTaskResponse: """Access method for `POST /tasks`. Args: task: `tes.models.Task` instance. Returns: `tes.models.CreateTaskResponse` instance. Raises: TypeError: If `task` is not a `tes.models.Task` instance. """ if isinstance(task, Task): msg = task.as_json() else: raise TypeError("Expected Task instance") kwargs: Dict[str, Any] = self._request_params(data=msg) paths = append_suffixes_to_url(self.urls, ["/tasks"]) response = send_request(paths=paths, method='post', kwargs_requests=kwargs) return unmarshal(response.json(), CreateTaskResponse).id
[docs] def get_task(self, task_id: str, view: str = "BASIC") -> Task: """Access method for `GET /tasks/{id}`. Args: task_id: TES Task ID. view: Task info verbosity. One of `MINIMAL`, `BASIC` and `FULL`. Returns: `tes.models.Task` instance. """ req: GetTaskRequest = GetTaskRequest(task_id, view) payload: Dict[str, Optional[str]] = {"view": req.view} kwargs: Dict[str, Any] = self._request_params(params=payload) paths = append_suffixes_to_url(self.urls, ["/tasks/{task_id}"]) response = send_request(paths=paths, kwargs_requests=kwargs, task_id=req.id) return unmarshal(response.json(), Task)
[docs] def cancel_task(self, task_id: str) -> None: """Access method for `POST /tasks/{id}:cancel`. Args: task_id: TES Task ID. """ req: CancelTaskRequest = CancelTaskRequest(task_id) kwargs: Dict[str, Any] = self._request_params() paths = append_suffixes_to_url(self.urls, ["/tasks/{task_id}:cancel"]) send_request(paths=paths, method='post', kwargs_requests=kwargs, task_id=req.id) return None
[docs] def list_tasks( self, view: str = "MINIMAL", page_size: Optional[int] = None, page_token: Optional[str] = None ) -> ListTasksResponse: """Access method for `GET /tasks`. Args: view: Task info verbosity. One of `MINIMAL`, `BASIC` and `FULL`. page_size: Number of tasks to return. page_token: Token to retrieve the next page of tasks. Returns: `tes.models.ListTasksResponse` instance. """ req = ListTasksRequest( view=view, page_size=page_size, page_token=page_token, name_prefix=None, project=None ) msg: Dict = req.as_dict() kwargs: Dict[str, Any] = self._request_params(params=msg) paths = append_suffixes_to_url(self.urls, ["/tasks"]) response = send_request(paths=paths, kwargs_requests=kwargs) return unmarshal(response.json(), ListTasksResponse)
[docs] def wait(self, task_id: str, timeout=None) -> Task: def check_success(data: Task) -> bool: return data.state not in ["QUEUED", "RUNNING", "INITIALIZING"] max_time = time.time() + timeout if timeout else None response: Optional[Task] = None while True: try: response = self.get_task(task_id, "MINIMAL") except Exception: raise Exception(f"Failed to get task {task_id}") if response is not None: if check_success(response): return response if max_time is not None and time.time() >= max_time: raise TimeoutError("last_response: {response.as_dict()}") time.sleep(0.5)
def _request_params( self, data: Optional[str] = None, params: Optional[Dict] = None ) -> Dict[str, Any]: """Compile request parameters. Args: data: JSON payload to be sent in the request body. Returns: Dictionary of request parameters. """ kwargs: Dict[str, Any] = {} kwargs['timeout'] = self.timeout kwargs['headers'] = {} kwargs['headers']['Content-type'] = 'application/json' if self.user is not None and self.password is not None: kwargs['auth'] = (self.user, self.password) if data: kwargs['data'] = data if params: kwargs['params'] = params if self.token: kwargs['headers']['Authorization'] = f"Bearer {self.token}" return kwargs