TBCI Numerical high perf. C++ Library  2.8.0
smp.cc
Go to the documentation of this file.
1 
52 #define _GNU_SOURCE 1
53 
54 #define NEED_SMP_DECLS 1
55 #include "tbci/basics.h"
56 #include "tbci/smp.h"
57 #include <stdio.h>
58 #include <time.h>
59 
60 #include <errno.h>
61 #include <fcntl.h>
62 
63 #ifdef _POSIX_THREADS
64 #ifdef SMP
65 
66 //#include <signum.h>
67 #include <stdlib.h>
68 
69 #ifdef HAVE_SYS_SYSINFO_H
70 # include <sys/sysinfo.h>
71 #endif
72 
73 #ifdef HAVE_SCHED_H
74 # include <sched.h>
75 #endif
76 
77 #ifdef HAVE_NUMA_H
78 # include <numa.h>
79 # include <numaif.h>
80 #endif
81 
82 #ifndef MAX_THREADS
83 # define MAX_THREADS 160
84 #endif
85 
86 #include "tbci/list.h"
87 
88 #ifdef __linux__
89 # include <sys/syscall.h>
90 # include <unistd.h>
91 #ifndef HAVE_GETTID
92 inline static pid_t gettid()
93 {
94  return syscall(SYS_gettid);
95 }
96 #endif
97 #else
98 # define gettid getpid
99 #endif
100 
102 
103 int num_threads = 0;
104 int threads_busy = 0;
105 int numa_avail = 0;
106 struct thr_struct *threads = 0;
107 pid_t main_thread_pid = 0;
108 bool threads_bound = false;
109 bool bound_main = false;
112 THREAD__ int thrno = 0;
114 
115 #ifdef HAVE_LIBNUMA
116 unsigned page_size;
117 unsigned long page_mask;
118 #endif
119 
120 #ifdef DEBUG_THREAD
121 # define TCHK(x) if (UNLIKELY(err = x)) fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err))
122 # define ERRDECL int err = 0
123 #else
124 # define TCHK(x) x
125 # define ERRDECL
126 #endif
127 
128 #ifdef HAVE_SCHED_GETAFFINITY
129 static cpu_set_t saved_cpuset;
130 
#ifndef HAVE_CPU_COUNT
#undef CPU_COUNT
/* Fallback CPU_COUNT(): number of CPUs present in *cpus. */
static int CPU_COUNT(const cpu_set_t *cpus)
{
 int n = 0;
 for (int bit = 0; bit < CPU_SETSIZE; ++bit)
  n += CPU_ISSET(bit, cpus) ? 1 : 0;
 return n;
}
#endif
#ifndef CPU_XOR
/* Fallback CPU_XOR(): *dest = *src1 XOR *src2, decided bit by bit. */
static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
 for (int bit = 0; bit < CPU_SETSIZE; ++bit) {
  const int differs = CPU_ISSET(bit, src1) != CPU_ISSET(bit, src2);
  if (differs)
   CPU_SET(bit, dest);
  else
   CPU_CLR(bit, dest);
 }
}
#endif
#ifndef CPU_AND
/* Fallback CPU_AND(): *dest = *src1 AND *src2, decided bit by bit. */
static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
 for (int bit = 0; bit < CPU_SETSIZE; ++bit) {
  const int both = CPU_ISSET(bit, src1) && CPU_ISSET(bit, src2);
  if (both)
   CPU_SET(bit, dest);
  else
   CPU_CLR(bit, dest);
 }
}
#endif
162 
163 
/* Return the index of the first CPU >= start that is present in *cpus,
 * or -1 when no further CPU is set. */
static int next_set_bit(const int start, const cpu_set_t *cpus)
{
 int idx = start;
 while (idx < CPU_SETSIZE) {
  if (CPU_ISSET(idx, cpus))
   return idx;
  ++idx;
 }
 return -1;
}
171 
/* Rotating scratch buffer for cpu_str() results: 8 slots of 128 bytes. */
static char cpu_buf[1024];
/* Render the CPUs contained in *cpus as a space-separated list of indices
 * (for debug/statistics output). Returns a pointer into cpu_buf; slot
 * rotation lets up to 8 results coexist, e.g. as several arguments of one
 * fprintf() call.
 * NOTE(review): `off` is volatile but the rotation is not atomic, so truly
 * concurrent callers can still collide on a slot -- confirm this is only
 * used for debug printing. */
static const char* cpu_str(const cpu_set_t *cpus)
{
 static volatile int off = 0;
 int old_off = off;
 /* advance to the next 128-byte slot, wrapping at 896 */
 if (off < 896)
  off += 128;
 else
  off = 0;
 char* ret = cpu_buf+old_off+1; /* +1 skips the leading blank of the first " %i" */
 int ptr = old_off;
 cpu_buf[ptr+1] = 0; /* empty string if no CPU is set */
 /* `ptr < 1016` keeps the worst-case sprintf inside cpu_buf */
 for (int i = 0; i < CPU_SETSIZE && ptr < 1016; ++i)
  if (CPU_ISSET(i, cpus))
   ptr += sprintf(cpu_buf+ptr, " %i", i);
 return ret;
}
189 #define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
190 #if defined(__linux__)
191 
/* Read the hyperthread sibling mask of `cpu` from Linux sysfs.
 * remove == 0: overwrite *cpus with the sibling set of `cpu`.
 * remove != 0: clear from *cpus all siblings of `cpu` except `cpu` itself.
 * Silently does nothing when the sysfs file cannot be opened. */
static void parse_siblings(const int cpu, cpu_set_t *cpus, const int remove)
{
 char fn[80];
 sprintf(fn, "/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
 FILE *f = fopen(fn, "r");
 if (!f)
  return;
 int sibl[(CPU_SETSIZE+31)/32];
 int nr_ints = 0;
 bool end = false;
 /* Linux reports this as field of ints, last int covers first 32 CPUs */
 while (!end) {
  int msk;
  if (fscanf(f, "%x,", &msk) == EOF) {
   if (fscanf(f, "%x", &msk) == EOF)
    break;
   end = true;
  }
  sibl[nr_ints++] = msk;
 }
 fclose(f);
 /* Convert to cpu_set_t data structure (machine endian field of longs)
  * first long covers first 32/64 CPUs */
 cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
 long* sibl_ptr = (long*)&sibl_set;
#if __BYTE_ORDER == __LITTLE_ENDIAN
 for (int j = nr_ints-1; j >= 0; --j)
  *sibl_ptr++ = sibl[j];
#else
 long* sibl_ptr2 = (long*)&sibl;
 for (int j = (nr_ints*sizeof(int)/sizeof(long))-1; j >= 0; --j)
  *sibl_ptr++ = sibl_ptr2[j];
#endif
 //fprintf(stderr, "Siblings for cpu%i: %s\n", cpu, cpu_str(&sibl_set));
 //fprintf(stderr, "CPUS before rmv: %s\n", cpu_str(cpus));
 /* sanity: the kernel must report a CPU as its own sibling */
 if (!CPU_ISSET(cpu, &sibl_set))
  abort();
 if (!remove) {
  memcpy(cpus, &sibl_set, CPU_SETBYTES);
  return;
 }
 /* keep `cpu` itself, drop only the other threads of its core */
 CPU_CLR(cpu, &sibl_set);
 CPU_AND(&sibl_set, &sibl_set, cpus);
 CPU_XOR(cpus, cpus, &sibl_set);
 //fprintf(stderr, "CPUS after rmvl: %s\n", cpu_str(cpus));
}
238 
/* Replace *cpus by the full hyperthread sibling set of `cpu`
 * (thin wrapper over parse_siblings with remove == 0). */
static void add_siblings(const int cpu, cpu_set_t *cpus)
{
 parse_siblings(cpu, cpus, 0);
}
243 
/* Thin out *cpus so that at most one hardware thread per physical core
 * remains. Iterates top-down so the lowest-numbered thread of each core
 * survives. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
#ifdef THREAD_STAT
 fprintf(stderr, "CPU set before HT removal: %s\n", cpu_str(cpus));
#endif
 for (int i = CPU_SETSIZE-1; i >=0; --i) {
  if (!CPU_ISSET(i, cpus))
   continue;
  parse_siblings(i, cpus, 1);
 }
#ifdef THREAD_STAT
 fprintf(stderr, "CPU set after HT removal: %s\n", cpu_str(cpus));
#endif
}
258 #else
/* No-op stub: without the Linux sysfs topology interface we cannot
 * identify hyperthread siblings, so the CPU set is left untouched. */
static void remove_hyperthreads(cpu_set_t *cpus)
{
}
262 #endif
263 
264 #endif
265 
266 /* detect the # of cpus for glibc and/or linux systems */
267 static int detect_num_cpu (int rmv_ht)
268 {
269  int cpus = 0;
270 #ifdef HAVE_SCHED_GETAFFINITY
271  pid_t pid = gettid();
272  if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
273  if (rmv_ht)
274  remove_hyperthreads(&saved_cpuset);
275  return CPU_COUNT(&saved_cpuset);
276  }
277 #endif
278 #ifdef HAVE_GET_NPROCS // defined(__GLIBC__) && __GLIBC__ >= 2
279  cpus = get_nprocs ();
280  if (cpus <= 0)
281  return 2;
282 #elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
283  FILE* cpuinfo;
284  char buf[128];
285  cpuinfo = fopen ("/proc/cpuinfo", "r");
286  if (cpuinfo <= 0)
287  return 2; /* return 2, if we can't find out */
288  while (!feof (cpuinfo)) {
289  fgets (buf, 128, cpuinfo);
290  if (CSTD__ memcmp (buf+1, "rocessor", 8) == 0)
291  cpus++;
292  }
293 #endif
294 #ifdef DEBUG_THREAD
295  fprintf (stderr, "%i CPUs detected.\n", cpus);
296 #endif
297  return (cpus == 0 ? 1: cpus);
298 }
299 
300 #ifdef HAVE_GETLOADAVG
/* Return the system load average as an integer: the 5-minute value when
 * available, else the 1-minute value, else 0 when getloadavg() fails. */
static int loadavg ()
{
 double loads[3];
 const int got = getloadavg (loads, 3);
 if (got > 1)
  return (int)loads[1];
 if (got == 1)
  return (int)loads[0];
 return 0;
}
312 #endif
313 
314 class cback {
315  public:
316  cbackfn *ctor;
317  cbackfn *dtor;
318  void *parm;
319  cback(cbackfn ct, cbackfn dt, void *p)
320  : ctor(ct), dtor(dt), parm(p) {};
322  : ctor(NULL), dtor(NULL), parm(NULL) {};
323  void init(const int thr)
324  { ctor(parm, thr); }
325  void deinit(const int thr)
326  { dtor(parm, thr); }
327 };
328 
330 /* Instantiate */
331 template class List<cback>;
332 
/* Register a ctor/dtor callback pair (with opaque argument `parm`) to be
 * run when the thread pool is created/destroyed.
 * NOTE(review): `callback` is a stack temporary; this assumes
 * List<cback>::append() copies the element -- confirm against list.h. */
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
{
 cback callback(ctor, dtor, parm);
 thread_cbacks.append(&callback);
 //fprintf(stderr, "Register callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
}
339 
340 void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
341 {
342  cback callback(ctor, dtor, parm);
343  cback *cbk = thread_cbacks.setcurr(&callback);
344  BCHK(!cbk, NumErr, Deregister unrgistered callback, (long int)ctor, );
345  thread_cbacks.delcurr();
346  //fprintf(stderr, "Deregister callback %p (%p %p %p)\n", &callback, ctor, dtor, parm);
347 }
348 
349 
350 void lina_err (struct thr_ctrl *tc)
351 {
352  fprintf (stderr, " Thread (%i) synchronization problem!\n", tc->t_no);
353  fflush (stderr); abort ();
354 }
355 
/* Deliberate no-op job (e.g. to wake or synchronize a worker thread). */
void lina_empty (struct thr_ctrl *dummy)
{
 /* nothing */
}
360 
/* Thread entry point that immediately returns (placeholder/no-op thread). */
void* empty_thread (void *dummy)
{
 return NULL;
}
365 
366 #ifdef HAVE_LIBNUMA
367 
368 static int cpu_to_node(int cpu)
369 {
370  char fn[80];
371  for (int n = 0; n < numa_num_possible_nodes(); ++n) {
372  sprintf(fn, "/sys/devices/system/node/node%i/cpu%i", n, cpu);
373  if (!access(fn, R_OK))
374  return n;
375  }
376  return -1;
377 }
378 
/* Bind thread `ts` to the NUMA node owning `cpu`: set the memory policy to
 * that node, pin the thread to `cpu` (plus its HT siblings), record the
 * node in ts->numa_node, and migrate the page containing `stack` to the
 * node. Returns the node number.
 * NOTE(review): cpu_to_node() can return -1 (CPU not found); that value is
 * passed unchecked to numa_bitmask_setbit/numa_set_preferred -- confirm
 * callers guarantee a valid CPU. */
int do_numa_init(unsigned int cpu, struct thr_struct *ts, unsigned long stack)
{
 int node = cpu_to_node(cpu);
 struct bitmask *nodes = numa_allocate_nodemask();
 //fprintf(stderr, "Thr %i: CPU %i Node %i\n", ts->t_no, cpu, my_node);
 cpu_set_t cpuset;
 numa_bitmask_clearall(nodes);
 numa_bitmask_setbit(nodes, node);
 numa_set_preferred(node);
#ifdef HAVE_SCHED_GETAFFINITY
 numa_set_membind(nodes);
#else
 numa_bind(nodes);
#endif
 //sched_yield();
 //numa_set_localalloc();
 ts->numa_node = node;
#ifdef HAVE_SCHED_GETAFFINITY
 /* pin to `cpu` and its hyperthread siblings */
 CPU_ZERO(&cpuset);
 CPU_SET(cpu, &cpuset);
 parse_siblings(cpu, &cpuset, 0);
#ifdef HAVE_PTHREAD_GETAFFINITY_NP
 pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
#else
 sched_setaffinity(ts->t_pid, CPU_SETBYTES, &cpuset);
#endif
#endif
 /* Move stack page to my node */
 if (1) {
  void *pages[] = {(void*)(stack & page_mask)};
  /* NOTE(review): this local `nodes` array shadows the bitmask pointer
   * `nodes` above -- intentional here, but fragile; rename if touched. */
  const int nodes[] = {node};
  int status = node, oldstat;
  long err = 1;
  /* first query where the page currently lives (NULL node list = query) */
  numa_move_pages(0, 1, pages,
   NULL, &status, MPOL_MF_MOVE);
  oldstat = status;
  if (oldstat != node)
   err = numa_move_pages(0, 1, pages,
    nodes, &status, MPOL_MF_MOVE);
#ifdef THREAD_DEBUG
  fprintf(stderr, "Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
   ts->t_no, (void*)ts->t_id, ts->t_pid,
   cpu_str(&cpuset), node, pages[0], oldstat,
   status, err);
#endif
 }
 numa_free_nodemask(nodes);
 return node;
}
428 
/* Job wrapper so a worker can run do_numa_init() on itself: t_size carries
 * the CPU number, t_par[0] the thr_struct, and the address of `tc` (on the
 * worker's stack) identifies the stack page to migrate. */
void numa_init_job(struct thr_ctrl *tc)
{
 do_numa_init(tc->t_size, (struct thr_struct*)tc->t_par[0], (unsigned long)tc);
}
433 
435 #define PG_PC 512
/* Migrate the pages of [firstaddr, lastaddr) to NUMA node `node`, in
 * batches of PG_PC pages. fault_in != 0 touches each page first so that
 * unallocated pages exist before the move. Returns the number of pages
 * that actually needed moving. */
int do_numa_move_pages(int node, int fault_in, unsigned long firstaddr, unsigned long lastaddr)
{
 void* pages[PG_PC];
 int nodes[PG_PC], status[PG_PC];
 int nr_pages = 0; int nr_moved = 0;
 unsigned long orig_addr = firstaddr;
#ifdef VALGRIND
 memset(status, 0, PG_PC*sizeof(int));
#endif
 /* Round the boundaries to page granularity.
  * NOTE(review): the rounding direction looks inverted for a
  * "majority-owned page" policy (an offset >= half a page rounds *down*,
  * including a page mostly outside the range) -- confirm intent. */
 if (firstaddr - (firstaddr & page_mask) >= page_size/2)
  firstaddr = firstaddr & page_mask;
 else
  firstaddr = (firstaddr + (page_size - 1)) & page_mask;
 if (lastaddr - (lastaddr & page_mask) > page_size/2)
  lastaddr = lastaddr & page_mask;
 else
  lastaddr = (lastaddr + (page_size - 1)) & page_mask;
#if 0
 fprintf(stderr, "numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
  (void*)firstaddr, (void*)lastaddr, node,
  (lastaddr-firstaddr)/page_size,
  (lastaddr-firstaddr)/page_size/PG_PC);
#endif
 while (lastaddr > firstaddr) {
  int i;
  /* collect up to PG_PC page addresses for one numa_move_pages() call */
  for (i = 0; (i < PG_PC) && (firstaddr < lastaddr); ++i, firstaddr += page_size) {
   pages[i] = (void*)firstaddr;
   nodes[i] = node;
   /* Note: Pages might be not allocated yet and won't be ... */
   if (fault_in) {
    /* read-write touch; use orig_addr for the (rounded-down) first page */
    if (UNLIKELY(firstaddr < orig_addr))
     *(volatile int*)orig_addr = *(volatile int*)orig_addr;
    else
     *(volatile int*)firstaddr = *(volatile int*)firstaddr;
   }
  }
  nr_pages += i;
  /* query pass: NULL node list only reports current placement in status[] */
  long err = numa_move_pages(0, i, pages,
   NULL, status, MPOL_MF_MOVE);
  if (err)
   fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
    pages[0], pages[i-1], node, strerror(errno));
  int to_move = 0; err = 0;
  for (int j = 0; j < i; ++j)
   if (status[j] != node)
    ++to_move;
#if 0
  fprintf(stderr, "numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
   pages[0], pages[i-1], i, to_move, node, status[0]);
#endif
  nr_moved += to_move;
  /* move pass: only issued when at least one page is misplaced */
  if (to_move)
   err = numa_move_pages(0, i, pages,
    nodes, status, MPOL_MF_MOVE);
  if (err)
   fprintf(stderr, "numa_move_pages(%p -- %p to node %i): %s\n",
    pages[0], pages[i-1], node, strerror(errno));
 }
 return nr_moved;
}
496 
/* Job wrapper for do_numa_move_pages(): t_size = node, t_off = fault_in,
 * t_par[0]/t_par[1] = address range; result goes to t_res_l. */
void numa_move_pages_job(struct thr_ctrl *tc)
{
 tc->t_res_l =
  do_numa_move_pages(tc->t_size, tc->t_off,
   (unsigned long)tc->t_par[0],
   (unsigned long)tc->t_par[1]);
}
504 
505 #endif
506 
507 #if defined(__i386__) || defined(__x86_64__)
508 # define _cpu_relax() asm ("rep; nop")
509 #else
510 # define _cpu_relax() do {} while(0)
511 #endif
512 
513 /* Sidenote: We do no locking, approx. numbers are OK here */
514 unsigned long poll_succ = 0;
515 unsigned long poll_usucc = 0;
516 unsigned long poll_fail = 0;
517 unsigned long poll_efail = 0;
518 
519 #ifndef POLL_REP
520 # define POLL_REP 1
521 #endif
522 #ifndef POLL_REP2
523 # define POLL_REP2 168
524 #endif
/* Read exactly `sz` bytes from the (non-blocking) pipe `fd` into `ptr`.
 * Spins up to `rep` times with cpu_relax between attempts; if the data has
 * not arrived by then, temporarily switches the fd to blocking mode for
 * one final read. Updates the poll_* statistics counters (unlocked,
 * approximate by design). Returns the byte count of the last read.
 * NOTE(review): a short read on the slow path is returned as-is -- callers
 * compare against sizeof and abort, so partial pipe reads are treated as
 * fatal. */
static int busy_read(int fd, void* ptr, size_t sz, int rep=POLL_REP)
{
 size_t rd = 0;
 int w = rep;
 while (w-- && (signed)(rd = read(fd, ptr, sz)) < (signed)sz)
  _cpu_relax();
 if (sz == rd) {
  /* fast path: data arrived while polling */
  if (rep == POLL_REP)
   ++poll_usucc;
  else
   ++poll_succ;
  return rd;
 }
 // slow path
 if (rep == POLL_REP)
  ++poll_efail;
 else
  ++poll_fail;
 /* switch to blocking, read once, then restore O_NONBLOCK */
 fcntl(fd, F_SETFL, 0);
 _cpu_relax();
 rd = read(fd, ptr, sz);
 PREFETCH_R(ptr, 2);
 fcntl(fd, F_SETFL, O_NONBLOCK);
 return rd;
}
550 
/* Worker thread main loop. Repeatedly reads a job_input from the
 * to-thread pipe, runs the job, and writes a job_output to the from-thread
 * pipe. A job_input with t_job == NULL terminates the loop; the worker
 * then reports its consumed CPU time and exits via pthread_exit(). */
void* lina_thread (void* thr)
{
 struct thr_struct *ts = (struct thr_struct*) thr;
 struct thr_ctrl tc;
 struct job_output out;
 int err;
#if defined(HAVE_TLS) || defined(HAVE_DTLS)
 /* per-thread identity in thread-local storage */
 ismainthread = 0;
 thrno = ts->t_no+1;
 this_thread = ts;
#endif
#ifdef VALGRIND
 memset(&tc, 0, sizeof(tc));
 memset(&out, 0, sizeof(out));
#endif
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i: Try to get setup lock\n", tc.t_no);
#endif
 /* record the kernel thread id; clock() primes the CPU-time accounting */
 ts->t_pid = gettid (); clock ();
 tc.t_no = ts->t_no;
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
  ts->t_no, ts->t_id, ts->t_pid);
#endif
 /* Now wait for some work to do. Finish when a NULL job becomes runnable */
 while (1) {
  /* Set job to error! (lina_err aborts if a stale job ever runs) */
  tc.t_job = lina_err;
  //memset(tc.t_res_dummy, 0, sizeof(tc.t_res_dummy));
  /* Ready to accept a job: Wait for it */
  if (UNLIKELY((err = busy_read(ts->t_pipe_to_thread[0], &tc, sizeof(struct job_input))) != sizeof(struct job_input))) {
   fprintf(stderr, "Thread %i: Reading failed! %i\n", tc.t_no, err);
   abort();
  }
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Start job %li @ %p\n", ts->t_no, tc.t_job_no, tc.t_job);
  fflush (stderr);
#endif
  ts->t_done_var++;
  out.t_job_output_no = tc.t_job_no;
  if (!tc.t_job)
   break; /* The end */
  else
   (*tc.t_job) (&tc);
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Job done!\n", ts->t_no);
  fflush (stderr);
#endif
  /* copy the job's result payload into the output record */
  memcpy((void*)out.t_res_dummy, (const void*)tc.t_res_dummy, sizeof(out.t_res_dummy));
  /* Do we need some memory barrier here? But we are cache-coherent, no? */
  if (UNLIKELY((err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output))) != sizeof(struct job_output))) {
   fprintf(stderr, "Thread %i: Writing failed! %i\n", ts->t_no, err);
   abort();
  }
#ifdef DEBUG_THREAD
  fprintf (stderr, " Thread %i: Signaled Job done!\n", ts->t_no);
  fflush (stderr);
#endif
 }
#ifdef DEBUG_THREAD
 fprintf (stderr, " Thread %i: exit!\n", ts->t_no);
#endif
 /* report consumed CPU time back to free_threads() */
 out.t_retval = clock ();
 ts->t_done_var++;

#if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
 /* more precise per-thread CPU time when available */
 struct timespec tm;
 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
  const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
  out.t_retval = (long)(secs * CLOCKS_PER_SEC);
 }
#endif
 err = write(ts->t_pipe_from_thread[1], &out, sizeof(struct job_output));
 pthread_exit(&out.t_retval);
 return NULL;
}
627 
628 
629 int init_threads (const int num_cpu, const bool load_magic)
630 {
631  int t, err, det_cpus, all_cpus;
632  pthread_mutexattr_t mutattr;
633 
634  main_thread_pid = getpid ();
635  /* No of CPUs */
636  all_cpus = detect_num_cpu(0);
637  det_cpus = detect_num_cpu(1);
638  /* How many ? */
639  if (num_cpu <= 0) {
640  num_threads = det_cpus;
641 #ifdef HAVE_GETLOADAVG
642  if (load_magic) {
643  /* Subtract load */
644  int lavg = loadavg ();
645  /* If load exceeds no of CPUs, use 2 CPUs */
646  if (all_cpus - lavg < num_threads) {
647  num_threads = MAX(1, all_cpus - lavg);
648  if (num_threads < 2 && det_cpus > 2)
649  num_threads = 2;
650  }
651  }
652 #endif
653  if (num_cpu && num_threads > -num_cpu)
654  num_threads = -num_cpu;
655  } else
656  num_threads = num_cpu;
657  /* Max is MAX_THREADS */
658  if (num_threads > MAX_THREADS)
660  /* No threads? */
661  if (num_threads == 1)
662  num_threads = 0;
663  /* Sanity check */
664  if (num_threads > det_cpus)
665  fprintf(stderr, "Warning: Number of threads %i larger than no of CPU cores %i!\n",
666  num_threads, det_cpus);
667 #ifdef HAVE_SCHED_GETAFFINITY
668  else
669  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
670 #endif
671 #ifdef TBCI_OMP
672  omp_set_num_threads(num_threads);
673 #endif
674  /* Alloc mem for control structs */
675  if (num_threads >= 1)
676  threads = (struct thr_struct *) memalign(128, sizeof(struct thr_struct) * (num_threads));
677  //threads = (struct thr_ctrl *) CSTD__ malloc (sizeof(struct thr_ctrl) * (num_threads));
678  else
679  threads = NULL;
680  if (threads)
681  CSTD__ memset (threads, 0, sizeof(struct thr_struct) * (num_threads));
682  /* Some Un*xes only start the clock after the first call to clock() */
683  clock ();
684 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
685  // If we don't have thread local storage (and no locking),
686  // we need to disable // concurrent memory caching allocations
687  // now globally :-(
688  if (num_threads >= 1) {
689  ismainthread = 0;
690  thrno = -1;
691  }
692 #endif
693  /* Now init threads */
694  for (t = 0; t < num_threads; ++t) {
695  int err;
696  struct thr_struct *ts = &threads[t];
697  //fprintf(stderr, "Debug: thread %i @ %p\n", t, tc);
698  ts->t_no = t;
699  /* Create pipes for thread sync */
700  err = pipe(ts->t_pipe_to_thread);
701  if (err) {
702  fprintf(stderr, "Failed to create pipe for thread %i\n", t);
703  abort();
704  }
705  err = pipe(ts->t_pipe_from_thread);
706  if (err) {
707  fprintf(stderr, "Failed to create pipe for thread %i\n", t);
708  abort();
709  }
710  fcntl(ts->t_pipe_to_thread[0], F_SETFL, O_NONBLOCK);
711  fcntl(ts->t_pipe_from_thread[0], F_SETFL, O_NONBLOCK);
712  TCHK(pthread_create (&ts->t_id, NULL, lina_thread, ts));
713  }
714 #ifdef DEBUG_THREAD
715  fprintf (stderr, "%i threads @%p (sz=%zi) set up.\n",
716  num_threads, threads, sizeof(struct thr_struct));
717 #endif
718  for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
719  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
720  c->ctor(c->parm, num_threads);
721  }
722  //fprintf(stderr, "%li Callbacks registered\n", thread_cbacks.size());
723 
724  return num_threads;
725 }
726 
727 void bind_threads (bool bind_main, bool enable_numa, bool add_sibl)
728 {
729 #ifdef HAVE_SCHED_GETAFFINITY
730  /* Save CPU affinity mask */
731  CPU_ZERO(&saved_cpuset);
732  sched_getaffinity (main_thread_pid, CPU_SETBYTES, &saved_cpuset);
733  if (num_threads > CPU_COUNT(&saved_cpuset))
734  fprintf(stderr, "TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
735  num_threads, CPU_COUNT(&saved_cpuset));
736  cpu_set_t cpuset; CPU_ZERO(&cpuset);
737 
738  /* TODO: Be clever to which CPUs to bind to, e.g. avoid hyperthreads */
739  int next_cpu = next_set_bit(0, &saved_cpuset);
740  if (next_cpu < 0)
741  abort();
742  bound_main = bind_main;
743 #ifdef HAVE_LIBNUMA
744  numa_avail = (numa_available() == 0);
745  if (numa_max_node() == 0)
746  numa_avail = 0;
747  if (numa_avail) {
748  page_size = numa_pagesize();
749  page_mask = ~(page_size-1ULL);
750  }
751  if (!enable_numa)
752  numa_avail = 0;
753  fprintf(stderr, "NUMA: avail %i, maxnode %i\n", numa_avail, numa_max_node());
754 #endif /* HAVE_LIBNUMA */
755  if (bind_main) {
756  CPU_SET(next_cpu, &cpuset);
757  if (add_sibl)
758  add_siblings(next_cpu, &cpuset);
759 #ifdef HAVE_LIBNUMA
760  if (numa_avail) {
761  struct thr_struct ts;
762  ts.t_no = -1; ts.t_pid = main_thread_pid;
763  ts.t_id = pthread_self();
764  main_numa_node = do_numa_init(next_cpu, &ts, (unsigned long)&ts);
765  } else
766 #endif /* HAVE_LIBNUMA */
767 #ifdef HAVE_SCHED_GETAFFINITY
768  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
769 #else
770  do {} while(0);
771 #endif
772 #ifdef THREAD_DEBUG
773  fprintf(stderr, "Set Main Thread %i: CPU %i (%s)\n", main_thread_pid,
774  next_cpu, cpu_str(&cpuset));
775 #endif
776 #ifdef DEBUG_THREAD
777  sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
778  fprintf(stderr, "Get Main Thread %i: CPU %i (%s)\n", main_thread_pid,
779  next_cpu, cpu_str(&cpuset));
780 #endif
781  CPU_ZERO(&cpuset);
782  }
783 #endif /* HAVE_SCHED_GETAFFINITY */
784  threads_bound = true;
785  int several_nodes = 0;
786  for (int t = 0; t < num_threads; ++t) {
787  struct thr_struct *ts = &threads[t];
788  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
789  if (next_cpu == -1)
790  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
791  CPU_SET(next_cpu, &cpuset);
792  if (add_sibl)
793  add_siblings(next_cpu, &cpuset);
794 #ifdef HAVE_LIBNUMA
795  if (numa_avail && cpu_to_node(next_cpu) != main_numa_node)
796  ++several_nodes;
797  if (numa_avail) {
798  thread_start(t, numa_init_job, next_cpu,
799  (void*)ts, (void*)0);
800  } else
801 #endif
802 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
803  pthread_setaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
804 #else
805  do {} while(0);
806 #endif
807 #ifdef THREAD_DEBUG
808  fprintf(stderr, "Set Thread %i: CPUs %s\n", t,
809  cpu_str(&cpuset));
810 #endif
811 #ifdef DEBUG_THREAD
812  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
813  fprintf(stderr, "Get Thread %i: CPUs %s\n", t,
814  cpu_str(&cpuset));
815 #endif
816  CPU_ZERO(&cpuset);
817  }
818 #ifdef HAVE_LIBNUMA
819  if (numa_avail)
820  for (int t = 0; t < num_threads; ++t)
821  thread_wait(t);
822 #endif
823 #ifdef THREAD_DEBUG
824 #ifdef HAVE_SCHED_GETAFFINITY
825  sched_getaffinity(main_thread_pid, CPU_SETBYTES, &cpuset);
826  fprintf(stderr, "Main thread: CPUs %s, Node %i\n",
827  cpu_str(&cpuset), main_numa_node);
828 #endif
829 #ifdef HAVE_PTHREAD_GETAFFINITY_NP
830  for (int t = 0; t < num_threads; ++t) {
831  struct thr_struct *ts = &threads[t];
832  pthread_getaffinity_np(ts->t_id, CPU_SETBYTES, &cpuset);
833  fprintf(stderr, "Thread %i: CPUs %s, Node %i\n",
834  ts->t_no, cpu_str(&cpuset), ts->numa_node);
835  }
836 #endif
837  if (!several_nodes) {
838  numa_avail = 0;
839 #ifdef THREAD_DEBUG
840  fprintf(stderr, "All threads on node %i, disabling NUMA\n",
842 #endif
843  }
844 #endif /* THREAD_DEBUG */
845 #ifdef TBCI_OMP
846  int omp_thr = omp_get_max_threads();
847 #ifdef DEBUG_THREAD
848  fprintf(stderr, "Now setting affinity for %i OpenMP threads ...\n", omp_thr);
849 #endif
850 #pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
851  for (int t = 0; t < omp_thr; ++t) {
852  int tid = omp_get_thread_num();
853  next_cpu = next_set_bit(0, &saved_cpuset);
854  for (int i = 0; i < t; ++i) {
855  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
856  if (next_cpu == -1)
857  next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
858  }
859  CPU_ZERO(&cpuset);
860  CPU_SET(next_cpu, &cpuset);
861  if (add_sibl)
862  add_siblings(next_cpu, &cpuset);
863  sched_setaffinity(0, CPU_SETBYTES, &cpuset);
864 #ifdef THREAD_DEBUG
865 #pragma omp critical
866  fprintf(stderr, "Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
867 #endif
868  }
869 #endif /* TBCI_OMP */
870 }
871 
874 {
875  int t;
876  tot_cpu_tm = 0;
877  ERRDECL;
878  //fprintf (stderr, "Free threads ...\n");
879  for (t = 0; t < num_threads; ++t) {
880  int err;
881  struct thr_struct *ts = &threads[t];
882  struct job_input job;
883  struct job_output out;
884 #ifdef VALGRIND
885  memset(&job, 0, sizeof(job));
886 #endif
887  job.t_job = 0;
888  fcntl(ts->t_pipe_from_thread[0], F_SETFL, 0);
889  err = write(ts->t_pipe_to_thread[1], &job, sizeof(job));
890  err = read(ts->t_pipe_from_thread[0], &out, sizeof(out));
891  void *res;
892  TCHK(pthread_join (ts->t_id, &res));
897  long tm = out.t_retval;
898  //long tm = *(long*)res;
899  tot_cpu_tm += tm;
900 #ifdef THREAD_STAT
901  fprintf (stderr, " CPU time for thread %i :%7.3f s\n",
902  ts->t_no, (double)(tm)/CLOCKS_PER_SEC);
903 #endif
904  }
905  for (cback *c = thread_cbacks.getfirst(); c != NULL; c = thread_cbacks.getnext()) {
906  //fprintf(stderr, "Callback %p (%p %p %p)\n", c, c->ctor, c->dtor, c->parm);
907  c->dtor(c->parm, num_threads);
908  }
909  clock_t mainclock = clock();
910 #ifdef HAVE_CLOCK_GETTIME
911  struct timespec tm;
913  if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
914  double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
915  mainclock = (long)(secs*CLOCKS_PER_SEC);
916  }
917 #endif
918  tot_cpu_tm += mainclock;
919 #ifdef THREAD_STAT
920  fprintf (stderr, " CPU time for main thr :%7.3f s\n",
921  (double)mainclock/CLOCKS_PER_SEC);
922  fprintf (stderr, " CPU for all threads :%7.3f s\n",
923  (double)(tot_cpu_tm)/CLOCKS_PER_SEC);
924  fprintf (stderr, " Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
925  poll_succ, poll_usucc, poll_fail, poll_efail);
926  fflush (stderr);
927 #endif
928  if (num_threads > 0)
929  CSTD__ free (threads);
930  num_threads = 0; threads = 0;
931 #if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
932  ismainthread = 1;
933  thrno = 0;
934 #endif
935 #ifdef HAVE_SCHED_GETAFFINITY
936  if (bound_main) {
937  sched_setaffinity(main_thread_pid, CPU_SETBYTES, &saved_cpuset);
938  bound_main = false;
939  }
940  threads_bound = false;
941 #endif
942 }
943 
944 static unsigned long job_no = 0;
945 
/* Dispatch job `ljob` to worker `thr_no` with offset/size parameters and a
 * NULL-terminated va_list of pointer arguments (at most THREAD_MAX_ARGS).
 * Writes one job_input record into the worker's pipe; the matching
 * thread_wait() collects the result. Aborts when the pipe write fails. */
void _thread_start_off (const int thr_no, thr_job_t ljob,
 const unsigned long off, const unsigned long sz,
 va_list vl)
{
 void * par; unsigned t = 0;
 struct thr_struct* ts = &threads[thr_no];
 struct job_input job;
 ERRDECL;
#ifdef VALGRIND
 memset(&job, 0, sizeof(job));
#endif
 //va_list (vl);
 BCHKNR(thr_no >= num_threads, NumErr, Starting thread no outside range, thr_no);
 threads_busy++;
 /* job_no is a global sequence number for matching outputs to inputs */
 job.t_job = ljob; job.t_size = sz; job.t_off = off; job.t_job_no = job_no++;
 //va_start (vl, sz);
 /* copy the NULL-terminated pointer arguments into the job record */
 while ((par = va_arg (vl, void*)))
  job.t_par[t++] = par;
 //va_end (vl);
 BCHKNR(t > THREAD_MAX_ARGS, NumErr, Too many arguments to thread_start, t-1);
#ifdef DEBUG_THREAD
 fprintf (stderr, "Signal thread %i\n", ts->t_no);
 fflush (stderr);
#endif
 if (write(ts->t_pipe_to_thread[1], &job, sizeof(job)) != sizeof(job)) {
  fprintf(stderr, "Signaling job %li to thread %i failed",
   job.t_job_no, ts->t_no);
  abort();
 }
 /* Immediately reschedule */
 //sched_yield ();
 }
978 
/* Public varargs front-end: start `job` on worker `thr_no` with an
 * explicit offset; the NULL-terminated pointer arguments follow `sz`. */
void thread_start_off (const int thr_no, thr_job_t job,
 const unsigned long off, const unsigned long sz, ...)
{
 va_list vl;
 va_start(vl, sz);
 _thread_start_off(thr_no, job, off, sz, vl);
 va_end(vl);
}
987 
/* Public varargs front-end: start `job` on worker `thr_no` with offset 0;
 * the NULL-terminated pointer arguments follow `sz`. */
void thread_start ( const int thr_no, thr_job_t job,
 const unsigned long sz, ...)
{
 va_list vl;
 va_start(vl, sz);
 _thread_start_off(thr_no, job, 0, sz, vl);
 va_end(vl);
}
996 
/* Block until worker `thr_no` reports job completion. The job_output is
 * stored in *out when out is non-NULL, otherwise discarded. Uses
 * busy_read() with the longer POLL_REP2 spin budget; aborts on a short
 * read from the result pipe. */
void thread_wait (const int thr_no, struct job_output *out)
{
 struct thr_struct *ts = &threads[thr_no];
 struct job_output out2;
 BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
 /* Tell the thread that we wait for completion */
#ifdef DEBUG_THREAD
 fprintf (stderr, "Wait for thread %i\n", ts->t_no);
#endif
 if (busy_read(ts->t_pipe_from_thread[0], out? out: &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
  fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
  abort();
 }
 threads_busy--;
#ifdef DEBUG_THREAD
 fprintf (stderr, "Thread %i signaled completion.\n", ts->t_no);
#endif
}
1015 
1016 
1017 double thread_wait_result (const int thr_no)
1018 {
1019  struct thr_struct *ts = &threads[thr_no];
1020  struct job_output out2;
1021  int err;
1022  BCHKNR (thr_no >= num_threads, NumErr, Wait for non-existing thread, thr_no);
1023 #ifdef DEBUG_THREAD
1024  fprintf (stderr, "Wait for result of thread %i\n", ts->t_no);
1025 #endif
1026  if (busy_read(ts->t_pipe_from_thread[0], &out2, sizeof(out2), POLL_REP2) != sizeof(out2)) {
1027  fprintf(stderr, "Waiting for thread %i failed!\n", ts->t_no);
1028  abort();
1029  }
1030  threads_busy--;
1031 #ifdef DEBUG_THREAD
1032  fprintf (stderr, "Thread %i signaled completion\n", ts->t_no);
1033 #endif
1034  return /*(volatile double)*/out2.t_res_d;
1035 }
1036 
1038 {
1039  threads_busy++;
1040 #ifdef TBCI_OMP
1041  omp_set_num_threads(0);
1042 #endif
1043 }
1044 
1046 {
1047  if (threads_busy)
1048  threads_busy--;
1049  else
1050  fprintf (stderr, "reenable_threads(): Threads already enabled.!\n");
1051 #ifdef TBCI_OMP
1052  omp_set_num_threads(num_threads);
1053 #endif
1054 }
1055 
1057 
1059 
1060 #else /* no SMP */
1061 /* These are for compatibility ... */
1062 
1064 pid_t main_thread_pid WEAKA = 0;
1065 
/* Weak no-op replacement: record the pid, prime clock(), report 0 workers. */
WEAK(int init_threads (const int c, const bool load))
{ main_thread_pid = getpid(); clock (); return 0; }
/* Weak no-ops: no pool exists, so binding/freeing has nothing to do. */
WEAK(void bind_threads(bool, bool, bool)) {}
WEAK(void free_threads ()) {}
/* Weak disable/reenable: only forward to OpenMP when compiled with it. */
WEAK(void disable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(0); }
#else
{}
#endif
WEAK(void reenable_threads ())
#ifdef TBCI_OMP
{ omp_set_num_threads(omp_get_num_procs()); }
#else
{}
#endif
/* Weak fallback: without SMP support the "job" is executed synchronously
 * in the caller, after warning (and aborting under ABORT_ON_ERR). The
 * varargs are still unpacked into a thr_ctrl so the job sees its
 * parameters. */
WEAK(void thread_start (const int tno, thr_job_t job,
 const unsigned long sz, ...))
{
 void * par;
 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
 unsigned int t = 0;
 va_list (vl);
 tc->t_job = job; tc->t_size = sz; tc->t_off = 0;
 va_start (vl, sz);
 while ((par = va_arg (vl, void*)))
  tc->t_par[t++] = par;
 va_end (vl);
 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
 abort ();
# endif
 (*job)(tc);
}
/* Weak fallback with explicit offset: same synchronous execution as the
 * weak thread_start(), but forwarding `off` in tc->t_off. */
WEAK(void thread_start_off (const int tno, thr_job_t job,
 const unsigned long off, const unsigned long sz, ...))
{
 void * par;
 struct thr_ctrl thrc; struct thr_ctrl* tc = &thrc;
 unsigned int t = 0;
 va_list (vl);
 tc->t_job = job; tc->t_size = sz; tc->t_off = off;
 va_start (vl, sz);
 while ((par = va_arg (vl, void*)))
  tc->t_par[t++] = par;
 va_end (vl);
 fprintf (stderr, "Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
# ifdef ABORT_ON_ERR
 abort ();
#endif
 (*job)(tc);
}
/* Weak fallbacks: jobs already ran synchronously, so waiting is a no-op
 * and there is no result to return. */
WEAK(void thread_wait (const int t)) {}
WEAK(void* thread_wait_useful (const int t, useful_job_t j, void* a))
{ return 0; }
WEAK(double thread_wait_result (const int thr_no))
{ return 0; }
1123 
1124 unsigned int curr_n_thr WEAKA;
1125 unsigned int last_n_thr WEAKA;
1126 unsigned int prev_n_thr WEAKA;
1127 
1129 
1130 #endif /* SMP */
1131 
1133 unsigned int tbci_control WEAKA = _TBCI_CWD_DEFAULT;
1135 
1136 #endif /* _POSIX_THREADS */
void * t_par[6]
Definition: smp.h:173
#define BCHKNR(cond, exc, txt, ind)
Definition: basics.h:586
int t_no
Definition: smp.h:158
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition: smp.cc:340
#define _TBCI_CWD_DEFAULT
Definition: tbci_param.h:69
volatile char t_res_dummy[16]
Definition: smp.h:176
long t_res_l
Definition: smp.h:179
unsigned int tbci_control
const Vector< T > const Vector< T > const Vector< T > & p
Definition: LM_fit.h:97
#define MAX_THREADS
Definition: smp.cc:83
void * parm
Definition: smp.cc:318
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
Definition: smp.cc:946
void deinit(const int thr)
Definition: smp.cc:325
bool threads_bound
Definition: smp.cc:108
int numa_avail
Definition: smp.cc:105
cbackfn * dtor
Definition: smp.cc:317
const unsigned end
unsigned long poll_efail
Definition: smp.cc:517
unsigned long t_job_output_no
Definition: smp.h:142
return c
Definition: f_matrix.h:760
#define POLL_REP
Definition: smp.cc:520
void lina_empty(struct thr_ctrl *dummy)
Definition: smp.cc:356
#define NAMESPACE_TBCI
Definition: basics.h:317
int t_pipe_from_thread[2]
Definition: smp.h:161
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
Definition: smp.cc:988
unsigned long poll_succ
Definition: smp.cc:514
void reenable_threads()
Definition: smp.cc:1045
void init(const int thr)
Definition: smp.cc:323
exception base class for the TBCI NumLib
Definition: except.h:58
bool bound_main
Definition: smp.cc:109
void * t_par[6]
Definition: smp.h:137
int threads_busy
Definition: smp.cc:104
cbackfn * ctor
Definition: smp.cc:316
static List< cback > thread_cbacks
Definition: smp.cc:329
unsigned int curr_n_thr
Definition: smp.cc:1056
#define BCHK(cond, exc, txt, ind, rtval)
Definition: basics.h:575
#define NULL
Definition: basics.h:250
cback(cbackfn ct, cbackfn dt, void *p)
Definition: smp.cc:319
NAMESPACE_END NAMESPACE_TBCI unsigned int tbci_control WEAKA
Definition: smp.cc:1133
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
Definition: smp.cc:727
#define WEAK(x)
Definition: basics.h:485
#define UNLIKELY(expr)
Definition: basics.h:101
T * getfirst() const
Definition: list.h:617
double t_res_d
Definition: smp.h:147
Definition: smp.h:168
#define TCHK(x)
Definition: smp.cc:124
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
Definition: smp.h:126
NAMESPACE_TBCI int num_threads
Definition: smp.cc:103
Definition: smp.h:132
unsigned long t_job_no
Definition: smp.h:169
unsigned long t_off
Definition: smp.h:136
unsigned long t_job_no
Definition: smp.h:133
void * lina_thread(void *thr)
Definition: smp.cc:551
Definition: list.h:59
struct thr_struct * threads
Definition: smp.cc:106
#define CSTD__
Definition: basics.h:340
unsigned long t_size
Definition: smp.h:171
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
Definition: basics.h:748
long t_retval
Definition: smp.h:143
static int detect_num_cpu(int rmv_ht)
Definition: smp.cc:267
unsigned int last_n_thr
Definition: smp.cc:1056
F_TSMatrix< T > ts
Definition: f_matrix.h:1052
#define _cpu_relax()
Definition: smp.cc:510
THREAD__ int thrno
Definition: smp.cc:112
void * empty_thread(void *dummy)
Definition: smp.cc:361
Definition: smp.cc:314
#define POLL_REP2
Definition: smp.cc:523
T * getnext() const
Definition: list.h:629
#define THREAD__
Definition: basics.h:774
void disable_threads()
Definition: smp.cc:1037
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
Definition: smp.cc:333
int tot_cpu_tm
Definition: smp.cc:872
void free_threads()
Definition: smp.cc:873
volatile char t_res_dummy[16]
Definition: smp.h:145
int numa_node
Definition: smp.h:163
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
Definition: smp.cc:525
const Vector< T > const Vector< T > const Vector< T > int T T & err
Definition: LM_fit.h:102
#define THREAD_MAX_ARGS
Definition: smp.h:129
THREAD__ struct thr_struct * this_thread
Definition: smp.cc:113
pthread_t t_id
Definition: smp.h:160
cback()
Definition: smp.cc:321
int i
Definition: LM_fit.h:71
#define ERRDECL
Definition: smp.cc:125
void *(* useful_job_t)(void *)
Definition: smp.h:127
unsigned long t_off
Definition: smp.h:172
void thread_wait(const int thr_no, struct job_output *out)
Definition: smp.cc:997
double thread_wait_result(const int thr_no)
Definition: smp.cc:1017
#define MAX(a, b)
Definition: basics.h:656
#define NAMESPACE_END
Definition: basics.h:323
unsigned long poll_fail
Definition: smp.cc:516
THREAD__ int ismainthread
Definition: smp.cc:111
unsigned long poll_usucc
Definition: smp.cc:515
const Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > Vector< T > long int res
Definition: LM_fit.h:199
T * append()
Definition: list.h:436
unsigned int prev_n_thr
Definition: smp.cc:1056
int main_numa_node
Definition: smp.cc:110
int init_threads(const int num_cpu, const bool load_magic)
Definition: smp.cc:629
int t_no
Definition: smp.h:183
static unsigned long job_no
Definition: smp.cc:944
int t_pipe_to_thread[2]
Definition: smp.h:161
const unsigned TMatrix< T > const Matrix< T > * a
thr_job_t t_job
Definition: smp.h:170
T * setcurr(const T *rec) const
Definition: list.h:354
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
Definition: smp.cc:979
unsigned int t_done_var
Definition: smp.h:162
static pid_t gettid()
Definition: smp.cc:92
void lina_err(struct thr_ctrl *tc)
Definition: smp.cc:350
T * delcurr()
Definition: list.h:475
thr_job_t t_job
Definition: smp.h:134
unsigned long t_size
Definition: smp.h:135
pid_t main_thread_pid
Definition: smp.cc:107
pid_t t_pid
Definition: smp.h:159