Source code for spam.DIC.pixelSearch

Library of SPAM image correlation functions.
import multiprocessing

import numpy
import progressbar

import spam.label
from spam.DIC.DICToolkit import pixelSearch as pixelSearchCPP

def _errorCalc(im1, im2):
    return numpy.nansum(numpy.square(numpy.subtract(im1, im2))) / numpy.nansum(im1)

def _pixelSearch(imagette1, imagette2, imagette1mask=None, imagette2mask=None, returnError=False):
    This function performs a pixel-by-pixel search in 3D for a small reference imagette1 within a larger imagette2.

    The normalised correlation coefficient (NCC) is computed for EVERY combination of the displacements of imagette1 defined within imagette2.
    What is returned is the highest NCC value obtained and the position where it was obtained with respect to the origin in imagette2.

    Values of NCC > 0.99 can generally be trusted.

        imagette1 : 3D numpy array of floats
            Imagette 1 is the smaller reference image

        imagette2 : 3D numpy array of floats
            Imagette 2 is the bigger image inside which to search

        imagette1mask : 3D numpy array of bools, optional
            A mask for imagette1 to define which pixels to include in the correlation
            (True = Correlate these pixels, False = Skip),
            Default = no mask

        imagette2mask : 3D numpy array of bools, optional
            A mask for imagette2 to define which pixels to include in the correlation
            (True = Correlate these pixels, False = Skip),
            Default = no mask

        returnError : bool, optional
            Return a normalised sum of squared differences error compatible with
            register() family of functions? If yes, it's the last returned variable
            Default = False

        p : 3-component vector
            Z, Y, X position with respect to origin in imagette2 of imagette1 to get best NCC

        cc : float
            Normalised correlation coefficient, ~0.5 is random, >0.99 is very good correlation.
    # Note
    # ----
    # It important to remember that the C code runs A BIT faster in its current incarnation when it has
    # a cut-out im2 to deal with (this is related to processor optimistaions).
    # Cutting out imagette2 to just fit around the search range might save a bit of time
    assert numpy.all(imagette2.shape >= imagette1.shape), "spam.DIC.pixelSearch(): imagette2 should be bigger or equal to imagette1 in all dimensions"

    if imagette1mask is not None:
        assert imagette1.shape == imagette1mask.shape, "spam.DIC.pixelSearch: imagette1mask ({}) should have same size as imagette1 ({})".format(imagette1mask.shape, imagette1.shape)
        imagette1 = imagette1.astype("<f4")
        imagette1[imagette1mask == 0] = numpy.nan

    if imagette2mask is not None:
        assert imagette2.shape == imagette2mask.shape, "spam.DIC.pixelSearch: imagette2mask ({}) should have same size as imagette2 ({})".format(imagette2mask.shape, imagette2.shape)
        imagette2 = imagette2.astype("<f4")
        imagette2[imagette2mask == 0] = numpy.nan

    # Run the actual pixel search
    returns = numpy.zeros(4, dtype="<f4")
    pixelSearchCPP(imagette1.astype("<f4"), imagette2.astype("<f4"), returns)

    if returnError:
        error = _errorCalc(
                int(returns[0]) : int(returns[0]) + imagette1.shape[0],
                int(returns[1]) : int(returns[1]) + imagette1.shape[1],
                int(returns[2]) : int(returns[2]) + imagette1.shape[2],
        return numpy.array(returns[0:3]), returns[3], error

        return numpy.array(returns[0:3]), returns[3]

[docs] def pixelSearchDiscrete(lab1, im1, im2, PhiField, searchRange, boundingBoxes, numberOfNodes, nodePositions, applyF, labelDilate, volThreshold, numProc): # Create pixelSearchCC vector pixelSearchCC = numpy.zeros((numberOfNodes), dtype=float) # Error compatible with register() error = numpy.zeros((numberOfNodes), dtype=float) returnStatus = numpy.ones((numberOfNodes), dtype=int) deltaPhiNorm = numpy.ones((numberOfNodes), dtype=int) iterations = numpy.ones((numberOfNodes), dtype=int) firstNode = 1 finishedNodes = 1 returnStatus[0] = 0 global pixelSearchOneNodeDiscrete def pixelSearchOneNodeDiscrete(nodeNumber): """ Function to be called by multiprocessing parallelisation for pixel search in one position. This function will call getImagettes, or the equivalent for labels and perform the pixel search Parameters ---------- nodeNumber : int node number to work on Returns ------- List with: - nodeNumber (needed to write result in right place) - displacement vector - NCC value - error value - return Status """ imagetteReturns = spam.label.getImagettesLabelled( lab1, nodeNumber, PhiField[nodeNumber].copy(), im1, im2, searchRange.copy(), boundingBoxes, nodePositions, margin=labelDilate, labelDilate=labelDilate, applyF=applyF, volumeThreshold=volThreshold, ) imagetteReturns["imagette2mask"] = None # If getImagettes was successful (size check and mask coverage check) if imagetteReturns["returnStatus"] == 1: PSreturns = _pixelSearch( imagetteReturns["imagette1"], imagetteReturns["imagette2"], imagette1mask=imagetteReturns["imagette1mask"], imagette2mask=imagetteReturns["imagette2mask"], returnError=True ) pixelSearchOffset = imagetteReturns["pixelSearchOffset"] return (nodeNumber, PSreturns[0] + pixelSearchOffset, PSreturns[1], PSreturns[2], imagetteReturns["returnStatus"]) # Failed to extract imagettes or something else: return (nodeNumber, numpy.array([numpy.nan] * 3), 0.0, numpy.inf, imagetteReturns["returnStatus"]) print("\n\tStarting Pixel Search Discrete (with {} process{})".format(numProc, "es" if numProc > 1 else "")) widgets = [ progressbar.FormatLabel(""), " ", progressbar.Bar(), " ", progressbar.AdaptiveETA(), ] pbar = progressbar.ProgressBar(widgets=widgets, maxval=numberOfNodes) pbar.start() finishedNodes = 0 with multiprocessing.Pool(processes=numProc) as pool: for returns in pool.imap_unordered(pixelSearchOneNodeDiscrete, range(firstNode, numberOfNodes)): finishedNodes += 1 # Update progres bar if point is not skipped if returns[4] > 0: widgets[0] = progressbar.FormatLabel(" CC={:0>7.5f} ".format(returns[2])) pbar.update(finishedNodes) PhiField[returns[0], 0:3, -1] = returns[1] # Create pixelSearchCC vector pixelSearchCC[returns[0]] = returns[2] error[returns[0]] = returns[3] returnStatus[returns[0]] = returns[4] pool.close() pool.join() pbar.finish() return PhiField, pixelSearchCC, error, returnStatus, deltaPhiNorm, iterations
[docs] def pixelSearchLocal(im1, im2, PhiField, searchRange, hws, im1mask, im2mask, numberOfNodes, nodePositions, applyF, maskCoverage, greyLowThresh, greyHighThresh, twoD, numProc): # Create pixelSearchCC vector pixelSearchCC = numpy.zeros((numberOfNodes), dtype=float) # Error compatible with register() error = numpy.zeros((numberOfNodes), dtype=float) returnStatus = numpy.ones((numberOfNodes), dtype=int) deltaPhiNorm = numpy.ones((numberOfNodes), dtype=int) iterations = numpy.ones((numberOfNodes), dtype=int) firstNode = 0 finishedNodes = 0 global pixelSearchOneNodeLocal def pixelSearchOneNodeLocal(nodeNumber): """ Function to be called by multiprocessing parallelisation for pixel search in one position. This function will call getImagettes, or the equivalent for labels and perform the pixel search Parameters ---------- nodeNumber : int node number to work on Returns ------- List with: - nodeNumber (needed to write result in right place) - displacement vector - NCC value - error value - return Status """ imagetteReturns = spam.DIC.getImagettes( im1, nodePositions[nodeNumber], hws, PhiField[nodeNumber].copy(), im2, searchRange.copy(), im1mask=im1mask, im2mask=im2mask, minMaskCoverage=maskCoverage, greyThreshold=[greyLowThresh, greyHighThresh], applyF=applyF, twoD=twoD, ) # If getImagettes was successful (size check and mask coverage check) if imagetteReturns["returnStatus"] == 1: PSreturns = _pixelSearch( imagetteReturns["imagette1"], imagetteReturns["imagette2"], imagette1mask=imagetteReturns["imagette1mask"], imagette2mask=imagetteReturns["imagette2mask"], returnError=True ) pixelSearchOffset = imagetteReturns["pixelSearchOffset"] return (nodeNumber, PSreturns[0] + pixelSearchOffset, PSreturns[1], PSreturns[2], imagetteReturns["returnStatus"]) # Failed to extract imagettes or something else: return (nodeNumber, numpy.array([numpy.nan] * 3), 0.0, numpy.inf, imagetteReturns["returnStatus"]) print("\n\tStarting Pixel Search Local (with {} process{})".format(numProc, "es" if numProc > 1 else "")) widgets = [ progressbar.FormatLabel(""), " ", progressbar.Bar(), " ", progressbar.AdaptiveETA(), ] pbar = progressbar.ProgressBar(widgets=widgets, maxval=numberOfNodes) pbar.start() finishedNodes = 0 with multiprocessing.Pool(processes=numProc) as pool: for returns in pool.imap_unordered(pixelSearchOneNodeLocal, range(firstNode, numberOfNodes)): finishedNodes += 1 # Update progres bar if point is not skipped if returns[4] > 0: widgets[0] = progressbar.FormatLabel(" CC={:0>7.5f} ".format(returns[2])) pbar.update(finishedNodes) PhiField[returns[0], 0:3, -1] = returns[1] # Create pixelSearchCC vector pixelSearchCC[returns[0]] = returns[2] error[returns[0]] = returns[3] returnStatus[returns[0]] = returns[4] pool.close() pool.join() pbar.finish() return PhiField, pixelSearchCC, error, returnStatus, deltaPhiNorm, iterations