# -*- coding: utf-8 -*-
import os
import copy
import yaml
import numpy
import shutil
import logging
from zipfile import ZipFile
from zipfile import ZIP_STORED
from kvazar.fileman import FileManager
# Archive-relative locations of every kind of record entry.
# Templates containing ``%d`` are filled in with the state number; the
# ``npy_params`` entries carry no placeholder.
ZIP_PATH = {
    # per-state coordinate arrays
    "coords": "coords/crd_%d.npy",
    "prev_coords": "coords/pcrd_%d.npy",
    # per-state velocity arrays
    "velocity": "vels/vel_%d.npy",
    "half_velocity": "vels/hvel_%d.npy",
    # per-state acceleration arrays
    "accel": "accels/acc_%d.npy",
    # per-state text parameters
    "params": "params/param_%d.txt",
    # arrays stored under a fixed name
    "T": "npy_params/T.npy",
    "U": "npy_params/U.npy",
    "K": "npy_params/K.npy",
    "J": "npy_params/J.npy",
}
class RecordException(Exception):
    """Error raised by :class:`Record` for bad record files or open modes."""

    def __init__(self, message):
        # A single, mandatory message is forwarded to the base exception.
        Exception.__init__(self, message)
class Record(object):
    """
    Save intermediate data of a calculation and archive it in one output
    file.

    Every entry is first written into a scratch working directory created
    next to the record file and then added, uncompressed, to the zip archive
    ``rec_file``.  :meth:`close` finalizes the archive and removes the
    working directory.
    """

    # State keys stored as ``.npy`` arrays / as text parameter files.
    _NPY_PARAMS = ("coords", "velocity", "accel", "T", "J", "U", "K",
                   "half_velocity", "prev_coords")
    _TEXT_PARAMS = ("params",)

    def __init__(self, rec_file, rec_type='w'):
        """
        Create (``'w'``) or reopen (``'a'``) the record archive.

        :param rec_file: path of the zip archive holding the record.
        :param rec_type: ``'w'`` creates a new archive; ``'a'`` extracts an
            existing one into the working directory, counts the states it
            already holds and loads its ``.ACD`` structure.
        :raises RecordException: for an unknown ``rec_type``, when ``'a'``
            is requested for a file that does not exist, or when the
            archive contains no ``.ACD`` file.
        """
        # Validate before touching the file system so that a bad call does
        # not leave an empty working directory behind.
        if rec_type not in ('w', 'a'):
            raise RecordException('Unknown modifier %s' % rec_type
                                  + "\nUse 'w' or 'a'!")
        if rec_type == 'a' and not os.path.isfile(rec_file):
            raise RecordException("Can't append to %s: no such record file."
                                  % rec_file)

        self.rec_file = rec_file
        rf_name, _ = os.path.splitext(rec_file)
        self.state_count = 0
        self.zip_file = None
        self.fm = FileManager()

        # Pick a working directory name that is not taken yet.
        self.work_dir = rf_name + "_work_dir"
        index_dir = 1
        while os.path.exists(self.work_dir):
            self.work_dir = rf_name + "_work_dir_%d" % index_dir
            index_dir += 1
        os.mkdir(self.work_dir)

        opened = False
        try:
            if rec_type == 'a':
                self._open_existing(rec_file)
            else:
                logging.info("Creating %s", rec_file)
                self.zip_file = ZipFile(rec_file, 'w')
            opened = True
        finally:
            if not opened:
                # Do not leak the archive handle or the scratch directory
                # when opening fails.
                if self.zip_file is not None:
                    self.zip_file.close()
                shutil.rmtree(self.work_dir, ignore_errors=True)

    def _open_existing(self, rec_file):
        """Extract ``rec_file`` into the working directory and restore state."""
        logging.info("Opening %s", rec_file)
        self.zip_file = ZipFile(rec_file, 'a')
        self.zip_file.extractall(self.work_dir)

        # The number of stored states is the number of consecutive
        # coordinate files found, starting from index 0.
        mask = ZIP_PATH["coords"]
        while os.path.exists(os.path.join(self.work_dir,
                                          mask % self.state_count)):
            self.state_count += 1

        acd_files = self.get_path_to_files_with_ext("ACD")
        if not acd_files:
            raise RecordException("Bad record file. Don't find struct.ACD")
        self.struct = self.fm.load(acd_files[0])
        if len(acd_files) > 1:
            logging.warning("Too many .ACD files in %s.", rec_file)

    def get_path_to_files_with_ext(self, ext):
        """
        Return paths of all files below the working directory whose name
        ends with ``"." + ext``.

        :param ext: extension without the leading dot (e.g. ``"ACD"``).
        :returns: list of paths, each prefixed with the directory it was
            found in.
        """
        suffix = "." + ext
        path_to_files = []
        for dir_path, _, file_names in os.walk(self.work_dir):
            for cur_file in file_names:
                # Compare the real suffix: "x.ACD.bak" is not an .ACD file.
                if cur_file.endswith(suffix):
                    path_to_files.append(os.path.join(dir_path, cur_file))
        return path_to_files

    def initialize(self, struct, config=None):
        """
        Store the structure (and optionally a config file) in the record
        and prepare the sub-directories of the working directory.

        :param struct: structure object; a deep copy is kept and saved.
        :param config: optional path to a config file to archive.
        """
        necessary_dirs = ["coords", "vels", "accels", "params"]
        self.set_file(struct)
        if config:
            self.set_config(config)
        # Explicit loop: ``map`` is lazy on Python 3 and would do nothing.
        for name in necessary_dirs:
            self.fm.ensure_dir(os.path.join(self.work_dir, name))
        logging.info("Initializing of record file was succesfully completed.")

    def set_file(self, struct):
        """
        Keep a deep copy of ``struct`` and archive it as ``struct.ACD``.

        :param struct: structure object to store in the record.
        """
        self.struct = copy.deepcopy(struct)
        # A separate manager is used so ``self.fm`` is left untouched.
        fm = FileManager()
        fm.set_struct(self.struct)
        pstruct = os.path.join(self.work_dir, "struct.ACD")
        fm.save(pstruct)
        self.zip_file.write(pstruct, "struct.ACD", ZIP_STORED)

    def set_config(self, path_to_config):
        """
        Copy a config file into the record and archive it as ``config.cfg``.

        :param path_to_config: path of the config file to copy.
        """
        pconf = os.path.join(self.work_dir, "config.cfg")
        shutil.copy(path_to_config, pconf)
        # The archive name must match the file: using "struct.ACD" here
        # would add a second, bogus structure entry to the archive.
        self.zip_file.write(pconf, "config.cfg", ZIP_STORED)

    def _state_paths(self, parm):
        """
        Return ``(archive_path, disk_path)`` for ``parm`` of the current
        state, creating the parent directory on disk if it is missing.
        """
        template = ZIP_PATH[parm]
        # Some templates (T, U, K, J) have no state placeholder; applying
        # ``%`` to them raises TypeError, so they are used verbatim.
        # NOTE(review): such entries therefore reuse one archive name for
        # every state -- confirm that this is the intended layout.
        if "%d" in template:
            arc_path = template % self.state_count
        else:
            arc_path = template
        disk_path = os.path.join(self.work_dir, arc_path)
        parent = os.path.dirname(disk_path)
        if not os.path.isdir(parent):
            os.makedirs(parent)
        return arc_path, disk_path

    @staticmethod
    def _write_params_file(path_txt, monitor_data):
        """Write ``monitor_data`` as ``key : value`` lines to ``path_txt``."""
        with open(path_txt, "w") as cur_details:
            for key in monitor_data.keys():
                cur_details.write("%s : %s\n\n" % (key, monitor_data[key]))

    def append_state(self, state):
        """
        Add one state to the record and advance the state counter.

        :param state: mapping from parameter name to value.  Array
            parameters are saved with ``numpy.save``, ``"params"`` is
            written as a text file, and unknown keys are ignored.
        """
        for parm in state.keys():
            if parm in self._NPY_PARAMS:
                arc_path, disk_path = self._state_paths(parm)
                numpy.save(disk_path, state[parm])
            elif parm in self._TEXT_PARAMS:
                arc_path, disk_path = self._state_paths(parm)
                self._write_params_file(disk_path, state[parm])
            else:
                continue
            self.zip_file.write(disk_path, arc_path, ZIP_STORED)
        self.state_count += 1

    def close(self):
        """Close the archive and remove the working directory."""
        try:
            self.zip_file.close()
        finally:
            # Remove the scratch data even if finalizing the archive fails.
            shutil.rmtree(self.work_dir)

    def update_state(self, number):
        """
        Load state ``number`` from the working directory.

        Array files that exist are assigned to the same-named attributes of
        ``self.struct``; the parameter file is parsed into ``self.params``.

        :param number: index of the state to load.
        """
        struct_parms = ["coords", "velocity", "accel", "half_velocity",
                        "prev_coords"]
        for prm in struct_parms:
            path = os.path.join(self.work_dir, ZIP_PATH[prm] % number)
            if os.path.exists(path):
                setattr(self.struct, prm, numpy.load(path))
        # Format only the template, never the working directory, so a "%"
        # in the directory name cannot break the path.
        params_path = os.path.join(self.work_dir, ZIP_PATH["params"] % number)
        with open(params_path) as params_file:
            # NOTE(review): yaml.load can construct arbitrary objects; if
            # record files may come from untrusted sources, switch to
            # yaml.safe_load.
            self.params = yaml.load(params_file)
def main():
    """
    Smoke run: load a structure, create a record for it and close it.

    Logging goes to ``record.log``.  The input and output paths are
    hard-coded developer paths.
    """
    logging.basicConfig(filename='record.log', level=logging.DEBUG)
    fm = FileManager()
    struct = fm.load("/home/savegor/projects/kvazar/python/problems/DPPC.ACD")
    a = Record("/home/savegor/projects/kvazar/python/output/lol.nmd")
    try:
        a.initialize(struct)
    finally:
        # Always close, so the working directory is removed even when
        # initialization fails.
        a.close()
    # Parenthesized form prints the same text on Python 2 and Python 3.
    print("Completed")


if __name__ == "__main__":
    main()