2020-09-15 15:31:27 +00:00
# define _GNU_SOURCE
2019-11-29 22:23:16 +00:00
# include <errno.h>
# include <stdlib.h>
# include <string.h>
# include <signal.h>
# include <poll.h>
# include <time.h>
# include <assert.h>
2020-09-15 15:31:27 +00:00
# include "es.h"
2019-12-05 16:00:39 +00:00
# include "ez_libpthread.h"
2020-09-15 15:31:27 +00:00
# include "map.h"
# include "msgqueue.h"
# include "util.h"
2019-11-29 22:23:16 +00:00
/* Types of registered callbacks.
 * The value is stored in Cb.type and tells which member of the Cb.un
 * union is in use for a given callback object.
 */
enum ES_type {
ES_FD_TYPE , /* file descriptor callback (Cb.un.fd) */
ES_SIG_TYPE , /* Unix signal callback (Cb.un.sig) */
ES_VSIG_TYPE , /* virtual signal callback (Cb.un.sig) */
ES_TIMER_TYPE /* interval timer callback (Cb.un.timer) */
} ;
/* Number of slots in the per-signal tables (S.dflt_sa.arr and
 * TS.sig_vec_arr); slots are addressed through signum2dflt_sa_ndx().
 */
# define NUMSIGS 30
2019-11-30 19:14:42 +00:00
/* Size passed to MSGQUEUE_constructor() for each thread's virtual signal
 * queue, and the number of callbacks sigusr2_h() can look up at once.
 * NOTE: if this queue becomes full, it
 * can wreak havoc on your multithreading
 * logic!
 */
# define VSIG_QUEUE_MAX 1000
2019-11-29 22:23:16 +00:00
/****************************************************
 * Process-wide state: exactly one instance of this anonymous
 * struct exists per process and it is shared by every thread.
 ****************************************************/
static struct {
   pthread_mutex_t mtx;    /* global initialization mutex */

   /* Source of callback keys. Callback constructors pre-increment this
    * from any thread without taking a lock, so it must be an atomic
    * object: '++' on an _Atomic int is an atomic read-modify-write,
    * which keeps two callbacks from ever being handed the same key.
    */
   _Atomic int keySrc;

   volatile enum {
      GLOBAL_INIT_FLG = 1 << 0   /* one-time process initialization done */
   } flags;

   /* Signal dispositions that were in effect before we installed ours */
   struct {
      sigset_t set;                     /* signals which have a saved entry in arr[] */
      struct sigaction arr[NUMSIGS];    /* indexed by signum2dflt_sa_ndx() */
   } dflt_sa;

   struct {  /* Stuff for ES_spawn_thread_sched() */
      pthread_cond_t cond;        /* Condition used for thread synchronization */
      pthread_mutex_t cond_mtx;   /* condition mutex */
      pthread_mutex_t mtx;        /* mutex for ES spawn operation */
      int release_parent;         /* Value to test for pthread_cond_wait() */
   } spawn;

   struct {
      pthread_mutex_t mtx;   /* mutex for virtual signal operations */
      MAP thrd_ts_map;       /* Map associating thread identifier to TS object */
   } vsig;

} S = {
   .mtx = PTHREAD_MUTEX_INITIALIZER,
   .spawn.cond = PTHREAD_COND_INITIALIZER,
   .spawn.cond_mtx = PTHREAD_MUTEX_INITIALIZER,
   .spawn.mtx = PTHREAD_MUTEX_INITIALIZER,
   .vsig.mtx = PTHREAD_MUTEX_INITIALIZER
};
/****************************************************
* We get one of these anonymous structs per - thread .
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
static _Thread_local struct _TS {
/* Current pthread identifier. If it doesn't match
* pthread_self ( ) , then we are not yet initialized
* in the current thread .
*/
pthread_t tid ;
2020-09-15 15:31:27 +00:00
enum {
TS_PROCESSING_FLG = 1 < < 0
} flags ;
2019-11-29 22:23:16 +00:00
/* Vectors of Cb by type, for fast processing */
PTRVEC fd_vec ,
2020-09-15 15:31:27 +00:00
timer_vec ,
deleted_vec ;
2019-11-29 22:23:16 +00:00
PTRVEC sig_vec_arr [ NUMSIGS ] ; // One vector for each Unix signal
/* Hash table to quickly find Cb's */
MAP key_map ;
/* Simple bit field to know if a signal has been
* raised at least once .
*/
sigset_t sigsRaised ;
struct {
/* virtual signal message queue */
MSGQUEUE mq ;
2019-12-05 19:50:01 +00:00
MAP cb_map ;
2019-11-29 22:23:16 +00:00
} vsig ;
2020-09-15 15:31:27 +00:00
2019-11-29 22:23:16 +00:00
} TS ;
static void
UnixSignalHandler(int signo)
/****************************************************
 * Unix signal handler installed by ES_registerSignal().
 * It only records that signo was raised; the registered
 * callbacks are invoked later from ES_run().
 *
 * signo: number of the signal being delivered.
 */
{
   /* A handler may interrupt code that is about to inspect errno,
    * so leave errno exactly as we found it.
    */
   const int saved_errno = errno;

   /* Simply note that a signal has been raised */
   sigaddset(&TS.sigsRaised, signo);

   errno = saved_errno;
}
/******************************************************************/
/** Class for callback objects ************************************/
/******************************************************************/
typedef struct {
2020-09-15 15:31:27 +00:00
enum {
CB_DELETED_FLG = 1 < < 31
} flags ;
2019-11-29 22:23:16 +00:00
int64_t lastActivity_ms ;
/* Process-wide unique integer */
int key ;
/* Which type of callback object */
enum ES_type type ;
/* Registrant's supplied context pointer, passed back
* into callback function
*/
void * ctxt ;
union { /* Union to accommodate the different callback types */
/* Unix file descriptors */
struct {
int fd ;
short events ;
int ( * callback_f ) ( void * ctxt , int fd , short events ) ;
} fd ;
/* Unix & virtual signals */
struct {
int signum ;
int ( * callback_f ) ( void * ctxt , int signo ) ;
} sig ;
/* Interval timers */
struct {
int64_t register_ms ,
pause_ms ,
interval_ms ,
remaining_ms ,
count ;
int ( * callback_f ) ( void * ctxt ) ;
} timer ;
} un ;
} Cb ;
static int64_t
msec2timeout ( const Cb * cb , int64_t time_ms )
/*********************************************************************
* Compute the number of milliseconds remaining for an interval timer .
* May be negative if timeout should have already happened .
*/
{
assert ( ES_TIMER_TYPE = = cb - > type ) ;
int64_t when_ms = cb - > un . timer . register_ms +
cb - > un . timer . pause_ms +
cb - > un . timer . count * cb - > un . timer . interval_ms ;
return when_ms - time_ms ;
}
/* Allocate and construct a file descriptor callback.
 * Evaluates to the new object, or to NULL (with self set to NULL and
 * nothing leaked) when allocation or construction fails.
 */
#define Cb_FdCreate(self, fd, events, callback_f, ctxt) \
   (Cb_FdConstructor((self) = malloc(sizeof(Cb)), fd, events, callback_f, ctxt) \
      ? (self) \
      : ((self) ? (Cb_destructor(self), free(self), (self) = NULL) : NULL))
static Cb*
Cb_FdConstructor(
      Cb *self,
      int fd,
      short events,
      int (*callback_f)(void *ctxt, int fd, short events),
      void *ctxt
      )
/*********************************************************************
 * Initialize for Unix fd.
 *
 * self:       storage to initialize; NULL is tolerated so that a failed
 *             malloc() in Cb_FdCreate() is reported instead of dereferenced.
 * fd:         file descriptor to watch.
 * events:     poll() events of interest.
 * callback_f: function to call when there is activity on fd.
 * ctxt:       pointer handed back to callback_f().
 *
 * RETURNS:
 * self, or NULL if self is NULL.
 */
{
   if (!self) return NULL;

   memset(self, 0, sizeof(*self));
   self->key = ++S.keySrc;
   self->type = ES_FD_TYPE;
   self->un.fd.fd = fd;
   self->un.fd.callback_f = callback_f;
   self->un.fd.events = events;
   self->ctxt = ctxt;
   return self;
}
/* Allocate and construct a Unix signal callback.
 * Evaluates to the new object, or to NULL (with self set to NULL and
 * nothing leaked) when allocation or construction fails.
 */
#define Cb_SignalCreate(self, signum, callback_f, ctxt) \
   (Cb_SignalConstructor((self) = malloc(sizeof(Cb)), signum, callback_f, ctxt) \
      ? (self) \
      : ((self) ? (Cb_destructor(self), free(self), (self) = NULL) : NULL))
static Cb*
Cb_SignalConstructor(
      Cb *self,
      int signum,
      int (*callback_f)(void *ctxt, int signum),
      void *ctxt
      )
/*********************************************************************
 * Initialize for Unix signal.
 *
 * self:       storage to initialize; NULL is tolerated so that a failed
 *             malloc() in Cb_SignalCreate() is reported instead of
 *             dereferenced.
 * signum:     Unix signal number.
 * callback_f: function to call when signum has been raised.
 * ctxt:       pointer handed back to callback_f().
 *
 * RETURNS:
 * self, or NULL if self is NULL.
 */
{
   if (!self) return NULL;

   memset(self, 0, sizeof(*self));
   self->key = ++S.keySrc;
   self->type = ES_SIG_TYPE;
   self->un.sig.signum = signum;
   self->un.sig.callback_f = callback_f;
   self->ctxt = ctxt;
   return self;
}
/* Allocate and construct a virtual signal callback.
 * Evaluates to the new object, or to NULL (with self set to NULL and
 * nothing leaked) when allocation or construction fails.
 */
#define Cb_VSignalCreate(self, signum, callback_f, ctxt) \
   (Cb_VSignalConstructor((self) = malloc(sizeof(Cb)), signum, callback_f, ctxt) \
      ? (self) \
      : ((self) ? (Cb_destructor(self), free(self), (self) = NULL) : NULL))
static Cb*
Cb_VSignalConstructor(
      Cb *self,
      int signum,
      int (*callback_f)(void *ctxt, int signum),
      void *ctxt
      )
/*********************************************************************
 * Initialize for virtual signal.
 *
 * self:       storage to initialize; NULL is tolerated so that a failed
 *             malloc() in Cb_VSignalCreate() is reported instead of
 *             dereferenced.
 * signum:     virtual signal number chosen by the application.
 * callback_f: function to call when the virtual signal is received.
 * ctxt:       pointer handed back to callback_f().
 *
 * RETURNS:
 * self, or NULL if self is NULL.
 */
{
   if (!self) return NULL;

   memset(self, 0, sizeof(*self));
   self->key = ++S.keySrc;
   self->type = ES_VSIG_TYPE;
   self->un.sig.signum = signum;
   self->un.sig.callback_f = callback_f;
   self->ctxt = ctxt;
   return self;
}
/* Allocate and construct an interval timer callback; both times are in
 * milliseconds. Evaluates to the new object, or to NULL (with self set
 * to NULL and nothing leaked) when allocation or construction fails.
 */
#define Cb_TimerCreate(self, pause_ms, interval_ms, callback_f, ctxt) \
   (Cb_TimerConstructor((self) = malloc(sizeof(Cb)), pause_ms, interval_ms, callback_f, ctxt) \
      ? (self) \
      : ((self) ? (Cb_destructor(self), free(self), (self) = NULL) : NULL))
static Cb*
Cb_TimerConstructor(
      Cb *self,
      int64_t pause_ms,
      int64_t interval_ms,
      int (*callback_f)(void *ctxt),
      void *ctxt
      )
/*********************************************************************
 * Initialize for an interval timer.
 *
 * self:        storage to initialize; NULL is tolerated so that a failed
 *              malloc() in Cb_TimerCreate() is reported instead of
 *              dereferenced.
 * pause_ms:    milliseconds to wait before the first expiry.
 * interval_ms: milliseconds between subsequent expiries.
 * callback_f:  function to call on every expiry.
 * ctxt:        pointer handed back to callback_f().
 *
 * RETURNS:
 * self, or NULL if self is NULL.
 */
{
   if (!self) return NULL;

   memset(self, 0, sizeof(*self));
   self->key = ++S.keySrc;
   self->type = ES_TIMER_TYPE;

   /* Expiry times are computed relative to this clock reading */
#if !(defined (_WIN32) || defined (__CYGWIN__))
   self->un.timer.register_ms = clock_gettime_ms(CLOCK_MONOTONIC_COARSE);
#else
   self->un.timer.register_ms = clock_gettime_ms(CLOCK_MONOTONIC);
#endif

   self->un.timer.pause_ms = pause_ms;
   self->un.timer.interval_ms = interval_ms;
   self->un.timer.callback_f = callback_f;
   self->un.timer.count = 0;
   self->ctxt = ctxt;
   return self;
}
/* Destruct and free a callback object, then reset the caller's pointer.
 * Wrapped in do/while(0) so it behaves as one statement everywhere,
 * including an unbraced if/else.
 */
#define Cb_destroy(s) \
   do { if (Cb_destructor(s)) { free(s); (s) = NULL; } } while (0)
static void*
Cb_destructor(Cb *self)
/************************************************
 * Free resources associated with object.
 * A Cb currently owns nothing besides its own storage,
 * so there is nothing to release here.
 *
 * RETURNS:
 * self, unchanged.
 */
{
   return self;
}
/******************************************************************/
/***************** ES *********************************************/
/******************************************************************/
static int
sigusr2_h ( void * ctxt , int unused )
/**********************************************************************
* Handle any vsignals .
*/
{
2019-12-05 19:50:01 +00:00
int rtn = 0 ,
vsigno ;
2019-11-29 22:23:16 +00:00
Cb * cb_arr [ VSIG_QUEUE_MAX ] ;
2019-12-05 19:50:01 +00:00
/* Protected operation */
// ez_pthread_mutex_lock(&S.vsig.mtx);
2019-11-29 22:23:16 +00:00
while ( EOF ! = MSGQUEUE_extractMsg ( & TS . vsig . mq , & vsigno ) ) {
2019-12-05 19:50:01 +00:00
int rc = MAP_findItems ( & TS . vsig . cb_map , ( void * * ) cb_arr , VSIG_QUEUE_MAX , & vsigno , sizeof ( int ) ) ;
if ( - 1 = = rc ) {
eprintf ( " FATAL: MAP_findItems() failed " ) ;
abort ( ) ;
}
if ( ! rc ) goto abort ;
2019-11-29 22:23:16 +00:00
for ( int i = 0 ; i < rc ; + + i ) {
Cb * cb = cb_arr [ i ] ;
2019-12-05 19:50:01 +00:00
rtn = ( * cb - > un . sig . callback_f ) ( cb - > ctxt , vsigno ) ;
if ( rtn ) goto abort ;
2019-11-29 22:23:16 +00:00
}
}
2019-12-05 19:50:01 +00:00
abort :
// ez_pthread_mutex_unlock(&S.vsig.mtx);
return rtn ;
2019-11-29 22:23:16 +00:00
}
static void
initialize ( )
/**********************************************************************
* Initialization for current thread , and once for the whole process .
*/
{
/* Get the global mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . mtx ) ;
2019-11-29 22:23:16 +00:00
/* Processwide static data */
if ( ! ( S . flags & GLOBAL_INIT_FLG ) ) {
S . flags | = GLOBAL_INIT_FLG ;
2020-09-15 15:31:27 +00:00
if ( - 1 = = sigemptyset ( & S . dflt_sa . set ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
MAP_constructor ( & S . vsig . thrd_ts_map , 10 , 10 ) ;
}
/* Release the global mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_unlock ( & S . mtx ) ;
2019-11-29 22:23:16 +00:00
/* Per-thread static data */
PTRVEC_constructor ( & TS . fd_vec , 10 ) ;
PTRVEC_constructor ( & TS . timer_vec , 10 ) ;
2020-09-15 15:31:27 +00:00
PTRVEC_constructor ( & TS . deleted_vec , 10 ) ;
2019-11-29 22:23:16 +00:00
for ( int i = 0 ; i < NUMSIGS ; + + i ) {
PTRVEC_constructor ( & TS . sig_vec_arr [ i ] , 10 ) ;
}
2020-09-15 15:31:27 +00:00
if ( - 1 = = sigemptyset ( & TS . sigsRaised ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
MAP_constructor ( & TS . key_map , 10 , 10 ) ;
/* Remember so we don't call ourselves more than once in the same thread */
TS . tid = pthread_self ( ) ;
/* Add ourself to the vsig thread to TS map */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
MAP_addTypedKey ( & S . vsig . thrd_ts_map , TS . tid , & TS ) ;
/*--- virtual signal infrastructure ---*/
MSGQUEUE_constructor ( & TS . vsig . mq , sizeof ( int ) , VSIG_QUEUE_MAX ) ;
2019-12-05 19:50:01 +00:00
MAP_constructor ( & TS . vsig . cb_map , 10 , 10 ) ;
2019-11-29 22:23:16 +00:00
/* Register a signal handler for SIGUSR2 so we can have virtual signals. */
2019-12-05 16:00:39 +00:00
ez_ES_registerSignal ( SIGUSR2 , sigusr2_h , NULL ) ;
ez_pthread_mutex_unlock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
}
inline static unsigned
signum2dflt_sa_ndx(int signum)
/**********************************************************************
 * Convert signum to an index for S.dflt_sa.XX
 * Signal numbers start at 1, and SIGKILL and SIGSTOP cannot be caught,
 * so those two get no slot and the numbering is compacted around them.
 *
 * signum: Unix signal number; must be positive and neither SIGKILL
 *         nor SIGSTOP.
 *
 * RETURNS:
 * Zero-based table index.
 */
{
   /* A non-positive signum would wrap around to a huge unsigned index */
   assert(signum > 0);
   assert(SIGKILL != signum && SIGSTOP != signum);

   if (signum < SIGKILL) return (unsigned)(signum - 1);
   if (signum < SIGSTOP) return (unsigned)(signum - 2);
   return (unsigned)(signum - 3);
}
int
ES_registerSignal (
int signum ,
int ( * callback_f ) ( void * ctxt , int signo ) ,
void * ctxt
)
/**********************************************************************
* Register a function to be called when a particular Unix signal is
* raised .
*/
{
if ( TS . tid ! = pthread_self ( ) ) initialize ( ) ;
Cb * cb ;
unsigned ndx = signum2dflt_sa_ndx ( signum ) ;
/* Only install a new Unix signal handler if we do not already handle this signal */
if ( ! PTRVEC_numItems ( & TS . sig_vec_arr [ ndx ] ) ) {
struct sigaction act ;
act . sa_handler = UnixSignalHandler ;
sigemptyset ( & act . sa_mask ) ;
act . sa_flags = SA_RESTART | SA_NODEFER ;
/* We only store the default action once per process */
if ( ! sigismember ( & S . dflt_sa . set , signum ) ) {
sigaddset ( & S . dflt_sa . set , signum ) ;
2020-09-15 15:31:27 +00:00
if ( sigaction ( signum , & act , & S . dflt_sa . arr [ ndx ] ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
} else {
2020-09-15 15:31:27 +00:00
if ( sigaction ( signum , & act , NULL ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
}
}
2020-09-15 15:31:27 +00:00
if ( ! Cb_SignalCreate ( cb , signum , callback_f , ctxt ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* All callbacks are put in the key table */
MAP_addTypedKey ( & TS . key_map , cb - > key , cb ) ;
/* Add to the signal vector */
PTRVEC_addTail ( & TS . sig_vec_arr [ signum2dflt_sa_ndx ( signum ) ] , cb ) ;
return cb - > key ;
}
int
ES_registerVSignal (
int signum ,
int ( * callback_f ) ( void * ctxt , int signo ) ,
void * ctxt
)
/**********************************************************************
* Register a function to be called when a particular virtual signal is
* raised . Virtual signals are implemented on top of the Unix signal , SIGUSR2 .
*
* signum : Any integer number which is meaningful to your application .
* callback_f : callback function for when activity is detected .
2019-12-04 16:40:09 +00:00
* ctxt : Pointer which will be passed as the first argument to callback_f ( ) .
2019-11-29 22:23:16 +00:00
*
* RETURNS :
* If successful , a positive integer which can be used to unregister the callback .
* On failure , - 1 is returned .
*/
{
2019-11-30 05:10:25 +00:00
if ( TS . tid ! = pthread_self ( ) ) initialize ( ) ;
2019-11-29 22:23:16 +00:00
Cb * cb ;
2020-09-15 15:31:27 +00:00
if ( ! Cb_VSignalCreate ( cb , signum , callback_f , ctxt ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* Place in the virtual signal map indexed on signum */
2019-12-05 19:50:01 +00:00
MAP_addTypedKey ( & TS . vsig . cb_map , cb - > un . sig . signum , cb ) ;
2019-11-29 22:23:16 +00:00
/* All callbacks are put in the key table */
MAP_addTypedKey ( & TS . key_map , cb - > key , cb ) ;
return cb - > key ;
}
int
ES_registerFd (
int fd ,
short events ,
int ( * callback_f ) ( void * ctxt , int fd , short events ) ,
void * ctxt
)
/**********************************************************************
* Register a function to be called when there is activity on the
* file descriptor ( which may be a file , socket , pipe , etc . under Unix ) .
*/
{
if ( TS . tid ! = pthread_self ( ) ) initialize ( ) ;
Cb * cb ;
2020-09-15 15:31:27 +00:00
if ( ! Cb_FdCreate ( cb , fd , events , callback_f , ctxt ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* Index to vector for quick processing */
PTRVEC_addTail ( & TS . fd_vec , cb ) ;
/* All callbacks are put in the key table */
MAP_addTypedKey ( & TS . key_map , cb - > key , cb ) ;
return cb - > key ;
}
int
ES_registerTimer (
int64_t pause_ms ,
int64_t interval_ms ,
int ( * callback_f ) ( void * ctxt ) ,
void * ctxt
)
/**********************************************************************
* Register a function to be called when a timer times out .
*/
{
if ( TS . tid ! = pthread_self ( ) ) initialize ( ) ;
Cb * cb ;
2020-09-15 15:31:27 +00:00
if ( ! Cb_TimerCreate ( cb , pause_ms , interval_ms , callback_f , ctxt ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* Add to the timer vector */
PTRVEC_addTail ( & TS . timer_vec , cb ) ;
/* All callbacks are put in the key table */
MAP_addTypedKey ( & TS . key_map , cb - > key , cb ) ;
return cb - > key ;
}
int
ES_unregister ( int key )
/**********************************************************************
* Unegister a previously registered callback .
*/
{
if ( TS . tid ! = pthread_self ( ) ) initialize ( ) ;
Cb * cb = MAP_findTypedItem ( & TS . key_map , key ) ;
2020-09-15 15:31:27 +00:00
if ( ! cb ) {
# ifdef qqDEBUG
eprintf ( " WARNING: could not find callback with key= %d " , key ) ;
# endif
return - 1 ;
}
/* If the callback processing is currently active, do not delete it */
if ( TS . flags & TS_PROCESSING_FLG ) {
if ( ! ( cb - > flags & CB_DELETED_FLG ) ) {
cb - > flags | = CB_DELETED_FLG ;
PTRVEC_addTail ( & TS . deleted_vec , cb ) ;
}
return 0 ;
}
2019-11-29 22:23:16 +00:00
/* Remove from key table */
2020-09-15 15:31:27 +00:00
if ( ! MAP_removeTypedItem ( & TS . key_map , key ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* Different operations needed based on type */
switch ( cb - > type ) {
case ES_FD_TYPE :
/* Remove from file descriptor vector */
2020-09-15 15:31:27 +00:00
if ( ! PTRVEC_remove ( & TS . fd_vec , cb ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
break ;
case ES_SIG_TYPE :
{
unsigned ndx = signum2dflt_sa_ndx ( cb - > un . sig . signum ) ;
/* Remove from appropriate signals vector */
2020-09-15 15:31:27 +00:00
if ( ! PTRVEC_remove ( & TS . sig_vec_arr [ ndx ] , cb ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
/* If there are no more signals in this vector */
if ( ! PTRVEC_numItems ( & TS . sig_vec_arr [ ndx ] ) ) {
assert ( sigismember ( & S . dflt_sa . set , cb - > un . sig . signum ) ) ;
/* Restore default signal handling */
2020-09-15 15:31:27 +00:00
if ( sigaction ( cb - > un . sig . signum , & S . dflt_sa . arr [ ndx ] , NULL ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
}
} break ;
case ES_VSIG_TYPE :
2020-09-15 15:31:27 +00:00
if ( ! MAP_removeSpecificTypedItem ( & TS . vsig . cb_map , cb - > un . sig . signum , cb ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
break ;
case ES_TIMER_TYPE :
/* Remove from timer vector */
2020-09-15 15:31:27 +00:00
if ( ! PTRVEC_remove ( & TS . timer_vec , cb ) )
assert ( 0 ) ;
2019-11-29 22:23:16 +00:00
break ;
default :
assert ( 0 ) ;
} ;
/* Free the callback object */
Cb_destroy ( cb ) ;
return 0 ;
}
static int
cmp_remaining_ms ( const void * const * p1 , const void * const * p2 )
/**********************************************************************
* Compare the time remaining for PTRVEC_sort ( ) .
*/
{
const Cb * cb1 = ( const Cb * ) ( * p1 ) ,
* cb2 = ( const Cb * ) ( * p2 ) ;
assert ( ES_TIMER_TYPE = = cb1 - > type & &
ES_TIMER_TYPE = = cb2 - > type ) ;
if ( cb1 - > un . timer . remaining_ms < cb2 - > un . timer . remaining_ms ) return - 1 ;
if ( cb1 - > un . timer . remaining_ms = = cb2 - > un . timer . remaining_ms ) return 0 ;
return 1 ;
}
static int
lastActivity_cmp ( const void * const * pp1 , const void * const * pp2 )
{
const Cb * cb1 = * ( const Cb * const * ) pp1 ,
* cb2 = * ( const Cb * const * ) pp2 ;
/* Put oldest at the top of the vector */
if ( cb1 - > lastActivity_ms < cb2 - > lastActivity_ms ) return - 1 ;
return 1 ;
}
int
ES_run ( void )
/**********************************************************************
* For this thread , use poll ( ) to monitor socket activity until one
* of the registered callback_f ( ) returns non - zero .
*
* RETURNS :
* Whatever nonzero value one of the callbacks ( ) returned .
*/
{
2020-09-15 15:31:27 +00:00
int rtn = - 1 ;
2019-11-29 22:23:16 +00:00
if ( TS . tid ! = pthread_self ( ) )
initialize ( ) ;
/* Loop forever */
for ( ; ; ) {
2020-09-15 15:31:27 +00:00
TS . flags | = TS_PROCESSING_FLG ;
2019-11-29 22:23:16 +00:00
int numFds = PTRVEC_numItems ( & TS . fd_vec ) ;
struct pollfd pollItemArr [ numFds ] ;
Cb * cbArr [ numFds ] ;
/* This sort provides fair queuing */
PTRVEC_sort ( & TS . fd_vec , lastActivity_cmp ) ;
2020-09-15 15:31:27 +00:00
/****** Load up the pollItemArr *****/
2019-11-29 22:23:16 +00:00
unsigned i ;
Cb * cb ;
PTRVEC_loopFwd ( & TS . fd_vec , i , cb ) {
struct pollfd * item = pollItemArr + i ;
switch ( cb - > type ) {
case ES_FD_TYPE :
item - > fd = cb - > un . fd . fd ;
item - > events = cb - > un . fd . events ;
break ;
default :
assert ( 0 ) ;
}
/* Clear the return event field for good measure */
item - > revents = 0 ;
/* Remember the Cb object */
cbArr [ i ] = cb ;
}
/* There may not be any timers */
int64_t poll_ms = - 1 ;
/***** If there are any timers to consider ****/
if ( PTRVEC_numItems ( & TS . timer_vec ) ) {
# if ! (defined (_WIN32) || defined (__CYGWIN__))
int64_t time_ms = clock_gettime_ms ( CLOCK_MONOTONIC_COARSE ) ;
# else
int64_t time_ms = clock_gettime_ms ( CLOCK_MONOTONIC ) ;
# endif
/* Prepare timers to be sorted */
unsigned i ;
PTRVEC_loopFwd ( & TS . timer_vec , i , cb ) {
assert ( ES_TIMER_TYPE = = cb - > type ) ;
cb - > un . timer . remaining_ms = msec2timeout ( cb , time_ms ) ;
}
/* Sort them so the most urgent timer is at the top */
PTRVEC_sort ( & TS . timer_vec , cmp_remaining_ms ) ;
/* Get the top item */
cb = PTRVEC_first ( & TS . timer_vec ) ;
assert ( cb ) ;
/* This is how long we need to wait */
poll_ms = MAX ( cb - > un . timer . remaining_ms , 0 ) ;
}
/*******************************************************/
/******** Wait for something to happen *****************/
/*******************************************************/
int poll_rc = poll ( pollItemArr , numFds , poll_ms ) ;
/********* Check return code *****/
if ( - 1 = = poll_rc ) {
switch ( errno ) {
case EFAULT :
2020-09-15 15:31:27 +00:00
eprintf ( " ERROR: poll() failed " ) ;
goto abort ;
2019-11-29 22:23:16 +00:00
case EINTR :
/* Signal caused poll() to return, which is OK */
break ;
default :
assert ( 0 ) ;
}
}
/********* Respond to signals *****/
int signum ;
for ( signum = 1 ; signum < 32 ; + + signum ) {
/* Can't do anything with these signals */
if ( signum = = SIGKILL | | signum = = SIGSTOP ) continue ;
/* See if signum was raised */
while ( sigismember ( & TS . sigsRaised , signum ) ) {
/* Clear signum from the set of raised signals */
if ( - 1 = = sigdelset ( & TS . sigsRaised , signum ) ) assert ( 0 ) ;
unsigned ndx = signum2dflt_sa_ndx ( signum ) ;
/* See if any of our callbacks are for signum */
PTRVEC_loopFwd ( & TS . sig_vec_arr [ ndx ] , i , cb ) {
assert ( ES_SIG_TYPE = = cb - > type ) ;
2020-09-15 15:31:27 +00:00
if ( cb - > flags & CB_DELETED_FLG )
continue ;
2019-11-29 22:23:16 +00:00
/* Call the callback function */
2020-09-15 15:31:27 +00:00
rtn = ( * cb - > un . sig . callback_f ) ( cb - > ctxt , signum ) ;
2019-11-29 22:23:16 +00:00
2020-09-15 15:31:27 +00:00
if ( rtn )
goto abort ;
2019-11-29 22:23:16 +00:00
}
}
}
/********* Service timers ********/
if ( PTRVEC_numItems ( & TS . timer_vec ) ) {
int64_t remaining_ms ,
time_ms ;
# if ! (defined (_WIN32) || defined (__CYGWIN__))
time_ms = clock_gettime_ms ( CLOCK_MONOTONIC_COARSE ) ;
# else
time_ms = clock_gettime_ms ( CLOCK_MONOTONIC ) ;
# endif
PTRVEC_loopFwd ( & TS . timer_vec , i , cb ) {
2020-09-15 15:31:27 +00:00
if ( cb - > flags & CB_DELETED_FLG )
continue ;
2019-11-29 22:23:16 +00:00
/* See how much time remains for this callback */
remaining_ms = msec2timeout ( cb , time_ms ) ;
/* close enough */
if ( remaining_ms < 2 ) {
/* Keep track of how many times this timer has fired */
+ + cb - > un . timer . count ;
/* Call the callback function */
2020-09-15 15:31:27 +00:00
rtn = ( * cb - > un . timer . callback_f ) ( cb - > ctxt ) ;
2019-11-29 22:23:16 +00:00
/* If this is a single-shot timer, get rid of it now */
if ( ! cb - > un . timer . interval_ms ) {
ES_unregister ( cb - > key ) ;
/* Do this so next vector entry doesn't get skipped */
- - i ;
}
/* If the callback returned non-zero, bail out */
2020-09-15 15:31:27 +00:00
if ( rtn )
goto abort ;
2019-11-29 22:23:16 +00:00
2020-09-15 15:31:27 +00:00
} else
break ; /* time remaining will increase from here on out */
2019-11-29 22:23:16 +00:00
}
}
/********** Service file descriptors *******/
for ( int i = 0 ; i < numFds ; + + i ) {
struct pollfd * item = pollItemArr + i ;
if ( ! item - > revents ) continue ;
Cb * cb = cbArr [ i ] ;
2020-09-15 15:31:27 +00:00
if ( cb - > flags & CB_DELETED_FLG )
continue ;
2019-11-29 22:23:16 +00:00
# if ! (defined (_WIN32) || defined (__CYGWIN__))
cb - > lastActivity_ms = clock_gettime_ms ( CLOCK_MONOTONIC_COARSE ) ;
# else
cb - > lastActivity_ms = clock_gettime_ms ( CLOCK_MONOTONIC ) ;
# endif
2020-09-15 15:31:27 +00:00
assert ( ES_FD_TYPE = = cb - > type ) ;
2019-11-29 22:23:16 +00:00
2020-09-15 15:31:27 +00:00
rtn = ( * cb - > un . fd . callback_f ) ( cb - > ctxt , item - > fd , item - > revents ) ;
2019-11-29 22:23:16 +00:00
2020-09-15 15:31:27 +00:00
/* If the callback returned non-zero, bail out */
if ( rtn )
goto abort ;
}
{ /*--- Free any callbacks marked for deletion ---*/
Cb * cb ;
TS . flags & = ~ TS_PROCESSING_FLG ;
while ( ( cb = PTRVEC_remTail ( & TS . deleted_vec ) ) ) {
if ( ES_unregister ( cb - > key ) )
2019-11-29 22:23:16 +00:00
assert ( 0 ) ;
}
}
}
2020-09-15 15:31:27 +00:00
abort :
return rtn ;
2019-11-29 22:23:16 +00:00
}
pthread_t
ES_spawn_thread_sched (
void * ( * user_main ) ( void * ) ,
void * arg ,
int sched_policy , /* SCHED_NORMAL || SCHED_FIFO || SCHED_RR || SCHED_BATCH */
int priority
)
/**********************************************************************
* Spawn a thread which will begin executing user_main ( arg ) .
* NOTE : the calling thread will be blocked until ES_release_parent ( )
* is called from user_main ( ) !
*
* user_main : function pointer where thread will execute .
* arg : address passed to user_main ( arg ) .
* sched_policy : Which pthreads scheduling policy to use .
* priority : pthreads priority to use .
*
* RETURNS :
* 0 for success , nonzero for error
*/
{
pthread_t tid ;
pthread_attr_t attr ;
int rtn ;
pthread_attr_init ( & attr ) ;
pthread_attr_setdetachstate ( & attr , PTHREAD_CREATE_JOINABLE ) ;
if ( sched_policy = = - 1 ) {
pthread_attr_setinheritsched ( & attr , PTHREAD_INHERIT_SCHED ) ;
} else {
struct sched_param sp ;
pthread_attr_setinheritsched ( & attr , PTHREAD_EXPLICIT_SCHED ) ;
if ( priority < sched_get_priority_min ( sched_policy ) | |
priority > sched_get_priority_max ( sched_policy ) ) {
eprintf ( " ERROR: priority= %d must be between %d and %d inclusive. " , priority , sched_get_priority_min ( sched_policy ) , sched_get_priority_max ( sched_policy ) ) ;
return 1 ;
}
if ( pthread_attr_setschedpolicy ( & attr , sched_policy ) ) assert ( 0 ) ;
sp . sched_priority = priority ;
if ( pthread_attr_setschedparam ( & attr , & sp ) ) assert ( 0 ) ;
}
/* Get the global mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . spawn . mtx ) ;
2019-11-29 22:23:16 +00:00
/* Get the condition ready for use */
pthread_cond_init ( & S . spawn . cond , NULL ) ;
/* This is the flag we will test to know when the child is ready to recieve signals */
S . spawn . release_parent = 0 ;
/* Get the condition mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . spawn . cond_mtx ) ;
2019-11-29 22:23:16 +00:00
/* Spawn the new thread */
2019-11-30 05:10:25 +00:00
memset ( & tid , 0 , sizeof ( tid ) ) ;
2019-11-30 19:14:42 +00:00
/* JDR Sat 30 Nov 2019 10:39:04 AM EST
* it appears that this fails at 300 threads .
*/
2019-12-05 16:00:39 +00:00
ez_pthread_create ( & tid , & attr , user_main , arg ) ;
2019-11-29 22:23:16 +00:00
/* Now we, the parent, wait on the child */
while ( ! S . spawn . release_parent ) {
2019-12-05 16:00:39 +00:00
ez_pthread_cond_wait ( & S . spawn . cond , & S . spawn . cond_mtx ) ;
2019-11-29 22:23:16 +00:00
}
/* Release the condition mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_unlock ( & S . spawn . cond_mtx ) ;
2019-11-29 22:23:16 +00:00
/* Release the global lock */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_unlock ( & S . spawn . mtx ) ;
2019-11-29 22:23:16 +00:00
return tid ;
}
void
ES_release_parent ( void )
/**********************************************************************
* Called by a new thread created with ES_spawn_thread_sched ( ) , so
* that the parent can continue execution .
*/
{
/* Condition manipulation must be protected by a mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . spawn . cond_mtx ) ;
2019-11-29 22:23:16 +00:00
/* Note that parent may be released */
S . spawn . release_parent = 1 ;
/* Signal the parent */
2019-12-05 16:00:39 +00:00
ez_pthread_cond_signal ( & S . spawn . cond ) ;
2019-11-29 22:23:16 +00:00
/* Free up the condition mutex */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_unlock ( & S . spawn . cond_mtx ) ;
2019-11-29 22:23:16 +00:00
}
void
ES_cleanup ( void )
/**********************************************************************
* Called by a thread when it exits , to clean up resources .
*/
{
assert ( TS . tid = = pthread_self ( ) ) ;
/* Remove ourself from the vsig thread to TS map */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
MAP_removeTypedItem ( & S . vsig . thrd_ts_map , TS . tid ) ;
2019-12-05 19:50:01 +00:00
ez_pthread_mutex_unlock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
2019-12-05 19:50:01 +00:00
/* Destroy all callbacks, which are indexed in key map,
* and the key map itself .
*/
MAP_clearAndDestroy ( & TS . key_map , ( void * ( * ) ( void * ) ) Cb_destructor ) ;
2019-11-29 22:23:16 +00:00
2019-12-05 19:50:01 +00:00
/* Destroy vsignal infrastructure */
MAP_destructor ( & TS . vsig . cb_map ) ;
2019-11-29 22:23:16 +00:00
2019-12-05 19:50:01 +00:00
/* Tear down the message queue */
MSGQUEUE_destructor ( & TS . vsig . mq ) ;
2019-11-29 22:23:16 +00:00
PTRVEC_destructor ( & TS . fd_vec ) ;
PTRVEC_destructor ( & TS . timer_vec ) ;
for ( unsigned i = 0 ; i < NUMSIGS ; + + i ) {
PTRVEC_destructor ( TS . sig_vec_arr + i ) ;
}
}
int
ES_VSignal ( pthread_t tid , int signum )
/**********************************************************************
2019-11-30 05:10:25 +00:00
* Send a virtual signal to tid , which is multiplexed on SIGUSR2 .
2019-11-29 22:23:16 +00:00
*
* tid : Target thread identifier .
* signum : Any integer number which is meaningful to your application .
*
* RETURNS :
* 0 : successful
2019-11-30 05:10:25 +00:00
* - 1 : failures .
2019-11-29 22:23:16 +00:00
*/
{
int rtn = EOF - 1 ;
/* find the correct TS by thread identifier */
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_lock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
struct _TS * ts = MAP_findTypedItem ( & S . vsig . thrd_ts_map , tid ) ;
if ( ! ts ) {
eprintf ( " ERROR: tid= %s not found! " , pthread_t_str ( tid ) ) ;
goto abort ;
}
assert ( tid = = ts - > tid ) ;
/* Place virtual signal in message queue */
2019-11-30 19:14:42 +00:00
ez_MSGQUEUE_submitMsg ( & ts - > vsig . mq , & signum ) ;
2019-11-29 22:23:16 +00:00
/* And finally tell the target thread to check it's message queue */
if ( pthread_kill ( tid , SIGUSR2 ) ) {
sys_eprintf ( " ERROR: kill(%s, SIGUSR2) " , pthread_t_str ( tid ) ) ;
goto abort ;
}
rtn = 0 ;
abort :
2019-12-05 16:00:39 +00:00
ez_pthread_mutex_unlock ( & S . vsig . mtx ) ;
2019-11-29 22:23:16 +00:00
return rtn ;
}
2020-09-15 15:31:27 +00:00
/*=====================================================================================*/
/*===================== ez_xxx() ======================================================*/
/*=====================================================================================*/
/***************************************************/
/* Checked variant of ES_registerFd(): prints a diagnostic and abort()s
 * if the registration returns -1; otherwise hands back the key.
 */
ez_proto(int, ES_registerFd,
      int fd,
      short events,
      int (*callback_f)(void *ctxt, int fd, short events),
      void *ctxt
      )
{
   const int key = ES_registerFd(fd, events, callback_f, ctxt);

   if (-1 != key)
      return key;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_registerFd() failed. ");
   abort();
}
/***************************************************/
/* Checked variant of ES_registerSignal(): prints a diagnostic and
 * abort()s if the registration returns -1; otherwise hands back the key.
 */
ez_proto(int, ES_registerSignal,
      int signum,
      int (*callback_f)(void *ctxt, int signo),
      void *ctxt
      )
{
   const int key = ES_registerSignal(signum, callback_f, ctxt);

   if (-1 != key)
      return key;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_registerSignal() failed. ");
   abort();
}
/***************************************************/
/* Checked variant of ES_registerVSignal(): prints a diagnostic and
 * abort()s if the registration returns -1; otherwise hands back the key.
 */
ez_proto(int, ES_registerVSignal,
      int signum,
      int (*callback_f)(void *ctxt, int signo),
      void *ctxt
      )
{
   const int key = ES_registerVSignal(signum, callback_f, ctxt);

   if (-1 != key)
      return key;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_registerVSignal() failed. ");
   abort();
}
/***************************************************/
/* Checked variant of ES_VSignal(): prints a diagnostic and abort()s
 * if ES_VSignal() returns anything but 0.
 */
ez_proto(int, ES_VSignal,
      pthread_t tid,
      int signum)
{
   const int status = ES_VSignal(tid, signum);

   if (!status)
      return status;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_VSignal() returned %d. ", status);
   abort();
}
/***************************************************/
/* Checked variant of ES_registerTimer(): prints a diagnostic and
 * abort()s if the registration returns -1; otherwise hands back the key.
 */
ez_proto(int, ES_registerTimer,
      int64_t pause_ms,
      int64_t interval_ms,
      int (*callback_f)(void *ctxt),
      void *ctxt
      )
{
   const int key = ES_registerTimer(pause_ms, interval_ms, callback_f, ctxt);

   if (-1 != key)
      return key;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_registerTimer() failed. ");
   abort();
}
/***************************************************/
/* Checked variant of ES_unregister(): prints a diagnostic and abort()s
 * if ES_unregister() returns -1; otherwise hands back its result.
 */
ez_proto(int, ES_unregister, int key)
{
   const int status = ES_unregister(key);

   if (-1 != status)
      return status;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_unregister() failed. ");
   abort();
}
/***************************************************/
/* Checked variant of ES_run(): prints a diagnostic and abort()s
 * if ES_run() returns anything but 0.
 */
ez_proto(int, ES_run)
{
   const int status = ES_run();

   if (!status)
      return status;

   _eprintf(
#ifdef DEBUG
         fileName, lineNo, funcName,
#endif
         " ES_run() returned %d ", status);
   abort();
}