54#define NEED_SMP_DECLS 1
55#include "tbci/basics.h"
69#ifdef HAVE_SYS_SYSINFO_H
70# include <sys/sysinfo.h>
83# define MAX_THREADS 160
89# include <sys/syscall.h>
94 return syscall(SYS_gettid);
117unsigned long page_mask;
121# define TCHK(x) if (UNLIKELY(err = x)) fprintf (stderr, #x " failed: %s\n", CSTD__ strerror (err))
122# define ERRDECL int err = 0
128#ifdef HAVE_SCHED_GETAFFINITY
129static cpu_set_t saved_cpuset;
131#ifndef HAVE_CPU_COUNT
/* Fallback for libcs that do not provide the CPU_COUNT() macro
 * (block is compiled only under #ifndef HAVE_CPU_COUNT).
 * Returns the number of CPUs set in *cpus by scanning all
 * CPU_SETSIZE slots with CPU_ISSET().
 *
 * NOTE(review): the extracted block was syntactically broken (fused
 * line-number residue, missing braces/counter/return); reconstructed
 * here to the standard glibc CPU_COUNT contract the visible loop and
 * CPU_ISSET test already imply. */
static int CPU_COUNT(const cpu_set_t *cpus)
{
  int count = 0;
  for (int i = 0; i < CPU_SETSIZE; ++i)
    if (CPU_ISSET(i, cpus))
      ++count;
  return count;
}
/* Fallback CPU_XOR(): dest = src1 XOR src2, element-wise over all
 * CPU_SETSIZE slots. Bits where the inputs agree are explicitly
 * cleared in dest — required by the caller at "CPU_XOR(cpus, cpus,
 * &sibl_set)" (hyperthread removal), where dest aliases src1 and
 * stale bits must not survive. Aliasing is safe because each index
 * is fully read before it is written.
 *
 * NOTE(review): extracted block was syntactically broken (missing
 * braces and the set/clear statements); reconstructed to the
 * standard glibc CPU_XOR contract. */
static void CPU_XOR(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
  for (int i = 0; i < CPU_SETSIZE; ++i)
    if (CPU_ISSET(i, src1) ^ CPU_ISSET(i, src2))
      CPU_SET(i, dest);
    else
      CPU_CLR(i, dest);
}
/* Fallback CPU_AND(): dest = src1 AND src2, element-wise over all
 * CPU_SETSIZE slots. Bits not present in both inputs are explicitly
 * cleared so a dirty dest (or an aliased dest, as in
 * "CPU_AND(&sibl_set, &sibl_set, cpus)") ends up with exactly the
 * intersection. Aliasing is safe: each index is read before written.
 *
 * NOTE(review): extracted block was syntactically broken (missing
 * braces and the set/clear statements); reconstructed to the
 * standard glibc CPU_AND contract. */
static void CPU_AND(cpu_set_t *dest, cpu_set_t *src1, cpu_set_t *src2)
{
  for (int i = 0; i < CPU_SETSIZE; ++i)
    if (CPU_ISSET(i, src1) & CPU_ISSET(i, src2))
      CPU_SET(i, dest);
    else
      CPU_CLR(i, dest);
}
/* Scan *cpus from index 'start' upward and (per its callers, e.g.
 * "next_cpu = next_set_bit(next_cpu+1, &saved_cpuset)") yield the index
 * of the next CPU that is set.
 *
 * NOTE(review): this block is an incomplete extraction — the fused
 * "164"/"166"/"167" residue is original-source line numbering, and the
 * braces, the return statement inside the loop, and in particular the
 * not-found fallback return value are missing from the dump. Do not
 * assume a -1 / CPU_SETSIZE sentinel without checking the real source. */
164static int next_set_bit(
const int start,
const cpu_set_t *cpus)
166 for (
int i = start;
i < CPU_SETSIZE; ++
i)
167 if (CPU_ISSET(
i, cpus))
172static char cpu_buf[1024];
173static const char* cpu_str(
const cpu_set_t *cpus)
175 static volatile int off = 0;
181 char* ret = cpu_buf+old_off+1;
184 for (
int i = 0;
i < CPU_SETSIZE && ptr < 1016; ++
i)
185 if (CPU_ISSET(
i, cpus))
186 ptr += sprintf(cpu_buf+ptr,
" %i",
i);
189#define CPU_SETBYTES ((CPU_SETSIZE+7)/8)
190#if defined(__linux__)
192static void parse_siblings(
const int cpu, cpu_set_t *cpus,
const int remove)
195 sprintf(fn,
"/sys/devices/system/cpu/cpu%i/topology/thread_siblings", cpu);
196 FILE *f = fopen(fn,
"r");
199 int sibl[(CPU_SETSIZE+31)/32];
205 if (fscanf(f,
"%x,", &msk) == EOF) {
206 if (fscanf(f,
"%x", &msk) == EOF)
210 sibl[nr_ints++] = msk;
215 cpu_set_t sibl_set; CPU_ZERO(&sibl_set);
216 long* sibl_ptr = (
long*)&sibl_set;
217#if __BYTE_ORDER == __LITTLE_ENDIAN
218 for (
int j = nr_ints-1; j >= 0; --j)
219 *sibl_ptr++ = sibl[j];
221 long* sibl_ptr2 = (
long*)&sibl;
222 for (
int j = (nr_ints*
sizeof(
int)/
sizeof(
long))-1; j >= 0; --j)
223 *sibl_ptr++ = sibl_ptr2[j];
227 if (!CPU_ISSET(cpu, &sibl_set))
230 memcpy(cpus, &sibl_set, CPU_SETBYTES);
233 CPU_CLR(cpu, &sibl_set);
234 CPU_AND(&sibl_set, &sibl_set, cpus);
235 CPU_XOR(cpus, cpus, &sibl_set);
239static void add_siblings(
const int cpu, cpu_set_t *cpus)
241 parse_siblings(cpu, cpus, 0);
244static void remove_hyperthreads(cpu_set_t *cpus)
247 fprintf(stderr,
"CPU set before HT removal: %s\n", cpu_str(cpus));
249 for (
int i = CPU_SETSIZE-1;
i >=0; --
i) {
250 if (!CPU_ISSET(
i, cpus))
252 parse_siblings(
i, cpus, 1);
255 fprintf(stderr,
"CPU set after HT removal: %s\n", cpu_str(cpus));
259static void remove_hyperthreads(cpu_set_t *cpus)
270#ifdef HAVE_SCHED_GETAFFINITY
272 if (! sched_getaffinity(pid, CPU_SETBYTES, &saved_cpuset)) {
274 remove_hyperthreads(&saved_cpuset);
275 return CPU_COUNT(&saved_cpuset);
278#ifdef HAVE_GET_NPROCS
279 cpus = get_nprocs ();
282#elif defined(__linux__) && (defined(__i386__) || defined(__x86_64__))
285 cpuinfo = fopen (
"/proc/cpuinfo",
"r");
288 while (!feof (cpuinfo)) {
289 fgets (buf, 128, cpuinfo);
290 if (
CSTD__ memcmp (buf+1,
"rocessor", 8) == 0)
295 fprintf (stderr,
"%i CPUs detected.\n", cpus);
297 return (cpus == 0 ? 1: cpus);
300#ifdef HAVE_GETLOADAVG
304 int err = getloadavg (loads, 3);
306 return (
int)loads[0];
308 return (
int)loads[1];
335 cback callback(ctor, dtor, parm);
342 cback callback(ctor, dtor, parm);
344 BCHK(!cbk,
NumErr, Deregister unrgistered callback, (
long int)ctor, );
352 fprintf (stderr,
" Thread (%i) synchronization problem!\n", tc->
t_no);
353 fflush (stderr); abort ();
368static int cpu_to_node(
int cpu)
371 for (
int n = 0; n < numa_num_possible_nodes(); ++n) {
372 sprintf(fn,
"/sys/devices/system/node/node%i/cpu%i", n, cpu);
373 if (!access(fn, R_OK))
379int do_numa_init(
unsigned int cpu,
struct thr_struct *
ts,
unsigned long stack)
381 int node = cpu_to_node(cpu);
382 struct bitmask *nodes = numa_allocate_nodemask();
385 numa_bitmask_clearall(nodes);
386 numa_bitmask_setbit(nodes, node);
387 numa_set_preferred(node);
388#ifdef HAVE_SCHED_GETAFFINITY
389 numa_set_membind(nodes);
395 ts->numa_node = node;
396#ifdef HAVE_SCHED_GETAFFINITY
398 CPU_SET(cpu, &cpuset);
399 parse_siblings(cpu, &cpuset, 0);
400#ifdef HAVE_PTHREAD_GETAFFINITY_NP
401 pthread_setaffinity_np(
ts->t_id, CPU_SETBYTES, &cpuset);
403 sched_setaffinity(
ts->t_pid, CPU_SETBYTES, &cpuset);
408 void *pages[] = {(
void*)(stack & page_mask)};
409 const int nodes[] = {node};
410 int status = node, oldstat;
412 numa_move_pages(0, 1, pages,
413 NULL, &status, MPOL_MF_MOVE);
416 err = numa_move_pages(0, 1, pages,
417 nodes, &status, MPOL_MF_MOVE);
419 fprintf(stderr,
"Numa thr %i(tid %p, lwp %i): CPUs %s Node %i (Stack @%p: %i->%i(%li)\n",
420 ts->t_no, (
void*)
ts->t_id,
ts->t_pid,
421 cpu_str(&cpuset), node, pages[0], oldstat,
425 numa_free_nodemask(nodes);
429void numa_init_job(
struct thr_ctrl *tc)
436int do_numa_move_pages(
int node,
int fault_in,
unsigned long firstaddr,
unsigned long lastaddr)
439 int nodes[PG_PC], status[PG_PC];
440 int nr_pages = 0;
int nr_moved = 0;
441 unsigned long orig_addr = firstaddr;
443 memset(status, 0, PG_PC*
sizeof(
int));
445 if (firstaddr - (firstaddr & page_mask) >= page_size/2)
446 firstaddr = firstaddr & page_mask;
448 firstaddr = (firstaddr + (page_size - 1)) & page_mask;
449 if (lastaddr - (lastaddr & page_mask) > page_size/2)
450 lastaddr = lastaddr & page_mask;
452 lastaddr = (lastaddr + (page_size - 1)) & page_mask;
454 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i: %li pg %i calls)\n",
455 (
void*)firstaddr, (
void*)lastaddr, node,
456 (lastaddr-firstaddr)/page_size,
457 (lastaddr-firstaddr)/page_size/PG_PC);
459 while (lastaddr > firstaddr) {
461 for (
i = 0; (
i < PG_PC) && (firstaddr < lastaddr); ++
i, firstaddr += page_size) {
462 pages[
i] = (
void*)firstaddr;
466 if (
UNLIKELY(firstaddr < orig_addr))
467 *(
volatile int*)orig_addr = *(
volatile int*)orig_addr;
469 *(
volatile int*)firstaddr = *(
volatile int*)firstaddr;
473 long err = numa_move_pages(0,
i, pages,
474 NULL, status, MPOL_MF_MOVE);
476 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
477 pages[0], pages[
i-1], node, strerror(errno));
478 int to_move = 0;
err = 0;
479 for (
int j = 0; j <
i; ++j)
480 if (status[j] != node)
483 fprintf(stderr,
"numa_move_pages(%p -- %p: %i pages, move %i to node %i (from %i and others)\n",
484 pages[0], pages[
i-1],
i, to_move, node, status[0]);
488 err = numa_move_pages(0,
i, pages,
489 nodes, status, MPOL_MF_MOVE);
491 fprintf(stderr,
"numa_move_pages(%p -- %p to node %i): %s\n",
492 pages[0], pages[
i-1], node, strerror(errno));
497void numa_move_pages_job(
struct thr_ctrl *tc)
501 (
unsigned long)tc->
t_par[0],
502 (
unsigned long)tc->
t_par[1]);
507#if defined(__i386__) || defined(__x86_64__)
508# define _cpu_relax() asm ("rep; nop")
510# define _cpu_relax() do {} while(0)
523# define POLL_REP2 168
529 while (w-- && (
signed)(rd = read(fd, ptr, sz)) < (
signed)sz)
543 fcntl(fd, F_SETFL, 0);
545 rd = read(fd, ptr, sz);
547 fcntl(fd, F_SETFL, O_NONBLOCK);
557#if defined(HAVE_TLS) || defined(HAVE_DTLS)
563 memset(&tc, 0,
sizeof(tc));
564 memset(&out, 0,
sizeof(out));
567 fprintf (stderr,
" Thread %i: Try to get setup lock\n", tc.
t_no);
572 fprintf (stderr,
" Thread %i setup: id %08lx, lwp %i; signal main thread ...\n",
573 ts->t_no,
ts->t_id,
ts->t_pid);
582 fprintf(stderr,
"Thread %i: Reading failed! %i\n", tc.
t_no,
err);
586 fprintf (stderr,
" Thread %i: Start job %li @ %p\n",
ts->t_no, tc.
t_job_no, tc.
t_job);
596 fprintf (stderr,
" Thread %i: Job done!\n",
ts->t_no);
602 fprintf(stderr,
"Thread %i: Writing failed! %i\n",
ts->t_no,
err);
606 fprintf (stderr,
" Thread %i: Signaled Job done!\n",
ts->t_no);
611 fprintf (stderr,
" Thread %i: exit!\n",
ts->t_no);
616#if defined(HAVE_CLOCK_GETTIME) && defined(THREAD_STAT)
618 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
619 const double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
620 out.
t_retval = (long)(secs * CLOCKS_PER_SEC);
623 err = write(
ts->t_pipe_from_thread[1], &out,
sizeof(
struct job_output));
631 int t,
err, det_cpus, all_cpus;
632 pthread_mutexattr_t mutattr;
641#ifdef HAVE_GETLOADAVG
644 int lavg = loadavg ();
665 fprintf(stderr,
"Warning: Number of threads %i larger than no of CPU cores %i!\n",
667#ifdef HAVE_SCHED_GETAFFINITY
684#if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
700 err = pipe(
ts->t_pipe_to_thread);
702 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
705 err = pipe(
ts->t_pipe_from_thread);
707 fprintf(stderr,
"Failed to create pipe for thread %i\n", t);
710 fcntl(
ts->t_pipe_to_thread[0], F_SETFL, O_NONBLOCK);
711 fcntl(
ts->t_pipe_from_thread[0], F_SETFL, O_NONBLOCK);
715 fprintf (stderr,
"%i threads @%p (sz=%zi) set up.\n",
729#ifdef HAVE_SCHED_GETAFFINITY
731 CPU_ZERO(&saved_cpuset);
734 fprintf(stderr,
"TBCI SMP: More threads(%i) than allowed CPUs(%i)!\n",
736 cpu_set_t cpuset; CPU_ZERO(&cpuset);
739 int next_cpu = next_set_bit(0, &saved_cpuset);
745 if (numa_max_node() == 0)
748 page_size = numa_pagesize();
749 page_mask = ~(page_size-1ULL);
753 fprintf(stderr,
"NUMA: avail %i, maxnode %i\n",
numa_avail, numa_max_node());
756 CPU_SET(next_cpu, &cpuset);
758 add_siblings(next_cpu, &cpuset);
763 ts.t_id = pthread_self();
767#ifdef HAVE_SCHED_GETAFFINITY
774 next_cpu, cpu_str(&cpuset));
779 next_cpu, cpu_str(&cpuset));
785 int several_nodes = 0;
788 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
790 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
791 CPU_SET(next_cpu, &cpuset);
793 add_siblings(next_cpu, &cpuset);
799 (
void*)
ts, (
void*)0);
802#ifdef HAVE_PTHREAD_GETAFFINITY_NP
803 pthread_setaffinity_np(
ts->t_id, CPU_SETBYTES, &cpuset);
808 fprintf(stderr,
"Set Thread %i: CPUs %s\n", t,
812 pthread_getaffinity_np(
ts->t_id, CPU_SETBYTES, &cpuset);
813 fprintf(stderr,
"Get Thread %i: CPUs %s\n", t,
824#ifdef HAVE_SCHED_GETAFFINITY
826 fprintf(stderr,
"Main thread: CPUs %s, Node %i\n",
829#ifdef HAVE_PTHREAD_GETAFFINITY_NP
832 pthread_getaffinity_np(
ts->t_id, CPU_SETBYTES, &cpuset);
833 fprintf(stderr,
"Thread %i: CPUs %s, Node %i\n",
834 ts->t_no, cpu_str(&cpuset),
ts->numa_node);
837 if (!several_nodes) {
840 fprintf(stderr,
"All threads on node %i, disabling NUMA\n",
846 int omp_thr = omp_get_max_threads();
848 fprintf(stderr,
"Now setting affinity for %i OpenMP threads ...\n", omp_thr);
850#pragma omp parallel for private(cpuset, next_cpu) schedule(static,1)
851 for (
int t = 0; t < omp_thr; ++t) {
852 int tid = omp_get_thread_num();
853 next_cpu = next_set_bit(0, &saved_cpuset);
854 for (
int i = 0;
i < t; ++
i) {
855 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
857 next_cpu = next_set_bit(next_cpu+1, &saved_cpuset);
860 CPU_SET(next_cpu, &cpuset);
862 add_siblings(next_cpu, &cpuset);
863 sched_setaffinity(0, CPU_SETBYTES, &cpuset);
866 fprintf(stderr,
"Setting OMP thread %i to %i CPUs %s\n", tid, CPU_COUNT(&cpuset), cpu_str(&cpuset));
885 memset(&job, 0,
sizeof(job));
888 fcntl(
ts->t_pipe_from_thread[0], F_SETFL, 0);
889 err = write(
ts->t_pipe_to_thread[1], &job,
sizeof(job));
890 err = read(
ts->t_pipe_from_thread[0], &out,
sizeof(out));
901 fprintf (stderr,
" CPU time for thread %i :%7.3f s\n",
902 ts->t_no, (
double)(tm)/CLOCKS_PER_SEC);
909 clock_t mainclock = clock();
910#ifdef HAVE_CLOCK_GETTIME
913 if (!clock_gettime(CLOCK_THREAD_CPUTIME_ID, &tm)) {
914 double secs = tm.tv_sec + (double)tm.tv_nsec/1e9;
915 mainclock = (long)(secs*CLOCKS_PER_SEC);
920 fprintf (stderr,
" CPU time for main thr :%7.3f s\n",
921 (
double)mainclock/CLOCKS_PER_SEC);
922 fprintf (stderr,
" CPU for all threads :%7.3f s\n",
924 fprintf (stderr,
" Poll successes: %li, unexp. succ.: %li, failures: %li, exp. fails: %li\n",
931#if !defined(HAVE_TLS) && !defined(HAVE_DTLS)
935#ifdef HAVE_SCHED_GETAFFINITY
947 const unsigned long off,
const unsigned long sz,
950 void * par;
unsigned t = 0;
955 memset(&job, 0,
sizeof(job));
962 while ((par = va_arg (vl,
void*)))
963 job.
t_par[t++] = par;
967 fprintf (stderr,
"Signal thread %i\n",
ts->t_no);
970 if (write(
ts->t_pipe_to_thread[1], &job,
sizeof(job)) !=
sizeof(job)) {
971 fprintf(stderr,
"Signaling job %li to thread %i failed",
980 const unsigned long off,
const unsigned long sz, ...)
989 const unsigned long sz, ...)
1004 fprintf (stderr,
"Wait for thread %i\n",
ts->t_no);
1006 if (
busy_read(
ts->t_pipe_from_thread[0], out? out: &out2,
sizeof(out2),
POLL_REP2) !=
sizeof(out2)) {
1007 fprintf(stderr,
"Waiting for thread %i failed!\n",
ts->t_no);
1012 fprintf (stderr,
"Thread %i signaled completion.\n",
ts->t_no);
1024 fprintf (stderr,
"Wait for result of thread %i\n",
ts->t_no);
1027 fprintf(stderr,
"Waiting for thread %i failed!\n",
ts->t_no);
1032 fprintf (stderr,
"Thread %i signaled completion\n",
ts->t_no);
1041 omp_set_num_threads(0);
1050 fprintf (stderr,
"reenable_threads(): Threads already enabled.!\n");
1072{ omp_set_num_threads(0); }
1078{ omp_set_num_threads(omp_get_num_procs()); }
1083 const unsigned long sz, ...))
1091 while ((par = va_arg (vl,
void*)))
1092 tc->
t_par[t++] = par;
1094 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
1101 const unsigned long off,
const unsigned long sz, ...))
1109 while ((par = va_arg (vl,
void*)))
1110 tc->
t_par[t++] = par;
1112 fprintf (stderr,
"Warning: Tried to start a thread with non-SMP compiled smp.cc !\n");
const Vector< T > const Vector< T > const Vector< T > & p
const Vector< T > const Vector< T > const Vector< T > int T T & err
#define BCHK(cond, exc, txt, ind, rtval)
#define PREFETCH_R(addr, loc)
In case gcc does not yet support __builtin_prefetch(), we have handcoded assembly with gcc for a few ...
#define BCHKNR(cond, exc, txt, ind)
exception base class for the TBCI NumLib
void deinit(const int thr)
cback(cbackfn ct, cbackfn dt, void *p)
const unsigned TMatrix< T > const Matrix< T > * a
const unsigned TMatrix< T > * res
static int detect_num_cpu(int rmv_ht)
void thread_dereg_callback(cbackfn ctor, cbackfn dtor, void *parm)
THREAD__ struct thr_struct * this_thread
void lina_err(struct thr_ctrl *tc)
void thread_start_off(const int thr_no, thr_job_t job, const unsigned long off, const unsigned long sz,...)
void thread_wait(const int thr_no, struct job_output *out)
static List< cback > thread_cbacks
void _thread_start_off(const int thr_no, thr_job_t ljob, const unsigned long off, const unsigned long sz, va_list vl)
NAMESPACE_TBCI int num_threads
void lina_empty(struct thr_ctrl *dummy)
int init_threads(const int num_cpu, const bool load_magic)
static int busy_read(int fd, void *ptr, size_t sz, int rep=1)
static unsigned long job_no
double thread_wait_result(const int thr_no)
void bind_threads(bool bind_main, bool enable_numa, bool add_sibl)
void thread_start(const int thr_no, thr_job_t job, const unsigned long sz,...)
void thread_reg_callback(cbackfn ctor, cbackfn dtor, void *parm)
void * empty_thread(void *dummy)
void * lina_thread(void *thr)
struct thr_struct * threads
void(* thr_job_t)(struct thr_ctrl *)
Before the double inclusion guard on purpose!
static void disable_threads()
static void reenable_threads()
static int init_threads(const int=0, const bool=false)
void *(* useful_job_t)(void *)
static void free_threads()
volatile char t_res_dummy[16]
unsigned long t_job_output_no
volatile char t_res_dummy[16]
unsigned int tbci_control
#define _TBCI_CWD_DEFAULT