Source code for rpy2.rinterface_lib.callbacks

"""Callbacks available from R's C-API.

The callbacks make available in R's C-API can be specified as
Python functions, with the module providing the adapter code
that makes it possible."""

from contextlib import contextmanager
import logging
import typing
from .openrlib import ffi
from .openrlib import _rinterface_cffi
from . import ffi_proxy
from . import conversion

logger = logging.getLogger(__name__)


# TODO: rename to "replace_in_module"
@contextmanager
def obj_in_module(module, name: str, obj):
    obj_orig = getattr(module, name)
    setattr(module, name, obj)
    try:
        yield
    finally:
        setattr(module, name, obj_orig)


[docs]def consoleflush(): pass
_FLUSHCONSOLE_EXCEPTION_LOG = 'R[flush console]: %s' @ffi_proxy.callback(ffi_proxy._consoleflush_def, _rinterface_cffi) def _consoleflush(): try: consoleflush() except Exception as e: logger.error(_FLUSHCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consoleread(prompt: str) -> str: """Read input for the R console. :param prompt: The message prompted. :return: A string with the input returned by the user. """ return input(prompt)
_READCONSOLE_EXCEPTION_LOG = 'R[read into console]: %s' _READCONSOLE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error with ' '_consoleread callback: %s') @ffi_proxy.callback(ffi_proxy._consoleread_def, _rinterface_cffi) def _consoleread(prompt, buf, n: int, addtohistory) -> int: success = None try: s = conversion._cchar_to_str(prompt) reply = consoleread(s) except Exception as e: success = 0 logger.error(_READCONSOLE_EXCEPTION_LOG, str(e)) if success == 0: return success try: # TODO: Should the coding by dynamically extracted from # elsewhere ? reply_b = reply.encode('utf-8') reply_n = min(n, len(reply_b)) pybuf = bytearray(n) pybuf[:reply_n] = reply_b[:reply_n] ffi.memmove(buf, pybuf, n) if reply_n == 0: success = 0 else: success = 1 except Exception as e: success = 0 logger.error(_READCONSOLE_INTERNAL_EXCEPTION_LOG, str(e)) return success def consolereset() -> None: pass _RESETCONSOLE_EXCEPTION_LOG = 'R[reset console]: %s' @ffi_proxy.callback(ffi_proxy._consolereset_def, _rinterface_cffi) def _consolereset() -> None: try: consolereset() except Exception as e: logger.error(_RESETCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consolewrite_print(s: str) -> None: """R writing to the console/terminal. :param s: the data to write to the console/terminal. """ # TODO: is the callback for flush working with Linux ? print(s, end='', flush=True)
[docs]def consolewrite_warnerror(s: str) -> None: # TODO: use an rpy2/R-specific warning instead of UserWarning. logger.warning(_WRITECONSOLE_EXCEPTION_LOG, s)
_WRITECONSOLE_EXCEPTION_LOG = 'R[write to console]: %s' @ffi_proxy.callback(ffi_proxy._consolewrite_ex_def, _rinterface_cffi) def _consolewrite_ex(buf, n: int, otype) -> None: s = conversion._cchar_to_str_with_maxlen(buf, maxlen=n) try: if otype == 0: consolewrite_print(s) else: consolewrite_warnerror(s) except Exception as e: logger.error(_WRITECONSOLE_EXCEPTION_LOG, str(e))
[docs]def showmessage(s: str) -> None: print('R wants to show a message') print(s)
_SHOWMESSAGE_EXCEPTION_LOG = 'R[show message]: %s' @ffi_proxy.callback(ffi_proxy._showmessage_def, _rinterface_cffi) def _showmessage(buf): s = conversion._cchar_to_str(buf) try: showmessage(s) except Exception as e: logger.error(_SHOWMESSAGE_EXCEPTION_LOG, str(e)) def choosefile(new): return input('Enter file name:') _CHOOSEFILE_EXCEPTION_LOG = 'R[choose file]: %s' @ffi_proxy.callback(ffi_proxy._choosefile_def, _rinterface_cffi) def _choosefile(new, buf, n: int) -> int: try: res = choosefile(new) except Exception as e: logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e)) res = None if res is None: return 0 res_cdata = conversion._str_to_cchar(res) ffi.memmove(buf, res_cdata, len(res_cdata)) return len(res_cdata)
[docs]def showfiles(filenames: typing.Tuple[str, ...], headers: typing.Tuple[str, ...], wtitle: typing.Optional[str], pager: typing.Optional[str]) -> None: """R showing files. :param filenames: A tuple of file names. :param headers: A tuple of strings (TODO: check what it is) :wtitle: Title of the "window" showing the files. :pager: Pager to use to show the list of files. """ for fn in filenames: print('File: %s' % fn) with open(fn) as fh: for row in fh: print(row) print('---')
_SHOWFILE_EXCEPTION_LOG = 'R[show file]: %s' _SHOWFILE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error while ' 'showing files for R: %s') @ffi_proxy.callback(ffi_proxy._showfiles_def, _rinterface_cffi) def _showfiles(nfiles: int, files, headers, wtitle, delete, pager) -> int: filenames = [] headers_str = [] wtitle_str = None pager_str = None try: wtitle_str = conversion._cchar_to_str(wtitle) pager_str = conversion._cchar_to_str(pager) for i in range(nfiles): filenames.append(conversion._cchar_to_str(files[i])) headers_str.append(conversion._cchar_to_str(headers[i])) except Exception as e: logger.error(_SHOWFILE_INTERNAL_EXCEPTION_LOG, str(e)) if len(filenames): res = 0 else: res = 1 try: showfiles(tuple(filenames), tuple(headers_str), wtitle_str, pager_str) except Exception as e: res = 1 logger.error(_SHOWFILE_EXCEPTION_LOG, str(e)) return res def cleanup(saveact, status, runlast): pass _CLEANUP_EXCEPTION_LOG = 'R[cleanup]: %s' @ffi_proxy.callback(ffi_proxy._cleanup_def, _rinterface_cffi) def _cleanup(saveact, status, runlast): try: cleanup(saveact, status, runlast) except Exception as e: logger.error(_CLEANUP_EXCEPTION_LOG, str(e))
[docs]def processevents() -> None: """Process R events. This function can be periodically called by R to handle events such as window resizing in an interactive graphical device.""" pass
_PROCESSEVENTS_EXCEPTION_LOG = 'R[processevents]: %s' @ffi_proxy.callback(ffi_proxy._processevents_def, _rinterface_cffi) def _processevents() -> None: try: processevents() except Exception as e: logger.error(_PROCESSEVENTS_EXCEPTION_LOG, str(e))
[docs]def busy(x: int) -> None: """R is busy. :param x: TODO this is an integer but do not know what it does. """ pass
_BUSY_EXCEPTION_LOG = 'R[busy]: %s' @ffi_proxy.callback(ffi_proxy._busy_def, _rinterface_cffi) def _busy(which) -> None: try: busy(which) except Exception as e: logger.error(_BUSY_EXCEPTION_LOG, str(e)) def callback() -> None: pass _CALLBACK_EXCEPTION_LOG = 'R[callback]: %s' @ffi_proxy.callback(ffi_proxy._callback_def, _rinterface_cffi) def _callback() -> None: try: callback() except Exception as e: logger.error(_CALLBACK_EXCEPTION_LOG, str(e))
[docs]def yesnocancel(question: str) -> int: """Asking a user to answer yes, no, or cancel. :param question: The question asked to the user :return: An integer with the answer. """ return int(input(question))
_YESNOCANCEL_EXCEPTION_LOG = 'R[yesnocancel]: %s' @ffi_proxy.callback(ffi_proxy._yesnocancel_def, _rinterface_cffi) def _yesnocancel(question): try: q = conversion._cchar_to_str(question) res = yesnocancel(q) except Exception as e: logger.error(_YESNOCANCEL_EXCEPTION_LOG, str(e)) return res