Source code for odin.traversal

"""Traversal of a datastructure."""

import itertools
from collections.abc import Iterable, Sequence
from typing import Any, NamedTuple, Optional, Union, cast

import odin
from odin.exceptions import InvalidPathError, MultipleMatchesError, NoMatchError
from odin.resources import ResourceBase
from odin.utils import getmeta


class _NotSupplied:
    """Placeholder"""

    __slots__ = ()


NotSupplied = _NotSupplied()
NotSuppliedString = str | _NotSupplied


class PathAtom(NamedTuple):
    """Atom within a traversal path."""

    @classmethod
    def split(cls, atom: str) -> "PathAtom":
        # Index/Key into attribute
        if "[" in atom:
            attr, _, key = atom.rstrip("]").partition("[")
            return cls(key, NotSupplied, attr)

        # Filter attributes
        if "{" in atom:
            attr, _, kv = atom.rstrip("}").partition("{")
            key, _, value = kv.partition("=")
            return cls(key, value, attr)

        # Basic attribute
        return cls(NotSupplied, NotSupplied, atom)

    @classmethod
    def create(
        cls,
        attr: str = "",
        key: NotSuppliedString = NotSupplied,
        value: NotSuppliedString = NotSupplied,
    ) -> "PathAtom":
        """Create a new PathAtom."""
        return cls(key, value, attr)

    key: NotSuppliedString
    value: NotSuppliedString
    attr: str

    def __repr__(self):
        return f"<PathAtom: {self}>"

    def __str__(self):
        key, value, attr = self

        if key is NotSupplied:
            return attr

        if value is NotSupplied:
            return f"{attr}[{key}]"

        return f"{attr}{{{key}={value}}}"

    @property
    def is_indexed(self) -> bool:
        """Indexes into a attribute."""
        key, value, _ = self
        return key is not NotSupplied and value is NotSupplied

    @property
    def is_filter(self) -> bool:
        """This is a filter atom."""
        _, value, _ = self
        return value is not NotSupplied

    def extract_element(self, resource: ResourceBase) -> Any:
        """Extract an element from a resource instance"""
        key, value, attr = self

        try:
            field = getmeta(resource).field_map[attr]
        except KeyError:
            raise InvalidPathError(self, f"Unknown field {attr!r}") from None
        element = field.value_from_object(resource)

        if key is NotSupplied:
            # No additional lookup required
            return element

        elif value is NotSupplied:
            # Index or key into element
            key = cast(odin.CompositeField, field).key_to_python(key)
            try:
                return element[key]
            except LookupError:
                raise NoMatchError(
                    self, f"Could not find index {key!r} in {field}."
                ) from None

        else:
            # Filter elements
            if isinstance(element, dict):
                element = element.values()

            values = tuple(r for r in element if getattr(r, key) == value)
            if len(values) == 0:
                raise NoMatchError(
                    self,
                    f"Filter matched no values; {key!r} == {value!r} in {field}.",
                )

            if len(values) > 1:
                raise MultipleMatchesError(
                    self,
                    f"Filter matched multiple values; {key!r} == {value!r}.",
                )

            return values[0]


class TraversalPath(tuple, Sequence[PathAtom]):
    """A path through a resource structure."""

    @classmethod
    def parse(cls, path: Union["TraversalPath", str]) -> Optional["TraversalPath"]:
        """Parse a traversal path string."""
        if isinstance(path, TraversalPath):
            return path
        if isinstance(path, str):
            return cls(PathAtom.split(a) for a in path.split(".")) if path else cls()

    __slots__ = ()

    def __repr__(self):
        return f"<TraversalPath: {self}>"

    def __str__(self) -> str:
        return ".".join(str(a) for a in self)

    def __add__(self, other: Union["TraversalPath", str, PathAtom]) -> "TraversalPath":
        """Join paths together."""
        if isinstance(other, TraversalPath):
            return TraversalPath(itertools.chain(self, other))

        if isinstance(other, str):
            other = PathAtom.split(other)

        if isinstance(other, PathAtom):
            return TraversalPath(itertools.chain(self, (other,)))

        raise TypeError(f"Cannot add '{other}' to a path.")

    @property
    def parent(self) -> Optional["TraversalPath"]:
        """Get parent item"""
        if len(self) > 1:
            return TraversalPath(self[:-1])

    def iter_resource(self, root_resource: ResourceBase):
        """Iterate over a resource document. Yielding each parent."""
        current = root_resource
        yield current

        for atom in self:
            current = atom.extract_element(current)
            yield current

    def get_value(self, root_resource: ResourceBase):
        """Get a value from a resource document."""
        return tuple(self.iter_resource(root_resource))[-1]


[docs] class ResourceTraversalIterator: """Iterator for traversing (walking) a resource structure, including traversing composite fields to fully navigate a resource tree. This class has hooks that can be used by subclasses to customise the behaviour of the class: - *on_enter* - Called after entering a new resource. - *on_exit* - Called after exiting a resource. """ __slots__ = ("_resource_iters", "_field_iters", "_path", "_resource_stack") def __init__(self, resource: ResourceBase | Sequence[ResourceBase]): """Initialise instance with the initial resource or sequence of resources.""" if isinstance(resource, list | tuple): # Stack of resource iterators (starts initially with entries from the list) self._resource_iters = [iter([(i, r) for i, r in enumerate(resource)])] else: # Stack of resource iterators (starts initially with single entry of the root resource) self._resource_iters = [iter([(None, resource)])] # Stack of composite fields, found on each resource, each composite field is interrogated for resources. self._field_iters = [] # The "path" to the current resource. self._path: list[PathAtom] = [PathAtom.create()] self._resource_stack = [None] def __iter__(self) -> Iterable[ResourceBase]: """Obtain an iterable instance.""" return self def __next__(self) -> ResourceBase: """Get next resource instance.""" if not self._resource_iters: raise StopIteration() if self._field_iters: # Check if the last entry in the field stack has any unprocessed fields. if self._field_iters[-1]: # Select the very last field in the field stack. field = self._field_iters[-1][0] # Request a list of resources along with keys from the composite field. self._resource_iters.append( field.item_iter_from_object(self.current_resource) ) # Update the path self._path.append(PathAtom.create(field.attname)) self._resource_stack.append(None) # Remove the field from the list (and remove this field entry if it has been emptied) self._field_iters[-1].pop(0) else: self._field_iters.pop() if self.current_resource and hasattr(self, "on_exit"): self.on_exit() try: key, next_resource = next(self._resource_iters[-1]) except StopIteration: # End of the current list of resources pop this list off and get the next list. self._path.pop() self._resource_iters.pop() self._resource_stack.pop() return next(self) else: next_meta = getmeta(next_resource) # If we have a key (ie DictOf, ListOf composite fields) update the path key field. if key is not None: _, value, field = self._path[-1] if next_meta.key_field: value = next_meta.key_field.value_from_object(next_resource) key = next_meta.key_field.attname self._path[-1] = PathAtom(key, value, field) # Get list of any composite fields for this resource (this is a cached field). self._field_iters.append(list(next_meta.composite_fields)) # self.current_resource = next_resource self._resource_stack[-1] = next_resource if hasattr(self, "on_enter"): self.on_enter() return next_resource @property def path(self) -> TraversalPath: """Path to the current resource node in the tree structure. This path can be used to later traverse the tree structure to find get to the specified resource. """ # The path is offset by one as the path includes the root to simplify next method. return TraversalPath(self._path[1:]) @property def depth(self) -> int: """Depth of the current resource in the tree structure.""" return len(self._path) - 1 @property def current_resource(self) -> ResourceBase | None: """The current resource being traversed.""" if self._resource_stack: return self._resource_stack[-1]