"""
Computes early warning signals (EWS) and characteristics of the mortality event for all time series in the provided folder.
This script computes lag-1 autocorrelation (AC1), variance, skewness, kurtosis and fractal dimension as EWS on the NDVI, EVI and kNDVI time series.
It uses rolling windows of 1, 3, 5, or 10 years and computes Kendall tau trends in the EWS over different time periods
(full time series, time series before the event, the collapse year, and 1, 3, 5 and 10 years before the event).
The script also computes recovery rates, by fitting an exponential function to the five years after the mortality event,
and resistance to the mortality, as the drop in the vegetation index in the year of collapse compared to the mean of the three years before.
Usage: python3 ./code/02_empirical_analysis/04_ews_computation.py in_folder out_folder max_workers verbose force_recompute date_check
in_folder: Folder where the preprocessed time series files are stored (default: "./data/intermediary/MOD13Q1_extracted_timeseries/")
out_folder: Folder where the output files will be stored (default: "./data/final/MOD13Q1_ews_resistance_recovery/")
max_workers: Number of parallel workers to use (default: 16)
verbose: If set to 1, prints more information (default: 1)
force_recompute: If set to 1, forces recomputation of all EWS, otherwise only computes files that have not been computed on the date specified by date_check (default: 0)
date_check: Date string (yy_mm_dd) to check for already computed files (default: today's date)
"""
import os
import sys
import traceback
import warnings
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np
import pandas as pd
from sklearn import linear_model
import statsmodels.api as sm
from statsmodels.tsa.seasonal import MSTL
from scipy.stats import kendalltau
from scipy.optimize import differential_evolution
from tqdm import tqdm  # For progress tracking
#Not recommended, but we'll do it anyway
warnings.filterwarnings("ignore", category=RuntimeWarning)
#Replace print with a time-stamped print
_print = print
def print(*args, **kwargs):
    import datetime  #local import: the module-level name `datetime` is the class, not the module
    _print(datetime.datetime.now(), *args, **kwargs)
#Also define a vbprint option (verbose print)
verbose = True  #module-level default so worker processes have a value; overwritten in __main__
def vbprint(*args, **kwargs):
    if verbose:
        print(*args, **kwargs)
### TOOL FUNCTIONS FOR EWS COMPUTATION ###
#Fractal dimension via the madogram estimator
def madogram(x, n, p = 1):
    #Variation estimators at lags 1 and 2 (n = number of valid observations)
    g1 = np.log(np.nansum(np.abs(x[1:] - x[:-1])**p) / (2 * (n - 1)))
    g2 = np.log(np.nansum(np.abs(x[2:] - x[:-2])**p) / (2 * (n - 2)))
    #Regress the log-variations on the log-lags; FD = 2 - slope/p
    lx = np.log([1, 2])
    lxbar = np.mean(lx)
    lx_corr = lx - lxbar
    fd = 2 - (lx_corr[0]*g1 + lx_corr[1]*g2) / (p * np.sum((lx - lxbar)**2))
    #Note: if FD falls outside [1, 2], trim it, as recommended by Hall & Wood (1993)
    if fd > 2:
        fd = 2
    if fd < 1:
        fd = 1
    return fd
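#Illustrative sketch (not called by the pipeline): madogram should return values
#near 2 for rough, noise-like series and closer to 1 for smooth ones. The series
#length of 115 points (~5 years of 16-day composites) is an assumption for
#illustration only.
def _example_madogram():
    rng = np.random.default_rng(42)
    rough = rng.normal(size=115)                         #white noise: FD near 2
    smooth = np.sin(np.linspace(0, 4 * np.pi, 115))      #smooth curve: FD closer to 1
    return madogram(rough, n=115), madogram(smooth, n=115)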
#Compute all EWS in rolling windows
def rolling_ews(y, window = 5*23, min_periods=5): #NOTE: as MOD13Q1 has a 16-day temporal resolution, there are 23 images per year
#Prepare formats
y = y.values
autocorr_values = np.empty(np.shape(y))*np.nan
var_values = np.empty(np.shape(y))*np.nan
skew_values = np.empty(np.shape(y))*np.nan
kurt_values = np.empty(np.shape(y))*np.nan
fd_values = np.empty(np.shape(y))*np.nan
    #Adjusted computation: windows are not centered; the EWS value at each time point
    #reflects the window of years before it
    for start in range(y.shape[0] - int(window)):
        window_data = y[start:start + window]
        if min_periods is None or np.sum(~np.isnan(window_data)) >= min_periods:
#Compute mean and variance
mean_a = np.nanmean(window_data)
var_a = np.nanvar(window_data)
std_a = np.nanstd(window_data)
            #Lag-1 autocorrelation (n = number of valid observations)
            n = np.sum(~np.isnan(window_data), axis = 0)
            autocorr = (1 / n * np.nansum((window_data[1:] - mean_a) * (window_data[:-1] - mean_a), axis=0)) / var_a
#Calculate skewness
skew = np.nanmean(((window_data-mean_a)/std_a)**3, axis = 0)
#Calculate kurtosis
kurt = np.nanmean(((window_data-mean_a)/std_a)**4, axis = 0)
#Calculate fractal dimension
fd = madogram(window_data, n = n)
else:
autocorr = np.nan
var_a = np.nan
skew = np.nan
kurt = np.nan
fd = np.nan
        #Store autocorrelation, variance, skewness, kurtosis and fractal dimension
        #at the window's right edge (non-centered rolling windows)
autocorr_values[int(start + window)] = autocorr
var_values[int(start + window)] = var_a
skew_values[int(start + window)] = skew
kurt_values[int(start + window)] = kurt
fd_values[int(start + window)] = fd
return (autocorr_values, var_values, skew_values, kurt_values, fd_values)
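#Illustrative sketch (not called by the pipeline): rolling_ews expects a pandas
#Series (it calls .values) and returns five arrays aligned with the input, NaN
#for the first `window` points. The toy series below is an assumption.
def _example_rolling_ews():
    rng = np.random.default_rng(0)
    y = pd.Series(rng.normal(size=23 * 10)).ewm(alpha=0.5).mean()  #ten years, mildly autocorrelated
    ac1, var, skew, kurt, fd = rolling_ews(y, window=5 * 23)
    return ac1[-1], var[-1]                              #EWS values at the last time step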
#Tool function to compute kendall tau
def kendall_tau(x, y):
kt = kendalltau(x, y, nan_policy = "omit")
return kt.statistic, kt.pvalue
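#Tiny illustration (not called by the pipeline): any monotonically increasing
#series yields tau = 1 with a small p-value.
def _example_kendall_tau():
    return kendall_tau(np.arange(10), np.arange(10) ** 2)  #-> (1.0, ~0.0)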
#Compute delta as the signed maximum change in the EWS timeseries (per Rocha, 2022)
def delta(x, y):
    try:
        #Get maximum and minimum values
        maxv = np.nanmax(y)
        minv = np.nanmin(y)
    except ValueError: #raised if `y` is empty
        print("value error, setting maxv and minv both to nan")
        maxv = np.nan
        minv = np.nan
    if np.isnan(maxv) and np.isnan(minv):
        dval = np.nan
    elif maxv == minv:
        dval = 0.0
    else:
        #Times of the first occurrence of the maximum and minimum
        maxt = np.array(x[y == maxv])[0]
        mint = np.array(x[y == minv])[0]
        #Delta is the range, negative if the minimum comes after the maximum
        sign = (mint > maxt) * -2 + 1
        dval = sign * (maxv - minv)
    return dval
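#Worked toy example (not called by the pipeline): the EWS below rises from its
#minimum (t = 0) to its maximum (t = 3), so delta is +0.3 (the signed range);
#reversing the series would give -0.3.
def _example_delta():
    t = np.array([0, 1, 2, 3])
    ews = np.array([0.1, 0.2, 0.3, 0.4])
    return delta(t, ews)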
#Compute kendall tau trends & delta in EWS for different setups
def trend_computation(df, option, ews):
df['collapse_time'] = pd.to_datetime(f'{df.year_disturbance.unique()[0]}-01-01')
#print(ews)
if option == "full":
df = df
elif option == "before":
df = df.loc[df.cleandate < df.collapse_time].reset_index(drop = True)
elif option == "collapse_year":
df = df.loc[((df.cleandate >= df.collapse_time) & (df.cleandate < df.collapse_time + pd.DateOffset(years = 1)))].reset_index(drop = True)
elif option == "1yr":
df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 1)))].reset_index(drop = True)
elif option == "3yr":
df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 3)))].reset_index(drop = True)
elif option == "5yr":
df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 5)))].reset_index(drop = True)
elif option == "10yr":
df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 10)))].reset_index(drop = True)
    else:
        raise ValueError("option needs to be one of 'full', 'before', 'collapse_year', '1yr', '3yr', '5yr', '10yr'.")
#Compute kendall taus
kt_stat, kt_pval = kendall_tau(df.cleandate.values, df[ews].values)
#Compute delta
dval = delta(df.cleandate.values, df[ews].values)
    if df.shape[0] > 0:
#Wrap this into dataframe
return pd.DataFrame({ "entry_id" : df.entry_id.unique()[0],
"paired_id" : df.paired_id.unique()[0],
"VI" : df.VI.unique()[0],
"true_pos_neg" : df.true_pos_neg.unique()[0],
"rolling_window" : df.rolling_window.unique()[0],
"Setup" : option,
"EWS" : ews,
"n" : df.shape[0],
"kt_stat" : kt_stat,
"kt_pval" : kt_pval,
"delta" : dval}, index = [0])
else:
print("careful, this dataframe is empty, returning nothing")
return pd.DataFrame({ "entry_id" : np.nan,
"paired_id" : np.nan,
"VI" : np.nan,
"true_pos_neg": np.nan,
"rolling_window" : np.nan,
"Setup" : option,
"EWS" : ews,
"n" : np.nan,
"kt_stat" : kt_stat,
"kt_pval" : kt_pval,
"delta" : dval}, index = [0])
#Inner pipeline to compute EWS for one entry_id and one rolling window
def inner_pipeline(df, entry_id, rolling_window, preprocess):
df = df.loc[df.entry_id == entry_id].reset_index(drop = True)
#print('shape of df for entry_id', entry_id, ':', df.shape)
VI = df.VI.unique()[0]
#Detrending and deseasoning
    #Calculate time difference to the initial time, in days
    df["date_diff"] = df.cleandate - df.cleandate[0]
    df["date_diff"] = df["date_diff"].dt.days
    #STL as preprocessing method - this is not currently used
    if preprocess == "stl":
        #Subtract a linear trend
        lin_trend = linear_model.LinearRegression()
        #Check for nans
        if np.sum(np.isnan(df.loc[:, "VI_value"])) > 0:
            print("nans in VI_value")
            df["VI_dtl"] = np.nan
        else:
            lin_trend.fit(df.loc[:, ["date_diff"]], df.loc[:, "VI_value"])
            #Compute residuals
            df["VI_dtl"] = df["VI_value"] - pd.Series(lin_trend.predict(df.loc[:, ["date_diff"]]))
        #Deseason with a multiple-seasonality STL (annual and 5-yearly periods)
        ynew = df.loc[:, "VI_dtl"]
        if df.shape[0] > 23*5:
            d = MSTL(ynew, periods = (23 * 1, 23 * 5)).fit()
            #Assign the residuals as the deseasoned data
            df["VI_dtl_stl"] = d.resid
        else:
            print("time series is too short, only", df.shape[0], "points")
            df["VI_dtl_stl"] = np.nan
    #Harmonic deseasoning as preprocessing method (per Smith & Boers, 2020)
    elif preprocess == "harmonic":
        vbprint('harmonic deseasoning preprocessing...')
        #Rolling-mean detrending plus harmonic deseasoning from Smith & Boers
        #Add a time column in radians (one year = 2*pi)
        df['t'] = (df['cleandate'] - pd.Timestamp("1970-01-01")).dt.total_seconds() / (365.25 * 24 * 3600)
        df['t'] *= 2 * np.pi
        #Rolling-mean detrend with 5-year centered windows
        df['VI_dtl'] = df['VI_value'] - df['VI_value'].rolling(window=23 * 5, center=True, min_periods = 12).agg(lambda x: np.nanmean(x))
        #If this is not all nan, keep going
        if np.sum(np.isnan(df['VI_dtl'])) < df.shape[0]:
            #Harmonic deseasoning: annual cycle and its first overtones
            def harmonic_model(df, harmonics = 3):
                for h in range(1, harmonics + 1): #start at 1: h = 0 would just duplicate the intercept
                    df[f'cos_{h}'] = np.cos(df.t*h)
                    df[f'sin_{h}'] = np.sin(df.t*h)
                df['constant'] = 1
                return df
            df = harmonic_model(df, harmonics = 3)
            #Fit a linear model to get the coefficients of the cosine and sine terms
            harmonic_cols = ['constant', 'cos_1', 'cos_2', 'cos_3', 'sin_1', 'sin_2', 'sin_3']
            df_train = df.dropna(subset = harmonic_cols + ['VI_dtl'])
            X = df_train[harmonic_cols]
            y = df_train['VI_dtl']
            model = sm.OLS(y, X).fit()
            #Subtract the fitted seasonal cycle
            df['VI_dtl_stl'] = df['VI_dtl'] - model.predict(df[harmonic_cols])
        else:
            vbprint("all nan values in VI_dtl, skipping harmonic deseasoning")
            df['VI_dtl_stl'] = np.nan
else:
print("preprocess needs to be either of 'stl' or 'harmonic'")
#Stop execution
sys.exit()
#Application on entire time window
vbprint("ews application...")
    ews_full = rolling_ews(df.loc[:, 'VI_dtl_stl'], window = int(rolling_window)*23)
#Results to dataframe
column_names = ['ews_ac1', 'ews_var', 'ews_skew', 'ews_kurt', 'ews_fd']
# Convert tuple of arrays to a DataFrame
ews_full_df = pd.DataFrame(data=tuple(zip(*ews_full)), columns=column_names)
ews_full_df["rolling_window"] = rolling_window
#Add this onto existing df
results_full = pd.concat([df, ews_full_df], axis = 1)
#Trend computation: wrap to apply for all EWS and setups
vbprint("computing trends...")
ews_eval = []
for option in ["full", "before","collapse_year", "1yr", "3yr", "5yr", "10yr"]:
for ews in [f"ews_{i}" for i in ["ac1", "var", "skew", "kurt", "fd"]]:
#print(option, ews)
ews_eval.append(trend_computation(results_full, option, ews))
ews_eval = pd.concat(ews_eval)
ews_eval["pval_sig"] = ews_eval.kt_pval < 0.05
return results_full, ews_eval
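#Minimal standalone sketch of the harmonic deseasoning step used above
#(assumption: three harmonics of an annual cycle, matching the pipeline; the
#synthetic seasonal series is for illustration only).
def _example_harmonic_deseasoning():
    n = 23 * 10                                          #ten years of 16-day composites
    t = np.arange(n) / 23 * 2 * np.pi                    #time in radians (one year = 2*pi)
    rng = np.random.default_rng(1)
    y = np.sin(t) + 0.3 * np.cos(2 * t) + rng.normal(scale=0.1, size=n)
    X = np.column_stack([np.ones(n)] + [f(t * h) for h in (1, 2, 3) for f in (np.cos, np.sin)])
    return y - sm.OLS(y, X).fit().predict(X)             #deseasoned residuals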
#Wrapper function to apply EWS application to all entry_id values in df
def outer_pipeline(df, rolling_window, preprocess):
results_df = []
ews_df = []
for r in df.entry_id.unique():
#Loop through different options of true_pos_neg
for tp in df.true_pos_neg.unique():
#print(tp)
results_add, ews_add = inner_pipeline(df.loc[df.true_pos_neg == tp], r, rolling_window, preprocess)
results_df.append(results_add)
ews_df.append(ews_add)
vbprint("ran through all entry_id values, concatenating dfs")
#Make into massive dataframe
results_df = pd.concat(results_df).reset_index(drop = True)
ews_df = pd.concat(ews_df).reset_index(drop = True)
return results_df, ews_df
### TOOLS FOR RECOVERY RATE COMPUTATION ###
#Function to cut the timeseries properly
def cut_timeseries(df, entry_id, true_pos_neg, VI):
#Get collapse year
collapse_year = df.loc[df.entry_id == entry_id].year_disturbance.unique()[0]
#print("collapse year:", collapse_year)
    #Within the year of collapse, take the lowest point of VI_dtl_stl, then fit an exponential to the five years after to get the recovery rate r
ts = df.loc[((df.entry_id == entry_id) & (df.true_pos_neg == true_pos_neg) & (df.VI == VI)), ['cleandate', 'VI_dtl_stl']].reset_index(drop = True)
#Find minimum point in year of collapse
min_date = ts.loc[ts.cleandate.dt.year == collapse_year]
min_date = min_date.loc[min_date.VI_dtl_stl == min_date.VI_dtl_stl.min()]
#If this is an empty dataframe, return nans
if min_date.shape[0] == 0:
vbprint("timeseries is empty, returning nans")
return pd.DataFrame(), np.nan
else:
#Take the five years after this
ts_fit = ts.loc[((ts.cleandate >= min_date.cleandate.values[0]) & (ts.cleandate < min_date.cleandate.values[0] + pd.DateOffset(years = 5)))].reset_index(drop = True)
return ts_fit, min_date.cleandate.values[0]
#Fit exponential function to a range of datasets
def fit_exponential_global(df, entry_id, true_pos_neg, VI):
#Cut the timeseries properly
ts, min_date = cut_timeseries(df, entry_id, true_pos_neg, VI)
if not ts.shape[0] == 0:
time_col = 'cleandate'
value_col = 'VI_dtl_stl'
        # Copy and check input, keeping only the needed columns
        ts = ts[[time_col, value_col]].copy().dropna()
ts[time_col] = pd.to_datetime(ts[time_col])
t0 = ts[time_col].min()
ts['t'] = (ts[time_col] - t0).dt.total_seconds() / (3600 * 24)
#Check shape of ts again
#print('shape of ts after dropping nans:', ts.shape)
x_data = ts['t'].values
y_data = ts[value_col].values
# Clean: filter out non-finite values
mask = np.isfinite(x_data) & np.isfinite(y_data)
x_data, y_data = x_data[mask], y_data[mask]
if len(y_data) < 2 or np.all(y_data == y_data[0]):
raise ValueError("Not enough variation in data to fit an exponential model.")
        # Safeguards for the bounds below
        y_max = np.nanmax(y_data)
        if not np.isfinite(y_max) or y_max <= 0:
            raise ValueError("Invalid y-data values for exponential fitting.")
        y_min = np.nanmin(y_data)
        if not np.isfinite(y_min) or y_min >= 0:
            raise ValueError("x0 bounds require a negative minimum anomaly.")
# Loss function
def loss(params):
x0, r = params
y_pred = x0 * np.exp(r * x_data)
return np.sum((y_data - y_pred) ** 2)
        # Define finite, reasonable bounds (x0 is constrained to be negative)
        x0_bounds = (y_min*10, -1e-6)
r_bounds = (-1, 1)
bounds = [x0_bounds, r_bounds]
# Run global optimization
result = differential_evolution(loss, bounds, seed=42)
x0_opt, r_opt = result.x
# Predict and calculate R^2
y_pred = x0_opt * np.exp(r_opt * x_data)
ss_res = np.sum((y_data - y_pred) ** 2)
ss_tot = np.sum((y_data - np.mean(y_data)) ** 2)
r_squared = 1 - ss_res / ss_tot
#print(r_squared)
return [min_date, x0_opt, r_opt, r_squared]
else:
vbprint("timeseries to fit is empty, returning nans")
return [np.nan, np.nan, np.nan, np.nan]
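#Sanity-check sketch (not called by the pipeline): generate a synthetic recovery
#x0*exp(r*t) with x0 < 0 and r < 0 and verify that the global fit recovers both.
#The tighter r bounds here are an assumption, chosen to avoid overflow in the toy loss.
def _example_recovery_fit():
    t_days = np.arange(0, 5 * 365, 16)                   #five years at 16-day steps
    rng = np.random.default_rng(2)
    y = -0.2 * np.exp(-0.005 * t_days) + rng.normal(scale=0.01, size=t_days.size)
    def loss(params):
        x0, r = params
        return np.sum((y - x0 * np.exp(r * t_days)) ** 2)
    result = differential_evolution(loss, [(-2.0, -1e-6), (-0.05, 0.05)], seed=42)
    return result.x                                      #should be close to (-0.2, -0.005)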
#Function to apply this to all timeseries
def fit_exponential(df):
#print(df)
#Group df by entry_id and true_pos_neg, then apply this function
df_grouped = df.groupby(['entry_id', 'true_pos_neg', 'VI'])
#print(df_grouped)
results = []
for name, group in df_grouped:
#print(group.shape)
try:
res = fit_exponential_global(df, name[0], name[1], name[2])
results.append([name[0], name[1], name[2]] + res)
#print(f"Fitted exponential for {name}, results: {res}")
except ValueError as e:
print(f"Skipping {name}: {e}")
results.append([name[0], name[1], name[2], np.nan, np.nan, np.nan, np.nan])
vbprint("fitted exponential functions to all timeseries, compiling results")
#print(results)
#Convert to dataframe
results_df = pd.DataFrame(results, columns=['entry_id', 'true_pos_neg', 'VI', 'min_date', 'x0', 'r', 'r_squared'])
return results_df
### TOOLS TO COMPUTE RESISTANCE ###
# Function to compute resistance for one timeseries
def resistance_tool(df, collapse_year):
    #Mean and sd of the three years before the collapse
    before = df.VI_value.loc[((df.cleandate.dt.year >= collapse_year - 3) & (df.cleandate.dt.year < collapse_year))]
    mean_before = before.mean()
    sd_before = before.std()
    #Resistance: drop of the collapse-year mean relative to the pre-collapse mean
    resistance = df.VI_value.loc[df.cleandate.dt.year == collapse_year].mean() - mean_before
    resistance_rel = resistance / mean_before
    #Maximum drop: collapse-year minimum relative to the pre-collapse mean
    resistance_max = df.VI_value.loc[df.cleandate.dt.year == collapse_year].min() - mean_before
    #Also express the maximum drop in units of pre-collapse standard deviations
    resistance_sd_ratio = resistance_max / sd_before
    #Also compute the maximum relative resistance
    resistance_max_rel = resistance_max / mean_before
    return resistance, mean_before, resistance_rel, resistance_max, sd_before, resistance_sd_ratio, resistance_max_rel
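#Worked toy example (not called by the pipeline): a series at ~0.8 for three
#years that drops to ~0.6 in the 2003 collapse year gives resistance ~ -0.2 and
#resistance_rel ~ -0.25, i.e. a 25% drop. Dates and values are invented.
def _example_resistance():
    dates = pd.date_range("2000-01-01", "2003-12-31", freq="16D")
    rng = np.random.default_rng(3)
    values = np.where(dates.year < 2003, 0.8, 0.6) + rng.normal(scale=0.02, size=dates.size)
    toy = pd.DataFrame({"cleandate": dates, "VI_value": values})
    return resistance_tool(toy, collapse_year=2003)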
#Compute resistance for one timeseries dataframe
def resistance_one_timeseries(df, in_path):
    #Apply the resistance tool to the grouped dataframe
    #(include_groups = False: resistance_tool does not use the grouping columns,
    #and include_groups = True is deprecated in recent pandas)
    df_res = df.groupby(['entry_id', 'true_pos_neg', 'VI']).apply(resistance_tool,
        collapse_year = df.year_disturbance.unique()[0], include_groups = False).reset_index().rename(columns = {0:'resistance'})
#Expand the resistance column
df_res[['resistance', 'mean_before', 'resistance_rel', 'resistance_max', 'sd_before', 'resistance_sd_ratio', 'resistance_max_rel']] = pd.DataFrame(df_res['resistance'].tolist(), index=df_res.index)
#Save the result
return df_res
#Pipeline to compute EWS, recovery rates and resistance for one entry
def pipeline_one_entry(df_path, out_folder, date_check, preprocess = 'harmonic', force_recompute = True):
entry_id = df_path.split('entry_')[1].split('_')[0]
#Check if this already exists in out_folder
    if os.path.exists(os.path.join(out_folder, f"{date_check}_id_{entry_id}_total_df.feather")) and not force_recompute:
print("this has already been computed")
return
else:
print('computing')
#Save an empty file to indicate that this is being computed
open(os.path.join(out_folder, f"{date_check}_id_{entry_id}_total_df.feather"), 'a').close()
#Load df
df = pd.read_feather(df_path)
#Preprocessing
df['kNDVI'] = np.tanh(df['NDVI'] ** 2)
df['cleandate'] = pd.to_datetime(df['time'])
#Pivot to long form
df = df.melt(id_vars = ['entry_id', 'cleandate', 'lat', 'lon', 'year_disturbance', 'paired_id', 'true_pos_neg', 'x','y', 'SummaryQA'],
value_vars = ['NDVI', 'kNDVI', 'EVI'], var_name = 'VI', value_name = 'VI_value')
#Mask out those values that are low quality
df.loc[df.SummaryQA != 0, 'VI_value'] = np.nan
#Loop over VI options
total_list = []
ews_list = []
for VI in ['NDVI','kNDVI', 'EVI']:
df_sub = df.loc[df.VI == VI].reset_index(drop = True)
#print("shape of df_sub:", df_sub.shape)
for roll in [1, 3, 5, 10]:
vbprint(f"{VI}: rolling window length {roll}")
#Compute
total_df, ews_df = outer_pipeline(df_sub, roll, preprocess = preprocess)
vbprint("saving these to file")
#Save these
total_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_total_df_rolling_window_{roll}.feather"))
ews_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_ews_df_rolling_window_{roll}.feather"))
#Also append to list
total_list.append(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_total_df_rolling_window_{roll}.feather"))
ews_list.append(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_ews_df_rolling_window_{roll}.feather"))
#Join all the files into one
vbprint("joining all the files into one")
total_df = pd.concat([pd.read_feather(f) for f in total_list])
ews_df = pd.concat([pd.read_feather(f) for f in ews_list])
vbprint('saving EWS and total files')
#Save the final files
total_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_total_df.feather"))
ews_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_ews_df.feather"))
#Compute recovery rates
vbprint("fitting the exponential function")
#Fit the exponential function
exp_df = fit_exponential(total_df)
#Save this to file
exp_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_exp_df.feather"))
#Compute resistance
vbprint("computing resistance")
res_df = resistance_one_timeseries(df, df_path)
#Save this to file
res_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_resistance_df.feather"))
#Remove the temporary files
vbprint("removing the temporary files")
#print(total_list)
for f in total_list:
os.remove(f)
for f in ews_list:
os.remove(f)
#Main function to compute EWS for all timeseries in a folder
def main(in_folder,
out_folder,
max_workers,
force_recompute,
date_check):
#Check if out_folder exists, if not, create it
if not os.path.exists(out_folder):
os.makedirs(out_folder)
#Load data one by one
ts_files = os.listdir(in_folder)
ts_files = [os.path.join(in_folder, i) for i in ts_files if ((i.endswith('timeseries.feather')) and not ('all' in i))]
#Set global preprocessing option
preprocess = 'harmonic'
date_check = str(date_check)
#Parallel application to the different ts_files
with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(pipeline_one_entry, path, out_folder, date_check, preprocess, force_recompute)
            for path in ts_files
        ]
for future in tqdm(as_completed(futures), total=len(futures), desc="Processing"):
try:
future.result()
except Exception as e:
print(f"Error processing file: {e}")
# Handle the exception as needed
traceback.print_exc()
#Join all files of each type into one
for suffix in ['total_df', 'ews_df', 'exp_df', 'resistance_df']:
        all_files = [os.path.join(out_folder, f) for f in os.listdir(out_folder) if f.endswith(f"{suffix}.feather") and f.startswith(date_check) and not ('all' in f)]
#print(all_files)
if len(all_files) > 0:
vbprint(f"joining all {suffix} files into one")
all_df = pd.concat([pd.read_feather(f) for f in all_files])
all_df.to_feather(os.path.join(out_folder, f"{date_check}_all_{suffix}.feather"))
#Remove the individual files
#for f in all_files:
# os.remove(f)
else:
vbprint(f"No files found for suffix {suffix}, skipping joining step.")
if __name__ == "__main__":
#Get arguments from command line
in_folder_timeseries = sys.argv[1] if len(sys.argv) > 1 else './data/intermediary/MOD13Q1_extracted_timeseries/'
out_folder_ews = sys.argv[2] if len(sys.argv) > 2 else './data/final/MOD13Q1_ews_resistance_recovery/'
max_workers = int(sys.argv[3]) if len(sys.argv) > 3 else 16
verbose = int(sys.argv[4]) if len(sys.argv) > 4 else 1
verbose = bool(verbose)
force_recompute = int(sys.argv[5]) if len(sys.argv) > 5 else 0
force_recompute = bool(force_recompute)
    date_check = sys.argv[6] if len(sys.argv) > 6 else datetime.today().strftime('%y_%m_%d')
    date_check = str(date_check)
main(in_folder=in_folder_timeseries, out_folder=out_folder_ews, max_workers=max_workers, force_recompute=force_recompute, date_check=date_check)