Source code for jsonextended.edict

#!/usr/bin/env python
# -*- coding: utf-8 -*-

""" a module to manipulate python dictionary like objects

"""
# internal packages
import copy
import json
import re
import logging
import sys
import textwrap
import uuid
from fnmatch import fnmatch
from functools import reduce, total_ordering
import warnings
warnings.simplefilter('once', ImportWarning)
logger = logging.getLogger(__name__)

# python 3 to 2 compatibility
try:
    basestring
except NameError:
    basestring = str
try:
    unicode
except NameError:
    unicode = str
try:
    import pathlib
except ImportError:
    import pathlib2 as pathlib
try:
    from urllib2 import urlopen
except ImportError:
    from urllib.request import urlopen

# local imports
from jsonextended.utils import natural_sort, colortxt  # noqa: E402
from jsonextended.plugins import (
    encode, decode, parse, parser_available)  # noqa: E402


[docs]def is_iter_non_string(obj): """test if object is a list or tuple""" if isinstance(obj, list) or isinstance(obj, tuple): return True return False
# TODO this breaks everything # if hasattr(obj, '__iter__') and not isinstance(obj, basestring): # return True # else: # return False
[docs]def is_dict_like(obj, attr=('keys', 'items')): """test if object is dict like""" for a in attr: if not hasattr(obj, a): return False return True
[docs]def is_list_of_dict_like(obj, attr=('keys', 'items')): """test if object is a list only containing dict like items """ try: if len(obj) == 0: return False return all([is_dict_like(i, attr) for i in obj]) except Exception: return False
[docs]def is_path_like(obj, attr=('name', 'is_file', 'is_dir', 'iterdir')): """test if object is pathlib.Path like""" for a in attr: if not hasattr(obj, a): return False return True
[docs]def convert_type(d, intype, outtype, convert_list=True, in_place=True): """ convert all values of one type to another Parameters ---------- d : dict intype : type_class outtype : type_class convert_list : bool whether to convert instances inside lists and tuples in_place : bool if True, applies conversions to original dict, else returns copy Examples -------- >>> from pprint import pprint >>> d = {'a':'1','b':'2'} >>> pprint(convert_type(d,str,float)) {'a': 1.0, 'b': 2.0} >>> d = {'a':['1','2']} >>> pprint(convert_type(d,str,float)) {'a': [1.0, 2.0]} >>> d = {'a':[('1','2'),[3,4]]} >>> pprint(convert_type(d,str,float)) {'a': [(1.0, 2.0), [3, 4]]} """ if not in_place: out_dict = copy.deepcopy(d) else: out_dict = d def _convert(obj): if isinstance(obj, intype): try: obj = outtype(obj) except Exception: pass elif isinstance(obj, list) and convert_list: obj = _traverse_iter(obj) elif isinstance(obj, tuple) and convert_list: obj = tuple(_traverse_iter(obj)) return obj def _traverse_dict(dic): for key in dic.keys(): if is_dict_like(dic[key]): _traverse_dict(dic[key]) else: dic[key] = _convert(dic[key]) def _traverse_iter(iter): new_iter = [] for key in iter: if is_dict_like(key): _traverse_dict(key) new_iter.append(key) else: new_iter.append(_convert(key)) return new_iter if is_dict_like(out_dict): _traverse_dict(out_dict) else: _convert(out_dict) return out_dict
def _default_print_func(s): if hasattr(s, 'rstrip'): print(s.rstrip()) else: print(s) def _strip_ansi(source): """ Remove ANSI escape codes from text. Parameters ---------- source : str Source to remove the ANSI from """ ansi_re = re.compile('\x1b\\[(.*?)([@-~])') return ansi_re.sub('', source)
[docs]def pprint(d, lvlindent=2, initindent=0, delim=':', max_width=80, depth=3, no_values=False, align_vals=True, print_func=None, keycolor=None, compress_lists=None, round_floats=None, _dlist=False): """ print a nested dict in readable format (- denotes an element in a list of dictionaries) Parameters ---------- d : object lvlindent : int additional indentation spaces for each level initindent : int initial indentation spaces delim : str delimiter between key and value nodes max_width : int max character width of each line depth : int or None maximum levels to display no_values : bool whether to print values align_vals : bool whether to align values for each level print_func : callable or None function to print strings (print if None) keycolor : None or str if str, color keys by this color, allowed: red, green, yellow, blue, magenta, cyan, white compress_lists : int compress lists/tuples longer than this, e.g. [1,1,1,1,1,1] -> [1, 1,..., 1] round_floats : int significant figures for floats Examples -------- >>> d = {'a':{'b':{'c':'Å','de':[4,5,[7,'x'],9]}}} >>> pprint(d,depth=None) a: b: c: Å de: [4, 5, [7, x], 9] >>> pprint(d,max_width=17,depth=None) a: b: c: Å de: [4, 5, [7, x], 9] >>> pprint(d,no_values=True,depth=None) a: b: c: de: >>> pprint(d,depth=2) a: b: {...} >>> pprint({'a':[1,1,1,1,1,1,1,1]}, ... compress_lists=3) a: [1, 1, 1, ...(x5)] """ if print_func is None: print_func = _default_print_func if not is_dict_like(d): d = {'': d} # print_func('{}'.format(d)) # return extra = lvlindent if _dlist else 0 def decode_to_str(obj): val_string = obj if isinstance(obj, list): if compress_lists is not None: if len(obj) > compress_lists: diff = str(len(obj) - compress_lists) obj = obj[:compress_lists] + ['...(x{})'.format(diff)] val_string = '[' + ', '.join([decode_to_str(o) for o in obj]) + ']' elif isinstance(obj, tuple): if compress_lists is not None: if len(obj) > compress_lists: diff = str(len(obj) - compress_lists) obj = list( obj[:compress_lists]) + ['...(x{})'.format(diff)] val_string = '(' + ', '.join([decode_to_str(o) for o in obj]) + ')' elif isinstance(obj, float) and round_floats is not None: round_str = '{0:.' + str(round_floats - 1) + 'E}' val_string = str(float(round_str.format(obj))) else: try: val_string = encode(obj, outtype='str') except (TypeError, UnicodeError): pass # convert unicode to str (so no u'' prefix in python 2) try: return str(val_string) except Exception: return unicode(val_string) if align_vals: key_width = 0 for key, val in d.items(): if not is_dict_like(val): key_str = decode_to_str(key) key_width = max(key_width, len(key_str)) max_depth = depth for i, key in enumerate(natural_sort(d.keys())): value = d[key] if _dlist and i == 0: key_str = '- ' + decode_to_str(key) elif _dlist: key_str = ' ' + decode_to_str(key) else: key_str = decode_to_str(key) if keycolor is not None: key_str = colortxt(key_str, keycolor) if align_vals: key_str = '{0: <{1}} '.format( key_str + delim, key_width + len(delim)) else: key_str = '{0}{1} '.format(key_str, delim) depth = max_depth if max_depth is not None else 2 if keycolor is not None: key_length = len(_strip_ansi(key_str)) else: key_length = len(key_str) key_line = ' ' * initindent + key_str new_line = ' ' * initindent + ' ' * key_length if depth <= 0: continue if is_dict_like(value): if depth <= 1: print_func(' ' * initindent + key_str + '{...}') else: print_func(' ' * initindent + key_str) pprint(value, lvlindent, initindent + lvlindent + extra, delim, max_width, depth=max_depth - 1 if max_depth is not None else None, no_values=no_values, align_vals=align_vals, print_func=print_func, keycolor=keycolor, compress_lists=compress_lists, round_floats=round_floats) continue if isinstance(value, list): if all([is_dict_like(o) for o in value]) and value: if depth <= 1: print_func(key_line + '[...]') continue print_func(key_line) for obj in value: pprint( obj, lvlindent, initindent + lvlindent + extra, delim, max_width, depth=max_depth - 1 if max_depth is not None else None, no_values=no_values, align_vals=align_vals, print_func=print_func, keycolor=keycolor, compress_lists=compress_lists, round_floats=round_floats, _dlist=True) continue val_string_all = decode_to_str(value) if not no_values else '' for i, val_string in enumerate(val_string_all.split('\n')): if max_width is not None: if len(key_line) + 1 > max_width: raise Exception( 'cannot fit keys and data within set max_width') # divide into chuncks and join by same indentation val_indent = ' ' * (initindent + key_length) n = max_width - len(val_indent) val_string = val_indent.join( [s + '\n' for s in textwrap.wrap(val_string, n)])[:-1] if i == 0: print_func(key_line + val_string) else: print_func(new_line + val_string)
[docs]def extract(d, path=None): """ extract section of dictionary Parameters ---------- d : dict path : list[str] keys to section Returns ------- new_dict : dict original, without extracted section extract_dict : dict extracted section Examples -------- >>> from pprint import pprint >>> d = {1:{"a":"A"},2:{"b":"B",'c':'C'}} >>> pprint(extract(d,[2,'b'])) ({1: {'a': 'A'}, 2: {'c': 'C'}}, {'b': 'B'}) """ path = [] if path is None else path d_new = copy.deepcopy(d) d_sub = d_new for key in path[:-1]: d_sub = d_sub[key] key = path[-1] d_extract = {key: d_sub[key]} d_sub.pop(key) return d_new, d_extract
[docs]def indexes(dic, keys=None): """ index dictionary by multiple keys Parameters ---------- dic : dict keys : list Examples -------- >>> d = {1:{"a":"A"},2:{"b":"B"}} >>> indexes(d,[1,'a']) 'A' """ keys = [] if keys is None else keys assert hasattr(dic, 'keys') new = dic.copy() old_key = None for key in keys: if not hasattr(new, 'keys'): raise KeyError('No indexes after: {}'.format(old_key)) old_key = key new = new[key] return new
[docs]def flatten(d, key_as_tuple=True, sep='.', list_of_dicts=None, all_iters=None): """ get nested dict as flat {key:val,...}, where key is tuple/string of all nested keys Parameters ---------- d : object key_as_tuple : bool whether keys are list of nested keys or delimited string of nested keys sep : str if key_as_tuple=False, delimiter for keys list_of_dicts: str or None if not None, flatten lists of dicts using this prefix all_iters: str or None if not None, flatten all lists and tuples using this prefix Examples -------- >>> from pprint import pprint >>> d = {1:{"a":"A"}, 2:{"b":"B"}} >>> pprint(flatten(d)) {(1, 'a'): 'A', (2, 'b'): 'B'} >>> d = {1:{"a":"A"},2:{"b":"B"}} >>> pprint(flatten(d,key_as_tuple=False)) {'1.a': 'A', '2.b': 'B'} >>> d = [{'a':1},{'b':[1, 2]}] >>> pprint(flatten(d,list_of_dicts='__list__')) {('__list__0', 'a'): 1, ('__list__1', 'b'): [1, 2]} >>> d = [{'a':1},{'b':[1, 2]}] >>> pprint(flatten(d,all_iters='__iter__')) {('__iter__0', 'a'): 1, ('__iter__1', 'b', '__iter__0'): 1, ('__iter__1', 'b', '__iter__1'): 2} """ def expand(key, value): if is_dict_like(value): flatten_dict = flatten(value, key_as_tuple, sep, list_of_dicts, all_iters) if key_as_tuple: return [(key + k, v) for k, v in flatten_dict.items()] else: return [(str(key) + sep + k, v) for k, v in flatten_dict.items()] elif is_iter_non_string(value) and all_iters is not None: value = {'{0}{1}'.format(all_iters, i): v for i, v in enumerate(value)} flatten_dict = flatten(value, key_as_tuple, sep, list_of_dicts, all_iters) if key_as_tuple: return [(key + k, v) for k, v in flatten_dict.items()] else: return [(str(key) + sep + k, v) for k, v in flatten_dict.items()] elif is_list_of_dict_like(value) and list_of_dicts is not None: value = {'{0}{1}'.format(list_of_dicts, i): v for i, v in enumerate(value)} flatten_dict = flatten(value, key_as_tuple, sep, list_of_dicts, all_iters) if key_as_tuple: return [(key + k, v) for k, v in flatten_dict.items()] else: return [(str(key) + sep + k, v) for k, v in flatten_dict.items()] else: return [(key, value)] if is_iter_non_string(d) and all_iters is not None: d = {'{0}{1}'.format(all_iters, i): v for i, v in enumerate(d)} elif is_list_of_dict_like(d) and list_of_dicts is not None: d = {'{0}{1}'.format(list_of_dicts, i): v for i, v in enumerate(d)} elif not is_dict_like(d): raise TypeError('d is not dict like: {}'.format(d)) if key_as_tuple: items = [item for k, v in d.items() for item in expand((k,), v)] else: items = [item for k, v in d.items() for item in expand(k, v)] return dict(items)
def _startswith(k, prefix): if not hasattr(k, 'startswith'): return False else: return k.startswith(prefix) def _recreate_lists(d, prefix): if not is_dict_like(d): return d if all([_startswith(k, prefix) for k in d.keys()]): sorted_keys = sorted(list(d.keys()), key=lambda x: int(x.replace(prefix, ''))) return [_recreate_lists(d[k], prefix) if is_dict_like(d[k]) else d[k] for k in sorted_keys] return {k: _recreate_lists(v, prefix) for k, v in d.items()}
[docs]def unflatten(d, key_as_tuple=True, delim='.', list_of_dicts=None, deepcopy=True): r""" unflatten dictionary with keys as tuples or delimited strings Parameters ---------- d : dict key_as_tuple : bool if true, keys are tuples, else, keys are delimited strings delim : str if keys are strings, then split by delim list_of_dicts: str or None if key starts with this treat as a list Examples -------- >>> from pprint import pprint >>> d = {('a','b'):1,('a','c'):2} >>> pprint(unflatten(d)) {'a': {'b': 1, 'c': 2}} >>> d2 = {'a.b':1,'a.c':2} >>> pprint(unflatten(d2,key_as_tuple=False)) {'a': {'b': 1, 'c': 2}} >>> d3 = {('a','__list__1', 'a'): 1, ('a','__list__0', 'b'): 2} >>> pprint(unflatten(d3,list_of_dicts='__list__')) {'a': [{'b': 2}, {'a': 1}]} >>> unflatten({('a','b','c'):1,('a','b'):2}) Traceback (most recent call last): ... KeyError: "child conflict for path: ('a', 'b'); 2 and {'c': 1}" """ if not d: return d if deepcopy: try: d = copy.deepcopy(d) except Exception: warnings.warn( 'error in deepcopy, so using references to input dict') if key_as_tuple: result = d.pop(()) if () in d else {} else: result = d.pop('') if '' in d else {} for key, value in d.items(): if not isinstance(key, tuple) and key_as_tuple: raise ValueError( 'key not tuple and key_as_tuple set to True: {}'.format(key)) elif not isinstance(key, basestring) and not key_as_tuple: raise ValueError( 'key not string and key_as_tuple set to False: {}'.format(key)) elif isinstance(key, basestring) and not key_as_tuple: parts = key.split(delim) else: parts = key d = result for part in parts[:-1]: if part not in d: d[part] = {} d = d[part] if not is_dict_like(d): v1, v2 = sorted([str(d), str({parts[-1]: value})]) raise KeyError("child conflict for path: " "{0}; {1} and {2}".format(parts[:-1], v1, v2)) elif parts[-1] in d: try: value = merge([d[parts[-1]], value]) except Exception: v1, v2 = sorted([str(value), str(d[parts[-1]])]) raise KeyError("child conflict for path: " "{0}; {1} and {2}".format(parts, v1, v2)) d[parts[-1]] = value if list_of_dicts is not None: result = _recreate_lists(result, list_of_dicts) # if is_dict_like(result): # if all([str(k).startswith(list_of_dicts) for k in result.keys()]): # result = [result[k] for k in sorted(list(result.keys()), # key=lambda x: int(x.replace(list_of_dicts, '')))] return result
def _single_merge(a, b, error_path=None, overwrite=False, append=False, list_of_dicts=False): """merges b into a """ if error_path is None: error_path = [] if list_of_dicts and is_list_of_dict_like(a) and is_list_of_dict_like(b): if len(a) != len(b): raise ValueError( 'list of dicts are of different lengths at ' '"{0}": old: {1}, new: {2}'.format('.'.join(error_path), a, b)) return [_single_merge(a_item, b_item, error_path + ["iter_{}".format(i)], overwrite, append, list_of_dicts) for i, (a_item, b_item) in enumerate(zip(a, b))] for key in b: if key in a: if is_dict_like(a[key]) and is_dict_like(b[key]): _single_merge(a[key], b[key], error_path + [str(key)], overwrite, append, list_of_dicts) elif (isinstance(a[key], list) and isinstance(b[key], list) and append): a[key] += b[key] elif (list_of_dicts and is_list_of_dict_like(a[key]) and is_list_of_dict_like(b[key])): if len(a[key]) != len(b[key]): raise ValueError( 'list of dicts are of different lengths at ' '"{0}": old: {1}, new: {2}'.format( '.'.join(error_path + [str(key)]), a[key], b[key])) for i, (a_item, b_item) in enumerate(zip(a[key], b[key])): _single_merge(a_item, b_item, error_path + [str(key), "iter_{}".format(i)], overwrite, append, list_of_dicts) elif a[key] == b[key]: pass # same leaf value elif overwrite: a[key] = b[key] else: raise ValueError( 'different data already exists at ' '"{0}": old: {1}, new: {2}'.format( '.'.join(error_path + [str(key)]), a[key], b[key])) else: a[key] = b[key] return a
[docs]def merge(dicts, overwrite=False, append=False, list_of_dicts=False): """ merge dicts, starting with dicts[1] into dicts[0] Parameters ---------- dicts : list[dict] list of dictionaries overwrite : bool if true allow overwriting of current data append : bool if true and items are both lists, then add them list_of_dicts: bool treat list of dicts as additional branches Examples -------- >>> from pprint import pprint >>> d1 = {1:{"a":"A"},2:{"b":"B"}} >>> d2 = {1:{"a":"A"},2:{"c":"C"}} >>> pprint(merge([d1,d2])) {1: {'a': 'A'}, 2: {'b': 'B', 'c': 'C'}} >>> d1 = {1:{"a":["A"]}} >>> d2 = {1:{"a":["D"]}} >>> pprint(merge([d1,d2],append=True)) {1: {'a': ['A', 'D']}} >>> d1 = {1:{"a":"A"},2:{"b":"B"}} >>> d2 = {1:{"a":"X"},2:{"c":"C"}} >>> merge([d1,d2],overwrite=False) Traceback (most recent call last): ... ValueError: different data already exists at "1.a": old: A, new: X >>> merge([{},{}],overwrite=False) {} >>> merge([{},{'a':1}],overwrite=False) {'a': 1} >>> pprint(merge([{},{'a':1},{'a':1},{'b':2}])) {'a': 1, 'b': 2} >>> pprint(merge([{'a':[{"b": 1}, {"c": 2}]}, {'a':[{"d": 3}]}])) Traceback (most recent call last): ... ValueError: different data already exists at "a": old: [{'b': 1}, {'c': 2}], new: [{'d': 3}] >>> pprint(merge([{'a':[{"b": 1}, {"c": 2}]}, {'a':[{"d": 3}]}], list_of_dicts=True)) Traceback (most recent call last): ... ValueError: list of dicts are of different lengths at "a": old: [{'b': 1}, {'c': 2}], new: [{'d': 3}] >>> pprint(merge([{'a':[{"b": 1}, {"c": 2}]}, {'a':[{"d": 3}, {"e": 4}]}], list_of_dicts=True)) {'a': [{'b': 1, 'd': 3}, {'c': 2, 'e': 4}]} """ # noqa: E501 outdict = copy.deepcopy(dicts[0]) def single_merge(a, b): return _single_merge(a, b, overwrite=overwrite, append=append, list_of_dicts=list_of_dicts) reduce(single_merge, [outdict] + dicts[1:]) return outdict
[docs]def flattennd(d, levels=0, key_as_tuple=True, delim='.', list_of_dicts=None): """ get nested dict as {key:dict,...}, where key is tuple/string of all-n levels of nested keys Parameters ---------- d : dict levels : int the number of levels to leave unflattened key_as_tuple : bool whether keys are list of nested keys or delimited string of nested keys delim : str if key_as_tuple=False, delimiter for keys list_of_dicts: str or None if not None, flatten lists of dicts using this prefix Examples -------- >>> from pprint import pprint >>> d = {1:{2:{3:{'b':'B','c':'C'},4:'D'}}} >>> pprint(flattennd(d,0)) {(1, 2, 3, 'b'): 'B', (1, 2, 3, 'c'): 'C', (1, 2, 4): 'D'} >>> pprint(flattennd(d,1)) {(1, 2): {4: 'D'}, (1, 2, 3): {'b': 'B', 'c': 'C'}} >>> pprint(flattennd(d,2)) {(1,): {2: {4: 'D'}}, (1, 2): {3: {'b': 'B', 'c': 'C'}}} >>> pprint(flattennd(d,3)) {(): {1: {2: {4: 'D'}}}, (1,): {2: {3: {'b': 'B', 'c': 'C'}}}} >>> pprint(flattennd(d,4)) {(): {1: {2: {3: {'b': 'B', 'c': 'C'}, 4: 'D'}}}} >>> pprint(flattennd(d,5)) {(): {1: {2: {3: {'b': 'B', 'c': 'C'}, 4: 'D'}}}} >>> pprint(flattennd(d,1,key_as_tuple=False,delim='.')) {'1.2': {4: 'D'}, '1.2.3': {'b': 'B', 'c': 'C'}} >>> test_dict = {"a":[{"b":[{"c":1, "d": 2}, {"e":3, "f": 4}]}, {"b":[{"c":5, "d": 6}, {"e":7, "f": 8}]}]} >>> pprint(flattennd(test_dict, list_of_dicts="__list__", levels=2)) {('a', '__list__0', 'b'): [{'c': 1, 'd': 2}, {'e': 3, 'f': 4}], ('a', '__list__1', 'b'): [{'c': 5, 'd': 6}, {'e': 7, 'f': 8}]} >>> pprint(flattennd(test_dict, list_of_dicts="__list__", levels=3)) {('a', '__list__0'): {'b': [{'c': 1, 'd': 2}, {'e': 3, 'f': 4}]}, ('a', '__list__1'): {'b': [{'c': 5, 'd': 6}, {'e': 7, 'f': 8}]}} """ # noqa: E501 if levels < 0: raise ValueError('unflattened levels must be greater than 0') new_d = {} flattened = flatten(d, True, delim, list_of_dicts=list_of_dicts) if levels == 0: return flattened for key, value in flattened.items(): if key_as_tuple: new_key = key[: - (levels)] else: new_key = delim.join([str(k) for k in key[:-(levels)]]) new_levels = key[-(levels):] # val_dict = {new_levels: value} # val_dict = unflatten(val_dict, True, delim) if new_key not in new_d: new_d[new_key] = {new_levels: value} else: if new_levels in new_d[new_key]: raise ValueError( "key clash for: {0}; {1}".format(new_key, new_levels)) new_d[new_key][new_levels] = value for nkey, nvalue in new_d.items(): new_d[nkey] = unflatten( nvalue, list_of_dicts=list_of_dicts, deepcopy=False) return new_d
[docs]def flatten2d(d, key_as_tuple=True, delim='.', list_of_dicts=None): """ get nested dict as {key:dict,...}, where key is tuple/string of all-1 nested keys NB: is same as flattennd(d,1,key_as_tuple,delim) Parameters ---------- d : dict key_as_tuple : bool whether keys are list of nested keys or delimited string of nested keys delim : str if key_as_tuple=False, delimiter for keys list_of_dicts: str or None if not None, flatten lists of dicts using this prefix Examples -------- >>> from pprint import pprint >>> d = {1:{2:{3:{'b':'B','c':'C'},4:'D'}}} >>> pprint(flatten2d(d)) {(1, 2): {4: 'D'}, (1, 2, 3): {'b': 'B', 'c': 'C'}} >>> pprint(flatten2d(d,key_as_tuple=False,delim=',')) {'1,2': {4: 'D'}, '1,2,3': {'b': 'B', 'c': 'C'}} """ return flattennd(d, 1, key_as_tuple, delim, list_of_dicts=list_of_dicts)
[docs]def remove_keys(d, keys=None, use_wildcards=True, list_of_dicts=False, deepcopy=True): """remove certain keys from nested dict, retaining preceeding paths Parameters ---------- keys: list use_wildcards : bool if true, can use * (matches everything) and ? (matches any single character) list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool Examples -------- >>> from pprint import pprint >>> d = {1:{"a":"A"},"a":{"b":"B"}} >>> pprint(remove_keys(d,['a'])) {1: 'A', 'b': 'B'} >>> pprint(remove_keys({'abc':1},['a*'],use_wildcards=False)) {'abc': 1} >>> pprint(remove_keys({'abc':1},['a*'],use_wildcards=True)) {} """ keys = [] if keys is None else keys list_of_dicts = '__list__' if list_of_dicts else None def is_in(a, bs): if use_wildcards: for b in bs: try: if a == b: return True if fnmatch(a, b): return True except Exception: pass return False else: try: return a in bs except Exception: return False if not hasattr(d, 'items'): return d else: dic = flatten(d, list_of_dicts=list_of_dicts) new_dic = {} for key, value in dic.items(): new_key = tuple([i for i in key if not is_in(i, keys)]) if not new_key: continue try: if new_key[-1].startswith(list_of_dicts): continue except Exception: pass new_dic[new_key] = value return unflatten( new_dic, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def remove_keyvals(d, keyvals=None, list_of_dicts=False, deepcopy=True): """remove paths with at least one branch leading to certain (key,value) pairs from dict Parameters ---------- d : dict keyvals : dict or list[tuple] (key,value) pairs to remove list_of_dicts: bool treat list of dicts as additional branches Examples -------- >>> from pprint import pprint >>> d = {1:{"b":"A"},"a":{"b":"B","c":"D"},"b":{"a":"B"}} >>> pprint(remove_keyvals(d,[("b","B")])) {1: {'b': 'A'}, 'b': {'a': 'B'}} >>> d2 = {'a':[{'b':1,'c':1},{'b':1,'c':2}]} >>> pprint(remove_keyvals(d2,[("b",1)])) {'a': [{'b': 1, 'c': 1}, {'b': 1, 'c': 2}]} >>> pprint(remove_keyvals(d2,[("b",1)],list_of_dicts=True)) {} """ keyvals = [] if keyvals is None else keyvals list_of_dicts = '__list__' if list_of_dicts else None if hasattr(keyvals, 'items'): keyvals = [(k, v) for k, v in keyvals.items()] if not hasattr(d, 'items'): return d flatd = flatten(d, list_of_dicts=list_of_dicts) def is_in(a, b): try: return a in b except Exception: return False prune = [k[0] for k, v in flatd.items() if is_in((k[-1], v), keyvals)] flatd = {k: v for k, v in flatd.items() if not is_in(k[0], prune)} return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def remove_paths(d, keys, list_of_dicts=False, deepcopy=True): """ remove paths containing certain keys from dict Parameters ---------- d: dict keys : list list of keys to find and remove path list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {1:{"a":"A"},2:{"b":"B"},4:{5:{6:'a',7:'b'}}} >>> pprint(remove_paths(d,[6,'a'])) {2: {'b': 'B'}, 4: {5: {7: 'b'}}} >>> d = {1:{2: 3}, 1:{4: 5}} >>> pprint(remove_paths(d,[(1, 2)])) {1: {4: 5}} >>> d2 = {'a':[{'b':1,'c':{'b':3}},{'b':1,'c':2}]} >>> pprint(remove_paths(d2,["b"],list_of_dicts=False)) {'a': [{'b': 1, 'c': {'b': 3}}, {'b': 1, 'c': 2}]} >>> pprint(remove_paths(d2,["b"],list_of_dicts=True)) {'a': [{'c': 2}]} """ keys = [(key,) if not isinstance(key, tuple) else key for key in keys] list_of_dicts = '__list__' if list_of_dicts else None def contains(path): for k in keys: if set(k).issubset(path): return True return False flatd = flatten(d, list_of_dicts=list_of_dicts) flatd = {path: v for path, v in flatd.items() if not contains(path)} return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
# return {key: remove_paths(value,keys) # for key, value in d.items() if key not in keys}
[docs]def filter_values(d, vals=None, list_of_dicts=False, deepcopy=True): """ filters leaf nodes of nested dictionary Parameters ---------- d : dict vals : list values to filter by list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> d = {1:{"a":"A"},2:{"b":"B"},4:{5:{6:'a'}}} >>> filter_values(d,['a']) {4: {5: {6: 'a'}}} """ vals = [] if vals is None else vals list_of_dicts = '__list__' if list_of_dicts else None flatd = flatten(d, list_of_dicts=list_of_dicts) def is_in(a, b): try: return a in b except Exception: return False flatd = {k: v for k, v in flatd.items() if is_in(v, vals)} return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
def _in_pruned(k, pruned): for p in pruned: if tuple(k[:len(p)]) == p: return True return False # TODO filter_keyvals; deal with uncomparable values, speedup?
[docs]def filter_keyvals(d, keyvals, logic="OR", keep_siblings=False, list_of_dicts=False, deepcopy=True): """ filters leaf nodes key:value pairs of nested dictionary Parameters ---------- d : dict keyvals : dict or list[tuple] (key,value) pairs to filter by logic : str "OR" or "AND" for matching pairs keep_siblings : bool keep all sibling paths list_of_dicts : bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {1:{6:'a'},3:{7:'a'},2:{6:"b"},4:{5:{6:'a'}}} >>> pprint(filter_keyvals(d,[(6,'a')])) {1: {6: 'a'}, 4: {5: {6: 'a'}}} >>> d2 = {'a':{'b':1,'c':2,'d':3}, 'e':4} >>> pprint(filter_keyvals(d2, {'b': 1, 'e': 4}, logic="OR", keep_siblings=False)) {'a': {'b': 1}, 'e': 4} >>> pprint(filter_keyvals(d2,[('b',1)], logic="OR", keep_siblings=True)) {'a': {'b': 1, 'c': 2, 'd': 3}} >>> pprint(filter_keyvals(d2, {'b': 1, 'e': 4}, logic="AND", keep_siblings=False)) {} >>> pprint(filter_keyvals(d2, {'b': 1, 'c': 2}, logic="AND", keep_siblings=False)) {'a': {'b': 1, 'c': 2}} >>> pprint(filter_keyvals(d2,[('b',1), ('c',2)], logic="AND", keep_siblings=True)) {'a': {'b': 1, 'c': 2, 'd': 3}} >>> d3 = {"a": {"b": 1, "f": {"d": 3}}, "e": {"b": 1, "c": 2, "f": {"d": 3}}, "g": 5} >>> pprint(filter_keyvals(d3,[('b',1), ('c', 2)], logic="OR", keep_siblings=True)) {'a': {'b': 1, 'f': {'d': 3}}, 'e': {'b': 1, 'c': 2, 'f': {'d': 3}}} >>> pprint(filter_keyvals(d3,[('b',1), ('c', 2)], logic="AND", keep_siblings=True)) {'e': {'b': 1, 'c': 2, 'f': {'d': 3}}} """ # noqa: E501 if len(keyvals) != len(dict(keyvals)): raise ValueError("repeating keys in keyvals: {}".format(keyvals)) keyvals = dict(keyvals) list_of_dicts = '__list__' if list_of_dicts else None flattened = flatten(d, list_of_dicts=list_of_dicts) if logic == "OR": if keep_siblings: pruned = { tuple(k[:-1]) for k, v in flattened.items() if any(key == k[-1] and v == keyvals[key] for key in keyvals)} filtered = {k: v for k, v in flattened.items() if _in_pruned(k, pruned)} else: filtered = { k: v for k, v in flattened.items() if any(key == k[-1] and v == keyvals[key] for key in keyvals)} elif logic == "AND": pruned = {} for k, v in flattened.items(): if any(key == k[-1] and v == keyvals[key] for key in keyvals): pruned[tuple(k[:-1])] = pruned.get(tuple(k[:-1]), []) + [k[-1]] all_keys = set(keyvals.keys()) pruned = [k for k, v in pruned.items() if set(v) == all_keys] if keep_siblings: filtered = {k: v for k, v in flattened.items() if _in_pruned(k, pruned)} else: filtered = {k: v for k, v in flattened.items( ) if k[-1] in all_keys and _in_pruned(k, pruned)} else: raise ValueError("logic must be AND or OR: {}".format(logic)) return unflatten(filtered, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def filter_keyfuncs(d, keyfuncs, logic="OR", keep_siblings=False, list_of_dicts=False, deepcopy=True): """ filters leaf nodes key:func(val) pairs of nested dictionary, where func(val) -> True/False Parameters ---------- d : dict keyfuncs : dict or list[tuple] (key,funcs) pairs to filter by logic : str "OR" or "AND" for matching pairs keep_siblings : bool keep all sibling paths list_of_dicts : bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {'a':{'b':1,'c':2,'d':3}, 'e':4} >>> func1 = lambda v: v <= 2 >>> pprint(filter_keyfuncs(d, {'b': func1, 'e': func1}, logic="OR", keep_siblings=False)) {'a': {'b': 1}} >>> pprint(filter_keyfuncs(d,[('b',func1), ('d', func1)], logic="OR", keep_siblings=True)) {'a': {'b': 1, 'c': 2, 'd': 3}} >>> pprint(filter_keyfuncs(d, {'b': func1, 'e': func1}, logic="AND", keep_siblings=False)) {} >>> pprint(filter_keyfuncs(d, {'b': func1, 'd': func1}, logic="AND", keep_siblings=False)) {} >>> pprint(filter_keyfuncs(d, {'b': func1, 'c': func1}, logic="AND", keep_siblings=False)) {'a': {'b': 1, 'c': 2}} >>> pprint(filter_keyfuncs(d,[('b',func1), ('c',func1)], logic="AND", keep_siblings=True)) {'a': {'b': 1, 'c': 2, 'd': 3}} """ # noqa: E501 if len(keyfuncs) != len(dict(keyfuncs)): raise ValueError("repeating keys in keyfuncs: {}".format(keyfuncs)) keyfuncs = dict(keyfuncs) list_of_dicts = '__list__' if list_of_dicts else None flattened = flatten(d, list_of_dicts=list_of_dicts) if logic == "OR": if keep_siblings: pruned = { tuple(k[:-1]) for k, v in flattened.items() if any(key == k[-1] and keyfuncs[key](v) for key in keyfuncs)} filtered = {k: v for k, v in flattened.items() if _in_pruned(k, pruned)} else: filtered = { k: v for k, v in flattened.items() if any(key == k[-1] and keyfuncs[key](v) for key in keyfuncs)} elif logic == "AND": pruned = {} for k, v in flattened.items(): if any(key == k[-1] and keyfuncs[key](v) for key in keyfuncs): pruned[tuple(k[:-1])] = pruned.get(tuple(k[:-1]), []) + [k[-1]] all_keys = set(keyfuncs.keys()) pruned = [k for k, v in pruned.items() if set(v) == all_keys] if keep_siblings: filtered = {k: v for k, v in flattened.items() if _in_pruned(k, pruned)} else: filtered = {k: v for k, v in flattened.items( ) if k[-1] in all_keys and _in_pruned(k, pruned)} else: raise ValueError("logic must be AND or OR: {}".format(logic)) return unflatten(filtered, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def filter_keys(d, keys, use_wildcards=False, list_of_dicts=False, deepcopy=True): """ filter dict by certain keys Parameters ---------- d : dict keys: list use_wildcards : bool if true, can use * (matches everything) and ? (matches any single character) list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {1:{"a":"A"},2:{"b":"B"},4:{5:{6:'a',7:'b'}}} >>> pprint(filter_keys(d,['a',6])) {1: {'a': 'A'}, 4: {5: {6: 'a'}}} >>> d = {1:{"axxxx":"A"},2:{"b":"B"}} >>> pprint(filter_keys(d,['a*'],use_wildcards=True)) {1: {'axxxx': 'A'}} """ list_of_dicts = '__list__' if list_of_dicts else None flatd = flatten(d, list_of_dicts=list_of_dicts) def is_in(a, bs): if use_wildcards: for b in bs: try: if a == b: return True if fnmatch(b, a): return True except Exception: pass return False else: try: return a in bs except Exception: return False flatd = {paths: v for paths, v in flatd.items() if any( [is_in(k, paths) for k in keys])} return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def filter_paths(d, paths, list_of_dicts=False, deepcopy=True): """ filter dict by certain paths containing key sets Parameters ---------- d : dict paths : list[str] or list[tuple] list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {'a':{'b':1,'c':{'d':2}},'e':{'c':3}} >>> filter_paths(d,[('c','d')]) {'a': {'c': {'d': 2}}} >>> d2 = {'a':[{'b':1,'c':3},{'b':1,'c':2}]} >>> pprint(filter_paths(d2,["b"],list_of_dicts=False)) {} >>> pprint(filter_paths(d2,["c"],list_of_dicts=True)) {'a': [{'c': 3}, {'c': 2}]} """ list_of_dicts = '__list__' if list_of_dicts else None all_keys = [x for y in paths if isinstance(y, tuple) for x in y] all_keys += [x for x in paths if not isinstance(x, tuple)] # faster to filter first I think new_d = filter_keys(d, all_keys, list_of_dicts=list_of_dicts) new_d = flatten(d, list_of_dicts=list_of_dicts) for key in list(new_d.keys()): if not any([ set(key).issuperset(path if isinstance(path, tuple) else[path]) for path in paths]): new_d.pop(key) return unflatten(new_d, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def rename_keys(d, keymap=None, list_of_dicts=False, deepcopy=True): """ rename keys in dict Parameters ---------- d : dict keymap : dict dictionary of key name mappings list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {'a':{'old_name':1}} >>> pprint(rename_keys(d,{'old_name':'new_name'})) {'a': {'new_name': 1}} """ list_of_dicts = '__list__' if list_of_dicts else None keymap = {} if keymap is None else keymap flatd = flatten(d, list_of_dicts=list_of_dicts) flatd = { tuple([keymap.get(k, k) for k in path]): v for path, v in flatd.items() } return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
# return {keymap[key] if key in keymap else key: rename_keys(value,keymap) # for key, value in d.items()}
[docs]def split_key(d, key, new_keys, before=True, list_of_dicts=False, deepcopy=True): """ split an existing key(s) into multiple levels Parameters ---------- d : dict or dict like key: str existing key value new_keys: list[str] new levels to add before: bool add level before existing key (else after) list_of_dicts: bool treat list of dicts as additional branches Examples -------- >>> from pprint import pprint >>> d = {'a':1,'b':2} >>> pprint(split_key(d,'a',['c','d'])) {'b': 2, 'c': {'d': {'a': 1}}} >>> pprint(split_key(d,'a',['c','d'],before=False)) {'a': {'c': {'d': 1}}, 'b': 2} >>> d2 = [{'a':1},{'a':2},{'a':3}] >>> pprint(split_key(d2,'a',['b'],list_of_dicts=True)) [{'b': {'a': 1}}, {'b': {'a': 2}}, {'b': {'a': 3}}] """ list_of_dicts = '__list__' if list_of_dicts else None flatd = flatten(d, list_of_dicts=list_of_dicts) newd = {} for path, v in flatd.items(): if key in path: newk = [] for k in path: if k == key: if before: newk = newk + new_keys + [k] else: newk = newk + [k] + new_keys else: newk.append(k) newd[tuple(newk)] = v else: newd[path] = v return unflatten(newd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def apply(d, leaf_key, func, new_name=None, remove_lkey=True, list_of_dicts=False, unflatten_level=0, deepcopy=True, **kwargs): """ apply a function to all values with a certain leaf (terminal) key Parameters ---------- d : dict leaf_key : str name of leaf key func : callable function to apply new_name : str if not None, rename leaf_key remove_lkey: bool whether to remove original leaf_key (if new_name is not None) list_of_dicts: bool treat list of dicts as additional branches unflatten_level : int or None the number of levels to leave unflattened before combining, for instance if you need dicts as inputs deepcopy: bool deepcopy values kwargs : dict additional keywords to parse to function Examples -------- >>> from pprint import pprint >>> d = {'a':1,'b':1} >>> func = lambda x: x+1 >>> pprint(apply(d,'a',func)) {'a': 2, 'b': 1} >>> pprint(apply(d,'a',func,new_name='c')) {'b': 1, 'c': 2} >>> pprint(apply(d,'a',func,new_name='c', remove_lkey=False)) {'a': 1, 'b': 1, 'c': 2} >>> test_dict = {"a":[{"b":[{"c":1, "d": 2}, {"e":3, "f": 4}]}, {"b":[{"c":5, "d": 6}, {"e":7, "f": 8}]}]} >>> pprint(apply(test_dict, "b", lambda x: x[-1], list_of_dicts=True, unflatten_level=2)) {'a': [{'b': {'e': 3, 'f': 4}}, {'b': {'e': 7, 'f': 8}}]} """ # noqa: E501 list_of_dicts = '__list__' if list_of_dicts else None if unflatten_level == 0: flatd = flatten(d, list_of_dicts=list_of_dicts) else: flatd = flattennd(d, unflatten_level, list_of_dicts=list_of_dicts) newd = {k: (func(v, **kwargs) if k[-1] == leaf_key else v) for k, v in flatd.items()} if new_name is not None: newd = {(tuple(list(k[:-1]) + [new_name]) if k[-1] == leaf_key else k): v for k, v in newd.items()} if not remove_lkey: newd.update(flatd) return unflatten(newd, list_of_dicts=list_of_dicts, deepcopy=deepcopy)
[docs]def combine_apply(d, leaf_keys, func, new_name, unflatten_level=1, remove_lkeys=True, overwrite=False, list_of_dicts=False, deepcopy=True, **kwargs): """ combine values with certain leaf (terminal) keys by a function Parameters ---------- d : dict leaf_keys : list names of leaf keys func : callable function to apply, must take at least len(leaf_keys) arguments new_name : str new key name unflatten_level : int or None the number of levels to leave unflattened before combining, for instance if you need dicts as inputs (None means all) remove_lkeys: bool whether to remove original leaf_keys overwrite: bool whether to overwrite any existing new_name key list_of_dicts: bool treat list of dicts as additional branches deepcopy: bool deepcopy values kwargs : dict additional keywords to parse to function Examples -------- >>> from pprint import pprint >>> d = {'a':1,'b':2} >>> func = lambda x,y: x+y >>> pprint(combine_apply(d,['a','b'],func,'c')) {'c': 3} >>> pprint(combine_apply(d,['a','b'],func,'c',remove_lkeys=False)) {'a': 1, 'b': 2, 'c': 3} >>> d = {1:{'a':1,'b':2},2:{'a':4,'b':5},3:{'a':1}} >>> pprint(combine_apply(d,['a','b'],func,'c')) {1: {'c': 3}, 2: {'c': 9}, 3: {'a': 1}} >>> func2 = lambda x: sorted(list(x.keys())) >>> d2 = {'d':{'a':{'b':1,'c':2}}} >>> pprint(combine_apply(d2,['a'],func2,'a',unflatten_level=2)) {'d': {'a': ['b', 'c']}} """ list_of_dicts = '__list__' if list_of_dicts else None if unflatten_level is not None: flatd = flattennd(d, levels=unflatten_level, list_of_dicts=list_of_dicts) else: # TODO could do this better? flatd = unflatten(d, key_as_tuple=False, delim='*@#$', deepcopy=deepcopy) for dic in flatd.values(): if not is_dict_like(dic): continue if all([k in list(dic.keys()) for k in leaf_keys]): if remove_lkeys: vals = [dic.pop(k) for k in leaf_keys] else: vals = [dic[k] for k in leaf_keys] if new_name in dic and not overwrite: raise ValueError('{} already in sub-dict'.format(new_name)) dic[new_name] = func(*vals, **kwargs) if unflatten_level is not None: return unflatten(flatd, list_of_dicts=list_of_dicts, deepcopy=deepcopy) else: return flatd
[docs]def split_lists(d, split_keys, new_name='split', check_length=True, deepcopy=True): """split_lists key:list pairs into dicts for each item in the lists NB: will only split if all split_keys are present Parameters ---------- d : dict split_keys : list keys to split new_name : str top level key for split items check_length : bool if true, raise error if any lists are of a different length deepcopy: bool deepcopy values Examples -------- >>> from pprint import pprint >>> d = {'path_key':{'x':[1,2],'y':[3,4],'a':1}} >>> new_d = split_lists(d,['x','y']) >>> pprint(new_d) {'path_key': {'a': 1, 'split': [{'x': 1, 'y': 3}, {'x': 2, 'y': 4}]}} >>> split_lists(d,['x','a']) Traceback (most recent call last): ... ValueError: "a" data at the following path is not a list ('path_key',) >>> d2 = {'path_key':{'x':[1,7],'y':[3,4,5]}} >>> split_lists(d2,['x','y']) Traceback (most recent call last): ... ValueError: lists at the following path do not have the same size ('path_key',) """ # noqa: E501 flattened = flatten2d(d) new_d = {} for key, value in flattened.items(): if set(split_keys).issubset(value.keys()): # combine_d = {} combine_d = [] sub_d = {} length = None for subkey, subvalue in value.items(): if subkey in split_keys: if not isinstance(subvalue, list): raise ValueError( '"{0}" data at the following path is not a list ' '{1}'.format(subkey, key)) if check_length and length is not None: if len(subvalue) != length: raise ValueError( 'lists at the following path ' 'do not have the same size {0}'.format(key)) if length is None: combine_d = [{subkey: v} for v in subvalue] else: for item, val in zip(combine_d, subvalue): item[subkey] = val length = len(subvalue) # new_combine = {k:{subkey:v} # for k,v in enumerate(subvalue)} # combine_d = merge([combine_d,new_combine]) else: sub_d[subkey] = subvalue try: new_d[key] = merge([sub_d, {new_name: combine_d}]) except ValueError: raise ValueError( 'split data key: {0}, already exists at ' 'this level for {1}'.format(new_name, key)) else: new_d[key] = value return unflatten(new_d, deepcopy=deepcopy)
[docs]def combine_lists(d, keys=None, deepcopy=True): """combine lists of dicts Parameters ---------- d : dict or list[dict] keys : list keys to combine (all if None) deepcopy: bool deepcopy values Example ------- >>> from pprint import pprint >>> d = {'path_key': {'a': 1, 'split': [{'x': 1, 'y': 3}, {'x': 2, 'y': 4}]}} >>> pprint(combine_lists(d,['split'])) {'path_key': {'a': 1, 'split': {'x': [1, 2], 'y': [3, 4]}}} >>> combine_lists([{"a":2}, {"a":1}]) {'a': [2, 1]} """ # noqa: E501 if isinstance(d, list): init_list = True d = {'dummy_key843': d} else: init_list = False flattened = flatten(d, list_of_dicts=None) for key, value in list(flattened.items()): if keys is not None: try: if not key[-1] in keys: continue except Exception: continue if not isinstance(value, list): continue if not all([is_dict_like(d) for d in value]): continue newd = {} for subdic in value: for subk, subv in subdic.items(): if subk not in newd: newd[subk] = [] newd[subk].append(subv) flattened[key] = newd final = unflatten(flattened, list_of_dicts=None, deepcopy=deepcopy) if init_list: return list(final.values())[0] else: return final
[docs]def list_to_dict(lst, key=None, remove_key=True): """ convert a list of dicts to a dict with root keys Parameters ---------- lst : list[dict] key : str or None a key contained by all of the dicts if None use index number string remove_key : bool remove key from dicts in list Examples -------- >>> from pprint import pprint >>> lst = [{'name':'f','b':1},{'name':'g','c':2}] >>> pprint(list_to_dict(lst)) {'0': {'b': 1, 'name': 'f'}, '1': {'c': 2, 'name': 'g'}} >>> pprint(list_to_dict(lst,'name')) {'f': {'b': 1}, 'g': {'c': 2}} """ assert all([is_dict_like(d) for d in lst]) if key is not None: assert all([key in d for d in lst]) new_dict = {} for i, d in enumerate(lst): d = unflatten(flatten(d)) if key is None: new_dict[str(i)] = d else: if remove_key: k = d.pop(key) else: k = d[key] new_dict[k] = d return new_dict
[docs]def diff(new_dict, old_dict, iter_prefix='__iter__', np_allclose=False, **kwargs): """ return the difference between two dict_like objects Parameters ---------- new_dict: dict old_dict: dict iter_prefix: str prefix to use for list and tuple indexes np_allclose: bool if True, try using numpy.allclose to assess differences **kwargs: keyword arguments to parse to numpy.allclose Returns ------- outcome: dict Containing none or more of: - "insertions" : list of (path, val) - "deletions" : list of (path, val) - "changes" : list of (path, (val1, val2)) - "uncomparable" : list of (path, (val1, val2)) Examples -------- >>> from pprint import pprint >>> diff({'a':1},{'a':1}) {} >>> pprint(diff({'a': 1, 'b': 2, 'c': 5},{'b': 3, 'c': 4, 'd': 6})) {'changes': [(('b',), (2, 3)), (('c',), (5, 4))], 'deletions': [(('d',), 6)], 'insertions': [(('a',), 1)]} >>> pprint(diff({'a': [{"b":1}, {"c":2}, 1]},{'a': [{"b":1}, {"d":2}, 2]})) {'changes': [(('a', '__iter__2'), (1, 2))], 'deletions': [(('a', '__iter__1', 'd'), 2)], 'insertions': [(('a', '__iter__1', 'c'), 2)]} >>> diff({'a':1}, {'a':1+1e-10}) {'changes': [(('a',), (1, 1.0000000001))]} >>> diff({'a':1}, {'a':1+1e-10}, np_allclose=True) {} """ if np_allclose: try: import numpy except ImportError: raise ValueError("to use np_allclose, numpy must be installed") dct1_flat = flatten(new_dict, all_iters=iter_prefix) dct2_flat = flatten(old_dict, all_iters=iter_prefix) outcome = {'insertions': [], 'deletions': [], 'changes': [], 'uncomparable': []} for path, val in dct1_flat.items(): if path not in dct2_flat: outcome['insertions'].append((path, val)) continue other_val = dct2_flat.pop(path) if np_allclose: try: if numpy.allclose(val, other_val, **kwargs): continue except Exception: pass try: if val != other_val: outcome['changes'].append((path, (val, other_val))) except Exception: outcome['uncomparable'].append((path, (val, other_val))) for path2, val2 in dct2_flat.items(): outcome['deletions'].append((path2, val2)) # remove any empty lists and sort for key in list(outcome.keys()): if not outcome[key]: outcome.pop(key) try: outcome[key] = sorted(outcome[key]) except Exception: pass return outcome
[docs]def to_json(dct, jfile, overwrite=False, dirlevel=0, sort_keys=True, indent=2, default_name='root.json', **kwargs): """ output dict to json Parameters ---------- dct : dict jfile : str or file_like if file_like, must have write method overwrite : bool whether to overwrite existing files dirlevel : int if jfile is path to folder, defines how many key levels to set as sub-folders sort_keys : bool if true then the output of dictionaries will be sorted by key indent : int if non-negative integer, then JSON array elements and object members will be pretty-printed on new lines with that indent level spacing. kwargs : dict keywords for json.dump Examples -------- >>> from jsonextended.utils import MockPath >>> file_obj = MockPath('test.json',is_file=True,exists=False) >>> dct = {'a':{'b':1}} >>> to_json(dct, file_obj) >>> print(file_obj.to_string()) File("test.json") Contents: { "a": { "b": 1 } } >>> from jsonextended.utils import MockPath >>> folder_obj = MockPath() >>> dct = {'x':{'a':{'b':1},'c':{'d':3}}} >>> to_json(dct, folder_obj, dirlevel=0,indent=None) >>> print(folder_obj.to_string(file_content=True)) Folder("root") File("x.json") Contents: {"a": {"b": 1}, "c": {"d": 3}} >>> folder_obj = MockPath() >>> to_json(dct, folder_obj, dirlevel=1,indent=None) >>> print(folder_obj.to_string(file_content=True)) Folder("root") Folder("x") File("a.json") Contents: {"b": 1} File("c.json") Contents: {"d": 3} """ if hasattr(jfile, 'write'): json.dump(dct, jfile, sort_keys=sort_keys, indent=indent, default=encode) return if isinstance(jfile, basestring): path = pathlib.Path(jfile) else: path = jfile file_attrs = ['exists', 'is_dir', 'is_file', 'touch', 'open'] if not all([hasattr(path, attr) for attr in file_attrs]): raise ValueError( 'jfile should be a str or file_like object: {}'.format(jfile)) if path.is_file() and path.exists() and not overwrite: raise IOError('jfile already exists and ' 'overwrite is set to false: {}'.format(jfile)) if not path.is_dir() and dirlevel <= 0: path.touch() # try to create file if doesn't already exist with path.open('w') as outfile: outfile.write(unicode(json.dumps( dct, sort_keys=sort_keys, indent=indent, default=encode, **kwargs))) return if not path.is_dir(): path.mkdir() dirlevel -= 1 # if one or more values if not a nested dict if not all([hasattr(v, 'items') for v in dct.values()]): newpath = path.joinpath(default_name) newpath.touch() with newpath.open('w') as outfile: outfile.write(unicode(json.dumps( dct, sort_keys=sort_keys, indent=indent, default=encode, **kwargs))) return for key, val in dct.items(): if dirlevel <= 0: newpath = path.joinpath('{}.json'.format(key)) newpath.touch() with newpath.open('w') as outfile: outfile.write(unicode(json.dumps( val, ensure_ascii=False, sort_keys=sort_keys, indent=indent, default=encode, **kwargs))) else: newpath = path.joinpath('{}'.format(key)) if not newpath.exists(): newpath.mkdir() to_json(val, newpath, overwrite=overwrite, dirlevel=dirlevel - 1, sort_keys=sort_keys, indent=indent, default_name='{}.json'.format(key), **kwargs)
[docs]def dump(dct, jfile, overwrite=False, dirlevel=0, sort_keys=True, indent=2, default_name='root.json', **kwargs): """ output dict to json Parameters ---------- dct : dict jfile : str or file_like if file_like, must have write method overwrite : bool whether to overwrite existing files dirlevel : int if jfile is path to folder, defines how many key levels to set as sub-folders sort_keys : bool if true then the output of dictionaries will be sorted by key indent : int if non-negative integer, then JSON array elements and object members will be pretty-printed on new lines with that indent level spacing. kwargs : dict keywords for json.dump """ to_json(dct, jfile, overwrite=overwrite, dirlevel=dirlevel, sort_keys=sort_keys, indent=indent, default_name=default_name, **kwargs)
[docs]class to_html(object): # noqa: N801 """ Pretty display dictionary in collapsible format with indents Parameters ---------- obj : str or dict dict or json depth: int Depth of the json tree structure displayed, the rest is collapsed. max_length: int Maximum number of characters of a string displayed as preview, longer string appear collapsed. max_height: int Maxium height in pixels of containing box. sort: bool Whether the json keys are sorted alphabetically. local : bool use local version of javascript file uniqueid : str unique identifier (if None, auto-created) Examples --------- >>> dic = {'sape': {'value': 22}, 'jack': 4098, 'guido': 4127} >>> obj = to_html(dic, depth=1, max_length=10, sort=False, local=True, uniqueid='123') >>> print(obj._repr_html_()) <style> .renderjson a { text-decoration: none; } .renderjson .disclosure { color: red; font-size: 125%; } .renderjson .syntax { color: darkgrey; } .renderjson .string { color: black; } .renderjson .number { color: black; } .renderjson .boolean { color: purple; } .renderjson .key { color: royalblue; } .renderjson .keyword { color: orange; } .renderjson .object.syntax { color: lightseagreen; } .renderjson .array.syntax { color: lightseagreen; } </style><div id="123" style="max-height: 600px; width:100%%;"></div> <script> require(["jsonextended/renderjson.js"], function() { document.getElementById("123").appendChild( renderjson.set_max_string_length(10) //.set_icons(circled plus, circled minus) .set_icons(String.fromCharCode(8853), String.fromCharCode(8854)) .set_sort_objects(false) .set_show_to_level(1)({"guido": 4127, "jack": 4098, "sape": {"value": 22}})) });</script> """ # noqa: E501 _CSS = '<style>' + """ .renderjson a { text-decoration: none; } .renderjson .disclosure { color: red; font-size: 125%; } .renderjson .syntax { color: darkgrey; } .renderjson .string { color: black; } .renderjson .number { color: black; } .renderjson .boolean { color: purple; } .renderjson .key { color: royalblue; } .renderjson .keyword { color: orange; } .renderjson .object.syntax { color: lightseagreen; } .renderjson .array.syntax { color: lightseagreen; } """ + '</style>' def __init__(self, obj, depth=2, max_length=20, max_height=600, sort=True, local=True, uniqueid=None): """ obj : str or dict dict or json depth: int Depth of the json tree structure displayed, the rest is collapsed. max_length: int Maximum number of characters of a string displayed as preview, longer string appear collapsed. max_height: int Maxium height in pixels of containing box. sort: bool Whether the json keys are sorted alphabetically. """ def is_json(myjson): try: json.loads(myjson) except ValueError: return False return True if is_dict_like(obj): self.str = json.dumps(obj, default=encode, sort_keys=True) elif is_json(obj): self.str = obj else: raise ValueError('Wrong Input, dict or json expected') self.uuid = uniqueid if uniqueid is not None else str(uuid.uuid4()) self.depth = int(depth) self.max_length = int(max_length) self.max_height = int(max_height) self.sort = json.dumps(sort) self.local = local def _get_html(self): return """<div id="{0}" style="max-height: {1}px; width:100%%;"></div> """.format(self.uuid, self.max_height) def _get_renderpath(self): # return os.path.join(os.path.dirname(os.path.dirname( # os.path.relpath(inspect.getfile(_example_json_folder)))), # 'renderjson.js') renderjson = 'jsonextended/renderjson.js' if sys.version_info < (3, 0) or self.local: return renderjson # try online, python 2 doesn't seem to like it try: renderjson = ( 'https://rawgit.com/caldwell/renderjson/master/renderjson.js') urlopen(renderjson) except Exception: pass return renderjson def _get_javascript(self): renderjson = self._get_renderpath() return """\ <script> require(["{0}"], function() {{ document.getElementById("{1}").appendChild( renderjson.set_max_string_length({2}) //.set_icons(circled plus, circled minus) .set_icons(String.fromCharCode(8853), String.fromCharCode(8854)) .set_sort_objects({3}) .set_show_to_level({4})({5})) }});</script>""".format(renderjson, self.uuid, self.max_length, self.sort, self.depth, self.str) def _repr_html_(self): return self._CSS + self._get_html() + self._get_javascript() def __ipython_display_(self): from IPython.display import display_html, display_javascript display_html(self._CSS + self._get_html()) display_javascript(self._get_javascript())
[docs]@total_ordering class LazyLoad(object): """ lazy load a dict_like object or file structure as a pseudo dictionary (works with all edict functions) supplies tab completion of keys Parameters ---------- obj: dict or str or object file like object or path to file ignore_regexes : list[str] ignore files and folders matching these regexes (can contain \*, ? and [] wildcards) recursive: bool if True, load subdirectories parent : object the parent object of this instance key_paths: bool indicates if the keys of the object can be resolved as file/folder paths (to ensure strings do not get unintentionally treated as paths) list_of_dicts: bool treat list of dicts as additional branches parse_errors: bool if True, if parsing a file fails then an IOError will be raised if False, if parsing a file fails then only a logging.error will be made and the value will be returned as None parser_kwargs: dict additional keywords for parser plugins read_file method, (loaded decoder plugins are parsed by default) Examples -------- >>> from jsonextended import plugins >>> plugins.load_builtin_plugins() [] >>> l = LazyLoad({'a':{'b':2},3:4}) >>> print(l) {3:..,a:..} >>> l['a'] {b:..} >>> l[['a','b']] 2 >>> l.a.b 2 >>> l.i3 4 >>> from jsonextended.utils import get_test_path >>> from jsonextended.edict import pprint >>> lazydict = LazyLoad(get_test_path()) >>> pprint(lazydict,depth=2) dir1: dir1_1: {...} file1.json: {...} file2.json: {...} dir2: file1.csv: {...} file1.json: {...} dir3: file1.keypair: key1: val1 key2: val2 key3: val3 >>> 'dir1' in lazydict True >>> sorted(lazydict.keys()) ['dir1', 'dir2', 'dir3', 'file1.keypair'] >>> sorted(lazydict.values()) [{}, {key1:..,key2:..,key3:..}, {file1.csv:..,file1.json:..}, {dir1_1:..,file1.json:..,file2.json:..}] >>> lazydict.dir1.file1_json {initial:..,meta:..,optimised:..,units:..} >>> ldict = lazydict.dir1.file1_json.to_dict() >>> isinstance(ldict,dict) True >>> pprint(ldict,depth=1) initial: {...} meta: {...} optimised: {...} units: {...} >>> lazydict = LazyLoad(get_test_path(),recursive=False) >>> lazydict {file1.keypair:..} >>> lazydict = LazyLoad([{'a':{'b':{'c':1}}},{'a':2}], ... list_of_dicts=True) >>> lazydict.i0.a.b.c 1 >>> LazyLoad([1,2,3]) Traceback (most recent call last): ... ValueError: not an expandable object: [1, 2, 3] >>> plugins.unload_all_plugins() """ # noqa: E501 # TODO lazyload parent is not used def __init__(self, obj, ignore_regexes=('.*', '_*'), recursive=True, parent=None, key_paths=True, list_of_dicts=False, parse_errors=True, **parser_kwargs): """ initialise """ self._obj = obj self._ignore_regexes = ignore_regexes self._key_paths = key_paths self._parse_errors = parse_errors self._parser_kwargs = parser_kwargs if 'object_hook' not in parser_kwargs: self._parser_kwargs['object_hook'] = decode self._recurse = recursive self._list_of_dicts = list_of_dicts self._itemmap = None self._tabmap = None def _next_level(self, obj): """get object for next level of tab """ if is_dict_like(obj): child = LazyLoad( obj, self._ignore_regexes, parent=self, key_paths=False, list_of_dicts=self._list_of_dicts, parse_errors=self._parse_errors, **self._parser_kwargs) return child if is_path_like(obj): if not any([fnmatch(obj.name, regex) for regex in self._ignore_regexes]): if parser_available(obj): child = LazyLoad( obj, self._ignore_regexes, parent=self, key_paths=False, list_of_dicts=self._list_of_dicts, parse_errors=self._parse_errors, **self._parser_kwargs) return child elif obj.is_dir(): child = LazyLoad( obj, self._ignore_regexes, parent=self, key_paths=self._key_paths, list_of_dicts=self._list_of_dicts, parse_errors=self._parse_errors, **self._parser_kwargs) return child return obj def _expand(self): """ create item map for next level of data structure """ if self._itemmap is not None: return obj = self._obj if is_dict_like(obj): self._itemmap = {key: self._next_level( val) for key, val in obj.items()} elif is_list_of_dict_like(obj) and self._list_of_dicts: self._itemmap = {i: self._next_level( val) for i, val in enumerate(obj)} elif isinstance(obj, basestring) and self._key_paths: obj = pathlib.Path(obj) if is_path_like(obj): if obj.is_file(): logger.debug("loading: {}".format(obj)) try: new_obj = parse(obj, **self._parser_kwargs) except Exception as err: if self._parse_errors: if sys.version_info.major > 2: # NB: without exec, # this raises a syntax error in python 2 cmnd = ('raise IOError("Parsing error for file: ' '{0}".format(obj)) from err') exec(cmnd, globals(), locals()) else: raise IOError("Parsing error for file: " "{0}\n{1}".format(obj, err)) else: logger.error("Parsing error for file: " "{0}: {1}".format(obj, err)) new_obj = None if is_dict_like(new_obj): self._itemmap = {key: self._next_level( val) for key, val in new_obj.items()} else: self._itemmap = {'non_dict': new_obj} if obj.is_dir(): new_obj = {} for subpath in obj.iterdir(): ignore_path = [fnmatch(subpath.name, regex) for regex in self._ignore_regexes] if not any(ignore_path): if parser_available(subpath): new_obj[subpath.name] = self._next_level(subpath) elif subpath.is_dir() and self._recurse: new_obj[subpath.name] = self._next_level(subpath) self._itemmap = new_obj if self._itemmap is None: raise ValueError('not an expandable object: {}'.format(obj)) self._tabmap = {self._sanitise( key): val for key, val in self._itemmap.items()} def __dir__(self): self._expand() dict_attrs = ['keys', 'items', 'values', 'to_dict', 'to_df', 'to_obj'] return dict_attrs + [name for name in self._tabmap] def __getattr__(self, attr): self._expand() if attr in self._tabmap: return self._tabmap[attr] # return super(LazyLoad,self).__getattr__(attr) raise AttributeError(attr) def __getitem__(self, items): if not isinstance(items, list): items = [items] obj = self for item in items: if not isinstance(obj, self.__class__): raise KeyError('{} (reached leaf node)'.format(item)) obj._expand() obj = obj._itemmap[item] return obj def __contains__(self, item): self._expand() return item in self._itemmap def __iter__(self): self._expand() for key in self._itemmap: yield key def __repr__(self): self._expand() start = ':..,'.join(sorted([str(_) for _ in self._itemmap])) end = ':..' if len(self._itemmap) > 0 else '' return '{' + start + end + '}' def __str__(self): return self.__repr__() def __gt__(self, other): if not hasattr(other, '__str__'): return NotImplemented return len(self.__str__()) > len(other.__str__()) def __eq__(self, other): if not hasattr(other, '__str__'): return NotImplemented return len(self.__str__()) == len(other.__str__()) def _sanitise(self, val): """sanitise tab names attributes aren't allowed to start with a number and replace non alphanumeric characters with _ """ try: int(str(val)[0]) val = 'i' + str(val) except Exception: pass val = re.sub('[^0-9a-zA-Z]+', '_', str(val)) val = 'u' + val if val.startswith('_') else val val = val + '_key' if val in [ 'keys', 'items', 'values', 'to_dict', 'to_df', 'to_obj'] else val return val
[docs] def keys(self): """ D.keys() -> iter of D's keys """ return self.__iter__()
[docs] def values(self): """ D.values() -> list of D's values """ self._expand() for val in self._itemmap.values(): yield val
[docs] def items(self): """ D.items() -> list of D's (key, value) pairs, as 2-tuples """ self._expand() for key, val in self._itemmap.items(): yield key, val
def _recurse_children(self, obj, root=None): root = {} if root is None else root if not hasattr(obj, 'items'): return obj else: return { root[key] if key in root else key: self._recurse_children(value, root) for key, value in obj.items()}
[docs] def to_obj(self): """ return the internal object """ return self._obj
[docs] def to_dict(self): """ return the (fully loaded) structure as a nested dictionary """ return self._recurse_children(self)
[docs] def to_df(self, **kwargs): """ return the (fully loaded) structure as a pandas.DataFrame """ import pandas as pd return pd.DataFrame(self._recurse_children(self), **kwargs)