Source code for rpy2.robjects.packages

import os
import typing
import warnings
from types import ModuleType
from warnings import warn
import rpy2.rinterface as rinterface
from . import conversion
from rpy2.robjects.functions import (SignatureTranslatedFunction,
                                     docstring_property,
                                     DocumentedSTFunction)
from rpy2.robjects import Environment
from rpy2.robjects.packages_utils import (
                                          default_symbol_r2python,
                                          default_symbol_resolve,
                                          _map_symbols,
                                          _fix_map_symbols
)
import rpy2.robjects.help as rhelp

# Handles on objects looked up once, at import time, in R's base environment
# (`rinterface.baseenv`). They are called from the Python code below.
_require = rinterface.baseenv['require']
_library = rinterface.baseenv['library']
_as_env = rinterface.baseenv['as.environment']
_package_has_namespace = rinterface.baseenv['packageHasNamespace']
_system_file = rinterface.baseenv['system.file']
_get_namespace = rinterface.baseenv['getNamespace']
_get_namespace_version = rinterface.baseenv['getNamespaceVersion']
_get_namespace_exports = rinterface.baseenv['getNamespaceExports']
_loaded_namespaces = rinterface.baseenv['loadedNamespaces']
_globalenv = rinterface.globalenv
_new_env = rinterface.baseenv["new.env"]

# Local shorthand for the rinterface vector-of-strings class.
StrSexpVector = rinterface.StrSexpVector
# Fetching symbols in the namespace "utils" assumes that "utils" is loaded
# (currently the case by default in R).
# `_data` is the result of calling R's `::` with 'utils' and 'data',
# i.e. the object `utils::data`.
_data = rinterface.baseenv['::'](StrSexpVector(('utils', )),
                                 StrSexpVector(('data', )))

# R's `eval` and `options`, used to evaluate parsed code and to toggle
# the `warn` option respectively.
_reval = rinterface.baseenv['eval']
_options = rinterface.baseenv['options']


def no_warnings(func):
    """Decorator to run R functions without warning.

    The R option `warn` is set to -1 for the duration of the call and
    restored to its previous value afterwards, whether the call returns
    normally or raises.

    :param func: the callable to wrap
    :return: a wrapper with the same signature and metadata as `func`
    """
    import functools

    @functools.wraps(func)
    def run_withoutwarnings(*args, **kwargs):
        current_options = _options()
        warn_i = current_options.do_slot('names').index('warn')
        oldwarn = current_options[warn_i][0]
        _options(warn=-1)
        try:
            return func(*args, **kwargs)
        finally:
            # Restore the old warn setting on every exit path, including
            # exceptions that are not subclasses of `Exception`
            # (e.g. KeyboardInterrupt).
            _options(warn=oldwarn)
    return run_withoutwarnings
@no_warnings
def _eval_quiet(expr):
    """Evaluate `expr` with R's `eval`, with R warnings switched off.

    :param expr: the R object handed to R's `eval`
    :return: whatever R's `eval` returns for `expr`
    """
    value = _reval(expr)
    return value


# FIXME: should this be part of the API for rinterface ?
#        (may be it is already the case and there is code
#        duplication ?)
def reval(string: str,
          envir: typing.Optional[rinterface.SexpEnvironment] = None):
    """Evaluate a string as R code.

    The string is parsed with `rinterface.parse` and the parsed code is
    passed to R's `eval`.

    :param string: R code
    :type string: a :class:`str`
    :param envir: Optional environment to evaluate the R code.
    :return: the result of the evaluation
    """
    parsed = rinterface.parse(string)
    return _reval(parsed, envir=envir)
def quiet_require(name: str, lib_loc=None):
    """Load an R package /quietly/ (suppressing messages to the console).

    An R expression of the form
    ``suppressPackageStartupMessages(base::require(<name>, lib.loc=<lib_loc>))``
    is built as text, parsed, and evaluated with R warnings disabled.

    :param name: name of the R package. NOTE: it is inserted verbatim in the
      R code that is evaluated, so it must be a valid R symbol or string.
    :param lib_loc: path of the R library to look into, or None to use R's
      default (`NULL`)
    :return: the result of evaluating the R expression
    """
    if lib_loc is None:
        lib_loc = "NULL"
    else:
        # Build an R string literal: backslashes must be escaped first
        # (otherwise a path such as C:\Users\... is invalid R code), then
        # double quotes.
        escaped = lib_loc.replace('\\', '\\\\').replace('"', '\\"')
        lib_loc = "\"%s\"" % escaped
    expr_txt = ("suppressPackageStartupMessages("
                "base::require(%s, lib.loc=%s))"
                % (name, lib_loc))
    expr = rinterface.parse(expr_txt)
    ok = _eval_quiet(expr)
    return ok
class PackageData(object):
    """Datasets in an R package.

    In R datasets can be distributed with a package.

    Datasets can be:

    - serialized R objects
    - R code (that produces the dataset)

    For a given R package, datasets are stored separately from the rest
    of the code and are evaluated/loaded lazily.

    The lazy aspect has been conserved and the datasets are only loaded
    or generated when called through the method 'fetch()'.
    """
    _packagename = None
    _lib_loc = None
    _datasets = None

    def __init__(self, packagename, lib_loc=rinterface.NULL):
        """
        :param packagename: name of the R package
        :param lib_loc: value passed as `lib.loc` to R's `utils::data`
        """
        self._packagename = packagename
        # The original statement was the bare expression `self._lib_loc`,
        # which silently discarded the argument.
        self._lib_loc = lib_loc

    def _init_setlist(self):
        """Populate `self._datasets` with the names of the datasets."""
        _datasets = dict()
        # 2D array of information about datasets (element 2 of the object
        # returned by `utils::data`).
        tmp_m = _data(**{'package': StrSexpVector((self._packagename, )),
                         'lib.loc': self._lib_loc})[2]
        nrows, ncols = tmp_m.do_slot('dim')
        # The matrix is addressed as a flat vector, column after column:
        # element (row, col) is at index row + col * nrows. Column 2 is
        # used as the dataset name (presumably the "Item" column of R's
        # data() results - TODO confirm against the R documentation).
        c_i = 2
        for r_i in range(nrows):
            _datasets[tmp_m[r_i + c_i * nrows]] = None
            # FIXME: check if instance methods are overriden
        self._datasets = _datasets

    def names(self):
        """Names of the datasets."""
        if self._datasets is None:
            self._init_setlist()
        return self._datasets.keys()

    def fetch(self, name):
        """Fetch the dataset (loads it or evaluates the R code associated
        with it).

        In R, datasets are loaded into the global environment by default
        but this function returns an environment that contains the
        dataset(s).

        :param name: name of the dataset
        :raises KeyError: if `name` is not a dataset of the package
        :return: an :class:`Environment` wrapping a new R environment in
          which the dataset was loaded
        """
        if self._datasets is None:
            self._init_setlist()

        if name not in self._datasets:
            raise KeyError('Data set "%s" cannot be found' % name)
        env = _new_env()
        _data(StrSexpVector((name, )),
              **{'package': StrSexpVector((self._packagename, )),
                 'lib.loc': self._lib_loc,
                 'envir': env})
        return Environment(env)
class Package(ModuleType):
    """Models an R package (and can do so from an arbitrary environment -
    with the caution that locked environments should mostly be considered).
    """

    _env = None
    __rname__ = None
    _translation = None
    _rpy2r = None
    _exported_names = None
    _symbol_r2python = None
    __version__: typing.Optional[str] = None
    __rdata__: typing.Optional[PackageData] = None

    def __init__(self, env, name, translation={},
                 exported_names=None, on_conflict='fail',
                 version=None,
                 symbol_r2python=default_symbol_r2python,
                 symbol_resolve=default_symbol_resolve):
        """Create a Python module-like object from an R environment,
        using the specified translation if defined.

        - env: R environment
        - name: package name
        - translation: `dict` with R names as keys and corresponding Python
                       names as values
        - exported_names: `set` of names/symbols to expose to instance users
        - on_conflict: 'fail' or 'warn' (default: 'fail')
        - version: version string for the package
        - symbol_r2python: function to convert R symbols into Python symbols.
                           The default translate `.` into `_`.
        - symbol_resolve: function to check the Python symbols obtained
                          from `symbol_r2python`.
        """

        super(Package, self).__init__(name)
        self._env = env
        self.__rname__ = name
        # NOTE: `translation` has a shared mutable default; this method
        # only stores it.
        self._translation = translation
        # Attribute names present before any R symbol is added; they are
        # removed from the exported names at the end.
        mynames = tuple(self.__dict__)
        self._rpy2r = {}
        if exported_names is None:
            exported_names = set(self._env.keys())
        else:
            # Work on a private copy: __fill_rpy2r__ renames entries in
            # place and must not modify the caller's set.
            exported_names = set(exported_names)
        self._exported_names = exported_names
        self._symbol_r2python = symbol_r2python
        self._symbol_resolve = symbol_resolve
        self.__fill_rpy2r__(on_conflict=on_conflict)
        self._exported_names = self._exported_names.difference(mynames)
        self.__version__ = version

    def __update_dict__(self, on_conflict='fail'):
        """Update the __dict__ according to what is in the R environment."""
        for elt in self._rpy2r:
            del self.__dict__[elt]
        self._rpy2r.clear()
        self.__fill_rpy2r__(on_conflict=on_conflict)

    def __fill_rpy2r__(self, on_conflict='fail'):
        """Fill the attribute _rpy2r.

        Each R symbol of the environment is mapped to a Python name,
        fetched, converted with the active conversion rules, and stored
        in the instance `__dict__` under its Python name.

        - on_conflict: 'fail' or 'warn' (default: 'fail')

        Raises `LibraryError` if a translated name clashes with an existing
        attribute of the object, and `ValueError` if several R names are
        still mapped to one Python name after conflict resolution.
        """
        assert on_conflict in ('fail', 'warn')

        name = self.__rname__

        (symbol_mapping,
         conflicts,
         resolutions) = _map_symbols(
             self._env,
             translation=self._translation,
             symbol_r2python=self._symbol_r2python,
             symbol_resolve=self._symbol_resolve
         )
        msg_prefix = ('Conflict when converting R symbols'
                      ' in the package "%s"'
                      ' to Python symbols: \n-' % self.__rname__)
        exception = LibraryError
        _fix_map_symbols(symbol_mapping,
                         conflicts,
                         on_conflict,
                         msg_prefix,
                         exception)
        symbol_mapping.update(resolutions)
        reserved_pynames = set(dir(self))
        cv = conversion.get_conversion()
        for rpyname, rnames in symbol_mapping.items():
            # last paranoid check
            if len(rnames) > 1:
                raise ValueError(
                    'Only one R name should be associated with %s '
                    '(and we have %s)' % (rpyname, str(rnames))
                )
            rname = rnames[0]
            if rpyname in reserved_pynames:
                raise LibraryError('The symbol ' + rname +
                                   ' in the package "' + name + '"' +
                                   ' is conflicting with' +
                                   ' a Python object attribute')
            try:
                riobj = self._env[rname]
            except rinterface.embedded.RRuntimeError as rre:
                # The symbol cannot be fetched: warn and skip it. Falling
                # through would use an unbound `riobj` (first iteration)
                # or bind the object of the previous symbol under this
                # name. The fetch is done before any bookkeeping so that a
                # skipped symbol leaves no entry in `_rpy2r`.
                warn(str(rre))
                continue
            self._rpy2r[rpyname] = rname
            if (rpyname != rname) and (rname in self._exported_names):
                self._exported_names.remove(rname)
                self._exported_names.add(rpyname)
            rpyobj = cv.rpy2py(riobj)
            if hasattr(rpyobj, '__rname__'):
                rpyobj.__rname__ = rname
            # TODO: shouldn't the original R name be also in the __dict__ ?
            self.__dict__[rpyname] = rpyobj

    def __repr__(self):
        s = super(Package, self).__repr__()
        return 'rpy2.robjects.packages.Package as a %s' % s
# alias STF = SignatureTranslatedFunction
class SignatureTranslatedPackage(Package):
    """R package in which the R functions had their signatures
    'translated' (that is, the named parameters were made to conform to
    Python's rules for variable names)."""

    def __fill_rpy2r__(self, on_conflict='fail'):
        """Fill the package as the parent class does, then wrap every
        R closure found in the instance `__dict__` into a
        `SignatureTranslatedFunction`.

        - on_conflict: 'fail' or 'warn' (default: 'fail')
        """
        super().__fill_rpy2r__(on_conflict=on_conflict)
        namespace = self.__dict__
        # Select first, replace afterwards: only the values of existing
        # keys are reassigned.
        closure_names = [
            key for key, value in namespace.items()
            if isinstance(value, rinterface.Sexp)
            and value.typeof == rinterface.RTYPES.CLOSXP
        ]
        for key in closure_names:
            namespace[key] = STF(
                namespace[key],
                on_conflict=on_conflict,
                symbol_r2python=self._symbol_r2python,
                symbol_resolve=self._symbol_resolve
            )
# alias STP = SignatureTranslatedPackage
class SignatureTranslatedAnonymousPackage(SignatureTranslatedPackage):
    """Signature-translated package built from a string of R code.

    The code is evaluated in a newly created `Environment`, which is then
    exposed as a package.
    """

    def __init__(self, string, name):
        """
        :param string: R code to evaluate
        :param name: name given to the resulting package
        """
        code_env = Environment()
        reval(string, code_env)
        super().__init__(code_env, name)
# alias STAP = SignatureTranslatedAnonymousPackage
# Signature-translated package whose docstring is built from the R help
# page of the package, and whose R closures are wrapped into
# `DocumentedSTFunction` objects.
#
# A class docstring is deliberately not added: the bare name `__doc__`
# passed to `docstring_property` below is looked up from within the class
# body, and a docstring would change what it resolves to.
class InstalledSTPackage(SignatureTranslatedPackage):

    @docstring_property(__doc__)
    def __doc__(self):
        lines = ['Python representation of an R package.']
        rname = self.__rname__
        if rname:
            try:
                description = rhelp.docstring(rname,
                                              rname + '-package',
                                              sections=['\\description'])
            except rhelp.HelpNotFoundError:
                description = '[R help was not found]'
            lines.append(description)
        else:
            lines.append('<No information available>')
        return os.linesep.join(lines)

    def __fill_rpy2r__(self, on_conflict='fail'):
        # `super(SignatureTranslatedPackage, self)` starts the method lookup
        # after SignatureTranslatedPackage, so that class's own wrapping of
        # closures is not applied before the wrapping done below.
        parent = super(SignatureTranslatedPackage, self)
        parent.__fill_rpy2r__(on_conflict=on_conflict)
        namespace = self.__dict__
        closure_names = [
            key for key, value in namespace.items()
            if isinstance(value, rinterface.Sexp)
            and value.typeof == rinterface.RTYPES.CLOSXP
        ]
        for key in closure_names:
            namespace[key] = DocumentedSTFunction(
                namespace[key],
                packagename=self.__rname__
            )
# Package whose docstring is built from the R help page of the package.
#
# A class docstring is deliberately not added: the bare name `__doc__`
# passed to `docstring_property` below is looked up from within the class
# body, and a docstring would change what it resolves to.
class InstalledPackage(Package):

    @docstring_property(__doc__)
    def __doc__(self):
        lines = ['Python representation of an R package.',
                 'R arguments:', '']
        rname = self.__rname__
        if rname:
            try:
                description = rhelp.docstring(rname,
                                              rname + '-package',
                                              sections=['\\description'])
            except rhelp.HelpNotFoundError:
                description = '[R help was not found]'
            lines.append(description)
        else:
            lines.append('<No information available>')
        return os.linesep.join(lines)
class WeakPackage(Package):
    """'Weak' R package, with which looking for symbols results in
    a warning (and a None returned) whenever the desired symbol is
    not found (rather than a traditional `AttributeError`).
    """

    def __getattr__(self, name):
        """Return the entry `name` of the instance `__dict__`, or warn and
        return None when there is no such entry (or when it is None)."""
        found = self.__dict__.get(name)
        if found is not None:
            return found
        warnings.warn(
            "The symbol '%s' is not in this R namespace/package." % name
        )
        return None
class LibraryError(ImportError):
    """Error occurring when importing an R library."""
class PackageNotInstalledError(LibraryError):
    """Error occurring because the R package to import is not installed."""
class InstalledPackages(object):
    """R packages installed.

    The information comes from the element 'results' of the object
    returned by calling R's `library` with only `lib.loc`. It is a matrix
    kept as a flat vector, column after column.
    """

    def __init__(self, lib_loc=None):
        """
        :param lib_loc: value passed as `lib.loc` to R's `library`
        """
        libraryiqr = _library(**{'lib.loc': lib_loc})
        lib_results_i = libraryiqr.do_slot('names').index('results')
        self.lib_results = libraryiqr[lib_results_i]
        self.nrows, self.ncols = self.lib_results.do_slot('dim')
        self.colnames = self.lib_results.do_slot('dimnames')[1]  # column names
        self.lib_packname_i = self.colnames.index('Package')

    def isinstalled(self, packagename: str):
        """Tell whether `packagename` is in the column 'Package'.

        :param packagename: a package name, as a :class:`str` or as a
          `StrSexpVector` of length 1
        :raises ValueError: if a vector with more than one name is given
        :rtype: a :class:`bool`
        """
        if isinstance(packagename, rinterface.StrSexpVector):
            if len(packagename) > 1:
                raise ValueError("Only specify one package name at a time.")
            if len(packagename) == 0:
                return False
            # Compare the name itself: the cells of the matrix are compared
            # with `==` below, and comparing them with the vector object
            # would never match.
            packagename = packagename[0]
        nrows = self.nrows
        lib_results = self.lib_results
        # Cells of the column 'Package' in the flat, column-major vector.
        first = self.lib_packname_i * nrows
        for i in range(first, first + nrows):
            if lib_results[i] == packagename:
                return True
        return False

    def __iter__(self):
        """Iterate through rows, yield tuples at each iteration."""
        lib_results = self.lib_results
        nrows, ncols = self.nrows, self.ncols
        for row_i in range(nrows):
            yield tuple(lib_results[col_i * nrows + row_i]
                        for col_i in range(ncols))
def isinstalled(name: str, lib_loc=None):
    """Find whether an R package is installed.

    :param name: name of an R package
    :param lib_loc: specific location for the R library (default: None)

    :rtype: a :class:`bool`
    """
    return InstalledPackages(lib_loc).isinstalled(name)
def importr(name: str,
            lib_loc=None,
            robject_translations={},
            signature_translation=True,
            suppress_messages=True,
            on_conflict='fail',
            symbol_r2python=default_symbol_r2python,
            symbol_resolve=default_symbol_resolve,
            data=True):
    """Import an R package.

    Arguments:

    - name: name of the R package

    - lib_loc: specific location for the R library (default: None)

    - robject_translations: dict (default: {})

    - signature_translation: (True or False)

    - suppress_messages: Suppress messages R usually writes on the console
      (default: True)

    - on_conflict: 'fail' or 'warn' (default: 'fail')

    - symbol_r2python: function to translate R symbols into Python symbols

    - symbol_resolve: function to check the Python symbol obtained
                      from `symbol_r2python`.

    - data: embed a PackageData object under the attribute
      name __rdata__ (default: True)

    Return:

    - an instance of class InstalledSTPackage (if `signature_translation`
      is true), or of class InstalledPackage

    Raises:

    - PackageNotInstalledError if the package is not installed

    - LibraryError if R reports that the package could not be loaded
    """

    if not isinstalled(name, lib_loc=lib_loc):
        raise PackageNotInstalledError(
            'The R package "%s" is not installed.' % name
        )

    # In both branches `ok` is the first element of the value returned by
    # R's `require`, so that it is the outcome reported by R that is tested
    # below rather than the R vector holding it.
    if suppress_messages:
        ok = quiet_require(name, lib_loc=lib_loc)[0]
    else:
        # No specific library location means NULL for R, not a vector of
        # strings built from None.
        if lib_loc is None:
            r_lib_loc = rinterface.NULL
        else:
            r_lib_loc = rinterface.StrSexpVector((lib_loc, ))
        ok = _require(name, **{'lib.loc': r_lib_loc})[0]
    if not ok:
        raise LibraryError("The R package %s could not be imported" % name)

    exported_names: typing.Optional[typing.Set[str]]
    # NOTE(review): the R result is tested as a Python object here, unlike
    # `ok` above which is indexed with [0] - confirm this is intended
    # before changing it.
    if _package_has_namespace(name,
                              _system_file(package=name)):
        env = _get_namespace(name)
        version = _get_namespace_version(name)[0]
        exported_names = set(_get_namespace_exports(name))
    else:
        env = _as_env(rinterface.StrSexpVector(['package:'+name, ]))
        exported_names = None
        version = None

    pack: typing.Union[InstalledSTPackage, InstalledPackage]
    if signature_translation:
        pack = InstalledSTPackage(env, name,
                                  translation=robject_translations,
                                  exported_names=exported_names,
                                  on_conflict=on_conflict,
                                  version=version,
                                  symbol_r2python=symbol_r2python,
                                  symbol_resolve=symbol_resolve)
    else:
        pack = InstalledPackage(env, name,
                                translation=robject_translations,
                                exported_names=exported_names,
                                on_conflict=on_conflict,
                                version=version,
                                symbol_r2python=symbol_r2python,
                                symbol_resolve=symbol_resolve)
    if data:
        if pack.__rdata__ is not None:
            warn('While importing the R package "%s", the rpy2 Package object '
                 'is masking a translated R symbol "__rdata__" already present'
                 % name)
        pack.__rdata__ = PackageData(name, lib_loc=lib_loc)

    return pack
def data(package):
    """Return the PackageData for the given package.

    :param package: object with an attribute `__rdata__`
    :return: the value of `package.__rdata__`
    """
    rdata = package.__rdata__
    return rdata
def wherefrom(symbol: str,
              startenv: rinterface.SexpEnvironment = rinterface.globalenv):
    """For a given symbol, return the environment
    this symbol is first found in, starting from 'startenv'.

    The chain of enclosing environments is followed until one contains
    `symbol`, or until the R empty environment is reached (in which case
    it is the empty environment that is converted and returned).

    :param symbol: name of the symbol to look for
    :param startenv: environment the search starts from
    """
    current = startenv
    while symbol not in current:
        current = current.enclos
        if current.rsame(rinterface.emptyenv):
            break
    return conversion.get_conversion().rpy2py(current)
class ParsedCode(rinterface.ExprSexpVector):
    """Parsed R code, as an `rinterface.ExprSexpVector`."""
class SourceCode(str):
    """String of R source code that can be parsed or turned into a
    package-like object."""

    # Result of the first call to parse(), kept for subsequent calls.
    _parsed = None

    def parse(self):
        """Return the code parsed as a `ParsedCode` object.

        The parsing is performed on the first call only; the same object
        is returned afterwards.
        """
        cached = self._parsed
        if cached is None:
            cached = ParsedCode(rinterface.parse(self))
            self._parsed = cached
        return cached

    def as_namespace(self, name):
        """Return a `SignatureTranslatedAnonymousPackage` built from this
        code.

        :param name: name for the namespace
        """
        return SignatureTranslatedAnonymousPackage(self, name)