import datetime
import glob
import os
import warnings
import astropy.constants as const
import astropy.units as u
import numpy as np
import pandas as pd
import psutil
import sunpy.sun.constants as sconst
from astropy.utils.data import get_pkg_data_filename
from sunpy.coordinates import get_horizons_coord
from tqdm.auto import tqdm
from typing import Callable
# Utilities toolbox, contains helpful functions
def custom_formatwarning(message, *args, **kwargs):
"""
:meta private:
"""
# ignore everything except the message
RED = '\033[91m'
ENDC = '\033[0m'
BOLD = "\033[1m"
return BOLD+RED+'WARNING: '+ENDC+ str(message) + '\n'
def custom_formatnotification(message, *args, **kwargs):
"""
:meta private:
"""
# ignore everything except the message
YELLOW = '\033[93m'
ENDC = '\033[0m'
BOLD = "\033[1m"
_yllw_str = '\x1b[38;5;226m'
return BOLD+YELLOW+'NOTE: '+ENDC+ str(message) + '\n'
def custom_warning(message):
"""
:meta private:
"""
formatwarning_orig = warnings.formatwarning
warnings.formatwarning = custom_formatwarning
warnings.warn(message)
warnings.formatwarning = formatwarning_orig
return
def custom_notification(message):
"""
:meta private:
"""
formatwarning_orig = warnings.formatwarning
warnings.formatwarning = custom_formatnotification
warnings.warn(message)
warnings.formatwarning = formatwarning_orig
return
[docs]
def k_parameter(mu: float, sigma: float, sigma_multiplier: int) -> float:
"""
The standard version of k for the z-standardized intensity CUSUM.
Parameters:
-----------
mu : {float, np.ndarray}
The mean of the background.
sigma : {float, np.ndarray}
The standard deviation of the background.
sigma_multiplier : {int,float}
The multiplier for mu_{d} != 0.
Returns:
--------
k_param : {float, np.ndarray} Type depends on the input type.
A valid k_parameter value (k >= 0).
"""
if sigma_multiplier == 0:
raise ValueError("sigma_multiplier may not be 0!")
# Let's not divide by zero.
# Only do this check if mu and sigma are singular values, numpy will take
# care of the cases with arrays.
if not isinstance(mu, (list, np.ndarray)):
if mu==0 or sigma==0:
return 0
nominator = sigma_multiplier
denominator = np.log(1 + (sigma_multiplier*sigma)/mu)
k_param = (nominator/denominator) - (mu/sigma)
if not isinstance(k_param, (int, float, np.int64, np.float64, np.longdouble)):
return k_param
return k_param if k_param >= 0 else 0
[docs]
def k_legacy(mu: float, sigma: float, sigma_multiplier: float) -> float:
"""
The old standard k-parameter for SEPpy.
"""
if sigma_multiplier == 0:
raise ValueError("sigma_multiplier may not be 0!")
# Let's not divide by zero
if not isinstance(mu, (list, np.ndarray)):
if mu==0:
return 0
nominator = sigma_multiplier
denominator = np.log(1 + (sigma_multiplier*sigma)/mu)
# In legacy SEPpy, k is rounded to the nearest integer
return np.round(nominator/denominator)
# def sqrt_sum_squares(series):
# """
# Custom aggregation function for uncertainties, which calculates the
# sqrt of the sum of squares divided by number of samples in the bin
# Parameters
# ----------
# series : pd.Series
# The series to aggregate
# Returns
# -------
# float
# Sqrt of sum of squares divided by number of samples
# """
# if isinstance(series, pd.DataFrame):
# custom_warning("The sqrt_sum_squares function is not tested for DataFrames yet! Proceed with caution and report any issues you find!")
# return np.sqrt(np.nansum(series**2)) / series.count()
[docs]
def propagated_mean_uncertainty(data: pd.Series | pd.DataFrame) -> float | pd.Series:
"""
Aggregation function for uncertainties, which calculates the square root
of the sum of squares divided by the number of samples in the bin.
This differs from the standard Root Mean Square (RMS), where the division
by n occurs *inside* the square root: sqrt(sum(x**2) / n). Here, the
division occurs *outside*: sqrt(sum(x**2)) / n. This form is appropriate
for propagating uncertainties when averaging n measurements.
Works for both pd.Series and pd.DataFrame. In the case of a DataFrame,
the calculation is performed per column.
Parameters
----------
data : pd.Series or pd.DataFrame
The data to aggregate
Returns
-------
float or pd.Series
Sqrt of sum of squares divided by number of samples.
Returns a single float for pd.Series input, or a pd.Series
(one value per column) for pd.DataFrame input.
Returns np.nan if the input is empty, all-NaN, or (for DataFrames)
for any column that is empty or all-NaN.
Raises
------
TypeError
If data is not a pd.Series or pd.DataFrame.
"""
if isinstance(data, pd.Series):
count = data.count()
# Return np.nan early if count is zero (empty or all-NaN) to avoid division by zero
if count == 0:
return np.nan
return np.sqrt(np.nansum(data**2)) / count
elif isinstance(data, pd.DataFrame):
count = data.count() # per-column Series
# Replace zero counts with np.nan before dividing to avoid division by zero
if (count == 0).any():
count = count.where(count > 0, other=np.nan)
return np.sqrt(np.nansum(data**2, axis=0)) / count
else:
raise TypeError(f"Expected pd.Series or pd.DataFrame, got {type(data).__name__}")
[docs]
def reduce_list_generic(original_list, placeholder="xx", seperator="_"):
"""
Reduces a list by replacing numeric sequences with placeholders.
Args:
original_list: The list of strings to reduce
placeholder: The string to replace numeric parts (default: "xx")
seperator: The character used to split the strings (default: "_")
Returns:
A sorted list of unique patterns
"""
patterns = set()
for item in original_list:
# Split the string by common delimiters
parts = item.split(seperator)
# Create a new pattern by replacing numeric parts
new_parts = []
for part in parts:
# Check if this part is purely numeric
if part.isdigit():
new_parts.append(placeholder)
# Check if there's a numeric suffix in the part
elif any(c.isdigit() for c in part):
# Find where the numbers start
for i, char in enumerate(part):
if char.isdigit():
# Replace numeric portion with placeholder
new_parts.append(part[:i] + placeholder)
break
else:
new_parts.append(part)
# Join back the modified parts
pattern = '_'.join(new_parts)
patterns.add(pattern)
return sorted(list(patterns))
[docs]
def resample_df(
df: pd.DataFrame | pd.Series,
resample: str,
pos_timestamp: str = "center",
origin: str = "start",
cols_unc: list[str] | str = 'auto',
keywords_unc: list[str] = ['unc', 'err', 'sigma'],
verbose: bool = True
) -> pd.DataFrame | pd.Series:
"""
Resamples a Pandas Dataframe or Series to a new frequency. Note that this is
just a simple wrapper around the pandas resample function that is
calculating the mean of the data in the new time bins. This is not
necessarily the correct way to resample data, depending on the data type
(for example for errors)!
Parameters
----------
df : pd.DataFrame or pd.Series
The dataframe or series to resample
resample : str
pandas-compatible time string, e.g., '1min', '2H' or '25s'
pos_timestamp : str, default 'center'
Controls if the timestamp is at the center of the time bin, or at
the start of it
origin : str, default 'start'
Controls if the origin of resampling is at the first entry of the
input dataframe/series ('start'), or at the start of the day
('start_day')
cols_unc : list, default 'auto'
List of specific column names in the dataframe (or name of the
series) that contain uncertainties. These columns will be resampled
using a custom function (sqrt of the sum of squares divided by
number of samples in the bin) instead of just the arithmetic mean.
If an empty list is provided (i.e. []), all columns will be
resampled using the arithmetic mean. If set to 'auto' (default), the
function will try to automatically detect columns with uncertainties
based on their names (looking for the keywords provided in
keywords_unc, by default 'unc', 'err' or 'sigma'). Note that this
automatic detection only works for single-level column DataFrames
and Series.
keywords_unc : list, default ['unc', 'err', 'sigma']
List of keywords to use for automatic detection of uncertainty. All
columns with these keywords in their name will be treated as
uncertainty columns when cols_unc is set to 'auto'.
verbose : bool, default True
If True, will print additional debug information, e.g., about
automatically detected uncertainty columns. Deactivate at own risk
and only if you know what you are doing!
Returns
-------
df : pd.DataFrame or Series, depending on the input
"""
if not resample:
raise ValueError("Resample period is set to 'None'. No resampling will be applied.")
# check if resample option makes sense (e.g., new frequency is smaller than original frequency)
delta_resample = pd.to_timedelta(resample)
# for original data, get each time difference and find the most common one (mode)
dt_original = df.index.to_series().diff()
delta_original = dt_original.mode().iloc[0].round('s') # round to full seconds to avoid weirdness
#
if delta_resample < delta_original:
raise ValueError(f"Your resample option of '{resample}' is smaller than the original data cadence of '{delta_original}'. This is not supported!")
elif delta_resample == delta_original:
custom_warning(f"\nYour resample option of '{resample}' is equal to the original data cadence of '{delta_original}'. You should only average like this if you know EXACTLY what you are doing, as it could offset the position of the timestamps!\n")
# automatically detect columns with uncertainties if not provided
if type(cols_unc) is str and cols_unc == 'auto':
if isinstance(df, pd.DataFrame):
if type(df.columns) is not pd.MultiIndex:
# cols_unc = [col for col in df.columns if 'uncertainty' in col.lower() or 'err' in col.lower() or 'sigma' in col.lower()]
cols_unc = [col for col in df.columns if any(keyword.lower() in col.lower() for keyword in keywords_unc)]
elif type(df.columns) is pd.MultiIndex:
cols_unc = []
custom_warning("\nResampling of MultiIndex DataFrames with uncertainty columns not implemented yet! Proceeding without uncertainty handling.\n")
elif isinstance(df, pd.Series):
if isinstance(df.name, str) and any(keyword.lower() in df.name.lower() for keyword in keywords_unc):
cols_unc = [df.name]
else:
cols_unc = []
if len(cols_unc) > 0 and cols_unc != 'auto':
if verbose:
custom_notification(f"Automatically detected columns with uncertainties: {reduce_list_generic(cols_unc)}. Please report this behaviour if you think it is wrong!")
else:
custom_warning("No columns with uncertainties automatically detected! You might need to provide them manually via the 'cols_unc' parameter. Please report this behaviour if you think it should work automatically.")
# resampling
try:
# Handle DataFrame input
if isinstance(df, pd.DataFrame):
# save column order
df_columns = df.columns
# Identify non-object columns (this includes numeric columns but also datetime columns, etc.) to avoid resampling of object columns (such as strings), which would raise errors.
non_object_cols = df.select_dtypes(exclude='object').columns
agg_dict: dict[str, Callable[..., float | pd.Series] | str] = {}
for col in cols_unc:
if col in non_object_cols:
agg_dict[col] = propagated_mean_uncertainty
for col in df.columns.difference(cols_unc):
if col in non_object_cols:
agg_dict[col] = 'mean'
df = df.resample(resample, origin=origin, label="left").agg(agg_dict)
# restore column order (only for columns that survived resampling)
df = df[[col for col in df_columns if col in df.columns]]
# Check for non-numeric columns that were dropped during resampling and print a warning if verbose is True
dropped_cols = [col for col in df_columns if col not in non_object_cols]
if dropped_cols:
if verbose:
custom_warning(f"The following non-numeric columns were dropped during resampling: {dropped_cols}")
else:
custom_warning("Some non-numeric columns were dropped during resampling.")
# Handle Series input
elif isinstance(df, pd.Series):
if isinstance(df.name, str) and df.name in cols_unc:
df = df.resample(resample, origin=origin, label="left").agg(propagated_mean_uncertainty)
else:
df = df.resample(resample, origin=origin, label="left").mean() # This is actually the original functionality before adding cols_unc
# Adjust timestamp position
if pos_timestamp == 'start':
df.index = df.index
else:
df.index = df.index + pd.tseries.frequencies.to_offset(pd.Timedelta(resample)/2)
# if pos_timestamp == 'stop' or pos_timestamp == 'end':
# df.index = df.index + pd.tseries.frequencies.to_offset(pd.Timedelta(resample))
except ValueError:
raise ValueError(f"Your resample option of '{resample} doesn't seem to be a proper Pandas frequency!")
return df
def _flux2series(flux, dates, cadence=None):
"""
Converts an array of observed particle flux + timestamps into a pandas series
with the desired cadence.
Parameters
----------
flux: an array of observed particle fluxes
dates: an array of corresponding dates/times
cadence: str - desired spacing between the series elements e.g. '1s' or '5min'
Returns
-------
flux_series: Pandas Series object indexed by the resampled cadence
"""
# from pandas.tseries.frequencies import to_offset
# set up the series object
flux_series = pd.Series(flux, index=dates)
# if no cadence given, then just return the series with the original
# time resolution
if cadence is not None:
flux_series = resample_df(df=flux_series, resample=cadence, pos_timestamp="center", origin="start", cols_unc=[]) # TODO: this ignores all uncertainty columns, but they are not used here anyway
return flux_series
def bepicolombo_sixs_stack(path, date, side, pos_timestamp='center'):
"""
:meta private:
"""
# side is the index of the file here
try:
try:
filename = f"{path}/sixs_phys_data_{date}_side{side}.csv"
df = pd.read_csv(filename)
except FileNotFoundError:
# try alternative file name format
filename = f"{path}/{date.strftime('%Y%m%d')}_side{side}.csv"
df = pd.read_csv(filename)
times = pd.to_datetime(df['TimeUTC'])
# list comprehension because the method can't be applied onto the array "times"
times = [t.tz_convert(None) for t in times]
df.index = np.array(times)
df = df.drop(columns=['TimeUTC'])
if pos_timestamp == 'start':
cadence1 = (df.index[1]-df.index[0]).seconds
cadence2 = (df.index[-1]-df.index[-2]).seconds
if cadence1 != cadence2:
custom_warning("Bepi/SIXS cadence is changing throughout the day; something is wrong!")
df.index = df.index-pd.Timedelta(f'{cadence1/2}s')
elif pos_timestamp == 'center':
pass
except FileNotFoundError:
print(f'Unable to open {filename}')
df = pd.DataFrame()
filename = ''
return df, filename
def bepi_sixs_load(startdate, enddate, side, path, pos_timestamp='center'):
"""
:meta private:
"""
dates = pd.date_range(startdate, enddate)
# read files into Pandas dataframes:
df, file = bepicolombo_sixs_stack(path, startdate, side=side, pos_timestamp=pos_timestamp)
if len(dates) > 1:
for date in dates[1:]:
t_df, file = bepicolombo_sixs_stack(path, date.date(), side=side, pos_timestamp=pos_timestamp)
df = pd.concat([df, t_df])
channels_dict = {"Energy_Bin_str": {'E1': '71 keV', 'E2': '106 keV', 'E3': '169 keV', 'E4': '280 keV', 'E5': '960 keV', 'E6': '2240 keV', 'E7': '8170 keV',
'P1': '1.1 MeV', 'P2': '1.2 MeV', 'P3': '1.5 MeV', 'P4': '2.3 MeV', 'P5': '4.0 MeV', 'P6': '8.0 MeV', 'P7': '15.0 MeV', 'P8': '25.1 MeV', 'P9': '37.3 MeV'},
"Electron_Bins_Low_Energy": np.array([55, 78, 134, 235, 1000, 1432, 4904]),
"Electron_Bins_High_Energy": np.array([92, 143, 214, 331, 1193, 3165, 10000]),
"Ion_Bins_Low_Energy": np.array([0.001, 1.088, 1.407, 2.139, 3.647, 7.533, 13.211, 22.606, 29.246]),
"Ion_Bins_High_Energy": np.array([1.254, 1.311, 1.608, 2.388, 4.241, 8.534, 15.515, 28.413, 40.0])}
return df, channels_dict
def calc_av_en_flux_sixs(df, channel, species):
"""
This function averages the flux of two energy channels of BepiColombo/SIXS-P into a combined energy channel
channel numbers counted from 1
Parameters
----------
df : pd.DataFrame
DataFrame containing HET data
channel : int or list
energy channel or list with first and last channel to be used
species : string
'e', 'electrons', 'p', 'protons'
Returns
-------
flux: pd.DataFrame
channel-averaged flux
en_channel_string: str
string containing the energy information of combined channel
:meta private:
"""
# try to extract side info from dataframe column names
if len(df.filter(like='Side').keys()) > 0:
if len(df.filter(like='Side').keys()) > 10:
raise ValueError('Multiple BepiColombo/SIXS-P sides found in dataframe, unable to determine which side to use for channel combination!')
else:
side = df.filter(like='Side').keys()[0][4]
elif len(df.filter(like='Side').keys()) == 0:
side = None
if len(channel) != 2:
raise ValueError('Channel parameter has to be a list of two channel numbers, e.g. [5, 6] for combining channels 5 and 6!')
if (species[0].lower() == 'e' and channel == [5, 6]) or (species[0].lower() == 'p' and channel == [8, 9]):
pass
else:
raise NotImplementedError(f'BepiColombo/SIXS-P channel combination {channel} for {species} not implemented yet.')
# define constant geometric factors
if not side:
custom_warning('BepiColombo/SIXS-P side could not be determined from dataframe column names (internal level 2 data?). Using old, side-independent geometric factors, which most probably are not accurate anymore!')
GEOMFACTOR_PROT8 = 5.97E-01
GEOMFACTOR_PROT9 = 4.09E+00
GEOMFACTOR_ELEC5 = 1.99E-02
GEOMFACTOR_ELEC6 = 1.33E-01
GEOMFACTOR_PROT_COMB89 = 3.34
GEOMFACTOR_ELEC_COMB56 = 0.0972
ENERGY_PROT_COMB89 = '37 MeV'
ENERGY_ELEC_COMB56 = '1.4 MeV'
if (species[0].lower() == 'p' and channel == [8, 9]):
gf_comb = GEOMFACTOR_PROT_COMB89
E_comb = ENERGY_PROT_COMB89
gf_lower_chan = GEOMFACTOR_PROT8
gf_upper_chan = GEOMFACTOR_PROT9
elif (species[0].lower() == 'e' and channel == [5, 6]):
gf_comb = GEOMFACTOR_ELEC_COMB56
E_comb = ENERGY_ELEC_COMB56
gf_lower_chan = GEOMFACTOR_ELEC5
gf_upper_chan = GEOMFACTOR_ELEC6
column_name = ''
elif side in ['0', '1', '2', '3']:
# load in geometric factor and mean energy of combined channels
filepath = get_pkg_data_filename(f'data/bepi_sixsp_instrumental_constants/sixsp_side{side}_{species[0].lower()}_combined_gf.csv', package='seppy')
tdf = pd.read_csv(filepath, index_col=0).T
gf_comb = tdf['GF'][f'{species[0].upper()}{channel[0]}+{species[0].upper()}{channel[1]}']
E_comb = f"{tdf['E'][f'{species[0].upper()}{channel[0]}+{species[0].upper()}{channel[1]}']} MeV"
# load in geometric factors of individual channels
filepath = get_pkg_data_filename(f'data/bepi_sixsp_instrumental_constants/sixsp_side{side}_{species[0].lower()}_gf_en.csv', package='seppy')
tdf = pd.read_csv(filepath, index_col=0).T
gf_lower_chan = tdf['GF'][f'{species[0].upper()}{channel[0]}']
gf_upper_chan = tdf['GF'][f'{species[0].upper()}{channel[1]}']
column_name = f'Side{side}_'
elif side in ['4']:
raise NotImplementedError(f'BepiColombo/SIXS-P channel combination for Side{side} not implemented yet.')
if (species[0].lower() == 'e' and channel == [5, 6]) or (species[0].lower() == 'p' and channel == [8, 9]):
countrate = df[f'{column_name}{species[0].upper()}{channel[0]}'] * gf_lower_chan + df[f'{column_name}{species[0].upper()}{channel[1]}'] * gf_upper_chan
flux = countrate / gf_comb
en_channel_string = E_comb
else:
raise NotImplementedError(f'BepiColombo/SIXS-P channel combination {channel} for {species} not implemented yet.')
return flux, en_channel_string
"""
inf_inj_time.py
"""
SOLAR_ROT = sconst.get('sidereal rotation rate').to(u.rad/u.s)
[docs]
def get_sun_coords(time='now'):
'''
Gets the astropy Sun coordinates.
Args:
time (datetime.datetime): time at which coordinates are fetched.
Returns:
sun coordinates.
'''
return get_horizons_coord("Sun", time=time)
[docs]
def radial_distance_to_sun(spacecraft, time='now'):
'''
Gets the 3D radial distance of a spacecraft to the Sun.
3D here means that it's the real spatial distance and not
a projection on, say, the solar equatorial plane.
Args:
spacecraft (str): spacecraft to look for.
time (datetime.datetime): time at which to look for.
Returns:
astropy units: radial distance.
'''
sc_coords = get_horizons_coord(spacecraft, time)
return sc_coords.separation_3d(get_sun_coords(time=time))
[docs]
def calc_spiral_length(radial_dist, sw_speed):
'''
Calculates the Parker spiral length from the Sun up to a given radial distance.
Args:
radial_dist (astropy units): radial distance to the Sun.
sw_speed (astropy units): solar wind speed.
Returns:
astropy units: Parker spiral length.
'''
temp_const = ((SOLAR_ROT/sw_speed)*(radial_dist.to(u.km)-const.R_sun)).value
sqrt_temp_const = np.sqrt(temp_const**2 + 1)
return 0.5*u.rad * (sw_speed/SOLAR_ROT) * (temp_const*sqrt_temp_const + np.log(temp_const + sqrt_temp_const))
[docs]
def calc_particle_speed(mass, kinetic_energy):
'''
Calculates the relativistic particle speed.
Args:
mass (astropy units): mass of the particle.
kinetic_energy (astropy units): kinetic energy of the particle.
Returns:
astropy units: relativistic particle speed.
'''
gamma = np.sqrt(1 - (mass*const.c**2/(kinetic_energy + mass*const.c**2))**2)
return gamma*const.c
[docs]
def inf_inj_time(spacecraft, onset_time, species, kinetic_energy, sw_speed):
'''
Calculates the inferred injection time of a particle (electron or proton) from the Sun,
given a detection time at some spacecraft.
Args:
spacecraft (str): name of the spacecraft.
onset_time (datetime.datetime): time of onset/detection.
species (str): particle species, 'p' or 'e'.
kinetic_energy (astropy units): kinetic energy of particle. If no unit is supplied, is converted to MeV.
sw_speed (astropy units): solar wind speed. If no unit is supplied, is converted to km/s.
Returns:
datetime.datetime: inferred injection time.
'''
if type(kinetic_energy) is not u.quantity.Quantity:
kinetic_energy = kinetic_energy * u.MeV
if type(sw_speed) is not u.quantity.Quantity:
sw_speed = sw_speed * u.km/u.s
mass_dict = {'p': const.m_p,
'e': const.m_e
}
radial_distance = radial_distance_to_sun(spacecraft, time=onset_time)
spiral_length = calc_spiral_length(radial_distance, sw_speed)
particle_speed = calc_particle_speed(mass_dict[species], kinetic_energy)
travel_time = spiral_length/particle_speed
travel_time = travel_time.to(u.s)
return onset_time - datetime.timedelta(seconds=travel_time.value), spiral_length.to(u.AU)
[docs]
def energy2speed(species, kinetic_energy):
'''
Calculates the relativistic particle speed derived from kinetic energy.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
kinetic_energy (astropy units/float): kinetic energy of the particle using astropy units; if only float, MeV is used
Returns:
astropy units: relativistic particle speed.
'''
if type(kinetic_energy) is not u.quantity.Quantity:
kinetic_energy = kinetic_energy * u.MeV
mass_dict = {'p': const.m_p, 'e': const.m_e}
return calc_particle_speed(mass_dict[species], kinetic_energy)
[docs]
def speed2energy(species, speed):
'''
Calculates the relativistic kinetic energy derived from particle speed.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
speed (astropy units/float): relativistic particle speed of the particle using astropy units; if only float, m/s is used
Returns:
astropy units: kinetic energy
'''
if type(speed) is not u.quantity.Quantity:
speed = speed * u.m/u.s
mass_dict = {'p': const.m_p, 'e': const.m_e}
gamma = 1/np.sqrt(1-speed**2/const.c**2)
K = (gamma-1)*mass_dict[species]*const.c**2
return K.to(u.MeV)
[docs]
def speed2momentum(species, speed):
'''
Calculates the relativistic particle momentum derived from speed.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
speed (float/astropy units): kinetic energy of the particle using astropy units; if only float m/s is used
Returns:
astropy units: relativistic particle momentum
'''
if type(speed) is not u.quantity.Quantity:
speed = speed * u.m/u.s
mass_dict = {'p': const.m_p, 'e': const.m_e}
return mass_dict[species] * speed
[docs]
def energy2momentum(species, kinetic_energy):
'''
Calculates the relativistic particle momentum derived from kinetic energy.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
kinetic_energy (float/astropy units): kinetic energy of the particle using astropy units; if only float MeV is used
Returns:
astropy units: relativistic particle momentum
'''
if type(kinetic_energy) is not u.quantity.Quantity:
kinetic_energy = kinetic_energy * u.MeV
# mass_dict = {'p': const.m_p, 'e': const.m_e}
return speed2momentum(species, energy2speed(species, kinetic_energy))
[docs]
def intensity2psd(species, kinetic_energy, intensity):
'''
Calculates the phase space density (distribution function) derived from kinetic energy and intensity.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
kinetic_energy (astropy units): kinetic energy of the particle using astropy units
intensity (astropy units): differential intensity using astropy units
Returns:
astropy units: phase space density
Example:
import astropy.units as u
f = intensity2psd('e', 48*u.keV, 1e4/(u.cm**2 * u.sr * u.s * u.MeV))
'''
if type(kinetic_energy) is not u.quantity.Quantity or type(intensity) is not u.quantity.Quantity:
print("All physical inputs have to be defined in astropy units! Run 'help(intensity2psd)' for an example.")
return
else:
p = energy2momentum(species, kinetic_energy)
f = intensity / (p**2)
return f.to(u.s**3/(u.sr*u.kg**3*u.cm**6))
[docs]
def intensity2vsd(species, kinetic_energy, intensity):
'''
Calculates the velocity space density (distribution function) derived from kinetic energy and intensity.
Args:
species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass
kinetic_energy (astropy units): kinetic energy of the particle using astropy units
intensity (astropy units): differential intensity using astropy units
Returns:
astropy units: phase space density
Example:
import astropy.units as u
f = intensity2vsd('e', 48*u.keV, 1e4/(u.cm**2 * u.sr * u.s * u.MeV))
'''
if type(kinetic_energy) is not u.quantity.Quantity or type(intensity) is not u.quantity.Quantity:
print("All physical inputs have to be defined in astropy units! Run 'help(intensity2psd)' for an example.")
return
else:
mass_dict = {'p': const.m_p, 'e': const.m_e}
p = energy2momentum(species, kinetic_energy)
f = intensity / (p**2) * mass_dict[species]**3
return f.to(u.s**3/(u.sr*u.cm**6))
[docs]
def jupyterhub_data_path(path_org, path_hub='/home/jovyan/data'):
"""
Checks if Notebook is run within SOLER/SERPENTINE JupyterHub. Returns the path of a common data folder if yes, and the original path if not.
"""
if (psutil.Process().parent().name() == 'jupyterhub-sing') and all(x in psutil.Process().parent().environ()['PYTHONPATH'] for x in ['/home/jovyan/serpentine/', '/home/jovyan/soler/sep_tools/']):
print(f"JupyterHub detected. Adjusting data path to {path_hub}")
return path_hub
else:
return path_org
[docs]
def remove_duplicate_cdf_files(path=None):
"""
Removes duplicate .cdf files in the provided directory, keeping only the one with the highest version number.
Parameters
----------
path : string, optional
Directory in which the .cdf files are. If None, current working directory will be used. By default None.
Returns
-------
deleted_files : list
List of deleted duplicate .cdf files.
Examples
--------
>>> from seppy.util import remove_duplicate_cdf_files
>>> deleted_files = remove_duplicate_cdf_files('/Users/johndoe/data/psp')
Removing duplicate .cdf files in /Users/johndoe/data/psp/
>>> print(deleted_files)
[]
"""
if not path:
path = os.getcwd()
if path[-1] is not os.sep:
path += os.sep
all_cdf = glob.glob(f'{path}*.cdf')
print(f'Removing duplicate .cdf files in {path}')
deleted_files = []
for cdf in tqdm(all_cdf):
cdf_wo_v = cdf.strip(cdf.split('_')[-1])
cdf_duplicates = glob.glob(f'{cdf_wo_v}*')
if len(cdf_duplicates) > 1:
cdf_duplicates.sort(reverse=True)
for dup in cdf_duplicates[1:]:
deleted_files.append(dup)
os.remove(dup)
return deleted_files