← Back to C-Kernel-Engine Docs Doxygen Source Documentation
ck_threadpool.c File Reference

Persistent pthread thread pool for CK-Engine inference. More...

#include "ck_threadpool.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

Go to the source code of this file.

Macros

#define CK_SPIN_PAUSE()   ((void)0)
 

Functions

static void barrier_init (ck_barrier_t *b, int n_threads)
 
static void barrier_wait (ck_barrier_t *b)
 
int ck_get_num_threads (void)
 
int ck_get_physical_cores (void)
 
void ck_threadpool_barrier (ck_threadpool_t *pool)
 
ck_threadpool_t * ck_threadpool_create (int n_threads)
 
void ck_threadpool_destroy (ck_threadpool_t *pool)
 
void ck_threadpool_dispatch (ck_threadpool_t *pool, ck_work_fn_t fn, void *args)
 
ck_threadpool_t * ck_threadpool_global (void)
 
void ck_threadpool_global_destroy (void)
 
int ck_threadpool_n_threads (const ck_threadpool_t *pool)
 
void ck_threadpool_pause (ck_threadpool_t *pool)
 
void ck_threadpool_resume (ck_threadpool_t *pool)
 
int ck_threadpool_thread_id (const ck_threadpool_t *pool)
 
static void global_pool_init (void)
 
static void * worker_main (void *arg)
 

Variables

static ck_threadpool_t * g_threadpool = NULL
 
static pthread_once_t g_threadpool_once = PTHREAD_ONCE_INIT
 

Detailed Description

Persistent pthread thread pool for CK-Engine inference.

Architecture:

  • N-1 worker pthreads created at startup, main thread is thread 0
  • Workers spin on atomic dispatch counter waiting for work
  • Barriers use atomic counter + spin-wait with _mm_pause()
  • Hybrid polling: spin CK_THREADPOOL_SPIN_COUNT rounds, then condvar
  • All atomics on separate cache lines to avoid false sharing

Based on the ggml_threadpool design from llama.cpp, adapted for CK-Engine's kernel dispatch model.

Definition in file ck_threadpool.c.

Macro Definition Documentation

◆ CK_SPIN_PAUSE

#define CK_SPIN_PAUSE ( )    ((void)0)

Definition at line 27 of file ck_threadpool.c.

Function Documentation

◆ barrier_init()

static void barrier_init ( ck_barrier_t *  b,
int  n_threads 
)
static

Definition at line 78 of file ck_threadpool.c.

79 {
80  atomic_store(&b->n_arrived, 0);
81  atomic_store(&b->n_phase, 0);
82  b->n_threads = n_threads;
83 }

Referenced by ck_threadpool_create(), and ck_threadpool_dispatch().

◆ barrier_wait()

static void barrier_wait ( ck_barrier_t *  b)
static

Spin-wait barrier. All threads must call this. Uses phase counter to allow re-use without reset.

Definition at line 89 of file ck_threadpool.c.

90 {
91  const int n = b->n_threads;
92  const int phase = atomic_load_explicit(&b->n_phase, memory_order_relaxed);
93 
94  if (atomic_fetch_add_explicit(&b->n_arrived, 1, memory_order_acq_rel) == n - 1) {
95  /* Last thread to arrive — reset and advance phase */
96  atomic_store_explicit(&b->n_arrived, 0, memory_order_relaxed);
97  atomic_store_explicit(&b->n_phase, phase + 1, memory_order_release);
98  } else {
99  /* Spin until phase advances */
100  int spins = 0;
101  while (atomic_load_explicit(&b->n_phase, memory_order_acquire) == phase) {
102  CK_SPIN_PAUSE();
103  spins++;
104  /* After many spins, yield to avoid wasting CPU on oversubscribed systems */
105  if (spins > CK_THREADPOOL_SPIN_COUNT * 16) {
106  sched_yield();
107  spins = 0;
108  }
109  }
110  }
111 }
#define CK_SPIN_PAUSE()
Definition: ck_threadpool.c:27
#define CK_THREADPOOL_SPIN_COUNT
Definition: ck_threadpool.h:51

References CK_SPIN_PAUSE, and CK_THREADPOOL_SPIN_COUNT.

Referenced by ck_threadpool_barrier().

◆ ck_get_num_threads()

int ck_get_num_threads ( void  )

Definition at line 178 of file ckernel_strict.c.

179 {
180  // Auto-initialize if not set
181  if (!g_threads_initialized) {
182  ck_set_num_threads(0); // Auto-detect
183  }
184  return g_num_threads;
185 }
void ck_set_num_threads(int num_threads)
static int g_num_threads
static int g_threads_initialized

Referenced by global_pool_init().

◆ ck_get_physical_cores()

int ck_get_physical_cores ( void  )

Definition at line 62 of file ckernel_strict.c.

63 {
64  int physical_cores = 0;
65  int logical_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
66  if (logical_cores <= 0) {
67  logical_cores = 1;
68  }
69 
70  // Read from /proc/cpuinfo (Linux) and count unique (physical id, core id) pairs.
71  FILE *f = fopen("/proc/cpuinfo", "r");
72  if (f) {
73  char line[256];
74  int physical_id = -1;
75  int core_id = -1;
76 
77  struct {
78  int physical_id;
79  int core_id;
80  } seen[8192];
81  int seen_count = 0;
82 
83  const int seen_cap = (int)(sizeof(seen) / sizeof(seen[0]));
84 
85  // Helper: add (pid,cid) to set if not present.
86  #define CK_ADD_PAIR(pid, cid) \
87  do { \
88  if ((pid) >= 0 && (cid) >= 0) { \
89  int exists = 0; \
90  for (int ii = 0; ii < seen_count; ++ii) { \
91  if (seen[ii].physical_id == (pid) && \
92  seen[ii].core_id == (cid)) { \
93  exists = 1; \
94  break; \
95  } \
96  } \
97  if (!exists && seen_count < seen_cap) { \
98  seen[seen_count].physical_id = (pid); \
99  seen[seen_count].core_id = (cid); \
100  ++seen_count; \
101  } \
102  } \
103  } while (0)
104 
105  while (fgets(line, sizeof(line), f)) {
106  int val;
107 
108  // Blank line separates processor blocks.
109  if (line[0] == '\n' || line[0] == '\0') {
110  CK_ADD_PAIR(physical_id, core_id);
111  physical_id = -1;
112  core_id = -1;
113  continue;
114  }
115 
116  if (sscanf(line, "physical id : %d", &val) == 1) {
117  physical_id = val;
118  continue;
119  }
120  if (sscanf(line, "core id : %d", &val) == 1) {
121  core_id = val;
122  continue;
123  }
124  }
125  fclose(f);
126 
127  // Handle file without trailing blank line.
128  CK_ADD_PAIR(physical_id, core_id);
129 
130  #undef CK_ADD_PAIR
131 
132  physical_cores = seen_count;
133  }
134 
135  // If we couldn't reliably detect physical cores (common in containers),
136  // fall back to logical CPUs instead of incorrectly forcing single-thread execution.
137  if (physical_cores <= 1 && logical_cores > 1) {
138  return logical_cores;
139  }
140 
141  if (physical_cores > 1) {
142  return physical_cores;
143  }
144 
145  return logical_cores;
146 }
#define CK_ADD_PAIR(pid, cid)

Referenced by ck_set_num_threads(), and ck_threadpool_create().

◆ ck_threadpool_barrier()

void ck_threadpool_barrier ( ck_threadpool_t *  pool)

Barrier synchronization within a dispatched work function.

ALL threads must call this at the same point. Threads spin-wait until all have arrived, then proceed.

Must only be called from within a work function (during dispatch).

Parameters
pool — Thread pool

Definition at line 320 of file ck_threadpool.c.

321 {
322  if (!pool || pool->n_threads <= 1) return;
323  barrier_wait(&pool->barrier);
324 }
static void barrier_wait(ck_barrier_t *b)
Definition: ck_threadpool.c:89

References barrier_wait().

◆ ck_threadpool_create()

ck_threadpool_t* ck_threadpool_create ( int  n_threads)

Create a thread pool with n_threads total threads. Thread 0 is the calling (main) thread; n_threads-1 workers are spawned.

Parameters
n_threads — Total thread count (including main). Must be >= 1. Pass 0 for auto-detect (physical cores).
Returns
Pool handle, or NULL on failure.

Definition at line 183 of file ck_threadpool.c.

184 {
185  if (n_threads <= 0) {
186  n_threads = ck_get_physical_cores();
187  if (n_threads <= 0) n_threads = 1;
188  /* Cap at reasonable default for memory-bound workloads */
189  if (n_threads > 8) n_threads = 8;
190  }
191  if (n_threads > CK_THREADPOOL_MAX_THREADS) {
192  n_threads = CK_THREADPOOL_MAX_THREADS;
193  }
194 
195  ck_threadpool_t *pool = aligned_alloc(CK_CACHE_LINE, sizeof(ck_threadpool_t));
196  if (!pool) return NULL;
197  memset(pool, 0, sizeof(*pool));
198 
199  pool->n_threads = n_threads;
200  atomic_store(&pool->n_dispatch, 0);
201  atomic_store(&pool->n_complete, 0);
202  atomic_store(&pool->stop, 0);
203  atomic_store(&pool->paused, 0);
204  pool->work_fn = NULL;
205  pool->work_args = NULL;
206 
207  barrier_init(&pool->barrier, n_threads);
208 
209  pthread_mutex_init(&pool->mutex, NULL);
210  pthread_cond_init(&pool->cond_dispatch, NULL);
211  pthread_cond_init(&pool->cond_done, NULL);
212 
213  /* Thread 0 = main thread (no pthread created) */
214  pool->workers[0].id = 0;
215  pool->workers[0].pool = pool;
216  pool->workers[0].thread = pthread_self();
217 
218  /* Spawn N-1 worker threads */
219  for (int i = 1; i < n_threads; i++) {
220  pool->workers[i].id = i;
221  pool->workers[i].pool = pool;
222 
223  int rc = pthread_create(&pool->workers[i].thread, NULL,
224  worker_main, &pool->workers[i]);
225  if (rc != 0) {
226  fprintf(stderr, "[CK threadpool] Failed to create worker %d: %s\n",
227  i, strerror(rc));
228  /* Reduce thread count to what we managed to create */
229  pool->n_threads = i;
230  barrier_init(&pool->barrier, i);
231  break;
232  }
233  }
234 
235  if (pool->n_threads > 1) {
236  fprintf(stderr, "[CK threadpool] Created %d threads (1 main + %d workers)\n",
237  pool->n_threads, pool->n_threads - 1);
238  }
239 
240  return pool;
241 }
static void barrier_init(ck_barrier_t *b, int n_threads)
Definition: ck_threadpool.c:78
static void * worker_main(void *arg)
int ck_get_physical_cores(void)
#define CK_THREADPOOL_MAX_THREADS
Definition: ck_threadpool.h:48
#define CK_CACHE_LINE
Definition: ck_threadpool.h:54

References barrier_init(), CK_CACHE_LINE, ck_get_physical_cores(), CK_THREADPOOL_MAX_THREADS, and worker_main().

Referenced by global_pool_init().

◆ ck_threadpool_destroy()

void ck_threadpool_destroy ( ck_threadpool_t *  pool)

Destroy the thread pool. Signals all workers to exit and joins them. Safe to call with NULL.

Definition at line 243 of file ck_threadpool.c.

244 {
245  if (!pool) return;
246 
247  /* Signal shutdown */
248  atomic_store_explicit(&pool->stop, 1, memory_order_release);
249 
250  /* Wake all sleeping workers */
251  pthread_mutex_lock(&pool->mutex);
252  pthread_cond_broadcast(&pool->cond_dispatch);
253  pthread_mutex_unlock(&pool->mutex);
254 
255  /* Join all worker threads */
256  for (int i = 1; i < pool->n_threads; i++) {
257  pthread_join(pool->workers[i].thread, NULL);
258  }
259 
260  pthread_cond_destroy(&pool->cond_dispatch);
261  pthread_cond_destroy(&pool->cond_done);
262  pthread_mutex_destroy(&pool->mutex);
263 
264  free(pool);
265 }

Referenced by ck_threadpool_global_destroy().

◆ ck_threadpool_dispatch()

void ck_threadpool_dispatch ( ck_threadpool_t *  pool,
ck_work_fn_t  fn,
void *  args 
)

Dispatch work to all threads and wait for completion.

  1. Sets the work function and args
  2. Bumps the dispatch counter (wakes workers)
  3. Main thread (ith=0) executes its share
  4. Waits for all threads to complete via barrier

This is a blocking call — returns when ALL threads have finished.

Parameters
pool — Thread pool
fn — Work function (called on each thread)
args — Argument passed to fn

Definition at line 271 of file ck_threadpool.c.

272 {
273  if (!pool || !fn) return;
274 
275  /* Single-thread fast path: just call directly */
276  if (pool->n_threads == 1) {
277  fn(0, 1, args);
278  return;
279  }
280 
281  /* Reset barrier phase for this dispatch */
282  barrier_init(&pool->barrier, pool->n_threads);
283 
284  /* Set work descriptor */
285  pool->work_fn = fn;
286  pool->work_args = args;
287  atomic_store_explicit(&pool->n_complete, 0, memory_order_release);
288 
289  /* Wake workers by bumping dispatch counter */
290  atomic_fetch_add_explicit(&pool->n_dispatch, 1, memory_order_release);
291 
292  /* Also signal condvar for sleeping workers */
293  pthread_mutex_lock(&pool->mutex);
294  pthread_cond_broadcast(&pool->cond_dispatch);
295  pthread_mutex_unlock(&pool->mutex);
296 
297  /* Main thread (ith=0) does its share */
298  fn(0, pool->n_threads, args);
299 
300  /* Wait for all workers to complete */
301  if (pool->n_threads > 1) {
302  int spins = 0;
303  while (atomic_load_explicit(&pool->n_complete, memory_order_acquire)
304  < pool->n_threads - 1) {
305  CK_SPIN_PAUSE();
306  spins++;
307  if (spins >= CK_THREADPOOL_SPIN_COUNT) {
308  pthread_mutex_lock(&pool->mutex);
309  if (atomic_load_explicit(&pool->n_complete, memory_order_acquire)
310  < pool->n_threads - 1) {
311  pthread_cond_wait(&pool->cond_done, &pool->mutex);
312  }
313  pthread_mutex_unlock(&pool->mutex);
314  spins = 0;
315  }
316  }
317  }
318 }

References barrier_init(), CK_SPIN_PAUSE, and CK_THREADPOOL_SPIN_COUNT.

◆ ck_threadpool_global()

ck_threadpool_t* ck_threadpool_global ( void  )

Get or create the global thread pool. Thread-safe (uses pthread_once internally). Uses ck_get_num_threads() for auto-detection.

Returns
Global pool, never NULL after successful first call.

Definition at line 383 of file ck_threadpool.c.

384 {
385  pthread_once(&g_threadpool_once, global_pool_init);
386  return g_threadpool;
387 }
static pthread_once_t g_threadpool_once
static void global_pool_init(void)
static ck_threadpool_t * g_threadpool

References g_threadpool, g_threadpool_once, and global_pool_init().

Referenced by ck_get_threadpool(), and ck_threadpool_init().

◆ ck_threadpool_global_destroy()

void ck_threadpool_global_destroy ( void  )

Destroy the global thread pool. Called during engine shutdown.

Definition at line 389 of file ck_threadpool.c.

390 {
391  if (g_threadpool) {
392  ck_threadpool_destroy(g_threadpool);
393  g_threadpool = NULL;
394  /* Reset once control so pool can be re-created if needed */
395  g_threadpool_once = PTHREAD_ONCE_INIT;
396  }
397 }
void ck_threadpool_destroy(ck_threadpool_t *pool)

References ck_threadpool_destroy(), g_threadpool, and g_threadpool_once.

Referenced by ck_threadpool_shutdown().

◆ ck_threadpool_n_threads()

int ck_threadpool_n_threads ( const ck_threadpool_t *  pool)

Get total thread count (including main thread)

Definition at line 351 of file ck_threadpool.c.

352 {
353  return pool ? pool->n_threads : 1;
354 }

◆ ck_threadpool_pause()

void ck_threadpool_pause ( ck_threadpool_t *  pool)

Pause workers — they sleep on condvar (0% CPU). Call between batches or during interactive waiting. Workers wake on next dispatch or resume.

Definition at line 330 of file ck_threadpool.c.

331 {
332  if (!pool) return;
333  atomic_store_explicit(&pool->paused, 1, memory_order_release);
334 }

◆ ck_threadpool_resume()

void ck_threadpool_resume ( ck_threadpool_t *  pool)

Resume workers — transition from sleep to spin-wait. Call before starting a new batch of work.

Definition at line 336 of file ck_threadpool.c.

337 {
338  if (!pool) return;
339  atomic_store_explicit(&pool->paused, 0, memory_order_release);
340 
341  /* Wake sleeping workers */
342  pthread_mutex_lock(&pool->mutex);
343  pthread_cond_broadcast(&pool->cond_dispatch);
344  pthread_mutex_unlock(&pool->mutex);
345 }

◆ ck_threadpool_thread_id()

int ck_threadpool_thread_id ( const ck_threadpool_t *  pool)

Get thread index for current thread (0 = main, -1 if not in pool)

Definition at line 356 of file ck_threadpool.c.

357 {
358  if (!pool) return -1;
359  pthread_t self = pthread_self();
360  for (int i = 0; i < pool->n_threads; i++) {
361  if (pthread_equal(self, pool->workers[i].thread)) {
362  return i;
363  }
364  }
365  return -1;
366 }

◆ global_pool_init()

static void global_pool_init ( void  )
static

Definition at line 377 of file ck_threadpool.c.

378 {
379  int n = ck_get_num_threads();
380  g_threadpool = ck_threadpool_create(n);
381 }
ck_threadpool_t * ck_threadpool_create(int n_threads)
int ck_get_num_threads(void)

References ck_get_num_threads(), ck_threadpool_create(), and g_threadpool.

Referenced by ck_threadpool_global().

◆ worker_main()

static void* worker_main ( void *  arg)
static

Definition at line 117 of file ck_threadpool.c.

118 {
119  ck_worker_t *w = (ck_worker_t *)arg;
120  ck_threadpool_t *pool = w->pool;
121  const int ith = w->id;
122  int last_dispatch = 0;
123 
124  for (;;) {
125  /* Spin-wait for new dispatch */
126  int spins = 0;
127  for (;;) {
128  /* Check shutdown */
129  if (atomic_load_explicit(&pool->stop, memory_order_acquire)) {
130  return NULL;
131  }
132 
133  /* Check for new work */
134  int current = atomic_load_explicit(&pool->n_dispatch, memory_order_acquire);
135  if (current != last_dispatch) {
136  last_dispatch = current;
137  break;
138  }
139 
140  CK_SPIN_PAUSE();
141  spins++;
142 
143  /* After spinning, fall back to condvar sleep */
144  if (spins >= CK_THREADPOOL_SPIN_COUNT) {
145  pthread_mutex_lock(&pool->mutex);
146  /* Re-check under lock to avoid missed wakeup */
147  current = atomic_load_explicit(&pool->n_dispatch, memory_order_acquire);
148  if (current == last_dispatch &&
149  !atomic_load_explicit(&pool->stop, memory_order_acquire)) {
150  pthread_cond_wait(&pool->cond_dispatch, &pool->mutex);
151  }
152  pthread_mutex_unlock(&pool->mutex);
153  spins = 0;
154  }
155  }
156 
157  /* Execute work */
158  ck_work_fn_t fn = pool->work_fn;
159  void *args = pool->work_args;
160  if (fn) {
161  fn(ith, pool->n_threads, args);
162  }
163 
164  /* Signal completion */
165  if (atomic_fetch_add_explicit(&pool->n_complete, 1, memory_order_acq_rel)
166  == pool->n_threads - 2) {
167  /* Last worker done — wake main thread if it's waiting */
168  pthread_mutex_lock(&pool->mutex);
169  pthread_cond_signal(&pool->cond_done);
170  pthread_mutex_unlock(&pool->mutex);
171  }
172  }
173 
174  return NULL;
175 }
void(* ck_work_fn_t)(int ith, int nth, void *args)
Definition: ck_threadpool.h:68

References CK_SPIN_PAUSE, and CK_THREADPOOL_SPIN_COUNT.

Referenced by ck_threadpool_create().

Variable Documentation

◆ g_threadpool

ck_threadpool_t* g_threadpool = NULL
static

◆ g_threadpool_once

pthread_once_t g_threadpool_once = PTHREAD_ONCE_INIT
static

Definition at line 373 of file ck_threadpool.c.

Referenced by ck_threadpool_global(), and ck_threadpool_global_destroy().