/*
Copyright 2005-2010 Intel Corporation. All Rights Reserved.
This file is part of Threading Building Blocks.
Threading Building Blocks is free software; you can redistribute it
and/or modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation.
Threading Building Blocks is distributed in the hope that it will be
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Threading Building Blocks; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
As a special exception, you may use this file as part of a free software
library without restriction. Specifically, if other files instantiate
templates or use macros or inline functions from this file, or you compile
this file and link it with other files to produce an executable, this
file does not by itself cause the resulting executable to be covered by
the GNU General Public License. This exception does not however
invalidate any other reasons why the executable file might be covered by
the GNU General Public License.
*/
#include "tbb/parallel_reduce.h"
#include "tbb/atomic.h"
#include "harness_assert.h"
using namespace std;
static tbb::atomic<long> ForkCount;
static tbb::atomic<long> FooBodyCount;
//! Class whose public interface is exactly the minimal requirements of the Range concept
class MinimalRange {
size_t begin, end;
friend class FooBody;
explicit MinimalRange( size_t i ) : begin(0), end(i) {}
friend void Flog( int nthread, bool interference );
public:
MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) {
begin = r.end = (r.begin+r.end)/2;
}
bool is_divisible() const {return end-begin>=2;}
bool empty() const {return begin==end;}
};
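// For reference: parallel_reduce requires a Range to provide a copy constructor,
// a destructor, empty(), is_divisible(), and a splitting constructor R(R&,tbb::split).
// MinimalRange defines the last three and relies on the implicitly generated copy
// constructor and destructor, so nothing beyond the concept is exposed. Splitting
// turns r into the left half [r.begin,m) and the new range into the right half
// [m,r.end), where m=(r.begin+r.end)/2.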
//! Class whose public interface is exactly the minimal requirements of a Body for parallel_reduce
class FooBody {
private:
FooBody( const FooBody& ); // Deny access
void operator=( const FooBody& ); // Deny access
friend void Flog( int nthread, bool interference );
//! Parent that created this body via split operation. NULL if original body.
FooBody* parent;
//! Total number of index values processed by body and its children.
size_t sum;
//! Number of join operations done so far on this body and its children.
long join_count;
//! Range that has been processed so far by this body and its children.
size_t begin, end;
//! True if body has not yet been processed by operator().
bool is_new;
//! 0 if original body; 1 if created by split; 2 after it has been joined into its parent.
int forked;
FooBody() {++FooBodyCount;}
public:
~FooBody() {
forked = 0xDEADBEEF;
sum=0xDEADBEEF;
join_count=0xDEADBEEF;
--FooBodyCount;
}
FooBody( FooBody& other, tbb::split ) {
++FooBodyCount;
++ForkCount;
sum = 0;
parent = &other;
join_count = 0;
is_new = true;
forked = 1;
}
void join( FooBody& s ) {
ASSERT( s.forked==1, NULL );
ASSERT( this!=&s, NULL );
ASSERT( this==s.parent, NULL );
ASSERT( end==s.begin, NULL );
end = s.end;
sum += s.sum;
join_count += s.join_count + 1;
s.forked = 2;
}
void operator()( const MinimalRange& r ) {
for( size_t k=r.begin; k<r.end; ++k )
++sum;
if( is_new ) {
is_new = false;
begin = r.begin;
} else
ASSERT( end==r.begin, NULL );
end = r.end;
}
};
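// For reference: the Body concept of parallel_reduce requires a splitting constructor
// Body(Body&,tbb::split), a destructor, void operator()(const Range&), and
// void join(Body& rhs), where rhs holds the result for a subrange immediately to the
// right of the one accumulated by *this. The extra members of FooBody (parent,
// join_count, forked, begin, end, is_new) are bookkeeping that lets Flog verify
// that every split is eventually matched by a join over contiguous subranges.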
#include <cstdio>
#include "harness.h"
#include "tbb/tick_count.h"
void Flog( int nthread, bool interference=false ) {
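// Exercise four invocation forms of parallel_reduce: no explicit partitioner,
// simple_partitioner, auto_partitioner, and affinity_partitioner. The
// affinity_partitioner object is declared outside the inner loop so the same
// instance is reused across calls, since an affinity_partitioner is intended
// to persist across loops.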
for (int mode = 0; mode < 4; mode++) {
tbb::tick_count T0 = tbb::tick_count::now();
long join_count = 0;
tbb::affinity_partitioner ap;
for( size_t i=0; i<=1000; ++i ) {
FooBody f;
f.sum = 0;
f.parent = NULL;
f.join_count = 0;
f.is_new = true;
f.forked = 0;
f.begin = ~size_t(0);
f.end = ~size_t(0);
ASSERT( FooBodyCount==1, NULL );
switch (mode) {
case 0:
tbb::parallel_reduce( MinimalRange(i), f );
break;
case 1:
tbb::parallel_reduce( MinimalRange(i), f, tbb::simple_partitioner() );
break;
case 2:
tbb::parallel_reduce( MinimalRange(i), f, tbb::auto_partitioner() );
break;
case 3:
tbb::parallel_reduce( MinimalRange(i), f, ap );
break;
}
join_count += f.join_count;
ASSERT( FooBodyCount==1, NULL );
ASSERT( f.sum==i, NULL );
ASSERT( f.begin==(i==0 ? ~size_t(0) : 0), NULL );
ASSERT( f.end==(i==0 ? ~size_t(0) : i), NULL );
}
tbb::tick_count T1 = tbb::tick_count::now();
REMARK("time=%g join_count=%ld ForkCount=%ld nthread=%d%s\n",
(T1-T0).seconds(),join_count,long(ForkCount), nthread, interference ? " (with interference)":"");
}
}
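//! Task that lets FlogWithInterference hold a thread in wait_for_all deep in a task chain.
/** The apparent intent is that the blocked thread then steals parallel_reduce work
from an unusually deep context, which is the situation the TBB 2.1 mid-range-stealing
regression test below tries to provoke. If the task is stolen, spawning of its child
is left to FlogWithInterference. */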
class DeepThief: public tbb::task {
/*override*/tbb::task* execute() {
if( !is_stolen_task() )
spawn(*child);
wait_for_all();
return NULL;
}
task* child;
friend void FlogWithInterference(int);
public:
DeepThief() : child() {}
};
//! Test for problem in TBB 2.1 parallel_reduce where middle of a range is stolen.
/** Warning: this test is a somewhat abusive use of TBB because
it requires two or more threads to avoid deadlock. */
void FlogWithInterference( int nthread ) {
ASSERT( nthread>=2, "requires two or more threads" );
// Build linear chain of tasks.
// The purpose is to drive up "task depth" in TBB 2.1.
// An alternative would be to use add_to_depth, but that method is deprecated in TBB 2.2,
// and this way we generalize to catching problems with implicit depth calculations.
tbb::task* root = new( tbb::task::allocate_root() ) tbb::empty_task;
root->set_ref_count(2);
tbb::task* t = root;
for( int i=0; i<3; ++i ) {
t = new( t->allocate_child() ) tbb::empty_task;
t->set_ref_count(1);
}
// Append a DeepThief to the chain.
DeepThief* deep_thief = new( t->allocate_child() ) DeepThief;
deep_thief->set_ref_count(2);
// Append a leaf to the chain.
tbb::task* leaf = new( deep_thief->allocate_child() ) tbb::empty_task;
deep_thief->child = leaf;
root->spawn(*deep_thief);
Flog(nthread,true);
if( root->ref_count()==2 ) {
// Spawn the leaf, which, when it finishes, causes the DeepThief and the rest of the chain to finish.
root->spawn( *leaf );
}
// Wait for all tasks in the chain from root to leaf to finish.
root->wait_for_all();
root->destroy( *root );
}
#include "tbb/blocked_range.h"
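// MSVC builds predating <stdint.h> fall back to the 64-bit unsigned type that TBB
// defines internally; elsewhere the standard uint64_t is used.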
#if _MSC_VER
typedef tbb::internal::uint64_t ValueType;
#else
typedef uint64_t ValueType;
#endif
struct Sum {
template<typename T>
T operator() ( const T& v1, const T& v2 ) const {
return v1 + v2;
}
};
struct Accumulator {
ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const {
for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
value += *pv;
return value;
}
};
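// Sum and Accumulator feed the functional form of parallel_reduce below: Accumulator
// folds a blocked_range into a running value starting from the identity element I,
// and Sum combines the partial results of different subranges.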
void ParallelSum () {
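// Sum the values 1..N in parallel and check the result against the closed form N*(N+1)/2.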
const ValueType I = 0,
N = 1000000,
R = N * (N + 1) / 2;
ValueType *array = new ValueType[N + 1];
for ( ValueType i = 0; i < N; ++i )
array[i] = i + 1;
tbb::blocked_range<ValueType*> range(array, array + N);
ValueType r1 = tbb::parallel_reduce( range, I, Accumulator(), Sum() );
ASSERT( r1 == R, NULL );
#if __TBB_LAMBDAS_PRESENT
ValueType r2 = tbb::parallel_reduce( range, I,
[](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
value += *pv;
return value;
},
Sum()
);
ASSERT( r2 == R, NULL );
#endif /* __TBB_LAMBDAS_PRESENT */
delete[] array;
}
#include "tbb/task_scheduler_init.h"
#include "harness_cpu.h"
int TestMain () {
if( MinThread<0 ) {
REPORT("Usage: nthread must be positive\n");
exit(1);
}
for( int p=MinThread; p<=MaxThread; ++p ) {
tbb::task_scheduler_init init( p );
Flog(p);
if( p>=2 )
FlogWithInterference(p);
ParallelSum();
// Test that all workers sleep when there is no work
TestCPUUserTime(p);
}
return Harness::Done;
}