Source code for rpy2.rinterface

import atexit
import contextlib
import os
import math
import platform
import signal
import textwrap
import threading
import typing
import warnings
from typing import Union
from rpy2.rinterface_lib import openrlib
import rpy2.rinterface_lib._rinterface_capi as _rinterface
import rpy2.rinterface_lib.embedded as embedded
import rpy2.rinterface_lib.conversion as conversion
from rpy2.rinterface_lib.conversion import _cdata_res_to_rinterface
import rpy2.rinterface_lib.memorymanagement as memorymanagement
from rpy2.rinterface_lib import na_values
from rpy2.rinterface_lib.sexp import NULLType
import rpy2.rinterface_lib.bufferprotocol as bufferprotocol
from rpy2.rinterface_lib import sexp
from rpy2.rinterface_lib.sexp import CharSexp  # noqa: F401
from rpy2.rinterface_lib.sexp import RTYPES
from rpy2.rinterface_lib.sexp import SexpVector
from rpy2.rinterface_lib.sexp import StrSexpVector
from rpy2.rinterface_lib.sexp import Sexp
from rpy2.rinterface_lib.sexp import SexpEnvironment
from rpy2.rinterface_lib.sexp import unserialize  # noqa: F401
from rpy2.rinterface_lib.sexp import emptyenv
from rpy2.rinterface_lib.sexp import baseenv
from rpy2.rinterface_lib.sexp import globalenv

# On Windows ('nt'), replace the default R initialization routine with the
# one provided by the Windows-specific module.
if os.name == 'nt':
    import rpy2.rinterface_lib.embedded_mswin as embedded_mswin
    embedded._initr = embedded_mswin._initr_win32

# Module-level alias for the `R_NilValue` symbol of the R C library.
R_NilValue = openrlib.rlib.R_NilValue

# Re-export `embedded.endr` under this module's namespace.
endr = embedded.endr

# Environment currently used as evaluation context. It is returned by
# get_evaluation_context() and temporarily rebound by local_context().
_evaluation_context = globalenv


def get_evaluation_context() -> SexpEnvironment:
    """Return the environment in which R code is currently evaluated.

    This is the value of the module-level `_evaluation_context`."""
    current = _evaluation_context
    return current


@contextlib.contextmanager
def local_context(
        env: typing.Optional[SexpEnvironment] = None,
        use_rlock: bool = True
) -> typing.Iterator[SexpEnvironment]:
    """Temporarily switch the environment used to evaluate R code.

    Args:
    - env: the environment to use as evaluation context. When None (the
      default), a new environment is created whose enclosure is the
      current evaluation context.
    - use_rlock: whether to hold `openrlib.rlock` for the duration of the
      context (see the documentation about "rlock"). The default is True.

    Returns:
    Yield the environment (the one passed as `env`, or the one created).
    The previous evaluation context is restored on exit, even when the
    body of the `with` statement raises.
    """

    global _evaluation_context

    previous = _evaluation_context
    if env is None:
        make_env = baseenv['new.env']
        if previous is None:
            enclosure = baseenv['parent.frame']()
        else:
            enclosure = previous
        env = make_env(enclosure)
    try:
        # The ExitStack holds the lock only when requested; with nothing
        # registered it is a no-op context manager.
        with contextlib.ExitStack() as stack:
            if use_rlock:
                stack.enter_context(openrlib.rlock)
            _evaluation_context = env
            yield env
    finally:
        _evaluation_context = previous


def _sigint_handler(sig, frame):
    raise KeyboardInterrupt()


@_cdata_res_to_rinterface
def parse(text: str, num: int = -1):
    """Parse a string as R code.

    :param:`text` A string with R code to parse.
    :param:`num` The maximum number of lines to parse. If -1, no
      limit is applied.

    Raises TypeError when `text` is not a string.
    """

    if isinstance(text, str):
        # The code is handed to the parser as a one-element R string vector.
        code_vector = StrSexpVector([text])
        with memorymanagement.rmemory() as protector:
            parsed = _rinterface._parse(
                code_vector.__sexp__._cdata, num, protector
            )
        return parsed
    raise TypeError('text must be a string.')


def evalr(source: str, maxlines: int = -1) -> sexp.Sexp:
    """Evaluate a string as R code.

    The string is parsed with :func:`parse`, and the result is passed to
    R's `eval` function (looked up in the base environment), just as it
    would happen when writing code in an R terminal.

    :param:`source` A string to be evaluated as R code.
    :param:`maxlines` The maximum number of lines to parse. If -1, no
      limit is applied."""

    expressions = parse(source, num=maxlines)
    return baseenv['eval'](expressions)


def vector_memoryview(obj: sexp.SexpVector,
                      sizeof_str: str, cast_str: str) -> memoryview:
    """Build a memoryview over the data of an R vector.

    :param:`obj` R vector
    :param:`sizeof_str` Type in a string to use with ffi.sizeof()
        (for example "int")
    :param:`cast_str` Type in a string used as the numpy dtype of the
        intermediate array (for example "i")
    """
    cdata = obj.__sexp__._cdata
    pointer = obj._R_GET_PTR(cdata)
    nbytes = openrlib.ffi.sizeof(sizeof_str) * len(obj)
    raw = openrlib.ffi.buffer(pointer, nbytes)
    dims = bufferprotocol.getshape(cdata)
    # Pure Python would be `memoryview(raw).cast(cast_str, dims, order='F')`,
    # but Python does not handle FORTRAN-ordered arrays without C extensions
    # (https://bugs.python.org/issue34778), so numpy is used for the reshape.
    # TODO: Having numpy a requirement just for this is a problem.
    import numpy
    array = numpy.frombuffer(raw, dtype=cast_str).reshape(dims, order='F')
    return memoryview(array)


class SexpSymbol(sexp.Sexp):
    """An unevaluated R symbol."""

    def __init__(
            self,
            obj: Union[Sexp,
                       _rinterface.SexpCapsule,
                       _rinterface.UninitializedRCapsule,
                       str]
    ):
        """Create a symbol from an existing R object or from a name.

        :param obj: a `Sexp`, a capsule (instance of
          `_rinterface.CapsuleBase`), or a Python string. A string is
          encoded as UTF-8 and passed to R's `Rf_install()`.

        Raises TypeError for any other type.
        """
        if isinstance(obj, (Sexp, _rinterface.CapsuleBase)):
            super().__init__(obj)
        elif isinstance(obj, str):
            name_cdata = _rinterface.ffi.new('char []', obj.encode('utf-8'))
            # Named `capsule` to avoid shadowing the imported `sexp` module.
            capsule = _rinterface.SexpCapsule(
                openrlib.rlib.Rf_install(name_cdata))
            super().__init__(capsule)
        else:
            raise TypeError(
                'The constructor must be called with a str, '
                'an instance of rpy2.rinterface_lib.sexp.Sexp, '
                'or a capsule from rpy2.rinterface_lib._rinterface_capi '
                '(got %s).' % type(obj))

    def __str__(self) -> str:
        """Return the symbol's string value, decoded as UTF-8."""
        return conversion._cchar_to_str(
            openrlib._STRING_VALUE(
                self._sexpobject._cdata
            ), 'utf-8'
        )


class _MissingArgType(SexpSymbol, metaclass=sexp.SingletonABC):
    """Symbol type wrapping R's `R_MissingArg`."""

    def __init__(self):
        # While the embedded R is not ready, a placeholder capsule typed as
        # a symbol (SYMSXP) stands in for `R_MissingArg`.
        if not embedded.isready():
            capsule = _rinterface.UninitializedRCapsule(RTYPES.SYMSXP.value)
        else:
            capsule = _rinterface.UnmanagedSexpCapsule(
                openrlib.rlib.R_MissingArg
            )
        super().__init__(sexp.Sexp(capsule))

    def __bool__(self) -> bool:
        """This is always False."""
        return False

    @property
    def __sexp__(self) -> _rinterface.SexpCapsule:
        """Capsule currently wrapped by this object (read-only)."""
        return self._sexpobject


# Instance created at import time; its `_sexpobject` is replaced by
# `_post_initr_setup()` once R is initialized.
MissingArg = _MissingArgType()


class SexpPromise(Sexp):
    """An R promise."""

    @_cdata_res_to_rinterface
    def eval(self, env: typing.Optional[SexpEnvironment] = None) -> sexp.Sexp:
        """Evaluate the R "promise".

        :param:`env` The environment in which to evaluate the
          promise. If None (the default), the global environment is used.
        """
        if env is None:
            env = globalenv
        # Rf_eval() is a C function: it needs the underlying C data, not the
        # Python wrapper. Objects without `__sexp__` are passed through
        # unchanged, so callers that handed in raw cdata keep working.
        try:
            env_cdata = env.__sexp__._cdata
        except AttributeError:
            env_cdata = env
        return openrlib.rlib.Rf_eval(self.__sexp__._cdata, env_cdata)


# Type variable restricted to the vector classes that mix in
# NumpyArrayMixin (forward references, as they are defined further down).
NPCOMPAT_TYPE = typing.TypeVar('NPCOMPAT_TYPE',
                               'ByteSexpVector',
                               'BoolSexpVector',
                               'IntSexpVector',
                               'FloatSexpVector')


class NumpyArrayMixin:
    """Numpy-specific API for accessing the content of an R vector.

    This interface implements version 3 of Numpy's `__array_interface__`
    and is only available / possible for some of the R vectors."""

    @property
    def __array_interface__(
            self: NPCOMPAT_TYPE
    ) -> dict:
        """Return an `__array_interface__` version 3.

        Note that the item 'data' refers to a memory area under R's
        memory management, and that it will become invalid once R frees
        the object. It is safer to keep the rpy2 object proxying the R
        object alive for the duration the pointer is used in
        Python / numpy."""
        cdata = self.__sexp__._cdata
        dims = bufferprotocol.getshape(cdata)
        buf = openrlib.ffi.buffer(self._R_GET_PTR(cdata))
        steps = bufferprotocol.getstrides(cdata, dims, self._R_SIZEOF_ELT)
        interface = {
            'shape': dims,
            'typestr': self._NP_TYPESTR,
            'strides': steps,
            'data': buf,
            'version': 3,
        }
        return interface


class ByteSexpVector(NumpyArrayMixin, SexpVector):
    """Array of bytes.

    This is the R equivalent to a Python :class:`bytearray`.
    """

    _R_TYPE = openrlib.rlib.RAWSXP
    _R_SIZEOF_ELT = _rinterface.ffi.sizeof('char')
    _NP_TYPESTR = '|u1'
    _R_GET_PTR = staticmethod(openrlib.RAW)

    @staticmethod
    def _CAST_IN(x: typing.Any) -> int:
        """Validate a Python value and return it as an int in [0, 255].

        Accepts an int in range(0, 256), or a `bytes` / `bytearray` of
        length one. Raises ValueError otherwise.
        """
        if isinstance(x, int):
            # Check both bounds: the message promises range(0, 256).
            if not 0 <= x <= 255:
                raise ValueError('byte must be in range(0, 256)')
            res = x
        elif isinstance(x, (bytes, bytearray)):
            if len(x) != 1:
                raise ValueError('byte must be a single character')
            res = ord(x)
        else:
            raise ValueError('byte must be an integer [0, 255] or a '
                             'single byte character')
        return res

    @staticmethod
    def _R_VECTOR_ELT(x, i: int) -> int:
        """Read element `i` through the RAW pointer of `x`."""
        return openrlib.RAW(x)[i]

    @staticmethod
    def _R_SET_VECTOR_ELT(x, i: int, val) -> None:
        """Write `val` at index `i` through the RAW pointer of `x`."""
        openrlib.RAW(x)[i] = val

    def __getitem__(self,
                    i: Union[int, slice]) -> Union[int, 'ByteSexpVector']:
        """Return one byte (int index) or a new vector (slice)."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            res = openrlib.RAW_ELT(cdata, i_c)
        elif isinstance(i, slice):
            res = type(self).from_iterable(
                [openrlib.RAW_ELT(cdata, i_c)
                 for i_c in range(*i.indices(len(self)))]
            )
        else:
            raise TypeError(
                'Indices must be integers or slices, not %s' % type(i))
        return res

    def __setitem__(self, i: Union[int, slice], value) -> None:
        """Set one byte (int index) or several (slice).

        Each value goes through `_CAST_IN`, so single-byte `bytes` /
        `bytearray` values are accepted for slices as well. With a slice,
        assignment stops at the shorter of the slice and `value`.
        """
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            openrlib.RAW(cdata)[i_c] = self._CAST_IN(value)
        elif isinstance(i, slice):
            for i_c, v in zip(range(*i.indices(len(self))), value):
                # _CAST_IN does the range validation; comparing `v` to an
                # int first would fail on bytes values.
                openrlib.RAW(cdata)[i_c] = self._CAST_IN(v)
        else:
            raise TypeError(
                'Indices must be integers or slices, not %s' % type(i))
class BoolSexpVector(NumpyArrayMixin, SexpVector):
    """Array of booleans.

    Note that R is internally storing booleans as integers to
    allow an additional "NA" value to represent missingness."""

    _R_TYPE = openrlib.rlib.LGLSXP
    _R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rboolean')
    _NP_TYPESTR = '|i'
    _R_VECTOR_ELT = openrlib.LOGICAL_ELT
    _R_SET_VECTOR_ELT = openrlib.SET_LOGICAL_ELT
    _R_GET_PTR = staticmethod(openrlib.LOGICAL)

    @staticmethod
    def _CAST_IN(x):
        """Map None or R's NA integer to NA_Logical, else to a bool."""
        is_missing = x is None or x == openrlib.rlib.R_NaInt
        return NA_Logical if is_missing else bool(x)

    def __getitem__(self, i: Union[int, slice]) -> Union[typing.Optional[bool],
                                                         'BoolSexpVector']:
        """Return one element (int index) or a new vector (slice)."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            elt = openrlib.LOGICAL_ELT(cdata, i_c)
            if elt == NA_Logical:
                return na_values.NA_Logical
            return bool(elt)
        if isinstance(i, slice):
            picked = [openrlib.LOGICAL_ELT(cdata, i_c)
                      for i_c in range(*i.indices(len(self)))]
            return type(self).from_iterable(picked)
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def __setitem__(self, i: Union[int, slice], value) -> None:
        """Set one element (int index) or several (slice).

        Values are converted with `int()` before being stored."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            openrlib.SET_LOGICAL_ELT(cdata, i_c, int(value))
            return
        if isinstance(i, slice):
            positions = range(*i.indices(len(self)))
            for i_c, item in zip(positions, value):
                openrlib.SET_LOGICAL_ELT(cdata, i_c, int(item))
            return
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def memoryview(self) -> memoryview:
        """Return a memoryview over the vector's data, as C ints."""
        return vector_memoryview(self, 'int', 'i')
def nullable_int(v):
    """Convert a value to int, mapping a float NaN to R's NA integer.

    :param v: the value to convert.
    :return: `openrlib.rlib.R_NaInt` when `v` is a float (or an instance
      of a float subclass) that is NaN, and `int(v)` otherwise.
    """
    # isinstance() rather than an exact type check: a NaN carried by a
    # float subclass would otherwise reach int() and raise ValueError.
    if isinstance(v, float) and math.isnan(v):
        return openrlib.rlib.R_NaInt
    return int(v)
class IntSexpVector(NumpyArrayMixin, SexpVector):
    """R vector of integers (R type INTSXP)."""

    _R_TYPE = openrlib.rlib.INTSXP
    _R_SET_VECTOR_ELT = openrlib.SET_INTEGER_ELT
    _R_VECTOR_ELT = openrlib.INTEGER_ELT
    _R_SIZEOF_ELT = _rinterface.ffi.sizeof('int')
    _NP_TYPESTR = '|i'
    _R_GET_PTR = staticmethod(openrlib.INTEGER)
    _CAST_IN = staticmethod(nullable_int)

    def __getitem__(self, i: Union[int, slice]) -> Union[int, 'IntSexpVector']:
        """Return one element (int index) or a new vector (slice)."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            elt = openrlib.INTEGER_ELT(cdata, i_c)
            # Hand back the module-level NA_Integer object when the value
            # compares equal to it.
            return NA_Integer if elt == NA_Integer else elt
        if isinstance(i, slice):
            picked = [openrlib.INTEGER_ELT(cdata, i_c)
                      for i_c in range(*i.indices(len(self)))]
            return type(self).from_iterable(picked)
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def __setitem__(self, i: Union[int, slice], value) -> None:
        """Set one element (int index) or several (slice).

        Values are converted with `int()` before being stored."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            openrlib.SET_INTEGER_ELT(cdata, i_c, int(value))
            return
        if isinstance(i, slice):
            positions = range(*i.indices(len(self)))
            for i_c, item in zip(positions, value):
                openrlib.SET_INTEGER_ELT(cdata, i_c, int(item))
            return
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def memoryview(self) -> memoryview:
        """Return a memoryview over the vector's data, as C ints."""
        return vector_memoryview(self, 'int', 'i')
class FloatSexpVector(NumpyArrayMixin, SexpVector):
    """R vector of doubles (R type REALSXP)."""

    _R_TYPE = openrlib.rlib.REALSXP
    _R_VECTOR_ELT = openrlib.REAL_ELT
    _R_SET_VECTOR_ELT = openrlib.SET_REAL_ELT
    _R_SIZEOF_ELT = _rinterface.ffi.sizeof('double')
    _NP_TYPESTR = '|d'
    _CAST_IN = staticmethod(float)
    _R_GET_PTR = staticmethod(openrlib.REAL)

    def __getitem__(
            self, i: Union[int, slice]
    ) -> Union[float, 'FloatSexpVector']:
        """Return one element (int index) or a new vector (slice)."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            return openrlib.REAL_ELT(cdata, i_c)
        if isinstance(i, slice):
            picked = [openrlib.REAL_ELT(cdata, i_c)
                      for i_c in range(*i.indices(len(self)))]
            return type(self).from_iterable(picked)
        raise TypeError('Indices must be integers or slices, not %s' % type(i))

    def __setitem__(self, i: Union[int, slice], value) -> None:
        """Set one element (int index) or several (slice).

        Values are converted with `float()` before being stored."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            openrlib.SET_REAL_ELT(cdata, i_c, float(value))
            return
        if isinstance(i, slice):
            positions = range(*i.indices(len(self)))
            for i_c, item in zip(positions, value):
                openrlib.SET_REAL_ELT(cdata, i_c, float(item))
            return
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def memoryview(self) -> memoryview:
        """Return a memoryview over the vector's data, as C doubles."""
        return vector_memoryview(self, 'double', 'd')
class ComplexSexpVector(SexpVector):
    """R vector of complex numbers (R type CPLXSXP)."""

    _R_TYPE = openrlib.rlib.CPLXSXP
    _R_GET_PTR = staticmethod(openrlib.COMPLEX)
    _R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rcomplex')

    @staticmethod
    def _R_VECTOR_ELT(x, i):
        """Read element `i` through the COMPLEX pointer of `x`."""
        return openrlib.COMPLEX(x)[i]

    @staticmethod
    def _R_SET_VECTOR_ELT(x, i, v):
        """Write `v` at index `i` through the COMPLEX pointer of `x`."""
        openrlib.COMPLEX(x)[i] = v

    @staticmethod
    def _CAST_IN(x):
        """Return `x` as a (real, imaginary) pair.

        Accepts a Python `complex`, or any object with attributes `r` and
        `i`. Raises TypeError otherwise."""
        if isinstance(x, complex):
            return (x.real, x.imag)
        try:
            return (x.r, x.i)
        except AttributeError:
            raise TypeError(
                'Unable to turn value into an R complex number.'
            )

    def __getitem__(
            self, i: Union[int, slice]
    ) -> Union[complex, 'ComplexSexpVector']:
        """Return one element (int index) or a new vector (slice)."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            elt = openrlib.COMPLEX_ELT(cdata, i_c)
            return complex(elt.r, elt.i)
        if isinstance(i, slice):
            picked = [openrlib.COMPLEX_ELT(cdata, i_c)
                      for i_c in range(*i.indices(len(self)))]
            return type(self).from_iterable(picked)
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))

    def __setitem__(self, i: Union[int, slice], value) -> None:
        """Set one element (int index) or several (slice).

        Values go through `_CAST_IN` before being stored."""
        cdata = self.__sexp__._cdata
        if isinstance(i, int):
            i_c = _rinterface._python_index_to_c(cdata, i)
            openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(value)
            return
        if isinstance(i, slice):
            positions = range(*i.indices(len(self)))
            for i_c, item in zip(positions, value):
                openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(item)
            return
        raise TypeError(
            'Indices must be integers or slices, not %s' % type(i))
class ListSexpVector(SexpVector):
    """R list.

    An R list is an R vector (array) that is similar to a Python list in
    the sense that items in the list can be of any type, whereas most
    other R vectors are homogeneous (all items are of the same type).
    """

    _R_TYPE = openrlib.rlib.VECSXP
    _R_GET_PTR = staticmethod(openrlib._VECTOR_PTR)
    # No fixed element size is declared for this vector type.
    _R_SIZEOF_ELT = None
    _R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT
    _R_SET_VECTOR_ELT = openrlib.rlib.SET_VECTOR_ELT
    _CAST_IN = staticmethod(conversion._get_cdata)
class PairlistSexpVector(SexpVector):
    """R pairlist.

    A R pairlist is rarely used outside of R's internal libraries and
    a relatively small number of use cases. It is essentially a LISP-like
    list of (name, value) pairs.
    """

    _R_TYPE = openrlib.rlib.LISTSXP
    _R_GET_PTR = None
    _R_SIZEOF_ELT = None
    _R_VECTOR_ELT = None
    _R_SET_VECTOR_ELT = None
    _CAST_IN = staticmethod(conversion._get_cdata)

    def __getitem__(self, i: Union[int, slice]) -> Sexp:
        """Return one item (int index) or a new pairlist (slice).

        With an int index the result is built as a one-element R list
        (VECSXP), named after the item's tag when there is one. With a
        slice the result is a pairlist with the selected values and tags,
        in the order given by the slice (negative steps included).

        Raises TypeError for any other index type.
        """
        cdata = self.__sexp__._cdata
        rlib = openrlib.rlib
        if isinstance(i, int):
            # R-exts says that it is converted to a VECSXP when subsetted.
            i_c = _rinterface._python_index_to_c(cdata, i)
            item_cdata = rlib.Rf_nthcdr(cdata, i_c)
            with memorymanagement.rmemory() as rmemory:
                res_cdata = rmemory.protect(
                    rlib.Rf_allocVector(RTYPES.VECSXP, 1))
                rlib.SET_VECTOR_ELT(res_cdata, 0, rlib.CAR(item_cdata))
                res_name = rmemory.protect(
                    rlib.Rf_allocVector(RTYPES.STRSXP, 1))
                item_cdata_name = rlib.PRINTNAME(rlib.TAG(item_cdata))
                if _rinterface._TYPEOF(item_cdata_name) != rlib.NILSXP:
                    rlib.SET_STRING_ELT(res_name, 0, item_cdata_name)
                    rlib.Rf_namesgets(res_cdata, res_name)
                res = conversion._cdata_to_rinterface(res_cdata)
        elif isinstance(i, slice):
            # range(*slice.indices(n)) only yields positions in [0, n),
            # so no bounds check is needed inside the loop.
            iter_indices = range(*i.indices(len(self)))
            n = len(iter_indices)
            with memorymanagement.rmemory() as rmemory:
                res_cdata = rmemory.protect(
                    rlib.Rf_allocVector(self._R_TYPE, n)
                )
                iter_res_cdata = res_cdata
                prev_pos = 0
                lst_cdata = cdata
                for pos in iter_indices:
                    if pos < prev_pos:
                        # A pairlist can only be walked forward, and
                        # Rf_nthcdr() does not move for a negative offset:
                        # restart from the head when going backward.
                        lst_cdata = cdata
                        prev_pos = 0
                    lst_cdata = rlib.Rf_nthcdr(lst_cdata, pos - prev_pos)
                    prev_pos = pos
                    rlib.SETCAR(iter_res_cdata, rlib.CAR(lst_cdata))
                    rlib.SET_TAG(iter_res_cdata, rlib.TAG(lst_cdata))
                    iter_res_cdata = rlib.CDR(iter_res_cdata)
                res = conversion._cdata_to_rinterface(res_cdata)
        else:
            raise TypeError(
                'Indices must be integers or slices, not %s' % type(i))
        return res

    @classmethod
    @_cdata_res_to_rinterface
    def from_iterable(cls, iterable, cast_in=None):
        """Not implemented for pairlists; always raises NotImplementedError."""
        raise NotImplementedError()
class ExprSexpVector(SexpVector):
    """Vector of R expressions (R type EXPRSXP)."""

    _R_TYPE = openrlib.rlib.EXPRSXP
    # Only reading elements is wired up; the other hooks are left unset.
    _R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT
    _R_SET_VECTOR_ELT = None
    _R_GET_PTR = None
    _R_SIZEOF_ELT = None
    _CAST_IN = None
class LangSexpVector(SexpVector):
    """R language object (R type LANGSXP)."""

    _R_TYPE = openrlib.rlib.LANGSXP
    _R_GET_PTR = None
    _CAST_IN = None
    _R_SIZEOF_ELT = None
    _R_VECTOR_ELT = None
    _R_SET_VECTOR_ELT = None

    @_cdata_res_to_rinterface
    def __getitem__(self, i: int):
        """Return the item at position `i`.

        The node is reached with `Rf_nthcdr()` and its CAR is returned."""
        cdata = self.__sexp__._cdata
        i_c = _rinterface._python_index_to_c(cdata, i)
        node = openrlib.rlib.Rf_nthcdr(cdata, i_c)
        return openrlib.rlib.CAR(node)

    def __setitem__(self, i: int, value: sexp.SupportsSEXP) -> None:
        """Replace the item at position `i` with the R object in `value`."""
        cdata = self.__sexp__._cdata
        i_c = _rinterface._python_index_to_c(cdata, i)
        set_car = openrlib.rlib.SETCAR
        node = openrlib.rlib.Rf_nthcdr(cdata, i_c)
        set_car(node, value.__sexp__._cdata)
class SexpClosure(Sexp):
    """R function, callable from Python."""

    @_cdata_res_to_rinterface
    def __call__(self, *args, **kwargs) -> Sexp:
        """Call the R function with positional and named arguments.

        The call is evaluated in the current evaluation context.
        Raises `embedded.RRuntimeError` when R reports an error."""
        error_flag = _rinterface.ffi.new('int *', 0)
        with memorymanagement.rmemory() as protector:
            r_call = protector.protect(
                _rinterface.build_rcall(self.__sexp__._cdata, args,
                                        kwargs.items()))
            context = _evaluation_context
            result = protector.protect(
                openrlib.rlib.R_tryEval(
                    r_call,
                    context.__sexp__._cdata,
                    error_flag)
            )
            if error_flag[0]:
                raise embedded.RRuntimeError(_rinterface._geterrmessage())
        return result

    @_cdata_res_to_rinterface
    def rcall(self, keyvals,
              environment: typing.Optional[SexpEnvironment] = None):
        """Call/evaluate an R function.

        Args:
        - keyvals: a sequence of key/value (name/parameter) pairs. A
          name/parameter that is None will indicate an unnamed parameter.
          Like in R, keys/names do not have to be unique, partial matching
          can be used, and named/unnamed parameters can occur at any
          position in the sequence.
        - environment: an optional R environment to evaluate the function.
          If None (the default), the current evaluation context is used.

        Raises `embedded.RRuntimeError` when R reports an error.
        """
        # TODO: check keyvals are pairs ?
        if environment is None:
            environment = _evaluation_context
        assert isinstance(environment, SexpEnvironment)
        error_flag = _rinterface.ffi.new('int *', 0)

        with memorymanagement.rmemory() as protector:
            r_call = protector.protect(
                _rinterface.build_rcall(self.__sexp__._cdata, [],
                                        keyvals))
            result = protector.protect(
                openrlib.rlib.R_tryEval(r_call,
                                        environment.__sexp__._cdata,
                                        error_flag))
            if error_flag[0]:
                raise embedded.RRuntimeError(_rinterface._geterrmessage())
        return result

    @property
    @_cdata_res_to_rinterface
    def closureenv(self) -> SexpEnvironment:
        """Closure of the R function."""
        return openrlib.rlib.CLOENV(self.__sexp__._cdata)
class SexpS4(Sexp):
    """R "S4" object."""
# TODO: clean up
def make_extptr(obj, tag, protected):
    """Wrap a Python object into an R external pointer.

    :param obj: the Python object to wrap. A cffi handle to it is created
      and used as the address of the external pointer.
    :param tag: passed as-is as the `tag` argument of
      `R_MakeExternalPtr()`.
    :param protected: None (R's NULL is then used), or an object with a
      `__sexp__` attribute whose C data is passed as the `prot` argument
      of `R_MakeExternalPtr()`.
    :return: a `SexpCapsuleWithPassenger` holding the external pointer
      together with `obj` and its handle.

    Raises TypeError when `protected` is neither None nor an object with
    `__sexp__`.
    """
    if protected is None:
        cdata_protected = openrlib.rlib.R_NilValue
    else:
        try:
            cdata_protected = protected.__sexp__._cdata
        except AttributeError:
            # Format the class itself: `type(Sexp)` would be its metaclass.
            raise TypeError('Argument protected must inherit from %s' %
                            Sexp)

    ptr = _rinterface.ffi.new_handle(obj)
    with memorymanagement.rmemory() as rmemory:
        cdata = rmemory.protect(
            openrlib.rlib.R_MakeExternalPtr(
                ptr,
                tag,
                cdata_protected))
        # Use the C-level finalizer when one is available, otherwise fall
        # back to the other finalizer.
        openrlib.rlib.R_RegisterCFinalizer(
            cdata,
            (_rinterface._capsule_finalizer_c
             if _rinterface._capsule_finalizer_c
             else _rinterface._capsule_finalizer))
        res = _rinterface.SexpCapsuleWithPassenger(cdata, obj, ptr)
    return res
class SexpExtPtr(Sexp):
    """R external pointer."""

    TYPE_TAG = 'Python'

    @classmethod
    def from_pyobject(cls, func, tag: str = TYPE_TAG,
                      protected=None):
        """Create an external pointer wrapping a Python object.

        :param func: the Python object to wrap.
        :param tag: a string. The R-level tag is always built from
          `cls.TYPE_TAG`; a different `tag` is only recorded as the
          `TYPE_TAG` attribute of the returned instance.
        :param protected: passed to `make_extptr()`.

        Raises TypeError when `tag` is not a string.
        """
        if not isinstance(tag, str):
            raise TypeError('The tag must be a string.')
        r_tag = conversion._str_to_charsxp(cls.TYPE_TAG)
        capsule = make_extptr(func, r_tag, protected)
        extptr = cls(capsule)
        if tag != cls.TYPE_TAG:
            extptr.TYPE_TAG = tag
        return extptr
# TODO: Only use rinterface-level ?
# Register, for each R type, the Python class used to wrap R objects of
# that type.
conversion._R_RPY2_MAP.update({
    openrlib.rlib.NILSXP: NULLType,
    openrlib.rlib.EXPRSXP: ExprSexpVector,
    openrlib.rlib.LANGSXP: LangSexpVector,
    openrlib.rlib.ENVSXP: SexpEnvironment,
    openrlib.rlib.RAWSXP: ByteSexpVector,
    openrlib.rlib.LGLSXP: BoolSexpVector,
    openrlib.rlib.INTSXP: IntSexpVector,
    openrlib.rlib.REALSXP: FloatSexpVector,
    openrlib.rlib.CPLXSXP: ComplexSexpVector,
    openrlib.rlib.STRSXP: StrSexpVector,
    openrlib.rlib.VECSXP: ListSexpVector,
    openrlib.rlib.LISTSXP: PairlistSexpVector,
    openrlib.rlib.CLOSXP: SexpClosure,
    openrlib.rlib.BUILTINSXP: SexpClosure,
    openrlib.rlib.SPECIALSXP: SexpClosure,
    openrlib.rlib.EXTPTRSXP: SexpExtPtr,
    openrlib.rlib.SYMSXP: SexpSymbol,
    openrlib.rlib.S4SXP: SexpS4
    })
conversion._R_RPY2_DEFAULT_MAP = Sexp

conversion._PY_RPY2_MAP.update({
    int: conversion._int_to_sexp,
    float: conversion._float_to_sexp,
    complex: conversion._complex_to_sexp
    })

conversion._PY_R_MAP.update({
    _rinterface.ffi.CData: False,
    # integer
    int: conversion._int_to_sexp,
    sexp.NAIntegerType: conversion._int_to_sexp,
    # float
    float: conversion._float_to_sexp,
    sexp.NARealType: conversion._float_to_sexp,
    # boolean
    bool: conversion._bool_to_sexp,
    sexp.NALogicalType: conversion._bool_to_sexp,
    # string
    str: conversion._str_to_sexp,
    sexp.CharSexp: None,
    sexp.NACharacterType: None,
    # complex
    complex: conversion._complex_to_sexp,
    sexp.NAComplexType: conversion._complex_to_sexp,
    # None
    type(None): lambda x: openrlib.rlib.R_NilValue})


def vector(iterable, rtype: RTYPES) -> SexpVector:
    """Create an R vector.

    While the different types of R vectors all have their own class,
    the creation of array in Python is often available through factory
    function that accept the type of the array as a parameters. This
    function is providing a similar functionality for R vectors.

    Raises ValueError when `rtype` is not mapped to a subclass of
    `SexpVector`."""
    try:
        cls = conversion._R_RPY2_MAP[rtype]
    except KeyError:
        cls = None
    if cls is None or not issubclass(cls, SexpVector):
        raise ValueError(
            'Unable to build a vector from type "%s"' % RTYPES(rtype))
    return cls.from_iterable(iterable)


class RRuntimeWarning(RuntimeWarning):
    pass


# Placeholders until R is initialized; _post_initr_setup() rebinds them.
NULL = None
MissingArg = _MissingArgType()
NA_Character = None
NA_Integer = None
NA_Logical = None
NA = None
NA_Real = None
NA_Complex = None


def initr_simple() -> typing.Optional[int]:
    """Initialize R's embedded C library."""
    with openrlib.rlock:
        status = embedded._initr()
        atexit.register(endr, 0)
        _rinterface._register_external_symbols()
        _post_initr_setup()
        return status


def initr_checkenv() -> typing.Optional[int]:
    """Initialize R, unless it was already initialized in this process.

    Returns the status of the initialization, or None when R was found to
    be already initialized."""
    # Force the internal initialization flag if there is an environment
    # variable that indicates that R was already initialized in the current
    # process.

    status = None
    with openrlib.rlock:
        if embedded.is_r_externally_initialized():
            embedded._setinitialized()
        else:
            status = embedded._initr()
            embedded.set_python_process_info()
        _rinterface._register_external_symbols()
        _post_initr_setup()
    return status


initr = initr_checkenv


def _update_R_ENC_PY():
    """Fill `conversion._R_ENC_PY`, mapping R encodings to Python codecs.

    The latin1 and native entries depend on the platform and on what R's
    `l10n_info()` reports."""
    conversion._R_ENC_PY[openrlib.rlib.CE_UTF8] = 'utf-8'

    l10n_info = tuple(baseenv['l10n_info']())
    if platform.system() == 'Windows':
        val_latin1 = 'cp{:d}'.format(l10n_info[3][0])
    else:
        val_latin1 = 'latin1'
    conversion._R_ENC_PY[openrlib.rlib.CE_LATIN1] = val_latin1

    if l10n_info[1]:
        val_native = conversion._R_ENC_PY[openrlib.rlib.CE_UTF8]
    else:
        val_native = val_latin1
    conversion._R_ENC_PY[openrlib.rlib.CE_NATIVE] = val_native

    conversion._R_ENC_PY[openrlib.rlib.CE_ANY] = 'utf-8'


def _post_initr_setup() -> None:
    """Finish the module setup once the embedded R is initialized.

    Binds the environment singletons, NULL, MissingArg and the NA values
    to their R counterparts, installs a SIGINT handler when running in
    the main thread (and warns otherwise), and updates the encoding map.
    """
    global NULL, NA_Character, NA_Integer, NA_Logical, NA
    global NA_Real, NA_Complex

    emptyenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_EmptyEnv)
    baseenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_BaseEnv)
    globalenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_GlobalEnv)

    NULL = NULLType()

    MissingArg._sexpobject = _rinterface.UnmanagedSexpCapsule(
        openrlib.rlib.R_MissingArg
    )

    na_values.NA_Character = sexp.NACharacterType()
    NA_Character = na_values.NA_Character

    na_values.NA_Integer = sexp.NAIntegerType(openrlib.rlib.R_NaInt)
    NA_Integer = na_values.NA_Integer

    na_values.NA_Logical = sexp.NALogicalType(openrlib.rlib.R_NaInt)
    NA_Logical = na_values.NA_Logical
    NA = NA_Logical

    na_values.NA_Real = sexp.NARealType(openrlib.rlib.R_NaReal)
    NA_Real = na_values.NA_Real

    na_values.NA_Complex = sexp.NAComplexType(
        _rinterface.ffi.new(
            'Rcomplex *',
            [openrlib.rlib.R_NaReal, openrlib.rlib.R_NaReal])
    )
    NA_Complex = na_values.NA_Complex

    warn_about_thread = False
    if threading.current_thread() is threading.main_thread():
        try:
            signal.signal(signal.SIGINT, _sigint_handler)
        except ValueError as ve:
            # Prefix match: newer Python versions append
            # " of the main interpreter" to this message.
            if str(ve).startswith('signal only works in main thread'):
                warn_about_thread = True
            else:
                raise
    else:
        warn_about_thread = True
    if warn_about_thread:
        warnings.warn(
            textwrap.dedent(
                """R is not initialized by the main thread.
                Its taking over SIGINT cannot be reversed here, and as a
                consequence the embedded R cannot be interrupted with Ctrl-C.
                Consider (re)setting the signal handler of your choice from
                the main thread."""
            )
        )

    _update_R_ENC_PY()
def rternalize(function: typing.Callable) -> SexpClosure:
    """Make a Python function callable from R.

    Takes an arbitrary Python function and wraps it in such a way that
    it can be called from the R side.

    :param function: A Python callable object.
    :return: A wrapped R object that can be used like any other rpy2
      object.
    """

    assert callable(function)
    extptr = SexpExtPtr.from_pyobject(function)

    # TODO: this is a hack. Find a better way.
    # Parse a template R function, then replace the placeholder `foo` in
    # the parsed code with the external pointer.
    r_template = parse("""
    function(...) { .External(".Python", foo, ...);
    }
    """)
    r_template[0][2][1][2] = extptr

    # TODO: use lower-level eval ?
    closure = baseenv['eval'](r_template)
    # TODO: hack to prevent the nested function from having its
    #   refcount down to zero when returning
    closure.__nested_sexp__ = extptr.__sexp__
    return closure
def process_revents() -> None:
    """Process R events.

    Calling this function at regular intervals can help ensure that
    R is processing input events (e.g., resizing of a window with
    graphics)."""
    run_handlers = openrlib.rlib.rpy2_runHandlers
    run_handlers(openrlib.rlib.R_InputHandlers)