This commit is contained in:
aiden 2022-06-07 21:27:10 +01:00
parent 8f0646c2cf
commit 092dd9c628
No known key found for this signature in database
GPG Key ID: 0D87FF3415416DB1
5 changed files with 39 additions and 31 deletions

@ -79,7 +79,7 @@ struct bdd_conversation *bdd_conversation_obtain(int epoll_fd) {
conversation->epoll_fd = epoll_fd;
conversation->sosi.service_instance = NULL;
conversation->io_array = io_array;
conversation->n_connecting = 0;
conversation->n_blocking = 0;
conversation->n_in_epoll_with_events = 0;
conversation->n_ev = 0;
conversation->remove = false;

@ -13,6 +13,7 @@ enum bdd_io_state {
// established //
bdd_io_est,
bdd_io_out, // est but with epollout rather than epollin
bdd_io_ssl_shutting, // ssl shutdown in progress
};

@ -41,7 +41,7 @@ struct bdd_conversation {
struct bdd_io *io_array;
bdd_io_id n_connecting;
bdd_io_id n_blocking;
bdd_io_id n_in_epoll_with_events;
bdd_io_id n_ev;

@ -109,9 +109,9 @@ void bdd_io_state(struct bdd_io *io, enum bdd_io_state new_state) {
assert(state != new_state);
if (state == bdd_io_connecting) {
conversation->n_connecting -= 1;
if (!conversation->n_connecting) {
if (state == bdd_io_connecting || state == bdd_io_out) {
conversation->n_blocking -= 1;
if (!conversation->n_blocking) {
for (bdd_io_id idx = 0; idx < BIDIRECTIOND_N_IO; ++idx) {
struct bdd_io *idx_io = bdd_io(conversation, idx);
if (idx_io == io || (idx_io->state != bdd_io_est && idx_io->state != bdd_io_ssl_shutting)) {
@ -127,11 +127,11 @@ void bdd_io_state(struct bdd_io *io, enum bdd_io_state new_state) {
}
}
if (new_state == bdd_io_connecting) {
if (conversation->n_connecting == 0) {
if (new_state == bdd_io_connecting || new_state == bdd_io_out) {
if (conversation->n_blocking == 0) {
for (bdd_io_id idx = 0; idx < BIDIRECTIOND_N_IO; ++idx) {
struct bdd_io *idx_io = bdd_io(conversation, idx);
if (idx_io == io || idx_io->state == bdd_io_connecting) {
if (idx_io == io || idx_io->state == bdd_io_connecting || idx_io->state == bdd_io_out) {
continue;
}
if (idx_io->state == bdd_io_ssl_shutting) {
@ -141,16 +141,22 @@ void bdd_io_state(struct bdd_io *io, enum bdd_io_state new_state) {
}
}
}
conversation->n_connecting += 1;
conversation->n_blocking += 1;
}
if (new_state == bdd_io_connecting) {
bdd_io_epoll_mod(io, 0, EPOLLIN | EPOLLOUT, true);
bdd_io_epoll_add(io);
} else if (new_state == bdd_io_out) {
bdd_io_epoll_mod(io, EPOLLIN, EPOLLOUT, false);
bdd_io_epoll_add(io);
} else if (new_state == bdd_io_est) {
uint32_t epollin = EPOLLIN;
if (io->rdhup) {
epollin = 0;
}
bdd_io_epoll_mod(io, EPOLLOUT, epollin, false);
if (conversation->n_connecting == 0) {
if (conversation->n_blocking == 0) {
bdd_io_epoll_add(io);
} else {
bdd_io_epoll_remove(io);
@ -175,7 +181,7 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_read(
ssize_t sz
) {
struct bdd_io *io = bdd_io(conversation, io_id);
if (io == NULL || buf == NULL || sz <= 0 || conversation->n_connecting > 0) {
if (io == NULL || buf == NULL || sz <= 0 || conversation->n_blocking > 0) {
fputs("programming error: bdd_io_read called with invalid arguments\n", stderr);
abort();
return -1;
@ -251,7 +257,7 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_write(
ssize_t sz
) {
struct bdd_io *io = bdd_io(conversation, io_id);
if (io == NULL || buf == NULL || sz <= 0 || conversation->n_connecting > 0) {
if (io == NULL || buf == NULL || sz <= 0) {
fputs("programming error: bdd_io_write called with invalid arguments\n", stderr);
abort();
return -1;
@ -275,8 +281,8 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_write(
} else if (err == SSL_ERROR_WANT_READ) {
abort(); // fuck re-negotiation
} else if (err == SSL_ERROR_WANT_WRITE) {
bdd_io_epoll_mod(io, 0, EPOLLOUT, false);
return 0;
r = 0;
goto want_send;
}
if (bdd_io_hup(io, false)) {
return -2;
@ -290,18 +296,21 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_write(
goto send;
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
bdd_io_epoll_mod(io, 0, EPOLLOUT, false);
return 0;
r = 0;
goto want_send;
}
if (bdd_io_hup(io, false)) {
return -2;
}
return -1;
}
if (r != sz) {
bdd_io_epoll_mod(io, 0, EPOLLOUT, false);
}
}
if (r == sz) {
return sz;
}
want_send:;
bdd_io_state(io, bdd_io_out);
return r;
}

@ -102,8 +102,6 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
struct bdd_conversation *conversation = process_list;
process_list = conversation->next;
bool any_connecting = conversation->n_connecting > 0;
for (size_t idx = 0; idx < conversation->n_ev;) {
struct bdd_ev *ev = bdd_ev(conversation, idx);
@ -121,20 +119,19 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
break;
}
}
goto remove_event;
} else if (ev->events & bdd_ev_err) {
assert(!(ev->events & bdd_ev_out));
if (bdd_io_hup(io, false)) {
ev->events = bdd_ev_removed;
bdd_io_discard(io);
} else {
if (io->state == bdd_io_ssl_shutting) {
bdd_io_state(io, bdd_io_est);
ev->events &= ~bdd_ev_err;
}
} else if (io->state > bdd_io_est) {
bdd_io_state(io, bdd_io_est);
}
} else if (ev->events & bdd_ev_out) {
if (io->state == bdd_io_est) {
bdd_io_epoll_mod(io, EPOLLOUT, 0, false);
if (io->state == bdd_io_out) {
assert(!io->wrhup);
bdd_io_state(io, bdd_io_est);
} else if (io->state == bdd_io_ssl_shutting) {
if (bdd_ssl_shutdown_continue(io) == bdd_shutdown_complete) {
if (bdd_io_hup(io, false)) {
@ -155,11 +152,12 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
if (io->wrhup && (ev->events & bdd_ev_out)) {
abort();
}
#endif
if (any_connecting) {
ev->events &= ~(bdd_ev_in | bdd_ev_out);
if (conversation->n_blocking > 0) {
assert(!(ev->events & bdd_ev_in));
}
#endif
if (!ev->events) {
remove_event:;
memmove(ev, &(ev[1]), (--conversation->n_ev - idx) * sizeof(struct bdd_ev));
} else {
idx += 1;