from glob import glob
import logging
import os
import numpy as np
from pymchelper.detector import Detector, average_with_nan, ErrorEstimate
from pymchelper.readers.common import guess_reader, group_input_files
from pymchelper.writers.common import Converters
logger = logging.getLogger(__name__)
[docs]def fromfile(filename):
"""Read a detector data from a binary file ```filename```"""
reader = guess_reader(filename)
detector = Detector()
detector.counter = 1
reader.read(detector)
detector.error_raw = np.zeros_like(detector.data_raw)
detector.error_raw *= np.nan
return detector
[docs]def fromfilelist(input_file_list, error, nan):
"""
:param input_file_list:
:param error:
:param nan:
:return:
"""
if not isinstance(input_file_list, list): # probably a string instead of list
input_file_list = [input_file_list]
if nan:
detector_list = [fromfile(filename) for filename in input_file_list]
result = average_with_nan(detector_list, error)
elif len(input_file_list) == 1:
result = fromfile(input_file_list[0])
else:
result = fromfile(input_file_list[0])
# allocate memory for accumulator in standard deviation calculation
# not needed if user requested not to include errors
if error != ErrorEstimate.none:
m2 = np.zeros_like(result.data_raw)
# loop over all files
for n, filename in enumerate(input_file_list[1:], start=2):
x = fromfile(filename).data_raw
# Running variance algorithm based on algorithm by B. P. Welford,
# presented in Donald Knuth's Art of Computer Programming, Vol 2, page 232, 3rd edition.
# Can be found here: http://www.johndcook.com/blog/standard_deviation/
# and https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Online_algorithm
delta = x - result.data_raw # delta = x - mean
result.data_raw += delta / n # mean += delta / n
if error != ErrorEstimate.none:
m2 += delta * (x - result.data_raw) # M2 += delta * (x - mean)
# unbiased sample variance is stored in `__M2 / (n - 1)`
# unbiased sample standard deviation in classical algorithm is calculated as (sqrt(1/(n-1)sum(x-<x>)**2)
# here it is calculated as square root of unbiased sample variance:
if len(input_file_list) > 1 and error != ErrorEstimate.none:
result.error_raw = np.sqrt(m2 / (len(input_file_list) - 1))
# if user requested standard error then we calculate it as:
# S = stderr = stddev / sqrt(N), or in other words,
# S = s/sqrt(N) where S is the corrected standard deviation of the mean.
if len(input_file_list) > 1 and error == ErrorEstimate.stderr:
result.error_raw /= np.sqrt(len(input_file_list)) # np.sqrt() always returns np.float64
result.counter = len(input_file_list)
core_names_dict = group_input_files(input_file_list)
if len(core_names_dict) == 1:
result.corename = list(core_names_dict)[0]
return result
[docs]def frompattern(pattern, error, nan, jobs=-1, verbose=0):
"""
:param pattern:
:param error:
:param nan:
:param jobs:
:param verbose:
:return:
"""
list_of_matching_files = glob(pattern)
core_names_dict = group_input_files(list_of_matching_files)
def _get_detector(filelist, error, nan):
detector = fromfilelist(filelist, error, nan)
return detector
# parallel execution of output file generation, using all CPU cores
# see http://pythonhosted.org/joblib
try:
from joblib import Parallel, delayed
logger.info("Parallel processing on {:d} jobs (-1 means all)".format(jobs))
# options.verbose count the number of `-v` switches provided by user
# joblib Parallel class expects the verbosity as a larger number (i.e. multiple of 10)
worker = Parallel(n_jobs=jobs, verbose=verbose * 10)
result = worker(
delayed(_get_detector)(filelist, error, nan)
for core_name, filelist in core_names_dict.items()
)
except (ImportError, SyntaxError):
# single-cpu implementation, in case joblib library fails (i.e. Python 3.2)
logger.info("Single CPU processing")
result = [_get_detector(core_name, filelist)
for core_name, filelist in core_names_dict.items()]
return result
[docs]def convertfromlist(filelist, error, nan, outputdir, converter_name, options, outputfile=None):
"""
:param filelist:
:param error:
:param nan:
:param outputdir:
:param converter_name:
:param options:
:param outputfile:
:return:
"""
detector = fromfilelist(filelist, error, nan)
if outputfile is not None:
output_path = outputfile
elif outputdir is None:
output_path = detector.corename
else:
output_path = os.path.join(outputdir, detector.corename)
tofile(detector, output_path, converter_name, options)
return None
[docs]def convertfrompattern(pattern, outputdir, converter_name, options,
error=ErrorEstimate.stderr, nan=True, jobs=-1, verbose=0):
"""
:param pattern:
:param outputdir:
:param converter_name:
:param options:
:param error:
:param nan:
:param jobs:
:param verbose:
:return:
"""
list_of_matching_files = glob(pattern)
core_names_dict = group_input_files(list_of_matching_files)
# parallel execution of output file generation, using all CPU cores
# see http://pythonhosted.org/joblib
try:
from joblib import Parallel, delayed
logger.info("Parallel processing on {:d} jobs (-1 means all)".format(jobs))
# options.verbose count the number of `-v` switches provided by user
# joblib Parallel class expects the verbosity as a larger number (i.e. multiple of 10)
worker = Parallel(n_jobs=jobs, verbose=verbose * 10)
worker(
delayed(convertfromlist)(filelist, error, nan, outputdir, converter_name, options)
for core_name, filelist in core_names_dict.items()
)
except (ImportError, SyntaxError):
# single-cpu implementation, in case joblib library fails (i.e. Python 3.2)
logger.info("Single CPU processing")
for core_name, filelist in core_names_dict.items():
convertfromlist(filelist, error, nan, outputdir, converter_name, options)
[docs]def tofile(detector, filename, converter_name, options):
"""
Save a detector data to a ``filename`` using converter defined by ``converter_name``
:param detector:
:param filename:
:param converter_name:
:param options:
:return:
"""
writer_cls = Converters.fromname(converter_name)
writer = writer_cls(filename, options)
logger.debug("Writing file with corename {:s}".format(filename))
writer.write(detector)