From eb7bda3dc90e2b796e1acc843a0298de02227a98 Mon Sep 17 00:00:00 2001 From: john Date: Sat, 30 Nov 2019 00:10:25 -0500 Subject: [PATCH] Added massively parallel reverse dns lookup --- Jmakefile | 1 + ban2fail.c | 117 ++---------------- ban2fail.h | 3 + es.c | 13 +- es.h | 6 +- logEntry.c | 6 + logEntry.h | 1 + msgqueue.h | 5 + pdns.c | 357 +++++++++++++++++++++++++++++++++++++++++++++++++++++ pdns.h | 44 +++++++ util.h | 4 +- 11 files changed, 442 insertions(+), 115 deletions(-) create mode 100644 pdns.c create mode 100644 pdns.h diff --git a/Jmakefile b/Jmakefile index e8040c1..c30f71e 100644 --- a/Jmakefile +++ b/Jmakefile @@ -27,6 +27,7 @@ src := \ map.c \ maxoff.c \ msgqueue.c \ + pdns.c \ ptrvec.c \ str.c \ util.c \ diff --git a/ban2fail.c b/ban2fail.c index e03fc00..eb5a81b 100644 --- a/ban2fail.c +++ b/ban2fail.c @@ -33,6 +33,7 @@ #include "logType.h" #include "map.h" #include "maxoff.h" +#include "pdns.h" #include "str.h" #include "util.h" @@ -127,12 +128,6 @@ static struct { PTRVEC toBlock_vec, toUnblock_vec; - /* Used for reverse DNS lookups */ - struct { - struct gaicb **cbPtrArr, - *cbArr; - } gai; - /* Used to place LOGENTRY address objects into linear * access container. */ @@ -430,99 +425,14 @@ main(int argc, char **argv) /* Special processing for DNS lookups */ if(G.flags & GLB_DNS_LOOKUP_FLG) { - static struct sigevent sev= {.sigev_notify= SIGEV_NONE}; - const static struct addrinfo hints= { - .ai_family = AF_UNSPEC, - .ai_flags = AI_NUMERICHOST - }; + ez_fprintf(G.listing_fh, "Performing reverse lookups for a %d seconds ... ", DFLT_DNS_PAUSE_SEC); + fflush(G.listing_fh); - /* Allocate array of structures */ - S.gai.cbArr= calloc(sizeof(struct gaicb), nItems); - assert(S.gai.cbArr); - - /* Allocate pointer array */ - S.gai.cbPtrArr= malloc(sizeof(struct gaicb*) * nItems); - assert(S.gai.cbPtrArr); - - /* Fill out cbPtrArr with addresses, populate structures */ - for(unsigned i= 0; i < nItems; ++i) { - - LOGENTRY *e= S.lePtrArr[i]; - struct gaicb *cb= S.gai.cbArr+i; - - /* Populate gaicb object */ - cb->ar_name= e->addr; - cb->ar_request= &hints; - - /* Place object address in cbPtrArr */ - S.gai.cbPtrArr[i]= cb; - } - - /* See if we can submit all of our requests */ -eprintf("Submitting %u addresses for lookup", nItems); - int rc= ez_getaddrinfo_a(GAI_NOWAIT, S.gai.cbPtrArr, nItems, &sev); - if(rc) - eprintf("returned %d", rc); - assert(0 == rc); - - // TODO: define max timeout on command line - static struct timespec ts; - ms2timespec(&ts, 10*1000); - - /* Pause for parallel DNS lookups */ - for(;;) { - int rc= ez_gai_suspend((const struct gaicb*const*)S.gai.cbPtrArr, nItems, &ts); - switch(rc) { - case 0: - case EAI_INTR: - continue; - - case EAI_ALLDONE: - break; - - default: - eprintf("INFO: gai_suspend() failed, rc= %d [%s]", rc, gai_strerror(rc)); - abort(); - } - break; - } - - eprintf("All done"); - unsigned nSucc= 0, - nFail= 0; - - /* Cancel any ongoing lookups */ - gai_cancel(NULL); - - /* Now check each gaicb object */ - for(unsigned i= 0; i < nItems; ++i) { - - struct gaicb *cb= S.gai.cbArr + i; - int status= gai_error(cb); - static char hostBuf[PATH_MAX]; - - switch(status) { - - case 0: { - ++nSucc; - assert(cb->ar_name && cb->ar_result); - struct addrinfo *ai= cb->ar_result; - assert(ai->ai_addr && ai->ai_addrlen); - - int rc= ez_getnameinfo(ai->ai_addr, ai->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD); - eprintf("%s= %s", cb->ar_name, rc ? "unknown" : hostBuf); - } break; - - default: - ++nFail; - eprintf("INFO: status= %d [%s]", status, gai_strerror(status)); - continue; - } - - // TODO: use the result - } - eprintf("nItems= %u, nSucc= %u, nFail= %u", nItems, nSucc, nFail); - } /* End of GLB_DNS_LOOKUP_FLG */ + int rc= PDNS_lookup(S.lePtrArr, nItems, DFLT_DNS_PAUSE_SEC*1000); + const char *msg= "done"; + if(rc) msg= "out of time"; + ez_fprintf(G.listing_fh, "%s\n", msg); + } /* Process each LOGENTRY item */ for(unsigned i= 0; i < nItems; ++i) { @@ -557,22 +467,17 @@ eprintf("Submitting %u addresses for lookup", nItems); /* Print out only for list option */ if(G.flags & GLB_LIST_ADDR_FLG) { - const char *dns_name= NULL; -#if 0 - if(G.flags & GLB_DNS_LOOKUP_FLG) - dns_name= reverse_dns_lookup(e->addr); -#endif - const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) [%s]\n", + const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) %s\n", *fmt= "%-15s\t%5u/%-4d offenses %s (%s)\n"; - ez_fprintf(G.listing_fh, dns_name ? dns_fmt : fmt + ez_fprintf(G.listing_fh, e->dnsName ? dns_fmt : fmt , e->addr , e->count , nAllowed , e->cntry[0] ? e->cntry : "--" , bits2str(flags, BlockBitTuples) - , dns_name + , e->dnsName ); } diff --git a/ban2fail.h b/ban2fail.h index 6c81503..01385ca 100644 --- a/ban2fail.h +++ b/ban2fail.h @@ -39,6 +39,9 @@ #define N_ADDRESSES_HINT 10000 #define BUCKET_DEPTH_HINT 10 +/* How long to wait for reverse DNS lookups before bailing out */ +#define DFLT_DNS_PAUSE_SEC 180 + /* Where to find stuff */ #define CONFIGFILE "/etc/ban2fail/ban2fail.cfg" #define LOCKPATH "/run/lock/ban2fail" diff --git a/es.c b/es.c index 7202f2f..62ba70f 100644 --- a/es.c +++ b/es.c @@ -434,6 +434,8 @@ ES_registerVSignal ( * On failure, -1 is returned. */ { + if(TS.tid != pthread_self()) initialize(); + Cb *cb; if(!Cb_VSignalCreate(cb, signum, callback_f, ctxt)) assert(0); @@ -849,7 +851,12 @@ ES_spawn_thread_sched( if(pthread_mutex_lock(&S.spawn.cond_mtx)) assert (0); /* Spawn the new thread */ + memset(&tid, 0, sizeof(tid)); rtn = pthread_create (&tid, &attr, user_main, arg); + if(rtn) { + sys_eprintf("ERROR: pthread_create()"); + abort(); + } /* Now we, the parent, wait on the child */ while(!S.spawn.release_parent) { @@ -934,16 +941,14 @@ ES_cleanup(void) int ES_VSignal (pthread_t tid, int signum) /********************************************************************** - * Send a virtual signal to tid by placing signum in a mutex protected - * message queue, and then call pthread_kill(tid, SIGHUP) to notify the thread. + * Send a virtual signal to tid, which is multiplexed on SIGUSR2. * * tid: Target thread identifier. * signum: Any integer number which is meaningful to your application. * * RETURNS: * 0: successful - * EOF: the message queue is full - * -1: All other failures. + * -1: failures. */ { int rtn= EOF-1; diff --git a/es.h b/es.h index ec8b0a6..2be594d 100644 --- a/es.h +++ b/es.h @@ -77,16 +77,14 @@ ES_registerVSignal ( int ES_VSignal (pthread_t tid, int signum); /********************************************************************** - * Send a virtual signal to tid by placing signum in a mutex protected - * message queue, and then call pthread_kill(tid, SIGHUP) to notify the thread. + * Send a virtual signal to tid, which is multiplexed on SIGUSR2. * * tid: Target thread identifier. * signum: Any integer number which is meaningful to your application. * * RETURNS: * 0: successful - * EOF: the message queue is full - * -1: All other failures. + * -1: failures. */ int diff --git a/logEntry.c b/logEntry.c index 83da3f3..e1e6fd0 100644 --- a/logEntry.c +++ b/logEntry.c @@ -94,6 +94,12 @@ LOGENTRY_destructor(LOGENTRY *self) * Free resources. */ { + /* Sometimes this is assigned a static string, + * so just let it leak. + */ +// if(self->dnsName) +// free(self->dnsName); + return self; } diff --git a/logEntry.h b/logEntry.h index d5fb009..bcfea08 100644 --- a/logEntry.h +++ b/logEntry.h @@ -30,6 +30,7 @@ typedef struct _LOGENTRY { char addr[46], cntry[3]; unsigned count; + char *dnsName; } LOGENTRY; #ifdef __cplusplus diff --git a/msgqueue.h b/msgqueue.h index e18d3d5..69d9265 100644 --- a/msgqueue.h +++ b/msgqueue.h @@ -99,6 +99,8 @@ MSGQUEUE_submitMsg ( * * Returns: 0 for success, non-zero otherwise. */ +#define MSGQUEUE_submitTypedMsg(self, msg) \ + MSGQUEUE_submitMsg(self, &(msg)) int MSGQUEUE_extractMsg ( @@ -113,6 +115,9 @@ MSGQUEUE_extractMsg ( * * Returns: 0 for success, EOF if the queue is empty. */ +#define MSGQUEUE_extractTypedMsg(self, msg) \ + MSGQUEUE_extractMsg(self, &(msg)) + int MSGQUEUE_checkQueue ( diff --git a/pdns.c b/pdns.c new file mode 100644 index 0000000..93e0d9c --- /dev/null +++ b/pdns.c @@ -0,0 +1,357 @@ +/*************************************************************************** + * Copyright (C) 2019 by John D. Robertson * + * john@rrci.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 3 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ + +#include +#include +#include + +#include "ez_es.h" +#include "ez_libc.h" +#include "msgqueue.h" +#include "pdns.h" +#include "util.h" + +enum vsignals { + /* All vsigs before this are used to indicate child ready to join */ + EXIT_VSIG= PDNS_N_THREADS, + CHECK_INBOX_VSIG +}; + + +/*============================================================*/ +/*=========== Forward declarations ===========================*/ +/*============================================================*/ +static int check_inbox_f(void *data, int signo); +static int child_check_inbox_f(void *vp_ndx, int signo); +static void* child_main (void *data); +static int child_exit_f(void *data, int signo); +static int join_f(void *data, int signo); +static void stop_remaining_children(void); +static int timeout_f(void *data); + +/*============================================================*/ +/*=========== Static data ====================================*/ +/*============================================================*/ +static struct { + + enum { + EXIT_FLG= 1<<0, + DONE_FLG= 1<<1 + } flags; + + int timeoutKey, + inboxKey, + joinKeyArr[PDNS_N_THREADS]; + + pthread_t tid; + MSGQUEUE inbox; + LOGENTRY **leArr; + unsigned ndx, + nItems, + nSucc, + nFail; + + /* One of these for each child thread */ + struct child { + + int is_joined; + + pthread_t tid; + MSGQUEUE inbox; + + } childArr[PDNS_N_THREADS]; + +} S; + +/*============================================================*/ +/*=========== PDNS ===========================================*/ +/*============================================================*/ +int +PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms) +/************************************************************** + * Perform parallel DNS reverse lookups on all LOGENTRY objects + * referenced in leArr. + */ +{ + int rtn= -1; + + /* Publish our thread ID */ + S.tid= pthread_self(); + + /* Prepare our inbox */ + MSGQUEUE_constructor(&S.inbox, sizeof(unsigned), PDNS_INBOX_SZ); + + /* Stash this where it's easy to get to */ + S.nItems= nItems; + S.leArr= leArr; + + /* Register a countdown timer to know when to stop */ + S.timeoutKey= ez_ES_registerTimer(timeout_ms, 0, timeout_f, NULL); + /* Check inbox on CHECK_INBOX_VSIG */ + S.inboxKey= ez_ES_registerVSignal(CHECK_INBOX_VSIG, check_inbox_f, NULL); + + assert(S.timeoutKey && S.inboxKey); + + /* Start child threads */ + for(; S.ndx < PDNS_N_THREADS; ++S.ndx) { + + /* Register the join handler on vsig= array index */ + S.joinKeyArr[S.ndx]= ez_ES_registerVSignal(S.ndx, join_f, NULL); + + struct child *ch= S.childArr + S.ndx; + /* Pass the child's array index in to child_main() */ + ch->tid= ES_spawn_thread(child_main, (void*)(long unsigned)S.ndx); + + /* Give the child something to do */ + int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]); + assert(0 == rc); + /* Prompt child to check inbox */ + ES_VSignal(ch->tid, CHECK_INBOX_VSIG); + + } + + /* Wait for something to happen */ + ES_run(); +//eprintf("------------ ALL THREADS TERMINATED ------------"); + + /* Unregister signal handlers for this thread */ + if(S.timeoutKey) + ez_ES_unregister(S.timeoutKey); + + ez_ES_unregister(S.inboxKey); + + /* Release all the join registrations */ + for(unsigned i= 0; i < PDNS_N_THREADS; ++i) { + ez_ES_unregister(S.joinKeyArr[i]); + } + +//eprintf("INFO: nItems= %u, nSucc= %u, nFail= %u", nItems, S.nSucc, S.nFail); + + rtn= 0; +abort: + return rtn; +} + +static int +check_inbox_f(void *data, int signo) +/********************************************************* + * Parent was prompted to check the inbox to see which + * child is ready for another task. + */ +{ + int rtn= -1; + unsigned child_ndx; + + while(EOF != MSGQUEUE_extractTypedMsg(&S.inbox, child_ndx)) { + + /* Get pointer to child */ + struct child *ch= S.childArr + child_ndx; + + /* Noting left to do here */ + if(S.ndx == S.nItems) { + pthread_kill(ch->tid, SIGTERM); + continue; + } + + /* Tell child to work on next task */ + int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]); + assert(0 == rc); + ES_VSignal(ch->tid, CHECK_INBOX_VSIG); + + /* Move the "todo" ndx forward */ + ++S.ndx; + + if(S.ndx == S.nItems) + S.flags |= DONE_FLG; + } + + rtn= 0; +abort: + return rtn; +} + +static int +join_f(void *data, int signo) +/********************************************************* + * Child prompted us to join + */ +{ + struct child *ch= S.childArr + signo; + void *pRtn; + + pthread_join(ch->tid, &pRtn); + + ch->is_joined= 1; + + unsigned i; + for(i= 0; i < PDNS_N_THREADS; ++i) { + + struct child *ch= S.childArr + i; + if(!ch->is_joined) break; + } + + /* This will naturally terminate when we are done.*/ + return PDNS_N_THREADS == i ? -1 : 0; +} + +static void +stop_remaining_children(void) +/********************************************************* + * Signal any remaining children to stop. + */ +{ + /* Tell all remaining child threads to exit now */ + unsigned i; + for(i= 0; i < PDNS_N_THREADS; ++i) { + + struct child *ch= S.childArr + i; + + /* If it has already joined, skip it */ + if(ch->is_joined) continue; + + /* Prompt child to shut down now */ + pthread_kill(ch->tid, SIGTERM); + } +} + +static int +timeout_f(void *data) +/********************************************************* + * Countdown timer has expired. + */ +{ + /* Note that the countdown timer fired */ + S.timeoutKey= 0; + + /* Post notice that it is time to shut down */ + S.flags |= EXIT_FLG; + +//eprintf("============ TERMINATE NOW ============"); + + stop_remaining_children(); + + return 0; +} + +/*============================================================*/ +/*================= Child threads ============================*/ +/*============================================================*/ + +static void* +child_main (void *vp_ndx) +/********************************************************* + * Children begin execution here. + */ +{ + unsigned ndx= (long unsigned)vp_ndx; + struct child *self= S.childArr + ndx; + + /* Prepare child's static data */ + MSGQUEUE_constructor(&self->inbox, sizeof(LOGENTRY*), PDNS_CHILD_INBOX_SZ); + + /* Register to exit when prompted */ + ez_ES_registerSignal(SIGTERM, child_exit_f, NULL); + + /* Register to check inbox when prompted */ + ez_ES_registerVSignal(CHECK_INBOX_VSIG, child_check_inbox_f, vp_ndx); + + /* Parent has been blocked waiting for this call */ + ES_release_parent(); + + /* Respond to directives from parent */ + ES_run(); + + /* Let the main thread know we are ready to join */ + ES_VSignal(S.tid, ndx); + + return NULL; +} + +static int +child_check_inbox_f(void *vp_ndx, int signo) +/********************************************************* + * Child was prompted to check the inbox for tasks. + */ +{ + int rtn= -1; + unsigned ndx= (long unsigned)vp_ndx; + struct child *self= S.childArr + ndx; + char hostBuf[PATH_MAX]; + LOGENTRY *e; + const static struct addrinfo hints= { + .ai_family = AF_UNSPEC, /* Allow IPv4 or IPv6 */ + .ai_flags = AI_NUMERICHOST /* doing reverse lookups */ + }; + + while(!(S.flags & EXIT_FLG) && + EOF != MSGQUEUE_extractTypedMsg(&self->inbox, e)) + { + assert(e); + + /* Get a populated addrinfo object */ + struct addrinfo *res= NULL; + int rc= ez_getaddrinfo(e->addr, NULL, &hints, &res); + assert(0 == rc); + assert(res && res->ai_addr && res->ai_addrlen); + /* Now do blocking reverse lookup */ + rc= ez_getnameinfo(res->ai_addr, res->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD); + switch(rc) { + case 0: + e->dnsName= strdup(hostBuf); + ++S.nSucc; + break; + + case EAI_NONAME: + e->dnsName= "not found: 3(NXDOMAIN)"; + ++S.nFail; + break; + + case EAI_AGAIN: + e->dnsName= "not found: 2(SERVFAIL)"; + ++S.nFail; + break; + + default: + abort(); + } + } + + if(S.flags & EXIT_FLG) return -1; + + /* Submit the child's array ndx to main parent + * thread's inbox to indicate we are ready for + * more. + */ + MSGQUEUE_submitTypedMsg(&S.inbox, ndx); + ES_VSignal(S.tid, CHECK_INBOX_VSIG); + + rtn= 0; +abort: + return rtn; +} + +static int +child_exit_f(void *vp_ndx, int signo) +/********************************************************* + * Child was prompted to exit now, so return -1 so child_main() + * will return from ES_run(). + */ +{ + return -1; +} diff --git a/pdns.h b/pdns.h new file mode 100644 index 0000000..e6ba4f5 --- /dev/null +++ b/pdns.h @@ -0,0 +1,44 @@ +/*************************************************************************** + * Copyright (C) 2019 by John D. Robertson * + * john@rrci.com * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 3 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ +#ifndef PARALLEL_DNS_H +#define PARALLEL_DNS_H + +#include "logEntry.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* Number of threads to use in parallel */ +#define PDNS_N_THREADS 160 +#define PDNS_INBOX_SZ 200 +#define PDNS_CHILD_INBOX_SZ 2 + +int +PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms); +/************************************************************** + * Perform parallel DNS reverse lookups on all LOGENTRY objects + * referenced in leArr until finished, or timeout_ms has lapsed. + */ + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/util.h b/util.h index 9b9c9c8..7ebf247 100644 --- a/util.h +++ b/util.h @@ -276,7 +276,9 @@ regex_compile(regex_t *preg, const char *pattern, int cflags); FILE* pager_open(void); /*************************************************** - * open() the caller's $PAGER on tmpfile. + * popen() the caller's $PAGER. Calling pclose() + * will then wait for pager to finish, and subsequently + * close the stream. */ const char*