Source code for kvazar.record

# -*- coding: utf-8 -*-
import os
import copy
import yaml
import numpy
import shutil
import logging
from zipfile import ZipFile
from zipfile import ZIP_STORED

from kvazar.fileman import FileManager

# Archive-relative path masks, keyed by the name of the stored quantity.
# Masks that contain "%d" are filled in with a state number by the code
# that uses this table; the "npy_params/*" entries carry no placeholder.
ZIP_PATH = {
    # arrays stored once per state
    "coords": "coords/crd_%d.npy",
    "prev_coords": "coords/pcrd_%d.npy",
    "velocity": "vels/vel_%d.npy",
    "half_velocity": "vels/hvel_%d.npy",
    "accel": "accels/acc_%d.npy",
    # plain-text parameter dump, one per state
    "params": "params/param_%d.txt",
    # arrays stored under a fixed name
    "T": "npy_params/T.npy",
    "U": "npy_params/U.npy",
    "K": "npy_params/K.npy",
    "J": "npy_params/J.npy",
}

class RecordException(Exception):
    """Exception type used by this module; it carries one message."""

    def __init__(self, message):
        # Exactly one positional message is required; it is handed to the
        # base class unchanged, so ``str(exc)`` yields the message.
        Exception.__init__(self, message)

class Record(object):
    """
    Save intermediate calculation data and archive it in one output file.

    Data is first written into a scratch working directory created next to
    ``rec_file`` and then stored (``ZIP_STORED``, i.e. uncompressed) in the
    zip archive ``rec_file``.  :meth:`close` closes the archive and removes
    the working directory.

    NOTE(review): this class was reconstructed from a rendering of the
    source in which some call tokens were lost.  Lines marked
    "reconstructed" are the most plausible reading and should be confirmed
    against the project's original file.
    """

    def __init__(self, rec_file, rec_type='w'):
        """
        Create (``'w'``) or open (``'a'``) the record archive ``rec_file``.

        :param rec_file: path of the zip archive to write.
        :param rec_type: ``'w'`` creates a new archive; ``'a'`` opens an
            existing one, unpacks it into the working directory, counts the
            states already stored and loads the structure from its ``.ACD``
            file.
        :raises RecordException: for an unknown ``rec_type``, for ``'a'``
            when ``rec_file`` does not exist, or when the opened archive
            contains no ``.ACD`` file.
        """
        # Validate before touching the file system so that a bad call does
        # not leave an empty working directory behind.
        if rec_type not in ('w', 'a'):
            raise RecordException('Unknown modifier %s' % rec_type
                                  + "\nUse 'w' or 'a'!")
        if rec_type == 'a' and not os.path.isfile(rec_file):
            raise RecordException("Can't open %s for appending: "
                                  "no such file." % rec_file)

        self.rec_file = rec_file
        rf_name, _ = os.path.splitext(rec_file)
        self.state_count = 0
        # NOTE(review): reconstructed -- the assignment target was lost in
        # the rendering; ``self.fm`` is assumed.
        self.fm = FileManager()

        # Pick a working directory name that does not exist yet.
        self.work_dir = rf_name + "_work_dir"
        index_dir = 1
        while os.path.exists(self.work_dir):
            self.work_dir = rf_name + "_work_dir_%d" % index_dir
            index_dir += 1
        os.mkdir(self.work_dir)

        try:
            if rec_type == 'a':
                self._open_existing()
            else:
                logging.info("Creating %s" % rec_file)
                self.zip_file = ZipFile(rec_file, 'w')
        except Exception:
            # Do not leak the working directory or an open archive handle
            # when construction fails; the original error is re-raised.
            self._abort()
            raise

    def _open_existing(self):
        """Unpack the existing archive and restore state count and struct."""
        logging.info("Opening %s" % self.rec_file)
        self.zip_file = ZipFile(self.rec_file, 'a')
        self.zip_file.extractall(self.work_dir)

        # The number of stored states is the number of consecutive
        # coordinate files found, starting from index 0.
        mask = ZIP_PATH["coords"]
        while os.path.exists(os.path.join(self.work_dir,
                                          mask % self.state_count)):
            self.state_count += 1

        acd_files = self.get_path_to_files_with_ext("ACD")
        if not acd_files:
            raise RecordException("Bad record file. Don't find struct.ACD")
        # NOTE(review): reconstructed -- ``self.fm.load(acd_files[0])``.
        self.struct = self.fm.load(acd_files[0])
        if len(acd_files) > 1:
            logging.warning("Too many .ACD files in %s." % self.rec_file)

    def _abort(self):
        """Release whatever a failed constructor has already acquired."""
        zip_file = getattr(self, "zip_file", None)
        if zip_file is not None:
            zip_file.close()
        shutil.rmtree(self.work_dir, ignore_errors=True)

    @staticmethod
    def _ensure_dir(path):
        """Create directory ``path`` (and parents) unless it already exists."""
        if not os.path.isdir(path):
            os.makedirs(path)

    @staticmethod
    def _archive_name(parm, number):
        """
        Return the archive-relative path for ``parm`` at state ``number``.

        Masks without a ``%d`` placeholder (the ``npy_params`` entries) are
        returned unchanged; applying ``%`` to them would raise ``TypeError``.
        """
        mask = ZIP_PATH[parm]
        return mask % number if "%d" in mask else mask

    @staticmethod
    def _write_params(path_txt, monitor_data):
        """Write ``monitor_data`` to ``path_txt`` as ``key : value`` lines."""
        with open(path_txt, "w") as cur_details:
            for key in monitor_data:
                cur_details.write("%s : %s\n\n" % (key, monitor_data[key]))

    def get_path_to_files_with_ext(self, ext):
        """
        Return paths of all files under the working directory ending in ``.ext``.

        :param ext: extension without the leading dot, e.g. ``"ACD"``.
        :returns: list of paths (directory walk order, not sorted).
        """
        # Match on the real suffix: a name such as "struct.ACD.bak" must not
        # be reported as an ".ACD" file.
        suffix = "." + ext
        path_to_files = []
        for dirpath, _, filenames in os.walk(self.work_dir):
            for cur_file in filenames:
                if cur_file.endswith(suffix):
                    path_to_files.append(os.path.join(dirpath, cur_file))
        return path_to_files

    def initialize(self, struct, config=None):
        """
        Store the structure (and optional config) and create the data dirs.

        :param struct: structure object handed to :meth:`set_file`.
        :param config: optional path to a config file for :meth:`set_config`.
        """
        necessary_dirs = ["coords", "vels", "accels", "params"]
        self.set_file(struct)
        if config:
            self.set_config(config)
        # An explicit loop instead of map(): map() is lazy on Python 3 and
        # would create nothing.
        # NOTE(review): reconstructed -- the per-item call was lost in the
        # rendering; creating ``work_dir/<name>`` is assumed.
        for name in necessary_dirs:
            self._ensure_dir(os.path.join(self.work_dir, name))
        logging.info("Initializing of record file was succesfully completed.")

    def set_file(self, struct):
        """
        Keep a deep copy of ``struct`` and add ``struct.ACD`` to the archive.

        :param struct: structure object; it is deep-copied, so later changes
            to the caller's object do not affect the record.
        """
        self.struct = copy.deepcopy(struct)
        fm = FileManager()
        fm.set_struct(self.struct)
        pstruct = os.path.join(self.work_dir, "struct.ACD")
        # NOTE(review): nothing visible here writes ``pstruct`` to disk
        # before it is archived; a FileManager save/dump call was presumably
        # lost in the rendering -- restore it from the original source.
        self.zip_file.write(pstruct, "struct.ACD", ZIP_STORED)

    def set_config(self, path_to_config):
        """
        Copy the config file into the working directory and archive it.

        :param path_to_config: path of the config file to store.
        """
        pconf = os.path.join(self.work_dir, "config.cfg")
        shutil.copy(path_to_config, pconf)
        # Stored under its own name; the previous arcname "struct.ACD"
        # collided with the structure entry written by set_file().
        self.zip_file.write(pconf, "config.cfg", ZIP_STORED)

    def append_state(self, state):
        """
        Write one state into the working directory and the archive.

        :param state: mapping from parameter name to value.  Array
            parameters are saved with ``numpy.save``; ``"params"`` is a
            mapping written as text.  Unknown keys are ignored.

        ``state_count`` is increased by one per call.
        """
        npy = ("coords", "velocity", "accel", "T", "J", "U", "K",
               "half_velocity", "prev_coords")
        yml = ("params",)

        for parm in state:
            if parm not in npy and parm not in yml:
                continue
            path = self._archive_name(parm, self.state_count)
            target = os.path.join(self.work_dir, path)
            # "npy_params" is not created by initialize(), and an archive
            # opened with 'a' may lack any of the directories.
            self._ensure_dir(os.path.dirname(target))
            if parm in npy:
                # NOTE(review): reconstructed -- ``numpy.save(target, ...)``.
                numpy.save(target, state[parm])
            else:
                self._write_params(target, state[parm])
            # NOTE(review): T/U/K/J use a fixed archive name, so storing
            # them for several states adds duplicate entries to the zip.
            self.zip_file.write(target, path, ZIP_STORED)
        self.state_count += 1

    def close(self):
        """Close the archive and delete the working directory."""
        try:
            self.zip_file.close()
        finally:
            # Remove the scratch data even if finalising the archive fails.
            shutil.rmtree(self.work_dir)

    def update_state(self, number):
        """
        Load state ``number`` from the working directory into ``self``.

        Array files that exist are assigned as attributes of ``self.struct``;
        the parameter file is parsed into ``self.params``.

        :param number: index of the stored state.
        """
        struct_parms = ["coords", "velocity", "accel", "half_velocity",
                        "prev_coords"]
        for prm in struct_parms:
            path = os.path.join(self.work_dir, ZIP_PATH[prm] % number)
            if os.path.exists(path):
                setattr(self.struct, prm, numpy.load(path))

        # Format only the mask: applying "%" to the joined path breaks when
        # the working directory name itself contains a "%".
        params_path = os.path.join(self.work_dir, ZIP_PATH["params"] % number)
        with open(params_path) as handle:
            # NOTE(review): yaml.load can construct arbitrary objects and
            # newer PyYAML releases require an explicit Loader; consider
            # yaml.safe_load if record files may come from untrusted sources.
            self.params = yaml.load(handle)
def main(struct_path="/home/savegor/projects/kvazar/python/problems/DPPC.ACD",
         record_path="/home/savegor/projects/kvazar/python/output/lol.nmd"):
    """
    Smoke-run: load a structure and write it into a fresh record file.

    :param struct_path: ``.ACD`` structure file passed to ``FileManager.load``.
    :param record_path: record archive to create.

    The defaults are the developer-specific paths this script was written
    with; pass your own when running it elsewhere.  Logging goes to
    ``record.log`` in the current directory.
    """
    logging.basicConfig(filename='record.log', level=logging.DEBUG)
    fm = FileManager()
    struct = fm.load(struct_path)
    a = Record(record_path)
    try:
        a.initialize(struct)
    finally:
        # Always close: this finalises the archive and removes the record's
        # working directory even when initialize() fails.
        a.close()
    # Parenthesised form prints the same text on Python 2 and Python 3.
    print("Completed")


if __name__ == "__main__":
    main()