import abc
import atexit
import contextlib
import contextvars
import csv
import enum
import functools
import inspect
import os
import math
import platform
import signal
import subprocess
import textwrap
import threading
import typing
import warnings
from typing import Union
from rpy2.rinterface_lib import openrlib
import rpy2.rinterface_lib._rinterface_capi as _rinterface
import rpy2.rinterface_lib.embedded as embedded
import rpy2.rinterface_lib.conversion as conversion
from rpy2.rinterface_lib.conversion import _cdata_res_to_rinterface
import rpy2.rinterface_lib.memorymanagement as memorymanagement
from rpy2.rinterface_lib import na_values
from rpy2.rinterface_lib.sexp import NULL
from rpy2.rinterface_lib.sexp import NULLType
import rpy2.rinterface_lib.bufferprotocol as bufferprotocol
from rpy2.rinterface_lib import sexp
from rpy2.rinterface_lib.sexp import CharSexp # noqa: F401
from rpy2.rinterface_lib.sexp import RTYPES
from rpy2.rinterface_lib.sexp import SexpVector
from rpy2.rinterface_lib.sexp import StrSexpVector
from rpy2.rinterface_lib.sexp import Sexp
from rpy2.rinterface_lib.sexp import SexpEnvironment
from rpy2.rinterface_lib.sexp import unserialize # noqa: F401
from rpy2.rinterface_lib.sexp import emptyenv
from rpy2.rinterface_lib.sexp import baseenv
from rpy2.rinterface_lib.sexp import globalenv
if os.name == 'nt':
import rpy2.rinterface_lib.embedded_mswin as embedded_mswin
embedded._initr = embedded_mswin._initr_win32
R_NilValue = openrlib.rlib.R_NilValue
endr = embedded.endr
evaluation_context = contextvars.ContextVar('evaluation_context',
default=globalenv)
class _ENVVAR_ACTION(enum.Enum):
KEEP_WARN = 0
REPLACE_WARN = 1
KEEP_NOWARN = 2
REPLACE_NOWARN = 3
_DEFAULT_ENVVAR_ACTION: _ENVVAR_ACTION = _ENVVAR_ACTION.REPLACE_WARN
def get_evaluation_context() -> SexpEnvironment:
"""Get the frame (environment) in which R code is currently evaluated."""
warnings.warn('This function is deprecated. '
'Use evaluation_context directly.',
DeprecationWarning, stacklevel=2)
return evaluation_context.get()
@contextlib.contextmanager
def local_context(
env: typing.Optional[SexpEnvironment] = None,
use_rlock: bool = True
) -> typing.Iterator[SexpEnvironment]:
"""Local context for the evaluation of R code.
Args:
- env: an environment to use as a context. If not specified (None, the
default), a child environment to the current context is created.
- use_rlock: whether to use a threading lock (see the documentation about
"rlock". The default is True.
Returns:
Yield the environment (passed to env, or created).
"""
parent_frame = evaluation_context.get()
if env is None:
env = baseenv['new.env'](
baseenv['parent.frame']()
if parent_frame is None
else parent_frame)
try:
if use_rlock:
with openrlib.rlock:
token = evaluation_context.set(env)
yield env
else:
token = evaluation_context.set(env)
yield env
finally:
evaluation_context.reset(token)
def _sigint_handler(sig, frame):
raise KeyboardInterrupt()
@_cdata_res_to_rinterface
def parse(text: str, num: int = -1):
"""Parse a string as R code.
:param text: A string with R code to parse.
:param num: The maximum number of lines to parse. If -1, no
limit is applied.
"""
if not isinstance(text, str):
raise TypeError('text must be a string.')
robj = StrSexpVector([text])
with memorymanagement.rmemory() as rmemory:
res = _rinterface._parse(robj.__sexp__._cdata, num, rmemory)
return res
def evalr_expr(
expr: 'ExprSexpVector',
envir: typing.Union[
None,
'SexpEnvironment', 'NULLType',
'ListSexpVector', 'PairlistSexpVector', int,
'_MissingArgType'] = None,
enclos: typing.Union[
None,
'ListSexpVector', 'PairlistSexpVector',
'NULLType',
'_MissingArgType'] = None
) -> sexp.Sexp:
"""Evaluate an R expression.
:param expr: An R expression.
:param envir: An optional R environment in which the evaluation
will happen. If None, which is the default, the environment in
the context variable `evaluation_context` is used.
:param enclos: An enclosure. This is only relevant when envir
is a list, a pairlist, or a data.frame. It specifies where to
look for objects not found in envir.
:return: The R objects resulting from the evaluation of the code"""
if envir is None:
envir = evaluation_context.get()
if enclos is None:
enclos = MissingArg
res = baseenv['eval'](expr, envir=envir, enclos=enclos)
return res
def evalr_expr_with_visible(
expr: 'ExprSexpVector',
envir: typing.Union[
None,
'SexpEnvironment'] = None
) -> 'ListSexpVector':
"""Evaluate an R expression and return value and visibility flag.
:param expr: An R expression.
:param envir: An environment in which the expression will be evaluated.
:return: An R list with (value, visibility) where visibility is
an R boolean.
"""
if envir is None:
envir = evaluation_context.get()
assert isinstance(envir, SexpEnvironment)
error_occured = _rinterface.ffi.new('int *', 0)
with memorymanagement.rmemory() as rmemory:
r_call_nested = rmemory.protect(
openrlib.rlib.Rf_lang2(
baseenv['eval'].__sexp__._cdata,
expr.__sexp__._cdata)
)
r_call = rmemory.protect(
openrlib.rlib.Rf_lang2(
baseenv['withVisible'].__sexp__._cdata,
r_call_nested)
)
r_res = rmemory.protect(
openrlib.rlib.R_tryEval(
r_call,
envir.__sexp__._cdata, # call context.
error_occured)
)
if error_occured[0]:
raise embedded.RRuntimeError(_rinterface._geterrmessage())
res = conversion._cdata_to_rinterface(r_res)
assert isinstance(res, ListSexpVector)
return res
def evalr(
source: str,
maxlines: int = -1,
envir: typing.Union[
None,
'SexpEnvironment', 'NULLType',
'ListSexpVector', 'PairlistSexpVector', int,
'_MissingArgType'] = None,
enclos: typing.Union[
None,
'ListSexpVector', 'PairlistSexpVector',
'NULLType', '_MissingArgType'] = None
) -> sexp.Sexp:
"""Evaluate a string as R code.
Evaluate a string as R just as it would happen when writing
code in an R terminal.
:param str text: A string to be evaluated as R code,
or an R expression.
:param int maxlines: The maximum number of lines to parse. If -1, no
limit is applied.
:param envir: An optional R environment in which the evaluation
will happen.
:param enclos: An enclosure. This is only relevant when envir
is a list, a pairlist, or a data.frame. It specifies where to
look for objects not found in envir.
:return: The R objects resulting from the evaluation of the code"""
expr = parse(source, num=maxlines)
res = evalr_expr(expr, envir=envir, enclos=enclos)
return res
def vector_memoryview(obj: sexp.SexpVector,
sizeof_str: str, cast_str: str) -> memoryview:
"""
:param obj: R vector
:param str sizeof_str: Type in a string to use with ffi.sizeof()
(for example "int")
:param str cast_str: Type in a string to use with memoryview.cast()
(for example "i")
"""
b = openrlib.ffi.buffer(
obj._R_GET_PTR(obj.__sexp__._cdata),
openrlib.ffi.sizeof(sizeof_str) * len(obj))
shape = bufferprotocol.getshape(obj.__sexp__._cdata)
# One could have expected to only need builtin Python
# and do something like
# ```
# mv = memoryview(b).cast(cast_str, shape, order='F')
# ```
# but Python does not handle FORTRAN-ordered arrays without having
# to write C extensions (see
# https://bugs.python.org/issue34778 is not resolved).
# Having numpy a requirement just for this is a problem so a
# C function to swap the strides in place was written
try:
import rpy2.rinterface_lib._bufferprotocol as bp # type: ignore
mv = memoryview(b).cast(cast_str, shape)
bp.memoryview_swapstrides(mv)
except ImportError:
import numpy # type: ignore
a = numpy.frombuffer(b, dtype=cast_str).reshape(shape, order='F')
# The typed signature for memoryview does not help the static
# type checker verify that a numpy.ndarray implement the buffer
# protocol. Type checking is ignored to avoid a check error.
mv = memoryview(a) # type: ignore
return mv
class SexpSymbol(sexp.Sexp):
"""An unevaluated R symbol."""
def __init__(
self,
obj: Union[Sexp,
_rinterface.SexpCapsule,
_rinterface.UninitializedRCapsule,
str]
):
if isinstance(obj, Sexp) or isinstance(obj, _rinterface.CapsuleBase):
super().__init__(obj)
elif isinstance(obj, str):
name_cdata = _rinterface.ffi.new('char []', obj.encode('utf-8'))
sexp = _rinterface.SexpCapsule(
openrlib.rlib.Rf_install(name_cdata))
super().__init__(sexp)
else:
raise TypeError(
'The constructor must be called '
'with that is an instance of rpy2.rinterface.sexp.Sexp '
'or an instance of rpy2.rinterface._rinterface.SexpCapsule')
def __str__(self) -> str:
return conversion._cchar_to_str(
openrlib._STRING_VALUE(
self._sexpobject._cdata
), 'utf-8'
)
class _MissingArgType(SexpSymbol, metaclass=sexp.SingletonABC):
"""Missing argument in a call to an R function.
When evaluating a function, arguments that are not specified
can take a default value when named, otherwise they will
take a special value that indicates that they are missing."""
def __init__(self):
if embedded.isready():
tmp = sexp.Sexp(
_rinterface.UnmanagedSexpCapsule(
openrlib.rlib.R_MissingArg
)
)
else:
tmp = sexp.Sexp(
_rinterface.UninitializedRCapsule(RTYPES.SYMSXP.value)
)
super().__init__(tmp)
def __bool__(self) -> bool:
"""This is always False."""
return False
@property
def __sexp__(self) -> typing.Union['_rinterface.SexpCapsule',
'_rinterface.UninitializedRCapsule']:
return self._sexpobject
@__sexp__.setter
def __sexp__(self, value: typing.Union['_rinterface.SexpCapsule',
'_rinterface.UninitializedRCapsule']) -> None:
raise TypeError('The capsule for the R object cannot be modified.')
MissingArg = _MissingArgType()
class SexpPromise(Sexp):
@_cdata_res_to_rinterface
def eval(self, env: typing.Optional[SexpEnvironment] = None) -> sexp.Sexp:
""""Evalute the R "promise".
:param env: The environment in which to evaluate the
promise.
"""
if env is None:
env = globalenv
return openrlib.rlib.Rf_eval(self.__sexp__._cdata, env)
class SexpVectorWithNumpyInterface(SexpVector):
"""Numpy-specific API for accessing the content of a numpy array.
This interface implements version 3 of Numpy's `__array_interface__`
and is only available / possible for some of the R vectors."""
@property
@abc.abstractmethod
def _NP_TYPESTR(self) -> str:
pass
@property
def __array_interface__(
self
) -> dict:
"""Return an `__array_interface__` version 3.
Note that the pointer returned in the items 'data' corresponds to
a memory area under R's memory management and that it will become
invalid once the area once R frees the object. It is safer to keep
the rpy2 object proxying the R object alive for the duration the
pointer is used in Python / numpy."""
shape = bufferprotocol.getshape(self.__sexp__._cdata)
data = openrlib.ffi.buffer(self._R_GET_PTR(self.__sexp__._cdata))
strides = bufferprotocol.getstrides(self.__sexp__._cdata,
shape,
self._R_SIZEOF_ELT)
return {'shape': shape,
'typestr': self._NP_TYPESTR,
'strides': strides,
'data': data,
'version': 3}
@classmethod
def _check_C_compatible(cls, mview):
return (
mview.itemsize == cls._R_SIZEOF_ELT
and
(
cls._NP_TYPESTR == '|u1'
or
cls._NP_TYPESTR.endswith(mview.format)
)
)
[docs]class ByteSexpVector(SexpVectorWithNumpyInterface):
"""Array of bytes.
This is the R equivalent to a Python :class:`bytesarray`.
"""
_R_TYPE = openrlib.rlib.RAWSXP
_R_SIZEOF_ELT = _rinterface.ffi.sizeof('char')
_NP_TYPESTR = '|u1'
_R_GET_PTR = staticmethod(openrlib.RAW)
@staticmethod
def _CAST_IN(x: typing.Any) -> int:
if isinstance(x, int):
if x > 255:
raise ValueError('byte must be in range(0, 256)')
res = x
elif isinstance(x, (bytes, bytearray)):
if len(x) != 1:
raise ValueError('byte must be a single character')
res = ord(x)
else:
raise ValueError('byte must be an integer [0, 255] or a '
'single byte character')
return res
@staticmethod
def _R_VECTOR_ELT(x, i: int) -> None:
return openrlib.RAW(x)[i]
@staticmethod
def _R_SET_VECTOR_ELT(x, i: int, val) -> None:
openrlib.RAW(x)[i] = val
def __getitem__(self,
i: Union[int, slice]) -> Union[int, 'ByteSexpVector']:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
res = openrlib.RAW_ELT(cdata, i_c)
elif isinstance(i, slice):
res = type(self).from_iterable(
[openrlib.RAW_ELT(
cdata, i_c
) for i_c in range(*i.indices(len(self)))
]
)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
return res
def __setitem__(self, i: Union[int, slice], value) -> None:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.RAW(cdata)[i_c] = self._CAST_IN(value)
elif isinstance(i, slice):
for i_c, v in zip(range(*i.indices(len(self))), value):
if v > 255:
raise ValueError('byte must be in range(0, 256)')
openrlib.RAW(cdata)[i_c] = self._CAST_IN(v)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
[docs]class BoolSexpVector(SexpVectorWithNumpyInterface):
"""Array of booleans.
Note that R is internally storing booleans as integers to
allow an additional "NA" value to represent missingness."""
_R_TYPE = openrlib.rlib.LGLSXP
_R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rboolean')
_NP_TYPESTR = '|i'
_R_VECTOR_ELT = openrlib.LOGICAL_ELT
_R_SET_VECTOR_ELT = openrlib.SET_LOGICAL_ELT
_R_GET_PTR = staticmethod(openrlib.LOGICAL)
@staticmethod
def _CAST_IN(x):
if x is None or x == openrlib.rlib.R_NaInt:
return NA_Logical
else:
return bool(x)
def __getitem__(self, i: Union[int, slice]) -> Union[bool,
'sexp.NALogicalType',
'BoolSexpVector']:
res: Union[bool,
'sexp.NALogicalType',
'BoolSexpVector']
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
elt = openrlib.LOGICAL_ELT(cdata, i_c)
res = (na_values.NA_Logical # type: ignore
if elt == NA_Logical else bool(elt))
elif isinstance(i, slice):
res = type(self).from_iterable(
[openrlib.LOGICAL_ELT(cdata, i_c)
for i_c in range(*i.indices(len(self)))]
)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
return res
def __setitem__(self, i: Union[int, slice], value) -> None:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.SET_LOGICAL_ELT(cdata, i_c,
int(value))
elif isinstance(i, slice):
for i_c, v in zip(range(*i.indices(len(self))), value):
openrlib.SET_LOGICAL_ELT(cdata, i_c,
int(v))
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
def memoryview(self) -> memoryview:
return vector_memoryview(self, 'int', 'i')
def nullable_int(v):
if type(v) is float and math.isnan(v):
return openrlib.rlib.R_NaInt
else:
return int(v)
[docs]class IntSexpVector(SexpVectorWithNumpyInterface):
_R_TYPE = openrlib.rlib.INTSXP
_R_SET_VECTOR_ELT = openrlib.SET_INTEGER_ELT
_R_VECTOR_ELT = openrlib.INTEGER_ELT
_R_SIZEOF_ELT = _rinterface.ffi.sizeof('int')
_NP_TYPESTR = '|i'
_R_GET_PTR = staticmethod(openrlib.INTEGER)
_CAST_IN = staticmethod(nullable_int)
def __getitem__(self, i: Union[int, slice]) -> Union[int, 'IntSexpVector']:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
res = openrlib.INTEGER_ELT(cdata, i_c)
if res == NA_Integer:
res = NA_Integer
elif isinstance(i, slice):
res = type(self).from_iterable(
[openrlib.INTEGER_ELT(
cdata, i_c
) for i_c in range(*i.indices(len(self)))]
)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
return res
def __setitem__(self, i: Union[int, slice], value) -> None:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.SET_INTEGER_ELT(cdata, i_c,
int(value))
elif isinstance(i, slice):
for i_c, v in zip(range(*i.indices(len(self))), value):
openrlib.SET_INTEGER_ELT(cdata, i_c,
int(v))
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
def memoryview(self) -> memoryview:
return vector_memoryview(self, 'int', 'i')
[docs]class FloatSexpVector(SexpVectorWithNumpyInterface):
_R_TYPE = openrlib.rlib.REALSXP
_R_VECTOR_ELT = openrlib.REAL_ELT
_R_SET_VECTOR_ELT = openrlib.SET_REAL_ELT
_R_SIZEOF_ELT = _rinterface.ffi.sizeof('double')
_NP_TYPESTR = '|d'
_CAST_IN = staticmethod(float)
_R_GET_PTR = staticmethod(openrlib.REAL)
def __getitem__(
self, i: Union[int, slice]
) -> Union[float, 'FloatSexpVector']:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
res = openrlib.REAL_ELT(cdata, i_c)
elif isinstance(i, slice):
res = type(self).from_iterable(
[openrlib.REAL_ELT(
cdata, i_c) for i_c in range(*i.indices(len(self)))]
)
else:
raise TypeError('Indices must be integers or slices, not %s' %
type(i))
return res
def __setitem__(self, i: Union[int, slice], value) -> None:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.SET_REAL_ELT(cdata, i_c,
float(value))
elif isinstance(i, slice):
for i_c, v in zip(range(*i.indices(len(self))), value):
openrlib.SET_REAL_ELT(cdata, i_c,
float(v))
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
def memoryview(self) -> memoryview:
return vector_memoryview(self, 'double', 'd')
[docs]class ComplexSexpVector(SexpVector):
_R_TYPE = openrlib.rlib.CPLXSXP
_R_GET_PTR = staticmethod(openrlib.COMPLEX)
_R_SIZEOF_ELT = _rinterface.ffi.sizeof('Rcomplex')
@staticmethod
def _R_VECTOR_ELT(x, i):
return openrlib.COMPLEX(x)[i]
@staticmethod
def _R_SET_VECTOR_ELT(x, i, v):
openrlib.COMPLEX(x).__setitem__(i, v)
@staticmethod
def _CAST_IN(x):
if isinstance(x, complex):
res = (x.real, x.imag)
else:
try:
res = (x.r, x.i)
except AttributeError:
raise TypeError(
'Unable to turn value into an R complex number.'
)
return res
def __getitem__(
self, i: Union[int, slice]
) -> Union[complex, 'ComplexSexpVector']:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
_ = openrlib.COMPLEX_ELT(cdata, i_c)
res = complex(_.r, _.i)
elif isinstance(i, slice):
res = type(self).from_iterable(
[openrlib.COMPLEX_ELT(
cdata, i_c) for i_c in range(*i.indices(len(self)))]
)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
return res
def __setitem__(self, i: Union[int, slice], value) -> None:
cdata = self.__sexp__._cdata
if isinstance(i, int):
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(value)
elif isinstance(i, slice):
for i_c, v in zip(range(*i.indices(len(self))), value):
openrlib.COMPLEX(cdata)[i_c] = self._CAST_IN(v)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
[docs]class ListSexpVector(SexpVector):
"""R list.
An R list an R vector (array) that is similar to a Python list in
the sense that items in the list can be of any type, whereas most
other R vectors are homogeneous (all items are of the same type).
"""
_R_TYPE = openrlib.rlib.VECSXP
_R_GET_PTR = staticmethod(openrlib._VECTOR_PTR)
_R_SIZEOF_ELT = None
_R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT
_R_SET_VECTOR_ELT = openrlib.rlib.SET_VECTOR_ELT
_CAST_IN = staticmethod(conversion._get_cdata)
[docs]class PairlistSexpVector(SexpVector):
"""R pairlist.
A R pairlist is rarely used outside of R's internal libraries and
a relatively small number of use cases. It is essentially a LISP-like
list of (name, value) pairs.
"""
_R_TYPE = openrlib.rlib.LISTSXP
_R_GET_PTR = None
_R_SIZEOF_ELT = None
_R_VECTOR_ELT = None
_R_SET_VECTOR_ELT = None
_CAST_IN = staticmethod(conversion._get_cdata)
def __getitem__(self, i: Union[int, slice]) -> Sexp:
cdata = self.__sexp__._cdata
rlib = openrlib.rlib
if isinstance(i, int):
# R-exts says that it is converted to a VECSXP when subsetted.
i_c = _rinterface._python_index_to_c(cdata, i)
item_cdata = rlib.Rf_nthcdr(cdata, i_c)
with memorymanagement.rmemory() as rmemory:
res_cdata = rmemory.protect(
rlib.Rf_allocVector(RTYPES.VECSXP, 1))
rlib.SET_VECTOR_ELT(
res_cdata,
0,
rlib.CAR(
item_cdata
))
res_name = rmemory.protect(
rlib.Rf_allocVector(RTYPES.STRSXP, 1))
item_cdata_name = rlib.PRINTNAME(rlib.TAG(item_cdata))
if _rinterface._TYPEOF(item_cdata_name) != rlib.NILSXP:
rlib.SET_STRING_ELT(
res_name,
0,
item_cdata_name)
rlib.Rf_namesgets(res_cdata, res_name)
res = conversion._cdata_to_rinterface(res_cdata)
elif isinstance(i, slice):
iter_indices = range(*i.indices(len(self)))
n = len(iter_indices)
with memorymanagement.rmemory() as rmemory:
res_cdata = rmemory.protect(
rlib.Rf_allocVector(
self._R_TYPE, n)
)
iter_res_cdata = res_cdata
prev_i = 0
lst_cdata = self.__sexp__._cdata
for i in iter_indices:
if i >= len(self):
raise IndexError('index out of range')
lst_cdata = rlib.Rf_nthcdr(lst_cdata, i - prev_i)
prev_i = i
rlib.SETCAR(iter_res_cdata,
rlib.CAR(lst_cdata))
rlib.SET_TAG(iter_res_cdata,
rlib.TAG(lst_cdata))
iter_res_cdata = rlib.CDR(iter_res_cdata)
res = conversion._cdata_to_rinterface(res_cdata)
else:
raise TypeError(
'Indices must be integers or slices, not %s' % type(i))
return res
@classmethod
@_cdata_res_to_rinterface
def from_iterable(cls, iterable, cast_in=None):
raise NotImplementedError()
class ExprSexpVector(SexpVector):
_R_TYPE = openrlib.rlib.EXPRSXP
_R_GET_PTR = None
_CAST_IN = None
_R_SIZEOF_ELT = None
_R_VECTOR_ELT = openrlib.rlib.VECTOR_ELT
_R_SET_VECTOR_ELT = None
_str2lang = None
def _get_str2lang():
global _str2lang
if _str2lang is None:
try:
_str2lang = baseenv['str2lang']
except KeyError:
# TODO: This exists to ensure compatibility with
# older versions of R. If should be removed when
# older versions of R are no longer supported.
_str2lang = evalr('function(s) '
'base::parse(text=s, keep.source=FALSE)[[1]]')
return _str2lang
LangSexpVector_VT = typing.TypeVar('LangSexpVector_VT', bound='LangSexpVector')
[docs]class LangSexpVector(SexpVector):
"""An R language object.
To create from a (Python) string containing R code
use the classmethod `from_string`."""
_R_TYPE = openrlib.rlib.LANGSXP
_R_GET_PTR = None
_CAST_IN = None
_R_SIZEOF_ELT = None
_R_VECTOR_ELT = None
_R_SET_VECTOR_ELT = None
@_cdata_res_to_rinterface
def __getitem__(self, i: int):
cdata = self.__sexp__._cdata
i_c = _rinterface._python_index_to_c(cdata, i)
return openrlib.rlib.CAR(
openrlib.rlib.Rf_nthcdr(cdata, i_c)
)
def __setitem__(self, i: typing.Union[int, slice],
value: sexp.SupportsSEXP) -> None:
if isinstance(i, slice):
raise NotImplementedError(
'Assigning slices to LangSexpVectors is not yet implemented.'
)
cdata = self.__sexp__._cdata
i_c = _rinterface._python_index_to_c(cdata, i)
openrlib.rlib.SETCAR(
openrlib.rlib.Rf_nthcdr(cdata, i_c),
value.__sexp__._cdata
)
[docs] @classmethod
def from_string(
cls: typing.Type[LangSexpVector_VT], s: str
) -> LangSexpVector_VT:
"""Create an R language object from a string.
This creates an unevaluated R language object.
Args:
s: R source code in a string.
Returns:
An instance of the class the method is called from.
"""
return cls(_get_str2lang()(s))
class SexpClosure(Sexp):
@_cdata_res_to_rinterface
def __call__(self, *args, **kwargs) -> Sexp:
error_occured = _rinterface.ffi.new('int *', 0)
with memorymanagement.rmemory() as rmemory:
call_r = rmemory.protect(
_rinterface.build_rcall(self.__sexp__._cdata, args,
kwargs.items()))
call_context = evaluation_context.get()
res = rmemory.protect(
openrlib.rlib.R_tryEval(
call_r,
call_context.__sexp__._cdata,
error_occured)
)
if error_occured[0]:
raise embedded.RRuntimeError(_rinterface._geterrmessage())
return res
@_cdata_res_to_rinterface
def rcall(self, keyvals,
environment: typing.Optional[SexpEnvironment] = None):
"""Call/evaluate an R function.
Args:
- keyvals: a sequence of key/value (name/parameter) pairs. A
name/parameter that is None will indicated an unnamed parameter.
Like in R, keys/names do not have to be unique, partial matching
can be used, and named/unnamed parameters can occur at any position
in the sequence.
- environment: an optional R environment to evaluate the function.
"""
# TODO: check keyvals are pairs ?
if environment is None:
environment = evaluation_context.get()
assert isinstance(environment, SexpEnvironment)
error_occured = _rinterface.ffi.new('int *', 0)
with memorymanagement.rmemory() as rmemory:
call_r = rmemory.protect(
_rinterface.build_rcall(self.__sexp__._cdata, [],
keyvals))
res = rmemory.protect(
openrlib.rlib.R_tryEval(call_r,
environment.__sexp__._cdata,
error_occured))
if error_occured[0]:
raise embedded.RRuntimeError(_rinterface._geterrmessage())
return res
@property # type: ignore
@_cdata_res_to_rinterface
def closureenv(self) -> SexpEnvironment:
"""Closure of the R function."""
return openrlib.rlib.CLOENV(self.__sexp__._cdata)
[docs]class SexpS4(Sexp):
"""R "S4" object.
S4 objects as one of the available forms of Object-Oriented Programming
in R."""
pass
# TODO: clean up
def make_extptr(obj, tag, protected):
if protected is None:
cdata_protected = openrlib.rlib.R_NilValue
else:
try:
cdata_protected = protected.__sexp__._cdata
except AttributeError:
raise TypeError('Argument protected must inherit from %s' %
type(Sexp))
ptr = _rinterface.ffi.new_handle(obj)
with memorymanagement.rmemory() as rmemory:
cdata = rmemory.protect(
openrlib.rlib.R_MakeExternalPtr(
ptr,
tag,
cdata_protected))
openrlib.rlib.R_RegisterCFinalizer(
cdata,
(_rinterface._capsule_finalizer_c
if _rinterface._capsule_finalizer_c
else _rinterface._capsule_finalizer))
res = _rinterface.SexpCapsuleWithPassenger(cdata, obj, ptr)
return res
[docs]class SexpExtPtr(Sexp):
"""R "External Pointer" object."""
TYPE_TAG = 'Python'
@classmethod
def from_pyobject(cls, func, tag: str = TYPE_TAG,
protected=None):
if not isinstance(tag, str):
raise TypeError('The tag must be a string.')
scaps = make_extptr(func,
conversion._str_to_charsxp(cls.TYPE_TAG),
protected)
res = cls(scaps)
if tag != cls.TYPE_TAG:
res.TYPE_TAG = tag
return res
# TODO: Only use rinterface-level ?
conversion._R_RPY2_MAP.update({
openrlib.rlib.NILSXP: NULLType,
openrlib.rlib.EXPRSXP: ExprSexpVector,
openrlib.rlib.LANGSXP: LangSexpVector,
openrlib.rlib.ENVSXP: SexpEnvironment,
openrlib.rlib.RAWSXP: ByteSexpVector,
openrlib.rlib.LGLSXP: BoolSexpVector,
openrlib.rlib.INTSXP: IntSexpVector,
openrlib.rlib.REALSXP: FloatSexpVector,
openrlib.rlib.CPLXSXP: ComplexSexpVector,
openrlib.rlib.STRSXP: StrSexpVector,
openrlib.rlib.VECSXP: ListSexpVector,
openrlib.rlib.LISTSXP: PairlistSexpVector,
openrlib.rlib.CLOSXP: SexpClosure,
openrlib.rlib.BUILTINSXP: SexpClosure,
openrlib.rlib.SPECIALSXP: SexpClosure,
openrlib.rlib.EXTPTRSXP: SexpExtPtr,
openrlib.rlib.SYMSXP: SexpSymbol,
openrlib.rlib.S4SXP: SexpS4
})
conversion._R_RPY2_DEFAULT_MAP = Sexp
conversion._PY_RPY2_MAP.update({
int: conversion._int_to_sexp,
float: conversion._float_to_sexp,
complex: conversion._complex_to_sexp
})
conversion._PY_R_MAP.update({
_rinterface.ffi.CData: False,
# integer
int: conversion._int_to_sexp,
sexp.NAIntegerType: conversion._int_to_sexp,
# float
float: conversion._float_to_sexp,
sexp.NARealType: conversion._float_to_sexp,
# boolean
bool: conversion._bool_to_sexp,
sexp.NALogicalType: conversion._bool_to_sexp,
# string
str: conversion._str_to_sexp,
sexp.CharSexp: None,
sexp.NACharacterType: None,
# complex
complex: conversion._complex_to_sexp,
sexp.NAComplexType: conversion._complex_to_sexp,
# None
type(None): lambda x: openrlib.rlib.R_NilValue})
def vector(iterable, rtype: RTYPES) -> SexpVector:
"""Create an R vector.
While the different types of R vectors all have their own class,
the creation of array in Python is often available through factory
function that accept the type of the array as a parameters. This
function is providing a similar functionality for R vectors."""
error = False
try:
cls = conversion._R_RPY2_MAP[rtype]
except KeyError:
error = True
if not error and not issubclass(cls, SexpVector):
error = True
if error:
raise ValueError(
'Unable to build a vector from type "%s"' % RTYPES(rtype))
return cls.from_iterable(iterable)
class RRuntimeWarning(RuntimeWarning):
pass
MissingArg = _MissingArgType()
NA_Character = None
NA_Integer = None
NA_Logical = None
NA = None
NA_Real = None
NA_Complex = None
def initr_simple() -> typing.Optional[int]:
"""Initialize R's embedded C library."""
with openrlib.rlock:
status = embedded._initr()
atexit.register(endr, 0)
_rinterface._register_external_symbols()
_post_initr_setup()
return status
def initr_checkenv(
interactive: typing.Optional[bool] = None,
_want_setcallbacks: bool = True,
_c_stack_limit: typing.Optional[int] = None
) -> typing.Optional[int]:
"""Initialize the embedded R.
:param interactive: Should R run in interactive or non-interactive mode?
if `None` the value in `_DEFAULT_R_INTERACTIVE` will be used.
:param _want_setcallbacks: Should custom rpy2 callbacks for R frontends
be set?.
:param _c_stack_limit: Limit for the C Stack.
if `None` the value in `_DEFAULT_C_STACK_LIMIT` will be used.
"""
# Force the internal initialization flag if there is an environment
# variable that indicates that R was already initialized in the current
# process.
status = None
with openrlib.rlock:
_setrenvvars(_DEFAULT_ENVVAR_ACTION)
if embedded.is_r_externally_initialized():
embedded._setinitialized()
else:
status = embedded._initr(interactive=interactive,
_want_setcallbacks=_want_setcallbacks,
_c_stack_limit=_c_stack_limit)
embedded.set_python_process_info()
_rinterface._register_external_symbols()
_post_initr_setup()
return status
initr = initr_checkenv
def _update_R_ENC_PY():
conversion._R_ENC_PY[openrlib.rlib.CE_UTF8] = 'utf-8'
l10n_info = tuple(baseenv['l10n_info']())
if platform.system() == 'Windows':
val_latin1 = 'cp{:d}'.format(l10n_info[3][0])
else:
val_latin1 = 'latin1'
conversion._R_ENC_PY[openrlib.rlib.CE_LATIN1] = val_latin1
if l10n_info[1]:
val_native = conversion._R_ENC_PY[openrlib.rlib.CE_UTF8]
else:
val_native = val_latin1
conversion._R_ENC_PY[openrlib.rlib.CE_NATIVE] = val_native
conversion._R_ENC_PY[openrlib.rlib.CE_ANY] = 'utf-8'
# TODO: This function could be used by situation.py. May be better to
# place elsewhere.
def _getrenvvars(
baselinevars: typing.Optional[typing.MutableMapping[str, str]] = None,
r_home: typing.Optional[str] = None
) -> typing.Tuple[typing.Tuple[str, str], ...]:
"""Get the environment variables defined by the R front-end script."""
if baselinevars is None:
baselinevars = os.environ
if r_home is None:
r_home = openrlib.R_HOME
if r_home is None:
raise RuntimeError('Unable to determine R_HOME.')
cmd = (
os.path.join(r_home, 'bin', 'Rscript'),
'-e',
'x<-Sys.getenv();y<-as.character(x);names(y)<-names(x);'
'write.table(y,col.names=FALSE,quote=TRUE,sep=",")'
)
envvars = subprocess.check_output(cmd,
universal_newlines=True,
stderr=subprocess.PIPE)
res = []
reader = csv.reader(row for row in envvars.split(os.linesep) if row != '')
for k, v in reader:
if (
(k not in baselinevars)
or
(baselinevars[k] != v)
):
res.append((k, v))
return tuple(res)
def _setrenvvars(action: _ENVVAR_ACTION):
new_envvars = {}
for k, v in _getrenvvars():
if k in os.environ:
if (
action in (_ENVVAR_ACTION.KEEP_WARN, _ENVVAR_ACTION.KEEP_NOWARN)
):
if action is _ENVVAR_ACTION.KEEP_WARN:
warnings.warn(
f'Environment variable "{k}" redefined by R but ignored.'
)
continue
elif (
action in (_ENVVAR_ACTION.REPLACE_WARN, _ENVVAR_ACTION.REPLACE_NOWARN)
):
if action is _ENVVAR_ACTION.REPLACE_WARN:
warnings.warn(
f'Environment variable "{k}" redefined by R and overriding '
'existing variable.'
)
new_envvars[k] = v
os.environ.update(new_envvars)
def _post_initr_setup() -> None:
emptyenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_EmptyEnv)
baseenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_BaseEnv)
globalenv.__sexp__ = _rinterface.SexpCapsule(openrlib.rlib.R_GlobalEnv)
NULL._sexpobject = _rinterface.UnmanagedSexpCapsule(
openrlib.rlib.R_NilValue
)
MissingArg._sexpobject = _rinterface.UnmanagedSexpCapsule(
openrlib.rlib.R_MissingArg
)
global NA_Character
na_values.NA_Character = sexp.NACharacterType()
NA_Character = na_values.NA_Character
global NA_Integer
na_values.NA_Integer = sexp.NAIntegerType(openrlib.rlib.R_NaInt)
NA_Integer = na_values.NA_Integer
global NA_Logical, NA
na_values.NA_Logical = sexp.NALogicalType(openrlib.rlib.R_NaInt)
NA_Logical = na_values.NA_Logical
NA = NA_Logical
global NA_Real
na_values.NA_Real = sexp.NARealType(openrlib.rlib.R_NaReal)
NA_Real = na_values.NA_Real
global NA_Complex
na_values.NA_Complex = sexp.NAComplexType(
_rinterface.ffi.new(
'Rcomplex *',
[openrlib.rlib.R_NaReal, openrlib.rlib.R_NaReal])
)
NA_Complex = na_values.NA_Complex
warn_about_thread = False
if threading.current_thread() is threading.main_thread():
try:
signal.signal(signal.SIGINT, _sigint_handler)
except ValueError as ve:
if str(ve) == 'signal only works in main thread':
warn_about_thread = True
else:
raise ve
else:
warn_about_thread = True
if warn_about_thread:
warnings.warn(
textwrap.dedent(
"""R is not initialized by the main thread.
Its taking over SIGINT cannot be reversed here, and as a
consequence the embedded R cannot be interrupted with Ctrl-C.
Consider (re)setting the signal handler of your choice from
the main thread."""
)
)
_update_R_ENC_PY()
def _find_first(nodes, of_type):
for node in nodes:
if isinstance(node, of_type):
return node
raise ValueError(
f'No node of type {of_type}'
)
[docs]def rternalize(
function: typing.Optional[typing.Callable] = None, *,
signature: bool = False
) -> typing.Union[SexpClosure, functools.partial]:
""" Make a Python function callable from R.
Takes an arbitrary Python function and wrap it in such a way that
it can be called from the R side.
This factory can be used as a decorator, and has an optional
named argument "signature" that can be True or False (default is
False). When True, the R function wrapping the Python one will
have a matching signature or a close one, as detailed below.
The Python ellipsis arguments`*args` and `**kwargs` are
translated to the R ellipsis `...`.
For example:
.. code-block:: python
@rternalize(signature=True)
def foo(x, *args, y=2):
pass
will be visible in R with the signature:
.. code-block:: r
function (x, ..., y)
The only limitation is that whenever `*args` and `**kwargs` are
both present in the Python declaration they must be consecutive.
For example:
.. code-block:: python
def foo(x, *args, y=2, **kwargs):
pass
is a valid Python declaration. However, it cannot be "rternalized"
because the R ellipsis can only appear at most once in the signature
of a given function. Trying to apply the decorator `rternalize` would
raise an exception.
The following Python function can be "rternalized":
.. code-block:: python
def foo(x, *args, **kwargs):
pass
It is visible to R with the signature
.. code-block:: r
function (x, ...)
Python function definitions can allow the optional naming of required
arguments. The mapping of signatures between Python and R is then
quasi-indentical since R does it for unnamed arguments. The check
that all arguments are present is still performed on the Python side.
Example:
.. code-block:: python
@rternalize(signature=True)
def foo(x, *, y, z):
print(f'x: {x[0]}, y: {y[0]}, z: {z[0]}')
return ri.NULL
>>> _ = foo(1, 2, 3)
x: 1, y: 2, z: 3
ValueErro: None
>>> _ = foo(1)
TypeError: rternalized foo() missing 2 required keyword-only arguments: 'y' and 'z'
>>> _ = foo(1, z=2, y=3)
x: 1, y: 3, z: 2
>>> _ = foo(1, z=2, y=3)
x: 1, y: 3, z: 2
Note that whenever the Python function has an ellipsis (either `*args`
or `**kwargs`) earlier parameters in the signature that are
positional-or-keyword are considered to be positional arguments in a
function call.
:param function: A Python callable object. This is a positional
argument with a default value `None` to allow the decorator function
without parentheses when optional argument is not wanted.
:return: A wrapped R object that can be use like any other rpy2
object.
"""
if not embedded.isinitialized():
raise embedded.RNotReadyError('The embedded R is not yet initialized.')
if function is None:
return functools.partial(rternalize, signature=signature)
assert callable(function)
rpy_fun = SexpExtPtr.from_pyobject(function)
if not signature:
template = parse("""
function(...) { .External(".Python", foo, ...);
}
""")
template[0][2][1][2] = rpy_fun
else:
has_ellipsis = None
params_r_sig = []
for p_i, (name, param) in enumerate(
inspect.signature(function).parameters.items()
):
if (
param.kind is inspect.Parameter.VAR_POSITIONAL or
param.kind is inspect.Parameter.VAR_KEYWORD
):
if has_ellipsis:
if has_ellipsis != (p_i - 1):
raise ValueError(
'R functions can only have one ellipsis. '
'As consequence your Python function must have *args '
'and **kwargs that are consecutive in function '
'signature.'
)
else:
# The ellipsis can only be added once.
has_ellipsis = p_i
params_r_sig.append('...')
else:
params_r_sig.append(name)
r_func_args = ', '.join(params_r_sig)
r_src = f"""
function({r_func_args}) {{
py_func <- RPY2_FUN_PLACEHOLDER
lst_args <- base::as.list(base::match.call()[-1])
RPY2_ARGUMENTS <- base::c(
base::list(
".Python",
py_func
),
lst_args
)
res <- base::do.call(
base::.External,
RPY2_ARGUMENTS
);
res
}}
"""
template = parse(r_src)
function_definition = _find_first(template, of_type=LangSexpVector)
function_body = _find_first(function_definition, of_type=LangSexpVector)
rpy_fun_node = function_body[1]
assert str(rpy_fun_node[2]) == 'RPY2_FUN_PLACEHOLDER'
rpy_fun_node[2] = rpy_fun
# TODO: use lower-level eval ?
res = baseenv['eval'](template)
# TODO: hack to prevent the nested function from having its
# refcount down to zero when returning
res.__nested_sexp__ = rpy_fun.__sexp__
return res
[docs]def process_revents() -> None:
"""Process R events.
Calling this function a regular interval can help ensure that
R is processing input events (e.g., resizing of a window with
graphics)."""
openrlib.rlib.rpy2_runHandlers(openrlib.rlib.R_InputHandlers)