From 047af63801f19bdb15b17e4561985b435cb3133c Mon Sep 17 00:00:00 2001 From: wangyu Date: Fri, 21 Jul 2017 10:36:27 +0800 Subject: [PATCH] multiplex works --- main.cpp | 238 ++++++++++++++++++++++++++++++++++++++++++++++++------- makefile | 2 +- 2 files changed, 211 insertions(+), 29 deletions(-) diff --git a/main.cpp b/main.cpp index fb93919..c7c2c4c 100755 --- a/main.cpp +++ b/main.cpp @@ -49,13 +49,13 @@ int local_port = -1, remote_port = -1; int epollfd ; uint32_t const_id=0; -uint32_t oppsite_const_id; + +uint32_t oppsite_const_id=0; uint32_t my_id=0; uint32_t oppsite_id=0; uint32_t conv_id=0; -uint32_t oppsite_const_id=0; const int handshake_timeout=2000; @@ -87,7 +87,8 @@ const int seq_mode=2; //0 dont increase /1 increase //increase randomly,abo const uint64_t epoll_timer_fd_sn=1; const uint64_t epoll_raw_recv_fd_sn=2; -uint64_t epoll_udp_fd_sn=256; //udp_fd_sn =256,512,768......the lower 8 bit is not used,to avoid confliction +const uint64_t epoll_udp_fd_sn_begin=256; +uint64_t epoll_udp_fd_sn=epoll_udp_fd_sn_begin; //udp_fd_sn =256,512,768......the lower 8 bit is not used,to avoid confliction const int server_nothing=0; @@ -164,7 +165,6 @@ uint32_t get_true_random_number() read(fd,&ret,sizeof(ret)); return htonl(ret); } - struct anti_replay_t { uint64_t max_packet_received; @@ -388,8 +388,26 @@ long long get_current_time() } const int conv_timeout=60000; //60 second const int conv_clear_ratio=10; +void server_clear(uint64_t u64) +{ + int fd=int((u64<<32u)>>32u); + epoll_event ev; -struct conv_manager + ev.events = EPOLLIN; + ev.data.u64 = u64; + + int ret = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev); + if (ret!=0) + { + printf("fd:%d epoll delete failed!!!!\n",fd); + } + ret= close(fd); + if (ret!=0) + { + printf("close fd %d failed !!!!\n",fd); + } +} +struct conv_manager_t { map u64_to_conv; //conv and u64 are both supposed to be uniq map conv_to_u64; @@ -398,14 +416,40 @@ struct conv_manager map::iterator clear_it; - conv_manager() + void (*clear_function)(uint64_t u64) ; + + + conv_manager_t() { clear_it=conv_last_active_time.begin(); + clear_function=0; + } + + void set_clear_function(void (*a)(uint64_t u64)) + { + clear_function=a; + } + void clear() + { + if(clear_function!=0) + { + map::iterator it; + for(it=conv_last_active_time.begin();it!=conv_last_active_time.end();it++) + { + clear_function(it->second); + } + } + u64_to_conv.clear(); + conv_to_u64.clear(); + conv_last_active_time.clear(); + + clear_it=conv_last_active_time.begin(); + } uint32_t get_new_conv() { uint32_t conv=get_true_random_number(); - while(conv_to_u64.find(conv)!=conv_to_u64.end()) + while(conv!=0&&conv_to_u64.find(conv)!=conv_to_u64.end()) { conv=get_true_random_number(); } @@ -419,11 +463,11 @@ struct conv_manager { return u64_to_conv.find(u64)!=u64_to_conv.end(); } - int find_conv_by_u64(uint64_t u64) + uint32_t find_conv_by_u64(uint64_t u64) { return u64_to_conv[u64]; } - int find_u64_by_conv(uint32_t conv) + uint64_t find_u64_by_conv(uint32_t conv) { return conv_to_u64[conv]; } @@ -441,12 +485,16 @@ struct conv_manager int erase_conv(uint32_t conv) { uint64_t u64=conv_to_u64[conv]; + if(clear_function!=0) + { + clear_function(u64); + } conv_to_u64.erase(conv); u64_to_conv.erase(u64); conv_last_active_time.erase(conv); return 0; } - int clean_inactive() + int clean_inactive( ) { map::iterator old_it; map::iterator it; @@ -468,9 +516,11 @@ struct conv_manager if( current_time -it->second >conv_timeout ) { + printf("inactive conv %u cleared !!!!!!!!!!!!!!!!!!!!!!!!!\n",it->first); old_it=it; it++; erase_conv(old_it->first); + } else { @@ -480,7 +530,7 @@ struct conv_manager } return 0; } -}; +}conv_manager; void init_filter(int port) { code[8].k=code[10].k=port; @@ -925,6 +975,7 @@ int client_bind_to_a_new_port() } int fake_tcp_keep_connection_client() //for client { + conv_manager.clean_inactive(); if(debug_mode)printf("timer!\n"); //fflush(stdout); begin: @@ -1036,6 +1087,7 @@ int fake_tcp_keep_connection_client() //for client int fake_tcp_keep_connection_server() { + conv_manager.clean_inactive(); //begin: if(debug_mode) printf("timer!\n"); if(server_current_state==server_nothing) @@ -1131,7 +1183,6 @@ int set_timer(int epollfd,int &timer_fd) int client_raw_recv(iphdr *iph,tcphdr *tcph,char * data,int data_len) { - if(client_current_state==client_syn_sent ) { if (!(tcph->syn==1&&tcph->ack==1&&data_len==0)) return 0; @@ -1236,7 +1287,7 @@ int client_raw_recv(iphdr *iph,tcphdr *tcph,char * data,int data_len) return 0; } - if(data_len=hb_length&&data[0]=='h') { if(debug_mode)printf("heart beat received\n"); last_hb_recv_time=get_current_time(); @@ -1260,17 +1311,40 @@ int client_raw_recv(iphdr *iph,tcphdr *tcph,char * data,int data_len) return 0; } + last_hb_recv_time=get_current_time(); + uint32_t tmp_conv_id= ntohl(* ((uint32_t *)&data[1+sizeof(my_id)*2])); + /* if(tmp_conv_id!=conv_id) { printf("conv id mismatch%x %x,ignore\n",tmp_oppsite_session_id,my_id); return 0; + }*/ + + if(!conv_manager.is_conv_used(tmp_conv_id)) + { + printf("unknow conv %d,ignore\n",tmp_conv_id); + return 0; } - last_hb_recv_time=get_current_time(); - int ret=sendto(udp_fd,data+1+sizeof(my_id)*3,data_len -(1+sizeof(my_id)*3),0,(struct sockaddr *)&udp_old_addr_in,sizeof(udp_old_addr_in)); + conv_manager.update_active_time(tmp_conv_id); + + uint64_t u64=conv_manager.find_u64_by_conv(tmp_conv_id); + + sockaddr_in tmp_sockaddr; + memset(&tmp_sockaddr,0,sizeof(tmp_sockaddr)); + + tmp_sockaddr.sin_family = AF_INET; + tmp_sockaddr.sin_addr.s_addr=(u64>>32u); + + tmp_sockaddr.sin_port= htons(uint16_t((u64<<32u)>>32u)); + + + int ret=sendto(udp_fd,data+1+sizeof(my_id)*3,data_len -(1+sizeof(my_id)*3),0,(struct sockaddr *)&tmp_sockaddr,sizeof(tmp_sockaddr)); + if(ret<0)perror("ret<0"); - printf("%d byte sent\n",ret); + printf("%s :%d\n",inet_ntoa(tmp_sockaddr.sin_addr),ntohs(tmp_sockaddr.sin_port)); + printf("%d byte sent!!!!!!!!!!!!!!!!!!\n",ret); } return 0; } @@ -1340,6 +1414,15 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len) uint32_t tmp_session_id= ntohl(* ((uint32_t *)&data[1+sizeof(my_id)])); + uint32_t tmp_oppsite_const_id=ntohl(* ((uint32_t *)&data[1+sizeof(my_id)*2])); + + if(oppsite_const_id!=0&&tmp_oppsite_const_id!=oppsite_const_id) + { + conv_manager.clear(); + } + oppsite_const_id=tmp_oppsite_const_id; + + printf("received hb %x %x\n",oppsite_id,tmp_session_id); if(tmp_session_id!=my_id) @@ -1357,7 +1440,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len) last_state_time=get_current_time(); last_hb_recv_time=get_current_time(); - first_data_packet=1; + //first_data_packet=1; printf("changed state to server_ready\n"); @@ -1376,6 +1459,7 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len) uint32_t tmp= ntohl(* ((uint32_t *)&data[1+sizeof(uint32_t)])); if(debug_mode)printf("received hb <%x,%x>\n",oppsite_id,tmp); last_hb_recv_time=get_current_time(); + return 0; } else if(data[0]=='d'&&data_len>=sizeof(my_id)*3+1) { @@ -1395,8 +1479,72 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len) printf("oppsite id mismatch,ignore\n"); return 0; } + last_hb_recv_time=get_current_time(); + + printf("<<<>>>\n",tmp_conv_id); + if(!conv_manager.is_conv_used(tmp_conv_id)) + { + struct sockaddr_in remote_addr_in; + + socklen_t slen = sizeof(sockaddr_in); + memset(&remote_addr_in, 0, sizeof(remote_addr_in)); + remote_addr_in.sin_family = AF_INET; + remote_addr_in.sin_port = htons(remote_port); + remote_addr_in.sin_addr.s_addr = inet_addr(remote_address); + + int new_udp_fd=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if(new_udp_fd<0) + { + printf("create udp_fd error"); + return -1; + } + set_udp_buf_size(new_udp_fd); + + printf("created new udp_fd %d\n",new_udp_fd); + int ret = connect(new_udp_fd, (struct sockaddr *) &remote_addr_in, slen); + if(ret!=0) + { + printf("udp fd connect fail\n"); + close(new_udp_fd); + return -1; + } + struct epoll_event ev; + + uint64_t u64=((u_int64_t(tmp_conv_id))<<32u)+(uint32_t)new_udp_fd; + printf("u64: %ld\n",u64); + ev.events = EPOLLIN; + + ev.data.u64 = u64; + + ret = epoll_ctl(epollfd, EPOLL_CTL_ADD, new_udp_fd, &ev); + + if (ret!= 0) { + printf("add udp_fd error\n"); + close(new_udp_fd); + return -1; + } + + conv_manager.insert_conv(tmp_conv_id,u64); + + } + + uint64_t u64=conv_manager.find_u64_by_conv(tmp_conv_id); + + conv_manager.update_active_time(tmp_conv_id); + + int fd=int((u64<<32u)>>32u); + + printf("received a data from fake tcp,len:%d\n",data_len); + int ret=send(fd,data+1+sizeof(my_id)*3,data_len -(1+sizeof(my_id)*3),0); + + printf("%d byte sent ,fd :%d\n ",ret,fd); + if(ret<0) + { + perror("what happened????"); + } + /* if(first_data_packet==0&& tmp_conv_id!=conv_id) //magic to find out which one is actually larger //consider 0xffffffff+1= 0x0 ,in this case 0x0 is "actually" larger { @@ -1469,11 +1617,9 @@ int server_raw_recv(iphdr * iph,tcphdr *tcph,char * data,int data_len) { conv_id=tmp_conv_id; } + */ + - printf("received a data from fake tcp,len:%d\n",data_len); - last_hb_recv_time=get_current_time(); - int ret=send(udp_fd,data+1+sizeof(my_id)*3,data_len -(1+sizeof(my_id)*3),0); - printf("%d byte sent\n",ret); } } } @@ -1742,6 +1888,7 @@ int client() printf("Received packet from %s:%d,len: %d\n", inet_ntoa(udp_new_addr_in.sin_addr), ntohs(udp_new_addr_in.sin_port),recv_len); + /* if(udp_old_addr_in.sin_addr.s_addr==0&&udp_old_addr_in.sin_port==0) { memcpy(&udp_old_addr_in,&udp_new_addr_in,sizeof(udp_new_addr_in)); @@ -1760,14 +1907,29 @@ int client() memcpy(&udp_old_addr_in,&udp_new_addr_in,sizeof(udp_new_addr_in)); conv_id++; } + }*/ + + //last_udp_recv_time=get_current_time(); + uint64_t u64=((uint64_t(udp_new_addr_in.sin_addr.s_addr))<<32u)+ntohs(udp_new_addr_in.sin_port); + uint32_t conv; + + if(!conv_manager.is_u64_used(u64)) + { + printf("new connection!!!!!!!!!!!\n"); + conv=conv_manager.get_new_conv(); + conv_manager.insert_conv(conv,u64); + } + else + { + conv=conv_manager.find_conv_by_u64(u64); } - last_udp_recv_time=get_current_time(); - if(client_current_state=client_ready) + conv_manager.update_active_time(conv); + + if(client_current_state==client_ready) { - send_data(g_packet_info,buf,recv_len,my_id,oppsite_id,conv_id); + send_data(g_packet_info,buf,recv_len,my_id,oppsite_id,conv); } - ////send_data_raw(buf,recv_len); } } } @@ -1776,6 +1938,7 @@ int client() int server() { + conv_manager.set_clear_function(server_clear); int i, j, k;int ret; g_packet_info.src_ip=inet_addr(local_address); @@ -1836,9 +1999,20 @@ int server() const int MTU=1440; for (n = 0; n < nfds; ++n) { - if (events[n].data.u64 == epoll_udp_fd_sn) + if ((events[n].data.u64 >>32u) > 0u) { - int recv_len=recv(udp_fd,buf,buf_len,0); + uint32_t conv_id=events[n].data.u64>>32u; + + if(!conv_manager.is_u64_used(events[n].data.u64)) + { + printf("conv no longer exists"); + continue; + } + + int fd=int((events[n].data.u64<<32u)>>32u); + + int recv_len=recv(fd,buf,buf_len,0); + printf("received a packet from udp_fd,len:%d\n",recv_len); if(recv_len<0) @@ -1848,7 +2022,14 @@ int server() continue; //return 0; } - send_data(g_packet_info,buf,recv_len,my_id,oppsite_id,conv_id); + + conv_manager.update_active_time(conv_id); + + if(server_current_state==server_ready) + { + send_data(g_packet_info,buf,recv_len,my_id,oppsite_id,conv_id); + printf("send !!!!!!!!!!!!!!!!!!"); + } } //printf("%d %d %d %d\n",timer_fd,raw_recv_fd,raw_send_fd,n); if (events[n].data.u64 == epoll_timer_fd_sn) @@ -1877,6 +2058,7 @@ int server() } return 0; } + int main(int argc, char *argv[]) { const_id=get_true_random_number(); diff --git a/makefile b/makefile index 8753679..44d64ed 100755 --- a/makefile +++ b/makefile @@ -3,5 +3,5 @@ all: killall raw||true sleep 1 g++ main.cpp -o raw -static -lrt -ggdb -I. aes.c md5.c encrypt.cpp -O3 - ${ccmips} main.cpp -o rawmips -static -lrt -ggdb -I. aes.c md5.c encrypt.cpp -O3 +# ${ccmips} main.cpp -o rawmips -static -lrt -ggdb -I. aes.c md5.c encrypt.cpp -O3