import inspect
import os
import re
import textwrap
import typing
from typing import Union
import warnings
from collections import OrderedDict
from rpy2.robjects.robject import RObjectMixin
import rpy2.rinterface as rinterface
from rpy2.rinterface_lib import na_values
from rpy2.robjects import help
from rpy2.robjects import conversion
from rpy2.robjects.packages_utils import (default_symbol_r2python,
                                          default_symbol_resolve,
                                          _map_symbols,
                                          _fix_map_symbols)
baseenv_ri = rinterface.baseenv
# Needed to avoid circular imports.
__formals = baseenv_ri.find('formals')
__args = baseenv_ri.find('args')
__is_null = baseenv_ri.find('is.null')
def _formals_fixed(func):
    tmp = __args(func)
    if __is_null(tmp)[0]:
        return rinterface.NULL
    else:
        return __formals(tmp)
# docstring_property and DocstringProperty
# from Bradley Froehle
# https://gist.github.com/bfroehle/4041015
def docstring_property(class_doc):
    def wrapper(fget):
        return DocstringProperty(class_doc, fget)
    return wrapper
class DocstringProperty(object):
    def __init__(self, class_doc, fget):
        self.fget = fget
        self.class_doc = class_doc
    def __get__(self, obj, objtype=None):
        if obj is None:
            return self.class_doc
        else:
            return self.fget(obj)
    def __set__(self, obj, value):
        raise AttributeError("Cannot set the attribute")
    def __delete__(self, obj):
        raise AttributeError("Cannot delete the attribute")
def _repr_argval(obj):
    """ Helper functions to display an R object in the docstring.
    This a hack and will be hopefully replaced the extraction of
    information from the R help system."""
    try:
        size = len(obj)
        if size == 1:
            if obj[0].rid == rinterface.MissingArg.rid:
                # no default value
                s = None
            elif obj[0].rid == rinterface.NULL.rid:
                s = 'rinterface.NULL'
            else:
                s = str(obj[0][0])
        elif size > 1:
            s = '(%s, ...)' % str(obj[0][0])
        else:
            s = str(obj)
    except Exception:
        s = str(obj)
    return s
[docs]class Function(RObjectMixin, rinterface.SexpClosure):
    """ Python representation of an R function.
    """
    __local = baseenv_ri.find('local')
    __call = baseenv_ri.find('call')
    __assymbol = baseenv_ri.find('as.symbol')
    __newenv = baseenv_ri.find('new.env')
    _local_env = None
    def __init__(self, *args, **kwargs):
        super(Function, self).__init__(*args, **kwargs)
        self._local_env = self.__newenv(
            hash=rinterface.BoolSexpVector((True, ))
        )
    @docstring_property(__doc__)
    def __doc__(self) -> str:
        fm = _formals_fixed(self)
        doc = list(['Python representation of an R function.',
                    'R arguments:', ''])
        if fm is rinterface.NULL:
            doc.append('<No information available>')
        for key, val in zip(fm.do_slot('names'), fm):
            if key == '...':
                val = 'R ellipsis (any number of parameters)'
            doc.append('%s: %s' % (key, _repr_argval(val)))
        return os.linesep.join(doc)
    def __call__(self, *args, **kwargs):
        new_args = [conversion.py2rpy(a) for a in args]
        new_kwargs = {}
        for k, v in kwargs.items():
            # TODO: shouldn't this be handled by the conversion itself ?
            if isinstance(v, rinterface.Sexp):
                new_kwargs[k] = v
            else:
                new_kwargs[k] = conversion.py2rpy(v)
        res = super(Function, self).__call__(*new_args, **new_kwargs)
        res = conversion.rpy2py(res)
        return res
[docs]    def rcall(self,
              keyvals,
              environment: rinterface.SexpEnvironment) -> rinterface.sexp.Sexp:
        """ Wrapper around the parent method
        rpy2.rinterface.SexpClosure.rcall(). """
        res = super(Function, self).rcall(keyvals, environment)
        return res  
[docs]class SignatureTranslatedFunction(Function):
    """ Python representation of an R function, where
    the names in named argument are translated to valid
    argument names in Python. """
    _prm_translate: Union[OrderedDict, dict] = {}
    def __init__(self, sexp: rinterface.SexpClosure,
                 init_prm_translate=None,
                 on_conflict='warn',
                 symbol_r2python=default_symbol_r2python,
                 symbol_resolve=default_symbol_resolve):
        super(SignatureTranslatedFunction, self).__init__(sexp)
        if init_prm_translate is None:
            self._prm_translate = OrderedDict()
        else:
            assert isinstance(init_prm_translate, dict)
            self._prm_translate = init_prm_translate
        formals = self.formals()
        if formals.__sexp__._cdata != rinterface.NULL.__sexp__._cdata:
            (symbol_mapping,
             conflicts,
             resolutions) = _map_symbols(
                 formals.names,
                 translation=self._prm_translate,
                 symbol_r2python=symbol_r2python,
                 symbol_resolve=symbol_resolve)
            msg_prefix = ('Conflict when converting R symbols'
                          ' in the function\'s signature:\n- ')
            exception = ValueError
            _fix_map_symbols(symbol_mapping,
                             conflicts,
                             on_conflict,
                             msg_prefix,
                             exception)
            symbol_mapping.update(resolutions)
            reserved_pynames = set(dir(self))
            self._prm_translate.update((k, v[0])
                                       for k, v in symbol_mapping.items())
        if hasattr(sexp, '__rname__'):
            self.__rname__ = sexp.__rname__
    def __call__(self, *args, **kwargs):
        prm_translate = self._prm_translate
        for k in tuple(kwargs.keys()):
            r_k = prm_translate.get(k, None)
            if r_k is not None:
                v = kwargs.pop(k)
                kwargs[r_k] = v
        return (super(SignatureTranslatedFunction, self)
                .__call__(*args, **kwargs)) 
pattern_link = re.compile(r'\\link\{(.+?)\}')
pattern_code = re.compile(r'\\code\{(.+?)\}')
pattern_samp = re.compile(r'\\samp\{(.+?)\}')
class DocumentedSTFunction(SignatureTranslatedFunction):
    def __init__(self, sexp: rinterface.SexpClosure,
                 init_prm_translate=None,
                 packagename: typing.Optional[str] = None):
        super(DocumentedSTFunction,
              self).__init__(sexp,
                             init_prm_translate=init_prm_translate)
        self.__rpackagename__ = packagename
    @docstring_property(__doc__)
    def __doc__(self):
        doc = ['Python representation of an R function.']
        description = help.docstring(self.__rpackagename__,
                                     self.__rname__,
                                     sections=['description'])
        doc.append(description)
        fm = _formals_fixed(self)
        names = fm.do_slot('names')
        doc.append(self.__rname__+'(')
        for key, val in self._prm_translate.items():
            if key == '___':
                description = ('(was "..."). R ellipsis '
                               '(any number of parameters)')
            else:
                description = _repr_argval(fm[names.index(val)])
            if description is None:
                doc.append('    %s,' % key)
            else:
                doc.append('    %s = %s,' % (key, description))
        doc.extend((')', ''))
        package = help.Package(self.__rpackagename__)
        page = package.fetch(self.__rname__)
        for item in page.arguments():
            description = item.value
            description = description.replace('\n', '')
            description, count = pattern_link.subn(r'\1', description)
            description, count = pattern_code.subn(r'`\1`', description)
            description, count = pattern_samp.subn(r'`\1`', description)
            doc.append(' '.join((item.name, ': ', description, ',')))
            doc.append('')
        return os.linesep.join(doc)
# TODO: shouldn't this be in a more central place / or more general interest ?
_SCALAR_COMPAT_RTYPES = set(
    getattr(rinterface.RTYPES, name).value
    for name in ('STRSXP', 'INTSXP', 'REALSXP', 'LGLSXP', 'CPLXSXP')
)
def _map_default_value(value: rinterface.Sexp):
    """
    Map default in the R signature.
    Because of R's lazy evaluation some default might be unevaluated expressions.
    Args:
      value:
    """
    if value.__sexp__.typeof in _SCALAR_COMPAT_RTYPES:
        if len(value) == 1:
            res = value[0]
        else:
            res = value
    else:
        res = value
    return res
def map_signature(
        r_func: SignatureTranslatedFunction,
        is_method: bool = False,
        map_default: typing.Optional[
            typing.Callable[[rinterface.Sexp], typing.Any]
        ] = _map_default_value
) -> typing.Tuple[inspect.Signature, typing.Optional[int]]:
    """
    Map the signature of an function to the signature of a Python function.
    While mapping the signature, it will report the eventual presence of
    an R ellipsis.
    Args:
        r_func (SignatureTranslatedFunction): an R function
        is_method (bool): Whether the function should be treated as a method
            (adds a `self` param to the signature if so).
        map_default (function): Function to map default values in the Python
            signature. No mapping to default values is done if None.
    Returns:
        A tuple (inspect.Signature, int or None).
    """
    params = []
    r_ellipsis = None
    if is_method:
        params.append(inspect.Parameter('self',
                                        inspect.Parameter.POSITIONAL_ONLY))
    r_params = r_func.formals()
    rev_prm_transl = {v: k for k, v in r_func._prm_translate.items()}
    if r_params.names is not rinterface.NULL:
        for i, (name, default_orig) in enumerate(zip(r_params.names, r_params)):
            if default_orig == '...':
                r_ellipsis = i
                warnings.warn('The R ellispsis is not yet well supported.')
            transl_name = rev_prm_transl.get(name)
            default_orig = default_orig[0]
            if map_default and not rinterface.MissingArg.rsame(default_orig):
                default_mapped = map_default(default_orig)
            else:
                default_mapped = inspect.Parameter.empty
            prm = inspect.Parameter(
                transl_name if transl_name else name,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                default=default_mapped
            )
            params.append(prm)
    return (inspect.Signature(params), r_ellipsis)
def wrap_docstring_default(
        r_func: SignatureTranslatedFunction,
        is_method: bool,
        signature: inspect.Signature,
        r_ellipsis: typing.Optional[int], *,
        full_repr: bool = False
) -> str:
    """
    Create a docstring that for a wrapped function.
    Args:
        r_func (SignatureTranslatedFunction): an R function
        is_method (bool): Whether the function should be treated as a method
            (a `self` parameter is added to the signature if so).
        signature (inspect.Signature): A mapped signature for `r_func`
        r_ellipsis (bool): Index of the parameter containing the R ellipsis (`...`).
            None if the R ellipsis is not in the function signature.
        full_repr (bool): Whether to have the full body of the R function in
            the docstring dynamically generated.
    Returns:
        A string.
    """
    docstring = []
    docstring.append('This {} wraps the following R function.'
                     .format('method' if is_method else 'function'))
    if r_ellipsis:
        docstring.extend(
            ('',
             textwrap.dedent(
                 """The R ellipsis "..." present in the function's parameters
                 is mapped to a python iterable of (name, value) pairs (such as
                 it is returned by the `dict` method `items()` for example."""),
             ''
            )
        )
    if full_repr:
        docstring.append('\n{}'.format(r_func.r_repr()))
    else:
        r_repr = r_func.r_repr()
        i = r_repr.find('\n{')
        if i == -1:
            docstring.append('\n{}'.format(r_func.r_repr()))
        else:
            docstring.append('\n{}\n{{\n  ...\n}}'.format(r_repr[:i]))
    
    return '\n'.join(docstring)
def wrap_r_function(
        r_func: SignatureTranslatedFunction, name: str, *,
        is_method: bool = False, full_repr: bool = False,
        map_default: typing.Optional[
            typing.Callable[[rinterface.Sexp],
                            typing.Any]
            ] = _map_default_value,
        wrap_docstring: typing.Optional[
            typing.Callable[[SignatureTranslatedFunction,
                             bool,
                             inspect.Signature,
                             typing.Optional[int]],
                            str]
            ] = wrap_docstring_default
) -> typing.Callable:
    """
    Wrap an rpy2 function handle with a Python function with a matching signature.
    Args:
        r_func (rpy2.robjects.functions.SignatureTranslatedFunction): The
        function to be wrapped.
        name (str): The name of the function.
        is_method (bool): Whether the function should be treated as a method
        (adds a `self` param to the signature if so).
        map_default (function): Function to map default values in the Python
        signature. No mapping to default values is done if None.
    Returns:
        A function wrapping an underlying R function.
    """
    name = name.replace('.', '_')
    signature, r_ellipsis = map_signature(r_func, is_method=is_method,
                                          map_default=map_default)
    if r_ellipsis:
        def wrapped_func(*args, **kwargs):
            new_args = (list((None, x) for x in rinterface.args[:r_ellipsis]) +
                        list(args[r_ellipsis]) +
                        list((None, x) for x in args[min(r_ellipsis+1, len(args)-1):]) +
                        list(kwargs.items()))
            value = r_func.rcall(new_args, rinterface.globalenv)
            return value
    else:
        def wrapped_func(*args, **kwargs):
            value = r_func(*args, **kwargs)
            return value
    if wrap_docstring:
        docstring = wrap_docstring(r_func, is_method, signature, r_ellipsis)
    else:
        docstring = 'This is a dynamically created wrapper for an R function.'
    wrapped_func.__name__ = name
    wrapped_func.__qualname__ = name
    wrapped_func.__signature__ = signature
    wrapped_func.__doc__ = docstring
    return wrapped_func