Source code for rpy2.robjects.functions

import inspect
import os
import re
import textwrap
import typing
from typing import Union
import warnings
from collections import OrderedDict
from rpy2.robjects.robject import RObjectMixin
import rpy2.rinterface as rinterface
import rpy2.rinterface_lib.sexp
from rpy2.robjects import help
from rpy2.robjects import conversion
from rpy2.robjects.vectors import Vector

from rpy2.robjects.packages_utils import (default_symbol_r2python,
                                          default_symbol_resolve,
                                          _map_symbols,
                                          _fix_map_symbols)


baseenv_ri = rinterface.baseenv

# Needed to avoid circular imports.
__formals = baseenv_ri.find('formals')
__args = baseenv_ri.find('args')
__is_null = baseenv_ri.find('is.null')


def _formals_fixed(func):
    if func.typeof in (rpy2.rinterface_lib.sexp.RTYPES.SPECIALSXP,
                       rpy2.rinterface_lib.sexp.RTYPES.BUILTINSXP):
        res = rpy2.rinterface_lib.sexp.NULL
    else:
        res = __formals(func)
        if res is rpy2.rinterface_lib.sexp.NULL:
            res = __formals(__args(func))
    return res


# docstring_property and DocstringProperty
# from Bradley Froehle
# https://gist.github.com/bfroehle/4041015
def docstring_property(class_doc):
    def wrapper(fget):
        return DocstringProperty(class_doc, fget)
    return wrapper


class DocstringProperty(object):
    def __init__(self, class_doc, fget):
        self.fget = fget
        self.class_doc = class_doc

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self.class_doc
        else:
            return self.fget(obj)

    def __set__(self, obj, value):
        raise AttributeError("Cannot set the attribute")

    def __delete__(self, obj):
        raise AttributeError("Cannot delete the attribute")


def _repr_argval(obj):
    """ Helper functions to display an R object in the docstring.
    This a hack and will be hopefully replaced the extraction of
    information from the R help system."""
    try:
        size = len(obj)
        if size == 1:
            if obj[0].rid == rinterface.MissingArg.rid:
                # no default value
                s = None
            elif obj[0].rid == rinterface.NULL.rid:
                s = 'rinterface.NULL'
            else:
                s = str(obj[0][0])
        elif size > 1:
            s = '(%s, ...)' % str(obj[0][0])
        else:
            s = str(obj)
    except Exception:
        s = str(obj)
    return s


[docs] class Function(RObjectMixin, rinterface.SexpClosure): """ Python representation of an R function. """ __local = baseenv_ri.find('local') __call = baseenv_ri.find('call') __assymbol = baseenv_ri.find('as.symbol') __newenv = baseenv_ri.find('new.env') _local_env = None def __init__(self, *args, **kwargs): super(Function, self).__init__(*args, **kwargs) self._local_env = self.__newenv( hash=rinterface.BoolSexpVector((True, )) ) @docstring_property(__doc__) def __doc__(self) -> str: fm = _formals_fixed(self) doc = list(['Python representation of an R function.', 'R arguments:', '']) if fm is rinterface.NULL: doc.append('<No information available>') else: for key, val in zip(fm.do_slot('names'), fm): if key == '...': val = 'R ellipsis (any number of parameters)' doc.append('%s: %s' % (key, _repr_argval(val))) return os.linesep.join(doc) def __call__(self, *args, **kwargs): cv = conversion.get_conversion() new_args = [cv.py2rpy(a) for a in args] new_kwargs = {} for k, v in kwargs.items(): # TODO: shouldn't this be handled by the conversion itself ? if isinstance(v, rinterface.Sexp): new_kwargs[k] = v else: new_kwargs[k] = cv.py2rpy(v) res = super(Function, self).__call__(*new_args, **new_kwargs) res = cv.rpy2py(res) return res
[docs] def formals(self): """ Return the signature of the underlying R function (as the R function 'formals()' would). """ res = _formals_fixed(self) res = conversion.get_conversion().rpy2py(res) return res
[docs] def rcall( self, keyvals, environment: typing.Optional[rinterface.SexpEnvironment] = None ) -> rinterface.sexp.Sexp: """ Wrapper around the parent method rpy2.rinterface.SexpClosure.rcall(). """ res = super(Function, self).rcall(keyvals, environment=environment) return res
[docs] class SignatureTranslatedFunction(Function): """ Python representation of an R function, where the names in named argument are translated to valid argument names in Python. """ _prm_translate: Union[OrderedDict, dict] = {} def __init__(self, sexp: rinterface.SexpClosure, init_prm_translate=None, on_conflict='warn', symbol_r2python=default_symbol_r2python, symbol_resolve=default_symbol_resolve): super(SignatureTranslatedFunction, self).__init__(sexp) if init_prm_translate is None: self._prm_translate = OrderedDict() else: assert isinstance(init_prm_translate, dict) self._prm_translate = init_prm_translate formals = self.formals() if formals.__sexp__._cdata != rinterface.NULL.__sexp__._cdata: (symbol_mapping, conflicts, resolutions) = _map_symbols( formals.names, translation=self._prm_translate, symbol_r2python=symbol_r2python, symbol_resolve=symbol_resolve) msg_prefix = ('Conflict when converting R symbols' ' in the function\'s signature:\n- ') exception = ValueError _fix_map_symbols(symbol_mapping, conflicts, on_conflict, msg_prefix, exception) symbol_mapping.update(resolutions) # TODO: Why was this done? # reserved_pynames = set(dir(self)) self._prm_translate.update((k, v[0]) for k, v in symbol_mapping.items()) if hasattr(sexp, '__rname__'): # TODO: mypy does not use the line above and trips on # __rname__ being not always present. self.__rname__ = sexp.__rname__ # type: ignore def __call__(self, *args, **kwargs): prm_translate = self._prm_translate for k in tuple(kwargs.keys()): r_k = prm_translate.get(k, None) if r_k is not None: v = kwargs.pop(k) kwargs[r_k] = v return (super(SignatureTranslatedFunction, self) .__call__(*args, **kwargs))
pattern_link = re.compile(r'\\link\{(.+?)\}') pattern_code = re.compile(r'\\code\{(.+?)\}') pattern_samp = re.compile(r'\\samp\{(.+?)\}') class DocumentedSTFunction(SignatureTranslatedFunction): def __init__(self, sexp: rinterface.SexpClosure, init_prm_translate=None, packagename: typing.Optional[str] = None): super(DocumentedSTFunction, self).__init__(sexp, init_prm_translate=init_prm_translate) self.__rpackagename__ = packagename @docstring_property(__doc__) def __doc__(self): package = help.Package(self.__rpackagename__) page = package.fetch(self.__rname__) doc = ['Wrapper around an R function.', '', 'The docstring below is built from the R documentation.', ''] if r'\description' in page.sections: doc.append( page.to_docstring( section_names=[r'\description'] ) ) fm = _formals_fixed(self) if fm is rpy2.rinterface_lib.sexp.NULL: # If still NULL there is no argument. names = tuple() else: names = fm.do_slot('names') doc.append(self.__rname__+'(') for key, val in self._prm_translate.items(): if key == '___': description = ('(was "..."). R ellipsis ' '(any number of parameters)') else: description = _repr_argval(fm[names.index(val)]) if description is None: doc.append(' %s,' % key) else: doc.append(' %s = %s,' % (key, description)) doc.extend((')', '')) #### doc.append('Args:') for item in page.arguments(): description = ('%s ' % os.linesep).join(item.value) doc.append(' '.join((' ', item.name, ': ', description))) doc.append('') if r'\details' in page.sections: doc.append( page.to_docstring( section_names=[r'\details'] ) ) return os.linesep.join(doc) # TODO: shouldn't this be in a more central place / or more general interest ? _SCALAR_COMPAT_RTYPES = set( getattr(rinterface.RTYPES, name).value for name in ('STRSXP', 'INTSXP', 'REALSXP', 'LGLSXP', 'CPLXSXP') ) def _map_default_value(value: rinterface.Sexp): """ Map default in the R signature. Because of R's lazy evaluation some default might be unevaluated expressions. Args: value: """ if value.__sexp__.typeof in _SCALAR_COMPAT_RTYPES: # TODO: The dynamic check of typeof (to ensure that that # the underlying R object is of a compatible type makes # mypy trip. There is no way to check type outside of runtime. # Code refactoring would be needed. if len(value) == 1: # type: ignore res = value[0] # type: ignore else: res = value else: res = value return res def map_signature( r_func: SignatureTranslatedFunction, is_method: bool = False, map_default: typing.Optional[ typing.Callable[[rinterface.Sexp], typing.Any] ] = _map_default_value ) -> typing.Tuple[inspect.Signature, typing.Optional[int]]: """ Map the signature of an function to the signature of a Python function. While mapping the signature, it will report the eventual presence of an R ellipsis. Args: r_func (SignatureTranslatedFunction): an R function is_method (bool): Whether the function should be treated as a method (adds a `self` param to the signature if so). map_default (function): Function to map default values in the Python signature. No mapping to default values is done if None. Returns: A tuple (inspect.Signature, int or None). """ params = [] r_ellipsis = None if is_method: params.append(inspect.Parameter('self', inspect.Parameter.POSITIONAL_ONLY)) r_params = r_func.formals() rev_prm_transl = {v: k for k, v in r_func._prm_translate.items()} if r_params.names is not rinterface.NULL: for i, (name, default_orig) in enumerate( zip(r_params.names, r_params) ): if default_orig == '...': r_ellipsis = i warnings.warn('The R ellispsis is not yet well supported.') transl_name = rev_prm_transl.get(name) if isinstance(default_orig, Vector): default_orig = default_orig[0] if map_default and not rinterface.MissingArg.rsame(default_orig): default_mapped = map_default(default_orig) else: default_mapped = inspect.Parameter.empty else: default_mapped = default_orig prm = inspect.Parameter( transl_name if transl_name else name, inspect.Parameter.POSITIONAL_OR_KEYWORD, default=default_mapped ) params.append(prm) return (inspect.Signature(params), r_ellipsis) def wrap_docstring_default( r_func: SignatureTranslatedFunction, is_method: bool, signature: inspect.Signature, r_ellipsis: typing.Optional[int], *, full_repr: bool = False ) -> str: """ Create a docstring that for a wrapped function. Args: r_func (SignatureTranslatedFunction): an R function is_method (bool): Whether the function should be treated as a method (a `self` parameter is added to the signature if so). signature (inspect.Signature): A mapped signature for `r_func` r_ellipsis (bool): Index of the parameter containing the R ellipsis (`...`). None if the R ellipsis is not in the function signature. full_repr (bool): Whether to have the full body of the R function in the docstring dynamically generated. Returns: A string. """ docstring = [] docstring.append('This {} wraps the following R function.' .format('method' if is_method else 'function')) if r_ellipsis: docstring.extend( ('', textwrap.dedent( """The R ellipsis "..." present in the function's parameters is mapped to a python iterable of (name, value) pairs (such as it is returned by the `dict` method `items()` for example.""" ), '') ) if full_repr: docstring.append('\n{}'.format(r_func.r_repr())) else: r_repr = r_func.r_repr() i = r_repr.find('\n{') if i == -1: docstring.append('\n{}'.format(r_func.r_repr())) else: docstring.append('\n{}\n{{\n ...\n}}'.format(r_repr[:i])) return '\n'.join(docstring) def wrap_r_function( r_func: SignatureTranslatedFunction, name: str, *, is_method: bool = False, full_repr: bool = False, map_default: typing.Optional[ typing.Callable[[rinterface.Sexp], typing.Any] ] = _map_default_value, wrap_docstring: typing.Optional[ typing.Callable[[SignatureTranslatedFunction, bool, inspect.Signature, typing.Optional[int]], str] ] = wrap_docstring_default ) -> typing.Callable: """ Wrap an rpy2 function handle with a Python function of matching signature. Args: r_func (rpy2.robjects.functions.SignatureTranslatedFunction): The function to be wrapped. name (str): The name of the function. is_method (bool): Whether the function should be treated as a method (adds a `self` param to the signature if so). map_default (function): Function to map default values in the Python signature. No mapping to default values is done if None. Returns: A function wrapping an underlying R function. """ name = name.replace('.', '_') signature, r_ellipsis = map_signature(r_func, is_method=is_method, map_default=map_default) if r_ellipsis: def wrapped_func(*args, **kwargs): new_args = ( list((None, x) for x in rinterface.args[:r_ellipsis]) + list(args[r_ellipsis]) + list((None, x) for x in args[min(r_ellipsis+1, len(args)-1):]) + list(kwargs.items())) value = r_func.rcall(new_args) return value else: def wrapped_func(*args, **kwargs): value = r_func(*args, **kwargs) return value if wrap_docstring: docstring = wrap_docstring(r_func, is_method, signature, r_ellipsis) else: docstring = 'This is a dynamically created wrapper for an R function.' wrapped_func.__name__ = name wrapped_func.__qualname__ = name # TODO: Open issue in mypy about __signature. # https://github.com/python/mypy/issues/5958 # Ignore the type check for now. wrapped_func.__signature__ = signature # type: ignore wrapped_func.__doc__ = docstring return wrapped_func