import numpy as np
import matplotlib.pyplot as plt
import os
import sys
import warnings
import natsort
import cv2
import multiprocessing
import copy
import warnings
sys.path.append(os.path.join('..'))
from spexwavepy.imstackfun import Imagestack
from spexwavepy.corefun import _indicator, read_one, crop_one, Imagematch, NormImage
from spexwavepy.corefun import _initDisplay, _contiDisplay
from spexwavepy.postfun import slope_scan, slope_pixel, curv_scan, curv_XST
__DEBUG = True
[docs]
class Tracking:
"""
A container for different speckle tracking algorithms.
Parameters
----------
imstack1 : Imagestack class
The first image stack.
For XSS technique, it is scanned in x | y direction.
imstack2 : Imagestack class
The second image stack.
For XSS technique, it is scanned in x | y direction.
Not always necessary. (default None)
imstack3 : Imagestack class
The third image stack.
For XSS technique, it is scanned in y direction.
/Not always necessary. (default None)
imstack4 : Imagestack class
The fourth image stack.
For XSS technique, it is scanned in y direction.
Not always necessary. (default None)
dimension : str
'1D' or '2D'. To do 1D or 2D data processing. (default '2D')
scandim : str
'x'|'y'|'xy'|'random'. Scan direction for the image stack.
(default 'x')
mempos : str
'downstream' or 'upstream'. Use this to define the position of
the diffusor in respect to the focus of the optics. 'downstream'
means the diffuser is placed downstream of the focus. See
*User guide: Local curvature reconstruction* for more
details of it. (default 'downstream')
scanstep : float
scan step size, unit in :math:`\mu m`. (default None)
pixsize : float
detector pixel size, unit in :math:`\mu m`. (default None)
dist : float
distance from diffuser to detector plane,
if it's the downstream mode, ``mempos`` is 'downstream' (default),
unit in mm. If the diffuser is placed in the upstream,
``mempos`` is 'upstream',
usually it is set to be the distance between the centre
of the optic to the detector. (default None)
subpixelmeth : str
'default', 'gausspeak' or 'parapeak'. (default 'default').
Method used for subpixel registration.
delayX : numpy.ndarray
The tracked 1D/2D shifts in X direction.
delayY : numpy.ndarray
The tracked 1D/2D shifts in Y direction.
resX : numpy.ndarray
The tracking 1D/2D coeffcient in X direction.
resY : numpy.ndarray
The tracking 1D/2D coeffcient in Y direction.
sloX : numpy.ndarray
The reconstructed slope error in X direction.
sloY : numpy.ndarray
The reconstructed slope error in Y direction.
curvX : numpy.ndarray
The reconstructed local curvature in X direction.
curvY : numpy.ndarray
The reconstructed local curvature in Y direction.
"""
def __init__(self, imstack1, imstack2=None, imstack3=None, imstack4=None):
"""
Parameters
----------
imstack1 : Imagestack class
The first image stack.
imstack2 : Imagestack class
The second image stack. (default None)
imstack3 : Imagestack class
The third image stack. (default None)
imstack4 : Imagestack class
The fourth image stack. (default None)
"""
self.imstack1 = imstack1
self.imstack2 = imstack2
self.imstack3 = imstack3
self.imstack4 = imstack4
self.dimension = '2D'
self.scandim = 'x'
self.mempos = 'downstream'
self.scanstep = None
self.pixsize = None
self.subpixelmeth = 'default'
self.delayX = None
self.delayY = None
self._delayX = None
self._delayY = None
self.resX = None
self.resY = None
self.sloX = None
self._sloX = None
self.sloY = None
self._sloY = None
self.curvX = None
self._curvX = None
self.curvY = None
self._curvY = None
self._flag = None
@property
def dimension(self):
return self._dimension
@dimension.setter
def dimension(self, value):
if value not in ['1D', '2D']:
print("Tracking.dimension needs to be '1D' or '2D'.")
sys.exit(0)
else:
self._dimension = value
@property
def scandim(self):
return self._scandim
@scandim.setter
def scandim(self, value):
if value not in ['x', 'y', 'xy', 'random']:
print("Unrecognized scan mode. It should be 'x', 'y'or 'xy', or 'random'.")
sys.exit(0)
else:
self._scandim = value
@property
def mempos(self):
return self._mempos
@mempos.setter
def mempos(self, value):
if value not in ['downstream', 'upstream']:
print("Unrecognized mempos. It should be 'downstream' or 'upstream'.")
sys.exit(0)
else:
self._mempos = value
@property
def scanstep(self):
return self._scanstep
@scanstep.setter
def scanstep(self, value):
self._scanstep = value
@property
def pixsize(self):
return self._pixsize
@pixsize.setter
def pixsize(self, value):
self._pixsize = value
@property
def dist(self):
return self._dist
@dist.setter
def dist(self, value):
self._dist = value
@property
def subpixelmeth(self):
return self._subpixelmeth
@subpixelmeth.setter
def subpixelmeth(self, value):
if value not in ['default', 'gausspeak', 'parapeak']:
print("Not supported subpixel registration method.")
sys.exit(0)
else:
self._subpixelmeth= value
[docs]
def stability(self, edge_x, edge_y, verbose=True):
"""
Check the stability using speckle pattern.
Parameters
----------
edge_x : int, or [int, int]
Cutting area of the reference image in x direction.
If it is a single integer, it will be expanded automatically
to the list [int, int].
edge_y : int, or [int, int]
Cutting area of the reference image in y direction.
If it is a single integer, it will be expanded automatically
to the list [int, int].
verbose : bool
To show the information or not. (default True)
Returns
-------
ixs, iys, res : numpy.array
Shifts in two dimensions and the correlation coefficient.
"""
if self.imstack1.rawdata is not None:
imNo, y_size, x_size = self.imstack1.data.shape
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
ixs = np.zeros(imNo-1)
iys = np.zeros(imNo-1)
res = np.zeros(imNo-1)
im1 = self.imstack1.data[0]
for j in range(imNo-1):
if verbose: _indicator(j, imNo, 'Stability')
im2 = self.imstack1.data[j+1]
im2 = im2[edge_y[0]:-edge_y[1], edge_x[0]:-edge_x[1]]
ix_tmp, iy_tmp, res_tmp = Imagematch(im1, im2, subpixelmeth=self.subpixelmeth)
ixs[j] = ix_tmp - edge_x[0]
iys[j] = iy_tmp - edge_y[0]
res[j] = np.max(res_tmp)
return ixs, iys, res
else:
data_folder = self.imstack1.fpath
file_start = self.imstack1.fstart
file_step = self.imstack1.fstep
fileNum = self.imstack1.fnum
fileNames = natsort.natsorted(os.listdir(os.chdir(data_folder)))
if fileNum <= 0:
fileNames = fileNames[file_start::file_step]
self.fnum = len(fileNames)
else:
fileNames = fileNames[file_start:file_start+fileNum*file_step:file_step]
im1_tmp = read_one(fileNames[0], ShowImage=False)
im1 = crop_one(im1_tmp, self.imstack1.roi)
imNo = len(fileNames)
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
ixs = np.zeros(imNo-1)
iys = np.zeros(imNo-1)
res = np.zeros(imNo-1)
for j in range(imNo-1):
if verbose: _indicator(j, imNo-1, comments='Stability')
im2_tmp = read_one(fileNames[j+1])
im2_tmp = crop_one(im2_tmp, self.imstack1.roi)
y_dim_tmp, x_dim_tmp = im2_tmp.shape
im2 = im2_tmp[edge_y[0]:y_dim_tmp-edge_y[1], edge_x[0]:x_dim_tmp-edge_x[1]]
ix_tmp, iy_tmp, res_tmp = Imagematch(im1, im2, subpixelmeth=self.subpixelmeth)
ix_tmp -= edge_x[0]
iy_tmp -= edge_y[0]
res_tmp = np.max(res_tmp)
ixs[j] = ix_tmp
iys[j] = iy_tmp
res[j] = res_tmp
return ixs, iys, res
[docs]
def stability_multi(self, edge_x, edge_y, cpu_no, verbose=True):
"""
Multiprocessing version of stability function.
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_x : int, or [int, int]
Cutting area of the reference image in x direction.
If it is a single integer, it will be expanded automatically
to the list [int, int].
edge_y : int, or [int, int]
Cutting area of the reference image in y direction.
If it is a single integer, it will be expanded automatically
to the list [int, int].
cpu_no : int
Number of CPU to be used.
verbose : bool
To show the information or not. (default True)
Returns
-------
ixs, iys, res : numpy.array
Shifts in two dimensions and the coefficient.
"""
data_folder = self.imstack1.fpath
file_start = self.imstack1.fstart
file_step = self.imstack1.fstep
fileNum = self.imstack1.fnum
fileNames = natsort.natsorted(os.listdir(os.chdir(data_folder)))
if fileNum <= 0:
fileNames = fileNames[file_start::file_step]
self.fnum = len(fileNames)
fileNum = self.fnum
else:
fileNames = fileNames[file_start:file_start+fileNum*file_step:file_step]
jxs = list(np.arange(0, fileNum-1, 1))
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
ixs = np.zeros(fileNum-1)
iys = np.zeros(fileNum-1)
res = np.zeros(fileNum-1)
global process_tmp
def process_tmp(j):
if verbose:
#The indications may not appear in correct order, however, it may still be useful to keep outputing some information.
_indicator(j, len(jxs), comments='Stability')
im1_tmp = read_one(fileNames[0])
im1 = crop_one(im1_tmp, self.imstack1.roi)
im2_tmp = read_one(fileNames[j+1])
im2_tmp = crop_one(im2_tmp, self.imstack1.roi)
y_dim_tmp, x_dim_tmp = im2_tmp.shape
im2 = im2_tmp[edge_y[0]:y_dim_tmp-edge_y[1], edge_x[0]:x_dim_tmp-edge_x[1]]
ix_tmp, iy_tmp, res_tmp = Imagematch(im1, im2, subpixelmeth=self.subpixelmeth)
ix_tmp -= edge_x[0]
iy_tmp -= edge_y[0]
res_tmp = np.max(res_tmp)
return ix_tmp, iy_tmp, res_tmp
with multiprocessing.Pool(cpu_no) as pool:
results = pool.map(process_tmp, jxs)
for i in range(len(results)):
ixs[i] = results[i][0]
iys[i] = results[i][1]
res[i] = results[i][2]
return ixs, iys, res
[docs]
def collimate(self, edge_x, edge_y):
"""
This function uses the first image from both imstack1 and imstack2
to align the speckle patterns in the image stacks.
It is called before any further tracking method.
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to the list [int, int].
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to the list [int, int].
"""
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
subpixelmeth = self.subpixelmeth
if self.imstack1.rawdata is None:
self.imstack1.read_data()
self.imstack2.read_data()
im_temp = self.imstack1.data[0]
im_ref = self.imstack2.data[0]
y_size_tmp, x_size_tmp = im_temp.shape
im_temp = im_temp[edge_y[0]:y_size_tmp-edge_y[1], edge_x[0]:x_size_tmp-edge_x[1]]
ix_tmp, iy_tmp, res_tmp = Imagematch(im_ref, im_temp, subpixelmeth=subpixelmeth)
ix_tmp -= edge_x[0]
iy_tmp -= edge_y[0]
ix_tmp = round(ix_tmp)
iy_tmp = round(iy_tmp)
if np.max(res_tmp) < 0.5:
warnings.warn("Too low correlate coefficient, may have wrong tracking value!")
print("The coefficient is {:.3f}".format(np.max(res_tmp)))
if iy_tmp > 0:
data1_tmp = self.imstack1.data[0]
y_tmp1, _ = data1_tmp.shape
data2_tmp = self.imstack2.data[0, iy_tmp:, :]
y_tmp2, _ = data2_tmp.shape
y_tmp = min(y_tmp1, y_tmp2)
self.imstack1.data = self.imstack1.data[:, 0:y_tmp, :]
self.imstack2.data = self.imstack2.data[:, iy_tmp:iy_tmp+y_tmp, :]
else:
data1_tmp = self.imstack1.data[0, np.abs(iy_tmp):, :]
y_tmp1, _ = data1_tmp.shape
data2_tmp = self.imstack2.data[0]
y_tmp2, _ = data2_tmp.shape
y_tmp = min(y_tmp1, y_tmp2)
self.imstack1.data = self.imstack1.data[:, np.abs(iy_tmp):np.abs(iy_tmp)+y_tmp, :]
self.imstack2.data = self.imstack2.data[:, 0:y_tmp, :]
if ix_tmp > 0:
data1_tmp = self.imstack1.data[0]
_, x_tmp1 = data1_tmp.shape
data2_tmp = self.imstack2.data[0, :, ix_tmp:]
_, x_tmp2 = data2_tmp.shape
x_tmp = min(x_tmp1, x_tmp2)
self.imstack1.data = self.imstack1.data[:, :, 0:x_tmp]
self.imstack2.data = self.imstack2.data[:, :, ix_tmp:ix_tmp+x_tmp]
else:
data1_tmp = self.imstack1.data[0, :, np.abs(ix_tmp):]
_, x_tmp1 = data1_tmp.shape
data2_tmp = self.imstack2.data[0]
_, x_tmp2 = data2_tmp.shape
x_tmp = min(x_tmp1, x_tmp2)
self.imstack1.data = self.imstack1.data[:, :, np.abs(ix_tmp):np.abs(ix_tmp)+x_tmp]
self.imstack2.data = self.imstack2.data[:, :, 0:x_tmp]
return
def _XSS_2stacks_1D(self, edge_xy, edge_z, normalize=False, display=False, verbose=True, _Resreturn=False):
"""
**1D** speckle tracking for XSS technique with reference beam.
Returns
-------
ixs, iys, resmax : numpy.array
Shifts in x and y and the coefficient if _Resreturn=True.
"""
scandim = self.scandim
if scandim not in ['x', 'y']:
print("Unrecognized 1D scan mode. The supported methods are 1D X and 1D Y scan.")
sys.exit(0)
if isinstance(edge_xy, int):
edge_xy = (edge_xy, edge_xy)
if isinstance(edge_z, int):
edge_z = (edge_z, edge_z)
subpixelmeth = self.subpixelmeth
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
self.imstack1.data = self.imstack1.data[edge_z[0]:imNo-edge_z[1], :, edge_xy[0]:x_dim_tmp-edge_xy[1]]
y_dim = y_dim_tmp
ixs = np.zeros(y_dim)
iys = np.zeros(y_dim)
resmax = np.zeros(y_dim)
plane1_init = self.imstack1.data[:, 0, :].astype(np.float32)
plane2_init = self.imstack2.data[:, 0, :].astype(np.float32)
height, width = plane1_init.shape
if normalize:
plane1_init = NormImage(plane1_init)
plane2_init = NormImage(plane2_init)
ix_init, iy_init, res_init = Imagematch(plane2_init, plane1_init, subpixelmeth=subpixelmeth)
if display:
fig, h1, h2, h3, h4 = _initDisplay(plane1_init, plane2_init, res_init)
for i in range(y_dim):
if verbose:
if scandim == 'x':
_indicator(i, y_dim, comments = self._flag + ' technique in X direction')
if scandim == 'y':
_indicator(i, y_dim, comments = self._flag + ' technique in Y direction')
plane1 = self.imstack1.data[:, i, :].astype(np.float32)
plane2 = self.imstack2.data[:, i, :].astype(np.float32)
if normalize:
plane1 = NormImage(plane1)
plane2 = NormImage(plane2)
ix_tmp, iy_tmp, res_tmp = Imagematch(plane2, plane1, subpixelmeth=subpixelmeth)
ixs[i] = ix_tmp - edge_xy[0]
iys[i] = iy_tmp - edge_z[0]
resmax[i] = np.max(res_tmp)
if display:
_contiDisplay(fig, h1, h2, h3, h4, plane1, plane2, res_tmp)
if display and self.dimension == '2D': plt.close('all')
if scandim == 'y':
self.delayY = iys
self._delayX = ixs
self.resY = resmax
if scandim == 'x':
self.delayX = np.flip(iys)
self._delayY = np.flip(ixs)
self.resX = resmax
if _Resreturn:
return ixs, iys, resmax
[docs]
def XSS_withrefer(self, edge_x, edge_y, edge_z, hw_xy=None, pad_xy=None, normalize=False, display=False, verbose=True):
"""
Speckle tracking for XSS technique with reference beam.
Two image stacks are needed to define the Tracking class.
The fisrt image stack is the one with test optic.
The second image stack is the reference image stack.
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The width/height of the image subregion. If Tracking.scandim is 'x',
it is the height of the subregion; if Tracking.scandim is 'y',
it is the width of the subregion.
Needed when do 2D data processing. (default None)
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing. (default None)
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
scandim = self.scandim
if self._flag is None: self._flag = 'XSS(with reference)'
width = hw_xy
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack3 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack4 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if scandim == 'xy':
if self.imstack1.data.shape[1] != self.imstack1.data.shape[2] \
or self.imstack2.data.shape[1] != self.imstack2.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
#if scandim == 'diag':
# self.imstack3 = copy.deepcopy(self.imstack1)
# self.imstack4 = copy.deepcopy(self.imstack2)
if scandim == 'x': edge_x = None
if scandim == 'y': edge_y = None
if isinstance(edge_z, int):
edge_z = (edge_z, edge_z)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
verbose_tmp1 = self.imstack1.verbose
verbose_tmp2 = self.imstack2.verbose
self.imstack1.verbose = False
self.imstack2.verbose = False
self.imstack1.rot90deg()
self.imstack2.rot90deg()
self.imstack1.verbose = verbose_tmp1
self.imstack2.verbose = verbose_tmp2
if self.dimension == '1D':
if scandim == 'y':
if isinstance(edge_x, int):
edge_xy = (edge_x, edge_x)
else:
edge_xy = edge_x
if scandim == 'x':
if isinstance(edge_y, int):
edge_xy = (edge_y, edge_y)
else:
edge_xy = edge_y
self._XSS_2stacks_1D(edge_xy, edge_z, normalize, display, verbose)
if self.delayX is not None: self.sloX = slope_scan(self.delayX, self.pixsize, self.dist)
if self.delayY is not None: self.sloY = slope_scan(self.delayY, self.pixsize, self.dist)
if self.dimension == '2D':
subpixelmeth = self.subpixelmeth
if scandim == 'y':
if isinstance(edge_x, int):
edge_xy = (edge_x, edge_x)
else:
edge_xy = edge_x
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
if isinstance(edge_y, int):
edge_xy = (edge_y, edge_y)
else:
edge_xy = edge_y
if isinstance(pad_xy, int):
pad_xy = (pad_xy, pad_xy)
if scandim == 'xy':
if edge_x != edge_y or edge_xy[0] != edge_xy[1]:
print("For scandim == 'xy', the edges should be symmetrical, edge_x should be the same as edge_y, and also the elements of each.")
sys.exit(0)
if scandim == 'xy':
if pad_xy[0] != pad_xy[1]:
print("For scandim == 'xy', the pad should be symmetrical.")
sys.exit(0)
if pad_xy[0] > edge_xy[0] or pad_xy[1] > edge_xy[1]:
print("pad_xy should not be greater than edge_xy.")
sys.exit(0)
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
self.imstack1.data = self.imstack1.data[edge_z[0]:imNo-edge_z[1], :, edge_xy[0]:x_dim_tmp-edge_xy[1]]
imNo_tmp, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
jxs = np.arange(0, x_dim_tmp-width+1, 1)
delayY_2D = np.empty((y_dim_tmp, len(jxs)))
delayX_2D = np.empty((y_dim_tmp, len(jxs)))
res_2D = np.empty((y_dim_tmp, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
#if self.scandim == 'xy' or self.scandim == 'diag': self.scandim = 'x'
if self.scandim == 'xy': self.scandim = 'x'
for index, jx in enumerate(jxs):
if verbose:
_indicator(jx, len(jxs), comments = self._flag + ' technique in X direction')
self.imstack1.data = imstack1_data[:, :, jx:width+jx]
self.imstack2.data = imstack2_data[:, :, jx+edge_xy[0]-pad_xy[0]:width+jx+edge_xy[0]+pad_xy[1]]
edge_xy_new = 0
edge_z_new = 0
ix_new, iy_new, res_new = self._XSS_2stacks_1D(edge_xy_new, edge_z_new, normalize, display, verbose=False, _Resreturn=True)
delayX_2D[:, index] = ix_new - pad_xy[0]
delayY_2D[:, index] = iy_new - edge_z[0]
res_2D[:, index] = res_new
if scandim == 'y':
self.delayY = delayY_2D
self._delayX = delayX_2D
self.resY = res_2D
self.sloY = slope_scan(self.delayY, self.scanstep, self.dist)
self._sloX = slope_pixel(self._delayX, self.pixsize, self.dist)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
self.delayX = np.rot90(delayY_2D, k=-1)
self._delayY = np.rot90(delayX_2D, k=-1)
self.resX = np.rot90(res_2D, k=-1)
self.sloX = slope_scan(self.delayX, self.scanstep, self.dist)
self._sloY = slope_pixel(self._delayY, self.pixsize, self.dist)
if scandim == 'xy': #or scandim == 'diag':
if self.imstack3.rawdata is None:
self.imstack3.read_data()
if self.imstack4.rawdata is None:
self.imstack4.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if scandim == 'xy':
if self.imstack3.data.shape[1] != self.imstack3.data.shape[2] \
or self.imstack4.data.shape[1] != self.imstack4.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
if isinstance(edge_x, int):
edge_xy = (edge_x, edge_x)
else:
edge_xy = edge_x
if pad_xy[0] > edge_xy[0] or pad_xy[1] > edge_xy[1]:
print("pad_xy should not be greater than edge_xy.")
sys.exit(0)
imNo2, y_dim_tmp2, x_dim_tmp2 = self.imstack3.data.shape
self.imstack3.data = self.imstack3.data[edge_z[0]:imNo2-edge_z[1], :, edge_xy[0]:x_dim_tmp2-edge_xy[1]]
imNo_tmp2, y_dim_tmp2, x_dim_tmp2 = self.imstack3.data.shape
jxs2 = np.arange(0, x_dim_tmp2-width+1, 1)
delayY_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
delayX_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
res_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
imstack3_data = copy.deepcopy(self.imstack3.data)
imstack4_data = copy.deepcopy(self.imstack4.data)
self.scandim = 'y'
for index, jx in enumerate(jxs2):
if verbose:
_indicator(jx, len(jxs), comments = self._flag + ' technique in Y direction')
self.imstack1.data = imstack3_data[:, :, jx:width+jx]
self.imstack2.data = imstack4_data[:, :, jx+edge_xy[0]-pad_xy[0]:width+jx+edge_xy[0]+pad_xy[1]]
edge_xy_new = 0
edge_z_new = 0
ix_new2, iy_new2, res_new2 = self._XSS_2stacks_1D(edge_xy_new, edge_z_new, normalize, display, verbose=False, _Resreturn=True)
delayX_2D_2[:, index] = ix_new2 - pad_xy[0]
delayY_2D_2[:, index] = iy_new2 - edge_z[0]
res_2D_2[:, index] = res_new2
self.scandim = scandim
self.delayY = delayY_2D_2
self._delayX = delayX_2D_2
self.resY = res_2D_2
self.imstack1.data = imstack1_data
self.imstack2.data = imstack2_data
### To cut the 2D map (align the results in two directions) for post-processing
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
self.delayX = self.delayX[:, edge_x[0]:-edge_x[1]-width+1]
self._delayY = self._delayY[:, edge_x[0]:-edge_x[1]-width+1]
self.delayY = self.delayY[edge_y[0]:-edge_y[1]-width+1, :]
self._delayX = self._delayX[edge_y[0]:-edge_y[1]-width+1, :]
self.sloX = slope_scan(self.delayX, self.scanstep, self.dist)
self.sloY = slope_scan(self.delayY, self.scanstep, self.dist)
#if scandim != 'diag':
self._sloX = slope_pixel(self._delayX, self.pixsize, self.dist)
self._sloY = slope_pixel(self._delayY, self.pixsize, self.dist)
return
[docs]
def XSS_withrefer_multi(self, edge_x, edge_y, edge_z, hw_xy, pad_xy, cpu_no, normalize=False, verbose=True):
"""
Speckle tracking for XSS technique with reference beam.
The fisrt image stack is the one with test optic.
The second image stack is the reference image stack.
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The width/height of the image subregion. If Tracking.scandim is 'x',
it is the height of the subregion; if Tracking.scandim is 'y',
it is the width of the subregion.
Needed when do 2D data processing. (default None)
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing. (default None)
cpu_no : int
The number of CPUs that is available.
normalize : bool
To normalize the stitched image or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
if self._flag is None: self._flag = 'XSS(with reference)'
width = hw_xy
if width % 2 != 0: width += 1 # width should be even
scandim = self.scandim
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack3 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if scandim == 'xy':
if self.imstack1.data.shape[1] != self.imstack1.data.shape[2] \
or self.imstack2.data.shape[1] != self.imstack2.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
#if scandim == 'diag':
# self.imstack3 = copy.deepcopy(self.imstack1)
# self.imstack4 = copy.deepcopy(self.imstack2)
if scandim == 'x': edge_x = None
if scandim == 'y': edge_y = None
if isinstance(edge_z, int):
edge_z = (edge_z, edge_z)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
verbose_tmp1 = self.imstack1.verbose
verbose_tmp2 = self.imstack2.verbose
self.imstack1.verbose = False
self.imstack2.verbose = False
self.imstack1.rot90deg()
self.imstack2.rot90deg()
self.imstack1.verbose = verbose_tmp1
self.imstack2.verbose = verbose_tmp2
if self.dimension == '1D':
print("No need to use multiprocessing for 1D analysis.")
sys.exit(0)
if self.dimension == '2D':
subpixelmeth = self.subpixelmeth
if scandim == 'y':
if isinstance(edge_x, int):
edge_xy = (edge_x, edge_x)
else:
edge_xy = edge_x
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
if isinstance(edge_y, int):
edge_xy = (edge_y, edge_y)
else:
edge_xy = edge_y
if isinstance(pad_xy, int):
pad_xy = (pad_xy, pad_xy)
if scandim == 'xy':
if edge_x != edge_y or edge_xy[0] != edge_xy[1]:
print("For scandim == 'xy', the edges should be symmetrical, edge_x should be the same as edge_y, and also the elements of each.")
sys.exit(0)
if scandim == 'xy':
if pad_xy[0] != pad_xy[1]:
print("For scandim == 'xy', the pad should be symmetrical, pad_x should be the same as pad_y, and also the elements of each.")
sys.exit(0)
if pad_xy[0] > edge_xy[0] or pad_xy[1] > edge_xy[1]:
print("pad_xy should not be greater than edge_xy.")
sys.exit(0)
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
self.imstack1.data = self.imstack1.data[edge_z[0]:imNo-edge_z[1], :, edge_xy[0]:x_dim_tmp-edge_xy[1]]
imNo_tmp, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
jxs = np.arange(0, x_dim_tmp-width+1, 1)
delayY_2D = np.empty((y_dim_tmp, len(jxs)))
delayX_2D = np.empty((y_dim_tmp, len(jxs)))
res_2D = np.empty((y_dim_tmp, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
#if self.scandim == 'xy' or self.scandim == 'diag': self.scandim = 'x'
if self.scandim == 'xy': self.scandim = 'x'
jxs1 = list(jxs)
global process_tmp1
def process_tmp1(jx):
if verbose:
if self.scandim == 'x':
_indicator(jx, len(jxs1), comments = self._flag + ' technique in X direction')
if self.scandim == 'y':
_indicator(jx, len(jxs1), comments = self._flag + ' technique in Y direction')
self.imstack1.data = imstack1_data[:, :, jx:width+jx]
self.imstack2.data = imstack2_data[:, :, jx+edge_xy[0]-pad_xy[0]:width+jx+edge_xy[0]+pad_xy[1]]
edge_xy_new = 0
edge_z_new = 0
ix_new1, iy_new1, res_new1 = self._XSS_2stacks_1D(edge_xy_new, edge_z_new, normalize, display=False, verbose=False, _Resreturn=True)
ix_new1 -= pad_xy[0]
iy_new1 -= edge_z[0]
return ix_new1, iy_new1, res_new1
with multiprocessing.Pool(cpu_no) as pool:
results = pool.map(process_tmp1, jxs1)
for i in range(len(results)):
delayX_2D[:, i] = results[i][0]
delayY_2D[:, i] = results[i][1]
res_2D[:, i] = results[i][2]
if scandim == 'y':
self.delayY = delayY_2D
self._delayX = delayX_2D
self.resY = res_2D
self.sloY = slope_scan(self.delayY, self.scanstep, self.dist)
self._sloX = slope_pixel(self._delayX, self.pixsize, self.dist)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
self.delayX = np.rot90(delayY_2D, k=-1)
self._delayY = np.rot90(delayX_2D, k=-1)
self.resX = np.rot90(res_2D, k=-1)
if scandim == 'x':
self.sloX = slope_scan(self.delayX, self.scanstep, self.dist)
self._sloY = slope_pixel(self._delayY, self.pixsize, self.dist)
if scandim == 'xy': #or scandim == 'diag':
if self.imstack3.rawdata is None:
self.imstack3.read_data()
if self.imstack4.rawdata is None:
self.imstack4.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if self.imstack3.data.shape[1] != self.imstack3.data.shape[2] \
or self.imstack4.data.shape[1] != self.imstack4.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
if isinstance(edge_x, int):
edge_xy = (edge_x, edge_x)
else:
edge_xy = edge_x
if pad_xy[0] > edge_xy[0] or pad_xy[1] > edge_xy[1]:
print("pad_xy should not be greater than edge_xy.")
sys.exit(0)
imNo2, y_dim_tmp2, x_dim_tmp2 = self.imstack3.data.shape
self.imstack3.data = self.imstack3.data[edge_z[0]:imNo2-edge_z[1], :, edge_xy[0]:x_dim_tmp2-edge_xy[1]]
imNo_tmp2, y_dim_tmp2, x_dim_tmp2 = self.imstack3.data.shape
jxs2 = np.arange(0, x_dim_tmp2-width+1, 1)
delayY_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
delayX_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
res_2D_2 = np.empty((y_dim_tmp2, len(jxs2)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
imstack3_data = copy.deepcopy(self.imstack3.data)
imstack4_data = copy.deepcopy(self.imstack4.data)
self.scandim = 'y'
jxs2 = list(jxs2)
global process_tmp2
def process_tmp2(jx):
if verbose:
_indicator(jx, len(jxs2), comments = self._flag + ' technique in Y direction')
self.imstack1.data = imstack3_data[:, :, jx:width+jx]
self.imstack2.data = imstack4_data[:, :, jx+edge_xy[0]-pad_xy[0]:width+jx+edge_xy[0]+pad_xy[1]]
edge_xy_new = 0
edge_z_new = 0
ix_new2, iy_new2, res_new2 = self._XSS_2stacks_1D(edge_xy_new, edge_z_new, normalize, display=False, verbose=False, _Resreturn=True)
ix_new2 -= pad_xy[0]
iy_new2 -= edge_z[0]
return ix_new2, iy_new2, res_new2
with multiprocessing.Pool(cpu_no) as pool:
results = pool.map(process_tmp2, jxs2)
for i in range(len(results)):
delayX_2D_2[:, i] = results[i][0]
delayY_2D_2[:, i] = results[i][1]
res_2D_2[:, i] = results[i][2]
self.imstack1.data = imstack1_data
self.imstack2.data = imstack2_data
self.scandim = scandim
self.delayY = delayY_2D_2
self._delayX = delayX_2D_2
self.resY = res_2D_2
### To cut the 2D map (align the results in two directions) for post-processing
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
self.delayX = self.delayX[:, edge_x[0]+width//2:-edge_x[1]-width//2+1]
self._delayY = self._delayY[:, edge_x[0]+width//2:-edge_x[1]-width//2+1]
self.delayY = self.delayY[edge_y[0]+width//2:-edge_y[1]-width//2+1, :]
self._delayX = self._delayX[edge_y[0]+width//2:-edge_y[1]-width//2+1, :]
self.sloX = slope_scan(self.delayX, self.scanstep, self.dist)
self.sloY = slope_scan(self.delayY, self.scanstep, self.dist)
#if scandim != 'diag':
self._sloX = slope_pixel(self._delayX, self.pixsize, self.dist)
self._sloY = slope_pixel(self._delayY, self.pixsize, self.dist)
return
[docs]
def XSS_self(self, edge_x, edge_y, edge_z, nstep, hw_xy=None, pad_xy=None, normalize=False, display=False, verbose=True):
"""
Speckle tracking for self-reference XSS technique.
Only one image stack, the one with test optic, is needed.
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
nstep : int
The space between two chosen columns or rows.
hw_xy : int
The width/height of the image subregion. If Tracking.scandim is 'x',
it is the height of the subregion; if Tracking.scandim is 'y',
it is the width of the subregion.
Needed when do 2D data processing. (default None)
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing. (default None)
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "Self-reference XSS"
width = hw_xy
scandim = self.scandim
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if scandim == 'xy':
print("This method needs to be implemented. Try to use 'x' and 'y' seperately.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2 is None:
self.imstack2 = copy.deepcopy(self.imstack1)
else:
print("Only one image stack is needed. Please check your input.")
sys.exit(0)
imNo, y_dim, x_dim = self.imstack1.data.shape
if scandim == 'y':
self.imstack1.data = self.imstack1.data[:, nstep:, :]
self.imstack2.data = self.imstack2.data[:, 0:y_dim-nstep, :]
if scandim == 'x':
self.imstack1.data = self.imstack1.data[:, :, nstep:]
self.imstack2.data = self.imstack2.data[:, :, 0:x_dim-nstep]
self.XSS_withrefer(edge_x, edge_y, edge_z, width, pad_xy, normalize=normalize, display=display, verbose=verbose)
self.sloX = None
self.sloY = None
if scandim == 'x':
self.curvX = curv_scan(self.delayX, self.scanstep, self.dist, self.pixsize, nstep, self.mempos)
if scandim == 'y':
self.curvY = curv_scan(self.delayY, self.scanstep, self.dist, self.pixsize, nstep, self.mempos)
return
[docs]
def XSS_self_multi(self, edge_x, edge_y, edge_z, nstep, hw_xy, pad_xy, cpu_no, normalize=False, verbose=True):
"""
Speckle tracking for self-reference XSS technique.
Only one image stack, the one with test optic, is needed.
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
nstep : int
The space between two chosen columns or rows.
hw_xy : int
The width/height of the image subregion. If Tracking.scandim is 'x',
it is the height of the subregion; if Tracking.scandim is 'y',
it is the width of the subregion.
Needed when do 2D data processing.
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing.
cpu_no : int
The number of CPUs that is available.
normalize : bool
To normalize the stitched image or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "Self-reference XSS"
width = hw_xy
scandim = self.scandim
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if scandim == 'xy':
print("This method needs to be implemented. Try to use 'x' and 'y' seperately.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2 is None:
self.imstack2 = copy.deepcopy(self.imstack1)
else:
print("Only one image stack is needed. Please check your input.")
sys.exit(0)
imNo, y_dim, x_dim = self.imstack1.data.shape
if scandim == 'y':
self.imstack1.data = self.imstack1.data[:, nstep:, :]
self.imstack2.data = self.imstack2.data[:, 0:y_dim-nstep, :]
if scandim == 'x':
self.imstack1.data = self.imstack1.data[:, :, nstep:]
self.imstack2.data = self.imstack2.data[:, :, 0:x_dim-nstep]
self.XSS_withrefer_multi(edge_x, edge_y, edge_z, width, pad_xy, cpu_no, normalize, verbose=verbose)
self.sloX = None
self.sloY = None
if scandim == 'x':
self.curvX = curv_scan(self.delayX, self.scanstep, self.dist, self.pixsize, nstep, self.mempos)
if scandim == 'y':
self.curvY = curv_scan(self.delayY, self.scanstep, self.dist, self.pixsize, nstep, self.mempos)
return
def _XST_2images_1D(self, edge_x, edge_y, pad_xy, hw_xy, normalize=False, display=False, verbose=True, _Resreturn=False):
"""
**1D** self-reference conventional speckle tracking technique.
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to the list [int, int].
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to the list [int, int].
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
Needed when do 2D data processing. (default None)
hw_xy : int
It defines the width or height that needed in the template image
for x or y data processing.
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
Returns
-------
ixs, iys, resmax : numpy.array
Shifts in x and y and the coefficient.
"""
scandim = self.scandim
if scandim not in ['x', 'y']:
print("Unrecognized 1D scan mode. The supported methods are 1D X and 1D Y scan.")
sys.exit(0)
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
if isinstance(pad_xy, int):
pad_xy = (pad_xy, pad_xy)
#if scandim == 'x':
# if pad_xy[0] > edge_x[0] or pad_xy[1] > edge_x[1]:
# print("pad_xy should not be greater than edge_x.")
# sys.exit(0)
#if scandim == 'y':
# if pad_xy[0] > edge_y[0] or pad_xy[1] > edge_y[1]:
# print("pad_xy should not be greater than edge_y.")
# sys.exit(0)
subpixelmeth = self.subpixelmeth
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
y_dim = y_dim_tmp - edge_y[0] - edge_y[1]- hw_xy + 1
ixs = np.zeros(y_dim)
iys = np.zeros(y_dim)
resmax = np.zeros(y_dim)
plane1_init = self.imstack1.data[0, 0+edge_y[0]:hw_xy+edge_y[0], edge_x[0]:x_dim_tmp-edge_x[1]].astype(np.float32)
plane2_init = self.imstack2.data[0, 0+edge_y[0]-pad_xy[0]:hw_xy+edge_y[0]+pad_xy[1], :].astype(np.float32)
if normalize:
plane1_init = NormImage(plane1_init)
plane2_init = NormImage(plane2_init)
ix_init, iy_init, res_init = Imagematch(plane2_init, plane1_init, subpixelmeth=subpixelmeth)
if display:
fig, h1, h2, h3, h4 = _initDisplay(plane1_init, plane2_init, res_init)
for i in range(y_dim):
if verbose:
if scandim == 'x':
_indicator(i, y_dim, comments = self._flag + ' technique in X direction')
if scandim == 'y':
_indicator(i, y_dim, comments = self._flag + ' technique in Y direction')
plane1 = self.imstack1.data[0, i+edge_y[0]:hw_xy+i+edge_y[0], edge_x[0]:x_dim_tmp-edge_x[1]].astype(np.float32)
plane2 = self.imstack2.data[0, i+edge_y[0]-pad_xy[0]:hw_xy+i+edge_y[0]+pad_xy[1], :].astype(np.float32)
if normalize:
plane1 = NormImage(plane1)
plane2 = NormImage(plane2)
ix_tmp, iy_tmp, res_tmp = Imagematch(plane2, plane1, subpixelmeth=subpixelmeth)
ixs[i] = ix_tmp - edge_x[0]
iys[i] = iy_tmp - pad_xy[0]
resmax[i] = np.max(res_tmp)
if display:
_contiDisplay(fig, h1, h2, h3, h4, plane1, plane2, res_tmp)
if display and self.dimension == '2D': plt.close('all')
if scandim == 'y':
self.delayY = iys
self._delayX = ixs
self.resY = resmax
if scandim == 'x':
self.delayX = np.flip(iys)
self._delayY = np.flip(ixs)
self.resX = resmax
if _Resreturn:
return ixs, iys, resmax
[docs]
def XST_self(self, edge_x, edge_y, pad_x, pad_y, hw_xy, window=None, normalize=False, display=False, verbose=True):
"""
Speckle tracking for self-reference XST technique.
Two image stacks are needed. Both are with test optic.
One image stack consists only one image when the diffuser is at one position,
another image stack consists another image when the diffuser
is at another position.
This technique has been described in [XST_selfpaper]_:
.. [XST_selfpaper] Hu, L., Wang, H., Fox, O., & Sawhney, K. (2022).
Fast wavefront sensing for X-ray optics with an alternating speckle tracking technique.
Opt. Exp., 30(18), 33259-33273.
https://doi.org/10.1364/OE.460163
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
pad_x : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in x direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'y', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
pad_y : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in y direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'x', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The height (when Tracking.scandim is 'y')
or the width (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
window : int
The width (when Tracking.scandim is 'y')
or the height (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
Only used when Tracking.dimension is '2D'. (default None)
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
if self._flag is None: self._flag = "Self-reference XST"
scandim = self.scandim
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack3 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack4 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if scandim == 'xy':
if self.imstack1.data.shape[1] != self.imstack1.data.shape[2] \
or self.imstack2.data.shape[1] != self.imstack2.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
verbose_tmp1 = self.imstack1.verbose
verbose_tmp2 = self.imstack2.verbose
self.imstack1.verbose = False
self.imstack2.verbose = False
self.imstack1.rot90deg()
self.imstack2.rot90deg()
self.imstack1.verbose = verbose_tmp1
self.imstack2.verbose = verbose_tmp2
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
if isinstance(pad_x, int):
pad_x = (pad_x, pad_x)
if isinstance(pad_y, int):
pad_y = (pad_y, pad_y)
if scandim == 'xy':
if edge_x != edge_y or edge_x[0] != edge_x[1] or edge_y[0] != edge_y[1]:
print("For scandim == 'xy', the edges should be symmetrical, edge_x should be the same as edge_y, and also the elements of each.")
sys.exit(0)
if scandim == 'xy':
if pad_x != pad_y or pad_x[0] != pad_x[1] or pad_y[0] != pad_y[1]:
print("For scandim == 'xy', the pad should be symmetrical, pad_x should be the same as pad_y, and also the elements of each.")
sys.exit(0)
if self.dimension == '1D':
if scandim == 'x':
pad_y = None
pad_xy = (pad_x[1], pad_x[0])
edge_x = (edge_x[1], edge_x[0])
self._XST_2images_1D(edge_y, edge_x, pad_xy, hw_xy, normalize, display, verbose)
if scandim == 'y':
pad_x = None
pad_xy = pad_y
self._XST_2images_1D(edge_x, edge_y, pad_xy, hw_xy, normalize, display, verbose)
if self.delayX is not None: self.curvX = curv_XST(self.delayX, self.scanstep, self.dist, self.pixsize, self.mempos)
if self.delayY is not None: self.curvY = curv_XST(self.delayY, self.scanstep, self.dist, self.pixsize, self.mempos)
if self.dimension == '2D':
if scandim == 'x' or scandim == 'xy':
edge_x_new = edge_y
edge_y_new = (edge_x[1], edge_x[0])
pad_x_new = pad_y
pad_y_new = (pad_x[1], pad_x[0])
if scandim == 'y':
edge_x_new = edge_x
edge_y_new = edge_y
pad_x_new = pad_x
pad_y_new = pad_y
if pad_x_new[0] > edge_x_new[0] or pad_x_new[1] > edge_x_new[1]:
print("pad should not be greater than edge.")
if pad_y_new[0] > edge_y_new[0] or pad_y_new[1] > edge_y_new[1]:
print("pad should not be greater than edge.")
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
jxs = np.arange(0, x_dim_tmp-edge_x_new[0]-edge_x_new[1]-window+1, 1)
delayY_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
delayX_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
res_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
if self.scandim == 'xy': self.scandim = 'x'
for index, jx in enumerate(jxs):
if verbose:
if self.scandim == 'x':
_indicator(jx, len(jxs), comments = self._flag + 'technique in X direction')
if self.scandim == 'y':
_indicator(jx, len(jxs), comments = self._flag + 'technique in Y direction')
y_ind_left = edge_y_new[0] - pad_y_new[0]
y_ind_right = y_dim_tmp - edge_y_new[1] + pad_y_new[1]
x_ind_left = jx + edge_x_new[0] - pad_x_new[0]
x_ind_right = jx + edge_x_new[0] + window + pad_x_new[1]
self.imstack1.data = imstack1_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
self.imstack2.data = imstack2_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
ix_new, iy_new, res_new = self._XST_2images_1D(pad_x_new, pad_y_new, pad_y_new, hw_xy, normalize, display, verbose=False, _Resreturn=True)
delayX_2D[:, index] = ix_new #- pad_x_new[0]
delayY_2D[:, index] = iy_new #- pad_y_new[0]
res_2D[:, index] = res_new
if scandim == 'y':
self.delayY = delayY_2D
self._delayX = delayX_2D
self.resY = res_2D
self.curvY = curv_XST(self.delayY, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvX = None
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
self.delayX = np.rot90(delayY_2D, k=-1)
self._delayY = np.rot90(delayX_2D, k=-1)
self.resX = np.rot90(res_2D, k=-1)
self.curvX = curv_XST(self.delayX, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvY = None
if scandim == 'xy':
if self.imstack3.rawdata is None:
self.imstack3.read_data()
if self.imstack4.rawdata is None:
self.imstack4.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if self.imstack3.data.shape[1] != self.imstack3.data.shape[2] \
or self.imstack4.data.shape[1] != self.imstack4.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
edge_x_new = edge_x
edge_y_new = edge_y
pad_x_new = pad_x
pad_y_new = pad_y
if pad_x_new[0] > edge_x_new[0] or pad_x_new[1] > edge_x_new[1]:
print("pad should not be greater than edge.")
if pad_y_new[0] > edge_y_new[0] or pad_y_new[1] > edge_y_new[1]:
print("pad should not be greater than edge.")
imNo, y_dim_tmp, x_dim_tmp = self.imstack3.data.shape
jxs = np.arange(0, x_dim_tmp-edge_x_new[0]-edge_x_new[1]-window+1, 1)
delayY_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
delayX_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
res_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
imstack3_data = copy.deepcopy(self.imstack3.data)
imstack4_data = copy.deepcopy(self.imstack4.data)
self.scandim = 'y'
for index, jx in enumerate(jxs):
if verbose:
if self.scandim == 'x':
_indicator(jx, len(jxs), comments = self._flag + 'technique in X direction')
if self.scandim == 'y':
_indicator(jx, len(jxs), comments = self._flag + 'technique in Y direction')
y_ind_left = edge_y_new[0] - pad_y_new[0]
y_ind_right = y_dim_tmp - edge_y_new[1] + pad_y_new[1]
x_ind_left = jx + edge_x_new[0] - pad_x_new[0]
x_ind_right = jx + edge_x_new[0] - pad_x_new[0] + window + pad_x_new[1]
self.imstack1.data = imstack3_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
self.imstack2.data = imstack4_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
ix_new2, iy_new2, res_new2 = self._XST_2images_1D(pad_x_new, pad_y_new, pad_y_new, hw_xy, normalize, display, verbose=False, _Resreturn=True)
delayX_2D_2[:, index] = ix_new2 #- pad_x_new[0]
delayY_2D_2[:, index] = iy_new2 #- pad_y_new[0]
res_2D_2[:, index] = res_new2
self.scandim = scandim
self.delayY = delayY_2D_2
self._delayX = delayX_2D_2
self.resY = res_2D_2
self.imstack1.data = imstack1_data
self.imstack2.data = imstack2_data
self.curvY = curv_XST(self.delayY, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvX = None
return
[docs]
def XST_self_multi(self, edge_x, edge_y, pad_x, pad_y, hw_xy, window, cpu_no, normalize=False, verbose=True):
"""
Speckle tracking for self-reference XST technique.
Two image stacks are needed. Both are with test optic.
One image stack consists one image when the diffuser is at one position,
another image stack consists another image when the diffuser
is at another position.
This technique has been described in [XST_selfpaper2]_:
.. [XST_selfpaper2] Hu, L., Wang, H., Fox, O., & Sawhney, K. (2022).
Fast wavefront sensing for X-ray optics with an alternating speckle tracking technique.
Opt. Exp., 30(18), 33259-33273.
https://doi.org/10.1364/OE.460163
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
pad_x : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in x direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'y', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
pad_y : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in y direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'x', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The height (when Tracking.scandim is 'y')
or the width (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
window : int
The width (when Tracking.scandim is 'y')
or the height (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
Only used when Tracking.dimension is '2D'. (default None)
cpu_no : int
The number of CPUs that is available.
normalize : bool
To normalize the stitched image or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
if self._flag is None: self._flag = "Self-reference XST"
scandim = self.scandim
if scandim not in ['x', 'y', 'xy']:
print("Unrecognized scan mode. It should be 'x', 'y' or 'xy'.")
sys.exit(0)
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack3 == None:
print("Please provide another image stack.")
sys.exit(0)
if scandim == 'xy' and self.imstack4 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if scandim == 'xy':
if self.imstack1.data.shape[1] != self.imstack1.data.shape[2] \
or self.imstack2.data.shape[1] != self.imstack2.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
verbose_tmp1 = self.imstack1.verbose
verbose_tmp2 = self.imstack2.verbose
self.imstack1.verbose = False
self.imstack2.verbose = False
self.imstack1.rot90deg()
self.imstack2.rot90deg()
self.imstack1.verbose = verbose_tmp1
self.imstack2.verbose = verbose_tmp2
if isinstance(edge_x, int):
edge_x = (edge_x, edge_x)
if isinstance(edge_y, int):
edge_y = (edge_y, edge_y)
if isinstance(pad_x, int):
pad_x = (pad_x, pad_x)
if isinstance(pad_y, int):
pad_y = (pad_y, pad_y)
if scandim == 'xy':
if edge_x != edge_y or edge_x[0] != edge_x[1] or edge_y[0] != edge_y[1]:
print("For scandim == 'xy', the edges should be symmetrical, edge_x should be the same as edge_y, and also the elements of each.")
sys.exit(0)
if scandim == 'xy':
if pad_x != pad_y or pad_x[0] != pad_x[1] or pad_y[0] != pad_y[1]:
print("For scandim == 'xy', the pad should be symmetrical, pad_x should be the same as pad_y, and also the elements of each.")
sys.exit(0)
if self.dimension == '1D':
print("No need to use multiprocessing for 1D analysis.")
sys.exit(0)
if self.dimension == '2D':
if scandim == 'x' or scandim == 'xy':
edge_x_new = edge_y
edge_y_new = (edge_x[1], edge_x[0])
pad_x_new = pad_y
pad_y_new = (pad_x[1], pad_x[0])
if scandim == 'y':
edge_x_new = edge_x
edge_y_new = edge_y
pad_x_new = pad_x
pad_y_new = pad_y
if pad_x_new[0] > edge_x_new[0] or pad_x_new[1] > edge_x_new[1]:
print("pad should not be greater than edge.")
if pad_y_new[0] > edge_y_new[0] or pad_y_new[1] > edge_y_new[1]:
print("pad should not be greater than edge.")
imNo, y_dim_tmp, x_dim_tmp = self.imstack1.data.shape
jxs = np.arange(0, x_dim_tmp-edge_x_new[0]-edge_x_new[1]-window+1, 1)
delayY_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
delayX_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
res_2D = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
if self.scandim == 'xy': self.scandim = 'x'
jxs1 = list(jxs)
global process_tmp1
def process_tmp1(jx):
if verbose:
if self.scandim == 'x':
_indicator(jx, len(jxs1), comments = self._flag + 'technique in X direction')
if self.scandim == 'y':
_indicator(jx, len(jxs1), comments = self._flag + 'technique in Y direction')
y_ind_left = edge_y_new[0] - pad_y_new[0]
y_ind_right = y_dim_tmp - edge_y_new[1] + pad_y_new[1]
x_ind_left = jx + edge_x_new[0] - pad_x_new[0]
x_ind_right = jx + edge_x_new[0] + window + pad_x_new[1]
self.imstack1.data = imstack1_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
self.imstack2.data = imstack2_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
ix_new1, iy_new1, res_new1 = self._XST_2images_1D(pad_x_new, pad_y_new, pad_y_new, hw_xy, normalize, display=False, verbose=False, _Resreturn=True)
#ix_new1 -= pad_x_new[0]
#iy_new1 -= pad_y_new[0]
return ix_new1, iy_new1, res_new1
with multiprocessing.Pool(cpu_no) as pool:
results = pool.map(process_tmp1, jxs1)
for i in range(len(results)):
delayX_2D[:, i] = results[i][0]
delayY_2D[:, i] = results[i][1]
res_2D[:, i] = results[i][2]
if scandim == 'y':
self.delayY = delayY_2D
self._delayX = delayX_2D
self.resY = res_2D
self.curvY = curv_XST(self.delayY, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvX = None
if scandim == 'x' or scandim == 'xy': #or scandim == 'diag':
self.delayX = np.rot90(delayY_2D, k=-1)
self._delayY = np.rot90(delayX_2D, k=-1)
self.resX = np.rot90(res_2D, k=-1)
self.curvX = curv_XST(self.delayX, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvY = None
if scandim == 'xy':
if self.imstack3.rawdata is None:
self.imstack3.read_data()
if self.imstack4.rawdata is None:
self.imstack4.read_data()
# For 'xy' case, ROI should be a square, for 2D integaration
if self.imstack3.data.shape[1] != self.imstack3.data.shape[2] \
or self.imstack4.data.shape[1] != self.imstack4.data.shape[2]:
print("For scandim is 'xy', the ROI should be a square, please re-select.")
sys.exit(0)
edge_x_new = edge_x
edge_y_new = edge_y
pad_x_new = pad_x
pad_y_new = pad_y
if pad_x_new[0] > edge_x_new[0] or pad_x_new[1] > edge_x_new[1]:
print("pad should not be greater than edge.")
if pad_y_new[0] > edge_y_new[0] or pad_y_new[1] > edge_y_new[1]:
print("pad should not be greater than edge.")
imNo, y_dim_tmp, x_dim_tmp = self.imstack3.data.shape
jxs = np.arange(0, x_dim_tmp-edge_x_new[0]-edge_x_new[1]-window+1, 1)
delayY_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
delayX_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
res_2D_2 = np.empty((y_dim_tmp-edge_y_new[0]-edge_y_new[1]-hw_xy+1, len(jxs)))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
imstack3_data = copy.deepcopy(self.imstack3.data)
imstack4_data = copy.deepcopy(self.imstack4.data)
self.scandim = 'y'
jxs2 = list(jxs)
global process_tmp2
def process_tmp2(jx):
if verbose:
if self.scandim == 'x':
_indicator(jx, len(jxs2), comments = self._flag + 'technique in X direction')
if self.scandim == 'y':
_indicator(jx, len(jxs2), comments = self._flag + ' technique in Y direction')
y_ind_left = edge_y_new[0] - pad_y_new[0]
y_ind_right = y_dim_tmp - edge_y_new[1] + pad_y_new[1]
x_ind_left = jx + edge_x_new[0] - pad_x_new[0]
x_ind_right = jx + edge_x_new[0] - pad_x_new[0] + window + pad_x_new[1]
self.imstack1.data = imstack3_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
self.imstack2.data = imstack4_data[:, y_ind_left:y_ind_right, x_ind_left:x_ind_right]
ix_new2, iy_new2, res_new2 = self._XST_2images_1D(pad_x_new, pad_y_new, pad_y_new, hw_xy, normalize, display=False, verbose=False, _Resreturn=True)
#ix_new2 -= pad_x_new[0]
#iy_new2 -= pad_y_new[0]
return ix_new2, iy_new2, res_new2
with multiprocessing.Pool(cpu_no) as pool:
results = pool.map(process_tmp2, jxs2)
for i in range(len(results)):
delayX_2D_2[:, i] = results[i][0]
delayY_2D_2[:, i] = results[i][1]
res_2D_2[:, i] = results[i][2]
self.scandim = scandim
self.delayY = delayY_2D_2
self._delayX = delayX_2D_2
self.resY = res_2D_2
self.imstack1.data = imstack1_data
self.imstack2.data = imstack2_data
self.curvY = curv_XST(self.delayY, self.scanstep, self.dist, self.pixsize, self.mempos)
self._curvX = None
return
[docs]
def XST_withrefer(self, edge_x, edge_y, pad_x, pad_y, hw_xy, window=None, normalize=False, display=False, verbose=True):
"""
Speckle tracking for conventional XST technique, with reference beam.
Two image stacks are needed.
The fisrt image stack consists one image when the diffuser is in the beam,
another image stack consists one reference image without the tested optic.
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
pad_x : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in x direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'y', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
pad_y : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in y direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'x', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The height (when Tracking.scandim is 'y')
or the width (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
window : int
The width (when Tracking.scandim is 'y')
or the height (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
Only used when Tracking.dimension is '2D'. (default None)
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "XST(with reference)"
self.scandim = 'y'
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
self.XST_self(edge_x, edge_y, pad_x, pad_y, hw_xy, window, normalize, display, verbose)
self.delayX = copy.deepcopy(self._delayX)
self._delayX = None
self.sloX = slope_pixel(self.delayX, self.pixsize, self.dist)
self.sloY = slope_pixel(self.delayY, self.pixsize, self.dist)
self.scandim = 'random'
return
[docs]
def XST_withrefer_multi(self, edge_x, edge_y, pad_x, pad_y, hw_xy, window, cpu_no, normalize=False, verbose=True):
"""
Speckle tracking for conventional XST technique, with reference beam.
Two image stacks are needed.
The fisrt image stack consists one image when the diffuser is in the beam,
another image stack consists one reference image without the tested optic.
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_x : int, or [int, int]
Area needs to be cut in x dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='x' (scan in x direction), it is useless.
edge_y : int, or [int, int]
Area needs to be cut in y dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
If Tracking.scandim='y' (scan in y direction), it is useless.
pad_x : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in x direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'y', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
pad_y : int, or [int, int]
It defines the extra part the reference image needed to do
the tracking in y direction. If Tracking.dimension is '1D'
**and** Tracking.scandim is is 'x', it is useless.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The height (when Tracking.scandim is 'y')
or the width (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
window : int
The width (when Tracking.scandim is 'y')
or the height (when Tracking.scandim is 'x')
of the subregion to be chosen from the template for cross-correlation.
Only used when Tracking.dimension is '2D'.
cpu_no : int
The number of CPUs that is available.
normalize : bool
To normalize the stitched image or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "XST(with reference)"
self.scandim = 'y'
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
self.XST_self_multi(edge_x, edge_y, pad_x, pad_y, hw_xy, window, cpu_no, normalize, verbose)
self.delayX = copy.deepcopy(self._delayX)
self._delayX = None
self.sloX = slope_pixel(self.delayX, self.pixsize, self.dist)
self.sloY = slope_pixel(self.delayY, self.pixsize, self.dist)
self.scandim = 'random'
return
[docs]
def XSVT_withrefer(self, edge_xy, edge_z, hw_xy=None, pad_xy=None, normalize=False, display=False, verbose=True):
"""
Speckle tracking for XSVT technique with reference beam.
The fisrt image stack is the one with test optic.
The second image stack is the reference image stack.
Parameters
----------
edge_xy : int, or [int, int]
Area needs to be cut in x and y dimensions.
If it is a single integer, it will be expanded automatically
to (int, int).
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The width and height of the image subregion.
Needed when do 2D data processing. (default None)
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing. (default None)
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "XSVT(with reference)"
self.scandim = 'random'
self.scanstep = 1. # Dummy
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.dimension == '1D':
print("The 1D data processing is not supported for XSVT method. \
Instead, you can cut a small strip of data and do 2D \
data processing and extract the information you want.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
if isinstance(edge_xy, int):
edge_xy = (edge_xy, edge_xy)
if isinstance(edge_z, int):
edge_z = (edge_z, edge_z)
self.imstack3 = copy.deepcopy(self.imstack1)
self.imstack4 = copy.deepcopy(self.imstack2)
#Use the underscored delay in the XSS method, thus delayY is _delayX, delayX is _delayY.
self.scandim = 'xy'
self.XSS_withrefer(edge_xy, edge_xy, edge_z, hw_xy, pad_xy, normalize, display, verbose)
delayX = copy.deepcopy(self._delayY)
delayY = copy.deepcopy(self._delayX)
self.delayX = delayX
self.delayY = delayY
self._delayX = None
self._delayY = None
self._sloX = None
self._sloY = None
self.sloX = slope_pixel(self.delayX, self.pixsize, self.dist)
self.sloY = slope_pixel(self.delayY, self.pixsize, self.dist)
self.scandim = 'random'
return
[docs]
def XSVT_withrefer_multi(self, edge_xy, edge_z, hw_xy, pad_xy, cpu_no, normalize=False, verbose=True):
"""
Speckle tracking for XSVT technique with reference beam.
The fisrt image stack is the one with test optic.
The second image stack is the reference image stack.
.. warning:: **BE CAREFUL** to check the available and safe cpu numbers before run this function!!
Parameters
----------
edge_xy : int, or [int, int]
Area needs to be cut in x and y dimensions.
If it is a single integer, it will be expanded automatically
to (int, int).
edge_z : int, or [int, int]
Area needs to be cut in scan number dimension.
If it is a single integer, it will be expanded automatically
to (int, int).
hw_xy : int
The width and height of the image subregion.
Needed when do 2D data processing.
pad_xy : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
Needed when do 2D data processing.
cpu_no : int
The number of CPUs that is available.
normalize : bool
To normalize the stitched image or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
self._flag = "XSVT(with reference)"
self.scandim = 'random'
self.scanstep = 1. # Dummy
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.dimension == '1D':
print("The 1D data processing is not supported for XSVT method. \
Instead, you can cut a small strip of data and do 2D \
data processing and extract the information you want.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
if isinstance(edge_xy, int):
edge_xy = (edge_xy, edge_xy)
if isinstance(edge_z, int):
edge_z = (edge_z, edge_z)
self.imstack3 = copy.deepcopy(self.imstack1)
self.imstack4 = copy.deepcopy(self.imstack2)
#Use the underscored delay in the XSS method, thus delayY is _delayX, delayX is _delayY.
self.scandim = 'xy'
self.XSS_withrefer_multi(edge_xy, edge_xy, edge_z, hw_xy, pad_xy, cpu_no, normalize, verbose)
delayX = copy.deepcopy(self._delayY)
delayY = copy.deepcopy(self._delayX)
self.delayX = delayX
self.delayY = delayY
self._delayX = None
self._delayY = None
self._sloX = None
self._sloY = None
self.sloX = slope_pixel(self.delayX, self.pixsize, self.dist)
self.sloY = slope_pixel(self.delayY, self.pixsize, self.dist)
self.scandim = 'random'
return
[docs]
def Hartmann_XST(self, cen_xmesh, cen_ymesh, pad, size, normalize=False, display=False, verbose=True):
"""
Hartmann-like data processing procedure.
Two image stacks are needed.
The fisrt image stack consists one sample image.
The second image stack consists another reference image.
.. note::
For simplicity, unlike other mode of data processing,
we only provide ``Tracking.delayX`` and ``Tracking.delayY``
for this method. Recovering the appropriate physical quantities
from the speckle shifts is left to the discretion of the users.
Parameters
----------
cen_xmsh : numpy.ndarray
Mesh grid of the x coordinate of the box centre. A 2D array.
cen_ymsh : numpy.ndarray
Mesh grid of the y coordinate of the box centre. A 2D array.
pad : int, or [int, int]
It defines the extra part the reference image needed to do the tracking.
If it is a single integer, it will be expanded automatically
to (int, int).
size : int, or [int, int]
It defines the size of the box used for Hartmann-like tracking mode.
If it is a single integer, it will be expanded automatically
to (int, int). size[0] is the half width of the chosen box, size[1] is
the half height of the chosen box.
normalize : bool
To normalize the stitched image or not. (default False)
display : bool
To display or not. (default False)
verbose : bool
To show the information or not. (default True)
"""
if self._flag is None: self._flag = "Hartmann-like"
if self.imstack2 == None:
print("Please provide another image stack.")
sys.exit(0)
if self.imstack1.rawdata is None:
self.imstack1.read_data()
if self.imstack2.rawdata is None:
self.imstack2.read_data()
if isinstance(pad, int):
pad = (pad, pad)
if isinstance(size, int):
size = (size, size)
y_dim_tmp, x_dim_tmp = cen_xmesh.shape
delayX_2D = np.empty((y_dim_tmp, x_dim_tmp))
delayY_2D = np.empty((y_dim_tmp, x_dim_tmp))
res_2D = np.empty((y_dim_tmp, x_dim_tmp))
imstack1_data = copy.deepcopy(self.imstack1.data)
imstack2_data = copy.deepcopy(self.imstack2.data)
im_sam_init = imstack1_data[0][cen_ymesh[0, 0]-size[1]:cen_ymesh[0, 0]+size[1], cen_xmesh[0, 0]-size[0]:cen_xmesh[0, 0]+size[0]]
im_ref_init = imstack2_data[0][cen_ymesh[0, 0]-size[1]-pad[1]:cen_ymesh[0, 0]+size[1]+pad[1], cen_xmesh[0, 0]-size[0]-pad[0]:cen_xmesh[0, 0]+size[0]+pad[0]]
subpixelmeth = self.subpixelmeth
if normalize:
im_sam_init = NormImage(im_sam_init)
im_ref_init = NormImage(im_ref_init)
ix_init, iy_init, res_init = Imagematch(im_ref_init, im_sam_init, subpixelmeth=subpixelmeth)
if display:
fig, h1, h2, h3, h4 = _initDisplay(im_sam_init, im_ref_init, res_init)
for iy in range(y_dim_tmp):
if verbose:
_indicator(iy, y_dim_tmp, comments = self._flag)
for ix in range(x_dim_tmp):
cen_index_x = cen_xmesh[iy, ix]
cen_index_y = cen_ymesh[iy, ix]
im_sam = imstack1_data[0][cen_index_y-size[1]:cen_index_y+size[1], cen_index_x-size[0]:cen_index_x+size[0]]
im_ref = imstack2_data[0][cen_index_y-size[1]-pad[1]:cen_index_y+size[1]+pad[1], cen_index_x-size[0]-pad[0]:cen_index_x+size[0]+pad[0]]
subpixelmeth = self.subpixelmeth
if normalize:
im_sam = NormImage(im_sam)
im_ref = NormImage(im_ref)
ix_tmp, iy_tmp, res_tmp = Imagematch(im_ref, im_sam, subpixelmeth=subpixelmeth)
delayX_2D[iy, ix] = ix_tmp - pad[0]
delayY_2D[iy, ix] = iy_tmp - pad[1]
res_2D[iy, ix] = np.max(res_tmp)
if display:
_contiDisplay(fig, h1, h2, h3, h4, im_sam, im_ref, res_tmp)
self.delayY = delayY_2D
self.delayX = delayX_2D
return