"""Callbacks available from R's C-API.
The callbacks make available in R's C-API can be specified as
Python functions, with the module providing the adapter code
that makes it possible."""
from contextlib import contextmanager
import logging
import typing
from _rinterface_cffi import ffi
from . import ffi_proxy
from . import conversion
logger = logging.getLogger(__name__)
# TODO: rename to "replace_in_module"
@contextmanager
def obj_in_module(module, name: str, obj):
obj_orig = getattr(module, name)
setattr(module, name, obj)
try:
yield
finally:
setattr(module, name, obj_orig)
[docs]def consoleflush():
pass
_FLUSHCONSOLE_EXCEPTION_LOG = 'R[flush console]: %s'
@ffi_proxy.callback(ffi_proxy._consoleflush_def,
ffi)
def _consoleflush():
try:
consoleflush()
except Exception as e:
logger.error(_FLUSHCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consoleread(prompt: str) -> str:
"""Read input for the R console.
:param prompt: The message prompted.
:return: A string with the input returned by the user.
"""
return input(prompt)
_READCONSOLE_EXCEPTION_LOG = 'R[read into console]: %s'
_READCONSOLE_INTERNAL_EXCEPTION_LOG = 'Internal rpy2 error with callback: %s'
@ffi_proxy.callback(ffi_proxy._consoleread_def,
ffi)
def _consoleread(prompt, buf, n: int, addtohistory) -> int:
success = None
try:
s = conversion._cchar_to_str(prompt)
reply = consoleread(s)
except Exception as e:
success = 0
logger.error(_READCONSOLE_EXCEPTION_LOG, str(e))
if success == 0:
return success
try:
# TODO: handle non-ASCII encodings
reply_b = reply.encode('ASCII')
reply_n = min(n, len(reply_b))
ffi.memmove(buf,
reply_b,
reply_n)
if reply_n < n:
buf[reply_n] = ord(b'\n')
for i in range(reply_n+1, n):
buf[i] = 0
if reply_n == 0:
success = 0
else:
success = 1
except Exception as e:
success = 0
logger.error(_READCONSOLE_INTERNAL_EXCEPTION_LOG,
str(e))
return success
def consolereset() -> None:
pass
_RESETCONSOLE_EXCEPTION_LOG = 'R[reset console]: %s'
@ffi_proxy.callback(ffi_proxy._consolereset_def,
ffi)
def _consolereset() -> None:
try:
consolereset()
except Exception as e:
logger.error(_RESETCONSOLE_EXCEPTION_LOG, str(e))
[docs]def consolewrite_print(s: str) -> None:
"""R writing to the console/terminal.
:param s: the data to write to the console/terminal.
"""
# TODO: is the callback for flush working with Linux ?
print(s, end='', flush=True)
[docs]def consolewrite_warnerror(s: str) -> None:
# TODO: use an rpy2/R-specific warning instead of UserWarning.
logger.warning(_WRITECONSOLE_EXCEPTION_LOG, s)
_WRITECONSOLE_EXCEPTION_LOG = 'R[write to console]: %s'
@ffi_proxy.callback(ffi_proxy._consolewrite_ex_def,
ffi)
def _consolewrite_ex(buf, n: int, otype) -> None:
s = conversion._cchar_to_str_with_maxlen(buf, maxlen=n)
try:
if otype == 0:
consolewrite_print(s)
else:
consolewrite_warnerror(s)
except Exception as e:
logger.error(_WRITECONSOLE_EXCEPTION_LOG, str(e))
[docs]def showmessage(s: str) -> None:
print('R wants to show a message')
print(s)
_SHOWMESSAGE_EXCEPTION_LOG = 'R[show message]: %s'
@ffi_proxy.callback(ffi_proxy._showmessage_def,
ffi)
def _showmessage(buf):
s = conversion._cchar_to_str(buf)
try:
showmessage(s)
except Exception as e:
logger.error(_SHOWMESSAGE_EXCEPTION_LOG, str(e))
def choosefile(new):
return input('Enter file name:')
_CHOOSEFILE_EXCEPTION_LOG = 'R[choose file]: %s'
@ffi_proxy.callback(ffi_proxy._choosefile_def,
ffi)
def _choosefile(new, buf, n: int) -> int:
try:
res = choosefile(new)
except Exception as e:
logger.error(_CHOOSEFILE_EXCEPTION_LOG, str(e))
res = None
if res is None:
return 0
res_cdata = conversion._str_to_cchar(res)
ffi.memmove(buf, res_cdata, len(res_cdata))
return len(res_cdata)
[docs]def showfiles(filenames: typing.Tuple[str],
headers: typing.Tuple[str],
wtitle: str, pager: str) -> None:
"""R showing files.
:param filenames: A tuple of file names.
:param headers: A tuple of strings (TODO: check what it is)
:wtitle: Title of the "window" showing the files.
"""
for fn in filenames:
print('File: %s' % fn)
with open(fn) as fh:
for row in fh:
print(row)
print('---')
_SHOWFILE_EXCEPTION_LOG = 'R[show file]: %s'
_SHOWFILE_INTERNAL_EXCEPTION_LOG = ('Internal rpy2 error while '
'showing files for R: %s')
@ffi_proxy.callback(ffi_proxy._showfiles_def,
ffi)
def _showfiles(nfiles: int, files, headers, wtitle, delete, pager) -> int:
filenames = []
headers_str = []
wtitle_str = None
pager_str = None
try:
wtitle_str = conversion._cchar_to_str(wtitle)
pager_str = conversion._cchar_to_str(pager)
for i in range(nfiles):
filenames.append(conversion._cchar_to_str(files[i]))
headers_str.append(conversion._cchar_to_str(headers[i]))
except Exception as e:
logger.error(_SHOWFILE_INTERNAL_EXCEPTION_LOG, str(e))
if len(filenames):
res = 0
else:
res = 1
try:
showfiles(tuple(filenames),
tuple(headers_str),
wtitle_str,
pager_str)
except Exception as e:
res = 1
logger.error(_SHOWFILE_EXCEPTION_LOG, str(e))
return res
def cleanup(saveact, status, runlast):
pass
_CLEANUP_EXCEPTION_LOG = 'R[cleanup]: %s'
@ffi_proxy.callback(ffi_proxy._cleanup_def,
ffi)
def _cleanup(saveact, status, runlast):
try:
cleanup(saveact, status, runlast)
except Exception as e:
logger.error(_CLEANUP_EXCEPTION_LOG, str(e))
[docs]def processevents() -> None:
"""Process R events.
This function can be periodically called by R to handle
events such as window resizing in an interactive graphical
device."""
pass
_PROCESSEVENTS_EXCEPTION_LOG = 'R[processevents]: %s'
@ffi_proxy.callback(ffi_proxy._processevents_def,
ffi)
def _processevents() -> None:
try:
processevents()
except Exception as e:
logger.error(_PROCESSEVENTS_EXCEPTION_LOG, str(e))
[docs]def busy(x: int) -> None:
"""R is busy.
:param x: TODO this is an integer but do not know what it does.
"""
pass
_BUSY_EXCEPTION_LOG = 'R[busy]: %s'
@ffi_proxy.callback(ffi_proxy._busy_def,
ffi)
def _busy(which) -> None:
try:
busy(which)
except Exception as e:
logger.error(_BUSY_EXCEPTION_LOG, str(e))
def callback() -> None:
pass
_CALLBACK_EXCEPTION_LOG = 'R[callback]: %s'
@ffi_proxy.callback(ffi_proxy._callback_def,
ffi)
def _callback() -> None:
try:
callback()
except Exception as e:
logger.error(_CALLBACK_EXCEPTION_LOG, str(e))
[docs]def yesnocancel(question: str) -> int:
"""Asking a user to answer yes, no, or cancel.
:param question: The question asked to the user
:return: An integer with the answer.
"""
return int(input(question))
_YESNOCANCEL_EXCEPTION_LOG = 'R[yesnocancel]: %s'
@ffi_proxy.callback(ffi_proxy._yesnocancel_def,
ffi)
def _yesnocancel(question):
try:
q = conversion._cchar_to_str(question)
res = yesnocancel(q)
except Exception as e:
logger.error(_YESNOCANCEL_EXCEPTION_LOG, str(e))
return res