Source code for rpy2.rinterface_lib.callbacks

"""Callbacks available from R's C-API.

The callbacks make available in R's C-API can be specified as
Python functions, with the module providing the adapter code
that makes it possible."""

from contextlib import contextmanager
import logging
import typing
from _rinterface_cffi import ffi
from . import conversion

logger = logging.getLogger(__name__)

READCONSOLE_SIGNATURE = 'int(char *, unsigned char *, int, int)'
RESETCONSOLE_SIGNATURE = 'void(void)'
WRITECONSOLE_SIGNATURE = 'void(char *, int)'
WRITECONSOLE_EX_SIGNATURE = 'void(char *, int, int)'
SHOWMESSAGE_SIGNATURE = 'void(char *)'
CHOOSEFILE_SIGNATURE = 'int(int, char *, int)'
CLEANUP_SIGNATURE = 'void(SA_TYPE, int, int)'
SHOWFILE_SIGNATURE = ('int(int, const char **, const char **, '
                      '    const char *, Rboolean, const char *)')


# TODO: rename to "replace_in_module"
@contextmanager
def obj_in_module(module, name: str, obj):
    obj_orig = getattr(module, name)
    setattr(module, name, obj)
    try:
        yield
    finally:
        setattr(module, name, obj_orig)


[docs]def consoleflush(): pass
_FLUSHCONSOLE_EXCEPTION_LOG = 'R[flush console]: %s' @ffi.callback('void (void)') def _consoleflush(): try: consoleflush() except Exception as e: logger.error(_FLUSHCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consoleread(prompt: str) -> str: return input(prompt)
_READCONSOLE_EXCEPTION_LOG = 'R[read into console]: %s' _READCONSOLE_INTERNAL_EXCEPTION_LOG = 'Internal rpy2 error with callback: %s' @ffi.callback(READCONSOLE_SIGNATURE) def _consoleread(prompt, buf, n: int, addtohistory) -> int: success = None try: s = conversion._cchar_to_str(prompt) reply = consoleread(s) except Exception as e: success = 0 logger.error(_READCONSOLE_EXCEPTION_LOG, str(e)) if success == 0: return success try: # TODO: handle non-ASCII encodings reply_b = reply.encode('ASCII') reply_n = min(n, len(reply_b)) ffi.memmove(buf, reply_b, reply_n) if reply_n < n: buf[reply_n] = ord(b'\n') for i in range(reply_n+1, n): buf[i] = 0 if reply_n == 0: success = 0 else: success = 1 except Exception as e: success = 0 logger.error(_READCONSOLE_INTERNAL_EXCEPTION_LOG, str(e)) return success def consolereset() -> None: pass _RESETCONSOLE_EXCEPTION_LOG = 'R[reset console]: %s' @ffi.callback(RESETCONSOLE_SIGNATURE) def _consolereset() -> None: try: consolereset() except Exception as e: logger.error(_RESETCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consolewrite_print(s: str) -> None: # TODO: is the callback for flush working with Linux ? print(s, end='', flush=True)
[docs]def consolewrite_warnerror(s: str) -> None: # TODO: use an rpy2/R-specific warning instead of UserWarning. logger.warning(_WRITECONSOLE_EXCEPTION_LOG, s)
_WRITECONSOLE_EXCEPTION_LOG = 'R[write to console]: %s' @ffi.callback(WRITECONSOLE_EX_SIGNATURE) def _consolewrite_ex(buf, n: int, otype) -> None: s = conversion._cchar_to_str_with_maxlen(buf, maxlen=n) try: if otype == 0: consolewrite_print(s) else: consolewrite_warnerror(s) except Exception as e: logger.error(_WRITECONSOLE_EXCEPTION_LOG, str(e)) def showmessage(s: str) -> None: print('R wants to show a message') print(s) _SHOWMESSAGE_EXCEPTION_LOG = 'R[show message]: %s' @ffi.callback(SHOWMESSAGE_SIGNATURE) def _showmessage(buf): s = conversion._cchar_to_str(buf) try: showmessage(s) except Exception as e: logger.error(_SHOWMESSAGE_EXCEPTION_LOG, str(e)) def choosefile(new): return input('Enter file name:') _CHOOSEFILE_EXCEPTION_LOG = 'R[choose file]: %s' @ffi.callback(CHOOSEFILE_SIGNATURE) def _choosefile(new, buf, n: int) -> int: try: res = choosefile(new) except Exception as e: logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e)) res = None if res is None: return 0 res_cdata = conversion._str_to_cchar(res) ffi.memmove(buf, res_cdata, len(res_cdata)) return len(res_cdata) def showfiles(filenames: typing.Tuple[str], headers: typing.Tuple[str], wtitle: str, pager: str) -> None: for fn in filenames: print('File: %s' % fn) with open(fn) as fh: for row in fh: print(row) print('---') _SHOWFILE_EXCEPTION_LOG = 'R[show file]: %s' _SHOWFILE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error while ' 'showing files for R: %s') @ffi.callback(SHOWFILE_SIGNATURE) def _showfiles(nfiles: int, files, headers, wtitle, delete, pager) -> int: filenames = [] headers_str = [] wtitle_str = None pager_str = None try: wtitle_str = conversion._cchar_to_str(wtitle) pager_str = conversion._cchar_to_str(pager) for i in range(nfiles): filenames.append(conversion._cchar_to_str(files[i])) headers_str.append(conversion._cchar_to_str(headers[i])) except Exception as e: logger.error(_SHOWFILE_INTERNAL_EXCEPTION_LOG, str(e)) if len(filenames): res = 0 else: res = 1 try: showfiles(tuple(filenames), tuple(headers_str), wtitle_str, pager_str) except Exception as e: res = 1 logger.error(_SHOWFILE_EXCEPTION_LOG, str(e)) return res def cleanup(saveact, status, runlast): pass _CLEANUP_EXCEPTION_LOG = 'R[cleanup]: %s' @ffi.callback(CLEANUP_SIGNATURE) def _cleanup(saveact, status, runlast): try: cleanup(saveact, status, runlast) except Exception as e: logger.error(_CLEANUP_EXCEPTION_LOG, str(e))