import contextlib
import os
import sys
import tempfile
from fnmatch import fnmatch
from functools import total_ordering
import collections
# python 2/3 compatibility
try:
basestring
except NameError:
basestring = str
try:
import pathlib
except ImportError:
import pathlib2 as pathlib
try:
unicode
except NameError:
unicode = str
# TODO handling bytes/encoding (py 2 and 3)
class _OpenRead(object):
def __init__(self, linelist, encoding=None, usebytes=False):
self._linelist = linelist
self._current_indx = 0
self._current_line = 0
self._encoding = encoding
self._bytes = usebytes
def read(self, size=None):
out = '\n'.join(self._linelist)[self._current_indx:]
if size is not None:
self._current_indx += size
out = out[0:self._current_indx]
if self._encoding is not None:
out = out.encode(self._encoding)
return out
def readline(self):
if self._current_line >= len(self._linelist):
line = ''
else:
line = self._linelist[self._current_line] + '\n'
self._current_line += 1
self._current_indx += len(
'\n'.join(self._linelist[0:self._current_line]))
if self._encoding is not None:
line = line.encode(self._encoding)
return line
def readlines(self):
self._current_indx = len('\n'.join(self._linelist))
self._current_line = len(self._linelist)
if self._encoding is not None and self:
return [line.encode(self._encoding) for line in self._linelist]
else:
return self._linelist[:]
def __iter__(self):
for line in self._linelist:
if self._encoding is not None:
line = line.encode(self._encoding)
yield line
# TODO handling bytes/encoding (py 2 and 3)
class _OpenWrite(object):
def __init__(self, usebytes=False):
self._str = ''
self._bytes = usebytes
def write(self, instr):
if hasattr(instr, "decode"):
instr = instr.decode()
self._str += instr
def writelines(self, lines):
for instr in lines:
if hasattr(instr, "decode"):
instr = instr.decode()
self.write(instr)
[docs]@total_ordering
class MockPath(object):
r"""a mock path, mimicking pathlib.Path,
supporting context open method for read/write
Parameters
----------
path : str
the path string
is_file : bool
if True is file, else folder
content : str
content of the file
structure:
structure of the directory
Examples
--------
>>> file_obj = MockPath("path/to/test.txt",is_file=True,
... content="line1\nline2\nline3")
...
>>> file_obj
MockFile("path/to/test.txt")
>>> file_obj.name
'test.txt'
>>> file_obj.parent
MockFolder("path/to")
>>> print(str(file_obj))
path/to/test.txt
>>> print(file_obj.to_string())
File("test.txt") Contents:
line1
line2
line3
>>> file_obj.is_file()
True
>>> file_obj.is_dir()
False
>>> with file_obj.open('r') as f:
... print(f.readline().strip())
line1
>>> with file_obj.open('w') as f:
... f.write('newline1\nnewline2')
>>> print(file_obj.to_string())
File("test.txt") Contents:
newline1
newline2
>>> with file_obj.maketemp() as temp:
... with temp.open() as f:
... print(f.readline().strip())
newline1
>>> dir_obj = MockPath(
... structure=[{'dir1':[{'subdir':[file_obj.copy_path_obj()]},file_obj.copy_path_obj()]},
... {'dir2':[file_obj.copy_path_obj()]},file_obj.copy_path_obj()]
... )
>>> dir_obj
MockFolder("root")
>>> dir_obj.name
'root'
>>> dir_obj.is_file()
False
>>> dir_obj.is_dir()
True
>>> print(dir_obj.to_string())
Folder("root")
Folder("dir1")
Folder("subdir")
File("test.txt")
File("test.txt")
Folder("dir2")
File("test.txt")
File("test.txt")
>>> "dir1/test.txt" in dir_obj
True
>>> dir_obj["dir1/test.txt"]
MockFile("root/dir1/test.txt")
>>> list(dir_obj.iterdir())
[MockFolder("root/dir1"), MockFolder("root/dir2"), MockFile("root/test.txt")]
>>> list(dir_obj.glob("*"))
[MockFolder("root/dir1"), MockFolder("root/dir2"), MockFile("root/test.txt")]
>>> list(dir_obj.glob("dir1/*"))
[MockFolder("root/dir1/subdir"), MockFile("root/dir1/test.txt")]
>>> list(dir_obj.glob("**"))
[MockFolder("root/dir1"), MockFolder("root/dir1/subdir"), MockFolder("root/dir2")]
>>> list(dir_obj.glob("**/*"))
[MockFolder("root/dir1"), MockFolder("root/dir1/subdir"), MockFile("root/dir1/subdir/test.txt"), MockFile("root/dir1/test.txt"), MockFolder("root/dir2"), MockFile("root/dir2/test.txt"), MockFile("root/test.txt")]
# >>> list(dir_obj.glob("\*\*/dir1"))
# [MockFolder("root/dir1")]
>>> new = dir_obj.joinpath('dir3')
>>> new.mkdir()
>>> list(dir_obj.iterdir())
[MockFolder("root/dir1"), MockFolder("root/dir2"), MockFolder("root/dir3"), MockFile("root/test.txt")]
>>> dir_obj.joinpath("test.txt").unlink()
>>> list(dir_obj.iterdir())
[MockFolder("root/dir1"), MockFolder("root/dir2"), MockFolder("root/dir3")]
>>> dir_obj.joinpath("dir3").rmdir()
>>> list(dir_obj.iterdir())
[MockFolder("root/dir1"), MockFolder("root/dir2")]
>>> print(dir_obj.to_string())
Folder("root")
Folder("dir1")
Folder("subdir")
File("test.txt")
File("test.txt")
Folder("dir2")
File("test.txt")
>>> dir_obj.joinpath("dir1/subdir")
MockFolder("root/dir1/subdir")
>>> dir_obj.joinpath("dir1", "subdir")
MockFolder("root/dir1/subdir")
>>> new = dir_obj.joinpath("dir1/subdir/other")
>>> new
MockVirtualPath("root/dir1/subdir/other")
>>> new.touch()
>>> new
MockFile("root/dir1/subdir/other")
>>> new.unlink()
>>> new
MockVirtualPath("root/dir1/subdir/other")
>>> new.mkdir()
>>> new
MockFolder("root/dir1/subdir/other")
>>> newfile = MockPath('newfile.txt', is_file=True)
>>> new.copy_from(newfile)
>>> print(new.to_string())
Folder("other")
File("newfile.txt")
>>> file_obj = MockPath("newfile2.txt",is_file=True, content='test')
>>> file_obj.copy_to(new)
>>> print(new.to_string())
Folder("other")
File("newfile.txt")
File("newfile2.txt")
>>> file_obj.name = "newfile3.txt"
>>> with file_obj.maketemp() as temp:
... new.copy_from(temp)
>>> print(new.to_string())
Folder("other")
File("newfile.txt")
File("newfile2.txt")
File("newfile3.txt")
>>> print(new.copy_path_obj().to_string())
Folder("other")
File("newfile.txt")
File("newfile2.txt")
File("newfile3.txt")
>>> with new.maketemp(getoutput=True) as tempdir:
... tempdir.joinpath("new").mkdir()
... tempdir.joinpath("new/file.txt").touch()
>>> print(new.to_string())
Folder("other")
Folder("new")
File("file.txt")
File("newfile.txt")
File("newfile2.txt")
File("newfile3.txt")
""" # noqa: E501
def __init__(self, path='root',
is_file=False, exists=True,
structure=(), content='', parent=None):
self._path = path
self._name = os.path.basename(path)
self._exists = exists
self._is_file = is_file
self._is_dir = not is_file
self._content = content.splitlines()
self._parent = parent
self._children = []
paths = self._splitall(path)
if len(paths) > 1 and parent is None:
self.parent = MockPath(os.path.join(*paths[:-1]))
for subobj in structure:
if hasattr(subobj, 'keys'):
key = list(subobj.keys())[0]
self.add_child(MockPath(os.path.join(self._path, key),
structure=subobj[key], parent=self))
elif isinstance(subobj, MockPath):
if subobj._parent is not None:
raise ValueError("attempting to add a child which already "
"has a parent: {}".format(subobj))
self.add_child(subobj)
else:
raise ValueError(
'items must be dict_like or MockPath: {}'.format(subobj))
def _get_path(self):
return self._path
path = property(_get_path)
def _get_parent(self):
if self._parent is None:
path = MockPath('subroot')
path._children = [self]
return path
else:
return self._parent
def _set_parent(self, parent):
if parent is None:
self._parent = None
parentpath = ''
else:
parentpath = parent.path
self._parent = parent
self._path = os.path.join(parentpath, self.name)
for child in self.children:
child.parent = self
parent = property(_get_parent, _set_parent)
def _get_name(self):
return self._name
def _set_name(self, name):
self._name = name
if self._parent is not None:
self._path = os.path.join(self.parent.path, name)
name = property(_get_name, _set_name)
def _get_children(self):
return self._children[:]
children = property(_get_children)
def _get_content(self):
return self._content[:]
file_content = property(_get_content)
def __getitem__(self, name):
"""
Parameters
----------
name: str or list of strings
Returns
-------
"""
next = []
if isinstance(name, basestring):
name = self._splitall(name)
if isinstance(name, list):
if len(name) == 1:
name = name[0]
else:
next = name[1:]
name = name[0]
if isinstance(name, basestring):
for child in self.iterdir():
if child.name == name:
if next:
return child[next]
else:
return child
raise KeyError("no name: {}".format(name))
else:
raise ValueError("name not a list or str: {}".format(name))
def __contains__(self, name):
"""
Parameters
----------
name: str or list of strings
Returns
-------
"""
try:
self[name]
return True
except KeyError:
return False
[docs] def add_child(self, child):
# TODO could allow same name if one is file and one is dir?
if child.name in [c.name for c in self._children]:
raise IOError(
"child with this name already exists: {}".format(child.name))
child.parent = self
self._children.append(child)
# TODO need to implement relative naming and switchin to/from
[docs] def absolute(self):
return self
# TODO should return a new mock path rather than a str,
# need to implement relative naming and switchin to/from
[docs] def relative_to(self, other):
return os.path.relpath(self._path, other._path)
[docs] def rename(self, target):
if hasattr(target, "name"):
name = target.name
elif isinstance(target, basestring):
name = target
else:
raise ValueError(
"target must be a string or have a name attribute")
if not self.exists():
raise IOError("path doesn't exist: {}".format(self))
self.name = name
self._path = os.path.join(os.path.dirname(self._path), name)
def _recurse_structure(self):
structure = []
for child in self.children:
if not child.exists():
continue
if child.is_file():
structure.append(child.copy_path_obj())
else:
structure.append(
{child.name: [c.copy_path_obj() for c in child.children]})
return structure
[docs] def copy_path_obj(self):
"""copy mock path (removing path and parent)"""
if self.is_file():
return MockPath(path=self.name, is_file=True,
exists=self.exists(), structure=[],
content="\n".join(self._content), parent=None)
else:
structure = self._recurse_structure()
return MockPath(path=self.name, is_file=False,
exists=self.exists(), parent=None,
structure=structure)
[docs] def is_file(self):
return self._is_file
[docs] def is_dir(self):
return self._is_dir
[docs] def exists(self):
return self._exists
[docs] def samefile(self, other):
return self == other
def _splitall(self, path):
if isinstance(path, MockPath):
path = os.path.relpath(str(path), str(self))
if not isinstance(path, basestring):
raise ValueError(
"path is not a string or MockPath: {}".format(path))
allparts = []
while 1:
parts = os.path.split(path)
if parts[0] == path: # sentinel for absolute paths
allparts.insert(0, parts[0])
break
elif parts[1] == path: # sentinel for relative paths
allparts.insert(0, parts[1])
break
else:
path = parts[0]
allparts.insert(0, parts[1])
return allparts
def _flatten(self, l):
for el in l:
if (isinstance(el, collections.Iterable)
and not isinstance(el, basestring)):
for sub in self._flatten(el):
yield sub
else:
yield el
[docs] def joinpath(self, *paths):
parts = list(self._flatten([self._splitall(path) for path in paths]))
if not parts:
raise IOError("must be at least one path")
elif len(parts) == 1:
if parts[0] == '' or parts[0] == '.':
return self
for child in self._children:
if child.name == parts[0]:
return child
# does not yet exist,
# must use touch or mkdir to convert to file or folder
new = MockPath(path=os.path.join(
self._path, parts[0]), exists=False, parent=self)
self.add_child(new)
return new
else:
for child in self._children:
if child.name == parts[0]:
return child.joinpath(*parts[1:])
new = MockPath(path=os.path.join(
self._path, parts[0]), exists=False, parent=self)
self.add_child(new)
return new.joinpath(*parts[1:])
# TODO add mode=0o777, exist_ok=False
[docs] def mkdir(self, parents=False):
"""
Parameters
----------
mode
parents: bool
If True, any missing parents of this path are created as needed
If False, a missing parent raises FileNotFoundError.
Returns
-------
"""
if self.parent is not None:
if not self.parent.exists():
if not parents:
raise FileNotFoundError(
"the parent must exist for {}".format(self))
else:
self.parent.mkdir(parents=parents) # mode=0o777,
if not self._exists:
self._is_file = False
self._is_dir = True
self._exists = True
# TODO store stat attributes and apply them to mktemp
[docs] def stat(self):
""" Retrieve information about a file
Parameters
----------
path: str
Returns
-------
attr: object
see os.stat, includes st_mode, st_size, st_uid, st_gid, st_atime,
and st_mtime attributes
"""
class MockStat(object):
# at present just returning a typical file result
def __init__(self):
self.st_mode = 33188
self.st_ino = 74204932
self.st_dev = 16777220
self.st_nlink = 1
self.st_uid = 634541
self.st_gid = 1335817362
self.st_size = 10410
self.st_atime = 1504518028
self.st_mtime = 1504518028
self.st_ctime = 1504518028
def __repr__(self):
return (
"MockStatResult(st_mode=33188, st_ino=74204932, "
"st_dev=16777220, st_nlink=1, st_uid=634541, "
"st_gid=1335817362, st_size=10410, st_atime=1504518028, "
"st_mtime=1504518028, st_ctime=1504518028)")
return MockStat()
[docs] def chmod(self, mode):
""" Change the mode (permissions) of a file
Parameters
----------
path: str
mode: int
new permissions (see os.chmod)
Examples
--------
To make a file executable
cur_mode = folder.stat("exec.sh").st_mode
folder.chmod("exec.sh", cur_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH )
""" # noqa: E501
pass
[docs] def touch(self):
if self.parent is not None:
if not self.parent.exists():
raise IOError("the parent must exist")
if not self._exists:
self._is_file = True
self._is_dir = False
self._exists = True
[docs] def unlink(self):
if not self._is_file:
raise IOError("path is not a file")
self._exists = False
[docs] def rmdir(self):
if not self._is_dir:
raise IOError("path is not a directory")
if list(self.iterdir()):
raise IOError("path is not empty: {}".format(list(self.iterdir())))
self._exists = False
[docs] def iterdir(self):
for subobj in sorted(self.children):
if subobj.exists():
yield subobj
# TODO if glob("**/stop") should stop recursing once found
[docs] def glob(self, regex, recurse=False, toplevel=True):
"""
Parameters
----------
regex: str
the path regex, with * to match 0 or more (non-recursive) paths
and ** to match zero or more (recursive) directories
recurse: bool
Yields
-------
path: MockPath
"""
if not regex and toplevel:
raise ValueError("Unacceptable pattern: ''")
parts = self._splitall(regex)
if parts[0] == "**":
recurse = True
parts = [""] if len(parts) == 1 else parts[1:]
if len(parts) == 1:
for subobj in sorted(self.iterdir()):
if subobj.is_dir() and recurse:
yield subobj
for path in subobj.glob(parts[0],
recurse=recurse, toplevel=False):
yield path
elif fnmatch(subobj.name, parts[0]):
yield subobj
else:
for subobj in sorted(self.iterdir()):
if subobj.is_dir() and recurse:
yield subobj
for path in subobj.glob(os.path.join(*parts[1:]),
recurse=recurse, toplevel=False):
yield path
elif fnmatch(subobj.name, parts[0]):
for path in subobj.glob(os.path.join(*parts[1:]),
recurse=recurse, toplevel=False):
yield path
[docs] @contextlib.contextmanager
def maketemp(self, getoutput=False, dir=None):
"""make a named temporary file or folder containing the path contents
Parameters
----------
getoutput: bool
if True, (on exit) new paths will be read/added to the path
dir: None or str
directory to place temp in (see tempfile.mkstemp)
Yields
------
temppath: path.Path
path to temporary
"""
if self.is_file():
filetemp = tempfile.NamedTemporaryFile(
mode='w+', delete=False, dir=dir)
try:
filetemp.write('\n'.join(self._content))
filetemp.close()
dirpath = os.path.join(
os.path.dirname(filetemp.name), self.name)
os.rename(filetemp.name, dirpath)
yield pathlib.Path(dirpath)
finally:
try:
if getoutput:
raise NotImplementedError
finally:
os.remove(dirpath)
else:
temppath = pathlib.Path(tempfile.mkdtemp(dir=dir))
dirpath = os.path.join(os.path.dirname(str(temppath)), self.name)
os.rename(str(temppath), dirpath)
temppath = pathlib.Path(dirpath)
try:
self.copy_to(temppath.parent)
yield pathlib.Path(dirpath)
finally:
temppath = pathlib.Path(dirpath)
try:
if getoutput:
self._iter_temp(self, temppath, overwrite=False)
finally:
for subpath in temppath.glob("**/*"):
if subpath.is_file():
subpath.unlink()
for subpath in reversed(list(temppath.glob("**"))):
subpath.rmdir()
if temppath.exists():
temppath.rmdir()
def _iter_temp(self, mock, temp, overwrite=False):
if not mock.name == temp.name:
raise ValueError(
"mock name and temp name different: "
"{0}, {1}".format(mock.name, temp.name))
child_names = {c.name: c for c in mock.children}
for subpath in temp.iterdir():
if subpath.is_file():
if subpath.name in child_names and not overwrite:
pass
else:
with subpath.open() as f:
newpath = MockPath(
subpath.name, is_file=True,
content=f.read(), exists=True)
mock.add_child(newpath)
else:
if subpath.name not in child_names:
newpath = MockPath(subpath.name, exists=True)
mock.add_child(newpath)
else:
newpath = child_names[subpath.name]
self._iter_temp(newpath, subpath, overwrite=overwrite)
[docs] def copy_to(self, target):
""" copy from a mock path to a target
Parameters
----------
target: str, pathlib.Path or MockPath
Returns
-------
"""
if not self.exists():
raise IOError("this path does not exist")
if isinstance(target, basestring):
target = pathlib.Path(target)
if isinstance(target, pathlib.Path):
if not target.is_dir():
raise IOError("target is not a directory")
ignore = len(self.path) - len(self.name)
for path in self.glob("**/*"):
newpath = target.joinpath(str(path)[ignore:])
if path.is_dir():
if not newpath.exists():
newpath.mkdir()
else:
if newpath.exists():
raise IOError(
"file already exists: {}".format(newpath))
else:
newpath.touch()
with newpath.open('w') as f:
if sys.version_info.major > 2:
f.write("\n".join(path._content))
else:
f.write(unicode('\n'.join(path._content)))
elif isinstance(target, MockPath):
if not target.is_dir():
raise IOError("target is not a directory")
newpath = self.copy_path_obj()
return target.add_child(newpath)
else:
raise ValueError(
"target is not str, "
"pathlib.Path or MockPath: {}".format(target))
[docs] def copy_from(self, source):
""" copy from a source to a mock directory
Parameters
----------
source: str or pathlib.Path or MockPath
any file like object or path to a file
Returns
-------
"""
if not self.is_dir() or not self.exists():
raise IOError("this path is not an existing directory")
if isinstance(source, basestring):
source = pathlib.Path(source)
if not source.exists():
raise IOError("source does not exist: {}".format(source))
if isinstance(source, MockPath):
if not source.exists():
raise IOError("source does not exist: {}".format(source))
else:
newpath = source.copy_path_obj()
self.add_child(newpath)
elif isinstance(source, pathlib.Path):
if not source.exists():
raise IOError("source does not exist: {}".format(source))
elif source.is_file():
with source.open() as f:
content = f.read()
newfile = MockPath(source.name, is_file=True,
parent=self, content=content)
self.add_child(newfile)
else:
raise NotImplementedError
else:
raise ValueError(
"source is not str, "
"pathlib.Path or MockPath: {}".format(source))
[docs] @contextlib.contextmanager
def open(self, mode='r', encoding=None):
""" context manager for opening a file
Parameters
----------
mode: str
encoding: None or str
Returns
-------
"""
if self.is_dir():
raise IOError('[Errno 21] Is a directory: {}'.format(self.path))
if 'r' in mode:
obj = _OpenRead(self._content, encoding, "b" in mode)
yield obj
elif 'w' in mode:
obj = _OpenWrite("b" in mode)
yield obj
self._content = obj._str.splitlines()
else:
raise ValueError('readwrite should contain r or w')
def __gt__(self, other):
if not hasattr(other, 'name'):
return NotImplemented
return self.path > other.path
def __eq__(self, other):
if not hasattr(other, 'name'):
return NotImplemented
return self.path == other.path
def _recurse_print(self, obj, text='',
indent=0, indentlvl=2, file_content=False):
indent += indentlvl
for subobj in sorted(obj):
if not subobj.exists():
continue
if subobj.is_dir():
text += ' ' * indent + \
'{0}("{1}")\n'.format(self._folderstr, subobj.name)
text += self._recurse_print(
subobj.iterdir(), indent=indent, file_content=file_content)
else:
if file_content:
contents = ['{0}("{1}") Contents:'.format(
self._filestr, subobj.name)]
contents.extend(subobj._content)
sep = '\n' + ' ' * (indent + 1)
text += ' ' * indent + sep.join(contents) + '\n'
else:
text += ' ' * indent + \
'{0}("{1}")\n'.format(self._filestr, subobj.name)
return text
[docs] def to_string(self, indentlvl=2, file_content=False, color=False):
"""convert to string """
if color:
self._folderstr = colortxt('Folder', 'green')
self._filestr = colortxt('File', 'blue')
else:
self._folderstr = 'Folder'
self._filestr = 'File'
if self.is_file():
contents = [
'{0}("{1}") Contents:'.format(self._filestr, self.name)]
contents.extend(self._content)
return '\n'.join(contents)
elif self.is_dir():
text = '{0}("{1}")\n'.format(self._folderstr, self.name)
text += self._recurse_print(self.iterdir(), indentlvl=indentlvl,
file_content=file_content)
text = text[0:-1] if text.endswith('\n') else text
return text
else:
return 'MockPath({})'.format(self.name)
def __str__(self):
return self._path # self.__repr__()
def __repr__(self):
if not self.exists():
return 'MockVirtualPath("{}")'.format(self._path)
elif self.is_dir():
return 'MockFolder("{}")'.format(self._path)
elif self.is_file():
return 'MockFile("{}")'.format(self._path)
else:
return 'MockPath("{}")'.format(self._path)
_ATTRIBUTES = dict(
list(zip([
'bold',
'dark',
'',
'underline',
'blink',
'',
'reverse',
'concealed'
],
list(range(1, 9))
))
)
del _ATTRIBUTES['']
_HIGHLIGHTS = dict(
list(zip([
'on_grey',
'on_red',
'on_green',
'on_yellow',
'on_blue',
'on_magenta',
'on_cyan',
'on_white'
],
list(range(40, 48))
))
)
_COLORS = dict(
list(zip([
'grey',
'red',
'green',
'yellow',
'blue',
'magenta',
'cyan',
'white',
],
list(range(30, 38))
))
)
[docs]def colortxt(text, color=None, on_color=None, attrs=None):
"""Colorize text.
Available text colors:
red, green, yellow, blue, magenta, cyan, white.
Available text highlights:
on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white.
Available attributes:
bold, dark, underline, blink, reverse, concealed.
Examples
--------
>>> txt = colortxt('Hello, World!', 'red', 'on_grey', ['bold'])
>>> print(txt)
\x1b[1m\x1b[40m\x1b[31mHello, World!\x1b[0m
"""
_RESET = '\033[0m'
__ISON = True
if __ISON and os.getenv('ANSI_COLORS_DISABLED') is None:
fmt_str = '\033[%dm%s'
if color is not None:
text = fmt_str % (_COLORS[color], text)
if on_color is not None:
text = fmt_str % (_HIGHLIGHTS[on_color], text)
if attrs is not None:
for attr in attrs:
text = fmt_str % (_ATTRIBUTES[attr], text)
text += _RESET
return text