| /* |
| Copyright 2005-2010 Intel Corporation. All Rights Reserved. |
| |
| This file is part of Threading Building Blocks. |
| |
| Threading Building Blocks is free software; you can redistribute it |
| and/or modify it under the terms of the GNU General Public License |
| version 2 as published by the Free Software Foundation. |
| |
| Threading Building Blocks is distributed in the hope that it will be |
| useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
| of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| GNU General Public License for more details. |
| |
| You should have received a copy of the GNU General Public License |
| along with Threading Building Blocks; if not, write to the Free Software |
| Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
| |
| As a special exception, you may use this file as part of a free software |
| library without restriction. Specifically, if other files instantiate |
| templates or use macros or inline functions from this file, or you compile |
| this file and link it with other files to produce an executable, this |
| file does not by itself cause the resulting executable to be covered by |
| the GNU General Public License. This exception does not however |
| invalidate any other reasons why the executable file might be covered by |
| the GNU General Public License. |
| */ |
| |
| #include "tbb/pipeline.h" |
| #include "tbb/spin_mutex.h" |
| #include "tbb/cache_aligned_allocator.h" |
| #include "itt_notify.h" |
| |
| |
| namespace tbb { |
| |
| namespace internal { |
| |
| //! This structure is used to store task information in a input buffer |
| struct task_info { |
| void* my_object; |
| //! Invalid unless a task went through an ordered stage. |
| Token my_token; |
| //! False until my_token is set. |
| bool my_token_ready; |
| //! True if my_object is valid. |
| bool is_valid; |
| //! Set to initial state (no object, no token) |
| void reset() { |
| my_object = NULL; |
| my_token = 0; |
| my_token_ready = false; |
| is_valid = false; |
| } |
| }; |
| //! A buffer of input items for a filter. |
| /** Each item is a task_info, inserted into a position in the buffer corresponding to a Token. */ |
| class input_buffer { |
| friend class tbb::internal::pipeline_root_task; |
| friend class tbb::thread_bound_filter; |
| |
| typedef Token size_type; |
| |
| //! Array of deferred tasks that cannot yet start executing. |
| task_info* array; |
| |
| //! Size of array |
| /** Always 0 or a power of 2 */ |
| size_type array_size; |
| |
| //! Lowest token that can start executing. |
| /** All prior Token have already been seen. */ |
| Token low_token; |
| |
| //! Serializes updates. |
| spin_mutex array_mutex; |
| |
| //! Resize "array". |
| /** Caller is responsible to acquiring a lock on "array_mutex". */ |
| void grow( size_type minimum_size ); |
| |
| //! Initial size for "array" |
| /** Must be a power of 2 */ |
| static const size_type initial_buffer_size = 4; |
| |
| //! Used only for out of order buffer. |
| Token high_token; |
| |
| //! True for ordered filter, false otherwise. |
| bool is_ordered; |
| |
| //! True for thread-bound filter, false otherwise. |
| bool is_bound; |
| public: |
| //! Construct empty buffer. |
| input_buffer( bool is_ordered_, bool is_bound_ ) : |
| array(NULL), array_size(0), |
| low_token(0), high_token(0), |
| is_ordered(is_ordered_), is_bound(is_bound_) { |
| grow(initial_buffer_size); |
| __TBB_ASSERT( array, NULL ); |
| } |
| |
| //! Destroy the buffer. |
| ~input_buffer() { |
| __TBB_ASSERT( array, NULL ); |
| cache_aligned_allocator<task_info>().deallocate(array,array_size); |
| poison_pointer( array ); |
| } |
| |
| //! Put a token into the buffer. |
| /** If task information was placed into buffer, returns true; |
| otherwise returns false, informing the caller to create and spawn a task. |
| */ |
| // Using template to avoid explicit dependency on stage_task |
| template<typename StageTask> |
| bool put_token( StageTask& putter ) { |
| { |
| spin_mutex::scoped_lock lock( array_mutex ); |
| Token token; |
| if( is_ordered ) { |
| if( !putter.my_token_ready ) { |
| putter.my_token = high_token++; |
| putter.my_token_ready = true; |
| } |
| token = putter.my_token; |
| } else |
| token = high_token++; |
| __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL ); |
| if( token!=low_token || is_bound ) { |
| // Trying to put token that is beyond low_token. |
| // Need to wait until low_token catches up before dispatching. |
| if( token-low_token>=array_size ) |
| grow( token-low_token+1 ); |
| ITT_NOTIFY( sync_releasing, this ); |
| putter.put_task_info(array[token&(array_size-1)]); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| //! Note that processing of a token is finished. |
| /** Fires up processing of the next token, if processing was deferred. */ |
| // Using template to avoid explicit dependency on stage_task |
| template<typename StageTask> |
| void note_done( Token token, StageTask& spawner ) { |
| task_info wakee; |
| wakee.reset(); |
| { |
| spin_mutex::scoped_lock lock( array_mutex ); |
| if( !is_ordered || token==low_token ) { |
| // Wake the next task |
| task_info& item = array[++low_token & (array_size-1)]; |
| ITT_NOTIFY( sync_acquired, this ); |
| wakee = item; |
| item.is_valid = false; |
| } |
| } |
| if( wakee.is_valid ) |
| spawner.spawn_stage_task(wakee); |
| } |
| |
| #if __TBB_TASK_GROUP_CONTEXT |
| //! The method destroys all data in filters to prevent memory leaks |
| void clear( filter* my_filter ) { |
| long t=low_token; |
| for( size_type i=0; i<array_size; ++i, ++t ){ |
| task_info& temp = array[t&(array_size-1)]; |
| if (temp.is_valid ) { |
| my_filter->finalize(temp.my_object); |
| temp.is_valid = false; |
| } |
| } |
| } |
| #endif |
| |
| bool return_item(task_info& info, bool advance) { |
| spin_mutex::scoped_lock lock( array_mutex ); |
| task_info& item = array[low_token&(array_size-1)]; |
| ITT_NOTIFY( sync_acquired, this ); |
| if( item.is_valid ) { |
| info = item; |
| item.is_valid = false; |
| if (advance) low_token++; |
| return true; |
| } |
| return false; |
| } |
| |
| void put_item( task_info& info ) { |
| info.is_valid = true; |
| spin_mutex::scoped_lock lock( array_mutex ); |
| Token token; |
| if( is_ordered ) { |
| if( !info.my_token_ready ) { |
| info.my_token = high_token++; |
| info.my_token_ready = true; |
| } |
| token = info.my_token; |
| } else |
| token = high_token++; |
| __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL ); |
| if( token-low_token>=array_size ) |
| grow( token-low_token+1 ); |
| ITT_NOTIFY( sync_releasing, this ); |
| array[token&(array_size-1)] = info; |
| } |
| }; |
| |
| void input_buffer::grow( size_type minimum_size ) { |
| size_type old_size = array_size; |
| size_type new_size = old_size ? 2*old_size : initial_buffer_size; |
| while( new_size<minimum_size ) |
| new_size*=2; |
| task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size); |
| task_info* old_array = array; |
| for( size_type i=0; i<new_size; ++i ) |
| new_array[i].is_valid = false; |
| long t=low_token; |
| for( size_type i=0; i<old_size; ++i, ++t ) |
| new_array[t&(new_size-1)] = old_array[t&(old_size-1)]; |
| array = new_array; |
| array_size = new_size; |
| if( old_array ) |
| cache_aligned_allocator<task_info>().deallocate(old_array,old_size); |
| } |
| |
| class stage_task: public task, public task_info { |
| private: |
| friend class tbb::pipeline; |
| pipeline& my_pipeline; |
| filter* my_filter; |
| //! True if this task has not yet read the input. |
| bool my_at_start; |
| public: |
| //! Construct stage_task for first stage in a pipeline. |
| /** Such a stage has not read any input yet. */ |
| stage_task( pipeline& pipeline ) : |
| my_pipeline(pipeline), |
| my_filter(pipeline.filter_list), |
| my_at_start(true) |
| { |
| task_info::reset(); |
| } |
| //! Construct stage_task for a subsequent stage in a pipeline. |
| stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) : |
| task_info(info), |
| my_pipeline(pipeline), |
| my_filter(filter_), |
| my_at_start(false) |
| {} |
| //! Roughly equivalent to the constructor of input stage task |
| void reset() { |
| task_info::reset(); |
| my_filter = my_pipeline.filter_list; |
| my_at_start = true; |
| } |
| //! The virtual task execution method |
| /*override*/ task* execute(); |
| #if __TBB_TASK_GROUP_CONTEXT |
| ~stage_task() |
| { |
| if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) { |
| __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled"); |
| my_filter->finalize(my_object); |
| my_object = NULL; |
| } |
| } |
| #endif // __TBB_TASK_GROUP_CONTEXT |
| //! Creates and spawns stage_task from task_info |
| void spawn_stage_task(const task_info& info) |
| { |
| stage_task* clone = new (allocate_additional_child_of(*parent())) |
| stage_task( my_pipeline, my_filter, info ); |
| spawn(*clone); |
| } |
| //! Puts current task information |
| void put_task_info(task_info &where_to_put ) { |
| where_to_put.my_object = my_object; |
| where_to_put.my_token = my_token; |
| where_to_put.my_token_ready = my_token_ready; |
| where_to_put.is_valid = true; |
| } |
| }; |
| |
| task* stage_task::execute() { |
| __TBB_ASSERT( !my_at_start || !my_object, NULL ); |
| __TBB_ASSERT( !my_filter->is_bound(), NULL ); |
| if( my_at_start ) { |
| if( my_filter->is_serial() ) { |
| my_object = (*my_filter)(my_object); |
| if( my_object ) { |
| if( my_filter->is_ordered() ) { |
| my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics |
| my_token_ready = true; |
| } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) { |
| if( my_pipeline.has_thread_bound_filters ) |
| my_pipeline.token_counter++; // ideally, with relaxed semantics |
| } |
| if( !my_filter->next_filter_in_pipeline ) { |
| reset(); |
| goto process_another_stage; |
| } else { |
| ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens ); |
| if( --my_pipeline.input_tokens>0 ) |
| spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) ); |
| } |
| } else { |
| my_pipeline.end_of_input = true; |
| return NULL; |
| } |
| } else /*not is_serial*/ { |
| if( my_pipeline.end_of_input ) |
| return NULL; |
| if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) { |
| if( my_pipeline.has_thread_bound_filters ) |
| my_pipeline.token_counter++; |
| } |
| ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens ); |
| if( --my_pipeline.input_tokens>0 ) |
| spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) ); |
| my_object = (*my_filter)(my_object); |
| if( !my_object ) { |
| my_pipeline.end_of_input = true; |
| if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) { |
| if( my_pipeline.has_thread_bound_filters ) |
| my_pipeline.token_counter--; |
| } |
| return NULL; |
| } |
| } |
| my_at_start = false; |
| } else { |
| my_object = (*my_filter)(my_object); |
| if( my_filter->is_serial() ) |
| my_filter->my_input_buffer->note_done(my_token, *this); |
| } |
| my_filter = my_filter->next_filter_in_pipeline; |
| if( my_filter ) { |
| // There is another filter to execute. |
| // Crank up priority a notch. |
| add_to_depth(1); |
| if( my_filter->is_serial() ) { |
| // The next filter must execute tokens in order |
| if( my_filter->my_input_buffer->put_token(*this) ){ |
| // Can't proceed with the same item |
| if( my_filter->is_bound() ) { |
| // Find the next non-thread-bound filter |
| do { |
| my_filter = my_filter->next_filter_in_pipeline; |
| } while( my_filter && my_filter->is_bound() ); |
| // Check if there is an item ready to process |
| if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()) ) |
| goto process_another_stage; |
| } |
| my_filter = NULL; // To prevent deleting my_object twice if exception occurs |
| return NULL; |
| } |
| } |
| } else { |
| // Reached end of the pipe. |
| if( ++my_pipeline.input_tokens>1 || my_pipeline.end_of_input || my_pipeline.filter_list->is_bound() ) |
| return NULL; // No need to recycle for new input |
| ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens ); |
| // Recycle as an input stage task. |
| reset(); |
| } |
| process_another_stage: |
| /* A semi-hackish way to reexecute the same task object immediately without spawning. |
| recycle_as_continuation marks the task for future execution, |
| and then 'this' pointer is returned to bypass spawning. */ |
| recycle_as_continuation(); |
| return this; |
| } |
| |
| class pipeline_root_task: public task { |
| pipeline& my_pipeline; |
| bool do_segment_scanning; |
| |
| /*override*/ task* execute() { |
| if( !my_pipeline.end_of_input ) |
| if( !my_pipeline.filter_list->is_bound() ) |
| if( my_pipeline.input_tokens > 0 ) { |
| recycle_as_continuation(); |
| set_ref_count(1); |
| return new( allocate_child() ) stage_task( my_pipeline ); |
| } |
| if( do_segment_scanning ) { |
| filter* current_filter = my_pipeline.filter_list->next_segment; |
| /* first non-thread-bound filter that follows thread-bound one |
| and may have valid items to process */ |
| filter* first_suitable_filter = current_filter; |
| while( current_filter ) { |
| __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" ); |
| __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" ); |
| if( !my_pipeline.end_of_input |
| || (tokendiff_t)(my_pipeline.token_counter - current_filter->my_input_buffer->low_token) > 0 ) |
| { |
| task_info info; |
| info.reset(); |
| if( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) { |
| set_ref_count(1); |
| recycle_as_continuation(); |
| return new( allocate_child() ) stage_task( my_pipeline, current_filter, info); |
| } |
| current_filter = current_filter->next_segment; |
| if( !current_filter ) { |
| if( !my_pipeline.end_of_input ) { |
| recycle_as_continuation(); |
| return this; |
| } |
| current_filter = first_suitable_filter; |
| __TBB_Yield(); |
| } |
| } else { |
| /* The preceding pipeline segment is empty. |
| Fast-forward to the next post-TBF segment. */ |
| first_suitable_filter = first_suitable_filter->next_segment; |
| current_filter = first_suitable_filter; |
| } |
| } /* end of while */ |
| return NULL; |
| } else { |
| if( !my_pipeline.end_of_input ) { |
| recycle_as_continuation(); |
| return this; |
| } |
| return NULL; |
| } |
| } |
| public: |
| pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false) |
| { |
| __TBB_ASSERT( my_pipeline.filter_list, NULL ); |
| filter* first = my_pipeline.filter_list; |
| if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) { |
| // Scanning the pipeline for segments |
| filter* head_of_previous_segment = first; |
| for( filter* subfilter=first->next_filter_in_pipeline; |
| subfilter!=NULL; |
| subfilter=subfilter->next_filter_in_pipeline ) |
| { |
| if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) { |
| do_segment_scanning = true; |
| head_of_previous_segment->next_segment = subfilter; |
| head_of_previous_segment = subfilter; |
| } |
| } |
| } |
| } |
| }; |
| |
| #if _MSC_VER && !defined(__INTEL_COMPILER) |
| // Workaround for overzealous compiler warnings |
| // Suppress compiler warning about constant conditional expression |
| #pragma warning (disable: 4127) |
| #endif |
| |
| // The class destroys end_counter and clears all input buffers if pipeline was cancelled. |
| class pipeline_cleaner: internal::no_copy { |
| pipeline& my_pipeline; |
| public: |
| pipeline_cleaner(pipeline& _pipeline) : |
| my_pipeline(_pipeline) |
| {} |
| ~pipeline_cleaner(){ |
| #if __TBB_TASK_GROUP_CONTEXT |
| if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled |
| my_pipeline.clear_filters(); |
| #endif |
| my_pipeline.end_counter = NULL; |
| } |
| }; |
| |
| } // namespace internal |
| |
| void pipeline::inject_token( task& ) { |
| __TBB_ASSERT(0,"illegal call to inject_token"); |
| } |
| |
| #if __TBB_TASK_GROUP_CONTEXT |
| void pipeline::clear_filters() { |
| for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) { |
| if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) |
| if( internal::input_buffer* b = f->my_input_buffer ) |
| b->clear(f); |
| } |
| } |
| #endif |
| |
| pipeline::pipeline() : |
| filter_list(NULL), |
| filter_end(NULL), |
| end_counter(NULL), |
| end_of_input(false), |
| has_thread_bound_filters(false) |
| { |
| token_counter = 0; |
| input_tokens = 0; |
| } |
| |
| pipeline::~pipeline() { |
| clear(); |
| } |
| |
| void pipeline::clear() { |
| filter* next; |
| for( filter* f = filter_list; f; f=next ) { |
| if( internal::input_buffer* b = f->my_input_buffer ) { |
| delete b; |
| f->my_input_buffer = NULL; |
| } |
| next=f->next_filter_in_pipeline; |
| f->next_filter_in_pipeline = filter::not_in_pipeline(); |
| if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) { |
| f->prev_filter_in_pipeline = filter::not_in_pipeline(); |
| f->my_pipeline = NULL; |
| } |
| if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) ) |
| f->next_segment = NULL; |
| } |
| filter_list = filter_end = NULL; |
| } |
| |
| void pipeline::add_filter( filter& filter_ ) { |
| #if TBB_USE_ASSERT |
| if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) |
| __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" ); |
| __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" ); |
| __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" ); |
| #endif |
| if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) { |
| filter_.my_pipeline = this; |
| filter_.prev_filter_in_pipeline = filter_end; |
| if ( filter_list == NULL) |
| filter_list = &filter_; |
| else |
| filter_end->next_filter_in_pipeline = &filter_; |
| filter_.next_filter_in_pipeline = NULL; |
| filter_end = &filter_; |
| } |
| else |
| { |
| if( !filter_end ) |
| filter_end = reinterpret_cast<filter*>(&filter_list); |
| |
| *reinterpret_cast<filter**>(filter_end) = &filter_; |
| filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline); |
| *reinterpret_cast<filter**>(filter_end) = NULL; |
| } |
| if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) { |
| if( filter_.is_serial() ) { |
| if( filter_.is_bound() ) |
| has_thread_bound_filters = true; |
| filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() ); |
| } |
| else { |
| if( filter_.prev_filter_in_pipeline && filter_.prev_filter_in_pipeline->is_bound() ) |
| filter_.my_input_buffer = new internal::input_buffer( false, false ); |
| } |
| } else { |
| if( filter_.is_serial() ) { |
| filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false ); |
| } |
| } |
| |
| } |
| |
| void pipeline::remove_filter( filter& filter_ ) { |
| if (&filter_ == filter_list) |
| filter_list = filter_.next_filter_in_pipeline; |
| else { |
| __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" ); |
| filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline; |
| } |
| if (&filter_ == filter_end) |
| filter_end = filter_.prev_filter_in_pipeline; |
| else { |
| __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" ); |
| filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline; |
| } |
| if( internal::input_buffer* b = filter_.my_input_buffer ) { |
| delete b; |
| filter_.my_input_buffer = NULL; |
| } |
| filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline(); |
| if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) ) |
| filter_.next_segment = NULL; |
| filter_.my_pipeline = NULL; |
| } |
| |
| void pipeline::run( size_t max_number_of_live_tokens |
| #if __TBB_TASK_GROUP_CONTEXT |
| , tbb::task_group_context& context |
| #endif |
| ) { |
| __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" ); |
| __TBB_ASSERT( !end_counter, "pipeline already running?" ); |
| if( filter_list ) { |
| internal::pipeline_cleaner my_pipeline_cleaner(*this); |
| end_of_input = false; |
| #if __TBB_TASK_GROUP_CONTEXT |
| end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this ); |
| #else |
| end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this ); |
| #endif |
| input_tokens = internal::Token(max_number_of_live_tokens); |
| // Start execution of tasks |
| task::spawn_root_and_wait( *end_counter ); |
| } |
| } |
| |
| #if __TBB_TASK_GROUP_CONTEXT |
| void pipeline::run( size_t max_number_of_live_tokens ) { |
| if( filter_list ) { |
| // Construct task group context with the exception propagation mode expected |
| // by the pipeline caller. |
| uintptr_t ctx_traits = filter_list->my_filter_mode & filter::exact_exception_propagation ? |
| task_group_context::default_traits : |
| task_group_context::default_traits & ~task_group_context::exact_exception; |
| task_group_context context(task_group_context::bound, ctx_traits); |
| run(max_number_of_live_tokens, context); |
| } |
| } |
| #endif // __TBB_TASK_GROUP_CONTEXT |
| |
| filter::~filter() { |
| if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) { |
| if ( next_filter_in_pipeline != filter::not_in_pipeline() ) { |
| __TBB_ASSERT( prev_filter_in_pipeline != filter::not_in_pipeline(), "probably filter list is broken" ); |
| my_pipeline->remove_filter(*this); |
| } else |
| __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" ); |
| } else { |
| __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" ); |
| } |
| } |
| |
| thread_bound_filter::result_type thread_bound_filter::process_item() { |
| return internal_process_item(true); |
| } |
| |
| thread_bound_filter::result_type thread_bound_filter::try_process_item() { |
| return internal_process_item(false); |
| } |
| |
| thread_bound_filter::result_type thread_bound_filter::internal_process_item(bool is_blocking) { |
| internal::task_info info; |
| info.reset(); |
| |
| if( !prev_filter_in_pipeline ) { |
| if( my_pipeline->end_of_input ) |
| return end_of_stream; |
| while( my_pipeline->input_tokens == 0 ) { |
| if( is_blocking ) |
| __TBB_Yield(); |
| else |
| return item_not_available; |
| } |
| info.my_object = (*this)(info.my_object); |
| if( info.my_object ) { |
| my_pipeline->input_tokens--; |
| if( is_ordered() ) { |
| info.my_token = my_pipeline->token_counter; |
| info.my_token_ready = true; |
| } |
| my_pipeline->token_counter++; // ideally, with relaxed semantics |
| } else { |
| my_pipeline->end_of_input = true; |
| return end_of_stream; |
| } |
| } else { /* this is not an input filter */ |
| while( !my_input_buffer->return_item(info, /*advance=*/true) ) { |
| if( my_pipeline->end_of_input && my_input_buffer->low_token == my_pipeline->token_counter ) |
| return end_of_stream; |
| if( is_blocking ) |
| __TBB_Yield(); |
| else |
| return item_not_available; |
| } |
| info.my_object = (*this)(info.my_object); |
| } |
| if( next_filter_in_pipeline ) { |
| next_filter_in_pipeline->my_input_buffer->put_item(info); |
| } else { |
| my_pipeline->input_tokens++; |
| } |
| |
| return success; |
| } |
| |
| } // tbb |
| |