← Back to C-Kernel-Engine Docs Doxygen Source Documentation
ck_threadpool.h File Reference

Persistent pthread thread pool for CK-Engine inference. More...

#include <stdint.h>
#include <stdatomic.h>
#include <pthread.h>

Go to the source code of this file.

Macros

#define CK_CACHE_LINE   64
 
#define CK_THREADPOOL_MAX_THREADS   64
 
#define CK_THREADPOOL_SPIN_COUNT   1024
 

Typedefs

typedef void(* ck_work_fn_t) (int ith, int nth, void *args)
 

Functions

void ck_threadpool_barrier (ck_threadpool_t *pool)
 
ck_threadpool_t * ck_threadpool_create (int n_threads)
 
void ck_threadpool_destroy (ck_threadpool_t *pool)
 
void ck_threadpool_dispatch (ck_threadpool_t *pool, ck_work_fn_t fn, void *args)
 
ck_threadpool_t * ck_threadpool_global (void)
 
void ck_threadpool_global_destroy (void)
 
int ck_threadpool_n_threads (const ck_threadpool_t *pool)
 
void ck_threadpool_pause (ck_threadpool_t *pool)
 
void ck_threadpool_resume (ck_threadpool_t *pool)
 
int ck_threadpool_thread_id (const ck_threadpool_t *pool)
 

Detailed Description

Persistent pthread thread pool for CK-Engine inference.

Design goals:

  • Sub-microsecond dispatch latency (spin-wait barriers)
  • Zero allocation after init (all memory pre-allocated)
  • Cache-line aligned atomics to avoid false sharing
  • Hybrid polling: spin N rounds, then fall back to condvar
  • Thread 0 = main thread (does serial ops + its share of parallel work)

Usage:

    ck_threadpool_t *pool = ck_threadpool_create(4);  // 4 threads total

    // In decode loop:
    ck_threadpool_dispatch(pool, my_work_fn, args);
    // my_work_fn is called on all threads with (ith, nth, args)

    // Between batches:
    ck_threadpool_pause(pool);   // workers sleep (0% CPU)
    ck_threadpool_resume(pool);  // wake workers

    ck_threadpool_destroy(pool);

Architecture:

  • STARTUP:  Main creates N-1 worker pthreads, all spinning on an atomic counter.
  • DISPATCH: Main writes the work descriptor and bumps the counter; all threads execute.
  • BARRIER:  Atomic counter + spin-wait with _mm_pause().
  • PAUSE:    Workers sleep on a pthread_cond_t (0% CPU between batches).

Definition in file ck_threadpool.h.

Macro Definition Documentation

◆ CK_CACHE_LINE

#define CK_CACHE_LINE   64

Cache line size for alignment (x86-64)

Definition at line 54 of file ck_threadpool.h.

◆ CK_THREADPOOL_MAX_THREADS

#define CK_THREADPOOL_MAX_THREADS   64

Maximum threads supported (main + workers)

Definition at line 48 of file ck_threadpool.h.

◆ CK_THREADPOOL_SPIN_COUNT

#define CK_THREADPOOL_SPIN_COUNT   1024

Number of spin iterations before falling back to condvar wait

Definition at line 51 of file ck_threadpool.h.

Typedef Documentation

◆ ck_work_fn_t

typedef void(* ck_work_fn_t) (int ith, int nth, void *args)

Work function signature. Called on ALL threads (including main thread 0).

Parameters
ith   Thread index (0 = main thread)
nth   Total number of threads
args  Opaque argument pointer (set via dispatch)

Definition at line 68 of file ck_threadpool.h.

Function Documentation

◆ ck_threadpool_barrier()

void ck_threadpool_barrier ( ck_threadpool_t *  pool)

Barrier synchronization within a dispatched work function.

ALL threads must call this at the same point. Threads spin-wait until all have arrived, then proceed.

Must only be called from within a work function (during dispatch). Returns immediately if pool is NULL or the pool has only one thread.

Parameters
pool  Thread pool

Definition at line 320 of file ck_threadpool.c.

321 {
322  if (!pool || pool->n_threads <= 1) return;
323  barrier_wait(&pool->barrier);
324 }
static void barrier_wait(ck_barrier_t *b)
Definition: ck_threadpool.c:89

References barrier_wait().

◆ ck_threadpool_create()

ck_threadpool_t* ck_threadpool_create ( int  n_threads)

Create a thread pool with n_threads total threads. Thread 0 is the calling (main) thread; n_threads-1 workers are spawned.

Parameters
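n_threads  Total thread count (including main). Pass a value <= 0 to auto-detect from the physical core count; auto-detection falls back to 1 if the core count is unavailable and is capped at 8. Any value above CK_THREADPOOL_MAX_THREADS is clamped to that limit.
Returns
Pool handle, or NULL if the pool allocation fails. If spawning a worker thread fails, the pool is still returned, but with fewer threads than requested (check ck_threadpool_n_threads()).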

Definition at line 183 of file ck_threadpool.c.

184 {
185  if (n_threads <= 0) {
186  n_threads = ck_get_physical_cores();
187  if (n_threads <= 0) n_threads = 1;
188  /* Cap at reasonable default for memory-bound workloads */
189  if (n_threads > 8) n_threads = 8;
190  }
191  if (n_threads > CK_THREADPOOL_MAX_THREADS) {
192  n_threads = CK_THREADPOOL_MAX_THREADS;
193  }
194 
195  ck_threadpool_t *pool = aligned_alloc(CK_CACHE_LINE, sizeof(ck_threadpool_t));
196  if (!pool) return NULL;
197  memset(pool, 0, sizeof(*pool));
198 
199  pool->n_threads = n_threads;
200  atomic_store(&pool->n_dispatch, 0);
201  atomic_store(&pool->n_complete, 0);
202  atomic_store(&pool->stop, 0);
203  atomic_store(&pool->paused, 0);
204  pool->work_fn = NULL;
205  pool->work_args = NULL;
206 
207  barrier_init(&pool->barrier, n_threads);
208 
209  pthread_mutex_init(&pool->mutex, NULL);
210  pthread_cond_init(&pool->cond_dispatch, NULL);
211  pthread_cond_init(&pool->cond_done, NULL);
212 
213  /* Thread 0 = main thread (no pthread created) */
214  pool->workers[0].id = 0;
215  pool->workers[0].pool = pool;
216  pool->workers[0].thread = pthread_self();
217 
218  /* Spawn N-1 worker threads */
219  for (int i = 1; i < n_threads; i++) {
220  pool->workers[i].id = i;
221  pool->workers[i].pool = pool;
222 
223  int rc = pthread_create(&pool->workers[i].thread, NULL,
224  worker_main, &pool->workers[i]);
225  if (rc != 0) {
226  fprintf(stderr, "[CK threadpool] Failed to create worker %d: %s\n",
227  i, strerror(rc));
228  /* Reduce thread count to what we managed to create */
229  pool->n_threads = i;
230  barrier_init(&pool->barrier, i);
231  break;
232  }
233  }
234 
235  if (pool->n_threads > 1) {
236  fprintf(stderr, "[CK threadpool] Created %d threads (1 main + %d workers)\n",
237  pool->n_threads, pool->n_threads - 1);
238  }
239 
240  return pool;
241 }
static void barrier_init(ck_barrier_t *b, int n_threads)
Definition: ck_threadpool.c:78
static void * worker_main(void *arg)
int ck_get_physical_cores(void)
#define CK_THREADPOOL_MAX_THREADS
Definition: ck_threadpool.h:48
#define CK_CACHE_LINE
Definition: ck_threadpool.h:54

References barrier_init(), CK_CACHE_LINE, ck_get_physical_cores(), CK_THREADPOOL_MAX_THREADS, and worker_main().

Referenced by global_pool_init().

◆ ck_threadpool_destroy()

void ck_threadpool_destroy ( ck_threadpool_t *  pool)

Destroy the thread pool. Signals all workers to exit and joins them. Safe to call with NULL.

Definition at line 243 of file ck_threadpool.c.

244 {
245  if (!pool) return;
246 
247  /* Signal shutdown */
248  atomic_store_explicit(&pool->stop, 1, memory_order_release);
249 
250  /* Wake all sleeping workers */
251  pthread_mutex_lock(&pool->mutex);
252  pthread_cond_broadcast(&pool->cond_dispatch);
253  pthread_mutex_unlock(&pool->mutex);
254 
255  /* Join all worker threads */
256  for (int i = 1; i < pool->n_threads; i++) {
257  pthread_join(pool->workers[i].thread, NULL);
258  }
259 
260  pthread_cond_destroy(&pool->cond_dispatch);
261  pthread_cond_destroy(&pool->cond_done);
262  pthread_mutex_destroy(&pool->mutex);
263 
264  free(pool);
265 }

Referenced by ck_threadpool_global_destroy().

◆ ck_threadpool_dispatch()

void ck_threadpool_dispatch ( ck_threadpool_t *  pool,
ck_work_fn_t  fn,
void *  args 
)

Dispatch work to all threads and wait for completion.

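  1. Returns immediately if pool or fn is NULL; if the pool has a single thread, calls fn(0, 1, args) directly and returns.
  2. Resets the barrier, sets the work function and args, and clears the completion counter.
  3. Bumps the dispatch counter and broadcasts cond_dispatch (wakes both spinning and sleeping workers).
  4. Main thread (ith=0) executes its share.
  5. Waits for all n_threads-1 workers to report completion: spins on the completion counter, falling back to a wait on cond_done every CK_THREADPOOL_SPIN_COUNT spins.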

This is a blocking call — returns when ALL threads have finished.

Parameters
pool  Thread pool
fn    Work function (called on each thread)
args  Argument passed to fn

Definition at line 271 of file ck_threadpool.c.

272 {
273  if (!pool || !fn) return;
274 
275  /* Single-thread fast path: just call directly */
276  if (pool->n_threads == 1) {
277  fn(0, 1, args);
278  return;
279  }
280 
281  /* Reset barrier phase for this dispatch */
282  barrier_init(&pool->barrier, pool->n_threads);
283 
284  /* Set work descriptor */
285  pool->work_fn = fn;
286  pool->work_args = args;
287  atomic_store_explicit(&pool->n_complete, 0, memory_order_release);
288 
289  /* Wake workers by bumping dispatch counter */
290  atomic_fetch_add_explicit(&pool->n_dispatch, 1, memory_order_release);
291 
292  /* Also signal condvar for sleeping workers */
293  pthread_mutex_lock(&pool->mutex);
294  pthread_cond_broadcast(&pool->cond_dispatch);
295  pthread_mutex_unlock(&pool->mutex);
296 
297  /* Main thread (ith=0) does its share */
298  fn(0, pool->n_threads, args);
299 
300  /* Wait for all workers to complete */
301  if (pool->n_threads > 1) {
302  int spins = 0;
303  while (atomic_load_explicit(&pool->n_complete, memory_order_acquire)
304  < pool->n_threads - 1) {
305  CK_SPIN_PAUSE();
306  spins++;
307  if (spins >= CK_THREADPOOL_SPIN_COUNT) {
308  pthread_mutex_lock(&pool->mutex);
309  if (atomic_load_explicit(&pool->n_complete, memory_order_acquire)
310  < pool->n_threads - 1) {
311  pthread_cond_wait(&pool->cond_done, &pool->mutex);
312  }
313  pthread_mutex_unlock(&pool->mutex);
314  spins = 0;
315  }
316  }
317  }
318 }
#define CK_SPIN_PAUSE()
Definition: ck_threadpool.c:27
#define CK_THREADPOOL_SPIN_COUNT
Definition: ck_threadpool.h:51

References barrier_init(), CK_SPIN_PAUSE, and CK_THREADPOOL_SPIN_COUNT.

◆ ck_threadpool_global()

ck_threadpool_t* ck_threadpool_global ( void  )

Get or create the global thread pool. Thread-safe (uses pthread_once internally). Uses ck_get_num_threads() for auto-detection.

Returns
Global pool, never NULL after successful first call.

Definition at line 383 of file ck_threadpool.c.

384 {
385  pthread_once(&g_threadpool_once, global_pool_init);
386  return g_threadpool;
387 }
static pthread_once_t g_threadpool_once
static void global_pool_init(void)
static ck_threadpool_t * g_threadpool

References g_threadpool, g_threadpool_once, and global_pool_init().

Referenced by ck_get_threadpool(), and ck_threadpool_init().

◆ ck_threadpool_global_destroy()

void ck_threadpool_global_destroy ( void  )

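Destroy the global thread pool. Called during engine shutdown. Afterwards a later ck_threadpool_global() call creates a new pool. This function is not safe to call concurrently with ck_threadpool_global(): it resets the pthread_once control by plain assignment, and POSIX does not guarantee that re-initializing a pthread_once_t this way is valid.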

Definition at line 389 of file ck_threadpool.c.

390 {
391  if (g_threadpool) {
392  ck_threadpool_destroy(g_threadpool);
393  g_threadpool = NULL;
394  /* Reset once control so pool can be re-created if needed */
395  g_threadpool_once = PTHREAD_ONCE_INIT;
396  }
397 }
void ck_threadpool_destroy(ck_threadpool_t *pool)

References ck_threadpool_destroy(), g_threadpool, and g_threadpool_once.

Referenced by ck_threadpool_shutdown().

◆ ck_threadpool_n_threads()

int ck_threadpool_n_threads ( const ck_threadpool_t *  pool)

Get total thread count (including main thread). Returns 1 if pool is NULL.

Definition at line 351 of file ck_threadpool.c.

352 {
353  return pool ? pool->n_threads : 1;
354 }

◆ ck_threadpool_pause()

void ck_threadpool_pause ( ck_threadpool_t *  pool)

Pause workers — they sleep on condvar (0% CPU). Call between batches or during interactive waiting. Workers wake on next dispatch or resume.

Definition at line 330 of file ck_threadpool.c.

331 {
332  if (!pool) return;
333  atomic_store_explicit(&pool->paused, 1, memory_order_release);
334 }

◆ ck_threadpool_resume()

void ck_threadpool_resume ( ck_threadpool_t *  pool)

Resume workers — transition from sleep to spin-wait. Call before starting a new batch of work.

Definition at line 336 of file ck_threadpool.c.

337 {
338  if (!pool) return;
339  atomic_store_explicit(&pool->paused, 0, memory_order_release);
340 
341  /* Wake sleeping workers */
342  pthread_mutex_lock(&pool->mutex);
343  pthread_cond_broadcast(&pool->cond_dispatch);
344  pthread_mutex_unlock(&pool->mutex);
345 }

◆ ck_threadpool_thread_id()

int ck_threadpool_thread_id ( const ck_threadpool_t *  pool)

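Get the thread index for the calling thread: 0 for the thread that called ck_threadpool_create(), 1..n_threads-1 for workers, and -1 if the caller is not in the pool or pool is NULL.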

Definition at line 356 of file ck_threadpool.c.

357 {
358  if (!pool) return -1;
359  pthread_t self = pthread_self();
360  for (int i = 0; i < pool->n_threads; i++) {
361  if (pthread_equal(self, pool->workers[i].thread)) {
362  return i;
363  }
364  }
365  return -1;
366 }