54 #define NEED_SMP_DECLS 1
55 #include "tbci/basics.h"
69 #ifdef HAVE_SYS_SYSINFO_H
70 # include <sys/sysinfo.h>
83 # define MAX_THREADS 160
86 #include "tbci/list.h"
89 # include <sys/syscall.h>
94 return syscall(SYS_gettid);
98 # define gettid getpid
117 unsigned long page_mask;
121 # define TCHK(x) if (UNLIKELY(err = x)) fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err))
122 # define ERRDECL int err = 0
128 #ifdef HAVE_SCHED_GETAFFINITY
129 static cpu_set_t saved_cpuset;
131 #ifndef HAVE_CPU_COUNT
133 static int CPU_COUNT(
const cpu_set_t *cpus)
136 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
137 if (CPU_ISSET(
i, cpus))
143 static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
145 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
146 if (CPU_ISSET(
i, src1) ^ CPU_ISSET(
i, src2))
153 static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
155 for (
int i = 0;
i < CPU_SETSIZE; ++
i)
156 if (CPU_ISSET(
i, src1) & CPU_ISSET(
i, src2))
164 static int next_set_bit(
const int start,
const cpu_set_t *cpus)
166 for (
int i = start;
i < CPU_SETSIZE; ++
i)
167 if (CPU_ISSET(
i, cpus))
172 static char cpu_buf[1024];
173 static const char* cpu_str(
const cpu_set_t *cpus)
175 static volatile int off = 0;
181 char* ret = cpu_buf+old_off+1;
184 for (
int i = 0;
i < CPU_SETSIZE && ptr < 1016; ++
i)
185 if (CPU_ISSET(
i, cpus))
186 ptr += sprintf(cpu_buf+ptr,
" %i",
i);
189 #define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
190 #if defined(__linux__)
192 static void parse_siblings(
const int cpu, cpu_set_t *cpus,
const int remove)
195 sprintf(fn,
"/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
196 FILE *f = fopen(fn,
"r");
199 int sibl[(CPU_SETSIZE+31)/32];
205 if (fscanf(f,
"%x,", &msk) == EOF) {
206 if (fscanf(f,
"%x", &msk) == EOF)
210 sibl[nr_ints++] = msk;
215 cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
216 long* sibl_ptr = (
long*)&sibl_set;
217 #if __BYTE_ORDER == __LITTLE_ENDIAN
218 for (
int j = nr_ints-1; j >= 0; --j)
219 *sibl_ptr++ = sibl[j];
221 long* sibl_ptr2 = (
long*)&sibl;
222 for (
int j = (nr_ints*
sizeof(
int)/
sizeof(long))-1; j >= 0; --j)
223 *sibl_ptr++ = sibl_ptr2[j];
227 if (!CPU_ISSET(cpu, &sibl_set))
230 memcpy(cpus, &sibl_set, CPU_SETBYTES);
233 CPU_CLR(cpu, &sibl_set);
234 CPU_AND(&sibl_set, &sibl_set, cpus);
235 CPU_XOR(cpus, cpus, &sibl_set);
239 static void add_siblings(
const int cpu, cpu_set_t *cpus)
241 parse_siblings(cpu, cpus, 0);
244 static void remove_hyperthreads(cpu_set_t *cpus)
247 fprintf(stderr,
"CPU set before HT removal: %s\n", cpu_str(cpus));
249 for (
int i = CPU_SETSIZE-1;
i >=0; --
i) {
250 if (!CPU_ISSET(
i, cpus))
252 parse_siblings(
i, cpus, 1);
255 fprintf(stderr,
"CPU set after HT removal: %s\n", cpu_str(cpus));
259 static void remove_hyperthreads(cpu_set_t *cpus)
270 #ifdef HAVE_SCHED_GETAFFINITY
272 if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
274 remove_hyperthreads(&saved_cpuset);
275 return CPU_COUNT(&saved_cpuset);
278 #ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
279 cpus = get_nprocs ();
282 #elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
285 cpuinfo = fopen (
"/proc/cpuinfo",
"r");
288 while (!feof (cpuinfo)) {
289 fgets (buf, 128, cpuinfo);
290 if (
CSTD__ memcmp (buf+1,
"rocessor", 8) == 0)
295 fprintf (stderr,
"%i CPUs detected.\n", cpus);
297 return (cpus == 0 ? 1: cpus);
300 #ifdef HAVE_GETLOADAVG
301 static int loadavg ()
304 int err = getloadavg (loads, 3);
306 return (
int)loads[0];
308 return (
int)loads[1];
335 cback callback(ctor, dtor, parm);
336 thread_cbacks.
append(&callback);
342 cback callback(ctor, dtor, parm);
344 BCHK(!cbk,
NumErr, Deregister unrgistered callback, (
long int)ctor, );
352 fprintf (stderr,
" Thread (%i) synchronization problem!\n", tc->
t_no);
353 fflush (stderr); abort ();
368 static int cpu_to_node(
int cpu)
371 for (
int n = 0; n < numa_num_possible_nodes(); ++n) {
372 sprintf(fn,
"/sys/devices/system/node/node%i/cpu%i", n, cpu);
373 if (!access(fn, R_OK))
379 int do_numa_init(
unsigned int cpu,
struct thr_struct *
ts,
unsigned long stack)
381 int node = cpu_to_node(cpu);
382 struct bitmask *nodes = numa_allocate_nodemask();
385 numa_bitmask_clearall(nodes);
386 numa_bitmask_setbit(nodes, node);
387 numa_set_preferred(node);
388 #ifdef HAVE_SCHED_GETAFFINITY
389 numa_set_membind(nodes);
396 #ifdef HAVE_SCHED_GETAFFINITY
398 CPU_SET(cpu, &cpuset);
399 parse_siblings(cpu, &cpuset, 0);
400 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
401 pthread_setaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
403 sched_setaffinity(ts->
t_pid, CPU_SETBYTES, &cpuset);
408 void *pages[] = {(
void*)(stack & page_mask)};
409 const int nodes[] = {node};
410 int status = node, oldstat;
412 numa_move_pages(0, 1, pages,
413 NULL, &status, MPOL_MF_MOVE);
416 err = numa_move_pages(0, 1, pages,
417 nodes, &status, MPOL_MF_MOVE);
419 fprintf(stderr,
"Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
421 cpu_str(&cpuset), node, pages[0], oldstat,
425 numa_free_nodemask(nodes);
429 void numa_init_job(
struct thr_ctrl *tc)
436 int do_numa_move_pages(
int node,
int fault_in,
unsigned long firstaddr,
unsigned long lastaddr)
439 int nodes[PG_PC], status[PG_PC];
440 int nr_pages = 0;
int nr_moved = 0;
441 unsigned long orig_addr = firstaddr;
443 memset(status, 0, PG_PC*
sizeof(
int));
445 if (firstaddr - (firstaddr & page_mask) >= page_size/2)
446 firstaddr = firstaddr & page_mask;
448 firstaddr = (firstaddr + (page_size - 1)) & page_mask;
449 if (lastaddr - (lastaddr & page_mask) > page_size/2)
450 lastaddr = lastaddr & page_mask;
452 lastaddr = (lastaddr + (page_size - 1)) & page_mask;
454 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
455 (
void*)firstaddr, (
void*)lastaddr, node,
456 (lastaddr-firstaddr)/page_size,
457 (lastaddr-firstaddr)/page_size/PG_PC);
459 while (lastaddr > firstaddr) {
461 for (i = 0; (i < PG_PC) && (firstaddr < lastaddr); ++
i, firstaddr += page_size) {
462 pages[
i] = (
void*)firstaddr;
466 if (
UNLIKELY(firstaddr < orig_addr))
467 *(
volatile int*)orig_addr = *(
volatile int*)orig_addr;
469 *(
volatile int*)firstaddr = *(
volatile int*)firstaddr;
473 long err = numa_move_pages(0, i, pages,
474 NULL, status, MPOL_MF_MOVE);
476 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
477 pages[0], pages[i-1], node, strerror(errno));
478 int to_move = 0; err = 0;
479 for (
int j = 0; j <
i; ++j)
480 if (status[j] != node)
483 fprintf(stderr,
"numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
484 pages[0], pages[i-1], i, to_move, node, status[0]);
488 err = numa_move_pages(0, i, pages,
489 nodes, status, MPOL_MF_MOVE);
491 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
492 pages[0], pages[i-1], node, strerror(errno));
497 void numa_move_pages_job(
struct thr_ctrl *tc)
501 (
unsigned long)tc->
t_par[0],
502 (
unsigned long)tc->
t_par[1]);
507 #if defined(__i386__) || defined(__x86_64__)
508 # define _cpu_relax() asm ("rep; nop")
510 # define _cpu_relax() do {} while(0)
523 # define POLL_REP2 168
529 while (w-- && (
signed)(rd = read(fd, ptr, sz)) < (
signed)sz)
543 fcntl(fd, F_SETFL, 0);
545 rd = read(fd, ptr, sz);
547 fcntl(fd, F_SETFL, O_NONBLOCK);
557 #if defined(HAVE_TLS) || defined(HAVE_DTLS)
563 memset(&tc, 0,
sizeof(tc));
564 memset(&out, 0,
sizeof(out));
567 fprintf (stderr,
" Thread %i: Try to get setup lock\n", tc.
t_no);
572 fprintf (stderr,
" Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
582 fprintf(stderr,
"Thread %i: Reading failed! %i\n", tc.
t_no, err);
586 fprintf (stderr,
" Thread %i: Start job %li @ %p\n", ts->
t_no, tc.
t_job_no, tc.
t_job);
596 fprintf (stderr,
" Thread %i: Job done!\n", ts->
t_no);
602 fprintf(stderr,
"Thread %i: Writing failed! %i\n", ts->
t_no, err);
606 fprintf (stderr,
" Thread %i: Signaled Job done!\n", ts->
t_no);
611 fprintf (stderr,
" Thread %i: exit!\n", ts->
t_no);
616 #if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
618 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
619 const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
620 out.
t_retval = (
long)(secs * CLOCKS_PER_SEC);
631 int t,
err, det_cpus, all_cpus;
632 pthread_mutexattr_t mutattr;
641 #ifdef HAVE_GETLOADAVG
644 int lavg = loadavg ();
648 if (num_threads < 2 && det_cpus > 2)
665 fprintf(stderr,
"Warning: Number of threads %i larger than no of CPU cores %i!\n",
667 #ifdef HAVE_SCHED_GETAFFINITY
684 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
702 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
707 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
715 fprintf (stderr,
"%i threads @%p (sz=%zi) set up.\n",
716 num_threads, threads,
sizeof(
struct thr_struct));
720 c->ctor(
c->parm, num_threads);
729 #ifdef HAVE_SCHED_GETAFFINITY
731 CPU_ZERO(&saved_cpuset);
734 fprintf(stderr,
"TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
736 cpu_set_t cpuset; CPU_ZERO(&cpuset);
739 int next_cpu = next_set_bit(0, &saved_cpuset);
745 if (numa_max_node() == 0)
748 page_size = numa_pagesize();
749 page_mask = ~(page_size-1ULL);
753 fprintf(stderr,
"NUMA: avail %i, maxnode %i\n",
numa_avail, numa_max_node());
756 CPU_SET(next_cpu, &cpuset);
758 add_siblings(next_cpu, &cpuset);
763 ts.
t_id = pthread_self();
767 #ifdef HAVE_SCHED_GETAFFINITY
774 next_cpu, cpu_str(&cpuset));
779 next_cpu, cpu_str(&cpuset));
785 int several_nodes = 0;
788 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
790 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
791 CPU_SET(next_cpu, &cpuset);
793 add_siblings(next_cpu, &cpuset);
799 (
void*)ts, (
void*)0);
802 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
803 pthread_setaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
808 fprintf(stderr,
"Set Thread %i: CPUs %s\n", t,
812 pthread_getaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
813 fprintf(stderr,
"Get Thread %i: CPUs %s\n", t,
824 #ifdef HAVE_SCHED_GETAFFINITY
826 fprintf(stderr,
"Main thread: CPUs %s, Node %i\n",
829 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
832 pthread_getaffinity_np(ts->
t_id, CPU_SETBYTES, &cpuset);
833 fprintf(stderr,
"Thread %i: CPUs %s, Node %i\n",
837 if (!several_nodes) {
840 fprintf(stderr,
"All threads on node %i, disabling NUMA\n",
846 int omp_thr = omp_get_max_threads();
848 fprintf(stderr,
"Now setting affinity for %i OpenMP threads ...\n", omp_thr);
850 #pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
851 for (
int t = 0; t < omp_thr; ++t) {
852 int tid = omp_get_thread_num();
853 next_cpu = next_set_bit(0, &saved_cpuset);
854 for (
int i = 0; i < t; ++
i) {
855 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
857 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
860 CPU_SET(next_cpu, &cpuset);
862 add_siblings(next_cpu, &cpuset);
863 sched_setaffinity(0, CPU_SETBYTES, &cpuset);
866 fprintf(stderr,
"Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
885 memset(&job, 0,
sizeof(job));
892 TCHK(pthread_join (ts->
t_id, &res));
901 fprintf (stderr,
" CPU time for thread %i :%7.3f s\n",
902 ts->
t_no, (
double)(tm)/CLOCKS_PER_SEC);
907 c->dtor(
c->parm, num_threads);
909 clock_t mainclock = clock();
910 #ifdef HAVE_CLOCK_GETTIME
913 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
914 double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
915 mainclock = (
long)(secs*CLOCKS_PER_SEC);
918 tot_cpu_tm += mainclock;
920 fprintf (stderr,
" CPU time for main thr :%7.3f s\n",
921 (
double)mainclock/CLOCKS_PER_SEC);
922 fprintf (stderr,
" CPU for all threads :%7.3f s\n",
923 (
double)(tot_cpu_tm)/CLOCKS_PER_SEC);
924 fprintf (stderr,
" Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
925 poll_succ, poll_usucc, poll_fail, poll_efail);
930 num_threads = 0; threads = 0;
931 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
935 #ifdef HAVE_SCHED_GETAFFINITY
947 const unsigned long off,
const unsigned long sz,
950 void * par;
unsigned t = 0;
955 memset(&job, 0,
sizeof(job));
962 while ((par = va_arg (vl,
void*)))
963 job.
t_par[t++] = par;
967 fprintf (stderr,
"Signal thread %i\n", ts->
t_no);
971 fprintf(stderr,
"Signaling job %li to thread %i failed",
980 const unsigned long off,
const unsigned long sz, ...)
989 const unsigned long sz, ...)
1004 fprintf (stderr,
"Wait for thread %i\n", ts->
t_no);
1007 fprintf(stderr,
"Waiting for thread %i failed!\n", ts->
t_no);
1012 fprintf (stderr,
"Thread %i signaled completion.\n", ts->
t_no);
1024 fprintf (stderr,
"Wait for result of thread %i\n", ts->
t_no);
1027 fprintf(stderr,
"Waiting for thread %i failed!\n", ts->
t_no);
1032 fprintf (stderr,
"Thread %i signaled completion\n", ts->
t_no);
1041 omp_set_num_threads(0);
1050 fprintf (stderr,
"reenable_threads(): Threads already enabled.!\n");
1072 { omp_set_num_threads(0); }
1078 { omp_set_num_threads(omp_get_num_procs()); }
1083 const unsigned long sz, ...))
1091 while ((par = va_arg (vl,
void*)))
1092 tc->
t_par[t++] = par;
1094 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
1095 # ifdef ABORT_ON_ERR
1101 const unsigned long off,
const unsigned long sz, ...))
1109 while ((par = va_arg (vl,
void*)))
1110 tc->
t_par[t++] = par;
1112 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
1113 # ifdef ABORT_ON_ERR
1124 unsigned int curr_n_thr
WEAKA;
1125 unsigned int last_n_thr
WEAKA;
1126 unsigned int prev_n_thr
WEAKA;
#define BCHKNR(cond, exc, txt, ind)
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
#define _TBCI_CWD_DEFAULT
volatile char t_res_dummy[16]
unsigned int tbci_control
const Vector< T > const Vector< T > const Vector< T > & p
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
void deinit(const int thr)
unsigned long t_job_output_no
void lina_empty(struct thr_ctrl *dummy)
int t_pipe_from_thread[2]
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
exception base class for the TBCI NumLib
static List< cback > thread_cbacks
#define BCHK(cond, exc, txt, ind, rtval)
cback(cbackfn ct, cbackfn dt, void *p)
NAMESPACE_END NAMESPACE_TBCI unsigned int tbci_control WEAKA
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
NAMESPACE_TBCI int num_threads
void * lina_thread(void *thr)
struct thr_struct * threads
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
static int detect_num_cpu(int rmv_ht)
void * empty_thread(void *dummy)
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
volatile char t_res_dummy[16]
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
const Vector< T > const Vector< T > const Vector< T > int T T & err
THREAD__ struct thr_struct * this_thread
void *(* useful_job_t)(void *)
void thread_wait(const int thr_no, struct job_output *out)
double thread_wait_result(const int thr_no)
THREAD__ int ismainthread
const Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > long int res
int init_threads(const int num_cpu, const bool load_magic)
static unsigned long job_no
const unsigned TMatrix< T > const Matrix< T > * a
T * setcurr(const T *rec) const
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
void lina_err(struct thr_ctrl *tc)