import pathlib
import warnings
import numpy as np
import qpimage
import qpsphere
from . import util
#: Output sphere analysis qpimage.QPSeries data
FILE_SPHERE_DATA = "sphere_{}_{}_data.h5"
#: Output sphere analysis statistics
FILE_SPHERE_STAT = "sphere_{}_{}_statistics.txt"
[docs]class EdgeDetectionFailedWarning(UserWarning):
pass
[docs]def analyze_sphere(h5roi, dir_out, r0=10e-6, method="edge",
model="projection", edgekw={}, imagekw={},
alpha=.18, rad_fact=1.2, ret_changed=False):
"""Perform sphere analysis
Parameters
----------
h5series: str
Path of qpimage.QPSeries hdf5 file
dir_out: str
Path to output directory
r0: float
Initial radius
method: str
Either "edge" or "image"; see :ref:`config_sphere` for
more information.
model: str
Propagation model to use; see :ref:`config_sphere` for
more information.
edgekw: dict
Keyword arguments to :func:`qpsphere.edgefit.contour_canny`
imagekw: dict
Keyword arguments to :func:`qpsphere.imagefit.alg.match_phase`
alpha: float
Refraction increment [mL/g]
rad_fact: float
Radial inclusion factor for dry mass computation
ret_changed: bool
Return boolean indicating whether the sphere data on disk was
created/updated (True) or whether previously created ROI
data was used (False).
"""
dir_out = pathlib.Path(dir_out).resolve()
h5out = dir_out / FILE_SPHERE_DATA.format(method, model)
statout = dir_out / FILE_SPHERE_STAT.format(method, model)
with qpimage.QPSeries(h5file=h5roi, h5mode="r") as qps:
cfgid = util.hash_object([qps,
r0,
method,
model,
edgekw,
imagekw if method == "image" else None,
alpha,
rad_fact])
create = mode_for_sphere_analysis(h5in=h5roi, h5out=h5out, cfgid=cfgid)
# initialize file
initmode = "w" if create else "r"
with qpimage.QPSeries(h5file=h5out, h5mode=initmode) as qps_out:
# get all simulation identifiers from previous analysis
ids_out = [qpi["identifier"] for qpi in qps_out]
with qpimage.QPSeries(h5file=h5roi, h5mode="r") as qps_in, \
statout.open(mode="w") as fd:
header = ["identifier",
"index",
"radius_um",
"rel_dry_mass_pg",
"abs_dry_mass_pg",
"time",
"medium",
]
fd.write("#" + "\t".join(header) + "\r\n")
for qpi in qps_in:
simident = "{}:{}:sim:{}".format(qpi["identifier"], cfgid, model)
if simident in ids_out:
# read simulation results
with qpimage.QPSeries(h5file=h5out, h5mode="r") as qps_out:
n = qps_out[simident]["sim index"]
r = qps_out[simident]["sim radius"]
c = qps_out[simident]["sim center"]
else:
try:
# fit sphere model
n, r, c, qpi_sim = qpsphere.analyze(qpi,
r0=r0,
method=method,
model=model,
edgekw=edgekw,
imagekw=imagekw,
ret_center=True,
ret_qpi=True)
except qpsphere.edgefit.RadiusExceedsImageSizeError:
# Edge detection cannot proceed because the presumed
# object radius exceeds the image size. This might be
# the result of a "size variation" set too large.
msg = "Edge detection failed for ROI " \
+ "{}! Try ".format(qpi["identifier"]) \
+ "reducing the value of [roi]: 'size variation'."
warnings.warn(msg, EdgeDetectionFailedWarning)
# use dummy data
n = np.nan
r = np.nan
c = (np.nan, np.nan)
qpi_sim = qpsphere.simulate(radius=0,
sphere_index=1,
grid_size=qpi.shape)
# write simulation results
with qpimage.QPSeries(h5file=h5out, h5mode="a") as qps_out:
qps_out.add_qpimage(qpi=qpi_sim, identifier=simident)
# finally, update text file
if "time" in qpi:
qptime = qpi["time"]
else:
qptime = np.nan
data = {
"identifier": qpi["identifier"],
"index": n,
"radius_um": r * 1e6,
"abs_dry_mass_pg": absolute_dry_mass_sphere(
qpi=qpi,
radius=r,
center=c,
alpha=alpha,
rad_fact=rad_fact
) * 1e12,
"rel_dry_mass_pg": relative_dry_mass(
qpi=qpi,
radius=r,
center=c,
alpha=alpha,
rad_fact=rad_fact
) * 1e12,
"time": qptime,
"medium": qpi["medium index"]
}
fd.write("\t".join([str(data[k]) for k in header]) + "\r\n")
fd.flush()
if ret_changed:
return h5out, create
else:
return h5out
[docs]def absolute_dry_mass_sphere(qpi, radius, center, alpha=.18, rad_fact=1.2):
"""Compute absolute dry mass of a spherical phase object
The absolute dry mass is computed with
.. code::
m_abs = m_rel + m_sup
m_rel = lambda / (2*PI*alpha) * phi_tot * deltaA
m_sup = 4*PI / (3*alpha) * radius^3 (n_med - n_PBS)
with the vacuum wavelength ``lambda``, the total phase
retardation in the area of interest ``phi_tot``, the pixel
area ``deltaA``, the refractive index of the medium `n_med`
(stored in ``qpi.meta``), and the refractive index of phosphate
buffered saline (PBS) ``n_PBS=1.335``.
This is the *absolute* dry mass, because it takes into account
the offset caused by the suppressed density in the phase data.
Parameters
----------
qpi: qpimage.QPImage
QPI data
center: tuble (x,y)
Center of the sphere [px]
radius: float
Radius of the sphere [m]
wavelength: float
The wavelength of the light [m]
alpha: float
Refraction increment [mL/g]
rad_fact: float
Inclusion factor that scales `radius` to increase
the area used for phase summation; if the backgound
phase exhibits a noise phase signal, positive and
negative contributions cancel out and `rad_fact`
does not have an effect above a certain critical value.
Returns
-------
dry_mass: float
The absolute dry mass of the sphere [g]
"""
dm_rel = relative_dry_mass(qpi=qpi,
radius=radius,
center=center,
alpha=alpha,
rad_fact=rad_fact)
medium_index = qpi["medium index"]
dm_sup = 4 / 3 * np.pi / (alpha * 1e-6) * \
radius**3 * (medium_index - 1.335)
return dm_rel + dm_sup
[docs]def relative_dry_mass(qpi, radius, center, alpha=.18, rad_fact=1.2):
"""Compute relative dry mass of a circular area in QPI
The dry mass is computed with
m_rel = lambda / (2*PI*alpha) * phi_tot * deltaA
with the vacuum wavelength `lambda`, the total phase
retardation in the area of interest `phi_tot`, and the pixel
area `deltaA`.
This is the *relative* dry mass, because it is computed relative
to the surrounding medium (phi_tot) and not relative to water.
Parameters
----------
qpi: qpimage.QPImage
QPI data
center: tuble (x,y)
Center of the area of interest [px]
radius: float
Radius of the area of interest [m]
wavelength: float
The wavelength of the light [m]
alpha: float
Refraction increment [mL/g]
rad_fact: float
Inclusion factor that scales `radius` to increase
the area used for phase summation; if the backgound
phase exhibits a noise phase signal, positive and
negative contributions cancel out and `rad_fact`
does not have an effect above a certain critical value.
Returns
-------
dry_mass: float
The relative dry mass of the object [g]
"""
image = qpi.pha
rincl = radius / qpi["pixel size"] * rad_fact
wavelength = qpi["wavelength"]
sx, sy = qpi.shape
x = np.arange(sx).reshape(-1, 1)
y = np.arange(sy).reshape(1, -1)
discsq = ((x - center[0])**2 + (y - center[1])**2)
area = discsq < rincl**2
phi_tot = np.sum(image[area])
# compute dry mass
pxarea = qpi["pixel size"]**2
# convert alpha mL/g to m³/g
fact = 1e-6
# [kg]
dm = wavelength / (2 * np.pi * alpha * fact) * phi_tot * pxarea
return dm
[docs]def mode_for_sphere_analysis(h5in, h5out, cfgid):
"""Determine the mode for the QPSeries file for subsequent analysis
Sometimes an analysis is interrupted and the output files are
still intact. This method determines whether it is possible to
continue the analysis where left off or not.
Parameters
----------
h5in: pathlib.Path
The input QPSeries file
h5out: pathlib.Path
The output QPSeries file
cfgid: str
The configuration hash of the sphere analysis which is
part of the output QPSeries analysis
Returns
-------
create: bool
Whether the output QPSeries file is ok. This is dependent
on the following scenarios:
- True: There is no output QPSeries file, it is corrupt,
or at least one of the qpimage identifiers is not present
in the input QPSeries.
- False: Some of the input QPSeries identifiers are
present in the output QPSeries.
"""
with qpimage.QPSeries(h5file=h5in, h5mode="r") as qps_in:
# read all identifiers
idsin = [qpi["identifier"] for qpi in qps_in]
try:
with qpimage.QPSeries(h5file=h5out, h5mode="r") as qps_out:
# read all identifiers
idsout = [qpi["identifier"] for qpi in qps_out]
except (IOError, OSError):
# corrupt file
idsout = []
valid = [] # matching identifiers
bad = [] # those exist in h5out but not in h5in
for oid in idsout:
iid, iicfg = oid.rsplit(":", 3)[:2]
if iid in idsin and iicfg == cfgid:
valid.append(iid)
else:
bad.append(iid)
return bad or not valid