[C++] No notifications from kqueue (async connect)

Hi all,

A few days ago I ran into a big problem and haven't been able to solve it since. I need your help and advice on this.

I'm working with asynchronous sockets and using kqueue as the notification mechanism. I have no problems accepting new connections asynchronously (on the server side) and can handle thousands of incoming connections in a fraction of a second.

However, when I use an asynchronous connect, I don't receive a kqueue notification that the connection has been established. Then, when my server disconnects this new client, I receive two notifications from kqueue (I assume the first one is for the connect and the second one is for the disconnect). Why is the first one delayed until the disconnect?

sockstat and other tools show that the connection was successful, even though I receive no kqueue notification on the client side.

I cannot publish all the source code, but I'm happy to post the relevant parts of it if that's needed to solve the problem.

BTW, I check every system call and log every error, and none of the calls returns an error.

How I register my connecting socket with kqueue:
Code:
    const short filter = EVFILT_READ | EVFILT_WRITE | EVFILT_AIO;
    struct kevent kev;
    EV_SET(&kev, socket_handle, filter, EV_ADD, 0, 0, context_ptr);

    const int rc = ::kevent(kqueue_, &kev, 1, nullptr, 0, nullptr);
    if (rc == kCallFailed)
    {
      // log error here
      return false;
    }

    return true;

How I connect the socket:
Code:
  enum class AsyncResult { kSuccessCompleted, kSuccessPending, kFailure };

  AsyncResult Connect()
  {
    const int rc = ::connect(socket_, reinterpret_cast<sockaddr *>(&end_points_.remote_address), sizeof(end_points_.remote_address));
    if (rc == SOCKET_ERROR)
    {
      const int error = errno;
      if (error != EINPROGRESS)
      {
        if (error == EISCONN)
          return AsyncResult::kSuccessCompleted;

        // log error here
        return AsyncResult::kFailure;
      }

      return AsyncResult::kSuccessPending;
    }

    return AsyncResult::kSuccessCompleted;
  }

connect() fails with errno set to EINPROGRESS (the connection is pending).

How I check for kqueue notifications:
Code:
virtual bool GetEvent(IIoContext*& context_ptr, bool& is_timeout,
    size_t &count) override
  {
    struct kevent evt;
    (void)::memset(&evt, 0, sizeof(evt));

    int collected_entry_count = 0;
    do
    {
      collected_entry_count = ::kevent(kqueue_, nullptr, 0, &evt, 1, &ts_);
    }
    while ((collected_entry_count == 1) && (evt.udata == nullptr));

    if (collected_entry_count != 1)
    {
      is_timeout = (collected_entry_count == 0);
      if (!is_timeout)
      {
        // log error here
      }
      return false;
    }

    context_ptr = reinterpret_cast<IIoContext *>(evt.udata);
    return true;
  }

Timeout (ts_) is 100 ms.

And I call GetEvent() in the loop:
Code:
    IIoContext* context_ptr = nullptr;
    bool        is_timeout  = false;
    size_t      count       = 0;

    while (!stopped_.load(std::memory_order_acquire))
    {
      const bool collected_ok = notifier_->GetEvent(context_ptr, is_timeout, count);
      if (!collected_ok || (context_ptr == nullptr))
      {
        if (!is_timeout)
        {
          // logging error here
        }
        continue;
      }

      context_ptr->ProcessEvent(count);
    }

Obviously, the socket is non-blocking.
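The question doesn't show how the socket is made non-blocking. A common way, assumed here, is to set O_NONBLOCK with fcntl(2) before calling connect(); `SetNonBlocking` is a hypothetical helper name, not part of the original code.

```cpp
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cassert>
#include <cerrno>

// Switches fd to non-blocking mode: connect() then fails with EINPROGRESS
// instead of waiting, and send()/recv() fail with EAGAIN instead of blocking.
// Existing file status flags are preserved.
bool SetNonBlocking(int fd)
{
  const int flags = ::fcntl(fd, F_GETFL, 0);
  if (flags == -1)
    return false;
  return ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}
```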

Any help and advice is much appreciated. I've been hitting a wall for the last few days and really need some fresh eyes on the problem.

Thanks in advance!

P.S. FreeBSD 9.1-RELEASE-p6, 64-bit. Firewall disabled. I am running it in VirtualBox and using GCC 4.8.2 as the compiler (but that should not matter...).
 
Does kqueue support notifications from asynchronous connect?

Has anyone had practical experience using kqueue to get notifications from an asynchronous connect? Is it possible? Should it work? I've googled the whole Internet and couldn't find a single example of using kqueue with connect.

We have no problems using asynchronous connect with epoll on Linux, but no luck on FreeBSD :(.

P.S. The rest of asynchronous operations (accept, read and write) work fine with kqueue. But not connect.
 
I solved the problem. The error was in this line:
Code:
const short filter = EVFILT_READ | EVFILT_WRITE | EVFILT_AIO;

EVFILT_* filters are not bit flags and cannot be combined with |. You have to register a separate event for EVFILT_READ, for EVFILT_WRITE and for EVFILT_AIO (which I stopped using anyway):
Code:
struct kevent kev;
EV_SET(&kev, socket_handle, EVFILT_READ, EV_ADD, 0, 0, context_ptr);

int rc = ::kevent(kqueue_, &kev, 1, nullptr, 0, nullptr);
if (rc == kCallFailed)
{
  // log error here
  return false;
}

EV_SET(&kev, socket_handle, EVFILT_WRITE, EV_ADD, 0, 0, context_ptr);

rc = ::kevent(kqueue_, &kev, 1, nullptr, 0, nullptr);
if (rc == kCallFailed)
{
  // log error here
  return false;
}

...

If you do not register a separate event for each filter, you will receive only one event type (in my case it was the "read" event, after I removed EVFILT_AIO).
 
I also stopped using the aio_* functions and switched to plain non-blocking sockets with kqueue notifications. IMHO this is better from a performance point of view.
 
When polling for connect(2) with kqueue(2), just poll for EVFILT_WRITE.

I think this is a good exercise and all, but I really recommend libevent. You're probably just going to reimplement an event loop anyway. You can even have it use kqueue(2), and it supports aio(4) and handles signals correctly.
 
Thank you.

The reason for subscribing for EVFILT_READ events was that I wanted to receive read notifications as well.

Re libevent, we looked at it but decided to implement this by ourselves.
 
vand777 said:
Thank you.

The reason for subscribing for EVFILT_READ events was that I wanted to receive read notifications as well.

Re libevent, we looked at it but decided to implement this by ourselves.

Once connect(2) completes, subscribe to EVFILT_READ. A socket will almost always be writable after connect(2) completes, so you will flood your event loop with useless EVFILT_WRITE notifications. In contrast, a socket whose connect(2) has just completed may not be readable (the server may not have sent anything after accept(2)). While polling for both EVFILT_READ and EVFILT_WRITE to detect a completed connect(2) isn't necessarily harmful, it's more confusing to handle if your program follows an event-driven design. Your read routine would have to handle not only reading data but also the completion of connect(2), and your write routine would handle that same event too. That's both confusing and redundant.

Hence, you should first poll only for EVFILT_WRITE to determine when connect(2) completes (you don't need EVFILT_READ, but it doesn't necessarily hurt). Afterward, poll only for EVFILT_READ (EVFILT_WRITE will flood your event loop with write notifications; this is really bad). This is independent of your program's design.
 
nslay said:
Hence, you should first poll only for EVFILT_WRITE to determine when connect(2) completes (you don't need EVFILT_READ, but it doesn't necessarily hurt). Afterward, poll only for EVFILT_READ (EVFILT_WRITE will flood your event loop with write notifications; this is really bad). This is independent of your program's design.

Let's assume that I call send too many times or with too much data and fill up the send buffer. In the current design, once I receive EAGAIN because the send buffer is full, I just wait for a kqueue notification telling me that some space is available in the send buffer, and then I call send again with the rest of the data.

Without kqueue notifications, how do I know that some space in the send buffer has become available?
 
vand777 said:
Let's assume that I call send too many times or with too much data and fill up the send buffer. In the current design, once I receive EAGAIN because the send buffer is full, I just wait for a kqueue notification telling me that some space is available in the send buffer, and then I call send again with the rest of the data.

Without kqueue notifications, how do I know that some space in the send buffer has become available?

Yes, when you're intentionally sending a lot of data, you intentionally poll for EVFILT_WRITE. But normally you wouldn't want to poll for EVFILT_WRITE. Try polling for EVFILT_WRITE unconditionally and report your CPU usage.
 
When I poll for EVFILT_WRITE unconditionally, my load-testing prototype spends around 1% of its time in my code; the rest is spent in kevent, recv and send. These numbers are based on profiling results from callgrind and gprof, and top shows the same: around 1% in user mode.

I decided to leave the current implementation as it is in the first version, because it does not have much overhead. But thanks, @nslay, for a very good idea on how to optimise it in the next version. In the second version, I'll register for EVFILT_WRITE notifications from the beginning but keep them disabled, and enable them only while I'm out of send-buffer space. But I'll keep the first version simple for now.
 
vand777 said:
When I poll for EVFILT_WRITE unconditionally, my load-testing prototype spends around 1% of its time in my code; the rest is spent in kevent, recv and send. These numbers are based on profiling results from callgrind and gprof, and top shows the same: around 1% in user mode.

I decided to leave the current implementation as it is in the first version, because it does not have much overhead. But thanks, @nslay, for a very good idea on how to optimise it in the next version. In the second version, I'll register for EVFILT_WRITE notifications from the beginning but keep them disabled, and enable them only while I'm out of send-buffer space. But I'll keep the first version simple for now.

If it's a small amount of data, you could always loop until send(2) completes.
Code:
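#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdint.h>

// Busy-waits while the send buffer is full, so only use it for small amounts of data.
// Returns 0 once everything has been sent, -1 on a real error (errno holds it).
int SendAll(int fd, const uint8_t *pBuf, size_t bufSize) {
  while (bufSize > 0) {
    ssize_t sendSize = send(fd, pBuf, bufSize, 0);
    if (sendSize == -1) {
      if (errno == EAGAIN || errno == EINTR)
        continue;  // Buffer full or interrupted: try again
      return -1;   // Bad, real bad ...
    }

    bufSize -= (size_t)sendSize;
    pBuf += sendSize;
  }
  return 0;
}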

Of course, if you're trying to send a large amount of data while servicing other events, you'll need to set up a callback that sends more of the data each time an EVFILT_WRITE event fires. Something like:

Code:
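#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>

struct SendBuffer {
  int fd;
  const uint8_t *pBuf;
  size_t offset;   // Bytes already sent
  size_t bufSize;  // Total bytes to send
};

// Call for each EVFILT_WRITE event ... once finished, stop subscribing to EVFILT_WRITE
// Returns 1 when everything has been sent, 0 if data remains, -1 on a real error.
int SendMore(struct SendBuffer *stBuf) {
  ssize_t sendSize = send(stBuf->fd, stBuf->pBuf + stBuf->offset,
                          stBuf->bufSize - stBuf->offset, 0);
  if (sendSize == -1) {
    if (errno != EAGAIN && errno != EINTR) {
      return -1;  // Bad, real bad
    }

    return 0;  // No room yet; wait for the next EVFILT_WRITE
  }

  stBuf->offset += (size_t)sendSize;
  return stBuf->offset == stBuf->bufSize ? 1 : 0;
}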

Sorry it's really rough.
 
Thanks, but looping is not an option; it's not efficient.

We have to support 10K connected clients, and the traffic might be quite heavy... The product is a real-time financial application where even milliseconds of latency can result in big losses.
 