From 0b0a3dffaec900519a6034a95cc2b00b3c8f13f7 Mon Sep 17 00:00:00 2001 From: Julien Malik <julien.malik@c-s.fr> Date: Mon, 21 Mar 2011 11:00:17 +0100 Subject: [PATCH] REFAC: add a StreamingManager class, rework the streaming management of StreamingImageVirtualFileWriter - Add a StreamingManager class, integrating the PipelineMemoryPrintCalculator, and managing several streaming policies (stripped by specified number of lines, stripped by available memory, tiled by specified tile dimension, tiled by available memory) - In tile mode, the class uses the new ImageRegionSquareTileSplitter, which generates square tiles - Integrate the StreamingManager into the StreamingImageVirtualWriter as the only object responsible for streaming policy. The filter relies on this object to get the number of splits and iterate in the splits, without knowing the underlying implementation - Add proper progress reporting in the StreamingImageVirtualWriter, by observing the progress events generated by the source object, to provide a continuous progress update instead of a tile by tile progress - Make the necessary changes in the tests to account for new streaming parameters specifications --- Code/Common/otbStreamingManager.h | 196 ++++++++++++ Code/Common/otbStreamingManager.txx | 291 ++++++++++++++++++ Code/IO/otbStreamingImageVirtualWriter.h | 99 +++--- Code/IO/otbStreamingImageVirtualWriter.txx | 256 ++++----------- .../otbMatrixTransposeMatrixImageFilter.cxx | 2 +- ...StreamingInnerProductVectorImageFilter.cxx | 2 +- .../otbStreamingMinMaxImageFilter.cxx | 2 +- .../otbStreamingMinMaxVectorImageFilter.cxx | 2 +- .../otbStreamingStatisticsImageFilter.cxx | 2 +- ...tbStreamingStatisticsVectorImageFilter.cxx | 2 +- 10 files changed, 583 insertions(+), 271 deletions(-) create mode 100644 Code/Common/otbStreamingManager.h create mode 100644 Code/Common/otbStreamingManager.txx diff --git a/Code/Common/otbStreamingManager.h b/Code/Common/otbStreamingManager.h new file mode 100644 index 0000000000..6d5d81b43a --- /dev/null +++ b/Code/Common/otbStreamingManager.h @@ -0,0 +1,196 @@ +/*========================================================================= + + Program: ORFEO Toolbox + Language: C++ + Date: $Date$ + Version: $Revision$ + + + Copyright (c) Centre National d'Etudes Spatiales. All rights reserved. + See OTBCopyright.txt for details. + + + This software is distributed WITHOUT ANY WARRANTY; without even + the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + PURPOSE. See the above copyright notices for more information. + +=========================================================================*/ +#ifndef __otbStreamingManager_h +#define __otbStreamingManager_h + +#include "otbMacro.h" +#include "otbConfigure.h" + +#include "itkLightObject.h" +#include "itkImageRegionSplitter.h" + +namespace otb +{ + +namespace StreamingManagement +{ + enum StreamingMode + { + /** Estimates the memory used by the pipeline + * and set the strips size to fit the specified + * available RAM + */ + STRIPPED_AVAILABLE_RAM, + + /** Force the use of a specific number of lines per strips + */ + STRIPPED_SET_NUMBEROFLINES, + + /** Estimates the memory used that will be used during the pipeline + * execution and set the tile size to fit the specified + * available RAM. + */ + TILED_AVAILABLE_RAM, + + /** Force the use of a specific tile dimension + * The associated parameter is the size used in each dimension + */ + TILED_SET_TILE_SIZE, + }; +} + +/** \class StreamingManager + * \brief This class handles the streaming process used in the writers implementation + * + * The streaming mode can be chosen with either SetStrippedRAMStreamingMode, SetStrippedNumberOfLinesStreamingMode, + * SetTiledRAMStreamingMode, or SetTiledTileDimensionStreamingMode. + * + * Then, PrepareStreaming must be called so that the stream type and dimensions are computed + * This involves passing the actual DataObject who will be written, since it will be used + * during memory estimation for some specific streaming modes. + * + * After PrepareStreaming has been called, the actual number of splits and streaming mode which will be used + * can be retrieved with GetStreamingMode and GetNumberOfSplits. + * The different splits can be retrieved with GetSplit + * + * \sa StreamingImageFileWriter + * \sa StreamingImageVirtualFileWriter + */ +template<class TImage> +class ITK_EXPORT StreamingManager : public itk::LightObject +{ +public: + /** Standard class typedefs. */ + typedef StreamingManager Self; + typedef itk::LightObject Superclass; + typedef itk::SmartPointer<Self> Pointer; + typedef itk::SmartPointer<const Self> ConstPointer; + + typedef TImage ImageType; + typedef typename ImageType::Pointer ImagePointerType; + typedef typename ImageType::RegionType RegionType; + typedef typename RegionType::IndexType IndexType; + typedef typename RegionType::SizeType SizeType; + typedef typename ImageType::InternalPixelType PixelType; + + typedef StreamingManagement::StreamingMode StreamingModeType; + + /** Creation through object factory macro */ + itkNewMacro(Self); + + /** Type macro */ + itkTypeMacro(StreamingManager, itk::LightObject); + + /** Dimension of input image. */ + itkStaticConstMacro(ImageDimension, unsigned int, ImageType::ImageDimension); + + /** Use stripped mode. + * The number of lines of the strips are computed by estimating the + * memory footprint of the pipeline, and chosen so that no more than + * availableRAM MBytes of memory are used. + * If no parameter is given, then the available RAM is retrieved from the + * OTB configuration file or build configuration option */ + virtual void SetStrippedRAMStreamingMode( unsigned int availableRAMInMB = 0 ); + + /** Use stripped mode. + * The number of lines of the strips are explicitely specified */ + virtual void SetStrippedNumberOfLinesStreamingMode( unsigned int numberOfLines ); + + /** Use tiled mode. + * The dimensions of the tiles are computed by estimating the + * memory footprint of the pipeline, and chosen so that no more than + * availableRAM MBytes of memory are used. + * + * If no parameter is given, then the available RAM is retrieved from the + * OTB configuration file or build configuration option. + * + * The tiles are set to be square, with dimension aligned with multiple of 16 */ + virtual void SetTiledRAMStreamingMode( unsigned int availableRAMInMB = 0 ); + + /** Use tiled mode. + * The dimension of the tile are explicitely specified. + * The parameter specifies the size of the tile in each dimension. */ + virtual void SetTiledTileDimensionStreamingMode( unsigned int tileDimension ); + + /** Actually computes the stream divisions, accorfing to the specified streaming mode, + * eventually using the input parameter to estimate memory consumption */ + virtual void PrepareStreaming(itk::DataObject * input, const RegionType ®ion); + + /** Returns the actual streaming mode that will be used to process the image. + * PrepareStreaming() must have been called before. + * This can be different than the required streaming mode. For example, if + * the input passed to PrepareStreaming() is fully buffered, then + * the STRIPPED_SET_NUMBEROFLINES mode is used with only one strip */ + virtual StreamingManagement::StreamingMode GetStreamingMode(); + + /** Returns the actual number of pieces that will be used to process the image. + * PrepareStreaming() must have been called before. + * This can be different than the requested number */ + virtual unsigned int GetNumberOfSplits(); + + /** Get a region definition that represents the ith piece a specified region. + * The "numberOfPieces" must be equal to what + * GetNumberOfSplits() returns. */ + virtual RegionType GetSplit(unsigned int i); + + +public: + StreamingManager(); + virtual ~StreamingManager(); + + virtual unsigned int EstimateOptimalNumberOfDivisions(itk::DataObject * input, const RegionType ®ion); + + /** The desired streaming mode specified by the user */ + StreamingManagement::StreamingMode m_DesiredMode; + + /** The actual streaming mode which will be used */ + StreamingManagement::StreamingMode m_ActualMode; + + /** The available RAM set as parameter when using the STRIPPED_AVAILABLE_RAM or TILED_AVAILABLE_RAM mode */ + unsigned int m_AvailableRAMInMB; + + /** The desired number of lines when using the STRIPPED_SET_NUMBEROFLINES streaming mode */ + unsigned int m_DesiredNumberOfLines; + + /** The desired tile dimension when using the TILED_SET_TILE_SIZE streaming mode */ + unsigned int m_DesiredTileDimension; + + /** The computed number of splits after PrepareStreaming has been called */ + unsigned int m_ComputedNumberOfSplits; + + /** The region to stream */ + RegionType m_Region; + + /** The splitter used to compute the different strips */ + typedef itk::ImageRegionSplitter<itkGetStaticConstMacro(ImageDimension)> AbstractSplitterType; + typedef typename AbstractSplitterType::Pointer AbstractSplitterPointerType; + AbstractSplitterPointerType m_Splitter; + +private: + StreamingManager(const StreamingManager &); //purposely not implemented + void operator =(const StreamingManager&); //purposely not implemented + +}; + +} // End namespace otb + +#ifndef OTB_MANUAL_INSTANTIATION +#include "otbStreamingManager.txx" +#endif + +#endif diff --git a/Code/Common/otbStreamingManager.txx b/Code/Common/otbStreamingManager.txx new file mode 100644 index 0000000000..5816141f67 --- /dev/null +++ b/Code/Common/otbStreamingManager.txx @@ -0,0 +1,291 @@ +/*========================================================================= + + Program: ORFEO Toolbox + Language: C++ + Date: $Date$ + Version: $Revision$ + + + Copyright (c) Centre National d'Etudes Spatiales. All rights reserved. + See OTBCopyright.txt for details. + + + This software is distributed WITHOUT ANY WARRANTY; without even + the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + PURPOSE. See the above copyright notices for more information. + +=========================================================================*/ +#ifndef __otbStreamingManager_txx +#define __otbStreamingManager_txx + +#include "otbStreamingManager.h" +#include "otbMacro.h" +#include "otbConfigure.h" +#include "otbConfigurationFile.h" +#include "otbPipelineMemoryPrintCalculator.h" +#include "itkExtractImageFilter.h" + +#include "otbImageRegionSquareTileSplitter.h" +#include "itkImageRegionSplitter.h" + +namespace otb +{ + +template <class TImage> +StreamingManager<TImage>::StreamingManager() + : m_DesiredMode(StreamingManagement::TILED_AVAILABLE_RAM), + m_ActualMode(StreamingManagement::TILED_AVAILABLE_RAM), + m_AvailableRAMInMB(0), + m_DesiredNumberOfLines(0), + m_DesiredTileDimension(0), + m_ComputedNumberOfSplits(0) +{ +} + +template <class TImage> +StreamingManager<TImage>::~StreamingManager() +{ +} + +template <class TImage> +void +StreamingManager<TImage>::SetStrippedRAMStreamingMode( unsigned int availableRAMInMB ) +{ + m_DesiredMode = StreamingManagement::STRIPPED_AVAILABLE_RAM; + m_AvailableRAMInMB = availableRAMInMB; +} + +template <class TImage> +void +StreamingManager<TImage>::SetStrippedNumberOfLinesStreamingMode( unsigned int numberOfLines ) +{ + m_DesiredMode = StreamingManagement::STRIPPED_SET_NUMBEROFLINES; + m_DesiredNumberOfLines = numberOfLines; +} + +template <class TImage> +void +StreamingManager<TImage>::SetTiledRAMStreamingMode( unsigned int availableRAMInMB ) +{ + otbMsgDevMacro(<< "StreamingManager::SetTiledRAMStreamingMode " << availableRAMInMB) + m_DesiredMode = StreamingManagement::TILED_AVAILABLE_RAM; + m_AvailableRAMInMB = availableRAMInMB; +} + +template <class TImage> +void +StreamingManager<TImage>::SetTiledTileDimensionStreamingMode( unsigned int tileDimension ) +{ + m_DesiredMode = StreamingManagement::TILED_SET_TILE_SIZE; + m_DesiredTileDimension = tileDimension; +} + +template <class TImage> +void +StreamingManager<TImage>::PrepareStreaming( itk::DataObject * input, const RegionType ®ion ) +{ + switch (m_DesiredMode) + { + case StreamingManagement::STRIPPED_AVAILABLE_RAM: + { + otbMsgDevMacro(<< "Activating STRIPPED_AVAILABLE_RAM streaming mode") + unsigned long nbDivisions = EstimateOptimalNumberOfDivisions(input, region); + m_Splitter = itk::ImageRegionSplitter<itkGetStaticConstMacro(ImageDimension)>::New(); + m_ComputedNumberOfSplits = m_Splitter->GetNumberOfSplits(region, nbDivisions); + + otbMsgDevMacro(<< "Number of split : " << m_ComputedNumberOfSplits) + } + break; + + case StreamingManagement::STRIPPED_SET_NUMBEROFLINES: + { + otbMsgDevMacro(<< "Activating STRIPPED_SET_NUMBEROFLINES streaming mode") + if (m_DesiredNumberOfLines < 1) + { + itkWarningMacro(<< "DesiredNumberOfLines set to 0 : use 1 as strip number of lines") + m_DesiredNumberOfLines = 1; + } + + /* Calculate number of split */ + unsigned long numberLinesOfRegion = region.GetSize()[1]; // Y dimension + unsigned long nbSplit; + if (numberLinesOfRegion > m_DesiredNumberOfLines) + { + nbSplit = + static_cast<unsigned long>(vcl_ceil(static_cast<double>(numberLinesOfRegion) / + static_cast<double>(m_DesiredNumberOfLines))); + } + else + { + // Don't stream + nbSplit = 1; + } + + m_Splitter = itk::ImageRegionSplitter<itkGetStaticConstMacro(ImageDimension)>::New(); + m_ComputedNumberOfSplits = m_Splitter->GetNumberOfSplits(region, nbSplit); + otbMsgDevMacro(<< "Number of split : " << m_ComputedNumberOfSplits) + } + break; + + case StreamingManagement::TILED_AVAILABLE_RAM: + { + otbMsgDevMacro(<< "Activating TILED_AVAILABLE_RAM streaming mode") + unsigned long nbDivisions = EstimateOptimalNumberOfDivisions(input, region); + m_Splitter = otb::ImageRegionSquareTileSplitter<itkGetStaticConstMacro(ImageDimension)>::New(); + m_ComputedNumberOfSplits = m_Splitter->GetNumberOfSplits(region, nbDivisions); + otbMsgDevMacro(<< "Number of split : " << m_ComputedNumberOfSplits) + } + break; + + case StreamingManagement::TILED_SET_TILE_SIZE: + { + otbMsgDevMacro(<< "Activating TILED_SET_TILE_SIZE streaming mode") + if (m_DesiredTileDimension < 16) + { + itkWarningMacro(<< "DesiredTileDimension inferior to 16 : use 16 as tile dimension") + m_DesiredTileDimension = 16; + } + + // Calculate number of split + m_Splitter = otb::ImageRegionSquareTileSplitter<itkGetStaticConstMacro(ImageDimension)>::New(); + unsigned int nbDesiredTiles = itk::Math::Ceil<unsigned int>( double(region.GetNumberOfPixels()) / (m_DesiredTileDimension * m_DesiredTileDimension) ); + m_ComputedNumberOfSplits = m_Splitter->GetNumberOfSplits(region, nbDesiredTiles); + otbMsgDevMacro(<< "Number of split : " << m_ComputedNumberOfSplits) + } + break; + } + + // Save the region to generate the splits later + m_Region = region; +} + + +template <class TImage> +unsigned int +StreamingManager<TImage>::EstimateOptimalNumberOfDivisions(itk::DataObject * input, const RegionType ®ion) +{ + otbMsgDevMacro(<< "m_AvailableRAMInMB " << m_AvailableRAMInMB) + unsigned int availableRAMInBytes = m_AvailableRAMInMB * 1024 * 1024; + + if (availableRAMInBytes == 0) + { + otbMsgDevMacro(<< "Retrieving available RAM size from configuration") + // Retrieve it from the configuration + try + { + typedef otb::ConfigurationFile ConfigurationType; + ConfigurationType::Pointer conf = ConfigurationType::GetInstance(); + + availableRAMInBytes = conf->GetParameter<unsigned int>( + "OTB_STREAM_MAX_SIZE_BUFFER_FOR_STREAMING"); + } + catch(...) + { + // We should never have to go here if the configuration file is + // correct and found. + // In case it is not fallback on the cmake + // defined constants. + availableRAMInBytes = OTB_STREAM_MAX_SIZE_BUFFER_FOR_STREAMING; + } + } + + otbMsgDevMacro("RAM used to estimate memory footprint : " << availableRAMInBytes / 1024 / 1024 << " MB") + + otb::PipelineMemoryPrintCalculator::Pointer memoryPrintCalculator; + memoryPrintCalculator = otb::PipelineMemoryPrintCalculator::New(); + + memoryPrintCalculator->SetAvailableMemory( availableRAMInBytes ); + + // Trick to avoid having the resampler compute the whole + // deformation field + double regionTrickFactor = 1; + ImageType* inputImage = dynamic_cast<ImageType*>(input); + //inputImage = 0; + if (inputImage) + { + + typedef itk::ExtractImageFilter<ImageType,ImageType> ExtractFilterType; + typename ExtractFilterType::Pointer extractFilter = ExtractFilterType::New(); + extractFilter->SetInput(inputImage); + + // Define a small region to run the memory footprint estimation, + // around the image center, 100 pixels wide in each dimension + SizeType smallSize; + smallSize.Fill(100); + IndexType index; + index[0] = region.GetIndex()[0] + region.GetSize()[0]/2 - 50; + index[1] = region.GetIndex()[1] + region.GetSize()[1]/2 - 50; + + RegionType smallRegion; + smallRegion.SetSize(smallSize); + smallRegion.SetIndex(index); + + // In case the image is smaller than 100 pixels in a direction + smallRegion.Crop(region); + + extractFilter->SetExtractionRegion(smallRegion); + + bool smallRegionSuccess = smallRegion.Crop(region); + + if (smallRegionSuccess) + { + otbMsgDevMacro("Using an extract to estimate memory : " << smallRegion) + // the region is well behaved, inside the largest possible region + memoryPrintCalculator->SetDataToWrite(extractFilter->GetOutput()); + regionTrickFactor = static_cast<double>( region.GetNumberOfPixels() ) + / static_cast<double>(smallRegion.GetNumberOfPixels()); + + memoryPrintCalculator->SetBiasCorrectionFactor(regionTrickFactor); + } + else + { + otbMsgDevMacro("Using the input region to estimate memory : " << region) + // the region is not well behaved + // use the full region + memoryPrintCalculator->SetDataToWrite(input); + memoryPrintCalculator->SetBiasCorrectionFactor(1.0); + } + + memoryPrintCalculator->Compute(); + } + else + { + // Use the original object to estimate memory footprint + memoryPrintCalculator->SetDataToWrite(input); + memoryPrintCalculator->SetBiasCorrectionFactor(1.0); + + memoryPrintCalculator->Compute(); + } + + otbMsgDevMacro( "Estimated Memory print for the full image : " + << static_cast<unsigned int>(memoryPrintCalculator->GetMemoryPrint() / 1024 / 1024 ) << std::endl) + otbMsgDevMacro( "Optimal number of stream divisions: " + << memoryPrintCalculator->GetOptimalNumberOfStreamDivisions() << std::endl) + + return memoryPrintCalculator->GetOptimalNumberOfStreamDivisions(); +} + +template <class TImage> +StreamingManagement::StreamingMode +StreamingManager<TImage>::GetStreamingMode() +{ + return m_ActualMode; +} + +template <class TImage> +unsigned int +StreamingManager<TImage>::GetNumberOfSplits() +{ + return m_ComputedNumberOfSplits; +} + +template <class TImage> +typename StreamingManager<TImage>::RegionType +StreamingManager<TImage>::GetSplit(unsigned int i) +{ + return m_Splitter->GetSplit(i, m_ComputedNumberOfSplits, m_Region); +} + +} // End namespace otb + +#endif diff --git a/Code/IO/otbStreamingImageVirtualWriter.h b/Code/IO/otbStreamingImageVirtualWriter.h index 9ae59e8429..99e72c1c06 100644 --- a/Code/IO/otbStreamingImageVirtualWriter.h +++ b/Code/IO/otbStreamingImageVirtualWriter.h @@ -20,8 +20,7 @@ #include "itkMacro.h" #include "itkImageToImageFilter.h" -#include "itkImageRegionSplitter.h" -#include "otbStreamingTraits.h" +#include "otbStreamingManager.h" namespace otb { @@ -45,7 +44,7 @@ namespace otb * \sa PersistentStatisticsImageFilter * \sa PersistentImageStreamingDecorator. */ -template <class TInputImage> +template <class TInputImage, class TStreamingManager = StreamingManager<TInputImage> > class ITK_EXPORT StreamingImageVirtualWriter : public itk::ImageToImageFilter<TInputImage, TInputImage> { public: @@ -68,86 +67,62 @@ public: typedef typename InputImageType::PixelType InputImagePixelType; /** Streaming traits helper typedef */ - typedef StreamingTraits<InputImageType> StreamingTraitsType; + typedef TStreamingManager StreamingManagerType; + typedef typename StreamingManagerType::Pointer StreamingManagerPointerType; /** Dimension of input image. */ itkStaticConstMacro(InputImageDimension, unsigned int, InputImageType::ImageDimension); - /** Set/Get the image input of this writer. */ - void SetInput(const InputImageType *input); - const InputImageType * GetInput(void); - const InputImageType * GetInput(unsigned int idx); + StreamingManagerType* GetStreamingManager(void) + { + return m_StreamingManager; + } - void SetNthInput(unsigned int idx, const InputImageType *input); - - - /** SmartPointer to a region splitting object */ - typedef itk::ImageRegionSplitter<itkGetStaticConstMacro(InputImageDimension)> SplitterType; - typedef typename SplitterType::Pointer RegionSplitterPointer; - - /** Set buffer memory size (in bytes) use to calculate the number of stream divisions */ - void SetBufferMemorySize(unsigned long); - - /** Set the buffer number of lines use to calculate the number of stream divisions */ - void SetBufferNumberOfLinesDivisions(unsigned long); - - /** The number of stream divisions is calculate by using - * OTB_STREAM_IMAGE_SIZE_TO_ACTIVATE_STREAMING and - * OTB_STREAM_MAX_SIZE_BUFFER_FOR_STREAMING cmake variables. - */ - void SetAutomaticNumberOfStreamDivisions(void); - - /** Set the tiling automatic mode for streaming division */ - void SetTilingStreamDivisions(void); - /** Choose number of divisions in tiling streaming division */ - void SetTilingStreamDivisions(unsigned long); - - /** Return the string to indicate the method use to calculate number of stream divisions. */ - std::string GetMethodUseToCalculateNumberOfStreamDivisions(void); - - /** Set the number of pieces to divide the input. The upstream pipeline - * will be executed this many times. */ - void SetNumberOfStreamDivisions(unsigned long); - - /** Get the number of pieces to divide the input. The upstream pipeline - * will be executed this many times. */ - unsigned long GetNumberOfStreamDivisions(void); - - /** Set the helper class for dividing the input into chunks. */ - itkSetObjectMacro(RegionSplitter, SplitterType); - - /** Get the helper class for dividing the input into chunks. */ - itkGetObjectMacro(RegionSplitter, SplitterType); - - /** Type use to define number of divisions */ - typedef StreamingMode CalculationDivisionEnumType; - - virtual void GenerateInputRequestedRegion(void); + void SetStreamingManager(StreamingManagerType* streamingManager) + { + m_StreamingManager = streamingManager; + } protected: StreamingImageVirtualWriter(); + virtual ~StreamingImageVirtualWriter(); + void PrintSelf(std::ostream& os, itk::Indent indent) const; virtual void GenerateData(void); + virtual void GenerateInputRequestedRegion(void); + private: StreamingImageVirtualWriter(const StreamingImageVirtualWriter &); //purposely not implemented void operator =(const StreamingImageVirtualWriter&); //purposely not implemented - /** This method calculate the number of stream divisions, by using the CalculationDivision type */ - unsigned long CalculateNumberOfStreamDivisions(void); + void ObserveSourceFilterProgress(itk::Object* object, const itk::EventObject & event ) + { + if (typeid(event) != typeid(itk::ProgressEvent)) + { + return; + } + + itk::ProcessObject* processObject = dynamic_cast<itk::ProcessObject*>(object); + if (processObject) + m_DivisionProgress = processObject->GetProgress(); + + this->UpdateFilterProgress(); + } - /** Use to define the method used to calculate number of divisions */ - unsigned long m_BufferMemorySize; - unsigned long m_BufferNumberOfLinesDivisions; - unsigned long m_NumberOfStreamDivisions; + void UpdateFilterProgress() + { + this->UpdateProgress( (m_DivisionProgress + m_CurrentDivision) / m_NumberOfDivisions ); + } - RegionSplitterPointer m_RegionSplitter; + unsigned int m_NumberOfDivisions; + unsigned int m_CurrentDivision; + float m_DivisionProgress; - /** Use to determine method of calculation number of divisions */ - CalculationDivisionEnumType m_CalculationDivision; + StreamingManagerPointerType m_StreamingManager; }; } // end namespace otb diff --git a/Code/IO/otbStreamingImageVirtualWriter.txx b/Code/IO/otbStreamingImageVirtualWriter.txx index ce0b593897..6ffd8621b5 100644 --- a/Code/IO/otbStreamingImageVirtualWriter.txx +++ b/Code/IO/otbStreamingImageVirtualWriter.txx @@ -19,231 +19,55 @@ #define __otbStreamingImageVirtualWriter_txx #include "otbStreamingImageVirtualWriter.h" -#include "itkCommand.h" -#include "itkImageRegionIterator.h" -#include "itkObjectFactoryBase.h" -#include "itkImageFileWriter.h" -#include "itkImageRegionMultidimensionalSplitter.h" - #include "otbMacro.h" #include "otbConfigure.h" +#include "itkCommand.h" namespace otb { -/** - * - */ -template <class TInputImage> -StreamingImageVirtualWriter<TInputImage> + +template <class TInputImage, class TStreamingManager> +StreamingImageVirtualWriter<TInputImage,TStreamingManager> ::StreamingImageVirtualWriter() { - m_BufferMemorySize = 0; - m_BufferNumberOfLinesDivisions = 0; - // default to 10 divisions - m_NumberOfStreamDivisions = 0; - // default to AUTOMATIC_NUMBER_OF_DIVISIONS - m_CalculationDivision = SET_AUTOMATIC_NUMBER_OF_STREAM_DIVISIONS; - - // create default region splitter - m_RegionSplitter = itk::ImageRegionSplitter<InputImageDimension>::New(); + m_StreamingManager = StreamingManagerType::New(); } -/** - * - */ -template <class TInputImage> -StreamingImageVirtualWriter<TInputImage> +template <class TInputImage, class TStreamingManager> +StreamingImageVirtualWriter<TInputImage,TStreamingManager> ::~StreamingImageVirtualWriter() { } -/** - * - */ -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetBufferMemorySize(unsigned long memory_size_divisions) -{ - m_BufferMemorySize = memory_size_divisions; - m_CalculationDivision = SET_BUFFER_MEMORY_SIZE; - this->Modified(); -} - -/** - * - */ -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetBufferNumberOfLinesDivisions(unsigned long nb_lines_divisions) -{ - m_BufferNumberOfLinesDivisions = nb_lines_divisions; - m_CalculationDivision = SET_BUFFER_NUMBER_OF_LINES; - this->Modified(); -} - -/** - * - */ -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetNumberOfStreamDivisions(unsigned long nb_divisions) -{ - m_NumberOfStreamDivisions = nb_divisions; - m_CalculationDivision = SET_NUMBER_OF_STREAM_DIVISIONS; - this->Modified(); -} - -/** - * - */ -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetAutomaticNumberOfStreamDivisions(void) -{ - m_CalculationDivision = SET_AUTOMATIC_NUMBER_OF_STREAM_DIVISIONS; - this->Modified(); -} - -/** - * - */ -template <class TInputImage> +template <class TInputImage, class TStreamingManager> void -StreamingImageVirtualWriter<TInputImage> -::SetTilingStreamDivisions(void) -{ - m_CalculationDivision = SET_TILING_WITH_SET_AUTOMATIC_NUMBER_OF_STREAM_DIVISIONS; - m_RegionSplitter = itk::ImageRegionMultidimensionalSplitter<InputImageDimension>::New(); - this->Modified(); -} - -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetTilingStreamDivisions(unsigned long nb_divisions) -{ - m_CalculationDivision = SET_TILING_WITH_SET_NUMBER_OF_STREAM_DIVISIONS; - m_NumberOfStreamDivisions = nb_divisions; - m_RegionSplitter = itk::ImageRegionMultidimensionalSplitter<InputImageDimension>::New(); - this->Modified(); -} - -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetInput(const InputImageType *input) -{ - this->SetNthInput(0, input); -} - -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::SetNthInput(unsigned int idx, const InputImageType *input) -{ - // ProcessObject is not const_correct so this cast is required here. - this->itk::ProcessObject::SetNthInput(idx, - const_cast<TInputImage *>(input)); -} - -template <class TInputImage> -const typename StreamingImageVirtualWriter<TInputImage>::InputImageType * -StreamingImageVirtualWriter<TInputImage> -::GetInput(void) -{ - if (this->GetNumberOfInputs() < 1) - { - return 0; - } - - return static_cast<TInputImage*> - (this->itk::ProcessObject::GetInput(0)); -} - -template <class TInputImage> -const typename StreamingImageVirtualWriter<TInputImage>::InputImageType * -StreamingImageVirtualWriter<TInputImage> -::GetInput(unsigned int idx) +StreamingImageVirtualWriter<TInputImage,TStreamingManager> +::PrintSelf(std::ostream& os, itk::Indent indent) const { - return static_cast<TInputImage*> (this->itk::ProcessObject::GetInput(idx)); + Superclass::PrintSelf(os, indent); } -template <class TInputImage> +template <class TInputImage, class TStreamingManager> void -StreamingImageVirtualWriter<TInputImage> +StreamingImageVirtualWriter<TInputImage,TStreamingManager> ::GenerateInputRequestedRegion(void) { InputImagePointer inputPtr = const_cast<InputImageType *>(this->GetInput(0)); + + InputImageRegionType region; typename InputImageRegionType::SizeType size; typename InputImageRegionType::IndexType index; - InputImageRegionType region; + index.Fill(0); size.Fill(0); region.SetSize(size); region.SetIndex(index); inputPtr->SetRequestedRegion(region); } -/** - * - */ -template <class TInputImage> -unsigned long -StreamingImageVirtualWriter<TInputImage> -::GetNumberOfStreamDivisions(void) -{ - return (CalculateNumberOfStreamDivisions()); -} -template<class TInputImage> -unsigned long -StreamingImageVirtualWriter<TInputImage> -::CalculateNumberOfStreamDivisions(void) -{ - return StreamingTraitsType - ::CalculateNumberOfStreamDivisions(this->GetInput(), - this->GetInput()->GetLargestPossibleRegion(), - m_RegionSplitter, - m_CalculationDivision, - m_NumberOfStreamDivisions, - m_BufferMemorySize, - m_BufferNumberOfLinesDivisions); -} -/** - * - */ -template <class TInputImage> -std::string -StreamingImageVirtualWriter<TInputImage> -::GetMethodUseToCalculateNumberOfStreamDivisions(void) -{ - return (StreamingTraitsType::GetMethodUseToCalculateNumberOfStreamDivisions(m_CalculationDivision)); -} -/** - * - */ -template <class TInputImage> -void -StreamingImageVirtualWriter<TInputImage> -::PrintSelf(std::ostream& os, itk::Indent indent) const -{ - Superclass::PrintSelf(os, indent); - os << indent << "Number of stream divisions: " << m_NumberOfStreamDivisions - << std::endl; - if (m_RegionSplitter) - { - os << indent << "Region splitter:" << m_RegionSplitter << std::endl; - } - else - { - os << indent << "Region splitter: (none)" << std::endl; - } -} -template<class TInputImage> + +template<class TInputImage, class TStreamingManager> void -StreamingImageVirtualWriter<TInputImage> +StreamingImageVirtualWriter<TInputImage,TStreamingManager> ::GenerateData(void) { /** @@ -257,6 +81,7 @@ StreamingImageVirtualWriter<TInputImage> * Tell all Observers that the filter is starting */ this->InvokeEvent(itk::StartEvent()); + /** * Grab the input */ @@ -267,25 +92,47 @@ StreamingImageVirtualWriter<TInputImage> * minimum of what the user specified via SetNumberOfStreamDivisions() * and what the Splitter thinks is a reasonable value. */ - unsigned int numDivisions; - numDivisions = static_cast<unsigned int>(CalculateNumberOfStreamDivisions()); + m_StreamingManager->PrepareStreaming(inputPtr, outputRegion); + m_NumberOfDivisions = m_StreamingManager->GetNumberOfSplits(); + + /** + * Register to the ProgressEvent of the source filter + */ + // Get the source process object + itk::ProcessObject* source = inputPtr->GetSource(); + + // Check if source exists + if(source) + { + typedef itk::MemberCommand<Self> CommandType; + typedef typename CommandType::Pointer CommandPointerType; + + CommandPointerType command = CommandType::New(); + command->SetCallbackFunction(this, &Self::ObserveSourceFilterProgress); + + source->AddObserver(itk::ProgressEvent(), command); + } + else + { + itkWarningMacro(<< "Could not get the source process object. Progress report might be buggy"); + } /** * Loop over the number of pieces, execute the upstream pipeline on each * piece, and copy the results into the output image. */ InputImageRegionType streamRegion; - unsigned int piece; - for (piece = 0; - piece < numDivisions && !this->GetAbortGenerateData(); - piece++) + for (m_CurrentDivision = 0; + m_CurrentDivision < m_NumberOfDivisions && !this->GetAbortGenerateData(); + m_CurrentDivision++, m_DivisionProgress = 0, this->UpdateFilterProgress()) { - streamRegion = m_RegionSplitter->GetSplit(piece, numDivisions, outputRegion); + streamRegion = m_StreamingManager->GetSplit(m_CurrentDivision); + otbMsgDevMacro(<< "Processing region : " << streamRegion ) inputPtr->ReleaseData(); inputPtr->SetRequestedRegion(streamRegion); inputPtr->Update(); - this->UpdateProgress((float) piece / numDivisions); } + /** * If we ended due to aborting, push the progress up to 1.0 (since * it probably didn't end there) @@ -308,11 +155,14 @@ StreamingImageVirtualWriter<TInputImage> this->GetOutput(idx)->DataHasBeenGenerated(); } } + /** * Release any inputs if marked for release */ this->ReleaseInputs(); } + + } // end namespace otb #endif diff --git a/Testing/Code/BasicFilters/otbMatrixTransposeMatrixImageFilter.cxx b/Testing/Code/BasicFilters/otbMatrixTransposeMatrixImageFilter.cxx index beebc71c12..2d9a433f50 100644 --- a/Testing/Code/BasicFilters/otbMatrixTransposeMatrixImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbMatrixTransposeMatrixImageFilter.cxx @@ -50,7 +50,7 @@ int otbMatrixTransposeMatrixImageFilter(int argc, char * argv[]) reader2->SetFileName(infname2); // filter->SetStreamingMode(otb::SET_NUMBER_OF_STREAM_DIVISIONS); - filter->GetStreamer()->SetNumberOfStreamDivisions(200); + //filter->GetStreamer()->SetNumberOfStreamDivisions(200); filter->SetFirstInput(reader1->GetOutput()); filter->SetSecondInput(reader2->GetOutput()); filter->SetUsePadFirstInput(true); diff --git a/Testing/Code/BasicFilters/otbStreamingInnerProductVectorImageFilter.cxx b/Testing/Code/BasicFilters/otbStreamingInnerProductVectorImageFilter.cxx index 2987a0f636..bf4c392613 100644 --- a/Testing/Code/BasicFilters/otbStreamingInnerProductVectorImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbStreamingInnerProductVectorImageFilter.cxx @@ -40,7 +40,7 @@ int otbStreamingInnerProductVectorImageFilter(int argc, char* argv[]) // Instantiation object FilterType::Pointer filter = FilterType::New(); - filter->GetStreamer()->SetNumberOfStreamDivisions(10); + //filter->GetStreamer()->SetNumberOfStreamDivisions(10); filter->SetCenterData(centerdata); filter->SetInput(reader->GetOutput()); filter->Update(); diff --git a/Testing/Code/BasicFilters/otbStreamingMinMaxImageFilter.cxx b/Testing/Code/BasicFilters/otbStreamingMinMaxImageFilter.cxx index 0a4493c830..5f59b2d7f7 100644 --- a/Testing/Code/BasicFilters/otbStreamingMinMaxImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbStreamingMinMaxImageFilter.cxx @@ -43,7 +43,7 @@ int otbStreamingMinMaxImageFilter(int argc, char * argv[]) reader->SetFileName(infname); //filter->SetStreamingMode(otb::SET_NUMBER_OF_STREAM_DIVISIONS); - filter->GetStreamer()->SetNumberOfStreamDivisions(200); + //filter->GetStreamer()->SetNumberOfStreamDivisions(200); filter->SetInput(reader->GetOutput()); otb::StandardFilterWatcher watcher(filter, "Min Max Computation"); filter->Update(); diff --git a/Testing/Code/BasicFilters/otbStreamingMinMaxVectorImageFilter.cxx b/Testing/Code/BasicFilters/otbStreamingMinMaxVectorImageFilter.cxx index 778df22c3a..f8f2e265fe 100644 --- a/Testing/Code/BasicFilters/otbStreamingMinMaxVectorImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbStreamingMinMaxVectorImageFilter.cxx @@ -43,7 +43,7 @@ int otbStreamingMinMaxVectorImageFilter(int argc, char * argv[]) reader->SetFileName(infname); //filter->SetStreamingMode(otb::SET_NUMBER_OF_STREAM_DIVISIONS); - filter->GetStreamer()->SetNumberOfStreamDivisions(200); + //filter->GetStreamer()->SetNumberOfStreamDivisions(200); filter->SetInput(reader->GetOutput()); otb::StandardFilterWatcher watcher(filter, "Min Max Computation"); filter->Update(); diff --git a/Testing/Code/BasicFilters/otbStreamingStatisticsImageFilter.cxx b/Testing/Code/BasicFilters/otbStreamingStatisticsImageFilter.cxx index adb2fc6c71..22fae71fb8 100644 --- a/Testing/Code/BasicFilters/otbStreamingStatisticsImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbStreamingStatisticsImageFilter.cxx @@ -43,7 +43,7 @@ int otbStreamingStatisticsImageFilter(int argc, char * argv[]) reader->SetFileName(infname); //filter->GetStreamer()->SetStreamingMode(otb::SET_NUMBER_OF_STREAM_DIVISIONS); - filter->GetStreamer()->SetNumberOfStreamDivisions(200); + //filter->GetStreamer()->SetNumberOfStreamDivisions(200); filter->SetInput(reader->GetOutput()); filter->Update(); diff --git a/Testing/Code/BasicFilters/otbStreamingStatisticsVectorImageFilter.cxx b/Testing/Code/BasicFilters/otbStreamingStatisticsVectorImageFilter.cxx index e0c312c80b..8316823fb3 100644 --- a/Testing/Code/BasicFilters/otbStreamingStatisticsVectorImageFilter.cxx +++ b/Testing/Code/BasicFilters/otbStreamingStatisticsVectorImageFilter.cxx @@ -42,7 +42,7 @@ int otbStreamingStatisticsVectorImageFilter(int argc, char * argv[]) reader->SetFileName(infname); //filter->SetStreamingMode(otb::SET_NUMBER_OF_STREAM_DIVISIONS); - filter->GetStreamer()->SetNumberOfStreamDivisions(200); + //filter->GetStreamer()->SetNumberOfStreamDivisions(200); filter->SetInput(reader->GetOutput()); filter->Update(); -- GitLab