Source code for rpy2.rinterface_lib.embedded

import collections
import enum
import logging
import os
import sys
import typing
import warnings
from rpy2.rinterface_lib import openrlib
from rpy2.rinterface_lib import callbacks
from typing import Protocol

logger = logging.getLogger(__name__)

ffi = openrlib.ffi

# Container for module-level C objects needing to be protected from garbage
# collection on the Python side.
__cffi_protected = {}

_options = ('rpy2', '--quiet', '--no-save')  # type: typing.Tuple[str, ...]
logger.info('Default options to initialize R: {}'.format(', '.join(_options)))

# These constants are default values from R sources
_DEFAULT_VSIZE: int = 67108864  # vector heap size
_DEFAULT_NSIZE: int = 350000  # language heap size
_DEFAULT_MAX_VSIZE: int = sys.maxsize  # max vector heap size
_DEFAULT_MAX_NSIZE: int = 50000000  # max language heap size
_DEFAULT_PPSIZE: int = 50000  # stack size
_DEFAULT_C_STACK_LIMIT: int = -1
_DEFAULT_R_INTERACTIVE: bool = True

rpy2_embeddedR_isinitialized = 0x00


class Is_RStart(Protocol):
    @property
    def rhome(self): ...

    @rhome.setter
    def rhome(self, value) -> None: ...

    @property
    def home(self): ...

    @home.setter
    def home(self, value) -> None: ...

    @property
    def CharacterMode(self): ...

    @CharacterMode.setter
    def CharacterMode(self, value) -> None: ...

    @property
    def ReadConsole(self): ...

    @ReadConsole.setter
    def ReadConsole(self, value) -> None: ...

    @property
    def WriteConsoleEx(self): ...

    @WriteConsoleEx.setter
    def WriteConsoleEx(self, value) -> None: ...

    @property
    def CallBack(self): ...

    @CallBack.setter
    def CallBack(self, value) -> None: ...

    @property
    def ShowMessage(self): ...

    @ShowMessage.setter
    def ShowMessage(self, value) -> None: ...

    @property
    def YesNoCancel(self): ...

    @YesNoCancel.setter
    def YesNoCancel(self, value) -> None: ...

    @property
    def Busy(self): ...

    @Busy.setter
    def Busy(self, value) -> None: ...

    @property
    def R_Quiet(self): ...

    @R_Quiet.setter
    def R_Quiet(self, value) -> None: ...

    @property
    def R_Interactive(self): ...

    @R_Interactive.setter
    def R_Interactive(self, value) -> None: ...

    @property
    def RestoreAction(self): ...

    @RestoreAction.setter
    def RestoreAction(self, value) -> None: ...

    @property
    def SaveAction(self): ...

    @SaveAction.setter
    def SaveAction(self, value) -> None: ...

    @property
    def vsize(self): ...

    @vsize.setter
    def vsize(self, value) -> None: ...

    @property
    def nsize(self): ...

    @nsize.setter
    def nsize(self, value) -> None: ...

    @property
    def max_vsize(self): ...

    @max_vsize.setter
    def max_vsize(self, value) -> None: ...

    @property
    def max_nsize(self): ...

    @max_nsize.setter
    def max_nsize(self, value) -> None: ...

    @property
    def ppsize(self): ...

    @ppsize.setter
    def ppsize(self, value) -> None: ...


rstart: Is_RStart = None  # type: ignore


def _build_rstart(rhome, interactive, setcallbacks):
    rstart = ffi.new('Rstart')
    __cffi_protected['rstart'] = rstart
    openrlib.rlib.R_DefParams(rstart)
    rstart.rhome = rhome
    userhome = ffi.new("char[]", ffi.string(openrlib.rlib.getRUser()))
    __cffi_protected['userhome'] = userhome
    rstart.home = userhome
    rstart.CharacterMode = openrlib.rlib.LinkDLL
    if setcallbacks:
        for cb in CALLBACK_INIT_PAIRS:
            if cb.c_name_nt:
                setattr(rstart, cb.c_name_nt,
                        getattr(callbacks, cb.py_name))

    rstart.R_Quiet = True
    rstart.R_Interactive = interactive
    rstart.RestoreAction = openrlib.rlib.SA_RESTORE
    rstart.SaveAction = openrlib.rlib.SA_NOSAVE

    rstart.vsize = ffi.cast('size_t', _DEFAULT_VSIZE)
    rstart.nsize = ffi.cast('size_t', _DEFAULT_NSIZE)
    rstart.max_vsize = ffi.cast('size_t', _DEFAULT_MAX_VSIZE)
    rstart.max_nsize = ffi.cast('size_t', _DEFAULT_MAX_NSIZE)
    rstart.ppsize = ffi.cast('size_t', _DEFAULT_PPSIZE)
    return rstart


# TODO: move initialization-related code to _rinterface ?
class RPY_R_Status(enum.Enum):
    """Possible status for the embedded R."""
    INITIALIZED = 0x01
    BUSY = 0x02
    ENDED = 0x04


def set_initoptions(options: typing.Tuple[str]) -> None:
    """Set initialization options for the embedded R.

    :param:`options` A tuple of string with the options
    (e.g., '--verbose', '--quiet').
    """
    if rpy2_embeddedR_isinitialized:
        raise RuntimeError('Options can no longer be set once '
                           'R is initialized.')
    global _options
    for x in options:
        assert isinstance(x, str)
    with openrlib.rlock:
        logger.info('Setting options to initialize R: {}'
                    .format(', '.join(options)))
        _options = tuple(options)


def get_initoptions() -> typing.Tuple[str, ...]:
    """Get the initialization options for the embedded R."""
    return _options


def isinitialized() -> bool:
    """Is the embedded R initialized."""
    return bool(rpy2_embeddedR_isinitialized & RPY_R_Status.INITIALIZED.value)


def _setinitialized() -> None:
    """Set the embedded R as initialized.

    This may result in a later segfault if used with the embedded R has not
    been initialized. You should not have to use it."""
    global rpy2_embeddedR_isinitialized
    rpy2_embeddedR_isinitialized = RPY_R_Status.INITIALIZED.value


def isready() -> bool:
    """Is the embedded R ready for use."""
    INITIALIZED = RPY_R_Status.INITIALIZED
    return bool(
        rpy2_embeddedR_isinitialized == INITIALIZED.value
    )


def assert_isready() -> None:
    """Assert whether R is ready (initialized).

    Raises an RNotReadyError if it is not."""
    if not isready():
        raise RNotReadyError(
            'The embedded R is not ready to use.')


class RNotReadyError(Exception):
    """Embedded R is not ready to use."""
    pass


class RRuntimeError(Exception):
    """Error generated by R."""
    pass


def _setcallback(rlib, rlib_symbol: str,
                 callbacks,
                 callback_symbol: typing.Optional[str]) -> None:
    """Set R callbacks.

    :param rlib: Namespace
    :param rlib_symbol: Symbol (name) in the namespace in which to place
      the new callback function
    :param callbacks: Namespace in which to find the callback function.
    :param callbacks_symbol: Symbol (name) of the new callback function.
"""
    if callback_symbol is None:
        new_callback = ffi.NULL
    else:
        new_callback = getattr(callbacks, callback_symbol)
    setattr(rlib, rlib_symbol, new_callback)


_CallbackInit = collections.namedtuple(
    '_CallbackInit',
    ('c_name_posix', 'c_name_nt', 'py_name')
)

CALLBACK_INIT_PAIRS: typing.Tuple[_CallbackInit, ...] = (
    _CallbackInit('ptr_R_WriteConsoleEx', 'WriteConsoleEx', '_consolewrite_ex'),
    _CallbackInit('ptr_R_WriteConsole', 'WriteConsole', None),
    _CallbackInit('ptr_R_ShowMessage', 'ShowMessage', '_showmessage'),
    _CallbackInit('ptr_R_ReadConsole', 'ReadConsole', '_consoleread'),
    _CallbackInit('ptr_R_FlushConsole', None, '_consoleflush'),
    _CallbackInit('ptr_R_ResetConsole', None, '_consolereset'),
    _CallbackInit('ptr_R_ChooseFile', None, '_choosefile'),
    _CallbackInit('ptr_R_ShowFiles', None, '_showfiles'),
    _CallbackInit('ptr_R_CleanUp', None, '_cleanup'),
    _CallbackInit('ptr_R_ProcessEvents', None, '_processevents'),
    _CallbackInit(None, 'YesNoCancel', '_yesnocancel'),
    _CallbackInit(None, 'CallBack', '_callback'),
    _CallbackInit('ptr_R_Busy', 'Busy', '_busy'),
)


def _initr(
        interactive: typing.Optional[bool] = None,
        _want_setcallbacks: bool = True,
        _c_stack_limit: typing.Optional[int] = None
) -> typing.Optional[int]:
    """Initialize the embedded R.

    :param interactive: Should R run in interactive or non-interactive mode?
    if `None` the value in `_DEFAULT_R_INTERACTIVE` will be used.
    :param _want_setcallbacks: Should custom rpy2 callbacks for R frontends
    be set?.
    :param _c_stack_limit: Limit for the C Stack.
    if `None` the value in `_DEFAULT_C_STACK_LIMIT` will be used.
    """

    if interactive is None:
        interactive = _DEFAULT_R_INTERACTIVE
    if _c_stack_limit is None:
        _c_stack_limit = _DEFAULT_C_STACK_LIMIT

    rlib = openrlib.rlib
    ffi_proxy = openrlib.ffi_proxy
    if (
            ffi_proxy.get_ffi_mode(openrlib._rinterface_cffi)
            ==
            ffi_proxy.InterfaceType.ABI
    ):
        callback_funcs = callbacks
    else:
        callback_funcs = rlib

    with openrlib.rlock:
        if isinitialized():
            logger.info('R is already initialized. No need to initialize.')
            return None
        elif openrlib.rlib.R_NilValue != ffi.NULL:
            msg = ('R was initialized outside of rpy2 (R_NilValue != NULL). '
                   'Trying to use it nevertheless.')
            warnings.warn(msg)
            logger.warn(msg)
            _setinitialized()
            return None

        # TODO: Setting LD_LIBRARY_PATH after the process has started
        # is too late. Because of this, the line below does not help
        # address issues where calling R from the command line is working
        # (as it is a shell script setting environment variables before
        # start the binary in a child process). Calling C's dlopen with
        # the path of the shared library could address this but for the
        # API mode this would require writing a C wrapper to manually
        # load each each symbol in the C library.
        options_c = [ffi.new('char[]', o.encode('ASCII')) for o in _options]
        n_options = len(options_c)
        n_options_c = ffi.cast('int', n_options)

        if os.name == 'nt':
            status = openrlib.rlib.Rf_initEmbeddedR(n_options_c, options_c)
            _setinitialized()

            rhome = openrlib.rlib.get_R_HOME()
            __cffi_protected['rhome'] = rhome
            rstart = _build_rstart(rhome, interactive, _want_setcallbacks)
            openrlib.rlib.R_SetParams(rstart)

            # TODO: still needed ?
            openrlib.rlib.R_CStackLimit = ffi.cast('uintptr_t', _c_stack_limit)
        else:
            if openrlib.R_HOME is None:
                raise ValueError('openrlib.R_HOME cannot be None.')
            else:
                os.environ['R_HOME'] = openrlib.R_HOME

            # TODO: Conditional in C code
            rlib.R_SignalHandlers = 0

            # Instead of calling Rf_initEmbeddedR which breaks threaded context
            # perform the initialization manually to set R_CStackLimit before
            # calling setup_Rmainloop(), see:
            # https://github.com/rpy2/rpy2/issues/729
            rlib.Rf_initialize_R(n_options_c, options_c)
            if _c_stack_limit:
                rlib.R_CStackLimit = ffi.cast('uintptr_t', _c_stack_limit)
            rlib.R_Interactive = True
            logger.debug('Calling R setup_Rmainloop.')
            rlib.setup_Rmainloop()

            _setinitialized()

            rlib.R_Interactive = interactive

            # TODO: Conditional definition in C code
            #   (Aqua, TERM, and TERM not "dumb")
            rlib.R_Outputfile = ffi.NULL
            rlib.R_Consolefile = ffi.NULL

            if _want_setcallbacks:
                logger.debug('Setting functions for R callbacks.')
                for cb in CALLBACK_INIT_PAIRS:
                    if cb.c_name_posix:
                        _setcallback(rlib, cb.c_name_posix,
                                     callback_funcs, cb.py_name)
            status = 1
    return status


[docs] def endr(fatal: int) -> None: logger.debug('Ending embedded R process.') global rpy2_embeddedR_isinitialized rlib = openrlib.rlib with openrlib.rlock: if rpy2_embeddedR_isinitialized & RPY_R_Status.ENDED.value: logger.info('Embedded R already ended.') return logger.debug('R_do_Last()') rlib.R_dot_Last() logger.debug('R_RunExitFinalizers()') rlib.R_RunExitFinalizers() logger.debug('Rf_KillAllDevices()') rlib.Rf_KillAllDevices() logger.debug('R_CleanTempDir()') rlib.R_CleanTempDir() logger.debug('R_gc') rlib.R_gc() logger.debug('Rf_endEmbeddedR(fatal)') rlib.Rf_endEmbeddedR(fatal) rpy2_embeddedR_isinitialized ^= RPY_R_Status.ENDED.value logger.info('Embedded R ended.')
_REFERENCE_TO_R_SESSIONS = 'https://github.com/rstudio/reticulate/issues/98' _R_SESSION_INITIALIZED = 'R_SESSION_INITIALIZED' _PYTHON_SESSION_INITIALIZED = 'PYTHON_SESSION_INITIALIZED' def get_r_session_status(r_session_init=None) -> dict: """Return information about the R session, if available. Information about the R session being already initialized can be communicated by an environment variable exported by the process that initialized it. See discussion at: %s """ % _REFERENCE_TO_R_SESSIONS res = {'current_pid': os.getpid()} if r_session_init is None: r_session_init = os.environ.get(_R_SESSION_INITIALIZED) if r_session_init: for item in r_session_init.split(':'): try: key, value = item.split('=', 1) except ValueError: warnings.warn( 'The item %s in %s should be of the form key=value.' % (item, _R_SESSION_INITIALIZED) ) res[key] = value return res def is_r_externally_initialized() -> bool: r_status = get_r_session_status() return str(r_status['current_pid']) == str(r_status.get('PID')) def set_python_process_info() -> None: """Set information about the Python process in an environment variable. See discussion at: %s """ % _REFERENCE_TO_R_SESSIONS info = (('current_pid', os.getpid()), ('sys.executable', sys.executable)) info_string = ':'.join('%s=%s' % x for x in info) os.environ[_PYTHON_SESSION_INITIALIZED] = info_string