"""
=================
DirichletMagmaMix
=================
A python package for modelling partial mantle melt aggregation using the Dirichlet distribution
as described by Rudge et al. (2013).
Developed and maintained by Simon Matthews (simonm@hi.is).
The module requires a set of primary melt compositions to be generated elsewhere. At the present
time the module can only important results generated by the pyMelt library (though an older
version of the code is compatible with alphaMELTS).
"""
__version__ = "1.00"
__author__ = "Simon Matthews"
import numpy as np
import pandas as pd
import pyMelt as m
from copy import copy
from warnings import warn as _warn
[docs]class Error(Exception):
"""Base class for exceptions in this module."""
pass
[docs]class UnmixedMelts:
"""
Contains all the melts (from all lithologies) and information about how they
should be weighted in the mixtures.
Parameters
----------
melts : pandas.DataFrame
All of the melts from the melting region. Columns should only be
chemical composition.
weighting : str, pandas.Series, or None, default: None
How to weight the melts in the mixtures (in addition to their mass).
In built options:
- 'triangle', a triangular melting region for a spreading-centre
- 'truncated_triangle', a triangular melting region missing the
top or bottom. Specify by passing arguments 'shape_top' and
'shape_bottom' (pressure in GPa).
Alternatively, supply a pandas Series with the numerical weighting
values. If None, an equal weighting will be applied.
isotopes : list of dicts, default: []
For any isotope ratios in the melts DataFrame a dict should be
provided with the following keys:
- 'ratio': the name of the ratio in the DataFrame, e.g., 'Nd143Nd144'.
- 'element': the element the isotopes are associated with, e.g., 'Nd'.
This element must be in the DataFrame.
mass_warning : bool, default: True
Warn if there is no mass information provided.
lithology_names : list of strings, default: []
The names of the lithologies contributing to the melting column. Used
to preserve tracer variables, for example during fractional crystallisation.
"""
def __init__(self, melts, pressure=None, mass=None, weights=None,
isotopes=[], mass_warning=True, lithology_names=[],
**kwargs):
self.melts = copy(melts)
self.pressure = copy(pressure)
self.mass = copy(mass)
self.isotopes = isotopes
self.isotope_names = []
self.lithology_names = lithology_names
for iso in self.isotopes:
self.isotope_names.append(iso['ratio'])
# Calculate weighting
if weights == 'triangle':
self.weights = self._calculate_triangular_weighting()
elif weights == 'truncated_triangle':
self.weights = self._calculate_triangular_weighting(**kwargs)
elif weights is None:
self.weights = self._equal_weighting()
elif isinstance(weights, pd.Series):
if len(weights) == len(self.melts):
self.weights = weights
else:
raise InputError("Weights series is not the same length as the "
"melts DataFrame/")
else:
raise InputError("Weights not recognised.")
# Apply equal mass if no other information is given:
if self.mass is None:
self.mass = self._equal_weighting()
if mass_warning is True:
_warn("No mass passed. Each melt will be "
"treated as if it has equal mass.")
# Ensure mass is normalised:
else:
self.mass = self.mass / np.sum(self.mass)
# Remove zero weighted melts:
self.weights.reset_index(inplace=True, drop=True)
self.pressure.reset_index(inplace=True, drop=True)
self.mass.reset_index(inplace=True, drop=True)
self.melts.reset_index(inplace=True, drop=True)
self.pressure = self.pressure[self.weights > 0]
self.mass = self.mass[self.weights > 0]
self.melts = self.melts[self.weights > 0]
self.weights = self.weights[self.weights > 0]
[docs] def mean_composition(self):
"""
Calculates the mean composition of the melts in the melting column, according
to the defined masses and weighting.
Returns
-------
pandas.Series
The mean composition
"""
w = np.array([self.weights * self.mass / np.sum(self.weights * self.mass)]
* np.shape(self.melts)[1]).T
mixed = (w * self.melts).sum()
mixed = self._mix_isotopes(mixed)
return mixed
[docs] def gen_dirichlet_parameters(self, N):
"""
Generate dirichlet parameters given the mixing parameter passed to function,
and the weighting variables contained in UnmixedMelts object.
Parameters
----------
N: float or int
Mixing parameter. Must be >1.
Returns
-------
pandas.Series
Dirichlet parameters
"""
return (N - 1) * self.mass * self.weights / np.sum(self.mass * self.weights)
[docs] def gen_mixing_weights(self, alpha):
"""
Generate a random distribution of mixing proportions.
Parameters
----------
alpha : pandas.Series
The dirichlet parameters, probably calculated using the gen_alpha method.
Returns
-------
pandas.Series
Mixing weights for one random draw from the dirichlet distribution.
"""
return np.random.dirichlet(alpha)
[docs] def gen_mixed_melts(self, mixing_parameter, samples=1):
"""
Generate mixed melts with mixing parameters randomly selected
from the dirichlet distribution defined by the mixing parameter.
Parameters
----------
mixing_parameter: float or int
Mixing parameter. Must be >=1.
samples: int, default: 1
Number of mixed melts to return.
Returns
-------
MixedMelts
The randomly mixed melts.
"""
mixed = pd.DataFrame(columns=self.melts.columns)
# N = 1 is just a randomly chosen melt
if mixing_parameter == 1:
wts = self.weights * self.mass
for i in range(samples):
randindex = np.random.choice(self.melts.index, 1, list(wts))[0]
mixed = mixed.append(self.melts[self.melts.index == randindex], ignore_index=True)
else:
for i in range(samples):
alpha = self.gen_dirichlet_parameters(mixing_parameter)
# Make an array of r values for each column:
r = np.array([self.gen_mixing_weights(alpha)] * np.shape(self.melts)[1]).T
mix = (r * self.melts).sum()
mix = self._mix_isotopes(mix, r[:, 0])
mixed = mixed.append(mix, ignore_index=True)
return MixedMelts(mixed)
[docs] def gen_cmc_melts(self, samples, max_crystallisation, mixing_min, mixing_max,
partition_coefficient=None):
"""
Generate melts produced by concurrent mixing and crystallisation, using the
model defined in Rudge et. al. (2003).
Parameters
----------
samples: int
Number of mixed melts to produce.
max_crystallisation: float
Maximum fraction of crystallisation. Denoted by Chi in Rudge et. al. (2003)
mixing_min: float or int
Minimum degree of melt mixing
mixing_max: float or int
Maximum degree of melt mixing
partition_coefficient: float, pandas.Series or None, default: None
The partition coefficient for each element during crystallisation. If None,
all elements will be treated as being perfectly incompatible.
Returns
-------
MixedMelts
The randomly mixed melts.
"""
# Generate uniform random variable U:
U = np.random.uniform(0.0, 1.0, samples)
C = max_crystallisation * U**2
N = mixing_min * (mixing_max / mixing_min) ** U
mixed = pd.DataFrame(columns=self.melts.columns + ['crystallisation'])
# mixed['crystallisation'] = C
for i, Ni in zip(range(samples), N):
if Ni == 1:
wts = self.weights * self.mass
randindex = np.random.choice(self.melts.index, 1, list(wts))
mix = self.melts[self.melts.index == randindex]
else:
alpha = self.gen_dirichlet_parameters(Ni)
r = np.tile(np.array([self.gen_mixing_weights(alpha)]),
(np.shape(self.melts)[1], 1)).T
mix = (r * self.melts).sum()
mix = self._mix_isotopes(mix, r[:, 0])
for el in mix.index:
if el not in self.isotope_names and el not in self.lithology_names:
if partition_coefficient is None:
mix[el] = mix[el] / (1 - C[i])
elif isinstance(partition_coefficient, float):
mix[el] = mix[el] * (1 - C[i]) ** (partition_coefficient - 1)
elif (isinstance(partition_coefficient, pd.Series)
and el in partition_coefficient.index):
mix[el] = mix[el] * (1 - C[i]) ** (partition_coefficient[el] - 1)
else:
raise InputError("Something is wrong with the partition coefficient.")
mix['crystallisation'] = C[i]
mixed = mixed.append(mix, ignore_index=True)
return MixedMelts(mixed)
[docs] def homogenise(self, pressure):
"""Homogenise melts produced below a certain pressure.
Parameters
----------
pressure: float
Pressure at which to homogenise melts generated beneath.
"""
melts_to_homog = self.melts[self.pressure > pressure]
weights_to_homog = (self.weights[self.pressure > pressure]
* self.mass[self.pressure > pressure])
weights_to_homog_normed = weights_to_homog / weights_to_homog.sum()
weights_to_homog_normed = np.tile(np.array(weights_to_homog_normed),
(np.shape(melts_to_homog)[1], 1)).T
homogenised = (weights_to_homog_normed * melts_to_homog).sum()
homogenised = self._mix_isotopes(homogenised, weights_to_homog_normed[:, 0],
melts_to_homog)
for index, row in self.melts.iterrows():
if self.pressure.loc[index] > pressure:
for col in self.melts.columns:
self.melts.loc[index, col] = homogenised[col]
[docs] def variance(self, mixing_parameter=1):
"""Calculate the variance for elements at a specified degree of mixing. Uses
the equation given in the appendix of Rudge et al.
Parameters
----------
mixing_parameter: float or int
Mixing parameter.
Returns
-------
pandas.Series
Series with element names as index.
"""
meanComps = self.mean_composition()
variances = {}
for el in meanComps.index:
if el not in self.isotope_names:
variances[el] = (np.sum(self.weights * self.mass
* (self.melts[el] - meanComps[el]) ** 2)
/ mixing_parameter)
return pd.Series(variances)
[docs] def correlation(self):
"""
Calculates the correlation coefficient between elements. This is independent
of the mixing parameter.
Returns
-------
pandas.DataFrame
DataFrame containing the correlations with elements as the indexes
and columns.
"""
elements = []
for col in self.melts.columns:
if col not in self.isotope_names:
elements.append(col)
cors = np.zeros([np.shape(elements)[0]] * 2)
var = self.variance()
covs = self.covariance()
for i, el1 in zip(range(np.shape(elements)[0]), elements):
for j, el2 in zip(range(np.shape(elements)[0]), elements):
cors[i, j] = covs.iloc[i, j] / np.sqrt(var.iloc[i] * var.iloc[j])
return pd.DataFrame(cors, index=elements, columns=elements)
def _calculate_triangular_weighting(self, shape_top=None, shape_bottom=None, **kwargs):
"""
Calculates normalised weighting values for a triangular melting region, with
truncated top and/or bottom. If pressure exists in the melts DataFrame the
weighting will be calculated from the pressure, otherwise equal pressure
increments will be assumed.
To use the truncation, pressure must exist in the melts dataframe as column
'pressure'.
Parameters
----------
shape_top : float
The top truncation pressure, in GPa.
shape_bottom : float
The bottom truncation pressure, in GPa.
Returns
-------
pandas.Series
The weighting values for each melt.
"""
if self.pressure is not None:
max_pressure = np.nanmax(self.pressure)
min_pressure = np.nanmin(self.pressure)
w = (np.array(self.pressure) - min_pressure) / (max_pressure - min_pressure)
else:
_warn("Triangular weighting is being applied without pressure information. "
"Assuming equal pressure decrements.")
w = np.linspace(1, 0, np.shape(self.melts)[0])
if shape_top is not None:
top_mask = self.pressure < shape_top
w[top_mask] = 0
if shape_bottom is not None:
bottom_mask = self.pressure > shape_bottom
w[bottom_mask] = 0
w = w / np.sum(w)
return pd.Series(w)
def _equal_weighting(self):
"""
Generates normalised equal weighting for all the melts in a melting region.
"""
w = np.full(len(self.melts), 1.0 / len(self.melts))
return pd.Series(w)
def _mix_isotopes(self, mixed, proportions=None, unmixed=None):
"""
Calculates the mixed isotope ratios.
Parameters
----------
mixed : pandas.Series
The homogenised concentrations (plus nonsense values for the isotope ratios).
proportions : numpy.Array or None, default: None
The proportions of each melt in the mixture. If None, the melts will be
homogenised.
unmixed : pandas.Series or None
Use a subset of melts to do the mixing. If None the whole suite of melts will be used.
Returns
-------
pandas.Series
The mixed composition with the nonsense isotope ratios replaced with the true values.
"""
mixed = copy(mixed)
for iso in self.isotopes:
if iso['element'] not in mixed.index:
raise InputError("The element for isotope ratio {} was "
"not found.".format(iso['ratio']))
else:
if proportions is None:
proportions = self.weights * self.mass / np.sum(self.weights * self.mass)
if unmixed is None:
mixed[iso['ratio']] = np.sum(proportions
* self.melts[iso['element']]
/ mixed[iso['element']]
* self.melts[iso['ratio']])
else:
mixed[iso['ratio']] = np.sum(proportions
* unmixed[iso['element']]
/ mixed[iso['element']]
* unmixed[iso['ratio']])
return mixed
[docs]class MixedMelts:
"""
Results from a dirichlet mixing calculation.
"""
def __init__(self, melts):
self.melts = melts
[docs]def import_pyMelt_geoSetting(geoSetting, weights=None, isotopes={}, **kwargs):
"""
Imports a pyMelt geoSetting.
Parameters
----------
geoSetting : pyMelt.geoSetting
The geoSetting class containing the melts to be mixed.
weights : pandas.Series, string, or None, default: None
Specify weighting for the melts (see UnmixedMelts docstring for more information). If
None and the geoSetting is a spreadingCentre, a triangular weighting will be applied,
otherwise the melts will have equal weighting.
isotopes : dict containing lists of dicts, default: {}
The isotope ratios to assign to the melts. The keys of the first dictionary should be the
lithology names, then the list consists of dictionaries containing:
- ratio, e.g., 'Nd143Nd144'
- element, e.g., 'Nd'
- value, e.g., 0.51230
Returns
-------
UnmixedMelts
The UnmixedMelts object created during import
"""
if isinstance(geoSetting, m.geosettings.geoSetting) is False:
raise InputError("PyMelt object not recognised.")
pressure = pd.Series(dtype='float64')
mass = pd.Series(dtype='float64')
melts = pd.DataFrame(dtype='float64')
lithology_names = list(geoSetting.lithologies.keys())
isotopes_in_system = []
for lithology_name in lithology_names:
lith = copy(geoSetting.lithologies[lithology_name])
pressure = pressure.append(geoSetting.P, ignore_index=True)
dX = np.zeros(np.shape(lith.F)[0])
dX[1:] = np.array(lith.F.iloc[1:]) - np.array(lith.F[:-1])
dX = pd.Series(dX)
prop_index = geoSetting.mantle.names.index(lithology_name)
mass = mass.append(dX * geoSetting.mantle.proportions[prop_index],
ignore_index=True)
if lithology_name in isotopes:
for iso in isotopes[lithology_name]:
iso_tiled = np.tile(iso['value'], np.shape(lith.F)[0])
lith[iso['ratio']] = iso_tiled
if lithology_name == list(geoSetting.lithologies.keys())[0]:
isotopes_in_system.append({'ratio': iso['ratio'],
'element': iso['element']})
lith[lithology_name] = np.tile(1.0, np.shape(lith.F)[0])
for lith_other in geoSetting.lithologies.keys():
if lith_other != lithology_name:
lith[lith_other] = np.tile(0.0, np.shape(lith.F)[0])
melts = melts.append(lith, ignore_index=True)
melts = melts.drop(['P', 'T', 'F'], axis=1)
if weights is None:
if isinstance(geoSetting, m.geosettings.spreadingCentre):
weights = 'triangle'
elif isinstance(weights, pd.Series):
weights = weights[mass > 0]
um = UnmixedMelts(melts[mass > 0], pressure[mass > 0], mass[mass > 0], weights,
isotopes=isotopes_in_system, lithology_names=lithology_names,
**kwargs)
return um