import deltascope as cranium
import multiprocessing as mp
import time
import os
from functools import partial
from sys import argv
from sys import exc_info
import re
import json
[docs]class paramsClass:
'''
A class to read and validate parameters for multiprocessing transformation.
Validated parameters can be read as attributes of the object
'''
def __init__(self,path):
'''
Read json data in config file and validate that parameter inputs are correct
:param str path: Complete path to the config file
'''
# Read file containing config data and parse json data
config_data = open(path).read()
params = json.loads(config_data)
self.check_config(params,path)
[docs] def add_outdir(self,path):
'''
Add out directory as an attribute of the class
:param str path: Complete path to the output directory
'''
self.outdir = path
[docs] def check_config(self,D,path):
'''
Check that each parameter in the config file is correct and raise an error if it isn't
:param dict D: Dictionary containing parameters from the config file
:param str path: Complete filepath to the config file
'''
#Check that the rootdir is valid
if D['rootdir'] == '':
print('Root directory path (rootdir) is not defined. Modify in',path)
raise
elif os.path.isdir(D['rootdir']):
self.rootdir = D['rootdir']
else:
print('Root directory path (rootdir) must specify an existing directory. Modify in',path)
raise
#Check that c1, structural channel, is defined
if D['c1-dir'] == '':
print('C1 directory path (c1-dir) is not defined. Modify in',path)
raise
elif os.path.isdir(D['c1-dir']):
self.c1_dir = D['c1-dir']
self.c1_files = os.listdir(self.c1_dir)
else:
print('C1 directory path (c1-dir) is not defined. Modify in',path)
raise
#Check c1 key
if D['c1-key'] == '':
print('C1 directory key (c1-key) is not defined. Modify in',path)
raise
else:
self.c1_key = D['c1-key']
#Check other channels and add to list if valid
self.Lcdir = []
self.Lcfiles = []
self.Lckey = []
for d,k in zip(['c2-dir','c3-dir','c4-dir'],['c2-key','c3-key','c4-key']):
if D[d] != '':
if os.path.isdir(D[d]):
self.Lcdir.append(D[d])
self.Lcfiles.append(os.listdir(D[d]))
if D[k] == '':
print('Channel key (',k,') is not defined. Modify in',path)
raise
else:
self.Lckey.append(D[k])
else:
print('Channel directory path (',d,') is not defined. Modify in',path)
raise
#Validate that experiment name is defined
if D['expname'] == '':
print('Experiment name (expname) is not defined in',path)
raise
else:
self.expname = D['expname']
#Check median threshold value
if D['medthresh'] <= 1 and D['medthresh'] >= 0:
self.medthresh = D['medthresh']
else:
print('Median threshold (medthresh) must be a number between 0 and 1. Modify in',path)
raise
#Check that radius is an integer
if type(D['radius']) == int:
self.radius = D['radius']
else:
print('Radius input must be an integer. Modify in',path)
raise
#Check genthresh float between 0 and 1
if D['genthresh'] <= 1 and D['genthresh'] >= 0:
self.genthresh = D['genthresh']
else:
print('General threshold input must be a number between 0 and 1. Modify in',path)
raise
#Check format of microns
if type(D['microns']) == list:
for i in D['microns']:
#If it is a float or int
if not (type(i) == float) | (type(i) == int):
print('Micron inputs must be numeric values. Modify in',path)
raise
self.microns = D['microns']
else:
print('Micron input must be a list of numeric values. Modify in',path)
raise
#Check degree
if type(D['deg']) != int:
print('Degree input must be an integer. Modify in',path)
raise
else:
self.deg = D['deg']
#Check that comporder is a list of integers between 0 and 2
if type(D['comporder']) == list:
for i in D['comporder']:
#Check that it is an int between 0 and 2
if i not in [0,1,2]:
print('Comporder input must be a list of integers between 0 and 2. Modify in',path)
raise
self.comporder = D['comporder']
else:
print('Comporder input must be a list of integers between 0 and 2. Modify in',path)
raise
#Check fit dimensions
if type(D['fitdim']) == list:
for i in D['fitdim']:
if i not in ['x','y','z']:
print('Fitdim input must be a list of \'x\', \'y\', or \'z\'. Modify in',path)
raise
self.fitdim = D['fitdim']
else:
print('Fitdim input must be a list of \'x\', \'y\', or \'z\'. Modify in',path)
raise
if type(D['twoD']) == bool:
self.twoD = D['twoD']
else:
print('Specification for 2D transformation must be boolean. Modify in',path)
raise
self.scale = [1,1,1]
print('All parameter inputs are correct')
[docs]def check_nums(P):
'''
Check that the numbers of files selected by the same list index match
Raises an error if file numbers are mismatched
:param :class:`paramClass` P: Object containing all variables from config file
'''
Lnums = []
print('hello')
for i,f in enumerate(P.c1_files):
print(i,f)
if 'h5' in f:
n = re.findall(r'\d+',f.split('.')[0])[0]
print('n',n)
for Lf in P.Lcfiles:
if n not in Lf[i]:
print('File numbers are mismatched between channel directories.')
raise
Lnums.append(i)
return(Lnums)
[docs]def process(num,P=None):
'''
Run through the processing steps for a single sample through saving psi files
:param int num: Index of the file that is currently being processed
:param :class:`paramClass` P: Object containing all variables from config file
'''
tic = time.time()
print(num,'Starting sample')
try:
e = cranium.embryo(P.expname,num,P.outdir)
#Add channels and preprocess data
e.add_channel(os.path.join(P.c1_dir,P.c1_files[num]),P.c1_key)
e.chnls[P.c1_key].preprocess_data(P.genthresh,P.scale,P.microns)
for i in range(len(P.Lcdir)):
e.add_channel(os.path.join(P.Lcdir[i],P.Lcfiles[i][num]),P.Lckey[i])
e.chnls[P.Lckey[i]].preprocess_data(P.genthresh,P.scale,P.microns)
#Calculate PCA transformation for structural channel, c1
if P.twoD == True:
e.chnls[P.c1_key].calculate_pca_median_2d(e.chnls[P.c1_key].raw_data,P.medthresh,P.radius,P.microns)
pca = e.chnls[P.c1_key].pcamed
e.chnls[P.c1_key].pca_transform_2d(e.chnls[P.c1_key].df_thresh,pca,P.comporder,P.fitdim,deg=P.deg)
#Transform additional channels
for i in range(len(P.Lcdir)):
e.chnls[P.Lckey[i]].pca_transform_2d(e.chnls[P.Lckey[i]].df_thresh,pca,P.comporder,P.fitdim,deg=P.deg)
else:
e.chnls[P.c1_key].calculate_pca_median(e.chnls[P.c1_key].raw_data,P.medthresh,P.radius,P.microns)
pca = e.chnls[P.c1_key].pcamed
e.chnls[P.c1_key].pca_transform_3d(e.chnls[P.c1_key].df_thresh,pca,P.comporder,P.fitdim,deg=P.deg)
#Transform additional channels
for i in range(len(P.Lcdir)):
e.chnls[P.Lckey[i]].pca_transform_3d(e.chnls[P.Lckey[i]].df_thresh,pca,P.comporder,P.fitdim,deg=P.deg)
# print(num,'Starting coordinate transformation')
# e.chnls[P.c1_key].transform_coordinates()
# for i in range(len(P.Lcdir)):
# e.chnls[P.Lckey[i]].transform_coordinates()
#e.save_psi()
columns = ['x','y','z']
for ch in e.chnls.keys():
cranium.write_data(os.path.join(e.outdir,
e.name+'_'+str(e.number)+'_'+ch+'.psi'),
e.chnls[ch].df_align[columns])
toc = time.time()
print(num,'Complete',toc-tic)
except:
toc = time.time()
print(num,'Failed',toc-tic,exc_info())
if __name__=='__main__':
f,config_path = argv
P = paramsClass(config_path)
#Create out directory stamped with current date and time
outdir = os.path.join(P.rootdir,'Output'+time.strftime("%m-%d-%H-%M",time.localtime()))
os.mkdir(outdir)
P.add_outdir(outdir)
print('outdir',outdir)
processfxn = partial(process,P=P)
Lnums = check_nums(P)
n = len(Lnums)
# Initiate map pools in sets of 5 samples
for i in range(0,n,5):
if i+5>n:
L = Lnums[i:n]
else:
L = Lnums[i:i+5]
pool = mp.Pool()
pool.map(processfxn,L)
pool.close()
pool.join()