Source code for seppy.util


import datetime
import glob
import os
import warnings

import astropy.constants as const
import astropy.units as u
import numpy as np
import pandas as pd
import psutil
import sunpy.sun.constants as sconst
from astropy.utils.data import get_pkg_data_filename
from sunpy.coordinates import get_horizons_coord
from tqdm.auto import tqdm
from typing import Callable

# Utilities toolbox, contains helpful functions


def custom_formatwarning(message, *args, **kwargs):
    """
    :meta private:
    """
    # ignore everything except the message
    RED = '\033[91m'
    ENDC = '\033[0m'
    BOLD = "\033[1m"
    return BOLD+RED+'WARNING: '+ENDC+ str(message) + '\n'


def custom_formatnotification(message, *args, **kwargs):
    """
    :meta private:
    """
    # ignore everything except the message
    YELLOW = '\033[93m'
    ENDC = '\033[0m'
    BOLD = "\033[1m"
    _yllw_str = '\x1b[38;5;226m'
    return BOLD+YELLOW+'NOTE: '+ENDC+ str(message) + '\n'


def custom_warning(message):
    """
    :meta private:
    """
    formatwarning_orig = warnings.formatwarning
    warnings.formatwarning = custom_formatwarning
    warnings.warn(message)
    warnings.formatwarning = formatwarning_orig
    return


def custom_notification(message):
    """
    :meta private:
    """
    formatwarning_orig = warnings.formatwarning
    warnings.formatwarning = custom_formatnotification
    warnings.warn(message)
    warnings.formatwarning = formatwarning_orig
    return


[docs] def k_parameter(mu: float, sigma: float, sigma_multiplier: int) -> float: """ The standard version of k for the z-standardized intensity CUSUM. Parameters: ----------- mu : {float, np.ndarray} The mean of the background. sigma : {float, np.ndarray} The standard deviation of the background. sigma_multiplier : {int,float} The multiplier for mu_{d} != 0. Returns: -------- k_param : {float, np.ndarray} Type depends on the input type. A valid k_parameter value (k >= 0). """ if sigma_multiplier == 0: raise ValueError("sigma_multiplier may not be 0!") # Let's not divide by zero. # Only do this check if mu and sigma are singular values, numpy will take # care of the cases with arrays. if not isinstance(mu, (list, np.ndarray)): if mu==0 or sigma==0: return 0 nominator = sigma_multiplier denominator = np.log(1 + (sigma_multiplier*sigma)/mu) k_param = (nominator/denominator) - (mu/sigma) if not isinstance(k_param, (int, float, np.int64, np.float64, np.longdouble)): return k_param return k_param if k_param >= 0 else 0
[docs] def k_legacy(mu: float, sigma: float, sigma_multiplier: float) -> float: """ The old standard k-parameter for SEPpy. """ if sigma_multiplier == 0: raise ValueError("sigma_multiplier may not be 0!") # Let's not divide by zero if not isinstance(mu, (list, np.ndarray)): if mu==0: return 0 nominator = sigma_multiplier denominator = np.log(1 + (sigma_multiplier*sigma)/mu) # In legacy SEPpy, k is rounded to the nearest integer return np.round(nominator/denominator)
# def sqrt_sum_squares(series): # """ # Custom aggregation function for uncertainties, which calculates the # sqrt of the sum of squares divided by number of samples in the bin # Parameters # ---------- # series : pd.Series # The series to aggregate # Returns # ------- # float # Sqrt of sum of squares divided by number of samples # """ # if isinstance(series, pd.DataFrame): # custom_warning("The sqrt_sum_squares function is not tested for DataFrames yet! Proceed with caution and report any issues you find!") # return np.sqrt(np.nansum(series**2)) / series.count()
[docs] def propagated_mean_uncertainty(data: pd.Series | pd.DataFrame) -> float | pd.Series: """ Aggregation function for uncertainties, which calculates the square root of the sum of squares divided by the number of samples in the bin. This differs from the standard Root Mean Square (RMS), where the division by n occurs *inside* the square root: sqrt(sum(x**2) / n). Here, the division occurs *outside*: sqrt(sum(x**2)) / n. This form is appropriate for propagating uncertainties when averaging n measurements. Works for both pd.Series and pd.DataFrame. In the case of a DataFrame, the calculation is performed per column. Parameters ---------- data : pd.Series or pd.DataFrame The data to aggregate Returns ------- float or pd.Series Sqrt of sum of squares divided by number of samples. Returns a single float for pd.Series input, or a pd.Series (one value per column) for pd.DataFrame input. Returns np.nan if the input is empty, all-NaN, or (for DataFrames) for any column that is empty or all-NaN. Raises ------ TypeError If data is not a pd.Series or pd.DataFrame. """ if isinstance(data, pd.Series): count = data.count() # Return np.nan early if count is zero (empty or all-NaN) to avoid division by zero if count == 0: return np.nan return np.sqrt(np.nansum(data**2)) / count elif isinstance(data, pd.DataFrame): count = data.count() # per-column Series # Replace zero counts with np.nan before dividing to avoid division by zero if (count == 0).any(): count = count.where(count > 0, other=np.nan) return np.sqrt(np.nansum(data**2, axis=0)) / count else: raise TypeError(f"Expected pd.Series or pd.DataFrame, got {type(data).__name__}")
[docs] def reduce_list_generic(original_list, placeholder="xx", seperator="_"): """ Reduces a list by replacing numeric sequences with placeholders. Args: original_list: The list of strings to reduce placeholder: The string to replace numeric parts (default: "xx") seperator: The character used to split the strings (default: "_") Returns: A sorted list of unique patterns """ patterns = set() for item in original_list: # Split the string by common delimiters parts = item.split(seperator) # Create a new pattern by replacing numeric parts new_parts = [] for part in parts: # Check if this part is purely numeric if part.isdigit(): new_parts.append(placeholder) # Check if there's a numeric suffix in the part elif any(c.isdigit() for c in part): # Find where the numbers start for i, char in enumerate(part): if char.isdigit(): # Replace numeric portion with placeholder new_parts.append(part[:i] + placeholder) break else: new_parts.append(part) # Join back the modified parts pattern = '_'.join(new_parts) patterns.add(pattern) return sorted(list(patterns))
[docs] def resample_df( df: pd.DataFrame | pd.Series, resample: str, pos_timestamp: str = "center", origin: str = "start", cols_unc: list[str] | str = 'auto', keywords_unc: list[str] = ['unc', 'err', 'sigma'], verbose: bool = True ) -> pd.DataFrame | pd.Series: """ Resamples a Pandas Dataframe or Series to a new frequency. Note that this is just a simple wrapper around the pandas resample function that is calculating the mean of the data in the new time bins. This is not necessarily the correct way to resample data, depending on the data type (for example for errors)! Parameters ---------- df : pd.DataFrame or pd.Series The dataframe or series to resample resample : str pandas-compatible time string, e.g., '1min', '2H' or '25s' pos_timestamp : str, default 'center' Controls if the timestamp is at the center of the time bin, or at the start of it origin : str, default 'start' Controls if the origin of resampling is at the first entry of the input dataframe/series ('start'), or at the start of the day ('start_day') cols_unc : list, default 'auto' List of specific column names in the dataframe (or name of the series) that contain uncertainties. These columns will be resampled using a custom function (sqrt of the sum of squares divided by number of samples in the bin) instead of just the arithmetic mean. If an empty list is provided (i.e. []), all columns will be resampled using the arithmetic mean. If set to 'auto' (default), the function will try to automatically detect columns with uncertainties based on their names (looking for the keywords provided in keywords_unc, by default 'unc', 'err' or 'sigma'). Note that this automatic detection only works for single-level column DataFrames and Series. keywords_unc : list, default ['unc', 'err', 'sigma'] List of keywords to use for automatic detection of uncertainty. All columns with these keywords in their name will be treated as uncertainty columns when cols_unc is set to 'auto'. verbose : bool, default True If True, will print additional debug information, e.g., about automatically detected uncertainty columns. Deactivate at own risk and only if you know what you are doing! Returns ------- df : pd.DataFrame or Series, depending on the input """ if not resample: raise ValueError("Resample period is set to 'None'. No resampling will be applied.") # check if resample option makes sense (e.g., new frequency is smaller than original frequency) delta_resample = pd.to_timedelta(resample) # for original data, get each time difference and find the most common one (mode) dt_original = df.index.to_series().diff() delta_original = dt_original.mode().iloc[0].round('s') # round to full seconds to avoid weirdness # if delta_resample < delta_original: raise ValueError(f"Your resample option of '{resample}' is smaller than the original data cadence of '{delta_original}'. This is not supported!") elif delta_resample == delta_original: custom_warning(f"\nYour resample option of '{resample}' is equal to the original data cadence of '{delta_original}'. You should only average like this if you know EXACTLY what you are doing, as it could offset the position of the timestamps!\n") # automatically detect columns with uncertainties if not provided if type(cols_unc) is str and cols_unc == 'auto': if isinstance(df, pd.DataFrame): if type(df.columns) is not pd.MultiIndex: # cols_unc = [col for col in df.columns if 'uncertainty' in col.lower() or 'err' in col.lower() or 'sigma' in col.lower()] cols_unc = [col for col in df.columns if any(keyword.lower() in col.lower() for keyword in keywords_unc)] elif type(df.columns) is pd.MultiIndex: cols_unc = [] custom_warning("\nResampling of MultiIndex DataFrames with uncertainty columns not implemented yet! Proceeding without uncertainty handling.\n") elif isinstance(df, pd.Series): if isinstance(df.name, str) and any(keyword.lower() in df.name.lower() for keyword in keywords_unc): cols_unc = [df.name] else: cols_unc = [] if len(cols_unc) > 0 and cols_unc != 'auto': if verbose: custom_notification(f"Automatically detected columns with uncertainties: {reduce_list_generic(cols_unc)}. Please report this behaviour if you think it is wrong!") else: custom_warning("No columns with uncertainties automatically detected! You might need to provide them manually via the 'cols_unc' parameter. Please report this behaviour if you think it should work automatically.") # resampling try: # Handle DataFrame input if isinstance(df, pd.DataFrame): # save column order df_columns = df.columns # Identify non-object columns (this includes numeric columns but also datetime columns, etc.) to avoid resampling of object columns (such as strings), which would raise errors. non_object_cols = df.select_dtypes(exclude='object').columns agg_dict: dict[str, Callable[..., float | pd.Series] | str] = {} for col in cols_unc: if col in non_object_cols: agg_dict[col] = propagated_mean_uncertainty for col in df.columns.difference(cols_unc): if col in non_object_cols: agg_dict[col] = 'mean' df = df.resample(resample, origin=origin, label="left").agg(agg_dict) # restore column order (only for columns that survived resampling) df = df[[col for col in df_columns if col in df.columns]] # Check for non-numeric columns that were dropped during resampling and print a warning if verbose is True dropped_cols = [col for col in df_columns if col not in non_object_cols] if dropped_cols: if verbose: custom_warning(f"The following non-numeric columns were dropped during resampling: {dropped_cols}") else: custom_warning("Some non-numeric columns were dropped during resampling.") # Handle Series input elif isinstance(df, pd.Series): if isinstance(df.name, str) and df.name in cols_unc: df = df.resample(resample, origin=origin, label="left").agg(propagated_mean_uncertainty) else: df = df.resample(resample, origin=origin, label="left").mean() # This is actually the original functionality before adding cols_unc # Adjust timestamp position if pos_timestamp == 'start': df.index = df.index else: df.index = df.index + pd.tseries.frequencies.to_offset(pd.Timedelta(resample)/2) # if pos_timestamp == 'stop' or pos_timestamp == 'end': # df.index = df.index + pd.tseries.frequencies.to_offset(pd.Timedelta(resample)) except ValueError: raise ValueError(f"Your resample option of '{resample} doesn't seem to be a proper Pandas frequency!") return df
def _flux2series(flux, dates, cadence=None): """ Converts an array of observed particle flux + timestamps into a pandas series with the desired cadence. Parameters ---------- flux: an array of observed particle fluxes dates: an array of corresponding dates/times cadence: str - desired spacing between the series elements e.g. '1s' or '5min' Returns ------- flux_series: Pandas Series object indexed by the resampled cadence """ # from pandas.tseries.frequencies import to_offset # set up the series object flux_series = pd.Series(flux, index=dates) # if no cadence given, then just return the series with the original # time resolution if cadence is not None: flux_series = resample_df(df=flux_series, resample=cadence, pos_timestamp="center", origin="start", cols_unc=[]) # TODO: this ignores all uncertainty columns, but they are not used here anyway return flux_series def bepicolombo_sixs_stack(path, date, side, pos_timestamp='center'): """ :meta private: """ # side is the index of the file here try: try: filename = f"{path}/sixs_phys_data_{date}_side{side}.csv" df = pd.read_csv(filename) except FileNotFoundError: # try alternative file name format filename = f"{path}/{date.strftime('%Y%m%d')}_side{side}.csv" df = pd.read_csv(filename) times = pd.to_datetime(df['TimeUTC']) # list comprehension because the method can't be applied onto the array "times" times = [t.tz_convert(None) for t in times] df.index = np.array(times) df = df.drop(columns=['TimeUTC']) if pos_timestamp == 'start': cadence1 = (df.index[1]-df.index[0]).seconds cadence2 = (df.index[-1]-df.index[-2]).seconds if cadence1 != cadence2: custom_warning("Bepi/SIXS cadence is changing throughout the day; something is wrong!") df.index = df.index-pd.Timedelta(f'{cadence1/2}s') elif pos_timestamp == 'center': pass except FileNotFoundError: print(f'Unable to open {filename}') df = pd.DataFrame() filename = '' return df, filename def bepi_sixs_load(startdate, enddate, side, path, pos_timestamp='center'): """ :meta private: """ dates = pd.date_range(startdate, enddate) # read files into Pandas dataframes: df, file = bepicolombo_sixs_stack(path, startdate, side=side, pos_timestamp=pos_timestamp) if len(dates) > 1: for date in dates[1:]: t_df, file = bepicolombo_sixs_stack(path, date.date(), side=side, pos_timestamp=pos_timestamp) df = pd.concat([df, t_df]) channels_dict = {"Energy_Bin_str": {'E1': '71 keV', 'E2': '106 keV', 'E3': '169 keV', 'E4': '280 keV', 'E5': '960 keV', 'E6': '2240 keV', 'E7': '8170 keV', 'P1': '1.1 MeV', 'P2': '1.2 MeV', 'P3': '1.5 MeV', 'P4': '2.3 MeV', 'P5': '4.0 MeV', 'P6': '8.0 MeV', 'P7': '15.0 MeV', 'P8': '25.1 MeV', 'P9': '37.3 MeV'}, "Electron_Bins_Low_Energy": np.array([55, 78, 134, 235, 1000, 1432, 4904]), "Electron_Bins_High_Energy": np.array([92, 143, 214, 331, 1193, 3165, 10000]), "Ion_Bins_Low_Energy": np.array([0.001, 1.088, 1.407, 2.139, 3.647, 7.533, 13.211, 22.606, 29.246]), "Ion_Bins_High_Energy": np.array([1.254, 1.311, 1.608, 2.388, 4.241, 8.534, 15.515, 28.413, 40.0])} return df, channels_dict def calc_av_en_flux_sixs(df, channel, species): """ This function averages the flux of two energy channels of BepiColombo/SIXS-P into a combined energy channel channel numbers counted from 1 Parameters ---------- df : pd.DataFrame DataFrame containing HET data channel : int or list energy channel or list with first and last channel to be used species : string 'e', 'electrons', 'p', 'protons' Returns ------- flux: pd.DataFrame channel-averaged flux en_channel_string: str string containing the energy information of combined channel :meta private: """ # try to extract side info from dataframe column names if len(df.filter(like='Side').keys()) > 0: if len(df.filter(like='Side').keys()) > 10: raise ValueError('Multiple BepiColombo/SIXS-P sides found in dataframe, unable to determine which side to use for channel combination!') else: side = df.filter(like='Side').keys()[0][4] elif len(df.filter(like='Side').keys()) == 0: side = None if len(channel) != 2: raise ValueError('Channel parameter has to be a list of two channel numbers, e.g. [5, 6] for combining channels 5 and 6!') if (species[0].lower() == 'e' and channel == [5, 6]) or (species[0].lower() == 'p' and channel == [8, 9]): pass else: raise NotImplementedError(f'BepiColombo/SIXS-P channel combination {channel} for {species} not implemented yet.') # define constant geometric factors if not side: custom_warning('BepiColombo/SIXS-P side could not be determined from dataframe column names (internal level 2 data?). Using old, side-independent geometric factors, which most probably are not accurate anymore!') GEOMFACTOR_PROT8 = 5.97E-01 GEOMFACTOR_PROT9 = 4.09E+00 GEOMFACTOR_ELEC5 = 1.99E-02 GEOMFACTOR_ELEC6 = 1.33E-01 GEOMFACTOR_PROT_COMB89 = 3.34 GEOMFACTOR_ELEC_COMB56 = 0.0972 ENERGY_PROT_COMB89 = '37 MeV' ENERGY_ELEC_COMB56 = '1.4 MeV' if (species[0].lower() == 'p' and channel == [8, 9]): gf_comb = GEOMFACTOR_PROT_COMB89 E_comb = ENERGY_PROT_COMB89 gf_lower_chan = GEOMFACTOR_PROT8 gf_upper_chan = GEOMFACTOR_PROT9 elif (species[0].lower() == 'e' and channel == [5, 6]): gf_comb = GEOMFACTOR_ELEC_COMB56 E_comb = ENERGY_ELEC_COMB56 gf_lower_chan = GEOMFACTOR_ELEC5 gf_upper_chan = GEOMFACTOR_ELEC6 column_name = '' elif side in ['0', '1', '2', '3']: # load in geometric factor and mean energy of combined channels filepath = get_pkg_data_filename(f'data/bepi_sixsp_instrumental_constants/sixsp_side{side}_{species[0].lower()}_combined_gf.csv', package='seppy') tdf = pd.read_csv(filepath, index_col=0).T gf_comb = tdf['GF'][f'{species[0].upper()}{channel[0]}+{species[0].upper()}{channel[1]}'] E_comb = f"{tdf['E'][f'{species[0].upper()}{channel[0]}+{species[0].upper()}{channel[1]}']} MeV" # load in geometric factors of individual channels filepath = get_pkg_data_filename(f'data/bepi_sixsp_instrumental_constants/sixsp_side{side}_{species[0].lower()}_gf_en.csv', package='seppy') tdf = pd.read_csv(filepath, index_col=0).T gf_lower_chan = tdf['GF'][f'{species[0].upper()}{channel[0]}'] gf_upper_chan = tdf['GF'][f'{species[0].upper()}{channel[1]}'] column_name = f'Side{side}_' elif side in ['4']: raise NotImplementedError(f'BepiColombo/SIXS-P channel combination for Side{side} not implemented yet.') if (species[0].lower() == 'e' and channel == [5, 6]) or (species[0].lower() == 'p' and channel == [8, 9]): countrate = df[f'{column_name}{species[0].upper()}{channel[0]}'] * gf_lower_chan + df[f'{column_name}{species[0].upper()}{channel[1]}'] * gf_upper_chan flux = countrate / gf_comb en_channel_string = E_comb else: raise NotImplementedError(f'BepiColombo/SIXS-P channel combination {channel} for {species} not implemented yet.') return flux, en_channel_string """ inf_inj_time.py """ SOLAR_ROT = sconst.get('sidereal rotation rate').to(u.rad/u.s)
[docs] def get_sun_coords(time='now'): ''' Gets the astropy Sun coordinates. Args: time (datetime.datetime): time at which coordinates are fetched. Returns: sun coordinates. ''' return get_horizons_coord("Sun", time=time)
[docs] def radial_distance_to_sun(spacecraft, time='now'): ''' Gets the 3D radial distance of a spacecraft to the Sun. 3D here means that it's the real spatial distance and not a projection on, say, the solar equatorial plane. Args: spacecraft (str): spacecraft to look for. time (datetime.datetime): time at which to look for. Returns: astropy units: radial distance. ''' sc_coords = get_horizons_coord(spacecraft, time) return sc_coords.separation_3d(get_sun_coords(time=time))
[docs] def calc_spiral_length(radial_dist, sw_speed): ''' Calculates the Parker spiral length from the Sun up to a given radial distance. Args: radial_dist (astropy units): radial distance to the Sun. sw_speed (astropy units): solar wind speed. Returns: astropy units: Parker spiral length. ''' temp_const = ((SOLAR_ROT/sw_speed)*(radial_dist.to(u.km)-const.R_sun)).value sqrt_temp_const = np.sqrt(temp_const**2 + 1) return 0.5*u.rad * (sw_speed/SOLAR_ROT) * (temp_const*sqrt_temp_const + np.log(temp_const + sqrt_temp_const))
[docs] def calc_particle_speed(mass, kinetic_energy): ''' Calculates the relativistic particle speed. Args: mass (astropy units): mass of the particle. kinetic_energy (astropy units): kinetic energy of the particle. Returns: astropy units: relativistic particle speed. ''' gamma = np.sqrt(1 - (mass*const.c**2/(kinetic_energy + mass*const.c**2))**2) return gamma*const.c
[docs] def inf_inj_time(spacecraft, onset_time, species, kinetic_energy, sw_speed): ''' Calculates the inferred injection time of a particle (electron or proton) from the Sun, given a detection time at some spacecraft. Args: spacecraft (str): name of the spacecraft. onset_time (datetime.datetime): time of onset/detection. species (str): particle species, 'p' or 'e'. kinetic_energy (astropy units): kinetic energy of particle. If no unit is supplied, is converted to MeV. sw_speed (astropy units): solar wind speed. If no unit is supplied, is converted to km/s. Returns: datetime.datetime: inferred injection time. ''' if type(kinetic_energy) is not u.quantity.Quantity: kinetic_energy = kinetic_energy * u.MeV if type(sw_speed) is not u.quantity.Quantity: sw_speed = sw_speed * u.km/u.s mass_dict = {'p': const.m_p, 'e': const.m_e } radial_distance = radial_distance_to_sun(spacecraft, time=onset_time) spiral_length = calc_spiral_length(radial_distance, sw_speed) particle_speed = calc_particle_speed(mass_dict[species], kinetic_energy) travel_time = spiral_length/particle_speed travel_time = travel_time.to(u.s) return onset_time - datetime.timedelta(seconds=travel_time.value), spiral_length.to(u.AU)
[docs] def energy2speed(species, kinetic_energy): ''' Calculates the relativistic particle speed derived from kinetic energy. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass kinetic_energy (astropy units/float): kinetic energy of the particle using astropy units; if only float, MeV is used Returns: astropy units: relativistic particle speed. ''' if type(kinetic_energy) is not u.quantity.Quantity: kinetic_energy = kinetic_energy * u.MeV mass_dict = {'p': const.m_p, 'e': const.m_e} return calc_particle_speed(mass_dict[species], kinetic_energy)
[docs] def speed2energy(species, speed): ''' Calculates the relativistic kinetic energy derived from particle speed. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass speed (astropy units/float): relativistic particle speed of the particle using astropy units; if only float, m/s is used Returns: astropy units: kinetic energy ''' if type(speed) is not u.quantity.Quantity: speed = speed * u.m/u.s mass_dict = {'p': const.m_p, 'e': const.m_e} gamma = 1/np.sqrt(1-speed**2/const.c**2) K = (gamma-1)*mass_dict[species]*const.c**2 return K.to(u.MeV)
[docs] def speed2momentum(species, speed): ''' Calculates the relativistic particle momentum derived from speed. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass speed (float/astropy units): kinetic energy of the particle using astropy units; if only float m/s is used Returns: astropy units: relativistic particle momentum ''' if type(speed) is not u.quantity.Quantity: speed = speed * u.m/u.s mass_dict = {'p': const.m_p, 'e': const.m_e} return mass_dict[species] * speed
[docs] def energy2momentum(species, kinetic_energy): ''' Calculates the relativistic particle momentum derived from kinetic energy. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass kinetic_energy (float/astropy units): kinetic energy of the particle using astropy units; if only float MeV is used Returns: astropy units: relativistic particle momentum ''' if type(kinetic_energy) is not u.quantity.Quantity: kinetic_energy = kinetic_energy * u.MeV # mass_dict = {'p': const.m_p, 'e': const.m_e} return speed2momentum(species, energy2speed(species, kinetic_energy))
[docs] def intensity2psd(species, kinetic_energy, intensity): ''' Calculates the phase space density (distribution function) derived from kinetic energy and intensity. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass kinetic_energy (astropy units): kinetic energy of the particle using astropy units intensity (astropy units): differential intensity using astropy units Returns: astropy units: phase space density Example: import astropy.units as u f = intensity2psd('e', 48*u.keV, 1e4/(u.cm**2 * u.sr * u.s * u.MeV)) ''' if type(kinetic_energy) is not u.quantity.Quantity or type(intensity) is not u.quantity.Quantity: print("All physical inputs have to be defined in astropy units! Run 'help(intensity2psd)' for an example.") return else: p = energy2momentum(species, kinetic_energy) f = intensity / (p**2) return f.to(u.s**3/(u.sr*u.kg**3*u.cm**6))
[docs] def intensity2vsd(species, kinetic_energy, intensity): ''' Calculates the velocity space density (distribution function) derived from kinetic energy and intensity. Args: species (string): particle species (electrons 'e' or protons 'p') used to determine the particle mass kinetic_energy (astropy units): kinetic energy of the particle using astropy units intensity (astropy units): differential intensity using astropy units Returns: astropy units: phase space density Example: import astropy.units as u f = intensity2vsd('e', 48*u.keV, 1e4/(u.cm**2 * u.sr * u.s * u.MeV)) ''' if type(kinetic_energy) is not u.quantity.Quantity or type(intensity) is not u.quantity.Quantity: print("All physical inputs have to be defined in astropy units! Run 'help(intensity2psd)' for an example.") return else: mass_dict = {'p': const.m_p, 'e': const.m_e} p = energy2momentum(species, kinetic_energy) f = intensity / (p**2) * mass_dict[species]**3 return f.to(u.s**3/(u.sr*u.cm**6))
[docs] def jupyterhub_data_path(path_org, path_hub='/home/jovyan/data'): """ Checks if Notebook is run within SOLER/SERPENTINE JupyterHub. Returns the path of a common data folder if yes, and the original path if not. """ if (psutil.Process().parent().name() == 'jupyterhub-sing') and all(x in psutil.Process().parent().environ()['PYTHONPATH'] for x in ['/home/jovyan/serpentine/', '/home/jovyan/soler/sep_tools/']): print(f"JupyterHub detected. Adjusting data path to {path_hub}") return path_hub else: return path_org
[docs] def remove_duplicate_cdf_files(path=None): """ Removes duplicate .cdf files in the provided directory, keeping only the one with the highest version number. Parameters ---------- path : string, optional Directory in which the .cdf files are. If None, current working directory will be used. By default None. Returns ------- deleted_files : list List of deleted duplicate .cdf files. Examples -------- >>> from seppy.util import remove_duplicate_cdf_files >>> deleted_files = remove_duplicate_cdf_files('/Users/johndoe/data/psp') Removing duplicate .cdf files in /Users/johndoe/data/psp/ >>> print(deleted_files) [] """ if not path: path = os.getcwd() if path[-1] is not os.sep: path += os.sep all_cdf = glob.glob(f'{path}*.cdf') print(f'Removing duplicate .cdf files in {path}') deleted_files = [] for cdf in tqdm(all_cdf): cdf_wo_v = cdf.strip(cdf.split('_')[-1]) cdf_duplicates = glob.glob(f'{cdf_wo_v}*') if len(cdf_duplicates) > 1: cdf_duplicates.sort(reverse=True) for dup in cdf_duplicates[1:]: deleted_files.append(dup) os.remove(dup) return deleted_files