Source code for pymerlin.dataIO

# -*- coding: utf-8 -*-
"""
MERLIN includes several convenience functions for data input and output. 
Since pyMERLIN is tightly integrated with RIESLING, functions for reading k-space and image ``.h5`` files have been included.
"""

import argparse
import logging
import os

import h5py
import ismrmrd
import itk
import numpy as np
import xmltodict


[docs]def parse_fname(fname): """Parse the filename from a NIFTI file Args: fname (str): Input filename Returns: str: Filename witout file ending """ bname, ext = os.path.splitext(os.path.basename(fname)) if ext.lower() == '.nii': return bname elif ext.lower() == '.gz': bname2, ext2 = os.path.splitext(bname) if ext2 == '.nii': return bname2
[docs]def arg_check_h5(fname): """Check h5 file ending Args: fname (str): Filename to check Returns: str: Filename or error """ bname, ext = os.path.splitext(fname) if ext.lower() in ['.h5', '.hd5', '.hf5', '.hdf5']: return fname else: raise ValueError( "{} doesn't seem to have the right file ending (HDF5 file)".format(fname))
[docs]def arg_check_nii(fname): """Check nifti file ending Args: fname (str): Filename to check Returns: str: Filename or error """ bname, ext = os.path.splitext(fname) if ext.lower() == '.nii': return fname elif ext.lower() == '.gz': bname2, ext2 = os.path.splitext(ext) if ext2 == '.nii': return fname return argparse.ArgumentError("{} doesn't seem to be a valid nifti file".format(fname))
[docs]def check_filename(fname): """ Check if file exists If file already exist it will append a number to produce a unique filename. Args: fname (str): Filename Returns: str: Unique filename """ if os.path.exists(fname): logging.warning("%s already exists" % fname) fpath, fext = os.path.splitext(fname) counter = 1 fname = "%s_%d%s" % (fpath, counter, fext) while os.path.exists(fname): counter += 1 logging.info("%s does not exists" % fname) return fname
[docs]def read_ismrmrd(fname): """Read ISMRM Raw Data format Args: fname (str): ISMRM Raw Data filename Returns: dict: Data and meta data as dictionary """ # Read in dataset as ISMRM Raw Data file to get header f_ismrmrd = ismrmrd.Dataset(fname, '/dataset', True) header = xmltodict.parse(f_ismrmrd.read_xml_header()) user_vars = header['ismrmrdHeader']['userParameters']['userParameterDouble'] for l in user_vars: vals = l.values() if 'Highres Spokes' in vals: nSpokesHigh = int(list(vals)[1]) if 'WASPI Spokes' in vals: nSpokesLow = int(list(vals)[1]) acq = f_ismrmrd.read_acquisition(0) nrcv = acq.active_channels npts = acq.number_of_samples # Now read it in as a native HDF5 file for faster data reading fh5 = h5py.File(fname, 'r') dataset = fh5['dataset'] data = np.asarray(dataset['data']) # Load highres data traj = np.zeros((nSpokesHigh, npts, 3)) traj_low = None ks = np.zeros((nrcv, nSpokesHigh, npts), dtype=np.complex64) ks_low = None for i in range(nSpokesHigh): ks[:, i, :] = data[i][2].view(np.complex64).reshape((nrcv, npts)) traj[i, :, :] = data[i][1].reshape((npts, 3)) # Load lowres data if nSpokesLow > 0: traj_low = np.zeros((nSpokesLow, npts, 3)) ks_low = np.zeros((nrcv, nSpokesLow, npts), dtype=np.complex64) for i in range(nSpokesLow): ks_low[:, i, :] = data[nSpokesHigh + i][2].view(np.complex64).reshape((nrcv, npts)) traj_low[i, :, :] = data[nSpokesHigh + i][1].reshape((npts, 3)) D = {'RawHigh': ks, 'RawLow': ks_low, 'TrajHigh': traj, 'TrajLow': traj_low, 'Header': header} # Close files fh5.close() f_ismrmrd.close() return D
[docs]def create_image(img_array, spacing, corners=None, max_image_value=None, dtype=None): """Create an ITK image object from array Will take magnitude of the data. Does not impose any geometry. Args: img_array (array): 3D image array spacing (list): Voxel size (dx,dy,dz) corners (list, optional): Not implemented. Defaults to None. max_image_value (float, optional): Rescale data to max value. Defaults to None. dtype (itk.dtype, optional): ITK data type for casting. Defaults to None. Returns: itk.Image: ITK Image object """ # Can only do single volume img = itk.image_from_array(np.abs(np.ascontiguousarray(img_array))) # Don't assume isotropic voxels img.SetSpacing([float(x) for x in spacing]) # Set origin in center of the image img.SetOrigin([-img_array.shape[i]/2*spacing[i] for i in range(3)]) if max_image_value: # Rescale image intensity RescaleFilterType = itk.RescaleIntensityImageFilter[type( img), type(img)] rescaleFilter = RescaleFilterType.New() rescaleFilter.SetInput(img) rescaleFilter.SetOutputMinimum(0) rescaleFilter.SetOutputMaximum(max_image_value) rescaleFilter.Update() img_out = rescaleFilter.GetOutput() else: img_out = img # Cast filter if dtype: InputPixelType = itk.template(img_out)[1][0] InputImageType = itk.Image[InputPixelType, 3] OutputImageType = itk.Image[dtype, 3] cast_filter = itk.CastImageFilter[InputImageType, OutputImageType].New( ) cast_filter.SetInput(img_out) cast_filter.Update() img_out = cast_filter.GetOutput() return img_out
[docs]def read_image_h5(h5_file, vol=0, echo=0): """Read image h5 file Args: h5_file (str): File name Returns: (array,array): Image array and voxel spacing """ f = h5py.File(h5_file, 'r') data = f['image'][echo, :, :, :, vol] spacing = f['info'][0][1] f.close() return data, spacing
[docs]def read_radial_h5(h5_file): """Read radial k-space h5 file Args: h5_file (str): Filename Returns: (array,array): k-space data and trajectory """ f = h5py.File(h5_file, 'r') # Reshape trajectory traj = f['trajectory'] # Reshape data data = f['noncartesian'] return data, traj
[docs]def make_3D(img): """Extract first 3D volume from 4D dataset Args: img (np.array): 4D or 3D array Returns: np.array: 3D array """ if len(img.shape) > 3: return img[:, :, :, 0] return img
def write_kspace_h5(f_dest, noncartesian, trajectory, info, f_source=None): f_dest = h5py.File(f_dest, 'w') logging.info("Writing info and meta data") f_dest.create_dataset("info", data=info) if f_source: try: f_source.copy('meta', f_dest) except: logging.info("No meta data") f_source.copy('echoes', f_dest) logging.info("Writing trajectory") traj_chunk_dims = list(trajectory.shape) if traj_chunk_dims[0] > 1024: traj_chunk_dims[0] = 1024 f_dest.create_dataset("trajectory", data=trajectory, chunks=tuple(traj_chunk_dims), compression='gzip') logging.info("Writing k-space data") data_chunk_dims = list(noncartesian.shape) if data_chunk_dims[1] > 1024: data_chunk_dims[1] = 1024 f_dest.create_dataset("noncartesian", dtype='c8', data=noncartesian, chunks=tuple(data_chunk_dims), compression='gzip') logging.info("Closing {}".format(f_dest)) f_dest.close() def get_merlin_fields(): valid_input_args = ['nspokes', 'nlores', 'spoke_step', 'make_brain_mask', 'brain_mask_file', 'sense_maps', 'cg_its', 'ds', 'fov', 'overwrite_files', 'riesling_verbosity', 'ref_nav_num', 'metric', 'batch_itk', 'batch_riesling', 'threads_itk', 'threads_riesling'] return valid_input_args def check_merlin_inputs(fname): valid_input_args = get_merlin_fields() with open('merlin_input_args.txt') as f: eof = False input_arguments = {} while not eof: line = f.readline().split('\n')[0] if len(line) > 0: arg = line.split('=')[0] val = line.split('=')[1] if arg not in input_args: raise ValueError('%s is not a valid input argument' % arg) input_arguments[arg] = val else: eof = True # Check for invalid combinations if (input_arguments['make_brain_mask'] == 1) and (len(input_arguments['brain_mask_file']) > 0): raise ValueError( 'Cannot use make_brain_mask and brain_mask_file together. Choose one')