import os
import datetime
import sunpy
import warnings
import matplotlib.pyplot as plt
import matplotlib.colors as cl
import numpy as np
import pandas as pd
import astropy.units as u
import astropy.constants as const
from sunpy.coordinates import get_horizons_coord
from matplotlib import rcParams
from matplotlib.dates import DateFormatter
from matplotlib.ticker import ScalarFormatter
from matplotlib.offsetbox import AnchoredText
from seppy.loader.bepi import bepi_sixsp_l3_loader
from seppy.loader.psp import calc_av_en_flux_PSP_EPIHI, calc_av_en_flux_PSP_EPILO, psp_isois_load
from seppy.loader.soho import calc_av_en_flux_ERNE, soho_load
from seppy.loader.stereo import calc_av_en_flux_SEPT, calc_av_en_flux_ST_HET, stereo_load
from seppy.loader.wind import wind3dp_load
from seppy.util import bepi_sixs_load, calc_av_en_flux_sixs, custom_notification, custom_warning, _flux2series, resample_df, k_parameter, k_legacy
from solo_epd_loader import combine_channels as solo_epd_combine_channels
from solo_epd_loader import epd_load
# This is to get rid of this specific warning:
# /home/user/xyz/serpentine/notebooks/sep_analysis_tools/read_swaves.py:96: UserWarning: The input coordinates to pcolormesh are interpreted as
# cell centers, but are not monotonically increasing or decreasing. This may lead to incorrectly calculated cell edges, in which
# case, please supply explicit cell edges to pcolormesh.
# colormesh = ax.pcolormesh( time_arr, freq[::-1], data_arr[::-1], vmin = 0, vmax = 0.5*np.max(data_arr), cmap = 'inferno' )
warnings.filterwarnings(action="ignore",
message="The input coordinates to pcolormesh are interpreted as cell centers, but are not monotonically increasing or \
decreasing. This may lead to incorrectly calculated cell edges, in which case, please supply explicit cell edges to pcolormesh.",
category=UserWarning)
# omit some warnings
warnings.simplefilter(action='ignore', category=pd.errors.PerformanceWarning)
warnings.filterwarnings(action='ignore', message='All-NaN slice encountered', category=RuntimeWarning)
warnings.filterwarnings(action='ignore', message='invalid value encountered in divide', category=RuntimeWarning)
warnings.filterwarnings(action='ignore', message='No units provided for variable', category=sunpy.util.SunpyUserWarning, module='sunpy.io._cdf')
warnings.filterwarnings(action='ignore', message='astropy did not recognize units of', category=sunpy.util.SunpyUserWarning, module='sunpy.io._cdf')
warnings.filterwarnings(action='ignore', message='The variable', category=sunpy.util.SunpyUserWarning, module='sunpy.io._cdf')
STEREO_SEPT_VIEWINGS = ("sun", "asun", "north", "south")
WIND_3DP_VIEWINGS = ("omnidirectional", '0', '1', '2', '3', '4', '5', '6', '7')
SOLO_EPT_VIEWINGS = ("sun", "asun", "north", "south")
SOLO_HET_VIEWINGS = ("sun", "asun", "north", "south")
SOLO_STEP_VIEWINGS = ("Pixel averaged", "Pixel 1", "Pixel 2", "Pixel 3", "Pixel 4", "Pixel 5", "Pixel 6",
"Pixel 7", "Pixel 8", "Pixel 9", "Pixel 10", "Pixel 11", "Pixel 12", "Pixel 13",
"Pixel 14", "Pixel 15")
PSP_EPILO_VIEWINGS = ('3', '7')
PSP_EPIHI_VIEWINGS = ('A', 'B')
[docs]
class Event:
def __init__(self, start_date, end_date, spacecraft, sensor, species,
data_level, data_path, viewing=None, radio_spacecraft=None,
threshold=None):
"""
Initialize the Event object, for which the analysis functions can be
run. This will download and read-in the necessary data.
Parameters
----------
start_date : datetime or str
Start date for data loading
end_date : datetime or str
End date for data loading
spacecraft : str
Selected spacecraft; supported are 'BepiColombo', 'PSP', 'SOHO',
'Solar Orbiter', 'STEREO-A', 'STEREO-B', 'Wind'
sensor : str
Selected instrument. Supported options depend on spacecraft
species : str
Selected species. Depending on instrument, this could be
'electrons', 'protons', or 'ions'
data_level : str
Selected data level. Usually 'l2', sometime 'l3'.
data_path : str
Full local path where the downloaded data should be stored.
viewing : str, optional
Selected viewing direction; possible options depend on instrument.
By default None
radio_spacecraft : tuple of str, optional
Spacecraft from which radio spectrogram should be added. Supported
are ('ahead', 'STEREO-A') and ('behind', 'STEREO-B'). By default
None
threshold : int or float, optional
Only applies to Wind/3DP. Replace all FLUX values above given
number with np.nan, by default None
"""
if spacecraft == "BepiColombo":
spacecraft = "bepi"
if spacecraft == "Solar Orbiter":
spacecraft = "solo"
if spacecraft == "STEREO-A":
spacecraft = "sta"
if spacecraft == "STEREO-B":
spacecraft = "stb"
if sensor in ["ERNE-HED"]:
sensor = "ERNE"
if species in ("protons", "ions"):
species = 'p'
if species in ("electrons", "electron"):
species = 'e'
self.start_date = start_date
self.end_date = end_date
self.spacecraft = spacecraft.lower()
self.sensor = sensor.lower()
self.species = species.lower()
self.data_level = data_level.lower()
self.data_path = data_path + os.sep
self.threshold = threshold
self.radio_spacecraft = radio_spacecraft # this is a 2-tuple, e.g., ("ahead", "STEREO-A")
# Sets the self.viewing to the given viewing
self.update_viewing(viewing=viewing)
self.radio_files = None
# placeholding class attributes
self.flux_series = None
self.onset_stats = None
self.onset_found = None
self.onset = None
self.peak_flux = None
self.peak_time = None
self.fig = None
self.bg_mean = None
self.output = {"flux_series": self.flux_series,
"onset_stats": self.onset_stats,
"onset_found": self.onset_found,
"onset": self.onset,
"peak_flux": self.peak_flux,
"peak_time": self.peak_time,
"fig": self.fig,
"bg_mean": self.bg_mean
}
if self.data_level == 'l3' and self.spacecraft != 'bepi':
raise Warning("Data level 'l3' is only supported for BepiColombo/SIXS-P data!")
# I think it could be worth considering to run self.choose_data(viewing) when the object is created,
# because now it has to be run inside self.print_energies() to make sure that either
# self.current_df, self.current_df_i or self.current_df_e exists, because print_energies() needs column
# names from the dataframe.
self.load_all_viewing()
# # Check that the data that was loaded is valid. If not, give a warning.
self.validate_data()
# Download radio cdf files ONLY if asked to
if self.radio_spacecraft is not None:
from seppy.tools.swaves import get_swaves
self.radio_files = get_swaves(start_date, end_date)
def validate_data(self):
"""
Provide an error msg if this object is initialized with a combination that yields invalid data products.
:meta private:
"""
# SolO/STEP data before 22 Oct 2021 is not supported yet for non-'Pixel averaged' viewing
warn_mess_step_pixels_old = "SolO/STEP data is not included yet for individual Pixels for dates preceding Oct 22, 2021. Only 'Pixel averaged' is supported."
if self.spacecraft == "solo" and self.sensor == "step":
if self.start_date < pd.to_datetime("2021-10-22").date():
if not self.viewing == 'Pixel averaged':
# when 'viewing' is undefined, only give a warning; if it's wrong defined, abort with warning
if not self.viewing:
# warnings.warn(message=warn_mess_step_pixels_old)
custom_warning(message=warn_mess_step_pixels_old)
else:
raise Warning(warn_mess_step_pixels_old)
# Electron data for SolO/STEP is removed for now (Feb 2024, JG)
if self.spacecraft == "solo" and self.sensor == "step" and self.species.lower()[0] == 'e':
raise Warning("SolO/STEP electron data is not implemented yet!")
def update_onset_attributes(self, flux_series, onset_stats, onset_found, peak_flux, peak_time, fig, bg_mean):
"""
Method to update onset-related attributes, that are None by default and only have values after find_onset() has been run.
:meta private:
"""
self.flux_series = flux_series
self.onset_stats = onset_stats
self.onset_found = onset_found
self.onset = onset_stats[-1]
self.peak_flux = peak_flux
self.peak_time = peak_time
self.fig = fig
self.bg_mean = bg_mean
# also remember to update the dictionary, it won't update automatically
self.output = {"flux_series": self.flux_series,
"onset_stats": self.onset_stats,
"onset_found": self.onset_found,
"onset": self.onset,
"peak_flux": self.peak_flux,
"peak_time": self.peak_time,
"fig": self.fig,
"bg_mean": self.bg_mean
}
def update_viewing(self, viewing):
"""
:meta private:
"""
invalid_viewing_msg = f"{viewing} is an invalid viewing direction for {self.spacecraft}/{self.sensor}!"
if self.spacecraft != "wind":
# Validate viewing here. It may be nonsensical and that affects choose_data() and print_energies().
if self.spacecraft in ("sta", "stb"):
if self.sensor == "sept" and viewing not in STEREO_SEPT_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.sensor == "het" and viewing is not None:
raise ValueError(invalid_viewing_msg)
if self.spacecraft == "solo":
if self.sensor == "step" and viewing not in SOLO_STEP_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.sensor == "ept" and viewing not in SOLO_EPT_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.sensor == "het" and viewing not in SOLO_HET_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.spacecraft == "psp":
if self.sensor == "isois-epilo" and viewing not in PSP_EPILO_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.sensor == "isois-epihi" and viewing not in PSP_EPIHI_VIEWINGS:
raise ValueError(invalid_viewing_msg)
if self.spacecraft == "soho":
if viewing is not None:
raise ValueError(invalid_viewing_msg)
# Finally set validated viewing
self.viewing = viewing
else:
# Wind/3DP viewing directions are omnidirectional, section 0, section 1... section 7.
# This catches the number or the word if omnidirectional
try:
sector_direction = viewing.split(" ")[-1]
# AttributeError is caused by calling None.split()
except AttributeError:
raise ValueError(invalid_viewing_msg)
if sector_direction not in WIND_3DP_VIEWINGS:
raise ValueError(invalid_viewing_msg)
self.viewing = sector_direction
def load_data(self, spacecraft, sensor, viewing, data_level,
autodownload=True, threshold=None):
"""
:meta private:
"""
if self.spacecraft == 'solo':
if self.sensor in ("ept", "het"):
df_i, df_e, meta = epd_load(sensor=sensor,
viewing=viewing,
level=data_level,
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
autodownload=autodownload)
return df_i, df_e, meta
elif self.sensor == "step":
df, meta = epd_load(sensor=sensor,
viewing="None",
level=data_level,
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
autodownload=autodownload)
return df, meta
if self.spacecraft[:2].lower() == 'st':
if self.sensor == 'sept':
if self.species in ["p", "i"]:
df_i, channels_dict_df_i = stereo_load(instrument=self.sensor,
startdate=self.start_date,
enddate=self.end_date,
spacecraft=self.spacecraft,
# sept_species=self.species,
sept_species='p',
sept_viewing=viewing,
resample=None,
pos_timestamp="center",
path=self.data_path)
df_e, channels_dict_df_e = [], []
return df_i, df_e, channels_dict_df_i, channels_dict_df_e
if self.species == "e":
df_e, channels_dict_df_e = stereo_load(instrument=self.sensor,
startdate=self.start_date,
enddate=self.end_date,
spacecraft=self.spacecraft,
# sept_species=self.species,
sept_species='e',
sept_viewing=viewing,
resample=None,
pos_timestamp="center",
path=self.data_path)
df_i, channels_dict_df_i = [], []
return df_i, df_e, channels_dict_df_i, channels_dict_df_e
if self.sensor == 'het':
df, meta = stereo_load(instrument=self.sensor,
startdate=self.start_date,
enddate=self.end_date,
spacecraft=self.spacecraft,
resample=None,
pos_timestamp="center",
path=self.data_path)
return df, meta
if self.spacecraft.lower() == 'soho':
if self.sensor == 'erne':
df, meta = soho_load(dataset="SOHO_ERNE-HED_L2-1MIN",
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
resample=None,
pos_timestamp="center")
return df, meta
if self.sensor == 'ephin':
df, meta = soho_load(dataset="SOHO_COSTEP-EPHIN_L2-1MIN",
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
resample=None,
pos_timestamp="center")
return df, meta
if self.sensor in ("ephin-5", "ephin-15"):
dataset = "ephin_flux_2020-2022.csv"
if os.path.isfile(f"{self.data_path}{dataset}"):
df = pd.read_csv(f"{self.data_path}{dataset}", index_col="date", parse_dates=True)
# set electron flux to nan if the ratio of proton-proxy counts to correspondning electron counts is >0.1
df['E5'][df['p_proxy (1.80-2.00 MeV)']/df['0.45-0.50 MeV (E5)'] >= 0.1] = np.nan
df['E15'][df['p_proxy (1.80-2.00 MeV)']/df['0.70-1.10 MeV (E15)'] >= 0.1] = np.nan
else:
raise Warning(f"File {dataset} not found at {self.data_path}! Please verify that 'data_path' is correct.")
meta = {"E5": "0.45 - 0.50 MeV",
"E15": "0.70 - 1.10 MeV"}
# TODO:
# - add resample_df here?
# - add pos_timestamp here
return df, meta
if self.spacecraft.lower() == 'wind':
# In Wind's case we have to retrieve the original viewing before updating, because
# otherwise viewing = 'None' will mess up everything down the road
viewing = self.viewing
if self.sensor == '3dp':
df_i, meta_i = wind3dp_load(dataset="WI_SOPD_3DP",
startdate=self.start_date,
enddate=self.end_date,
resample=None,
multi_index=False,
path=self.data_path,
threshold=self.threshold)
df_e, meta_e = wind3dp_load(dataset="WI_SFPD_3DP",
startdate=self.start_date,
enddate=self.end_date,
resample=None,
multi_index=False,
path=self.data_path,
threshold=self.threshold)
df_omni_i, meta_omni_i = wind3dp_load(dataset="WI_SOSP_3DP",
startdate=self.start_date,
enddate=self.end_date,
resample=None,
multi_index=False,
path=self.data_path,
threshold=self.threshold)
df_omni_e, meta_omni_e = wind3dp_load(dataset="WI_SFSP_3DP",
startdate=self.start_date,
enddate=self.end_date,
resample=None,
multi_index=False,
path=self.data_path,
threshold=self.threshold)
return df_omni_i, df_omni_e, df_i, df_e, meta_i, meta_e
if self.spacecraft.lower() == 'psp':
if self.sensor.lower() == 'isois-epihi':
df, meta = psp_isois_load(dataset='PSP_ISOIS-EPIHI_L2-HET-RATES60',
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
resample=None)
return df, meta
if self.sensor.lower() == 'isois-epilo':
df, meta = psp_isois_load(dataset='PSP_ISOIS-EPILO_L2-PE',
startdate=self.start_date,
enddate=self.end_date,
path=self.data_path,
resample=None,
epilo_channel='F',
epilo_threshold=self.threshold)
return df, meta
if self.spacecraft.lower() == 'bepi':
if self.data_level.lower() == 'l2':
df, meta = bepi_sixs_load(startdate=self.start_date,
enddate=self.end_date,
side=viewing,
path=self.data_path,
pos_timestamp='center')
if len(df) > 0:
df_i = df[[f"P{i}" for i in range(1, 10)]]
df_e = df[[f"E{i}" for i in range(1, 8)]]
else:
df_i = pd.DataFrame()
df_e = pd.DataFrame()
return df_i, df_e, meta
elif self.data_level.lower() == 'l3':
df, meta = bepi_sixsp_l3_loader(startdate=self.start_date,
enddate=self.end_date,
resample=None,
path=self.data_path,
pos_timestamp='center')
return df, meta
def load_all_viewing(self):
"""
:meta private:
"""
if self.spacecraft == 'solo':
if self.sensor in ['het', 'ept']:
self.df_i_sun, self.df_e_sun, self.energies_sun =\
self.load_data(self.spacecraft, self.sensor,
'sun', self.data_level)
self.df_i_asun, self.df_e_asun, self.energies_asun =\
self.load_data(self.spacecraft, self.sensor,
'asun', self.data_level)
self.df_i_north, self.df_e_north, self.energies_north =\
self.load_data(self.spacecraft, self.sensor,
'north', self.data_level)
self.df_i_south, self.df_e_south, self.energies_south =\
self.load_data(self.spacecraft, self.sensor,
'south', self.data_level)
elif self.sensor == 'step':
self.df_step, self.energies_step =\
self.load_data(self.spacecraft, self.sensor, self.viewing,
self.data_level)
if self.spacecraft[:2].lower() == 'st':
if self.sensor == 'sept':
self.df_i_sun, self.df_e_sun, self.energies_i_sun, self.energies_e_sun =\
self.load_data(self.spacecraft, self.sensor,
'sun', self.data_level)
self.df_i_asun, self.df_e_asun, self.energies_i_asun, self.energies_e_asun =\
self.load_data(self.spacecraft, self.sensor,
'asun', self.data_level)
self.df_i_north, self.df_e_north, self.energies_i_north, self.energies_e_north =\
self.load_data(self.spacecraft, self.sensor,
'north', self.data_level)
self.df_i_south, self.df_e_south, self.energies_i_south, self.energies_e_south =\
self.load_data(self.spacecraft, self.sensor,
'south', self.data_level)
elif self.sensor == 'het':
self.df_het, self.meta_het =\
self.load_data(self.spacecraft, self.sensor, 'None',
self.data_level)
self.current_df_i = self.df_het.filter(like='Proton')
self.current_df_e = self.df_het.filter(like='Electron')
self.current_energies = self.meta_het
if self.spacecraft.lower() == 'soho':
if self.sensor.lower() == 'erne':
self.df, self.meta =\
self.load_data(self.spacecraft, self.sensor, 'None',
self.data_level)
self.current_df_i = self.df.filter(like='PH_')
# self.current_df_e = self.df.filter(like='Electron')
self.current_energies = self.meta
if self.sensor.lower() == 'ephin':
self.df, self.meta =\
self.load_data(self.spacecraft, self.sensor, 'None',
self.data_level)
self.current_df_e = self.df.filter(like='E')
self.current_energies = self.meta
if self.sensor.lower() in ("ephin-5", "ephin-15"):
self.df, self.meta =\
self.load_data(self.spacecraft, self.sensor, 'None',
self.data_level)
self.current_df_e = self.df
self.current_energies = self.meta
if self.spacecraft.lower() == 'wind':
if self.sensor.lower() == '3dp':
self.df_omni_i, self.df_omni_e, self.df_i, self.df_e, self.meta_i, self.meta_e = \
self.load_data(self.spacecraft, self.sensor, 'None', self.data_level, threshold=self.threshold)
# self.df_i = self.df_i.filter(like='FLUX')
# self.df_e = self.df_e.filter(like='FLUX')
self.current_i_energies = self.meta_i
self.current_e_energies = self.meta_e
if self.spacecraft.lower() == 'psp':
if self.sensor.lower() == 'isois-epihi':
# Note: load_data(viewing='all') doesn't really has an effect, but for PSP/ISOIS-EPIHI all viewings are always loaded anyhow.
self.df, self.meta = self.load_data(self.spacecraft, self.sensor, 'all', self.data_level)
self.df_e = self.df.filter(like='Electrons_Rate_')
self.current_e_energies = self.meta
self.df_i = self.df.filter(like='H_Flux_')
self.current_i_energies = self.meta
if self.sensor.lower() == 'isois-epilo':
# Note: load_data(viewing='all') doesn't really has an effect, but for PSP/ISOIS-EPILO all viewings are always loaded anyhow.
self.df, self.meta = self.load_data(self.spacecraft, self.sensor, 'all', self.data_level, threshold=self.threshold)
self.df_e = self.df.filter(like='Electron_CountRate_')
self.current_e_energies = self.meta
# protons not yet included in PSP/ISOIS-EPILO dataset
# self.df_i = self.df.filter(like='H_Flux_')
# self.current_i_energies = self.meta
if self.spacecraft.lower() == 'bepi':
if self.data_level.lower() == 'l2':
custom_notification('BepiColombo L2 data is not yet publicly available! You need to manual provide the files, or access the L3 data instead by using "data_level='+"'l3'"+'" above!')
self.df_i_0, self.df_e_0, self.energies_0 =\
self.load_data(self.spacecraft, self.sensor, viewing='0', data_level=self.data_level)
self.df_i_1, self.df_e_1, self.energies_1 =\
self.load_data(self.spacecraft, self.sensor, viewing='1', data_level=self.data_level)
self.df_i_2, self.df_e_2, self.energies_2 =\
self.load_data(self.spacecraft, self.sensor, viewing='2', data_level=self.data_level)
self.df_i_3, self.df_e_3, self.energies_3 =\
self.load_data(self.spacecraft, self.sensor, viewing='3', data_level=self.data_level)
# side 4 should not be used for SIXS (resp. it's not in the L3 data), but they can be activated by uncommenting the following lines
# self.df_i_4, self.df_e_4, self.energies_4 =\
# self.load_data(self.spacecraft, self.sensor, viewing='4', data_level=self.data_level)
elif self.data_level.lower() == 'l3':
self.df, self.meta =\
self.load_data(self.spacecraft, self.sensor, viewing=self.viewing, data_level=self.data_level)
def get_sixs_l3_channels(side, species):
columns = [k for k in self.df.columns if k.startswith(f'Side{side}_{species.upper()}')]
columns = [c for c in columns if not (c.endswith('plus') or c.endswith('minus') or c.endswith('err') or c.endswith('A') or 'PE' in c)]
return columns
if len(self.df) > 0:
self.df_i_0, self.df_e_0, self.energies_0 =\
self.df[get_sixs_l3_channels(0, 'p')], self.df[get_sixs_l3_channels(0, 'e')], {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side0')]}
self.df_i_1, self.df_e_1, self.energies_1 =\
self.df[get_sixs_l3_channels(1, 'p')], self.df[get_sixs_l3_channels(1, 'e')], {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side1')]}
self.df_i_2, self.df_e_2, self.energies_2 =\
self.df[get_sixs_l3_channels(2, 'p')], self.df[get_sixs_l3_channels(2, 'e')], {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side2')]}
self.df_i_3, self.df_e_3, self.energies_3 =\
self.df[get_sixs_l3_channels(3, 'p')], self.df[get_sixs_l3_channels(3, 'e')], {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side3')]}
# self.df_i_4, self.df_e_4, self.energies_4 =\
# self.df[get_sixs_l3_channels(4, 'p')], self.df[get_sixs_l3_channels(4, 'e')], {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side4')]}
# Old way of getting the channels, before the function get_sixs_l3_channels() was created
# self.df_i_0, self.df_e_0, self.energies_0 =\
# self.df.filter(like='Side0_P'), self.df.filter(like='Side0_E'), {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side0')]}
# self.df_i_1, self.df_e_1, self.energies_1 =\
# self.df.filter(like='Side1_P'), self.df.filter(like='Side1_E'), {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side1')]}
# self.df_i_2, self.df_e_2, self.energies_2 =\
# self.df.filter(like='Side2_P'), self.df.filter(like='Side2_E'), {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side2')]}
# self.df_i_3, self.df_e_3, self.energies_3 =\
# self.df.filter(like='Side3_P'), self.df.filter(like='Side3_E'), {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side3')]}
# # self.df_i_4, self.df_e_4, self.energies_4 =\
# # self.df.filter(like='Side4_P'), self.df.filter(like='Side4_E'), {key: self.meta[key] for key in [k for k in self.meta.keys() if k.startswith('Side4')]}
else:
raise Exception("No BepiColombo/SIXS L3 data found for the given time range!")
def choose_data(self, viewing):
"""
:meta private:
"""
self.update_viewing(viewing)
if self.spacecraft == 'solo':
if not viewing:
raise Exception("For this operation, the instrument's 'viewing' direction must be defined in the call of 'Event'!")
elif viewing == 'sun':
self.current_df_i = self.df_i_sun
self.current_df_e = self.df_e_sun
self.current_energies = self.energies_sun
elif viewing == 'asun':
self.current_df_i = self.df_i_asun
self.current_df_e = self.df_e_asun
self.current_energies = self.energies_asun
elif viewing == 'north':
self.current_df_i = self.df_i_north
self.current_df_e = self.df_e_north
self.current_energies = self.energies_north
elif viewing == 'south':
self.current_df_i = self.df_i_south
self.current_df_e = self.df_e_south
self.current_energies = self.energies_south
elif "Pixel" in viewing:
# All viewings are contained in the same dataframe, choose the pixel (viewing) here
pixel = self.viewing.split(' ')[1]
# Pixel info is in format NN in the dataframe, 1 -> 01 while 12 -> 12
if len(pixel) == 1:
pixel = f"0{pixel}"
# Pixel length more than 2 means "averaged" -> called "Avg" in the dataframe
elif len(pixel) > 2:
pixel = "Avg"
self.current_df_i = self.df_step[[col for col in self.df_step.columns if f"Magnet_{pixel}_Flux" in col]]
self.current_df_e = self.df_step[[col for col in self.df_step.columns if f"Electron_{pixel}_Flux" in col]]
self.current_energies = self.energies_step
if self.spacecraft[:2].lower() == 'st':
if self.sensor == 'sept':
if viewing == 'sun':
self.current_df_i = self.df_i_sun
self.current_df_e = self.df_e_sun
self.current_i_energies = self.energies_i_sun
self.current_e_energies = self.energies_e_sun
elif viewing == 'asun':
self.current_df_i = self.df_i_asun
self.current_df_e = self.df_e_asun
self.current_i_energies = self.energies_i_asun
self.current_e_energies = self.energies_e_asun
elif viewing == 'north':
self.current_df_i = self.df_i_north
self.current_df_e = self.df_e_north
self.current_i_energies = self.energies_i_north
self.current_e_energies = self.energies_e_north
elif viewing == 'south':
self.current_df_i = self.df_i_south
self.current_df_e = self.df_e_south
self.current_i_energies = self.energies_i_south
self.current_e_energies = self.energies_e_south
if self.spacecraft.lower() == 'wind':
if self.sensor.lower() == '3dp':
# The sectored data has a little different column names
if self.viewing == "omnidirectional":
col_list_i = [col for col in self.df_omni_i.columns if "FLUX" in col]
col_list_e = [col for col in self.df_omni_e.columns if "FLUX" in col]
self.current_df_i = self.df_omni_i[col_list_i]
self.current_df_e = self.df_omni_e[col_list_e]
else:
col_list_i = [col for col in self.df_i.columns if col.endswith(str(self.viewing)) and "FLUX" in col]
col_list_e = [col for col in self.df_e.columns if col.endswith(str(self.viewing)) and "FLUX" in col]
self.current_df_i = self.df_i[col_list_i]
self.current_df_e = self.df_e[col_list_e]
if self.spacecraft.lower() == 'psp':
if self.sensor.lower() == 'isois-epihi':
# viewing = 'A' or 'B'
self.current_df_e = self.df_e[self.df_e.columns[self.df_e.columns.str.startswith(viewing.upper())]]
self.current_df_i = self.df_i[self.df_i.columns[self.df_i.columns.str.startswith(viewing.upper())]]
if self.sensor.lower() == 'isois-epilo':
# viewing = '0' to '7'
self.current_df_e = self.df_e[self.df_e.columns[self.df_e.columns.str.endswith(viewing)]]
# Probably just a temporary thing, but cut all channels without a corresponding energy range string in them to avoid problems with
# dynamic spectrum. Magic number 12 is the amount of channels that have a corresponding energy description.
col_list = [col for col in self.current_df_e.columns if int(col.split('_')[3][1:])<12]
self.current_df_e = self.current_df_e[col_list]
# protons not yet included in PSP/ISOIS-EPILO dataset
# self.current_df_i = self.df_i[self.df_i.columns[self.df_i.columns.str.endswith(viewing)]]
if self.spacecraft.lower() == 'bepi':
if viewing == '0':
self.current_df_i = self.df_i_0
self.current_df_e = self.df_e_0
self.current_energies = self.energies_0
elif viewing == '1':
self.current_df_i = self.df_i_1
self.current_df_e = self.df_e_1
self.current_energies = self.energies_1
elif viewing == '2':
self.current_df_i = self.df_i_2
self.current_df_e = self.df_e_2
self.current_energies = self.energies_2
elif viewing == '3':
self.current_df_i = self.df_i_3
self.current_df_e = self.df_e_3
self.current_energies = self.energies_3
# side 4 should not be used for SIXS, but they can be activated by uncommenting the following lines
# elif(viewing == '4'):
# self.current_df_i = self.df_i_4
# self.current_df_e = self.df_e_4
# self.current_energies = self.energies_4
# def calc_av_en_flux_HET(self, df, energies, en_channel):
# """This function averages the flux of several
# energy channels of SolO/HET into a combined energy channel
# channel numbers counted from 0
# Parameters
# ----------
# df : pd.DataFrame DataFrame containing HET data
# DataFrame containing HET data
# energies : dict
# Energy dict returned from epd_loader (from Jan)
# en_channel : int or list
# energy channel or list with first and last channel to be used
# species : string
# 'e', 'electrons', 'p', 'i', 'protons', 'ions'
# Returns
# -------
# pd.DataFrame
# flux_out: contains channel-averaged flux
# Raises
# ------
# Exception
# [description]
# """
# species = self.species
# try:
# if species not in ['e', 'electrons', 'p', 'protons', 'H']:
# raise ValueError("species not defined. Must by one of 'e',\
# 'electrons', 'p', 'protons', 'H'")
# except ValueError as error:
# print(repr(error))
# raise
# if species in ['e', 'electrons']:
# en_str = energies['Electron_Bins_Text']
# bins_width = 'Electron_Bins_Width'
# flux_key = 'Electron_Flux'
# if species in ['p', 'protons', 'H']:
# en_str = energies['H_Bins_Text']
# bins_width = 'H_Bins_Width'
# flux_key = 'H_Flux'
# if flux_key not in df.keys():
# flux_key = 'H_Flux'
# if type(en_channel) == list:
# # An IndexError here is caused by invalid channel choice
# try:
# en_channel_string = en_str[en_channel[0]].flat[0].split()[0] + ' - '\
# + en_str[en_channel[-1]].flat[0].split()[2] + ' ' +\
# en_str[en_channel[-1]].flat[0].split()[3]
# except IndexError:
# raise Exception(f"{en_channel} is an invalid channel or a combination of channels!")
# if len(en_channel) > 2:
# raise Exception('en_channel must have len 2 or less!')
# if len(en_channel) == 2:
# DE = energies[bins_width]
# for bins in np.arange(en_channel[0], en_channel[-1] + 1):
# if bins == en_channel[0]:
# I_all = df[flux_key].values[:, bins] * DE[bins]
# else:
# I_all = I_all + df[flux_key].values[:, bins] * DE[bins]
# DE_total = np.sum(DE[(en_channel[0]):(en_channel[-1] + 1)])
# flux_av_en = pd.Series(I_all/DE_total, index=df.index)
# flux_out = pd.DataFrame({'flux': flux_av_en}, index=df.index)
# else:
# en_channel = en_channel[0]
# flux_out = pd.DataFrame({'flux':
# df[flux_key].values[:, en_channel]},
# index=df.index)
# else:
# flux_out = pd.DataFrame({'flux':
# df[flux_key].values[:, en_channel]},
# index=df.index)
# en_channel_string = en_str[en_channel]
# return flux_out, en_channel_string
# def calc_av_en_flux_EPT(self, df, energies, en_channel):
# """This function averages the flux of several energy
# channels of EPT into a combined energy channel
# channel numbers counted from 0
# Parameters
# ----------
# df : pd.DataFrame DataFrame containing EPT data
# DataFrame containing EPT data
# energies : dict
# Energy dict returned from epd_loader (from Jan)
# en_channel : int or list
# energy channel number(s) to be used
# species : string
# 'e', 'electrons', 'p', 'i', 'protons', 'ions'
# Returns
# -------
# pd.DataFrame
# flux_out: contains channel-averaged flux
# Raises
# ------
# Exception
# [description]
# """
# species = self.species
# try:
# if species not in ['e', 'electrons', 'p', 'i', 'protons', 'ions']:
# raise ValueError("species not defined. Must by one of 'e',"
# "'electrons', 'p', 'i', 'protons', 'ions'")
# except ValueError as error:
# print(repr(error))
# raise
# if species in ['e', 'electrons']:
# bins_width = 'Electron_Bins_Width'
# flux_key = 'Electron_Flux'
# en_str = energies['Electron_Bins_Text']
# if species in ['p', 'i', 'protons', 'ions']:
# bins_width = 'Ion_Bins_Width'
# flux_key = 'Ion_Flux'
# en_str = energies['Ion_Bins_Text']
# if flux_key not in df.keys():
# flux_key = 'H_Flux'
# if type(en_channel) == list:
# # An IndexError here is caused by invalid channel choice
# try:
# en_channel_string = en_str[en_channel[0]].flat[0].split()[0] + ' - '\
# + en_str[en_channel[-1]].flat[0].split()[2] + ' '\
# + en_str[en_channel[-1]].flat[0].split()[3]
# except IndexError:
# raise Exception(f"{en_channel} is an invalid channel or a combination of channels!")
# if len(en_channel) > 2:
# raise Exception('en_channel must have len 2 or less!')
# if len(en_channel) == 2:
# DE = energies[bins_width]
# for bins in np.arange(en_channel[0], en_channel[-1]+1):
# if bins == en_channel[0]:
# I_all = df[flux_key].values[:, bins] * DE[bins]
# else:
# I_all = I_all + df[flux_key].values[:, bins] * DE[bins]
# DE_total = np.sum(DE[(en_channel[0]):(en_channel[-1]+1)])
# flux_av_en = pd.Series(I_all/DE_total, index=df.index)
# flux_out = pd.DataFrame({'flux': flux_av_en}, index=df.index)
# else:
# en_channel = en_channel[0]
# flux_out = pd.DataFrame({'flux':
# df[flux_key].values[:, en_channel]},
# index=df.index)
# else:
# flux_out = pd.DataFrame({'flux':
# df[flux_key].values[:, en_channel]},
# index=df.index)
# en_channel_string = en_str[en_channel]
# return flux_out, en_channel_string
def print_info(self, title, info):
"""
:meta private:
"""
title_string = "##### >" + title + "< #####"
print(title_string)
print(info)
print('#'*len(title_string) + '\n')
def mean_value(self, tb_start, tb_end, flux_series):
"""
This function calculates the classical mean of the background period
which is used in the onset analysis.
:meta private:
"""
# replace date_series with the resampled version
date = flux_series.index
background = flux_series.loc[(date >= tb_start) & (date < tb_end)]
mean_value = np.nanmean(background)
sigma = np.nanstd(background)
return [mean_value, sigma]
def onset_determination(self, ma_sigma, flux_series, cusum_window, bg_end_time, k_model: str = None):
"""
:meta private:
"""
flux_series = flux_series[bg_end_time:]
# assert date and the starting index of the averaging process
date = flux_series.index
ma = ma_sigma[0]
sigma = ma_sigma[1]
md = ma + self.x_sigma*sigma
# Choose the correct k_parameter to use:
if k_model is None:
k_param = k_parameter(mu=ma, sigma=sigma, sigma_multiplier=self.x_sigma)
elif k_model=="legacy":
print("Using the legacy definition for the k-parameter.")
k_param = k_legacy(mu=ma, sigma=sigma, sigma_multiplier=self.x_sigma)
# Input value for k_param was something strange:
else:
raise ValueError(f"Unidentified input for parameter k_model: {k_model}. Leave to None to use the standard k-parameter, or input 'legacy' if you want to use the old definition.")
# choose h, the variable dictating the "hastiness" of onset alert
h = 2 if k_param>1 else 1
alert = 0
cusum = np.zeros(len(flux_series))
norm_channel = np.zeros(len(flux_series))
# set the onset as default to be NaT (Not a Date)
onset_time = pd.NaT
for i in range(1, len(cusum)):
# normalize the observed flux
norm_channel[i] = (flux_series.iloc[i]-ma)/sigma
# calculate the value for ith cusum entry
cusum[i] = max(0, norm_channel[i] - k_param + cusum[i-1])
# check if cusum[i] is above threshold h,
# if it is -> increment alert
if cusum[i] > h:
alert = alert + 1
else:
alert = 0
# cusum_window(default:30) subsequent increments to alert
# means that the onset was found
if alert == cusum_window:
onset_time = date[i - alert]
break
# ma = mu_a = background average
# md = mu_d = background average + n*sigma
# k_param = the k-parameter of the modified CUSUM function
# h = 1 or 2,describes the hastiness of onset alert
# cusum = the cusum function
# onset_time = the time of the onset
return [ma, md, k_param, norm_channel, cusum, onset_time]
def onset_analysis(self, df_flux, windowstart, windowlen, windowrange, channels_dict,
channel='flux', cusum_window=30, yscale='log',
ylim=None, xlim=None, k_model: str = None):
"""
:meta private:
"""
self.print_info("Energy channels", channels_dict)
spacecraft = self.spacecraft.upper()
sensor = self.sensor.upper()
color_dict = {
'onset_time': '#e41a1c',
'bg_mean': '#e41a1c',
'flux_peak': '#1a1682',
'bg': '#de8585'
}
if self.spacecraft == 'solo':
flux_series = df_flux[channel]
if self.spacecraft[:2].lower() == 'st':
flux_series = df_flux # [channel]'
if self.spacecraft.lower() == 'soho':
flux_series = df_flux # [channel]
if self.spacecraft.lower() == 'wind':
flux_series = df_flux # [channel]
if self.spacecraft.lower() == 'psp':
flux_series = df_flux[channel]
if self.spacecraft.lower() == 'bepi':
flux_series = df_flux # [channel]
flux_series.index = flux_series.index.tz_localize(None)
date = flux_series.index
if ylim is None:
ylim = [np.nanmin(flux_series[flux_series > 0]/2),
np.nanmax(flux_series) * 3]
# windowrange is by default None, and then we define the start and stop with integer hours
if windowrange is None:
# dates for start and end of the averaging processes
avg_start = date[0] + datetime.timedelta(hours=windowstart)
# ending time is starting time + a given timedelta in hours
avg_end = avg_start + datetime.timedelta(hours=windowlen)
else:
avg_start, avg_end = windowrange[0], windowrange[1]
if xlim is None:
xlim = [date[0], date[-1]]
else:
# # add UTC timezone info if not present
# if not xlim[0].tzinfo and date[0].tzinfo == datetime.timezone.utc:
# xlim = [pd.to_datetime(xl, utc=True) for xl in xlim]
df_flux = df_flux[xlim[0]:xlim[-1]]
# onset not yet found
onset_found = False
background_stats = self.mean_value(avg_start, avg_end, flux_series)
onset_stats =\
self.onset_determination(background_stats, flux_series,
cusum_window, avg_end, k_model=k_model)
if not isinstance(onset_stats[-1], pd._libs.tslibs.nattype.NaTType):
onset_found = True
if self.spacecraft == 'solo':
df_flux_peak = df_flux[df_flux[channel] == df_flux[channel].max()]
if self.spacecraft[:2].lower() == 'st':
df_flux_peak = df_flux[df_flux == df_flux.max()]
if self.spacecraft == 'soho':
df_flux_peak = df_flux[df_flux == df_flux.max()]
if self.spacecraft == 'wind':
df_flux_peak = df_flux[df_flux == df_flux.max()]
if self.spacecraft == 'psp':
# df_flux_peak = df_flux[df_flux == df_flux.max()]
df_flux_peak = df_flux[df_flux[channel] == df_flux[channel].max()]
if self.spacecraft == 'bepi':
df_flux_peak = df_flux[df_flux == df_flux.max()]
# df_flux_peak = df_flux[df_flux[channel] == df_flux[channel].max()]
self.print_info("Flux peak", df_flux_peak)
self.print_info("Onset time", onset_stats[-1])
self.print_info("Mean of background intensity",
background_stats[0])
self.print_info("Std of background intensity",
background_stats[1])
# Before starting the plot, save the original rcParam options and update to new ones
original_rcparams = self.save_and_update_rcparams("onset_tool")
fig, ax = plt.subplots()
ax.plot(flux_series.index, flux_series.values, ds='steps-mid')
# CUSUM and norm datapoints in plots.
'''
ax.scatter(flux_series.index, onset_stats[-3], s=1,
color='darkgreen', alpha=0.7, label='norm')
ax.scatter(flux_series.index, onset_stats[-2], s=3,
c='maroon', label='CUSUM')
'''
# onset time
if onset_found:
# Onset time line
ax.axvline(onset_stats[-1], linewidth=1.5,
color=color_dict['onset_time'], linestyle='-',
label="Onset time")
# Flux peak line (first peak only, if there's multiple)
try:
ax.axvline(df_flux_peak.index[0], linewidth=1.5,
color=color_dict['flux_peak'], linestyle='-',
label="Peak time")
except IndexError:
exceptionmsg = "IndexError! Maybe you didn't adjust background_range or plot_range correctly?"
raise Exception(exceptionmsg)
# background mean
ax.axhline(onset_stats[0], linewidth=2,
color=color_dict['bg_mean'], linestyle='--',
label="Mean of background")
# background mean + 2*std
ax.axhline(onset_stats[1], linewidth=2,
color=color_dict['bg_mean'], linestyle=':',
label=f"Mean + {str(self.x_sigma)} * std of background")
# Background shaded area
ax.axvspan(avg_start, avg_end, color=color_dict['bg'],
label="Background", alpha=0.5)
# ax.set_xlabel("Time [HH:MM \nYYYY-mm-dd]", fontsize=16)
ax.set_ylabel(r"Intensity [1/(cm$^{2}$ sr s MeV)]", fontsize=16)
ax.yaxis.set_major_locator(plt.MaxNLocator(4))
# figure limits and scale
plt.ylim(ylim)
plt.xlim(xlim[0], xlim[1])
plt.yscale(yscale)
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.2),
fancybox=True, shadow=False, ncol=3, fontsize=16)
# tickmarks, their size etc...
plt.tick_params(which='major', length=5, width=1.5, labelsize=16)
plt.tick_params(which='minor', length=4, width=1)
# date tick locator and formatter
ax.xaxis_date()
# ax.xaxis.set_major_locator(ticker.MaxNLocator(9))
# utc_dt_format1 = DateFormatter('%H:%M \n%Y-%m-%d')
utc_dt_format1 = DateFormatter('%H:%M\n%b %d\n%Y')
ax.xaxis.set_major_formatter(utc_dt_format1)
if self.species == 'e':
s_identifier = 'electrons'
if self.species in ['p', 'i']:
if ((spacecraft == 'sta' and sensor == 'sept') or (spacecraft == 'solo' and sensor == 'ept')):
s_identifier = 'ions'
else:
s_identifier = 'protons'
self.print_info("Particle species", s_identifier)
if (self.viewing_used != '' and self.viewing_used is not None):
plt.title(f"{spacecraft}/{sensor} {channels_dict} {s_identifier}\n"
f"{self.averaging_used} averaging, viewing: "
f"{self.viewing_used.upper()}")
else:
plt.title(f"{spacecraft}/{sensor} {channels_dict} {s_identifier}\n"
f"{self.averaging_used} averaging")
fig.set_size_inches(16, 8)
# Onset label
if onset_found:
if (self.spacecraft == 'solo' or self.spacecraft == 'psp'):
plabel = AnchoredText(f"Onset time: {str(onset_stats[-1])[:19]}\n"
f"Peak flux: {df_flux_peak['flux'].iloc[0]:.2E}",
prop=dict(size=13), frameon=True,
loc='lower right')
# if(self.spacecraft[:2].lower() == 'st' or self.spacecraft == 'soho' or self.spacecraft == 'wind'):
else:
plabel = AnchoredText(f"Onset time: {str(onset_stats[-1])[:19]}\n"
f"Peak flux: {df_flux_peak.values[0]:.2E}",
prop=dict(size=13), frameon=True,
loc='lower right')
else:
plabel = AnchoredText("No onset found",
prop=dict(size=13), frameon=True,
loc='lower right')
plabel.patch.set_boxstyle("round, pad=0., rounding_size=0.2")
plabel.patch.set_linewidth(2.0)
# Background label
blabel = AnchoredText(f"Background:\n{avg_start} - {avg_end}",
prop=dict(size=13), frameon=True,
loc='upper left')
blabel.patch.set_boxstyle("round, pad=0., rounding_size=0.2")
blabel.patch.set_linewidth(2.0)
# Energy and species label
'''
eslabel = AnchoredText(f"{channels_dict} {s_identifier}",
prop=dict(size=13), frameon=True,
loc='lower left')
eslabel.patch.set_boxstyle("round, pad=0., rounding_size=0.2")
eslabel.patch.set_linewidth(2.0)
'''
ax.add_artist(plabel)
ax.add_artist(blabel)
# ax.add_artist(eslabel)
plt.tight_layout()
plt.show()
# Finally reset matplotlib rcParams to what they were before plotting
rcParams.update(original_rcparams)
return flux_series, onset_stats, onset_found, df_flux_peak, df_flux_peak.index[0], fig, background_stats[0]
[docs]
def find_onset(self, viewing, bg_start=None, bg_length=None, background_range=None, resample_period=None,
channels=[0, 1], yscale='log', cusum_window=30, xlim=None, x_sigma=2, k_model: str = None):
"""
This method runs Poisson-CUSUM onset analysis for the Event object.
Parameters:
-----------
viewing : str
The viewing direction of the sensor.
bg_start : int or float, default None
The start of background averaging from the start of the time series data in hours.
bg_length : int or float, default None
The length of the background averaging period in hours.
background_range : tuple or list of datetimes with len=2, default None
The time range of background averaging. If defined, takes precedence over bg_start and bg_length.
resample_period : str, default None
Pandas-compatible time string to average data. e.g. '10s' for 10 seconds or '2min' for 2 minutes.
channels : int or list of 2 ints, default [0,1]
Index or a combination of indices to plot a channel or combination of channels.
yscale : str, default 'log'
Matplotlib-compatible string for the scale of the y-axis. e.g. 'log' or 'linear'
cusum_window : int, default 30
The amount of consecutive data points above the threshold before identifying an onset.
xlim : tuple or list, default None
Panda-compatible datetimes or strings to assert the left and right boundary of the x-axis of the plot.
x_sigma : int, default 2
The multiplier of m_d in the definition of the control parameter k in Poisson-CUSUM method.
k_model : {str}, default None
Leave to None to use the current k-parameter. Input 'legacy' to use the old definition for k.
"""
# This check was initially transforming the 'channels' integer to a tuple of len==1, but that
# raised a ValueError with solo/ept. However, a list of len==1 is somehow okay.
if isinstance(channels, int):
channels = [channels]
if (background_range is not None) and (xlim is not None):
# Check if background is separated from plot range by over a day, issue a warning if so, but don't
if (background_range[0] < xlim[0] - datetime.timedelta(days=1) and background_range[0] < xlim[1] - datetime.timedelta(days=1)) or \
(background_range[1] > xlim[0] + datetime.timedelta(days=1) and background_range[1] > xlim[1] + datetime.timedelta(days=1)):
background_warning = "Your background_range is separated from plot_range by over a day. If this was intentional you may ignore this warning."
# warnings.warn(message=background_warning)
custom_warning(message=background_warning)
if (self.spacecraft[:2].lower() == 'st' and self.sensor == 'sept') \
or (self.spacecraft.lower() == 'psp' and self.sensor.startswith('isois')) \
or (self.spacecraft.lower() == 'solo' and self.sensor == 'step') \
or (self.spacecraft.lower() == 'solo' and self.sensor == 'ept') \
or (self.spacecraft.lower() == 'solo' and self.sensor == 'het') \
or (self.spacecraft.lower() == 'wind' and self.sensor == '3dp') \
or (self.spacecraft.lower() == 'bepi'):
self.viewing_used = viewing
self.choose_data(viewing)
elif (self.spacecraft[:2].lower() == 'st' and self.sensor == 'het'):
self.viewing_used = ''
elif (self.spacecraft.lower() == 'soho' and self.sensor == 'erne'):
self.viewing_used = ''
elif (self.spacecraft.lower() == 'soho' and self.sensor in ["ephin", "ephin-5", "ephin-15"]):
self.viewing_used = ''
# Check that the data that was loaded is valid. If not, abort with warning.
self.validate_data()
self.averaging_used = resample_period
self.x_sigma = x_sigma
if self.spacecraft == 'solo':
if self.sensor == 'het':
if self.species in ['p', 'i']:
# df_flux, en_channel_string =\
# self.calc_av_en_flux_HET(self.current_df_i, self.current_energies, channels)
df_flux, en_channel_string =\
solo_epd_combine_channels(self.current_df_i, self.current_energies, channels, sensor='HET')
elif self.species == 'e':
# df_flux, en_channel_string =\
# self.calc_av_en_flux_HET(self.current_df_e, self.current_energies, channels)
df_flux, en_channel_string =\
solo_epd_combine_channels(self.current_df_e, self.current_energies, channels, sensor='HET')
elif self.sensor == 'ept':
if self.species in ['p', 'i']:
# df_flux, en_channel_string =\
# self.calc_av_en_flux_EPT(self.current_df_i, self.current_energies, channels)
df_flux, en_channel_string =\
solo_epd_combine_channels(self.current_df_i, self.current_energies, channels, sensor='EPT')
elif self.species == 'e':
# df_flux, en_channel_string =\
# self.calc_av_en_flux_EPT(self.current_df_e, self.current_energies, channels)
df_flux, en_channel_string =\
solo_epd_combine_channels(self.current_df_e, self.current_energies, channels, sensor='EPT')
elif self.sensor == "step":
if len(channels) > 1:
not_implemented_msg = "Multiple channel averaging not yet supported for STEP! Please choose only one channel."
raise Exception(not_implemented_msg)
en_channel_string = self.get_channel_energy_values("str")[channels[0]]
if self.species in ('p', 'i'):
channel_id = self.current_df_i.columns[channels[0]]
df_flux = pd.DataFrame(data={
"flux": self.current_df_i[channel_id]
}, index=self.current_df_i.index)
elif self.species == 'e':
channel_id = self.current_df_e.columns[channels[0]]
df_flux = pd.DataFrame(data={
"flux": self.current_df_e[channel_id]
}, index=self.current_df_e.index)
else:
invalid_sensor_msg = "Invalid sensor!"
raise Exception(invalid_sensor_msg)
if self.spacecraft[:2] == 'st':
# Super ugly implementation, but easiest to just wrap both sept and het calculators
# in try block. KeyError is caused by an invalid channel choice.
try:
if self.sensor == 'het':
if self.species in ['p', 'i']:
df_flux, en_channel_string =\
calc_av_en_flux_ST_HET(self.current_df_i,
self.current_energies['channels_dict_df_p'],
channels,
species='p')
elif self.species == 'e':
df_flux, en_channel_string =\
calc_av_en_flux_ST_HET(self.current_df_e,
self.current_energies['channels_dict_df_e'],
channels,
species='e')
elif self.sensor == 'sept':
if self.species in ['p', 'i']:
df_flux, en_channel_string =\
calc_av_en_flux_SEPT(self.current_df_i,
self.current_i_energies['channels_dict_df_p'],
channels)
elif self.species == 'e':
df_flux, en_channel_string =\
calc_av_en_flux_SEPT(self.current_df_e,
self.current_e_energies['channels_dict_df_e'],
channels)
except KeyError:
raise Exception(f"{channels} is an invalid channel or a combination of channels!")
if self.spacecraft == 'soho':
# A KeyError here is caused by invalid channel
try:
if self.sensor == 'erne':
if self.species in ['p', 'i']:
df_flux, en_channel_string =\
calc_av_en_flux_ERNE(self.current_df_i,
self.current_energies['channels_dict_df_p'],
channels,
species='p',
sensor='HET')
if self.sensor == 'ephin':
# convert single-element "channels" list to integer
if type(channels) is list:
if len(channels) == 1:
channels = channels[0]
else:
print("No multi-channel support for SOHO/EPHIN included yet! Select only one single channel.")
if self.species == 'e':
df_flux = self.current_df_e[f'E{channels}']
en_channel_string = self.current_energies['energy_labels'][f'E{channels}']
if self.sensor in ("ephin-5", "ephin-15"):
if isinstance(channels, list):
if len(channels) == 1:
channels = channels[0]
else:
raise Exception("No multi-channel support for SOHO/EPHIN included yet! Select only one single channel.")
if self.species == 'e':
df_flux = self.current_df_e[f"E{channels}"]
en_channel_string = self.current_energies[f"E{channels}"]
except KeyError:
raise Exception(f"{channels} is an invalid channel or a combination of channels!")
if self.spacecraft == 'wind':
if self.sensor == '3dp':
# convert single-element "channels" list to integer
if type(channels) is list:
if len(channels) == 1:
channels = channels[0]
else:
print("No multi-channel support for Wind/3DP included yet! Select only one single channel.")
if self.species in ['p', 'i']:
if viewing != "omnidirectional":
df_flux = self.current_df_i.filter(like=f'FLUX_E{channels}')
else:
df_flux = self.current_df_i.filter(like=f'FLUX_{channels}')
# extract pd.Series for further use:
df_flux = df_flux[df_flux.columns[0]]
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
df_flux = df_flux*1e6
en_channel_string = self.current_i_energies['channels_dict_df']['Bins_Text'][f'ENERGY_{channels}']
elif self.species == 'e':
if viewing != "omnidirectional":
df_flux = self.current_df_e.filter(like=f'FLUX_E{channels}')
else:
df_flux = self.current_df_e.filter(like=f'FLUX_{channels}')
# extract pd.Series for further use:
df_flux = df_flux[df_flux.columns[0]]
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
df_flux = df_flux*1e6
en_channel_string = self.current_e_energies['channels_dict_df']['Bins_Text'][f'ENERGY_{channels}']
if self.spacecraft.lower() == 'bepi':
if self.data_level == 'l2':
if type(channels) is list:
if len(channels) == 1:
# convert single-element "channels" list to integer
channels = channels[0]
if self.species == 'e':
df_flux = self.current_df_e[f'E{channels}']
en_channel_string = self.current_energies['Energy_Bin_str'][f'E{channels}']
if self.species in ['p', 'i']:
df_flux = self.current_df_i[f'P{channels}']
en_channel_string = self.current_energies['Energy_Bin_str'][f'P{channels}']
else:
if self.species == 'e':
df_flux, en_channel_string = calc_av_en_flux_sixs(self.current_df_e, channels, self.species)
if self.species in ['p', 'i']:
df_flux, en_channel_string = calc_av_en_flux_sixs(self.current_df_i, channels, self.species)
elif self.data_level == 'l3':
if type(channels) is list:
if len(channels) > 1:
if self.species == 'e':
df_flux, en_channel_string = calc_av_en_flux_sixs(self.current_df_e, channels, self.species)
if self.species in ['p', 'i']:
df_flux, en_channel_string = calc_av_en_flux_sixs(self.current_df_i, channels, self.species)
elif len(channels) == 1:
channels = channels[0]
if type(channels) is int:
if self.species == 'e':
df_flux = self.current_df_e[f'Side{self.viewing}_E{channels}']
en_channel_string = self.current_energies[f'Side{self.viewing}_Electron_Bins_str'][f'E{channels}']
if self.species in ['p', 'i']:
df_flux = self.current_df_i[f'Side{self.viewing}_P{channels}']
en_channel_string = self.current_energies[f'Side{self.viewing}_Proton_Bins_str'][f'P{channels}']
if self.spacecraft.lower() == 'psp':
if self.sensor.lower() == 'isois-epihi':
if self.species in ['p', 'i']:
# We're using here only the HET instrument of EPIHI (and not LET1 or LET2)
df_flux, en_channel_string =\
calc_av_en_flux_PSP_EPIHI(df=self.current_df_i,
energies=self.current_i_energies,
en_channel=channels,
species='p',
instrument='het',
viewing=viewing.upper())
if self.species == 'e':
# We're using here only the HET instrument of EPIHI (and not LET1 or LET2)
df_flux, en_channel_string =\
calc_av_en_flux_PSP_EPIHI(df=self.current_df_e,
energies=self.current_e_energies,
en_channel=channels,
species='e',
instrument='het',
viewing=viewing.upper())
if self.sensor.lower() == 'isois-epilo':
if self.species == 'e':
# We're using here only the F channel of EPILO (and not E or G)
df_flux, en_channel_string =\
calc_av_en_flux_PSP_EPILO(df=self.current_df_e,
en_dict=self.current_e_energies,
en_channel=channels,
species='e',
mode='pe',
chan='F',
viewing=viewing)
if resample_period is not None:
df_averaged = resample_df(df=df_flux, resample=resample_period, cols_unc=[]) # TODO: this ignores all uncertainty columns, but they are not used anyway
else:
df_averaged = df_flux
flux_series, onset_stats, onset_found, peak_flux, peak_time, fig, bg_mean =\
self.onset_analysis(df_averaged, bg_start, bg_length, background_range,
en_channel_string, yscale=yscale, cusum_window=cusum_window,
xlim=xlim, k_model=k_model)
# At least in the case of solo/ept the peak_flux is a pandas Dataframe, but it should be a Series
if isinstance(peak_flux, pd.core.frame.DataFrame):
peak_flux = pd.Series(data=peak_flux.values[0])
# update class attributes before returning variables:
self.update_onset_attributes(flux_series, onset_stats, onset_found, peak_flux.values[0], peak_time, fig, bg_mean)
return flux_series, onset_stats, onset_found, peak_flux, peak_time, fig, bg_mean
# For backwards compatibility, make a copy of the `find_onset` function that is called `analyse` (which was its old name).
# Deactivated in August 2025. Remove later if no problems occur.
# analyse = copy.copy(find_onset)
[docs]
def dynamic_spectrum(self, view, cmap: str = 'magma', xlim: tuple = None, resample: str = None, save: bool = False,
other=None) -> None:
"""
Shows all the different energy channels in a single 2D plot, and color codes the corresponding intensity*energy^2 by a colormap.
Parameters:
-----------
view : str or None
The viewing direction of the sensor
cmap : str, default='magma'
The colormap for the dynamic spectrum plot
xlim : 2-tuple of datetime strings (str, str)
Pandas-compatible datetime strings for the start and stop of the figure
resample : str
Pandas-compatibe resampling string, e.g. '10min' or '30s'.
Note that this is just a simple wrapper around thepandas
resample function that is calculating the mean of the data in the new
time bins. This is not necessarily the correct way to resample data,
depending on the data type (for example for errors)!
save : bool
Saves the image
"""
def get_yaxis_bin_boundaries(e_lows, e_highs, y_multiplier, is_solohetions):
"""
Helper function to produce the bin boundaries for dynamic spectrum y-axis.
"""
# For any other sc+instrument combination than solo+HET in the current version, there is no need to complicate setting bin boundaries
if not is_solohetions:
return np.append(e_lows, e_highs[-1]) * y_multiplier
# Init the boundaries. For SolO/HET there are more boundaries than channels
yaxis_bin_boundaries = np.zeros(len(e_lows)+2)
yaxis_idx = 0
chooser_idx = yaxis_idx
while yaxis_idx < len(yaxis_bin_boundaries)-1: # this loop will not go to the final index of the array
# Set the first boundary as simply the first val of lower energy boundaries and continue
if yaxis_idx==0:
yaxis_bin_boundaries[yaxis_idx] = e_lows[chooser_idx]
yaxis_idx += 1
chooser_idx += 1
continue
# If the lower boundary now is the same as the last bins higher boundary, then set that as the boundary
if e_lows[chooser_idx] == e_highs[chooser_idx-1]:
yaxis_bin_boundaries[yaxis_idx] = e_lows[chooser_idx]
yaxis_idx += 1
chooser_idx += 1
# ...if not, set the last higher boundary as the boundary, the next lower boundary as the next boundary and continue business as usual
else:
yaxis_bin_boundaries[yaxis_idx] = e_highs[chooser_idx-1]
yaxis_bin_boundaries[yaxis_idx+1] = e_lows[chooser_idx]
yaxis_idx += 2
chooser_idx += 1
# Finally the last boundary is the final boundary of e_highs:
yaxis_bin_boundaries[-1] = e_highs[-1]
return yaxis_bin_boundaries * y_multiplier
def combine_grids_and_ybins(grid, grid1, y_arr, y_arr1):
# TODO: Which bin exactly is removed here? HET? EPT? (JG)
# solo/het lowest electron channel partially overlaps with ept highest channel -> erase the "extra" bin where overlapping hapens
if self.spacecraft == "solo" and (self.sensor == "het" or other.sensor == "het") and self.species in ("electrons", "electron", 'e'):
grid1 = np.append(grid, grid1, axis=0)[:-1]
# This deletes the first entry of y_arr1
y_arr1 = np.delete(y_arr1, 0, axis=0)
y_arr1 = np.append(y_arr, y_arr1)
# There is a gap between the highest solo/ept proton channel and the lowest het ion channel -> add extra row to grid
# filled with nans to compensate
elif self.spacecraft == "solo" and (self.sensor == "het" or other.sensor == "het") and self.species == 'p':
nans = np.array([[np.nan for i in range(len(grid[0]))]])
grid = np.append(grid, nans, axis=0)
grid1 = np.append(grid, grid1, axis=0)[:-2]
y_arr1 = np.append(y_arr, y_arr1)
else:
grid1 = np.append(grid, grid1, axis=0)
y_arr1 = np.append(y_arr, y_arr1)
return grid1, y_arr1
# Event attributes
spacecraft = self.spacecraft.lower()
instrument = self.sensor.lower()
species = self.species
# Boolean value for checking if y-axis requires a white stripe
is_solohetions = (spacecraft == "solo" and instrument == "het" and species == 'p')
# This method has to be run before doing anything else to make sure that the viewing is correct
self.choose_data(view)
# Check that the data that was loaded is valid. If not, abort with warning.
self.validate_data()
if self.spacecraft == "bepi":
raise NotImplementedError("BepiColombo/SIXS-P not yet implemented!")
if self.spacecraft == "solo":
if instrument == "step":
# custom_warning('The lower STEP energy channels are partly overlapping, which is not correctly implemented at the moment!')
raise Warning('SolO/STEP is not implemented yet in the dynamic spectrum tool!')
# All viewings are contained in the same dataframe, choose the pixel (viewing) here
pixel = self.viewing.split(' ')[1]
# Pixel info is in format NN in the dataframe, 1 -> 01 while 12 -> 12
if len(pixel) == 1:
pixel = f"0{pixel}"
# Pixel length more than 2 means "averaged" -> called "Avg" in the dataframe
elif len(pixel) > 2:
pixel = "Avg"
if species in ("electron", 'e'):
particle_data = self.df_step[[col for col in self.df_step.columns if f"Electron_{pixel}_Flux" in col]]
s_identifier = "electrons"
# Step's "Magnet" channel deflects electrons -> measures all positive ions
else:
particle_data = self.df_step[[col for col in self.df_step.columns if f"Magnet_{pixel}_Flux" in col]]
s_identifier = "ions"
# EPT and HET data come in almost identical containers, they need not be differentiated
elif species in ("electron", 'e'):
particle_data = self.current_df_e["Electron_Flux"]
s_identifier = "electrons"
else:
try:
particle_data = self.current_df_i["Ion_Flux"]
s_identifier = "ions"
except KeyError:
particle_data = self.current_df_i["H_Flux"]
s_identifier = "protons"
if self.spacecraft[:2] == "st":
if species in ["electron", 'e']:
if instrument == "sept":
particle_data = self.current_df_e[[ch for ch in self.current_df_e.columns if ch[:2] == "ch"]]
else:
particle_data = self.current_df_e[[ch for ch in self.current_df_e.columns if "Flux" in ch]]
s_identifier = "electrons"
else:
if instrument == "sept":
particle_data = self.current_df_i[[ch for ch in self.current_df_i.columns if ch[:2] == "ch"]]
s_identifier = "ions"
else:
particle_data = self.current_df_i[[ch for ch in self.current_df_i.columns if "Flux" in ch]]
s_identifier = "protons"
if self.spacecraft == "soho":
if instrument.lower() == "erne":
particle_data = self.current_df_i
s_identifier = "protons"
if instrument.lower() == "ephin":
particle_data = self.current_df_e
# Here drop the E300 channel altogether from the dataframe if the data is produced after Oct 4, 2017,
# for it contains no valid data. Keyword axis==1 refers to the columns axis.
if self.start_date > pd.to_datetime("2017-10-04").date():
particle_data = particle_data.drop("E300", axis=1)
s_identifier = "electrons"
# raise Warning('SOHO/EPHIN is not implemented yet in the dynamic spectrum tool!')
if spacecraft == "psp":
if instrument.lower() == "isois-epihi":
if species in ("electron", 'e'):
particle_data = self.current_df_e
s_identifier = "electrons"
if species in ("proton", "p"):
particle_data = self.current_df_i
s_identifier = "protons"
if instrument.lower() == "isois-epilo":
if species in ("electron", 'e'):
particle_data = self.current_df_e
s_identifier = "electrons"
if spacecraft == "wind":
if instrument.lower() == "3dp":
if species in ("electron", 'e'):
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
particle_data = self.current_df_e*1e6
s_identifier = "electrons"
else:
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
particle_data = self.current_df_i*1e6
s_identifier = "protons"
# These particle instruments will have keVs on their y-axis
LOW_ENERGY_SENSORS = ("sept", "step", "ept")
# For a single instrument, check if low or high energy instrument, but for joint dynamic spectrum
# always use MeVs as the unit, because the y-axis is going to range over a large number of values
if not other:
if instrument in LOW_ENERGY_SENSORS:
y_multiplier = 1e-3 # keV
y_unit = "keV"
else:
y_multiplier = 1e-6 # MeV
y_unit = "MeV"
else:
y_multiplier = 1e-6 # MeV
y_unit = "MeV"
# Resample only if requested
if resample is not None:
particle_data = resample_df(df=particle_data, resample=resample, cols_unc=[]) # TODO: this ignores all uncertainty columns, but they are not used anyway
if xlim is None:
df = particle_data[:]
t_start, t_end = df.index[0], df.index[-1]
else:
# td is added to the start and the end to avert white pixels at the end of the plot
td_str = resample if resample is not None else '0s'
td = pd.Timedelta(value=td_str)
t_start, t_end = pd.to_datetime(xlim[0]), pd.to_datetime(xlim[1])
df = particle_data.loc[(particle_data.index >= (t_start-td)) & (particle_data.index <= (t_end+td))]
# In practice this seeks the date on which the highest flux is observed
# Addendum JG 2026-01-23: returns the first date of the data if all data is NaN
if not df[df.columns[0]].isna().all():
# Since pandas 3.0.0, this raises an ValueError if df consists only of NaNs. Older pandas versions would return -1 (!) in that case
date_of_event = df.iloc[np.argmax(df[df.columns[0]])].name.date()
else:
date_of_event = df.iloc[0].name.date()
# Assert time and channel bins
time = df.index
# The low and high ends of each energy channel
e_lows, e_highs = self.get_channel_energy_values() # this function return energy in eVs
# The mean energy of each channel in eVs
mean_energies = np.sqrt(np.multiply(e_lows, e_highs))
# Boundaries of plotted bins in keVs are the y-axis:
y_arr = get_yaxis_bin_boundaries(e_lows, e_highs, y_multiplier, is_solohetions)
# Set image pixel length and height
image_len = len(time)
image_hei = len(y_arr)-1
# Init the grid
grid = np.zeros((image_len, image_hei))
# Display energy in MeVs -> multiplier squared is 1e-6*1e-6 = 1e-12
ENERGY_MULTIPLIER_SQUARED = 1e-12
# Assign grid bins -> intensity * energy^2
if is_solohetions:
for i, channel in enumerate(df):
if i<5:
grid[:, i] = df[channel]*(mean_energies[i]*mean_energies[i]*ENERGY_MULTIPLIER_SQUARED)
elif i==5:
grid[:, i] = np.nan
grid[:, i+1] = df[channel]*(mean_energies[i]*mean_energies[i]*ENERGY_MULTIPLIER_SQUARED)
else:
grid[:, i+1] = df[channel]*(mean_energies[i]*mean_energies[i]*ENERGY_MULTIPLIER_SQUARED)
else:
for i, channel in enumerate(df):
grid[:, i] = df[channel]*(mean_energies[i]*mean_energies[i]*ENERGY_MULTIPLIER_SQUARED) # Intensity*Energy^2, and energy is in eV -> tranform to keV or MeV
# Finally cut the last entry and transpose the grid so that it can be plotted correctly
grid = grid[:-1, :]
grid = grid.T
maskedgrid = np.where(grid == 0, 0, 1)
maskedgrid = np.ma.masked_where(maskedgrid == 1, maskedgrid)
# ---only plotting_commands from this point----->
# Save the original rcParams and update to new ones
original_rcparams = self.save_and_update_rcparams("dynamic_spectrum")
normscale = cl.LogNorm()
# Init the figure and axes
if self.radio_spacecraft is None:
figsize = (27, 14)
fig, ax = plt.subplots(figsize=figsize)
ax = np.array([ax])
DYN_SPEC_INDX = 0
else:
figsize = (27, 18)
fig, ax = plt.subplots(nrows=2, figsize=figsize, sharex=True)
DYN_SPEC_INDX = 1
from seppy.tools.swaves import plot_swaves
ax[0], colormesh = plot_swaves(downloaded_files=self.radio_files, spacecraft=self.radio_spacecraft[0], start_time=t_start, end_time=t_end, ax=ax[0], cmap=cmap)
fig.tight_layout(pad=9.5, w_pad=-0.5, h_pad=1.0)
# plt.subplots_adjust(wspace=-1, hspace=-1.8)
# Colorbar
cb = fig.colorbar(colormesh, orientation='vertical', ax=ax[0])
clabel = "Intensity" + "\n" + "[dB]"
cb.set_label(clabel)
ax[DYN_SPEC_INDX].set_yscale('log')
# Colorbar
if not other:
# Colormesh
cplot = ax[DYN_SPEC_INDX].pcolormesh(time, y_arr, grid, shading='auto', cmap=cmap, norm=normscale)
_greymesh = ax[DYN_SPEC_INDX].pcolormesh(time, y_arr, maskedgrid, shading='auto', cmap='Greys', vmin=-1, vmax=1)
cb = fig.colorbar(cplot, orientation='vertical', ax=ax[DYN_SPEC_INDX])
clabel = r"Intensity $\cdot$ $E^{2}$" + "\n" + r"[MeV/(cm$^{2}$ sr s)]"
cb.set_label(clabel)
# y-axis settings
ax[DYN_SPEC_INDX].set_ylim(np.nanmin(y_arr), np.nanmax(y_arr))
ax[DYN_SPEC_INDX].set_yticks([yval for yval in y_arr])
ax[DYN_SPEC_INDX].set_ylabel(f"Energy [{y_unit}]")
# Format of y-axis: for single instrument use pretty numbers, for joint spectrum only powers of ten
if not other:
ax[DYN_SPEC_INDX].yaxis.set_major_formatter(ScalarFormatter(useMathText=True))
else:
ax[DYN_SPEC_INDX].yaxis.set_major_formatter(ScalarFormatter(useMathText=False))
# gets rid of minor ticks and labels
ax[DYN_SPEC_INDX].yaxis.minorticks_off()
ax[DYN_SPEC_INDX].yaxis.set_tick_params(length=12., width=2.0, which='major')
# x-axis settings
# ax[DYN_SPEC_INDX].set_xlabel("Time [HH:MM \nm-d]")
ax[DYN_SPEC_INDX].xaxis_date()
ax[DYN_SPEC_INDX].set_xlim(t_start, t_end)
# ax[DYN_SPEC_INDX].xaxis.set_major_locator(mdates.HourLocator(interval = 1))
# utc_dt_format1 = DateFormatter('%H:%M \n%m-%d')
utc_dt_format1 = DateFormatter('%H:%M\n%b %d\n%Y')
ax[DYN_SPEC_INDX].xaxis.set_major_formatter(utc_dt_format1)
# ax.xaxis.set_minor_locator(mdates.MinuteLocator(interval = 5))
# Expand the spectrum to a second instrument (for now only for solo/ept + step or het)
if other and self.sensor == "ept" and other.sensor == "het":
is_solohetions = (other.sensor == "het" and species == 'p')
# This method has to be run before doing anything else to make sure that the viewing is correct
other.choose_data(other.viewing)
# EPT and HET data come in almost identical containers, they need not be differentiated
if species in ("electron", 'e'):
particle_data1 = other.current_df_e["Electron_Flux"]
else:
try:
particle_data1 = other.current_df_i["Ion_Flux"]
except KeyError:
particle_data1 = other.current_df_i["H_Flux"]
# Resample only if requested
if resample is not None:
particle_data1 = resample_df(df=particle_data1, resample=resample, cols_unc=[]) # TODO: this ignores all uncertainty columns, but they are not used anyway
if xlim is None:
df1 = particle_data1[:]
# t_start, t_end = df.index[0], df.index[-1]
else:
# td is added to the start and the end to avert white pixels at the end of the plot
td_str = resample if resample is not None else '0s'
td = pd.Timedelta(value=td_str)
t_start, t_end = pd.to_datetime(xlim[0]), pd.to_datetime(xlim[1])
df1 = particle_data1.loc[(particle_data1.index >= (t_start-td)) & (particle_data1.index <= (t_end+td))]
# Assert time and channel bins
time1 = df.index
# The low and high ends of each energy channel
e_lows1, e_highs1 = other.get_channel_energy_values() # this function return energy in eVs
# The mean energy of each channel in eVs
mean_energies1 = np.sqrt(np.multiply(e_lows1, e_highs1))
# Boundaries of plotted bins in keVs are the y-axis:
y_arr1 = get_yaxis_bin_boundaries(e_lows1, e_highs1, y_multiplier, is_solohetions)
# Set image pixel height (length was already set before)
# For solohet+ions we do not subtract 1 here, because there is an energy gap between EPT highest channel and
# HET lowest channel, hence requiring one "empty" bin in between
image_hei1 = len(y_arr1)+1 if is_solohetions else len(y_arr1)
# Init the grid
grid1 = np.zeros((image_len, image_hei1))
# Assign grid bins -> intensity * energy^2
if is_solohetions:
for i, channel in enumerate(df1):
if i<5:
grid1[:, i] = df1[channel]*(mean_energies1[i]*mean_energies1[i]*ENERGY_MULTIPLIER_SQUARED)
elif i==5:
grid1[:, i] = np.nan
grid1[:, i+1] = df1[channel]*(mean_energies1[i]*mean_energies1[i]*ENERGY_MULTIPLIER_SQUARED)
else:
grid1[:, i+1] = df1[channel]*(mean_energies1[i]*mean_energies1[i]*ENERGY_MULTIPLIER_SQUARED)
else:
for i, channel in enumerate(df1):
grid1[:, i] = df1[channel]*(mean_energies1[i]*mean_energies1[i]*ENERGY_MULTIPLIER_SQUARED) # Intensity*Energy^2, and energy is in eV -> transform to keV or MeV
# Finally cut the last entry and transpose the grid1 so that it can be plotted correctly
grid1 = grid1[:-1, :]
grid1 = grid1.T
# grids and y-axis has to be fused together so they can be plotted in the same colormesh
grid1, y_arr1 = combine_grids_and_ybins(grid, grid1, y_arr, y_arr1)
maskedgrid1 = np.where(grid1 == 0, 0, 1)
maskedgrid1= np.ma.masked_where(maskedgrid1 == 1, maskedgrid1)
# return time1, y_arr1, grid1
# Colormesh
cplot1 = ax[DYN_SPEC_INDX].pcolormesh(time1, y_arr1, grid1, shading='auto', cmap=cmap, norm=normscale)
_greymesh1 = ax[DYN_SPEC_INDX].pcolormesh(time1, y_arr1, maskedgrid1, shading='auto', cmap='Greys', vmin=-1, vmax=1)
# Updating the colorbar
cb = fig.colorbar(cplot1, orientation='vertical', ax=ax[DYN_SPEC_INDX])
clabel = r"Intensity $\cdot$ $E^{2}$" + "\n" + r"[MeV/(cm$^{2}$ sr s)]"
cb.set_label(clabel)
# y-axis settings
ax[DYN_SPEC_INDX].set_yscale('log')
ax[DYN_SPEC_INDX].set_ylim(np.nanmin(y_arr), np.nanmax(y_arr1))
# Set a rougher tickscale
energy_tick_powers = (-1, 1, 3) if species in ("electron", 'e') else (-1, 2, 4)
yticks = np.logspace(start=energy_tick_powers[0], stop=energy_tick_powers[1], num=energy_tick_powers[2])
# First one sets the ticks in place and the second one enlarges the tick labels (not the ticks, the numbers next to them)
ax[DYN_SPEC_INDX].set_yticks([yval for yval in yticks])
ax[DYN_SPEC_INDX].tick_params(axis='y', labelsize=32)
ax[DYN_SPEC_INDX].set_ylabel(f"Energy [{y_unit}]")
# Introduce minor ticks back
ax[DYN_SPEC_INDX].yaxis.set_tick_params(length=8., width=1.2, which='minor')
fig.set_size_inches((27, 18))
# Title
if view is not None and other:
title = f"{spacecraft.upper()}/{instrument.upper()}+{other.sensor.upper()} ({view}) {s_identifier}, {date_of_event}"
elif view:
title = f"{spacecraft.upper()}/{instrument.upper()} ({view}) {s_identifier}, {date_of_event}"
else:
title = f"{spacecraft.upper()}/{instrument.upper()} {s_identifier}, {date_of_event}"
if self.radio_spacecraft is None:
ax[0].set_title(title)
else:
ax[0].set_title(f"Radio & Dynamic Spectrum, {title}")
# saving of the figure
if save:
plt.savefig(f'plots/{spacecraft}_{instrument}_{date_of_event}_dynamic_spectra.png', transparent=False,
facecolor='white', bbox_inches='tight')
self.fig = fig
plt.show()
# Finally return plotting options to what they were before plotting
rcParams.update(original_rcparams)
[docs]
def tsa_plot(self, view, selection=None, xlim=None, resample=None):
"""
Makes an interactive time-shift plot
Parameters:
-----------
view : str or None
Viewing direction for the chosen sensor
selection : 2-tuple
The indices of the channels one wishes to plot. End-exclusive.
xlim : 2-tuple
The start and end point of the plot as pandas-compatible datetimes or strings
resample : str
Pandas-compatible resampling time-string, e.g. "2min" or "50s".
Note that this is just a simple wrapper around thepandas
resample function that is calculating the mean of the data in the new
time bins. This is not necessarily the correct way to resample data,
depending on the data type (for example for errors)!
"""
import ipywidgets as widgets
from IPython.display import display
# inits
spacecraft = self.spacecraft
instrument = self.sensor
species = self.species
# This here is an extremely stupid thing, but we must convert spacecraft input name back
# to its original version so that sunpy.get_horizon_coords() understands it
if spacecraft == "solo":
spacecraft_input_name = "Solar Orbiter"
elif spacecraft == "sta":
spacecraft_input_name = "STEREO-A"
elif spacecraft == "stb":
spacecraft_input_name = "STEREO-B"
else:
spacecraft_input_name = spacecraft.upper()
# get (lon, lat, radius) in (deg, deg, AU) in Stonyhurst coordinates:
# e.g. 'Solar Orbiter', 'STEREO-A', 'STEREO-B', 'SOHO', 'PSP'
position = get_horizons_coord(spacecraft_input_name, self.start_date)
radial_distance_value = np.round(position.radius.value, 2)
METERS_PER_AU = 1 * u.AU.to(u.m)
self.choose_data(view)
# Check that the data that was loaded is valid. If not, abort with warning.
self.validate_data()
if self.spacecraft == "bepi":
raise NotImplementedError("BepiColombo/SIXS-P not yet implemented!")
if self.spacecraft == "solo":
if self.sensor == "step":
if species in ("electron", 'e'):
particle_data = self.current_df_e
s_identifier = "electrons"
else:
particle_data = self.current_df_i
s_identifier = "ions"
else:
if species in ("electron", 'e'):
particle_data = self.current_df_e["Electron_Flux"]
s_identifier = "electrons"
else:
try:
particle_data = self.current_df_i["Ion_Flux"]
s_identifier = "ions"
except KeyError:
particle_data = self.current_df_i["H_Flux"]
s_identifier = "protons"
sc_identifier = "Solar Orbiter"
if self.spacecraft[:2] == "st":
if species in ("electron", 'e'):
if instrument == "sept":
particle_data = self.current_df_e[[ch for ch in self.current_df_e.columns if ch[:2] == "ch"]]
else:
particle_data = self.current_df_e[[ch for ch in self.current_df_e.columns if "Flux" in ch]]
s_identifier = "electrons"
else:
if instrument == "sept":
particle_data = self.current_df_i[[ch for ch in self.current_df_i.columns if ch[:2] == "ch"]]
else:
particle_data = self.current_df_i[[ch for ch in self.current_df_i.columns if "Flux" in ch]]
s_identifier = "protons"
sc_identifier = "STEREO-A" if spacecraft[-1] == "a" else "STEREO-B"
if self.spacecraft == "soho":
# ERNE-HED (only protons)
if instrument.lower() == "erne":
particle_data = self.current_df_i
s_identifier = "protons"
# EPHIN, as of now only electrons, could be extended to protons in the future
if instrument.lower() == "ephin":
particle_data = self.current_df_e
s_identifier = "electrons"
sc_identifier = spacecraft.upper()
if self.spacecraft == "psp":
if instrument.lower() == "isois-epihi":
if species in ("electron", 'e'):
particle_data = self.current_df_e
s_identifier = "electrons"
if species in ("proton", 'p'):
particle_data = self.current_df_i
s_identifier = "protons"
# EPILO only has electrons
if instrument.lower() == "isois-epilo":
if species in ("electron", 'e'):
particle_data = self.current_df_e
s_identifier = "electrons"
sc_identifier = "Parker Solar Probe"
if spacecraft == "wind":
if instrument.lower() == "3dp":
if species in ("electron", 'e'):
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
particle_data = self.current_df_e*1e6
s_identifier = "electrons"
else:
# change flux units from '#/cm2-ster-eV-sec' to '#/cm2-ster-MeV-sec'
particle_data = self.current_df_i*1e6
s_identifier = "protons"
sc_identifier = spacecraft.capitalize()
# make a copy to make sure original data is not altered
dataframe = particle_data.copy()
particle_speeds = self.calculate_particle_speeds()
# t_0 = t - L/v -> L/v is the coefficient that shifts the x-axis
shift_coefficients = [METERS_PER_AU/v for v in particle_speeds]
stepsize = 0.05
min_slider_val, max_slider_val = 0.0, 10
# Only the selected channels will be plotted
if selection is not None:
# len==3 means that we only choose every selection[2]:th channel
if len(selection) == 3:
channel_indices = [i for i in range(selection[0], selection[1], selection[2])]
selected_channels = [channel for channel in dataframe.columns[channel_indices]]
else:
channel_indices = [i for i in range(selection[0], selection[1])]
selected_channels = dataframe.columns[selection[0]:selection[1]]
else:
selected_channels = dataframe.columns
channel_indices = [i for i in range(len(selected_channels))]
# Change 0-values to nan purely for plotting purposes, since we're not doing any
# calculations with them
dataframe[dataframe[selected_channels] == 0] = np.nan
# Get the channel numbers (not the indices!)
if instrument != "isois-epilo":
try:
channel_nums = [int(name.split('_')[-1]) for name in selected_channels]
except ValueError:
# In the case of Wind/3DP, channel strings are like: FLUX_E0_P0, E for energy channel and P for direction
if self.spacecraft == "wind":
channel_nums = [int(name.split('_')[1][-1]) for name in selected_channels]
# SOHO/EPHIN has channels such as E300 etc...
if self.spacecraft == "soho":
channel_nums = [name for name in selected_channels]
else:
channel_nums = [int(name.split('_E')[-1].split('_')[0]) for name in selected_channels]
# Channel energy values as strings:
channel_energy_strs = self.get_channel_energy_values("str")
if selection is not None:
if len(selection) == 3:
channel_energy_strs = channel_energy_strs[slice(selection[0], selection[1], selection[2])]
else:
channel_energy_strs = channel_energy_strs[slice(selection[0], selection[1])]
# creation of the figure
fig, ax = plt.subplots(figsize=(9, 6))
# settings of title
ax.set_title(f"{sc_identifier} {instrument.upper()}, {s_identifier}")
ax.grid(True)
# settings for y and x axes
ax.set_yscale("log")
ax.set_ylabel(r"Intensity [1/(cm$^{2}$ sr s MeV)]")
ax.set_xlabel(r"$t_{0} = t - L/v$")
ax.xaxis_date()
ax.xaxis.set_major_formatter(DateFormatter('%H:%M\n%b %d'))
if xlim is None:
ax.set_xlim(dataframe.index[0], dataframe.index[-1])
else:
try:
ax.set_xlim(xlim[0], xlim[1])
except ValueError:
ax.set_xlim(pd.to_datetime(xlim[0]), pd.to_datetime(xlim[1]))
# So far I'm not sure how to return the original rcParams back to what they were in the case of this function,
# because it runs interactively and there is no clear "ending" point to the function.
# For now I'll attach the original rcparams to a class attribute, so that the user may manually return the parameters
# after they are done with tsa.
self.original_rcparams = self.save_and_update_rcparams("tsa")
# housekeeping lists
series_natural = []
series_norm = []
plotted_natural = []
plotted_norm = []
# go through the selected channels to create individual series and plot them
for i, channel in enumerate(selected_channels):
# construct series and its normalized counterpart
series = _flux2series(dataframe[channel], dataframe.index, resample) # TODO: this uses "cols_unc=[]" for resampling, which ignores all uncertainty columns! But they are not used here anyway for now.
series_normalized = _flux2series(series.values/np.nanmax(series.values), series.index) # deactivate resampling here be. series is already resampled in the line above
# store all series to arrays for later referencing
series_natural.append(series)
series_norm.append(series_normalized)
# save the plotted lines, NOTICE that they come inside a list of len==1
p1 = ax.step(series.index, series.values, c=f"C{i}", visible=True, label=f"{channel_nums[i]}: {channel_energy_strs[i]}")
p2 = ax.step(series_normalized.index, series_normalized.values, c=f"C{i}", visible=False) # normalized lines are initially hidden
# store plotted line objects for later referencing
plotted_natural.append(p1[0])
plotted_norm.append(p2[0])
plt.legend(loc=1, bbox_to_anchor=(1.0, 0.25), fancybox=True, shadow=False, ncol=1, fontsize=9)
# widget objects, slider and button
style = {'description_width': 'initial'}
slider = widgets.FloatSlider(value=min_slider_val,
min=min_slider_val,
max=max_slider_val,
step=stepsize,
continuous_update=True,
description="Path length L [AU]: ",
style=style
)
# button = widgets.Checkbox(value = False,
# description = "Normalize",
# indent = True
# )
button = widgets.RadioButtons(value='original data',
description='Intensity:',
options=['original data', 'normalized'],
disabled=False
)
# A box for the path length
path_label = f"R={radial_distance_value:.2f} AU\nL = {slider.value} AU"
text = plt.text(0.02, 0.03, path_label, transform=ax.transAxes,
bbox=dict(boxstyle="square",
ec=(0., 0., 0.),
fc=(1., 1.0, 1.0),
))
def timeshift(sliderobject):
"""
timeshift connects the slider to the shifting of the plotted curves
"""
# shift the x-values (times) by the timedelta
for i, line in enumerate(plotted_natural):
# calculate the timedelta in seconds corresponding to the change in the path length
# The relevant part here is sliderobject["old"]! It saves the previous value of the slider!
timedelta_sec = shift_coefficients[i]*(slider.value - sliderobject["old"])
# Update the time value
line.set_xdata(line.get_xdata() - pd.Timedelta(seconds=timedelta_sec))
for i, line in enumerate(plotted_norm):
# calculate the timedelta in seconds corresponding to the change in the path length
# The relevant part here is sliderobject["old"]! It saves the previous value of the slider!
timedelta_sec = shift_coefficients[i]*(slider.value - sliderobject["old"])
# Update the time value
line.set_xdata(line.get_xdata() - pd.Timedelta(seconds=timedelta_sec))
# Update the path label artist
text.set_text(f"R={radial_distance_value:.2f} AU\nL = {np.round(slider.value, 2)} AU")
# Effectively this refreshes the figure
fig.canvas.draw_idle()
def normalize_axes(button):
"""
this function connects the button to switching visibility of natural / normed curves
"""
# flip the truth values of natural and normed intensity visibility
for line in plotted_natural:
line.set_visible(not line.get_visible())
for line in plotted_norm:
line.set_visible(not line.get_visible())
# Reset the y-axis label
if plotted_natural[0].get_visible():
ax.set_ylabel(r"Intensity [1/(cm$^{2}$ sr s MeV)]")
else:
ax.set_ylabel("Intensity normalized")
# Effectively this refreshes the figure
fig.canvas.draw_idle()
slider.observe(timeshift, names="value")
display(slider)
button.observe(normalize_axes)
display(button)
def get_channel_energy_values(self, returns: str = "num") -> list:
"""
A class method to return the energies of each energy channel in either str or numerical form.
Parameters:
-----------
returns: str, either 'str' or 'num'
Returns:
---------
energy_ranges : list of energy ranges as strings
or
lower_bounds : list of lower bounds of each energy channel in eVs
higher_bounds : list of higher bounds of each energy channel in eVs
:meta private:
"""
# First check by spacecraft, then by sensor
if self.spacecraft == "solo":
# STEP, ETP and HET energies are in the same object
energy_dict = self.current_energies
if self.species in ("electron", 'e'):
energy_ranges = energy_dict["Electron_Bins_Text"]
else:
p_identifier = "Ion_Bins_Text" if self.sensor == "ept" else "H_Bins_Text" if self.sensor == "het" else "Bins_Text"
energy_ranges = energy_dict[p_identifier]
# Each element in the list is also a list with len==1 for cdflib < 1.3.3, so fix that
energy_ranges = energy_ranges.flatten()
if self.spacecraft[:2] == "st":
# STEREO/SEPT energies come in two different objects
if self.sensor == "sept":
if self.species in ("electron", 'e'):
energy_df = self.current_e_energies['channels_dict_df_e']
else:
energy_df = self.current_i_energies['channels_dict_df_p']
energy_ranges = energy_df["ch_strings"].values
# STEREO/HET energies all in the same dictionary
else:
energy_dict = self.current_energies
if self.species in ("electron", 'e'):
energy_ranges = energy_dict["Electron_Bins_Text"]
else:
energy_ranges = energy_dict["Proton_Bins_Text"]
# Each element in the list is also a list with len==1 for cdflib < 1.3.3, so fix that
energy_ranges = energy_ranges.flatten()
if self.spacecraft == "soho":
if self.sensor.lower() == "erne":
energy_ranges = self.current_energies["channels_dict_df_p"]["ch_strings"].values
if self.sensor.lower() == "ephin":
# Choose only the first 4 channels (E150, E300, E1300 and E3000)
# These are the only electron channels (rest are p and He), and we
# use only electron data here.
energy_ranges = [val for val in self.current_energies['energy_labels'].values()][:4]
if self.sensor.lower() in ("ephin-5", "ephin-15"):
energy_ranges = [value for _, value in self.current_energies.items()]
if self.spacecraft == "psp":
energy_dict = self.meta
if self.sensor == "isois-epihi":
if self.species == 'e':
energy_ranges = energy_dict["Electrons_ENERGY_LABL"]
if self.species == 'p':
energy_ranges = energy_dict["H_ENERGY_LABL"]
# Each element in the list is also a list with len==1 for cdflib < 1.3.3, so fix that
energy_ranges = energy_ranges.flatten()
if self.sensor == "isois-epilo":
# The metadata of ISOIS-EPILO comes in a bit of complex form, so some handling is required
if self.species == 'e':
chan = 'F'
energies = self.meta[f"Electron_Chan{chan}_Energy"].filter(like=f"_P{self.viewing}").values
# Calculate low and high boundaries from mean energy and energy deltas
energies_low = energies - self.meta[f"Electron_Chan{chan}_Energy_DELTAMINUS"].filter(like=f"_P{self.viewing}").values
energies_high = energies + self.meta[f"Electron_Chan{chan}_Energy_DELTAPLUS"].filter(like=f"_P{self.viewing}").values
# Round the numbers to one decimal place
energies_low_rounded = np.round(energies_low, 1)
energies_high_rounded = np.round(energies_high, 1)
# I think nan values should be removed at this point. However, if we were to do that, then print_energies()
# will not work anymore since the number of channels and channel energy ranges won't be the same.
# In the current state PSP/ISOIS-EPILO cannot be examined with dynamic_spectrum(), because there are nan values
# in the channel energy ranges.
# energies_low_rounded = np.array([val for val in energies_low_rounded if not np.isnan(val)])
# energies_high_rounded = np.array([val for val in energies_high_rounded if not np.isnan(val)])
# produce energy range strings from low and high boundaries
energy_ranges = np.array([str(energies_low_rounded[i]) + ' - ' + str(energies_high_rounded[i]) + " keV" for i in range(len(energies_low_rounded))])
# Probably just a temporary thing, but cut all the range strings containing ´nan´ in them to avoid problems with
# dynamic spectrum
energy_ranges = np.array([s for s in energy_ranges if "nan" not in s])
if self.spacecraft == "wind":
if self.species == 'e':
energy_ranges = np.array(self.meta_e["channels_dict_df"]["Bins_Text"])
if self.species == 'p':
energy_ranges = np.array(self.meta_i["channels_dict_df"]["Bins_Text"])
if self.spacecraft == "bepi":
if self.species == 'e':
energy_ranges = [value for key, value in self.meta[f'Side{self.viewing}_Electron_Bins_str'].items()]
if self.species == 'p':
energy_ranges = [value for key, value in self.meta[f'Side{self.viewing}_Proton_Bins_str'].items()]
# Check what to return before running calculations
if returns == "str":
return energy_ranges
else:
# add bepi L3 support below?
if self.spacecraft == "bepi":
raise NotImplementedError("BepiColombo/SIXS-P not yet implemented!")
# From this line onward we extract the numerical values from low and high boundaries, and return floats, not strings
lower_bounds, higher_bounds = [], []
for energy_str in energy_ranges:
# Sometimes there is no hyphen, but then it's not a range of energies
try:
lower_bound, temp = energy_str.split('-')
except ValueError:
continue
# Generalize a bit here, since temp.split(' ') may yield a variety of different lists
components = temp.split(' ')
try:
# PSP meta strings can have up to 4 spaces
if self.spacecraft == "psp":
higher_bound, energy_unit = components[-2], components[-1]
# SOHO/ERNE meta string has space, high value, space, energy_str
elif self.spacecraft == "soho" and self.sensor == "erne":
higher_bound, energy_unit = components[1], components[-1]
# Normal meta strs have two components: bounds and the energy unit
else:
higher_bound, energy_unit = components
# It could be that the strings are not in a standard format, so check if
# there is an empty space before the second energy value
except ValueError:
try:
_, higher_bound, energy_unit = components
# It could even be that for some godforsaken reason there are empty spaces
# between the numbers themselves, so take care of that too
except ValueError:
if components[-1] not in ["keV", "MeV"]:
higher_bound, energy_unit = components[1], components[2]
else:
higher_bound, energy_unit = components[1]+components[2], components[-1]
lower_bounds.append(float(lower_bound))
higher_bounds.append(float(higher_bound))
# Transform lists to numpy arrays for performance and convenience
lower_bounds, higher_bounds = np.asarray(lower_bounds), np.asarray(higher_bounds)
# Finally before returning the lists, make sure that the unit of energy is eV
if energy_unit == "keV":
lower_bounds, higher_bounds = lower_bounds * 1e3, higher_bounds * 1e3
elif energy_unit == "MeV" or energy_unit == "MeV/n":
lower_bounds, higher_bounds = lower_bounds * 1e6, higher_bounds * 1e6
# This only happens with ephin, which has MeV as the unit of energy (Christian?)
# Jan: It works fine with EPHIN, and we should NOT have a catch-all else here, as it may hide bugs with other instruments. I added a raise instead.
# Jan 2: Ok, so this did apply for STEP which has "MeV/n"! Added this above
else:
# lower_bounds, higher_bounds = lower_bounds * 1e6, higher_bounds * 1e6
raise ValueError("Unknown energy unit encountered when extracting channel energy values. Please report this issue!")
return lower_bounds, higher_bounds
def calculate_particle_speeds(self):
"""
Calculates average particle speeds by input channel energy boundaries.
:meta private:
"""
if self.species in ["electron", 'e']:
m_species = const.m_e.value
if self.species in ['p', "ion", 'H']:
m_species = const.m_p.value
C_SQUARED = const.c.value*const.c.value
# E=mc^2, a fundamental property of any object with mass
mass_energy = m_species*C_SQUARED # e.g. 511 keV/c for electrons
# Get the energies of each energy channel, to calculate the mean energy of particles and ultimately
# To get the dimensionless speeds of the particles (beta)
e_lows, e_highs = self.get_channel_energy_values() # get_energy_channels() returns energy in eVs
mean_energies = np.sqrt(np.multiply(e_lows, e_highs))
# Transform kinetic energy from electron volts to joules
e_Joule = [((En*u.eV).to(u.J)).value for En in mean_energies]
# Beta, the unitless speed (v/c)
beta = [np.sqrt(1-((e_J/mass_energy + 1)**(-2))) for e_J in e_Joule]
return np.array(beta)*const.c.value
[docs]
def print_energies(self, return_df=False):
"""
Prints out the channel name / energy range pairs
Parameter:
---------
return_df : {bool} default False. If True, returns the df instead of displaying it.
"""
from IPython.display import display
# This has to be run first, otherwise self.current_df does not exist
# Note that PSP will by default have its viewing=='all', which does not yield proper dataframes
if self.viewing != 'all':
if self.spacecraft == 'solo' and not self.viewing:
raise Warning("For this operation the instrument's 'viewing' direction must be defined in the call of 'Event'! Please define and re-run.")
return
else:
self.choose_data(self.viewing)
else:
if self.sensor == "isois-epihi":
# Just choose data with either ´A´ or ´B´. I'm not sure if there's a difference
self.choose_data('A')
if self.sensor == "isois-epilo":
# ...And here with ´3´ or ´7´. Again I don't know if it'll make a difference
self.choose_data('3')
if self.species in ['e', "electron"]:
channel_names = self.current_df_e.columns
SOLO_EPT_CHANNELS_AMOUNT = 34
SOLO_HET_CHANNELS_AMOUNT = 4
if self.species in ['p', 'i', 'H', "proton", "ion"]:
channel_names = self.current_df_i.columns
SOLO_EPT_CHANNELS_AMOUNT = 64
SOLO_HET_CHANNELS_AMOUNT = 36
# Extract only the numbers from channel names
if self.spacecraft == "solo":
if self.sensor == "step":
channel_names = list(channel_names)
channel_numbers = np.array([int(name.split('_')[-1]) for name in channel_names])
if self.sensor == "ept":
channel_names = [name[1] for name in channel_names[:SOLO_EPT_CHANNELS_AMOUNT]]
channel_numbers = np.array([int(name.split('_')[-1]) for name in channel_names])
if self.sensor == "het":
channel_names = [name[1] for name in channel_names[:SOLO_HET_CHANNELS_AMOUNT]]
channel_numbers = np.array([int(name.split('_')[-1]) for name in channel_names])
if self.spacecraft in ["sta", "stb"] or self.sensor == "erne":
channel_numbers = np.array([int(name.split('_')[-1]) for name in channel_names])
if self.sensor == "ephin":
channel_numbers = np.array([int(name.split('E')[-1]) for name in channel_names])
if self.sensor in ["ephin-5", "ephin-15"]:
channel_numbers = [5, 15]
if self.sensor == "isois-epihi":
channel_numbers = np.array([int(name.split('_')[-1]) for name in channel_names])
if self.sensor == "isois-epilo":
channel_numbers = [int(name.split('_E')[-1].split('_')[0]) for name in channel_names]
if self.sensor == "3dp":
channel_numbers = [int(name.split('_')[1][-1]) for name in channel_names]
if self.sensor == 'sixs-p' and self.data_level == 'l3':
channel_numbers = [int(name.split('_')[1][-1]) for name in channel_names]
if self.sensor == 'sixs-p' and self.data_level == 'l2':
custom_notification("print_energies() is not supporting internal Level 2 data of BepiColombo/SIXS-P!")
return
# Remove any duplicates from the numbers array, since some dataframes come with, e.g., 'ch_2' and 'err_ch_2'
channel_numbers = np.unique(channel_numbers)
energy_strs = self.get_channel_energy_values("str")
# The following behaviour has been fixed upstream. Keeping this here for
# now in case someone is missing it.
# # SOHO/EPHIN returns one too many energy strs, because one of them is 'deactivated bc. or failure mode D'
# # if self.sensor == "ephin":
# # energy_strs = energy_strs[:-1]
# Assemble a pandas dataframe here for nicer presentation
column_names = ("Channel", "Energy range")
if self.spacecraft == 'bepi' and self.data_level == 'l3':
column_names = ("Channel", "Effective energy")
column_data = {
column_names[0]: channel_numbers,
column_names[1]: energy_strs}
df = pd.DataFrame(data=column_data)
# Set the channel number as the index of the dataframe
df = df.set_index(column_names[0])
# Finally display the dataframe such that ALL rows are shown
if not return_df:
with pd.option_context('display.max_rows', None,
'display.max_columns', None,
):
display(df)
else:
return df
def save_and_update_rcparams(self, plotting_function: str):
"""
A class method that saves the matplotlib rcParams that are preset before running a plotting routine, and then
updates the rcParams to fit the plotting routine that is being run.
Parameters:
-----------
plotting_function : str
The name of the plotting routine that is run, e.g., 'onset_tool', 'dynamic_spectrum' or 'tsa'
:meta private:
"""
# The original rcParams set by the user prior to running function
original_rcparams = rcParams.copy()
# Here are listed all the possible dictionaries for the different plotting functions
onset_options = {
"axes.linewidth": 1.5,
"font.size": 16
}
dyn_spec_options = {
"axes.linewidth": 2.8,
"font.size": 28 if self.radio_spacecraft is None else 20,
"axes.titlesize": 32,
"axes.labelsize": 28 if self.radio_spacecraft is None else 26,
"xtick.labelsize": 28 if self.radio_spacecraft is None else 26,
"ytick.labelsize": 20 if self.radio_spacecraft is None else 18,
"pcolor.shading": "auto"
}
tsa_options = {
"axes.linewidth": 1.5,
"font.size": 12}
options_dict = {
"onset_tool": onset_options,
"dynamic_spectrum": dyn_spec_options,
"tsa": tsa_options}
# Finally we update rcParams with the chosen plotting options
rcParams.update(options_dict[plotting_function])
return original_rcparams