Commit 89402ead authored by Germain Salgues

UPDATE: minor change in snow_annual_map behavior

parent abaf77c8
......@@ -51,7 +51,7 @@ def main(argv):
snow_annual_map_evaluation_app = snow_annual_map_evaluation.snow_annual_map_evaluation(data)
snow_annual_map_evaluation_app.run()
if data.get("run_l8_evaluation", False):
if data.get("run_comparison_evaluation", False):
snow_annual_map_evaluation_app.run_evaluation()
if data.get("run_modis_comparison", False):
......
......@@ -37,7 +37,7 @@ def datetime_to_str(date, format="%Y%m%d"):
"""
logging.debug(date)
return date.strftime(format)
def call_subprocess(process_list):
""" Run subprocess and write to stdout and stderr
"""
......@@ -48,7 +48,7 @@ def call_subprocess(process_list):
stderr=subprocess.PIPE)
out, err = process.communicate()
logging.info(out)
sys.stderr.write(err)
sys.stderr.write(str(err))
class prepare_data_for_snow_annual_map():
def __init__(self, params):
......@@ -61,13 +61,22 @@ class prepare_data_for_snow_annual_map():
self.date_margin = timedelta(days=params.get("date_margin", 0))
self.output_dates_filename = params.get("output_dates_filename", None)
self.mode = params.get("mode", "RUNTIME")
self.mission_tags = ["LANDSAT"]#["SENTINEL2"]
self.input_dir = str(params.get("input_dir"))
self.snow_products_dir = str(params.get("snow_products_dir"))
self.path_tmp = str(params.get("path_tmp", os.environ.get('TMPDIR')))
self.path_out = op.join(str(params.get("path_out")),
self.tile_id + "_" + datetime_to_str(self.date_start)
+ "-" + datetime_to_str(self.date_stop))
self.input_products_list=params.get("input_products_list",[])
self.processing_id = self.tile_id + "_" + \
datetime_to_str(self.date_start) + "_" + \
datetime_to_str(self.date_stop)
self.path_out = op.join(str(params.get("path_out")), self.processing_id)
self.use_densification = params.get("use_densification", False)
if self.use_densification:
self.mission_tags.append("LANDSAT")
self.densification_products_list=params.get("densification_products_list",[])
if not os.path.exists(self.path_out):
os.mkdir(self.path_out)
......@@ -79,91 +88,145 @@ class prepare_data_for_snow_annual_map():
self.datalake_products_availability = 0
def run(self):
logging.info('Process tile:' + self.tile_id + '...')
logging.info('Process tile:' + self.tile_id +'.')
logging.info(' for period ' + str(self.date_start) + ' to ' + str(self.date_stop))
search_start_date = self.date_start - self.date_margin
search_stop_date = self.date_stop + self.date_margin
parameters = {"processingLevel": "LEVEL2A", "location":str(self.tile_id)}
amalthee_theia = Amalthee('theia')
amalthee_theia.search("SENTINEL2",
datetime_to_str(search_start_date, "%Y-%m-%d"),
datetime_to_str(search_stop_date, "%Y-%m-%d"),
parameters,
nthreads = self.nbThreads)
nb_products = amalthee_theia.products.shape[0]
logging.info('There is ' + str(nb_products) + ' products for the current request')
df = amalthee_theia.products
df['snow_product'] = ""
df['snow_product_available'] = False
snow_product_available = 0
datalake_product_available = 0
datalake_update_requested = 0
for product_id in df.index:
logging.info('Processing ' + product_id)
# check datalake availability
if df.loc[product_id, 'available']:
datalake_product_available += 1
# check snow product availability
expected_snow_product_path = op.join(self.input_dir, self.tile_id, product_id)
df.loc[product_id, 'snow_product'] = expected_snow_product_path
logging.info(expected_snow_product_path)
if op.exists(expected_snow_product_path):
logging.info(product_id + " is available as snow product")
snow_product_available += 1
df.loc[product_id, 'snow_product_available'] = True
elif df.loc[product_id, 'available']:
logging.info(product_id + " requires to generate the snow product")
self.process_snow_product(product_id)
filename_i = os.path.abspath(self.processing_id +"_pending_for_snow_processing.txt")
FileOut = open(os.path.join(".", filename_i),"w")
resulting_df = None
snow_processing_requested = 0
for mission_tag in self.mission_tags:
parameters = {"processingLevel": "LEVEL2A", "location":str(self.tile_id)}
amalthee_theia = Amalthee('theia')
amalthee_theia.search(mission_tag,
datetime_to_str(search_start_date, "%Y-%m-%d"),
datetime_to_str(search_stop_date, "%Y-%m-%d"),
parameters,
nthreads = self.nbThreads)
nb_products = amalthee_theia.products.shape[0]
logging.info('There are ' + str(nb_products) + ' ' + mission_tag + ' products for the current request')
snow_products_list=[]
if nb_products:
df = amalthee_theia.products
df['snow_product'] = ""
df['snow_product_available'] = False
snow_product_available = 0
datalake_product_available = 0
datalake_update_requested = 0
for product_id in df.index:
logging.info('Processing ' + product_id)
# check datalake availability
if df.loc[product_id, 'available']:
datalake_product_available += 1
# check snow product availability
expected_snow_product_path = op.join(self.snow_products_dir, self.tile_id, product_id)
df.loc[product_id, 'snow_product'] = expected_snow_product_path
logging.info(expected_snow_product_path)
if op.exists(expected_snow_product_path):
logging.info(product_id + " is available as snow product")
snow_product_available += 1
df.loc[product_id, 'snow_product_available'] = True
snow_products_list.append(expected_snow_product_path)
elif df.loc[product_id, 'available']:
logging.info(product_id + " requires to generate the snow product")
snow_processing_requested += 1
FileOut.write(df.loc[product_id, 'datalake']+"\n")
else:
logging.info(product_id + " requires to be requested to datalake.")
datalake_update_requested += 1
if resulting_df is not None:
resulting_df = resulting_df.append(df)
else:
resulting_df = df
self.snow_products_availability = float(snow_product_available/nb_products)
logging.info("Percent of available snow product : " + str(100*self.snow_products_availability) + "%")
self.datalake_products_availability = float(datalake_product_available/nb_products)
logging.info("Percent of available datalake product : " + str(100*self.datalake_products_availability) + "%")
# datalake update if not all the products are available
if datalake_update_requested > 0:
logging.info("Requesting an update of the datalake because of " + str(datalake_update_requested) + " unavailable products...")
amalthee_theia.fill_datalake()
logging.info("End of requesting datalake.")
if mission_tag == "LANDSAT":#"SENTINEL2":
self.input_products_list.extend(snow_products_list)
else:
logging.info(product_id + " requires to be requested to datalake.")
datalake_update_requested += 1
self.snow_products_availability = float(snow_product_available/nb_products)
logging.info("Percent of available snow product : " + str(100*self.snow_products_availability) + "%")
self.datalake_products_availability = float(datalake_product_available/nb_products)
logging.info("Percent of available datalake product : " + str(100*self.datalake_products_availability) + "%")
# TODO add datalake update if not all the products are available
if datalake_update_requested > 0:
logging.info("Requesting an update of the datalake because of " + datalake_update_requested + " unavailable products...")
#amalthee_theia.fill_datalake()
logging.info("End of requesting datalake.")
self.densification_products_list.extend(snow_products_list)
# request processing for remaining products
FileOut.close()
if snow_processing_requested != 0:
self.process_snow_products(filename_i, snow_processing_requested)
# Create fill to access requested products status
products_file = op.join(self.path_out, "input_datalist.csv")
logging.info("Products detailed status is avaible under: " + products_file)
df.to_csv(products_file, sep=';')
if resulting_df is not None:
products_file = op.join(self.path_out, "input_datalist.csv")
logging.info("Products detailed status is avaible under: " + products_file)
resulting_df.to_csv(products_file, sep=';')
else:
logging.error("No products available to compute snow annual map!!")
def build_json(self):
if self.snow_products_availability > 0.9:
if self.snow_products_availability > 0.999:
snow_annual_map_param_json = os.path.join(self.path_out, "param.json")
logging.info("Snow annual map can be computed from: " + snow_annual_map_param_json)
self.raw_params['data_availability_check'] = True
self.raw_params['log'] = True
self.raw_params['log_stdout'] = op.join(self.path_out,"stdout.log")
self.raw_params['log_stderr'] = op.join(self.path_out,"stderr.log")
self.raw_params['input_products_list'] = self.input_products_list
if self.use_densification:
self.raw_params['densification_products_list'] = self.densification_products_list
jsonFile = open(snow_annual_map_param_json, "w")
jsonFile.write(json.dumps(self.raw_params, indent=4))
jsonFile.close()
return snow_annual_map_param_json
else:
logging.error("Snow annual map cannot be computed because of too many missing products")
def process_snow_product(self, product_id):
logging.info("Ordering processing of the snow product for " + product_id)
def process_snow_products(self, file_to_process, array_size=None):
logging.info("Ordering processing of the snow products on " + file_to_process)
command = ["qsub",
"-v",
"tile="+self.tile_id[1:]+",outpath="+self.input_dir,
"runTile_lis_Sentinel2_datalake_anytile.sh"]
#call_subprocess(command)
logging.info("Order was submitted the snow product will soon be available.")
"filename=\""+file_to_process+"\",tile=\""+self.tile_id[1:]+"\",out_path=\""+self.snow_products_dir+"\",overwrite=\"false\"",
"run_lis_from_filelist.sh"]
# in case the array size is provided, it requires a job array of the exact size.
if array_size:
command.insert(1, "-J")
command.insert(2, "1-"+str(array_size+1))
print(" ".join(command))
try:
call_subprocess(command)
logging.info("Order was submitted the snow annual map will soon be available.")
except:
logging.warning("Order was submitted the snow annual map will soon be available, but missinterpreted return code")
def process_snow_annual_map(self, file_to_process):
    """Submit a PBS job that computes the snow annual map.

    :param file_to_process: path to the json configuration file, forwarded
                            to run_snow_annual_map.sh through ``qsub -v``.
    """
    logging.info("Ordering processing of the snow annual map, " + file_to_process)
    command = ["qsub",
               "-v",
               "config=\""+file_to_process+"\",overwrite=false",
               "run_snow_annual_map.sh"]
    print(" ".join(command))
    try:
        call_subprocess(command)
        logging.info("Order was submitted the snow annual map will soon be available.")
    except Exception:
        # qsub may return a status that call_subprocess mishandles even when
        # the job was queued; stay best-effort, but use `except Exception`
        # instead of a bare except so KeyboardInterrupt/SystemExit propagate.
        logging.warning("Order was submitted the snow annual map will soon be available, but misinterpreted return code")
def main():
params = {"tile_id":"T32TPS",
......@@ -171,12 +234,15 @@ def main():
"date_stop":"31/08/2018",
"date_margin":15,
"mode":"DEBUG",
"input_dir":"/work/OT/siaa/Theia/Neige/PRODUITS_NEIGE_LIS_develop_1.5",
"input_products_list":[],
"snow_products_dir":"/work/OT/siaa/Theia/Neige/PRODUITS_NEIGE_LIS_develop_1.5",
"path_tmp":"",
"path_out":"/work/OT/siaa/Theia/Neige/Snow_Annual_Maps",
"ram":2048,
"nbThreads":5,
"use_l8_for_densification":False,
#"path_out":"/home/qt/salguesg/scratch/multitemp_workdir/tmp_test",
"path_out":"/work/OT/siaa/Theia/Neige/Snow_Annual_Maps_L8_Only",
"ram":8192,
"nbThreads":6,
"use_densification":False,
"densification_products_list":[],
"data_availability_check":False}
with open('selectNeigeSyntheseMultitemp.csv', 'r') as csvfile:
......@@ -191,8 +257,10 @@ def main():
prepare_data_for_snow_annual_map_app = prepare_data_for_snow_annual_map(params)
prepare_data_for_snow_annual_map_app.run()
prepare_data_for_snow_annual_map_app.build_json()
config_file = prepare_data_for_snow_annual_map_app.build_json()
if config_file is not None:
prepare_data_for_snow_annual_map_app.process_snow_annual_map(config_file)
#return
if __name__== "__main__":
# Set logging level and format.
......
#!/bin/bash
#PBS -N TheiaNeigeArrayFromFile
#PBS -j oe
#PBS -l select=1:ncpus=4:mem=20000mb
#PBS -l walltime=00:59:00
#PBS -J 1-100
#PBS -m e
# Run LIS snow detection on the products listed in $filename, one product per
# PBS array index (the list entry at index PBS_ARRAY_INDEX is processed).
# Usage:
#   qsub -v filename="file.txt",tile="31TCH",out_path="/where/to/store"(,overwrite="false") run_lis_from_filelist.sh
# FIXME: conflict on hal between environment variable TMPDIR used also internally by openmpi
# Store the directory in PBS_TMPDIR, then unset TMPDIR which conflicts with openmpi
PBS_TMPDIR=$TMPDIR
unset TMPDIR

# --- validate the parameters passed through qsub -v ---
if [ -z "$tile" ]; then
    echo "tile is not set, exit"
    exit 1
fi
# filename to process
if [ -z "$filename" ]; then
    echo "filename is not set, exit"
    exit 1
fi
echo "$filename"
if [ -z "$overwrite" ]; then
    echo "overwrite is not set, no overwrite will be done"
    overwrite="false"
fi
if [ -z "$out_path" ]; then
    echo "out_path is not set, exit"
    exit 1
fi
echo "$out_path"

# node-local working directories
tmp_output_dir=$PBS_TMPDIR/TheiaNeige_Muscate_T${tile}_out/
tmp_input_dir=$PBS_TMPDIR/TheiaNeige_Muscate_T${tile}_in/
# final storage directory
storage_output_dir=$out_path
# S2 L2A products input path
pin="/datalake/S2-L2A-THEIA/"
# DEM input paths (separate DEM repositories for S2 and L8)
pdem="/work/OT/siaa/Theia/Neige/DEM/"
pdem_l8="/work/OT/siaa/Theia/Neige/DEM_L8/"
# input DEM
inputdem=$(find $pdem/S2__TEST_AUX_REFDE2_T${tile}_0001.DBL.DIR/ -name "*ALT_R2.TIF")
echo "inputdem:" $inputdem
inputdem_l8=$(find $pdem_l8/L8_TEST_AUX_REFDE2_T${tile}_0001.DBL.DIR/ -name "*ALT.TIF")
echo "inputdem l8:" $inputdem_l8

# load the product list and pick the entry for this job
array_products=($(cat $filename))
echo "array size" ${#array_products[@]}
# PBS_ARRAY_INDEX distributes jobs in parallel (bash indexing is zero-based)
i="${array_products[${PBS_ARRAY_INDEX} - 1]}"
if [ -z "$i" ]; then
    echo "No file to process PBS_ARRAY_INDEX:" ${PBS_ARRAY_INDEX}
    exit 0
fi
echo "array_products[PBS_ARRAY_INDEX]" $i

# use the product name to identify the config and output files
product_zip=$(basename $i)
product_folder=$(basename $i .zip)
echo "product_folder" $product_folder
echo "product_zip" $product_zip

# skip products that were already processed, unless overwrite was requested
if [ -d "$storage_output_dir/T$tile/$product_folder" ]; then
    echo "$storage_output_dir/T$tile/$product_folder exists!"
    if [ "$overwrite" == "false" ]; then
        echo "exiting to avoid overwrite"
        exit 1
    fi
fi

# create working input directory and copy the input product to the node
pinw=$tmp_input_dir
mkdir -p $pinw
echo "pinw" $pinw
cp -r $i $pinw/
cd $pinw
img_input=$pinw/$product_folder
zip_input_product=$pinw/$product_zip
echo "img_input" $img_input
echo "zip_input_product" $zip_input_product

# create working output directory
pout=$tmp_output_dir/$product_folder/
mkdir -p $pout
echo "pout" $pout

# Load LIS modules
#module load lis/develop
source /home/qt/salguesg/load_lis.sh

# configure gdal_cachemax to speedup gdal polygonize and gdal rasterize (half of requested RAM)
export GDAL_CACHEMAX=2048
echo $GDAL_CACHEMAX
export PATH=/home/qt/salguesg/local/bin:$PATH
echo $PATH

# L8 products require the L8-specific DEM
if [[ $product_zip == *LANDSAT8* ]];
then
    inputdem=$inputdem_l8
    echo "using dem $inputdem"
fi

# create config file
date ; echo "START build_json.py $config"
build_json.py -ram 2048 -dem $inputdem $zip_input_product $pout
config=${pout}/param_test.json
echo $config
date ; echo "END build_json.py"
if [ -z "$(ls -A -- $pout)" ]
then
    echo "ERROR: $pout directory is empty, something wrong happen during build_json"
    exit 1
fi

# run the snow detection
date ; echo "START run_snow_detector.py $config"
run_snow_detector.py $config
date ; echo "END run_snow_detector.py"
if [ -z "$(ls -A -- ${pout}/LIS_PRODUCTS)" ]
then
    echo "ERROR: ${pout}/LIS_PRODUCTS directory is empty, something wrong happen during run_snow_detector"
    exit 1
fi

# copy output to /work and open permissions for the lis_admin group
mkdir -p $storage_output_dir/T$tile/
cp -r $pout $storage_output_dir/T$tile/
final_path=$storage_output_dir/T$tile/$product_folder
chgrp -R lis_admin $final_path
chmod 775 -R $final_path
echo "Product is available under $final_path"

# clean node-local scratch space
rm -r $pout $pinw
......@@ -25,3 +25,9 @@ Name,
32TPT
32TQS
32TQT
31TFK
31TFL
32TLP
31TGJ
33TUM
......@@ -102,6 +102,34 @@ def gap_filling(img_in, mask_in, img_out, input_dates_file=None,
else:
logging.error("Parameters img_in, img_out and mask_in are required")
# Merge the snow and cloud masks of several products acquired at the same date.
# Merged cloud mask: a pixel stays cloudy (1) only if every input cloud mask is
# non-zero; the nested ternary chain short-circuits to 0 at the first clear one.
# Merged snow mask: the snow value of the first input whose cloud mask is clear
# at that pixel, 0 if all inputs are cloudy there.
# NOTE(review): the diff shows this as a module-level def, yet it reads
# self.ram below — it presumably belongs inside a class (or needs a ram
# parameter); confirm against the full file before reusing it.
# NOTE(review): indentation was stripped by the diff view; code bytes kept as-is.
def merge_masks_at_same_date(mask_snow_list, mask_cloud_list, merged_snow_mask, merged_cloud_mask):
# BandMath image indices are 1-based: im1..imN are the cloud masks.
img_index = range(1, len(mask_cloud_list)+1)
expression_cloud_merging = "".join(["(im" + str(i) + "b1==0?0:" for i in img_index])
expression_cloud_merging += "1"
expression_cloud_merging += "".join([")" for i in img_index])
bandMathApp = band_math(mask_cloud_list,
merged_cloud_mask,
expression_cloud_merging,
self.ram,
otb.ImagePixelType_uint8)
bandMathApp.ExecuteAndWriteOutput()
bandMathApp = None
# Snow masks are appended after the cloud masks in input_img_list, so
# im(i+offset) is the snow mask matching cloud mask im(i).
offset = len(mask_cloud_list)
expression_snow_merging = "".join(["(im" + str(i) + "b1==0?im"+str(i+offset)+"b1:" for i in img_index])
expression_snow_merging += "0"
expression_snow_merging += "".join([")" for i in img_index])
input_img_list = mask_cloud_list + mask_snow_list;
bandMathApp = band_math(input_img_list,
merged_snow_mask,
expression_snow_merging,
self.ram,
otb.ImagePixelType_uint8)
bandMathApp.ExecuteAndWriteOutput()
return
class snow_annual_map():
def __init__(self, params):
logging.info("Init snow_multitemp")
......@@ -117,8 +145,10 @@ class snow_annual_map():
datetime_to_str(self.date_start) + "_" + \
datetime_to_str(self.date_stop))
self.input_dir = str(op.join(params.get("input_dir"), self.tile_id))
# input_dir is no longer used
#self.input_dir = str(op.join(params.get("snow_products_dir"), self.tile_id))
self.input_path_list = params.get("input_products_list", [])
#self.path_tmp = str(params.get("path_tmp", os.environ.get('TMPDIR')))
self.path_tmp = str(os.environ.get('TMPDIR'))
self.path_out = op.join(str(params.get("path_out")), self.processing_id)
......@@ -129,20 +159,9 @@ class snow_annual_map():
self.ram = params.get("ram", 512)
self.nbThreads = params.get("nbThreads", None)
self.use_l8_for_densification = params.get("use_l8_for_densification", False)
if self.use_l8_for_densification:
self.l8_tile_id = params.get("l8_tile_id")
self.l8_input_dir = str(op.join(params.get("l8_input_dir"), self.l8_tile_id))
if not os.path.exists(self.l8_input_dir) or not self.l8_tile_id:
logging.error("Cannot use L8 densification with "
+ str(self.l8_tile_id)
+ ", "
+ self.l8_input_dir)
else:
logging.info("Using L8 densification with "
+ str(self.l8_tile_id)
+ ", "
+ self.l8_input_dir)
self.use_densification = params.get("use_densification", False)
if self.use_densification:
self.densification_path_list = params.get("densification_products_list", [])
# Define label for output snow product
self.label_no_snow = "0"
......@@ -168,7 +187,8 @@ class snow_annual_map():
os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = str(self.nbThreads)
# search matching snow product
self.product_list = self.find_products(self.input_dir, self.tile_id, "SENTINEL2")
self.product_list = self.load_products(self.input_path_list, self.tile_id, None)
logging.debug("Product list:")
logging.debug(self.product_list)
......@@ -176,29 +196,29 @@ class snow_annual_map():
logging.error("Empty product list!")
return
# @TODO clean the loading of the L8 products to densify the timeserie
if self.use_l8_for_densification:
# search matching L8 snow product
l8_product_list = self.find_products(self.l8_input_dir, self.l8_tile_id, "LANDSAT8")
logging.info("L8 product list:")
logging.info(l8_product_list)
# @TODO clean the loading of the products to densify the timeserie
if self.use_densification:
# load densification snow products
densification_product_list = self.load_products(self.densification_path_list, None, None)
logging.info("Densification product list:")
logging.info(densification_product_list)
s2_input_dates = [datetime_to_str(product.acquisition_date) \
for product in self.product_list]
s2_footprint_ref = self.product_list[0].get_snow_mask()
# filter l8 products to keep only products that don't overlap an existing S2 date
final_l8_product_list = []
for l8_product in l8_product_list:
if not datetime_to_str(l8_product.acquisition_date) in s2_input_dates:
final_l8_product_list.append(l8_product)
logging.info(final_l8_product_list)
# filter densification products to keep only products that don't overlap an existing S2 date
final_densification_product_list = []
for densifier_product in densification_product_list:
if not datetime_to_str(densifier_product.acquisition_date) in s2_input_dates:
final_densification_product_list.append(densifier_product)
logging.info(final_densification_product_list)
# reproject the l8 products on S2 tile before going further
for l8_product in final_l8_product_list:
original_mask = l8_product.get_snow_mask()
# reproject the densification products on S2 tile before going further
for densifier_product in final_densification_product_list:
original_mask = densifier_product.get_snow_mask()
reprojected_mask = op.join(self.path_tmp,
l8_product.product_name + "_reprojected.tif")
densifier_product.product_name + "_reprojected.tif")
if not os.path.exists(reprojected_mask):
super_impose_app = super_impose(s2_footprint_ref,
original_mask,
......@@ -209,12 +229,12 @@ class snow_annual_map():
otb.ImagePixelType_uint8)
super_impose_app.ExecuteAndWriteOutput()
super_impose_app = None
l8_product.snow_mask = reprojected_mask