# -*- coding: utf-8 -*-
"""
Tools for applying motion correction to k-space data
"""
import logging
import h5py
from .dataIO import *
[docs]def calc_H(traj, D, spacing):
"""Calculate phase correction matrix
Args:
traj (array): Trajectory to correct
D (dict): Correction factors ({'dx','dy', 'dz'})
spacing (array): Voxel spacing [x,y,z]
Returns:
array: Phase correction matrix
"""
dx = D['dx']/spacing[0]
dy = D['dy']/spacing[1]
dz = D['dz']/spacing[2]
# We assume that the trajectory is normalised to -0.5 to 0.5
xF = traj[:, :, 0]
yF = traj[:, :, 1]
zF = traj[:, :, 2]
H = np.exp(2j*np.pi*(xF*dx + yF*dy + zF*dz))
return H
def apply_moco(data_in, traj_in, D_reg, spacing):
"""
Applies rigid body motion correction to k-space data and trajetory.
ITK versor applies the rotation and then the translation
Correct approach is thus to first rotate trajectory then apply
the corresponding phase correcttion on these coordinates
Args:
data_in (array): K-space data
traj_in (array): Trajectory
D_reg (dict): Registration dictionary
spacing (array): Image voxel size
Returns:
(array, array): corrected data and corrected trajectory
"""
traj_corr = np.matmul(traj_in, D_reg['R'])
H = calc_H(traj_corr, D_reg, spacing)
if np.isnan(H[:]).any():
raise ValueError("H is nan")
data_corr = np.zeros_like(data_in)
for ircv in range(np.shape(data_in)[-1]):
data_corr[..., ircv] = data_in[..., ircv]*H
return data_corr, traj_corr
[docs]def moco_single(source_h5, dest_h5, reg, nlores):
"""Corrects a radial dataset from a pickle file.
Args:
source_h5 (str): Source file to correct
dest_h5 (str): Output file
reg_list (list): Reg file
"""
# Load data
logging.info("Opening source file: %s" % source_h5)
f_source = h5py.File(source_h5, 'r')
info = f_source['info'][:]
spacing = info['voxel_size'][0]
spokes_lo = nlores
lo_scale = info['lo_scale'][0]
traj = f_source['trajectory'][:]
traj_corr = np.zeros_like(traj)
data = f_source['noncartesian'][:]
data_corr = np.zeros_like(data)
logging.info("Correcting data and trajectories")
traj_corr = np.matmul(traj, reg['R'])
dc, tc = apply_moco(data_in=data[0, spokes_lo:, :, :], traj_in=traj[spokes_lo:, :, :],
D_reg=reg, spacing=spacing)
data_corr[:, spokes_lo:, ...] = dc
traj_corr[spokes_lo:, ...] = tc
if spokes_lo > 0:
lo_scale = np.max(traj_corr[spokes_lo:, ...][:]) / \
np.max(traj_corr[0:spokes_lo, ...][:])
dc, tc = apply_moco(data_in=data[0, 0:spokes_lo, :, :], traj_in=traj[0:spokes_lo, :, :],
D_reg=reg, spacing=spacing*lo_scale)
data_corr[:, 0:spokes_lo, ...] = dc
traj_corr[0:spokes_lo, ...] = tc
# Write data to destination file
valid_dest_h5 = check_filename(dest_h5)
logging.info("Opening destination file: %s" % valid_dest_h5)
write_kspace_h5(valid_dest_h5, data_corr,
traj_corr, info, f_source=f_source)
f_source.close()
def moco_combined(source_h5, dest_h5, reg_list, nlores):
"""Corrects a combined radial dataset from list of pickle files
Args:
source_h5 (str): Source file to correct
dest_h5 (str): Output file
reg_list (list): List of registration dictionaries
"""
# Load data
logging.info("Opening source file: %s" % source_h5)
f_source = h5py.File(source_h5, 'r')
info = f_source['info'][:]
spacing = info['voxel_size'][0]
spokes_lo = nlores
n_interleaves = len(reg_list)
traj = f_source['trajectory'][:]
traj_corr = np.zeros_like(traj)
data = f_source['noncartesian'][:]
data_corr = np.zeros_like(data)
# We don't correct any lowres spokes
idx0 = 0
idx1 = int(spokes_lo)
if spokes_lo > 0:
data_corr[0, idx0:idx1, :, :] = data[0, idx0:idx1, :, :]
traj_corr[idx0:idx1, :, :] = traj[idx0:idx1, :, :]
logging.info("Correcting data and trajectories")
for (i, D_reg) in enumerate(reg_list):
logging.info("Processing interleave %d/%d" % (i+1, n_interleaves))
idx0 = idx1
idx1 = idx0 + D_reg['spi']
dc, tc = apply_moco(data_in=data[0, idx0:idx1, :, :], traj_in=traj[idx0:idx1, :, :],
D_reg=D_reg, spacing=spacing)
data_corr[0, idx0:idx1, ...] = dc
traj_corr[idx0:idx1, ...] = tc
# Write data to destination file
valid_dest_h5 = check_filename(dest_h5)
logging.info("Opening destination file: %s" % valid_dest_h5)
write_kspace_h5(valid_dest_h5, data_corr,
traj_corr, info, f_source=f_source)
logging.info("Closing all files")
f_source.close()
[docs]def moco_sw(source_h5, dest_h5, reg_list, nseg, nlores):
"""Corrects radial dataset from a sliding window reconstruction.
Args:
source_h5 (str): Source file to correct
dest_h5 (str): Output file
reg_list (list): List of registration dictionaries
nseg (int): Number of segments per interleave (equivalent to sliding window step)
"""
# Load data
logging.info("Opening source file: %s" % source_h5)
f_source = h5py.File(source_h5, 'r')
info = f_source['info'][:]
spacing = info['voxel_size'][0]
spokes_lo = nlores
n_nav = len(reg_list)
traj = f_source['trajectory'][:]
traj_corr = np.zeros_like(traj)
data = f_source['noncartesian'][:]
data_corr = np.zeros_like(data)
# We don't correct any lowres spokes
idx0 = 0
idx1 = int(spokes_lo)
if spokes_lo > 0:
data_corr[0, idx0:idx1, :, :] = data[0, idx0:idx1, :, :]
traj_corr[idx0:idx1, :, :] = traj[idx0:idx1, :, :]
logging.info("Correcting data and trajectories")
logging.info(f"sps: {int(reg_list[0]['spi']/nseg)}")
# Loop over all segments
for iseg in range(n_nav+nseg-1):
inav = iseg - int(np.floor(nseg/2))
if inav < 0:
inav = 0
if inav > n_nav-1:
inav = n_nav-1
# Match segment to navigator for recon params
logging.info("Processing segment %d/%d Using Nav %d" %
(iseg+1, n_nav, inav))
D_reg = reg_list[inav]
idx0 = idx1 # Start where last interleave ended
sps = int(D_reg['spi']/nseg) # Spokes per segment
idx1 = idx0 + sps
print("inav: {}, i0:i1: {}:{}".format(inav, idx0, idx1))
dc, tc = apply_moco(data_in=data[0, idx0:idx1, :, :], traj_in=traj[idx0:idx1, :, :],
D_reg=D_reg, spacing=spacing)
data_corr[0, idx0:idx1, ...] = dc
traj_corr[idx0:idx1, ...] = tc
# Write data to destination file
valid_dest_h5 = check_filename(dest_h5)
logging.info("Opening destination file: %s" % valid_dest_h5)
write_kspace_h5(valid_dest_h5, data_corr,
traj_corr, info, f_source=f_source)
logging.info("Closing all files")
f_source.close()