Skip to content
Snippets Groups Projects
Commit 71972466 authored by Aurore Dupuis's avatar Aurore Dupuis
Browse files

Refactoring snow products reading.

parent 0681aaae
No related branches found
No related tags found
3 merge requests!90Develop,!84Merge develop,!78Resolve "Synthesis on zipped products does not work"
......@@ -110,3 +110,8 @@ OUTPUT_DATES_FILE = "output_dates.txt"
INPUT_PARAMETER_ERROR = -2
UNKNOWN_PRODUCT_EXCEPTION = -3
CONFIGURATION_ERROR = -4
# Date Time format
MUSCATE_DATETIME_FORMAT = "%Y%m%d-%H%M%S-%f"
LANDSAT_DATETIME_FORMAT = "%Y%m%d"
LIS_DATETIME_FORMAT = "%Y%m%dT%H%M%S"
......@@ -38,3 +38,13 @@ class UnknownPlatform(Exception):
class NoProductMatchingSynthesis(Exception):
def __init__(self, msg):
self.msg = msg
class NoSnowProductFound(Exception):
def __init__(self, msg):
self.msg = msg
class NoZipFound(Exception):
def __init__(self, msg):
self.msg = msg
\ No newline at end of file
......@@ -26,15 +26,12 @@ import os.path as op
import zipfile
from os.path import basename, dirname
from s2snow.lis_constant import SENTINEL2, LIS, LANDSAT8_OLITIRS_XS, LANDSAT8, N2A
from s2snow.lis_exception import UnknownPlatform
from s2snow.utils import str_to_datetime
from s2snow.lis_constant import SENTINEL2, LIS, LANDSAT8_OLITIRS_XS, LANDSAT8, N2A, MUSCATE_DATETIME_FORMAT, \
LIS_DATETIME_FORMAT, LANDSAT_DATETIME_FORMAT
from s2snow.lis_exception import UnknownPlatform, NoSnowProductFound, NoZipFound
MUSCATE_DATETIME_FORMAT = "%Y%m%d-%H%M%S-%f"
LIS_DATETIME_FORMAT = "%Y%m%dT%H%M%S"
class snow_product:
class SnowProduct:
def __init__(self, absoluteFilename):
# example 1 "SENTINEL2A_20160912-103551-370_L2B-SNOW_T32TLS_D_V1-0"
# example 2 "LANDSAT8_OLITIRS_XS_20160812_N2A_France-MetropoleD0005H0001"
......@@ -44,100 +41,64 @@ class snow_product:
logging.debug("absoluteFilename : " + absoluteFilename)
self.product_name = basename(absoluteFilename)
self.acquisition_date = None
self.tile_id = None
self.snow_mask = None
self.metadata_file = None
self.product_path = dirname(absoluteFilename)
name_splitted = self.product_name.split("_")
self.platform = name_splitted[0]
logging.debug("product_name : " + self.product_name)
logging.debug("product_path : " + self.product_path)
logging.debug("platform : " + self.platform)
platform = name_splitted[0]
if SENTINEL2 in self.platform :
if SENTINEL2 in platform or LANDSAT8_OLITIRS_XS == platform:
self.acquisition_date = datetime.strptime(name_splitted[1], MUSCATE_DATETIME_FORMAT)
logging.debug("acquisition_date : " + str(self.acquisition_date))
self.product_level = name_splitted[2]
logging.debug("product_level : " + self.product_level)
self.tile_id = name_splitted[3]
logging.debug("tile_id : " + self.tile_id)
self.flag = name_splitted[4]
logging.debug("flag : " + self.flag)
self.product_version = name_splitted[5]
logging.debug("product_version : " + self.product_version)
elif LANDSAT8_OLITIRS_XS == self.platform:
self.acquisition_date = datetime.strptime(name_splitted[1], MUSCATE_DATETIME_FORMAT)
logging.debug("acquisition_date : " + str(self.acquisition_date))
self.product_level = name_splitted[2]
logging.debug("product_level : " + self.product_level)
self.tile_id = name_splitted[3]
logging.debug("tile_id : " + self.tile_id)
self.flag = name_splitted[4]
logging.debug("flag : " + self.flag)
self.product_version = name_splitted[5]
logging.debug("product_version : " + self.product_version)
elif LANDSAT8 in self.platform and N2A in self.product_name:
self.acquisition_date = datetime.strptime(name_splitted[3], "%Y%m%d")
logging.debug("acquisition_date : " + str(self.acquisition_date))
self.product_level = name_splitted[4]
logging.debug("product_level : " + self.product_level)
elif LANDSAT8 in platform and N2A in self.product_name:
self.acquisition_date = datetime.strptime(name_splitted[3], LANDSAT_DATETIME_FORMAT)
self.tile_id = name_splitted[5]
logging.debug("tile_id : " + self.tile_id)
self.flag = None
self.product_version = None
elif LIS in self.platform:
# FSC product
elif LIS in platform:
self.acquisition_date = datetime.strptime(name_splitted[3], LIS_DATETIME_FORMAT)
logging.debug("acquisition_date : " + str(self.acquisition_date))
self.product_level = "L2B"
logging.debug("product_level : " + self.product_level)
self.tile_id = name_splitted[2]
logging.debug("tile_id : " + self.tile_id)
self.flag = None
self.product_version = name_splitted[4]
logging.debug("product_version : " + self.product_version)
else:
msg = "Unknown platform: " + self.platform
msg = "Unknown platform or producer: " + platform
logging.error(msg)
raise UnknownPlatform(msg)
self.zip_product = None
self.is_extracted = False
self.snow_mask = None
self.is_fsc = False
self.define_snow_mask(absoluteFilename)
self.define_metadata_fil(absoluteFilename)
self.log_product()
def define_metadata_fil(self, absoluteFilename):
self.metadata_file = op.join(absoluteFilename,
self.product_name + "_MTD_ALL.xml")
if not op.exists(self.metadata_file):
self.metadata_file = op.join(absoluteFilename,
self.product_name + ".xml")
logging.debug("metadata_file : " + self.metadata_file)
self.sub_files = os.listdir(absoluteFilename)
for sub_file in self.sub_files:
def define_snow_mask(self, absolute_filename):
for sub_file in os.listdir(absolute_filename):
if sub_file.lower().endswith(".zip"):
logging.info("The snow product is stored in a zip")
self.zip_product = op.join(absoluteFilename, sub_file)
logging.info("The snow product is stored in a zip, extracting zip file.")
self.extract_snow_mask(op.join(absolute_filename, sub_file), absolute_filename)
self.define_snow_mask(absolute_filename)
if sub_file == self.product_name:
logging.info("The zipped snow product is already extracted")
self.is_extracted = True
self.extracted_product = op.join(absoluteFilename, sub_file)
self.snow_mask = op.join(self.extracted_product,
self.snow_mask = op.join(op.join(absolute_filename, sub_file),
self.product_name + "_SNW_R2.tif")
if sub_file.upper().endswith("_SNW_R2.TIF"):
self.is_extracted = True
self.snow_mask = op.join(absoluteFilename, sub_file)
self.snow_mask = op.join(absolute_filename, sub_file)
if sub_file.upper().endswith("_SNW_XS.TIF"):
self.is_extracted = True
self.snow_mask = op.join(absoluteFilename, sub_file)
self.snow_mask = op.join(absolute_filename, sub_file)
if sub_file.upper() == "LIS_PRODUCTS":
self.is_extracted = True
self.extracted_product = op.join(absoluteFilename, sub_file)
self.snow_mask = op.join(self.extracted_product, "LIS_SEB.TIF")
self.snow_mask = op.join(op.join(absolute_filename, sub_file), "LIS_SEB.TIF")
if "SNOW-FSC_" in sub_file or "SNOW-FSC-TOC" in sub_file:
self.is_extracted = True
self.snow_mask = op.join(absoluteFilename, sub_file)
logging.debug("snow_mask : " + self.snow_mask)
self.is_fsc = True
self.snow_mask = op.join(absolute_filename, sub_file)
if "SNOW-MSK" in sub_file:
self.is_extracted = True
self.snow_mask = op.join(absoluteFilename, sub_file)
logging.debug("snow_mask : " + self.snow_mask)
self.is_fsc = False
self.metadata_file = op.join(absoluteFilename,
self.product_name + "_MTD_ALL.xml")
self.snow_mask = op.join(absolute_filename, sub_file)
logging.debug("snow_mask : " + self.snow_mask)
def __repr__(self):
return op.join(self.product_path, self.product_name)
......@@ -145,32 +106,34 @@ class snow_product:
def __str__(self):
return op.join(self.product_path, self.product_name)
def extract_snow_mask(self, output_folder):
if self.snow_mask and op.exists(self.snow_mask):
logging.info("The snow mask is already extracted and available")
elif self.zip_product and op.exists(self.zip_product):
extracted_files = extract_from_zipfile(self.zip_product,
output_folder,
["_SNW_R2.tif"])
self.snow_mask = extracted_files[0]
def log_product(self):
logging.debug("product_name : " + self.product_name)
logging.debug("acquisition_date : " + str(self.acquisition_date))
logging.debug("tile_id : " + self.tile_id)
logging.debug("snow_mask : " + self.snow_mask)
logging.debug("metadata_file : " + self.metadata_file)
def extract_snow_mask(self, zip_file, output_folder):
if op.exists(zip_file):
extract_from_zipfile(zip_file, output_folder, ["_SNW_R2.tif", "_MTD_ALL.xml"])
else:
logging.error("Extraction failed")
msg = "Extraction failed - zipfile does not exist : " + zip_file
logging.error(msg)
raise NoZipFound(msg)
def get_snow_mask(self):
if self.snow_mask and op.exists(self.snow_mask):
if self.is_fsc:
#TODO transform FSC en masque de neige
return self.snow_mask
else:
return self.snow_mask
return self.snow_mask
else:
logging.info("The snow mask must first be extracted")
msg = "The snow mask has not been found."
logging.error(msg)
raise NoSnowProductFound(msg)
def get_metadata(self):
if self.metadata_file and op.exists(self.metadata_file):
return self.metadata_file
else:
logging.info("The metadata file was not found")
logging.info("The metadata file has not been found.")
def extract_from_zipfile(file_name, output_folder, patterns=[]):
......
......@@ -38,7 +38,7 @@ from s2snow.compute_SOD_SMOD import compute_SOD_SMOD
from s2snow.lis_constant import TMP_DIR, OUTPUT_DATES_FILE
from s2snow.lis_exception import NoProductMatchingSynthesis
from s2snow.otb_wrappers import band_math, super_impose, band_mathX, gap_filling, get_app_output
from s2snow.snow_product import snow_product
from s2snow.snow_product import SnowProduct
from s2snow.utils import datetime_to_str
from s2snow.utils import write_list_to_file, read_list_from_file
......@@ -445,7 +445,7 @@ def load_snow_products(date_start, date_stop, date_margin, snow_products_list, t
for product_path in snow_products_list:
logging.debug("product_path : %s", str(product_path))
product = snow_product(str(product_path))
product = SnowProduct(str(product_path))
logging.debug("product : %s", str(product))
acquisition_day = datetime.strftime(product.acquisition_date, "%Y%m%d")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment