Source code for sphractal.surfVoxel

from concurrent.futures import ProcessPoolExecutor as Pool
from math import ceil, cos, log10, pi, sin, sqrt
from multiprocessing import cpu_count
from os import mkdir, system
from os.path import isdir

from numba import njit
import numpy as np

from sphractal.constants import ATOMIC_RAD_DICT, METALLIC_RAD_DICT
from sphractal.utils import calcDist, oppositeInnerAtoms
# from sphractal.utils import annotate


[docs]@njit(fastmath=True, cache=True) def fibonacciSphere(numPoints, sphereRad): """Generate evenly spread points on the surface of a sphere with a specified radius.""" xyzs = np.empty((numPoints, 3), dtype=np.float64) phi = pi * (sqrt(5)-1) # Golden angle (radians) for i in range(numPoints): y = 1 - (i / float(numPoints-1))*2 # y \in [1, -1] radius = sqrt(1 - y*y) # Radius at y theta = phi * i # Golden angle increment x = cos(theta) * radius z = sin(theta) * radius xyzs[i] = (x * sphereRad, y * sphereRad, z * sphereRad) return xyzs
[docs]@njit(fastmath=True, cache=True) def withinNeighRad(surfPointXYZ, atomNeighIdxs, atomsRad, atomsXYZ): """Return whether a surface point generated around a given atom falls within the radius of a neighbouring atom.""" for neighIdx in atomNeighIdxs: if calcDist(atomsXYZ[neighIdx], surfPointXYZ) <= atomsRad[neighIdx]: return True return False
[docs]@njit(fastmath=True, cache=True) def rmPoint(args): """Check whether a point falls on the outer surface.""" surfPoint, atomXYZ, atomNeighIdxs, atomsRad, atomsXYZ, rmInSurf, atomsSurfIdxs, atomsNeighIdxs = args surfPointXYZ = surfPoint + atomXYZ if withinNeighRad(surfPointXYZ, atomNeighIdxs, atomsRad, atomsXYZ): return 'isWithinRad', surfPointXYZ if (not rmInSurf) or (rmInSurf and oppositeInnerAtoms(surfPointXYZ, atomXYZ, atomNeighIdxs, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs)): return 'toExclude', surfPointXYZ return 'toInclude', surfPointXYZ
# @annotate('pointsOnAtom', color='cyan')
[docs]def pointsOnAtom(args): """Generate surface points around an atom and classify them as either inner or outer surface.""" atomIdx, numPoints, atomsSurfIdxs, atomsRad, atomsXYZ, atomsNeighIdxs, maxCPU, surfPoints, rmInSurf = args if surfPoints is None: surfPoints = fibonacciSphere(numPoints, atomsRad[atomIdx]) atomXYZ, atomNeighIdxsPadded = atomsXYZ[atomIdx], atomsNeighIdxs[atomIdx] atomNeighIdxs = atomNeighIdxsPadded[atomNeighIdxsPadded > -1] rmPointInp, outerSurfs, innerSurfs = [], [], [] # Include points that fall on surface of interest for surfPoint in surfPoints: if maxCPU > 1: rmPointInp.append((surfPoint, atomXYZ, atomNeighIdxs, atomsRad, atomsXYZ, rmInSurf, atomsSurfIdxs, atomsNeighIdxs)) continue pointPosition, surfPointXYZ = rmPoint((surfPoint, atomXYZ, atomNeighIdxs, atomsRad, atomsXYZ, rmInSurf, atomsSurfIdxs, atomsNeighIdxs)) if pointPosition == 'toExclude': outerSurfs.append(surfPointXYZ) elif pointPosition == 'toInclude': innerSurfs.append(surfPointXYZ) # Parallel implementation of the point position assessment algorithm if maxCPU > 1: with Pool(max_workers=maxCPU) as pool: for rmPointResult in pool.map(rmPoint, rmPointInp, chunksize=ceil(numPoints / maxCPU)): if rmPointResult[0] == 'toExclude': outerSurfs.append(rmPointResult[1]) elif rmPointResult[0] == 'toInclude': innerSurfs.append(rmPointResult[1]) return outerSurfs, innerSurfs
# @annotate('pointsToVoxels', color='magenta')
[docs]@njit(fastmath=True, cache=True) def pointsToVoxels(pointXYZs, gridSize): """Turn coordinates of point clouds into coordinates of occupied voxels.""" # Get segmenting points in 3D space rangeXYZ = np.array([pointXYZs[:, i].max() - pointXYZs[:, i].min() for i in range(3)]) maxRange = max(rangeXYZ) minXYZs, maxXYZs, segmentXYZs = np.empty(3), np.empty(3), np.empty((3, gridSize + 1)) for i in range(3): minXYZs[i] = pointXYZs[:, i].min() - (maxRange-rangeXYZ[i]) * 0.5 maxXYZs[i] = pointXYZs[:, i].max() + (maxRange-rangeXYZ[i]) * 0.5 segmentXYZs[i] = np.linspace(minXYZs[i], maxXYZs[i], num=gridSize+1) # Get the index of the segment that each point lies within voxelXs = np.clip(np.searchsorted(segmentXYZs[0], pointXYZs[:, 0]) - 1, 0, gridSize) voxelYs = np.clip(np.searchsorted(segmentXYZs[1], pointXYZs[:, 1]) - 1, 0, gridSize) voxelZs = np.clip(np.searchsorted(segmentXYZs[2], pointXYZs[:, 2]) - 1, 0, gridSize) # Get indices of occupied voxels after flattened voxelIdxs = set(voxelXs*gridSize*gridSize + voxelYs*gridSize + voxelZs) return np.column_stack((voxelXs, voxelYs, voxelZs)), voxelIdxs
# @annotate('writeSurfVoxelIdxs', color='yellow')
[docs]def writeSurfVoxelIdxs(outDir, voxelIdxs): """Generate a txt file required for 3D box-counting using C++ code written by Ruiz de Miras and Posadas.""" with open(f"{outDir}/surfVoxelIdxs.txt", 'w') as f: for idx in voxelIdxs: f.write(f"{idx}\n")
# @annotate('writePCD', color='yellow')
[docs]def writePCD(outDir, npName, surfPointXYZs): """Generate a pcd file required for 3D box-counting using MATLAB code written by Kazuaki Iida.""" surfPointsDir = f"{outDir}/surfPoints" if not isdir(surfPointsDir): if not isdir(outDir): mkdir(outDir) mkdir(surfPointsDir) with open(f"{surfPointsDir}/{npName}_surfPoints.pcd", 'w') as f: f.write('# .PCD v.7 - Point Cloud Data file format\nVERSION .7') f.write('\nFIELDS x y z\nSIZE 4 4 4\nTYPE F F F\nCOUNT 1 1 1\n') f.write(f"WIDTH {len(surfPointXYZs)}\nHEIGHT 1\nPOINTS {len(surfPointXYZs)}\nDATA ascii\n") for xyz in surfPointXYZs: f.write(f"{xyz[0]} {xyz[1]} {xyz[2]}\n")
# @annotate('writeSurfPoints', color='blue')
[docs]def writeSurfPoints(outDir, npName, atomsSurfIdxs, atomsXYZ, surfPointXYZs, nonSurfPointXYZs): """Generate an xyz file for visualisation of classified point clouds.""" surfPointsDir = f"{outDir}/surfPoints" if not isdir(surfPointsDir): if not isdir(outDir): mkdir(outDir) mkdir(surfPointsDir) with open(f"{surfPointsDir}/{npName}_surfPoints.xyz", 'w') as f: f.write(f"{len(surfPointXYZs) + len(nonSurfPointXYZs) + len(atomsSurfIdxs)}\n\n") for (i, xyz) in enumerate(surfPointXYZs): f.write(f"OU {xyz[0]} {xyz[1]} {xyz[2]} {i}\n") for (i, xyz) in enumerate(nonSurfPointXYZs): f.write(f"IN {xyz[0]} {xyz[1]} {xyz[2]} {i}\n") for atomIdx in atomsSurfIdxs: xyz = atomsXYZ[atomIdx] f.write(f"SU {xyz[0]} {xyz[1]} {xyz[2]} {atomIdx}\n")
# @annotate('writeSurfVoxels', color='green')
[docs]def writeSurfVoxels(outDir, npName, surfVoxelXYZs): """Generate an xyz file useful for visualisation of computed surface voxels.""" surfVoxelsDir = f"{outDir}/surfVoxels" if not isdir(surfVoxelsDir): if not isdir(outDir): mkdir(outDir) mkdir(surfVoxelsDir) with open(f"{surfVoxelsDir}/{npName}_surfVoxels.xyz", 'w') as f: f.write(f"{len(surfVoxelXYZs)}\n\n") for (i, xyz) in enumerate(surfVoxelXYZs): f.write(f"VX {xyz[0]} {xyz[1]} {xyz[2]} {i}\n")
# @annotate('getSurfPoints', color='cyan')
[docs]def genSurfPoints(atomsEle, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, npName, outDir='outputs', numCPUs=None, radType='atomic', numPoints=10000, gridNum=1024, rmInSurf=True, vis=False, verbose=False, genPCD=False): """Generate point clouds approximating the outer spherical surface formed by a set of atoms.""" # Avoid repeating generation of surface points around atoms with the same radii radDict = ATOMIC_RAD_DICT if radType == 'atomic' else METALLIC_RAD_DICT surfPointsEles = {atomEle: fibonacciSphere(numPoints, radDict[atomEle]) for atomEle in set(atomsEle)} # Resource allocations if numCPUs is None: numCPUs = cpu_count() minAtomCPU = max(1, len(atomsSurfIdxs) // 25) maxPointCPUperAtom = ceil(numPoints / numPoints) # ceil(numPoints / 25) if numCPUs > maxPointCPUperAtom * minAtomCPU: atomConcMaxCPU = numCPUs // maxPointCPUperAtom pointConcMaxCPU = maxPointCPUperAtom # elif numCPUs > minAtomCPU: # atomConcMaxCPU = minAtomCPU # pointConcMaxCPU = numCPUs // minAtomCPU else: atomConcMaxCPU, pointConcMaxCPU = numCPUs, 1 if verbose: print(f" Assessing points over:\n {len(atomsSurfIdxs)} atoms using {atomConcMaxCPU} cpu(s)...\n" f" {numPoints} points using {pointConcMaxCPU} cpu(s)...") # Generate point clouds and convert to voxels surfPointXYZs, nonSurfPointXYZs = [], [] pointsOnAtomInp = [] for atomIdx in atomsSurfIdxs: if atomConcMaxCPU > 1: # Adjust back pointsOnAtomInp.append((atomIdx, numPoints, atomsSurfIdxs, atomsRad, atomsXYZ, atomsNeighIdxs, pointConcMaxCPU, surfPointsEles[atomsEle[atomIdx]], rmInSurf)) else: outerSurfs, innerSurfs = pointsOnAtom((atomIdx, numPoints, atomsSurfIdxs, atomsRad, atomsXYZ, atomsNeighIdxs, pointConcMaxCPU, surfPointsEles[atomsEle[atomIdx]], rmInSurf)) surfPointXYZs.extend(outerSurfs) nonSurfPointXYZs.extend(innerSurfs) # Parallel implementation of the points generation around atoms if atomConcMaxCPU > 1: with Pool(max_workers=atomConcMaxCPU) as pool: for pointsOnAtomResult in pool.map(pointsOnAtom, pointsOnAtomInp, chunksize=ceil(len(atomsSurfIdxs) / atomConcMaxCPU)): surfPointXYZs.extend(pointsOnAtomResult[0]) nonSurfPointXYZs.extend(pointsOnAtomResult[1]) surfVoxelXYZs, surfVoxelIdxs = pointsToVoxels(np.array(surfPointXYZs), gridNum) # Generate output files writeSurfVoxelIdxs(outDir, surfVoxelIdxs) if verbose: print(f" {len(surfPointXYZs)} surface points -> {len(surfVoxelIdxs)} voxels, # grids: {gridNum}") if genPCD: writePCD(outDir, npName, surfPointXYZs) if vis: writeSurfPoints(outDir, npName, atomsSurfIdxs, atomsXYZ, surfPointXYZs, nonSurfPointXYZs) writeSurfVoxels(outDir, npName, surfVoxelXYZs)
# @annotate('voxelBoxCnts', color='blue')
[docs]def voxelBoxCnts(atomsEle, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, npName, outDir='outputs', numCPUs=None, exePath='$FASTBC', radType='atomic', numPoints=300, gridNum=1024, rmInSurf=True, vis=True, verbose=False, genPCD=False): """ Count the boxes that cover the outer surface of a set of overlapping spheres represented as point clouds for different box sizes, using 3D box-counting algorithm written by Ruiz de Miras et al. in C++. IMPORTANT: Make sure the source code has been downloaded from https://github.com/Jon-Ting/fastBC and compiled on your machine. 'exePath' should point to the right directory if FASTBC is not set as an environment variable. Parameters ---------- atomsEle : 1D ndarray of strs Element type of each atom. atomsRad : 1D ndarray of floats Radius of each atom. atomsSurfIdxs : 1D ndarray of ints Indices of surface atoms. atomsXYZ : 2D ndarray of floats Cartesian coordinates of each atom. atomsNeighIdxs : 2D ndarray of ints Neighbour atoms indices of each atom. npName : str Identifier of the measured object, which forms part of the output file name, ideally unique. outDir : str, optional Path to the directory to store the output files. numCPUs : int, optional Number of CPUs to be used for parallelisation of tasks. exePath : str, optional Path to the compiled C++ executable for box-counting. radType : {'atomic', 'metallic'}, optional Type of radii to use for the spheres. numPoints : int, optional Number of surface points to be generated around each atom. gridNum : int, optional Resolution of the 3D binary image. rmInSurf : bool, optional Whether to remove the surface points on the inner surface. vis : bool, optional Whether to generate output files for visualisation. verbose : bool Whether to display the details. genPCD : bool, optional Whether to generate pcd file for box-counting using MATLAB code written by Kazuaki Iida. Returns ------- scales : list Box lengths. counts : list Number of boxes that cover the surface of interest, as represented by the voxels in the 3D binary image. Examples -------- >>> eles, rads, xyzs, _, minxyz, maxxyz = readInp('example.xyz') >>> neighs, _ = findNN(rads, xyzs, minxyz, maxxyz, 1.2) >>> surfs = findSurf(xyzs, neighs, 'alphaShape', 5.0) >>> scalesPC, countsPC = voxelBoxCnts(eles, rads, surfs, xyzs, neighs, 'example') Notes ----- The 3D binary image resolution (gridNum) is restricted to 1024 or lower. Details about maximum grid size and memory estimation could be found in 'test.cpp' documented by the authors (https://www.ugr.es/~demiras/fbc/). """ if verbose: print(f" Approximating the surface with {numPoints} points for each atom...") if not isdir(outDir): mkdir(outDir) genSurfPoints(atomsEle, atomsRad, atomsSurfIdxs, atomsXYZ, atomsNeighIdxs, npName, outDir, numCPUs, radType, numPoints, gridNum, rmInSurf, vis, verbose, genPCD) system(f"{exePath} {gridNum} {outDir}/surfVoxelIdxs.txt {outDir}/surfVoxelBoxCnts.txt") scales, counts = [], [] with open(f"{outDir}/surfVoxelBoxCnts.txt", 'r') as f: for line in f: scales.append(log10(1 / int(line.split()[0]))) counts.append(log10(int(line.split()[1]))) return scales[::-1], counts[::-1]