"""
Script to identify control points (true negatives) for the Hammond dataset based on the true positive locations.
Every pixel within a 10km buffer around a true positive is scored on three criteria:
1. Number of outliers in the time series up to and including the disturbance year
   (outliers defined as annual means with an absolute z-score > 2)
2. Resistance (difference between the mean NDVI in the disturbance year and the mean of the three preceding years)
3. Mean absolute error (MAE) compared to the true positive time series before the disturbance year
Pixels are first restricted to those with the minimum number of outliers (excluding the true positive itself);
from these, three true negatives are selected: the pixel with the maximum combined score 1/(|resistance| * MAE),
the pixel with the minimum MAE, and the pixel with the minimum absolute resistance.
The script extracts the time series for the true positive and the three true negative points and saves them,
together with the point locations, to feather files.
NOTE: This script only runs for locations that were previously processed, i.e., if you have run the other scripts
in test mode, it will quietly run for just those locations rather than raising an error.
Usage: python3 ./code/02_empirical_analysis/03_true_negative_selection.py hammond_path zarr_input_folder output_folder
hammond_path: Path to the Hammond CSV file (default: "./data/intermediary/global_tree_mortality_database/GTM_full_database_resolve_biomes_2017_with_true_pos_adjusted.csv")
zarr_input_folder: Directory where the preprocessed zarr files are stored (default: "./data/intermediary/MOD13Q1_preprocessed_entry")
output_folder: Directory to save the extracted time series and points (default: "./data/intermediary/MOD13Q1_extracted_timeseries/")
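Illustrative sketch of the outlier criterion on synthetic data (names follow the
functions defined below; not part of the pipeline):
    >>> import numpy as np, pandas as pd, xarray as xr
    >>> time = pd.date_range('2001-01-01', '2010-12-31', freq='16D')
    >>> ndvi = np.full(len(time), 0.6)
    >>> ndvi[time.year == 2007] = 0.1  # one anomalous year
    >>> ds = xr.Dataset({'NDVI': ('time', ndvi)}, coords={'time': time})
    >>> n = timeseries_outliers(ds, 2008)
    Compute outliers
    >>> int(n)  # the 2007 annual mean has |z-score| = 3 > 2
    1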
"""
import xarray as xr
import numpy as np
import pandas as pd
import os
import sys
from tqdm.autonotebook import tqdm
#Opt in to xarray's new combine kwarg defaults to silence the update warnings
xr.set_options(use_new_combine_kwarg_defaults=True)
#Function to count outlier years (|z-score| > 2) in the time series up to and including the collapse year
def timeseries_outliers(df, collapse_year):
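    """
    Count outlier years per pixel, up to and including collapse_year. A year is an
    outlier if the absolute z-score of its annual mean, relative to the pixel's
    full-record mean and standard deviation, exceeds 2. Returns a DataArray named
    'n_outliers'.
    """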
print('Compute outliers')
# 1. Resample to annual means
da_annual = df.resample(time='YE').mean()
    # 2. Compute per-pixel mean and std across all years
mean = da_annual.mean(dim='time')
std = da_annual.std(dim='time')
# 3. Calculate z-score
z_score = (da_annual - mean) / std
# 4. Flag outliers: absolute z > 2
outliers = xr.where(np.abs(z_score) > 2, 1, 0)
# 5. Mask to only include years <= collapse_year
collapse_cutoff = pd.Timestamp(f'{collapse_year}-12-31')
outliers_before_collapse = outliers.where(outliers.time <= collapse_cutoff, 0)
# 6. Count outliers for each pixel
n_outliers = outliers_before_collapse.sum(dim='time')
return n_outliers.NDVI.rename('n_outliers')
#Function to compute resistance to collapse
def resistance(df, collapse_year):
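    """
    Compute per-pixel resistance: mean NDVI in the collapse year minus the mean NDVI
    of the three preceding years. Values near zero indicate little change in the
    collapse year. Returns a DataArray named 'resistance'.
    """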
print("Computing resistance")
    #Compute resistance: mean NDVI in the collapse year minus the mean of the three preceding years
    collapse_mean = df.NDVI.sel(time = slice(f'{collapse_year}-01-01', f'{collapse_year}-12-31')).mean(dim = 'time')
    pre_mean = df.NDVI.sel(time = slice(f'{collapse_year-3}-01-01', f'{collapse_year-1}-12-31')).mean(dim = 'time')
    df['resistance'] = collapse_mean - pre_mean
return df['resistance'].rename('resistance')
#Function to compute mean absolute error
def mae(x, y):
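    """Mean absolute error between two arrays, ignoring NaNs."""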
return np.nanmean(np.abs(x - y))
#Function to compute mean absolute error compared to true positive time series
def compute_mae(df, x_true, y_true, collapse_year):
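    """
    Compute, for every pixel, the MAE between its pre-collapse NDVI time series and
    the pre-collapse NDVI time series of the true positive at (x_true, y_true).
    Returns a DataArray named 'MAE'.
    """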
print("Computing MAE")
    #Clip to just the values before the collapse year
    df_pre = df.sel(time = slice('2001-01-01', f'{collapse_year-1}-12-31'))
#Get the true time series
df_true_pre = df_pre.sel(x = x_true, y = y_true, method='nearest')
#Compute MAE
mae_xr = xr.apply_ufunc(mae, df_pre.NDVI,
input_core_dims = [['time']],
output_core_dims = [[]],
kwargs = {'y' : df_true_pre.NDVI.values},
vectorize = True,
dask = 'parallelized')
return mae_xr.rename('MAE')
#Function to combine the different criteria for true negative selection
def stack_tn_criteria(df, x_true, y_true, collapse_year):
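    """
    Compute the three selection criteria (outlier count, resistance, MAE) for every
    pixel and return them in a single dataframe, together with the combined 'ratio'
    score 1/(|resistance| * MAE).
    """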
#Compute outliers
df_n_bkpts = timeseries_outliers(df, collapse_year)
#Compute resistance
df_resistance = resistance(df, collapse_year)
#Compute MAE
df_mae = compute_mae(df, x_true, y_true, collapse_year)
#Stack the three criteria
df_stack = xr.merge([df_n_bkpts, df_resistance, df_mae])
#Convert to dataframe
print('Stacking and converting to df')
df_stack = df_stack.to_dataframe().reset_index().dropna()
    #Combined score: abs(1/resistance)/MAE = 1/(|resistance| * MAE), large when both
    #|resistance| and MAE are small (e.g. resistance = -0.02, MAE = 0.05 -> ratio = 1000)
    df_stack['ratio'] = abs(1/df_stack.resistance) / df_stack.MAE
return df_stack
#Function to extract the time series for a given x, y coordinate
def ts_extraction(df, x, y, df_hammond_row):
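    """
    Extract the NDVI, EVI and SummaryQA time series at (x, y) as a dataframe and
    attach the metadata columns of the given Hammond row (df_hammond_row).
    """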
#Extract the time series
df_ts = df.sel(x = x, y = y, method='nearest')[['NDVI', 'EVI', 'SummaryQA']].to_dataframe().reset_index()
    #Drop the spatial_ref coordinate column
df_ts = df_ts.drop(columns = ['spatial_ref'])
#Add all the other values of that dataframe row
for col in df_hammond_row.index:
if col not in ['x', 'y']:
df_ts[col] = df_hammond_row[col]
return df_ts
#Extraction pipeline for one entry
def extraction_pipeline_one_entry(id,
df_hammond,
zarr_in_folder,
out_folder):
    """
    Run the whole true negative computation and time series extraction for one entry:
    load the preprocessed zarr cube, clip it to the 10km buffer box around the true
    positive, score all pixels, select the three true negative points, and write the
    point locations and time series to feather files.
    Returns the paths of the points and time series feather files.
    """
print(f"Started processing id {id} in PID {os.getpid()}")
#Check if path exists
if os.path.exists(os.path.join(out_folder, f'entry_{id}_points.feather')):
print(f"Entry {id} already exists, skipping")
return os.path.join(out_folder, f'entry_{id}_points.feather'), os.path.join(out_folder, f'entry_{id}_timeseries.feather')
else:
print(f"Entry {id} does not exist, processing")
    #Create empty placeholder feather files so that parallel runs of this script skip this entry
pd.DataFrame().to_feather(os.path.join(out_folder, f'entry_{id}_points.feather'))
pd.DataFrame().to_feather(os.path.join(out_folder, f'entry_{id}_timeseries.feather'))
#Load the zarr file
print('Loading zarr file')
df = xr.open_zarr(os.path.join(zarr_in_folder, f'entry_{id}.zarr'), consolidated = False, chunks = {'x': 1000, 'y': 1000, 'time': -1})
#Check if we have more than 600 time steps
if len(df.time) > 600:
#If so, skip this entry and give a warning
print(f"Entry {id} has more than 600 time steps, skipping, there is something wrong here!")
return os.path.join(out_folder, f'entry_{id}_points.feather'), os.path.join(out_folder, f'entry_{id}_timeseries.feather')
    #Clip df to the 10km buffer box around the true positive
print('Clipping to 10km box')
    #Get the direction of the y- and x-axes
    y_dir = -1 if df.y[0] > df.y[1] else 1
    print(f"Y axis direction: {y_dir}")
    x_dir = -1 if df.x[0] > df.x[1] else 1
    print(f"X axis direction: {x_dir}")
    #Look up this entry's row once and reuse it below
    entry_row = df_hammond.loc[df_hammond.entry_id == id].iloc[0]
    #Select the box, honouring the axis directions
    if x_dir == 1:
        x_slice = slice(entry_row['minx'], entry_row['maxx'])
    else:
        x_slice = slice(entry_row['maxx'], entry_row['minx'])
    if y_dir == 1:
        y_slice = slice(entry_row['miny'], entry_row['maxy'])
    else:
        y_slice = slice(entry_row['maxy'], entry_row['miny'])
#Apply this
df = df.sel(x = x_slice, y = y_slice)
#Also rechunk properly
df = df.chunk({'x': 1000, 'y': 1000, 'time': -1})
print(f"Data shape after clipping: {df.NDVI.shape}")
    #Get the x, y coordinates of the true positive and the disturbance year
    print('TP coordinates')
    x_true = entry_row['x']
    y_true = entry_row['y']
    collapse_year = entry_row['year_disturbance']
#Compute TN
print('Computing TN')
df_tn = stack_tn_criteria(df, x_true, y_true, collapse_year)
#Extract the different points
print('Extracting points')
    #Keep only points with the minimum number of outliers
    df_tn = df_tn.loc[df_tn.n_outliers == df_tn.n_outliers.min()].reset_index(drop=True)
    #Remove points with MAE == 0, since that is the true positive pixel itself
    df_tn = df_tn.loc[df_tn.MAE != 0].reset_index(drop=True)
#Get the coordinates of the point with the maximum ratio
x_tn_ratio, y_tn_ratio = df_tn.loc[df_tn.ratio == df_tn.ratio.max(), ['x', 'y']].values[0]
#Get the coordinates of the point with minimum MAE
x_tn_mae, y_tn_mae = df_tn.loc[df_tn.MAE == df_tn.MAE.min(), ['x', 'y']].values[0]
#Get the coordinates of the point with minimum resistance
x_tn_res, y_tn_res = df_tn.loc[abs(df_tn.resistance) == abs(df_tn.resistance).min(), ['x', 'y']].values[0]
#Extract true positive time series
print('Extracting TP timeseries')
    dtrue = ts_extraction(df, x_true, y_true, entry_row)
#Make empty list to save the tn points to
new_point_list = []
    #Build the TN point rows: copies of the entry row with updated coordinates and label
    print('Extracting TN ratio')
    new_point = entry_row.copy()
new_point['x'] = x_tn_ratio
new_point['y'] = y_tn_ratio
new_point['true_pos_neg'] = 'true_neg_ratio'
#Extract ts
d_tn_ratio = ts_extraction(df, x_tn_ratio, y_tn_ratio, new_point)
#Save the point based on MAE
print('Extracting TN MAE')
    new_point_mae = entry_row.copy()
new_point_mae['x'] = x_tn_mae
new_point_mae['y'] = y_tn_mae
new_point_mae['true_pos_neg'] = 'true_neg_mae'
d_tn_mae = ts_extraction(df, x_tn_mae, y_tn_mae, new_point_mae)
#Save the point based on resistance
print('Extracting TN resistance')
    new_point_res = entry_row.copy()
new_point_res['x'] = x_tn_res
new_point_res['y'] = y_tn_res
new_point_res['true_pos_neg'] = 'true_neg_resistance'
d_tn_res = ts_extraction(df, x_tn_res, y_tn_res, new_point_res)
#Add the new points to a list to later concatenate
new_point_list.append(new_point)
new_point_list.append(new_point_mae)
new_point_list.append(new_point_res)
#Concatenate the time series as well
print('Concatenating time series')
d_vals = pd.concat([dtrue, d_tn_ratio, d_tn_mae, d_tn_res], axis = 0)
#Save them as feather files and return paths
print('Saving to feather')
new_point_list = pd.DataFrame(new_point_list)
new_point_list.to_feather(os.path.join(out_folder, f'entry_{id}_points.feather'))
d_vals.to_feather(os.path.join(out_folder, f'entry_{id}_timeseries.feather'))
#Return the paths as well
return os.path.join(out_folder, f'entry_{id}_points.feather'), os.path.join(out_folder, f'entry_{id}_timeseries.feather')
#Function to load and preprocess Hammond, and run the extraction pipeline for all entries
def main(hammond_path, zarr_in_folder, out_folder):
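    """
    Load the Hammond dataset, add the 10km buffer boxes, drop true_pos rows that were
    superseded by adjusted entries, run the extraction pipeline for every entry, and
    concatenate all points and time series into 'all_points.feather' and
    'all_timeseries.feather' in out_folder.
    """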
#Load hammond dataset
print('Loading dataset')
df_hammond = pd.read_csv(hammond_path)
#Check if out_folder exists, if not, create it
if not os.path.exists(out_folder):
os.makedirs(out_folder)
#Add 10km buffer to the coordinates
#NOTE: double check this with direction of y-axis
print('Adding 10km buffer')
df_hammond['minx'] = df_hammond['x'] - 10_000
df_hammond['miny'] = df_hammond['y'] - 10_000
df_hammond['maxx'] = df_hammond['x'] + 10_000
df_hammond['maxy'] = df_hammond['y'] + 10_000
#For those entry_id values, where we have both true_pos and true_pos_adj, drop the true_pos entries
print('Filtering out low resolution entries')
len_orig = len(df_hammond)
ids_adj = df_hammond.groupby('entry_id').filter(lambda x: len(x) > 1).entry_id.unique()
#For those entries, filter out the true_pos
df_hammond = df_hammond.loc[~((df_hammond.entry_id.isin(ids_adj)) & (df_hammond.true_pos_neg == 'true_pos'))].reset_index(drop=True)
print(f"Filtered out {len_orig - len(df_hammond)} entries with true_pos")
    #Run extraction_pipeline_one_entry for every entry; parallelism across entries comes
    #from launching several instances of this script, which skip entries whose placeholder files exist
print('Application')
point_list = []
ts_list = []
for i in tqdm(df_hammond.entry_id.values, total = len(df_hammond.entry_id.values)):
#Apply the function to each entry
point_path, ts_path = extraction_pipeline_one_entry(i, df_hammond, zarr_in_folder = zarr_in_folder,
out_folder = out_folder)
#Append all these
point_list.append(point_path)
ts_list.append(ts_path)
#Concatenate all the points and time series
print('Concatenating all points and time series')
df_points = pd.concat([pd.read_feather(p) for p in point_list], ignore_index=True).reset_index(drop=True)
#Concat the points with df_hammond
df_points = pd.concat([df_points, df_hammond], ignore_index=True).reset_index(drop=True)
df_timeseries = pd.concat([pd.read_feather(p) for p in ts_list], ignore_index=True).reset_index(drop=True)
#Save the concatenated dataframes
df_points.to_feather(os.path.join(out_folder, 'all_points.feather'))
df_timeseries.to_feather(os.path.join(out_folder, 'all_timeseries.feather'))
print('done :)')
if __name__ == "__main__":
#Get command line arguments
hammond_path = sys.argv[1] if len(sys.argv) > 1 else "./data/intermediary/global_tree_mortality_database/GTM_full_database_resolve_biomes_2017_with_true_pos_adjusted.csv"
zarr_in_folder = sys.argv[2] if len(sys.argv) > 2 else "./data/intermediary/MOD13Q1_preprocessed_entry"
out_folder = sys.argv[3] if len(sys.argv) > 3 else "./data/intermediary/MOD13Q1_extracted_timeseries/"
#Call the main function
main(hammond_path, zarr_in_folder, out_folder)