# -*- coding: utf-8 -*-
import copy
import numpy
import logging
from record import Record
from boxcell import BoxCell
from core.lib import libmd
from core.params import consts
import utils
class DieselException(Exception):
def __init__(self, message):
super(DieselException, self).__init__(message)
[docs]class Diesel(object):
"""
Module for molecular dynamic calculation. To initialize it, you have to
set structure and method.
"""
def __init__(self, struct, method, integrator = {"name" : "VELVERLET"}, log_to_file = False):
if not hasattr(self, "procid"):
self.procid = 0
self.comm = None
if log_to_file:
logging.basicConfig(filename=log_to_file, level=logging.DEBUG, format='%(asctime)s -> %(levelname)s -> %(message)s')
else:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s -> %(levelname)s -> %(message)s')
self.calculator = None
if method["FF"] == "MARTINI":
from core.martini import MARTINI
self.calculator = MARTINI(struct, method["params"])
elif method["FF"] == "TB":
from core.tb import TB
self.calculator = TB(struct, method["params"])
self.calculator.ensure_forces()
elif method["FF"] == "REBO":
from core.airebo import AIREBO
self.calculator = AIREBO(struct, ['u_rebo'], comm = self.comm)
elif method["FF"] == "AIREBO":
from core.airebo import AIREBO
self.calculator = AIREBO(struct, method["params"], comm = self.comm)
else:
raise DieselException("Unknown Force Field.")
if self.procid == 0:
logging.info("Force Field : %s" % method["FF"])
self.struct = struct
self.dt = 1 # fs
self.dK = 1E-20 # коэффициент: н / м ^ 2 в ангстремы на фемтосекунды в квадрате
# мониторинг
self.monitor_details = ["time"]
self.E_kin = 0 # кинетическая энергия в текущий момент времен
self.E_p = 0 # потенциальная энергия в текущий момент времени
self.cur_time = 0
self.Kin = [] # массив кинетической энергии в процессе динамики
self.U = [] # массив потенциальной энергии в процессе динамики
self.J = [] # массив векторов теплового потока в процессе динамики
self.T = []
self.use_cutoff = False
if not "diapason" in integrator:
self.atom_start = 0
self.atom_stop = struct.n_atoms
else:
self.atom_start, self.atom_stop = tuple(integrator["diapason"])
self.prepare_integrator(integrator)
def maxwell_velocities(self, temperature):
self.struct.velocity = utils.angular_mom.velocity_from_maxwell_distribution(self.struct.mass, temperature)
utils.angular_mom.angul_correct(self.struct)
def update_monitor(self, mdata):
self.monitor_details += mdata
[docs] def get_forces(self):
"""
The calculation of forces acting on atoms.
"""
f = numpy.zeros((self.atom_stop - self.atom_start, 3), numpy.float)
self.calculator.forces(f, self.atom_start, self.atom_stop)
return f
def check_boundary_sphere(self, f, R):
libmd.spherical_boundary(self.struct.coords, f, self.struct.n_atoms, R)
def check_external_forces(self, f):
data = self.ext_forces_data
for fdata in data:
force = fdata[0]
atoms = fdata[1]
f[atoms] += force
def init_ext_forces(self, data):
datan = []
for fd in data:
datan.append([numpy.array(map(float, fd[0])), numpy.array(map(int, fd[1]))])
datan[-1][1] -= 1
self.ext_forces_data = datan
get_f = self.get_forces
def wrapped_forces():
f = get_f()
self.check_external_forces(f)
return f
self.get_forces = wrapped_forces
def init_boundary(self, R):
self.R = R
get_f = self.get_forces
def wrapped_forces():
f = get_f()
self.check_boundary_sphere(f, self.R)
return f
self.get_forces = wrapped_forces
def prepare_integrator(self, integrator):
a, b = self.atom_start, self.atom_stop
integrator_name = integrator["name"]
thermostat = False
if "thermostat" in integrator:
thermostat = integrator["thermostat"]
if thermostat.lower() != 'dissipative':
temperature = integrator["temperature"]
self.temper = temperature
if self.procid == 0:
logging.info(thermostat + " : " + str(temperature))
if "maxwell_veloc" in integrator:
temperature = integrator["temperature"]
if integrator["maxwell_veloc"]:
self.maxwell_velocities(temperature)
if thermostat:
self.tau = 1000.0
if thermostat.lower() == "berendsen":
def first_step():
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
self.struct.half_velocity = numpy.zeros_like(self.struct.velocity)
self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5
self.delta = 1.0
self.first_step = first_step
self.move = self.leapfrog_berendsen_step
elif thermostat.lower() == "nose-hoover":
def first_step():
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
self.struct.half_velocity = numpy.zeros_like(self.struct.velocity)
self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5
self.delta = 0.0
self.first_step = first_step
self.move = self.leapfrog_nosehoover_step
elif thermostat.lower() == 'dissipative':
if not hasattr(self.struct, "accel"):
self.struct.accel = numpy.zeros_like(self.struct.velocity)
self.move = self.dissipative_step
else:
if integrator_name.lower() == "verlet":
def first_step():
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis] * self.dK
self.struct.prev_coords = self.struct.coords - self.struct.velocity * self.dt - 0.5 * accel * self.dt * self.dt
self.first_step = first_step
self.move = self.verlet_step
elif integrator_name.lower() == "velverlet":
if not hasattr(self.struct, "accel"):
self.struct.accel = numpy.zeros_like(self.struct.velocity)
self.move = self.velocity_verlet_step
elif integrator_name.lower() == "leapfrog":
def first_step():
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
self.struct.half_velocity = numpy.zeros_like(self.struct.velocity)
self.struct.half_velocity[a:b] = self.struct.velocity[a:b] - accel * self.dt * 0.5
self.first_step = first_step
self.move = self.leapfrog_step
[docs] def verlet_step(self):
"""
The Basic Verlet Algorithm.
"""
a, b = self.atom_start, self.atom_stop
coords = self.struct.coords
prev_coords = self.struct.prev_coords
# силы, действующие на атомы
f = self.get_forces()
# ускорение в следующий момент времени (м / (с ^ 2))
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
cur_coords = copy.deepcopy(coords[a:b])
coords[a:b] = 2 * coords[a:b] - prev_coords[a:b] + accel * self.dt * self.dt
self.struct.velocity[a:b] = 0.5 / self.dt * (coords[a:b] - cur_coords)
self.struct.prev_coords[a:b], self.struct.coords[a:b] = cur_coords, coords[a:b]
[docs] def dissipative_step(self):
"""
The Velocity Verlet Algorithm.
"""
a, b = self.atom_start, self.atom_stop
coords, velocity, accel = self.struct.coords, self.struct.velocity, self.struct.accel
coords[a:b] += velocity[a:b] * self.dt + accel[a:b] * (0.5 * self.dt * self.dt) # координаты в следующий момент времени
#расчет скоростей и ускорений на следующем шаге
f = self.get_forces() # силы, действующие на атомы
next_accel = f / self.struct.mass[:,numpy.newaxis][a:b] # ускорение в следующий момент времени (м / (с ^ 2))
next_accel *= self.dK # ускорение в следующий момент времени (а / (фс ^ 2))
dV = 0.5 * (next_accel + accel[a:b]) * self.dt # изменение скорости
accel[a:b] = next_accel
velocity[a:b] += dV
velocity[a:b] *= 0.99
self.struct.coords, self.struct.velocity, self.struct.accel = coords, velocity, accel
[docs] def velocity_verlet_step(self):
"""
The Velocity Verlet Algorithm.
"""
a, b = self.atom_start, self.atom_stop
coords, velocity, accel = self.struct.coords, self.struct.velocity, self.struct.accel
coords[a:b] += velocity[a:b] * self.dt + accel[a:b] * (0.5 * self.dt * self.dt) # координаты в следующий момент времени
#расчет скоростей и ускорений на следующем шаге
f = self.get_forces() # силы, действующие на атомы
next_accel = f / self.struct.mass[:,numpy.newaxis][a:b] # ускорение в следующий момент времени (м / (с ^ 2))
next_accel *= self.dK # ускорение в следующий момент времени (а / (фс ^ 2))
dV = 0.5 * (next_accel + accel[a:b]) * self.dt # изменение скорости
accel[a:b] = next_accel
velocity[a:b] += dV
velocity[a:b] *= 0.1
self.struct.coords, self.struct.velocity, self.struct.accel = coords, velocity, accel
[docs] def leapfrog_step(self):
"""
The Verlet Leapfrog Algorithm.
"""
a, b = self.atom_start, self.atom_stop
coords, half_vel = self.struct.coords, self.struct.half_velocity
# силы, действующие на атомы
f = self.get_forces()
# ускорение в следующий момент времени (м / (с ^ 2))
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
velocity = copy.deepcopy(half_vel)
half_vel[a:b] = self.struct.half_velocity[a:b] + accel * self.dt
velocity[a:b] += half_vel[a:b]
coords[a:b] += half_vel[a:b] * self.dt
self.struct.coords, self.struct.velocity, self.struct.half_velocity = coords, velocity, half_vel
def get_cur_electric_field(self):
E = 'was_not_determined'
if hasattr(self, 'emw'):
omega = self.emw['omega']
phase = self.emw['phase']
E0 = self.emw["E_amp"]
E = E0 * numpy.sin(omega * self.get_cur_time() * 1e-15 + phase)
return E
def get_cur_magnetic_field(self):
B = 'was_not_determined'
if hasattr(self, 'emw'):
omega = self.emw['omega']
phase = self.emw['phase']
B0 = self.emw["B_amp"]
B = B0 * numpy.cos(omega * self.get_cur_time() * 1e-15 + phase)
return B
def get_cur_time(self):
return self.cur_time
def get_cur_energy(self):
return self.calculator.compute()
def get_cur_temperature(self):
return self.current_temper(self.struct.velocity[self.atom_start : self.atom_stop])
def get_cur_momentum(self):
return utils.angular_mom.calc_angular_momentum(self.struct)
def get_cur_aver_veloc(self):
return utils.angular_mom.calc_center_mass_veloc(self.struct)
def current_temper(self, vel):
a, b = self.atom_start, self.atom_stop
def kin(velocity):
return 0.5 * (self.struct.mass[a:b] * (1E10 * velocity * velocity).sum(axis = 1)).sum()
n = b - a + 1
return 2 * kin(vel) / (n * 3 * consts.K_BOLTZMAN)
[docs] def leapfrog_berendsen_step(self):
"""
The Verlet Leapfrog Algorithm with Berendsen Thermostat.
"""
a, b = self.atom_start, self.atom_stop
coords = self.struct.coords
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
vel_minus = self.struct.half_velocity[a:b]
velocity = self.struct.velocity[a:b]
temperature = self.current_temper(velocity)
delta = numpy.sqrt(1 + self.dt / self.tau * (self.temper / temperature - 1)) if temperature > 0 else 1
vel_plus = (vel_minus + accel * self.dt) * delta
velocity = 0.5 * (vel_plus + vel_minus)
coords[a:b] += vel_plus * self.dt
self.struct.coords, self.struct.velocity[a:b], self.struct.half_velocity[a:b] = coords, velocity, vel_plus
[docs] def leapfrog_nosehoover_step(self):
"""
The Verlet Leapfrog Algorithm with Berendsen Thermostat.
"""
a, b = self.atom_start, self.atom_stop
coords = self.struct.coords
f = self.get_forces()
accel = f / self.struct.mass[:,numpy.newaxis][a:b] * self.dK
vel_plus = None
delta_plus = 0
delta_minus = self.delta
vel_minus = self.struct.half_velocity[a:b]
velocity = self.struct.velocity[a:b]
delta = 100
for _ in range(3):
delta_plus = delta_minus + self.dt / (self.tau * self.tau) * (self.current_temper(velocity) / self.temper - 1)
delta = 0.5 * (delta_plus + delta_minus)
vel_plus = vel_minus + self.dt * (accel - delta * velocity)
velocity = 0.5 * (vel_plus + vel_minus)
self.delta = delta
coords[a:b] += vel_plus * self.dt
self.struct.coords, self.struct.velocity[a:b], self.struct.half_velocity[a:b] = coords, velocity, vel_plus
[docs] def use_pbc(self, pbc_box):
"""
Periodic boundary conditions can be used in calculations. Geometry of
pbc box should be setted.
"""
self.pbc_box = self.calculator.parm["pbc_box"] = pbc_box
f = self.move
def wrapped():
f()
self.check_pbc()
self.move = wrapped
def use_emw(self, emw_data):
self.emw = emw_data
self.emw['poynting_vector'] = numpy.array(self.emw['poynting_vector'], numpy.float)
self.emw['poynting_vector'] /= numpy.linalg.norm(self.emw['poynting_vector'])
self.emw['polarization_vector'] = numpy.array(self.emw['polarization_vector'], numpy.float)
pv = self.emw['polarization_vector']
self.emw['E_dir'] = pv / numpy.linalg.norm(pv)
self.emw['B_dir'] = numpy.cross(self.emw['poynting_vector'], self.emw['polarization_vector'])
self.emw['omega'] = float(self.emw['omega'])
self.emw['phase'] = float(self.emw['phase'])
self.emw['E_amp'] = float(self.emw['E_amp'])
self.emw['B_amp'] = float(self.emw['B_amp'])
get_forces = self.get_forces
def get_forces_with_emw_influence():
f = get_forces()
omega = self.emw['omega']
phase = self.emw['phase']
B_dir = self.emw['B_dir']
E_dir = self.emw['E_dir']
E0 = self.emw["E_amp"]
B0 = self.emw["B_amp"]
E = (E0 * numpy.sin(omega * self.get_cur_time() * 1e-15 + phase))
B = (B0 * numpy.cos(omega * self.get_cur_time() * 1e-15 + phase))
E = E * E_dir # V / meter
B = B * B_dir #
for i in xrange(self.atom_start, self.atom_stop):
q = self.struct.charges[i] # Coulombs
v = self.struct.velocity[i] * 1e5 # meter/sec
f[i] += q * (E + numpy.cross(B, v)) #FORCES
return f
self.get_forces = get_forces_with_emw_influence
[docs] def check_pbc(self):
"""
If any atoms are out of box, method will try to return them back.
"""
pbc = self.pbc_box
l = [pbc[0][1] - pbc[0][0], pbc[1][1] - pbc[1][0], pbc[2][1] - pbc[2][0]]
struct = self.struct
for i in xrange(self.atom_start, self.atom_stop):
for j in xrange(3):
if struct.coords[i, j] > pbc[j][1]:
struct.coords[i, j] -= l[j]
if struct.coords[i, j] < pbc[j][0]:
struct.coords[i, j] += l[j]
[docs] def run(self, time_steps):
"""
Runs calculations as many times as it follows from conditions.
"""
[self.move() for _ in xrange(time_steps)]
[docs] def init_cutoff(self, r_in, r_out):
"""
Van-der-Vaals forces cutoff at some distance.
"""
if 'pbc_box' in self.calculator.parm.keys():
self.box_cell = BoxCell(self.struct.coords, r_in, r_out, self.pbc_box)
else:
coords = self.calculator.struct.coords
min_x, max_x = coords[:,0].min(), coords[:,0].max()
min_y, max_y = coords[:,1].min(), coords[:,1].max()
min_z, max_z = coords[:,2].min(), coords[:,2].max()
min_x -= r_out
min_y -= r_out
min_z -= r_out
max_x += r_out
max_y += r_out
max_z += r_out
box = numpy.array([[min_x, max_x, max_x - min_x],
[min_y, max_y, max_y - min_y],
[min_z, max_z, max_z - min_z]], numpy.float)
if self.procid == 0:
logging.info('Periodic box was calculated automatically.')
self.use_pbc(box)
self.box_cell = BoxCell(self.struct.coords, r_in, r_out, self.pbc_box)
self.calculator.init_cutoff(r_in, r_out, self.box_cell.calc_neighbours(self.struct.coords, 0, len(self.struct.coords)))
move = self.move
def wrapped():
move()
if self.box_cell.time_to_update(self.struct.coords, self.atom_start, self.atom_stop):
self.calculator.update_cutoff(self.box_cell.calc_neighbours(self.struct.coords, self.atom_start, self.atom_stop))
self.box_cell.update_prev_coords(self.struct.coords)
if self.procid == 0:
logging.info("Update cutoff-lists.")
self.move = wrapped
[docs] def simulation(self, output, period, max_time, config = None, rec_type = 'w'):
"""
It writes structure characteristics and monitoring values in
output file.
"""
rec = None
comm = self.comm
def save():
state = dict()
struct_state_params = ["coords", "velocity", "accel", "half_velocity", "prev_coords"]
for prm in struct_state_params:
if hasattr(self.struct, prm):
if comm and prm != "coords":
prmdata = getattr(self.struct, prm)
setattr(self.struct, prm, self.b_cast_coords(prmdata, prmdata[self.atom_start : self.atom_stop]))
state[prm] = getattr(self.struct, prm)
monitor = dict()
for md in self.monitor_details:
fmd = "get_cur_" + md
if hasattr(self, fmd):
monitor[md] = getattr(self, fmd)()
if self.procid == 0:
for md in monitor:
if monitor[md] is float:
logging.info("%s : %.5f" % (md, monitor[md]))
else:
logging.info("%s : %s" % (md, monitor[md]))
state["params"] = monitor
if self.procid == 0:
rec.append_state(state)
if hasattr(self, "first_step"):
self.first_step()
if self.procid == 0:
rec = Record(output, rec_type = rec_type)
if rec_type == 'w':
rec.initialize(self.struct, config = config)
self.cur_time = 0
elif rec_type == 'a':
rec.update_state(rec.state_count - 1)
md_struct_parms = ["coords", "velocity", "accel", "half_velocity", "prev_coords"]
for prm in md_struct_parms:
if hasattr(rec.struct, prm):
setattr(self.struct, prm, getattr(rec.struct, prm))
setattr(self.calculator.struct, prm, getattr(rec.struct, prm))
self.cur_time = rec.params["time"] + self.dt * period
max_time += rec.params["time"]
else:
raise DieselException("Unknown record option %s." % rec_type)
if rec_type == 'w':
save()
while self.cur_time < max_time:
struct = self.struct
mass = struct.mass / struct.mass[0]
coords = struct.coords - ((struct.coords * mass[:,numpy.newaxis]).sum(axis=0) / sum(mass))
self.run(period)
if numpy.isnan(self.struct.coords[self.atom_start : self.atom_stop]).any():
raise DieselException("Error: Nan value in coordiante set.")
self.cur_time += self.dt * period
save()
if self.procid == 0:
rec.close()
logging.info("Done.")