Parallell DNS working well

This commit is contained in:
john 2019-11-30 09:35:29 -05:00
parent eb7bda3dc9
commit 25219c8263
5 changed files with 121 additions and 56 deletions

View File

@ -88,7 +88,7 @@ struct Global G= {
.version= {
.major= 0,
.minor= 12,
.patch= 0
.patch= 1
},
.bitTuples.flags= GlobalFlagBitTuples
@ -425,13 +425,12 @@ main(int argc, char **argv)
/* Special processing for DNS lookups */
if(G.flags & GLB_DNS_LOOKUP_FLG) {
ez_fprintf(G.listing_fh, "Performing reverse lookups for a %d seconds ... ", DFLT_DNS_PAUSE_SEC);
ez_fprintf(G.listing_fh, "Performing reverse DNS lookups for up to %d seconds ...\n", DFLT_DNS_PAUSE_SEC);
fflush(G.listing_fh);
int rc= PDNS_lookup(S.lePtrArr, nItems, DFLT_DNS_PAUSE_SEC*1000);
const char *msg= "done";
if(rc) msg= "out of time";
ez_fprintf(G.listing_fh, "%s\n", msg);
assert(-1 != rc);
ez_fprintf(G.listing_fh, "\tCompleted %d of %u lookups\n", rc, nItems);
}
/* Process each LOGENTRY item */

View File

@ -40,7 +40,7 @@
#define BUCKET_DEPTH_HINT 10
/* How long to wait for reverse DNS lookups before bailing out */
#define DFLT_DNS_PAUSE_SEC 180
#define DFLT_DNS_PAUSE_SEC 60
/* Where to find stuff */
#define CONFIGFILE "/etc/ban2fail/ban2fail.cfg"

View File

@ -48,7 +48,7 @@ while true; do
esac
# Uncomment this to see the inotifywait output which triggered this cycle
echo "FILE= '$FILE', OPS= '$OPS'"
#echo "FILE= '$FILE', OPS= '$OPS'"
NOW_NS=$(date +%s%N)
(( SINCE_NS = NOW_NS - RAN_NS ))
@ -80,7 +80,7 @@ echo "FILE= '$FILE', OPS= '$OPS'"
RAN_NS=$(date +%s%N)
$TIME $BAN2FAIL || break
done < <(exec $INOTIFYWAIT -m $MON_FNAMES)
done < <($INOTIFYWAIT -m $MON_FNAMES)
echo 'Exiting main loop'
# Pause to let things settle down

143
pdns.c
View File

@ -17,6 +17,13 @@
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
/*
* JDR Sat 30 Nov 2019 08:27:23 AM EST
* Performs DNS reverse lookups in parallel. Having to use a bunch of threads
* to overcome serialization inherent in a blocking call is a travesty, but I
* couldn't find a non-blocking version of getnameinfo().
*/
#include <assert.h>
#include <limits.h>
#include <signal.h>
@ -29,7 +36,7 @@
enum vsignals {
/* All vsigs before this are used to indicate child ready to join */
EXIT_VSIG= PDNS_N_THREADS,
EXIT_VSIG= PDNS_MAX_THREADS,
CHECK_INBOX_VSIG
};
@ -44,6 +51,8 @@ static int child_exit_f(void *data, int signo);
static int join_f(void *data, int signo);
static void stop_remaining_children(void);
static int timeout_f(void *data);
static int shutdown_f(void *data);
static unsigned nThreads_joined(void);
/*============================================================*/
/*=========== Static data ====================================*/
@ -51,21 +60,24 @@ static int timeout_f(void *data);
static struct {
enum {
EXIT_FLG= 1<<0,
DONE_FLG= 1<<1
EXIT_FLG= 1<<0
} flags;
int64_t start_ms;
int timeoutKey,
shutdownKey,
inboxKey,
joinKeyArr[PDNS_N_THREADS];
joinKeyArr[PDNS_MAX_THREADS];
pthread_t tid;
MSGQUEUE inbox;
LOGENTRY **leArr;
LOGENTRY **lePtrArr;
unsigned ndx,
nItems,
nSucc,
nFail;
nThreads,
nItems;
unsigned nCompleted;
/* One of these for each child thread */
struct child {
@ -75,7 +87,7 @@ static struct {
pthread_t tid;
MSGQUEUE inbox;
} childArr[PDNS_N_THREADS];
} childArr[PDNS_MAX_THREADS];
} S;
@ -83,14 +95,21 @@ static struct {
/*=========== PDNS ===========================================*/
/*============================================================*/
int
PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms)
PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms)
/**************************************************************
* Perform parallel DNS reverse lookups on all LOGENTRY objects
* referenced in leArr.
* referenced in lePtrArr.
*/
{
int rtn= -1;
/* Check for nothing-to-do case */
if(!nItems)
return 0;
/* Note when we start */
S.start_ms= clock_gettime_ms(CLOCK_REALTIME);
/* Publish our thread ID */
S.tid= pthread_self();
@ -99,27 +118,33 @@ PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms)
/* Stash this where it's easy to get to */
S.nItems= nItems;
S.leArr= leArr;
S.nThreads= MIN(nItems, PDNS_MAX_THREADS);
S.lePtrArr= lePtrArr;
/* Register a countdown timer to know when to stop */
S.timeoutKey= ez_ES_registerTimer(timeout_ms, 0, timeout_f, NULL);
/* Check inbox on CHECK_INBOX_VSIG */
S.inboxKey= ez_ES_registerVSignal(CHECK_INBOX_VSIG, check_inbox_f, NULL);
assert(S.timeoutKey && S.inboxKey);
/* Start child threads */
for(; S.ndx < PDNS_N_THREADS; ++S.ndx) {
for(unsigned i= 0; i < S.nThreads; ++i) {
struct child *ch= S.childArr + i;
/* Register the join handler on vsig= array index */
S.joinKeyArr[S.ndx]= ez_ES_registerVSignal(S.ndx, join_f, NULL);
S.joinKeyArr[i]= ez_ES_registerVSignal(i, join_f, NULL);
/* Pass the child's array index in to child_main() */
ch->tid= ES_spawn_thread(child_main, (void*)(long unsigned)i);
}
/* Give child threads something to do */
for(; S.ndx < S.nThreads; ++S.ndx) {
struct child *ch= S.childArr + S.ndx;
/* Pass the child's array index in to child_main() */
ch->tid= ES_spawn_thread(child_main, (void*)(long unsigned)S.ndx);
/* Give the child something to do */
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]);
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.lePtrArr[S.ndx]);
assert(0 == rc);
/* Prompt child to check inbox */
ES_VSignal(ch->tid, CHECK_INBOX_VSIG);
@ -134,16 +159,17 @@ PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms)
if(S.timeoutKey)
ez_ES_unregister(S.timeoutKey);
if(S.shutdownKey)
ez_ES_unregister(S.shutdownKey);
ez_ES_unregister(S.inboxKey);
/* Release all the join registrations */
for(unsigned i= 0; i < PDNS_N_THREADS; ++i) {
for(unsigned i= 0; i < S.nThreads; ++i) {
ez_ES_unregister(S.joinKeyArr[i]);
}
//eprintf("INFO: nItems= %u, nSucc= %u, nFail= %u", nItems, S.nSucc, S.nFail);
rtn= 0;
rtn= S.nCompleted;
abort:
return rtn;
}
@ -165,20 +191,18 @@ check_inbox_f(void *data, int signo)
/* Noting left to do here */
if(S.ndx == S.nItems) {
//eprintf("Killing thread %u early", child_ndx);
pthread_kill(ch->tid, SIGTERM);
continue;
}
/* Tell child to work on next task */
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]);
int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.lePtrArr[S.ndx]);
assert(0 == rc);
ES_VSignal(ch->tid, CHECK_INBOX_VSIG);
/* Move the "todo" ndx forward */
++S.ndx;
if(S.ndx == S.nItems)
S.flags |= DONE_FLG;
}
rtn= 0;
@ -186,6 +210,23 @@ abort:
return rtn;
}
static unsigned
nThreads_joined(void)
/*********************************************************
* Return the number of threads which have already joined.
*/
{
unsigned rtn= 0;
for(unsigned i= 0; i < S.nThreads; ++i) {
struct child *ch= S.childArr + i;
if(!ch->is_joined) continue;
++rtn;
}
return rtn;
}
static int
join_f(void *data, int signo)
/*********************************************************
@ -195,19 +236,14 @@ join_f(void *data, int signo)
struct child *ch= S.childArr + signo;
void *pRtn;
//eprintf("joining thread %d", signo);
pthread_join(ch->tid, &pRtn);
ch->is_joined= 1;
unsigned i;
for(i= 0; i < PDNS_N_THREADS; ++i) {
struct child *ch= S.childArr + i;
if(!ch->is_joined) break;
}
/* This will naturally terminate when we are done.*/
return PDNS_N_THREADS == i ? -1 : 0;
return S.nThreads == nThreads_joined() ? -1 : 0;
}
static void
@ -218,7 +254,7 @@ stop_remaining_children(void)
{
/* Tell all remaining child threads to exit now */
unsigned i;
for(i= 0; i < PDNS_N_THREADS; ++i) {
for(i= 0; i < S.nThreads; ++i) {
struct child *ch= S.childArr + i;
@ -242,13 +278,31 @@ timeout_f(void *data)
/* Post notice that it is time to shut down */
S.flags |= EXIT_FLG;
//eprintf("============ TERMINATE NOW ============");
eprintf("Timed out with %u threads remaining", S.nThreads - nThreads_joined());
//eprintf("EXTERMINATE!!!!");
stop_remaining_children();
/* Register a countdown timer to know when to forcefully
* stop remaining threads.
*/
S.shutdownKey= ez_ES_registerTimer(PDNS_SHUTDOWN_PAUSE_MS, 0, shutdown_f, NULL);
return 0;
}
static int
shutdown_f(void *data)
/*********************************************************
* Terminate any remaining threads.
*/
{
S.shutdownKey= 0;
eprintf("WTF? %u threads _still_ remain", S.nThreads - nThreads_joined());
return -1;
}
/*============================================================*/
/*================= Child threads ============================*/
/*============================================================*/
@ -276,6 +330,9 @@ child_main (void *vp_ndx)
/* Respond to directives from parent */
ES_run();
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
//eprintf("thread %u exiting at %f seconds", ndx, (double)ms/1000.);
/* Let the main thread know we are ready to join */
ES_VSignal(S.tid, ndx);
@ -303,7 +360,8 @@ child_check_inbox_f(void *vp_ndx, int signo)
EOF != MSGQUEUE_extractTypedMsg(&self->inbox, e))
{
assert(e);
int64_t ms= clock_gettime_ms(CLOCK_REALTIME) - S.start_ms;
//eprintf("thread %u doing lookup at %f seconds", ndx, (double)ms/1000.);
/* Get a populated addrinfo object */
struct addrinfo *res= NULL;
int rc= ez_getaddrinfo(e->addr, NULL, &hints, &res);
@ -314,23 +372,22 @@ child_check_inbox_f(void *vp_ndx, int signo)
switch(rc) {
case 0:
e->dnsName= strdup(hostBuf);
++S.nSucc;
break;
case EAI_NONAME:
e->dnsName= "not found: 3(NXDOMAIN)";
++S.nFail;
e->dnsName= "[3(NXDOMAIN)]";
break;
case EAI_AGAIN:
e->dnsName= "not found: 2(SERVFAIL)";
++S.nFail;
e->dnsName= "[2(SERVFAIL)]";
break;
default:
eprintf("FATAL: getnameinfo() returned %d", rc);
abort();
}
}
++S.nCompleted;
if(S.flags & EXIT_FLG) return -1;

19
pdns.h
View File

@ -26,15 +26,24 @@ extern "C" {
#endif
/* Number of threads to use in parallel */
#define PDNS_N_THREADS 160
#define PDNS_INBOX_SZ 200
#define PDNS_CHILD_INBOX_SZ 2
#define PDNS_MAX_THREADS 100
#define PDNS_INBOX_SZ (PDNS_MAX_THREADS*3)
#define PDNS_CHILD_INBOX_SZ 1
#define PDNS_SHUTDOWN_PAUSE_MS 500
int
PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms);
PDNS_lookup(LOGENTRY *lePtrArr[], unsigned nItems, unsigned timeout_ms);
/**************************************************************
* Perform parallel DNS reverse lookups on all LOGENTRY objects
* referenced in leArr until finished, or timeout_ms has lapsed.
* referenced in lePtrArr until finished, or timeout_ms has lapsed.
*
* lePtrArr: array of pointers to LOGENTRY objects
* nItems: length of array
* timeout_ms: maximum amount of time to spend performing lookups.
*
* RETURNS
* suceess - number of lookups completed
* -1 Failures
*/
#ifdef __cplusplus