"""Traversal of a datastructure."""
import itertools
from typing import Any, Iterable, List, NamedTuple, Optional, Sequence, Union, cast
import odin
from odin.exceptions import InvalidPathError, MultipleMatchesError, NoMatchError
from odin.resources import ResourceBase
from odin.utils import getmeta
class _NotSupplied:
"""Placeholder"""
__slots__ = ()
NotSupplied = _NotSupplied()
NotSuppliedString = Union[str, _NotSupplied]
class PathAtom(NamedTuple):
"""Atom within a traversal path."""
@classmethod
def split(cls, atom: str) -> "PathAtom":
# Index/Key into attribute
if "[" in atom:
attr, _, key = atom.rstrip("]").partition("[")
return cls(key, NotSupplied, attr)
# Filter attributes
if "{" in atom:
attr, _, kv = atom.rstrip("}").partition("{")
key, _, value = kv.partition("=")
return cls(key, value, attr)
# Basic attribute
return cls(NotSupplied, NotSupplied, atom)
@classmethod
def create(
cls,
attr: str = "",
key: NotSuppliedString = NotSupplied,
value: NotSuppliedString = NotSupplied,
) -> "PathAtom":
"""Create a new PathAtom."""
return cls(key, value, attr)
key: NotSuppliedString
value: NotSuppliedString
attr: str
def __repr__(self):
return f"<PathAtom: {self}>"
def __str__(self):
key, value, attr = self
if key is NotSupplied:
return attr
if value is NotSupplied:
return f"{attr}[{key}]"
return f"{attr}{{{key}={value}}}"
@property
def is_indexed(self) -> bool:
"""Indexes into a attribute."""
key, value, _ = self
return key is not NotSupplied and value is NotSupplied
@property
def is_filter(self) -> bool:
"""This is a filter atom."""
_, value, _ = self
return value is not NotSupplied
def extract_element(self, resource: ResourceBase) -> Any:
"""Extract an element from a resource instance"""
key, value, attr = self
try:
field = getmeta(resource).field_map[attr]
except KeyError:
raise InvalidPathError(self, f"Unknown field {attr!r}") from None
element = field.value_from_object(resource)
if key is NotSupplied:
# No additional lookup required
return element
elif value is NotSupplied:
# Index or key into element
key = cast(odin.CompositeField, field).key_to_python(key)
try:
return element[key]
except LookupError:
raise NoMatchError(
self, f"Could not find index {key!r} in {field}."
) from None
else:
# Filter elements
if isinstance(element, dict):
element = element.values()
values = tuple(r for r in element if getattr(r, key) == value)
if len(values) == 0:
raise NoMatchError(
self,
f"Filter matched no values; {key!r} == {value!r} in {field}.",
)
if len(values) > 1:
raise MultipleMatchesError(
self,
f"Filter matched multiple values; {key!r} == {value!r}.",
)
return values[0]
class TraversalPath(tuple, Sequence[PathAtom]):
"""A path through a resource structure."""
@classmethod
def parse(cls, path: Union["TraversalPath", str]) -> Optional["TraversalPath"]:
"""Parse a traversal path string."""
if isinstance(path, TraversalPath):
return path
if isinstance(path, str):
return cls(PathAtom.split(a) for a in path.split(".")) if path else cls()
__slots__ = ()
def __repr__(self):
return f"<TraversalPath: {self}>"
def __str__(self) -> str:
return ".".join(str(a) for a in self)
def __add__(self, other: Union["TraversalPath", str, PathAtom]) -> "TraversalPath":
"""Join paths together."""
if isinstance(other, TraversalPath):
return TraversalPath(itertools.chain(self, other))
if isinstance(other, str):
other = PathAtom.split(other)
if isinstance(other, PathAtom):
return TraversalPath(itertools.chain(self, (other,)))
raise TypeError(f"Cannot add '{other}' to a path.")
@property
def parent(self) -> Optional["TraversalPath"]:
"""Get parent item"""
if len(self) > 1:
return TraversalPath(self[:-1])
def iter_resource(self, root_resource: ResourceBase):
"""Iterate over a resource document. Yielding each parent."""
current = root_resource
yield current
for atom in self:
current = atom.extract_element(current)
yield current
def get_value(self, root_resource: ResourceBase):
"""Get a value from a resource document."""
return tuple(self.iter_resource(root_resource))[-1]
[docs]
class ResourceTraversalIterator:
"""Iterator for traversing (walking) a resource structure, including traversing
composite fields to fully navigate a resource tree.
This class has hooks that can be used by subclasses to customise the behaviour of the class:
- *on_enter* - Called after entering a new resource.
- *on_exit* - Called after exiting a resource.
"""
__slots__ = ("_resource_iters", "_field_iters", "_path", "_resource_stack")
def __init__(self, resource: Union[ResourceBase, Sequence[ResourceBase]]):
"""Initialise instance with the initial resource or sequence of resources."""
if isinstance(resource, (list, tuple)):
# Stack of resource iterators (starts initially with entries from the list)
self._resource_iters = [iter([(i, r) for i, r in enumerate(resource)])]
else:
# Stack of resource iterators (starts initially with single entry of the root resource)
self._resource_iters = [iter([(None, resource)])]
# Stack of composite fields, found on each resource, each composite field is interrogated for resources.
self._field_iters = []
# The "path" to the current resource.
self._path: List[PathAtom] = [PathAtom.create()]
self._resource_stack = [None]
def __iter__(self) -> Iterable[ResourceBase]:
"""Obtain an iterable instance."""
return self
def __next__(self) -> ResourceBase:
"""Get next resource instance."""
if not self._resource_iters:
raise StopIteration()
if self._field_iters:
# Check if the last entry in the field stack has any unprocessed fields.
if self._field_iters[-1]:
# Select the very last field in the field stack.
field = self._field_iters[-1][0]
# Request a list of resources along with keys from the composite field.
self._resource_iters.append(
field.item_iter_from_object(self.current_resource)
)
# Update the path
self._path.append(PathAtom.create(field.attname))
self._resource_stack.append(None)
# Remove the field from the list (and remove this field entry if it has been emptied)
self._field_iters[-1].pop(0)
else:
self._field_iters.pop()
if self.current_resource and hasattr(self, "on_exit"):
self.on_exit()
try:
key, next_resource = next(self._resource_iters[-1])
except StopIteration:
# End of the current list of resources pop this list off and get the next list.
self._path.pop()
self._resource_iters.pop()
self._resource_stack.pop()
return next(self)
else:
next_meta = getmeta(next_resource)
# If we have a key (ie DictOf, ListOf composite fields) update the path key field.
if key is not None:
_, value, field = self._path[-1]
if next_meta.key_field:
value = next_meta.key_field.value_from_object(next_resource)
key = next_meta.key_field.attname
self._path[-1] = PathAtom(key, value, field)
# Get list of any composite fields for this resource (this is a cached field).
self._field_iters.append(list(next_meta.composite_fields))
# self.current_resource = next_resource
self._resource_stack[-1] = next_resource
if hasattr(self, "on_enter"):
self.on_enter()
return next_resource
@property
def path(self) -> TraversalPath:
"""Path to the current resource node in the tree structure.
This path can be used to later traverse the tree structure to find get to the specified resource.
"""
# The path is offset by one as the path includes the root to simplify next method.
return TraversalPath(self._path[1:])
@property
def depth(self) -> int:
"""Depth of the current resource in the tree structure."""
return len(self._path) - 1
@property
def current_resource(self) -> Optional[ResourceBase]:
"""The current resource being traversed."""
if self._resource_stack:
return self._resource_stack[-1]