#include "fakeip.h"

// Implementation of the "fake IP" layer declared in fakeip.h: packets are
// carried inside UDP datagrams, large packets are split into fragments, and
// a background thread reassembles incoming fragments onto a receive queue.
//
// NOTE(review): the copy of this file that was reviewed had lost its line
// structure and three short spans of text.  The spots that had to be
// reconstructed are marked NOTE(review) below; confirm them against an
// intact copy if one exists.

// Largest UDP datagram (header + content + trailer) sent without fragmenting.
const int specialsize = 164;

// Identifier stamped on every transmitted message; shared by all fragments of
// one message so the receiver can match them up.
static int msgcount = 0;

// Reliability is stored as an integer number of billionths.
static const int reliability_scale = 1000000000;

// from fakeip.h
// struct ip_packet
//   { byte source_addr[6];
//     byte dest_addr[6];
//     short int protocol;
//     short int length;
//     byte content[0]; };

// Trailer appended after the content of every datagram on the wire.
struct extras
{
  int msgid;        // message this fragment belongs to
  short int more;   // 1 if further fragments follow, 0 on the last one
  short int frag;   // fragment index within the message, starting at 0
};

// Bytes added to the content by the packet header and the trailer.
const int overhead = sizeof(ip_packet) + sizeof(extras);

// Allocates a packet with room for content_length content bytes plus the
// trailer that transmit() appends in place.  Only the length field is
// initialised.  Returns NULL if content_length is negative or memory is
// exhausted.  Release with delete_ip_packet().
ip_packet * create_ip_packet(int content_length)
{
  if (content_length < 0)
    return NULL;
  ip_packet * p = (ip_packet *)malloc(content_length + overhead);
  if (p == NULL)
    return NULL;
  p->length = content_length;
  return p;
}

// Releases a packet obtained from create_ip_packet().  NULL is accepted.
void delete_ip_packet(ip_packet * p)
{
  free(p);
}

// Formats a 6-byte address as six dot-separated decimal numbers.
string ip_address_to_string(byte * addr)
{
  char str[50];
  snprintf(str, sizeof(str), "%d.%d.%d.%d.%d.%d",
           addr[0], addr[1], addr[2], addr[3], addr[4], addr[5]);
  return str;
}

// from fakeip.h
// struct ip_packet_link
//   { ip_packet * item;
//     ip_packet_link * next;
//     ip_packet_link(ip_packet * i); };

// Creates an unlinked list node holding packet i.
ip_packet_link::ip_packet_link(ip_packet * i)
{
  item = i;
  next = NULL;
}

// from fakeip.h
// class ip_packet_queue
//   { protected:
//       semaphore s;
//       pthread_cond_t rready;
//       ip_packet_link * first, * last;
//       int number, maxnumber;
//
//     public:
//       ip_packet_queue(int maxlen = 100);
//       ~ip_packet_queue();
//       bool is_empty();
//       bool is_full();
//       int length();
//       bool add(ip_packet * p);
//       ip_packet * get();
//       ip_packet * get_wait(int msec = -1); };

// Creates an empty queue that holds at most maxlen packets.
ip_packet_queue::ip_packet_queue(int maxlen)
{
  pthread_mutex_init(& s, NULL);
  pthread_cond_init(& rready, NULL);
  first = NULL;
  last = NULL;
  number = 0;
  maxnumber = maxlen;
}

// Frees the list nodes and the synchronisation objects.
// NOTE(review): the packets still queued are not freed here, exactly as in
// the original; confirm whether the queue is meant to own them.
ip_packet_queue::~ip_packet_queue()
{
  pthread_mutex_lock(& s);
  while (first != NULL)
  {
    ip_packet_link * doomed = first;
    first = first->next;
    delete doomed;
  }
  last = NULL;
  number = 0;
  // The mutex must be released before it is destroyed.
  pthread_mutex_unlock(& s);
  pthread_cond_destroy(& rready);
  pthread_mutex_destroy(& s);
}

// True when no packets are queued (read without taking the lock).
bool ip_packet_queue::is_empty()
{
  return number == 0;
}

// True when the queue has reached its capacity (read without the lock).
bool ip_packet_queue::is_full()
{
  return number >= maxnumber;
}

// Number of packets currently queued (read without taking the lock).
int ip_packet_queue::length()
{
  return number;
}

// Appends p and wakes one waiting reader.  Returns false, leaving p with the
// caller, when the queue is full; returns true once p has been queued.
bool ip_packet_queue::add(ip_packet * p)
{
  pthread_mutex_lock(& s);
  if (number >= maxnumber)
  {
    pthread_mutex_unlock(& s);
    return false;
  }
  ip_packet_link * L = new ip_packet_link(p);
  if (last == NULL)
    first = L;
  else
    last->next = L;
  last = L;
  number += 1;
  pthread_cond_signal(& rready);
  pthread_mutex_unlock(& s);
  return true;
}

// Removes and returns the oldest packet.
//   msec == 0 : do not wait; return NULL if the queue is empty.
//   msec <  0 : wait indefinitely.
//   msec >  0 : wait at most msec milliseconds, then return NULL.
// Also returns NULL (after reporting on stderr) if waiting fails.
ip_packet * ip_packet_queue::get_wait(int msec)
{
  // The absolute deadline is computed once so that a spurious wakeup simply
  // resumes waiting for the remainder of the original timeout.
  timespec deadline;
  deadline.tv_sec = 0;
  deadline.tv_nsec = 0;
  if (msec > 0)
  {
    timeval now;
    gettimeofday(& now, NULL);
    long usecs = (long)now.tv_usec + (long)(msec % 1000) * 1000;
    deadline.tv_sec = now.tv_sec + msec / 1000 + usecs / 1000000;
    deadline.tv_nsec = (usecs % 1000000) * 1000;
  }

  pthread_mutex_lock(& s);
  while (first == NULL)
  {
    if (msec == 0)
    {
      pthread_mutex_unlock(& s);
      return NULL;
    }
    // The pthread wait functions return the error number directly; they do
    // not return -1 and set errno.
    int r;
    if (msec < 0)
      r = pthread_cond_wait(& rready, & s);
    else
      r = pthread_cond_timedwait(& rready, & s, & deadline);
    if (r == ETIMEDOUT)
    {
      if (first == NULL)
      {
        pthread_mutex_unlock(& s);
        return NULL;
      }
    }
    else if (r != 0)
    {
      pthread_mutex_unlock(& s);
      fprintf(stderr, "waiting to receive: %s\n", strerror(r));
      return NULL;
    }
  }

  ip_packet_link * L = first;
  ip_packet * p = L->item;
  first = L->next;
  delete L;
  if (first == NULL)
    last = NULL;
  number -= 1;
  pthread_mutex_unlock(& s);
  return p;
}

// Non-blocking removal: the oldest packet, or NULL if the queue is empty.
ip_packet * ip_packet_queue::get()
{
  return get_wait(0);
}

// from fakeip.h
// class ip_system
//   { protected:
//       bool started;
//       int servesocket, tracing, reliability;
//       ip_packet_queue * rcvq;
//       byte address[6];
//       pthread_t threadid;
//       friend void * listening_process(void * a);
//       void put_on_receive_queue(ip_packet * p);
//
//     public:
//       ip_system();
//       ~ip_system();
//       byte * start();
//       bool ok();
//       byte * get_address();
//       void set_trace_level(int n);
//       void set_reliability(double r);
//       int receive_queue_length();
//       int transmit_queue_length();
//       ip_packet * receive();
//       ip_packet * wait_to_receive(int msec = -1);
//       bool transmit(ip_packet * p);
//       void wait_until_dead(); };

// Creates a stopped system: tracing off, no simulated loss, and a receive
// queue of 100 packets.
ip_system::ip_system()
{
  started = false;
  tracing = 0;
  reliability = reliability_scale;
  rcvq = new ip_packet_queue(100);
}

// Releases the receive queue.
// NOTE(review): the listening thread is neither stopped nor joined here, so
// destroying a started ip_system leaves that thread using freed state.
ip_system::~ip_system()
{
  delete rcvq;
  started = false;
}

// Selects how much is printed per packet: 0 nothing, 1 a summary line,
// 2 the summary plus the content.  Other values are rejected with a message.
void ip_system::set_trace_level(int n)
{
  if (n < 0 || n > 2)
  {
    fprintf(stderr, "\n*** trace level must be 0, 1, or 2 ***\n");
    return;
  }
  tracing = n;
}

// Sets the probability (0 to 1 inclusive) that a packet survives each
// simulated-loss check.  Out-of-range values, including NaN, are rejected.
void ip_system::set_reliability(double r)
{
  if (!(r >= 0.0 && r <= 1.0))
  {
    fprintf(stderr, "\n*** reliability must be between 0 and 1 inclusive ***\n");
    return;
  }
  reliability = (int)(r * 1000000000.0);
}

// Prints one trace line for packet p.  verb is "rcvd" or "sent"; at level 2
// the content follows, with non-printable bytes shown as [decimal].
static void trace_packet(const char * verb, ip_packet * p, int level)
{
  printf("[%s %s -> %s proto %d len %d", verb,
         ip_address_to_string(p->source_addr).c_str(),
         ip_address_to_string(p->dest_addr).c_str(),
         p->protocol, p->length);
  if (level == 2)
  {
    printf(": ");
    // NOTE(review): the loop header was garbled in the reviewed copy; it is
    // reconstructed here as a walk over all p->length content bytes.
    for (int i = 0; i < p->length; i += 1)
    {
      byte b = p->content[i];
      if (b < ' ' || b > '~')
        printf("[%d]", b);
      else
        printf("%c", b);
    }
  }
  printf("]\n");
}

// Thread body started by ip_system::start(); a is the owning ip_system.
// Receives datagrams forever, reassembles fragmented messages, and hands each
// complete packet to put_on_receive_queue().  Malformed, out-of-sequence or
// incomplete messages are discarded.
void * listening_process(void * a)
{
  ip_system * ips = (ip_system *)a;
  byte * buffer = new byte[70000];
  ip_packet * p = NULL;            // message currently being assembled
  int expectid = 0, expectfrag = 0, fragoffs = 0, currmax = 0;

  while (true)
  {
    struct sockaddr_in incoming_info;
    socklen_t socklen = sizeof(incoming_info);
    int r = recvfrom(ips->servesocket, buffer, 69999, 0,
                     (sockaddr *) &incoming_info, &socklen);
    if (r < 0)
    {
      perror("receive error");
      usleep(50000);
      continue;
    }
    if (r == 0)
    {
      fprintf(stderr, "receive size zero\n");
      usleep(50000);
      continue;
    }

    // NOTE(review): this section was garbled to "if (r=ips->reliability)".
    // It is reconstructed as a minimum-size check followed by the same
    // simulated-loss test that transmit() applies; confirm against an
    // intact copy.
    if (r < overhead)
      continue;                    // too short to hold header and trailer
    int rnd = random() % reliability_scale;
    if (rnd >= ips->reliability)
      continue;

    ip_packet * hd = (ip_packet *)buffer;
    // The trailer sits at an arbitrary offset, so it is copied out rather
    // than read through a possibly misaligned pointer.
    extras ex;
    memcpy(& ex, buffer + r - sizeof(extras), sizeof(extras));
    int payload = r - overhead;    // content bytes carried by this datagram

    if (ex.frag == 0)
    {
      // First (or only) fragment: abandon any unfinished message.
      if (p != NULL)
      {
        delete_ip_packet(p);
        p = NULL;
      }
      expectfrag = 0;
      // The header carries the total message length, which must cover at
      // least the content present in this datagram.
      if (hd->length < payload)
        continue;
      p = create_ip_packet(hd->length);
      if (p == NULL)
        continue;
      currmax = hd->length;
      fragoffs = payload;
      memcpy(p, buffer, r - sizeof(extras));
      expectid = ex.msgid;
      expectfrag = 1;
    }
    else
    {
      // Continuation fragment: its header length is this fragment's size.
      bool in_sequence = p != NULL && ex.frag == expectfrag && ex.msgid == expectid;
      bool fits = hd->length >= 0 && hd->length <= payload
                  && fragoffs + hd->length <= currmax;
      if (!in_sequence || !fits)
      {
        if (p != NULL)
          delete_ip_packet(p);
        p = NULL;
        expectfrag = 0;
        continue;
      }
      memcpy(p->content + fragoffs, buffer + sizeof(ip_packet), hd->length);
      fragoffs += hd->length;
      expectfrag += 1;
    }

    if (ex.more == 0)
    {
      if (fragoffs != currmax)
      {
        // Less content arrived than the first fragment announced.
        delete_ip_packet(p);
      }
      else
      {
        if (ips->tracing >= 1)
          trace_packet("rcvd", p, ips->tracing);
        ips->put_on_receive_queue(p);
      }
      p = NULL;
      expectfrag = 0;
    }
  }

  return NULL;
}

// Opens a UDP socket on a randomly chosen port, records this host's 6-byte
// address (4 IPv4 bytes followed by the port, low byte first), and starts the
// listening thread.  Returns the address, or NULL on failure.  Calling it
// again after success just returns the address.
byte * ip_system::start()
{
  const int max_bind_attempts = 1000;
  int portnum = 0;
  char hname[1000];
  if (started)
    return address;

  int r = gethostname(hname, 999);
  if (r < 0)
  {
    perror("gethostname() doesn't work\n");
    return NULL;
  }
  hname[999] = '\0';               // gethostname may not terminate the name

  hostent * record = gethostbyname(hname);
  if (record == NULL)
  {
    herror("gethostbyname failed");
    return NULL;
  }
  unsigned char * ip4 = (unsigned char *)record->h_addr;

  servesocket = socket(AF_INET, SOCK_DGRAM, 0);
  if (servesocket < 0)
  {
    perror("couldn't create a socket");
    return NULL;
  }

  sockaddr_in serveinfo;
  memset(& serveinfo, 0, sizeof(serveinfo));
  serveinfo.sin_len = sizeof(serveinfo);
  serveinfo.sin_family = AF_INET;
  serveinfo.sin_addr.s_addr = htonl(INADDR_ANY);

  // Try random ports, but give up eventually instead of spinning forever
  // when bind keeps failing.
  srandom(getuid());
  r = -1;
  for (int attempt = 0; attempt < max_bind_attempts && r < 0; attempt += 1)
  {
    portnum = random() % 64500 + 1024;
    serveinfo.sin_port = htons(portnum);
    r = bind(servesocket, (sockaddr *) &serveinfo, sizeof(serveinfo));
  }
  if (r < 0)
  {
    perror("couldn't bind the socket");
    close(servesocket);
    return NULL;
  }

  for (int i = 0; i < 4; i += 1)
    address[i] = ip4[i];
  address[4] = portnum & 255;
  address[5] = portnum >> 8;
  if (tracing >= 1)
    printf("[local address is %s]\n", ip_address_to_string(address).c_str());

  // Mark the system started before the thread runs so that packets arriving
  // immediately are queued rather than discarded.
  started = true;
  r = pthread_create(& threadid, NULL, listening_process, this);
  if (r != 0)
  {
    started = false;
    fprintf(stderr, "couldn't create the listening thread: %s\n", strerror(r));
    close(servesocket);
    return NULL;
  }
  return address;
}

// True once start() has succeeded.
bool ip_system::ok()
{
  return started;
}

// This system's 6-byte address, or NULL if it has not been started.
byte * ip_system::get_address()
{
  if (!started)
    return NULL;
  return address;
}

// Number of received packets waiting to be collected (0 if not started).
int ip_system::receive_queue_length()
{
  if (!started)
    return 0;
  return rcvq->length();
}

// Always 0: transmit() sends immediately, so nothing is ever queued.
int ip_system::transmit_queue_length()
{
  return 0;
}

// Returns the oldest received packet without waiting, or NULL if there is
// none or the system has not been started.  The caller owns the packet.
ip_packet * ip_system::receive()
{
  if (!started)
    return NULL;
  return rcvq->get();
}

// As receive(), but waits up to msec milliseconds for a packet
// (msec < 0 waits indefinitely).
ip_packet * ip_system::wait_to_receive(int msec)
{
  if (!started)
    return NULL;
  return rcvq->get_wait(msec);
}

// Sends packet p to the address in p->dest_addr and then frees p in every
// case (p must come from create_ip_packet, which reserves room for the
// trailer).  Messages too large for one datagram are split into fragments.
// Returns false if the system is not started, p is unusable, or a send
// fails; returns true otherwise, including when the packet is deliberately
// dropped by the simulated-loss check.
bool ip_system::transmit(ip_packet * p)
{
  if (p == NULL)
    return false;
  if (!started || p->length < 0)
  {
    delete_ip_packet(p);
    return false;
  }

  sockaddr_in remote_info;
  memset(& remote_info, 0, sizeof(remote_info));
  remote_info.sin_len = sizeof(remote_info);
  remote_info.sin_family = AF_INET;
  // Address bytes 0-3 are the IPv4 address already in network order; bytes
  // 4 and 5 are the port, low byte first (see start()).  Assembling the port
  // from its bytes avoids depending on the host's byte order.
  memcpy(& remote_info.sin_addr.s_addr, p->dest_addr, 4);
  int port = ((unsigned char)p->dest_addr[5] << 8) | (unsigned char)p->dest_addr[4];
  remote_info.sin_port = htons(port);

  if (tracing >= 1)
    trace_packet("sent", p, tracing);

  int rnd = random() % reliability_scale;
  if (rnd >= reliability)
  {
    delete_ip_packet(p);           // simulated loss: reported as success
    return true;
  }

  msgcount += 1;
  extras ex;
  ex.msgid = msgcount;
  int totlen = p->length + overhead;
  if (totlen <= specialsize)
  {
    // Small enough for one datagram: append the trailer in place.
    unsigned char * b = (unsigned char *)p;
    ex.more = 0;
    ex.frag = 0;
    memcpy(b + totlen - sizeof(extras), & ex, sizeof(extras));
    int r = sendto(servesocket, p, totlen, MSG_EOR,
                   (sockaddr *)&remote_info, sizeof(remote_info));
    if (r < 0)
    {
      perror("send error");
      delete_ip_packet(p);
      return false;
    }
  }
  else
  {
    unsigned char b[specialsize];
    int frag = 0, totsent = 0, remaining = p->length;
    while (remaining > 0)
    {
      int sendnow = remaining;
      int totnow = sendnow + overhead;
      if (totnow > specialsize)
      {
        totnow = specialsize;
        sendnow = totnow - overhead;
      }
      remaining -= sendnow;

      // The first fragment's header carries the total length; later ones
      // carry only their own size.
      if (frag != 0)
        p->length = sendnow;
      memcpy(b, p, sizeof(ip_packet));
      memcpy(b + sizeof(ip_packet), p->content + totsent, sendnow);
      ex.more = (remaining > 0) ? 1 : 0;
      ex.frag = frag;
      memcpy(b + totnow - sizeof(extras), & ex, sizeof(extras));
      frag += 1;
      totsent += sendnow;

      int r = sendto(servesocket, b, totnow, MSG_EOR,
                     (sockaddr *)&remote_info, sizeof(remote_info));
      if (r < 0)
      {
        perror("send error");
        delete_ip_packet(p);
        return false;
      }
    }
  }

  delete_ip_packet(p);
  return true;
}

// Called by the listening thread with a fully assembled packet.  The packet
// is freed here if it cannot be queued (system not started, or queue full).
void ip_system::put_on_receive_queue(ip_packet * p)
{
  if (!started || !rcvq->add(p))
    delete_ip_packet(p);
}

// Blocks until the listening thread exits.  Returns at once if the system
// was never started, since there is then no thread to join.
void ip_system::wait_until_dead()
{
  if (!started)
    return;
  void * x;
  pthread_join(threadid, & x);
}