From 18f1d995f8ba1eae8de6735e25feb158f6d4be11 Mon Sep 17 00:00:00 2001 From: wangyu Date: Fri, 28 Jul 2017 19:47:30 +0800 Subject: [PATCH] support multi client,but still buggy --- main.cpp | 258 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 203 insertions(+), 55 deletions(-) diff --git a/main.cpp b/main.cpp index 49843d0..ece220e 100755 --- a/main.cpp +++ b/main.cpp @@ -95,7 +95,7 @@ const int udp_timeout=3000; const int heartbeat_interval=1000; -const int timer_interval=200; +const int timer_interval=500; const int RETRY_TIME=3; @@ -131,6 +131,9 @@ int epollfd=-1; int random_number_fd=-1; int timer_fd=-1; +unordered_map udp_fd_mp(100007); +unordered_map timer_fd_mp(100007); + char key_string[1000]= "secret key"; char key[16],key2[16]; @@ -140,6 +143,9 @@ const int anti_replay_window_size=1000; const int max_conv_num=10000; const int conv_timeout=120000; //60 second const int conv_clear_ratio=10; + +const int conn_timeout=60000; +const int conn_clear_ratio=10; uint64_t current_time_rough=0; @@ -360,11 +366,20 @@ struct conv_manager_t //TODO change map to unordered map clear_it=conv_last_active_time.begin(); //clear_function=0; } + ~conv_manager_t() + { + clear(); + } int get_size() { return conv_to_u64.size(); } - + void reserve() + { + u64_to_conv.reserve(10007); + conv_to_u64.reserve(10007); + conv_last_active_time.reserve(10007); + } void clear() { if(disable_conv_clear) return ; @@ -472,7 +487,7 @@ struct conv_manager_t //TODO change map to unordered map } return 0; } -}conv_manager; +};//g_conv_manager; struct packet_info_t @@ -535,6 +550,8 @@ struct conn_info_t id_t my_id; id_t oppsite_id; int retry_counter; + conv_manager_t conv_manager; + int timer_fd; conn_info_t() { //send_packet_info.protocol=g_packet_info_send.protocol; @@ -547,6 +564,9 @@ struct conn_info_t struct conn_manager_t { unordered_map mp; + unordered_map::iterator it; + unordered_map::iterator clear_it; + unordered_map::iterator old_it; uint32_t current_ready_ip; uint16_t current_ready_port; conn_manager_t() @@ -584,11 +604,64 @@ struct conn_manager_t u64|=port; return mp[u64]; } +int clean_inactive() +{ + if(disable_conv_clear) return 0; + + //map::iterator it; + int cnt=0; + it=clear_it; + int size=mp.size(); + int num_to_clean=size/conv_clear_ratio+1; //clear 1/10 each time,to avoid latency glitch + + uint64_t current_time=get_current_time(); + for(;;) + { + if(cnt>=num_to_clean) break; + if(mp.begin()==mp.end()) break; + + if(it==mp.end()) + { + it=mp.begin(); + } + + if( current_time - it->second.last_hb_recv_time >conv_timeout ) + { + //mylog(log_info,"inactive conv %u cleared \n",it->first); + old_it=it; + it++; + timer_fd_mp.erase(old_it->second.timer_fd); + close(old_it->second.timer_fd);// close will auto delte it from epoll + mp.erase(old_it->first); + } + else + { + it++; + } + cnt++; + } + return 0; +} }conn_manager; int TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT; ////////==========================type divider======================================================= +uint64_t pack_u64(uint32_t a,uint32_t b) +{ + uint64_t ret=a; + ret<<=32u; + ret+=b; + return ret; +} +uint32_t get_u64_h(uint64_t a) +{ + return a>>32u; +} +uint32_t get_u64_l(uint64_t a) +{ + return (a<<32u)>>32u; +} char * my_ntoa(uint32_t ip) { @@ -940,18 +1013,18 @@ uint64_t get_current_time() void server_clear_function(uint64_t u64) { - int fd=int((u64<<32u)>>32u); + int fd=int(u64); epoll_event ev; ev.events = EPOLLIN; ev.data.u64 = u64; - - int ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); + int ret; + /*ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); if (ret!=0) { mylog(log_fatal,"fd:%d epoll delete failed!!!!\n",fd); myexit(-1); //this shouldnt happen - } + }*/ //no need ret= close(fd); if (ret!=0) @@ -959,6 +1032,7 @@ void server_clear_function(uint64_t u64) mylog(log_fatal,"close fd %d failed !!!!\n",fd); myexit(-1); //this shouldnt happen } + udp_fd_mp.erase(udp_fd); } @@ -2333,7 +2407,7 @@ int keep_connection_client() //for client raw_info_t &raw_info=g_conn_info.raw_info; current_time_rough=get_current_time(); - conv_manager.clean_inactive(); + g_conn_info.conv_manager.clean_inactive(); mylog(log_trace,"timer!\n"); begin: @@ -2483,11 +2557,12 @@ int keep_connection_client() //for client return 0; } -int keep_connection_server_multi() +int keep_connection_server_multi(conn_info_t &conn_info) { - raw_info_t &raw_info=g_conn_info.raw_info; + mylog(log_debug,"server timer!\n"); + raw_info_t &raw_info=conn_info.raw_info; current_time_rough=get_current_time(); - conv_manager.clean_inactive(); + conn_info.conv_manager.clean_inactive(); if(g_conn_info.server_current_state==server_ready) { @@ -2617,6 +2692,34 @@ int set_timer(int epollfd,int &timer_fd) return 0; } +int set_timer_server(int epollfd,int &timer_fd) +{ + int ret; + epoll_event ev; + + itimerspec its; + memset(&its,0,sizeof(its)); + + if((timer_fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK)) < 0) + { + mylog(log_fatal,"timer_fd create error\n"); + myexit(1); + } + its.it_interval.tv_nsec=timer_interval*1000ll*1000ll; + its.it_value.tv_nsec=1; //imidiately + timerfd_settime(timer_fd,0,&its,0); + + + ev.events = EPOLLIN; + ev.data.u64 = pack_u64(2,timer_fd); + + epoll_ctl(epollfd, EPOLL_CTL_ADD, timer_fd, &ev); + if (ret < 0) { + mylog(log_fatal,"epoll_ctl return %d\n", ret); + myexit(-1); + } + return 0; +} int client_on_raw_recv() { @@ -2786,15 +2889,15 @@ int client_on_raw_recv() uint32_t tmp_conv_id= ntohl(* ((uint32_t *)&data[1])); - if(!conv_manager.is_conv_used(tmp_conv_id)) + if(!g_conn_info.conv_manager.is_conv_used(tmp_conv_id)) { mylog(log_info,"unknow conv %d,ignore\n",tmp_conv_id); return 0; } - conv_manager.update_active_time(tmp_conv_id); + g_conn_info.conv_manager.update_active_time(tmp_conv_id); - uint64_t u64=conv_manager.find_u64_by_conv(tmp_conv_id); + uint64_t u64=g_conn_info.conv_manager.find_u64_by_conv(tmp_conv_id); sockaddr_in tmp_sockaddr; @@ -2819,12 +2922,12 @@ int client_on_raw_recv() } return 0; } -int server_on_raw_ready() +int server_on_raw_ready(conn_info_t &conn_info) { int data_len; char *data; - raw_info_t &raw_info = g_conn_info.raw_info; - packet_info_t &send_info = g_conn_info.raw_info.send_info; - packet_info_t &recv_info = g_conn_info.raw_info.recv_info; + raw_info_t &raw_info = conn_info.raw_info; + packet_info_t &send_info = conn_info.raw_info.send_info; + packet_info_t &recv_info = conn_info.raw_info.recv_info; if (recv_safer(raw_info, data, data_len) != 0) { return -1; @@ -2842,16 +2945,16 @@ int server_on_raw_ready() if (data[0] == 'h' && data_len == 1) { uint32_t tmp = ntohl(*((uint32_t *) &data[1 + sizeof(uint32_t)])); mylog(log_debug, "received hb <%x,%x>\n", oppsite_id, tmp); - g_conn_info.last_hb_recv_time = current_time_rough; + conn_info.last_hb_recv_time = current_time_rough; return 0; } else if (data[0] == 'd' && data_len >= sizeof(uint32_t) + 1) { uint32_t tmp_conv_id = ntohl(*((uint32_t *) &data[1])); - g_conn_info.last_hb_recv_time = current_time_rough; + conn_info.last_hb_recv_time = current_time_rough; mylog(log_debug, "<<<>>>\n", tmp_conv_id); - if (!conv_manager.is_conv_used(tmp_conv_id)) { - if (conv_manager.get_size() >= max_conv_num) { + if (!conn_info.conv_manager.is_conv_used(tmp_conv_id)) { + if (conn_info.conv_manager.get_size() >= max_conv_num) { mylog(log_warn, "ignored new conv %x connect bc max_conv_num exceed\n", tmp_conv_id); @@ -2883,7 +2986,7 @@ int server_on_raw_ready() } struct epoll_event ev; - uint64_t u64 = new_udp_fd; + uint64_t u64 = (uint32_t(new_udp_fd))+(1llu<<32u); mylog(log_trace, "u64: %ld\n", u64); ev.events = EPOLLIN; @@ -2897,7 +3000,8 @@ int server_on_raw_ready() return -1; } - conv_manager.insert_conv(tmp_conv_id, u64); + conn_info.conv_manager.insert_conv(tmp_conv_id, new_udp_fd); + udp_fd_mp[new_udp_fd]=pack_u64(conn_info.raw_info.recv_info.src_ip,conn_info.raw_info.recv_info.src_port); mylog(log_info, "new conv conv_id=%x, assigned fd=%d\n", tmp_conv_id, new_udp_fd); @@ -2906,9 +3010,9 @@ int server_on_raw_ready() } - uint64_t u64 = conv_manager.find_u64_by_conv(tmp_conv_id); + uint64_t u64 = conn_info.conv_manager.find_u64_by_conv(tmp_conv_id); - conv_manager.update_active_time(tmp_conv_id); + conn_info.conv_manager.update_active_time(tmp_conv_id); int fd = int((u64 << 32u) >> 32u); @@ -2938,10 +3042,10 @@ int server_on_raw_recv_multi() return -1; } mylog(log_info,"peek_raw %s %d\n",my_ntoa(ip),port); - if(ip==conn_manager.current_ready_ip&&port==conn_manager.current_ready_port) + /*if(ip==conn_manager.current_ready_ip&&port==conn_manager.current_ready_port) { return server_on_raw_ready(); - } + }*/ int data_len; char *data; if(!conn_manager.exist(ip,port)) @@ -3020,6 +3124,11 @@ int server_on_raw_recv_multi() packet_info_t &recv_info=conn_info.raw_info.recv_info; raw_info_t &raw_info=conn_info.raw_info; + if(conn_info.server_current_state==server_ready) + { + return server_on_raw_ready(conn_info); + } + if(recv_bare(conn_info.raw_info,data,data_len)<0) return -1; @@ -3100,13 +3209,16 @@ int server_on_raw_recv_multi() conn_info.last_hb_recv_time=get_current_time(); conn_info.last_hb_sent_time=conn_info.last_hb_recv_time; - send_safer(g_conn_info.raw_info,(char *)"h",1);/////////////send + send_safer(conn_info.raw_info,(char *)"h",1);/////////////send mylog(log_info,"changed state to server_ready,%d %d\n",ip,port); anti_replay.re_init(); g_conn_info=conn_info; - + int new_timer_fd; + set_timer_server(epollfd,new_timer_fd); + timer_fd_mp[new_timer_fd]=pack_u64(ip,port); + //timer_fd_mp[new_timer_fd] /* if(oppsite_const_id!=0&&tmp_oppsite_const_id!=oppsite_const_id) //TODO MOVE TO READY { @@ -3120,7 +3232,7 @@ int server_on_raw_recv_multi() } - +/* int server_on_raw_recv() { raw_info_t &raw_info=g_conn_info.raw_info; @@ -3239,11 +3351,11 @@ int server_on_raw_recv() mylog(log_trace,"unexpected adress\n"); return 0; } - /* - if(data_len=max_conv_num) + if(g_conn_info.conv_manager.get_size() >=max_conv_num) { mylog(log_warn,"ignored new udp connect bc max_conv_num exceed\n"); continue; } - conv=conv_manager.get_new_conv(); - conv_manager.insert_conv(conv,u64); + conv=g_conn_info.conv_manager.get_new_conv(); + g_conn_info.conv_manager.insert_conv(conv,u64); mylog(log_info,"new connection from %s:%d,conv_id=%x\n",inet_ntoa(udp_new_addr_in.sin_addr),ntohs(udp_new_addr_in.sin_port),conv); } else { - conv=conv_manager.find_conv_by_u64(u64); + conv=g_conn_info.conv_manager.find_conv_by_u64(u64); } - conv_manager.update_active_time(conv); + g_conn_info.conv_manager.update_active_time(conv); if(g_conn_info.client_current_state==client_ready) { @@ -3607,7 +3721,6 @@ int client_event_loop() return 0; } - int server_event_loop() { char buf[buf_len]; @@ -3684,11 +3797,30 @@ int server_event_loop() for (n = 0; n < nfds; ++n) { //printf("%d %d %d %d\n",timer_fd,raw_recv_fd,raw_send_fd,n); - if (events[n].data.u64 == timer_fd) + if ((events[n].data.u64 ) == timer_fd) { - uint64_t value; - read(timer_fd, &value, 8); - keep_connection_server_multi(); + conn_manager.clean_inactive(); + } + if ((events[n].data.u64 >>32u) == 2u) + { + int fd=get_u64_l(events[n].data.u64); + uint64_t dummy; + read(fd, &dummy, 8); + if(timer_fd_mp.find(fd)==timer_fd_mp.end()) + { + mylog(log_info,"timer_fd no longer exits\n", nfds); + continue; + } + uint64_t u64=timer_fd_mp[fd]; + uint32_t ip=get_u64_h(u64); + uint32_t port=get_u64_l(u64); + if(!conn_manager.exist(ip,port)) + { + mylog(log_info,"ip port no longer exits\n", nfds); + continue; + } + conn_info_t &conn_info=conn_manager.find(ip,port); + keep_connection_server_multi(conn_info); } else if (events[n].data.u64 == raw_recv_fd) { @@ -3696,20 +3828,36 @@ int server_event_loop() server_on_raw_recv_multi(); } else - //if ((events[n].data.u64 >>32u) > 0u) + if ((events[n].data.u64 >>32u) == 1u) { //uint32_t conv_id=events[n].data.u64>>32u; - int fd=int(events[n].data.u64); + int fd=int((events[n].data.u64<<32u)>>32u); - if(!conv_manager.is_u64_used(events[n].data.u64)) + if(udp_fd_mp.find(fd)==udp_fd_mp.end()) + { + mylog(log_debug,"fd no longer exists in udp_fd_mp,udp fd %d\n",fd); + continue; + } + + uint64_t u64=udp_fd_mp[fd]; + uint32_t ip=get_u64_h(u64); + uint32_t port=get_u64_l(u64); + if(conn_manager.exist(ip,port)==0) + { + mylog(log_debug,"conn_info no longer exists,udp fd %d\n",fd); + continue; + } + conn_info_t &conn_info=conn_manager.find(ip,port); + + if(!conn_info.conv_manager.is_u64_used(fd)) { mylog(log_debug,"conv no longer exists,udp fd %d\n",fd); int recv_len=recv(fd,buf,buf_len,0); ///////////TODO ,delete this continue; } - uint32_t conv_id=conv_manager.find_conv_by_u64(fd); + uint32_t conv_id=conn_info.conv_manager.find_conv_by_u64(fd); int recv_len=recv(fd,buf,buf_len,0); @@ -3724,9 +3872,9 @@ int server_event_loop() //return 0; } - conv_manager.update_active_time(conv_id); + conn_info.conv_manager.update_active_time(conv_id); - if(g_conn_info.server_current_state==server_ready) + if(conn_info.server_current_state==server_ready) { send_data_safer(g_conn_info.raw_info,buf,recv_len,conv_id); //send_data(g_packet_info_send,buf,recv_len,my_id,oppsite_id,conv_id);