err/removed events guaranteed before in/out events

This commit is contained in:
aiden 2022-06-24 06:06:35 +01:00
parent 7f7ee3f84e
commit 0a19851480
No known key found for this signature in database
GPG Key ID: 0D87FF3415416DB1
4 changed files with 33 additions and 22 deletions

@ -294,8 +294,6 @@ int main(int argc, char *argv[], char *env[]) {
arg_err:; arg_err:;
fputs("argument parsing failed\n" fputs("argument parsing failed\n"
"-t: set the amount of worker threads\n" "-t: set the amount of worker threads\n"
"--client-timeout: set the timeout (in ms) for "
"client socket i/o\n"
"--epoll-timeout: set the timeout (in ms) for " "--epoll-timeout: set the timeout (in ms) for "
"bdd_conversation structs\n" "bdd_conversation structs\n"
"-l: set the rlimits for open files (soft limit, " "-l: set the rlimits for open files (soft limit, "

@ -225,6 +225,7 @@ __attribute__((warn_unused_result)) ssize_t bdd_io_read(
} }
if (io->state < bdd_io_est || io->rdhup) { if (io->state < bdd_io_est || io->rdhup) {
fputs("programming error: bdd_io_read called with an io_id which is in an invalid state\n", stderr); fputs("programming error: bdd_io_read called with an io_id which is in an invalid state\n", stderr);
printf("io->state %i, io->rdhup %i\n", io->state, io->rdhup);
abort(); abort();
return -1; return -1;
} }

@ -98,6 +98,7 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
struct bdd_conversation *conversation = process_list; struct bdd_conversation *conversation = process_list;
process_list = conversation->next; process_list = conversation->next;
size_t non_err_idx = 0;
for (size_t idx = 0; idx < conversation->n_ev;) { for (size_t idx = 0; idx < conversation->n_ev;) {
struct bdd_ev *ev = bdd_ev(conversation, idx); struct bdd_ev *ev = bdd_ev(conversation, idx);
@ -171,6 +172,15 @@ void *bdd_serve(struct bdd_worker_data *worker_data) {
remove_event:; remove_event:;
memmove(ev, &(ev[1]), (--conversation->n_ev - idx) * sizeof(struct bdd_ev)); memmove(ev, &(ev[1]), (--conversation->n_ev - idx) * sizeof(struct bdd_ev));
} else { } else {
if (ev->events & (bdd_ev_err | bdd_ev_removed)) {
if (non_err_idx != idx) {
struct bdd_ev this_ev = *ev;
struct bdd_ev *non_err_ev = bdd_ev(conversation, non_err_idx);
*ev = *non_err_ev;
*non_err_ev = this_ev;
}
non_err_idx += 1;
}
idx += 1; idx += 1;
} }
} }

@ -50,46 +50,48 @@ static uint8_t serve(struct bdd_conversation *conversation, struct associated *a
void general_service__handle_events(struct bdd_conversation *conversation) { void general_service__handle_events(struct bdd_conversation *conversation) {
struct associated *a = bdd_get_associated(conversation); struct associated *a = bdd_get_associated(conversation);
size_t n_ev = bdd_n_ev(conversation); size_t n_ev = bdd_n_ev(conversation);
uint8_t events[2] = { 0, 0, };
for (size_t idx = 0; idx < n_ev; ++idx) { for (size_t idx = 0; idx < n_ev; ++idx) {
struct bdd_ev *ev = bdd_ev(conversation, idx); struct bdd_ev *ev = bdd_ev(conversation, idx);
events[ev->io_id] = ev->events; bdd_io_id io_id = ev->io_id;
} if (!(ev->events & (bdd_ev_err | bdd_ev_removed))) {
for (size_t idx = 0; idx < 2; ++idx) { break;
if (events[idx] & bdd_ev_err) { }
if (ev->events & bdd_ev_err) {
goto err; goto err;
} }
if (events[idx] & bdd_ev_removed) { if (ev->events & bdd_ev_removed) {
bool c = true; bool c = true;
if (!(a->flags & (rdhup << clsv(idx)))) { if (!(a->flags & (rdhup << clsv(io_id)))) {
c = false; c = false;
} }
if (!(a->flags & (called_shutdown << clsv(idx)))) { if (!(a->flags & (called_shutdown << clsv(io_id)))) {
c = false; c = false;
} }
if (c) { if (c) {
a->flags |= (wrhup << clsv(idx)); a->flags |= (wrhup << clsv(ev->io_id));
} else { } else {
goto err; goto err;
} }
} }
} }
for (size_t idx = 0; idx < 2; ++idx) { for (size_t idx = 0; idx < n_ev; ++idx) {
if (events[idx] & bdd_ev_out) { struct bdd_ev *ev = bdd_ev(conversation, idx);
assert(!(events[idx] & bdd_ev_in)); bdd_io_id io_id = ev->io_id;
ssize_t r = bdd_io_write(conversation, idx, &(a->buf[clsvb(idx)]), a->n[idx] - a->idx[idx]); if (ev->events & bdd_ev_out) {
assert(!(ev->events & bdd_ev_in));
ssize_t r = bdd_io_write(conversation, io_id, &(a->buf[clsvb(io_id)]), a->n[io_id] - a->idx[io_id]);
if (r < 0) { if (r < 0) {
goto err; goto err;
} }
a->idx[idx] += r; a->idx[io_id] += r;
} }
if (events[idx] & bdd_ev_in) { if (ev->events & bdd_ev_in) {
switch (serve(conversation, a, idx, idx ^ 1)) { switch (serve(conversation, a, io_id, io_id ^ 1)) {
case (2): { case (2): {
a->flags |= (rdhup << clsv(idx)); a->flags |= (rdhup << clsv(io_id));
a->flags |= (called_shutdown << clsv(idx ^ 1)); a->flags |= (called_shutdown << clsv(io_id ^ 1));
if (bdd_io_shutdown(conversation, idx ^ 1) != bdd_shutdown_inprogress) { if (bdd_io_shutdown(conversation, io_id ^ 1) != bdd_shutdown_inprogress) {
a->flags |= (wrhup << clsv(idx ^ 1)); a->flags |= (wrhup << clsv(io_id ^ 1));
} }
break; break;
} }