mirror of
https://github.com/jrbrtsn/ban2fail
synced 2024-06-16 11:58:01 +00:00
DNS working well now
This commit is contained in:
parent
25219c8263
commit
fc53f0839b
19
ban2fail.c
19
ban2fail.c
@ -88,7 +88,7 @@ struct Global G= {
|
|||||||
.version= {
|
.version= {
|
||||||
.major= 0,
|
.major= 0,
|
||||||
.minor= 12,
|
.minor= 12,
|
||||||
.patch= 1
|
.patch= 2
|
||||||
},
|
},
|
||||||
|
|
||||||
.bitTuples.flags= GlobalFlagBitTuples
|
.bitTuples.flags= GlobalFlagBitTuples
|
||||||
@ -430,7 +430,7 @@ main(int argc, char **argv)
|
|||||||
|
|
||||||
int rc= PDNS_lookup(S.lePtrArr, nItems, DFLT_DNS_PAUSE_SEC*1000);
|
int rc= PDNS_lookup(S.lePtrArr, nItems, DFLT_DNS_PAUSE_SEC*1000);
|
||||||
assert(-1 != rc);
|
assert(-1 != rc);
|
||||||
ez_fprintf(G.listing_fh, "\tCompleted %d of %u lookups\n", rc, nItems);
|
ez_fprintf(G.listing_fh, "\t==> Completed %d of %u lookups\n", rc, nItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process each LOGENTRY item */
|
/* Process each LOGENTRY item */
|
||||||
@ -467,16 +467,25 @@ main(int argc, char **argv)
|
|||||||
/* Print out only for list option */
|
/* Print out only for list option */
|
||||||
if(G.flags & GLB_LIST_ADDR_FLG) {
|
if(G.flags & GLB_LIST_ADDR_FLG) {
|
||||||
|
|
||||||
const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) %s\n",
|
// const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) %s%s %d\n",
|
||||||
|
const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) %s%s\n",
|
||||||
*fmt= "%-15s\t%5u/%-4d offenses %s (%s)\n";
|
*fmt= "%-15s\t%5u/%-4d offenses %s (%s)\n";
|
||||||
|
|
||||||
ez_fprintf(G.listing_fh, e->dnsName ? dns_fmt : fmt
|
const char *failStat= "";
|
||||||
|
if(e->dns.flags & PDNS_FWD_FAIL_FLG)
|
||||||
|
failStat= " ~";
|
||||||
|
if(e->dns.flags & PDNS_FWD_NONE_FLG)
|
||||||
|
failStat= " *";
|
||||||
|
|
||||||
|
ez_fprintf(G.listing_fh, e->dns.name ? dns_fmt : fmt
|
||||||
, e->addr
|
, e->addr
|
||||||
, e->count
|
, e->count
|
||||||
, nAllowed
|
, nAllowed
|
||||||
, e->cntry[0] ? e->cntry : "--"
|
, e->cntry[0] ? e->cntry : "--"
|
||||||
, bits2str(flags, BlockBitTuples)
|
, bits2str(flags, BlockBitTuples)
|
||||||
, e->dnsName
|
, e->dns.name
|
||||||
|
, failStat
|
||||||
|
, e->dns.getaddrinfo_rtn
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,11 @@
|
|||||||
#define BUCKET_DEPTH_HINT 10
|
#define BUCKET_DEPTH_HINT 10
|
||||||
|
|
||||||
/* How long to wait for reverse DNS lookups before bailing out */
|
/* How long to wait for reverse DNS lookups before bailing out */
|
||||||
#define DFLT_DNS_PAUSE_SEC 60
|
#ifdef DEBUG
|
||||||
|
# define DFLT_DNS_PAUSE_SEC 10
|
||||||
|
#else
|
||||||
|
# define DFLT_DNS_PAUSE_SEC 60
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Where to find stuff */
|
/* Where to find stuff */
|
||||||
#define CONFIGFILE "/etc/ban2fail/ban2fail.cfg"
|
#define CONFIGFILE "/etc/ban2fail/ban2fail.cfg"
|
||||||
|
15
es.c
15
es.c
@ -21,7 +21,11 @@ enum ES_type {
|
|||||||
|
|
||||||
#define NUMSIGS 30
|
#define NUMSIGS 30
|
||||||
|
|
||||||
#define VSIG_QUEUE_MAX 100
|
/* NOTE: if this queue becomes full, it
|
||||||
|
* can wreak havok on your mutlithreading
|
||||||
|
* logic!
|
||||||
|
*/
|
||||||
|
#define VSIG_QUEUE_MAX 1000
|
||||||
|
|
||||||
/****************************************************
|
/****************************************************
|
||||||
* We get one of these anonymous structs per-process.
|
* We get one of these anonymous structs per-process.
|
||||||
@ -852,6 +856,9 @@ ES_spawn_thread_sched(
|
|||||||
|
|
||||||
/* Spawn the new thread */
|
/* Spawn the new thread */
|
||||||
memset(&tid, 0, sizeof(tid));
|
memset(&tid, 0, sizeof(tid));
|
||||||
|
/* JDR Sat 30 Nov 2019 10:39:04 AM EST
|
||||||
|
* it appears that this fails at 300 threads.
|
||||||
|
*/
|
||||||
rtn = pthread_create (&tid, &attr, user_main, arg);
|
rtn = pthread_create (&tid, &attr, user_main, arg);
|
||||||
if(rtn) {
|
if(rtn) {
|
||||||
sys_eprintf("ERROR: pthread_create()");
|
sys_eprintf("ERROR: pthread_create()");
|
||||||
@ -965,11 +972,7 @@ ES_VSignal (pthread_t tid, int signum)
|
|||||||
assert(tid == ts->tid);
|
assert(tid == ts->tid);
|
||||||
|
|
||||||
/* Place virtual signal in message queue */
|
/* Place virtual signal in message queue */
|
||||||
int rc= MSGQUEUE_submitMsg(&ts->vsig.mq, &signum);
|
ez_MSGQUEUE_submitMsg(&ts->vsig.mq, &signum);
|
||||||
if(rc) {
|
|
||||||
rtn= EOF;
|
|
||||||
goto abort;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* And finally tell the target thread to check it's message queue */
|
/* And finally tell the target thread to check it's message queue */
|
||||||
if(pthread_kill(tid, SIGUSR2)) {
|
if(pthread_kill(tid, SIGUSR2)) {
|
||||||
|
@ -407,9 +407,8 @@ int _ez_getaddrinfo(
|
|||||||
case 0:
|
case 0:
|
||||||
case EAI_AGAIN:
|
case EAI_AGAIN:
|
||||||
case EAI_FAIL:
|
case EAI_FAIL:
|
||||||
#ifdef EAI_NODATA
|
|
||||||
case EAI_NODATA:
|
case EAI_NODATA:
|
||||||
#endif
|
case EAI_NONAME:
|
||||||
return rtn;
|
return rtn;
|
||||||
|
|
||||||
case EAI_SYSTEM:
|
case EAI_SYSTEM:
|
||||||
|
10
logEntry.c
10
logEntry.c
@ -94,11 +94,11 @@ LOGENTRY_destructor(LOGENTRY *self)
|
|||||||
* Free resources.
|
* Free resources.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
/* Sometimes this is assigned a static string,
|
/* Sometimes this is assigned a static string */
|
||||||
* so just let it leak.
|
if(self->dns.flags & PDNS_REV_DNS_FLG && self->dns.name)
|
||||||
*/
|
{
|
||||||
// if(self->dnsName)
|
free(self->dns.name);
|
||||||
// free(self->dnsName);
|
}
|
||||||
|
|
||||||
return self;
|
return self;
|
||||||
}
|
}
|
||||||
|
10
logEntry.h
10
logEntry.h
@ -23,6 +23,7 @@
|
|||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
#include "map.h"
|
#include "map.h"
|
||||||
|
#include "pdns.h"
|
||||||
|
|
||||||
/* One of these for each offense found in a log file */
|
/* One of these for each offense found in a log file */
|
||||||
typedef struct _LOGENTRY {
|
typedef struct _LOGENTRY {
|
||||||
@ -30,7 +31,14 @@ typedef struct _LOGENTRY {
|
|||||||
char addr[46],
|
char addr[46],
|
||||||
cntry[3];
|
cntry[3];
|
||||||
unsigned count;
|
unsigned count;
|
||||||
char *dnsName;
|
|
||||||
|
/* This data populated by PDNS_lookup() */
|
||||||
|
struct {
|
||||||
|
enum PDNS_flags flags;
|
||||||
|
char *name;
|
||||||
|
int getaddrinfo_rtn;
|
||||||
|
} dns;
|
||||||
|
|
||||||
} LOGENTRY;
|
} LOGENTRY;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
33
msgqueue.c
33
msgqueue.c
@ -75,7 +75,7 @@ MSGQUEUE_submitMsg (MSGQUEUE * self, const void *msgBuf)
|
|||||||
eprintf("WARNING: %p queue full.", self);
|
eprintf("WARNING: %p queue full.", self);
|
||||||
#endif
|
#endif
|
||||||
rtn = EOF;
|
rtn = EOF;
|
||||||
goto done;
|
goto abort;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If this is not the first item to be added */
|
/* If this is not the first item to be added */
|
||||||
@ -84,9 +84,9 @@ MSGQUEUE_submitMsg (MSGQUEUE * self, const void *msgBuf)
|
|||||||
|
|
||||||
memcpy (self->buff_ptr + self->tail * self->msgSize, msgBuf, self->msgSize);
|
memcpy (self->buff_ptr + self->tail * self->msgSize, msgBuf, self->msgSize);
|
||||||
|
|
||||||
self->numItems++;
|
++self->numItems;
|
||||||
|
|
||||||
done:
|
abort:
|
||||||
if (pthread_mutex_unlock (&self->mtx))
|
if (pthread_mutex_unlock (&self->mtx))
|
||||||
assert (0);
|
assert (0);
|
||||||
return rtn;
|
return rtn;
|
||||||
@ -105,7 +105,7 @@ MSGQUEUE_extractMsg (MSGQUEUE * self, void *msgBuf)
|
|||||||
if (!self->numItems)
|
if (!self->numItems)
|
||||||
{
|
{
|
||||||
rtn = EOF;
|
rtn = EOF;
|
||||||
goto done;
|
goto abort;
|
||||||
}
|
}
|
||||||
|
|
||||||
self->numItems--;
|
self->numItems--;
|
||||||
@ -114,7 +114,7 @@ MSGQUEUE_extractMsg (MSGQUEUE * self, void *msgBuf)
|
|||||||
if (self->numItems)
|
if (self->numItems)
|
||||||
self->head = (self->head + 1) % self->maxItems;
|
self->head = (self->head + 1) % self->maxItems;
|
||||||
|
|
||||||
done:
|
abort:
|
||||||
if (pthread_mutex_unlock (&self->mtx))
|
if (pthread_mutex_unlock (&self->mtx))
|
||||||
assert (0);
|
assert (0);
|
||||||
return rtn;
|
return rtn;
|
||||||
@ -139,19 +139,36 @@ MSGQUEUE_checkQueue (MSGQUEUE * self,
|
|||||||
assert (0);
|
assert (0);
|
||||||
|
|
||||||
if (!self->numItems)
|
if (!self->numItems)
|
||||||
goto done;
|
goto abort;
|
||||||
|
|
||||||
for (i = 0, ptr = self->buff_ptr + self->head * self->msgSize;
|
for (i = 0, ptr = self->buff_ptr + self->head * self->msgSize;
|
||||||
i < self->numItems;
|
i < self->numItems;
|
||||||
i++, ptr =
|
++i, ptr =
|
||||||
self->buff_ptr + ((self->head + i) % self->maxItems) * self->msgSize)
|
self->buff_ptr + ((self->head + i) % self->maxItems) * self->msgSize)
|
||||||
{
|
{
|
||||||
if ((rtn = (*check) (ptr, arg)))
|
if ((rtn = (*check) (ptr, arg)))
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
abort:
|
||||||
if (pthread_mutex_unlock (&self->mtx))
|
if (pthread_mutex_unlock (&self->mtx))
|
||||||
assert (0);
|
assert (0);
|
||||||
return rtn;
|
return rtn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/************************************************************/
|
||||||
|
int _ez_MSGQUEUE_submitMsg(
|
||||||
|
const char *fileName,
|
||||||
|
int lineNo,
|
||||||
|
const char *funcName,
|
||||||
|
MSGQUEUE *self,
|
||||||
|
const void *msgBuf
|
||||||
|
)
|
||||||
|
{
|
||||||
|
int rtn= MSGQUEUE_submitMsg (self, msgBuf);
|
||||||
|
|
||||||
|
if(!rtn) return 0;
|
||||||
|
|
||||||
|
_eprintf(fileName, lineNo, funcName, "MSGQUEUE_submitMsg() failed");
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
16
msgqueue.h
16
msgqueue.h
@ -22,7 +22,7 @@
|
|||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
|
||||||
typedef struct
|
typedef struct _MSGQUEUE
|
||||||
/*******************************
|
/*******************************
|
||||||
* Necessary info for circular message
|
* Necessary info for circular message
|
||||||
* ring.
|
* ring.
|
||||||
@ -99,9 +99,23 @@ MSGQUEUE_submitMsg (
|
|||||||
*
|
*
|
||||||
* Returns: 0 for success, non-zero otherwise.
|
* Returns: 0 for success, non-zero otherwise.
|
||||||
*/
|
*/
|
||||||
|
#define ez_MSGQUEUE_submitMsg(self, msgBuf) \
|
||||||
|
_ez_MSGQUEUE_submitMsg(__FILE__, __LINE__, __FUNCTION__, self, msgBuf)
|
||||||
|
int _ez_MSGQUEUE_submitMsg(
|
||||||
|
const char *fileName,
|
||||||
|
int lineNo,
|
||||||
|
const char *funcName,
|
||||||
|
MSGQUEUE *self,
|
||||||
|
const void *msgBuf
|
||||||
|
);
|
||||||
|
|
||||||
#define MSGQUEUE_submitTypedMsg(self, msg) \
|
#define MSGQUEUE_submitTypedMsg(self, msg) \
|
||||||
MSGQUEUE_submitMsg(self, &(msg))
|
MSGQUEUE_submitMsg(self, &(msg))
|
||||||
|
|
||||||
|
#define ez_MSGQUEUE_submitTypedMsg(self, msg) \
|
||||||
|
ez_MSGQUEUE_submitMsg(self, &(msg))
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
MSGQUEUE_extractMsg (
|
MSGQUEUE_extractMsg (
|
||||||
MSGQUEUE *self,
|
MSGQUEUE *self,
|
||||||
|
291
pdns.c
291
pdns.c
@ -17,13 +17,7 @@
|
|||||||
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
|
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
|
||||||
***************************************************************************/
|
***************************************************************************/
|
||||||
|
|
||||||
/*
|
#define _GNU_SOURCE
|
||||||
* JDR Sat 30 Nov 2019 08:27:23 AM EST
|
|
||||||
* Performs DNS reverse lookups in parallel. Having to use a bunch of threads
|
|
||||||
* to overcome serialization inherent in a blocking call is a travesty, but I
|
|
||||||
* couldn't find a non-blocking version of getnameinfo().
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
@ -35,21 +29,37 @@
|
|||||||
#include "util.h"
|
#include "util.h"
|
||||||
|
|
||||||
enum vsignals {
|
enum vsignals {
|
||||||
/* All vsigs before this are used to indicate child ready to join */
|
/* All vsigs before this are used to indicate worker ready to join */
|
||||||
EXIT_VSIG= PDNS_MAX_THREADS,
|
EXIT_VSIG= PDNS_MAX_THREADS,
|
||||||
CHECK_INBOX_VSIG
|
CHECK_INBOX_VSIG
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum lookupType {
|
||||||
|
FWD_LOOKUP,
|
||||||
|
REV_LOOKUP
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Messages in the mgr inbox look like this */
|
||||||
|
struct mgrMsg {
|
||||||
|
LOGENTRY *e;
|
||||||
|
unsigned worker_ndx;
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Messages in the worker inbox look like this */
|
||||||
|
struct workerMsg {
|
||||||
|
LOGENTRY *e;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/*============================================================*/
|
/*============================================================*/
|
||||||
/*=========== Forward declarations ===========================*/
|
/*=========== Forward declarations ===========================*/
|
||||||
/*============================================================*/
|
/*============================================================*/
|
||||||
static int check_inbox_f(void *data, int signo);
|
static int mgr_check_inbox_f(void *data, int signo);
|
||||||
static int child_check_inbox_f(void *vp_ndx, int signo);
|
static int worker_check_inbox_f(void *vp_ndx, int signo);
|
||||||
static void* child_main (void *data);
|
static void* worker_main (void *data);
|
||||||
static int child_exit_f(void *data, int signo);
|
static int worker_exit_f(void *data, int signo);
|
||||||
static int join_f(void *data, int signo);
|
static int join_f(void *data, int signo);
|
||||||
static void stop_remaining_children(void);
|
static void stop_remaining_workers(void);
|
||||||
static int timeout_f(void *data);
|
static int timeout_f(void *data);
|
||||||
static int shutdown_f(void *data);
|
static int shutdown_f(void *data);
|
||||||
static unsigned nThreads_joined(void);
|
static unsigned nThreads_joined(void);
|
||||||
@ -60,7 +70,8 @@ static unsigned nThreads_joined(void);
|
|||||||
static struct {
|
static struct {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
EXIT_FLG= 1<<0
|
EXIT_FLG= 1<<0,
|
||||||
|
ORPHAN_FLG= 1<<1
|
||||||
} flags;
|
} flags;
|
||||||
|
|
||||||
int64_t start_ms;
|
int64_t start_ms;
|
||||||
@ -73,21 +84,19 @@ static struct {
|
|||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
MSGQUEUE inbox;
|
MSGQUEUE inbox;
|
||||||
LOGENTRY **lePtrArr;
|
LOGENTRY **lePtrArr;
|
||||||
unsigned ndx,
|
unsigned processedNdx,
|
||||||
nThreads,
|
nThreads,
|
||||||
nItems;
|
nItems;
|
||||||
|
|
||||||
unsigned nCompleted;
|
/* One of these for each worker thread */
|
||||||
|
struct worker {
|
||||||
/* One of these for each child thread */
|
|
||||||
struct child {
|
|
||||||
|
|
||||||
int is_joined;
|
int is_joined;
|
||||||
|
|
||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
MSGQUEUE inbox;
|
MSGQUEUE inbox;
|
||||||
|
|
||||||
} childArr[PDNS_MAX_THREADS];
|
} workerArr[PDNS_MAX_THREADS];
|
||||||
|
|
||||||
} S;
|
} S;
|
||||||
|
|
||||||
@ -103,7 +112,7 @@ PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms)
|
|||||||
{
|
{
|
||||||
int rtn= -1;
|
int rtn= -1;
|
||||||
|
|
||||||
/* Check for nothing-to-do case */
|
/* Check for trivial case */
|
||||||
if(!nItems)
|
if(!nItems)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
@ -114,7 +123,7 @@ PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms)
|
|||||||
S.tid= pthread_self();
|
S.tid= pthread_self();
|
||||||
|
|
||||||
/* Prepare our inbox */
|
/* Prepare our inbox */
|
||||||
MSGQUEUE_constructor(&S.inbox, sizeof(unsigned), PDNS_INBOX_SZ);
|
MSGQUEUE_constructor(&S.inbox, sizeof(struct mgrMsg), PDNS_MGR_INBOX_SZ);
|
||||||
|
|
||||||
/* Stash this where it's easy to get to */
|
/* Stash this where it's easy to get to */
|
||||||
S.nItems= nItems;
|
S.nItems= nItems;
|
||||||
@ -124,36 +133,35 @@ PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms)
|
|||||||
/* Register a countdown timer to know when to stop */
|
/* Register a countdown timer to know when to stop */
|
||||||
S.timeoutKey= ez_ES_registerTimer(timeout_ms, 0, timeout_f, NULL);
|
S.timeoutKey= ez_ES_registerTimer(timeout_ms, 0, timeout_f, NULL);
|
||||||
/* Check inbox on CHECK_INBOX_VSIG */
|
/* Check inbox on CHECK_INBOX_VSIG */
|
||||||
S.inboxKey= ez_ES_registerVSignal(CHECK_INBOX_VSIG, check_inbox_f, NULL);
|
S.inboxKey= ez_ES_registerVSignal(CHECK_INBOX_VSIG, mgr_check_inbox_f, NULL);
|
||||||
|
|
||||||
/* Start child threads */
|
/* Start worker threads */
|
||||||
for(unsigned i= 0; i < S.nThreads; ++i) {
|
for(unsigned i= 0; i < S.nThreads; ++i) {
|
||||||
|
|
||||||
struct child *ch= S.childArr + i;
|
struct worker *wrk= S.workerArr + i;
|
||||||
|
|
||||||
/* Register the join handler on vsig= array index */
|
/* Register the join handler on vsig= array index */
|
||||||
S.joinKeyArr[i]= ez_ES_registerVSignal(i, join_f, NULL);
|
S.joinKeyArr[i]= ez_ES_registerVSignal(i, join_f, NULL);
|
||||||
|
|
||||||
/* Pass the child's array index in to child_main() */
|
/* Pass the worker's array index in to worker_main() */
|
||||||
ch->tid= ES_spawn_thread(child_main, (void*)(long unsigned)i);
|
wrk->tid= ES_spawn_thread(worker_main, (void*)(long unsigned)i);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Give child threads something to do */
|
/* Give worker threads something to do */
|
||||||
for(; S.ndx < S.nThreads; ++S.ndx) {
|
for(; S.processedNdx < S.nThreads; ++S.processedNdx) {
|
||||||
|
|
||||||
struct child *ch= S.childArr + S.ndx;
|
struct worker *wrk= S.workerArr + S.processedNdx;
|
||||||
|
struct workerMsg worker_msg= {.e= S.lePtrArr[S.processedNdx]};
|
||||||
|
|
||||||
/* Give the child something to do */
|
/* Give the worker something to do */
|
||||||
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.lePtrArr[S.ndx]);
|
ez_MSGQUEUE_submitTypedMsg(&wrk->inbox, worker_msg);
|
||||||
assert(0 == rc);
|
/* Prompt worker to check inbox */
|
||||||
/* Prompt child to check inbox */
|
ES_VSignal(wrk->tid, CHECK_INBOX_VSIG);
|
||||||
ES_VSignal(ch->tid, CHECK_INBOX_VSIG);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wait for something to happen */
|
/* Wait for something to happen */
|
||||||
ES_run();
|
ES_run();
|
||||||
//eprintf("------------ ALL THREADS TERMINATED ------------");
|
|
||||||
|
|
||||||
/* Unregister signal handlers for this thread */
|
/* Unregister signal handlers for this thread */
|
||||||
if(S.timeoutKey)
|
if(S.timeoutKey)
|
||||||
@ -169,40 +177,49 @@ PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms)
|
|||||||
ez_ES_unregister(S.joinKeyArr[i]);
|
ez_ES_unregister(S.joinKeyArr[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
rtn= S.nCompleted;
|
rtn= S.processedNdx;
|
||||||
abort:
|
abort:
|
||||||
return rtn;
|
return rtn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
check_inbox_f(void *data, int signo)
|
mgr_check_inbox_f(void *data, int signo)
|
||||||
/*********************************************************
|
/*********************************************************
|
||||||
* Parent was prompted to check the inbox to see which
|
* Parent was prompted to check the inbox to see which
|
||||||
* child is ready for another task.
|
* worker is ready for another task.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
int rtn= -1;
|
int rtn= -1;
|
||||||
unsigned child_ndx;
|
struct mgrMsg msg;
|
||||||
|
|
||||||
while(EOF != MSGQUEUE_extractTypedMsg(&S.inbox, child_ndx)) {
|
while(EOF != MSGQUEUE_extractTypedMsg(&S.inbox, msg)) {
|
||||||
|
|
||||||
/* Get pointer to child */
|
/* Get pointer to worker */
|
||||||
struct child *ch= S.childArr + child_ndx;
|
struct worker *wrk= S.workerArr + msg.worker_ndx;
|
||||||
|
struct workerMsg worker_msg= {.e= NULL};
|
||||||
|
|
||||||
/* Noting left to do here */
|
if(msg.e->dns.flags & PDNS_DONE_MASK) {
|
||||||
if(S.ndx == S.nItems) {
|
|
||||||
//eprintf("Killing thread %u early", child_ndx);
|
/* If we've finished up, start pruning worker threads */
|
||||||
pthread_kill(ch->tid, SIGTERM);
|
if(S.processedNdx == S.nItems) {
|
||||||
|
pthread_kill(wrk->tid, SIGTERM);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Tell child to work on next task */
|
worker_msg.e= S.lePtrArr[S.processedNdx];
|
||||||
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.lePtrArr[S.ndx]);
|
++S.processedNdx;
|
||||||
assert(0 == rc);
|
|
||||||
ES_VSignal(ch->tid, CHECK_INBOX_VSIG);
|
} else {
|
||||||
|
|
||||||
|
/* Perform forward lookup next */
|
||||||
|
worker_msg.e= msg.e;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Give worker another task */
|
||||||
|
ez_MSGQUEUE_submitTypedMsg(&wrk->inbox, worker_msg);
|
||||||
|
ES_VSignal(wrk->tid, CHECK_INBOX_VSIG);
|
||||||
|
|
||||||
/* Move the "todo" ndx forward */
|
|
||||||
++S.ndx;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rtn= 0;
|
rtn= 0;
|
||||||
@ -219,8 +236,8 @@ nThreads_joined(void)
|
|||||||
unsigned rtn= 0;
|
unsigned rtn= 0;
|
||||||
for(unsigned i= 0; i < S.nThreads; ++i) {
|
for(unsigned i= 0; i < S.nThreads; ++i) {
|
||||||
|
|
||||||
struct child *ch= S.childArr + i;
|
struct worker *wrk= S.workerArr + i;
|
||||||
if(!ch->is_joined) continue;
|
if(!wrk->is_joined) continue;
|
||||||
++rtn;
|
++rtn;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,39 +247,37 @@ nThreads_joined(void)
|
|||||||
static int
|
static int
|
||||||
join_f(void *data, int signo)
|
join_f(void *data, int signo)
|
||||||
/*********************************************************
|
/*********************************************************
|
||||||
* Child prompted us to join
|
* Worker prompted us to join
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
struct child *ch= S.childArr + signo;
|
struct worker *wrk= S.workerArr + signo;
|
||||||
void *pRtn;
|
void *pRtn;
|
||||||
|
|
||||||
//eprintf("joining thread %d", signo);
|
pthread_join(wrk->tid, &pRtn);
|
||||||
|
|
||||||
pthread_join(ch->tid, &pRtn);
|
wrk->is_joined= 1;
|
||||||
|
|
||||||
ch->is_joined= 1;
|
|
||||||
|
|
||||||
/* This will naturally terminate when we are done.*/
|
/* This will naturally terminate when we are done.*/
|
||||||
return S.nThreads == nThreads_joined() ? -1 : 0;
|
return S.nThreads == nThreads_joined() ? -1 : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
stop_remaining_children(void)
|
stop_remaining_workers(void)
|
||||||
/*********************************************************
|
/*********************************************************
|
||||||
* Signal any remaining children to stop.
|
* Signal any remaining workers to stop.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
/* Tell all remaining child threads to exit now */
|
/* Tell all remaining worker threads to exit now */
|
||||||
unsigned i;
|
unsigned i;
|
||||||
for(i= 0; i < S.nThreads; ++i) {
|
for(i= 0; i < S.nThreads; ++i) {
|
||||||
|
|
||||||
struct child *ch= S.childArr + i;
|
struct worker *wrk= S.workerArr + i;
|
||||||
|
|
||||||
/* If it has already joined, skip it */
|
/* If it has already joined, skip it */
|
||||||
if(ch->is_joined) continue;
|
if(wrk->is_joined) continue;
|
||||||
|
|
||||||
/* Prompt child to shut down now */
|
/* Prompt worker to shut down now */
|
||||||
pthread_kill(ch->tid, SIGTERM);
|
pthread_kill(wrk->tid, SIGTERM);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -278,11 +293,11 @@ timeout_f(void *data)
|
|||||||
/* Post notice that it is time to shut down */
|
/* Post notice that it is time to shut down */
|
||||||
S.flags |= EXIT_FLG;
|
S.flags |= EXIT_FLG;
|
||||||
|
|
||||||
eprintf("Timed out with %u threads remaining", S.nThreads - nThreads_joined());
|
#ifdef DEBUG
|
||||||
|
eprintf("Timed out with %u threads remaining", S.nThreads - nThreads_joined());
|
||||||
|
#endif
|
||||||
|
|
||||||
//eprintf("EXTERMINATE!!!!");
|
stop_remaining_workers();
|
||||||
|
|
||||||
stop_remaining_children();
|
|
||||||
|
|
||||||
/* Register a countdown timer to know when to forcefully
|
/* Register a countdown timer to know when to forcefully
|
||||||
* stop remaining threads.
|
* stop remaining threads.
|
||||||
@ -299,41 +314,48 @@ shutdown_f(void *data)
|
|||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
S.shutdownKey= 0;
|
S.shutdownKey= 0;
|
||||||
eprintf("WTF? %u threads _still_ remain", S.nThreads - nThreads_joined());
|
#ifdef DEBUG
|
||||||
|
eprintf("WTF: %u threads *still* remain!", S.nThreads - nThreads_joined());
|
||||||
|
#endif
|
||||||
|
/* Let workerren know not to signal for a join */
|
||||||
|
S.flags |= ORPHAN_FLG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*============================================================*/
|
/*============================================================*/
|
||||||
/*================= Child threads ============================*/
|
/*================= Worker threads ============================*/
|
||||||
/*============================================================*/
|
/*============================================================*/
|
||||||
|
|
||||||
static void*
|
static void*
|
||||||
child_main (void *vp_ndx)
|
worker_main (void *vp_ndx)
|
||||||
/*********************************************************
|
/*********************************************************
|
||||||
* Children begin execution here.
|
* Workers begin execution here.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
unsigned ndx= (long unsigned)vp_ndx;
|
unsigned ndx= (long unsigned)vp_ndx;
|
||||||
struct child *self= S.childArr + ndx;
|
struct worker *self= S.workerArr + ndx;
|
||||||
|
|
||||||
/* Prepare child's static data */
|
/* Prepare worker's static data */
|
||||||
MSGQUEUE_constructor(&self->inbox, sizeof(LOGENTRY*), PDNS_CHILD_INBOX_SZ);
|
MSGQUEUE_constructor(&self->inbox, sizeof(struct workerMsg), PDNS_WORKER_INBOX_SZ);
|
||||||
|
|
||||||
/* Register to exit when prompted */
|
/* Register to exit when prompted */
|
||||||
ez_ES_registerSignal(SIGTERM, child_exit_f, NULL);
|
ez_ES_registerSignal(SIGTERM, worker_exit_f, NULL);
|
||||||
|
|
||||||
/* Register to check inbox when prompted */
|
/* Register to check inbox when prompted */
|
||||||
ez_ES_registerVSignal(CHECK_INBOX_VSIG, child_check_inbox_f, vp_ndx);
|
ez_ES_registerVSignal(CHECK_INBOX_VSIG, worker_check_inbox_f, vp_ndx);
|
||||||
|
|
||||||
/* Parent has been blocked waiting for this call */
|
/* Parent has been blocked waiting for this call */
|
||||||
ES_release_parent();
|
ES_release_parent();
|
||||||
|
|
||||||
/* Respond to directives from parent */
|
/* Respond to directives from mgr */
|
||||||
ES_run();
|
ES_run();
|
||||||
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
|
#ifdef qqDEBUG
|
||||||
|
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
|
||||||
//eprintf("thread %u exiting at %f seconds", ndx, (double)ms/1000.);
|
eprintf("thread %u exiting at %f seconds", ndx, (double)ms/1000.);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Parent thread may have moved on. In that case, don't join. */
|
||||||
|
if(!(S.flags & ORPHAN_FLG))
|
||||||
/* Let the main thread know we are ready to join */
|
/* Let the main thread know we are ready to join */
|
||||||
ES_VSignal(S.tid, ndx);
|
ES_VSignal(S.tid, ndx);
|
||||||
|
|
||||||
@ -341,45 +363,91 @@ child_main (void *vp_ndx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
child_check_inbox_f(void *vp_ndx, int signo)
|
worker_check_inbox_f(void *vp_ndx, int signo)
|
||||||
/*********************************************************
|
/*********************************************************
|
||||||
* Child was prompted to check the inbox for tasks.
|
* Worker was prompted to check the inbox for tasks.
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
int rtn= -1;
|
int rtn= -1;
|
||||||
unsigned ndx= (long unsigned)vp_ndx;
|
unsigned ndx= (long unsigned)vp_ndx;
|
||||||
struct child *self= S.childArr + ndx;
|
struct worker *self= S.workerArr + ndx;
|
||||||
char hostBuf[PATH_MAX];
|
struct workerMsg msg;
|
||||||
LOGENTRY *e;
|
|
||||||
|
while(!(S.flags & EXIT_FLG) &&
|
||||||
|
EOF != MSGQUEUE_extractTypedMsg(&self->inbox, msg))
|
||||||
|
{
|
||||||
|
assert(msg.e);
|
||||||
|
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
|
||||||
|
//eprintf("thread %u doing lookup at %f seconds", ndx, (double)ms/1000.);
|
||||||
|
|
||||||
|
if(msg.e->dns.flags & PDNS_REV_DNS_FLG) {
|
||||||
|
|
||||||
|
const static struct addrinfo hints= {
|
||||||
|
.ai_family = AF_UNSPEC, /* Allow IPv4 or IPv6 */
|
||||||
|
// .ai_flags = AI_CANONNAME /* get the forward lookup result */
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Get a populated addrinfo object */
|
||||||
|
struct addrinfo *res= NULL;
|
||||||
|
int rc= ez_getaddrinfo(msg.e->dns.name, NULL, &hints, &res);
|
||||||
|
|
||||||
|
msg.e->dns.getaddrinfo_rtn= rc;
|
||||||
|
|
||||||
|
switch(rc) {
|
||||||
|
case 0:
|
||||||
|
// assert(res && res->ai_canonname);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case EAI_NONAME:
|
||||||
|
msg.e->dns.flags |= PDNS_FWD_NONE_FLG;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case EAI_FAIL:
|
||||||
|
case EAI_NODATA:
|
||||||
|
case EAI_AGAIN:
|
||||||
|
msg.e->dns.flags |= PDNS_FWD_FAIL_FLG;
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
eprintf("rc= %d", rc);
|
||||||
|
assert(0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* In any case, we are done */
|
||||||
|
msg.e->dns.flags |= PDNS_FWD_DNS_FLG;
|
||||||
|
|
||||||
|
} else { /* reverse lookup */
|
||||||
|
|
||||||
const static struct addrinfo hints= {
|
const static struct addrinfo hints= {
|
||||||
.ai_family = AF_UNSPEC, /* Allow IPv4 or IPv6 */
|
.ai_family = AF_UNSPEC, /* Allow IPv4 or IPv6 */
|
||||||
.ai_flags = AI_NUMERICHOST /* doing reverse lookups */
|
.ai_flags = AI_NUMERICHOST /* doing reverse lookups */
|
||||||
};
|
};
|
||||||
|
|
||||||
while(!(S.flags & EXIT_FLG) &&
|
/* Place to which getnameinfo can copy result */
|
||||||
EOF != MSGQUEUE_extractTypedMsg(&self->inbox, e))
|
char hostBuf[PATH_MAX];
|
||||||
{
|
|
||||||
assert(e);
|
|
||||||
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
|
|
||||||
//eprintf("thread %u doing lookup at %f seconds", ndx, (double)ms/1000.);
|
|
||||||
/* Get a populated addrinfo object */
|
/* Get a populated addrinfo object */
|
||||||
struct addrinfo *res= NULL;
|
struct addrinfo *res= NULL;
|
||||||
int rc= ez_getaddrinfo(e->addr, NULL, &hints, &res);
|
int rc= ez_getaddrinfo(msg.e->addr, NULL, &hints, &res);
|
||||||
assert(0 == rc);
|
assert(0 == rc);
|
||||||
assert(res && res->ai_addr && res->ai_addrlen);
|
assert(res && res->ai_addr && res->ai_addrlen);
|
||||||
/* Now do blocking reverse lookup */
|
/* Now do blocking reverse lookup */
|
||||||
rc= ez_getnameinfo(res->ai_addr, res->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD);
|
rc= ez_getnameinfo(res->ai_addr, res->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD);
|
||||||
switch(rc) {
|
switch(rc) {
|
||||||
case 0:
|
case 0:
|
||||||
e->dnsName= strdup(hostBuf);
|
msg.e->dns.name= strdup(hostBuf);
|
||||||
|
msg.e->dns.flags |= PDNS_REV_DNS_FLG;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case EAI_NONAME:
|
case EAI_NONAME:
|
||||||
e->dnsName= "[3(NXDOMAIN)]";
|
msg.e->dns.name= "[3(NXDOMAIN)]";
|
||||||
|
msg.e->dns.flags |= PDNS_NXDOMAIN_FLG;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case EAI_AGAIN:
|
case EAI_AGAIN:
|
||||||
e->dnsName= "[2(SERVFAIL)]";
|
msg.e->dns.name= "[2(SERVFAIL)]";
|
||||||
|
msg.e->dns.flags |= PDNS_SERVFAIL_FLG;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -387,27 +455,34 @@ child_check_inbox_f(void *vp_ndx, int signo)
|
|||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
++S.nCompleted;
|
|
||||||
|
|
||||||
if(S.flags & EXIT_FLG) return -1;
|
/* Catch being bumped out of blocking call by signal */
|
||||||
|
if(S.flags & EXIT_FLG) break;
|
||||||
|
}
|
||||||
|
|
||||||
/* Submit the child's array ndx to main parent
|
/* Only do follow up if we are not exiting */
|
||||||
|
if(!(S.flags & EXIT_FLG)) {
|
||||||
|
|
||||||
|
struct mgrMsg mgr_msg= {.e= msg.e, .worker_ndx= ndx};
|
||||||
|
/* Submit the worker's message to main mgr
|
||||||
* thread's inbox to indicate we are ready for
|
* thread's inbox to indicate we are ready for
|
||||||
* more.
|
* more.
|
||||||
*/
|
*/
|
||||||
MSGQUEUE_submitTypedMsg(&S.inbox, ndx);
|
ez_MSGQUEUE_submitTypedMsg(&S.inbox, mgr_msg);
|
||||||
ES_VSignal(S.tid, CHECK_INBOX_VSIG);
|
ES_VSignal(S.tid, CHECK_INBOX_VSIG);
|
||||||
|
|
||||||
rtn= 0;
|
rtn= 0;
|
||||||
|
}
|
||||||
|
|
||||||
abort:
|
abort:
|
||||||
return rtn;
|
return rtn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
child_exit_f(void *vp_ndx, int signo)
|
worker_exit_f(void *vp_ndx, int signo)
|
||||||
/*********************************************************
|
/**************************************************************************
|
||||||
* Child was prompted to exit now, so return -1 so child_main()
|
* Worker was prompted to exit now, so return -1 causing worker_main() return
|
||||||
* will return from ES_run().
|
* from ES_run().
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
return -1;
|
return -1;
|
||||||
|
37
pdns.h
37
pdns.h
@ -16,23 +16,48 @@
|
|||||||
* Free Software Foundation, Inc., *
|
* Free Software Foundation, Inc., *
|
||||||
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
|
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
|
||||||
***************************************************************************/
|
***************************************************************************/
|
||||||
|
/*
|
||||||
|
* JDR Sat 30 Nov 2019 08:27:23 AM EST
|
||||||
|
* Performs DNS reverse + fwd lookups in parallel. Having to use a bunch of threads
|
||||||
|
* to overcome the serialization inherent in blocking calls is a travesty, but
|
||||||
|
* I couldn't find a non-blocking version of getnameinfo().
|
||||||
|
*
|
||||||
|
* Oh yeah, and I'm not in the mood to write it myself ;-)
|
||||||
|
*/
|
||||||
|
|
||||||
#ifndef PARALLEL_DNS_H
|
#ifndef PARALLEL_DNS_H
|
||||||
#define PARALLEL_DNS_H
|
#define PARALLEL_DNS_H
|
||||||
|
|
||||||
|
|
||||||
|
/* Number of threads to use in parallel */
|
||||||
|
#define PDNS_MAX_THREADS 200
|
||||||
|
#define PDNS_MGR_INBOX_SZ PDNS_MAX_THREADS*2
|
||||||
|
#define PDNS_WORKER_INBOX_SZ 1
|
||||||
|
/* Give worker threads a chance to join */
|
||||||
|
#define PDNS_SHUTDOWN_PAUSE_MS 500
|
||||||
|
|
||||||
|
enum PDNS_flags {
|
||||||
|
PDNS_SERVFAIL_FLG= 1<<0,
|
||||||
|
PDNS_NXDOMAIN_FLG= 1<<1,
|
||||||
|
PDNS_REV_DNS_FLG= 1<<2,
|
||||||
|
PDNS_FWD_DNS_FLG= 1<<3,
|
||||||
|
PDNS_FWD_FAIL_FLG= 1<<4,
|
||||||
|
PDNS_FWD_NONE_FLG= 1<<5,
|
||||||
|
PDNS_DONE_MASK= PDNS_SERVFAIL_FLG|PDNS_NXDOMAIN_FLG|PDNS_FWD_DNS_FLG
|
||||||
|
};
|
||||||
|
|
||||||
|
#define _GNU_SOURCE
|
||||||
#include "logEntry.h"
|
#include "logEntry.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* Number of threads to use in parallel */
|
/* Fix recursive #include dependency */
|
||||||
#define PDNS_MAX_THREADS 100
|
struct _LOGENTRY;
|
||||||
#define PDNS_INBOX_SZ (PDNS_MAX_THREADS*3)
|
|
||||||
#define PDNS_CHILD_INBOX_SZ 1
|
|
||||||
#define PDNS_SHUTDOWN_PAUSE_MS 500
|
|
||||||
|
|
||||||
int
|
int
|
||||||
PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms);
|
PDNS_lookup(struct _LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms);
|
||||||
/**************************************************************
|
/**************************************************************
|
||||||
* Perform parallel DNS reverse lookups on all LOGENTRY objects
|
* Perform parallel DNS reverse lookups on all LOGENTRY objects
|
||||||
* referenced in lePtrArr until finished, or timeout_ms has lapsed.
|
* referenced in lePtrArr until finished, or timeout_ms has lapsed.
|
||||||
|
Loading…
Reference in New Issue
Block a user