Commit 54e40139 authored by Germain Salgues's avatar Germain Salgues

UPDATE: the snow_annual_map processing now merges multiple snow masks that cover the same date

parent a211b481
......@@ -21,6 +21,7 @@ import sys
import os.path as op
import json
import csv
import copy
import logging
import subprocess
from datetime import datetime, timedelta
......@@ -53,7 +54,7 @@ def call_subprocess(process_list):
class prepare_data_for_snow_annual_map():
def __init__(self, params):
logging.info("Init snow_multitemp")
self.raw_params = params
self.raw_params = copy.deepcopy(params)
self.tile_id = params.get("tile_id")
self.date_start = str_to_datetime(params.get("date_start"), "%d/%m/%Y")
......@@ -61,13 +62,13 @@ class prepare_data_for_snow_annual_map():
self.date_margin = timedelta(days=params.get("date_margin", 0))
self.output_dates_filename = params.get("output_dates_filename", None)
self.mode = params.get("mode", "RUNTIME")
self.mission_tags = ["LANDSAT"]#["SENTINEL2"]
self.mission_tags = ["SENTINEL2"]#["LANDSAT"]#
self.snow_products_dir = str(params.get("snow_products_dir"))
self.path_tmp = str(params.get("path_tmp", os.environ.get('TMPDIR')))
self.input_products_list=params.get("input_products_list",[])
self.input_products_list=params.get("input_products_list",[]).copy()
logging.info(self.input_products_list)
self.processing_id = self.tile_id + "_" + \
datetime_to_str(self.date_start) + "_" + \
datetime_to_str(self.date_stop)
......@@ -76,7 +77,8 @@ class prepare_data_for_snow_annual_map():
self.use_densification = params.get("use_densification", False)
if self.use_densification:
self.mission_tags.append("LANDSAT")
self.densification_products_list=params.get("densification_products_list",[])
self.densification_products_list=params.get("densification_products_list",[]).copy()
logging.info(self.densification_products_list)
if not os.path.exists(self.path_out):
os.mkdir(self.path_out)
......@@ -133,15 +135,18 @@ class prepare_data_for_snow_annual_map():
df.loc[product_id, 'snow_product'] = expected_snow_product_path
logging.info(expected_snow_product_path)
# the snow product is already available
if op.exists(expected_snow_product_path):
logging.info(product_id + " is available as snow product")
snow_product_available += 1
df.loc[product_id, 'snow_product_available'] = True
snow_products_list.append(expected_snow_product_path)
# the L2A product is available in the datalake but request a snow detection
elif df.loc[product_id, 'available']:
logging.info(product_id + " requires to generate the snow product")
snow_processing_requested += 1
FileOut.write(df.loc[product_id, 'datalake']+"\n")
# the product must be requested into the datalake before snow detection
else:
logging.info(product_id + " requires to be requested to datalake.")
datalake_update_requested += 1
......@@ -153,21 +158,23 @@ class prepare_data_for_snow_annual_map():
self.snow_products_availability = float(snow_product_available/nb_products)
logging.info("Percent of available snow product : " + str(100*self.snow_products_availability) + "%")
self.datalake_products_availability = float(datalake_product_available/nb_products)
logging.info("Percent of available datalake product : " + str(100*self.datalake_products_availability) + "%")
# datalake update if not all the products are available
if datalake_update_requested > 0:
logging.info("Requesting an update of the datalake because of " + str(datalake_update_requested) + " unavailable products...")
amalthee_theia.fill_datalake()
# this will request all products of the request
# @TODO request only the products for which the snow products are not available
#amalthee_theia.fill_datalake()
logging.info("End of requesting datalake.")
if mission_tag == "LANDSAT":#"SENTINEL2":
if mission_tag == "SENTINEL2":#"LANDSAT":#
self.input_products_list.extend(snow_products_list)
else:
self.densification_products_list.extend(snow_products_list)
# request processing for remaining products
# request snow detection processing for listed products
FileOut.close()
if snow_processing_requested != 0:
self.process_snow_products(filename_i, snow_processing_requested)
......@@ -181,6 +188,8 @@ class prepare_data_for_snow_annual_map():
logging.error("No products available to compute snow annual map!!")
def build_json(self):
# the json is created only if more than 99.9% of the snow products are ready
# @TODO this param should not be hard coded
if self.snow_products_availability > 0.999:
snow_annual_map_param_json = os.path.join(self.path_out, "param.json")
logging.info("Snow annual map can be computed from: " + snow_annual_map_param_json)
......@@ -210,7 +219,7 @@ class prepare_data_for_snow_annual_map():
command.insert(2, "1-"+str(array_size+1))
print(" ".join(command))
try:
call_subprocess(command)
#call_subprocess(command)
logging.info("Order was submitted the snow annual map will soon be available.")
except:
logging.warning("Order was submitted the snow annual map will soon be available, but missinterpreted return code")
......@@ -236,12 +245,14 @@ def main():
"mode":"DEBUG",
"input_products_list":[],
"snow_products_dir":"/work/OT/siaa/Theia/Neige/PRODUITS_NEIGE_LIS_develop_1.5",
"path_tmp":"",
# path_tmp is an actual parameter but must only be uncommented with a correct path
# else the processing uses $TMPDIR by default
#"path_tmp":"",
#"path_out":"/home/qt/salguesg/scratch/multitemp_workdir/tmp_test",
"path_out":"/work/OT/siaa/Theia/Neige/Snow_Annual_Maps_L8_Only",
"path_out":"/work/OT/siaa/Theia/Neige/Snow_Annual_Maps_L8_Densification_with_merging",
"ram":8192,
"nbThreads":6,
"use_densification":False,
"use_densification":True,
"densification_products_list":[],
"data_availability_check":False}
......@@ -260,7 +271,6 @@ def main():
config_file = prepare_data_for_snow_annual_map_app.build_json()
if config_file is not None:
prepare_data_for_snow_annual_map_app.process_snow_annual_map(config_file)
#return
if __name__== "__main__":
# Set logging level and format.
......
......@@ -42,6 +42,7 @@ from s2snow.snow_product_parser import load_snow_product
# syntaxx
GDAL_OPT = "?&gdal:co:NBITS=1&gdal:co:COMPRESS=DEFLATE"
def parse_xml(filepath):
""" Parse an xml file to return the zs value of a snow product
"""
......@@ -50,16 +51,8 @@ def parse_xml(filepath):
group = xmldoc.getElementsByTagName('Global_Index_List')[0]
zs = group.getElementsByTagName("QUALITY_INDEX")[0].firstChild.data
def findFiles(folder, pattern):
    """ Walk a directory tree and collect every file whose name matches a pattern.

    Keyword arguments:
    folder -- the root directory that is searched recursively
    pattern -- the regular expression matched (from the start) against each file name
    """
    found = []
    for dirpath, _dirnames, filenames in os.walk(folder):
        found.extend(os.path.join(dirpath, name)
                     for name in filenames
                     if re.match(pattern, name))
    return found
#TODO move this function in app_wrappers.py along other otb applications
def gap_filling(img_in, mask_in, img_out, input_dates_file=None,
output_dates_file=None, ram=None, out_type=None):
""" Create and configure the ImageTimeSeriesGapFilling application
......@@ -102,33 +95,39 @@ def gap_filling(img_in, mask_in, img_out, input_dates_file=None,
else:
logging.error("Parameters img_in, img_out and mask_in are required")
def merge_masks_at_same_date(mask_snow_list, mask_cloud_list, merged_snow_mask, merged_cloud_mask, ram=None):
    """ Merge simultaneous snow and cloud masks into single composite masks.

    Keyword arguments:
    mask_snow_list -- the input snow mask filepaths
    mask_cloud_list -- the input cloud mask filepaths (same length/order as mask_snow_list)
    merged_snow_mask -- the output filepath for the merged snow mask
    merged_cloud_mask -- the output filepath for the merged cloud mask
    ram -- the ram limitation (not mandatory)

    Note: the original implementation referenced self.ram inside this
    module-level function, which raised a NameError at runtime; the ram
    limitation is now an explicit (optional) parameter instead.
    """
    img_index = range(1, len(mask_cloud_list)+1)

    # A pixel is clear (0) in the merged cloud mask as soon as one input
    # cloud mask reports 0; otherwise it stays cloudy (1).
    expression_cloud_merging = "".join(["(im" + str(i) + "b1==0?0:" for i in img_index])
    expression_cloud_merging += "1"
    expression_cloud_merging += "".join([")" for i in img_index])

    bandMathApp = band_math(mask_cloud_list,
                            merged_cloud_mask,
                            expression_cloud_merging,
                            ram,
                            otb.ImagePixelType_uint8)
    bandMathApp.ExecuteAndWriteOutput()
    bandMathApp = None

    # For snow, take the snow value of the first input whose cloud mask is
    # clear (0); fall back to 0 when every input is cloudy. The input image
    # list is cloud masks first (im1..imN) then snow masks (imN+1..im2N).
    offset = len(mask_cloud_list)
    expression_snow_merging = "".join(["(im" + str(i) + "b1==0?im"+str(i+offset)+"b1:" for i in img_index])
    expression_snow_merging += "0"
    expression_snow_merging += "".join([")" for i in img_index])

    input_img_list = mask_cloud_list + mask_snow_list
    bandMathApp = band_math(input_img_list,
                            merged_snow_mask,
                            expression_snow_merging,
                            ram,
                            otb.ImagePixelType_uint8)
    bandMathApp.ExecuteAndWriteOutput()
def merge_masks_at_same_date(snow_product_list, merged_snow_product, threshold=100, ram=None):
    """ This function implements the fusion of multiple snow masks

    Keyword arguments:
    snow_product_list -- the input snow product list (priority order, main products first)
    merged_snow_product -- the output filepath
    threshold -- pixel values <= threshold are considered valid data
    ram -- the ram limitation (not mandatory)

    Raises:
    ValueError -- when snow_product_list is empty
    """
    # Guard against an empty list, which would otherwise fail later with an
    # obscure IndexError when building the fallback term of the expression.
    if not snow_product_list:
        raise ValueError("snow_product_list must contain at least one snow product")

    logging.info("Merging products into " + merged_snow_product)

    # the merging is performed according the following selection:
    #   if img1 <= threshold use img1 data
    #   else if img2 <= threshold use img2 data
    #   else if imgN <= threshold use imgN data
    #   else keep imgN data (the last image is the fallback)
    # the order of the images in the input list is important:
    #   we expect to have first the main input products
    #   and then the densification products
    img_index = range(1, len(snow_product_list)+1)
    expression_merging = "".join(["(im" + str(i) + "b1<=" + str(threshold) + "?im" + str(i) + "b1:" for i in img_index])
    expression_merging += "im" + str(len(snow_product_list)) + "b1"
    expression_merging += "".join([")" for i in img_index])

    img_list = [i.get_snow_mask() for i in snow_product_list]
    bandMathApp = band_math(img_list,
                            merged_snow_product,
                            expression_merging,
                            ram,
                            otb.ImagePixelType_uint8)
    bandMathApp.ExecuteAndWriteOutput()
    bandMathApp = None
""" This module provide the implementation of the snow annual map """
class snow_annual_map():
def __init__(self, params):
logging.info("Init snow_multitemp")
......@@ -140,16 +139,19 @@ class snow_annual_map():
self.output_dates_filename = params.get("output_dates_filename", None)
self.mode = params.get("mode", "RUNTIME")
# Compute an id like T31TCH_20170831_20180901 to label the map
self.processing_id = str(self.tile_id + "_" + \
datetime_to_str(self.date_start) + "_" + \
datetime_to_str(self.date_stop))
# input_dir is no longer used
#self.input_dir = str(op.join(params.get("snow_products_dir"), self.tile_id))
# Retrive the input_products_list
self.input_path_list = params.get("input_products_list", [])
#self.path_tmp = str(params.get("path_tmp", os.environ.get('TMPDIR')))
self.path_tmp = str(os.environ.get('TMPDIR'))
# @TODO an available path_tmp must be provided or the TMPDIR variable must be available
self.path_tmp = str(params.get("path_tmp", os.environ.get('TMPDIR')))
if not os.path.exists(self.path_tmp):
logging.error(self.path_tmp + ", the target does not exist and can't be used for processing")
self.path_out = op.join(str(params.get("path_out")), self.processing_id)
if not os.path.exists(self.path_out):
......@@ -162,7 +164,7 @@ class snow_annual_map():
if self.use_densification:
self.densification_path_list = params.get("densification_products_list", [])
# Define label for output snow product
# Define label for output snow product (cf snow product format)
self.label_no_snow = "0"
self.label_snow = "100"
self.label_cloud = "205"
......@@ -186,66 +188,58 @@ class snow_annual_map():
os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = str(self.nbThreads)
# search matching snow product
self.product_list = self.load_products(self.input_path_list, self.tile_id, None)
logging.debug("Product list:")
logging.debug(self.product_list)
self.product_dict = self.load_products(self.input_path_list, self.tile_id, None)
logging.debug("Product dictionnary:")
logging.debug(self.product_dict)
if not self.product_list:
# Exiting with error if none of the input products were loaded
if not self.product_dict:
logging.error("Empty product list!")
return
# @TODO clean the loading of the products to densify the timeserie
# Do the loading of the products to densify the timeserie
if self.use_densification:
# load densification snow products
densification_product_list = self.load_products(self.densification_path_list, None, None)
logging.info("Densification product list:")
logging.info(densification_product_list)
s2_input_dates = [datetime_to_str(product.acquisition_date) \
for product in self.product_list]
s2_footprint_ref = self.product_list[0].get_snow_mask()
# filter densification products to keep only products that don't overlap an existing S2 date
final_densification_product_list = []
for densifier_product in densification_product_list:
if not datetime_to_str(densifier_product.acquisition_date) in s2_input_dates:
final_densification_product_list.append(densifier_product)
logging.info(final_densification_product_list)
if len(final_densification_product_list) > 0:
# reproject the densification products on S2 tile before going further
for densifier_product in final_densification_product_list:
original_mask = densifier_product.get_snow_mask()
reprojected_mask = op.join(self.path_tmp,
densifier_product.product_name + "_reprojected.tif")
if not os.path.exists(reprojected_mask):
super_impose_app = super_impose(s2_footprint_ref,
original_mask,
reprojected_mask,
"nn",
int(self.label_no_data),
self.ram,
otb.ImagePixelType_uint8)
super_impose_app.ExecuteAndWriteOutput()
super_impose_app = None
densifier_product.snow_mask = reprojected_mask
logging.debug(densifier_product.snow_mask)
logging.info("First densifying mask: " + final_densification_product_list[0].snow_mask)
logging.info("Last densifying mask: " + final_densification_product_list[-1].snow_mask)
self.product_list.extend(final_densification_product_list)
densification_product_dict = self.load_products(self.densification_path_list, None, None)
logging.info("Densification product dict:")
logging.info(densification_product_dict)
# Get the footprint of the first snow product
s2_footprint_ref = self.product_dict[list(self.product_dict.keys())[0]][0].get_snow_mask()
if densification_product_dict:
# Reproject the densification products on S2 tile before going further
for densifier_product_key in densification_product_dict.keys():
for densifier_product in densification_product_dict[densifier_product_key]:
original_mask = densifier_product.get_snow_mask()
reprojected_mask = op.join(self.path_tmp,
densifier_product.product_name + "_reprojected.tif")
if not os.path.exists(reprojected_mask):
super_impose_app = super_impose(s2_footprint_ref,
original_mask,
reprojected_mask,
"nn",
int(self.label_no_data),
self.ram,
otb.ImagePixelType_uint8)
super_impose_app.ExecuteAndWriteOutput()
super_impose_app = None
densifier_product.snow_mask = reprojected_mask
logging.debug(densifier_product.snow_mask)
# Add the products to extend the self.product_dict
if densifier_product_key in self.product_dict.keys():
self.product_dict[densifier_product_key].extend(densification_product_dict[densifier_product_key])
else:
self.product_dict[densifier_product_key] = densification_product_dict[densifier_product_key]
else:
logging.warning("No Densifying candidate product found!")
# re-order products according acquisition date
self.product_list.sort(key=lambda x: x.acquisition_date)
logging.debug(self.product_list)
input_dates = [datetime_to_str(product.acquisition_date) for product in self.product_list]
input_dates = sorted(self.product_dict.keys())
write_list_to_file(self.input_dates_filename, input_dates)
# compute or retrive the output dates
output_dates = []
if op.exists(self.output_dates_filename):
output_dates = read_list_from_file(self.output_dates_filename)
......@@ -259,10 +253,18 @@ class snow_annual_map():
shutil.copy2(self.input_dates_filename, self.path_out)
shutil.copy2(self.output_dates_filename, self.path_out)
# load required product
self.snowmask_list = self.get_snow_masks()
logging.debug("Snow mask list:")
logging.debug(self.snowmask_list)
# merge products at the same date
self.resulting_snow_mask_dict={}
for key in self.product_dict.keys():
if len(self.product_dict[key]) > 1:
merged_mask = op.join(self.path_tmp, key + "_merged_snow_product.tif")
merge_masks_at_same_date(self.product_dict[key],
merged_mask,
self.label_snow,
self.ram)
self.resulting_snow_mask_dict[key] = merged_mask
else:
self.resulting_snow_mask_dict[key] = self.product_dict[key][0].get_snow_mask()
# convert the snow masks into binary snow masks
expression = "(im1b1==" + self.label_snow + ")?1:0"
......@@ -324,7 +326,7 @@ class snow_annual_map():
shutil.copy2(self.gapfilled_timeserie, self.path_out)
app_gap_filling = None
# generate the summary map
# generate the annual map
band_index = range(1, len(output_dates)+1)
expression = "+".join(["im1b" + str(i) for i in band_index])
......@@ -349,13 +351,14 @@ class snow_annual_map():
def load_products(self, snow_products_list, tile_id=None, product_type=None):
logging.info("Parsing provided snow products list")
product_list = []
product_dict = {}
search_start_date = self.date_start - self.date_margin
search_stop_date = self.date_stop + self.date_margin
for product_path in snow_products_list:
try:
product = load_snow_product(str(product_path))
logging.info(str(product))
current_day = datetime_to_str(product.acquisition_date)
test_result = True
if search_start_date > product.acquisition_date or \
search_stop_date < product.acquisition_date:
......@@ -365,26 +368,24 @@ class snow_annual_map():
if (product_type is not None) and (product_type not in product.platform):
test_result = False
if test_result:
product_list.append(product)
if current_day not in product_dict.keys():
product_dict[current_day] = [product]
else:
product_dict[current_day].append(product)
logging.info("Keeping: " + str(product))
else:
logging.warning("Discarding: " + str(product))
except Exception:
logging.error("Unable to load product :" + product_path)
return product_list
def get_snow_masks(self):
return [i.get_snow_mask() for i in self.product_list]
return product_dict
def convert_mask_list(self, expression, type_name, mask_format=""):
binary_mask_list = []
for product in self.product_list:
mask_in = product.get_snow_mask()
for mask_date in sorted(self.resulting_snow_mask_dict):
binary_mask = op.join(self.path_tmp,
product.product_name + "_" + type_name + "_binary.tif")
binary_mask = self.extract_binary_mask(mask_in,
mask_date + "_" + type_name + "_binary.tif")
binary_mask = self.extract_binary_mask(self.resulting_snow_mask_dict[mask_date],
binary_mask,
expression,
mask_format)
......@@ -400,32 +401,3 @@ class snow_annual_map():
otb.ImagePixelType_uint8)
bandMathApp.ExecuteAndWriteOutput()
return mask_out
###############################################################
# Main Test
###############################################################
def main():
    """ Standalone test entry point: run the snow annual map on a hard-coded configuration. """
    params = {
        "tile_id": "T32TPS",
        "date_start": "01/09/2017",
        "date_stop": "31/08/2018",
        "date_margin": 15,
        "mode": "DEBUG",
        "input_products_list": [],
        "snow_products_dir": "/work/OT/siaa/Theia/Neige/PRODUITS_NEIGE_LIS_develop_1.5",
        "path_tmp": "",
        "path_out": "/work/OT/siaa/Theia/Neige/Snow_Annual_Maps_FIX/tmp",
        "ram": 4096,
        "nbThreads": 6,
        "use_densification": True,
        "densification_products_list": [],
        "data_availability_check": False,
    }
    app = snow_annual_map(params)
    app.run()

if __name__ == '__main__':
    # Set logging level and format.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    main()
......@@ -149,25 +149,24 @@ class snow_annual_map_evaluation(snow_annual_map):
os.environ["ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS"] = str(self.nbThreads)
# search matching comparison snow product
self.product_list = self.load_products(self.comparison_path_list, None, None)
logging.debug("Product list:")
logging.debug(self.product_list)
# re-order products according acquisition date
self.product_list.sort(key=lambda x: x.acquisition_date)
logging.debug("Sorted product list:")
logging.debug(self.product_list)
self.product_dict = self.load_products(self.comparison_path_list, None, None)
logging.debug("Product dict:")
logging.debug(self.product_dict)
# create the comparison products dates file
comparison_input_dates = []
for product in self.product_list:
comparison_input_dates.append(datetime_to_str(product.acquisition_date))
comparison_input_dates = list(sorted(self.product_dict.keys()))
write_list_to_file(self.comparison_dates_filename, comparison_input_dates)
# load required product
self.snowmask_list = self.get_snow_masks()
logging.debug("Comparison snow mask list:")
logging.debug(self.snowmask_list)
self.resulting_snow_mask_dict={}
for key in self.product_dict.keys():
if len(self.product_dict[key]) > 1:
comparison_tag = key + "_comparison"
merged_mask = op.join(self.path_tmp, comparison_tag + "_merged_snow_product.tif")
merge_masks_at_same_date(self.product_dict[key], merged_mask, self.label_snow, self.ram)
self.resulting_snow_mask_dict[comparison_tag] = merged_mask
else:
self.resulting_snow_mask_dict[comparison_tag] = self.product_dict[key][0].get_snow_mask()
# convert the snow masks into binary snow masks
expression = "im1b1=="+self.label_cloud+"?2:(im1b1=="+self.label_no_data+"?2:" \
......@@ -453,11 +452,15 @@ def main():
"date_stop":"31/08/2016",
"date_margin":15,
"mode":"DEBUG",
"input_dir":"/work/OT/siaa/Theia/Neige/output_muscate_v2pass2red40/T31TCH",
"path_tmp":os.environ['TMPDIR'],
"input_products_list":[],
"snow_products_dir":"/work/OT/siaa/Theia/Neige/PRODUITS_NEIGE_LIS_develop_1.5",
"path_tmp":os.environ.get('TMPDIR'),
"path_out":"/home/qt/salguesg/scratch/workdir",
"ram":4096,
"nbThreads":8,
"use_densification":False,
"densification_products_list":[],
"data_availability_check":False,
"run_comparison_evaluation":True,
"comparison_products_list":[],
"run_modis_comparison":True,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment