Source code for birdy.client.outputs

# noqa: D100

import tempfile
from collections import namedtuple

from owslib.wps import WPSExecution

from birdy.client import utils
from birdy.client.converters import convert
from birdy.exceptions import ProcessFailed, ProcessIsNotComplete
from birdy.utils import delist, sanitize


[docs] class WPSResult(WPSExecution): # noqa: D101
[docs] def attach(self, wps_outputs, converters=None): """Attach the outputs according to converters. Parameters ---------- wps_outputs: dict converters: dict Converter dictionary {name: object} """ self._wps_outputs = wps_outputs self._converters = converters self._path = tempfile.mkdtemp()
[docs] def get(self, asobj=False): """Return the process response outputs. Parameters ---------- asobj: bool If True, object_converters will be used. """ if not self.isComplete(): raise ProcessIsNotComplete("Please wait ...") if not self.isSucceded(): # TODO: add reason for failure raise ProcessFailed("Sorry, process failed.") return self._make_output(asobj)
[docs] def _make_output(self, convert_objects=False): Output = namedtuple( sanitize(self.process.identifier) + "Response", [sanitize(o.identifier) for o in self.processOutputs], ) Output.__repr__ = utils.pretty_repr return Output( *[self._process_output(o, convert_objects) for o in self.processOutputs] )
[docs] def _process_output(self, output, convert_objects=False): """Process the output response, whether it is actual data or a URL to a file. Parameters ---------- output: owslib.wps.Output convert_objects: bool If True, object_converters will be used. """ # Get the data for recognized types. if output.data: data_type = output.dataType if data_type is None: data_type = self._wps_outputs[output.identifier].dataType data = [utils.from_owslib(d, data_type) for d in output.data] return delist(data) if convert_objects: return convert(output, self._path, self._converters, self.auth.verify) else: return output.reference