Source code for rpy2.robjects.functions

import inspect
import os
import re
import textwrap
import typing
from typing import Union
import warnings
from collections import OrderedDict
from rpy2.robjects.robject import RObjectMixin
import rpy2.rinterface as rinterface
from rpy2.rinterface_lib import na_values
from rpy2.robjects import help
from rpy2.robjects import conversion

from rpy2.robjects.packages_utils import (default_symbol_r2python,
                                          default_symbol_resolve,
                                          _map_symbols,
                                          _fix_map_symbols)


baseenv_ri = rinterface.baseenv

# Needed to avoid circular imports.
__formals = baseenv_ri.find('formals')
__args = baseenv_ri.find('args')
__is_null = baseenv_ri.find('is.null')


def _formals_fixed(func):
    tmp = __args(func)
    if __is_null(tmp)[0]:
        return rinterface.NULL
    else:
        return __formals(tmp)


# docstring_property and DocstringProperty
# from Bradley Froehle
# https://gist.github.com/bfroehle/4041015
def docstring_property(class_doc):
    def wrapper(fget):
        return DocstringProperty(class_doc, fget)
    return wrapper


class DocstringProperty(object):
    def __init__(self, class_doc, fget):
        self.fget = fget
        self.class_doc = class_doc

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self.class_doc
        else:
            return self.fget(obj)

    def __set__(self, obj, value):
        raise AttributeError("Cannot set the attribute")

    def __delete__(self, obj):
        raise AttributeError("Cannot delete the attribute")


def _repr_argval(obj):
    """ Helper functions to display an R object in the docstring.
    This a hack and will be hopefully replaced the extraction of
    information from the R help system."""
    try:
        size = len(obj)
        if size == 1:
            if obj[0].rid == rinterface.MissingArg.rid:
                # no default value
                s = None
            elif obj[0].rid == rinterface.NULL.rid:
                s = 'rinterface.NULL'
            else:
                s = str(obj[0][0])
        elif size > 1:
            s = '(%s, ...)' % str(obj[0][0])
        else:
            s = str(obj)
    except Exception:
        s = str(obj)
    return s


[docs]class Function(RObjectMixin, rinterface.SexpClosure): """ Python representation of an R function. """ __local = baseenv_ri.find('local') __call = baseenv_ri.find('call') __assymbol = baseenv_ri.find('as.symbol') __newenv = baseenv_ri.find('new.env') _local_env = None def __init__(self, *args, **kwargs): super(Function, self).__init__(*args, **kwargs) self._local_env = self.__newenv( hash=rinterface.BoolSexpVector((True, )) ) @docstring_property(__doc__) def __doc__(self) -> str: fm = _formals_fixed(self) doc = list(['Python representation of an R function.', 'R arguments:', '']) if fm is rinterface.NULL: doc.append('<No information available>') for key, val in zip(fm.do_slot('names'), fm): if key == '...': val = 'R ellipsis (any number of parameters)' doc.append('%s: %s' % (key, _repr_argval(val))) return os.linesep.join(doc) def __call__(self, *args, **kwargs): new_args = [conversion.py2rpy(a) for a in args] new_kwargs = {} for k, v in kwargs.items(): # TODO: shouldn't this be handled by the conversion itself ? if isinstance(v, rinterface.Sexp): new_kwargs[k] = v else: new_kwargs[k] = conversion.py2rpy(v) res = super(Function, self).__call__(*new_args, **new_kwargs) res = conversion.rpy2py(res) return res
[docs] def formals(self): """ Return the signature of the underlying R function (as the R function 'formals()' would). """ res = _formals_fixed(self) res = conversion.rpy2py(res) return res
[docs] def rcall(self, keyvals, environment: rinterface.SexpEnvironment) -> rinterface.sexp.Sexp: """ Wrapper around the parent method rpy2.rinterface.SexpClosure.rcall(). """ res = super(Function, self).rcall(keyvals, environment) return res
[docs]class SignatureTranslatedFunction(Function): """ Python representation of an R function, where the names in named argument are translated to valid argument names in Python. """ _prm_translate: Union[OrderedDict, dict] = {} def __init__(self, sexp: rinterface.SexpClosure, init_prm_translate=None, on_conflict='warn', symbol_r2python=default_symbol_r2python, symbol_resolve=default_symbol_resolve): super(SignatureTranslatedFunction, self).__init__(sexp) if init_prm_translate is None: self._prm_translate = OrderedDict() else: assert isinstance(init_prm_translate, dict) self._prm_translate = init_prm_translate formals = self.formals() if formals.__sexp__._cdata != rinterface.NULL.__sexp__._cdata: (symbol_mapping, conflicts, resolutions) = _map_symbols( formals.names, translation=self._prm_translate, symbol_r2python=symbol_r2python, symbol_resolve=symbol_resolve) msg_prefix = ('Conflict when converting R symbols' ' in the function\'s signature:\n- ') exception = ValueError _fix_map_symbols(symbol_mapping, conflicts, on_conflict, msg_prefix, exception) symbol_mapping.update(resolutions) reserved_pynames = set(dir(self)) self._prm_translate.update((k, v[0]) for k, v in symbol_mapping.items()) if hasattr(sexp, '__rname__'): self.__rname__ = sexp.__rname__ def __call__(self, *args, **kwargs): prm_translate = self._prm_translate for k in tuple(kwargs.keys()): r_k = prm_translate.get(k, None) if r_k is not None: v = kwargs.pop(k) kwargs[r_k] = v return (super(SignatureTranslatedFunction, self) .__call__(*args, **kwargs))
pattern_link = re.compile(r'\\link\{(.+?)\}') pattern_code = re.compile(r'\\code\{(.+?)\}') pattern_samp = re.compile(r'\\samp\{(.+?)\}') class DocumentedSTFunction(SignatureTranslatedFunction): def __init__(self, sexp: rinterface.SexpClosure, init_prm_translate=None, packagename: typing.Optional[str] = None): super(DocumentedSTFunction, self).__init__(sexp, init_prm_translate=init_prm_translate) self.__rpackagename__ = packagename @docstring_property(__doc__) def __doc__(self): doc = ['Python representation of an R function.'] description = help.docstring(self.__rpackagename__, self.__rname__, sections=['description']) doc.append(description) fm = _formals_fixed(self) names = fm.do_slot('names') doc.append(self.__rname__+'(') for key, val in self._prm_translate.items(): if key == '___': description = ('(was "..."). R ellipsis ' '(any number of parameters)') else: description = _repr_argval(fm[names.index(val)]) if description is None: doc.append(' %s,' % key) else: doc.append(' %s = %s,' % (key, description)) doc.extend((')', '')) package = help.Package(self.__rpackagename__) page = package.fetch(self.__rname__) for item in page.arguments(): description = item.value description = description.replace('\n', '') description, count = pattern_link.subn(r'\1', description) description, count = pattern_code.subn(r'`\1`', description) description, count = pattern_samp.subn(r'`\1`', description) doc.append(' '.join((item.name, ': ', description, ','))) doc.append('') return os.linesep.join(doc) # TODO: shouldn't this be in a more central place / or more general interest ? _SCALAR_COMPAT_RTYPES = set( getattr(rinterface.RTYPES, name).value for name in ('STRSXP', 'INTSXP', 'REALSXP', 'LGLSXP', 'CPLXSXP') ) def _map_default_value(value: rinterface.Sexp): """ Map default in the R signature. Because of R's lazy evaluation some default might be unevaluated expressions. Args: value: """ if value.__sexp__.typeof in _SCALAR_COMPAT_RTYPES: if len(value) == 1: res = value[0] else: res = value else: res = value return res def map_signature( r_func: SignatureTranslatedFunction, is_method: bool = False, map_default: typing.Optional[ typing.Callable[[rinterface.Sexp], typing.Any] ] = _map_default_value ) -> typing.Tuple[inspect.Signature, typing.Optional[int]]: """ Map the signature of an function to the signature of a Python function. While mapping the signature, it will report the eventual presence of an R ellipsis. Args: r_func (SignatureTranslatedFunction): an R function is_method (bool): Whether the function should be treated as a method (adds a `self` param to the signature if so). map_default (function): Function to map default values in the Python signature. No mapping to default values is done if None. Returns: A tuple (inspect.Signature, int or None). """ params = [] r_ellipsis = None if is_method: params.append(inspect.Parameter('self', inspect.Parameter.POSITIONAL_ONLY)) r_params = r_func.formals() rev_prm_transl = {v: k for k, v in r_func._prm_translate.items()} if r_params.names is not rinterface.NULL: for i, (name, default_orig) in enumerate(zip(r_params.names, r_params)): if default_orig == '...': r_ellipsis = i warnings.warn('The R ellispsis is not yet well supported.') transl_name = rev_prm_transl.get(name) default_orig = default_orig[0] if map_default and not rinterface.MissingArg.rsame(default_orig): default_mapped = map_default(default_orig) else: default_mapped = inspect.Parameter.empty prm = inspect.Parameter( transl_name if transl_name else name, inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default_mapped ) params.append(prm) return (inspect.Signature(params), r_ellipsis) def wrap_docstring_default( r_func: SignatureTranslatedFunction, is_method: bool, signature: inspect.Signature, r_ellipsis: typing.Optional[int], *, full_repr: bool = False ) -> str: """ Create a docstring that for a wrapped function. Args: r_func (SignatureTranslatedFunction): an R function is_method (bool): Whether the function should be treated as a method (a `self` parameter is added to the signature if so). signature (inspect.Signature): A mapped signature for `r_func` r_ellipsis (bool): Index of the parameter containing the R ellipsis (`...`). None if the R ellipsis is not in the function signature. full_repr (bool): Whether to have the full body of the R function in the docstring dynamically generated. Returns: A string. """ docstring = [] docstring.append('This {} wraps the following R function.' .format('method' if is_method else 'function')) if r_ellipsis: docstring.extend( ('', textwrap.dedent( """The R ellipsis "..." present in the function's parameters is mapped to a python iterable of (name, value) pairs (such as it is returned by the `dict` method `items()` for example."""), '' ) ) if full_repr: docstring.append('\n{}'.format(r_func.r_repr())) else: r_repr = r_func.r_repr() i = r_repr.find('\n{') if i == -1: docstring.append('\n{}'.format(r_func.r_repr())) else: docstring.append('\n{}\n{{\n ...\n}}'.format(r_repr[:i])) return '\n'.join(docstring) def wrap_r_function( r_func: SignatureTranslatedFunction, name: str, *, is_method: bool = False, full_repr: bool = False, map_default: typing.Optional[ typing.Callable[[rinterface.Sexp], typing.Any] ] = _map_default_value, wrap_docstring: typing.Optional[ typing.Callable[[SignatureTranslatedFunction, bool, inspect.Signature, typing.Optional[int]], str] ] = wrap_docstring_default ) -> typing.Callable: """ Wrap an rpy2 function handle with a Python function with a matching signature. Args: r_func (rpy2.robjects.functions.SignatureTranslatedFunction): The function to be wrapped. name (str): The name of the function. is_method (bool): Whether the function should be treated as a method (adds a `self` param to the signature if so). map_default (function): Function to map default values in the Python signature. No mapping to default values is done if None. Returns: A function wrapping an underlying R function. """ name = name.replace('.', '_') signature, r_ellipsis = map_signature(r_func, is_method=is_method, map_default=map_default) if r_ellipsis: def wrapped_func(*args, **kwargs): new_args = (list((None, x) for x in rinterface.args[:r_ellipsis]) + list(args[r_ellipsis]) + list((None, x) for x in args[min(r_ellipsis+1, len(args)-1):]) + list(kwargs.items())) value = r_func.rcall(new_args, rinterface.globalenv) return value else: def wrapped_func(*args, **kwargs): value = r_func(*args, **kwargs) return value if wrap_docstring: docstring = wrap_docstring(r_func, is_method, signature, r_ellipsis) else: docstring = 'This is a dynamically created wrapper for an R function.' wrapped_func.__name__ = name wrapped_func.__qualname__ = name wrapped_func.__signature__ = signature wrapped_func.__doc__ = docstring return wrapped_func