TBCI Numerical high perf. C++ Library 2.8.0
smp.cc
Go to the documentation of this file.
1
8
50
51
52#define _GNU_SOURCE 1
53
54#define NEED_SMP_DECLS 1
55#include "tbci/basics.h"
56#include "tbci/smp.h"
57#include <stdio.h>
58#include <time.h>
59
60#include <errno.h>
61#include <fcntl.h>
62
63#ifdef _POSIX_THREADS
64#ifdef SMP
65
66//#include <signum.h>
67#include <stdlib.h>
68
69#ifdef HAVE_SYS_SYSINFO_H
70# include <sys/sysinfo.h>
71#endif
72
73#ifdef HAVE_SCHED_H
74# include <sched.h>
75#endif
76
77#ifdef HAVE_NUMA_H
78# include <numa.h>
79# include <numaif.h>
80#endif
81
82#ifndef MAX_THREADS
83# define MAX_THREADS 160
84#endif
85
86#include "tbci/list.h"
87
88#ifdef __linux__
89# include <sys/syscall.h>
90# include <unistd.h>
91#ifndef HAVE_GETTID
92inline static pid_t gettid()
93{
94 return syscall(SYS_gettid);
95}
96#endif
97#else
98# define gettid getpid
99#endif
100
102
106struct thr_struct *threads = 0;
108bool threads_bound = false;
109bool bound_main = false;
114
115#ifdef HAVE_LIBNUMA
116unsigned page_size;
117unsigned long page_mask;
118#endif
119
120#ifdef DEBUG_THREAD
121# define TCHK(x) if (UNLIKELY(err = x)) fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err))
122# define ERRDECL int err = 0
123#else
124# define TCHK(x) x
125# define ERRDECL
126#endif
127
128#ifdef HAVE_SCHED_GETAFFINITY
129static cpu_set_t saved_cpuset;
130
#ifndef HAVE_CPU_COUNT
#undef CPU_COUNT
/* Fallback CPU_COUNT(): number of CPUs present in *cpus. */
static int CPU_COUNT(const cpu_set_t *cpus)
{
	int bit, nset = 0;
	for (bit = 0; bit < CPU_SETSIZE; ++bit)
		nset += CPU_ISSET(bit, cpus) ? 1 : 0;
	return nset;
}
#endif
#ifndef CPU_XOR
/* Fallback CPU_XOR(): *dest = *src1 ^ *src2, computed bit by bit.
 * dest may alias either source: each bit is read before it is written. */
static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
	for (int bit = 0; bit < CPU_SETSIZE; ++bit) {
		if (CPU_ISSET(bit, src1) ^ CPU_ISSET(bit, src2))
			CPU_SET(bit, dest);
		else
			CPU_CLR(bit, dest);
	}
}
#endif
#ifndef CPU_AND
/* Fallback CPU_AND(): *dest = *src1 & *src2, computed bit by bit.
 * For equal bit index both CPU_ISSET() masks select the same bit, so
 * the bitwise & of the raw return values is a valid membership test. */
static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
	for (int bit = 0; bit < CPU_SETSIZE; ++bit) {
		if (CPU_ISSET(bit, src1) & CPU_ISSET(bit, src2))
			CPU_SET(bit, dest);
		else
			CPU_CLR(bit, dest);
	}
}
#endif
162
163
/* Return the lowest CPU index >= start that is present in *cpus,
 * or -1 when no further CPU is set. */
static int next_set_bit(const int start, const cpu_set_t *cpus)
{
	int bit = start;
	while (bit < CPU_SETSIZE) {
		if (CPU_ISSET(bit, cpus))
			return bit;
		++bit;
	}
	return -1;
}
171
static char cpu_buf[1024];
/* Debug helper: render the CPUs in *cpus as a " n1 n2 ..." string.
 * Results live in a rotating window of 8 x 128 bytes inside cpu_buf, so
 * several return values can coexist (e.g. two calls in one fprintf).
 * NOTE(review): 'off' is volatile but not atomic; concurrent callers can
 * race on the rotation, and a large set can overrun its 128-byte window
 * (only bounded by ptr < 1016) into the next one - debug use only. */
static const char* cpu_str(const cpu_set_t *cpus)
{
 static volatile int off = 0;
 int old_off = off;
 /* Rotate to the next 128-byte window, wrapping after 896. */
 if (off < 896)
  off += 128;
 else
  off = 0;
 char* ret = cpu_buf+old_off+1; /* +1 skips the leading space of " %i" */
 int ptr = old_off;
 cpu_buf[ptr+1] = 0; /* pre-terminate: yields "" when no CPU is set */
 for (int i = 0; i < CPU_SETSIZE && ptr < 1016; ++i)
  if (CPU_ISSET(i, cpus))
   ptr += sprintf(cpu_buf+ptr, " %i", i);
 return ret;
}
189#define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
190#if defined(__linux__)
191
/* Read /sys/.../cpu<cpu>/topology/thread_siblings (Linux) and either
 * overwrite *cpus with cpu's full sibling set (remove == 0) or clear
 * cpu's hyperthread siblings - but not cpu itself - from *cpus
 * (remove != 0).  Silently returns, leaving *cpus untouched, when the
 * sysfs file cannot be opened. */
static void parse_siblings(const int cpu, cpu_set_t *cpus, const int remove)
{
 char fn[80];
 sprintf(fn, "/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
 FILE *f = fopen(fn, "r");
 if (!f)
  return;
 int sibl[(CPU_SETSIZE+31)/32];
 int nr_ints = 0;
 bool end = false;
 /* Linux reports this as field of ints, last int covers first 32 CPUs */
 while (!end) {
  int msk;
  /* "%x," consumes one comma-separated chunk; the last chunk has no
   * comma, so retry without it before giving up. */
  if (fscanf(f, "%x,", &msk) == EOF) {
   if (fscanf(f, "%x", &msk) == EOF)
    break;
   end = true;
  }
  sibl[nr_ints++] = msk;
 }
 fclose(f);
 /* Convert to cpu_set_t data structure (machine endian field of longs)
  * first long covers first 32/64 CPUs */
 cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
 long* sibl_ptr = (long*)&sibl_set;
#if __BYTE_ORDER == __LITTLE_ENDIAN
 /* NOTE(review): each 32-bit chunk lands in its own (possibly 64-bit)
  * long slot; this only matches the kernel mask layout while one chunk
  * per word is in play (<= 32 CPUs per long) - TODO confirm on large
  * LP64 machines. */
 for (int j = nr_ints-1; j >= 0; --j)
  *sibl_ptr++ = sibl[j];
#else
 long* sibl_ptr2 = (long*)&sibl;
 for (int j = (nr_ints*sizeof(int)/sizeof(long))-1; j >= 0; --j)
  *sibl_ptr++ = sibl_ptr2[j];
#endif
 //fprintf(stderr, "Siblings for cpu%i: %s\n", cpu, cpu_str(&sibl_set));
 //fprintf(stderr, "CPUS before rmv: %s\n", cpu_str(cpus));
 /* A CPU must always be its own sibling; anything else means we
  * misparsed the mask. */
 if (!CPU_ISSET(cpu, &sibl_set))
  abort();
 if (!remove) {
  memcpy(cpus, &sibl_set, CPU_SETBYTES);
  return;
 }
 /* Drop from *cpus exactly those siblings of cpu that are present,
  * keeping cpu itself: sibl_set := (siblings \ {cpu}) & *cpus, then
  * *cpus ^= sibl_set. */
 CPU_CLR(cpu, &sibl_set);
 CPU_AND(&sibl_set, &sibl_set, cpus);
 CPU_XOR(cpus, cpus, &sibl_set);
 //fprintf(stderr, "CPUS after rmvl: %s\n", cpu_str(cpus));
}
238
/* Overwrite *cpus with the full hardware-sibling (hyperthread) set of
 * cpu (parse_siblings with remove == 0 replaces the set); *cpus is left
 * unchanged when the sysfs topology file is unavailable. */
static void add_siblings(const int cpu, cpu_set_t *cpus)
{
 parse_siblings(cpu, cpus, 0);
}
243
/* Thin out *cpus to one logical CPU per physical core: walk from the
 * highest CPU index down and, for each CPU still present, remove its
 * hyperthread siblings (the visited CPU itself is kept). */
static void remove_hyperthreads(cpu_set_t *cpus)
{
#ifdef THREAD_STAT
 fprintf(stderr, "CPU set before HT removal: %s\n", cpu_str(cpus));
#endif
 for (int i = CPU_SETSIZE-1; i >=0; --i) {
  if (!CPU_ISSET(i, cpus))
   continue;
  parse_siblings(i, cpus, 1);
 }
#ifdef THREAD_STAT
 fprintf(stderr, "CPU set after HT removal: %s\n", cpu_str(cpus));
#endif
}
258#else
/* Non-Linux stub: sibling topology unknown, leave the set untouched. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
}
262#endif
263
264#endif
265
266/* detect the # of cpus for glibc and/or linux systems */
267static int detect_num_cpu (int rmv_ht)
268{
269 int cpus = 0;
270#ifdef HAVE_SCHED_GETAFFINITY
271 pid_t pid = gettid();
272 if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
273 if (rmv_ht)
274 remove_hyperthreads(&saved_cpuset);
275 return CPU_COUNT(&saved_cpuset);
276 }
277#endif
278#ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
279 cpus = get_nprocs ();
280 if (cpus <= 0)
281 return 2;
282#elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
283 FILE* cpuinfo;
284 char buf[128];
285 cpuinfo = fopen ("/proc/cpuinfo", "r");
286 if (cpuinfo <= 0)
287 return 2; /* return 2, if we can't find out */
288 while (!feof (cpuinfo)) {
289 fgets (buf, 128, cpuinfo);
290 if (CSTD__ memcmp (buf+1, "rocessor", 8) == 0)
291 cpus++;
292 }
293#endif
294#ifdef DEBUG_THREAD
295 fprintf (stderr, "%i CPUs detected.\n", cpus);
296#endif
297 return (cpus == 0 ? 1: cpus);
298}
299
300#ifdef HAVE_GETLOADAVG
/* Current load average, truncated to int: the 1-minute value when it is
 * the only sample available, otherwise the 5-minute value; 0 when
 * getloadavg() fails. */
static int loadavg ()
{
 double avgs[3];
 const int got = getloadavg (avgs, 3);
 if (got <= 0)
  return 0;
 return (int)(got == 1 ? avgs[0] : avgs[1]);
}
312#endif
313
314class cback {
315 public:
316 cbackfn *ctor;
317 cbackfn *dtor;
318 void *parm;
319 cback(cbackfn ct, cbackfn dt, void *p)
320 : ctor(ct), dtor(dt), parm(p) {};
322 : ctor(NULL), dtor(NULL), parm(NULL) {};
323 void init(const int thr)
324 { ctor(parm, thr); }
325 void deinit(const int thr)
326 { dtor(parm, thr); }
327};
328
330/* Instantiate */
331template class List<cback>;
332
/* Register a (ctor, dtor, parm) triple: ctor(parm, num_threads) runs at
 * the end of init_threads(), dtor(parm, num_threads) in free_threads().
 * NOTE(review): 'callback' is a stack object; assumes List::append()
 * stores a copy, not the pointer - TODO confirm against list.h. */
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
{
 cback callback(ctor, dtor, parm);
 thread_cbacks.append(&callback);
 //fprintf(stderr, "Register callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
}
339
340void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
341{
342 cback callback(ctor, dtor, parm);
343 cback *cbk = thread_cbacks.setcurr(&callback);
344 BCHK(!cbk, NumErr, Deregister unrgistered callback, (long int)ctor, );
345 thread_cbacks.delcurr();
346 //fprintf(stderr, "Deregister callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
347}
348
349
350void lina_err (struct thr_ctrl *tc)
351{
352 fprintf (stderr, " Thread (%i) synchronization problem!\n", tc->t_no);
353 fflush (stderr); abort ();
354}
355
/* Deliberate no-op job. */
void lina_empty (struct thr_ctrl *dummy)
{
 (void)dummy; /* nothing to do */
}
360
361void* empty_thread (void *dummy)
362{
363 return NULL;
364}
365
366#ifdef HAVE_LIBNUMA
367
368static int cpu_to_node(int cpu)
369{
370 char fn[80];
371 for (int n = 0; n < numa_num_possible_nodes(); ++n) {
372 sprintf(fn, "/sys/devices/system/node/node%i/cpu%i", n, cpu);
373 if (!access(fn, R_OK))
374 return n;
375 }
376 return -1;
377}
378
/* Bind thread ts to the NUMA node owning 'cpu': set the node as
 * preferred/membind policy, pin the thread to cpu plus its hardware
 * siblings, record the node in ts->numa_node, and migrate the page
 * containing 'stack' onto that node.  Returns the node number.
 * NOTE(review): cpu_to_node() can return -1 when sysfs probing fails;
 * the numa_* calls below would then receive -1 - TODO confirm callers
 * guarantee a resolvable cpu. */
int do_numa_init(unsigned int cpu, struct thr_struct *ts, unsigned long stack)
{
 int node = cpu_to_node(cpu);
 struct bitmask *nodes = numa_allocate_nodemask();
 //fprintf(stderr, "Thr %i: CPU %i Node %i\n", ts->t_no, cpu, my_node);
 cpu_set_t cpuset;
 numa_bitmask_clearall(nodes);
 numa_bitmask_setbit(nodes, node);
 numa_set_preferred(node);
#ifdef HAVE_SCHED_GETAFFINITY
 numa_set_membind(nodes);
#else
 /* Without affinity syscalls let libnuma do CPU + memory binding. */
 numa_bind(nodes);
#endif
 //sched_yield();
 //numa_set_localalloc();
 ts->numa_node = node;
#ifdef HAVE_SCHED_GETAFFINITY
 CPU_ZERO(&cpuset);
 CPU_SET(cpu, &cpuset);
 /* remove==0: expands cpuset to cpu's full sibling set. */
 parse_siblings(cpu, &cpuset, 0);
#ifdef HAVE_PTHREAD_GETAFFINITY_NP
 pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
#else
 sched_setaffinity(ts->t_pid, CPU_SETBYTES, &cpuset);
#endif
#endif
 /* Move stack page to my node */
 if (1) {
  void *pages[] = {(void*)(stack & page_mask)};
  /* NOTE(review): this int array shadows the 'nodes' bitmask above -
   * legal, but easy to misread. */
  const int nodes[] = {node};
  int status = node, oldstat;
  long err = 1;
  /* First call with NULL target nodes only queries the current node. */
  numa_move_pages(0, 1, pages,
		NULL, &status, MPOL_MF_MOVE);
  oldstat = status;
  if (oldstat != node)
   err = numa_move_pages(0, 1, pages,
			nodes, &status, MPOL_MF_MOVE);
#ifdef THREAD_DEBUG
  fprintf(stderr, "Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
	ts->t_no, (void*)ts->t_id, ts->t_pid,
	cpu_str(&cpuset), node, pages[0], oldstat,
	status, err);
#endif
 }
 numa_free_nodemask(nodes);
 return node;
}
428
/* Job wrapper for do_numa_init(): t_size carries the CPU number,
 * t_par[0] the thr_struct; the thr_ctrl itself lives on the worker's
 * stack and so supplies the stack address whose page gets migrated. */
void numa_init_job(struct thr_ctrl *tc)
{
 do_numa_init(tc->t_size, (struct thr_struct*)tc->t_par[0], (unsigned long)tc);
}
433
/* Pages handed to one move_pages(2) call. */
#define PG_PC 512
/* Migrate the pages spanning [firstaddr, lastaddr) to 'node', in batches
 * of PG_PC pages.  fault_in != 0 touches each page first so unallocated
 * pages actually exist before the move.  Returns the number of pages
 * that were found on a different node (i.e. needed moving).
 * NOTE(review): the boundary rounding below looks inverted - an address
 * more than half a page INTO a page rounds DOWN (including the partial
 * page) while a small offset rounds UP (excluding it), and vice versa
 * for lastaddr.  Behavior kept as-is; verify intent. */
int do_numa_move_pages(int node, int fault_in, unsigned long firstaddr, unsigned long lastaddr)
{
 void* pages[PG_PC];
 int nodes[PG_PC], status[PG_PC];
 int nr_pages = 0; int nr_moved = 0;
 unsigned long orig_addr = firstaddr;
#ifdef VALGRIND
 memset(status, 0, PG_PC*sizeof(int));
#endif
 /* Round the half-open range to page boundaries (see NOTE above). */
 if (firstaddr - (firstaddr & page_mask) >= page_size/2)
  firstaddr = firstaddr & page_mask;
 else
  firstaddr = (firstaddr + (page_size - 1)) & page_mask;
 if (lastaddr - (lastaddr & page_mask) > page_size/2)
  lastaddr = lastaddr & page_mask;
 else
  lastaddr = (lastaddr + (page_size - 1)) & page_mask;
#if 0
 fprintf(stderr, "numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
	(void*)firstaddr, (void*)lastaddr, node,
	(lastaddr-firstaddr)/page_size,
	(lastaddr-firstaddr)/page_size/PG_PC);
#endif
 while (lastaddr > firstaddr) {
  int i;
  for (i = 0; (i < PG_PC) && (firstaddr < lastaddr); ++i, firstaddr += page_size) {
   pages[i] = (void*)firstaddr;
   nodes[i] = node;
   /* Note: Pages might be not allocated yet and won't be ... */
   if (fault_in) {
    /* Never touch below the caller's original address: the first
     * (rounded-down) page is faulted in via orig_addr instead. */
    if (UNLIKELY(firstaddr < orig_addr))
     *(volatile int*)orig_addr = *(volatile int*)orig_addr;
    else
     *(volatile int*)firstaddr = *(volatile int*)firstaddr;
   }
  }
  nr_pages += i;
  /* Query pass: NULL target nodes -> only report current placement. */
  long err = numa_move_pages(0, i, pages,
		NULL, status, MPOL_MF_MOVE);
  if (err)
   fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
	pages[0], pages[i-1], node, strerror(errno));
  int to_move = 0; err = 0;
  for (int j = 0; j < i; ++j)
   if (status[j] != node)
    ++to_move;
#if 0
  fprintf(stderr, "numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
	pages[0], pages[i-1], i, to_move, node, status[0]);
#endif
  nr_moved += to_move;
  /* Move pass: only issued when something is actually misplaced. */
  if (to_move)
   err = numa_move_pages(0, i, pages,
		nodes, status, MPOL_MF_MOVE);
  if (err)
   fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
	pages[0], pages[i-1], node, strerror(errno));
 }
 return nr_moved;
}
496
/* Job wrapper for do_numa_move_pages(): t_size = target node,
 * t_off = fault_in flag, t_par[0]/t_par[1] = first/last address;
 * the moved-page count is returned in t_res_l. */
void numa_move_pages_job(struct thr_ctrl *tc)
{
 tc->t_res_l =
  do_numa_move_pages(tc->t_size, tc->t_off,
	(unsigned long)tc->t_par[0],
	(unsigned long)tc->t_par[1]);
}
504
505#endif
506
507#if defined(__i386__) || defined(__x86_64__)
508# define _cpu_relax() asm ("rep; nop")
509#else
510# define _cpu_relax() do {} while(0)
511#endif
512
513/* Sidenote: We do no locking, approx. numbers are OK here */
514unsigned long poll_succ = 0;
515unsigned long poll_usucc = 0;
516unsigned long poll_fail = 0;
517unsigned long poll_efail = 0;
518
519#ifndef POLL_REP
520# define POLL_REP 1
521#endif
522#ifndef POLL_REP2
523# define POLL_REP2 168
524#endif
/* Read sz bytes from the non-blocking fd: poll up to 'rep' times while
 * spinning with _cpu_relax(), then fall back to a single blocking read
 * (temporarily clearing O_NONBLOCK).  Updates the approximate poll_*
 * statistics counters; counter roles follow the THREAD_STAT printout
 * ("successes" / "unexp. succ." / "failures" / "exp. fails").
 * Returns the byte count of the last read; callers compare against sz.
 * NOTE(review): short reads are retried from scratch, never accumulated
 * - correctness relies on pipe transfers of sz <= PIPE_BUF being
 * all-or-nothing.  TODO confirm job structs stay below PIPE_BUF. */
static int busy_read(int fd, void* ptr, size_t sz, int rep=POLL_REP)
{
 size_t rd = 0;
 int w = rep;
 /* Poll loop: read() yields (size_t)-1 on EAGAIN, hence the signed
  * comparison against sz. */
 while (w-- && (signed)(rd = read(fd, ptr, sz)) < (signed)sz)
  _cpu_relax();
 if (sz == rd) {
  /* Fast path: the data arrived while polling. */
  if (rep == POLL_REP)
   ++poll_usucc;
  else
   ++poll_succ;
  return rd;
 }
 // slow path
 if (rep == POLL_REP)
  ++poll_efail;
 else
  ++poll_fail;
 fcntl(fd, F_SETFL, 0);          /* switch to blocking ... */
 _cpu_relax();
 rd = read(fd, ptr, sz);         /* ... and wait for the real data */
 PREFETCH_R(ptr, 2);
 fcntl(fd, F_SETFL, O_NONBLOCK); /* restore non-blocking mode */
 return rd;
}
550
/* Worker thread main loop.  Repeatedly reads a job_input from the
 * to-thread pipe, executes tc.t_job(&tc), and writes a job_output
 * (job number + raw result bytes) back on the from-thread pipe.
 * A job_input with t_job == NULL terminates the loop; the final
 * job_output then carries this thread's consumed CPU time in t_retval.
 * tc is pre-poisoned with lina_err each round so a garbled read aborts
 * loudly instead of running stale work. */
void* lina_thread (void* thr)
{
 struct thr_struct *ts = (struct thr_struct*) thr;
 struct thr_ctrl tc;
 struct job_output out;
 int err;
#if defined(HAVE_TLS) || defined(HAVE_DTLS)
 ismainthread = 0;
 thrno = ts->t_no+1;
 this_thread = ts;
#endif
#ifdef VALGRIND
 memset(&tc, 0, sizeof(tc));
 memset(&out, 0, sizeof(out));
#endif
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i: Try to get setup lock\n", tc.t_no);
#endif
 /* Record our LWP id; clock() started early (some systems only start
  * accounting after the first call). */
 ts->t_pid = gettid (); clock ();
 tc.t_no = ts->t_no;
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
	ts->t_no, ts->t_id, ts->t_pid);
#endif
 /* Now wait for some work to do. Finish when a NULL job becomes runnable */
 while (1) {
  /* Set job to error! */
  tc.t_job = lina_err;
  //memset(tc.t_res_dummy, 0, sizeof(tc.t_res_dummy));
  /* Ready to accept a job: Wait for it */
  /* Only the job_input prefix of thr_ctrl travels over the pipe. */
  if (UNLIKELY((err = busy_read(ts->t_pipe_to_thread[0], &tc, sizeof(struct job_input))) != sizeof(struct job_input))) {
   fprintf(stderr, "Thread %i: Reading failed! %i\n", tc.t_no, err);
   abort();
  }
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Start job %li @ %p\n", ts->t_no, tc.t_job_no, tc.t_job);
  fflush (stderr);
#endif
  ts->t_done_var++;
  out.t_job_output_no = tc.t_job_no;
  if (!tc.t_job)
   break; /* The end */
  else
   (*tc.t_job) (&tc);
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Job done!\n", ts->t_no);
  fflush (stderr);
#endif
  /* Copy the raw result bytes into the reply record. */
  memcpy((void*)out.t_res_dummy, (const void*)tc.t_res_dummy, sizeof(out.t_res_dummy));
  /* Do we need some memory barrier here? But we are cache-coherent, no? */
  if (UNLIKELY((err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output))) != sizeof(struct job_output))) {
   fprintf(stderr, "Thread %i: Writing failed! %i\n", ts->t_no, err);
   abort();
  }
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Signaled Job done!\n", ts->t_no);
  fflush (stderr);
#endif
 }
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i: exit!\n", ts->t_no);
#endif
 out.t_retval = clock ();
 ts->t_done_var++;

#if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
 /* Prefer the per-thread CPU clock, rescaled to clock() ticks. */
 struct timespec tm;
 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
  const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
  out.t_retval = (long)(secs * CLOCKS_PER_SEC);
 }
#endif
 err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output));
 /* NOTE(review): this hands pthread_join a pointer into the dying
  * thread's stack; free_threads() deliberately reads t_retval from the
  * pipe message instead - do not dereference the join result. */
 pthread_exit(&out.t_retval);
 return NULL;
}
627
628
/* Create the worker thread pool.
 * num_cpu > 0: exactly that many workers; num_cpu <= 0: auto-detect
 * (hyperthreads excluded), with -num_cpu acting as an upper bound and
 * load_magic subtracting the current load average.  A result of 1
 * collapses to 0 workers (main thread does all the work).  Per worker,
 * two pipes (read ends non-blocking) and a lina_thread are created;
 * finally every registered ctor callback runs.  Returns num_threads.
 * NOTE(review): 'mutattr' and the outer 'err' appear unused in this
 * extract. */
int init_threads (const int num_cpu, const bool load_magic)
{
 int t, err, det_cpus, all_cpus;
 pthread_mutexattr_t mutattr;

 main_thread_pid = getpid ();
 /* No of CPUs */
 all_cpus = detect_num_cpu(0);
 det_cpus = detect_num_cpu(1);
 /* How many ? */
 if (num_cpu <= 0) {
  num_threads = det_cpus;
#ifdef HAVE_GETLOADAVG
  if (load_magic) {
   /* Subtract load */
   int lavg = loadavg ();
   /* If load exceeds no of CPUs, use 2 CPUs */
   if (all_cpus - lavg < num_threads) {
    num_threads = MAX(1, all_cpus - lavg);
    /* NOTE(review): a line (orig. 648) is missing from this extract
     * here - presumably a condition guarding the fallback below. */
    num_threads = 2;
   }
  }
#endif
  /* num_cpu < 0 caps the auto-detected count at -num_cpu. */
  if (num_cpu && num_threads > -num_cpu)
   num_threads = -num_cpu;
 } else
  num_threads = num_cpu;
 /* Max is MAX_THREADS */
 /* NOTE(review): the clamp to MAX_THREADS (orig. 657-659) is missing
  * from this extract. */
 /* No threads? */
 if (num_threads == 1)
  num_threads = 0;
 /* Sanity check */
 if (num_threads > det_cpus)
  fprintf(stderr, "Warning: Number of threads %i larger than no of CPU cores %i!\n",
	num_threads, det_cpus);
#ifdef HAVE_SCHED_GETAFFINITY
 else
  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
#endif
#ifdef TBCI_OMP
 omp_set_num_threads(num_threads);
#endif
 /* Alloc mem for control structs */
 if (num_threads >= 1)
  threads = (struct thr_struct *) memalign(128, sizeof(struct thr_struct) * (num_threads));
  //threads = (struct thr_ctrl *) CSTD__ malloc (sizeof(struct thr_ctrl) * (num_threads));
 else
  threads = NULL;
 if (threads)
  CSTD__ memset (threads, 0, sizeof(struct thr_struct) * (num_threads));
 /* Some Un*xes only start the clock after the first call to clock() */
 clock ();
#if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
 // If we don't have thread local storage (and no locking),
 // we need to disable // concurrent memory caching allocations
 // now globally :-(
 if (num_threads >= 1) {
  ismainthread = 0;
  thrno = -1;
 }
#endif
 /* Now init threads */
 for (t = 0; t < num_threads; ++t) {
  int err;
  struct thr_struct *ts = &threads[t];
  //fprintf(stderr, "Debug: thread %i @ %p\n", t, tc);
  ts->t_no = t;
  /* Create pipes for thread sync */
  err = pipe(ts->t_pipe_to_thread);
  if (err) {
   fprintf(stderr, "Failed to create pipe for thread %i\n", t);
   abort();
  }
  err = pipe(ts->t_pipe_from_thread);
  if (err) {
   fprintf(stderr, "Failed to create pipe for thread %i\n", t);
   abort();
  }
  /* Read ends are non-blocking so busy_read() can poll them. */
  fcntl(ts->t_pipe_to_thread[0], F_SETFL, O_NONBLOCK);
  fcntl(ts->t_pipe_from_thread[0], F_SETFL, O_NONBLOCK);
  TCHK(pthread_create (&ts->t_id, NULL, lina_thread, ts));
 }
#ifdef DEBUG_THREAD
 fprintf (stderr, "%i threads @%p (sz=%zi) set up.\n",
	num_threads, threads, sizeof(struct thr_struct));
#endif
 for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
  c->ctor(c->parm, num_threads);
 }
 //fprintf(stderr, "%li Callbacks registered\n", thread_cbacks.size());

 return num_threads;
}
726
/* Pin the main thread (optionally) and every worker to consecutive CPUs
 * of the saved affinity mask, optionally adding hardware siblings and
 * establishing NUMA bindings; finally pins any OpenMP threads the same
 * way.  Workers are bound via the numa_init_job when NUMA is usable,
 * via pthread_setaffinity_np otherwise. */
void bind_threads (bool bind_main, bool enable_numa, bool add_sibl)
{
#ifdef HAVE_SCHED_GETAFFINITY
 /* Save CPU affinity mask */
 CPU_ZERO(&saved_cpuset);
 sched_getaffinity (main_thread_pid, CPU_SETBYTES, &saved_cpuset);
 if (num_threads > CPU_COUNT(&saved_cpuset))
  fprintf(stderr, "TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
	num_threads, CPU_COUNT(&saved_cpuset));
 cpu_set_t cpuset; CPU_ZERO(&cpuset);

 /* TODO: Be clever to which CPUs to bind to, e.g. avoid hyperthreads */
 int next_cpu = next_set_bit(0, &saved_cpuset);
 if (next_cpu < 0)
  abort();
 bound_main = bind_main;
#ifdef HAVE_LIBNUMA
 /* NUMA only pays off with more than one node. */
 numa_avail = (numa_available() == 0);
 if (numa_max_node() == 0)
  numa_avail = 0;
 if (numa_avail) {
  page_size = numa_pagesize();
  page_mask = ~(page_size-1ULL);
 }
 if (!enable_numa)
  numa_avail = 0;
 fprintf(stderr, "NUMA: avail %i, maxnode %i\n", numa_avail, numa_max_node());
#endif /* HAVE_LIBNUMA */
 if (bind_main) {
  CPU_SET(next_cpu, &cpuset);
  if (add_sibl)
   add_siblings(next_cpu, &cpuset);
#ifdef HAVE_LIBNUMA
  if (numa_avail) {
   /* Temporary thr_struct standing in for the main thread. */
   struct thr_struct ts;
   ts.t_no = -1; ts.t_pid = main_thread_pid;
   ts.t_id = pthread_self();
   main_numa_node = do_numa_init(next_cpu, &ts, (unsigned long)&ts);
  } else
#endif /* HAVE_LIBNUMA */
#ifdef HAVE_SCHED_GETAFFINITY
  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
#else
  do {} while(0);
#endif
#ifdef THREAD_DEBUG
  fprintf(stderr, "Set Main Thread %i: CPU %i (%s)\n", main_thread_pid,
	next_cpu, cpu_str(&cpuset));
#endif
#ifdef DEBUG_THREAD
  sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
  fprintf(stderr, "Get Main Thread %i: CPU %i (%s)\n", main_thread_pid,
	next_cpu, cpu_str(&cpuset));
#endif
  CPU_ZERO(&cpuset);
 }
#endif /* HAVE_SCHED_GETAFFINITY */
 threads_bound = true;
 int several_nodes = 0;
 for (int t = 0; t < num_threads; ++t) {
  struct thr_struct *ts = &threads[t];
  /* Next allowed CPU; on exhaustion (-1) the second call restarts the
   * scan at index 0, wrapping around the mask. */
  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
  if (next_cpu == -1)
   next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
  CPU_SET(next_cpu, &cpuset);
  if (add_sibl)
   add_siblings(next_cpu, &cpuset);
#ifdef HAVE_LIBNUMA
  if (numa_avail && cpu_to_node(next_cpu) != main_numa_node)
   ++several_nodes;
  if (numa_avail) {
   /* Let the worker bind itself (so its stack page moves with it). */
   thread_start(t, numa_init_job, next_cpu,
	(void*)ts, (void*)0);
  } else
#endif
#ifdef HAVE_PTHREAD_GETAFFINITY_NP
  pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
#else
  do {} while(0);
#endif
#ifdef THREAD_DEBUG
  fprintf(stderr, "Set Thread %i: CPUs %s\n", t,
	cpu_str(&cpuset));
#endif
#ifdef DEBUG_THREAD
  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
  fprintf(stderr, "Get Thread %i: CPUs %s\n", t,
	cpu_str(&cpuset));
#endif
  CPU_ZERO(&cpuset);
 }
#ifdef HAVE_LIBNUMA
 /* Collect the numa_init_job completions issued above. */
 if (numa_avail)
  for (int t = 0; t < num_threads; ++t)
   thread_wait(t);
#endif
#ifdef THREAD_DEBUG
#ifdef HAVE_SCHED_GETAFFINITY
 sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
 fprintf(stderr, "Main thread: CPUs %s, Node %i\n",
	cpu_str(&cpuset), main_numa_node);
#endif
#ifdef HAVE_PTHREAD_GETAFFINITY_NP
 for (int t = 0; t < num_threads; ++t) {
  struct thr_struct *ts = &threads[t];
  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
  fprintf(stderr, "Thread %i: CPUs %s, Node %i\n",
	ts->t_no, cpu_str(&cpuset), ts->numa_node);
 }
#endif
 /* NOTE(review): this single-node NUMA auto-disable sits inside the
  * THREAD_DEBUG block, so release builds never run it - verify intent. */
 if (!several_nodes) {
  numa_avail = 0;
#ifdef THREAD_DEBUG
  /* NOTE(review): the argument line of this fprintf (orig. 841,
   * presumably 'main_numa_node);') is missing from this extract. */
  fprintf(stderr, "All threads on node %i, disabling NUMA\n",
#endif
 }
#endif /* THREAD_DEBUG */
#ifdef TBCI_OMP
 int omp_thr = omp_get_max_threads();
#ifdef DEBUG_THREAD
 fprintf(stderr, "Now setting affinity for %i OpenMP threads ...\n", omp_thr);
#endif
#pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
 for (int t = 0; t < omp_thr; ++t) {
  int tid = omp_get_thread_num();
  /* Re-derive the t-th allowed CPU from scratch (each OMP thread owns
   * a private next_cpu). */
  next_cpu = next_set_bit(0, &saved_cpuset);
  for (int i = 0; i < t; ++i) {
   next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
   if (next_cpu == -1)
    next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
  }
  CPU_ZERO(&cpuset);
  CPU_SET(next_cpu, &cpuset);
  if (add_sibl)
   add_siblings(next_cpu, &cpuset);
  sched_setaffinity(0, CPU_SETBYTES, &cpuset);
#ifdef THREAD_DEBUG
#pragma omp critical
  fprintf(stderr, "Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
#endif
 }
#endif /* TBCI_OMP */
}
871
/* NOTE(review): the declaration lines preceding this body (orig.
 * 872-873) are missing from this extract; per the file's symbol index
 * they are 'int tot_cpu_tm;' and 'void free_threads ()'.
 * Tears the pool down: sends each worker a NULL job, collects its final
 * job_output (which carries the thread's CPU time), joins it, runs all
 * registered dtor callbacks, accumulates total CPU time, frees the
 * thr_struct array and restores the saved main-thread affinity. */
{
 int t;
 tot_cpu_tm = 0;
 ERRDECL;
 //fprintf (stderr, "Free threads ...\n");
 for (t = 0; t < num_threads; ++t) {
  int err;
  struct thr_struct *ts = &threads[t];
  struct job_input job;
  struct job_output out;
#ifdef VALGRIND
  memset(&job, 0, sizeof(job));
#endif
  job.t_job = 0; /* NULL job == shutdown request */
  /* Blocking read: we really want to wait for the worker's last word. */
  fcntl(ts->t_pipe_from_thread[0], F_SETFL, 0);
  err = write(ts->t_pipe_to_thread[1], &job, sizeof(job));
  err = read(ts->t_pipe_from_thread[0], &out, sizeof(out));
  void *res;
  TCHK(pthread_join (ts->t_id, &res));
  /* NOTE(review): orig. lines 893-896 are missing from this extract. */
  /* The CPU time comes from the pipe message, NOT from 'res' (which
   * points into the dead thread's stack - see lina_thread). */
  long tm = out.t_retval;
  //long tm = *(long*)res;
  tot_cpu_tm += tm;
#ifdef THREAD_STAT
  fprintf (stderr, " CPU time for thread %i :%7.3f s\n",
	ts->t_no, (double)(tm)/CLOCKS_PER_SEC);
#endif
 }
 for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
  c->dtor(c->parm, num_threads);
 }
 clock_t mainclock = clock();
#ifdef HAVE_CLOCK_GETTIME
 /* NOTE(review): orig. line 911 is missing from this extract. */
 struct timespec tm;
 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
  double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
  mainclock = (long)(secs*CLOCKS_PER_SEC);
 }
#endif
 tot_cpu_tm += mainclock;
#ifdef THREAD_STAT
 fprintf (stderr, " CPU time for main thr :%7.3f s\n",
	(double)mainclock/CLOCKS_PER_SEC);
 fprintf (stderr, " CPU for all threads :%7.3f s\n",
	(double)(tot_cpu_tm)/CLOCKS_PER_SEC);
 /* NOTE(review): the argument line of this fprintf (orig. 925,
  * presumably 'poll_succ, poll_usucc, poll_fail, poll_efail);') is
  * missing from this extract. */
 fprintf (stderr, " Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
 fflush (stderr);
#endif
 if (num_threads > 0)
  CSTD__ free (threads);
 num_threads = 0; threads = 0;
#if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
 ismainthread = 1;
 thrno = 0;
#endif
#ifdef HAVE_SCHED_GETAFFINITY
 if (bound_main) {
  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
  bound_main = false;
 }
 threads_bound = false;
#endif
}
943
/* Monotonically increasing job sequence number (main thread only). */
static unsigned long job_no = 0;

/* Core job dispatch: assemble a job_input (job function, size, offset,
 * fresh job number, NULL-terminated pointer args from vl) and write it
 * to worker thr_no's pipe.  Aborts on a failed/short pipe write.
 * NOTE(review): the THREAD_MAX_ARGS check runs only AFTER the va_arg
 * loop has already stored into job.t_par[] - an oversized argument list
 * overruns t_par before being diagnosed.  TODO confirm/fix upstream. */
void _thread_start_off (const int thr_no, thr_job_t ljob,
		const unsigned long off, const unsigned long sz,
		va_list vl)
{
 void * par; unsigned t = 0;
 struct thr_struct* ts = &threads[thr_no];
 struct job_input job;
 ERRDECL;
#ifdef VALGRIND
 memset(&job, 0, sizeof(job));
#endif
 //va_list (vl);
 BCHKNR(thr_no >= num_threads, NumErr, Starting thread no outside range, thr_no);
 threads_busy++;
 job.t_job = ljob; job.t_size = sz; job.t_off = off; job.t_job_no = job_no++;
 //va_start (vl, sz);
 while ((par = va_arg (vl, void*)))
  job.t_par[t++] = par;
 //va_end (vl);
 BCHKNR(t > THREAD_MAX_ARGS, NumErr, Too many arguments to thread_start, t-1);
#ifdef DEBUG_THREAD
 fprintf (stderr, "Signal thread %i\n", ts->t_no);
 fflush (stderr);
#endif
 if (write(ts->t_pipe_to_thread[1], &job, sizeof(job)) != sizeof(job)) {
  fprintf(stderr, "Signaling job %li to thread %i failed",
	job.t_job_no, ts->t_no);
  abort();
 }
 /* Immediatly reschedule */
 //sched_yield ();
 }
978
/* Varargs front-end: start 'job' on worker thr_no with offset 'off',
 * size 'sz' and a NULL-terminated list of pointer arguments. */
void thread_start_off (const int thr_no, thr_job_t job,
		const unsigned long off, const unsigned long sz, ...)
{
 va_list vl;
 va_start(vl, sz);
 _thread_start_off(thr_no, job, off, sz, vl);
 va_end(vl);
}
987
/* Varargs front-end without an offset (off == 0): start 'job' on worker
 * thr_no with size 'sz' and a NULL-terminated pointer argument list. */
void thread_start ( const int thr_no, thr_job_t job,
		const unsigned long sz, ...)
{
 va_list vl;
 va_start(vl, sz);
 _thread_start_off(thr_no, job, 0, sz, vl);
 va_end(vl);
}
996
/* Wait until worker thr_no reports job completion on its result pipe.
 * The job_output is stored into *out when out != NULL, discarded
 * otherwise.  Aborts the process when the pipe read fails. */
void thread_wait (const int thr_no, struct job_output *out)
{
 struct thr_struct *ts = &threads[thr_no];
 struct job_output out2;
 BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
 /* Tell the thread that we wait for completion */
#ifdef DEBUG_THREAD
 fprintf (stderr, "Wait for thread %i\n", ts->t_no);
#endif
 /* POLL_REP2: poll longer before falling back to a blocking read. */
 if (busy_read(ts->t_pipe_from_thread[0], out? out: &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
  fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
  abort();
 }
 threads_busy--;
#ifdef DEBUG_THREAD
 fprintf (stderr, "Thread %i signaled completion.\n", ts->t_no);
#endif
}
1015
1016
1017double thread_wait_result (const int thr_no)
1018{
1019 struct thr_struct *ts = &threads[thr_no];
1020 struct job_output out2;
1021 int err;
1022 BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
1023#ifdef DEBUG_THREAD
1024 fprintf (stderr, "Wait for result of thread %i\n", ts->t_no);
1025#endif
1026 if (busy_read(ts->t_pipe_from_thread[0], &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
1027 fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
1028 abort();
1029 }
1030 threads_busy--;
1031#ifdef DEBUG_THREAD
1032 fprintf (stderr, "Thread %i signaled completion\n", ts->t_no);
1033#endif
1034 return /*(volatile double)*/out2.t_res_d;
1035}
1036
1038{
1039 threads_busy++;
1040#ifdef TBCI_OMP
1041 omp_set_num_threads(0);
1042#endif
1043}
1044
1046{
1047 if (threads_busy)
1048 threads_busy--;
1049 else
1050 fprintf (stderr, "reenable_threads(): Threads already enabled.!\n");
1051#ifdef TBCI_OMP
1052 omp_set_num_threads(num_threads);
1053#endif
1054}
1055
1057
1059
1060#else /* no SMP */
1061/* These are for compatibility ... */
1062
/* ---- Non-SMP build: weak no-op replacements keeping the API intact
 * (strong definitions from the SMP build override these). ---- */
pid_t main_thread_pid WEAKA = 0;

/* Records the pid and kick-starts clock() accounting; 0 worker threads. */
WEAK(int init_threads (const int c, const bool load))
{ main_thread_pid = getpid(); clock (); return 0; }
WEAK(void bind_threads(bool, bool, bool)) {}
WEAK(void free_threads ()) {}
WEAK(void disable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(0); }
#else
{}
#endif
WEAK(void reenable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(omp_get_num_procs()); }
#else
{}
#endif
/* Non-SMP thread_start: warns (and optionally aborts), then runs the
 * job synchronously on the caller's stack with the same thr_ctrl
 * layout the SMP path would deliver. */
WEAK(void thread_start (const int tno, thr_job_t job,
		const unsigned long sz, ...))
{
 void * par;
 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
 unsigned int t = 0;
 va_list (vl);
 tc->t_job = job; tc->t_size = sz; tc->t_off = 0;
 va_start (vl, sz);
 while ((par = va_arg (vl, void*)))
  tc->t_par[t++] = par;
 va_end (vl);
 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
 abort ();
# endif
 (*job)(tc);
}
/* Non-SMP thread_start_off: like the thread_start stub above, but also
 * forwards the offset; the job runs synchronously in the caller. */
WEAK(void thread_start_off (const int tno, thr_job_t job,
		const unsigned long off, const unsigned long sz, ...))
{
 void * par;
 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
 unsigned int t = 0;
 va_list (vl);
 tc->t_job = job; tc->t_size = sz; tc->t_off = off;
 va_start (vl, sz);
 while ((par = va_arg (vl, void*)))
  tc->t_par[t++] = par;
 va_end (vl);
 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
 abort ();
#endif
 (*job)(tc);
}
/* Non-SMP wait stubs: jobs already ran synchronously, nothing to wait
 * for; result-returning variants report 0. */
WEAK(void thread_wait (const int t)) {}
WEAK(void* thread_wait_useful (const int t, useful_job_t j, void* a))
{ return 0; }
WEAK(double thread_wait_result (const int thr_no))
{ return 0; }

/* Thread-count bookkeeping kept for API compatibility. */
unsigned int curr_n_thr WEAKA;
unsigned int last_n_thr WEAKA;
unsigned int prev_n_thr WEAKA;
1127
1129
1130#endif /* SMP */
1131
1135
1136#endif /* _POSIX_THREADS */
const Vector< T > const Vector< T > const Vector< T > & p
Definition LM_fit.h:98
const Vector< T > const Vector< T > const Vector< T > int T T & err
Definition LM_fit.h:102
int i
Definition LM_fit.h:71
#define NULL
Definition basics.h:250
#define num_threads
Definition basics.h:782
#define CSTD__
Definition basics.h:340
#define BCHK(cond, exc, txt, ind, rtval)
Definition basics.h:575
#define WEAKA
Definition basics.h:484
#define ismainthread
Definition basics.h:784
#define NAMESPACE_END
Definition basics.h:323
#define thrno
Definition basics.h:783
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
Definition basics.h:748
#define WEAK(x)
Definition basics.h:485
#define NAMESPACE_TBCI
Definition basics.h:317
#define BCHKNR(cond, exc, txt, ind)
Definition basics.h:586
#define UNLIKELY(expr)
Definition basics.h:101
#define THREAD__
Definition basics.h:774
#define MAX(a, b)
Definition basics.h:656
Definition list.h:289
exception base class for the TBCI NumLib
Definition except.h:59
Definition smp.cc:314
void deinit(const int thr)
Definition smp.cc:325
void * parm
Definition smp.cc:318
cback(cbackfn ct, cbackfn dt, void *p)
Definition smp.cc:319
void init(const int thr)
Definition smp.cc:323
cbackfn * ctor
Definition smp.cc:316
cbackfn * dtor
Definition smp.cc:317
cback()
Definition smp.cc:321
F_TSMatrix< T > ts
Definition f_matrix.h:1052
return c
Definition f_matrix.h:760
const unsigned TMatrix< T > const Matrix< T > * a
const unsigned end
const unsigned TMatrix< T > * res
static int detect_num_cpu(int rmv_ht)
Definition smp.cc:267
#define POLL_REP
Definition smp.cc:520
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition smp.cc:340
#define TCHK(x)
Definition smp.cc:124
THREAD__ struct thr_struct * this_thread
Definition smp.cc:113
#define _cpu_relax()
Definition smp.cc:510
void lina_err(struct thr_ctrl *tc)
Definition smp.cc:350
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
Definition smp.cc:979
bool bound_main
Definition smp.cc:109
void thread_wait(const int thr_no, struct job_output *out)
Definition smp.cc:997
static pid_t gettid()
Definition smp.cc:92
unsigned int prev_n_thr
Definition smp.cc:1056
unsigned long poll_efail
Definition smp.cc:517
static List< cback > thread_cbacks
Definition smp.cc:329
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
Definition smp.cc:946
NAMESPACE_TBCI int num_threads
Definition smp.cc:103
void lina_empty(struct thr_ctrl *dummy)
Definition smp.cc:356
#define ERRDECL
Definition smp.cc:125
int init_threads(const int num_cpu, const bool load_magic)
Definition smp.cc:629
int tot_cpu_tm
Definition smp.cc:872
void reenable_threads()
Definition smp.cc:1045
unsigned long poll_succ
Definition smp.cc:514
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
Definition smp.cc:525
int main_numa_node
Definition smp.cc:110
void disable_threads()
Definition smp.cc:1037
static unsigned long job_no
Definition smp.cc:944
#define MAX_THREADS
Definition smp.cc:83
double thread_wait_result(const int thr_no)
Definition smp.cc:1017
unsigned long poll_fail
Definition smp.cc:516
bool threads_bound
Definition smp.cc:108
#define POLL_REP2
Definition smp.cc:523
void free_threads()
Definition smp.cc:873
unsigned long poll_usucc
Definition smp.cc:515
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
Definition smp.cc:727
int threads_busy
Definition smp.cc:104
pid_t main_thread_pid
Definition smp.cc:107
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
Definition smp.cc:988
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition smp.cc:333
unsigned int last_n_thr
Definition smp.cc:1056
void * empty_thread(void *dummy)
Definition smp.cc:361
void * lina_thread(void *thr)
Definition smp.cc:551
unsigned int curr_n_thr
Definition smp.cc:1056
struct thr_struct * threads
Definition smp.cc:106
int numa_avail
Definition smp.cc:105
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
Definition smp.h:126
static void disable_threads()
Definition smp.h:326
static void reenable_threads()
Definition smp.h:332
#define THREAD_MAX_ARGS
Definition smp.h:129
static int init_threads(const int=0, const bool=false)
Definition smp.h:323
void *(* useful_job_t)(void *)
Definition smp.h:127
static void free_threads()
Definition smp.h:325
unsigned long t_size
Definition smp.h:135
unsigned long t_off
Definition smp.h:136
unsigned long t_job_no
Definition smp.h:133
thr_job_t t_job
Definition smp.h:134
void * t_par[6]
Definition smp.h:137
volatile char t_res_dummy[16]
Definition smp.h:145
long t_retval
Definition smp.h:143
unsigned long t_job_output_no
Definition smp.h:142
double t_res_d
Definition smp.h:147
int t_no
Definition smp.h:183
long t_res_l
Definition smp.h:179
unsigned long t_off
Definition smp.h:172
thr_job_t t_job
Definition smp.h:170
unsigned long t_job_no
Definition smp.h:169
unsigned long t_size
Definition smp.h:171
void * t_par[6]
Definition smp.h:173
volatile char t_res_dummy[16]
Definition smp.h:176
unsigned int tbci_control
#define _TBCI_CWD_DEFAULT
Definition tbci_param.h:69