Kernel Queue: The Complete Guide On The Most Essential Technology For High-Performance I/O


When talking about high-performance software, we probably think of server software (such as nginx) which processes millions of requests from thousands of clients in parallel. Surely, part of what makes server software so fast is a high-end CPU, a huge amount of memory and a very fast network link. But even then, the software must utilize these hardware resources with maximum efficiency, otherwise it will end up wasting most of the valuable CPU power on unnecessary kernel-user context switching or on waiting for slow I/O operations to complete.

Thankfully, operating systems have a solution to this problem, and it’s called the kernel event queue. Server software and the OS kernel use this mechanism together to achieve minimum latency and maximum scalability (when serving a very large number of clients in parallel). In this article we are going to talk about FreeBSD and macOS with kqueue, Linux with epoll, and Windows with I/O Completion Ports. They all have their similarities and differences which we’re going to discuss here. The goal of this article is for you to understand the whole mechanism behind kernel queues and to understand how to work with each API.

I assume you are already familiar with socket programming and with asynchronous operations, but anyway, in case you think there’s something I should define or explain in more detail — send me a message, I’ll try to update the article.

Although I tried to keep this article clean of any unnecessary sentences (it’s not a novel, after all), I sometimes can’t stop myself from expressing my thoughts about something I like or dislike.


What is a kernel queue?

A kernel event queue (which I’m gonna call KQ from now on) is a fast signal-delivery mechanism which allows server software to process events from the OS in a very effective way. KQ is a bunch of data living in kernel memory and a bunch of kernel code that operates on this data to notify a user-level application about various system events. A user app can’t access KQ data directly (it’s managed by the kernel), so it operates with KQ via the API that the OS provides. There are 3 different APIs we’re going to use here: kqueue, epoll, IOCP. However, this section describes kernel queues in general, so the API doesn’t matter for now.

Because the main purpose of KQ is to deliver notifications from network sockets, let me formulate the key idea in a different way:

A user application wants to be notified when any of its sockets is ready to read or write some data, and the OS kernel serves this purpose by maintaining the list of all registered and signalled events.

Use-case N1

What an application gets from KQ technology is that it is notified about an I/O signal, such as when a network packet is received. For example:

  1. Suppose some user app created a UDP socket and registered it with a KQ along with some app-defined data (i.e. cookie).

  2. At some point the last chunk of a UDP packet is received by network device.

  3. OS now has a complete UDP packet and is ready to notify the user process as soon as it calls the KQ waiting function.

  4. At some time the user app calls KQ waiting function which tells the kernel: Give me something new.

  5. OS responds with Got a READ event from the socket associated with your cookie.

  6. This cookie is the object pointer which the app then uses to handle the signal — read a message from UDP socket, in our case.

Note that neither opening a socket nor reading from it after the signal is received is normally part of the KQ mechanism. On UNIX we always use conventional socket functions, and we use KQ functions to receive events associated with sockets. However, IOCP on Windows is different. There, I/O functions and their associated events are a part of a single mechanism. Anyway, we’ll deal with IOCP later, so for now just don’t bother with it — let’s assume by default that KQ just delivers signals.

Use-case N2

Consider the next example where the user app receives a notification after a TCP socket connects to its peer:

  1. User app creates a TCP socket and registers it with a KQ along with some app-defined data (i.e. cookie).

  2. Now the app begins the procedure to connect to a remote host. Obviously, this operation can’t finish immediately most of the time, because it takes some time to exchange the TCP packets needed for establishing a connection. Moreover, what if the network link is very busy and the packets get dropped? Needless to say, the TCP connection may take a long time to complete. Because of that, the OS returns control back to the app with the result Can't finish the operation immediately. While packets are being sent and received, the app keeps doing some other stuff, relying on the OS to do its best to complete the connection procedure.

  3. Finally, a SYN+ACK TCP packet is received from the remote host, which means it’s willing to establish a TCP connection with our app. Now OS is ready to signal the app as soon as the latter becomes ready.

  4. At some point the user app calls the KQ waiting function which tells the kernel: Give me something new.

  5. OS responds with Got a WRITE event from the socket associated with your cookie.

  6. This cookie is the object pointer which the app then uses to handle the signal — write some data to the TCP socket, in our case.

Although the primary use of KQ is I/O event notifications, it can also be used for other purposes: for example, KQ can notify when a child process signals its parent (i.e. UNIX signal delivery), or KQ can be used to receive notifications from a system timer. I also explain these use-cases and show the example code in this article.

Internal representation example

Let’s see a diagram with an example of how a KQ may look internally after a user app has registered 6 different events there (a user-triggered event, I/O events, a system timer), 3 of which have signalled already.

         KQ table example
=================================
Event    | Descriptor | Signalled?
---------+------------+-----------
USER     | #789       |
READ     | #1         |
READ     | #2         | yes
WRITE    | #2         | yes
WRITE    | #3         |
TIMER    | #456       | yes

In this example, both READ and WRITE events for socket #2 are in signalled state which means we can read and write data from/to this socket. And the timer event is in signalled state too which means the system timer interval has expired. The signalled flag also means that after a user app calls the function to receive events from KQ, it will receive an array of these 3 signalled events so it can process them. The kernel then may clear the signalled flag so that it won’t deliver the same signals over and over again unless necessary.

Of course, in reality KQ is much more complex, but we don’t need to know exactly how the KQ is implemented internally — we just need to understand what it delivers to us and when, and how we may use it effectively. I’m not a kernel developer, so I don’t know much about how it’s implemented inside — you have to read some Linux/FreeBSD kernel manuals and epoll/kqueue code if you are interested in this subject.

API Principles

Now let’s talk about what features all those API provide us with. In general, working with a KQ API consists of 4 steps:

  1. Create KQ object. It’s the easiest part, where we just call a function which returns the descriptor for our new KQ. We may create as many KQ objects as we want, but I don’t see the point of creating more than 1 per process or thread.

  2. Attach file/socket descriptor along with opaque user data to KQ. We have to tell the OS that we want it to notify us about any particular descriptor through a particular KQ object. How else would the kernel know what to notify us about? Here we also associate some data with the descriptor, which is usually a pointer to some kind of a structure object. How else are we going to handle the received signal? The attachment is needed only once for each descriptor; usually it’s done right after the descriptor is configured and ready for I/O operations (though we can delay that until absolutely necessary to probably save a context switch). The detachment procedure usually is not needed (with the right design), so we won’t even talk about it here.

  3. Wait for incoming events from KQ. When the user app has nothing more important to do, it calls a KQ waiting function. We specify the output array of events and a timeout value as parameters when calling this function. It fills our array with the information about which events signalled and how they signalled. By using an array of events rather than a single event we save CPU time on somewhat costly kernel-userspace context switches. By using the timeout value we control how much time this KQ function can block internally. If we specify a positive value, the function will block for this amount of time in case it has no events to give us. If we specify 0, it won’t block at all and will return immediately.

    Some people use a small timeout value for KQ waiting functions so that they can check for some flags and variables and probably exit the waiting loop if some condition is met. But when using a small timeout value, like 50ms, they waste a lot of context switches unnecessarily. In this case the OS periodically wakes up their process, even if it has nothing to do except calling the same KQ waiting function again in the next loop iteration. If you use this technique, it’s most likely that you are doing something wrong. All normal software should use an infinite timeout, so the process wakes up only when it is necessary.

  4. Destroy KQ object. When we don’t need a KQ object anymore, we close it so the OS can free all associated memory. Obviously, after KQ object is closed, you won’t be able to receive any notifications for the file descriptors attached to it.

What I like the most about this whole KQ idea is that user code is very clear and straightforward. I think an OS must deliver a nice, clear and convenient API to its users — an API whose workings everybody understands. And the way I understand it, a canonical KQ mechanism shouldn’t do, or require users to do, anything other than registering an event inside KQ and delivering this event from KQ to the user once it signals. There is one single promise to the user: When an event you care about signals, I will notify you about it. It allows the user code to be very flexible and free to do whatever it wants. Let’s see an example with pseudo code which proves my point.

Pseudo code example

// Pseudo code for an asynchronous HTTP/1 client
func do_logic(kq)
{
	conn := new
	conn.socket = socket(TCP, NONBLOCK)
	kq.attach(conn.socket, conn) // attach our socket along with the object pointer to KQ
	conn.connect_to_peer()
}

func connect_to_peer(conn)
{
	addr := "1.2.3.4:80"
	result := conn.socket.connect_async(addr) // initiate connection or get the result of the previously initiated connection procedure
	if result == EINPROGRESS {
		conn.write_handler = connect_to_peer
		return
	}

	print("connected to %1", addr)
	conn.write_data()
}

func write_data(conn)
{
	data[] := "GET / HTTP/1.1\r\nHost: hostname\r\n\r\n"
	result := conn.socket.send(data)
	if result == EAGAIN {
		conn.write_handler = write_data
		return
	}

	print("written %1 bytes to socket", result)
	conn.read_data()
}

func read_data(conn)
{
	data[], result := conn.socket.receive()
	if result == EAGAIN {
		conn.read_handler = read_data
		return
	}

	print("received %1 bytes from socket: %2", result, data)
}

func worker(kq)
{
	for {
		events[] := kq_wait(kq)

		for ev := events {

			conn := ev.user_data
			if ev.event == READ {
				conn.read_handler()
			}
			if ev.event == WRITE {
				conn.write_handler()
			}
		}
	}
}

Here we have 3 operations: connect, socket write, socket read. All 3 may block with a normal socket descriptor, so we set a non-blocking flag when creating the socket. Then we attach our socket to KQ along with the pointer to our object conn. Now we are ready to use the socket as we want; in our example we have a client socket which needs to be connected to a server.

We begin a socket connection procedure which may or may not complete immediately. In case it completes immediately — we continue with our program logic as usual, but in case it can’t complete immediately it just returns the EINPROGRESS error code. If it does so, we set the write_handler function pointer to the function we want to be called when the connection is established. And then we just return from our function, because there’s nothing else for us to do — we must wait. At this point our application is free to do whatever it wants — process something else or just wait until some events are received from the kernel. Which is why we have a worker() function. It receives events from KQ and processes them one by one, calling the appropriate handler function. In our case, the connect_to_peer() function will be called after the TCP socket connection is established (or has failed). Now we’re inside this function for the second time and we get the result of our previous connect request. It may be a failure, but I don’t check it here for the simplicity of our example.

In case the connection was successful, we continue by calling the write_data() function which sends an HTTP request to a server. Again, it may or may not complete immediately, so in case it returns with an EAGAIN error, we set write_handler and return. As simple as that. After some time we are back inside our function again and we try to send the data once more. We may go back and forth with this logic until we have sent the complete data for our HTTP request. Once it’s sent we start reading the HTTP response from the server.

We are inside the read_data() function which starts reading data from the socket. It may or may not complete immediately. If it returns with EAGAIN, we set read_handler and return. Why do we use a different function pointer name depending on whether it’s a READ or WRITE event? For our example it doesn’t matter, but in real life when we use full-duplex sockets, both events may fire at once, therefore we must be prepared to handle both READ and WRITE events in parallel.

Isn’t it simple? The only thing we need inside our program logic is to check for return values and error codes and set handling function pointers, then after some time we’re back in the same function to try once more with the same code. I love this approach — it makes everything seem very clear, even though we’ve just written the program logic that can easily handle thousands of connections in parallel.

FreeBSD/macOS and kqueue

Now I think we’re ready for some real code with a real API. kqueue API is the one I like the most for some reason, so let’s start with it. I strongly advise you to read this section and try to completely understand it, even if you won’t use FreeBSD in your work. There’s just one syscall kevent() which we use with several different flags to control its behaviour.

Accepting socket connections with kqueue

Let’s see an easy example of a server which accepts a new connection by an event from kqueue.

/* Kernel Queue The Complete Guide: kqueue-accept.c: Accept socket connection
Usage:
	$ ./kqueue-accept
	$ curl 127.0.0.1:64000/
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

// the structure associated with a socket descriptor
struct context {
	int sk;
	void (*rhandler)(struct context *obj);
};

void accept_handler(struct context *obj)
{
	printf("Received socket READ event via kqueue\n");

	int csock = accept(obj->sk, NULL, 0);
	assert(csock != -1);
	close(csock);
}

void main()
{
	// create kqueue object
	kq = kqueue();
	assert(kq != -1);

	struct context obj = {};
	obj.rhandler = accept_handler;

	// create and prepare a socket
	obj.sk = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
	assert(obj.sk != -1);
	int val = 1;
	setsockopt(obj.sk, SOL_SOCKET, SO_REUSEADDR, &val, 4);

	struct sockaddr_in addr = {};
	addr.sin_family = AF_INET;
	addr.sin_port = htons(64000);
	assert(0 == bind(obj.sk, (struct sockaddr*)&addr, sizeof(addr)));
	assert(0 == listen(obj.sk, 0));

	// attach socket to kqueue
	struct kevent events[2];
	EV_SET(&events[0], obj.sk, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, &obj);
	EV_SET(&events[1], obj.sk, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, &obj);
	assert(0 == kevent(kq, events, 2, NULL, 0, NULL));

	// wait for incoming events from kqueue
	struct timespec *timeout = NULL; // wait indefinitely
	int n = kevent(kq, NULL, 0, events, 1, timeout);
	assert(n > 0);

	// process the received event
	struct context *o = events[0].udata;
	if (events[0].filter == EVFILT_READ)
		o->rhandler(o); // handle read event

	close(obj.sk);
	close(kq);
}

This code creates a TCP socket, attaches it to the newly created KQ, and waits for an incoming connection. After a client connects, it prints a message to stdout, accepts the connection and closes the client socket. Inline comments explain in short form what each block is for. Now we’re going to describe all this in detail.

Creating and closing kqueue object

To create a new KQ object, we call kqueue() function which returns the descriptor or -1 on error. KQ object is usually stored in the global context (in our case — it’s just a global variable) because we need it all the time while our app is running. We close KQ object with close().

	kq = kqueue();
	...
	close(kq);

Attaching socket descriptor to kqueue

So how do we attach a file descriptor to our kqueue object? First, we prepare an object where we define:

  • Which file descriptor we want to associate with KQ. This value can be a socket, UNIX signal, or an arbitrary user-defined ID, depending on the type of event we want to register.

  • Which event we are interested in. For I/O events this value must be either EVFILT_READ or EVFILT_WRITE. For UNIX signals it’s EVFILT_SIGNAL, for timers it’s EVFILT_TIMER, for user events it’s EVFILT_USER, but they will be explained later in separate sections.

  • What we want kevent() to do: EV_ADD attaches descriptor to KQ. EV_CLEAR flag prevents the event from unnecessary signalling (it’s explained later).

  • What object pointer we associate with our file descriptor. Normally, this object contains at least 2 fields: file descriptor itself and the function pointer which handles the event.

To set the above parameters we use EV_SET() macro for convenience but you may also use the struct kevent fields directly.
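
For reference, here’s roughly what EV_SET() fills in if you set the struct kevent fields by hand (here `sk` and `obj` stand for the socket descriptor and our context object):

	struct kevent ev;
	ev.ident = sk;                // the descriptor (or arbitrary ID) we're interested in
	ev.filter = EVFILT_READ;      // which event type to register
	ev.flags = EV_ADD | EV_CLEAR; // what kevent() should do with it
	ev.fflags = 0;
	ev.data = 0;
	ev.udata = obj;               // our opaque object pointer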

Then, we call kevent() function which processes all our events we supplied to it and returns 0 on success.

	struct kevent events[2];
	EV_SET(&events[0], sk, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, obj);
	EV_SET(&events[1], sk, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, obj);
	kevent(kq, events, 2, NULL, 0, NULL);

Receiving events from kqueue

To receive events from KQ we must have a place to store them so the kernel can prepare the data for us — we need an array for signalled events, an array of struct kevent objects. Note that it doesn’t necessarily mean that events are stored in kernel memory in the same form — it’s just what the kernel prepares for us. Normally, the events are stored in chronological order, meaning that the event with index 0 signalled before the event with a larger index, but this isn’t important for us, because user software is prepared to handle the events in any order anyway. In our example we use an array for a maximum of 1 event. Only very busy software can benefit from using a large array here to minimize the number of context switches.

We call kevent() function and pass the array to it along with a timeout value defining how long it can block in case there are no signalled events. The function returns the number of signalled events, or 0 if timeout value has passed before anything signalled, or -1 on error. Normally, we call KQ waiting functions in a loop like so:

	while (!quit) {
		struct timespec *timeout = NULL; // wait indefinitely
		struct kevent events[1];
		int n = kevent(kq, NULL, 0, events, 1, timeout);
		if (n < 0 && errno == EINTR)
			continue; // kevent() interrupts when UNIX signal is received
	}

As you can see we also check the return value for error EINTR which means that while kevent() was waiting for events a UNIX signal has been received and processed by a UNIX signal handling function. This behaviour allows us to easily handle some important global flags that signal handlers may set. For example we may handle SIGINT signal which is sent after the user presses Ctrl+C within the terminal window. Then SIGINT signal handling function may set some kind of quit flag to indicate we should exit our app. kevent() then returns with EINTR, and we check for quit value and exit the loop in this case.
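
For example, here’s a minimal sketch of such a handler (the names on_sigint and install_sigint_handler are just placeholders for this illustration):

	#include <signal.h>

	static volatile sig_atomic_t quit;

	static void on_sigint(int sig)
	{
		(void)sig;
		quit = 1; // the kevent() loop returns with EINTR, sees `quit` set and exits
	}

	static void install_sigint_handler()
	{
		struct sigaction sa = {};
		sa.sa_handler = on_sigint;
		sigaction(SIGINT, &sa, NULL);
	}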

Processing received events from kqueue

To process an event which we have received from KQ previously we have to know what to do with it. But all events look pretty much the same to us at this point. That’s why we used an object pointer with EV_SET() to associate it with each event. Now we can simply call an event handling function. We get this pointer by accessing the struct kevent.udata field. For full-duplex I/O we need either 2 different handling functions or a single handler which itself checks which filter has signalled. Since every KQ mechanism does this in its own way, I recommend you go with the 2-handler approach and choose which handler to execute here, at the lowest level, to simplify the higher-level code.

	struct context *o = events[i].udata;
	if (events[i].filter == EVFILT_READ)
		o->rhandler(o); // handle read event
	else if (events[i].filter == EVFILT_WRITE)
		o->whandler(o); // handle write event

Do you remember the EV_CLEAR flag we supplied to KQ when we attached the socket to it? Here’s why we need it. For example, after KQ returns a READ event to us, it won’t signal again until we drain all the data from this socket, i.e. until recv() returns with an EAGAIN error. This mechanism prevents the same event from being signalled over and over again each time we call the KQ waiting function, thus improving overall performance. Software that can’t deal with EV_CLEAR behaviour most probably has a design flaw.
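
Here’s a minimal sketch of such a draining loop (the drain_socket helper is hypothetical and not part of the examples above):

	#include <sys/types.h>
	#include <sys/socket.h>
	#include <errno.h>

	// with EV_CLEAR (edge-triggered) behaviour we must keep reading until the socket
	// buffer is empty, otherwise the READ event won't fire again for buffered data
	void drain_socket(int sk)
	{
		char buf[4096];
		for (;;) {
			ssize_t r = recv(sk, buf, sizeof(buf), 0);
			if (r < 0 && errno == EAGAIN)
				break; // buffer drained: KQ will signal us when new data arrives
			if (r <= 0)
				break; // error or the peer has closed the connection
			// ... process `r` bytes from `buf` ...
		}
	}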

Establishing TCP connection with kqueue

Now let’s see how to correctly use connect() on a TCP socket with kqueue. What makes this use-case special is that there is no function that could return the result of previous connect() operation. Instead, we must use struct kevent to get the error code. Here’s an example.

	int r = connect(sk, ...);
	if (r == 0) {
		... // connection completed successfully

	} else if (errno == EINPROGRESS) {
		// connection is in progress
		struct kevent events[1];
		int n = kevent(kq, NULL, 0, events, 1, &timeout);

		if (events[0].filter == EVFILT_WRITE) {
			errno = 0;
			if (events[0].flags & EV_EOF)
				errno = events[0].fflags;
			... // handle TCP connection result depending on `errno` value
		}

	} else {
		... // fatal error
	}

Suppose that we created a non-blocking TCP socket, attached it to KQ, and now we begin the connection procedure. If it completes successfully right away, then we can read or write data to it immediately, and that isn’t what we are talking about here. But if it returns -1 with the EINPROGRESS error, we should wait until the OS notifies us with the result of the procedure. And here’s the main thing: when the EVFILT_WRITE event is received we test the struct kevent.flags field for EV_EOF, and if it’s set, then it means that connect() has failed. In this case the struct kevent.fflags field contains the error number — the same error that a blocking connect() call would set.

I have to say that I don’t like this whole logic of getting an error code from struct kevent because it forces me to multiply branches in my code. Another reason is that it’s kqueue-specific stuff — for example, on Linux we have to call getsockopt(..., SOL_SOCKET, SO_ERROR, ...) to get the error code. But on the other hand, on FreeBSD we don’t need to perform another syscall, which probably outweighs both of my points above, so in the end I think it’s alright.

Let’s see the complete example of a simple HTTP/1 client that connects, sends request and receives response — all via KQ. But of course you may notice that our code for handling WRITE event from KQ is useless here, because our request is very small and should always fit into the empty socket buffer, i.e. send() will always complete immediately. But I think it’s OK for a sample program — the goal is to show you the general principle.

/* Kernel Queue The Complete Guide: kqueue-connect.c: HTTP/1 client
Usage:
	$ nc -l 127.0.0.1 64000
	$ ./kqueue-connect
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

int kq;
int quit;

// the structure associated with a socket descriptor
struct context {
	int sk;
	void (*rhandler)(struct context *obj);
	void (*whandler)(struct context *obj);
	int data_offset;
};

void obj_write(struct context *obj);
void obj_read(struct context *obj);

void obj_prepare(struct context *obj)
{
	// create and prepare socket
	obj->sk = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
	assert(obj->sk != -1);

	int val = 1;
	assert(0 == setsockopt(obj->sk, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(int)));

	// attach socket to KQ
	struct kevent events[2];
	EV_SET(&events[0], obj->sk, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, obj);
	EV_SET(&events[1], obj->sk, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, obj);
	assert(0 == kevent(kq, events, 2, NULL, 0, NULL));
}

void obj_connect(struct context *obj)
{
	if (obj->whandler == NULL) {
		// begin asynchronous connection
		struct sockaddr_in addr = {};
		addr.sin_family = AF_INET;
		addr.sin_port = htons(64000);
		char ip4[] = {127,0,0,1};
		*(int*)&addr.sin_addr = *(int*)ip4;

		int r = connect(obj->sk, (struct sockaddr*)&addr, sizeof(struct sockaddr_in));
		if (r == 0) {
			// connection completed successfully
		} else if (errno == EINPROGRESS) {
			// connection is in progress
			obj->whandler = obj_connect;
			return;
		} else {
			assert(0); // fatal error
		}

	} else {
		assert(errno == 0); // connection is successful
		obj->whandler = NULL; // we don't want any more signals from KQ
	}

	printf("Connected\n");
	obj_write(obj);
}

void obj_write(struct context *obj)
{
	const char data[] = "GET / HTTP/1.1\r\nHost: hostname\r\nConnection: close\r\n\r\n";
	int r = send(obj->sk, data + obj->data_offset, sizeof(data)-1 - obj->data_offset, 0);
	if (r > 0) {
		// sent some data
		obj->data_offset += r;
		if (obj->data_offset != sizeof(data)-1) {
			// we need to send the complete request
			obj_write(obj);
			return;
		}
		obj->whandler = NULL;

	} else if (r < 0 && errno == EAGAIN) {
		// the socket's write buffer is full
		obj->whandler = obj_write;
		return;
	} else {
		assert(0); // fatal error
	}

	printf("Sent HTTP request.  Receiving HTTP response...\n");
	obj_read(obj);
}

void obj_read(struct context *obj)
{
	char data[64*1024];
	int r = recv(obj->sk, data, sizeof(data), 0);
	if (r > 0) {
		// received some data
		printf("%.*s", r, data);
		obj_read(obj);
		return;

	} else if (r == 0) {
		// server has finished sending data

	} else if (r < 0 && errno == EAGAIN) {
		// the socket's read buffer is empty
		obj->rhandler = obj_read;
		return;
	} else {
		assert(0); // fatal error
	}

	quit = 1;
}

void main()
{
	// create KQ object
	kq = kqueue();
	assert(kq != -1);

	struct context obj = {};
	obj_prepare(&obj);
	obj_connect(&obj);

	// wait for incoming events from KQ and process them
	while (!quit) {
		struct kevent events[1];
		struct timespec *timeout = NULL; // wait indefinitely
		int n = kevent(kq, NULL, 0, events, 1, timeout);
		if (n < 0 && errno == EINTR)
			continue; // kevent() interrupts when UNIX signal is received
		assert(n > 0);

		// now process each signalled event
		for (int i = 0;  i != n;  i++) {
			struct context *o = events[i].udata;

			errno = 0;
			if (events[i].flags & EV_EOF)
				errno = events[i].fflags;

			if (events[i].filter == EVFILT_READ
				&& o->rhandler != NULL)
				o->rhandler(o); // handle read event

			if (events[i].filter == EVFILT_WRITE
				&& o->whandler != NULL)
				o->whandler(o); // handle write event
		}
	}

	close(obj.sk);
	close(kq);
}

Note that macOS doesn’t support SOCK_NONBLOCK flag in socket() — you should set the socket as nonblocking manually.
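
A possible sketch of doing that with fcntl() (the set_nonblock helper name is just an example):

	#include <fcntl.h>

	// macOS: set the non-blocking flag on an already created socket
	int set_nonblock(int sk)
	{
		int flags = fcntl(sk, F_GETFL, 0);
		if (flags < 0)
			return -1;
		return fcntl(sk, F_SETFL, flags | O_NONBLOCK);
	}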

Processing stale cached events

One of the most interesting aspects of programming with KQ is handling the events in which we in fact are not interested anymore. Here’s what may happen when we use KQ carelessly:

  • We attach a socket to KQ for both READ and WRITE events.

  • We keep performing normal operations on a socket, reading and writing to it occasionally.

  • At some point both READ and WRITE events get signalled.

  • We use an array of events for KQ waiting function and it returns 2 events to us for the same socket.

  • We start handling the first event which happens to be a READ event.

  • We call READ event handling function, it processes this event and comes to a conclusion that the client object should be closed, because it has sent some invalid data. We close the socket and destroy the object. And everything seems to be correct, because after the socket is closed KQ won’t signal us with it anymore. But remember that we have called a KQ waiting function some time ago and it has already returned 2 events to us. We’ve handled the first event just now, but the second event is still in our array of event objects and is yet to be processed in the next iteration of the loop.

  • We start handling the second event which is WRITE event in our case. We take our object data associated with the event and we try to call the handling function. BAM! We hit the memory region we have just destroyed while handling the READ event.

This can happen at any time while our app is running and we don’t know anything in advance — we need to correctly determine such cases and handle them as they occur. Note that this situation isn’t just limited to full-duplex sockets, but to any KQ event in general. Suppose a timer signal has fired and we decided to close a client connection, but its socket has signalled already, and there’s an associated event already in our cache — we just don’t know it yet, because the timer signal has occurred just before it. So unless we always limit the number of received events from KQ to 1, we must always be ready to handle this situation — we can’t prevent it from happening. And of course, there’s the same problem on Linux with epoll, so it’s not just kqueue-only stuff.

So how are we going to solve this problem? Thankfully, it was solved by Igor Sysoev (the great man who originally wrote nginx) a long time ago. Here’s the trick.

struct context {
	int sk;
	void (*handler)(struct context *obj);
	int flag;
};

void handler_func(struct context *obj)
{
	if (...) {
		goto finish; // an error occurred
	}
	...

finish:
	close(obj->sk);
	obj->flag = !obj->flag; // turn over the safety flag
}

void main()
{
	...
	struct context *obj = ...;
	obj->flag = 0;
	struct kevent events[8];
	void *ptr = (void*)((size_t)obj | obj->flag); // pass additional flag along with the object pointer to KQ
	EV_SET(&events[0], obj->sk, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, ptr);
	EV_SET(&events[1], obj->sk, EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, ptr);
	kevent(kq, events, 2, NULL, 0, NULL);

	...

	int n = kevent(kq, NULL, 0, events, 8, &timeout);
	for (...) {
		void *ptr = events[i].udata;
		struct context *obj = (void*)((size_t)ptr & ~1); // clear the lowest bit

		int flag = (size_t)ptr & 1; // check the lowest bit
		if (flag != obj->flag)
			continue; // don't process this event

		obj->handler(obj);
	}
}

Explanation:

  • When we attach a socket to KQ, we also associate our object pointer with it. But we can actually store some more information there — not just the pointer. In our example here, we set the value of struct context.flag field (which is 0 at first) as the lowest bit value along with our object pointer. It works because all structure objects that contain a pointer are aligned to at least 4 or 8 bytes by default. In other words, the lowest bit for any object pointer is always 0, and we can use it for our own purposes.

  • After we have received an event from KQ, we clear the lowest bit when converting the user data pointer to our object pointer. All event handlers are called as usual without any problem.

	kevent(EV_ADD, 0x???????0)

	events[] = kevent()
	object = events[0].udata.bits[1..31]

	// object   | events[0]
	// ---------+-------------------
	// {flag=0} | {udata=0x???????0}

	if 0 == 0 // TRUE
	  object.handler()
  • But if inside the event handling function we decided to close the socket, we mark our object as unused — in our case we set an internally stored safety flag to 1. We don’t free the memory associated with our object so that we can access this value later.

	read_handler() {
		close(object.socket)
		object.flag = 1
	}
  • When we start the processing of the next (cached) event for the same socket, we compare the lowest bit from the associated data pointer with the flag stored within our object. In our case they don’t match, which means that we don’t want this event to be processed. Note that C doesn’t allow logical bit operations on pointers, hence the somewhat ugly cast to integer type and back.

	object = events[1].udata.bits[1..31]

	// object   | events[1]
	// ---------+-------------------
	// {flag=1} | {udata=0x???????0}

	if 1 == 0 // FALSE
  • After we have processed all cached events we may free the memory allocated for our objects marked as unused. Or we may decide not to free the memory for our objects at all — because the next iteration may need to create a new object and we would need to allocate memory again. Instead, we may store all unused objects in a list (or array) and free them only when our app is closing the whole KQ subsystem.

  • Next time we use the same object pointer (if we didn’t free its memory), the flag is still set to 1 and so it is passed to KQ as the lowest bit of the user data pointer when we attach a new socket descriptor to KQ.

	kevent(EV_ADD, 0x???????1)

	events[] = kevent()
	object = events[0].udata.bits[1..31]

	// object   | events[0]
	// ---------+-------------------
	// {flag=1} | {udata=0x???????1}

	if 1 == 1 // TRUE
	  object.handler()
  • And again, when we have finished working with this object, we turn it over and set the flag to 0. After that, the values from KQ (the old bit value 1) and the flag value inside the object (now 0) don’t match, therefore the handling function won’t be called which is exactly what we want.

	read_handler() {
		close(object.socket)
		object.flag = 0
	}

	object = events[1].udata.bits[1..31]

	// object   | events[1]
	// ---------+-------------------
	// {flag=0} | {udata=0x???????1}

	if 0 == 1 // FALSE

Some people don’t use the lowest-bit approach to handle the problem with stale events. They just use a list where they put the unused objects until they can free them (after each iteration or by a timer signal). Imagine how this can slow down the processing when every cached event starting at index 1 must be checked against some data in a container — a search must be performed, which wastes CPU cycles. I don’t know the reasoning for doing so, and I really don’t see how it’s too hard to use the lowest-bit trick and flip it once in a while. So I advise using a list of unused objects only to save on countless memory allocations and deallocations, and not for deciding whether an event is stale or not.

User-triggered events with kqueue

When we use an infinite timeout in a KQ waiting function, it blocks forever until it can return an event. Similar to how it returns with EINTR after a UNIX signal is processed, we can force it to return at any time by sending a user event to KQ. To do it we first register an EVFILT_USER event in KQ, then we can trigger this event via NOTE_TRIGGER. Here’s an example.

/* Kernel Queue The Complete Guide: kqueue-user.c: User-triggered events */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

struct context {
	void (*handler)(struct context *obj);
};

struct context user_event_obj;

void user_event_obj_handler(struct context *obj)
{
	printf("Received user event via kqueue\n");
}

// application calls this function whenever it wants to add a new event to KQ
// which will execute user_event_obj_handler()
void trigger_user_event()
{
	user_event_obj.handler = user_event_obj_handler;

	struct kevent events[1];
	EV_SET(&events[0], 1234, EVFILT_USER, 0, NOTE_TRIGGER, 0, &user_event_obj);
	assert(0 == kevent(kq, events, 1, NULL, 0, NULL));
}

void main()
{
	// create kqueue object
	kq = kqueue();
	assert(kq != -1);

	// register user event with any random ID
	// note that user data is NULL here
	struct kevent events[1];
	EV_SET(&events[0], 1234, EVFILT_USER, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, NULL);
	assert(0 == kevent(kq, events, 1, NULL, 0, NULL));

	trigger_user_event();

	struct timespec *timeout = NULL; // wait indefinitely
	int n = kevent(kq, NULL, 0, events, 1, timeout);
	assert(n > 0);

	struct context *o = events[0].udata;
	if (events[0].filter == EVFILT_USER)
		o->handler(o); // handle user event

	close(kq);
}

To register a new user event in KQ we have to supply an arbitrary ID which we will later use to trigger it; I use 1234 just as an example. Note also that, unlike when attaching a socket descriptor, we don’t set the user data pointer at this point.

	EV_SET(&events[0], 1234, EVFILT_USER, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, NULL);

Then at some point we decide to trigger the event with NOTE_TRIGGER. And now we can pass an object pointer which the waiting function will return back to us after the event signals.

	EV_SET(&events[0], 1234, EVFILT_USER, 0, NOTE_TRIGGER, 0, obj);

After this event is returned from KQ and gets processed, it won’t signal again until we trigger it next time — that’s because we set EV_CLEAR flag when registering the event.

System timer events with kqueue

Another facility that KQ offers us is system timers — we can order KQ to periodically send us a timer event. A timer is necessary when we want to close connections for the clients that are silent for too long, for example. In kqueue we register a timer with EVFILT_TIMER and process its events as usual. For example:

/* Kernel Queue The Complete Guide: kqueue-timer.c: System timer events */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

struct context {
	void (*handler)(struct context *obj);
};

void timer_handler(struct context *obj)
{
	static int n;
	printf("Received timer event via kqueue: %d\n", n++);
}

void main()
{
	kq = kqueue();
	assert(kq != -1);

	struct context obj = {};
	obj.handler = timer_handler;

	// start system timer
	int period_ms = 1000;
	struct kevent events[1];
	EV_SET(&events[0], 1234, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, period_ms, &obj);
	assert(0 == kevent(kq, events, 1, NULL, 0, NULL));

	for (;;) {
		struct timespec *timeout = NULL; // wait indefinitely
		int n = kevent(kq, NULL, 0, events, 1, timeout);
		assert(n > 0);

		struct context *o = events[0].udata;
		if (events[0].filter == EVFILT_TIMER)
			o->handler(o); // handle timer event
	}

	close(kq);
}

Sometimes we don’t need a periodic timer, but the timer which will signal us only once, i.e. a one-shot timer. It’s simple with kqueue — we just use EV_ONESHOT flag:

	EV_SET(&events[0], 1234, EVFILT_TIMER, EV_ADD | EV_ENABLE | EV_ONESHOT, 0, period_ms, obj);

Keep in mind that KQ timers are not designed so that you can use a million of them — you just need 1. Even if our software handles a million clients, we still need 1 system timer, because all we need is to periodically wake up and process the oldest entries in our timer queue, which we handle ourselves with our own code. The timer queue mechanism isn’t a part of KQ and isn’t in the scope of this article — it’s just a linked-list or rbtree container where the first item is the oldest.

UNIX signals from kqueue

Another convenient feature of KQ is handling UNIX signals. We register a UNIX signal handler with EVFILT_SIGNAL and pass the signal number we want to attach to. When processing an event, we get the signal number from struct kevent.ident field. Example:

/* Kernel Queue The Complete Guide: kqueue-signal.c: UNIX signal handler
Usage:
	$ ./kqueue-signal
	$ killall -SIGUSR1 kqueue-signal
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <signal.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

struct context {
	void (*handler)(int sig);
};

void unix_signal_handler(int sig)
{
	printf("Received UNIX signal via kqueue: %d\n", sig);
}

void main()
{
	kq = kqueue();
	assert(kq != -1);

	struct context obj = {};
	obj.handler = unix_signal_handler;

	// block default signal handler
	int sig = SIGUSR1;
	sigset_t mask;
	sigemptyset(&mask);
	sigaddset(&mask, sig);
	sigprocmask(SIG_BLOCK, &mask, NULL);

	// register UNIX signal handler
	struct kevent events[1];
	EV_SET(&events[0], sig, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, &obj);
	assert(0 == kevent(kq, events, 1, NULL, 0, NULL));

	struct timespec *timeout = NULL; // wait indefinitely
	int n = kevent(kq, NULL, 0, events, 1, timeout);
	assert(n > 0);

	struct context *o = events[0].udata;
	if (events[0].filter == EVFILT_SIGNAL) {
		int sig = events[0].ident;
		obj.handler(sig); // handle UNIX signal
	}

	close(kq);
}

Note that KQ will return the event only after the signal has been processed with its normal mechanism, i.e. the handlers registered with sigaction() — KQ can’t completely replace this mechanism. But KQ makes it easier to handle signals such as SIGCHLD, when a child process signals its parent about its closure.
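
For instance, here’s a rough sketch of catching SIGCHLD via kqueue and reaping the terminated child, following the same pattern as the example above (it assumes `kq` already exists; waitpid() needs <sys/wait.h>):

	// block default delivery, as in the example above, and register the kqueue filter
	sigset_t mask;
	sigemptyset(&mask);
	sigaddset(&mask, SIGCHLD);
	sigprocmask(SIG_BLOCK, &mask, NULL);

	struct kevent ev;
	EV_SET(&ev, SIGCHLD, EVFILT_SIGNAL, EV_ADD | EV_ENABLE, 0, 0, NULL);
	kevent(kq, &ev, 1, NULL, 0, NULL);

	// later, after receiving an EVFILT_SIGNAL event with ident == SIGCHLD:
	int status;
	while (waitpid(-1, &status, WNOHANG) > 0) {
		// a child process has terminated; inspect `status` if needed
	}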

Asynchronous file I/O with kqueue

Trying to read or write data from/to files on disk is a little bit harder than performing I/O on sockets. One of the reasons is that sockets don’t have an offset to read at — we always read from one end, while with files we may issue several parallel operations at different offsets. And how can the OS notify us about which particular operation has completed? That’s why the kernel has to provide us with a new API for dealing with file AIO. And FreeBSD is the only OS that has a complete implementation of asynchronous file read/write operations. Sadly, it doesn’t look anything like the rest of what we’ve talked about here so far. What we would have expected from kqueue is a mechanism which just signals us when some data is available to read or write from/to a file — exactly the same way we work with sockets. But no, what we have here instead is a mechanism of asynchronous file operations (only read/write operations are supported) which holds (or «locks») the user data buffer internally and doesn’t signal at all until the whole data chunk is transferred, depriving us of the flexibility we already got used to when working with sockets. Now all this looks more like IOCP, which can’t be a good sign. But anyway, since I promised to show you everything I know about KQ, let’s see how this mechanism works with files.

  • First, we enable the AIO subsystem by loading the appropriate kernel module:

      % kldload aio
    
  • The next step is to prepare an AIO object of type struct aiocb and call the appropriate function, that is aio_read() or aio_write().

  • Then we immediately check for operation status with aio_error(), because it may have finished already before we even called KQ waiting function.

  • When we know that everything is fine and the operation is in progress, we wait for a signal from KQ as usual.

  • When we receive an event of type EVFILT_AIO from KQ we may read struct kevent.ident field to get struct aiocb* object pointer associated with the operation. This is how we can distinguish several parallel operations on the same file descriptor from each other.

Here’s the minimal example of how to read from a file asynchronously:

/* Kernel Queue The Complete Guide: kqueue-file.c: Asynchronous file reading
Usage:
	$ echo 'Hello file AIO' >./kqueue-file.txt
	$ ./kqueue-file
*/
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <aio.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

struct context {
	int fd;
	struct aiocb acb;
	int (*handler)(struct context *obj, struct aiocb *acb);
};

void file_io_result(const char *via, int res)
{
	printf("Read from file via %s: %d\n", via, res);
}

int file_aio_handler(struct context *obj, struct aiocb *acb)
{
	int r = aio_error(acb);
	if (r == EINPROGRESS) {
		return 0; // AIO in progress
	} else if (r != 0) {
		file_io_result("kqueue", -1); // AIO completed with error
		return -1;
	}

	r = aio_return(acb);
	file_io_result("kqueue", r); // AIO completed successfully
	return 1;
}

void main()
{
	// create KQ object
	kq = kqueue();
	assert(kq != -1);

	// open file descriptor and prepare the associated object
	int fd = open("./kqueue-file.txt", O_RDONLY, 0);
	assert(fd != -1);
	struct context obj = {};
	obj.handler = file_aio_handler;

	// associate the AIO operation with KQ and user object pointer
	memset(&obj.acb, 0, sizeof(obj.acb));
	obj.acb.aio_sigevent.sigev_notify_kqueue = kq;
	obj.acb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
	obj.acb.aio_sigevent.sigev_notify_kevent_flags = EV_CLEAR;
	obj.acb.aio_sigevent.sigev_value.sigval_ptr = &obj;

	void *buf = malloc(4*1024);

	// specify operation parameters
	obj.acb.aio_fildes = fd;
	obj.acb.aio_buf = buf; // destination buffer
	obj.acb.aio_nbytes = 4*1024; // max number of bytes to read
	obj.acb.aio_offset = 0; // offset to begin reading at

	// begin file AIO operation
	obj.acb.aio_lio_opcode = LIO_READ;
	if (0 != aio_read(&obj.acb)) {
		if (errno == EAGAIN || errno == ENOSYS || errno == EOPNOTSUPP) {
			// no resources to complete this I/O operation
			// or AIO module isn't loaded
			// or the system can't perform AIO on this file
		} else {
			file_io_result("aio_read", -1);
			return; // fatal error
		}

		// AIO doesn't work - perform synchronous reading at the specified offset
		int r = pread(fd, buf, obj.acb.aio_nbytes, obj.acb.aio_offset);
		file_io_result("pread", r);
		return;
	}

	// asynchronous file reading has started, but might be finished already
	if (0 != file_aio_handler(&obj, &obj.acb))
		return;

	// asynchronous file reading is in progress, now wait for the signal from KQ
	struct kevent events[1];
	struct timespec *timeout = NULL; // wait indefinitely
	int n = kevent(kq, NULL, 0, events, 1, timeout);

	struct context *o = events[0].udata;
	if (events[0].filter == EVFILT_AIO) {
		struct aiocb *acb = (void*)events[0].ident;
		o->handler(o, acb); // handle file AIO event
	}

	free(buf);
	close(fd);
	close(kq);
}

The main function here is aio_read(), which processes our request and starts the asynchronous operation. It returns 0 if the operation has started successfully.

	struct aiocb acb = ...; // fill in `acb` object
	acb.aio_lio_opcode = LIO_READ;
	aio_read(&acb); // begin file AIO operation

However, if something is wrong, it returns with an error, and we must handle several cases here:

	if (errno == EAGAIN || errno == ENOSYS || errno == EOPNOTSUPP) {
		// no resources to complete this I/O operation
		// or AIO module isn't loaded
		// or the system can't perform AIO on this file
	} else {
		// fatal error
	}

We can handle some types of errors by issuing a synchronous file reading:

	// AIO doesn't work - perform synchronous reading at the specified offset
	int r = pread(fd, buf, size, off);

For writing data to a file asynchronously we use the same template, except the opcode and the function are different, while everything else is the same:

	struct aiocb acb = ...; // fill in `acb` object
	acb.aio_lio_opcode = LIO_WRITE;
	aio_write(&acb); // begin file AIO operation

And of course we use a different function for the synchronous file writing in case file AIO doesn’t work:

	int r = pwrite(fd, buf, size, off);

Though file AIO is good to have, it’s still not enough for high-performance file servers, because such software needs not just file AIO in terms of reading/writing file data, but disk AIO in general. What for? In our example above we use asynchronous file reading only, but we know that the very first step in working with files in UNIX is opening a file descriptor. However, we just can’t perform an asynchronous file open — it’s not supported. And in real life an open() syscall may take a whole second to complete on a busy machine — it may block our worker thread for a long time. In real life we also want to call stat() or fstat() on a file path or a file descriptor. And the OS doesn’t provide a way to call them asynchronously either, except by calling them inside another thread. So even if file AIO can help sometimes, it’s still somewhat lame and incomplete. And considering the fact that other OSes don’t have an appropriate file AIO implementation at all, it may be a better choice not to use any of those APIs at all. It may be better to use a thread pool with a file operations queue and dispatch operations to another thread. Inside the other thread the operations are performed synchronously, and it then signals the main thread when an operation is complete.
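
Here’s a rough sketch of that idea (the file_task structure, the file_worker function and the dispatching code are all hypothetical; trigger_user_event() refers to the user-event examples in this article):

	#include <pthread.h>
	#include <unistd.h>

	struct file_task {
		int fd;
		void *buf;
		size_t size;
		off_t offset;
		ssize_t result;
	};

	// runs inside a worker thread: the blocking pread() doesn't stall the main KQ loop
	void* file_worker(void *arg)
	{
		struct file_task *t = arg;
		t->result = pread(t->fd, t->buf, t->size, t->offset);
		trigger_user_event(); // wake up the main thread's KQ loop when the operation is done
		return NULL;
	}

	// the main thread dispatches the operation instead of performing it itself:
	//   pthread_t th;
	//   pthread_create(&th, NULL, file_worker, task);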

Linux and epoll

epoll API is very similar to kqueue for socket I/O notifications, though it’s quite different for other purposes. Here we use epoll_ctl() to attach file descriptors to KQ and we use epoll_wait() to receive events from KQ. There are several more syscalls we’re going to use for user events, timers and UNIX signals: eventfd(), timerfd_create(), timerfd_settime(), signalfd(). Overall, the functionality of epoll is the same as kqueue but sometimes with a slightly different approach.

The key differences between epoll and kqueue are:

  • kevent() supports attaching many file descriptors in a single syscall, epoll_ctl() does not — we must call it once for each fd. kevent() even allows us to attach fd’s AND wait for new events in a single syscall, epoll_wait() can’t do that.

  • epoll may join 2 events (EPOLLIN and EPOLLOUT) into 1 event object in case both READ and WRITE events signal. This is contrary to kqueue which always returns 1 event object per 1 event (EVFILT_READ or EVFILT_WRITE). Be careful with epoll here, always check if it’s alright to execute event handling functions, because otherwise you risk calling WRITE event handler after you have finalized the object in READ event handler.

  • epoll makes it somewhat harder to use additional KQ functionality such as UNIX signals, system timers or user events — see below. In kqueue, however, it looks all the same.

What is different but also similar between epoll and kqueue:

  • We attach a socket to epoll using EPOLLIN | EPOLLOUT flags which is the same as registering 2 separate events EVFILT_READ and EVFILT_WRITE with kqueue.

  • EPOLLET flag in epoll is the same thing as EV_CLEAR flag in kqueue — it prevents epoll from signalling us about the same event more than once until we drain all data from the socket. And it’s the same situation when writing to a streaming (e.g. TCP) socket — we must keep calling write() until it returns with EAGAIN error — only then we may expect epoll to signal us.
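
A minimal sketch of that writing pattern (the send_until_blocked helper is hypothetical):

	#include <sys/types.h>
	#include <sys/socket.h>
	#include <errno.h>

	// keep sending until either the whole buffer is written or the kernel buffer is full;
	// with EPOLLET/EV_CLEAR we may expect the next WRITE event only after getting EAGAIN
	int send_until_blocked(int sk, const char *data, size_t len, size_t *off)
	{
		while (*off < len) {
			ssize_t r = send(sk, data + *off, len - *off, 0);
			if (r < 0 && errno == EAGAIN)
				return 0; // wait for the next WRITE event, then call this function again
			if (r < 0)
				return -1; // fatal error
			*off += r;
		}
		return 1; // the whole buffer has been sent
	}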

Accepting socket connections with epoll

Here’s a minimal example for accepting a socket connection.

/* Kernel Queue The Complete Guide: epoll-accept.c: Accept socket connection
Usage:
	$ ./epoll-accept
	$ curl 127.0.0.1:64000/
*/
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;

// the structure associated with a socket descriptor
struct context {
	int sk;
	void (*rhandler)(struct context *object);
};

void accept_handler(struct context *obj)
{
	printf("Received socket READ event via epoll\n");

	int csock = accept(obj->sk, NULL, 0);
	assert(csock != -1);
	close(csock);
}

void main()
{
	// create KQ object
	kq = epoll_create(1);
	assert(kq != -1);

	struct context obj = {};
	obj.rhandler = accept_handler;

	// create and prepare a socket
	obj.sk = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
	assert(obj.sk != -1);
	int val = 1;
	setsockopt(obj.sk, SOL_SOCKET, SO_REUSEADDR, &val, 4);

	struct sockaddr_in addr = {};
	addr.sin_family = AF_INET;
	addr.sin_port = htons(64000);
	assert(0 == bind(obj.sk, (struct sockaddr*)&addr, sizeof(addr)));
	assert(0 == listen(obj.sk, 0));

	// attach socket to KQ
	struct epoll_event event;
	event.events = EPOLLIN | EPOLLOUT | EPOLLET;
	event.data.ptr = &obj;
	assert(0 == epoll_ctl(kq, EPOLL_CTL_ADD, obj.sk, &event));

	// wait for incoming events from KQ
	struct epoll_event events[1];
	int timeout_ms = -1; // wait indefinitely
	int n = epoll_wait(kq, events, 1, timeout_ms);
	assert(n > 0);

	// process the received event
	struct context *o = events[0].data.ptr;
	if (events[0].events & (EPOLLIN | EPOLLERR))
		o->rhandler(o); // handle read event

	close(obj.sk);
	close(kq);
}

Creating and closing epoll object

The epoll_create() function returns a new KQ object descriptor, which we close as usual with close().

	kq = epoll_create(1);
	...
	close(kq);

Attaching socket descriptor to epoll

EPOLLIN flag means that we want the kernel to notify us when a READ event signals, and EPOLLOUT flag is the same for a WRITE event. EPOLLET prevents epoll from returning to us the same signal unnecessarily. We set our object pointer with struct epoll_event.data.ptr field.

	struct epoll_event event;
	event.events = EPOLLIN | EPOLLOUT | EPOLLET;
	event.data.ptr = obj;
	epoll_ctl(kq, EPOLL_CTL_ADD, sk, &event);

Receiving events from epoll

epoll_wait() function blocks until it has something to return to us, or until the timeout value expires. The function returns the number of signalled events which may be 0 only in case of timeout. It also returns with EINTR exactly like kevent() after a UNIX signal has been received, so we must always handle this case.

	while (!quit) {
		struct epoll_event events[1];
		int timeout_ms = -1; // wait indefinitely
		int n = epoll_wait(kq, events, 1, timeout_ms);
		if (n < 0 && errno == EINTR)
			continue; // epoll_wait() interrupts when UNIX signal is received
	}

Processing received events from epoll

We get the events flags by reading struct epoll_event.events which we should always test for EPOLLERR too, because otherwise we can miss an event we’re waiting for. To get the associated user data pointer, we read struct epoll_event.data.ptr value.

I emphasize once again that you should be careful not to invalidate user object memory region inside READ event handler, or the program may crash inside the WRITE event handler which is executed next. To handle this situation you may just always set event handler function pointers to NULL when you don’t expect them to signal. An alternative solution may be to clear EPOLLOUT | EPOLLERR flags from struct epoll_event.events field from inside READ event handler.

	struct context *o = events[i].data.ptr;

	if ((events[i].events & (EPOLLIN | EPOLLERR))
		&& o->rhandler != NULL)
		o->rhandler(o); // handle read event

	if ((events[i].events & (EPOLLOUT | EPOLLERR))
		&& o->whandler != NULL)
		o->whandler(o); // handle write event

Establishing TCP connection with epoll

As with kqueue, epoll also has a way to notify us about the status of TCP connection. But unlike kqueue which sets the error number for us inside struct kevent object, epoll doesn’t do that (it can’t do that). Instead, we get the error number associated with our socket via getsockopt(..., SOL_SOCKET, SO_ERROR, ...).

	int err;
	socklen_t len = 4;
	getsockopt(obj->sk, SOL_SOCKET, SO_ERROR, &err, &len);
	errno = err;
	... // handle TCP connection result depending on `errno` value

User-triggered events with epoll

There is slightly more to do with epoll than with kqueue to handle user-triggered events:

  • First, we create a new file descriptor with eventfd() which we then attach to KQ.

  • We trigger a user event at any time by writing an 8 byte value to our eventfd descriptor. In our case this value is an object pointer.

  • After we receive an event from KQ, we read an 8 byte value from eventfd descriptor. We can convert this value to an object pointer. Remember that we need to keep reading data from eventfd descriptor until it returns with EAGAIN error, because we use EPOLLET.

/* Kernel Queue The Complete Guide: epoll-user.c: User-triggered events */
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

int kq;
int efd;

struct context {
	void (*handler)(struct context *obj);
};

struct context eventfd_obj;
struct context user_event_obj;

void user_event_obj_handler(struct context *obj)
{
	printf("Received user event via epoll\n");
}

// application calls this function whenever it wants to add a new event to KQ
// which will execute user_event_obj_handler()
void trigger_user_event()
{
	struct context *obj = &user_event_obj;
	obj->handler = user_event_obj_handler;

	unsigned long long val = (size_t)obj;
	int r = write(efd, &val, 8);
	assert(r == 8);
}

// handle event from eventfd-descriptor
void handle_eventfd(struct context *obj)
{
	unsigned long long val;
	for (;;) {
		int r = read(efd, &val, 8);
		if (r < 0 && errno == EAGAIN)
			break;
		assert(r == 8);

		struct context *o = (void*)(size_t)val;
		o->handler(o);
	}
}

void main()
{
	// create KQ object
	kq = epoll_create(1);
	assert(kq != -1);

	struct context obj = {};
	obj.handler = handle_eventfd;

	// prepare eventfd-descriptor for user events
	efd = eventfd(0, EFD_NONBLOCK);
	assert(efd != -1);

	// register eventfd in KQ
	struct epoll_event event;
	event.events = EPOLLIN | EPOLLET;
	event.data.ptr = &obj;
	assert(0 == epoll_ctl(kq, EPOLL_CTL_ADD, efd, &event));

	trigger_user_event();

	struct epoll_event events[1];
	int timeout_ms = -1;
	int n = epoll_wait(kq, events, 1, timeout_ms);
	assert(n > 0);

	struct context *o = events[0].data.ptr;
	if (events[0].events & (EPOLLIN | EPOLLERR))
		o->handler(o); // handle eventfd event

	close(efd); // close eventfd descriptor
	close(kq);
}

System timer events with epoll

To receive the notifications from system timer with epoll we must use a timerfd object — a special file descriptor we can attach to epoll.

/* Kernel Queue The Complete Guide: epoll-timer.c: System timer events */
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

int kq;
int tfd;

struct context {
	void (*handler)(struct context *obj);
};

void timer_handler(struct context *obj)
{
	static int n;
	printf("Received timerfd event via epoll: %d\n", n++);

	unsigned long long val;
	read(tfd, &val, 8);
}

void main()
{
	// create KQ object
	kq = epoll_create(1);
	assert(kq != -1);

	struct context obj = {};
	obj.handler = timer_handler;

	// prepare timerfd-descriptor
	tfd = timerfd_create(CLOCK_MONOTONIC, 0);
	assert(tfd != -1);

	// register timerfd in KQ
	struct epoll_event event;
	event.events = EPOLLIN | EPOLLET;
	event.data.ptr = &obj;
	assert(0 == epoll_ctl(kq, EPOLL_CTL_ADD, tfd, &event));

	// arm a periodic system timer
	int period_ms = 1000;
	struct itimerspec ts = {};
	ts.it_value.tv_sec = period_ms / 1000;
	ts.it_value.tv_nsec = (period_ms % 1000) * 1000000;
	ts.it_interval = ts.it_value;
	assert(0 == timerfd_settime(tfd, 0, &ts, NULL));

	for (;;) {
		struct epoll_event events[1];
		int timeout_ms = -1; // wait indefinitely
		int n = epoll_wait(kq, events, 1, timeout_ms);
		assert(n > 0);

		struct context *o = events[0].data.ptr;
		if (events[0].events & EPOLLIN)
			o->handler(o); // handle timerfd event
	}

	close(tfd);
	close(kq);
}