"""Callbacks available from R's C-API.
The callbacks made available in R's C-API can be specified as
Python functions, with the module providing the adapter code
that makes it possible."""
from contextlib import contextmanager
import logging
import typing
from .openrlib import ffi
from .openrlib import _rinterface_cffi
from . import ffi_proxy
from . import conversion
logger = logging.getLogger(__name__)
# TODO: rename to "replace_in_module"
@contextmanager
def obj_in_module(module, name: str, obj):
    """Temporarily bind the attribute `name` of `module` to `obj`.

    The object currently stored under `name` is saved when the context
    is entered and put back when it is exited, including when the body
    of the `with` statement raises.

    :param module: Object (typically a module) holding the attribute.
    :param name: Name of the attribute to replace. It must already
        exist since it is read with `getattr` before being replaced.
    :param obj: Replacement object for the duration of the context.
    """
    saved = getattr(module, name)
    setattr(module, name, obj)
    try:
        yield
    finally:
        # Always restore the original object, even if the body failed.
        setattr(module, name, saved)
def consoleflush():
    """Flush the console.

    Default implementation: does nothing. It is the function called
    by the `_consoleflush` adapter in this module.
    """
_FLUSHCONSOLE_EXCEPTION_LOG = 'R[flush console]: %s'


@ffi_proxy.callback(ffi_proxy._consoleflush_def,
                    _rinterface_cffi)
def _consoleflush():
    """Adapter calling the module-level `consoleflush`.

    `consoleflush` is looked up when the adapter runs. Any exception
    it raises is logged as an error and not propagated.
    """
    try:
        consoleflush()
    except Exception as exc:
        logger.error(_FLUSHCONSOLE_EXCEPTION_LOG, str(exc))
def consoleread(prompt: str) -> str:
    """Read input for the R console.

    Default implementation: delegates to the builtin `input`.

    :param prompt: The message prompted.
    :return: A string with the input returned by the user.
    """
    reply = input(prompt)
    return reply
_READCONSOLE_EXCEPTION_LOG = 'R[read into console]: %s'
_READCONSOLE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error with '
                                       '_consoleread callback: %s')


@ffi_proxy.callback(ffi_proxy._consoleread_def,
                    _rinterface_cffi)
def _consoleread(prompt, buf, n: int, addtohistory) -> int:
    """Adapter calling the module-level `consoleread`.

    The prompt is converted to a Python string, `consoleread` is
    called with it, and the reply is encoded as UTF-8 and copied
    into `buf`.

    :param prompt: C string with the prompt.
    :param buf: Destination buffer for the reply.
    :param n: Number of bytes written to `buf` (used as its size).
    :param addtohistory: Not used by this adapter.
    :return: 1 if a non-empty reply was copied into `buf`, 0 if the
        reply is empty or if an error occurred (errors are logged,
        never propagated).
    """
    try:
        s = conversion._cchar_to_str(prompt)
        reply = consoleread(s)
    except Exception as e:
        logger.error(_READCONSOLE_EXCEPTION_LOG, str(e))
        return 0

    try:
        # TODO: Should the coding by dynamically extracted from
        # elsewhere ?
        reply_b = reply.encode('utf-8')
        # Keep the last byte of the buffer free so the content handed
        # over is always NUL-terminated, even when the reply is as
        # long as, or longer than, the buffer.
        reply_n = min(max(n - 1, 0), len(reply_b))
        # Zero-filled staging buffer: bytes after the reply are NUL.
        pybuf = bytearray(n)
        pybuf[:reply_n] = reply_b[:reply_n]
        ffi.memmove(buf, pybuf, n)
        success = 1 if reply_n else 0
    except Exception as e:
        success = 0
        logger.error(_READCONSOLE_INTERNAL_EXCEPTION_LOG,
                     str(e))
    return success
def consolereset() -> None:
    """Reset the console.

    Default implementation: does nothing. It is the function called
    by the `_consolereset` adapter in this module.
    """
_RESETCONSOLE_EXCEPTION_LOG = 'R[reset console]: %s'


@ffi_proxy.callback(ffi_proxy._consolereset_def,
                    _rinterface_cffi)
def _consolereset() -> None:
    """Adapter calling the module-level `consolereset`.

    Any exception raised by `consolereset` is logged as an error and
    not propagated.
    """
    try:
        consolereset()
    except Exception as exc:
        logger.error(_RESETCONSOLE_EXCEPTION_LOG, str(exc))
def consolewrite_print(s: str) -> None:
    """R writing to the console/terminal.

    The text is printed as-is (no newline is appended) and the
    output stream is flushed right away.

    :param s: the data to write to the console/terminal.
    """
    # TODO: is the callback for flush working with Linux ?
    print(s, flush=True, end='')
def consolewrite_warnerror(s: str) -> None:
    """R writing a warning or an error to the console/terminal.

    The text is sent to this module's logger at the WARNING level.

    :param s: the data to report.
    """
    # TODO: use an rpy2/R-specific warning instead of UserWarning.
    logger.warning(_WRITECONSOLE_EXCEPTION_LOG, s)
_WRITECONSOLE_EXCEPTION_LOG = 'R[write to console]: %s'


@ffi_proxy.callback(ffi_proxy._consolewrite_ex_def,
                    _rinterface_cffi)
def _consolewrite_ex(buf, n: int, otype) -> None:
    """Adapter dispatching console output to the Python callbacks.

    The content of `buf` is converted to a Python string and passed
    to `consolewrite_print` when `otype` is 0, and to
    `consolewrite_warnerror` otherwise. Any exception, including one
    raised while converting the buffer, is logged as an error and not
    propagated.

    :param buf: C buffer with the text to write.
    :param n: Maximum length used when converting `buf`.
    :param otype: Output type; 0 selects `consolewrite_print`.
    """
    try:
        # The conversion is inside the `try` so that a decoding
        # failure is logged like any other error in this adapter.
        s = conversion._cchar_to_str_with_maxlen(buf, maxlen=n)
        if otype == 0:
            consolewrite_print(s)
        else:
            consolewrite_warnerror(s)
    except Exception as e:
        logger.error(_WRITECONSOLE_EXCEPTION_LOG, str(e))
def showmessage(s: str) -> None:
    """Show a message coming from R.

    Default implementation: prints a fixed announcement line,
    followed by the message on its own line.

    :param s: The message to show.
    """
    print('R wants to show a message', s, sep='\n')
_SHOWMESSAGE_EXCEPTION_LOG = 'R[show message]: %s'


@ffi_proxy.callback(ffi_proxy._showmessage_def,
                    _rinterface_cffi)
def _showmessage(buf):
    """Adapter calling the module-level `showmessage`.

    The content of `buf` is converted to a Python string and passed
    to `showmessage`. Any exception, including one raised while
    converting the buffer, is logged as an error and not propagated.

    :param buf: C string with the message.
    """
    try:
        # The conversion is inside the `try` so that a decoding
        # failure is logged like any other error in this adapter.
        s = conversion._cchar_to_str(buf)
        showmessage(s)
    except Exception as e:
        logger.error(_SHOWMESSAGE_EXCEPTION_LOG, str(e))
def choosefile(new):
    """Ask the user for a file name.

    Default implementation: prompts on the console with the builtin
    `input`.

    :param new: Not used by this default implementation.
    :return: The string entered by the user.
    """
    filename = input('Enter file name:')
    return filename
_CHOOSEFILE_EXCEPTION_LOG = 'R[choose file]: %s'


@ffi_proxy.callback(ffi_proxy._choosefile_def,
                    _rinterface_cffi)
def _choosefile(new, buf, n: int) -> int:
    """Adapter calling the module-level `choosefile`.

    The value returned by `choosefile` is converted to a C string and
    copied into `buf`.

    :param new: Passed unchanged to `choosefile`.
    :param buf: Destination buffer for the file name.
    :param n: Size of `buf`.
        NOTE(review): taken to be the buffer length, as for the
        parameter of the same name in `_consoleread` - confirm.
    :return: The length of the converted C string copied into `buf`,
        or 0 when `choosefile` returned `None`, when the result does
        not fit in `buf`, or when an error occurred (errors are
        logged, never propagated).
    """
    try:
        res = choosefile(new)
    except Exception as e:
        logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e))
        res = None

    if res is None:
        return 0

    try:
        res_cdata = conversion._str_to_cchar(res)
        res_n = len(res_cdata)
        # Never write past the end of the buffer provided by R.
        if res_n > n:
            logger.error(_CHOOSEFILE_EXCEPTION_LOG,
                         'file name too long for the buffer size')
            return 0
        ffi.memmove(buf, res_cdata, res_n)
    except Exception as e:
        logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e))
        return 0
    return res_n
def showfiles(filenames: typing.Tuple[str, ...],
              headers: typing.Tuple[str, ...],
              wtitle: str, pager: str) -> None:
    """R showing files.

    Default implementation: for each file, prints a `File: <name>`
    line, the content of the file, then a `---` separator line.

    :param filenames: A tuple of file names.
    :param headers: A tuple of strings (TODO: check what it is).
        Not used by this default implementation.
    :param wtitle: Title of the "window" showing the files. Not used
        by this default implementation.
    :param pager: Not used by this default implementation.
    """
    for fn in filenames:
        print('File: %s' % fn)
        with open(fn) as fh:
            for row in fh:
                # Rows read from the file keep their line ending;
                # strip it so print() does not emit a blank line
                # after every row.
                print(row.rstrip('\n'))
        print('---')
_SHOWFILE_EXCEPTION_LOG = 'R[show file]: %s'
_SHOWFILE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error while '
                                    'showing files for R: %s')


@ffi_proxy.callback(ffi_proxy._showfiles_def,
                    _rinterface_cffi)
def _showfiles(nfiles: int, files, headers, wtitle, delete, pager) -> int:
    """Adapter calling the module-level `showfiles`.

    The C strings received are converted to Python strings and
    `showfiles` is called with tuples of file names and headers, the
    window title and the pager.

    :param nfiles: Number of entries read from `files` and `headers`.
    :param files: Array of C strings with the file names.
    :param headers: Array of C strings with the headers.
    :param wtitle: C string with the window title.
    :param delete: Not used by this adapter.
    :param pager: C string with the pager.
    :return: 0 when at least one file name was received and
        `showfiles` ran without raising, 1 otherwise (errors are
        logged, never propagated).
    """
    filenames: typing.List[str] = []
    headers_str: typing.List[str] = []
    try:
        wtitle_str = conversion._cchar_to_str(wtitle)
        pager_str = conversion._cchar_to_str(pager)
        for i in range(nfiles):
            filenames.append(conversion._cchar_to_str(files[i]))
            headers_str.append(conversion._cchar_to_str(headers[i]))
    except Exception as e:
        logger.error(_SHOWFILE_INTERNAL_EXCEPTION_LOG, str(e))
        # Do not call `showfiles` with partially converted (and
        # possibly mismatched) arguments; report the failure instead.
        return 1

    res = 0 if filenames else 1
    try:
        showfiles(tuple(filenames),
                  tuple(headers_str),
                  wtitle_str,
                  pager_str)
    except Exception as e:
        res = 1
        logger.error(_SHOWFILE_EXCEPTION_LOG, str(e))
    return res
def cleanup(saveact, status, runlast):
    """Clean up on behalf of R.

    Default implementation: does nothing. The `_cleanup` adapter in
    this module calls it with the three arguments it receives.

    :param saveact: Not used by this default implementation.
    :param status: Not used by this default implementation.
    :param runlast: Not used by this default implementation.
    """
_CLEANUP_EXCEPTION_LOG = 'R[cleanup]: %s'


@ffi_proxy.callback(ffi_proxy._cleanup_def,
                    _rinterface_cffi)
def _cleanup(saveact, status, runlast):
    """Adapter calling the module-level `cleanup`.

    The three arguments are passed through unchanged. Any exception
    raised by `cleanup` is logged as an error and not propagated.
    """
    try:
        cleanup(saveact, status, runlast)
    except Exception as exc:
        logger.error(_CLEANUP_EXCEPTION_LOG, str(exc))
def processevents() -> None:
    """Process R events.

    This function can be periodically called by R to handle
    events such as window resizing in an interactive graphical
    device.

    Default implementation: does nothing.
    """
_PROCESSEVENTS_EXCEPTION_LOG = 'R[processevents]: %s'


@ffi_proxy.callback(ffi_proxy._processevents_def,
                    _rinterface_cffi)
def _processevents() -> None:
    """Adapter calling the module-level `processevents`.

    Any exception raised by `processevents` is logged as an error
    and not propagated.
    """
    try:
        processevents()
    except Exception as exc:
        logger.error(_PROCESSEVENTS_EXCEPTION_LOG, str(exc))
def busy(x: int) -> None:
    """R is busy.

    Default implementation: does nothing.

    :param x: Integer received from R and forwarded unchanged by the
        `_busy` adapter.
        NOTE(review): R's embedding documentation appears to describe
        it as 1 when R starts an extended computation and 0 when
        that state ends - confirm.
    """
_BUSY_EXCEPTION_LOG = 'R[busy]: %s'


@ffi_proxy.callback(ffi_proxy._busy_def,
                    _rinterface_cffi)
def _busy(which) -> None:
    """Adapter calling the module-level `busy`.

    :param which: Value passed unchanged to `busy`.

    Any exception raised by `busy` is logged as an error and not
    propagated.
    """
    try:
        busy(which)
    except Exception as exc:
        logger.error(_BUSY_EXCEPTION_LOG, str(exc))
def callback() -> None:
    """Generic callback.

    Default implementation: does nothing. It is the function called
    by the `_callback` adapter in this module.
    """
_CALLBACK_EXCEPTION_LOG = 'R[callback]: %s'


@ffi_proxy.callback(ffi_proxy._callback_def,
                    _rinterface_cffi)
def _callback() -> None:
    """Adapter calling the module-level `callback`.

    Any exception raised by `callback` is logged as an error and not
    propagated.
    """
    try:
        callback()
    except Exception as exc:
        logger.error(_CALLBACK_EXCEPTION_LOG, str(exc))
def yesnocancel(question: str) -> int:
    """Asking a user to answer yes, no, or cancel.

    Default implementation: prompts with the builtin `input` and
    converts the reply with `int`, so a reply that is not an integer
    raises `ValueError`.

    :param question: The question asked to the user
    :return: An integer with the answer.
    """
    answer = input(question)
    return int(answer)
_YESNOCANCEL_EXCEPTION_LOG = 'R[yesnocancel]: %s'


@ffi_proxy.callback(ffi_proxy._yesnocancel_def,
                    _rinterface_cffi)
def _yesnocancel(question):
    """Adapter calling the module-level `yesnocancel`.

    The question is converted to a Python string and passed to
    `yesnocancel`.

    :param question: C string with the question.
    :return: The value returned by `yesnocancel`, or 0 when an error
        occurred (errors are logged, never propagated).
    """
    # Fallback returned when the conversion or the user callback
    # fails, so that `res` is always bound.
    # NOTE(review): 0 is understood to mean "cancel" in R's
    # yes/no/cancel convention (1: yes, -1: no) - confirm.
    res = 0
    try:
        q = conversion._cchar_to_str(question)
        res = yesnocancel(q)
    except Exception as e:
        logger.error(_YESNOCANCEL_EXCEPTION_LOG, str(e))
    return res