Source code for susmost.mc.mc

from mpi4py import MPI
from copy import deepcopy
import numpy as np
from math import floor, log, exp
from itertools import groupby
from .tmatrix import make_dumb_tmatrix
from time import time
from . import metropolis
from susmost.lattice import Lattice
from susmost.make_regr import make_regr
from susmost.sitestate import SPMI_db
import types
from collections import namedtuple
from scipy.interpolate import interp1d


def param_idx(self, param_name):
	if param_name == 'energy':
		return -2
	return self.param_names.index(param_name)

def means(self, param_name):
	pidx = self.param_idx(param_name)
	return np.mean(self.full_params_log[:,:,pidx], axis=1)

def save_as_xyz(self, fn, comment='', multiplier = [1,1,1], mulmul=0.0):
	return self.lattice.save_as_xyz(fn, comment, multiplier, mulmul)


setattr(metropolis.Metropolis, "param_idx", param_idx)
setattr(metropolis.Metropolis, "means", means)
setattr(metropolis.Metropolis, "save_as_xyz", save_as_xyz)

"""
m.params_log:

0)	additive_params[0] / lattice.cells_count
1)	additive_params[1] / lattice.cells_count
........
-8)	in-site avgE / lattice.cells_count
-7)	in-site entropy / lattice.cells_count
-6)	in-site min_E / lattice.cells_count
-5)	in-site free_E / lattice.cells_count
-4)	acceptance rate
-3)	curE.internal / lattice.cells_count
-2)	curE.sum() / lattice.cells_count
-1)	T_idx
"""

def make_additive_params(states, param_names):
	return [[[props[pname].value for pname in param_names] for _,props in s.EP_list()] for s in states]
	

[docs] def make_metropolis(lat_task, supercell, T, k_B, param_names=None, precoverage = None, seed=None, pbc_flags=[True, True, False]): """ Create Metropolis object for specified LatticeTask and ReGraph objects Parameters: :lat_task: LatticeTask object :supercell: size of the lattice. See :func:`susmost.cell.universal_cell_size` for possible formats :T: list of temperatures in Kalvin units for replica exchange (*aka* parallel tempering), number of temperatures must be equal to number of MPI processes :k_B: Boltzmann constant, used as energy scale parameter :param_names: list of names of states properties that will be sampled in Metropolis iterations as simulaion parameters; default value - ['coverage'] :precoverage: index of the state that will be used for initialization of the lattice, by default the state with zero ``coverage`` parameter is used :seed: random number generator for Metropolis simulaion :pbc_flags: list or tuple of three boolean flags, that enables/disables Periodic Boundary Conditions along each axis; default value - ``[True, True, False]`` Returns: Metropolis object """ regr = make_regr(supercell, lat_task, pbc_flags=pbc_flags) # key constants: cells_count = len(regr) edges_count = lat_task.edges_count lc_count = lat_task.states_count #---------------------------------- try: T_list = [t for t in T] except TypeError: T_list = [T] #---------------------------------- if param_names is None: param_names = sorted(list(SPMI_db.keys() - set(['name', 'xyz_mark', 'ads_energy', 'idx']))) #---------------------------------- if seed is None: seed = int(time()) #---------------------------------- if precoverage is None: precoverage = lat_task.zero_coverage_state_index precov_state_index = int(precoverage) cells = np.full(cells_count, precov_state_index, dtype=int) # fill by empty_state_index #---------------------------------- allowed_cells = np.full( (cells_count,lc_count), True, dtype=bool) # all allowed transitions = make_dumb_tmatrix(lat_task.states) # any to any allowed internal_energies = [[E for E,_ in s.EP_list()] for s in lat_task.states] # from adsorption energies, s.E - is a list of lattice_graph=regr.as_nparray(edges_count) # fill lattice_graph from regr interaction = lat_task.IM.asarray() # fill interaction from IM additive_params = make_additive_params(lat_task.states, param_names) m = metropolis.Metropolis(lattice_graph, cells, allowed_cells, interaction, internal_energies, transitions, T_list, additive_params, k_B, seed) m.phys_diffusion = False m.E_inf = lat_task.INF_E m.PT_period = 10 * len(cells) # Each m.PT_period steps we try to switch the temperature m.diff_cui_min = 1 # Minimal m.diff_cui_max = 1 m.regr = regr m.regr_array = lattice_graph m.lattice_task = lat_task m.param_names = param_names m.lattice = Lattice(lat_task, regr=regr, regr_array=lattice_graph, cells=m.cells) return m
[docs] def run(m, log_periods_cnt, log_period_steps=None, log_callback=None, params_period_steps=None, relaxation_steps = None, bcast_full_params_log = True, traj_fns='auto', recalc_curs=True): """ Perform Metropolis iterations Parameters: :m: Metropolis object :log_periods_cnt: number of log periods to perform :log_period_steps: number of Metropolis iterations in a single log period. By default ``m.cells_count`` value is used so log period is equivalent to conventional *Monte Carlo step* notion :log_callback: function to be called after each log period, ``log_callback`` function must take ``m`` as a parameter and can return any object as result. List of all objects returned from ``log_callback`` will be returned as a result of overall ``run()`` call. By default is None, so nothing is logged. :params_period_steps: period for sampling of energy and state properties (see notes on ``make_metropolis()`` function). Samples are stored in ``full_params_log`` attribute of ``m`` object. Meassured in Metropolis iterations. Default value ``(log_periods_cnt * log_period_steps)//1000 + 1`` so up to 1000 entries will be added to ``full_params_log``. :relaxation_steps: number of Metropolis iterations to be skipped before logging and parameters sampling. Default value ``log_periods_cnt * log_period_steps`` :bcast_full_params_log: set to False if parameters samples should not be broadcasted to all MPI processes. By default ``True``, so every processes receives complete sample of parameters. :traj_fns: list of XYZ file names to be used to save snapshots of the lattice each ``log_period_steps`` iteration. ``traj_fns[i]`` is used to save samples from *i*-th replica (see notes on ``make_metropolis()`` function). By default "traj.{temperature}.xyz" is used for each temperature :recalc_curs: Call ``recalc_curs()`` method before running Metropolis simulaions. ``recalc_curs()`` recomputes total energy and additive parameter values. It is computationally expensive, but necessary if ``cells`` property was modified outside of Metropolis iterations, for example due to coverage initialization. Number of steps to execute: ``relaxation_steps + log_periods_cnt * log_period_steps``. Number of ``full_params_log`` entries: ``(log_periods_cnt * log_period_steps) / params_period_steps``. Number of samples in trajectory files: ``log_periods_cnt``. Results: List of objects returned by ``log_callback`` function calls After the function call ``m.full_params_log`` contains three-dimensional array with samples of simulation parameters. Parameter values are averaged over lattice sites. ``m.full_params_log[i,j,k]`` is the value of the *k*-th parameter on *j*-th time period from *i*-th replica. Parameter index ``k`` correspond to the *k*-th item of ``param_names`` argument of ``make_metropolis()`` function. In addition to state parameters, several system-wide parameters are included into ``m.full_params_log``. In particular: - ``m.full_params_log[i,j,-1]`` index of the temperature of this sample - ``m.full_params_log[i,j,-2]`` total energy of the system per one lattice cell - ``m.full_params_log[i,j,-3]`` lateral interactions energy (total energy minus adsoprtion energy) per lattice cell - ``m.full_params_log[i,j,-4]`` acceptance rate of Metropolis iterations - ``m.full_params_log[i,j,-5]`` KMC r0 parameter, i.e. maximal step rate, should be constant through correct simulaion - ``m.full_params_log[i,j,-6]`` free adsorption energy per lattice cell - ``m.full_params_log[i,j,-7]`` ground state (minimal) adsorption energy per lattice cell - ``m.full_params_log[i,j,-8]`` entropy of adsoprtion complexes per lattice cell - ``m.full_params_log[i,j,-9]`` mean internal energy (enthalpy) of adsoprtion complexes per lattice cell - ``m.full_params_log[i,j,:-9]`` samples of states parameters - see ``param_names`` argument of ``make_metropolis()`` function. """ if log_period_steps is None: log_period_steps = m.cells_count if relaxation_steps is None: relaxation_steps = log_periods_cnt * log_period_steps if params_period_steps is None: params_period_steps = (log_periods_cnt * log_period_steps)//1000 + 1 if log_callback is None: def log_callback(m): return None if traj_fns=='auto': traj_fns = ["traj.{}.xyz".format(T) for T in m.T_list] if traj_fns is not None: assert len(traj_fns) == len(m.T_list), "Improper number of trajectory filenames: {} instead of {}".format(len(traj_fns), len(m.T_list)) def write_traj(m): m.save_as_xyz(traj_fns[m.T_idx], comment='E={}'.format(m.curE.sum())) else: def write_traj(m): pass result = {} # dict of temperatures to lists with log_callback() results m.params_log_period = log_period_steps if MPI.COMM_WORLD.Get_rank() == 0: if relaxation_steps > 0: print ("Relaxation steps...") else: print ("No relaxation steps requested.") if recalc_curs: if MPI.COMM_WORLD.Get_rank() == 0: #print ("Calling recalc_curs() ...") pass m.recalc_curs() m.run(relaxation_steps) if relaxation_steps > 0 and MPI.COMM_WORLD.Get_rank() == 0: print ("Relaxation completed.") # assert all([pl.enabled for pl in m.param_limits]), [pl.enabled for pl in m.param_limits] m.full_params_log = [] m.params_log_period = params_period_steps time_start = time() for i in range(log_periods_cnt): if MPI.COMM_WORLD.Get_rank() == 0: time_passed = time() - time_start time_per_step = time_passed / i if i > 0 else np.inf ETA = log_periods_cnt * time_per_step - time_passed print('\rStep # {} of {}. Time passed: {:.2f} seconds. Estimated time before completion: {:.2f} seconds'.format(i, log_periods_cnt, time_passed, ETA), end = '\r') m.run(log_period_steps) m.full_params_log += [deepcopy(m.params_log)] res_i = log_callback(m) result.setdefault(m.T_idx,list()).append(res_i) write_traj(m) if MPI.COMM_WORLD.Get_rank() == 0: print ("\n\nDone!\n\n") #print "Gathering callback results..." result = MPI.COMM_WORLD.gather(result,root=0) #print "Gathering callback results... done!" if MPI.COMM_WORLD.Get_rank() == 0: aggr = result[0] # use root dict as initial for aggregated dict for dict_i in result[1:]: for T_idx, res_log in dict_i.items(): aggr.setdefault(T_idx, list()).extend(res_log) result = [res_log for T_idx, res_log in aggr.items()] assert all([len(log_i) == log_periods_cnt for log_i in result]) # check for each temperature result = MPI.COMM_WORLD.bcast(result,root=0) full_params_log = np.concatenate(m.full_params_log) # [time, param] concatenate along `time` axis full_params_log = MPI.COMM_WORLD.gather(full_params_log,root=0) # [rank, time, param] if MPI.COMM_WORLD.Get_rank() == 0: full_params_log = np.swapaxes(full_params_log, 0, 1) # # [time, rank, param] # sort by the last element (T_idx) full_params_log = np.array([sorted(r, key=lambda v: v[-1]) for r in full_params_log]) # r[T_idx, param], v[param] [time, T_idx, param] full_params_log = np.swapaxes(full_params_log, 0, 1) # [T_idx, time, param] if bcast_full_params_log: #print "bcast full param log..." m.full_params_log = MPI.COMM_WORLD.bcast(full_params_log, root=0) #print "bcast full param log... done!" else: m.full_params_log = full_params_log return result
""" def make_CDF(y): x = sorted(y) N = len(y) p = 1./N CDF = [] cum_CDF = 0. for x in x: CDF.append(cum_CDF) cum_CDF += p assert abs(cum_CDF - 1.0) < 1e-6 return np.array(x), np.array(CDF) """ def bins_count(brange, group_step, origin): """""" """ see divisibles() in metropolis.cpp for implementation explanation """ totalBins = int(floor((brange[1] - origin)/group_step)) excludeBins = int(floor((brange[0] - origin)/group_step)) return totalBins - excludeBins def make_histo(y, brange, group_step, origin=0.0): N = bins_count(brange, group_step, origin) assert N > 0 h = np.histogram(y, bins=N, range=brange, density = False) return h[0] * (1./len(y)), h[1] def group_by_column(data, grouping_column, brange, group_step, origin=0.0): """""" """ data[time, param] Returns: data[group, time, param] """ N = bins_count(brange, group_step, origin) assert N > 0 def bin_index(r): idx = int(floor((r[grouping_column] - brange[0])/group_step)) if idx < 0: return -1 # too small outlier if idx >= N: return N # too big outlier return idx d =dict([ (k,list(g)) for k,g in groupby(sorted(data, key=bin_index), bin_index)]) return [d.get(i, []) for i in range(N)] def group_statistics(params_log, param_idx, lower_limit, upper_limit, step): """""" """ params_log[time, param_idx] Returns: means: [group, param], zeros for empty groups stds: [group, param], zeros for empty groups probs: [group] outliers_part: float from 0 to 1 """ m = params_log.shape[1] # number of parameters #hist,bins = mc.make_histo(params_log[:, limit_param_idx], (lower_limit, upper_limit), limit_step) groups = group_by_column(params_log, param_idx, (lower_limit, upper_limit), step) means = np.array([np.mean(g, axis=0) if len(g) > 0 else [0.]*m for g in groups]) # [group, param] stds = np.array([np.std(g, axis=0) if len(g) > 0 else [0.]*m for g in groups]) # [group, param] lens = np.array([len(g) for g in groups], dtype=float) # [group, param] probs = lens / sum(lens) if sum(lens) > 0 else [0.] * len(lens) outliers_part = 1.0 - sum(lens) / len(params_log) return means, stds, probs, outliers_part class ACFAnalysis: def __init__(self): pass def __repr__(self): return "AC times: zero {0.zero_time:.3f}; exp {0.ac_time_exp:.3f} (rmse {0.ac_time_exp_rmse:.3f}); int {0.ac_time_int:.3f} len: {1}".format(self, len(self.acf)) def autocorr_analysis(acf, m=10): r = ACFAnalysis() r.acf = acf r.zero_time = np.argmax(acf <= 0.) if r.zero_time == 0 : r.zero_time = len(acf) if r.zero_time > 2: x = np.arange(r.zero_time-1) y = np.log(acf[:r.zero_time-1]) r.ac_time_exp = -sum(x*x)/sum(x*y) # exponential AC time - least squares fitting to exponential shape of ACF r.ac_time_exp_rmse = sum(np.exp( x * (-1./ r.ac_time_exp) - acf[:r.zero_time-1])**2)**0.5 / (r.zero_time-1) else: r.ac_time_exp = 1. r.ac_time_exp_rmse = 0. r.expected_acf_sputter = 1./(len(acf)**0.5) r.acf_sputter = [np.std(acf[i*len(acf)//m:(i+1)*len(acf)//m]) for i in range(m)] r.acf_mean = [np.mean(acf[i*len(acf)//m:(i+1)*len(acf)//m]) for i in range(m)] r.ac_time_int = sum(acf[1:r.zero_time])*2. + 1. # integrated AC time return r def autocorr_func(y_, m = None, method="fft"): """""" """ m - number of items lefter and righter of origin in autocorr_func returns 2*m+1 element array Note: http://golem.fjfi.cvut.cz/wiki/Library/others/BenczeA_Autocorrelation_05.pdf Autocorrelation analysis and statistical consideration for the determination of velocity fluctuations in fusion plasmas DOI: 10.1063/1.1909200 "Our result may be surprising as the relative scatter does not depend on the ns=Ns/DT event rate. One would expect that for higher event rates the statistics should improve. However, in our case the scatter of the correlation function is produced by random overlapping of events; this is proportional to nS. The mean of the correlation function is also proportional to nS, therefore the relative scatter will not de-pend on the event rate. On the other hand, when DT is increased at a fixed event rate the NS number of events increases as well without changing the random coincidence between the events, and as a result the relative scatter de-creases. In this sense the number of measured events does improve statistics." """ N = len(y_) if m is None: m = N-1 assert m < len(y_), ["ACF length must be smaller than sample size", m, N] y = y_ - np.mean(y_) std_y = np.std(y) if std_y == 0: print ("WARNING! Standart deviation of time series is zero. Can't compute autocorrelation. Return None") return None y /= std_y if method=="naive": c = np.correlate(y, y, 'full') assert len(c) == (2*N-1), [len(c), N] Ns = np.array(list(range(1,N)) + [N] + list(range(N-1,0,-1))) c = c / Ns c = c[len(c)/2 :] elif method=="fft": r = np.fft.rfft(y) s = abs(r)**2 c = np.fft.irfft(s) c /= N else: assert False, method return c[: m ] def stat_digest(m, verbose=True): if MPI.COMM_WORLD.Get_rank() != 0: return param_names = m.param_names + ["Energy"] param_indices = list(range(0, len(m.param_names))) + [-2] digest = [] for Ti,T in enumerate(m.T_list): d = {} # i-th temperature dict print ("Temperature # {} = {}".format(Ti,T)) d["T"] = T for pi, pn in zip(param_indices, param_names): pd = {} # param dict print ("\tParameter # {} : '{}'".format(pi,pn)) data = m.full_params_log[Ti,:,pi] mean = np.mean(data) std = np.std(data) pd['mean'] = mean pd['std'] = std print ("\t\tSample size\t", len(data)) print ("\t\tSample mean\t{:.6f}".format(mean)) print ("\t\tSample stdDev\t{:.6f}".format(std)) acf = autocorr_func(data) pd['acf'] = acf if acf is not None: acf_a = autocorr_analysis(acf) pd['acf_a'] = acf_a #print ("\t\tACF\t", acf_a) print ("\t\tAutocorrelation function parameters:") print ("\t\t\tFirst zero time:\t", acf_a.zero_time) print ("\t\t\tExponential time:\t{:.3f} (RMSE={:.3f})".format(acf_a.ac_time_exp, acf_a.ac_time_exp_rmse)) print ("\t\t\tIntegrated time:\t{:.3f}".format(acf_a.ac_time_int)) neff_int = len(data) / acf_a.ac_time_int pd['neff_int'] = neff_int print ("\t\tEffective sample size integrated\t{:.3f}".format(neff_int)) neff_exp = len(data) / (2. * acf_a.ac_time_exp) pd['neff_exp'] = neff_exp print ("\t\tEffective sample size exponential\t{:.3f}".format(neff_exp)) if abs(1. - neff_exp/neff_int) > 0.1: pass # not accurate criterion # print ("\t\t\033[1;33mAchtung!\033[0;0m Integrated and exponential autocorrelation times too different! Probably non-equilibrium state") print ("\t\tSample mean stdDev\t{:.3f}".format(std / neff_int)) d[pn] = pd digest += [d] data = m.full_params_log[Ti,:,param_indices] corrs = np.corrcoef(data, rowvar=True) print ("\tCorrelations between parameters:") for row in corrs: print ('\t\t', ' '.join(map("{:.3f}".format, row))) meanEs = [d['Energy']['mean'] for d in digest] if len(m.T_list) > 1: print("Recommended temperatures:", recommended_temperatures(m.T_list, meanEs)) return digest def recommended_temperatures(T, E): f = interp1d(E, T) desired_energies = np.linspace(min(E), max(E), len(E)) return [f(Ei).item() for Ei in desired_energies] def kmc_setup(m, r0 = 0.0): """ Initialize Metropolis object for Kinetic Monte Carlo simulaion and set r0 parameter """ m.setup_kmc(r0) def wl_setup(m, emin, emax, bins_count): m.wl_emin = emin m.wl_emax = emax m.wl_bins_count = bins_count m.setup_wl({-2:[emin, emax, bins_count]}, 0.8, 1., 1., 1., 100000, 1000000) def wl_analysis_base(lng, N, T, k_B, emin, emax, bins_count): assert len(lng) == bins_count, (len(lng), bins_count) E = np.linspace(emin, emax, bins_count) beta = 1./(T*k_B) w_per_N = np.exp( (lng - E*beta)/N ) # = np.exp( (lng + A - E*beta)/N ) / exp(A/N) = w_per_N_cor / exp(A/N) # w == w_per_N ** N w_per_N_max = max(w_per_N) # = max(w_per_N_cor / exp(A/N)) = max(w_per_N_cor) / exp(A/N) = w_per_N_max_cor / exp(A/N) w_per_N_normed = w_per_N / w_per_N_max # w_per_N == w_per_N_max * w_per_N_normed # w == (w_per_N_max ** N) * (w_per_N_normed ** N) w_normed = w_per_N_normed ** N # w == (w_per_N_max ** N) * w_normed # Z == sum(w) == (w_per_N_max ** N) * sum(w_normed) Z_normed = sum(w_normed) # Z = (w_per_N_max ** N) * Z_normed # Z_per_N == Z ** (1/N) == w_per_N_max * (Z_normed ** 1/N) Z_per_N = w_per_N_max * (Z_normed ** (1/N)) F_per_N = -log(Z_per_N)/beta # probs = w/Z = (w_per_N_max ** N) * w_normed / ((w_per_N_max ** N) * Z_normed) = w_normed / Z_normed probs = w_normed / Z_normed E_per_N = E / N U_per_N = sum(E_per_N*probs) DE_per_N = N * sum(E_per_N*E_per_N*probs) C_per_N = (DE_per_N - N* (U_per_N**2) ) / (k_B * T**2) S_per_N = beta * (U_per_N - F_per_N) WLAnalysis = namedtuple('WLAnalysis', 'T beta w Z F probs U DE C S') return WLAnalysis(T, beta, w_per_N, Z_per_N, F_per_N, probs, U_per_N, DE_per_N, C_per_N, S_per_N) def wl_analysis(m, T=None): if T is None: T = m.T_list[0] lng = m.get_named_data('ln_g')[1:-1] lng -= min(lng) return wl_analysis_base(lng, m.cells_count, T, m.k_B, m.wl_emin, m.wl_emax, m.wl_bins_count)