Added massively parallel reverse dns lookup

This commit is contained in:
john 2019-11-30 00:10:25 -05:00
parent 2e4c215de2
commit eb7bda3dc9
11 changed files with 442 additions and 115 deletions

View File

@ -27,6 +27,7 @@ src := \
map.c \
maxoff.c \
msgqueue.c \
pdns.c \
ptrvec.c \
str.c \
util.c \

View File

@ -33,6 +33,7 @@
#include "logType.h"
#include "map.h"
#include "maxoff.h"
#include "pdns.h"
#include "str.h"
#include "util.h"
@ -127,12 +128,6 @@ static struct {
PTRVEC toBlock_vec,
toUnblock_vec;
/* Used for reverse DNS lookups */
struct {
struct gaicb **cbPtrArr,
*cbArr;
} gai;
/* Used to place LOGENTRY address objects into linear
* access container.
*/
@ -430,99 +425,14 @@ main(int argc, char **argv)
/* Special processing for DNS lookups */
if(G.flags & GLB_DNS_LOOKUP_FLG) {
static struct sigevent sev= {.sigev_notify= SIGEV_NONE};
const static struct addrinfo hints= {
.ai_family = AF_UNSPEC,
.ai_flags = AI_NUMERICHOST
};
ez_fprintf(G.listing_fh, "Performing reverse lookups for a %d seconds ... ", DFLT_DNS_PAUSE_SEC);
fflush(G.listing_fh);
/* Allocate array of structures */
S.gai.cbArr= calloc(sizeof(struct gaicb), nItems);
assert(S.gai.cbArr);
/* Allocate pointer array */
S.gai.cbPtrArr= malloc(sizeof(struct gaicb*) * nItems);
assert(S.gai.cbPtrArr);
/* Fill out cbPtrArr with addresses, populate structures */
for(unsigned i= 0; i < nItems; ++i) {
LOGENTRY *e= S.lePtrArr[i];
struct gaicb *cb= S.gai.cbArr+i;
/* Populate gaicb object */
cb->ar_name= e->addr;
cb->ar_request= &hints;
/* Place object address in cbPtrArr */
S.gai.cbPtrArr[i]= cb;
}
/* See if we can submit all of our requests */
eprintf("Submitting %u addresses for lookup", nItems);
int rc= ez_getaddrinfo_a(GAI_NOWAIT, S.gai.cbPtrArr, nItems, &sev);
if(rc)
eprintf("returned %d", rc);
assert(0 == rc);
// TODO: define max timeout on command line
static struct timespec ts;
ms2timespec(&ts, 10*1000);
/* Pause for parallel DNS lookups */
for(;;) {
int rc= ez_gai_suspend((const struct gaicb*const*)S.gai.cbPtrArr, nItems, &ts);
switch(rc) {
case 0:
case EAI_INTR:
continue;
case EAI_ALLDONE:
break;
default:
eprintf("INFO: gai_suspend() failed, rc= %d [%s]", rc, gai_strerror(rc));
abort();
}
break;
}
eprintf("All done");
unsigned nSucc= 0,
nFail= 0;
/* Cancel any ongoing lookups */
gai_cancel(NULL);
/* Now check each gaicb object */
for(unsigned i= 0; i < nItems; ++i) {
struct gaicb *cb= S.gai.cbArr + i;
int status= gai_error(cb);
static char hostBuf[PATH_MAX];
switch(status) {
case 0: {
++nSucc;
assert(cb->ar_name && cb->ar_result);
struct addrinfo *ai= cb->ar_result;
assert(ai->ai_addr && ai->ai_addrlen);
int rc= ez_getnameinfo(ai->ai_addr, ai->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD);
eprintf("%s= %s", cb->ar_name, rc ? "unknown" : hostBuf);
} break;
default:
++nFail;
eprintf("INFO: status= %d [%s]", status, gai_strerror(status));
continue;
}
// TODO: use the result
}
eprintf("nItems= %u, nSucc= %u, nFail= %u", nItems, nSucc, nFail);
} /* End of GLB_DNS_LOOKUP_FLG */
int rc= PDNS_lookup(S.lePtrArr, nItems, DFLT_DNS_PAUSE_SEC*1000);
const char *msg= "done";
if(rc) msg= "out of time";
ez_fprintf(G.listing_fh, "%s\n", msg);
}
/* Process each LOGENTRY item */
for(unsigned i= 0; i < nItems; ++i) {
@ -557,22 +467,17 @@ eprintf("Submitting %u addresses for lookup", nItems);
/* Print out only for list option */
if(G.flags & GLB_LIST_ADDR_FLG) {
const char *dns_name= NULL;
#if 0
if(G.flags & GLB_DNS_LOOKUP_FLG)
dns_name= reverse_dns_lookup(e->addr);
#endif
const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) [%s]\n",
const static char *dns_fmt= "%-15s\t%5u/%-4d offenses %s (%s) %s\n",
*fmt= "%-15s\t%5u/%-4d offenses %s (%s)\n";
ez_fprintf(G.listing_fh, dns_name ? dns_fmt : fmt
ez_fprintf(G.listing_fh, e->dnsName ? dns_fmt : fmt
, e->addr
, e->count
, nAllowed
, e->cntry[0] ? e->cntry : "--"
, bits2str(flags, BlockBitTuples)
, dns_name
, e->dnsName
);
}

View File

@ -39,6 +39,9 @@
#define N_ADDRESSES_HINT 10000
#define BUCKET_DEPTH_HINT 10
/* How long to wait for reverse DNS lookups before bailing out */
#define DFLT_DNS_PAUSE_SEC 180
/* Where to find stuff */
#define CONFIGFILE "/etc/ban2fail/ban2fail.cfg"
#define LOCKPATH "/run/lock/ban2fail"

13
es.c
View File

@ -434,6 +434,8 @@ ES_registerVSignal (
* On failure, -1 is returned.
*/
{
if(TS.tid != pthread_self()) initialize();
Cb *cb;
if(!Cb_VSignalCreate(cb, signum, callback_f, ctxt)) assert(0);
@ -849,7 +851,12 @@ ES_spawn_thread_sched(
if(pthread_mutex_lock(&S.spawn.cond_mtx)) assert (0);
/* Spawn the new thread */
memset(&tid, 0, sizeof(tid));
rtn = pthread_create (&tid, &attr, user_main, arg);
if(rtn) {
sys_eprintf("ERROR: pthread_create()");
abort();
}
/* Now we, the parent, wait on the child */
while(!S.spawn.release_parent) {
@ -934,16 +941,14 @@ ES_cleanup(void)
int
ES_VSignal (pthread_t tid, int signum)
/**********************************************************************
* Send a virtual signal to tid by placing signum in a mutex protected
* message queue, and then call pthread_kill(tid, SIGHUP) to notify the thread.
* Send a virtual signal to tid, which is multiplexed on SIGUSR2.
*
* tid: Target thread identifier.
* signum: Any integer number which is meaningful to your application.
*
* RETURNS:
* 0: successful
* EOF: the message queue is full
* -1: All other failures.
* -1: failures.
*/
{
int rtn= EOF-1;

6
es.h
View File

@ -77,16 +77,14 @@ ES_registerVSignal (
int
ES_VSignal (pthread_t tid, int signum);
/**********************************************************************
* Send a virtual signal to tid by placing signum in a mutex protected
* message queue, and then call pthread_kill(tid, SIGHUP) to notify the thread.
* Send a virtual signal to tid, which is multiplexed on SIGUSR2.
*
* tid: Target thread identifier.
* signum: Any integer number which is meaningful to your application.
*
* RETURNS:
* 0: successful
* EOF: the message queue is full
* -1: All other failures.
* -1: failures.
*/
int

View File

@ -94,6 +94,12 @@ LOGENTRY_destructor(LOGENTRY *self)
* Free resources.
*/
{
/* Sometimes this is assigned a static string,
* so just let it leak.
*/
// if(self->dnsName)
// free(self->dnsName);
return self;
}

View File

@ -30,6 +30,7 @@ typedef struct _LOGENTRY {
char addr[46],
cntry[3];
unsigned count;
char *dnsName;
} LOGENTRY;
#ifdef __cplusplus

View File

@ -99,6 +99,8 @@ MSGQUEUE_submitMsg (
*
* Returns: 0 for success, non-zero otherwise.
*/
#define MSGQUEUE_submitTypedMsg(self, msg) \
MSGQUEUE_submitMsg(self, &(msg))
int
MSGQUEUE_extractMsg (
@ -113,6 +115,9 @@ MSGQUEUE_extractMsg (
*
* Returns: 0 for success, EOF if the queue is empty.
*/
#define MSGQUEUE_extractTypedMsg(self, msg) \
MSGQUEUE_extractMsg(self, &(msg))
int
MSGQUEUE_checkQueue (

357
pdns.c Normal file
View File

@ -0,0 +1,357 @@
/***************************************************************************
* Copyright (C) 2019 by John D. Robertson *
* john@rrci.com *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#include <assert.h>
#include <limits.h>
#include <signal.h>
#include "ez_es.h"
#include "ez_libc.h"
#include "msgqueue.h"
#include "pdns.h"
#include "util.h"
/* Virtual signal numbers passed to ez_ES_registerVSignal() and ES_VSignal()
 * in this file. The values 0 .. PDNS_N_THREADS-1 are not named here: each
 * child thread sends its own array index to the parent as the "ready to
 * join" signal, so the named values must start at PDNS_N_THREADS.
 */
enum vsignals {
/* All vsigs before this are used to indicate child ready to join */
/* NOTE(review): EXIT_VSIG is not referenced anywhere else in this file;
 * it appears to serve only as the first value past the join signals.
 */
EXIT_VSIG= PDNS_N_THREADS,
/* Prompts the receiving thread (parent or child) to check its inbox */
CHECK_INBOX_VSIG
};
/*============================================================*/
/*=========== Forward declarations ===========================*/
/*============================================================*/
/* Parent: handler registered for CHECK_INBOX_VSIG in PDNS_lookup() */
static int check_inbox_f(void *data, int signo);
/* Child: handler registered for CHECK_INBOX_VSIG in child_main() */
static int child_check_inbox_f(void *vp_ndx, int signo);
/* Child: thread entry point; the argument is the child's array index */
static void* child_main (void *data);
/* Child: handler registered for SIGTERM in child_main() */
static int child_exit_f(void *data, int signo);
/* Parent: handler registered once per child, on vsig == child's index */
static int join_f(void *data, int signo);
/* Parent: send SIGTERM to every child which has not been joined */
static void stop_remaining_children(void);
/* Parent: countdown timer callback */
static int timeout_f(void *data);
/*============================================================*/
/*=========== Static data ====================================*/
/*============================================================*/
/* File-scope state shared by the parent thread and all child threads.
 * NOTE(review): none of these members is protected by a lock or declared
 * atomic in this file, yet flags, nSucc and nFail are touched by child
 * threads -- confirm that this is acceptable.
 */
static struct {
enum {
/* Set by timeout_f(); children stop taking work once they see it */
EXIT_FLG= 1<<0,
/* Set by check_inbox_f() when the last entry has been handed out */
DONE_FLG= 1<<1
} flags;
/* Registration keys, later handed back to ez_ES_unregister().
 * timeoutKey is zeroed by timeout_f() once the timer has fired.
 */
int timeoutKey,
inboxKey,
joinKeyArr[PDNS_N_THREADS];
/* Parent thread ID, published so children can signal the parent */
pthread_t tid;
/* Parent's inbox: children post their array index (an unsigned) here */
MSGQUEUE inbox;
/* Caller's array of entries to resolve */
LOGENTRY **leArr;
/* ndx: index of the next entry in leArr to hand out (it doubles as the
 * child index while the threads are being started).
 * nItems: number of entries in leArr.
 * nSucc, nFail: lookup outcome counters, incremented by the children.
 */
unsigned ndx,
nItems,
nSucc,
nFail;
/* One of these for each child thread */
struct child {
/* Set to 1 by join_f() after pthread_join() on this child */
int is_joined;
/* Thread ID returned by ES_spawn_thread() */
pthread_t tid;
/* Child's inbox: the parent posts LOGENTRY pointers here */
MSGQUEUE inbox;
} childArr[PDNS_N_THREADS];
} S;
/*============================================================*/
/*=========== PDNS ===========================================*/
/*============================================================*/
int
PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms)
/**************************************************************
 * Perform parallel DNS reverse lookups on all LOGENTRY objects
 * referenced in leArr until finished, or timeout_ms has lapsed.
 *
 * leArr:      array of pointers to the entries to resolve; the
 *             worker threads store each result in the entry's
 *             dnsName member.
 * nItems:     number of pointers in leArr.
 * timeout_ms: countdown in milliseconds, after which any workers
 *             still running are told to stop.
 *
 * RETURNS:
 * 0:  all workers were joined before the countdown expired.
 * -1: the countdown expired first, so some entries may not
 *     have been resolved.
 */
{
   /* With nothing to resolve there is no work to hand to a worker,
    * so don't start any threads or enter the event loop at all.
    */
   if(!nItems) return 0;

   /* Never start more workers than there are entries to hand out;
    * each worker is given leArr[its index] as its first task.
    */
   const unsigned nThreads= nItems < PDNS_N_THREADS ? nItems : PDNS_N_THREADS;

   /* Start from a known state */
   S.flags= 0;
   S.ndx= 0;
   S.nSucc= 0;
   S.nFail= 0;

   /* Publish our thread ID */
   S.tid= pthread_self();

   /* Prepare our inbox */
   MSGQUEUE_constructor(&S.inbox, sizeof(unsigned), PDNS_INBOX_SZ);

   /* Stash this where it's easy to get to */
   S.nItems= nItems;
   S.leArr= leArr;

   /* Register a countdown timer to know when to stop */
   S.timeoutKey= ez_ES_registerTimer(timeout_ms, 0, timeout_f, NULL);

   /* Check inbox on CHECK_INBOX_VSIG */
   S.inboxKey= ez_ES_registerVSignal(CHECK_INBOX_VSIG, check_inbox_f, NULL);
   assert(S.timeoutKey && S.inboxKey);

   /* Slots which never get a thread are marked as already joined, so
    * that join_f() and stop_remaining_children() skip over them.
    */
   for(unsigned i= 0; i < PDNS_N_THREADS; ++i)
      S.childArr[i].is_joined= i >= nThreads;

   /* Start child threads */
   for(; S.ndx < nThreads; ++S.ndx) {

      /* Register the join handler on vsig= array index */
      S.joinKeyArr[S.ndx]= ez_ES_registerVSignal(S.ndx, join_f, NULL);

      struct child *ch= S.childArr + S.ndx;

      /* Pass the child's array index in to child_main() */
      ch->tid= ES_spawn_thread(child_main, (void*)(long unsigned)S.ndx);

      /* Give the child something to do */
      int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]);
      assert(0 == rc);

      /* Prompt child to check inbox */
      ES_VSignal(ch->tid, CHECK_INBOX_VSIG);
   }

   /* Wait for something to happen */
   ES_run();

   /* Unregister signal handlers for this thread. timeout_f() zeroes
    * the key when the timer has fired.
    */
   if(S.timeoutKey)
      ez_ES_unregister(S.timeoutKey);

   ez_ES_unregister(S.inboxKey);

   /* Release only the join registrations which were actually made */
   for(unsigned i= 0; i < nThreads; ++i) {
      ez_ES_unregister(S.joinKeyArr[i]);
   }

   /* EXIT_FLG is only ever set by timeout_f(), so it tells the caller
    * whether we ran out of time.
    */
   return (S.flags & EXIT_FLG) ? -1 : 0;
}
static int
check_inbox_f(void *data, int signo)
/*********************************************************
 * Parent was prompted to check the inbox to see which
 * child is ready for another task.
 *
 * data:  unused (registered with NULL).
 * signo: the virtual signal number, CHECK_INBOX_VSIG.
 *
 * RETURNS: 0 always.
 */
{
   unsigned child_ndx;

   /* Each message in our inbox is the array index of an idle child */
   while(EOF != MSGQUEUE_extractTypedMsg(&S.inbox, child_ndx)) {

      /* Get pointer to child */
      struct child *ch= S.childArr + child_ndx;

      /* Nothing left to do here. Use >= rather than == so that an
       * index which has somehow moved past the end of leArr can
       * never be used to read beyond the array.
       */
      if(S.ndx >= S.nItems) {
         pthread_kill(ch->tid, SIGTERM);
         continue;
      }

      /* Tell child to work on next task */
      int rc= MSGQUEUE_submitTypedMsg(&ch->inbox, S.leArr[S.ndx]);
      assert(0 == rc);
      ES_VSignal(ch->tid, CHECK_INBOX_VSIG);

      /* Move the "todo" ndx forward */
      ++S.ndx;

      /* Note when the last entry has been handed out */
      if(S.ndx >= S.nItems)
         S.flags |= DONE_FLG;
   }

   return 0;
}
static int
join_f(void *data, int signo)
/*********************************************************
 * A child has announced that it is ready to be joined. The
 * virtual signal number is that child's index in S.childArr.
 *
 * RETURNS:
 * 0:  at least one child has not been joined yet.
 * -1: every child has been joined.
 */
{
   void *childRtn;
   struct child *finished= &S.childArr[signo];

   /* Reap the thread and record the fact */
   pthread_join(finished->tid, &childRtn);
   finished->is_joined= 1;

   /* As long as any child is outstanding, carry on */
   for(unsigned n= 0; n < PDNS_N_THREADS; ++n) {
      if(!S.childArr[n].is_joined) return 0;
   }

   /* This will naturally terminate when we are done.*/
   return -1;
}
static void
stop_remaining_children(void)
/*********************************************************
* Signal any remaining children to stop.
*/
{
/* Tell all remaining child threads to exit now */
unsigned i;
for(i= 0; i < PDNS_N_THREADS; ++i) {
struct child *ch= S.childArr + i;
/* If it has already joined, skip it */
if(ch->is_joined) continue;
/* Prompt child to shut down now */
pthread_kill(ch->tid, SIGTERM);
}
}
static int
timeout_f(void *data)
/*********************************************************
 * Callback for the countdown timer registered in
 * PDNS_lookup(): time is up, so wind everything down.
 *
 * RETURNS: 0 always.
 */
{
   (void)data;

   /* Post notice that it is time to shut down */
   S.flags |= EXIT_FLG;

   /* Remember that the timer fired; PDNS_lookup() skips
    * unregistering the timer when this key is zero.
    */
   S.timeoutKey= 0;

   stop_remaining_children();
   return 0;
}
/*============================================================*/
/*================= Child threads ============================*/
/*============================================================*/
static void*
child_main (void *vp_ndx)
/*********************************************************
 * Entry point for each child thread.
 *
 * vp_ndx: this child's index in S.childArr, passed as
 *         an integer disguised as a pointer.
 *
 * RETURNS: NULL always.
 */
{
   unsigned my_ndx= (long unsigned)vp_ndx;
   struct child *me= &S.childArr[my_ndx];

   /* Our inbox holds LOGENTRY pointers submitted by the parent */
   MSGQUEUE_constructor(&me->inbox, sizeof(LOGENTRY*), PDNS_CHILD_INBOX_SZ);

   /* SIGTERM means quit; CHECK_INBOX_VSIG means work is waiting */
   ez_ES_registerSignal(SIGTERM, child_exit_f, NULL);
   ez_ES_registerVSignal(CHECK_INBOX_VSIG, child_check_inbox_f, vp_ndx);

   /* Parent has been blocked waiting for this call */
   ES_release_parent();

   /* Service directives from the parent until told to stop */
   ES_run();

   /* Our own index is the vsig which asks the parent to join us */
   ES_VSignal(S.tid, my_ndx);

   return NULL;
}
static int
child_check_inbox_f(void *vp_ndx, int signo)
/*********************************************************
 * Child was prompted to check the inbox for tasks. Each
 * task is a LOGENTRY whose addr is reverse resolved, with
 * the outcome stored in its dnsName member.
 *
 * vp_ndx: this child's index in S.childArr, passed as
 *         an integer disguised as a pointer.
 * signo:  the virtual signal number, CHECK_INBOX_VSIG.
 *
 * RETURNS:
 * 0:  inbox drained; parent was told we are ready for more.
 * -1: EXIT_FLG is set, so this child should stop.
 */
{
   unsigned ndx= (long unsigned)vp_ndx;
   struct child *self= S.childArr + ndx;
   char hostBuf[PATH_MAX];
   LOGENTRY *e;

   const static struct addrinfo hints= {
      .ai_family = AF_UNSPEC,    /* Allow IPv4 or IPv6 */
      .ai_flags = AI_NUMERICHOST /* doing reverse lookups */
   };

   while(!(S.flags & EXIT_FLG) &&
         EOF != MSGQUEUE_extractTypedMsg(&self->inbox, e))
   {
      assert(e);

      /* Get a populated addrinfo object */
      struct addrinfo *res= NULL;
      int rc= ez_getaddrinfo(e->addr, NULL, &hints, &res);
      assert(0 == rc);
      assert(res && res->ai_addr && res->ai_addrlen);

      /* Now do blocking reverse lookup */
      rc= ez_getnameinfo(res->ai_addr, res->ai_addrlen, hostBuf, sizeof(hostBuf)-1, NULL, 0, NI_NAMEREQD);

      /* The result is in hostBuf now, so release the address list
       * rather than leaking one for every lookup.
       */
      freeaddrinfo(res);

      /* NOTE(review): S.nSucc and S.nFail are incremented here by
       * every child thread without any synchronization visible in
       * this file -- confirm that approximate counts are acceptable.
       */
      switch(rc) {

         case 0:
            /* Heap copy; the failure cases below assign literals */
            e->dnsName= strdup(hostBuf);
            ++S.nSucc;
            break;

         case EAI_NONAME:
            e->dnsName= "not found: 3(NXDOMAIN)";
            ++S.nFail;
            break;

         case EAI_AGAIN:
            e->dnsName= "not found: 2(SERVFAIL)";
            ++S.nFail;
            break;

         default:
            /* Any other result is treated as fatal */
            abort();
      }
   }

   /* Told to quit while working, so don't ask for more */
   if(S.flags & EXIT_FLG) return -1;

   /* Submit the child's array ndx to main parent
    * thread's inbox to indicate we are ready for
    * more.
    */
   MSGQUEUE_submitTypedMsg(&S.inbox, ndx);
   ES_VSignal(S.tid, CHECK_INBOX_VSIG);

   return 0;
}
static int
child_exit_f(void *vp_ndx, int signo)
/*********************************************************
 * SIGTERM handler for a child thread. Returning -1 makes
 * ES_run() return in child_main(), which ends the thread.
 *
 * RETURNS: -1 always.
 */
{
   /* Neither argument is needed to decide to quit */
   (void)vp_ndx;
   (void)signo;

   return -1;
}

44
pdns.h Normal file
View File

@ -0,0 +1,44 @@
/***************************************************************************
* Copyright (C) 2019 by John D. Robertson *
* john@rrci.com *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#ifndef PARALLEL_DNS_H
#define PARALLEL_DNS_H
#include "logEntry.h"
#ifdef __cplusplus
extern "C" {
#endif
/* Number of threads to use in parallel */
#define PDNS_N_THREADS 160
/* Size argument given to MSGQUEUE_constructor() for the parent's inbox,
 * which receives the array index of each child that is ready for work.
 */
#define PDNS_INBOX_SZ 200
/* Size argument given to MSGQUEUE_constructor() for each child's inbox,
 * which receives LOGENTRY pointers from the parent.
 */
#define PDNS_CHILD_INBOX_SZ 2
int
PDNS_lookup(LOGENTRY *leArr[], unsigned nItems, unsigned timeout_ms);
/**************************************************************
* Perform parallel DNS reverse lookups on all LOGENTRY objects
* referenced in leArr until finished, or timeout_ms has lapsed.
*
* leArr:      array of pointers to the LOGENTRY objects to resolve;
*             the result is stored in each object's dnsName member.
* nItems:     number of pointers in leArr.
* timeout_ms: how long to wait, in milliseconds, before the lookup
*             threads are told to stop.
*
* NOTE(review): the caller in main() treats a non-zero return as
* "out of time" -- confirm the implementation reports a timeout
* that way.
*/
#ifdef __cplusplus
}
#endif
#endif

4
util.h
View File

@ -276,7 +276,9 @@ regex_compile(regex_t *preg, const char *pattern, int cflags);
FILE*
pager_open(void);
/***************************************************
* open() the caller's $PAGER on tmpfile.
* popen() the caller's $PAGER. Calling pclose()
* will then wait for pager to finish, and subsequently
* close the stream.
*/
const char*