Source code for kvazar.diesel

# -*- coding: utf-8 -*-
import copy
import numpy
import logging

from record import Record
from boxcell import BoxCell
from core.lib import libmd
from core.params import consts
import utils

class DieselException(Exception):
    def __init__(self, message):
        super(DieselException, self).__init__(message)

[docs]class Diesel(object): """ Module for molecular dynamic calculation. To initialize it, you have to set structure and method. """ def __init__(self, struct, method, integrator = {"name" : "VELVERLET"}, log_to_file = False): if not hasattr(self, "procid"): self.procid = 0 self.comm = None if log_to_file: logging.basicConfig(filename=log_to_file, level=logging.DEBUG, format='%(asctime)s -> %(levelname)s -> %(message)s') else: logging.basicConfig(level=logging.DEBUG, format='%(asctime)s -> %(levelname)s -> %(message)s') self.calculator = None if method["FF"] == "MARTINI": from core.martini import MARTINI self.calculator = MARTINI(struct, method["params"]) elif method["FF"] == "TB": from core.tb import TB self.calculator = TB(struct, method["params"]) self.calculator.ensure_forces() elif method["FF"] == "REBO": from core.airebo import AIREBO self.calculator = AIREBO(struct, ['u_rebo'], comm = self.comm) elif method["FF"] == "AIREBO": from core.airebo import AIREBO self.calculator = AIREBO(struct, method["params"], comm = self.comm) else: raise DieselException("Unknown Force Field.") if self.procid == 0: logging.info("Force Field : %s" % method["FF"]) self.struct = struct self.dt = 1 # fs self.dK = 1E-20 # коэффициент: н / м ^ 2 в ангстремы на фемтосекунды в квадрате # мониторинг self.monitor_details = ["time"] self.E_kin = 0 # кинетическая энергия в текущий момент времен self.E_p = 0 # потенциальная энергия в текущий момент времени self.cur_time = 0 self.Kin = [] # массив кинетической энергии в процессе динамики self.U = [] # массив потенциальной энергии в процессе динамики self.J = [] # массив векторов теплового потока в процессе динамики self.T = [] self.use_cutoff = False if not "diapason" in integrator: self.atom_start = 0 self.atom_stop = struct.n_atoms else: self.atom_start, self.atom_stop = tuple(integrator["diapason"]) self.prepare_integrator(integrator) def maxwell_velocities(self, temperature): self.struct.velocity = utils.angular_mom.velocity_from_maxwell_distribution(self.struct.mass, temperature) utils.angular_mom.angul_correct(self.struct) def update_monitor(self, mdata): self.monitor_details += mdata
[docs] def get_forces(self): """ The calculation of forces acting on atoms. """ f = numpy.zeros((self.atom_stop - self.atom_start, 3), numpy.float) self.calculator.forces(f, self.atom_start, self.atom_stop) return f
def check_boundary_sphere(self, f, R): libmd.spherical_boundary(self.struct.coords, f, self.struct.n_atoms, R) def check_external_forces(self, f): data = self.ext_forces_data for fdata in data: force = fdata[0] atoms = fdata[1] f[atoms] += force def init_ext_forces(self, data): datan = [] for fd in data: datan.append([numpy.array(map(float, fd[0])), numpy.array(map(int, fd[1]))]) datan[-1][1] -= 1 self.ext_forces_data = datan get_f = self.get_forces def wrapped_forces(): f = get_f() self.check_external_forces(f) return f self.get_forces = wrapped_forces def init_boundary(self, R): self.R = R get_f = self.get_forces def wrapped_forces(): f = get_f() self.check_boundary_sphere(f, self.R) return f self.get_forces = wrapped_forces def prepare_integrator(self, integrator): a, b = self.atom_start, self.atom_stop integrator_name = integrator["name"] thermostat = False if "thermostat" in integrator: thermostat = integrator["thermostat"] if thermostat.lower() != 'dissipative': temperature = integrator["temperature"] self.temper = temperature if self.procid == 0: logging.info(thermostat + " : " + str(temperature)) if "maxwell_veloc" in integrator: temperature = integrator["temperature"] if integrator["maxwell_veloc"]: self.maxwell_velocities(temperature) if thermostat: self.tau = 1000.0 if thermostat.lower() == "berendsen": def first_step(): f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK self.struct.half_velocity = numpy.zeros_like(self.struct.velocity) self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5 self.delta = 1.0 self.first_step = first_step self.move = self.leapfrog_berendsen_step elif thermostat.lower() == "nose-hoover": def first_step(): f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK self.struct.half_velocity = numpy.zeros_like(self.struct.velocity) self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5 self.delta = 0.0 self.first_step = first_step self.move = self.leapfrog_nosehoover_step elif thermostat.lower() == 'dissipative': if not hasattr(self.struct, "accel"): self.struct.accel = numpy.zeros_like(self.struct.velocity) self.move = self.dissipative_step else: if integrator_name.lower() == "verlet": def first_step(): f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis] * self.dK self.struct.prev_coords = self.struct.coords - self.struct.velocity * self.dt - 0.5 * accel * self.dt * self.dt self.first_step = first_step self.move = self.verlet_step elif integrator_name.lower() == "velverlet": if not hasattr(self.struct, "accel"): self.struct.accel = numpy.zeros_like(self.struct.velocity) self.move = self.velocity_verlet_step elif integrator_name.lower() == "leapfrog": def first_step(): f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK self.struct.half_velocity = numpy.zeros_like(self.struct.velocity) self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5 self.first_step = first_step self.move = self.leapfrog_step
[docs] def verlet_step(self): """ The Basic Verlet Algorithm. """ a, b = self.atom_start, self.atom_stop coords = self.struct.coords prev_coords = self.struct.prev_coords # силы, действующие на атомы f = self.get_forces() # ускорение в следующий момент времени (м / (с ^ 2)) accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK cur_coords = copy.deepcopy(coords[a:b]) coords[a:b] = 2 * coords[a:b] - prev_coords[a:b] + accel * self.dt * self.dt self.struct.velocity[a:b] = 0.5 / self.dt * (coords[a:b] - cur_coords) self.struct.prev_coords[a:b], self.struct.coords[a:b] = cur_coords, coords[a:b]
[docs] def dissipative_step(self): """ The Velocity Verlet Algorithm. """ a, b = self.atom_start, self.atom_stop coords, velocity, accel = self.struct.coords, self.struct.velocity, self.struct.accel coords[a:b] += velocity[a:b] * self.dt + accel[a:b] * (0.5 * self.dt * self.dt) # координаты в следующий момент времени #расчет скоростей и ускорений на следующем шаге f = self.get_forces() # силы, действующие на атомы next_accel = f / self.struct.mass[:,numpy.newaxis][a:b] # ускорение в следующий момент времени (м / (с ^ 2)) next_accel *= self.dK # ускорение в следующий момент времени (а / (фс ^ 2)) dV = 0.5 * (next_accel + accel[a:b]) * self.dt # изменение скорости accel[a:b] = next_accel velocity[a:b] += dV velocity[a:b] *= 0.99 self.struct.coords, self.struct.velocity, self.struct.accel = coords, velocity, accel
[docs] def velocity_verlet_step(self): """ The Velocity Verlet Algorithm. """ a, b = self.atom_start, self.atom_stop coords, velocity, accel = self.struct.coords, self.struct.velocity, self.struct.accel coords[a:b] += velocity[a:b] * self.dt + accel[a:b] * (0.5 * self.dt * self.dt) # координаты в следующий момент времени #расчет скоростей и ускорений на следующем шаге f = self.get_forces() # силы, действующие на атомы next_accel = f / self.struct.mass[:,numpy.newaxis][a:b] # ускорение в следующий момент времени (м / (с ^ 2)) next_accel *= self.dK # ускорение в следующий момент времени (а / (фс ^ 2)) dV = 0.5 * (next_accel + accel[a:b]) * self.dt # изменение скорости accel[a:b] = next_accel velocity[a:b] += dV velocity[a:b] *= 0.1 self.struct.coords, self.struct.velocity, self.struct.accel = coords, velocity, accel
[docs] def leapfrog_step(self): """ The Verlet Leapfrog Algorithm. """ a, b = self.atom_start, self.atom_stop coords, half_vel = self.struct.coords, self.struct.half_velocity # силы, действующие на атомы f = self.get_forces() # ускорение в следующий момент времени (м / (с ^ 2)) accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK velocity = copy.deepcopy(half_vel) half_vel[a:b] = self.struct.half_velocity[a:b] + accel * self.dt velocity[a:b] += half_vel[a:b] coords[a:b] += half_vel[a:b] * self.dt self.struct.coords, self.struct.velocity, self.struct.half_velocity = coords, velocity, half_vel
def get_cur_electric_field(self): E = 'was_not_determined' if hasattr(self, 'emw'): omega = self.emw['omega'] phase = self.emw['phase'] E0 = self.emw["E_amp"] E = E0 * numpy.sin(omega * self.get_cur_time() * 1e-15 + phase) return E def get_cur_magnetic_field(self): B = 'was_not_determined' if hasattr(self, 'emw'): omega = self.emw['omega'] phase = self.emw['phase'] B0 = self.emw["B_amp"] B = B0 * numpy.cos(omega * self.get_cur_time() * 1e-15 + phase) return B def get_cur_time(self): return self.cur_time def get_cur_energy(self): return self.calculator.compute() def get_cur_temperature(self): return self.current_temper(self.struct.velocity[self.atom_start : self.atom_stop]) def get_cur_momentum(self): return utils.angular_mom.calc_angular_momentum(self.struct) def get_cur_aver_veloc(self): return utils.angular_mom.calc_center_mass_veloc(self.struct) def current_temper(self, vel): a, b = self.atom_start, self.atom_stop def kin(velocity): return 0.5 * (self.struct.mass[a:b] * (1E10 * velocity * velocity).sum(axis = 1)).sum() n = b - a + 1 return 2 * kin(vel) / (n * 3 * consts.K_BOLTZMAN)
[docs] def leapfrog_berendsen_step(self): """ The Verlet Leapfrog Algorithm with Berendsen Thermostat. """ a, b = self.atom_start, self.atom_stop coords = self.struct.coords f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK vel_minus = self.struct.half_velocity[a:b] velocity = self.struct.velocity[a:b] temperature = self.current_temper(velocity) delta = numpy.sqrt(1 + self.dt / self.tau * (self.temper / temperature - 1)) if temperature > 0 else 1 vel_plus = (vel_minus + accel * self.dt) * delta velocity = 0.5 * (vel_plus + vel_minus) coords[a:b] += vel_plus * self.dt self.struct.coords, self.struct.velocity[a:b], self.struct.half_velocity[a:b] = coords, velocity, vel_plus
[docs] def leapfrog_nosehoover_step(self): """ The Verlet Leapfrog Algorithm with Berendsen Thermostat. """ a, b = self.atom_start, self.atom_stop coords = self.struct.coords f = self.get_forces() accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK vel_plus = None delta_plus = 0 delta_minus = self.delta vel_minus = self.struct.half_velocity[a:b] velocity = self.struct.velocity[a:b] delta = 100 for _ in range(3): delta_plus = delta_minus + self.dt / (self.tau * self.tau) * (self.current_temper(velocity) / self.temper - 1) delta = 0.5 * (delta_plus + delta_minus) vel_plus = vel_minus + self.dt * (accel - delta * velocity) velocity = 0.5 * (vel_plus + vel_minus) self.delta = delta coords[a:b] += vel_plus * self.dt self.struct.coords, self.struct.velocity[a:b], self.struct.half_velocity[a:b] = coords, velocity, vel_plus
[docs] def use_pbc(self, pbc_box): """ Periodic boundary conditions can be used in calculations. Geometry of pbc box should be setted. """ self.pbc_box = self.calculator.parm["pbc_box"] = pbc_box f = self.move def wrapped(): f() self.check_pbc() self.move = wrapped
def use_emw(self, emw_data): self.emw = emw_data self.emw['poynting_vector'] = numpy.array(self.emw['poynting_vector'], numpy.float) self.emw['poynting_vector'] /= numpy.linalg.norm(self.emw['poynting_vector']) self.emw['polarization_vector'] = numpy.array(self.emw['polarization_vector'], numpy.float) pv = self.emw['polarization_vector'] self.emw['E_dir'] = pv / numpy.linalg.norm(pv) self.emw['B_dir'] = numpy.cross(self.emw['poynting_vector'], self.emw['polarization_vector']) self.emw['omega'] = float(self.emw['omega']) self.emw['phase'] = float(self.emw['phase']) self.emw['E_amp'] = float(self.emw['E_amp']) self.emw['B_amp'] = float(self.emw['B_amp']) get_forces = self.get_forces def get_forces_with_emw_influence(): f = get_forces() omega = self.emw['omega'] phase = self.emw['phase'] B_dir = self.emw['B_dir'] E_dir = self.emw['E_dir'] E0 = self.emw["E_amp"] B0 = self.emw["B_amp"] E = (E0 * numpy.sin(omega * self.get_cur_time() * 1e-15 + phase)) B = (B0 * numpy.cos(omega * self.get_cur_time() * 1e-15 + phase)) E = E * E_dir # V / meter B = B * B_dir # for i in xrange(self.atom_start, self.atom_stop): q = self.struct.charges[i] # Coulombs v = self.struct.velocity[i] * 1e5 # meter/sec f[i] += q * (E + numpy.cross(B, v)) #FORCES return f self.get_forces = get_forces_with_emw_influence
[docs] def check_pbc(self): """ If any atoms are out of box, method will try to return them back. """ pbc = self.pbc_box l = [pbc[0][1] - pbc[0][0], pbc[1][1] - pbc[1][0], pbc[2][1] - pbc[2][0]] struct = self.struct for i in xrange(self.atom_start, self.atom_stop): for j in xrange(3): if struct.coords[i, j] > pbc[j][1]: struct.coords[i, j] -= l[j] if struct.coords[i, j] < pbc[j][0]: struct.coords[i, j] += l[j]
[docs] def run(self, time_steps): """ Runs calculations as many times as it follows from conditions. """ [self.move() for _ in xrange(time_steps)]
[docs] def init_cutoff(self, r_in, r_out): """ Van-der-Vaals forces cutoff at some distance. """ if 'pbc_box' in self.calculator.parm.keys(): self.box_cell = BoxCell(self.struct.coords, r_in, r_out, self.pbc_box) else: coords = self.calculator.struct.coords min_x, max_x = coords[:,0].min(), coords[:,0].max() min_y, max_y = coords[:,1].min(), coords[:,1].max() min_z, max_z = coords[:,2].min(), coords[:,2].max() min_x -= r_out min_y -= r_out min_z -= r_out max_x += r_out max_y += r_out max_z += r_out box = numpy.array([[min_x, max_x, max_x - min_x], [min_y, max_y, max_y - min_y], [min_z, max_z, max_z - min_z]], numpy.float) if self.procid == 0: logging.info('Periodic box was calculated automatically.') self.use_pbc(box) self.box_cell = BoxCell(self.struct.coords, r_in, r_out, self.pbc_box) self.calculator.init_cutoff(r_in, r_out, self.box_cell.calc_neighbours(self.struct.coords, 0, len(self.struct.coords))) move = self.move def wrapped(): move() if self.box_cell.time_to_update(self.struct.coords, self.atom_start, self.atom_stop): self.calculator.update_cutoff(self.box_cell.calc_neighbours(self.struct.coords, self.atom_start, self.atom_stop)) self.box_cell.update_prev_coords(self.struct.coords) if self.procid == 0: logging.info("Update cutoff-lists.") self.move = wrapped
[docs] def simulation(self, output, period, max_time, config = None, rec_type = 'w'): """ It writes structure characteristics and monitoring values in output file. """ rec = None comm = self.comm def save(): state = dict() struct_state_params = ["coords", "velocity", "accel", "half_velocity", "prev_coords"] for prm in struct_state_params: if hasattr(self.struct, prm): if comm and prm != "coords": prmdata = getattr(self.struct, prm) setattr(self.struct, prm, self.b_cast_coords(prmdata, prmdata[self.atom_start : self.atom_stop])) state[prm] = getattr(self.struct, prm) monitor = dict() for md in self.monitor_details: fmd = "get_cur_" + md if hasattr(self, fmd): monitor[md] = getattr(self, fmd)() if self.procid == 0: for md in monitor: if monitor[md] is float: logging.info("%s : %.5f" % (md, monitor[md])) else: logging.info("%s : %s" % (md, monitor[md])) state["params"] = monitor if self.procid == 0: rec.append_state(state) if hasattr(self, "first_step"): self.first_step() if self.procid == 0: rec = Record(output, rec_type = rec_type) if rec_type == 'w': rec.initialize(self.struct, config = config) self.cur_time = 0 elif rec_type == 'a': rec.update_state(rec.state_count - 1) md_struct_parms = ["coords", "velocity", "accel", "half_velocity", "prev_coords"] for prm in md_struct_parms: if hasattr(rec.struct, prm): setattr(self.struct, prm, getattr(rec.struct, prm)) setattr(self.calculator.struct, prm, getattr(rec.struct, prm)) self.cur_time = rec.params["time"] + self.dt * period max_time += rec.params["time"] else: raise DieselException("Unknown record option %s." % rec_type) if rec_type == 'w': save() while self.cur_time < max_time: struct = self.struct mass = struct.mass / struct.mass[0] coords = struct.coords - ((struct.coords * mass[:,numpy.newaxis]).sum(axis=0) / sum(mass)) self.run(period) if numpy.isnan(self.struct.coords[self.atom_start : self.atom_stop]).any(): raise DieselException("Error: Nan value in coordiante set.") self.cur_time += self.dt * period save() if self.procid == 0: rec.close() logging.info("Done.")