Source code for sphractal.surfExact

from concurrent.futures import ProcessPoolExecutor as Pool
from math import ceil, log10
from multiprocessing import cpu_count
from os import mkdir
from os.path import isdir
from time import time

from numba import njit
import numpy as np

from sphractal.utils import calcDist, oppositeInnerAtoms
# from sphractal.utils import annotate


[docs]@njit(fastmath=True, cache=True) def getNearFarCoord(scanBoxIdx, boxLen, lowBound, atomCoord, bufferDist=5.0): """Find the nearest and furthest point of a given box from a given atom.""" scanBoxMax = lowBound - bufferDist + (scanBoxIdx+1)*boxLen scanBoxMin = scanBoxMax - boxLen if atomCoord < scanBoxMin: scanBoxNear, scanBoxFar = scanBoxMin, scanBoxMax elif atomCoord > scanBoxMax: scanBoxNear, scanBoxFar = scanBoxMax, scanBoxMin else: scanBoxNear = atomCoord # The farthest point is always a corner scanBoxFar = scanBoxMin if scanBoxMax-atomCoord < boxLen/2 else scanBoxMax return scanBoxNear, scanBoxFar
[docs]@njit(fastmath=True, cache=True) def scanBox(minXYZ, scanBoxIdxs, scanBoxNearFarXYZs, boxLen, atomIdx, atomRad, atomXYZ, atomNeighIdxs, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist=5.0, rmInSurf=True): """Find the nearest and furthest point of a given box from a given atom.""" # Remove the box if it covers the inner surface if rmInSurf: scanBoxX = minXYZ[0] - bufferDist + (scanBoxIdxs[0]+1)*boxLen - boxLen*0.5 scanBoxY = minXYZ[1] - bufferDist + (scanBoxIdxs[1]+1)*boxLen - boxLen*0.5 scanBoxZ = minXYZ[2] - bufferDist + (scanBoxIdxs[2]+1)*boxLen - boxLen*0.5 if not oppositeInnerAtoms(np.array((scanBoxX, scanBoxY, scanBoxZ)), atomXYZ, atomNeighIdxs, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs): return 'none' # Check what does the box cover distNear = calcDist(atomXYZ, np.array(scanBoxNearFarXYZs[:3])) distFar = calcDist(atomXYZ, np.array(scanBoxNearFarXYZs[-3:])) if distNear < atomRad < distFar: return 'surf' elif distFar < atomRad: return 'bulk' else: return 'none'
# @annotate('scanAtom', color='cyan')
[docs]@njit(fastmath=True, cache=True) def scanAtom(args): """Count the number of boxes that cover the outer spherical surface of a given atom.""" magn, boxLen, minXYZ, atomIdx, atomRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf = args atomXYZ, atomNeighIdxsPadded = atomsXYZ[atomIdx], atomsNeighIdxs[atomIdx] atomNeighIdxs = atomNeighIdxsPadded[atomNeighIdxsPadded > -1] atomX, atomY, atomZ = atomXYZ minX, minY, minZ = minXYZ atomBoxIdxX = int((atomX - minX + bufferDist)/boxLen) atomBoxIdxY = int((atomY - minY + bufferDist)/boxLen) atomBoxIdxZ = int((atomZ - minZ + bufferDist)/boxLen) numScan = ceil((atomRad + boxLen)/boxLen) atomSurfBoxs, atomBulkBoxs = [], [] for i in range(-numScan, numScan + 1): scanBoxIdxX = atomBoxIdxX + i if scanBoxIdxX < 0 or scanBoxIdxX >= magn: continue scanBoxNearX, scanBoxFarX = getNearFarCoord(scanBoxIdxX, boxLen, minX, atomX, bufferDist) for j in range(-numScan, numScan + 1): scanBoxIdxY = atomBoxIdxY + j if scanBoxIdxY < 0 or scanBoxIdxY >= magn: continue scanBoxNearY, scanBoxFarY = getNearFarCoord(scanBoxIdxY, boxLen, minY, atomY, bufferDist) for k in range(-numScan, numScan + 1): scanBoxIdxZ = atomBoxIdxZ + k if scanBoxIdxZ < 0 or scanBoxIdxZ >= magn: continue scanBoxNearZ, scanBoxFarZ = getNearFarCoord(scanBoxIdxZ, boxLen, minZ, atomZ, bufferDist) belong = scanBox(minXYZ, (scanBoxIdxX, scanBoxIdxY, scanBoxIdxZ), (scanBoxNearX, scanBoxNearY, scanBoxNearZ, scanBoxFarX, scanBoxFarY, scanBoxFarZ), boxLen, atomIdx, atomRad, atomXYZ, atomNeighIdxs, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf) if belong == 'surf': atomSurfBoxs.append((scanBoxIdxX, scanBoxIdxY, scanBoxIdxZ)) elif belong == 'bulk': atomBulkBoxs.append((scanBoxIdxX, scanBoxIdxY, scanBoxIdxZ)) return atomSurfBoxs, atomBulkBoxs
# @annotate('scanAtomsForLoop', color='cyan')
[docs]@njit(fastmath=True, cache=True) def scanAtomsForLoop(atomsIdxs, magn, boxLen, minXYZ, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist=5.0, rmInSurf=True): """Serialised loop to scan the atoms for timing comparison with the parallelised version.""" allAtomsSurfBoxs, allAtomsBulkBoxs = [], [] for atomIdx in atomsIdxs: scanAtomInp = (magn, boxLen, minXYZ, atomIdx, atomsRad[atomIdx], atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf) atomSurfBoxs, atomBulkBoxs = scanAtom(scanAtomInp) allAtomsSurfBoxs.extend(atomSurfBoxs) allAtomsBulkBoxs.extend(atomBulkBoxs) return allAtomsSurfBoxs, allAtomsBulkBoxs
# @annotate('scanAllAtoms', color='magenta')
[docs]def scanAllAtoms(args): """Count the number of boxes that cover the outer spherical surface of a set of atoms for a given box size.""" magn, boxLen, atomsIdxs, minXYZ, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf, verbose, maxCPU = args scanAtomInps = [(magn, boxLen, minXYZ, atomIdx, atomsRad[atomIdx], atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf) for atomIdx in atomsIdxs] allAtomsSurfBoxs, allAtomsBulkBoxs = [], [] with Pool(max_workers=maxCPU) as pool: for scanAtomResult in pool.map(scanAtom, scanAtomInps, chunksize=ceil(len(atomsIdxs) / maxCPU)): allAtomsSurfBoxs.extend(scanAtomResult[0]) allAtomsBulkBoxs.extend(scanAtomResult[1]) # allAtomsSurfBoxs, allAtomsBulkBoxs = scanAtomsForLoop(atomsIdxs, magn, boxLen, minXYZ, # atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, # bufferDist, rmInSurf) allAtomsSurfBoxs, allAtomsBulkBoxs = set(allAtomsSurfBoxs), set(allAtomsBulkBoxs) allAtomsSurfBoxs.difference_update(allAtomsBulkBoxs) if verbose: epsInvStr = f"{1 / boxLen:.2f}" print(f"{epsInvStr.rjust(10)}{str(len(allAtomsBulkBoxs)).rjust(12)}{str(len(allAtomsSurfBoxs)).rjust(12)}") return allAtomsSurfBoxs, allAtomsBulkBoxs
# @annotate('writeBoxCoords', color='yellow')
[docs]def writeBoxCoords(atomsEle, atomsXYZ, allSurfBoxs, allBulkBoxs, minXYZ, boxLens, bufferDist, outDir, npName): """Write out coordinates of scanned boxes.""" minX, minY, minZ = minXYZ boxCoordsDir = f"{outDir}/boxCoords" if not isdir(boxCoordsDir): if not isdir(outDir): mkdir(outDir) mkdir(boxCoordsDir) with open(f"{boxCoordsDir}/{npName}_boxCoords.xyz", 'w') as f: for (i, boxLen) in enumerate(boxLens): if i != 0: f.write('\n') f.write(f"{len(atomsEle) + len(allSurfBoxs[i]) + len(allBulkBoxs[i])}\n") for (j, atomXYZ) in enumerate(atomsXYZ): f.write(f"\n{atomsEle[j]}\t{atomXYZ[0]} {atomXYZ[1]} {atomXYZ[2]}") for (boxIDX, boxIDY, boxIDZ) in allSurfBoxs[i]: boxX = minX - bufferDist + boxIDX*boxLen + boxLen/2 boxY = minY - bufferDist + boxIDY*boxLen + boxLen/2 boxZ = minZ - bufferDist + boxIDZ*boxLen + boxLen/2 f.write(f"\nOV\t{boxX:.6f} {boxY:.6f} {boxZ:.6f}") for (boxIDX, boxIDY, boxIDZ) in allBulkBoxs[i]: boxX = minX - bufferDist + boxIDX*boxLen + boxLen/2 boxY = minY - bufferDist + boxIDY*boxLen + boxLen/2 boxZ = minZ - bufferDist + boxIDZ*boxLen + boxLen/2 f.write(f"\nIV\t{boxX:.6f} {boxY:.6f} {boxZ:.6f}")
# @annotate('findAtomsWithSurfNeighs', color='cyan')
[docs]@njit(fastmath=True, cache=True) def findAtomsWithSurfNeighs(atomsNeighIdxs, atomsSurfIdxs): """Find atoms with neighbours that are on the surface.""" atomsIdxs = [] for (atomIdx, atomNeighIdxs) in enumerate(atomsNeighIdxs): for neighIdx in atomNeighIdxs: if neighIdx < 0: break if neighIdx in atomsSurfIdxs: atomsIdxs.append(atomIdx) break return np.array(atomsIdxs) if len(atomsIdxs) > 0 else atomsSurfIdxs
# @annotate('exactBoxCnts', color='blue')
[docs]def exactBoxCnts(atomsEle, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, maxRange, minMaxBoxLens, minXYZ, npName, outDir='outputs', numCPUs=None, numBoxLen=10, bufferDist=5.0, rmInSurf=True, writeBox=True, verbose=False): """ Count the boxes that cover the outer surface of a set of overlapping spheres represented as exact spheres for different box sizes. Parameters ---------- atomsEle : 1D ndarray of strs Element type of each atom. atomsRad : 1D ndarray of floats Radius of each atom. atomsSurfIdxs : 1D ndarray of ints Indices of surface atoms. atomsXYZ : 2D ndarray of floats Cartesian coordinates of each atom. atomsNeighIdxs : 2D ndarray of ints Neighbour atoms indices of each atom. maxRange : float Maximum range among all dimensions of the Cartesian space, defines the borders of the largest box. minMaxBoxLens : tuple of floats Minimum and maximum box lengths. minXYZ : 1D ndarray of floats Minimum values of each dimension in the Cartesian space. npName : str Identifier of the measured object, which forms part of the output file name, ideally unique. outDir : str, optional Path to the directory to store the output files. numCPUs : int, optional Number of CPUs to be used for parallelisation of tasks. numBoxLen : int, optional Number of box lengths to use for the collection of the box count data, spaced evenly on logarithmic scale. bufferDist : Union[int,float] Buffer distance from the borders of the largest box in Angstrom. rmInSurf : bool, optional Whether to remove the surface points on the inner surface. writeBox : bool, optional Whether to generate output files for visualisation. verbose : bool, optional Whether to display the details. Returns ------- scales : list of floats Box lengths. counts : list of floats Number of boxes that cover the exact spherical surface of interest. Examples -------- >>> eles, rads, xyzs, _, minxyz, maxxyz = readInp('example.xyz') >>> neighs, _ = findNN(rads, xyzs, minxyz, maxxyz, 1.2) >>> surfs = findSurf(xyzs, neighs, 'alphaShape', 5.0) >>> scalesES, countsES = exactBoxCnts(eles, rads, surfs, xyzs, neighs, 100, (0.2, 1), minxyz, 'example') """ atomsIdxs = atomsSurfIdxs if rmInSurf else np.array(range(len(atomsEle))) if numCPUs is None: numCPUs = cpu_count() # Resource allocations for parallelisation (current settings are based on empirical experiments -- optimised for the default minMaxBoxLens range), rooms available for further optimisation minAtomCPUperLen = max(1, len(atomsIdxs) // 25) maxBoxLenCPU = ceil(numBoxLen / 2) if numCPUs > maxBoxLenCPU * minAtomCPUperLen: atomConcMaxCPU = numCPUs // maxBoxLenCPU boxLenConcMaxCPU = maxBoxLenCPU elif numCPUs > minAtomCPUperLen: atomConcMaxCPU = minAtomCPUperLen boxLenConcMaxCPU = numCPUs // minAtomCPUperLen else: atomConcMaxCPU, boxLenConcMaxCPU = numCPUs, 1 if verbose: print(f" Representing the surface by treating each atom as exact spheres...") print(f" Scanning over:\n {numBoxLen} box lengths using {boxLenConcMaxCPU} cpu(s)...\n {len(atomsIdxs)} atoms using {atomConcMaxCPU} cpu(s)...") print(f" (1/eps) (# bulk) (# surf)") if writeBox: if not isdir(outDir): mkdir(outDir) overallBoxLen = maxRange + bufferDist * 2 allLensSurfBoxs, allLensBulkBoxs, allLensSurfCnts, allLensBulkCnts = [], [], [], [] scales, scanBoxLens, scanAllAtomsInps = [], [], [] approxScanBoxLens = np.geomspace(minMaxBoxLens[1], minMaxBoxLens[0], num=numBoxLen) for approxScanBoxLen in approxScanBoxLens: # Evenly reduced box lengths on log scale magnFac = int(overallBoxLen / approxScanBoxLen) scanBoxLen = overallBoxLen / magnFac scanAllAtomsInp = (magnFac, scanBoxLen, atomsIdxs, minXYZ, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, bufferDist, rmInSurf, verbose, atomConcMaxCPU) if boxLenConcMaxCPU > 1: scanAllAtomsInps.append(scanAllAtomsInp) else: scanAllAtomsResult = scanAllAtoms(scanAllAtomsInp) allAtomsSurfBoxs, allAtomsBulkBoxs = scanAllAtomsResult allLensSurfBoxs.append(allAtomsSurfBoxs) allLensBulkBoxs.append(allAtomsBulkBoxs) allLensSurfCnts.append(len(allAtomsSurfBoxs)) scales.append(log10(magnFac / overallBoxLen)) scanBoxLens.append(scanBoxLen) if boxLenConcMaxCPU > 1: with Pool(max_workers=boxLenConcMaxCPU) as pool: for scanAllAtomsResult in pool.map(scanAllAtoms, scanAllAtomsInps, chunksize=ceil(numBoxLen / boxLenConcMaxCPU)): allAtomsSurfBoxs, allAtomsBulkBoxs = scanAllAtomsResult allLensSurfBoxs.append(allAtomsSurfBoxs) allLensBulkBoxs.append(allAtomsBulkBoxs) allLensSurfCnts.append(len(allAtomsSurfBoxs)) counts = [log10(sCnt) if sCnt != 0 else np.nan for sCnt in allLensSurfCnts] if writeBox: writeBoxCoords(atomsEle, atomsXYZ, allLensSurfBoxs, allLensBulkBoxs, minXYZ, scanBoxLens, bufferDist, outDir, npName) return scales, counts