25-09-ews-assessment / code / 02_empirical_analysis / 04_ews_computation.py
04_ews_computation.py
Raw
"""
Computes early warning signals (EWS) and characteristics of the mortality event for all time series in the provided folder.

This script computes AC1, var, skew, kurtosis and fractal dimension as EWS on the time series of NDVI, EVI, kNDVI.
It uses rolling windows of 1, 3, 5, or 10 years and computes kendall tau trends in EWS over different time periods 
before the mortality event (full time series, time series before event, 1, 3, 5, 10 years before event).
The script also computes recovery rates based on fitting an exponential function to the five years after the mortality event
and resistance to the mortality as the drop in NDVI in the year of collapse compared to the mean of the three years before.

Usage: python3 ./code/02_empirical_analysis/04_ews_computation.py in_folder out_folder max_workers verbose force_recompute date_check

in_folder: Folder where the preprocessed time series files are stored (default: "./data/MOD13Q1_extracted_timeseries")
out_folder: Folder where the output files will be stored (default: "./data/MOD13Q1_ews_resistance_recovery")
max_workers: Number of parallel workers to use (default: 16)
verbose: If set to 1, prints more information (default: 1)
force_recompute: If set to 1, forces recomputation of all EWS, otherwise only computes files that have not been computed on the date specified by date_check (default: 0)
date_check: Date string (yy_mm_dd) to check for already computed files (default: today's date)

"""

import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import geopandas as gpd
#from geodatasets import get_path
from sklearn import linear_model
from datetime import datetime
import statsmodels
from statsmodels.tsa import stattools
import statsmodels.api as sm
from statsmodels.tsa.seasonal import STL, MSTL
from numpy.lib.stride_tricks import sliding_window_view
from scipy.stats import kendalltau
from tqdm.autonotebook import tqdm
import warnings
import sys
from scipy.optimize import differential_evolution
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm  # For progress tracking
import traceback


#Not recommended, but we'll do it anyways
warnings.filterwarnings("ignore", category=RuntimeWarning)

#Replace print with a time stamp print
_print = print
def print(*args, **kwargs):
    import datetime
    _print(datetime.datetime.now(), *args, **kwargs)

#Also define a vbprint option (verbose print)
def vbprint(*args, **kwargs):
    if verbose:
        print(*args, **kwargs)

### TOOL FUNCTIONS FOR EWS COMPUTATION ###

#Fractal dimension
def madogram(x, n, p = 1):
    #Variation estimator
    g1 = np.log(np.nansum(np.abs(x[1:] - x[:-1])) / (2 * (n - 1)))
    g2 = np.log(np.nansum(np.abs(x[2:] - x[:-2])) / (2 * (n - 2)))

    #Regression
    x = np.log([1, 2])
    xbar = np.mean(x)
    x_corr = x - xbar
    fd = 2 - (x_corr[0]*g1 + x_corr[1] * g2)/np.sum((x-xbar)**2)

    #Note: if FD > 2, trim it to 2 as recommended by Hall & Woods (1993)
    if fd > 2:
        fd = 2
    if fd < 1:
        fd = 1


    return fd

#Compute all EWS in rolling windows
def rolling_ews(y, window = 5*23, min_periods=5): #NOTE: as MOD13Q1 has a 16-day temporal resolution, there are 23 images per year
    #Prepare formats
    y = y.values
    autocorr_values = np.empty(np.shape(y))*np.nan
    var_values = np.empty(np.shape(y))*np.nan
    skew_values = np.empty(np.shape(y))*np.nan
    kurt_values = np.empty(np.shape(y))*np.nan
    fd_values = np.empty(np.shape(y))*np.nan

    #Adjusted computation: here, we don't use centered windows, but for each time point, the computed
    #EWS value reflects the years before that
    for start in range(y.shape[0]-int(window)): # - int(window/2)):
        window_data = y[start:start + window]
        if min_periods is not None and np.sum(~np.isnan(window_data)) >= min_periods:
            #Compute mean and variance
            mean_a = np.nanmean(window_data)
            var_a = np.nanvar(window_data)
            std_a = np.nanstd(window_data)

            # Calculate the autocorrelation
            n = np.sum(~np.isnan(window_data), axis = 0)
            #Computing autocorrelation!
            autocorr = (1 / n * np.nansum((window_data[1:] - mean_a) * (window_data[:-1] - mean_a), axis=0)) / var_a

            #Calculate skewness
            skew = np.nanmean(((window_data-mean_a)/std_a)**3, axis = 0)
            
            #Calculate kurtosis
            kurt = np.nanmean(((window_data-mean_a)/std_a)**4, axis = 0)

            #Calculate fractal dimension
            fd = madogram(window_data, n = n)

        else:
            autocorr = np.nan
            var_a = np.nan
            skew = np.nan
            kurt = np.nan
            fd = np.nan

        #Return autocorrelation, variance, skewness, kurtosis and fractal dimension   
        # Note: here, also adapted from original computation to have non-centered rolling windows     
        autocorr_values[int(start + window)] = autocorr
        var_values[int(start + window)] = var_a
        skew_values[int(start + window)] = skew
        kurt_values[int(start + window)] = kurt
        fd_values[int(start + window)] = fd


    return (autocorr_values, var_values, skew_values, kurt_values, fd_values)


#Tool function to compute kendall tau
def kendall_tau(x, y):
    kt = kendalltau(x, y, nan_policy = "omit")
    return kt.statistic, kt.pvalue


#Compute delta as the signed maximum change in the EWS timeseries (per Rocha, 2022)
def delta(x, y):
    try:
        #Get maximum and minimum values
        maxv = np.nanmax(y)
        minv = np.nanmin(y)
    except ValueError:  #raised if `y` is empty.
        print("value error, setting maxv and minv both to nan")
        maxv = np.nan
        minv = np.nan
    if (np.isnan(maxv) & np.isnan(minv)):
        dval = [np.nan]
    elif maxv == minv:
        maxt = np.nan
        mint = np.nan
        dval = [0]
    else:
        maxt = np.array(x[y == maxv])
        mint = np.array(x[y == minv])
        if len(maxt) > 1:
            maxt = maxt[0]
        if len(mint) > 1:
            mint = mint[0]

        #Compute delta
        sign = (mint > maxt) *-2 +1
        dval = sign * (maxv-minv)
    return dval


#Compute kendall tau trends & delta in EWS for different setups
def trend_computation(df, option, ews):
    df['collapse_time'] = pd.to_datetime(f'{df.year_disturbance.unique()[0]}-01-01')
    #print(ews)
    if option == "full":
        df = df
    elif option == "before":
        df = df.loc[df.cleandate < df.collapse_time].reset_index(drop = True)
    elif option == "collapse_year":
        df = df.loc[((df.cleandate >= df.collapse_time) & (df.cleandate < df.collapse_time + pd.DateOffset(years = 1)))].reset_index(drop = True)

    elif option == "1yr":
        df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 1)))].reset_index(drop = True)
    
    elif option == "3yr":
        df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 3)))].reset_index(drop = True)

    elif option == "5yr":
        df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 5)))].reset_index(drop = True)

    elif option == "10yr":
        df = df.loc[((df.cleandate < df.collapse_time) & (df.cleandate >= df.collapse_time - pd.DateOffset(years = 10)))].reset_index(drop = True)
    
    else: 
        print("option needs to be one of 'full', 'before','collapse_year', '1yr', '3yr', '5yr', '10yr'.")

    #Compute kendall taus
    kt_stat, kt_pval = kendall_tau(df.cleandate.values, df[ews].values)
    #Compute delta
    dval = delta(df.cleandate.values, df[ews].values)

    if not df.shape[0] == 0:

        #Wrap this into dataframe
        return pd.DataFrame({   "entry_id" : df.entry_id.unique()[0],
                                "paired_id" : df.paired_id.unique()[0],
                                "VI" : df.VI.unique()[0],
                                "true_pos_neg" : df.true_pos_neg.unique()[0],
                                "rolling_window" : df.rolling_window.unique()[0],
                                "Setup" : option, 
                                "EWS" : ews, 
                                "n" : df.shape[0],
                                "kt_stat" : kt_stat, 
                                "kt_pval" : kt_pval, 
                                "delta" : dval}, index = [0])
    else:
        print("careful, this dataframe is empty, returning nothing")
        return pd.DataFrame({   "entry_id" : np.nan,
                                "paired_id" : np.nan,
                                "VI" : np.nan,
                                "true_pos_neg": np.nan,
                                "rolling_window" : np.nan,
                                "Setup" : option, 
                                "EWS" : ews, 
                                "n" : np.nan,
                                "kt_stat" : kt_stat, 
                                "kt_pval" : kt_pval, 
                                "delta" : dval}, index = [0])


#Inner pipeline to compute EWS for one entry_id and one rolling window
def inner_pipeline(df, entry_id, rolling_window, preprocess):
    df = df.loc[df.entry_id == entry_id].reset_index(drop = True)
    #print('shape of df for entry_id', entry_id, ':', df.shape)
    VI = df.VI.unique()[0]

    #Detrending and deseasoning
    #Calculate time difference to initial time
    df["date_diff"] = df.cleandate - df.cleandate[0]
    df["date_diff"] = df["date_diff"].astype(int)

    #STL as preprocessing method - this is not currently used
    if preprocess == "stl":
        #Subtract linear trend
        #print("linear detrending...")
        lin_trend = linear_model.LinearRegression()
        nrow = df.shape[0]
        #check for nans
        if np.sum(np.isnan(df.loc[:, f"VI_value"])) > 0:
            print("nans in VI_value")
            df[f"VI_dtl"] = np.nan
        else:
            lin_trend.fit(df.loc[:, ["date_diff"]], df.loc[:, f"VI_value"])
            #Compute residuals
            df[f"VI_dtl"] = df[f"VI_value"] - pd.Series(lin_trend.predict(df.loc[:, ["date_diff"]]))

        # Construct a multiple STL regressor using time as predictor
        #print("stl...")
        ynew = df.loc[:, f"VI_dtl"]
        if df.shape[0] > 23*5:
            d = MSTL(ynew, periods = (23 * 1, 23*5)).fit()
            #Assign the residuals as deseasoned data
            df[f"VI_dtl_stl"] = d.resid
        else:
            print("time series is too short, only", df.shape[0], "points")
            df[f"VI_dtl_stl"] = np.nan

    #Harmonic deseasoning as preprocessing method (per Smith & Boers, 2020)
    elif preprocess == "harmonic":
        print('harmonic deseasoning preprocessing...')
        #Do the rolling mean and harmonic deseasoning preprocessing from Smith & Boers
        #Add a radian time column
        df['t'] = (df['cleandate'] - pd.Timestamp("1970-01-01")).dt.total_seconds() / (365.25 * 24 * 3600)
        df['t'] *= 2 * np.pi  # Convert to radians

        #Rolling mean detrend with 5 year rolling windows
        df[f'VI_dtl'] = df['VI_value'] - df['VI_value'].rolling(window=23 * 5, center=True, min_periods = 12).agg(lambda x: np.nanmean(x))
        #if this is not all nan, keep going
        if np.sum(np.isnan(df[f'VI_dtl'])) < df.shape[0]:

            #Harmonic deseasoning
            def harmonic_model(df, harmonics = 3):
                cos_list = []
                sin_list = []
                for h in range(harmonics):
                    #Create cosine terms
                    df[f'cos_{h}'] = np.cos(df.t*h)
                    #Create sine terms
                    df[f'sin_{h}'] = np.sin(df.t*h)
                df['constant'] = 1
                return df
            
            df = harmonic_model(df, harmonics = 3)
            
            #Train linear model to get the coefficients of the different cos and sine terms
            df_train = df.dropna(subset = ['constant', 'cos_0', 'cos_1', 'cos_2', 'sin_0', 'sin_1', 'sin_2', 'VI_dtl'])
            X = df_train[['constant', 'cos_0', 'cos_1', 'cos_2', 'sin_0', 'sin_1', 'sin_2']]
            y = df_train['VI_dtl']
            model = sm.OLS(y, X).fit()
            #Compute the model's value and subtract
            df[f'VI_dtl_stl'] = df['VI_dtl'] - model.predict(df[['constant', 'cos_0', 'cos_1', 'cos_2', 'sin_0', 'sin_1', 'sin_2']])
        else:
            vbprint("all nan values in VI_dtl, skipping harmonic deseasoning")
            df[f'VI_dtl_stl'] = np.nan

    else:
        print("preprocess needs to be either of 'stl' or 'harmonic'")
        #Stop execution
        sys.exit()

    #Application on entire time window
    vbprint("ews application...")
    ews_full = rolling_ews(df.loc[:,f'VI_dtl_stl'], window = int(rolling_window)*23)
    #Results to dataframe
    column_names = ['ews_ac1', 'ews_var', 'ews_skew', 'ews_kurt', 'ews_fd']

    # Convert tuple of arrays to a DataFrame
    ews_full_df = pd.DataFrame(data=tuple(zip(*ews_full)), columns=column_names)
    ews_full_df["rolling_window"] = rolling_window

    #Add this onto existing df
    results_full = pd.concat([df, ews_full_df], axis = 1)

    #Trend computation: wrap to apply for all EWS and setups
    vbprint("computing trends...")
    ews_eval = []
    for option in ["full", "before","collapse_year", "1yr", "3yr", "5yr", "10yr"]:
        for ews in [f"ews_{i}" for i in ["ac1", "var", "skew", "kurt", "fd"]]:
            #print(option, ews)
            ews_eval.append(trend_computation(results_full, option, ews))
    ews_eval = pd.concat(ews_eval)
    ews_eval["pval_sig"] = ews_eval.kt_pval < 0.05

    return results_full, ews_eval


#Wrapper function to apply EWS application to all entry_id values in df
def outer_pipeline(df, rolling_window, preprocess):
    results_df = []
    ews_df = []
    for r in df.entry_id.unique():
        #Loop through different options of true_pos_neg
        for tp in df.true_pos_neg.unique():
            #print(tp)
            results_add, ews_add = inner_pipeline(df.loc[df.true_pos_neg == tp], r, rolling_window, preprocess)
            results_df.append(results_add)
            ews_df.append(ews_add)
    vbprint("ran through all entry_id values, concatenating dfs")

    #Make into massive dataframe
    results_df = pd.concat(results_df).reset_index(drop = True)
    ews_df = pd.concat(ews_df).reset_index(drop = True)

    return results_df, ews_df



### TOOLS FOR RECOVERY RATE COMPUTATION ###

#Function to cut the timeseries properly
def cut_timeseries(df, entry_id, true_pos_neg, VI):
    #Get collapse year
    collapse_year = df.loc[df.entry_id == entry_id].year_disturbance.unique()[0]
    #print("collapse year:", collapse_year)

    #within the year of collapse, take the lowest point of dtl_stl and then the five years after. fit exponential function to this and get rate r
    ts = df.loc[((df.entry_id == entry_id) & (df.true_pos_neg == true_pos_neg) & (df.VI == VI)), ['cleandate', 'VI_dtl_stl']].reset_index(drop = True)
    
    #Find minimum point in year of collapse
    min_date = ts.loc[ts.cleandate.dt.year == collapse_year]
    min_date = min_date.loc[min_date.VI_dtl_stl == min_date.VI_dtl_stl.min()]
    #If this is an empty dataframe, return nans
    if min_date.shape[0] == 0:
        vbprint("timeseries is empty, returning nans")
        return pd.DataFrame(), np.nan
    
    else:
        #Take the five years after this
        ts_fit = ts.loc[((ts.cleandate >= min_date.cleandate.values[0]) & (ts.cleandate < min_date.cleandate.values[0] + pd.DateOffset(years = 5)))].reset_index(drop = True)
        return ts_fit, min_date.cleandate.values[0]


#Fit exponential function to a range of datasets
def fit_exponential_global(df, entry_id, true_pos_neg, VI):
    #Cut the timeseries properly
    ts, min_date = cut_timeseries(df, entry_id, true_pos_neg, VI)

    if not ts.shape[0] == 0:
        time_col = 'cleandate'
        value_col = 'VI_dtl_stl'

        # Copy and check input
        ts = ts.copy().dropna(how='any')
        ts = ts[[time_col, value_col]].dropna()

        ts[time_col] = pd.to_datetime(ts[time_col])
        t0 = ts[time_col].min()
        ts['t'] = (ts[time_col] - t0).dt.total_seconds() / (3600 * 24)

        #Check shape of ts again
        #print('shape of ts after dropping nans:', ts.shape)

        x_data = ts['t'].values
        y_data = ts[value_col].values

        # Clean: filter out non-finite values
        mask = np.isfinite(x_data) & np.isfinite(y_data)
        x_data, y_data = x_data[mask], y_data[mask]

        if len(y_data) < 2 or np.all(y_data == y_data[0]):
            raise ValueError("Not enough variation in data to fit an exponential model.")

        # Safeguard for bounds
        y_max = np.nanmax(y_data)
        if not np.isfinite(y_max) or y_max <= 0:
            raise ValueError("Invalid y-data values for exponential fitting.")
        y_min = np.nanmin(y_data)

        # Loss function
        def loss(params):
            x0, r = params
            y_pred = x0 * np.exp(r * x_data)
            return np.sum((y_data - y_pred) ** 2)

        # Define finite, reasonable bounds
        x0_bounds = (y_min*10, -1e-6)
        r_bounds = (-1, 1)
        bounds = [x0_bounds, r_bounds]

        # Run global optimization
        result = differential_evolution(loss, bounds, seed=42)
        x0_opt, r_opt = result.x

        # Predict and calculate R^2
        y_pred = x0_opt * np.exp(r_opt * x_data)
        ss_res = np.sum((y_data - y_pred) ** 2)
        ss_tot = np.sum((y_data - np.mean(y_data)) ** 2)
        r_squared = 1 - ss_res / ss_tot
        #print(r_squared)

        return [min_date, x0_opt, r_opt, r_squared]
    
    else:
        vbprint("timeseries to fit is empty, returning nans")
        return [np.nan, np.nan, np.nan, np.nan]


#Function to apply this to all timeseries
def fit_exponential(df):
    #print(df)
    #Group df by entry_id and true_pos_neg, then apply this function
    df_grouped = df.groupby(['entry_id', 'true_pos_neg', 'VI'])
    #print(df_grouped)
    results = []
    for name, group in df_grouped:
        #print(group.shape)
        try:
            res = fit_exponential_global(df, name[0], name[1], name[2])
            results.append([name[0], name[1], name[2]] + res)
            #print(f"Fitted exponential for {name}, results: {res}")
        except ValueError as e:
            print(f"Skipping {name}: {e}")
            results.append([name[0], name[1], name[2], np.nan, np.nan, np.nan, np.nan])
    vbprint("fitted exponential functions to all timeseries, compiling results")
    #print(results)
    #Convert to dataframe
    results_df = pd.DataFrame(results, columns=['entry_id', 'true_pos_neg', 'VI', 'min_date', 'x0', 'r', 'r_squared'])
    return results_df


### TOOLS TO COMPUTE RESISTANCE ###

# Function to compute resistance for one timeseries
def resistance_tool(df, collapse_year):

    #Compute resistance
    resistance = df.VI_value.loc[df.cleandate.dt.year == collapse_year].mean() - df.VI_value.loc[((df.cleandate.dt.year >= collapse_year-3) & (df.cleandate.dt.year < collapse_year))].mean()
    mean_before = df.VI_value.loc[((df.cleandate.dt.year >= collapse_year-3) & (df.cleandate.dt.year < collapse_year))].mean()
    resistance_rel = resistance / mean_before

    #Also get the min value in collapse year as number of sd deviations
    sd_before = df.VI_value.loc[((df.cleandate.dt.year >= collapse_year-3) & (df.cleandate.dt.year < collapse_year))].std()
    resistance_max =  df.VI_value.loc[df.cleandate.dt.year == collapse_year].min() - df.VI_value.loc[((df.cleandate.dt.year >= collapse_year-3) & (df.cleandate.dt.year < collapse_year))].mean()
    resistance_sd_ratio = resistance_max / sd_before

    #Also compute maximum relative resistance
    resistance_max_rel = resistance_max / mean_before

    return resistance, mean_before, resistance_rel, resistance_max, sd_before, resistance_sd_ratio, resistance_max_rel

#Compute
def resistance_one_timeseries(df, in_path):

    #Pivot EVI and NDVI to long format
    #df['kNDVI'] = np.tanh(df['NDVI'] ** 2)
    #df = df.melt(id_vars=['entry_id', 'time', 'year_disturbance', 'true_pos_neg'], value_vars=['NDVI', 'EVI', 'kNDVI'], var_name='VI', value_name = 'VI_value')

    #Apply this to the grouped dataframe
    df_res = df.groupby(['entry_id', 'true_pos_neg', 'VI']).apply(resistance_tool, 
                                            collapse_year = df.year_disturbance.unique()[0], include_groups = True).reset_index().rename(columns = {0:'resistance'})

    #Expand the resistance column
    df_res[['resistance', 'mean_before', 'resistance_rel', 'resistance_max', 'sd_before', 'resistance_sd_ratio', 'resistance_max_rel']] = pd.DataFrame(df_res['resistance'].tolist(), index=df_res.index)

    #Save the result
    return df_res


#Pipeline to compute EWS, recovery rates and resistance for one entry
def pipeline_one_entry(df_path, out_folder, date_check, preprocess = 'harmonic', force_recompute = True):
    
    entry_id = df_path.split('entry_')[1].split('_')[0]
    #Check if this already exists in out_folder
    if ((os.path.exists(os.path.join(out_folder, f"{date_check}_{entry_id}_total_df.feather"))) & (not force_recompute)):
        print("this has already been computed")
        return
    else:
        print('computing')
        #Save an empty file to indicate that this is being computed
        open(os.path.join(out_folder, f"{date_check}_id_{entry_id}_total_df.feather"), 'a').close()
        #Load df
        df = pd.read_feather(df_path)

        #Preprocessing
        df['kNDVI'] = np.tanh(df['NDVI'] ** 2)
        df['cleandate'] = pd.to_datetime(df['time'])

        #Pivot to long form
        df = df.melt(id_vars = ['entry_id', 'cleandate', 'lat', 'lon', 'year_disturbance', 'paired_id', 'true_pos_neg', 'x','y', 'SummaryQA'], 
                value_vars = ['NDVI', 'kNDVI', 'EVI'], var_name = 'VI', value_name = 'VI_value')

        #Mask out those values that are low quality
        df.loc[df.SummaryQA != 0, 'VI_value'] = np.nan

        #Loop over VI options
        total_list = []
        ews_list = []
        for VI in ['NDVI','kNDVI', 'EVI']:
            df_sub = df.loc[df.VI == VI].reset_index(drop = True)
            #print("shape of df_sub:", df_sub.shape)
            for roll in [1, 3, 5, 10]:
                vbprint(f"{VI}: rolling window length {roll}")
                #Compute
                total_df, ews_df = outer_pipeline(df_sub, roll, preprocess = preprocess)
                vbprint("saving these to file")
                #Save these
                total_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_total_df_rolling_window_{roll}.feather"))
                ews_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_ews_df_rolling_window_{roll}.feather"))
                #Also append to list
                total_list.append(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_total_df_rolling_window_{roll}.feather"))
                ews_list.append(os.path.join(out_folder, f"{date_check}_id_{entry_id}_{VI}_ews_df_rolling_window_{roll}.feather"))

        #Join all the files into one
        vbprint("joining all the files into one")
        total_df = pd.concat([pd.read_feather(f) for f in total_list])
        ews_df = pd.concat([pd.read_feather(f) for f in ews_list])

        vbprint('saving EWS and total files')
        #Save the final files
        total_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_total_df.feather"))
        ews_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_ews_df.feather"))

        #Compute recovery rates
        vbprint("fitting the exponential function")
        #Fit the exponential function
        exp_df = fit_exponential(total_df)
        #Save this to file
        exp_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_exp_df.feather"))

        #Compute resistance
        vbprint("computing resistance")
        res_df = resistance_one_timeseries(df, df_path)
        #Save this to file
        res_df.to_feather(os.path.join(out_folder, f"{date_check}_id_{entry_id}_resistance_df.feather"))

        #Remove the temporary files
        vbprint("removing the temporary files")
        #print(total_list)
        for f in total_list:
            os.remove(f)
        for f in ews_list:
            os.remove(f)


#Main function to compute EWS for all timeseries in a folder
def main(in_folder,
        out_folder,
        max_workers,
        force_recompute,
        date_check):

    #Check if out_folder exists, if not, create it
    if not os.path.exists(out_folder):
        os.makedirs(out_folder)

    #Load data one by one
    ts_files = os.listdir(in_folder)
    ts_files = [os.path.join(in_folder, i) for i in ts_files if ((i.endswith('timeseries.feather')) and not ('all' in i))]
    
    #Set global preprocessing option
    preprocess = 'harmonic'
    date_check = str(date_check)

    #Parallel application to the different ts_files
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(pipeline_one_entry, path, out_folder, date_check)
            for path in ts_files
        ]

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing"):
            try:
                future.result()
            except Exception as e:
                print(f"Error processing file: {e}")
                # Handle the exception as needed
                traceback.print_exc()

    #Join all files of each type into one
    
    for suffix in ['total_df', 'ews_df', 'exp_df', 'resistance_df']:
        all_files = [os.path.join(out_folder, f) for f in os.listdir(out_folder) if f.endswith(f"{suffix}.feather") and f.startswith(date_check)]
        #print(all_files)
        if len(all_files) > 0:
            vbprint(f"joining all {suffix} files into one")
            all_df = pd.concat([pd.read_feather(f) for f in all_files])
            all_df.to_feather(os.path.join(out_folder, f"{date_check}_all_{suffix}.feather"))
            #Remove the individual files
            #for f in all_files:
            #    os.remove(f)
        else:
            vbprint(f"No files found for suffix {suffix}, skipping joining step.")
    

if __name__ == "__main__":
    #Get arguments from command line
    in_folder_timeseries = sys.argv[1] if len(sys.argv) > 1 else './data/intermediary/MOD13Q1_extracted_timeseries/'
    out_folder_ews = sys.argv[2] if len(sys.argv) > 2 else './data/final/MOD13Q1_ews_resistance_recovery/'
    max_workers = int(sys.argv[3]) if len(sys.argv) > 3 else 16
    verbose = int(sys.argv[4]) if len(sys.argv) > 4 else 1
    verbose = bool(verbose)
    force_recompute = int(sys.argv[5]) if len(sys.argv) > 5 else 0
    force_recompute = bool(force_recompute)
    date_check = int(sys.argv[6]) if len(sys.argv) > 6 else datetime.today().strftime('%y_%m_%d')
    date_check = str(date_check)

    main(in_folder=in_folder_timeseries, out_folder=out_folder_ews, max_workers=max_workers, force_recompute=force_recompute, date_check=date_check)