"""
The APyT mass spectrum fitting module
=====================================
This module enables semi-automatic fitting of high-quality mass spectra
previously processed using the :ref:`APyT mass spectrum alignment module
<apyt.spectrum.align:The APyT mass spectrum alignment module>`. It leverages the
isotope database from the |periodictable| package to support chemically
meaningful deconvolution of complex spectra.
The core functionality involves modeling the shape of individual isotope peaks,
grouping them by chemical species and charge state, and fitting their
contributions to the measured spectrum using constrained optimization.
General peak shape description
------------------------------
The general peak shape :math:`p(x)` is modeled as the product of two components:
.. math::
p(x) = a(x) \\, d(x),
where:
- :math:`a(x)` is the **activation function**, describing the rising edge of the
peak.
- :math:`d(x)` is the **decay function**, modeling the asymmetric trailing tail.
Decay function
^^^^^^^^^^^^^^
The decay component is defined as a simple exponential function:
.. math::
d(x) = \\exp(-x).
Activation function
^^^^^^^^^^^^^^^^^^^
The onset of the peak is described by an error function:
.. math::
a(x) = \\frac 1 2 \\left(
\\operatorname{erf}\\left(\\frac{\\sqrt \\pi}{2} x\\right) + 1
\\right).
The prefactor ensures that the peak position is located at :math:`x = 0`, i.e.,
:math:`p'(0) = 0`.
Implementation notes
^^^^^^^^^^^^^^^^^^^^
Fitting a complete mass spectrum requires summing over all relevant peaks, each
corresponding to a specific combination of element, isotope, and charge state.
However, natural isotope abundances are used to constrain the relative
intensities within each isotope group, reducing the number of independent
fitting parameters to one intensity per group.
To allow for more flexible peak shapes, the decay component may also be modeled
as a sum of two exponentials (a "double exponential decay"). The desired peak
model is selected by passing one of the following identifiers:
- ``error-expDecay``: Default single exponential decay with error-function
onset.
- ``error-doubleExpDecay``: Double exponential decay with error-function onset.
If custom peak shapes are needed, the internal function :func:`_peak_generic`
must be extended (note: this function is not part of the public API).
Data structure overview
-----------------------
This module primarily uses Python dictionaries for I/O interfaces, detailed
below.
Element dictionary
^^^^^^^^^^^^^^^^^^
A dictionary where each key represents an element or molecule, and the value is
a tuple containing:
- A tuple of occurring charge states.
- The nominal atomic or molecular volume (in nm³), used for subsequent
reconstruction.
Peak dictionary
^^^^^^^^^^^^^^^
A dictionary summarizing the properties of an individual peak (i.e.
element/charge state/abundance combination for a specific isotope):
- ``element``: Chemical symbol or molecular identifier.
- ``charge``: Charge state of the ion.
- ``mass_charge_ratio``: Mass-to-charge ratio (amu/e).
- ``abundance``: Isotopic abundance (as a decimal fraction).
- ``is_max``: Boolean indicating if this is the most abundant isotope.
- ``volume``: Atomic/molecular volume (nm³).
Element count dictionary
^^^^^^^^^^^^^^^^^^^^^^^^
A dictionary summarizing fitted results for each element/charge state
combination, with:
- ``element``: Chemical symbol or molecular identifier.
- ``charge``: Charge state of the ion.
- ``count``: Number of determined counts for this element/charge state
combination.
- ``fraction``: Relative fraction of counts (excluding background).
List of functions
-----------------
This module provides the following functions for spectrum fitting and
interpretation:
* :func:`check_compatibility`: Check compatibility of given version against
current module version.
* :func:`counts`: Get counts for all elements.
* :func:`enable_debug`: Enable or disable debug output.
* :func:`fit`: Fit mass spectrum.
* :func:`map_ids`: Map mass-to-charge ratios to chemical IDs.
* :func:`peaks_list`: Get list of all peaks for specified elements and
charge states.
* :func:`spectrum`: Calculate spectrum for specified list of elements.
* :func:`split_molecules`: Split molecular events into individual atoms.
* :func:`version`: Get version of this module.
.. |periodictable| raw:: html
<a href="https://periodictable.readthedocs.io/en/latest/"
target="_blank">periodic table</a>
.. sectionauthor:: Sebastian M. Eich <Sebastian.Eich@imw.uni-stuttgart.de>
.. codeauthor:: Sebastian M. Eich <Sebastian.Eich@imw.uni-stuttgart.de>
"""
#
#
#
#
__version__ = '0.3.0'
__all__ = [
'check_compatibility',
'counts',
'enable_debug',
'fit',
'map_ids',
'peaks_list',
'spectrum',
'split_molecules',
'version'
]
#
#
#
#
# import modules
import itertools
import lmfit
import numba
import numpy as np
import periodictable
import re
import warnings
#
# import some special functions/modules
from base64 import b32decode, b32encode
from packaging.version import Version
from scipy.signal import find_peaks
from scipy.special import erf
from scipy.stats import multinomial
from timeit import default_timer as timer
#
#
#
#
################################################################################
#
# private module-level variables
#
################################################################################
_abundance_thres = 1e-9
"The abundance threshold for discarding isotopes."
_is_dbg = False
"""The global flag for debug output.
This flag can be set through the :func:`enable_debug` function."""
#
#
#
#
################################################################################
#
# module-level initialization
#
################################################################################
def _add_element(name, symbol, number, table, isotopes, density):
"""
Add new element to periodic table.
"""
#
#
# create new element
element = table.core.Element(name, symbol, number, None, table)
element._density = density
#
# add isotopes to element
for isotope in isotopes:
element.add_isotope(isotope[0])
element[isotope[0]]._mass = isotope[1]
element[isotope[0]]._abundance = isotope[2]
#
#
# add new element to table (append 'X' to indicate custom element)
table.elements._element["{0:d}X".format(number)] = element
setattr(table, symbol, element)
setattr(table.elements, symbol, element)
#
#
# add gallium used by FIB (only one isotope!)
_add_element(
"gallium (FIB)", "Gx", 31, periodictable,
[(69, periodictable.Ga[69].mass, 100.0)], periodictable.Ga.density
)
#
#
#
#
################################################################################
#
# public functions
#
################################################################################
[docs]def check_compatibility(version):
"""
Check compatibility of given version against current module version.
Raises
------
Exception
An exception is raised in case of incompatibility.
"""
#
#
# identical version is always compatible
if Version(version) == Version(__version__):
return
#
#
# version 0.2.0 breaks backward compatibility due to change in analytic
# description of background
if Version(version) < Version('0.2.0'):
raise Exception(
"Version incompatibility in {0:s}: Analytic description of "
"background has been changed in version 0.2.0 (your version is "
"{1:s}). You need to re-fit the mass spectrum.".
format(__name__, version)
)
#
#
# version 0.3.0 breaks backward compatibility due to generalization of peak
# shift parameters
if Version(version) < Version('0.3.0'):
raise Exception(
"Version incompatibility in {0:s}: Peak shift has been generalized "
"in version 0.3.0 (your version is {1:s}). You need to re-fit the "
"mass spectrum.".
format(__name__, version)
)
#
#
#
#
[docs]def counts(peaks_list, function, params, data_range, bin_width,
ignore_list = [], verbose = False):
"""
Get counts for all elements.
This functions loops trough all peaks in *peaks_list* with the ``is_max``
key set to ``True`` and returns the counts for each element and charge state
combination.
Parameters
----------
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
function : str
The string identifying the general peak shape function, as described in
:ref:`implementation notes<apyt.spectrum.fit:Implementation notes>`.
params : dict
The dictionary with the fit parameter names as keys, and best-fit values
as values, as described in the |best_params| |model_result| attribute of
the |lmfit| module.
data_range : tuple
The covered data range, i.e. the minimum and maximum of the mass
spectrum. This interval is required for analytical integration of the
background counts.
bin_width : float
The histogram bin width.
Keyword Arguments
-----------------
ignore_list : list of str
The list of elements/molecules to ignore when calculating compositions.
Defaults to empty list.
verbose : bool
Whether to print the content of all element count dictionaries. Defaults
to ``False``.
Returns
-------
counts_list : list of dicts
The list of all element count dictionaries, as described in
:ref:`element count dictionary
<apyt.spectrum.fit:Element count dictionary>`.
total_counts : int
The total number of counts (without background).
background : int
The number of background counts.
.. |best_params| raw:: html
<a href="https://lmfit.github.io/lmfit-py/model.html
#lmfit.model.best_values" target="_blank">
best_values</a>
.. |model_result| raw:: html
<a href="https://lmfit.github.io/lmfit-py/model.html
#modelresult-attributes" target="_blank">
ModelResult</a>
.. |lmfit| raw:: html
<a href="https://lmfit.github.io/lmfit-py/" target="_blank">
LMFIT</a>
"""
#
#
# get unified area for count normalization
area = _peak_generic(function, params, 'count', None)
#
#
# loop through all peaks
counts_list = []
total_counts = 0.0
for peak in peaks_list:
# calculate element count only once (use peak with maximum abundance)
if peak['is_max'] == False:
continue
#
# get intensity parameter for current peak
I = params[_get_intensity_name(peak)]
#
# calculate count
count = I * area / bin_width
#
# cumulate total counts
total_counts += count
#
# append count dictionary to count list
counts_list.append({
'element': peak['element'],
'charge': peak['charge'],
'count': count,
'fraction': 0.0
})
#
# add fractions
for count in counts_list:
count['fraction'] = count['count'] / total_counts
#
#
# calculate background
background = 2.0 * params['base'] / bin_width * \
(np.sqrt(data_range[1]) - np.sqrt(data_range[0]))
#
#
# print element counts if requested
if verbose == True:
print("Total number of counts (without background) is {0:.0f}:".
format(total_counts))
print("Number of background counts is {0:.0f} ({1:.1f}%).\n".format(
background, background / (total_counts + background) * 100))
print("element\t\tcharge\t count\tfraction")
print("--------" * 5 + "--------")
for count in counts_list:
print("{0:12s}\t{1:d}\t{2:8.0f}\t{3:.4f}".
format(*count.values()))
print("========" * 5 + "========")
print("\t\ttotal\t{0:8.0f}\n".format(total_counts))
#
#
# combine counts for different charge states for individual elements
print("element\t\t count\tfraction")
print("--------" * 4 + "--------")
element_count = 0
element = counts_list[0]['element']
for count in counts_list:
if count['element'] == element:
element_count += count['count']
else:
# print previous element and reset
print("{0:12s}\t{1:8.0f}\t{2:.4f}".format(
element, element_count, element_count / total_counts))
element_count = count['count']
element = count['element']
# print last element
print("{0:12s}\t{1:8.0f}\t{2:.4f}".
format(element, element_count, element_count / total_counts))
print("========" * 4 + "========")
print("total\t\t{0:8.0f}\n".format(total_counts))
#
#
# break down molecules into individual elements
count_dict = {}
is_molecule = False
for count in counts_list:
# check whether element is in ignore list
if count['element'] in ignore_list:
if _is_dbg:
print("Ignoring element \"{0:s}\".".
format(count['element']))
continue
#
#
# get molecule items
molecule_items = \
periodictable.formula(count['element']).atoms.items()
#
# check whether multiple elements present
if len(molecule_items) > 1:
is_molecule = True
#
# loop through elements in molecule
for element, element_count in molecule_items:
# check whether multiple atoms are present for individual
# element
if element_count > 1:
is_molecule = True
#
# create new dictionary key-value pair if element not found,
# increment otherwise
if element.symbol not in count_dict:
count_dict[element.symbol] = element_count * count['count']
else:
count_dict[element.symbol] += element_count * count['count']
#
# only print if molecule found or ignore list active
if is_molecule == True or len(ignore_list) > 0:
print("element\t count\tfraction")
print("--------" * 3 + "--------")
for element, element_count in count_dict.items():
print("{0:s}\t{1:9.0f}\t{2:.4f}".
format(element, element_count,
element_count / sum(count_dict.values())))
print("========" * 3 + "========")
print("total\t{0:9.0f}\n".format(sum(count_dict.values())))
#
#
# return element counts, total counts, and background counts
return counts_list, total_counts, background
#
#
#
#
[docs]def enable_debug(is_dbg):
"""Enable or disable debug output.
Parameters
----------
is_dbg : bool
Whether to enable or disable debug output.
"""
#
#
global _is_dbg
_is_dbg = is_dbg
#
#
#
#
[docs]def fit(spectrum, peaks_list, function, verbose = False, **kwargs):
"""
Fit mass spectrum.
This function internally uses the |lmfit| module to fit the complete mass
spectrum, where the peak positions are provided by the *peaks_list*
argument.
Parameters
----------
spectrum : ndarray, shape (n, 2)
The mass spectrum histogram data.
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
function : str
The string identifying the general peak shape function, as described in
:ref:`implementation notes<apyt.spectrum.fit:Implementation notes>`.
Keyword Arguments
-----------------
peak_scale : list
Whether to apply individual width scaling to selected peaks. Each scale
parameter represents a scaling factor and it is applied to all peaks
that match a corresponding regular expression from the provided list.
Each regular expression defines one unique scale parameter.
peak_shift : list
Whether to apply additional shifts to selected peaks. Each shift
parameter represents an absolute shift that scales with the square root
of the peak position. It is applied to all peaks that match a
corresponding regular expression from the provided list. Each regular
expression defines one unique shift parameter. A systematic shift of
pure oxygen was first discovered in the vanadium pentoxide measurements
of Simone Bauder in 2024.
scale_width : bool
Whether to use a varying parameter for the peak width scaling.
Theoretically, the peak width in the mass-to-charge scale is expected to
be proportional to the square root of the peak position, but may show
different behavior. The parameter is implemented as an exponent.
Defaults to ``False``, i.e. assume square root behavior, which resembles
a fixed exponent value of ``0.5``.
verbose : bool
Whether to print the fit results and statistics. Defaults to
``False``.
Returns
-------
result : ModelResult
The result of the fit, as described by |model_result| of the |lmfit|
module.
"""
#
#
# check mass spectrum for valid range (strictly positive)
_check_spectrum_range(spectrum[:, 0])
#
#
#
#
# define fit model
start = timer()
model = lmfit.Model(_model_spectrum, independent_vars = ["x"])
#
# estimate fit parameters
parameters = _estimate_fit_parameters(
spectrum, peaks_list, function, **kwargs
)
#
#
# perform fit and print results
print("Fitting mass spectrum using \"{0:s}\"…".format(function))
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message = "The keyword argument (function|peaks_list) does not " +
"match any arguments of the model function. It will be ignored.",
category = UserWarning)
result = model.fit(spectrum[:, 1], parameters, x = spectrum[:, 0],
peaks_list = peaks_list, function = function)
if verbose == True:
print(result.fit_report(show_correl = False))
print("Fitting of mass spectrum took {0:.3f}s.".format(timer() - start))
print("")
#
#
# return fit results
return result
#
#
#
#
[docs]def map_ids(mc_ratio, r, x, peaks_list, function, params, group_charge = True,
verbose = False):
"""
Map mass-to-charge ratios to chemical IDs.
This function calculates a probability vector which contains the
probabilities to find a specific element (with individual charge state if
requested) associated with the given peaks in *peaks_list* (including
background) at every position *x* in the mass-to-charge spectrum. These
probabilities are eventually used for peak de-convolution and background
subtraction to assign the corresponding chemical IDs. The atomic volumes of
the events are also mapped and returned.
Parameters
----------
mc_ratio : ndarray, shape (n,)
The mass-to-charge ratios of the *n* events.
r : ndarray, shape (n,)
The *n* random numbers (between ``0.0`` and ``1.0``) required for
mapping the probability vector to an actual chemical ID.
x : ndarray, shape (m,)
The positions at which the probability vectors are calculated. The
elements of the array must be equidistant and would typically represent
the histogram bin centers.
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
function : str
The string identifying the general peak shape function, as described in
:ref:`implementation notes<apyt.spectrum.fit:Implementation notes>`.
params : dict
The dictionary with the fit parameter names as keys, and best-fit values
as values, as described in the |best_params| |model_result| attribute of
the |lmfit| module.
group_charge : bool
Whether to group all charge states of an individual element. Defaults to
``True``.
verbose : bool
Whether to print a list of all chemical IDs and their fractions in
relation to the total counts (with background). Defaults to ``False``.
Returns
-------
ids : ndarray, shape (n,)
The chemical IDs of the *n* events.
Ω : ndarray, shape (n,)
The atomic volumes of the *n* events.
"""
#
#
@numba.njit("i2[:](f4[:], f8[:,:], f8[:])", cache = True, parallel = True)
def _get_ids(bin_ids, p, r):
"""
Simple helper function which performs the actual chemical mapping from
the probability vectors to an ID.
The probability vectors are linearly interpolated based on the
fractional bin ID of each event in the mass spectrum.
"""
#
#
# get integral and fractional part required for interpolation
integ = np.floor(bin_ids)
frac = bin_ids - integ
integ = integ.astype(np.int32)
#
#
# loop through all events
ids = np.empty(len(bin_ids), dtype = np.int16)
for i in numba.prange(len(bin_ids)):
# interpolate probability vector
p_i = (1.0 - frac[i]) * p[integ[i]] + frac[i] * p[integ[i] + 1]
#
# map probability vector to chemical ID
ids[i] = np.searchsorted(p_i, r[i])
#
#
# return chemical IDs
return ids
#
#
#
#
# check mass spectrum for valid range (strictly positive)
_check_spectrum_range(x)
#
#
#
#
# start timer
print("Performing chemical mapping…")
start = timer()
#
#
# add one grid point at either end to allow interpolation at margin
Δx = (x[-1] - x[0]) / (len(x) - 1)
x = np.concatenate(([x[0] - Δx], x, [x[-1] + Δx]))
#
#
# group peaks for the same element (and charge state if requested)
peak_groups = _group_peaks(peaks_list, group_charge)
#
#
# set background counts (always *first* entry)
p_vec = np.zeros((len(x), len(peak_groups) + 1))
p_vec[:, 0] = params['base'] / np.sqrt(x)
#
# loop through peak groups to calculate probability vectors (one vector for
# each x-position containing the probabilities for every peak group
for i in range(len(peak_groups)):
p_vec[:, i + 1] = spectrum(x, peak_groups[i], function, params)
#
#
# normalize probability vector
p_vec /= np.sum(p_vec, axis = 1)[:, np.newaxis]
#
# calculate cumulated probability vector
p_vec = np.cumsum(p_vec, axis = 1)
#
#
# get chemical IDs of each event
# (map mass-to-charge ratios to fractional bin IDs)
ids = _get_ids((mc_ratio - x[0]) / Δx, p_vec, r)
#
#
# print counts of all IDs if requested
if verbose == True:
# get counts for all IDs
counts = np.bincount(ids, minlength = p_vec.shape[1])
#
#
print("\nid\telement\t\tcharge\t count\tfraction\tvolume")
print("--------" * 8 + "--------")
#
# background counts (always first entry)
print("{0:3d}\tBackground\t\t{1:8d}\t{2:.4f}".
format(0, counts[0], counts[0] / len(mc_ratio)))
#
# loop through all peaks
for i, group in zip(range(1, len(peak_groups) + 1), peak_groups):
print(
"{0:3d}\t{1:12s}\t{2:d}\t{3:8d}\t{4:.4f}\t\t{5:.6f}".
format(
i, group[0]['element'],
0 if group_charge == True else group[0]['charge'],
counts[i], counts[i] / len(ids), group[0]['volume']
)
)
#
print("========" * 8 + "========")
print("\t\t\ttotal\t{0:8d}\n".format(len(mc_ratio)))
#
#
# map atomic volumes (prepend zero volume for background events)
Ω = np.append(0.0, [group[0]['volume'] for group in peak_groups])[ids]
#
#
# return chemical IDs and atomic volumes
print("Chemical mapping took {0:.3f}s.".format(timer() - start))
return ids, Ω
#
#
#
#
[docs]def peaks_list(element_dict, mode = 'mass', mass_decimals = 3, verbose = False):
"""
Get list of all peaks for specified elements and charge states.
Parameters
----------
element_dict : dict
The dictionary containing the elements and their charge states, as
described in
:ref:`element dictionary<apyt.spectrum.fit:Element dictionary>`.
Keyword Arguments
-----------------
mode : str
Which mode to use for the calculation of the mass-to-charge ratios. In
``mass`` mode (recommended), the actual isotopic masses are used, while
in ``isotope`` mode, the mass numbers of the isotopes are used. Defaults
to ``mass``.
mass_decimals : int
The number of decimal places for the mass-to-charge ratios in ``mass``
mode should be limited to reduce the number of molecular isotopes. This
setting effectively groups isotopes whose masses do not differ by more
than the specified *mass_decimals*. This value should be set based on
the resolution of the mass spectrum. Defaults to ``3``, i.e. group
isotopes whose masses do not differ by more than ``0.001``.
verbose : bool
Whether to print the content of all determined peak dictionaries.
Defaults to ``False``.
Returns
-------
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
"""
#
#
# check for valid mode
if not (mode == 'mass' or mode == 'isotope'):
raise Exception("Unknown mode \"{0:s}\".".format(mode))
#
#
# loop through elements (element properties are passed as a tuple containing
# the charge states and the volume for reconstruction)
print("Using \"{0:s}\" mode for calculation of mass-to-charge ratios.\n".
format(mode))
peaks_dict = {}
for element, (charge_states, volume) in element_dict.items():
# check whether charge states contains only one element
if type(charge_states) is not tuple:
charge_states = (charge_states, )
#
#
# check for possible molecule
counts = list(periodictable.formula(element).atoms.values())
if len(counts) > 1 or max(counts) > 1:
isotopes = _get_molecular_isotopes_list(
element, mode = mode, mass_decimals = mass_decimals
)
else:
isotopes = periodictable.elements.symbol(element)
#
#
# loop through charge states
peaks_dict[element] = {}
for q in tuple(charge_states):
# get isotope with maximum abundance
abundance_max = 0.0
for isotope in isotopes:
if isotope.abundance > abundance_max:
abundance_max = isotope.abundance
isotope_max = isotope
#
#
# loop through isotopes
peaks_dict[element][str(q)] = []
for isotope in isotopes:
# skip invalid isotope
if isotope.abundance <= _abundance_thres * 100:
continue
#
# append peak dictionary to peak list
peak_dict = {
'element': element,
'charge': q,
# set mass-to-charge ratio according to specified mode
# either from actual isotopic mass ("mass") or from mass
# number ("isotope")
'mass_charge_ratio': getattr(isotope, mode) / q,
'abundance': isotope.abundance / 100,
'is_max': False,
'volume': volume
}
if isotope == isotope_max:
peak_dict['is_max'] = True
peaks_dict[element][str(q)].append(peak_dict)
#
#
# loop through all elements and charge states and create flat peak list
peaks_list = []
for element in peaks_dict.values():
for charge_states_peaks_list in element.values():
peaks_list.extend(charge_states_peaks_list)
#
#
#
#
# print all peaks if requested
if verbose == True:
print("Total number of peaks is {0:d}:".format(len(peaks_list)))
print('\t'.join(peaks_list[0].keys()))
print("--------" * 8 + "--------")
#
# sort peaks by mass-to-charge ratio
peaks_list_sorted = sorted(
peaks_list, key = lambda peak: peak['mass_charge_ratio']
)
for peak in peaks_list_sorted:
print("{0:s}\t{1:d}\t{2:.3f}\t\t\t{3:.6f}\t{4!r}\t{5:.6f}".
format(*peak.values()))
print("")
#
#
# return peak list and nested peaks dictionary
return peaks_list, peaks_dict
#
#
#
#
[docs]def spectrum(x, peaks_list, function, params, elements_list = None):
"""
Calculate spectrum for specified list of elements.
Parameters
----------
x : ndarray or scalar
The position(s) where to evaluate the mass spectrum.
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
function : str
The string identifying the general peak shape function, as described in
:ref:`implementation notes<apyt.spectrum.fit:Implementation notes>`.
params : dict
The dictionary with the fit parameter names as keys, and best-fit values
as values, as described in the |best_params| |model_result| attribute of
the |lmfit| module.
Keyword Arguments
-----------------
elements_list : list of str
The list specifying for which elements the mass spectrum should be
evaluated. Defaults to ``None``, indicating to use all elements
occurring in *peaks_list*. Note that the background is not included.
Returns
-------
y : ndarray or scalar
The cumulated spectrum value at position *x* for the list of provided
elements. This is a scalar if *x* is a scalar.
"""
#
#
# cumulate all peak contributions for specified elements
y = 0.0
for peak in peaks_list:
if elements_list is None or \
_get_intensity_name(peak).split('_')[1] in elements_list:
y += _peak_generic(function, params, 'eval', (x, peak))
#
#
return y
#
#
#
#
[docs]def split_molecules(ids, xyz, peaks_list,
group_charge = True, shuffle = False, verbose = False):
"""
Split molecular events into individual atoms.
This function examines all events for potential molecules and decomposes
them into their constituent atoms. The chemical IDs are remapped to unique
elements in alphabetical order, and the three-dimensional coordinates of
each molecule are assigned to all its individual atoms. The optional
argument *shuffle* allows for random reordering of the atoms within a
molecule.
Parameters
----------
ids : ndarray, shape (n,)
The chemical IDs of the *n* events.
xyz : ndarray, shape (n, 3)
The three-dimensional reconstructed positions of the *n* events.
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
group_charge : bool
Whether to group all charge states of an individual element. Defaults to
``True``.
shuffle : bool
Whether to randomly shuffle the order of the atoms after splitting.
Defaults to ``False``.
verbose : bool
Whether to print a list of all chemical IDs and their fractions in
relation to the total counts (with background). Defaults to ``False``.
Returns
-------
ids_split : ndarray, shape (m,)
The chemical IDs of the *m* events after splitting.
xyz_split : ndarray, shape (m, 3)
The three-dimensional reconstructed positions of the *m* events after
splitting.
"""
#
#
# start timer
start = timer()
print("Splitting molecules…")
#
#
#
#
# get all molecule names from peak groups
molecules = [
group[0]['element']
for group in _group_peaks(peaks_list, group_charge)
]
#
# get counts of all occurring molecular IDs (including background)
molecule_counts = np.bincount(ids, minlength = len(molecules) + 1)
#
#
# break down all molecules into list of individual elements
# (with possible repetition)
elements = [
element.symbol
for molecule in molecules
for element in periodictable.formula(molecule).atoms.keys()
]
#
# convert unique, sorted elements into PeriodicTable element objects
elements = [
periodictable.elements.symbol(element)
for element in sorted(set(elements))
]
if verbose == True:
print("\tList of unique elements is:", elements)
#
#
# calculate total number of individual atoms *after* splitting
# (including background)
count_tot = molecule_counts[0]
for molecule, count in zip(molecules, molecule_counts[1:]):
count_tot += \
count * np.sum(list(periodictable.formula(molecule).atoms.values()))
#
#
# initialize arrays for split events (infer datatype from original arrays)
ids_split = np.empty(count_tot, dtype = ids.dtype)
xyz_split = np.empty((count_tot, xyz.shape[1]), dtype = xyz.dtype)
#
#
#
#
# set background counts (id = 0)
offset = molecule_counts[0]
ids_split[0:offset] = 0
xyz_split[0:offset] = xyz[(ids == 0)]
#
#
# loop through all (unique) elements
for element, element_id in zip(elements, range(1, len(elements) + 1)):
if verbose == True:
print("\tProcessing element '{0:s}' with new id {1:d}…".
format(element.symbol, element_id))
# loop through all molecules
for molecule, molecule_id, molecule_count in zip(
molecules, range(1, len(molecules) + 1), molecule_counts[1:]):
# test whether molecule contains element
if element in periodictable.formula(molecule).atoms.keys():
# set element count
element_count = periodictable.formula(molecule).atoms[element]
if verbose == True:
print(
"\t\tFound in molecule {0:>8s} with original id {1:2d} "
"({2:d} x {3:8d} counts).".
format("'" + molecule + "'", molecule_id, element_count,
molecule_count)
)
#
#
# remap element ID and assign molecule position to its
# individual atoms
sl = np.index_exp[offset:offset+molecule_count*element_count]
ids_split[sl] = element_id
xyz_split[sl] = np.repeat(
xyz[(ids == molecule_id)], element_count, axis = 0
)
#
#
# increment offset position
offset += molecule_count * element_count
#
#
#
#
# randomize order if requested
if shuffle == True:
print("Randomizing order…")
#
# initialize random number generator
rng = np.random.default_rng(seed = 0)
#
# get random permutation
permutation = rng.permutation(len(ids_split))
#
# shuffle IDs and corresponding positions
ids_split = np.take(ids_split, permutation, axis = 0)
xyz_split = np.take(xyz_split, permutation, axis = 0)
#
#
#
#
# print counts of all IDs if requested
if verbose == True:
# get counts for all IDs (including background)
counts = np.bincount(ids_split, minlength = len(elements) + 1)
#
#
print("\nid\telement\t\t count\tfraction")
print("--------" * 5 + "--------")
#
# background counts (always first entry)
print("{0:2d}\tBackground\t{1:8d}\t{2:.4f}".format(
0, counts[0], counts[0] / count_tot
))
#
# loop through all elements
for i, element in zip(range(1, len(elements) + 1), elements):
print("{0:2d}\t{1:12s}\t{2:8d}\t{3:.4f}".format(
i, element.symbol, counts[i], counts[i] / count_tot
))
#
print("========" * 5 + "========")
print("\t\ttotal\t{0:8d}\n".format(count_tot))
#
#
#
#
print(
"Splitting of {0:d} molecules into {1:d} atoms took {2:.3f}s.".
format(len(ids), count_tot, timer() - start)
)
return ids_split, xyz_split
#
#
#
#
[docs]def version():
"""
Get version of this module.
Returns
-------
version : str
The version string of this module.
"""
#
return __version__
#
#
#
#
################################################################################
#
# private module-level functions
#
################################################################################
def _activation(x):
"""
Activation function
The slope at the inflection point (0, 1/2) is identical to 1/2, which can
be considered as a measure of the transition width.
"""
#
#
return 0.5 * (erf(0.5 * np.sqrt(np.pi) * x) + 1.0)
#
#
#
#
def _base32_decode(s):
"""
Simple helper function for base32-decoding a string without padding.
"""
#
#
# re-add padding (length must be divisible by 8)
s += '=' * ((8 - len(s) % 8) % 8)
#
# return decoded string
return b32decode(s.encode('utf8')).decode('utf-8')
#
#
#
#
def _base32_encode(s):
"""
Simple helper function for base32-encoding a string without padding.
"""
#
#
# return encoded string (without padding)
return b32encode(s.encode('utf-8')).decode('utf8').rstrip('=')
#
#
#
#
def _decay(x):
"""
Exponential decay
The decay constant, at which the function value is reduced to 1/e, is
defined to be unity. The caller needs to make sure that the exponential
growth is limited in negative direction to avoid overflow.
"""
#
#
return np.exp(-x)
#
#
#
#
def _check_spectrum_range(spectrum_range):
"""
Simple helper to check strictly positive mass spectrum range.
"""
#
#
if spectrum_range.min() <= 0.0:
raise Exception(
"Mass spectrum range must be strictly positive due to divergence "
"of background. (Minimum is {0:.3f} amu/e.)".
format(spectrum_range.min())
)
#
#
#
#
def _estimate_fit_parameters(data, peaks_list, function, **kwargs):
"""
Estimate initial fit parameters.
"""
#
#
# initialize fit parameters object
params = lmfit.Parameters()
#
#
#
#
# add peak width scaling parameter (implemented as an exponent; defaults to
# square root behavior, i.e. γ = 0.5)
params.add(
'γ', value = 0.5, min = 0.0, max = 2.0,
vary = kwargs.get('scale_width', False)
)
if kwargs.get('scale_width', False) == True:
print("Using peak width scaling γ.")
#
#
# estimate general peaks shape parameters and p(0), i.e. peak shape function
# value at 0
params, p_0 = _peak_generic(function, params, 'estimate', (data))
#
#
#
#
# estimate baseline
peaks, props = find_peaks(
data[:, 1],
distance = np.iinfo(np.int32).max,
width = np.finfo(np.float32).eps, rel_height = 1.0)
base = props['width_heights'][0] * np.sqrt(data[:, 0][peaks[0]])
print("Estimated background at 1.0 amu/e is {0:.1f}.".format(base))
#
# add baseline parameter
params.add('base', value = base)
#
#
#
#
# estimate peak intensities
for peak in peaks_list:
# only consider peak with maximum abundance
if peak['is_max'] == False:
continue
#
#
# select peak region
data_sel = data[
(peak['mass_charge_ratio'] - 0.25 <= data[:, 0]) &
(data[:, 0] <= peak['mass_charge_ratio'] + 0.25)
]
#
# find (maximum) peak in selected region
_, props = find_peaks(
data_sel[:, 1],
distance = np.iinfo(np.int32).max, height = 0.0)
#
# check for correctly identified peak
if len(props['peak_heights']) == 0:
raise Exception(
"No peak found for element '{0:s}' with maximum at "
"{1:.3f} amu/e. You may want to check the alignment of the "
"mass spectrum.".
format(peak['element'], peak['mass_charge_ratio'])
)
#
#
# estimate intensity (for entire isotopic peak group at hypothetical
# position at unity)
I = props['peak_heights'][0] / p_0 / peak['abundance'] * \
peak['mass_charge_ratio']
params.add(_get_intensity_name(peak), value = I, min = 0.0)
#
#
#
#
# add peak scaling parameter(s); peak groups for scaling are selected by
# matching their names against a regular expression which is base32-encoded
# in the parameter name; multiple parameters with individual patterns may be
# defined here
if 'peak_scale' in kwargs:
for reg_ex in kwargs.get('peak_scale'):
params.add(
'λ_' + _base32_encode(reg_ex),
value = 1.0, min = 0.1, max = 10.0,
)
params_matched = [
p for p in params.keys()
if re.fullmatch("^I_" + reg_ex + "$", p) is not None
]
print(
"Using generalized scaling parameter \"{0:s}\" for intensity "
"parameters {1:s} matching regular expression '{2:s}'.".format(
'λ_' + _base32_encode(reg_ex), ', '.join(params_matched),
reg_ex
)
)
#
#
# add peak shift parameter(s) (implemented as an absolute shift which scales
# with the square root of the peak position); peak groups for shifting are
# selected by matching their names against a regular expression which is
# base32-encoded in the parameter name; multiple parameters with individual
# patterns may be defined here
if 'peak_shift' in kwargs:
for reg_ex in kwargs.get('peak_shift'):
params.add(
'Δ_' + _base32_encode(reg_ex),
value = 0.0, min = -0.1, max = 0.1,
)
params_matched = [
p for p in params.keys()
if re.fullmatch("^I_" + reg_ex + "$", p) is not None
]
print(
"Using generalized shift parameter \"{0:s}\" for intensity "
"parameters {1:s} matching regular expression '{2:s}'.".format(
'Δ_' + _base32_encode(reg_ex), ', '.join(params_matched),
reg_ex
)
)
#
#
#
#
# return fit parameters object
print("")
return params
#
#
#
#
def _get_intensity_name(peak):
"""
Simple wrapper to obtain intensity parameter name for element/charge state
associated with specified peak.
"""
#
#
return "I_{0:s}_{1:d}".format(peak['element'], peak['charge'])
#
#
#
#
def _get_molecular_isotopes_list(molecule, mode = 'mass', mass_decimals = 3):
"""
Return (artificial) Element object with all isotopes for specified molecule.
*mode* determines which mode should be used when the mass-to-charge ratio is
calculated for the molecular isotopes. ``mass`` (the default) uses the
actual mass of the molecular isotope, while ``isotope`` uses the mass number
of the molecular isotope. The number of decimal places for the
mass-to-charge ratio in ``mass`` mode should be limited to reduce the number
of molecular isotopes. This setting effectively groups isotopes whose masses
do not differ by more than the specified *mass_decimals*.
"""
#
#
print("Determining isotope combinations for molecule \"{0:s}\"…".
format(molecule))
#
#
# get molecular constituents and their isotopes
atomic_number = 0
mass_numbers_list = []
for element, element_count in periodictable.formula(molecule).atoms.items():
# cumulate (artificial) atomic number
atomic_number += element_count * element.number
#
#
# get list of isotopes with non-zero abundance for current element
isotopes_list, masses, abundances = _get_nonzero_isotopes(element)
#
#
# calculate all possible isotope combinations (Cartesian product) (from
# zero to element_count for each individual isotope which may form the
# molecule for the current element); each element of the resulting list
# is an n-tuple, where n corresponds to the number of (non-zero)
# isotopes and the elements of the tuple denote the counts of the
# respective isotopes; note that by definition, not all isotope
# combinations in the Cartesian product add up to element_count
isotope_combinations = itertools.product(
range(element_count + 1), repeat = len(isotopes_list)
)
# only use isotope combinations where all counts add up to total element
# count
isotope_combinations = \
[ic for ic in isotope_combinations if sum(ic) == element_count]
#
#
# calculate probabilities and mass numbers for all valid isotope
# combinations
mass_numbers_list_element = []
for ic in isotope_combinations:
# calculate probability for current isotope combination according to
# multinomial distribution and store together with mass number and
# mass as dictionary
mass_numbers_list_element.append({
'mass_number': np.sum(
np.asarray(isotopes_list) * np.asarray(ic)
),
# round mass to mass_decimals decimals to allow grouping below
'mass': np.round(
np.sum(masses * np.asarray(ic)), decimals = mass_decimals
),
'probability': multinomial.pmf(
ic, element_count, p = abundances / 100
)
})
# sum total probability
p_tot = sum([mn['probability'] for mn in mass_numbers_list_element])
#
#
# print debug output if requested
if _is_dbg == True:
print("\nIsotope combinations for {0:s} ({1:d} atom(s) in "
"molecule):".format(element.name, element_count))
for isotope in isotopes_list:
print("#{0:s}\t".format(element[isotope].__repr__()), end = '')
print("mass number\tmass\t\tprobability\n" +
"--------" * len(isotopes_list) +
"-------------------------------------------")
#
for ic in zip(isotope_combinations, mass_numbers_list_element):
# skip combinations with negligible contribution
if ic[1]['probability'] < _abundance_thres:
continue
#
print(("{:d}\t" * len(isotopes_list)).format(*ic[0]), end = '')
print("{0:d}\t\t{1:07.3f}\t\t{2:.9f}".format(*ic[1].values()))
print("========" * len(isotopes_list) +
"===========================================")
print("\t" * len(isotopes_list) + "\t\t\ttotal\t{0:.9f}".
format(p_tot))
#
#
# test whether all valid isotope combinations add up to 100%
if abs(p_tot - 1.0) > np.finfo(np.float32).eps:
raise Exception("Total probability ({0:.6f}) for \"{1:s}\" differs "
"from unity.".format(p_tot, element.name))
#
#
# append mass numbers list for current element to overall mass numbers
# list
mass_numbers_list.append(mass_numbers_list_element)
#
#
# loop through all element combinations and calculate combined probability
# for molecule
mass_numbers_list_molecule = []
for mn_tuple in itertools.product(*mass_numbers_list):
mass_numbers_list_molecule.append({
# use 'isotope' key in conformation with periodictable module
'isotope': np.sum( [mn['mass_number'] for mn in mn_tuple]),
'mass': np.sum( [mn['mass'] for mn in mn_tuple]),
'abundance': np.prod([mn['probability'] for mn in mn_tuple])
})
#
#
# create new Element object
artificial_element = periodictable.core.Element(
molecule, molecule, atomic_number, None, periodictable
)
if _is_dbg == True: print("")
print("Artificial atomic number for \"{0:s}\" is {1:d}.".
format(artificial_element.name, artificial_element.number))
#
#
# add all possible isotopes to the artificial element
for mn in mass_numbers_list_molecule:
artificial_element.add_isotope(mn[mode])
#
#
# initialize all abundances to zero
for isotope in artificial_element:
isotope._mass = 0.0
isotope._abundance = 0.0
#
#
# cumulate abundances for every molecular isotope from mass numbers list
# (multiple mass numbers may correspond to the same isotope number)
for mn in mass_numbers_list_molecule:
# in isotope mode, the mass would need to be weighted accordingly;
# simply invalidate here
if mode == 'mass':
artificial_element[mn[mode]]._mass = mn['mass']
else:
artificial_element[mn[mode]]._mass = np.nan
#
# cumluate abundance
artificial_element[mn[mode]]._abundance += mn['abundance'] * 100
#
# mass number is only needed in debug mode
if _is_dbg == True:
artificial_element[mn[mode]]._mass_number = mn['isotope']
#
#
# sum total abundance
total_abundance = 0.0
for isotope in artificial_element:
total_abundance += isotope.abundance
#
#
# print debug output if requested
if _is_dbg == True:
print("\nIsotope combinations for molecule \"{0:s}\":".format(molecule))
print("mass number\tmass\t\tprobability")
print("-------------------------------------------")
for isotope in artificial_element:
if isotope.abundance >= _abundance_thres * 100:
spec = '{1:07.3f}' if mode == 'mass' else '{1:f}'
mass = isotope.mass if mode == 'mass' else np.nan
print(("{0:d}\t\t" + spec + "\t\t{2:.9f}").format(
isotope._mass_number, mass, isotope.abundance / 100)
)
#
# remove _mass_number attribute (only needed in debug mode)
delattr(isotope, "_mass_number")
#
#
print("===========================================")
print("\t\t\ttotal\t{0:.9f}\n".format(total_abundance / 100))
#
#
# test whether all isotopes add up to 100%
if abs(total_abundance - 100.0) > np.finfo(np.float32).eps:
raise Exception("Total abundance ({0:.6f}) for molecule \"{1:s}\" "
"differs from unity.".
format(total_abundance / 100, molecule))
#
#
# return artificial element
print("")
return artificial_element
#
#
#
#
def _get_nonzero_isotopes(element):
"""
Return list of isotopes with non-zero abundances.
"""
#
#
# loop through isotopes with non-zero abundances
isotopes_list = []
masses = []
abundances = []
for isotope in element.isotopes:
if element[isotope].abundance > 0.0:
isotopes_list.append(isotope)
masses.append(element[isotope].mass)
abundances.append(element[isotope].abundance)
#
#
# return list of isotopes, their masses, and their abundances
return isotopes_list, np.asarray(masses), np.asarray(abundances)
#
#
#
#
def _group_peaks(peaks_list, group_charge):
"""
Group all peaks (isotopes) belonging to the same element (and charge state
if requested).
Parameters
----------
peaks_list : list of dicts
The list of all occurring peaks in the mass spectrum, as described in
:ref:`peak dictionary<apyt.spectrum.fit:Peak dictionary>`.
group_charge : bool
Whether to group all charge states of an individual element.
Returns
-------
peak_groups : list of lists
The list of all peak groups, where each group is a list of the
corresponding peaks (isotopes) of the same element (and charge state).
"""
#
#
# loop through all peaks for grouping
peak_groups = []
peaks = []
element = peaks_list[0]['element']
charge_state = peaks_list[0]['charge']
for peak in peaks_list:
# if peak belongs to same element (and charge state), append to current
# peak group
if peak['element'] == element and (
peak['charge'] == charge_state or group_charge == True
):
peaks.append(peak)
# otherwise continue with next element (and charge state)
else:
# append previous peak group and start new peak group
peak_groups.append(peaks)
peaks = [peak]
#
# reset element and charge state
element = peak['element']
charge_state = peak['charge']
#
# append last peak group
peak_groups.append(peaks)
#
#
# return peak groups
return peak_groups
#
#
#
#
def _model_spectrum(x, peaks_list = None, function = None, **params):
"""
Model function to describe complete mass spectrum.
"""
#
#
# cumulate all peak contributions
result = 0.0
for peak in peaks_list:
result += _peak_generic(function, params, 'eval', (x, peak))
#
#
# return function value
return result + params['base'] / np.sqrt(x)
#
#
#
#
def _peak_generic(function, params, mode, arg_tuple):
"""
Generic handle for peak shape function.
This function is intended to serve as a generic wrapper to handle all peak
shape functions defined below, as given by the *function* argument. This
method is called from several other functions with a specified *mode* to
indicate whether the initial peak shape parameters should be estimated
(``estimate``), or whether the spectrum should be evaluated (``eval``), or
whether the internal intensity parameter should be converted to the actual
counts (``count``). New peak shape functions need to be implemented only
here.
"""
#
#
def get_peak_range(x, x0, Δmax):
"""
Small helper function to determine the evaluation range around a peak
position, returned as a slice object for the input array.
Δmax can be either a list or tuple to determine individual ranges to the
left and right, respectively, or a scalar where the the same range is
used on both sides. Note that all ranges are positive.
"""
#
#
# convert scalar to tuple with identical ranges on both sides
if not isinstance(Δmax, (list, tuple)):
Δmax = (Δmax, Δmax)
#
#
# if input is a scalar, simply determine whether its value is within
# evaluation range
if np.isscalar(x):
return -Δmax[0] <= x - x0 and x - x0 <= Δmax[1]
#
#
# calculate separation between data points
Δx = (x[-1] - x[0]) / (len(x) - 1)
#
# get nearest index of corresponding peak position
i0 = int(np.rint((x0 - x[0]) / Δx))
#
# calculate index increments to be within valid evaluation range
Δi = (int(Δmax[0] / Δx) + 1, int(Δmax[1] / Δx) + 1)
#
# return array slicing object
return np.s_[max(0, i0 - Δi[0]) : min(len(x) - 1, i0 + Δi[1] + 1)]
#
#
def peak_scale(peak_name, params):
"""
Small helper function to apply an additional width scaling based on the
peak name.
"""
#
#
# get all parameters which are associated with a width scaling, i.e.
# starting with "λ_"
scale_params = [p for p in params.keys() if p[0:2] == "λ_"]
#
#
# loop through all peak scale parameters
for p in scale_params:
# match peak name against the base32-encoded pattern in the scaling
# parameter name
if re.fullmatch("^I_" + _base32_decode(p[2:]) + "$", peak_name) \
is not None:
# return scaling parameter
return params[p]
# no scaling for all other peaks
return 1.0
#
#
def peak_shift(peak_name, params, x0):
"""
Small helper function to apply an additional specific shift based on the
peak name.
"""
#
#
# get all parameters which are associated with a peak shift, i.e.
# starting with "Δ_"
shift_params = [p for p in params.keys() if p[0:2] == "Δ_"]
#
#
# loop through all peak shift parameters
for p in shift_params:
# match peak name against the base32-encoded pattern in the shift
# parameter name
if re.fullmatch("^I_" + _base32_decode(p[2:]) + "$", peak_name) \
is not None:
# shift peaks (scaled with square root of peak position)
return params[p] * np.sqrt(x0)
# no shift for all other peaks
return 0.0
#
#
#
#
# check for valid mode
if mode != 'count' and mode != 'estimate' and mode != 'eval':
raise Exception("Internal error: Unknown mode \"{0:s}\".".format(mode))
#
#
#
#
# peak width scaling parameter common to all peak shape functions (may be
# fixed to a constant value of 0.5)
γ = params['γ']
#
#
#
#
# find maximum peak in estimation mode for *all* peak shape functions
if mode == 'estimate':
# unpack additional mode-specific arguments
data = arg_tuple
#
# determine maximum peak to estimate general peak shape parameters
print("Finding maximum peak to estimate general peak shape parameters…")
peak, props = find_peaks(
data[:, 1],
distance = np.iinfo(np.int32).max,
width = 0.0, rel_height = 1.0 - 1.0 / np.e
)
peak_pos = data[peak[0], 0]
print("Position of maximum peak is {0:.2f} amu/e.".format(peak_pos))
#
#
#
#
# peak shape: error function activation with exponential decay
if function == "error-expDecay":
# estimate general peak shape parameters
if mode == 'estimate':
# estimate decay constant
τ = props['widths'][0] * (data[2, 0] - data[1, 0]) / peak_pos**γ
params.add('τ', value = τ, min = 0.0)
print("Estimated decay constant is {0:.6f} amu/e.".format(τ))
#
# return estimated fit parameters and p(0), i.e. peak shape function
# value at 0
return params, 0.5
#
#
#
#
# parse common parameters for 'count' and 'eval' mode
τ = params['τ']
#
#
# return unified area to normalize counts
if mode == 'count':
return np.exp(1.0 / np.pi) * τ
#
#
# evaluate function
if mode == 'eval':
# unpack additional mode-specific arguments
x, peak = arg_tuple
#
# parse parameters
I = params[_get_intensity_name(peak)]
x0 = peak['mass_charge_ratio']
#
# set peak width scaling parameter
λ = x0**γ * peak_scale(_get_intensity_name(peak), params)
#
# set additional possible shift based on peak name
Δ = peak_shift(_get_intensity_name(peak), params, x0)
#
#
# set effective width for current peak (τ is normalized width)
τ = τ * λ
#
#
# evaluation range is limited to multiple of decay constant on
# either side of the peak; 0.0 elsewhere
# (prevents possible overflow and is considerably faster)
Δmax = 20.0 * τ
#
#
# get evaluation range around peak
peak_range = get_peak_range(x, x0, Δmax)
#
#
# input was array; use slice
if isinstance(peak_range, slice):
# transform data (only in evaluation interval)
x_t = (x[peak_range] - x0 - Δ) / τ
#
# pre-fill results with zeros
y = np.zeros_like(x)
#
# intensities are calculated for hypothetical peak position at
# unity to account for the scaling with respect to the mass-to-
# charge ratio
y[peak_range] = I / λ * peak['abundance'] * \
_activation(x_t) * _decay(x_t)
#
# return result
return y
#
# input was scalar and in evaluation range
elif peak_range == True:
# transform data
x_t = (x - x0 - Δ) / τ
#
return I / λ * peak['abundance'] * \
_activation(x_t) * _decay(x_t)
#
# input was scalar, but outside evaluation range
elif peak_range == False:
return 0.0
#
#
#
#
# peak shape: error function activation with double exponential decay
elif function == "error-doubleExpDecay":
# estimate general peak shape parameters
if mode == 'estimate':
# estimate decay constant
τ = props['widths'][0] * (data[2, 0] - data[1, 0]) / peak_pos**γ
params.add('τ1', value = τ, min = 0.0)
params.add('τ2', value = 0.1 * τ, min = 0.0)
params.add('φ', value = 0.5, min = 0.0, max = 1.0)
print("Estimated decay constant is {0:.6f} amu/e.".format(τ))
#
# return estimated fit parameters and p(0), i.e. peak shape function
# value at 0
return params, 0.5
#
#
#
#
# parse common parameters for 'count' and 'eval' mode
τ1 = params['τ1']
τ2 = params['τ2']
φ = params['φ']
#
# activation parameter is fixed through maximum condition at nominal
# mass-to-charge ratio
w = τ1 * τ2 / ((1.0 - φ) * τ1 + φ * τ2)
#
#
# return unified area to normalize counts
if mode == 'count':
return φ * np.exp((w / τ1)**2 / np.pi) * τ1 + \
(1.0 - φ) * np.exp((w / τ2)**2 / np.pi) * τ2
#
#
# evaluate function
if mode == 'eval':
# unpack additional mode-specific arguments
x, peak = arg_tuple
#
# parse parameters
I = params[_get_intensity_name(peak)]
x0 = peak['mass_charge_ratio']
#
# set peak width scaling parameter
λ = x0**γ * peak_scale(_get_intensity_name(peak), params)
#
# set additional possible shift based on peak name
Δ = peak_shift(_get_intensity_name(peak), params, x0)
#
#
# set effective width for current peak (τ is normalized width)
τ1 = τ1 * λ
τ2 = τ2 * λ
w = w * λ
#
#
# evaluation range is limited to multiple of width of error function
# onset on the left side and largest decay constant on the right
# side; 0.0 elsewhere (prevents possible overflow and is
# considerably faster)
Δmax = (20.0 * w, 20.0 * max(τ1, τ2))
#
#
# get evaluation range around peak
peak_range = get_peak_range(x, x0, Δmax)
#
#
# input was array; use slice
if isinstance(peak_range, slice):
# transform data (only in evaluation interval)
x_t = x[peak_range] - x0 - Δ
#
# pre-fill results with zeros
y = np.zeros_like(x)
#
# intensities are calculated for hypothetical peak position at
# unity to account for the scaling with respect to the mass-to-
# charge ratio
y[peak_range] = \
I / λ * peak['abundance'] * _activation(x_t / w) * (
φ * _decay(x_t / τ1) + (1.0 - φ) * _decay(x_t / τ2)
)
#
# return result
return y
#
# input was scalar and in evaluation range
elif peak_range == True:
# transform data
x_t = x - x0 - Δ
#
return \
I / λ * peak['abundance'] * _activation(x_t / w) * (
φ * _decay(x_t / τ1) + (1.0 - φ) * _decay(x_t / τ2)
)
#
# input was scalar, but outside evaluation range
elif peak_range == False:
return 0.0
#
#
else:
raise Exception(
"Unknown peak shape function \"{0:s}\".".format(function)
)