#define _GNU_SOURCE #include #include #include #include #include #include #include #include "es.h" #include "ez_libpthread.h" #include "map.h" #include "msgqueue.h" #include "util.h" /* Types of registered callbacks */ enum ES_type { ES_FD_TYPE, ES_SIG_TYPE, ES_VSIG_TYPE, ES_TIMER_TYPE }; #define NUMSIGS 30 /* NOTE: if this queue becomes full, it * can wreak havok on your mutlithreading * logic! */ #define VSIG_QUEUE_MAX 1000 /**************************************************** * We get one of these anonymous structs per-process. ****************************************************/ static struct { pthread_mutex_t mtx; /* global initialization mutex */ volatile int keySrc; volatile enum { GLOBAL_INIT_FLG=1<<0 } flags; /* Default sigaction stuff */ struct { sigset_t set; struct sigaction arr[NUMSIGS]; } dflt_sa; struct { /* Stuff for ES_spawn_thread_sched() */ pthread_cond_t cond; /* Condition used for thread synchronization */ pthread_mutex_t cond_mtx; /* condition mutex */ pthread_mutex_t mtx; /* mutex for ES spawn operation */ int release_parent; /* Value to test for pthread_cond_wait() */ } spawn; struct { pthread_mutex_t mtx; /* mutex for virtual signal operations */ MAP thrd_ts_map; /* Map associating thread identifier to TS object */ } vsig; } S= { .mtx= PTHREAD_MUTEX_INITIALIZER, .spawn.cond= PTHREAD_COND_INITIALIZER, .spawn.cond_mtx= PTHREAD_MUTEX_INITIALIZER, .spawn.mtx= PTHREAD_MUTEX_INITIALIZER, .vsig.mtx= PTHREAD_MUTEX_INITIALIZER }; /**************************************************** * We get one of these anonymous structs per-thread. ****************************************************/ static _Thread_local struct _TS { /* Current pthread identifier. If it doesn't match * pthread_self(), then we are not yet initialized * in the current thread. */ pthread_t tid; enum { TS_PROCESSING_FLG= 1<<0 } flags; /* Vectors of Cb by type, for fast processing */ PTRVEC fd_vec, timer_vec, deleted_vec; PTRVEC sig_vec_arr[NUMSIGS]; // One vector for each Unix signal /* Hash table to quickly find Cb's */ MAP key_map; /* Simple bit field to know if a signal has been * raised at least once. */ sigset_t sigsRaised; struct { /* virtual signal message queue */ MSGQUEUE mq; MAP cb_map; } vsig; } TS; static void UnixSignalHandler (int signo) /**************************************************** * Unix signal handler */ { /* Simply note that a signal has been raised */ sigaddset(&TS.sigsRaised, signo); } /******************************************************************/ /** Class for callback objects ************************************/ /******************************************************************/ typedef struct { enum { CB_DELETED_FLG=1<<31 } flags; int64_t lastActivity_ms; /* Process-wide unique integer */ int key; /* Which type of callback object */ enum ES_type type; /* Registrant's supplied context pointer, passed back * into callback function */ void *ctxt; union { /* Union to accommodate the different callback types */ /* Unix file descriptors */ struct { int fd; short events; int (*callback_f)(void *ctxt, int fd, short events); } fd; /* Unix & virtual signals */ struct { int signum; int (*callback_f)(void *ctxt, int signo); } sig; /* Interval timers */ struct { int64_t register_ms, pause_ms, interval_ms, remaining_ms, count; int (*callback_f)(void *ctxt); } timer; } un; } Cb; static int64_t msec2timeout(const Cb *cb, int64_t time_ms) /********************************************************************* * Compute the number of milliseconds remaining for an interval timer. * May be negative if timeout should have already happened. */ { assert(ES_TIMER_TYPE == cb->type); int64_t when_ms= cb->un.timer.register_ms + cb->un.timer.pause_ms + cb->un.timer.count * cb->un.timer.interval_ms; return when_ms - time_ms; } #define Cb_FdCreate(self, fd, events, callback_f, ctxt)\ (Cb_FdConstructor((self)=malloc(sizeof(Cb)), fd, events, callback_f, ctxt) ? (self) : ( self ? realloc(Cb_destructor(self),0): 0)) static Cb* Cb_FdConstructor( Cb *self, int fd, short events, int (*callback_f)(void *ctxt, int fd, short events), void *ctxt ) /********************************************************************* * Initialize for Unix fd. */ { assert(self); memset(self, 0, sizeof(*self)); self->key= ++S.keySrc; self->type= ES_FD_TYPE; self->un.fd.fd= fd; self->un.fd.callback_f= callback_f; self->un.fd.events= events; self->ctxt= ctxt; return self; } #define Cb_SignalCreate(self, signum, callback_f, ctxt)\ (Cb_SignalConstructor((self)=malloc(sizeof(Cb)), signum, callback_f, ctxt) ? (self) : ( self ? realloc(Cb_destructor(self),0): 0)) static Cb* Cb_SignalConstructor( Cb *self, int signum, int (*callback_f)(void *ctxt, int signum), void *ctxt ) /********************************************************************* * Initialize for Unix signal. */ { assert(self); memset(self, 0, sizeof(*self)); self->key= ++S.keySrc; self->type= ES_SIG_TYPE; self->un.sig.signum= signum; self->un.sig.callback_f= callback_f; self->ctxt= ctxt; return self; } #define Cb_VSignalCreate(self, signum, callback_f, ctxt)\ (Cb_VSignalConstructor((self)=malloc(sizeof(Cb)), signum, callback_f, ctxt) ? (self) : ( self ? realloc(Cb_destructor(self),0): 0)) static Cb* Cb_VSignalConstructor( Cb *self, int signum, int (*callback_f)(void *ctxt, int signum), void *ctxt ) /********************************************************************* * Initialize for Unix signal. */ { assert(self); memset(self, 0, sizeof(*self)); self->key= ++S.keySrc; self->type= ES_VSIG_TYPE; self->un.sig.signum= signum; self->un.sig.callback_f= callback_f; self->ctxt= ctxt; return self; } #define Cb_TimerCreate(self, pause_secs, interval_secs, callback_f, ctxt)\ (Cb_TimerConstructor((self)=malloc(sizeof(Cb)), pause_secs, interval_secs, callback_f, ctxt) ? (self) : ( self ? realloc(Cb_destructor(self),0): 0)) static Cb* Cb_TimerConstructor( Cb *self, int64_t pause_ms, int64_t interval_ms, int (*callback_f)(void *ctxt), void *ctxt ) /********************************************************************* * Initialize for an interval timer. */ { assert(self); memset(self, 0, sizeof(*self)); self->key= ++S.keySrc; self->type= ES_TIMER_TYPE; #if ! (defined (_WIN32) || defined (__CYGWIN__)) self->un.timer.register_ms= clock_gettime_ms(CLOCK_MONOTONIC_COARSE); #else self->un.timer.register_ms= clock_gettime_ms(CLOCK_MONOTONIC); #endif self->un.timer.pause_ms= pause_ms; self->un.timer.interval_ms= interval_ms; self->un.timer.callback_f= callback_f; self->un.timer.count= 0; self->ctxt= ctxt; return self; } #define Cb_destroy(s)\ {if(Cb_destructor(s)) {free(s); (s)=NULL;}} static void* Cb_destructor(Cb *self) /************************************************ * Free resources associated with object. */ { return self; } /******************************************************************/ /***************** ES *********************************************/ /******************************************************************/ static int sigusr2_h(void *ctxt, int unused) /********************************************************************** * Handle any vsignals. */ { int rtn= 0, vsigno; Cb *cb_arr[VSIG_QUEUE_MAX]; /* Protected operation */ // ez_pthread_mutex_lock(&S.vsig.mtx); while(EOF != MSGQUEUE_extractMsg(&TS.vsig.mq, &vsigno)) { int rc= MAP_findItems(&TS.vsig.cb_map, (void**)cb_arr, VSIG_QUEUE_MAX, &vsigno, sizeof(int)); if(-1 == rc) { eprintf("FATAL: MAP_findItems() failed"); abort(); } if(!rc) goto abort; for(int i= 0; i < rc; ++i) { Cb *cb= cb_arr[i]; rtn= (* cb->un.sig.callback_f)(cb->ctxt, vsigno); if(rtn) goto abort; } } abort: // ez_pthread_mutex_unlock(&S.vsig.mtx); return rtn; } static void initialize() /********************************************************************** * Initialization for current thread, and once for the whole process. */ { /* Get the global mutex */ ez_pthread_mutex_lock(&S.mtx); /* Processwide static data */ if(!(S.flags & GLOBAL_INIT_FLG)) { S.flags |= GLOBAL_INIT_FLG; if(-1 == sigemptyset(&S.dflt_sa.set)) assert(0); MAP_constructor(&S.vsig.thrd_ts_map, 10, 10); } /* Release the global mutex */ ez_pthread_mutex_unlock(&S.mtx); /* Per-thread static data */ PTRVEC_constructor(&TS.fd_vec, 10); PTRVEC_constructor(&TS.timer_vec, 10); PTRVEC_constructor(&TS.deleted_vec, 10); for(int i= 0; i < NUMSIGS; ++i) { PTRVEC_constructor(&TS.sig_vec_arr[i], 10); } if(-1 == sigemptyset(&TS.sigsRaised)) assert(0); MAP_constructor(&TS.key_map, 10, 10); /* Remember so we don't call ourselves more than once in the same thread */ TS.tid= pthread_self(); /* Add ourself to the vsig thread to TS map */ ez_pthread_mutex_lock(&S.vsig.mtx); MAP_addTypedKey(&S.vsig.thrd_ts_map, TS.tid, &TS); /*--- virtual signal infrastructure ---*/ MSGQUEUE_constructor(&TS.vsig.mq, sizeof(int), VSIG_QUEUE_MAX); MAP_constructor(&TS.vsig.cb_map, 10, 10); /* Register a signal handler for SIGUSR2 so we can have virtual signals. */ ez_ES_registerSignal(SIGUSR2, sigusr2_h, NULL); ez_pthread_mutex_unlock(&S.vsig.mtx); } inline static unsigned signum2dflt_sa_ndx(int signum) /********************************************************************** * Convert signum to an index for S.dflt_sa.XX */ { assert(SIGKILL != signum && SIGSTOP != signum); if(signum < SIGKILL) return signum - 1; if(signum < SIGSTOP) return signum - 2; return signum - 3; } int ES_registerSignal ( int signum, int (*callback_f)(void *ctxt, int signo), void *ctxt ) /********************************************************************** * Register a function to be called when a particular Unix signal is * raised. */ { if(TS.tid != pthread_self()) initialize(); Cb *cb; unsigned ndx= signum2dflt_sa_ndx(signum); /* Only install a new Unix signal handler if we do not already handle this signal */ if(!PTRVEC_numItems(&TS.sig_vec_arr[ndx])) { struct sigaction act; act.sa_handler = UnixSignalHandler; sigemptyset (&act.sa_mask); act.sa_flags = SA_RESTART|SA_NODEFER; /* We only store the default action once per process */ if(!sigismember(&S.dflt_sa.set, signum)) { sigaddset(&S.dflt_sa.set, signum); if (sigaction (signum, &act, &S.dflt_sa.arr[ndx])) assert (0); } else { if (sigaction (signum, &act, NULL)) assert (0); } } if(!Cb_SignalCreate(cb, signum, callback_f, ctxt)) assert(0); /* All callbacks are put in the key table */ MAP_addTypedKey(&TS.key_map, cb->key, cb); /* Add to the signal vector */ PTRVEC_addTail(&TS.sig_vec_arr[signum2dflt_sa_ndx(signum)], cb); return cb->key; } int ES_registerVSignal ( int signum, int (*callback_f)(void *ctxt, int signo), void *ctxt ) /********************************************************************** * Register a function to be called when a particular virtual signal is * raised. Virtual signals are implemented on top of the Unix signal, SIGUSR2. * * signum: Any integer number which is meaningful to your application. * callback_f: callback function for when activity is detected. * ctxt: Pointer which will be passed as the first argument to callback_f(). * * RETURNS: * If successful, a positive integer which can be used to unregister the callback. * On failure, -1 is returned. */ { if(TS.tid != pthread_self()) initialize(); Cb *cb; if(!Cb_VSignalCreate(cb, signum, callback_f, ctxt)) assert(0); /* Place in the virtual signal map indexed on signum */ MAP_addTypedKey(&TS.vsig.cb_map, cb->un.sig.signum, cb); /* All callbacks are put in the key table */ MAP_addTypedKey(&TS.key_map, cb->key, cb); return cb->key; } int ES_registerFd ( int fd, short events, int (*callback_f)(void *ctxt, int fd, short events), void *ctxt ) /********************************************************************** * Register a function to be called when there is activity on the * file descriptor (which may be a file, socket, pipe, etc. under Unix). */ { if(TS.tid != pthread_self()) initialize(); Cb *cb; if(!Cb_FdCreate(cb, fd, events, callback_f, ctxt)) assert(0); /* Index to vector for quick processing */ PTRVEC_addTail(&TS.fd_vec, cb); /* All callbacks are put in the key table */ MAP_addTypedKey(&TS.key_map, cb->key, cb); return cb->key; } int ES_registerTimer ( int64_t pause_ms, int64_t interval_ms, int (*callback_f)(void *ctxt), void *ctxt ) /********************************************************************** * Register a function to be called when a timer times out. */ { if(TS.tid != pthread_self()) initialize(); Cb *cb; if(!Cb_TimerCreate(cb, pause_ms, interval_ms, callback_f, ctxt)) assert(0); /* Add to the timer vector */ PTRVEC_addTail(&TS.timer_vec, cb); /* All callbacks are put in the key table */ MAP_addTypedKey(&TS.key_map, cb->key, cb); return cb->key; } int ES_unregister (int key) /********************************************************************** * Unegister a previously registered callback. */ { if(TS.tid != pthread_self()) initialize(); Cb *cb = MAP_findTypedItem(&TS.key_map, key); if(!cb) { #ifdef qqDEBUG eprintf("WARNING: could not find callback with key= %d", key); #endif return -1; } /* If the callback processing is currently active, do not delete it */ if(TS.flags & TS_PROCESSING_FLG) { if(!(cb->flags & CB_DELETED_FLG)) { cb->flags |= CB_DELETED_FLG; PTRVEC_addTail(&TS.deleted_vec, cb); } return 0; } /* Remove from key table */ if(!MAP_removeTypedItem(&TS.key_map, key)) assert(0); /* Different operations needed based on type */ switch(cb->type) { case ES_FD_TYPE: /* Remove from file descriptor vector */ if(!PTRVEC_remove(&TS.fd_vec, cb)) assert(0); break; case ES_SIG_TYPE: { unsigned ndx= signum2dflt_sa_ndx(cb->un.sig.signum); /* Remove from appropriate signals vector */ if(!PTRVEC_remove(&TS.sig_vec_arr[ndx], cb)) assert(0); /* If there are no more signals in this vector */ if(!PTRVEC_numItems(&TS.sig_vec_arr[ndx])) { assert(sigismember(&S.dflt_sa.set, cb->un.sig.signum)); /* Restore default signal handling */ if (sigaction (cb->un.sig.signum, &S.dflt_sa.arr[ndx], NULL)) assert (0); } } break; case ES_VSIG_TYPE: if(!MAP_removeSpecificTypedItem(&TS.vsig.cb_map, cb->un.sig.signum, cb)) assert(0); break; case ES_TIMER_TYPE: /* Remove from timer vector */ if(!PTRVEC_remove(&TS.timer_vec, cb)) assert(0); break; default: assert(0); }; /* Free the callback object */ Cb_destroy(cb); return 0; } static int cmp_remaining_ms (const void *const*p1, const void *const*p2) /********************************************************************** * Compare the time remaining for PTRVEC_sort(). */ { const Cb *cb1= (const Cb*)(*p1), *cb2= (const Cb*)(*p2); assert(ES_TIMER_TYPE == cb1->type && ES_TIMER_TYPE == cb2->type); if(cb1->un.timer.remaining_ms < cb2->un.timer.remaining_ms) return -1; if(cb1->un.timer.remaining_ms == cb2->un.timer.remaining_ms) return 0; return 1; } static int lastActivity_cmp(const void *const* pp1, const void *const* pp2) { const Cb *cb1= *(const Cb *const*)pp1, *cb2= *(const Cb *const*)pp2; /* Put oldest at the top of the vector */ if(cb1->lastActivity_ms < cb2->lastActivity_ms) return -1; return 1; } int ES_run (void) /********************************************************************** * For this thread, use poll() to monitor socket activity until one * of the registered callback_f() returns non-zero. * * RETURNS: * Whatever nonzero value one of the callbacks() returned. */ { int rtn= -1; if(TS.tid != pthread_self()) initialize(); /* Loop forever */ for(;;) { TS.flags |= TS_PROCESSING_FLG; int numFds= PTRVEC_numItems(&TS.fd_vec); struct pollfd pollItemArr[numFds]; Cb *cbArr[numFds]; /* This sort provides fair queuing */ PTRVEC_sort(&TS.fd_vec, lastActivity_cmp); /****** Load up the pollItemArr *****/ unsigned i; Cb *cb; PTRVEC_loopFwd(&TS.fd_vec, i, cb) { struct pollfd *item= pollItemArr+i; switch(cb->type) { case ES_FD_TYPE: item->fd= cb->un.fd.fd; item->events= cb->un.fd.events; break; default: assert(0); } /* Clear the return event field for good measure */ item->revents= 0; /* Remember the Cb object */ cbArr[i]= cb; } /* There may not be any timers */ int64_t poll_ms= -1; /***** If there are any timers to consider ****/ if(PTRVEC_numItems(&TS.timer_vec)) { #if ! (defined (_WIN32) || defined (__CYGWIN__)) int64_t time_ms= clock_gettime_ms(CLOCK_MONOTONIC_COARSE); #else int64_t time_ms= clock_gettime_ms(CLOCK_MONOTONIC); #endif /* Prepare timers to be sorted */ unsigned i; PTRVEC_loopFwd(&TS.timer_vec, i, cb) { assert(ES_TIMER_TYPE == cb->type); cb->un.timer.remaining_ms= msec2timeout(cb, time_ms); } /* Sort them so the most urgent timer is at the top */ PTRVEC_sort(&TS.timer_vec, cmp_remaining_ms); /* Get the top item */ cb= PTRVEC_first(&TS.timer_vec); assert(cb); /* This is how long we need to wait */ poll_ms= MAX(cb->un.timer.remaining_ms, 0); } /*******************************************************/ /******** Wait for something to happen *****************/ /*******************************************************/ int poll_rc= poll(pollItemArr, numFds, poll_ms); /********* Check return code *****/ if(-1 == poll_rc) { switch(errno) { case EFAULT: eprintf("ERROR: poll() failed"); goto abort; case EINTR: /* Signal caused poll() to return, which is OK */ break; default: assert(0); } } /********* Respond to signals *****/ int signum; for(signum= 1; signum < 32; ++signum) { /* Can't do anything with these signals */ if(signum == SIGKILL || signum == SIGSTOP) continue; /* See if signum was raised */ while(sigismember(&TS.sigsRaised, signum)) { /* Clear signum from the set of raised signals */ if(-1 == sigdelset(&TS.sigsRaised, signum)) assert(0); unsigned ndx= signum2dflt_sa_ndx(signum); /* See if any of our callbacks are for signum */ PTRVEC_loopFwd(&TS.sig_vec_arr[ndx], i, cb) { assert(ES_SIG_TYPE == cb->type); if(cb->flags & CB_DELETED_FLG) continue; /* Call the callback function */ rtn= (* cb->un.sig.callback_f)(cb->ctxt, signum); if(rtn) goto abort; } } } /********* Service timers ********/ if(PTRVEC_numItems(&TS.timer_vec)) { int64_t remaining_ms, time_ms; #if ! (defined (_WIN32) || defined (__CYGWIN__)) time_ms= clock_gettime_ms(CLOCK_MONOTONIC_COARSE); #else time_ms= clock_gettime_ms(CLOCK_MONOTONIC); #endif PTRVEC_loopFwd(&TS.timer_vec, i, cb) { if(cb->flags & CB_DELETED_FLG) continue; /* See how much time remains for this callback */ remaining_ms= msec2timeout(cb, time_ms); /* close enough */ if(remaining_ms < 2) { /* Keep track of how many times this timer has fired */ ++cb->un.timer.count; /* Call the callback function */ rtn= (* cb->un.timer.callback_f)(cb->ctxt); /* If this is a single-shot timer, get rid of it now */ if(!cb->un.timer.interval_ms) { ES_unregister(cb->key); /* Do this so next vector entry doesn't get skipped */ --i; } /* If the callback returned non-zero, bail out */ if(rtn) goto abort; } else break; /* time remaining will increase from here on out */ } } /********** Service file descriptors *******/ for(int i= 0; i < numFds; ++i) { struct pollfd *item= pollItemArr+i; if(!item->revents) continue; Cb *cb= cbArr[i]; if(cb->flags & CB_DELETED_FLG) continue; #if ! (defined (_WIN32) || defined (__CYGWIN__)) cb->lastActivity_ms= clock_gettime_ms(CLOCK_MONOTONIC_COARSE); #else cb->lastActivity_ms= clock_gettime_ms(CLOCK_MONOTONIC); #endif assert(ES_FD_TYPE == cb->type); rtn= (* cb->un.fd.callback_f)(cb->ctxt, item->fd, item->revents); /* If the callback returned non-zero, bail out */ if(rtn) goto abort; } { /*--- Free any callbacks marked for deletion ---*/ Cb *cb; TS.flags &= ~TS_PROCESSING_FLG; while((cb= PTRVEC_remTail(&TS.deleted_vec))) { if(ES_unregister(cb->key)) assert(0); } } } abort: return rtn; } pthread_t ES_spawn_thread_sched( void *(*user_main) (void *), void *arg, int sched_policy, /* SCHED_NORMAL || SCHED_FIFO || SCHED_RR || SCHED_BATCH */ int priority ) /********************************************************************** * Spawn a thread which will begin executing user_main(arg). * NOTE: the calling thread will be blocked until ES_release_parent() * is called from user_main()! * * user_main: function pointer where thread will execute. * arg: address passed to user_main(arg). * sched_policy: Which pthreads scheduling policy to use. * priority: pthreads priority to use. * * RETURNS: * 0 for success, nonzero for error */ { pthread_t tid; pthread_attr_t attr; int rtn; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); if(sched_policy == -1) { pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED); } else { struct sched_param sp; pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED); if(priority < sched_get_priority_min(sched_policy) || priority > sched_get_priority_max(sched_policy)) { eprintf("ERROR: priority= %d must be between %d and %d inclusive.", priority, sched_get_priority_min(sched_policy), sched_get_priority_max(sched_policy)); return 1; } if(pthread_attr_setschedpolicy(&attr, sched_policy)) assert(0); sp.sched_priority= priority; if(pthread_attr_setschedparam(&attr, &sp)) assert(0); } /* Get the global mutex */ ez_pthread_mutex_lock(&S.spawn.mtx); /* Get the condition ready for use */ pthread_cond_init(&S.spawn.cond, NULL); /* This is the flag we will test to know when the child is ready to recieve signals */ S.spawn.release_parent= 0; /* Get the condition mutex */ ez_pthread_mutex_lock(&S.spawn.cond_mtx); /* Spawn the new thread */ memset(&tid, 0, sizeof(tid)); /* JDR Sat 30 Nov 2019 10:39:04 AM EST * it appears that this fails at 300 threads. */ ez_pthread_create (&tid, &attr, user_main, arg); /* Now we, the parent, wait on the child */ while(!S.spawn.release_parent) { ez_pthread_cond_wait(&S.spawn.cond, &S.spawn.cond_mtx); } /* Release the condition mutex */ ez_pthread_mutex_unlock(&S.spawn.cond_mtx); /* Release the global lock */ ez_pthread_mutex_unlock(&S.spawn.mtx); return tid; } void ES_release_parent(void) /********************************************************************** * Called by a new thread created with ES_spawn_thread_sched(), so * that the parent can continue execution. */ { /* Condition manipulation must be protected by a mutex */ ez_pthread_mutex_lock (&S.spawn.cond_mtx); /* Note that parent may be released */ S.spawn.release_parent= 1; /* Signal the parent */ ez_pthread_cond_signal(&S.spawn.cond); /* Free up the condition mutex */ ez_pthread_mutex_unlock (&S.spawn.cond_mtx); } void ES_cleanup(void) /********************************************************************** * Called by a thread when it exits, to clean up resources. */ { assert(TS.tid == pthread_self()); /* Remove ourself from the vsig thread to TS map */ ez_pthread_mutex_lock(&S.vsig.mtx); MAP_removeTypedItem(&S.vsig.thrd_ts_map, TS.tid); ez_pthread_mutex_unlock(&S.vsig.mtx); /* Destroy all callbacks, which are indexed in key map, * and the key map itself. */ MAP_clearAndDestroy(&TS.key_map, (void*(*)(void*))Cb_destructor); /* Destroy vsignal infrastructure */ MAP_destructor(&TS.vsig.cb_map); /* Tear down the message queue */ MSGQUEUE_destructor(&TS.vsig.mq); PTRVEC_destructor(&TS.fd_vec); PTRVEC_destructor(&TS.timer_vec); for(unsigned i= 0; i < NUMSIGS; ++i) { PTRVEC_destructor(TS.sig_vec_arr+i); } } int ES_VSignal (pthread_t tid, int signum) /********************************************************************** * Send a virtual signal to tid, which is multiplexed on SIGUSR2. * * tid: Target thread identifier. * signum: Any integer number which is meaningful to your application. * * RETURNS: * 0: successful * -1: failures. */ { int rtn= EOF-1; /* find the correct TS by thread identifier */ ez_pthread_mutex_lock(&S.vsig.mtx); struct _TS *ts= MAP_findTypedItem(&S.vsig.thrd_ts_map, tid); if(!ts) { eprintf("ERROR: tid= %s not found!", pthread_t_str(tid)); goto abort; } assert(tid == ts->tid); /* Place virtual signal in message queue */ ez_MSGQUEUE_submitMsg(&ts->vsig.mq, &signum); /* And finally tell the target thread to check it's message queue */ if(pthread_kill(tid, SIGUSR2)) { sys_eprintf("ERROR: kill(%s, SIGUSR2)", pthread_t_str(tid)); goto abort; } rtn= 0; abort: ez_pthread_mutex_unlock(&S.vsig.mtx); return rtn; } /*=====================================================================================*/ /*===================== ez_xxx() ======================================================*/ /*=====================================================================================*/ /***************************************************/ ez_proto (int, ES_registerFd, int fd, short events, int (*callback_f)(void *ctxt, int fd, short events), void *ctxt ) { int rtn= ES_registerFd(fd, events, callback_f, ctxt); if(-1 == rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_registerFd() failed."); abort(); } return rtn; } /***************************************************/ ez_proto (int, ES_registerSignal, int signum, int (*callback_f)(void *ctxt, int signo), void *ctxt ) { int rtn= ES_registerSignal(signum, callback_f, ctxt); if(-1 == rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_registerSignal() failed."); abort(); } return rtn; } /***************************************************/ ez_proto (int, ES_registerVSignal, int signum, int (*callback_f)(void *ctxt,int signo), void *ctxt ) { int rtn= ES_registerVSignal(signum, callback_f, ctxt); if(-1 == rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_registerVSignal() failed."); abort(); } return rtn; } /***************************************************/ ez_proto (int, ES_VSignal, pthread_t tid, int signum) { int rtn= ES_VSignal(tid, signum); if(rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_VSignal() returned %d.", rtn); abort(); } return rtn; } /***************************************************/ ez_proto (int, ES_registerTimer, int64_t pause_ms, int64_t interval_ms, int (*callback_f)(void *ctxt), void *ctxt ) { int rtn= ES_registerTimer(pause_ms, interval_ms, callback_f, ctxt); if(-1 == rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_registerTimer() failed."); abort(); } return rtn; } /***************************************************/ ez_proto ( int, ES_unregister, int key) { int rtn= ES_unregister(key); if(-1 == rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_unregister() failed."); abort(); } return rtn; } /***************************************************/ ez_proto (int, ES_run) { int rtn= ES_run(); if(rtn) { _eprintf( #ifdef DEBUG fileName, lineNo, funcName, #endif "ES_run() returned %d", rtn); abort(); } return rtn; }