Source code for rpy2.rinterface

import abc
import atexit
import contextlib
import contextvars
import csv
import enum
import functools
import inspect
import os
import math
import platform
import signal
import subprocess
import textwrap
import threading
import typing
import warnings
from typing import Union
from rpy2.rinterface_lib import openrlib
import rpy2.rinterface_lib._rinterface_capi as _rinterface
import rpy2.rinterface_lib.embedded as embedded
import rpy2.rinterface_lib.conversion as conversion
from rpy2.rinterface_lib.conversion import _cdata_res_to_rinterface
import rpy2.rinterface_lib.memorymanagement as memorymanagement
from rpy2.rinterface_lib import na_values
from rpy2.rinterface_lib.sexp import NULL
from rpy2.rinterface_lib.sexp import NULLType
import rpy2.rinterface_lib.bufferprotocol as bufferprotocol
from rpy2.rinterface_lib import sexp
from rpy2.rinterface_lib.sexp import CharSexp  # noqa: F401
from rpy2.rinterface_lib.sexp import RTYPES
from rpy2.rinterface_lib.sexp import SexpVector
from rpy2.rinterface_lib.sexp import StrSexpVector
from rpy2.rinterface_lib.sexp import Sexp
from rpy2.rinterface_lib.sexp import SexpEnvironment
from rpy2.rinterface_lib.sexp import unserialize  # noqa: F401
from rpy2.rinterface_lib.sexp import emptyenv
from rpy2.rinterface_lib.sexp import baseenv
from rpy2.rinterface_lib.sexp import globalenv

if os.name == 'nt':
    import rpy2.rinterface_lib.embedded_mswin as embedded_mswin
    embedded._initr = embedded_mswin._initr_win32

R_NilValue = openrlib.rlib.R_NilValue

endr = embedded.endr

evaluation_context = contextvars.ContextVar('evaluation_context',
                                            default=globalenv)


class _ENVVAR_ACTION(enum.Enum):
    KEEP_WARN = 0
    REPLACE_WARN = 1
    KEEP_NOWARN = 2
    REPLACE_NOWARN = 3


_DEFAULT_ENVVAR_ACTION: _ENVVAR_ACTION = _ENVVAR_ACTION.REPLACE_WARN


def get_evaluation_context() -> SexpEnvironment:
    """Get the frame (environment) in which R code is currently evaluated."""
    warnings.warn('This function is deprecated. '
                  'Use evaluation_context directly.',
                  DeprecationWarning, stacklevel=2)
    return evaluation_context.get()


@contextlib.contextmanager
def local_context(
        env: typing.Optional[SexpEnvironment] = None,
        use_rlock: bool = True
) -> typing.Iterator[SexpEnvironment]:
    """Local context for the evaluation of R code.

    Args:
    - env: an environment to use as a context. If not specified (None, the
      default), a child environment to the current context is created.
    - use_rlock: whether to use a threading lock (see the documentation about
      "rlock". The default is True.

    Returns:
    Yield the environment (passed to env, or created).
    """

    parent_frame = evaluation_context.get()
    if env is None:
        env = baseenv['new.env'](
            baseenv['parent.frame']()
            if parent_frame is None
            else parent_frame)
    try:
        if use_rlock:
            with openrlib.rlock:
                token = evaluation_context.set(env)
                yield env
        else:
            token = evaluation_context.set(env)
            yield env
    finally:
        evaluation_context.reset(token)


def _sigint_handler(sig, frame):
    raise KeyboardInterrupt()


@_cdata_res_to_rinterface
def parse(text: str, num: int = -1):
    """Parse a string as R code.

    :param text: A string with R code to parse.
    :param num: The maximum number of lines to parse. If -1, no
      limit is applied.
    """

    if not isinstance(text, str):
        raise TypeError('text must be a string.')
    robj = StrSexpVector([text])
    with memorymanagement.rmemory() as rmemory:
        res = _rinterface._parse(robj.__sexp__._cdata, num, rmemory)
    return res


def evalr_expr(
        expr: 'ExprSexpVector',
        envir: typing.Union[
            None,
            'SexpEnvironment', 'NULLType',
            'ListSexpVector', 'PairlistSexpVector', int,
            '_MissingArgType'] = None,
        enclos: typing.Union[
            None,
            'ListSexpVector', 'PairlistSexpVector',
            'NULLType',
            '_MissingArgType'] = None
) -> sexp.Sexp:
    """Evaluate an R expression.

    :param expr: An R expression.
    :param envir: An optional R environment in which the evaluation
      will happen. If None, which is the default, the environment in
     the context variable `evaluation_context` is used.
    :param enclos: An enclosure. This is only relevant when envir
      is a list, a pairlist, or a data.frame. It specifies where to
    look for objects not found in envir.
    :return: The R objects resulting from the evaluation of the code"""

    if envir is None:
        envir = evaluation_context.get()
    if enclos is None:
        enclos = MissingArg
    res = baseenv['eval'](expr, envir=envir, enclos=enclos)
    return res


def evalr_expr_with_visible(
        expr: 'ExprSexpVector',
        envir: typing.Union[
            None,
            'SexpEnvironment'] = None
) -> 'ListSexpVector':
    """Evaluate an R expression and return value and visibility flag.

    :param expr: An R expression.
    :param envir: An environment in which the expression will be evaluated.

    :return: An R list with (value, visibility) where visibility is
    an R boolean.
    """
    if envir is None:
        envir = evaluation_context.get()
    assert isinstance(envir, SexpEnvironment)

    error_occured = _rinterface.ffi.new('int *', 0)
    with memorymanagement.rmemory() as rmemory:
        r_call_nested = rmemory.protect(
            openrlib.rlib.Rf_lang2(
                baseenv['eval'].__sexp__._cdata,
                expr.__sexp__._cdata)
        )
        r_call = rmemory.protect(
            openrlib.rlib.Rf_lang2(
                baseenv['withVisible'].__sexp__._cdata,
                r_call_nested)
        )
        r_res = rmemory.protect(
                openrlib.rlib.R_tryEval(
                    r_call,
                    envir.__sexp__._cdata,  # call context.
                    error_occured)
        )
        if error_occured[0]:
            raise embedded.RRuntimeError(_rinterface._geterrmessage())
        res = conversion._cdata_to_rinterface(r_res)
        assert isinstance(res, ListSexpVector)
    return res


def evalr(
        source: str,
        maxlines: int = -1,
        envir: typing.Union[
            None,
            'SexpEnvironment', 'NULLType',
            'ListSexpVector', 'PairlistSexpVector', int,
            '_MissingArgType'] = None,
        enclos: typing.Union[
            None,
            'ListSexpVector', 'PairlistSexpVector',
            'NULLType', '_MissingArgType'] = None
) -> sexp.Sexp:
    """Evaluate a string as R code.

    Evaluate a string as R just as it would happen when writing
    code in an R terminal.

    :param str text: A string to be evaluated as R code,
    or an R expression.
    :param int maxlines: The maximum number of lines to parse. If -1, no
      limit is applied.
    :param envir: An optional R environment in which the evaluation
      will happen.
    :param enclos: An enclosure. This is only relevant when envir
      is a list, a pairlist, or a data.frame. It specifies where to
    look for objects not found in envir.
    :return: The R objects resulting from the evaluation of the code"""

    expr = parse(source, num=maxlines)
    res = evalr_expr(expr, envir=envir, enclos=enclos)
    return res


def vector_memoryview(obj: sexp.SexpVector,
                      sizeof_str: str, cast_str: str) -> memoryview:
    """
    :param obj: R vector
    :param str sizeof_str: Type in a string to use with ffi.sizeof()
        (for example "int")
    :param str cast_str: Type in a string to use with memoryview.cast()
        (for example "i")
    """
    b = openrlib.ffi.buffer(
        obj._R_GET_PTR(obj.__sexp__._cdata),
        openrlib.ffi.sizeof(sizeof_str) * len(obj))
    shape = bufferprotocol.getshape(obj.__sexp__._cdata)
    # One could have expected to only need builtin Python
    # and do something like
    # ```
    # mv = memoryview(b).cast(cast_str, shape, order='F')
    # ```
    # but Python does not handle FORTRAN-ordered arrays without having
    # to write C extensions (see
    # https://bugs.python.org/issue34778 is not resolved).
    # Having numpy a requirement just for this is a problem so a
    # C function to swap the strides in place was written
    try:
        import rpy2.rinterface_lib._bufferprotocol as bp   # type: ignore
        mv = memoryview(b).cast(cast_str, shape)
        bp.memoryview_swapstrides(mv)
    except ImportError:
        import numpy  # type: ignore
        a = numpy.frombuffer(b, dtype=cast_str).reshape(shape, order='F')
        # The typed signature for memoryview does not help the static
        # type checker verify that a numpy.ndarray implement the buffer
        # protocol. Type checking is ignored to avoid a check error.
        mv = memoryview(a)  # type: ignore
    return mv


class SexpSymbol(sexp.Sexp):
    """An unevaluated R symbol."""

    def __init__(
            self,
            obj: Union[Sexp,
                       _rinterface.SexpCapsule,
                       _rinterface.UninitializedRCapsule,
                       str]
    ):
        if isinstance(obj, Sexp) or isinstance(obj, _rinterface.CapsuleBase):
            super().__init__(obj)
        elif isinstance(obj, str):
            name_cdata = _rinterface.ffi.new('char []', obj.encode('utf-8'))
            sexp = _rinterface.SexpCapsule(
                openrlib.rlib.Rf_install(name_cdata))
            super().__init__(sexp)
        else:
            raise TypeError(
                'The constructor must be called '
                'with that is an instance of rpy2.rinterface.sexp.Sexp '
                'or an instance of rpy2.rinterface._rinterface.SexpCapsule')

    def __str__(self) -> str:
        return conversion._cchar_to_str(
            openrlib._STRING_VALUE(
                self._sexpobject._cdata
            ), 'utf-8'
        )


class _MissingArgType(SexpSymbol, metaclass=sexp.SingletonABC):
    """Missing argument in a call to an R function.

    When evaluating a function, arguments that are not specified
    can take a default value when named, otherwise they will
    take a special value that indicates that they are missing."""

    def __init__(self):
        if embedded.isready():
            tmp = sexp.Sexp(
                _rinterface.UnmanagedSexpCapsule(
                    openrlib.rlib.R_MissingArg
                )
            )
        else:
            tmp = sexp.Sexp(
                _rinterface.UninitializedRCapsule(RTYPES.SYMSXP.value)
            )
        super().__init__(tmp)

    def __bool__(self) -> bool:
        """This is always False."""
        return False

    @property
    def __sexp__(self) -> typing.Union['_rinterface.SexpCapsule',
                                       '_rinterface.UninitializedRCapsule']:
        return self._sexpobject

    @__sexp__.setter
    def __sexp__(self, value: typing.Union['_rinterface.SexpCapsule',
                                           '_rinterface.UninitializedRCapsule']) -> None:
        raise TypeError('The capsule for the R object cannot be modified.')


MissingArg = _MissingArgType()


class SexpPromise(Sexp):

    @_cdata_res_to_rinterface
    def eval(self, env: typing.Optional[SexpEnvironment] = None) -> sexp.Sexp:
        """"Evalute the R "promise".

        :param env: The environment in which to evaluate the
          promise.
        """
        if env is None:
            env = globalenv
        return openrlib.rlib.Rf_eval(self.__sexp__._cdata, env)


class SexpVectorWithNumpyInterface(SexpVector):
    """Numpy-specific API for accessing the content of a numpy array.

    This interface implements version 3 of Numpy's `__array_interface__`
    and is only available / possible for some of the R vectors."""

    @property
    @abc.abstractmethod
    def _NP_TYPESTR(self) -> str:
        pass

    @property
    def __array_interface__(
            self
    ) -> dict:
        """Return an `__array_interface__` version 3.

        Note that the pointer returned in the items 'data' corresponds to
        a memory area under R's memory management and that it will become
        invalid once the area once R frees the object. It is safer to keep
        the rpy2 object proxying the R object alive for the duration the
        pointer is used in Python / numpy."""
        shape = bufferprotocol.getshape(self.__sexp__._cdata)
        data = openrlib.ffi.buffer(self._R_GET_PTR(self.__sexp__._cdata))
        strides = bufferprotocol.getstrides(self.__sexp__._cdata,
                                            shape,
                                            self._R_SIZEOF_ELT)
        return {'shape': shape,
                'typestr': self._NP_TYPESTR,
                'strides': strides,
                'data': data,
                'version': 3}

    @classmethod
    def _check_C_compatible(cls, mview):
        return (
            mview.itemsize == cls._R_SIZEOF_ELT
            and
            (
                cls._NP_TYPESTR == '|u1'
                or
                cls._NP_TYPESTR.endswith(mview.format)
            )
        )


[docs]class ByteSexpVector(SexpVectorWithNumpyInterface): """Array of bytes. This is the R equivalent to a Python :class:`bytesarray`. """ _R_TYPE = openrlib.rlib.RAWSXP _R_SIZEOF_ELT = _rinterface.ffi.sizeof('char') _NP_TYPESTR = '|u1' _R_GET_PTR = staticmethod(openrlib.RAW) @staticmethod def _CAST_IN(x: typing.Any) -> int: if isinstance(x, int): if x > 255: raise ValueError('byte must be in range(0, 256)') res = x elif isinstance(x, (bytes, bytearray)): if len(x) != 1: raise ValueError('byte must be a single character') res = ord(x) else: raise ValueError('byte must be an integer [0, 255] or a ' 'single byte character') return res @staticmethod def _R_VECTOR_ELT(x, i: int) -> None: return openrlib.RAW(x)[i] @staticmethod def _R_SET_VECTOR_ELT(x, i: int, val) -> None: openrlib.RAW(x)[i] = val def __getitem__(self, i: Union[int, slice]) -> Union[int, 'ByteSexpVector']: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) res = openrlib.RAW_ELT(cdata, i_c) elif isinstance(i, slice): res = type(self).from_iterable( [openrlib.RAW_ELT( cdata, i_c ) for i_c in range(*i.indices(len(self))) ] ) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) openrlib.RAW(cdata)[i_c] = self._CAST_IN(value) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): if v > 255: raise ValueError('byte must be in range(0, 256)') openrlib.RAW(cdata)[i_c] = self._CAST_IN(v) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i))
[docs]class BoolSexpVector(SexpVectorWithNumpyInterface): """Array of booleans. Note that R is internally storing booleans as integers to allow an additional "NA" value to represent missingness.""" _R_TYPE = openrlib.rlib.LGLSXP _R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rboolean') _NP_TYPESTR = '|i' _R_VECTOR_ELT = openrlib.LOGICAL_ELT _R_SET_VECTOR_ELT = openrlib.SET_LOGICAL_ELT _R_GET_PTR = staticmethod(openrlib.LOGICAL) @staticmethod def _CAST_IN(x): if x is None or x == openrlib.rlib.R_NaInt: return NA_Logical else: return bool(x) def __getitem__(self, i: Union[int, slice]) -> Union[bool, 'sexp.NALogicalType', 'BoolSexpVector']: res: Union[bool, 'sexp.NALogicalType', 'BoolSexpVector'] cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) elt = openrlib.LOGICAL_ELT(cdata, i_c) res = (na_values.NA_Logical # type: ignore if elt == NA_Logical else bool(elt)) elif isinstance(i, slice): res = type(self).from_iterable( [openrlib.LOGICAL_ELT(cdata, i_c) for i_c in range(*i.indices(len(self)))] ) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) openrlib.SET_LOGICAL_ELT(cdata, i_c, int(value)) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): openrlib.SET_LOGICAL_ELT(cdata, i_c, int(v)) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) def memoryview(self) -> memoryview: return vector_memoryview(self, 'int', 'i')
def nullable_int(v): if type(v) is float and math.isnan(v): return openrlib.rlib.R_NaInt else: return int(v)
[docs]class IntSexpVector(SexpVectorWithNumpyInterface): _R_TYPE = openrlib.rlib.INTSXP _R_SET_VECTOR_ELT = openrlib.SET_INTEGER_ELT _R_VECTOR_ELT = openrlib.INTEGER_ELT _R_SIZEOF_ELT = _rinterface.ffi.sizeof('int') _NP_TYPESTR = '|i' _R_GET_PTR = staticmethod(openrlib.INTEGER) _CAST_IN = staticmethod(nullable_int) def __getitem__(self, i: Union[int, slice]) -> Union[int, 'IntSexpVector']: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) res = openrlib.INTEGER_ELT(cdata, i_c) if res == NA_Integer: res = NA_Integer elif isinstance(i, slice): res = type(self).from_iterable( [openrlib.INTEGER_ELT( cdata, i_c ) for i_c in range(*i.indices(len(self)))] ) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) openrlib.SET_INTEGER_ELT(cdata, i_c, int(value)) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): openrlib.SET_INTEGER_ELT(cdata, i_c, int(v)) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) def memoryview(self) -> memoryview: return vector_memoryview(self, 'int', 'i')
[docs]class FloatSexpVector(SexpVectorWithNumpyInterface): _R_TYPE = openrlib.rlib.REALSXP _R_VECTOR_ELT = openrlib.REAL_ELT _R_SET_VECTOR_ELT = openrlib.SET_REAL_ELT _R_SIZEOF_ELT = _rinterface.ffi.sizeof('double') _NP_TYPESTR = '|d' _CAST_IN = staticmethod(float) _R_GET_PTR = staticmethod(openrlib.REAL) def __getitem__( self, i: Union[int, slice] ) -> Union[float, 'FloatSexpVector']: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) res = openrlib.REAL_ELT(cdata, i_c) elif isinstance(i, slice): res = type(self).from_iterable( [openrlib.REAL_ELT( cdata, i_c) for i_c in range(*i.indices(len(self)))] ) else: raise TypeError('Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) openrlib.SET_REAL_ELT(cdata, i_c, float(value)) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): openrlib.SET_REAL_ELT(cdata, i_c, float(v)) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) def memoryview(self) -> memoryview: return vector_memoryview(self, 'double', 'd')
[docs]class ComplexSexpVector(SexpVector): _R_TYPE = openrlib.rlib.CPLXSXP _R_GET_PTR = staticmethod(openrlib.COMPLEX) _R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rcomplex') @staticmethod def _R_VECTOR_ELT(x, i): return openrlib.COMPLEX(x)[i] @staticmethod def _R_SET_VECTOR_ELT(x, i, v): openrlib.COMPLEX(x).__setitem__(i, v) @staticmethod def _CAST_IN(x): if isinstance(x, complex): res = (x.real, x.imag) else: try: res = (x.r, x.i) except AttributeError: raise TypeError( 'Unable to turn value into an R complex number.' ) return res def __getitem__( self, i: Union[int, slice] ) -> Union[complex, 'ComplexSexpVector']: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) _ = openrlib.COMPLEX_ELT(cdata, i_c) res = complex(_.r, _.i) elif isinstance(i, slice): res = type(self).from_iterable( [openrlib.COMPLEX_ELT( cdata, i_c) for i_c in range(*i.indices(len(self)))] ) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res def __setitem__(self, i: Union[int, slice], value) -> None: cdata = self.__sexp__._cdata if isinstance(i, int): i_c = _rinterface._python_index_to_c(cdata, i) openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(value) elif isinstance(i, slice): for i_c, v in zip(range(*i.indices(len(self))), value): openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(v) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i))
[docs]class ListSexpVector(SexpVector): """R list. An R list an R vector (array) that is similar to a Python list in the sense that items in the list can be of any type, whereas most other R vectors are homogeneous (all items are of the same type). """ _R_TYPE = openrlib.rlib.VECSXP _R_GET_PTR = staticmethod(openrlib._VECTOR_PTR) _R_SIZEOF_ELT = None _R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT _R_SET_VECTOR_ELT = openrlib.rlib.SET_VECTOR_ELT _CAST_IN = staticmethod(conversion._get_cdata)
[docs]class PairlistSexpVector(SexpVector): """R pairlist. A R pairlist is rarely used outside of R's internal libraries and a relatively small number of use cases. It is essentially a LISP-like list of (name, value) pairs. """ _R_TYPE = openrlib.rlib.LISTSXP _R_GET_PTR = None _R_SIZEOF_ELT = None _R_VECTOR_ELT = None _R_SET_VECTOR_ELT = None _CAST_IN = staticmethod(conversion._get_cdata) def __getitem__(self, i: Union[int, slice]) -> Sexp: cdata = self.__sexp__._cdata rlib = openrlib.rlib if isinstance(i, int): # R-exts says that it is converted to a VECSXP when subsetted. i_c = _rinterface._python_index_to_c(cdata, i) item_cdata = rlib.Rf_nthcdr(cdata, i_c) with memorymanagement.rmemory() as rmemory: res_cdata = rmemory.protect( rlib.Rf_allocVector(RTYPES.VECSXP, 1)) rlib.SET_VECTOR_ELT( res_cdata, 0, rlib.CAR( item_cdata )) res_name = rmemory.protect( rlib.Rf_allocVector(RTYPES.STRSXP, 1)) item_cdata_name = rlib.PRINTNAME(rlib.TAG(item_cdata)) if _rinterface._TYPEOF(item_cdata_name) != rlib.NILSXP: rlib.SET_STRING_ELT( res_name, 0, item_cdata_name) rlib.Rf_namesgets(res_cdata, res_name) res = conversion._cdata_to_rinterface(res_cdata) elif isinstance(i, slice): iter_indices = range(*i.indices(len(self))) n = len(iter_indices) with memorymanagement.rmemory() as rmemory: res_cdata = rmemory.protect( rlib.Rf_allocVector( self._R_TYPE, n) ) iter_res_cdata = res_cdata prev_i = 0 lst_cdata = self.__sexp__._cdata for i in iter_indices: if i >= len(self): raise IndexError('index out of range') lst_cdata = rlib.Rf_nthcdr(lst_cdata, i - prev_i) prev_i = i rlib.SETCAR(iter_res_cdata, rlib.CAR(lst_cdata)) rlib.SET_TAG(iter_res_cdata, rlib.TAG(lst_cdata)) iter_res_cdata = rlib.CDR(iter_res_cdata) res = conversion._cdata_to_rinterface(res_cdata) else: raise TypeError( 'Indices must be integers or slices, not %s' % type(i)) return res @classmethod @_cdata_res_to_rinterface def from_iterable(cls, iterable, cast_in=None): raise NotImplementedError()
class ExprSexpVector(SexpVector): _R_TYPE = openrlib.rlib.EXPRSXP _R_GET_PTR = None _CAST_IN = None _R_SIZEOF_ELT = None _R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT _R_SET_VECTOR_ELT = None _str2lang = None def _get_str2lang(): global _str2lang if _str2lang is None: try: _str2lang = baseenv['str2lang'] except KeyError: # TODO: This exists to ensure compatibility with # older versions of R. If should be removed when # older versions of R are no longer supported. _str2lang = evalr('function(s) ' 'base::parse(text=s, keep.source=FALSE)[[1]]') return _str2lang LangSexpVector_VT = typing.TypeVar('LangSexpVector_VT', bound='LangSexpVector')
[docs]class LangSexpVector(SexpVector): """An R language object. To create from a (Python) string containing R code use the classmethod `from_string`.""" _R_TYPE = openrlib.rlib.LANGSXP _R_GET_PTR = None _CAST_IN = None _R_SIZEOF_ELT = None _R_VECTOR_ELT = None _R_SET_VECTOR_ELT = None @_cdata_res_to_rinterface def __getitem__(self, i: int): cdata = self.__sexp__._cdata i_c = _rinterface._python_index_to_c(cdata, i) return openrlib.rlib.CAR( openrlib.rlib.Rf_nthcdr(cdata, i_c) ) def __setitem__(self, i: typing.Union[int, slice], value: sexp.SupportsSEXP) -> None: if isinstance(i, slice): raise NotImplementedError( 'Assigning slices to LangSexpVectors is not yet implemented.' ) cdata = self.__sexp__._cdata i_c = _rinterface._python_index_to_c(cdata, i) openrlib.rlib.SETCAR( openrlib.rlib.Rf_nthcdr(cdata, i_c), value.__sexp__._cdata )
[docs] @classmethod def from_string( cls: typing.Type[LangSexpVector_VT], s: str ) -> LangSexpVector_VT: """Create an R language object from a string. This creates an unevaluated R language object. Args: s: R source code in a string. Returns: An instance of the class the method is called from. """ return cls(_get_str2lang()(s))
class SexpClosure(Sexp): @_cdata_res_to_rinterface def __call__(self, *args, **kwargs) -> Sexp: error_occured = _rinterface.ffi.new('int *', 0) with memorymanagement.rmemory() as rmemory: call_r = rmemory.protect( _rinterface.build_rcall(self.__sexp__._cdata, args, kwargs.items())) call_context = evaluation_context.get() res = rmemory.protect( openrlib.rlib.R_tryEval( call_r, call_context.__sexp__._cdata, error_occured) ) if error_occured[0]: raise embedded.RRuntimeError(_rinterface._geterrmessage()) return res @_cdata_res_to_rinterface def rcall(self, keyvals, environment: typing.Optional[SexpEnvironment] = None): """Call/evaluate an R function. Args: - keyvals: a sequence of key/value (name/parameter) pairs. A name/parameter that is None will indicated an unnamed parameter. Like in R, keys/names do not have to be unique, partial matching can be used, and named/unnamed parameters can occur at any position in the sequence. - environment: an optional R environment to evaluate the function. """ # TODO: check keyvals are pairs ? if environment is None: environment = evaluation_context.get() assert isinstance(environment, SexpEnvironment) error_occured = _rinterface.ffi.new('int *', 0) with memorymanagement.rmemory() as rmemory: call_r = rmemory.protect( _rinterface.build_rcall(self.__sexp__._cdata, [], keyvals)) res = rmemory.protect( openrlib.rlib.R_tryEval(call_r, environment.__sexp__._cdata, error_occured)) if error_occured[0]: raise embedded.RRuntimeError(_rinterface._geterrmessage()) return res @property # type: ignore @_cdata_res_to_rinterface def closureenv(self) -> SexpEnvironment: """Closure of the R function.""" return openrlib.rlib.CLOENV(self.__sexp__._cdata)
[docs]class SexpS4(Sexp): """R "S4" object. S4 objects as one of the available forms of Object-Oriented Programming in R.""" pass
# TODO: clean up def make_extptr(obj, tag, protected): if protected is None: cdata_protected = openrlib.rlib.R_NilValue else: try: cdata_protected = protected.__sexp__._cdata except AttributeError: raise TypeError('Argument protected must inherit from %s' % type(Sexp)) ptr = _rinterface.ffi.new_handle(obj) with memorymanagement.rmemory() as rmemory: cdata = rmemory.protect( openrlib.rlib.R_MakeExternalPtr( ptr, tag, cdata_protected)) openrlib.rlib.R_RegisterCFinalizer( cdata, (_rinterface._capsule_finalizer_c if _rinterface._capsule_finalizer_c else _rinterface._capsule_finalizer)) res = _rinterface.SexpCapsuleWithPassenger(cdata, obj, ptr) return res
[docs]class SexpExtPtr(Sexp): """R "External Pointer" object.""" TYPE_TAG = 'Python' @classmethod def from_pyobject(cls, func, tag: str = TYPE_TAG, protected=None): if not isinstance(tag, str): raise TypeError('The tag must be a string.') scaps = make_extptr(func, conversion._str_to_charsxp(cls.TYPE_TAG), protected) res = cls(scaps) if tag != cls.TYPE_TAG: res.TYPE_TAG = tag return res
# TODO: Only use rinterface-level ? conversion._R_RPY2_MAP.update({ openrlib.rlib.NILSXP: NULLType, openrlib.rlib.EXPRSXP: ExprSexpVector, openrlib.rlib.LANGSXP: LangSexpVector, openrlib.rlib.ENVSXP: SexpEnvironment, openrlib.rlib.RAWSXP: ByteSexpVector, openrlib.rlib.LGLSXP: BoolSexpVector, openrlib.rlib.INTSXP: IntSexpVector, openrlib.rlib.REALSXP: FloatSexpVector, openrlib.rlib.CPLXSXP: ComplexSexpVector, openrlib.rlib.STRSXP: StrSexpVector, openrlib.rlib.VECSXP: ListSexpVector, openrlib.rlib.LISTSXP: PairlistSexpVector, openrlib.rlib.CLOSXP: SexpClosure, openrlib.rlib.BUILTINSXP: SexpClosure, openrlib.rlib.SPECIALSXP: SexpClosure, openrlib.rlib.EXTPTRSXP: SexpExtPtr, openrlib.rlib.SYMSXP: SexpSymbol, openrlib.rlib.S4SXP: SexpS4 }) conversion._R_RPY2_DEFAULT_MAP = Sexp conversion._PY_RPY2_MAP.update({ int: conversion._int_to_sexp, float: conversion._float_to_sexp, complex: conversion._complex_to_sexp }) conversion._PY_R_MAP.update({ _rinterface.ffi.CData: False, # integer int: conversion._int_to_sexp, sexp.NAIntegerType: conversion._int_to_sexp, # float float: conversion._float_to_sexp, sexp.NARealType: conversion._float_to_sexp, # boolean bool: conversion._bool_to_sexp, sexp.NALogicalType: conversion._bool_to_sexp, # string str: conversion._str_to_sexp, sexp.CharSexp: None, sexp.NACharacterType: None, # complex complex: conversion._complex_to_sexp, sexp.NAComplexType: conversion._complex_to_sexp, # None type(None): lambda x: openrlib.rlib.R_NilValue}) def vector(iterable, rtype: RTYPES) -> SexpVector: """Create an R vector. While the different types of R vectors all have their own class, the creation of array in Python is often available through factory function that accept the type of the array as a parameters. This function is providing a similar functionality for R vectors.""" error = False try: cls = conversion._R_RPY2_MAP[rtype] except KeyError: error = True if not error and not issubclass(cls, SexpVector): error = True if error: raise ValueError( 'Unable to build a vector from type "%s"' % RTYPES(rtype)) return cls.from_iterable(iterable) class RRuntimeWarning(RuntimeWarning): pass MissingArg = _MissingArgType() NA_Character = None NA_Integer = None NA_Logical = None NA = None NA_Real = None NA_Complex = None def initr_simple() -> typing.Optional[int]: """Initialize R's embedded C library.""" with openrlib.rlock: status = embedded._initr() atexit.register(endr, 0) _rinterface._register_external_symbols() _post_initr_setup() return status def initr_checkenv( interactive: typing.Optional[bool] = None, _want_setcallbacks: bool = True, _c_stack_limit: typing.Optional[int] = None ) -> typing.Optional[int]: """Initialize the embedded R. :param interactive: Should R run in interactive or non-interactive mode? if `None` the value in `_DEFAULT_R_INTERACTIVE` will be used. :param _want_setcallbacks: Should custom rpy2 callbacks for R frontends be set?. :param _c_stack_limit: Limit for the C Stack. if `None` the value in `_DEFAULT_C_STACK_LIMIT` will be used. """ # Force the internal initialization flag if there is an environment # variable that indicates that R was already initialized in the current # process. status = None with openrlib.rlock: _setrenvvars(_DEFAULT_ENVVAR_ACTION) if embedded.is_r_externally_initialized(): embedded._setinitialized() else: status = embedded._initr(interactive=interactive, _want_setcallbacks=_want_setcallbacks, _c_stack_limit=_c_stack_limit) embedded.set_python_process_info() _rinterface._register_external_symbols() _post_initr_setup() return status initr = initr_checkenv def _update_R_ENC_PY(): conversion._R_ENC_PY[openrlib.rlib.CE_UTF8] = 'utf-8' l10n_info = tuple(baseenv['l10n_info']()) if platform.system() == 'Windows': val_latin1 = 'cp{:d}'.format(l10n_info[3][0]) else: val_latin1 = 'latin1' conversion._R_ENC_PY[openrlib.rlib.CE_LATIN1] = val_latin1 if l10n_info[1]: val_native = conversion._R_ENC_PY[openrlib.rlib.CE_UTF8] else: val_native = val_latin1 conversion._R_ENC_PY[openrlib.rlib.CE_NATIVE] = val_native conversion._R_ENC_PY[openrlib.rlib.CE_ANY] = 'utf-8' # TODO: This function could be used by situation.py. May be better to # place elsewhere. def _getrenvvars( baselinevars: typing.Optional[typing.MutableMapping[str, str]] = None, r_home: typing.Optional[str] = None ) -> typing.Tuple[typing.Tuple[str, str], ...]: """Get the environment variables defined by the R front-end script.""" if baselinevars is None: baselinevars = os.environ if r_home is None: r_home = openrlib.R_HOME if r_home is None: raise RuntimeError('Unable to determine R_HOME.') cmd = ( os.path.join(r_home, 'bin', 'Rscript'), '-e', 'x<-Sys.getenv();y<-as.character(x);names(y)<-names(x);' 'write.table(y,col.names=FALSE,quote=TRUE,sep=",")' ) envvars = subprocess.check_output(cmd, universal_newlines=True, stderr=subprocess.PIPE) res = [] reader = csv.reader(row for row in envvars.split(os.linesep) if row != '') for k, v in reader: if ( (k not in baselinevars) or (baselinevars[k] != v) ): res.append((k, v)) return tuple(res) def _setrenvvars(action: _ENVVAR_ACTION): new_envvars = {} for k, v in _getrenvvars(): if k in os.environ: if ( action in (_ENVVAR_ACTION.KEEP_WARN, _ENVVAR_ACTION.KEEP_NOWARN) ): if action is _ENVVAR_ACTION.KEEP_WARN: warnings.warn( f'Environment variable "{k}" redefined by R but ignored.' ) continue elif ( action in (_ENVVAR_ACTION.REPLACE_WARN, _ENVVAR_ACTION.REPLACE_NOWARN) ): if action is _ENVVAR_ACTION.REPLACE_WARN: warnings.warn( f'Environment variable "{k}" redefined by R and overriding ' 'existing variable.' ) new_envvars[k] = v os.environ.update(new_envvars) def _post_initr_setup() -> None: emptyenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_EmptyEnv) baseenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_BaseEnv) globalenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_GlobalEnv) NULL._sexpobject = _rinterface.UnmanagedSexpCapsule( openrlib.rlib.R_NilValue ) MissingArg._sexpobject = _rinterface.UnmanagedSexpCapsule( openrlib.rlib.R_MissingArg ) global NA_Character na_values.NA_Character = sexp.NACharacterType() NA_Character = na_values.NA_Character global NA_Integer na_values.NA_Integer = sexp.NAIntegerType(openrlib.rlib.R_NaInt) NA_Integer = na_values.NA_Integer global NA_Logical, NA na_values.NA_Logical = sexp.NALogicalType(openrlib.rlib.R_NaInt) NA_Logical = na_values.NA_Logical NA = NA_Logical global NA_Real na_values.NA_Real = sexp.NARealType(openrlib.rlib.R_NaReal) NA_Real = na_values.NA_Real global NA_Complex na_values.NA_Complex = sexp.NAComplexType( _rinterface.ffi.new( 'Rcomplex *', [openrlib.rlib.R_NaReal, openrlib.rlib.R_NaReal]) ) NA_Complex = na_values.NA_Complex warn_about_thread = False if threading.current_thread() is threading.main_thread(): try: signal.signal(signal.SIGINT, _sigint_handler) except ValueError as ve: if str(ve) == 'signal only works in main thread': warn_about_thread = True else: raise ve else: warn_about_thread = True if warn_about_thread: warnings.warn( textwrap.dedent( """R is not initialized by the main thread. Its taking over SIGINT cannot be reversed here, and as a consequence the embedded R cannot be interrupted with Ctrl-C. Consider (re)setting the signal handler of your choice from the main thread.""" ) ) _update_R_ENC_PY() def _find_first(nodes, of_type): for node in nodes: if isinstance(node, of_type): return node raise ValueError( f'No node of type {of_type}' )
[docs]def rternalize( function: typing.Optional[typing.Callable] = None, *, signature: bool = False ) -> typing.Union[SexpClosure, functools.partial]: """ Make a Python function callable from R. Takes an arbitrary Python function and wrap it in such a way that it can be called from the R side. This factory can be used as a decorator, and has an optional named argument "signature" that can be True or False (default is False). When True, the R function wrapping the Python one will have a matching signature or a close one, as detailed below. The Python ellipsis arguments`*args` and `**kwargs` are translated to the R ellipsis `...`. For example: .. code-block:: python @rternalize(signature=True) def foo(x, *args, y=2): pass will be visible in R with the signature: .. code-block:: r function (x, ..., y) The only limitation is that whenever `*args` and `**kwargs` are both present in the Python declaration they must be consecutive. For example: .. code-block:: python def foo(x, *args, y=2, **kwargs): pass is a valid Python declaration. However, it cannot be "rternalized" because the R ellipsis can only appear at most once in the signature of a given function. Trying to apply the decorator `rternalize` would raise an exception. The following Python function can be "rternalized": .. code-block:: python def foo(x, *args, **kwargs): pass It is visible to R with the signature .. code-block:: r function (x, ...) Python function definitions can allow the optional naming of required arguments. The mapping of signatures between Python and R is then quasi-indentical since R does it for unnamed arguments. The check that all arguments are present is still performed on the Python side. Example: .. code-block:: python @rternalize(signature=True) def foo(x, *, y, z): print(f'x: {x[0]}, y: {y[0]}, z: {z[0]}') return ri.NULL >>> _ = foo(1, 2, 3) x: 1, y: 2, z: 3 ValueErro: None >>> _ = foo(1) TypeError: rternalized foo() missing 2 required keyword-only arguments: 'y' and 'z' >>> _ = foo(1, z=2, y=3) x: 1, y: 3, z: 2 >>> _ = foo(1, z=2, y=3) x: 1, y: 3, z: 2 Note that whenever the Python function has an ellipsis (either `*args` or `**kwargs`) earlier parameters in the signature that are positional-or-keyword are considered to be positional arguments in a function call. :param function: A Python callable object. This is a positional argument with a default value `None` to allow the decorator function without parentheses when optional argument is not wanted. :return: A wrapped R object that can be use like any other rpy2 object. """ if not embedded.isinitialized(): raise embedded.RNotReadyError('The embedded R is not yet initialized.') if function is None: return functools.partial(rternalize, signature=signature) assert callable(function) rpy_fun = SexpExtPtr.from_pyobject(function) if not signature: template = parse(""" function(...) { .External(".Python", foo, ...); } """) template[0][2][1][2] = rpy_fun else: has_ellipsis = None params_r_sig = [] for p_i, (name, param) in enumerate( inspect.signature(function).parameters.items() ): if ( param.kind is inspect.Parameter.VAR_POSITIONAL or param.kind is inspect.Parameter.VAR_KEYWORD ): if has_ellipsis: if has_ellipsis != (p_i - 1): raise ValueError( 'R functions can only have one ellipsis. ' 'As consequence your Python function must have *args ' 'and **kwargs that are consecutive in function ' 'signature.' ) else: # The ellipsis can only be added once. has_ellipsis = p_i params_r_sig.append('...') else: params_r_sig.append(name) r_func_args = ', '.join(params_r_sig) r_src = f""" function({r_func_args}) {{ py_func <- RPY2_FUN_PLACEHOLDER lst_args <- base::as.list(base::match.call()[-1]) RPY2_ARGUMENTS <- base::c( base::list( ".Python", py_func ), lst_args ) res <- base::do.call( base::.External, RPY2_ARGUMENTS ); res }} """ template = parse(r_src) function_definition = _find_first(template, of_type=LangSexpVector) function_body = _find_first(function_definition, of_type=LangSexpVector) rpy_fun_node = function_body[1] assert str(rpy_fun_node[2]) == 'RPY2_FUN_PLACEHOLDER' rpy_fun_node[2] = rpy_fun # TODO: use lower-level eval ? res = baseenv['eval'](template) # TODO: hack to prevent the nested function from having its # refcount down to zero when returning res.__nested_sexp__ = rpy_fun.__sexp__ return res
[docs]def process_revents() -> None: """Process R events. Calling this function a regular interval can help ensure that R is processing input events (e.g., resizing of a window with graphics).""" openrlib.rlib.rpy2_runHandlers(openrlib.rlib.R_InputHandlers)