220 rivejä
5.6 KiB
Python
220 rivejä
5.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
"""
|
|
stringtools (Stringtools)
|
|
=========================
|
|
|
|
**Author:**
|
|
|
|
* Dirk Alders <sudo-dirk@mount-mockery.de>
|
|
|
|
**Description:**
|
|
|
|
This Module supports functionality around string operations.
|
|
|
|
**Submodules:**
|
|
|
|
* :mod:`stringtools.csp`
|
|
* :mod:`stringtools.stp`
|
|
* :func:`gzip_compress`
|
|
* :func:`gzip_extract`
|
|
* :func:`hexlify`
|
|
|
|
**Unittest:**
|
|
|
|
See also the :download:`unittest <stringtools/_testresults_/unittest.pdf>` documentation.
|
|
|
|
**Module Documentation:**
|
|
|
|
"""
|
|
|
|
from stringtools import stp
|
|
from stringtools import csp
|
|
__DEPENDENCIES__ = []
|
|
|
|
import fractions
|
|
import gzip
|
|
import logging
|
|
import time
|
|
import sys
|
|
if sys.version_info < (3, 0):
|
|
from cStringIO import StringIO
|
|
|
|
try:
|
|
from config import APP_NAME as ROOT_LOGGER_NAME
|
|
except ImportError:
|
|
ROOT_LOGGER_NAME = 'root'
|
|
logger = logging.getLogger(ROOT_LOGGER_NAME).getChild(__name__)
|
|
|
|
# NOTE: the underscore is escaped with a literal backslash (presumably for LaTeX
# output, given the {\tt ...} markup). It is written as '\\_' because '\_' is an
# invalid escape sequence that newer interpreters warn about; the value is the same.
__DESCRIPTION__ = """The Module {\\tt %s} is designed to support functionality for strings (e.g. transfer strings via a bytestream, compressing, extracting, ...).
For more Information read the sphinx documentation.""" % __name__.replace('_', '\\_')
"""The Module Description"""
__INTERPRETER__ = (2, 3)
"""The Tested Interpreter-Versions"""

__all__ = ['gzip_compress',
           'gzip_extract',
           'hexlify',
           'csp',
           'stp']
|
|
|
|
|
|
def physical_value_repr(value, unit=''):
    """Method to create a compact representation of a physical value using SI prefixes.

    The magnitude is scaled in steps of 1000 until it is in the range [1, 1000)
    or the smallest ('p') respectively largest ('P') supported prefix is reached.
    Values below 100 are rendered with up to two decimals, larger values with up
    to one decimal; trailing zeros and a trailing decimal point are removed.

    :param value: The numeric value to be represented (negative values are supported)
    :type value: int or float
    :param str unit: An optional unit to be appended (default is '')
    :returns: The value followed by the prefix and the unit (e.g. '1.5kHz')
    :rtype: str
    """
    prefix = {
        -4: 'p',
        -3: 'n',
        -2: 'u',
        -1: 'm',
        0: '',
        1: 'k',
        2: 'M',
        3: 'G',
        4: 'T',
        5: 'P',
    }
    # Scale the magnitude only; otherwise every negative value would satisfy
    # "value < 1" and be multiplied up to the pico range.
    sign = '-' if value < 0 else ''
    value = abs(value)
    u = 0
    while -4 < u < 5 and value != 0 and (value >= 1000. or value < 1.):
        if value >= 1:
            u += 1
            value /= 1000.
        else:
            u -= 1
            value *= 1000.
    #
    if value < 100.:
        text = '%.2f' % value
    else:
        text = '%.1f' % value
    # Only strip when a decimal point is present (e.g. not for 'inf' or 'nan').
    if '.' in text:
        text = text.rstrip('0').rstrip('.')
    return sign + text + prefix[u] + unit
|
|
|
|
|
|
def time_repr(seconds):
    """Method to create a compact representation of a duration given in seconds.

    The time within a day is rendered as 'MM:SS' if it is below one hour and as
    'HH:MM:SS' otherwise. If the duration is one day or longer, the number of
    days is prepended as '<n>D ' and a time of exactly ' 00:00' is omitted.

    :param seconds: The duration in seconds
    :type seconds: int or float
    :returns: The representation of the duration (e.g. '1D 01:01:01')
    :rtype: str
    """
    seconds_per_day = 24 * 60 * 60
    day_count = seconds / seconds_per_day
    remainder = seconds % seconds_per_day
    # The hour field is only shown if at least one full hour remains.
    clock_format = '%H:%M:%S' if remainder >= 60 * 60 else '%M:%S'
    text = time.strftime(clock_format, time.gmtime(remainder))
    if day_count < 1:
        # Without the day prefix there is no blank, so nothing can be stripped.
        return text
    text = '%dD %s' % (day_count, text)
    # Whole days are shown without the empty time part.
    return text[:-6] if text.endswith(' 00:00') else text
|
|
|
|
|
|
def frac_repr(value):
    """Method to represent a value as a fraction string.

    The value is converted to a :class:`fractions.Fraction` and approximated
    using :meth:`fractions.Fraction.limit_denominator` with its default limit.

    :param value: The value to be represented (anything accepted by :class:`fractions.Fraction`)
    :returns: The fraction as '<numerator>/<denominator>'
    :rtype: str
    """
    ratio = fractions.Fraction(value).limit_denominator()
    numerator, denominator = ratio.numerator, ratio.denominator
    return '%s/%s' % (numerator, denominator)
|
|
|
|
|
|
def gzip_compress(s, compresslevel=9):
    """
    Method to compress a stream of bytes.

    :param str s: The bytestream (string) to be compressed
    :param int compresslevel: An optional compression level (default is 9)
    :return: The compressed bytestream
    :rtype: str

    **Example:**

    .. literalinclude:: stringtools/_examples_/gzip_compress.py

    Will result to the following output:

    .. literalinclude:: stringtools/_examples_/gzip_compress.log
    """
    t = time.time()
    if sys.version_info >= (3, 0):
        rv = gzip.compress(s, compresslevel)
    else:
        # Python 2 has no gzip.compress, so the data is written through a GzipFile
        # into an in-memory buffer.
        buf = StringIO()
        try:
            f = gzip.GzipFile(mode='wb', compresslevel=compresslevel, fileobj=buf)
            try:
                f.write(s)
            finally:
                # Close the GzipFile before reading the buffer, so that everything
                # has been written to it.
                f.close()
            rv = buf.getvalue()
        finally:
            buf.close()
    # An empty input has no meaningful rate; guard against a ZeroDivisionError
    # while building the log arguments.
    rate = len(rv) / float(len(s)) if len(s) else float('nan')
    logger.debug('GZIP: Finished to compress a string (compression_rate=%.3f, consumed_time=%.1fs).', rate, time.time() - t)
    return rv
|
|
|
|
|
|
def gzip_extract(s):
    """
    Method to extract data from a compressed stream of bytes.

    :param str s: The compressed bytestream (string) to be extracted
    :return: The extracted data
    :rtype: str

    **Example:**

    .. literalinclude:: stringtools/_examples_/gzip_extract.py

    Will result to the following output:

    .. literalinclude:: stringtools/_examples_/gzip_extract.log
    """
    t = time.time()
    if sys.version_info >= (3, 0):
        rv = gzip.decompress(s)
    else:
        # Python 2 has no gzip.decompress, so the data is read through a GzipFile
        # from an in-memory buffer.
        inbuffer = StringIO(s)
        try:
            f = gzip.GzipFile(mode='rb', fileobj=inbuffer)
            try:
                rv = f.read()
            finally:
                f.close()
        finally:
            inbuffer.close()
    # An empty payload has no meaningful rate; guard against a ZeroDivisionError
    # while building the log arguments.
    rate = len(s) / float(len(rv)) if len(rv) else float('nan')
    logger.debug('GZIP: Finished to extract a string (compression_rate=%.3f, consumed_time=%.1fs).', rate, time.time() - t)
    return rv
|
|
|
|
|
|
def hexlify(s):
    """Method to hexlify a string.

    :param str s: A string including the bytes to be hexlified.
    :returns: The hexlified string
    :rtype: str

    The result starts with the number of bytes in parentheses followed by a
    colon and the blank separated two digit hex values (e.g. '(2): 61 62').

    **Example:**

    .. literalinclude:: stringtools/_examples_/hexlify.py

    Will result to the following output:

    .. literalinclude:: stringtools/_examples_/hexlify.log
    """
    # Decide once how to obtain the integer values: iterating a bytestream yields
    # integers on Python 3 and single character strings on Python 2.
    if sys.version_info >= (3, 0):
        values = s
    else:
        values = (ord(byte) for byte in s)
    # Join the parts in one pass instead of growing the string byte by byte.
    return '(%d):' % len(s) + ''.join(' %02x' % value for value in values)
|
|
|
|
|
|
def linefeed_filter(s):
    """Method to change linefeed and carriage return to '\\\\n' and '\\\\r'

    Bytestreams (bytes, bytearray) as well as text strings are supported; the
    returned object has the same type as the given one.

    :param str s: A string including carriage return and/ or linefeed.
    :returns: A string with converted carriage return and/ or linefeed.
    :rtype: str
    """
    # The replacement patterns have to match the type of the input: on Python 3
    # bytes patterns fail for text strings and vice versa.
    if isinstance(s, (bytes, bytearray)):
        return s.replace(b'\r', b'\\r').replace(b'\n', b'\\n')
    return s.replace('\r', '\\r').replace('\n', '\\n')
|