Source code for jsonextended.plugins

#!/usr/bin/env python


from jsonextended import encoders, parsers
import glob
import imp
import inspect
import os
import uuid
import warnings
from fnmatch import fnmatch
from contextlib import contextmanager

# py 2/3 compatibility
# ``basestring`` only exists on Python 2; on Python 3 the bare name lookup
# raises NameError and the name is aliased to ``str`` so that the
# ``isinstance(fpath, basestring)`` checks in this module work on both.
try:
    basestring
except NameError:
    basestring = str
# ``StringIO`` lives in its own module on Python 2 and in ``io`` on Python 3;
# it is referenced by the doctests in this module (hence the noqa marker).
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO  # noqa: F401
try:
    from importlib.machinery import SourceFileLoader
    from importlib.util import module_from_spec, spec_from_file_location
    # no longer used here; kept so the module namespace is unchanged
    from types import ModuleType  # noqa: F401

    def load_source(modname, fname):
        """ import a python source file as a new module object

        The module is built from an import spec, so that the usual module
        attributes (``__file__``, ``__spec__``, ``__loader__``) are available
        to the code being executed. The module is *not* added to
        ``sys.modules``.

        Parameters
        ----------
        modname : str
            name to give the module (used as its ``__name__``)
        fname : str
            path of the source file; it does not need a ``.py`` suffix

        Returns
        -------
        module
            the executed module

        """
        # pass the loader explicitly, otherwise files without a recognised
        # suffix (e.g. temporary files) would not yield a spec
        loader = SourceFileLoader(modname, fname)
        spec = spec_from_file_location(modname, fname, loader=loader)
        mod = module_from_spec(spec)
        loader.exec_module(mod)
        return mod

except ImportError:

    def load_source(modname, fname):
        """ import a python source file as a new module object

        Fallback for interpreters without the required ``importlib`` API.

        Parameters
        ----------
        modname : str
            name to give the module
        fname : str
            path of the source file

        Returns
        -------
        module
            the executed module

        """
        return imp.load_source(modname, fname)
from jsonextended.utils import get_module_path

# Plugin categories, each mapped to the minimal set of class attributes
# a class must expose to be accepted as a plugin of that category.
# Every interface must include plugin_name and plugin_descript.
_plugins_interface = {
    'encoders': ['plugin_name', 'plugin_descript', 'objclass'],
    'decoders': ['plugin_name', 'plugin_descript', 'dict_signature'],
    'parsers': ['plugin_name', 'plugin_descript', 'file_regex', 'read_file'],
}

# Directories holding the builtin plugins of each category
# (decoders are loaded from the same location as encoders).
_plugins_builtin = {
    'encoders': get_module_path(encoders),
    'decoders': get_module_path(encoders),
    'parsers': get_module_path(parsers),
}

# The internal plugin store: {category: {plugin_name: plugin instance}}
_all_plugins = {category: {} for category in _plugins_interface}
def view_interfaces(category=None):
    """ return a view of the plugin minimal class attribute interface(s)

    The returned lists are copies, so changing them does not alter the
    interfaces used for loading plugins.

    Parameters
    ----------
    category : None or str
        if str, apply for single plugin category

    Returns
    -------
    list or dict
        for a single category, the sorted attribute names;
        otherwise a dict of {category: attribute names}

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_interfaces())
    {'decoders': ['plugin_name', 'plugin_descript', 'dict_signature'],
     'encoders': ['plugin_name', 'plugin_descript', 'objclass'],
     'parsers': ['plugin_name', 'plugin_descript', 'file_regex', 'read_file']}

    """
    if category is None:
        return {
            name: list(attributes)
            for name, attributes in _plugins_interface.items()
        }
    return sorted(_plugins_interface[category])
def view_plugins(category=None):
    """ return a view of the loaded plugin names and descriptions

    Parameters
    ----------
    category : None or str
        if str, apply for single plugin category

    Returns
    -------
    dict
        without a category: {category: {plugin name: description}};
        for 'parsers': {plugin name: {"descript": ..., "regex": ...}};
        for any other category: {plugin name: description}

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    >>> class DecoderPlugin(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'a decoder for dicts containing _example_ key'
    ...     dict_signature = ('_example_',)
    ...
    >>> errors = load_plugin_classes([DecoderPlugin])

    >>> pprint(view_plugins())
    {'decoders': {'example': 'a decoder for dicts containing _example_ key'},
     'encoders': {},
     'parsers': {}}

    >>> view_plugins('decoders')
    {'example': 'a decoder for dicts containing _example_ key'}

    >>> unload_all_plugins()

    """
    def _descriptions(plugins):
        # map each plugin name to its description text
        return {
            name: plugin.plugin_descript for name, plugin in plugins.items()
        }

    if category is None:
        return {
            cat: _descriptions(plugins)
            for cat, plugins in _all_plugins.items()
        }

    loaded = _all_plugins[category]
    if category != 'parsers':
        return _descriptions(loaded)

    # parsers additionally report the file pattern they apply to
    return {
        name: {"descript": plugin.plugin_descript,
               "regex": plugin.file_regex}
        for name, plugin in loaded.items()
    }
def get_plugins(category):
    """ get the loaded plugins of a single category

    Parameters
    ----------
    category : str
        plugin category

    Returns
    -------
    dict
        the internal store for the category, {plugin name: plugin}
        (the stored dict itself, not a copy)

    """
    plugins = _all_plugins[category]
    return plugins
def unload_all_plugins(category=None):
    """ clear all plugins

    Parameters
    ----------
    category : None or str
        if str, apply for single plugin category

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    >>> class DecoderPlugin(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'a decoder for dicts containing _example_ key'
    ...     dict_signature = ('_example_',)
    ...
    >>> errors = load_plugin_classes([DecoderPlugin])

    >>> pprint(view_plugins())
    {'decoders': {'example': 'a decoder for dicts containing _example_ key'},
     'encoders': {},
     'parsers': {}}

    >>> unload_all_plugins()
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    """
    targets = list(_all_plugins) if category is None else [category]
    for target in targets:
        # each category is given a fresh, empty store
        _all_plugins[target] = {}
def unload_plugin(name, category=None):
    """ remove single plugin

    Parameters
    ----------
    name : str
        plugin name
    category : None or str
        plugin category; if None, the plugin is removed from every
        category in which it is loaded

    Raises
    ------
    KeyError
        if a category is given and it holds no plugin of that name

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    >>> class DecoderPlugin(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'a decoder for dicts containing _example_ key'
    ...     dict_signature = ('_example_',)
    ...
    >>> errors = load_plugin_classes([DecoderPlugin],category='decoders')

    >>> pprint(view_plugins())
    {'decoders': {'example': 'a decoder for dicts containing _example_ key'},
     'encoders': {},
     'parsers': {}}

    >>> unload_plugin('example','decoders')
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    """
    if category is None:
        # silently skip the categories that do not hold the plugin
        for plugins in _all_plugins.values():
            plugins.pop(name, None)
        return
    _all_plugins[category].pop(name)
def load_plugin_classes(classes, category=None, overwrite=False):
    """ load plugins from class objects

    Each class is checked against the interface of every candidate
    category; where it has all the required attributes, an instance of it
    is stored under its ``plugin_name``.

    Parameters
    ----------
    classes: list
        list of classes
    category : None or str
        if str, apply for single plugin category
    overwrite : bool
        if True, allow existing plugins to be overwritten

    Returns
    -------
    list of tuple
        (class name, message) for every class/category pairing that could
        not be loaded; with ``category=None`` this includes an entry for
        each category whose interface the class does not match

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    >>> class DecoderPlugin(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'a decoder for dicts containing _example_ key'
    ...     dict_signature = ('_example_',)
    ...
    >>> errors = load_plugin_classes([DecoderPlugin])

    >>> pprint(view_plugins())
    {'decoders': {'example': 'a decoder for dicts containing _example_ key'},
     'encoders': {},
     'parsers': {}}

    >>> unload_all_plugins()

    """
    problems = []
    for klass in classes:
        for pcat, pinterface in _plugins_interface.items():
            if category is not None and pcat != category:
                continue

            missing = [
                attr for attr in pinterface if not hasattr(klass, attr)]
            if missing:
                message = 'does not match {} interface: {}'.format(
                    pcat, pinterface)
                problems.append((klass.__name__, message))
                continue

            store = _all_plugins[pcat]
            if klass.plugin_name in store and not overwrite:
                message = '{0} is already set for {1}'.format(
                    klass.plugin_name, pcat)
                problems.append((klass.__name__, message))
                continue

            # plugins are stored as instances, not classes
            store[klass.plugin_name] = klass()
    return problems
@contextmanager
def plugins_context(classes, category=None):
    """ context manager to load plugin class(es) then unload on exit

    The plugin store is returned to its prior state on exit, even if the
    body of the ``with`` statement (or the loading itself) raises:
    plugins added while the context was active are removed, and plugins
    that were overwritten are put back.

    Parameters
    ----------
    classes: list
        list of classes
    category : None or str
        if str, apply for single plugin category

    Examples
    --------

    >>> from pprint import pprint
    >>> class DecoderPlugin(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'a decoder for dicts containing _example_ key'
    ...     dict_signature = ('_example_',)
    ...
    >>> with plugins_context([DecoderPlugin]):
    ...     pprint(view_plugins())
    {'decoders': {'example': 'a decoder for dicts containing _example_ key'},
     'encoders': {},
     'parsers': {}}

    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    """
    # shallow snapshot: which plugin instance was stored under each name
    original = {cat: dict(plugins) for cat, plugins in _all_plugins.items()}
    try:
        # loading is inside the try, so a failure part way through
        # (e.g. a plugin constructor raising) is also rolled back
        load_plugin_classes(classes, category, overwrite=True)
        yield
    finally:
        for cat, plugins in _all_plugins.items():
            before = original.get(cat, {})
            for name in list(plugins):
                if name not in before:
                    plugins.pop(name)
                elif plugins[name] is not before[name]:
                    # the name was overwritten: restore the prior plugin
                    plugins[name] = before[name]
def load_plugins_dir(path, category=None, overwrite=False):
    """ load plugins from a directory

    Every ``*.py`` file directly inside the directory is imported under a
    unique module name, and the classes defined in it are offered to
    ``load_plugin_classes``.

    Parameters
    ----------
    path : str or path_like
        directory to search; an object with a ``glob`` method is
        searched through that method
    category : None or str
        if str, apply for single plugin category
    overwrite : bool
        if True, allow existing plugins to be overwritten

    Returns
    -------
    list of tuple
        (file path, message) for files that failed to import, together
        with the errors reported by ``load_plugin_classes``

    """
    # get potential plugin python files
    if hasattr(path, 'glob'):
        candidates = path.glob('*.py')
    else:
        candidates = glob.glob(os.path.join(path, '*.py'))

    problems = []
    for pypath in candidates:
        # use uuid to ensure no conflicts in name space
        mod_name = str(uuid.uuid4())
        try:
            if hasattr(pypath, 'resolve'):
                # Make the path absolute, resolving any symlinks
                pypath = pypath.resolve()

            with warnings.catch_warnings(record=True):
                warnings.filterwarnings("ignore", category=ImportWarning)
                if hasattr(pypath, 'maketemp'):
                    # for MockPaths
                    with pypath.maketemp() as temp_file:
                        module = load_source(mod_name, temp_file.name)
                else:
                    module = load_source(mod_name, str(pypath))
        except Exception as err:
            # a file that cannot be imported is reported, not raised
            problems.append((str(pypath), 'Load Error: {}'.format(err)))
        else:
            # only get classes that are local to the module
            local_classes = [
                member
                for _, member in inspect.getmembers(module, inspect.isclass)
                if member.__module__ == mod_name
            ]
            problems.extend(
                load_plugin_classes(local_classes, category, overwrite))

    return problems
def load_builtin_plugins(category=None, overwrite=False):
    """load plugins from builtin directories

    Parameters
    ----------
    category : None or str
        if str, apply for single plugin category
    overwrite : bool
        if True, allow existing plugins to be overwritten

    Returns
    -------
    list of tuple
        the errors reported while loading each builtin directory

    Examples
    --------

    >>> from pprint import pprint
    >>> pprint(view_plugins())
    {'decoders': {}, 'encoders': {}, 'parsers': {}}

    >>> errors = load_builtin_plugins()
    >>> errors
    []

    >>> pprint(view_plugins(),width=200)
    {'decoders': {'decimal.Decimal': 'encode/decode Decimal type',
                  'fractions.Fraction': 'encode/decode Fraction type',
                  'numpy.ndarray': 'encode/decode numpy.ndarray',
                  'pint.Quantity': 'encode/decode pint.Quantity object',
                  'python.set': 'decode/encode python set'},
     'encoders': {'decimal.Decimal': 'encode/decode Decimal type',
                  'fractions.Fraction': 'encode/decode Fraction type',
                  'numpy.ndarray': 'encode/decode numpy.ndarray',
                  'pint.Quantity': 'encode/decode pint.Quantity object',
                  'python.set': 'decode/encode python set'},
     'parsers': {'csv.basic': 'read *.csv delimited file with headers to {header:[column_values]}',
                 'csv.literal': 'read *.literal.csv delimited files with headers to {header:column_values}, with number strings converted to int/float',
                 'hdf5.read': 'read *.hdf5 (in read mode) files using h5py',
                 'ipynb': 'read Jupyter Notebooks',
                 'json.basic': 'read *.json files using json.load',
                 'keypair': "read *.keypair, where each line should be; '<key> <pair>'",
                 'yaml.ruamel': 'read *.yaml files using ruamel.yaml'}}

    >>> unload_all_plugins()

    """  # noqa: E501
    problems = []
    for cat, path in _plugins_builtin.items():
        if category is None or cat == category:
            problems.extend(load_plugins_dir(path, cat, overwrite=overwrite))
    return problems
def encode(obj, outtype='json', raise_error=False):
    """ encode objects, via encoder plugins, to new types

    The first loaded encoder plugin whose ``objclass`` matches the object,
    and which has a ``to_<outtype>`` method, is used.

    Parameters
    ----------
    obj : object
        the object to encode
    outtype: str
        use encoder method to_<outtype> to encode
    raise_error : bool
        if True, raise ValueError if no suitable plugin found

    Returns
    -------
    object
        the encoded object, or ``obj`` itself if no suitable plugin is
        found and ``raise_error`` is False

    Raises
    ------
    ValueError
        if no suitable plugin is found and ``raise_error`` is True

    Examples
    --------
    >>> load_builtin_plugins('encoders')
    []

    >>> from decimal import Decimal
    >>> encode(Decimal('1.3425345'))
    {'_python_Decimal_': '1.3425345'}
    >>> encode(Decimal('1.3425345'),outtype='str')
    '1.3425345'

    >>> encode(set([1,2,3,4,4]))
    {'_python_set_': [1, 2, 3, 4]}
    >>> encode(set([1,2,3,4,4]),outtype='str')
    '{1, 2, 3, 4}'

    >>> unload_all_plugins()

    """
    method_name = 'to_{}'.format(outtype)
    for encoder in get_plugins('encoders').values():
        if (isinstance(obj, encoder.objclass)
                and hasattr(encoder, method_name)):
            return getattr(encoder, method_name)(obj)

    if raise_error:
        # the trailing space keeps the two string fragments from running
        # together ("...available for<obj>")
        raise ValueError(
            "No JSON serializer is available for "
            "{0} (of type {1})".format(obj, type(obj)))
    return obj
def decode(dct, intype='json', raise_error=False):
    """ decode dict objects, via decoder plugins, to new type

    The first loaded decoder plugin with a ``from_<intype>`` method is
    used, whose ``dict_signature`` is either exactly the keys of the dict
    or, if the plugin sets a true ``allow_other_keys`` attribute, a subset
    of them.

    Parameters
    ----------
    dct : dict
        the dict to decode
    intype: str
        use decoder method from_<intype> to encode
    raise_error : bool
        if True, raise ValueError if no suitable plugin found

    Returns
    -------
    object
        the decoded object, or ``dct`` itself if no suitable plugin is
        found and ``raise_error`` is False

    Raises
    ------
    ValueError
        if no suitable plugin is found and ``raise_error`` is True

    Examples
    --------
    >>> load_builtin_plugins('decoders')
    []

    >>> from decimal import Decimal
    >>> decode({'_python_Decimal_':'1.3425345'})
    Decimal('1.3425345')

    >>> unload_all_plugins()

    """
    method_name = 'from_{}'.format(intype)
    for decoder in get_plugins('decoders').values():
        if not hasattr(decoder, method_name):
            continue
        signature = set(decoder.dict_signature)
        if not signature.issubset(dct.keys()):
            continue
        # keys are compared as sets rather than sorted lists, so dicts
        # whose keys cannot be ordered against each other do not raise
        exact_match = signature == set(dct.keys())
        if exact_match or getattr(decoder, 'allow_other_keys', False):
            return getattr(decoder, method_name)(dct)

    if raise_error:
        raise ValueError('no suitable plugin found for: {}'.format(dct))
    return dct
def parser_available(fpath):
    """ test if parser plugin available for fpath

    Parameters
    ----------
    fpath : file_like
        string, object with 'open' and 'name' attributes,
        or object with 'readline' and 'name' attributes

    Returns
    -------
    bool
        True if the file name matches the ``file_regex`` pattern of at
        least one loaded parser plugin

    Raises
    ------
    ValueError
        if fpath is none of the accepted kinds of object

    Examples
    --------

    >>> load_builtin_plugins('parsers')
    []
    >>> test_file = StringIO('{"a":[1,2,3.4]}')
    >>> test_file.name = 'test.json'
    >>> parser_available(test_file)
    True
    >>> test_file.name = 'test.other'
    >>> parser_available(test_file)
    False

    >>> unload_all_plugins()

    """
    if isinstance(fpath, basestring):
        fname = fpath
    elif hasattr(fpath, 'name') and (
            hasattr(fpath, 'open') or hasattr(fpath, 'readline')):
        # path-like and file-like objects both supply the name to match
        fname = fpath.name
    else:
        raise ValueError(
            'fpath should be a str or file_like object: {}'.format(fpath))

    return any(
        fnmatch(fname, parser.file_regex)
        for parser in get_plugins('parsers').values())
def parse(fpath, **kwargs):
    """ parse file contents, via parser plugins, to dict like object

    NB: the longest file regex will be used from plugins

    Parameters
    ----------
    fpath : file_like
        string, object with 'open' and 'name' attributes, or
        object with 'readline' and 'name' attributes
    kwargs :
        to pass to parser plugin

    Returns
    -------
    object
        whatever the ``read_file`` method of the selected plugin returns

    Raises
    ------
    ValueError
        if fpath is none of the accepted kinds of object,
        or its name matches no loaded parser plugin

    Examples
    --------

    >>> load_builtin_plugins('parsers')
    []

    >>> from pprint import pformat

    >>> json_file = StringIO('{"a":[1,2,3.4]}')
    >>> json_file.name = 'test.json'

    >>> dct = parse(json_file)
    >>> print(pformat(dct).replace("u'","'"))
    {'a': [1, 2, 3.4]}

    >>> reset = json_file.seek(0)
    >>> from decimal import Decimal
    >>> dct = parse(json_file, parse_float=Decimal,other=1)
    >>> print(pformat(dct).replace("u'","'"))
    {'a': [1, 2, Decimal('3.4')]}

    >>> class NewParser(object):
    ...     plugin_name = 'example'
    ...     plugin_descript = 'loads test.json files'
    ...     file_regex = 'test.json'
    ...     def read_file(self, file_obj, **kwargs):
    ...         return {'example':1}
    >>> load_plugin_classes([NewParser],'parsers')
    []
    >>> reset = json_file.seek(0)
    >>> parse(json_file)
    {'example': 1}

    >>> unload_all_plugins()

    """
    given_as_string = isinstance(fpath, basestring)
    if given_as_string:
        fname = fpath
    elif hasattr(fpath, 'name') and (
            hasattr(fpath, 'open') or hasattr(fpath, 'readline')):
        fname = fpath.name
    else:
        raise ValueError(
            'fpath should be a str or file_like object: {}'.format(fpath))

    # one plugin per pattern; a later plugin replaces an earlier one
    # that declares the same pattern
    by_pattern = {}
    for plugin in get_plugins('parsers').values():
        by_pattern[plugin.file_regex] = plugin

    # find longest match first
    for pattern in sorted(by_pattern, key=len, reverse=True):
        if not fnmatch(fname, pattern):
            continue
        parser = by_pattern[pattern]
        if given_as_string:
            with open(fpath, 'r') as file_obj:
                return parser.read_file(file_obj, **kwargs)
        if hasattr(fpath, 'open'):
            with fpath.open('r') as file_obj:
                return parser.read_file(file_obj, **kwargs)
        # an already open, file-like object is read directly
        return parser.read_file(fpath, **kwargs)

    raise ValueError('{} does not match any regex'.format(fname))