from collections import namedtuple
from itertools import cycle, chain
import logging
import re
from rdflib.term import URIRef
from rdflib.namespace import RDF, RDFS
from . import RDF_CONTEXT, RDFS_CONTEXT
from .dataobject import (BaseDataObject,
This,
ObjectProperty,
UnionProperty)
from .dataobject_property import UnionProperty as UnionPropertyType
# Module-level logger; referenced as `L` by the classes in this module.
L = logging.getLogger(__name__)
# Matches container membership property names such as ``_1`` or ``_27``: an
# underscore followed by a positive decimal integer with no leading zero (so ``_0``
# and ``_01`` do not match). Group 1 captures the integer digits.
CONTAINER_MEMBERSHIP_PROPERTY_RE = re.compile(r'^_([1-9]+[0-9]*)$')
class Container(BaseDataObject):
    '''
    Base class for rdfs:Containers

    Example (`Bag`, `Alt`, and `Seq` have the same operations)::

        >>> nums = Bag(ident="http://example.org/fav-numbers")
        >>> nums[1] = 42
        >>> nums.set_member(2, 415)
        owmeta_core.statement.Statement(...)
        >>> nums._3(15)
        owmeta_core.statement.Statement(...)
        >>> nums._2.index
        2
        >>> nums._1()
        42
        >>> nums[2]
        415
        >>> nums._2(6)
        owmeta_core.statement.Statement(...)
        >>> nums[2]
        6

    Note that because the set of entries in ``rdfs:Container`` is not bounded, iteration
    over `Containers <Container>` is not bounded. To iterate over a `Container`, it is
    recommended to add some external bound with `itertools.islice` or something like
    ``zip(range(bound), container)``. Where values have not been set, `None` will be
    returned.
    '''

    rdf_type = RDFS.Container
    class_context = RDFS_CONTEXT

    def __getitem__(self, index):
        '''
        Return the single value stored at `index`.

        Returns `None` when there is no membership property for `index` or when the
        property yields no (non-`None`) value.

        Raises
        ------
        ContainerValueConflict
            If the property yields values that are not all equal to the first one
        '''
        prop = getattr(self, f'_{index}', None)
        if prop is None:
            return None

        chosen = None
        conflicting = []
        for value in prop.get():
            if chosen is None:
                chosen = value
            elif not (chosen == value):
                conflicting.append(value)

        if conflicting:
            # Unlike regular Property access, there's generally not a presumption that
            # one of many values can be selected arbitrarily. Also, an iteration that
            # sometimes does what you expect and sometimes doesn't is really frustrating.
            raise ContainerValueConflict(index, [chosen] + conflicting)
        return chosen

    def __getattr__(self, name):
        '''
        Lazily create container membership properties.

        Only names matching `CONTAINER_MEMBERSHIP_PROPERTY_RE` (``_1``, ``_2``, ...) are
        handled; any other name raises `AttributeError`.
        '''
        membership_match = CONTAINER_MEMBERSHIP_PROPERTY_RE.match(name)
        if not membership_match:
            raise AttributeError(name)

        try:
            existing = super().__getattribute__(name)
        except AttributeError:
            existing = None

        if existing is not None:
            return existing
        # Nothing usable is attached under this name yet, so attach a membership
        # property for the index encoded in the name.
        return self.attach_property(ContainerMembershipProperty,
                                    index=int(membership_match.group(1)))

    def __setitem__(self, index, item):
        '''
        Equivalent to ``set_member(index, item)``; the result is discarded.
        '''
        self.set_member(index, item)

    def set_member(self, index, item):
        '''
        Set a member at the given index.

        If an existing value is set at the given index, then it will be replaced. Note
        that, as described in the `RDF Primer`_, there is no well-formedness guarantee: in
        particular, some other instance of a container may declare a different value at
        the same index.

        .. _RDF Primer: https://www.w3.org/TR/rdf-primer/#collections
        '''
        prop = getattr(self, f'_{index}', None)
        if not isinstance(prop, ContainerMembershipProperty):
            raise Exception(f'Non-ContainerMembershipProperty set at _{index}: {prop}')
        return prop(item)
class ContainerValueConflict(Exception):
    '''
    Raised when more than one distinct item is declared at one container index.

    Attributes
    ----------
    index
        The index at which the conflict was found
    items
        The conflicting items
    '''

    def __init__(self, index, items):
        message = f'More than one item is declared at index {index}. Items: {items!r}'
        super().__init__(message)
        self.index, self.items = index, items
class ContainerMembershipProperty(UnionPropertyType):
    '''
    Base class for container membership properties like ``rdf:_1``, ``rdf:_2``, ...
    '''

    class_context = RDFS_CONTEXT
    owner_type = BaseDataObject
    rdf_type = RDFS.ContainerMembershipProperty

    def __init__(self, index, **kwargs):
        '''
        Parameters
        ----------
        index : int or str
            Either a positive integer or a string of the form ``_<n>`` accepted by
            `CONTAINER_MEMBERSHIP_PROPERTY_RE`
        **kwargs
            Passed through to the super-class initializer

        Raises
        ------
        ValueError
            If `index` is not one of the accepted forms, or if the RDF namespace
            lookup for the derived name fails with `KeyError`
        '''
        super().__init__(**kwargs)

        if isinstance(index, str):
            parsed = CONTAINER_MEMBERSHIP_PROPERTY_RE.match(index)
            if parsed is None:
                raise ValueError(f'Expected an integer > 0. Received {index!r}.')
            index = int(parsed.group(1))
        elif not isinstance(index, int) or index <= 0:
            raise ValueError('Expected an integer > 0')

        self.__index = index
        name = f'_{index}'
        try:
            link = RDF[name]
        except KeyError as e:
            raise ValueError('Expected an integer > 0') from e
        self.link = link
        self.linkName = name

        # We need to add the (..., rdf:type, rdfs:ContainerMembershipProperty) triples to
        # do proper entailment, ultimately of the rdfs:subPropertyOf(rdfs:member)
        # relationship.
        type(self).rdf_type_class.contextualize(self.context)(ident=self.link)

    @property
    def index(self):
        '''
        The positive integer index of this membership property (read-only).
        '''
        return self.__index
class Bag(Container):
    """
    Convenience class for working with an ``rdf:Bag`` container
    """

    rdf_type = RDF.Bag
    class_context = RDF_CONTEXT
class Alt(Container):
    # `Container` specialization typed as ``rdf:Alt`` and defined in the RDF context.
    # It adds no behavior beyond what `Container` provides.

    rdf_type = RDF.Alt
    class_context = RDF_CONTEXT
class Seq(Container):
    # `Container` specialization typed as ``rdf:Seq`` and defined in the RDF context.
    # It adds no behavior beyond what `Container` provides.

    rdf_type = RDF.Seq
    class_context = RDF_CONTEXT
class List(BaseDataObject):
    # Data object for ``rdf:List`` nodes. Each node carries a `first` value and a
    # `rest` link to another `List` node; `List.nil` (assigned at module level, after
    # this class body) is the node identified by ``rdf:nil`` and terminates a list.

    class_context = RDF_CONTEXT
    rdf_type = RDF.List
    first = UnionProperty(link=RDF.first)
    rest = ObjectProperty(link=RDF.rest, value_type=This)

    @classmethod
    def from_sequence(cls, sequence, ident=None):
        '''
        Build a chain of `List` nodes holding the items of `sequence`, in order.

        Parameters
        ----------
        sequence : iterable
            The items to store as the `first` value of successive nodes
        ident : str or rdflib.term.URIRef, optional
            If given, the head node's identifier is set to ``URIRef(ident)`` and the
            node for the item at position ``i`` (``i > 0``) gets
            ``URIRef(ident + "#_<i>")``

        Returns
        -------
        List
            The head node of the new list, or `List.nil` when `sequence` is empty
        '''
        first = cls.nil
        last = None
        for i, s in enumerate(sequence):
            this = cls(first=s)
            if first is cls.nil:
                first = this
                if ident:
                    this.identifier = URIRef(ident)
            elif ident:
                this.identifier = URIRef(ident + f"#_{i}")
            if last is not None:
                last.rest(this)
            last = this
        # An empty sequence creates no nodes: there is nothing to terminate and the
        # result is simply `nil`. (Previously this case raised AttributeError because
        # `last` was still None.)
        if last is not None:
            last.rest(cls.nil)
        return first

    def load_dataobject_sequences(self):
        '''
        Loads the sequences of `rest` values starting from this node.

        If this node is undefined, then this method generates *all* lists, including
        sub-lists, in the configured RDF graph. Also, there is no guarantee that there is
        just *one* list starting from this node.

        Each generated value is either a `list` of `List` nodes or, where a loop was
        found, an iterator built from `itertools.chain` / `itertools.cycle`. An iterator
        involving `itertools.cycle` never ends.
        '''
        return self._load_dataobject_sequences()

    def _load_dataobject_sequences(self, seen=None):
        '''
        Recursive worker for `load_dataobject_sequences`.

        Parameters
        ----------
        seen : list, optional
            Identifiers of the nodes on the path currently being followed; used to
            detect loops. Not part of the public interface.

        Yields
        ------
        list or _Loop or itertools.chain or itertools.cycle
            - ``[]`` if this node is `nil`
            - a `list` of nodes for a sequence found through `rest`
            - a `_Loop` when a loop was detected that does not (yet) close at the
              current node; callers further up the recursion resolve it
            - a `~itertools.cycle` when the loop closes at the current node
            - a `~itertools.chain` when the current node precedes a looping tail
            - ``[m]`` (with a warning) when none of the node's `rest` values produced a
              sequence
        '''
        if seen is None:
            seen = []

        if self.idl == type(self).nil.identifier:
            yield []
            return

        for m in self.load():
            rests = m.rest.get()
            if m.identifier in seen:
                # Maybe a loop was made on purpose, so no warning, but still worth noting.
                L.info('Loop detected: %s in %s', self, seen)
                yield _Loop((), m)
                # We can drop here since there's only going to be one result except for
                # when we initially loaded from something without an identifier, but if
                # there's something in `seen`, then we've already passed that case. You
                # *could* pass something in to `seen` on the initial call, but that isn't
                # a part of the *public* interface
                return

            seen.append(m.identifier)
            hit = False
            for rest in rests:
                for rest_lst in rest._load_dataobject_sequences(seen):
                    hit = True
                    if isinstance(rest_lst, _Loop):
                        if rest_lst.loop.identifier == m.identifier:
                            # The loop comes back to this node: repeat forever
                            yield cycle((m,) + rest_lst.parts)
                        else:
                            # The loop closes at a node earlier on the path: record
                            # this node and hand the marker up
                            yield _Loop((m,) + rest_lst.parts, rest_lst.loop)
                    elif isinstance(rest_lst, (chain, cycle)):
                        yield chain((m,), rest_lst)
                    else:
                        yield [m] + rest_lst
            # Done with every `rest` of `m`, so `m` is no longer on the current path
            seen.pop()

            if not hit:
                L.warning('List %s was not properly terminated', m)
                yield [m]

    def load_sequences(self):
        '''
        Like `load_dataobject_sequences`, but generates the `first` values of the nodes
        rather than the nodes themselves.

        A `list` is generated for each `list` of nodes; for any other iterable of nodes
        a (lazy) generator is produced instead.
        '''
        for m in self.load_dataobject_sequences():
            if isinstance(m, list):
                yield [x.first() for x in m]
            else:
                yield (x.first() for x in m)
# Marker yielded by `List._load_dataobject_sequences` when a loop is detected: `parts`
# is the tuple of nodes collected so far on the way back up the recursion and `loop`
# is the node at which the repeat was detected.
_Loop = namedtuple('Loop', ('parts', 'loop'))
# The `List` node identified by ``rdf:nil``, used as the list terminator. It is
# assigned here because it needs the `List` class itself to already exist.
List.nil = List.definition_context(List)(ident=RDF.nil)