import os
import typing
import warnings
from types import ModuleType
from warnings import warn
import rpy2.rinterface as rinterface
from . import conversion
from rpy2.robjects.functions import (SignatureTranslatedFunction,
docstring_property,
DocumentedSTFunction)
from rpy2.robjects import Environment
from rpy2.robjects.packages_utils import (
default_symbol_r2python,
default_symbol_resolve,
_map_symbols,
_fix_map_symbols
)
import rpy2.robjects.help as rhelp
# Handles on R functions, looked up once in R's base environment when this
# module is imported and reused by the helpers below.
_require = rinterface.baseenv['require']
_library = rinterface.baseenv['library']
_as_env = rinterface.baseenv['as.environment']
_package_has_namespace = rinterface.baseenv['packageHasNamespace']
_system_file = rinterface.baseenv['system.file']
_get_namespace = rinterface.baseenv['getNamespace']
_get_namespace_version = rinterface.baseenv['getNamespaceVersion']
_get_namespace_exports = rinterface.baseenv['getNamespaceExports']
_loaded_namespaces = rinterface.baseenv['loadedNamespaces']
_globalenv = rinterface.globalenv
_new_env = rinterface.baseenv["new.env"]
# Short local name for the R character-vector constructor.
StrSexpVector = rinterface.StrSexpVector
# Fetching symbols in the namespace "utils" assumes that "utils" is loaded
# (currently the case by default in R).
# The R operator `::` is called as a function here, i.e. this is `utils::data`.
_data = rinterface.baseenv['::'](StrSexpVector(('utils', )),
StrSexpVector(('data', )))
_reval = rinterface.baseenv['eval']
_options = rinterface.baseenv['options']
[docs]
def no_warnings(func):
    """ Decorator to run R functions without warning.

    The R option `warn` is set to -1 for the duration of the call and
    put back to its previous value afterwards, whether the wrapped
    callable returns normally or raises (any exception, including
    `KeyboardInterrupt`).

    :param func: callable to wrap
    :return: a wrapper accepting the same arguments as `func` and
      returning whatever `func` returns
    """
    # Local import: only this decorator needs functools.
    import functools

    @functools.wraps(func)
    def run_withoutwarnings(*args, **kwargs):
        # options() returns a named list; find the slot holding 'warn'
        # in order to remember the current setting.
        current = _options()
        warn_i = current.do_slot('names').index('warn')
        oldwarn = current[warn_i][0]
        _options(warn=-1)
        try:
            return func(*args, **kwargs)
        finally:
            # Always restore the previous setting, even when the call
            # is interrupted by something that is not an `Exception`.
            _options(warn=oldwarn)
    return run_withoutwarnings
@no_warnings
def _eval_quiet(expr):
    """ Evaluate `expr` with R's eval(), wrapped by `no_warnings`. """
    result = _reval(expr)
    return result
# FIXME: should this be part of the API for rinterface?
# (maybe it is already the case and there is code
# duplication?)
[docs]
def reval(string: str,
          envir: typing.Optional[rinterface.SexpEnvironment] = None):
    """ Parse a string of R code and evaluate it.

    :param string: R code
    :type string: a :class:`str`
    :param envir: Optional environment to evaluate the R code
      (forwarded as the `envir` argument of R's `eval`).
    :return: the value returned by R's `eval` for the parsed code
    """
    return _reval(rinterface.parse(string), envir=envir)
[docs]
def quiet_require(name: str, lib_loc=None):
    """ Load an R package /quietly/ (suppressing messages to the console).

    The call is built as a string of R code, parsed, and evaluated with
    R warnings switched off.

    :param name: name of the R package. It is inserted verbatim
      (unquoted) in the R code, so it must be a plain package name and
      must not come from untrusted input.
    :param lib_loc: optional library location (a :class:`str`), or None
      to let R use its default (`lib.loc=NULL`).
    :return: the R object resulting from the evaluation of `require()`
    """
    if lib_loc is None:
        lib_loc = "NULL"
    else:
        # Build an R string literal. Backslashes must be doubled *before*
        # quotes are escaped, otherwise a path such as C:\Users\me would
        # be read by the R parser as containing escape sequences.
        escaped = lib_loc.replace('\\', '\\\\').replace('"', '\\"')
        lib_loc = "\"%s\"" % escaped
    expr_txt = ("suppressPackageStartupMessages("
                "base::require(%s, lib.loc=%s))"
                % (name, lib_loc))
    expr = rinterface.parse(expr_txt)
    ok = _eval_quiet(expr)
    return ok
[docs]
class PackageData(object):
    """ Datasets in an R package.

    In R datasets can be distributed with a package.

    Datasets can be:

    - serialized R objects

    - R code (that produces the dataset)

    For a given R packages, datasets are stored separately from the rest
    of the code and are evaluated/loaded lazily.

    The lazy aspect has been conserved and the dataset are only loaded
    or generated when called through the method 'fetch()'.
    """
    _packagename = None
    _lib_loc = None
    _datasets = None

    def __init__(self, packagename, lib_loc=rinterface.NULL):
        """
        :param packagename: name of the R package holding the datasets
        :param lib_loc: library location forwarded as `lib.loc` to R's
          `data()` (default: R's NULL)
        """
        self._packagename = packagename
        # Keep the library location: it is forwarded to every call
        # to R's data() made by this object.
        self._lib_loc = lib_loc

    def _init_setlist(self):
        """ Build the dict of dataset names (the values are all None). """
        # 2D array of information about datasets (third element of the
        # list returned by R's data()).
        tmp_m = _data(**{'package': StrSexpVector((self._packagename, )),
                         'lib.loc': self._lib_loc})[2]
        nrows, ncols = tmp_m.do_slot('dim')
        # The names are read from the third column (index 2); the array is
        # addressed as a flat vector in column-major order.
        c_i = 2
        # FIXME: check if instance methods are overridden
        self._datasets = {tmp_m[r_i + c_i * nrows]: None
                          for r_i in range(nrows)}

    def names(self):
        """ Names of the datasets"""
        if self._datasets is None:
            self._init_setlist()
        return self._datasets.keys()

    def fetch(self, name):
        """ Fetch the dataset (loads it or evaluates the R associated
        with it.

        In R, datasets are loaded into the global environment by default
        but this function returns an environment that contains the dataset(s).

        :param name: name of the dataset
        :return: an :class:`Environment` in which the dataset was loaded
        :raises KeyError: if `name` is not a dataset of the package
        """
        if self._datasets is None:
            self._init_setlist()
        if name not in self._datasets:
            raise KeyError('Data set "%s" cannot be found' % name)
        # Load into a fresh environment rather than R's global environment.
        env = _new_env()
        _data(StrSexpVector((name, )),
              **{'package': StrSexpVector((self._packagename, )),
                 'lib.loc': self._lib_loc,
                 'envir': env})
        return Environment(env)
[docs]
class Package(ModuleType):
    """ Models an R package
    (and can do so from an arbitrary environment - with the caution
    that locked environments should mostly be considered).
    """

    _env = None
    __rname__ = None
    _translation = None
    _rpy2r = None
    _exported_names = None
    _symbol_r2python = None
    __version__: typing.Optional[str] = None
    __rdata__: typing.Optional[PackageData] = None

    def __init__(self, env, name, translation=None,
                 exported_names=None, on_conflict='fail',
                 version=None,
                 symbol_r2python=default_symbol_r2python,
                 symbol_resolve=default_symbol_resolve):
        """ Create a Python module-like object from an R environment,
        using the specified translation if defined.

        - env: R environment
        - name: package name
        - translation: `dict` with R names as keys and corresponding Python
                       names as values (default: None, meaning no
                       translation)
        - exported_names: `set` of names/symbols to expose to instance users
        - on_conflict: 'fail' or 'warn' (default: 'fail')
        - version: version string for the package
        - symbol_r2python: function to convert R symbols into Python symbols.
                           The default translate `.` into `_`.
        - symbol_resolve: function to check the Python symbols obtained
                          from `symbol_r2python`.
        """
        super(Package, self).__init__(name)
        self._env = env
        self.__rname__ = name
        # A fresh dict per instance: a shared mutable default would be
        # visible from every Package created without a translation.
        self._translation = {} if translation is None else translation
        # Names already in the instance dict at this point; they are
        # removed from the exported names at the end.
        mynames = tuple(self.__dict__)
        self._rpy2r = {}
        if exported_names is None:
            exported_names = self._env.keys()
        # Private copy: __fill_rpy2r__ edits this set in place and must not
        # alter the object handed over by the caller.
        self._exported_names = set(exported_names)
        self._symbol_r2python = symbol_r2python
        self._symbol_resolve = symbol_resolve
        self.__fill_rpy2r__(on_conflict=on_conflict)
        self._exported_names = self._exported_names.difference(mynames)
        self.__version__ = version

    def __update_dict__(self, on_conflict='fail'):
        """ Update the __dict__ according to what is in the R environment """
        for elt in self._rpy2r:
            # pop() rather than del: an attribute removed by hand must not
            # make the refresh fail with a KeyError.
            self.__dict__.pop(elt, None)
        self._rpy2r.clear()
        self.__fill_rpy2r__(on_conflict=on_conflict)

    def __fill_rpy2r__(self, on_conflict='fail'):
        """ Fill the attribute _rpy2r.

        Each R symbol found in the environment is mapped to a Python name,
        fetched, converted with the active conversion, and stored in the
        instance `__dict__` under its Python name.

        - on_conflict: 'fail' or 'warn' (default: 'fail')

        Raises `LibraryError` when a Python name clashes with an attribute
        already present on the object, and `ValueError` when several R
        names are still mapped to one Python name.
        """
        assert on_conflict in ('fail', 'warn')

        name = self.__rname__

        (symbol_mapping,
         conflicts,
         resolutions) = _map_symbols(
             self._env,
             translation=self._translation,
             symbol_r2python=self._symbol_r2python,
             symbol_resolve=self._symbol_resolve
         )
        msg_prefix = ('Conflict when converting R symbols'
                      ' in the package "%s"'
                      ' to Python symbols: \n-' % self.__rname__)
        exception = LibraryError
        _fix_map_symbols(symbol_mapping,
                         conflicts,
                         on_conflict,
                         msg_prefix,
                         exception)
        symbol_mapping.update(resolutions)
        reserved_pynames = set(dir(self))
        cv = conversion.get_conversion()
        for rpyname, rnames in symbol_mapping.items():
            # last paranoid check
            if len(rnames) > 1:
                raise ValueError(
                    'Only one R name should be associated with %s '
                    '(and we have %s)' % (rpyname, str(rnames))
                )
            rname = rnames[0]
            if rpyname in reserved_pynames:
                raise LibraryError('The symbol ' + rname +
                                   ' in the package "' + name + '"' +
                                   ' is conflicting with' +
                                   ' a Python object attribute')
            # Fetch the R object first: when R cannot deliver it, the symbol
            # is reported and skipped, so that it is neither registered nor
            # bound to the object fetched for a previous symbol.
            try:
                riobj = self._env[rname]
            except rinterface.embedded.RRuntimeError as rre:
                warn(str(rre))
                continue
            self._rpy2r[rpyname] = rname
            if (rpyname != rname) and (rname in self._exported_names):
                # Expose the symbol under its Python name.
                self._exported_names.remove(rname)
                self._exported_names.add(rpyname)
            rpyobj = cv.rpy2py(riobj)
            if hasattr(rpyobj, '__rname__'):
                rpyobj.__rname__ = rname
            # TODO: shouldn't the original R name be also in the __dict__ ?
            self.__dict__[rpyname] = rpyobj

    def __repr__(self):
        s = super(Package, self).__repr__()
        return 'rpy2.robjects.packages.Package as a %s' % s
# alias: short name for SignatureTranslatedFunction
STF = SignatureTranslatedFunction
[docs]
class SignatureTranslatedPackage(Package):
    """ R package in which the R functions had their signatures
    'translated' (that is, the named parameters were made to
    conform to Python's rules for variable names)."""

    def __fill_rpy2r__(self, on_conflict='fail'):
        """ Fill the object as the parent class does, then wrap every
        R closure found in the instance `__dict__` into an `STF`.

        - on_conflict: 'fail' or 'warn' (default: 'fail')
        """
        super(SignatureTranslatedPackage, self).__fill_rpy2r__(
            on_conflict=on_conflict
        )
        members = self.__dict__
        # Collect the names bound to R closures, then replace them.
        closure_names = [
            key for key, candidate in members.items()
            if isinstance(candidate, rinterface.Sexp)
            and candidate.typeof == rinterface.RTYPES.CLOSXP
        ]
        for key in closure_names:
            members[key] = STF(
                members[key],
                on_conflict=on_conflict,
                symbol_r2python=self._symbol_r2python,
                symbol_resolve=self._symbol_resolve
            )
# alias: short name for SignatureTranslatedPackage
STP = SignatureTranslatedPackage
[docs]
class SignatureTranslatedAnonymousPackage(SignatureTranslatedPackage):
    """ Signature-translated package built from a string of R code
    evaluated in a new environment. """

    def __init__(self, string, name):
        """
        - string: R code, evaluated in a newly created environment
        - name: name given to the resulting package
        """
        env = Environment()
        reval(string, env)
        super().__init__(env, name)
# alias: short name for SignatureTranslatedAnonymousPackage
STAP = SignatureTranslatedAnonymousPackage
[docs]
class InstalledSTPackage(SignatureTranslatedPackage):
    # Signature-translated package whose docstring is assembled from the
    # R help, and whose R closures are wrapped in DocumentedSTFunction.
    # (No class docstring on purpose: `__doc__` in the decorator call below
    # is looked up from within this class body.)

    @docstring_property(__doc__)
    def __doc__(self):
        lines = ['Python representation of an R package.']
        if self.__rname__:
            try:
                # Description section of the help page "<name>-package".
                lines.append(
                    rhelp.docstring(self.__rname__,
                                    self.__rname__ + '-package',
                                    sections=['\\description'])
                )
            except rhelp.HelpNotFoundError:
                lines.append('[R help was not found]')
        else:
            lines.append('<No information available>')
        return os.linesep.join(lines)

    def __fill_rpy2r__(self, on_conflict='fail'):
        # The method lookup starts *after* SignatureTranslatedPackage in
        # the MRO, so that class's own wrapping of closures is not applied
        # here; closures are wrapped in DocumentedSTFunction instead.
        super(SignatureTranslatedPackage, self).__fill_rpy2r__(
            on_conflict=on_conflict
        )
        members = self.__dict__
        closure_names = [
            key for key, candidate in members.items()
            if isinstance(candidate, rinterface.Sexp)
            and candidate.typeof == rinterface.RTYPES.CLOSXP
        ]
        for key in closure_names:
            members[key] = DocumentedSTFunction(
                members[key],
                packagename=self.__rname__
            )
[docs]
class InstalledPackage(Package):
    # Package whose docstring is assembled from the R help.
    # (No class docstring on purpose: `__doc__` in the decorator call below
    # is looked up from within this class body.)

    @docstring_property(__doc__)
    def __doc__(self):
        lines = ['Python representation of an R package.',
                 'R arguments:',
                 '']
        if self.__rname__:
            try:
                # Description section of the help page "<name>-package".
                lines.append(
                    rhelp.docstring(self.__rname__,
                                    self.__rname__ + '-package',
                                    sections=['\\description'])
                )
            except rhelp.HelpNotFoundError:
                lines.append('[R help was not found]')
        else:
            lines.append('<No information available>')
        return os.linesep.join(lines)
[docs]
class WeakPackage(Package):
    """
    'Weak' R package, with which looking for symbols results in
    a warning (and a None returned) whenever the desired symbol is
    not found (rather than a traditional `AttributeError`).
    """

    def __getattr__(self, name):
        """ Return the entry `name` of the instance `__dict__`; when the
        entry is missing (or is None), issue a warning and return None. """
        found = self.__dict__.get(name)
        if found is not None:
            return found
        warnings.warn(
            "The symbol '%s' is not in this R namespace/package." % name
        )
        return found
[docs]
class LibraryError(ImportError):
    """ Error occurring when importing an R library """
[docs]
class PackageNotInstalledError(LibraryError):
    """ Error occurring because the R package to import is not installed."""
[docs]
class InstalledPackages(object):
    """ R packages installed. """

    def __init__(self, lib_loc=None):
        """
        :param lib_loc: library location forwarded as `lib.loc` to R's
          `library()` (default: None)
        """
        libraryiqr = _library(**{'lib.loc': lib_loc})
        # The element named 'results' is a 2D array with one row per
        # package; its dimensions and column names are kept alongside it.
        lib_results_i = libraryiqr.do_slot('names').index('results')
        self.lib_results = libraryiqr[lib_results_i]
        self.nrows, self.ncols = self.lib_results.do_slot('dim')
        self.colnames = self.lib_results.do_slot('dimnames')[1]  # column names
        self.lib_packname_i = self.colnames.index('Package')

    def isinstalled(self, packagename: str):
        """ Tell whether a package is listed in the column 'Package'.

        :param packagename: name of the package, as a :class:`str` or as
          an R character vector of length 1
        :return: True if the name is found, False otherwise (also False
          for an empty character vector)
        :raises ValueError: if more than one name is given
        """
        if not isinstance(packagename, str):
            if not isinstance(packagename, rinterface.StrSexpVector):
                # Anything else goes through the R character vector
                # constructor, which is left to accept or reject it.
                packagename = rinterface.StrSexpVector((packagename, ))
            if len(packagename) > 1:
                raise ValueError("Only specify one package name at a time.")
            if len(packagename) == 0:
                return False
            # Compare cell against cell: a whole vector never equals
            # the individual strings stored in the results array.
            packagename = packagename[0]
        nrows = self.nrows
        lib_results = self.lib_results
        # The array is addressed as a flat vector in column-major order:
        # the cells of a column are contiguous.
        first = self.lib_packname_i * nrows
        return any(lib_results[i] == packagename
                   for i in range(first, first + nrows))

    def __iter__(self):
        """ Iterate through rows, yield tuples at each iteration """
        lib_results = self.lib_results
        nrows, ncols = self.nrows, self.ncols
        for row_i in range(nrows):
            yield tuple(lib_results[col_i * nrows + row_i]
                        for col_i in range(ncols))
[docs]
def isinstalled(name: str,
                lib_loc=None):
    """
    Find whether an R package is installed

    The answer is obtained by building an :class:`InstalledPackages`
    for `lib_loc` and asking it about `name`.

    :param name: name of an R package
    :param lib_loc: specific location for the R library (default: None)

    :rtype: a :class:`bool`
    """
    return InstalledPackages(lib_loc).isinstalled(name)
[docs]
def importr(name: str,
            lib_loc=None,
            robject_translations=None,
            signature_translation=True,
            suppress_messages=True,
            on_conflict='fail',
            symbol_r2python=default_symbol_r2python,
            symbol_resolve=default_symbol_resolve,
            data=True):
    """ Import an R package.

    Arguments:

    - name: name of the R package

    - lib_loc: specific location for the R library (default: None)

    - robject_translations: dict with R names as keys and Python names
      as values (default: None, meaning no translation)

    - signature_translation: (True or False)

    - suppress_messages: Suppress messages R usually writes on the console
      (default: True)

    - on_conflict: 'fail' or 'warn' (default: 'fail')

    - symbol_r2python: function to translate R symbols into Python symbols

    - symbol_resolve: function to check the Python symbol obtained
                      from `symbol_r2python`.

    - data: embed a PackageData objects under the attribute
      name __rdata__ (default: True)

    Return:

    - an instance of class InstalledSTPackage (when `signature_translation`
      is true), or of class InstalledPackage

    Raise:

    - PackageNotInstalledError if the package is not installed

    - LibraryError if R could not load the package
    """
    if robject_translations is None:
        # A fresh dict for each call: the object ends up stored in the
        # package returned, and must not be shared between imports.
        robject_translations = {}

    if not isinstalled(name, lib_loc=lib_loc):
        raise PackageNotInstalledError(
            'The R package "%s" is not installed.' % name
        )

    if suppress_messages:
        # require() gives back an R vector: look at its first element, as
        # is done in the other branch, rather than at the vector itself.
        ok = quiet_require(name, lib_loc=lib_loc)[0]
    else:
        # No library location is R's NULL, not a character vector built
        # around None.
        if lib_loc is None:
            lib_loc_r = rinterface.NULL
        else:
            lib_loc_r = rinterface.StrSexpVector((lib_loc, ))
        ok = _require(name, **{'lib.loc': lib_loc_r})[0]
    if not ok:
        raise LibraryError("The R package %s could not be imported" % name)

    exported_names: typing.Optional[typing.Set[str]]
    if _package_has_namespace(name,
                              _system_file(package=name)):
        env = _get_namespace(name)
        version = _get_namespace_version(name)[0]
        exported_names = set(_get_namespace_exports(name))
    else:
        # No namespace: use the environment "package:<name>", without
        # version or list of exported names.
        env = _as_env(rinterface.StrSexpVector(['package:'+name, ]))
        exported_names = None
        version = None

    pack: typing.Union[InstalledSTPackage, InstalledPackage]
    if signature_translation:
        pack = InstalledSTPackage(env, name,
                                  translation=robject_translations,
                                  exported_names=exported_names,
                                  on_conflict=on_conflict,
                                  version=version,
                                  symbol_r2python=symbol_r2python,
                                  symbol_resolve=symbol_resolve)
    else:
        pack = InstalledPackage(env, name, translation=robject_translations,
                                exported_names=exported_names,
                                on_conflict=on_conflict,
                                version=version,
                                symbol_r2python=symbol_r2python,
                                symbol_resolve=symbol_resolve)
    if data:
        if pack.__rdata__ is not None:
            warn('While importing the R package "%s", the rpy2 Package object '
                 'is masking a translated R symbol "__rdata__" already present'
                 % name)
        pack.__rdata__ = PackageData(name, lib_loc=lib_loc)

    return pack
[docs]
def data(package):
    """ Return the PackageData for the given package
    (the content of its attribute `__rdata__`)."""
    return getattr(package, '__rdata__')
[docs]
def wherefrom(symbol: str,
              startenv: rinterface.SexpEnvironment = rinterface.globalenv):
    """ For a given symbol, return the environment
    this symbol is first found in, starting from 'startenv'.

    The chain of enclosing environments is followed from `startenv`.
    When the empty environment is reached without finding the symbol,
    the search stops and that environment is the one returned.

    :param symbol: name of the symbol to look for
    :param startenv: environment from which the search starts
      (default: R's global environment)
    :return: the environment, converted with the active conversion
    """
    env = startenv
    while symbol not in env:
        env = env.enclos
        if env.rsame(rinterface.emptyenv):
            break
    cv = conversion.get_conversion()
    return cv.rpy2py(env)
[docs]
class ParsedCode(rinterface.ExprSexpVector):
    """ Subclass of `rinterface.ExprSexpVector` adding nothing to it;
    `SourceCode.parse()` wraps the result of the parsing in it. """
[docs]
class SourceCode(str):
    """ String of R code, parsed on demand (the result is cached). """

    # Cache for the result of parse(); None until the first call.
    _parsed = None

    def parse(self):
        """ Return the code parsed as a `ParsedCode`; the parsing is
        only performed the first time the method is called. """
        cached = self._parsed
        if cached is None:
            cached = self._parsed = ParsedCode(rinterface.parse(self))
        return cached

    def as_namespace(self, name):
        """ Return a `SignatureTranslatedAnonymousPackage` built from
        this code.

        - name: name for the namespace
        """
        return SignatureTranslatedAnonymousPackage(self, name)