tcp_nodelay
This commit is contained in:
parent
1f37dcced8
commit
716fd154aa
|
@ -8,8 +8,9 @@
|
|||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
#include "headers/bdd_cont.h"
|
||||
#include "headers/instance.h"
|
||||
#include "headers/accept.h"
|
||||
#include "headers/serve.h"
|
||||
|
@ -209,6 +210,8 @@ enum bdd_cont bdd_accept_continue(struct bdd_ssl_cb_ctx *ctx) {
|
|||
return bdd_cont_conversation_discard;
|
||||
}
|
||||
|
||||
BDD_CONVERSATION_AGE_MS(conversation, "accept");
|
||||
|
||||
int fd = bdd_io_fd(io);
|
||||
|
||||
struct bdd_service_instance *service_inst = conversation->sosi.service_instance;
|
||||
|
@ -279,13 +282,11 @@ void bdd_accept(struct bdd_worker_data *worker_data) {
|
|||
}
|
||||
|
||||
// accept
|
||||
BDD_DEBUG_LOG("accepting tcp connection\n");
|
||||
fd = accept(worker_data->serve_fd, NULL, NULL);
|
||||
if (fd < 0) {
|
||||
BDD_DEBUG_LOG("rejected tcp connection\n");
|
||||
goto err;
|
||||
}
|
||||
BDD_DEBUG_LOG("accepted tcp connection\n");
|
||||
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
|
||||
|
||||
if (!SSL_set_fd(ssl, fd)) {
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#include "headers/bdd_service.h"
|
||||
#include "headers/bdd_io.h"
|
||||
#include "headers/bdd_event.h"
|
||||
#include "headers/debug_log.h"
|
||||
|
||||
void *bdd_get_associated(struct bdd_conversation *conversation) {
|
||||
return conversation->associated.data;
|
||||
|
@ -81,6 +82,9 @@ struct bdd_conversation *bdd_conversation_obtain(int epoll_fd) {
|
|||
io_array[idx].state = bdd_io_unused;
|
||||
io_array[idx].conversation_id = id;
|
||||
}
|
||||
#ifndef NDEBUG
|
||||
conversation->spawn = bdd_time();
|
||||
#endif
|
||||
conversation->state = bdd_conversation_obtained;
|
||||
conversation->epoll_fd = epoll_fd;
|
||||
conversation->tl = false;
|
||||
|
@ -109,6 +113,7 @@ void bdd_conversation_discard(struct bdd_conversation *conversation) {
|
|||
}
|
||||
}
|
||||
if (conversation->state >= bdd_conversation_obtained) {
|
||||
BDD_CONVERSATION_AGE_MS(conversation, "d");
|
||||
conversation->state = bdd_conversation_unused;
|
||||
|
||||
if (!atomic_load(&(bdd_gv.exiting))) {
|
||||
|
|
|
@ -5,24 +5,21 @@
|
|||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
||||
int bdd_vdl_SSL_write(void *x, char *data, size_t len) {
|
||||
if (len == 0) {
|
||||
puts("SSL_write len is 0?");
|
||||
}
|
||||
ssize_t bdd_vdl_SSL_write(void *x, char *data, size_t len) {
|
||||
printf("[send %zi] ----------\n", len);
|
||||
for (size_t idx = 0; idx < len; ++idx) {
|
||||
if (data[idx] >= 0x20 && data[idx] <= 0x7e) {
|
||||
if (data[idx] >= 0x20 && data[idx] <= 0x7e && data[idx] != '\\') {
|
||||
putc(data[idx], stdout);
|
||||
} else {
|
||||
printf("\\x%02x", data[idx]);
|
||||
}
|
||||
}
|
||||
putchar('\n');
|
||||
return SSL_write(x, data, len);
|
||||
ssize_t v = SSL_write(x, data, len);
|
||||
printf("\n---------- [sent %zi]\n", v);
|
||||
return v;
|
||||
}
|
||||
int bdd_vdl_send(int a, char *b, size_t c, int _) {
|
||||
if (c == 0) {
|
||||
puts("send len is 0?");
|
||||
}
|
||||
ssize_t bdd_vdl_send(int a, char *b, size_t c, int _) {
|
||||
printf("[send %zu] ----------\n", c);
|
||||
for (size_t idx = 0; idx < c; ++idx) {
|
||||
if (b[idx] >= 0x20 && b[idx] <= 0x7e) {
|
||||
putc(b[idx], stdout);
|
||||
|
@ -30,8 +27,9 @@ int bdd_vdl_send(int a, char *b, size_t c, int _) {
|
|||
printf("\\x%02x", b[idx]);
|
||||
}
|
||||
}
|
||||
putchar('\n');
|
||||
return send(a, b, c, _);
|
||||
ssize_t v = send(a, b, c, _);
|
||||
printf("\n---------- [sent %zi]\n", v);
|
||||
return v;
|
||||
}
|
||||
int bdd_vdl_pthread_mutex_lock(void *_, char *name, int ln) {
|
||||
printf("%p (%s) lock attempt @ %i!\n", _, name, ln);
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
|
||||
#include <stdint.h>
|
||||
#include <signal.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
struct bdd_settings {
|
||||
int epoll_timeout;
|
||||
|
@ -13,6 +14,8 @@ struct bdd_settings {
|
|||
sigset_t sigmask;
|
||||
|
||||
int *sockfds;
|
||||
|
||||
bool tcp_nodelay;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -35,6 +35,10 @@ enum bdd_conversation_state {
|
|||
};
|
||||
|
||||
struct bdd_conversation {
|
||||
#ifndef NDEBUG
|
||||
time_t spawn;
|
||||
#endif
|
||||
|
||||
enum bdd_conversation_state state;
|
||||
int epoll_fd;
|
||||
|
||||
|
|
|
@ -4,15 +4,18 @@
|
|||
#ifdef NDEBUG
|
||||
|
||||
#define BDD_DEBUG_LOG(x...) (0)
|
||||
#define BDD_CONVERSATION_AGE_MS(x...) (0)
|
||||
|
||||
#else
|
||||
|
||||
#include <stdio.h>
|
||||
#define BDD_DEBUG_LOG(string, args...) (printf("[DEBUG (%p)] " string, (void *)pthread_self(), ##args), fflush(stdout))
|
||||
|
||||
#include "conversations.h"
|
||||
#include "timeout_list.h"
|
||||
#define BDD_DEBUG_LOG(string, args...) (printf("[DEBUG] " string, ##args), fflush(stdout))
|
||||
#define BDD_CONVERSATION_AGE_MS(conversation, string, args...) (printf("[DEBUG conversation %p age %zums] " string "\n", conversation, bdd_time() - conversation->spawn, ##args), fflush(stdout))
|
||||
#ifdef BIDIRECTIOND_VERBOSE_DEBUG_LOG
|
||||
int bdd_vdl_SSL_write(void *x, char *data, size_t len);
|
||||
int bdd_vdl_send(int a, char *b, size_t c, int _);
|
||||
ssize_t bdd_vdl_SSL_write(void *x, char *data, size_t len);
|
||||
ssize_t bdd_vdl_send(int a, char *b, size_t c, int _);
|
||||
#define SSL_write bdd_vdl_SSL_write
|
||||
#define send bdd_vdl_send
|
||||
int bdd_vdl_pthread_mutex_lock(void *_, char *name, int ln);
|
||||
|
|
|
@ -44,6 +44,8 @@ struct bdd_gv {
|
|||
unsigned short int n_workers;
|
||||
struct bdd_worker_data *worker;
|
||||
unsigned short int workers_idx;
|
||||
|
||||
bool tcp_nodelay;
|
||||
};
|
||||
#define bdd_gv_worker(idx) (struct bdd_worker_data *)((char *)bdd_gv.worker + ((sizeof(struct bdd_worker_data) + (sizeof(struct epoll_event) * bdd_gv.n_epoll_oevents)) * idx))
|
||||
extern struct bdd_gv bdd_gv;
|
||||
|
|
|
@ -15,4 +15,6 @@ void bdd_tl_process(struct bdd_tl *timeout_list);
|
|||
|
||||
void bdd_tl_init(struct bdd_tl *timeout_list);
|
||||
|
||||
time_t bdd_time(void);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -35,6 +35,8 @@ struct bdd_gv bdd_gv = {
|
|||
|
||||
.worker = NULL,
|
||||
.workers_idx = 0,
|
||||
|
||||
.tcp_nodelay = true,
|
||||
};
|
||||
|
||||
SSL_CTX *bdd_ssl_ctx_skel(void) {
|
||||
|
@ -130,9 +132,14 @@ bool bdd_go(struct bdd_settings settings) {
|
|||
) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bdd_gv.tcp_nodelay = settings.tcp_nodelay;
|
||||
bdd_gv.n_workers = sysconf(_SC_NPROCESSORS_ONLN);
|
||||
|
||||
#ifndef NDEBUG
|
||||
bdd_gv.n_workers = 1;
|
||||
#endif
|
||||
|
||||
bool locked = false;
|
||||
|
||||
SSL_CTX *cl_ssl_ctx = SSL_CTX_new(TLS_client_method());
|
||||
|
|
19
bdd/src/io.c
19
bdd/src/io.c
|
@ -8,6 +8,8 @@
|
|||
#include <fcntl.h>
|
||||
#include <alloca.h>
|
||||
#include <errno.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <netinet/in.h>
|
||||
|
||||
#include "headers/instance.h"
|
||||
#include "headers/accept.h"
|
||||
|
@ -15,6 +17,7 @@
|
|||
#include "headers/bdd_service.h"
|
||||
#include "headers/bdd_io.h"
|
||||
#include "headers/bidirectiond_n_io.h"
|
||||
#include "headers/debug_log.h"
|
||||
|
||||
bdd_io_id bdd_io_id_of(struct bdd_io *io) {
|
||||
struct bdd_conversation *conversation = io_conversation(io);
|
||||
|
@ -62,6 +65,8 @@ void bdd_io_epoll_mod(struct bdd_io *io, uint8_t remove_events, uint8_t add_even
|
|||
#ifndef NDEBUG
|
||||
if (io->rdhup) {
|
||||
assert(!(io->epoll_events & bdd_epoll_in));
|
||||
} else {
|
||||
assert(io->epoll_events & bdd_epoll_in);
|
||||
}
|
||||
#endif
|
||||
if (io->in_epoll) {
|
||||
|
@ -117,6 +122,9 @@ void bdd_io_epoll_remove(struct bdd_io *io) {
|
|||
|
||||
bool bdd_io_hup(struct bdd_io *io, bool rdhup) {
|
||||
assert(io->state >= bdd_io_est);
|
||||
#ifndef NDEBUG
|
||||
BDD_DEBUG_LOG("conversation: %p, io: %i, %shup\n", io_conversation(io), bdd_io_id_of(io), rdhup ? "rd" : "wr");
|
||||
#endif
|
||||
if (rdhup) {
|
||||
io->rdhup = 1;
|
||||
} else {
|
||||
|
@ -246,10 +254,7 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_read(
|
|||
bdd_io_epoll_mod(io, bdd_epoll_in, 0, false);
|
||||
return -4;
|
||||
} else if (
|
||||
(
|
||||
err == SSL_ERROR_WANT_READ /* read all of the bytes and no close_notify received */ ||
|
||||
err == SSL_ERROR_NONE
|
||||
)
|
||||
err == SSL_ERROR_WANT_READ /* read all of the bytes and no close_notify received */
|
||||
) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -541,6 +546,12 @@ enum bdd_cont bdd_io_connect(
|
|||
abort();
|
||||
}
|
||||
int fd = socket(sockaddr->sa_family, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||
|
||||
if (bdd_gv.tcp_nodelay) {
|
||||
int flag = 1;
|
||||
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &(flag), sizeof(int));
|
||||
}
|
||||
|
||||
if (fd < 0) {
|
||||
goto err;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <openssl/ssl.h>
|
||||
#include <signal.h>
|
||||
#include <sys/epoll.h>
|
||||
#include <unistd.h>
|
||||
|
@ -57,8 +58,6 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
|
|||
struct epoll_event *events = worker_data->events;
|
||||
epoll:;
|
||||
|
||||
BDD_DEBUG_LOG("polling\n");
|
||||
|
||||
int n_events;
|
||||
do {
|
||||
n_events = epoll_wait(epoll_fd, events, bdd_gv.n_epoll_oevents, bdd_gv.epoll_timeout);
|
||||
|
@ -81,6 +80,7 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
|
|||
continue;
|
||||
}
|
||||
struct bdd_conversation *conversation = io_conversation(io);
|
||||
BDD_CONVERSATION_AGE_MS(conversation, "event loop");
|
||||
if (conversation->remove) {
|
||||
continue;
|
||||
}
|
||||
|
@ -231,9 +231,19 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
|
|||
}
|
||||
|
||||
if (conversation->n_ev != 0) {
|
||||
BDD_CONVERSATION_AGE_MS(conversation, "s");
|
||||
conversation->sosi.service->handle_events(conversation);
|
||||
BDD_CONVERSATION_AGE_MS(conversation, "e");
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
for (bdd_io_id idx = 0; idx < BIDIRECTIOND_N_IO; ++idx) {
|
||||
if (conversation->io_array[idx].state != bdd_io_unused && conversation->io_array[idx].ssl) {
|
||||
assert(!SSL_has_pending(conversation->io_array[idx].io.ssl));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (conversation->n_in_epoll_with_events == 0 || conversation->remove) {
|
||||
conversation_discard:;
|
||||
bdd_conversation_discard(conversation);
|
||||
|
|
|
@ -7,12 +7,9 @@
|
|||
#include "headers/conversations.h"
|
||||
|
||||
time_t bdd_time(void) {
|
||||
time_t ms = 0;
|
||||
struct timeval x;
|
||||
gettimeofday(&(x), NULL);
|
||||
ms += x.tv_sec * 1000;
|
||||
ms += x.tv_usec / 1000;
|
||||
return ms;
|
||||
struct timespec now;
|
||||
clock_gettime(CLOCK_REALTIME, &(now));
|
||||
return (now.tv_sec * 1000) + (now.tv_nsec / 1000000);
|
||||
}
|
||||
|
||||
void bdd_tl_link(struct bdd_tl *timeout_list, struct bdd_conversation *conversation) {
|
||||
|
|
26
cmd/main.c
26
cmd/main.c
|
@ -13,6 +13,7 @@
|
|||
#include <sys/stat.h>
|
||||
#include <sys/un.h>
|
||||
#include <unistd.h>
|
||||
#include <netinet/tcp.h>
|
||||
|
||||
#include "core_settings.h"
|
||||
#include "cp_pwd.h"
|
||||
|
@ -32,6 +33,7 @@ struct bdd_settings settings = {
|
|||
.n_epoll_oevents = 0x200,
|
||||
.epoll_timeout = -1,
|
||||
.sockfds = NULL,
|
||||
.tcp_nodelay = true,
|
||||
};
|
||||
|
||||
#define PASTE(x, y) x##y
|
||||
|
@ -86,10 +88,13 @@ int main(int argc, char *argv[], char *env[]) {
|
|||
};
|
||||
int sig_fd = -1;
|
||||
|
||||
size_t n_threads = sysconf(_SC_NPROCESSORS_ONLN) /* workers */;
|
||||
size_t n_workers = sysconf(_SC_NPROCESSORS_ONLN);
|
||||
#ifndef NDEBUG
|
||||
n_workers = 1;
|
||||
#endif
|
||||
|
||||
// name_descs
|
||||
if (!bdd_name_descs_create(n_threads + 1)) {
|
||||
if (!bdd_name_descs_create(n_workers + 1)) {
|
||||
fputs("failed to allocate name_descs\n", stderr);
|
||||
goto clean_up;
|
||||
}
|
||||
|
@ -259,10 +264,9 @@ int main(int argc, char *argv[], char *env[]) {
|
|||
strcpy(input_addr.sun_path, arg[1]);
|
||||
}
|
||||
arg += 2;
|
||||
} else if (strcmp((*arg), "--big-alloc") == 0) {
|
||||
EXPECT_ARGS(1);
|
||||
EXPECT(stosz(&(big_alloc_sz), arg[1]));
|
||||
arg += 2;
|
||||
} else if (strcmp((*arg), "--tcp-delay") == 0) {
|
||||
settings.tcp_nodelay = false;
|
||||
arg += 1;
|
||||
} else {
|
||||
for (size_t idx = 0; idx < n_services; ++idx) {
|
||||
if (services[idx].supported_arguments != NULL)
|
||||
|
@ -307,7 +311,7 @@ int main(int argc, char *argv[], char *env[]) {
|
|||
"that some bidirectiond settings can be modified "
|
||||
"without restarting\n"
|
||||
"--n-epoll-oevents: epoll_wait maxevents\n"
|
||||
"--big-alloc: reserve some memory\n", stdout);
|
||||
"--tcp-delay: disables tcp_nodelay\n", stdout);
|
||||
for (size_t idx = 0; idx < n_services; ++idx) {
|
||||
if (services[idx].arguments_help != NULL) {
|
||||
fputs(services[idx].arguments_help, stdout);
|
||||
|
@ -343,11 +347,11 @@ int main(int argc, char *argv[], char *env[]) {
|
|||
sv_addr.inet6.sin6_port = htons(port);
|
||||
}
|
||||
// try to bind to port
|
||||
sockfds = malloc(sizeof(int) * n_threads);
|
||||
sockfds = malloc(sizeof(int) * n_workers);
|
||||
if (sockfds == NULL) {
|
||||
goto clean_up;
|
||||
}
|
||||
for (; fuck_idx < n_threads; ++fuck_idx) {
|
||||
for (; fuck_idx < n_workers; ++fuck_idx) {
|
||||
int fd = socket(af, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||
if (fd < 0) {
|
||||
fprintf(stderr, "failed to create sv_socket! errno: %i\n", errno);
|
||||
|
@ -355,6 +359,10 @@ int main(int argc, char *argv[], char *env[]) {
|
|||
}
|
||||
int opt = 1;
|
||||
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &(opt), sizeof(opt));
|
||||
if (settings.tcp_nodelay) {
|
||||
assert(opt == 1);
|
||||
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &(opt), sizeof(int));
|
||||
}
|
||||
if (bind(fd, (struct sockaddr *)&(sv_addr), sv_addr_sz) < 0) {
|
||||
fprintf(stderr, "failed to bind sv_socket! errno: %i\n", errno);
|
||||
goto sock_err;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#include "../../bdd/headers/services.h"
|
||||
#include <netdb.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <sys/socket.h>
|
||||
#include <sys/types.h>
|
||||
|
@ -19,7 +20,7 @@ struct associated {
|
|||
|
||||
static inline uint8_t serve(struct bdd_conversation *conversation, bdd_io_id from, bdd_io_id to) {
|
||||
struct associated *associated = bdd_get_associated(conversation);
|
||||
for (size_t it = 0; it < 10; ++it) {
|
||||
for (size_t it = 0;; ++it) {
|
||||
ssize_t r = associated->n[to] = bdd_io_read(conversation, from, &(associated->buf[clsvb(to)]), buf_sz_each);
|
||||
if (r <= 0) {
|
||||
return r * -1;
|
||||
|
@ -74,7 +75,8 @@ void general_service__handle_events(struct bdd_conversation *conversation) {
|
|||
associated->idx[io_id] += r;
|
||||
}
|
||||
if (ev->events & bdd_ev_in) {
|
||||
switch (serve(conversation, io_id, io_id ^ 1)) {
|
||||
int r = serve(conversation, io_id, io_id ^ 1);
|
||||
switch (r) {
|
||||
case (4): case (2): { // rdhup
|
||||
switch (bdd_io_shutdown(conversation, io_id ^ 1)) {
|
||||
case (bdd_shutdown_conversation_discard): {
|
||||
|
|
Loading…
Reference in New Issue