Source code for rpy2.rinterface_lib.sexp

"""Base definitions for R objects."""

import abc
import collections.abc
from collections import OrderedDict
import enum
import typing
from rpy2.rinterface_lib import embedded
from rpy2.rinterface_lib import memorymanagement
from rpy2.rinterface_lib import openrlib
import rpy2.rinterface_lib._rinterface_capi as _rinterface
from rpy2.rinterface_lib._rinterface_capi import _evaluated_promise
from rpy2.rinterface_lib._rinterface_capi import SupportsSEXP
from rpy2.rinterface_lib import conversion
from rpy2.rinterface_lib.conversion import _cdata_res_to_rinterface
from rpy2.rinterface_lib import na_values


class Singleton(type):

    _instances: typing.Dict[typing.Type['Singleton'], 'Singleton'] = {}

    def __call__(cls, *args, **kwargs):
        instances = cls._instances
        if cls not in instances:
            instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return instances[cls]


class SingletonABC(Singleton, abc.ABCMeta):
    pass


class RTYPES(enum.IntEnum):
    """Native R types as defined in R's C API."""

    NILSXP = openrlib.rlib.NILSXP
    SYMSXP = openrlib.rlib.SYMSXP
    LISTSXP = openrlib.rlib.LISTSXP
    CLOSXP = openrlib.rlib.CLOSXP
    ENVSXP = openrlib.rlib.ENVSXP
    PROMSXP = openrlib.rlib.PROMSXP
    LANGSXP = openrlib.rlib.LANGSXP
    SPECIALSXP = openrlib.rlib.SPECIALSXP
    BUILTINSXP = openrlib.rlib.BUILTINSXP
    CHARSXP = openrlib.rlib.CHARSXP
    LGLSXP = openrlib.rlib.LGLSXP
    INTSXP = openrlib.rlib.INTSXP
    REALSXP = openrlib.rlib.REALSXP
    CPLXSXP = openrlib.rlib.CPLXSXP
    STRSXP = openrlib.rlib.STRSXP
    DOTSXP = openrlib.rlib.DOTSXP
    ANYSXP = openrlib.rlib.ANYSXP
    VECSXP = openrlib.rlib.VECSXP
    EXPRSXP = openrlib.rlib.EXPRSXP
    BCODESXP = openrlib.rlib.BCODESXP
    EXTPTRSXP = openrlib.rlib.EXTPTRSXP
    WEAKREFSXP = openrlib.rlib.WEAKREFSXP
    RAWSXP = openrlib.rlib.RAWSXP
    S4SXP = openrlib.rlib.S4SXP

    NEWSXP = openrlib.rlib.NEWSXP
    FREESXP = openrlib.rlib.FREESXP

    FUNSXP = openrlib.rlib.FUNSXP


[docs]class Sexp(SupportsSEXP): """Base class for R objects. The name of a class corresponds to the name SEXP used in R's C API.""" __slots__ = ('_sexpobject', ) def __init__(self, sexp: typing.Union[SupportsSEXP, '_rinterface.SexpCapsule', '_rinterface.UninitializedRCapsule']): if isinstance(sexp, SupportsSEXP): self._sexpobject = sexp.__sexp__ elif isinstance(sexp, _rinterface.CapsuleBase): self._sexpobject = sexp else: raise ValueError( 'The constructor must be called ' 'with an instance of rpy2.rinterface.Sexp ' 'or an instance of ' 'rpy2.rinterface._rinterface.SexpCapsule') def __repr__(self) -> str: return super().__repr__() + (' [%s]' % self.typeof) @property def __sexp__(self) -> '_rinterface.CapsuleBase': """Access to the underlying C pointer to the R object. When assigning a new SexpCapsule to this attribute, the R C-level type of the new capsule must be equal to the type of the old capsule. A ValueError is raised otherwise.""" return self._sexpobject @__sexp__.setter def __sexp__(self, value: '_rinterface.CapsuleBase') -> None: assert isinstance(value, _rinterface.SexpCapsule) if value.typeof != self.__sexp__.typeof: raise ValueError('New capsule type mismatch: %s' % RTYPES(value.typeof)) self._sexpobject = value @property def __sexp_refcount__(self) -> int: """Count the number of independent Python references to the underlying R object.""" return _rinterface._R_PRESERVED[ _rinterface.get_rid(self.__sexp__._cdata) ] def __getstate__(self) -> bytes: with memorymanagement.rmemory() as rmemory: ser = rmemory.protect( _rinterface.serialize( self.__sexp__._cdata, globalenv.__sexp__._cdata) ) n = openrlib.rlib.Rf_xlength(ser) res = bytes(_rinterface.ffi.buffer(openrlib.rlib.RAW(ser), n)) return res def __setstate__(self, state: bytes) -> None: self._sexpobject = unserialize(state) @property def rclass(self) -> 'StrSexpVector': """Get or set the R "class" attribute for the object.""" return rclass_get(self.__sexp__) @rclass.setter def rclass(self, value: 'typing.Union[StrSexpVector, str]'): rclass_set(self.__sexp__, value) @property def rid(self) -> int: """ID of the underlying R object (memory address).""" return _rinterface.get_rid(self.__sexp__._cdata) @property def typeof(self) -> RTYPES: return RTYPES(_rinterface._TYPEOF(self.__sexp__._cdata)) @property def named(self) -> int: return _rinterface._NAMED(self.__sexp__._cdata) @conversion._cdata_res_to_rinterface def list_attrs(self) -> 'typing.Union[StrSexpVector, str]': return _rinterface._list_attrs(self.__sexp__._cdata) @conversion._cdata_res_to_rinterface def do_slot(self, name: str) -> None: _rinterface._assert_valid_slotname(name) cchar = conversion._str_to_cchar(name) with memorymanagement.rmemory() as rmemory: name_r = rmemory.protect(openrlib.rlib.Rf_install(cchar)) if not _rinterface._has_slot(self.__sexp__._cdata, name_r): raise LookupError(name) res = openrlib.rlib.R_do_slot(self.__sexp__._cdata, name_r) return res
[docs] def do_slot_assign(self, name: str, value) -> None: _rinterface._assert_valid_slotname(name) cchar = conversion._str_to_cchar(name) with memorymanagement.rmemory() as rmemory: name_r = rmemory.protect(openrlib.rlib.Rf_install(cchar)) cdata = rmemory.protect(conversion._get_cdata(value)) openrlib.rlib.R_do_slot_assign(self.__sexp__._cdata, name_r, cdata)
@conversion._cdata_res_to_rinterface def get_attrib(self, name: str) -> 'Sexp': res = openrlib.rlib.Rf_getAttrib(self.__sexp__._cdata, conversion._str_to_charsxp(name)) return res # TODO: deprecate this (and implement __eq__) ?
[docs] def rsame(self, sexp) -> bool: if isinstance(sexp, Sexp): return self.__sexp__._cdata == sexp.__sexp__._cdata elif isinstance(sexp, _rinterface.SexpCapsule): return sexp._cdata == sexp._cdata else: raise ValueError('Not an R object.')
@property def names(self) -> 'Sexp': return baseenv['names'](self) @names.setter def names(self, value) -> None: if not isinstance(value, StrSexpVector): raise ValueError('The new names should be a StrSexpVector.') openrlib.rlib.Rf_namesgets( self.__sexp__._cdata, value.__sexp__._cdata) @property @conversion._cdata_res_to_rinterface def names_from_c_attribute(self) -> 'Sexp': return openrlib.rlib.Rf_getAttrib( self.__sexp__._cdata, openrlib.rlib.R_NameSymbol)
class NULLType(Sexp, metaclass=SingletonABC): """A singleton class for R's NULL.""" def __init__(self): if embedded.isready(): tmp = Sexp( _rinterface.UnmanagedSexpCapsule( openrlib.rlib.R_NilValue ) ) else: tmp = Sexp(_rinterface.UninitializedRCapsule(RTYPES.NILSXP.value)) super().__init__(tmp) def __bool__(self) -> bool: """This is always False.""" return False @property def __sexp__(self) -> _rinterface.CapsuleBase: return self._sexpobject @property def rid(self) -> int: return self._sexpobject.rid class CETYPE(enum.Enum): """Character encodings for R string.""" CE_NATIVE = openrlib.rlib.CE_NATIVE CE_UTF8 = openrlib.rlib.CE_UTF8 CE_LATIN1 = openrlib.rlib.CE_LATIN1 CE_BYTES = openrlib.rlib.CE_BYTES CE_SYMBOL = openrlib.rlib.CE_SYMBOL CE_ANY = openrlib.rlib.CE_ANY class NCHAR_TYPE(enum.Enum): """Type of string scalar in R.""" Bytes = 0 Chars = 1 Width = 2 class CharSexp(Sexp): """R's internal (C API-level) scalar for strings.""" _R_TYPE = openrlib.rlib.CHARSXP _NCHAR_MSG = openrlib.ffi.new('char []', b'rpy2.rinterface.CharSexp.nchar') @property def encoding(self) -> CETYPE: return CETYPE( openrlib.rlib.Rf_getCharCE(self.__sexp__._cdata) ) def nchar(self, what: NCHAR_TYPE = NCHAR_TYPE.Bytes) -> int: # TODO: nchar_type is not parsed properly by cffi ? return openrlib.rlib.R_nchar(self.__sexp__._cdata, what.value, openrlib.rlib.FALSE, openrlib.rlib.FALSE, self._NCHAR_MSG) class SexpEnvironment(Sexp): """Proxy for an R "environment" object. An R "environment" object can be thought of as a mix of a mapping (like a `dict`) and a scope. To make it more "Pythonic", both aspects are kept separate and the method `__getitem__` will get an item as it would for a Python `dict` while the method `find` will get an item as if it was a scope. As soon as R is initialized the following main environments become available to the user: - `globalenv`: The "workspace" for the current R process. This can be thought of as when `__name__ == '__main__'` in Python. - `baseenv`: The namespace of R's "base" package. """ @_cdata_res_to_rinterface @_evaluated_promise def find(self, key: str, wantfun: bool = False) -> Sexp: """Find an item, starting with this R environment. Raises a `KeyError` if the key cannot be found. This method is called `find` because it is somewhat different from the method :meth:`get` in Python mappings such :class:`dict`. This is looking for a key across enclosing environments, returning the first key found.""" if not isinstance(key, str): raise TypeError('The key must be a non-empty string.') elif not len(key): raise ValueError('The key must be a non-empty string.') with memorymanagement.rmemory() as rmemory: key_cchar = conversion._str_to_cchar(key, 'utf-8') symbol = rmemory.protect( openrlib.rlib.Rf_install(key_cchar) ) if wantfun: # One would expect this to be like # res = _rinterface._findfun(symbol, self.__sexp__._cdata) # but R's findfun will segfault if the symbol is not in # the environment. :/ rho = self while rho.rid != emptyenv.rid: res = rmemory.protect( _rinterface.findvar_in_frame_wrap( rho.__sexp__._cdata, symbol ) ) if _rinterface._TYPEOF(res) in (openrlib.rlib.CLOSXP, openrlib.rlib.BUILTINSXP): break # TODO: move check of R_UnboundValue to _rinterface ? res = openrlib.rlib.R_UnboundValue rho = rho.enclos else: res = _rinterface._findvar(symbol, self.__sexp__._cdata) # TODO: move check of R_UnboundValue to _rinterface ? if res == openrlib.rlib.R_UnboundValue: raise KeyError("'%s' not found" % key) return res @_cdata_res_to_rinterface @_evaluated_promise def __getitem__(self, key: str) -> typing.Any: if not isinstance(key, str): raise TypeError('The key must be a non-empty string.') elif not len(key): raise ValueError('The key must be a non-empty string.') with memorymanagement.rmemory() as rmemory: key_cchar = conversion._str_to_cchar(key) symbol = rmemory.protect( openrlib.rlib.Rf_install(key_cchar) ) res = rmemory.protect( _rinterface.findvar_in_frame_wrap( self.__sexp__._cdata, symbol ) ) # TODO: move check of R_UnboundValue to _rinterface if res == openrlib.rlib.R_UnboundValue: raise KeyError("'%s' not found" % key) return res def __setitem__(self, key: str, value) -> None: # TODO: move body to _rinterface-level function if not isinstance(key, str): raise TypeError('The key must be a non-empty string.') elif not len(key): raise ValueError('The key must be a non-empty string.') if (self.__sexp__._cdata == openrlib.rlib.R_BaseEnv) or \ (self.__sexp__._cdata == openrlib.rlib.R_EmptyEnv): raise ValueError('Cannot remove variables from the base or ' 'empty environments.') # TODO: call to Rf_duplicate needed ? with memorymanagement.rmemory() as rmemory: key_cchar = conversion._str_to_cchar(key) symbol = rmemory.protect( openrlib.rlib.Rf_install(key_cchar) ) cdata = rmemory.protect(conversion._get_cdata(value)) cdata_copy = rmemory.protect( openrlib.rlib.Rf_duplicate(cdata) ) openrlib.rlib.Rf_defineVar(symbol, cdata_copy, self.__sexp__._cdata) def __len__(self) -> int: with memorymanagement.rmemory() as rmemory: symbols = rmemory.protect( openrlib.rlib.R_lsInternal(self.__sexp__._cdata, openrlib.rlib.TRUE) ) n = openrlib.rlib.Rf_xlength(symbols) return n def __delitem__(self, key: str) -> None: # Testing that key is a non-empty string is implicitly # performed when checking that the key is in the environment. if key not in self: raise KeyError("'%s' not found" % key) if self.__sexp__ == baseenv.__sexp__: raise ValueError('Values from the R base environment ' 'cannot be removed.') # TODO: also check it is not R_EmpyEnv or R_BaseNamespace if self.is_locked(): ValueError('Cannot remove an item from a locked ' 'environment.') with memorymanagement.rmemory() as rmemory: key_cdata = rmemory.protect( openrlib.rlib.Rf_mkString(conversion._str_to_cchar(key)) ) _rinterface._remove(key_cdata, self.__sexp__._cdata, openrlib.rlib.Rf_ScalarLogical( openrlib.rlib.FALSE)) @_cdata_res_to_rinterface def frame(self) -> 'typing.Union[NULLType, SexpEnvironment]': """Get the parent frame of the environment.""" return openrlib.rlib.FRAME(self.__sexp__._cdata) @property @_cdata_res_to_rinterface def enclos(self) -> 'typing.Union[NULLType, SexpEnvironment]': """Get or set the enclosing environment.""" return openrlib.rlib.ENCLOS(self.__sexp__._cdata) @enclos.setter def enclos(self, value: 'SexpEnvironment') -> None: assert isinstance(value, SexpEnvironment) openrlib.rlib.SET_ENCLOS(self.__sexp__._cdata, value.__sexp__._cdata) def keys(self) -> typing.Generator[str, None, None]: """Generator over the keys (symbols) in the environment.""" with memorymanagement.rmemory() as rmemory: symbols = rmemory.protect( openrlib.rlib.R_lsInternal(self.__sexp__._cdata, openrlib.rlib.TRUE) ) n = openrlib.rlib.Rf_xlength(symbols) res = [] for i in range(n): res.append(_rinterface._string_getitem(symbols, i)) for e in res: yield e def __iter__(self) -> typing.Generator[str, None, None]: """See method `keys()`.""" return self.keys() def is_locked(self) -> bool: return openrlib.rlib.R_EnvironmentIsLocked( self.__sexp__._cdata) _UNINIT_CAPSULE_ENV = _rinterface.UninitializedRCapsule(RTYPES.ENVSXP.value) emptyenv = SexpEnvironment(_UNINIT_CAPSULE_ENV) baseenv = SexpEnvironment(_UNINIT_CAPSULE_ENV) globalenv = SexpEnvironment(_UNINIT_CAPSULE_ENV) # TODO: move to _rinterface-level function (as ABI / API compatibility # will have API-defined code compile for efficiency). def _populate_r_vector(iterable, r_vector, set_elt, cast_value): for i, v in enumerate(iterable): set_elt(r_vector, i, cast_value(v)) VT = typing.TypeVar('VT', bound='SexpVector')
[docs]class SexpVector(Sexp, metaclass=abc.ABCMeta): """Base abstract class for R vector objects. R vector objects are, at the C level, essentially C arrays wrapped in the general structure for R objects.""" @property @abc.abstractmethod def _R_TYPE(self): pass @property @abc.abstractmethod def _R_SIZEOF_ELT(self): pass @staticmethod @abc.abstractmethod def _CAST_IN(o): pass @staticmethod @abc.abstractmethod def _R_SET_VECTOR_ELT(x, i, v): pass @staticmethod @abc.abstractmethod def _R_VECTOR_ELT(x, i): pass @staticmethod @abc.abstractmethod def _R_GET_PTR(o): pass def __init__(self, obj: typing.Union[_rinterface.SexpCapsule, collections.abc.Sized]): if isinstance(obj, Sexp) or isinstance(obj, _rinterface.SexpCapsule): super().__init__(obj) elif isinstance(obj, collections.abc.Sized): super().__init__(self.from_object(obj).__sexp__) else: raise TypeError('The constructor must be called ' 'with an instance of ' 'rpy2.rinterface.Sexp ' 'or an instance of ' 'rpy2.rinterface._rinterface.SexpCapsule') @classmethod @_cdata_res_to_rinterface def from_iterable(cls, iterable, populate_func=None, set_elt=None, cast_value=None) -> VT: """Create an R vector/array from an iterable.""" if not embedded.isready(): raise embedded.RNotReadyError('Embedded R is not ready to use.') if populate_func is None: populate_func = _populate_r_vector if set_elt is None: set_elt = cls._R_SET_VECTOR_ELT if cast_value is None: cast_value = cls._CAST_IN n = len(iterable) with memorymanagement.rmemory() as rmemory: r_vector = rmemory.protect( openrlib.rlib.Rf_allocVector( cls._R_TYPE, n) ) populate_func(iterable, r_vector, set_elt, cast_value) return r_vector @classmethod @_cdata_res_to_rinterface def from_memoryview(cls, mview: memoryview) -> VT: """Create an R vector/array from a memoryview. The memoryview must be contiguous, and the C representation for the vector must be compatible between R and Python. If not the case, a :class:`ValueError` exception with will be raised.""" if not embedded.isready(): raise embedded.RNotReadyError('Embedded R is not ready to use.') if not mview.contiguous: raise ValueError('The memory view must be contiguous.') if ( (mview.itemsize != cls._R_SIZEOF_ELT) or not hasattr(cls, '_NP_TYPESTR') or not (cls._NP_TYPESTR == '|u1' or cls._NP_TYPESTR.endswith(mview.format)) ): msg = ( 'Incompatible C type sizes. ' 'The R array type is {r_type} with {r_size} byte{r_size_pl} ' 'per item ' 'while the Python array type is {py_type} with {py_size} ' 'byte{py_size_pl} per item.' .format(r_type=cls._R_TYPE, r_size=cls._R_SIZEOF_ELT, r_size_pl='s' if cls._R_SIZEOF_ELT > 1 else '', py_type=mview.format, py_size=mview.itemsize, py_size_pl='s' if mview.itemsize > 1 else '') ) raise ValueError(msg) r_vector = None n = len(mview) with memorymanagement.rmemory() as rmemory: r_vector = rmemory.protect( openrlib.rlib.Rf_allocVector( cls._R_TYPE, n) ) dest_ptr = cls._R_GET_PTR(r_vector) src_ptr = _rinterface.ffi.from_buffer(mview) nbytes = n * mview.itemsize _rinterface.ffi.memmove(dest_ptr, src_ptr, nbytes) return r_vector
[docs] @classmethod def from_object(cls, obj) -> VT: """Create an R vector/array from a Python object, if possible. An exception :class:`ValueError` will be raised if not possible.""" res = None try: mv = memoryview(obj) res = cls.from_memoryview(mv) except (TypeError, ValueError): try: res = cls.from_iterable(obj) except ValueError: msg = ('The class methods from_memoryview() and ' 'from_iterable() both failed to make a {} ' 'from an object of class {}' .format(cls, type(obj))) raise ValueError(msg) return res
def __getitem__( self, i: typing.Union[int, slice]) -> typing.Union[Sexp, VT, typing.Any]: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) res = conversion._cdata_to_rinterface( self._R_VECTOR_ELT(cdata, i_c)) elif isinstance(i, slice): res = self.from_iterable( [ self._R_VECTOR_ELT( cdata, i_c, ) for i_c in range(*i.indices(len(self))) ], cast_value=lambda x: x ) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: typing.Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) self._R_SET_VECTOR_ELT(cdata, i_c, value.__sexp__._cdata) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): self._R_SET_VECTOR_ELT(cdata, i_c, v.__sexp__._cdata) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) def __len__(self) -> int: return openrlib.rlib.Rf_xlength(self.__sexp__._cdata) def __iter__(self) -> typing.Iterator[typing.Union[Sexp, VT, typing.Any]]: for i in range(len(self)): yield self[i] def index(self, item: typing.Any) -> int: for i, e in enumerate(self): if e == item: return i raise ValueError("'%s' is not in R vector" % item)
def _as_charsxp_cdata(x: typing.Union[CharSexp, str]): if isinstance(x, CharSexp): return x.__sexp__._cdata else: return conversion._str_to_charsxp(x)
[docs]class StrSexpVector(SexpVector): """R vector of strings.""" _R_TYPE = openrlib.rlib.STRSXP _R_GET_PTR = openrlib._STRING_PTR _R_SIZEOF_ELT = None _R_VECTOR_ELT = openrlib.rlib.STRING_ELT _R_SET_VECTOR_ELT = openrlib.rlib.SET_STRING_ELT _CAST_IN = _as_charsxp_cdata def __getitem__( self, i: typing.Union[int, slice] ) -> typing.Union['StrSexpVector', str, 'na_values.NA_Character']: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) res = _rinterface._string_getitem(cdata, i_c) if res is None: res = na_values.NA_Character elif isinstance(i, slice): res = self.from_iterable( [_rinterface._string_getitem(cdata, i_c) for i_c in range(*i.indices(len(self)))] ) else: raise TypeError('Indices must be integers or slices,' ' not %s' % type(i)) return res def __setitem__( self, i: typing.Union[int, slice], value: typing.Union[str, typing.Sequence[typing.Optional[str]], 'StrSexpVector', 'na_values.NA_Character'] ) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) if isinstance(value, Sexp): val_cdata = value.__sexp__._cdata else: if not isinstance(value, str): value = str(value) val_cdata = _as_charsxp_cdata(value) self._R_SET_VECTOR_ELT( cdata, i_c, val_cdata ) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): if v is None: v_cdata = openrlib.rlib.R_NaString else: if not isinstance(value, str): v = str(v) v_cdata = _as_charsxp_cdata(v) self._R_SET_VECTOR_ELT( cdata, i_c, v_cdata ) else: raise TypeError('Indices must be integers or slices, ' 'not %s' % type(i))
[docs] def get_charsxp(self, i: int) -> CharSexp: """Get the R CharSexp objects for the index i.""" i_c = _rinterface._python_index_to_c(self.__sexp__._cdata, i) return CharSexp( _rinterface.SexpCapsule( openrlib.rlib.STRING_ELT(self.__sexp__._cdata, i_c) ) )
class RVersion(metaclass=Singleton): _version = None def __init__(self): assert embedded.isinitialized() robj = StrSexpVector(['R.version']) with memorymanagement.rmemory() as rmemory: parsed = _rinterface._parse(robj.__sexp__._cdata, 1, rmemory) res = baseenv['eval'](parsed) self._version = OrderedDict((k, v[0]) for k, v in zip(res.names, res)) def __getitem__(self, k): return self._version[k] def keys(self): return self._version.keys() _TYPE2STR = { RTYPES.NILSXP: 'NULL', RTYPES.SYMSXP: 'symbol', # alias: name RTYPES.LISTSXP: 'pairlist', RTYPES.CLOSXP: 'closure', RTYPES.ENVSXP: 'environment', RTYPES.PROMSXP: 'promise', RTYPES.LANGSXP: 'language', RTYPES.SPECIALSXP: 'special', RTYPES.BUILTINSXP: 'builtin', RTYPES.CHARSXP: 'char', RTYPES.LGLSXP: 'logical', RTYPES.INTSXP: 'integer', RTYPES.REALSXP: 'double', # alias: numeric RTYPES.CPLXSXP: 'complex', RTYPES.STRSXP: 'character', RTYPES.DOTSXP: '...', RTYPES.ANYSXP: 'any', RTYPES.EXPRSXP: 'expression', RTYPES.VECSXP: 'list', RTYPES.EXTPTRSXP: 'externalptr', RTYPES.BCODESXP: 'bytecode', RTYPES.WEAKREFSXP: 'weakref', RTYPES.RAWSXP: 'raw', RTYPES.S4SXP: 'S4' } def rclass_get(scaps: _rinterface.CapsuleBase) -> StrSexpVector: """ Get the R class name. If no specific attribute "class" is defined from the objects, this will perform the equivalent of R_data_class() (src/main/attrib.c in the R source code). """ rlib = openrlib.rlib with memorymanagement.rmemory() as rmemory: classes = rmemory.protect( rlib.Rf_getAttrib(scaps._cdata, rlib.R_ClassSymbol)) if rlib.Rf_length(classes) == 0: dim = rmemory.protect( rlib.Rf_getAttrib(scaps._cdata, rlib.R_DimSymbol)) ndim = rlib.Rf_length(dim) if ndim > 0: if ndim == 2: if int(RVersion()['major']) >= 4: classname = ('matrix', 'array') else: classname = ('matrix', ) else: classname = ('array', ) else: typeof = RTYPES(scaps.typeof) if typeof in (RTYPES.CLOSXP, RTYPES.SPECIALSXP, RTYPES.BUILTINSXP): classname = ('function', ) elif typeof == RTYPES.REALSXP: classname = ('numeric', ) elif typeof == RTYPES.SYMSXP: classname = ('name', ) elif typeof == RTYPES.LANGSXP: symb = rlib.CAR(scaps._cdata) if openrlib.rlib.Rf_isSymbol(symb): symb_rstr = openrlib.rlib.PRINTNAME(symb) symb_str = conversion._cchar_to_str( openrlib.rlib.R_CHAR(symb_rstr), conversion._R_ENC_PY[openrlib.rlib .Rf_getCharCE(symb_rstr)] ) if symb_str in ('if', 'while', 'for', '=', '<-', '(', '{'): classname = (symb_str, ) else: classname = ('call', ) else: classname = ('call', ) else: classname = (_TYPE2STR.get(typeof, str(typeof)), ) classes = StrSexpVector.from_iterable(classname) else: classes = conversion._cdata_to_rinterface(classes) return classes def rclass_set( scaps: _rinterface.CapsuleBase, value: 'typing.Union[StrSexpVector, str]' ) -> None: """ Set the R class. :param:`scaps` A capsule with a pointer to an R object. :param:`value` An R vector of strings.""" if isinstance(value, StrSexpVector): value_r = value elif isinstance(value, str): value_r = StrSexpVector.from_iterable( [value]) else: raise TypeError('Value should a str or ' 'a rpy2.rinterface.sexp.StrSexpVector.') openrlib.rlib.Rf_setAttrib(scaps._cdata, openrlib.rlib.R_ClassSymbol, value_r.__sexp__._cdata) def unserialize(state): n = len(state) with memorymanagement.rmemory() as rmemory: cdata = rmemory.protect( openrlib.rlib.Rf_allocVector(openrlib.rlib.RAWSXP, n)) _rinterface.ffi.memmove( openrlib.rlib.RAW(cdata), state, n) ser = rmemory.protect( _rinterface.unserialize(cdata, globalenv.__sexp__._cdata) ) res = _rinterface.SexpCapsule(ser) return res
[docs]class NAIntegerType(int, metaclass=Singleton): def __new__(cls, *args, **kwargs): embedded.assert_isready() return super().__new__(cls, openrlib.rlib.R_NaInt) def __repr__(self) -> str: return 'NA_integer_' def __str__(self) -> str: return 'NA_integer_' def __bool__(self): raise ValueError('R value for missing integer value')
[docs]class NACharacterType(CharSexp, metaclass=SingletonABC): def __init__(self): embedded.assert_isready() super().__init__( CharSexp( _rinterface.SexpCapsule(openrlib.rlib.R_NaString) ) ) def __repr__(self) -> str: return 'NA_character_' def __str__(self) -> str: return 'NA_character_' def __bool__(self): raise ValueError('R value for missing character value')
[docs]class NALogicalType(int, metaclass=Singleton): def __new__(cls, *args, **kwargs): embedded.assert_isready() return super().__new__(cls, openrlib.rlib.R_NaInt) def __repr__(self) -> str: return 'NA' def __str__(self) -> str: return 'NA' def __bool__(self) -> bool: raise ValueError('R value for missing boolean value')
[docs]class NARealType(float, metaclass=Singleton): def __new__(cls, *args, **kwargs): embedded.assert_isready() return super().__new__(cls, openrlib.rlib.R_NaReal) def __repr__(self) -> str: return 'NA_real_' def __str__(self) -> str: return 'NA_real_' def __bool__(self) -> bool: raise ValueError('R value for missing float value')
[docs]class NAComplexType(complex, metaclass=Singleton): def __new__(cls, *args, **kwargs): embedded.assert_isready() return super().__new__(cls, openrlib.rlib.R_NaReal, openrlib.rlib.R_NaReal) def __repr__(self) -> str: return 'NA_complex_' def __str__(self) -> str: return 'NA_complex_' def __bool__(self): raise ValueError('R value for missing complex value')