Source code for spam.label.contacts

"""
Library of SPAM functions for dealing with contacts between particles
Copyright (C) 2020 SPAM Contributors

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option)
any later version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
more details.

You should have received a copy of the GNU General Public License along with
this program.  If not, see <http://www.gnu.org/licenses/>.
"""

import multiprocessing

try:
    multiprocessing.set_start_method("fork")
except RuntimeError:
    pass

import numpy
import progressbar
import scipy.ndimage  # for the contact detection
import skimage.feature  # for the maxima of the EDM
import spam.label
from spam.label.labelToolkit import labelContacts

contactType = "<u4"
# Global number of processes
nProcessesDefault = multiprocessing.cpu_count()


[docs] def contactingLabels(lab, labelsList=None, areas=False, boundingBoxes=None, centresOfMass=None): """ This function returns contacting labels for a given label or list of labels, and optionally the number of voxels involved in the contact. This is designed for an interpixel watershed where there are no watershed lines, and so contact detection is done by dilating the particle in question once and seeing what other labels in comes in contact with. Parameters ----------- lab : 3D numpy array of ints ( of type spam.label.toolkit.labelType) An array of labels with 0 as the background labels : int or a list of labels Labels for which we should compute neighbours areas : bool, optional (default = False) Compute contact "areas"? *i.e.*, number of voxels Careful, this is not a physically correct quantity, and is measured only as the overlap from ``label`` to its neightbours. See ``c ontactPoint`` for something more meaningful boundingBoxes : lab.max()x6 array of ints, optional Bounding boxes in format returned by ``boundingBoxes``. If not defined (Default = None), it is recomputed by running ``boundingBoxes`` centresOfMass : lab.max()x3 array of floats, optional Centres of mass in format returned by ``centresOfMass``. If not defined (Default = None), it is recomputed by running ``centresOfMass`` Returns -------- For a single "labels", a list of contacting labels. if areas == True, then a list of "areas" is also returned. For multiple labels, as above but a lsit of lists in the order of the labels Note ----- Because of dilation, this function is not very safe at the edges, and will throw an exception """ # Default state for single single = False if boundingBoxes is None: boundingBoxes = spam.label.boundingBoxes(lab) if centresOfMass is None: centresOfMass = spam.label.centresOfMass(lab, boundingBoxes=boundingBoxes) if labelsList is None: labelsList = list(range(0, lab.max() + 1)) # I guess there's only one label if type(labelsList) != list: labelsList = [labelsList] single = True contactingLabels = [] contactingAreas = [] for label in labelsList: # catch zero and return it in order to keep arrays aligned if label == 0: contactingLabels.append([0]) if areas: contactingAreas.append([0]) else: p1 = spam.label.getLabel( lab, label, boundingBoxes=boundingBoxes, centresOfMass=centresOfMass, margin=2, ) p2 = spam.label.getLabel( lab, label, boundingBoxes=boundingBoxes, centresOfMass=centresOfMass, margin=2, labelDilate=1, ) dilOnly = numpy.logical_xor(p2["subvol"], p1["subvol"]) labSlice = lab[ p1["slice"][0].start : p1["slice"][0].stop, p1["slice"][1].start : p1["slice"][1].stop, p1["slice"][2].start : p1["slice"][2].stop, ] if dilOnly.shape == labSlice.shape: intersection = dilOnly * labSlice counts = numpy.unique(intersection, return_counts=True) # Print counts starting from the 1th column since we'll always have a ton of zeros # print "\tLabels:\n\t",counts[0][1:] # print "\tCounts:\n\t",counts[1][1:] contactingLabels.append(counts[0][1:]) if areas: contactingAreas.append(counts[1][1:]) else: # The particle is near the edge, pad it padArray = numpy.zeros((3, 2)).astype(int) sliceArray = numpy.zeros((3, 2)).astype(int) for i, sl in enumerate(p1["slice"]): if sl.start < 0: padArray[i, 0] = -1 * sl.start sliceArray[i, 0] = 0 else: sliceArray[i, 0] = sl.start if sl.stop > lab.shape[i]: padArray[i, 1] = sl.stop - lab.shape[i] sliceArray[i, 1] = lab.shape[i] else: sliceArray[i, 1] = sl.stop labSlice = lab[ sliceArray[0, 0] : sliceArray[0, 1], sliceArray[1, 0] : sliceArray[1, 1], sliceArray[2, 0] : sliceArray[2, 1], ] labSlicePad = numpy.pad(labSlice, padArray) intersection = dilOnly * labSlicePad counts = numpy.unique(intersection, return_counts=True) contactingLabels.append(counts[0][1:]) if areas: contactingAreas.append(counts[1][1:]) # Flatten if it's a list with only one object if single: contactingLabels = contactingLabels[0] if areas: contactingAreas = contactingAreas[0] # Now return things if areas: return contactingLabels, contactingAreas else: return contactingLabels
[docs] def contactPoints( lab, contactPairs, returnContactZones=False, boundingBoxes=None, centresOfMass=None, verbose=False, ): """ Get the point, or area of contact between contacting particles. This is done by looking at the overlap of a 1-dilation of particle1 onto particle 2 and vice versa. Parameters ----------- lab : 3D numpy array of ints ( of type spam.label.toolkit.labelType) An array of labels with 0 as the background contactPairs : list (or list of list) of labels Pairs of labels for which we should compute neighbours returnContactZones : bool, optional (default = False) Output a labelled volume where contact zones are labelled according to the order of contact pairs? If False, the centres of mass of the contacts are returned boundingBoxes : lab.max()x6 array of ints, optional Bounding boxes in format returned by ``boundingBoxes``. If not defined (Default = None), it is recomputed by running ``boundingBoxes`` centresOfMass : lab.max()x3 array of floats, optional Centres of mass in format returned by ``centresOfMass``. If not defined (Default = None), it is recomputed by running ``centresOfMass`` Returns -------- if returnContactZones: lab : 3D numpy array of ints ( of type spam.label.toolkit.labelType) else: Z Y X Centres of mass of contacts """ # detect list of lists with code from: https://stackoverflow.com/questions/5251663/determine-if-a-list-contains-other-lists # if not any(isinstance(el, list) for el in contactPairs ): # contactPairs = [ contactPairs ] # different approach: contactPairs = numpy.array(contactPairs) # Pad with extra dim if only one contact pair if len(contactPairs.shape) == 1: contactPairs = contactPairs[numpy.newaxis, ...] if boundingBoxes is None: boundingBoxes = spam.label.boundingBoxes(lab) if centresOfMass is None: centresOfMass = spam.label.centresOfMass(lab, boundingBoxes=boundingBoxes) # To this volume will be added each contact "area", labelled # with the contact pair number (+1) analysisVolume = numpy.zeros_like(lab, dtype=numpy.uint32) if verbose: widgets = [ progressbar.FormatLabel(""), " ", progressbar.Bar(), " ", progressbar.AdaptiveETA(), ] pbar = progressbar.ProgressBar(widgets=widgets, maxval=len(contactPairs)+1) pbar.start() finishedNodes = 0 for n, contactPair in enumerate(contactPairs): # Do both orders in contacts if len(contactPair) != 2: print("spam.label.contacts.contactPoints(): contact pair does not have 2 elements") for label1, label2 in [contactPair, contactPair[::-1]]: # print( "Label1: {} Label2: {}".format( label1, label2 ) ) p1 = spam.label.getLabel( lab, label1, boundingBoxes=boundingBoxes, centresOfMass=centresOfMass, margin=2, ) p2 = spam.label.getLabel( lab, label1, boundingBoxes=boundingBoxes, centresOfMass=centresOfMass, margin=2, labelDilate=1, ) dilOnly = numpy.logical_xor(p2["subvol"], p1["subvol"]) labSlice = ( slice(p1["slice"][0].start, p1["slice"][0].stop), slice(p1["slice"][1].start, p1["slice"][1].stop), slice(p1["slice"][2].start, p1["slice"][2].stop), ) labSubvol = lab[labSlice] if dilOnly.shape == labSubvol.shape: intersection = dilOnly * labSubvol analysisVolume[labSlice][intersection == label2] = n + 1 else: raise Exception("\tdilOnly and labSlice are not the same size... getLabel probably did some safe cutting around the edges") if verbose: finishedNodes += 1 widgets[0] = progressbar.FormatLabel("{}/{} ".format(finishedNodes, len(contactPairs))) pbar.update(finishedNodes) if returnContactZones: return analysisVolume else: return spam.label.centresOfMass(analysisVolume)
[docs] def labelledContacts(lab, maximumCoordinationNumber=20): """ Uniquely names contacts based on grains. Parameters ----------- lab : 3D numpy array of ints ( of type spam.label.toolkit.labelType) An array of labels with 0 as the background maximumCoordinationNumber : int (optional, default = 20) Maximum number of neighbours to consider per grain Returns -------- An array, containing: contactVolume : array of ints 3D array where contact zones are uniquely labelled Z : array of ints Vector of length lab.max()+1 contaning the coordination number (number of touching labels for each label) contactTable : array of ints 2D array of lab.max()+1 by 2*maximumCoordinationNumber containing, per grain: touching grain label, contact number contactingLabels : array of ints 2D array of numberOfContacts by 2 containing, per contact: touching grain label 1, touching grain label 2 """ contacts = numpy.zeros_like(lab, dtype=contactType) Z = numpy.zeros((lab.max() + 1), dtype="<u1") contactTable = numpy.zeros((lab.max() + 1, maximumCoordinationNumber * 2), dtype=contactType) contactingLabels = numpy.zeros(((lab.max() + 1) * maximumCoordinationNumber, 2), dtype=spam.label.labelType) labelContacts(lab, contacts, Z, contactTable, contactingLabels) # removed the first row of contactingLabels, as it is 0, 0 return [contacts, Z, contactTable, contactingLabels[1 : contacts.max() + 1, :]]
[docs] def fetchTwoGrains(volLab, labels, volGrey=None, boundingBoxes=None, padding=0, size_exclude=5): """ Fetches the sub-volume of two grains from a labelled image Parameters ---------- volLab : 3D array of integers Labelled volume, with lab.max() labels labels : 1x2 array of integers the two labels that should be contained in the subvolume volGrey : 3D array Grey-scale volume boundingBoxes : lab.max()x6 array of ints, optional Bounding boxes in format returned by ``boundingBoxes``. If not defined (Default = None), it is recomputed by running ``boundingBoxes`` padding : integer padding of the subvolume for some purpose it might be benefitial to have a subvolume with a boundary of zeros Returns ------- subVolLab : 3D array of integers labelled sub-volume containing the two input labels subVolBin : 3D array of integers binary subvolume subVolGrey : 3D array grey-scale subvolume """ # check if bounding boxes are given if boundingBoxes is None: # print "bounding boxes are not given. calculating ...." boundingBoxes = spam.label.boundingBoxes(volLab) # else: # print "bounding boxes are given" lab1, lab2 = labels # Define output dictionary since we'll add different things to it output = {} # get coordinates of the big bounding box startZ = min(boundingBoxes[lab1, 0], boundingBoxes[lab2, 0]) - padding stopZ = max(boundingBoxes[lab1, 1], boundingBoxes[lab2, 1]) + padding startY = min(boundingBoxes[lab1, 2], boundingBoxes[lab2, 2]) - padding stopY = max(boundingBoxes[lab1, 3], boundingBoxes[lab2, 3]) + padding startX = min(boundingBoxes[lab1, 4], boundingBoxes[lab2, 4]) - padding stopX = max(boundingBoxes[lab1, 5], boundingBoxes[lab2, 5]) + padding output["slice"] = ( slice(startZ, stopZ + 1), slice(startY, stopY + 1), slice(startX, stopX + 1), ) subVolLab = volLab[ output["slice"][0].start : output["slice"][0].stop, output["slice"][1].start : output["slice"][1].stop, output["slice"][2].start : output["slice"][2].stop, ] subVolLab_A = numpy.where(subVolLab == lab1, lab1, 0) subVolLab_B = numpy.where(subVolLab == lab2, lab2, 0) subVolLab = subVolLab_A + subVolLab_B struc = numpy.zeros((3, 3, 3)) struc[1, 1:2, :] = 1 struc[1, :, 1:2] = 1 struc[0, 1, 1] = 1 struc[2, 1, 1] = 1 subVolLab = spam.label.filterIsolatedCells(subVolLab, struc, size_exclude) output["subVolLab"] = subVolLab subVolBin = numpy.where(subVolLab != 0, 1, 0) output["subVolBin"] = subVolBin if volGrey is not None: subVolGrey = volGrey[ output["slice"][0].start : output["slice"][0].stop, output["slice"][1].start : output["slice"][1].stop, output["slice"][2].start : output["slice"][2].stop, ] subVolGrey = subVolGrey * subVolBin output["subVolGrey"] = subVolGrey return output
[docs] def localDetection(subVolGrey, localThreshold, radiusThresh=None): """ local contact refinement checks whether two particles are in contact with a local threshold, that is higher than the global one used for binarisation Parameters ---------- subVolGrey : 3D array Grey-scale volume localThreshold : integer or float, same type as the 3D array threshold for binarisation of the subvolume radiusThresh : integer, optional radius for excluding patches that might not be relevant, e.g. noise can lead to patches that are not connected with the grains in contact the patches are calculated with ``equivalentRadii()`` Default is None and such patches are not excluded from the subvolume Returns ------- CONTACT : boolean if True, that particles appear in contact for the local threshold Note ---- see https://doi.org/10.1088/1361-6501/aa8dbf for further information """ CONTACT = False subVolBin = ((subVolGrey > localThreshold) * 1).astype("uint8") if radiusThresh is not None: # clean the image of isolated voxels or pairs of voxels # due to higher threshold they could exist subVolLab, numObjects = scipy.ndimage.label(subVolBin) if numObjects > 1: radii = spam.label.equivalentRadii(subVolLab) labelsToRemove = numpy.where(radii < radiusThresh) if len(labelsToRemove[0]) > 1: subVolLab = spam.label.removeLabels(subVolLab, labelsToRemove) subVolBin = ((subVolLab > 0) * 1).astype("uint8") # fill holes subVolBin = scipy.ndimage.binary_fill_holes(subVolBin).astype("uint8") labeledArray, numObjects = scipy.ndimage.label(subVolBin, structure=None, output=None) if numObjects == 1: CONTACT = True return CONTACT
[docs] def contactOrientations(volBin, volLab, watershed="ITK", peakDistance=1, markerShrink=True, verbose=False): """ determines contact normal orientation between two particles uses the random walker implementation from skimage Parameters ---------- volBin : 3D array binary volume containing two particles in contact volLab : 3D array of integers labelled volume containing two particles in contact watershed : string sets the basis for the determination of the orientation options are "ITK" for the labelled image from the input, "RW" for a further segmentation by the random walker Default = "ITK" peakDistance : int, optional Distance in pixels used in skimage.feature.peak_local_max Default value is 1 markerShrink : boolean shrinks the markers to contain only one voxel. For large markers, the segmentation might yield strange results depending on the shape. Default = True verbose : boolean (optional, default = False) True for printing the evolution of the process False for not printing the evolution of process Returns ------- contactNormal : 1x3 array contact normal orientation in z,y,x coordinates the vector is flipped such that the z coordinate is positive len(coordIntervox) : integer the number of points for the principal component analysis indicates the quality of the fit notTreatedContact : boolean if False that contact is not determined if the contact consisted of too few voxels a fit is not possible or feasible """ notTreatedContact = False from skimage.segmentation import random_walker if watershed == "ITK": # ------------------------------ # ITK watershed for orientations # ------------------------------ # need to relabel, because the _contactPairs functions looks for 1 and 2 labels = numpy.unique(volLab) volLab[volLab == labels[1]] = 1 volLab[volLab == labels[2]] = 2 contactITK = _contactPairs(volLab) if len(contactITK) <= 2: # print('WARNING: not enough contacting voxels (beucher)... aborting this calculation') notTreatedContact = True return numpy.zeros(3), 0, notTreatedContact coordIntervox = contactITK[:, 0:3] # Determining the contact orientation! using PCA contactNormal = _contactNormals(coordIntervox) if watershed == "RW": # Get Labels label1, label2 = numpy.unique(volLab)[1:] # Create sub-domains subVolBinA = numpy.where(volLab == label1, 1, 0) subVolBinB = numpy.where(volLab == label2, 1, 0) volBin = subVolBinA + subVolBinB # Calculate distance map distanceMapA = scipy.ndimage.distance_transform_edt(subVolBinA) distanceMapB = scipy.ndimage.distance_transform_edt(subVolBinB) # Calculate local Max localMaxiPointsA = skimage.feature.peak_local_max( distanceMapA, min_distance=peakDistance, exclude_border=False, labels=subVolBinA, ) localMaxiPointsB = skimage.feature.peak_local_max( distanceMapB, min_distance=peakDistance, exclude_border=False, labels=subVolBinB, ) # 2022-09-26: Dropping indices, and so rebuilding image of peaks localMaxiA = numpy.zeros_like(distanceMapA, dtype=bool) localMaxiB = numpy.zeros_like(distanceMapB, dtype=bool) localMaxiA[tuple(localMaxiPointsA.T)] = True localMaxiB[tuple(localMaxiPointsB.T)] = True # Label the Peaks struc = numpy.ones((3, 3, 3)) markersA, numMarkersA = scipy.ndimage.label(localMaxiA, structure=struc) markersB, numMarkersB = scipy.ndimage.label(localMaxiB, structure=struc) if numMarkersA == 0 or numMarkersB == 0: notTreatedContact = True if verbose: print("spam.contacts.contactOrientations: No grain markers found for this contact. Setting the contact as non-treated") return numpy.zeros(3), 0, notTreatedContact # Shrink the markers # The index property should be a list with all the labels encountered, exlucing the label for the backgroun (0). centerOfMassA = scipy.ndimage.center_of_mass(markersA, labels=markersA, index=list(numpy.unique(markersA)[1:])) centerOfMassB = scipy.ndimage.center_of_mass(markersB, labels=markersB, index=list(numpy.unique(markersB)[1:])) # Calculate the value of the distance map for the markers marksValA = [] marksValB = [] for x in range(len(centerOfMassA)): marksValA.append( distanceMapA[ int(centerOfMassA[x][0]), int(centerOfMassA[x][1]), int(centerOfMassA[x][2]), ] ) for x in range(len(centerOfMassB)): marksValB.append( distanceMapB[ int(centerOfMassB[x][0]), int(centerOfMassB[x][1]), int(centerOfMassB[x][2]), ] ) # Sort the markers coordinates acoording to their distance map value # First element correspond to the coordinates of the marker with the higest value in the distance map centerOfMassA = numpy.flipud([x for _, x in sorted(zip(marksValA, centerOfMassA))]) centerOfMassB = numpy.flipud([x for _, x in sorted(zip(marksValB, centerOfMassB))]) marksValA = numpy.flipud(sorted(marksValA)) marksValB = numpy.flipud(sorted(marksValB)) # Select markers markers = numpy.zeros(volLab.shape) markers[int(centerOfMassA[0][0]), int(centerOfMassA[0][1]), int(centerOfMassA[0][2])] = 1.0 markers[int(centerOfMassB[0][0]), int(centerOfMassB[0][1]), int(centerOfMassB[0][2])] = 2.0 # set the void phase of the binary image to -1, excludes this phase from calculation of the random walker (saves time) volRWbin = volBin.astype(float) markers[~volRWbin.astype(bool)] = -1 if numpy.max(numpy.unique(markers)) != 2: notTreatedContact = True if verbose: print("spam.contacts.contactOrientations: The grain markers where wrongly computed. Setting the contact as non-treated") return numpy.zeros(3), 0, notTreatedContact probMaps = random_walker(volRWbin, markers, beta=80, mode="cg_mg", return_full_prob=True) # reset probability of voxels in the void region to 0! (-1 right now, as they were not taken into account!) probMaps[:, ~volRWbin.astype(bool)] = 0 # create a map of the probabilities of belonging to either label 1 oder label 2 (which is just the inverse map) labRW1 = probMaps[0].astype(numpy.float32) # label the image depending on the probabilty of each voxel belonging to marker = 1, save one power watershed labRW = numpy.array(labRW1) labRW[labRW > 0.5] = 1 labRW[(labRW < 0.5) & (labRW > 0)] = 2 # seed of the second marker has to be labelled by 2 as well labRW[markers == 2] = 2 contactVox = _contactPairs(labRW) if len(contactVox) <= 2: # print 'WARNING: not enough contacting voxels (rw).... aborting this calculation' notTreatedContact = True if verbose: print("spam.contacts.contactOrientations: Not enough contacting voxels, aborting this calculation. Setting the contact as non-treated") return numpy.zeros(3), 0, notTreatedContact # get subvoxel precision - using the probability values at the contacting voxels! coordIntervox = _contactPositions(contactVox, labRW1) # Determining the contact orientation! using PCA contactNormal = _contactNormals(coordIntervox) return contactNormal[0], len(coordIntervox), notTreatedContact
def _contactPairs(lab): """ finds the voxels involved in the contact i.e. the voxels of one label in direct contact with another label Parameters ---------- lab : 3D array of integers labelled volume containing two particles in contact labels of the considered particles are 1 and 2 Returns ------- contact : (len(contactVoxels))x4 array z,y,x positions and the label of the voxel """ dimensions = numpy.shape(lab) dimZ, dimY, dimX = dimensions[0], dimensions[1], dimensions[2] # position of the voxels labelled by 1 ... row 1 = coord index 1, row 2 = coord 2, ... positionLabel = numpy.array(numpy.where(lab == 1)) # initialize the array for the contact positions and labels! contact = numpy.array([[], [], [], []]).transpose() # loop over all voxels labeled with 1 for x in range(0, len(positionLabel.transpose())): pix = numpy.array([positionLabel[0][x], positionLabel[1][x], positionLabel[2][x], 1]) # pix ... positions (axis 0,1,2) of the first voxel containing 1 # x axis neighbor if positionLabel[0][x] < dimZ - 1: # if the voxel is still in the image! # if neighbor in pos. x-direction is 2 then save the actual voxel and the neighbor!! if lab[positionLabel[0][x] + 1, positionLabel[1][x], positionLabel[2][x]] == 2: vx1 = numpy.array( [ positionLabel[0][x] + 1, positionLabel[1][x], positionLabel[2][x], 2, ] ) # stacks the data, adds a row to the data, so you get (contact, pix, vx2) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vx1)) # this condition prevents from getting stuck at the border of the image, where the x-value cannot be reduced! if positionLabel[0][x] != 0: # if neighbor in neg. x-direction is 2 then save the actual voxel and the neighbor!! if lab[positionLabel[0][x] - 1, positionLabel[1][x], positionLabel[2][x]] == 2: vx2 = numpy.array( [ positionLabel[0][x] - 1, positionLabel[1][x], positionLabel[2][x], 2, ] ) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vx2)) # y axis neighbor if positionLabel[1][x] < dimY - 1: if lab[positionLabel[0][x], positionLabel[1][x] + 1, positionLabel[2][x]] == 2: vy1 = numpy.array( [ positionLabel[0][x], positionLabel[1][x] + 1, positionLabel[2][x], 2, ] ) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vy1)) if positionLabel[1][x] != 0: if lab[positionLabel[0][x], positionLabel[1][x] - 1, positionLabel[2][x]] == 2: vy2 = numpy.array( [ positionLabel[0][x], positionLabel[1][x] - 1, positionLabel[2][x], 2, ] ) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vy2)) # z axis neighbor if positionLabel[2][x] < dimX - 1: if lab[positionLabel[0][x], positionLabel[1][x], positionLabel[2][x] + 1] == 2: vz1 = numpy.array( [ positionLabel[0][x], positionLabel[1][x], positionLabel[2][x] + 1, 2, ] ) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vz1)) if positionLabel[2][x] != 0: if lab[positionLabel[0][x], positionLabel[1][x], positionLabel[2][x] - 1] == 2: vz2 = numpy.array( [ positionLabel[0][x], positionLabel[1][x], positionLabel[2][x] - 1, 2, ] ) contact = numpy.vstack((contact, pix)) contact = numpy.vstack((contact, vz2)) return contact def _contactPositions(contact, probMap): """ determines the position of the points that define the contact area for the random walker probability map the 50 percent probability plane is considered as the area of contact linear interpolation is used to determine the 50 percent probability area Parameters ---------- contact : (len(contactVoxels))x4 array z,y,x positions and the label of the voxel as returned by the function contactPairs() probMap : 3D array of floats probability map of one of the labels as determined by the random walker Returns ------- coordIntervox : (len(coordIntervox))x3 array z,y,x positions of the 50 percent probability area """ coordIntervox = numpy.array([[], [], []]).transpose() for x in range(0, len(contact), 2): # call only contact pairs (increment 2) prob1 = probMap[int(contact[x][0]), int(contact[x][1]), int(contact[x][2])] # pick the probability value of the concerning contact voxel! prob2 = probMap[int(contact[x + 1][0]), int(contact[x + 1][1]), int(contact[x + 1][2])] if prob2 - prob1 != 0: add = float((0.5 - prob1) / (prob2 - prob1)) else: add = 0.5 # if both voxels have the same probability (0.5), the center is just in between them! # check whether the next contact voxel is x-axis (0) neighbor or not # x axis neighbor if contact[x][0] != contact[x + 1][0]: # 2 possibilities: a coordinates > or < b coordinates if contact[x][0] < contact[x + 1][0]: # find the contact point in subvoxel resolution! midX = numpy.array([contact[x][0] + add, contact[x][1], contact[x][2]]) coordIntervox = numpy.vstack((coordIntervox, midX)) else: midX = numpy.array([contact[x][0] - add, contact[x][1], contact[x][2]]) coordIntervox = numpy.vstack((coordIntervox, midX)) # y axis neighbor elif contact[x][1] != contact[x + 1][1]: if contact[x][1] < contact[x + 1][1]: midY = numpy.array([contact[x][0], contact[x][1] + add, contact[x][2]]) coordIntervox = numpy.vstack((coordIntervox, midY)) else: midY = numpy.array([contact[x][0], contact[x][1] - add, contact[x][2]]) coordIntervox = numpy.vstack((coordIntervox, midY)) # z axis neighbor else: if contact[x][2] < contact[x + 1][2]: midZ = numpy.array([contact[x][0], contact[x][1], contact[x][2] + add]) coordIntervox = numpy.vstack((coordIntervox, midZ)) else: midZ = numpy.array([contact[x][0], contact[x][1], contact[x][2] - add]) coordIntervox = numpy.vstack((coordIntervox, midZ)) return coordIntervox def _contactNormals(dataSet): """ determines the contact normal orientation based on the intervoxel positions determined by the function contactPositions() uses a principal component analysis the smallest eigenvector of the covariance matrix is considered as the contact orientation Parameters ---------- dataSet : (len(dataSet))x3 array z,y,x positions of the 50 percent probability area of the random walker Returns ------- contactNormal : 1x3 array z,y,x directions of the normalised contact normal orientation """ covariance = numpy.cov(dataSet, rowvar=0, bias=0) # step 3: calculate the eigenvectors and eigenvalues # eigenvalues are not necessarily ordered ... # colum[:,i] corresponds to eig[i] eigVal, eigVec = numpy.linalg.eig(covariance) # look for the eigenvector corresponding to the minimal eigenvalue! minEV = eigVec[:, numpy.argmin(eigVal)] # vector orientation contactNormal = numpy.zeros((1, 3)) if minEV[2] < 0: contactNormal[0, 0], contactNormal[0, 1], contactNormal[0, 2] = ( -minEV[0], -minEV[1], -minEV[2], ) else: contactNormal[0, 0], contactNormal[0, 1], contactNormal[0, 2] = ( minEV[0], minEV[1], minEV[2], ) return contactNormal def _markerCorrection( markers, numMarkers, distanceMap, volBin, peakDistance=5, struc=numpy.ones((3, 3, 3)), ): """ corrects the number of markers used for the random walker segmentation of two particles if too many markers are found, the markers with the largest distance are considered if too few markers are found, the minimum distance between parameters is reduced Parameters ---------- markers : 3D array of integers volume containing the potential markers for the random walker numMarkers : integer number of markers found so far distanceMap : 3D array of floats euclidean distance map volBin : 3D array of integers binary volume peakDistance : integer peak distance as used in skimage.feature.peak_local_max Default value is 15 px struc=numpy.ones((3,3,3)) : 3x3x3 array of integers structuring element for the labelling of the markers Default element is a 3x3x3 array of ones Returns ------- markers : 3-D array of integers volume containing the markers """ counter = 0 # If there are too many markers, slowly increse peakDistance until it's the size of the image, if this overshoots the next bit will bring it back down if numpy.amax(markers) > 2: while numpy.amax(markers) > 2 and peakDistance < min(volBin.shape): peakDistance = max(1, int(peakDistance * 1.3)) localMaxiPoints = skimage.feature.peak_local_max( distanceMap, min_distance=peakDistance, exclude_border=False, labels=volBin, num_peaks=2, ) localMaxi = numpy.zeros_like(distanceMap) localMaxi[tuple(localMaxiPoints.T)] = True markers, numMarkers = scipy.ndimage.label(localMaxi, structure=struc) # If there are no markers, decrease peakDistance while numpy.any(markers) is False or numpy.amax(markers) < 2: if counter > 10: markers = numpy.zeros(markers.shape) return markers peakDistance = max(1, int(peakDistance / 1.3)) localMaxiPoints = skimage.feature.peak_local_max( distanceMap, min_distance=peakDistance, exclude_border=False, labels=volBin, num_peaks=2, ) localMaxi = numpy.zeros_like(distanceMap, dtype=bool) localMaxi[tuple(localMaxiPoints.T)] = True markers, numMarkers = scipy.ndimage.label(localMaxi, structure=struc) counter = counter + 1 if numpy.amax(markers) > 2: centerOfMass = scipy.ndimage.center_of_mass(markers, labels=markers, index=range(1, numMarkers + 1)) distances = numpy.zeros((numMarkers, numMarkers)) for i in range(0, numMarkers): for j in range(i, numMarkers): distances[i, j] = numpy.linalg.norm(numpy.array(centerOfMass[i]) - numpy.array(centerOfMass[j])) maxDistance = numpy.amax(distances) posMaxDistance = numpy.where(distances == maxDistance) # let's choose the markers with the maximum distance markers1 = numpy.where(markers == posMaxDistance[0][0] + 1, 1, 0) markers2 = numpy.where(markers == posMaxDistance[1][0] + 1, 2, 0) localMaxi = markers1 + markers2 markers, numMarkers = scipy.ndimage.label(localMaxi, structure=struc) return markers
[docs] def localDetectionAssembly( volLab, volGrey, contactList, localThreshold, boundingBoxes=None, nProcesses=nProcessesDefault, radiusThresh=4, ): """ Local contact refinement of a set of contacts checks whether two particles are in contact with a local threshold using ``localDetection()`` Parameters ---------- volLab : 3D array Labelled volume volGrey : 3D array Grey-scale volume contactList : (ContactNumber)x2 array of integers contact list with grain labels of each contact in a seperate row, as given by ``spam.label.contacts.labelledContacts`` in the list contactingLabels localThreshold : integer or float, same type as the 3D array threshold for binarisation of the subvolume boundingBoxes : lab.max()x6 array of ints, optional Bounding boxes in format returned by ``boundingBoxes``. If not defined (Default = None), it is recomputed by running ``boundingBoxes`` nProcesses : integer (optional, default = nProcessesDefault) Number of processes for multiprocessing Default = number of CPUs in the system radiusThresh : integer, optional radius in px for excluding patches that might not be relevant, e.g. noise can lead to patches that are not connected with the grains in contact the patches are calculated with ``equivalentRadii()`` Default is 4 Returns ------- contactListRefined : (ContactNumber)x2 array of integers refined contact list, based on the chosen local threshold Note ---- see https://doi.org/10.1088/1361-6501/aa8dbf for further information """ import progressbar # check if bounding boxes are given if boundingBoxes is None: # print "bounding boxes are not given. calculating ...." boundingBoxes = spam.label.boundingBoxes(volLab) contactListRefined = [] numberOfJobs = len(contactList) # print ("\n\tLocal contact refinement") # print ("\tApplying a local threshold of ", localThreshold, " to each contact.") # print ("\tNumber of contacts for treatment ", numberOfJobs) # print ("") ########################################## # Function for localDetectionAssembly global funLocalDetectionAssembly def funLocalDetectionAssembly(job): grainA, grainB = contactList[job].astype("int") labels = [grainA, grainB] subset = fetchTwoGrains(volLab, labels, volGrey, boundingBoxes) contact = localDetection(subset["subVolGrey"], localThreshold, radiusThresh) if contact is True: # print ("we are in contact!") return job, grainA, grainB # Create progressbar widgets = [ progressbar.FormatLabel(""), " ", progressbar.Bar(), " ", progressbar.AdaptiveETA(), ] pbar = progressbar.ProgressBar(widgets=widgets, maxval=numberOfJobs) pbar.start() finishedNodes = 0 # Run multiprocessing with multiprocessing.Pool(processes=nProcesses) as pool: for returns in pool.imap_unordered(funLocalDetectionAssembly, range(0, numberOfJobs)): finishedNodes += 1 widgets[0] = progressbar.FormatLabel("{}/{} ".format(finishedNodes, numberOfJobs)) pbar.update(finishedNodes) if returns is not None: contactListRefined.append([returns[1], returns[2]]) pool.close() pool.join() # End progressbar pbar.finish() return numpy.asarray(contactListRefined)
[docs] def contactOrientationsAssembly( volLab, volGrey, contactList, watershed="ITK", peakDistance=5, boundingBoxes=None, nProcesses=nProcessesDefault, verbose=False, ): """ Determines contact normal orientation in an assembly of touching particles uses either directly the labelled image or the random walker implementation from skimage Parameters ---------- volLab : 3D array Labelled volume volGrey : 3D array Grey-scale volume contactList : (ContactNumber)x2 array of integers contact list with grain labels of each contact in a seperate row, as given by ``spam.label.contacts.labelledContacts`` in the list contactingLabels or by ``spam.label.contacts.localDetectionAssembly()`` watershed : string, optional sets the basis for the determination of the orientation options are "ITK" for the labelled image from the input, "RW" for a further segmentation by the random walker Default = "ITK" peakDistance : int, optional Distance in pixels used in skimage.feature.peak_local_max Default value is 5 boundingBoxes : lab.max()x6 array of ints, optional Bounding boxes in format returned by ``boundingBoxes``. If not defined (Default = None), it is recomputed by running ``boundingBoxes`` nProcesses : integer (optional, default = nProcessesDefault) Number of processes for multiprocessing Default = number of CPUs in the system verbose : boolean (optional, default = False) True for printing the evolution of the process False for not printing the evolution of process Returns ------- contactOrientations : (numContacts)x6 array contact normal orientation for every contact [labelGrainA, labelGrainB, orientationZ, orientationY, orientationX, intervoxel positions for PCA] notTreatedContact : boolean if False that contact is not determined if the contact consisted of too few voxels a fit is not possible or feasible """ # check if bounding boxes are given if boundingBoxes is None: # print ("bounding boxes are not given. calculating ....") boundingBoxes = spam.label.boundingBoxes(volLab) contactOrientations = [] numberOfJobs = len(contactList) # print ("\n\tDetermining the contact orientations of an assembly of particles") # print ("\tNumber of contacts ", numberOfJobs) # print ("") ########################################## # Function for ContactOrientationsAssembly global funContactOrientationsAssembly def funContactOrientationsAssembly(job): grainA, grainB = contactList[job, 0:2].astype("int") labels = [grainA, grainB] subset = fetchTwoGrains(volLab, labels, volGrey, boundingBoxes) contactNormal, intervox, NotTreatedContact = spam.label.contactOrientations( subset["subVolBin"], subset["subVolLab"], watershed, peakDistance=peakDistance, verbose=verbose, ) # TODO work on not treated contacts -- output them! return ( grainA, grainB, contactNormal[0], contactNormal[1], contactNormal[2], intervox, ) # Create progressbar widgets = [ progressbar.FormatLabel(""), " ", progressbar.Bar(), " ", progressbar.AdaptiveETA(), ] pbar = progressbar.ProgressBar(widgets=widgets, maxval=numberOfJobs) pbar.start() finishedNodes = 0 # Run multiprocessing with multiprocessing.Pool(processes=nProcesses) as pool: for returns in pool.imap_unordered(funContactOrientationsAssembly, range(0, numberOfJobs)): finishedNodes += 1 widgets[0] = progressbar.FormatLabel("{}/{} ".format(finishedNodes, numberOfJobs)) pbar.update(finishedNodes) if returns is not None: contactOrientations.append( [ returns[0], returns[1], returns[2], returns[3], returns[4], returns[5], ] ) # result[2], result[3], result[4], result[5], result[6], result[7] pool.close() pool.join() # End progressbar pbar.finish() return numpy.asarray(contactOrientations)