fstools/__init__.py

368 lines
13 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
"""
fstools (Filesystem Tools)
==========================
**Author:**
* Dirk Alders <sudo-dirk@mount-mockery.de>
**Description:**
This module supports functions and classes to handle files and paths
**Submodules:**
* :func:`fstools.dirlist`
* :func:`fstools.filelist`
* :func:`fstools.is_writeable`
* :func:`fstools.mkdir`
* :func:`fstools.open_locked_blocking`
* :func:`fstools.open_locked_non_blocking`
* :func:`fstools.uid`
**Unittest:**
See also the :download:`unittest <fstools/_testresults_/unittest.pdf>` documentation.
**Module Documentation:**
"""
__DEPENDENCIES__ = []
import glob
import hashlib
import hmac
import logging
import os
from functools import partial
import sys
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Use the application's logger namespace if a config module provides one,
# otherwise fall back to a generic name.
try:
    from config import APP_NAME as ROOT_LOGGER_NAME
except ImportError:
    ROOT_LOGGER_NAME = 'root'
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)

# fcntl is needed by open_locked_blocking / open_locked_non_blocking only; the
# rest of the module stays usable if it can not be imported.
try:
    import fcntl
except ImportError:
    logger.warning('Importing "fcntl" was not possible. Only i limited functionality of fstools is available.')

# The backslash is written as '\\_' (same value as before) because '\_' is an
# invalid escape sequence which newer interpreters complain about.
__DESCRIPTION__ = """The Module {\\tt %s} is designed to help on all issues with files and folders.
For more Information read the documentation.""" % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (3, )
"""The Tested Interpreter-Versions"""
def dirlist(path='.', expression='*', rekursive=True):
    """
    Function returning a list of directories below a given path.

    :param str path: folder which is the base path for the search.
    :param str expression: expression to fit including shell-style wildcards. It is only used for the
                           first directory level; deeper levels are always searched with ``'*'``.
    :param bool rekursive: search all subfolders if True.
    :returns: list of directory names including the path
    :rtype: list

    .. note:: The returned names could be relative paths depending on argument path.

    .. note:: Symbolic links pointing to directories are listed, but they are not descended into.
              This avoids running in circles on cyclic links.

    **Example:**

    .. literalinclude:: fstools/_examples_/dirlist.py

    .. literalinclude:: fstools/_examples_/dirlist.log
    """
    li = list()
    if not os.path.exists(path):
        logger.warning('DIRLIST: path (%s) does not exist - empty filelist will be returned', path)
        return li
    logger.debug('DIRLIST: path (%s) exists - looking for directories to append', path)
    for dirname in glob.glob(os.path.join(path, expression)):
        if not os.path.isdir(dirname):
            continue
        li.append(dirname)
        # Never follow links: a link to a parent folder would list the same tree again and again.
        if rekursive and not os.path.islink(dirname):
            li.extend(dirlist(dirname))
    return li
def filelist(path='.', expression='*', rekursive=True):
    """
    Function returning a list of files below a given path.

    :param str path: folder which is the base path for searching files.
    :param str expression: expression to fit including shell-style wildcards. It is applied to the
                           file names on every directory level.
    :param bool rekursive: search all subfolders if True.
    :returns: list of filenames including the path
    :rtype: list

    .. note:: The returned filenames could be relative paths depending on argument path.

    .. note:: Subfolders which are symbolic links are not searched. If path is an existing file
              instead of a folder, an empty list is returned.

    **Example:**

    .. literalinclude:: fstools/_examples_/filelist.py

    .. literalinclude:: fstools/_examples_/filelist.log
    """
    li = list()
    if not os.path.exists(path):
        logger.warning('FILELIST: path (%s) does not exist - empty filelist will be returned', path)
        return li
    logger.debug('FILELIST: path (%s) exists - looking for files to append', path)
    for filename in glob.glob(os.path.join(path, expression)):
        if os.path.isfile(filename):
            li.append(filename)
    # os.listdir() fails for anything but a directory and its result is needed for the recursion only.
    if rekursive and os.path.isdir(path):
        for entry in os.listdir(path):
            directory = os.path.join(path, entry)
            if os.path.isdir(directory) and not os.path.islink(directory):
                li.extend(filelist(directory, expression))
    return li
class FileObserver(FileSystemEventHandler):
    """
    Class to execute callbacks when single files are modified, created or deleted.

    The instance is an event handler and owns an observer, which is created and started on
    instantiation. Use :meth:`observe` to register callbacks and :meth:`stop` followed by
    :meth:`join` to shut the observer down.
    """
    # Bit flags for the argument `when` of observe(); they can be combined with ``|``.
    OBSERVE_MODIFIED = 1
    OBSERVE_CREATED = 2
    OBSERVE_DELETED = 4
    OBSERVE_ALL = OBSERVE_MODIFIED | OBSERVE_CREATED | OBSERVE_DELETED

    def __init__(self):
        super().__init__()
        # Maps a normalised filepath to a list of (when, callback, args, kwargs) tuples.
        self.__callbacks__ = {}
        #
        self.observer = Observer()
        self.observer.start()

    def observe(self, filepath, when, callback, *args, **kwargs):
        """
        Register a callback for events of a file.

        :param str filepath: the file to be observed.
        :param int when: combination of the ``OBSERVE_*`` flags selecting the events of interest.
        :param callback: callable executed as ``callback(event, *args, **kwargs)``.
        :param args: additional arguments for the callback.
        :param kwargs: additional keyword arguments for the callback.
        """
        key = os.path.normpath(filepath)
        self.__callbacks__.setdefault(key, []).append((when, callback, args, kwargs))
        # The folder of the file is watched. A bare filename has an empty dirname, which is
        # not a usable path, so the current directory is watched in that case.
        self.observer.schedule(self, path=os.path.dirname(filepath) or os.curdir, recursive=True)

    def __callback_logic__(self, etype, event):
        """
        Execute all callbacks registered for the source path of the event and the event type.

        :param int etype: one of the ``OBSERVE_*`` flags describing the event type.
        :param event: the event object as delivered to the ``on_*`` methods.
        """
        # Normalise the reported path, so that e.g. './name' matches a registration of 'name'.
        key = os.path.normpath(event.src_path)
        for when, cb, args, kwargs in self.__callbacks__.get(key, ()):
            if when & etype:
                cb(event, *args, **kwargs)
                # TODO: LOG

    def on_modified(self, event):
        """Forward a modification event to the registered callbacks."""
        self.__callback_logic__(self.OBSERVE_MODIFIED, event)

    def on_created(self, event):
        """Forward a creation event to the registered callbacks."""
        self.__callback_logic__(self.OBSERVE_CREATED, event)

    def on_deleted(self, event):
        """Forward a deletion event to the registered callbacks."""
        self.__callback_logic__(self.OBSERVE_DELETED, event)

    def stop(self):
        """Stop the observer owned by this instance."""
        self.observer.stop()

    def join(self):
        """Join the observer owned by this instance."""
        self.observer.join()
def is_writeable(path):
    """
    Method to find out, if a file or folder is writable.

    :param str path: file or folder to check.
    :returns: True if path is writable, otherwise False.
    :rtype: bool

    .. note:: If path does not exist, the return value is :const:`False`.

    **Example:**

    .. literalinclude:: fstools/_examples_/is_writeable.py

    .. literalinclude:: fstools/_examples_/is_writeable.log
    """
    # The access check gives the answer for files and directories alike.
    return os.access(path, os.W_OK)
def mkdir(path):
    """
    Method to create a folder.

    .. note:: All needed parent folders will also be created (recursive mkdir).

    :param str path: folder to be created.
    :returns: True, if folder exists after creation commands, otherwise False
              (e.g. if path already exists, but is a file).
    :rtype: bool
    :raises OSError: if the folder can not be created (e.g. missing permissions).
    """
    path = os.path.abspath(path)
    try:
        # exist_ok avoids failing if the folder shows up between a check and the creation.
        os.makedirs(path, exist_ok=True)
    except FileExistsError:
        # path exists, but is not a folder - reported by the return value below.
        pass
    return os.path.isdir(path)
def open_locked_blocking(*args, **kwargs):
    """
    Method to get exclusive access to a file.

    :param args: Arguments for a standard file open call.
    :param kwargs: Keyword arguments for a standard file open call.
    :returns: The opened file, on which an exclusive lock has been acquired.
    :rtype: file handle

    .. note:: The call blocks until the file is able to be used. This can cause a deadlock, if the
              file release is done after trying to open the file!
    """
    locked_file_descriptor = open(*args, **kwargs)
    try:
        fcntl.lockf(locked_file_descriptor, fcntl.LOCK_EX)
    except BaseException:
        # Do not leave the file open, if the lock could not be acquired.
        locked_file_descriptor.close()
        raise
    return locked_file_descriptor
def open_locked_non_blocking(*args, **kwargs):
    """
    Method to get exclusive access to a file without waiting for it.

    :param args: Arguments for a standard file open call.
    :param kwargs: Keyword arguments for a standard file open call.
    :raises: OSError, if the file is already blocked.
    :returns: The opened file, on which an exclusive lock has been acquired.
    :rtype: file handle

    .. note:: The call does not wait for the lock. If the lock can not be acquired, the file is
              closed again before the exception is passed to the caller.
    """
    locked_file_descriptor = open(*args, **kwargs)
    try:
        fcntl.lockf(locked_file_descriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BaseException:
        # Do not leave the file open, if the lock could not be acquired.
        locked_file_descriptor.close()
        raise
    return locked_file_descriptor
def uid(path, max_staleness=3600):
    """
    Function returning a "unique" id for a given file or path.

    :param str path: File or folder to generate a uid for.
    :param int max_staleness: If a file or folder is older than that, we may consider
                              it stale and return a different uid - this is a
                              dirty trick to work around changes never being
                              detected. Default is 3600 seconds, use None to
                              disable this trickery. See below for more details.
    :returns: A hex digest that changes its value if the collected metadata changed.
              An element that can not be accessed contributes a constant placeholder,
              so a digest is returned in that case, too.
    :rtype: str

    .. warning:: Depending on the operating system capabilities and the way the
                 file update is done, this function might return the same value
                 even if the file has changed. It should be better than just
                 using file's mtime though.
                 max_staleness tries to avoid the worst for these cases.

    .. note:: If this function is used for a folder, it will stat all folders and files recursively.

    **Example:**

    .. literalinclude:: fstools/_examples_/uid.py

    .. literalinclude:: fstools/_examples_/uid.log

    Using just the file's mtime to determine if the file has changed is
    not reliable - if file updates happen faster than the file system's
    mtime granularity, then the modification is not detectable because
    the mtime is still the same.

    This function tries to improve by using not only the mtime, but also
    other metadata values like file size and inode to improve reliability.

    For the calculation of this value, we of course only want to use data
    that we can get rather fast, thus we use file metadata, not file data
    (file content).
    """
    if os.path.isdir(path):
        elements = sorted(dirlist(path) + filelist(path))
    else:
        elements = [path]

    fingerprint = []
    for element in elements:
        try:
            st = os.stat(element)
        except (IOError, OSError):
            # A constant placeholder is fine here: if the element can not even be
            # stat'ed, it is unlikely that it can be read.
            fingerprint.append(None)
            continue
        fake_mtime = int(st.st_mtime)
        if max_staleness and not st.st_ino:
            # An inode number of 0 likely means, that the platform does not support
            # it (e.g. win32) - thus the dirty trick is likely needed.
            now = int(time.time())
            if now >= st.st_mtime + max_staleness:
                # keep the same fake_mtime for each max_staleness interval
                fake_mtime = int(now / max_staleness) * max_staleness
        fingerprint.append((
            st.st_mtime,    # might have a rough granularity (e.g. 2s on FAT, 1s on ext3)
            st.st_ino,      # changes if the update is done by renaming a temp file; 0 on win32
            st.st_size,     # likely to change on many updates, but not sufficient alone
            fake_mtime,     # workaround for file system / platform limitations
        ))

    if sys.version_info < (3, 0):
        return hmac.new('', repr(fingerprint), hashlib.sha1).hexdigest()
    return hmac.new(b'', bytes(repr(fingerprint), 'latin-1'), hashlib.sha1).hexdigest()
def uid_filelist(path='.', expression='*', rekursive=True):
    """
    Function returning a unique id for the content of the files below a given path.

    :param str path: folder which is the base path for searching files.
    :param str expression: expression to fit including shell-style wildcards.
    :param bool rekursive: search all subfolders if True.
    :returns: An object that changes value if the content of one of the files changes.
    :rtype: str

    .. note:: This UID is created out of the file content. Therefore it is more
              reliable than :func:`fstools.uid`, but also much slower.

    .. note:: Only the file content is used, the file names are not part of the id.

    **Example:**

    .. literalinclude:: fstools/_examples_/uid_filelist.py

    .. literalinclude:: fstools/_examples_/uid_filelist.log
    """
    overall = hashlib.md5()
    # The files are processed in sorted order to get a reproducible id.
    for filename in sorted(filelist(path, expression, rekursive)):
        digest = hashlib.md5()
        with open(filename, 'rb') as fh:
            # Read in chunks to limit the memory usage for big files.
            for buf in iter(partial(fh.read, 65536), b''):
                digest.update(buf)
        # The overall id is the hash over the hex digests of the single files.
        overall.update(digest.hexdigest().encode())
    return overall.hexdigest()