# noqa: D100
import logging
import types
from collections import OrderedDict
from textwrap import dedent
from warnings import warn
import owslib
import packaging.version
import requests
import requests.auth
from boltons.funcutils import FunctionBuilder
from owslib.util import ServiceException
from owslib.wps import (
ASYNC,
SYNC,
WPS_DEFAULT_VERSION,
ComplexData,
WebProcessingService,
)
from birdy.client import notebook, utils
from birdy.client.outputs import WPSResult
from birdy.exceptions import UnauthorizedException
from birdy.utils import embed, fix_url, guess_type, sanitize
# TODO: Support passing ComplexInput's data using POST.
[docs]
class WPSClient:
"""Returns a class where every public method is a WPS process available at the given url.
Examples
--------
>>> emu = WPSClient(url='<server url>')
>>> emu.hello('stranger')
'Hello stranger'
"""
def __init__(
self,
url,
processes=None,
converters=None,
username=None,
password=None,
headers=None,
auth=None,
verify=True,
cert=None,
progress=False,
version=WPS_DEFAULT_VERSION,
caps_xml=None,
desc_xml=None,
language=None,
lineage=False,
**kwds,
):
"""Initialize WPSClient.
Parameters
----------
url: str
Link to WPS provider. config (Config): an instance
processes:
Specify a subset of processes to bind. Defaults to all processes.
converters: dict
Correspondence of {mimetype: class} to convert this mimetype to a python object.
username: str
passed to :class:`owslib.wps.WebProcessingService`
password: str
passed to :class:`owslib.wps.WebProcessingService`
headers: str
passed to :class:`owslib.wps.WebProcessingService`
auth: requests.auth.AuthBase
requests-style auth class to authenticate.
see https://2.python-requests.org/en/master/user/authentication/
verify: bool
passed to :class:`owslib.wps.WebProcessingService`
cert: str
passed to :class:`owslib.wps.WebProcessingService`
verbose: bool
Deprecated. passed to :class:`owslib.wps.WebProcessingService` for owslib < 0.29
progress: bool
If True, enable interactive user mode.
version: str
WPS version to use.
language: str
passed to :class:`owslib.wps.WebProcessingService` (ex: 'fr-CA', 'en_US').
lineage: bool
If True, the Execute operation includes lineage information.
"""
self._converters = converters
self._interactive = progress
self._mode = ASYNC if progress else SYNC
self._lineage = lineage
self._notebook = notebook.is_notebook()
self._inputs = {}
self._outputs = {}
if not verify:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
if headers is None:
headers = {}
if auth is not None:
if isinstance(auth, tuple) and len(auth) == 2:
# special-case basic HTTP auth
auth = requests.auth.HTTPBasicAuth(*auth)
# We only need some headers from the requests.auth.AuthBase implementation
# We prepare a dummy request, call the auth object with it, and get its headers
dummy_request = requests.Request("get", "http://localhost")
r = auth(dummy_request.prepare())
auth_headers = ["Authorization", "Proxy-Authorization", "Cookie"]
headers.update({h: r.headers[h] for h in auth_headers if h in r.headers})
if "verbose" in kwds:
if packaging.version.parse(owslib.__version__) >= packaging.version.parse(
"0.29.0"
):
kwds.pop("verbose")
warn(
"The 'verbose' keyword is deprecated and will be removed in a future version. Starting with owslib "
"0.29.0, debugging information is logged instead of printed.",
DeprecationWarning,
)
self._wps = WebProcessingService(
url,
version=version,
username=username,
password=password,
headers=headers,
verify=verify,
cert=cert,
skip_caps=True,
language=language,
**kwds,
)
try:
self._wps.getcapabilities(xml=caps_xml)
except ServiceException as e:
if "AccessForbidden" in str(e):
raise UnauthorizedException(
"You are not authorized to do a request of type: GetCapabilities"
)
raise
self._processes = self._get_process_description(processes, xml=desc_xml)
# Build the methods
for pid in self._processes:
setattr(
self, sanitize(pid), types.MethodType(self._method_factory(pid), self)
)
self.logger = logging.getLogger("WPSClient")
if progress:
self._setup_logging()
self.__doc__ = utils.build_wps_client_doc(self._wps, self._processes)
@property
def language(self): # noqa: D102
return self._wps.language
@language.setter
def language(self, value): # noqa: D102
self._wps.language = value
@property
def languages(self): # noqa: D102
return self._wps.languages
[docs]
def _get_process_description(self, processes=None, xml=None):
"""Return the description for each process.
Sends the server a `describeProcess` request for each process.
Parameters
----------
processes: str, list, None
A process name, a list of process names or None (for all processes).
Returns
-------
OrderedDict
A dictionary keyed by the process identifier of process descriptions.
"""
all_wps_processes = [p.identifier for p in self._wps.processes]
if processes is None:
try:
# Get the description for all processes in one request.
ps = self._wps.describeprocess("all", xml=xml)
return OrderedDict((p.identifier, p) for p in ps)
except (ServiceException, ValueError):
processes = all_wps_processes
# Check for invalid process names, i.e. not matching the getCapabilities response.
process_names, missing = utils.filter_case_insensitive(
processes, all_wps_processes
)
if missing:
message = "These process names were not found on the WPS server: {}"
raise ValueError(message.format(", ".join(missing)))
# Get the description for each process.
ps = [self._wps.describeprocess(pid, xml=xml) for pid in process_names]
return OrderedDict((p.identifier, p) for p in ps)
[docs]
def _setup_logging(self):
self.logger.setLevel(logging.INFO)
import sys
fh = logging.StreamHandler(sys.stdout)
fh.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))
self.logger.addHandler(fh)
[docs]
def _method_factory(self, pid):
"""Create a custom function signature with docstring, instantiate it and pass it to a wrapper.
The wrapper will call the process on reception.
Parameters
----------
pid: str
Identifier of the WPS process.
Returns
-------
func
A Python function calling the remote process, complete with docstring and signature.
"""
process = self._processes[pid]
required_inputs_first = sorted(process.dataInputs, key=sort_inputs_key)
input_names = []
# defaults will be set to the function's __defaults__:
# A tuple containing default argument values for those arguments that have defaults,
# or None if no arguments have a default value.
defaults = []
# Set process inputs
for inpt in required_inputs_first:
input_names.append(sanitize(inpt.identifier))
if inpt.minOccurs == 0 or inpt.defaultValue is not None:
default = inpt.defaultValue if inpt.dataType != "ComplexData" else None
defaults.append(utils.from_owslib(default, inpt.dataType))
# Set generic 'output_formats' input, only if one of the output is a ComplexData.
for o in process.processOutputs:
if o.dataType == "ComplexData":
if len(o.supportedValues) > 1:
input_names.append("output_formats")
defaults.append(None)
break
# convert defaults
defaults = tuple(defaults) # if defaults else None
body = dedent(
"""
inputs = locals()
inputs.pop('self')
return self._execute('{pid}', **inputs)
"""
).format(pid=pid)
func_builder = FunctionBuilder(
name=sanitize(pid),
doc=utils.build_process_doc(process),
args=["self"] + input_names,
defaults=defaults,
body=body,
filename=__file__,
module=self.__module__,
)
self._inputs[pid] = {}
if hasattr(process, "dataInputs"):
self._inputs[pid] = OrderedDict(
(i.identifier, i) for i in process.dataInputs
)
self._outputs[pid] = {}
if hasattr(process, "processOutputs"):
self._outputs[pid] = OrderedDict(
(o.identifier, o) for o in process.processOutputs
)
func = func_builder.get_func()
return func
[docs]
def _execute(self, pid, **kwargs):
"""Execute the process."""
wps_inputs = self._build_inputs(pid, **kwargs)
wps_outputs = self._parse_output_formats(kwargs.get("output_formats", {}))
if not wps_outputs:
wps_outputs = [
(o.identifier, "ComplexData" in o.dataType)
for o in list(self._outputs[pid].values())
]
mode = self._mode if self._processes[pid].storeSupported else SYNC
try:
wps_response = self._wps.execute(
pid,
inputs=wps_inputs,
output=wps_outputs,
mode=mode,
lineage=self._lineage,
)
if self._interactive and self._processes[pid].statusSupported:
if self._notebook:
notebook.monitor(wps_response, sleep=0.2)
else:
self._console_monitor(wps_response)
except ServiceException as e:
if "AccessForbidden" in str(e):
raise UnauthorizedException(
"You are not authorized to do a request of type: Execute"
)
raise
# Add the convenience methods of WPSResult to the WPSExecution class. This adds a `get` method.
utils.extend_instance(wps_response, WPSResult)
wps_response.attach(wps_outputs=self._outputs[pid], converters=self._converters)
return wps_response
[docs]
def _console_monitor(self, execution, sleep=3):
"""Monitor the execution of a process.
Parameters
----------
execution : WPSExecution instance
The execute response to monitor.
sleep: float
Number of seconds to wait before each status check.
"""
import signal
# Intercept CTRL-C
def sigint_handler(signum, frame):
self.cancel()
signal.signal(signal.SIGINT, sigint_handler)
while not execution.isComplete():
execution.checkStatus(sleepSecs=sleep)
self.logger.info(
"{} [{}/100] - {} ".format(
execution.process.identifier,
execution.percentCompleted,
execution.statusMessage[:50],
)
)
if execution.isSucceded():
self.logger.info(f"{execution.process.identifier} done.")
else:
self.logger.info(f"{execution.process.identifier} failed.")