C: FreeBSD kqueue strange problem with multithreading

Hi all,

I am developing a server program with kqueue. The program is multithreaded, and each thread creates its own kqueue. Each thread also has its own listening socket bound to the same port (via SO_REUSEPORT). When it runs there are no errors from bind(), listen(), or the kqueue registration, yet the accept handler is never triggered and no connection can be made, or it is triggered on only one of the listening sockets. During development it runs on a VMware FreeBSD 11.2 guest. When I upload it to a bhyve FreeBSD VM (also 11.2) to test, all ports respond and all connections work.

I use kqueue in multithreaded code because the workers need to share data. FreeBSD's kqueue is not particularly thread-friendly: when one single kqueue and one listening port were shared by all the threads, I got strange problems, such as an unexpected event firing right after an event on the same socket, even with EV_DISPATCH + EV_CLEAR set. I tried one kqueue per thread to fix this, but then ran into the problem above.

Thanks
 
It seems that your problem is that having multiple accept() calls on a single port doesn't always work. I have no idea whether this is supported or even supposed to work, never tried it. The part I don't understand is: why are you using kqueue? Other than delivering messages on the sockets, why does the kernel have to be involved at all?

But let's think about it from a more general viewpoint. If your program is multi-threaded, it has to have locking or a similar synchronization mechanism. One such mechanism is thread-safe queues (in fact, for simple programs it is my favorite mechanism, because it is so easy). You say that you need to share data among workers. The problem with sharing data in a parallel program is that you need to make sure that (a) only one thread changes the shared data at a time, so it is always in a consistent state and concurrent changes are isolated from each other and don't create a mix, and (b) threads that read the shared data always see the most recent version of it. Usually, that is handled by protecting the shared data structure with a lock; and if threads need to wait for the shared data structure to be updated, or for data to be delivered, that will also need condition variables.

So here is a proposal for simplifying things: have only one thread call accept() on the socket. That master thread receives all the incoming requests and then distributes them to the worker threads. The communication with the worker threads can be done with user-space thread-safe queues, without involving kqueue. Depending on which programming language you are using, there may be convenient built-in queues with automatic locking and waiting.

One other trick, which may or may not work for you, but which I've used to simplify things: arrange it so the shared data structures don't need locking at all. What I've done is this: the master thread completely creates the data structures with parameters and results for the worker threads before creating those threads; it then hands each worker a private data structure, which doesn't need locking, since it is not shared. Then the worker does its thing, reading parameters and depositing results. The master only looks at the results after the worker has finished, again without locking. The worker may be listening on a queue that the master fills occasionally, and at some point the master puts a special "finish now" message into the queue.
 
Note, it has been a while since I programmed a threaded server whose threads listen on IP sockets for incoming requests on a certain port and respond with the computation results to the respective clients. However, in my thread model, a new worker thread is spawned, in an endless loop, whenever accept() has returned a valid connection socket.
So here, accept() is not part of the new thread; it is the gatekeeper before spawning new ones. Once a new worker thread has been spawned and detached, accept() is almost immediately ready to receive new connection requests. For a given IP port, only one accept() is listening at any time. I never thought about putting accept() into the worker threads, but I can imagine this won't work without some fiddling - SO_REUSEPORT to begin with (not necessary here).

Why do you do this? Is it for performance reasons? My worker threads terminate when the session with a certain client has finished. Do your threads respond to several clients in a row? I tend not to like this idea too much. A new worker thread for a new client means a clean start. Reusing the same worker thread for several clients in a row means you have opened a "hotel thread", and you need to clean up the "bed linen" and the "bath" once the old client has left and before letting the new one in.
 
Honestly, I don't know why the OP wanted to have accept() in multiple threads. While I don't see it as being outright illegal or crazy, it seems unusual enough to raise suspicions that things might not work.

In my case, I've had two reasons for reusing worker threads, or "hotel threads" as you so nicely call it. The first reason is performance: Sometimes starting a worker thread takes a lot of work, because it has to set up a lot of private state. The other reason may be that a particular worker thread needs to keep state from request to request.

Let me give you a concrete example: My system is a (light-weight home-brew) industrial control system. There is a particular thread that processes all the water pressure measurements, and whose main job it is to turn a pump on and off to keep the pressure within a reasonable range. That thread keeps state about recent measurements, for example to implement hysteresis: It turns the pump on when the pressure drops below 40psi, and turns it off when the pressure exceeds 60psi. To do that, the thread needs some state, for example knowing whether the pump is currently on or off, or perhaps remembering what the previous measurement was. If every incoming measurement were given to a memoryless clean thread, this would simply stop working.

Furthermore, this thread may want to keep a lot of state, for example to implement alarms: If the pressure exceeds 80psi for 3 or more measurements (even though the pump is already off), it needs to raise an alarm that the pipe has frozen, and if the pressure drops below 5psi for 60 seconds or more (even though the pump is now on), it needs to raise an alarm that either the pump is broken, or we have a catastrophic leak. All that requires a lot of state to be kept, and a dedicated thread for dealing with related messages is an easy way to implement that.

Clearly, your argument is very sensible when messages are idempotent and isolated from each other.
 
Yes, of course, I also have a few permanently running workers for almost exactly the same purpose as you outlined, namely acquiring, processing, and acting upon measurement data. These threads are autonomous and do not maintain their own sockets, and I agree 100% that it does not make sense to stop/start these kinds of threads frequently. I was talking about the threads which receive data from and send responses to connected clients. Once a connection has finished, these receive/response threads terminate as well.

May I ask which kind of data acquisition system you are using? I use DAQ boards from National Instruments in a FreeBSD-based measurement controller. My software works with the PCI(e) 6251 and the PCIe 6351, perhaps with other M- and X-series boards as well. I need 16-bit DACs and ADCs, and I need the high-speed update rate of 2.86 MS/s and sample rate of 1 MS/s. However, these boards are a major cost factor of my measurement unit, and I am looking for less expensive alternatives.
 
(This is off topic). Mine is very amateurish. It's a mix of: (a) Weeder IO boards (they come from weedtech.com), which are connected via RS-232 to a FreeBSD server; (b) Dallas 1-wire temperature sensors, usually connected to an Embedded Data Systems HA7Net or to a Raspberry Pi; (c) Omega Engineering industrial controllers with USB ports, which speak Modbus while pretending to be USB serial ports, connected to a Raspberry Pi; (d) a Davis Instruments weather station via Ethernet (although that data is not used in production).
 
Yes, it uses the event-driven model. Each worker thread handles thousands of connections at the same time, i.e., it does the same thing as Nginx.
 
As I read setsockopt(2), it seems you might want SO_REUSEPORT_LB, but you haven't provided enough information about what type of connections you handle and how each thread operates to be sure.

Thanks for the info. It seems SO_REUSEPORT alone is not sufficient to enable load balancing the way it does on Linux. However, the option is not available on 11.2. I have yet to build the program on 12.0 to test.
 

It is addressing the 10K socket problem, so kqueue/epoll is needed. You are right that it might be advantageous to have one dedicated thread doing the accept() calls (the cashier-line paradigm, especially when some threads are doing more complicated things than others). It does, however, involve some overhead in passing the socket to the workers. This is yet to be tested.
 
Did you see the following in kqueue(2), 4th paragraph of the Description?
Code:
...
     Multiple events which trigger the filter do not result in multiple
     kevents being placed on the kqueue; instead, the filter will aggregate
     the events into a single struct kevent.  Calling close() on a file
     descriptor will remove any kevents that reference the descriptor.
...
This could explain why only one thread was triggered in your tests in the VM. Many events may actually be packed into one struct, so when multiple events arrive in one struct, the receiving event loop should distribute the work accordingly. Perhaps you do this already?

In regard to the 10,000 socket problem (nowadays we want to add a few zeros in order to get into the problem zone), the event-driven thread model would be significantly more beneficial only with a high number of concurrent connection requests and short-lived connections, while with a high number of concurrent long-lived established connections, the worker thread model may be beneficial, or at least not inferior. My server code takes every measure to keep established connections for a given client alive, exactly for this reason.
 

For the same socket, there are multiple results in one struct kevent only when subscribing to multiple filters (for a socket, just read or write). The first issue mentioned above is that on VMware Workstation, kqueue does not trigger events at all on any of the listening ports, while on bhyve it works. I think it has to do with the VM platform: bhyve is a kernel-mode hypervisor while VMware Workstation is not.

Thread per connection doesn't work well for 'concurrent long lasting established connections'.
 