C kqueue nonblocking socket timeout with siege

I'm trying to create a server that works with multiple connections using kqueue(2). After much research, I developed my code to test the technology. So I started testing with the siege, however, after an average of 300 requests, the server begins to become unstable, sometimes the siege points timeout, sometimes it can get the answer.

I 'm using the g++ to compile.

Here is my code.
Code:
[noparse]
#include <iostream>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <cstring>
#include <cerrno>
#include <cstdlib>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sstream>

using namespace std;

struct client_s {
  int fd;
  int type;
  socklen_t addrlen;
  struct sockaddr addr;
  int bufflen;
};

int main (int argc, char *argv[]) {

  int portN, sockFD, sockOPT, eveCT, optRET, bindRET, listenRET, kernelQUE, nev, connectionFlags, numBT;
  struct sockaddr_in sockADDR;
  struct kevent events[2];
  struct kevent changes[2];

  if (argc < 2) {
    cerr << "Argument required: port" << endl;
    return -1;
  }

  portN = atoi(argv[1]);

  sockFD = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);

  if (sockFD < 0) {
    cerr << "Error while opening socket: " << strerror(errno) << endl;
    return -1;
  } else
    clog << "Socket openend. Nonblock socket defined." << endl;

  sockOPT = 1;

  optRET = setsockopt(sockFD, SOL_SOCKET, SO_REUSEADDR, &sockOPT, sizeof(sockOPT)); // Avoid TIME_WAIT
  if (optRET < 0) {
    cerr << "Error while setting flag SO_REUSEADDR: " << strerror(errno) << endl;
    return -1;
  } else
    clog << "SO_REUSEADDR flag ok." << endl;

  optRET = setsockopt(sockFD, IPPROTO_TCP, TCP_NODELAY, &sockOPT, sizeof(sockOPT)); // Avoid socket buffer
  if (optRET < 0) {
    cerr << "Error while setting flag TCP_NODELAY: " << strerror(errno) << endl;
    return -1;
  } else
    clog << "TCP_NODELAY flag ok." << endl;

  memset(&sockADDR, 0, sizeof(struct sockaddr_in));

  sockADDR.sin_family = AF_INET;
  sockADDR.sin_port = htons(portN);
  sockADDR.sin_addr.s_addr = INADDR_ANY;

  bindRET = bind(sockFD, (struct sockaddr*)&sockADDR, sizeof(sockADDR));

  if (bindRET < 0) {
    cerr << "Error while binding socket: " << strerror(errno) << endl;
    return -1;
  } else
    clog << "Socket binded." << endl;

  listenRET = listen(sockFD, 1000);

  if (listenRET < 0) {
    cerr << "Error while start listening the port:" << strerror(errno) << endl;
    return -1;
  } else
    clog << "Socket is listening the port " << argv[1] << endl;

  kernelQUE = kqueue();

  if (kernelQUE < 0) {
    cerr << "Error on creating kqueue." << endl;
    return -1;
  } else
    clog << "Starting kernel queue." << endl;

  memset(events, 0, sizeof(events));
  memset(changes, 0, sizeof(changes));

  EV_SET(&changes[0], sockFD, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  eveCT = 1;
  for (;;) {

    nev = kevent(kernelQUE, changes, eveCT, events, eveCT, NULL);

      if (nev < 0) {
        cerr << "Error resolving event." << endl;
        return -1;
      }

    for (int i = 0;i <= nev; i++) {
      struct client_s * client = static_cast<client_s *>(malloc(sizeof(struct client_s)));

      if (events[i].ident == sockFD && events[i].filter == EVFILT_READ) {
         client->fd = accept4(sockFD, &client->addr, &client->addrlen, SOCK_NONBLOCK);

         if (client->fd < 0) {
           cerr << "Error while accepting new connection." << strerror(errno) << endl;
           free(client);
        } else {
          client->type = 2;
          client->bufflen = 0;
          EV_SET(&changes[1], client->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, client);
          eveCT=2;
       }
    }

    if (events[i].filter == EVFILT_READ && events[i].udata && events[i].data > 0) {
      client = static_cast<client_s *>(events[i].udata);
      if (client->type == 2) {
        client->type++;
        char *buffer = new char[events[i].data];
        client->bufflen = events[i].data;
        numBT = recv(client->fd, buffer, events[i].data, 0);
        delete[] buffer;
        if (numBT == events[i].data) {
          EV_SET(&changes[1], client->fd, EVFILT_READ, EV_DISABLE, 0, 0, 0);
          kevent(kernelQUE, &changes[1], 1, NULL, 0, 0);
          EV_SET(&changes[1], client->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, client);
        } else
          cerr << "Error while reading." << strerror(errno) << endl;
        }
   } else if (events[i].filter == EVFILT_WRITE && events[i].udata) {
       client = static_cast<client_s *>(events[i].udata);
       if (client->type == 3) {
         string query("HTTP/1.1 200 OK\r\n\r\nKernel events, baby!";);
         numBT = send(client->fd, query.c_str(), query.size(), 0);
         if (numBT == query.size()) {
           EV_SET(&changes[1], client->fd, EVFILT_WRITE, EV_DISABLE, 0, 0, 0);
           kevent(kernelQUE, &changes[1], 1, NULL, 0, 0);
           memset(&changes[1], 0, sizeof(struct kevent));
           shutdown(client->fd, SHUT_RDWR);
           close(client->fd);
           free(client);
           eveCT=1;
        } else
           cerr << "Error while writing." << strerror(errno) << endl;
        }
   }
  }
  }

  shutdown(sockFD, SHUT_RDWR);
  close(sockFD);

  return 0;
}
[/noparse]
If possible I would also like to request you the most specific materials in relation to kqueue(2). I see that there is very little material about it on the internet. The most comprehensive was the FreeBSD documentation itself.
 
Back
Top