In an earlier post I explained why it is important to separate I/O work from CPU-intensive work. I hinted then at non-blocking I/O and how it changes the behaviour of threads compared to traditional blocking I/O operations.

In this post I'll explain what non-blocking I/O is, how you can use it, and when it might be useful. The code examples are in Kotlin and run on the JVM, but the principles apply regardless of programming language, as non-blocking I/O is usually supported through the kernel of your OS.


Why blocking can be bad

Defining blocking and non-blocking function calls can be a bit tricky. While time is a factor, it isn't necessarily so that a function that takes a long time is blocking while a fast running call is non-blocking. Reading a single byte from a file on a modern hard drive is usually really fast, but it is still a blocking call. Iterating over a huge array can take much longer, but is usually not considered a blocking call.

In the context of this post, I would like to define blocking calls as those that block outside of what the developer can affect, while a non-blocking call is the opposite. The developer can't change the fact that there will be I/O wait states when reading a single byte from a file, but she can optimise the iteration of the huge array, and there are no I/O waits involved.

Blocking is not always a bad thing. Often we need it, as it is the only way to know when an operation is completed and there is a result to process. But having too many blocking calls means we need a dedicated thread for each of them. That can waste resources and potentially slow down the application when it has to context switch between thousands of running threads.

We will rarely find that non-blocking I/O is useful in client-side applications, especially when it comes to the type of I/O we usually do (HTTP calls, database queries, etc.). Non-blocking I/O is much more useful in server-side code when dealing with potentially thousands of parallel client requests.

A non-blocking echo server

The simplest possible server we can implement is an echo server, where a client connects to a TCP socket and transmits a piece of text which is then echoed back from the server on the same socket. I will show the code for this using Kotlin targeting the JVM platform. The principles behind how this works are the same regardless of which programming language you use, as long as the underlying platform supports non-blocking I/O.

java.nio.Buffer

One of the core components of the non-blocking API in Java is the Buffer class. You might have been using this already in Android applications, especially if you've been writing code for the camera, or used RenderScript or OpenGL. A Buffer is a container for data of a primitive type. You can find subclasses for all the Java primitives (ByteBuffer, IntBuffer, DoubleBuffer etc.) in the same package. You can think of the Buffer class as an array with some fancy utility functions. The difference between a regular Java array and a Buffer is that the latter can be mapped directly to a piece of native memory instead of being allocated in the JVM heap. This is important for the non-blocking I/O APIs (as well as APIs such as OpenGL or RenderScript) as they need direct native buffers to work efficiently.

A Buffer has three important properties you need to understand:
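As a minimal sketch of the heap versus native distinction: ByteBuffer.allocate() gives a buffer backed by a JVM array, while ByteBuffer.allocateDirect() gives a direct buffer. The describe() helper is a hypothetical name used only for this illustration.

```kotlin
import java.nio.ByteBuffer

// Describes where a buffer's storage lives, based on ByteBuffer.isDirect.
fun describe(buffer: ByteBuffer): String =
    if (buffer.isDirect) "direct (native memory)" else "heap (backed by a JVM array)"

fun main() {
    val heap = ByteBuffer.allocate(1024)          // storage on the JVM heap
    val direct = ByteBuffer.allocateDirect(1024)  // storage outside the JVM heap

    println("allocate():       ${describe(heap)}")
    println("allocateDirect(): ${describe(direct)}")
}
```

Both kinds expose the same position, limit and capacity API, so code written against ByteBuffer does not need to know which one it was given.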

  • capacity - Indicates how many elements (bytes, ints, doubles etc.) the buffer can contain.
  • position - The position in the buffer where the next read or write will happen.
  • limit - Indicates the index of the first element in the buffer which does not contain valid data. This will always be greater than or equal to the position and less than or equal to the capacity.

This means that when you read an element from a Buffer, you will get the next valid element at the position index. After that read operation, the position is incremented by one. You can continue to read (or write) until position is equal to limit. Usually you only need to keep track of the limit and position, as capacity is static and will never change. A newly allocated Buffer has the limit set to its capacity and the position to 0.

The way you usually use a Buffer is by first writing data to it and then reading from it (or the inverse). The following code demonstrates how this works.

import java.nio.CharBuffer

const val HELLO_WORLD = "Hello, World!"

fun main() {
    val buffer = CharBuffer.allocate(128)
    println("Buffer before put: capacity: ${buffer.capacity()}, " +
            "position: ${buffer.position()}, limit: ${buffer.limit()}")
    buffer.put(HELLO_WORLD.toCharArray())
    println("Buffer after put: capacity: ${buffer.capacity()}, " +
            "position: ${buffer.position()}, limit: ${buffer.limit()}")
    buffer.flip()
    println("Buffer after flip: capacity: ${buffer.capacity()}, " +
            "position: ${buffer.position()}, limit: ${buffer.limit()}")
    val content = CharArray(buffer.limit())
    buffer.get(content)
    println("Buffer after get: capacity: ${buffer.capacity()}, " +
            "position: ${buffer.position()}, limit: ${buffer.limit()}")
    println("Content read: ${content.joinToString("")}")
}
Kotlin function showing how CharBuffer reading and writing works. Try it here: https://pl.kotl.in/2QSW4oxPK

If you run the code above it will result in the following printout:

Buffer before put: capacity: 128, position: 0, limit: 128
Buffer after put: capacity: 128, position: 13, limit: 128
Buffer after flip: capacity: 128, position: 0, limit: 13
Buffer after get: capacity: 128, position: 13, limit: 13
Content read: Hello, World!
Print output from running the Buffer example above.

This is a simple (and naive) example of how the Buffer works and how to use it. The limit and position properties of a Buffer let you easily write and read the content, regardless of how much data there is (as long as it is within its capacity). If you did this with a regular array, you would need to keep track of how much of it is occupied yourself.
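The bookkeeping that the Buffer does for us can also be seen with hasRemaining() and remaining(), which compare position against limit. The following is a small sketch using an IntBuffer; the drain() helper is a hypothetical name for this illustration.

```kotlin
import java.nio.IntBuffer

// Reads every valid element between position and limit and returns them.
fun drain(buffer: IntBuffer): List<Int> {
    val result = mutableListOf<Int>()
    while (buffer.hasRemaining()) {   // true while position < limit
        result.add(buffer.get())      // relative get: reads at position, then advances it
    }
    return result
}

fun main() {
    val buffer = IntBuffer.allocate(8)
    buffer.put(10).put(20).put(30)    // position is now 3, limit is still 8
    buffer.flip()                     // limit = 3, position = 0
    println("Remaining before drain: ${buffer.remaining()}")
    println("Drained: ${drain(buffer)}")
    println("Remaining after drain: ${buffer.remaining()}")
}
```

With a plain IntArray we would have to carry the count of written elements around as a separate variable to know where to stop.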

The usefulness of Buffer will become more apparent in the next section where we start using the actual non-blocking I/O APIs.

Channels and Selectors

The two most important concepts in the non-blocking I/O API in Java are SelectableChannel and Selector. For a networking server, the channel is equivalent to the Socket or ServerSocket. Once you have a channel, you can register it on a Selector for certain operations. For instance, SocketChannel has three possible operations: connect, read, and write. When the read operation is available on a SocketChannel, it means there is data available to be consumed. This is different from the traditional read() function on an InputStream, as it won't block the calling thread.

The Selector allows you to listen for the available operations on multiple channels. Every time the select() method on the Selector returns with a value larger than 0 we have a number of channels available with operations ready to be performed.

The select() method found in the Selector is also a blocking call. In the echo-server we will see next, this is the only blocking call in the code. This call is not blocking on an I/O operation though, but is instead waiting for the registered channels to signal that an interesting operation is available. There is also a selectNow() method that is non-blocking and returns the number of available channels at that moment (which will usually be 0), but we won't be using that here.
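To see the difference between selectNow() and select() without setting up a server, here is a small sketch that uses a java.nio.channels.Pipe as a stand-in for a socket (an assumption for this illustration; the echo server uses socket channels). The selectDemo() function is a hypothetical name.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Returns (ready count before any data was written, ready count after a write).
fun selectDemo(): Pair<Int, Int> {
    val selector = Selector.open()
    val pipe = Pipe.open()
    try {
        pipe.source().configureBlocking(false)
        pipe.source().register(selector, SelectionKey.OP_READ)

        // Nothing has been written yet, so no channel is ready.
        val before = selector.selectNow()

        // Make the read end of the pipe readable.
        pipe.sink().write(ByteBuffer.wrap("ping".toByteArray()))

        // Blocks until at least one registered channel is ready.
        val after = selector.select()
        return before to after
    } finally {
        pipe.sink().close()
        pipe.source().close()
        selector.close()
    }
}

fun main() {
    val (before, after) = selectDemo()
    println("Ready before write: $before, after write: $after")
}
```

The select() call only returns here because data was written to the pipe first; without that write it would wait indefinitely.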

Server socket and listening for clients

The following code shows how to create a server socket and wait for incoming connections using this non-blocking I/O API.

val selector = Selector.open()
val buffer = ByteBuffer.allocate(1024) // Unused for now
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false) // Required before registering on a Selector
serverChannel.bind(InetSocketAddress(9191))
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
var running = true

while (running) {
  val channelCount = selector.select() // This will block!
  val selectedKeys = selector.selectedKeys()
  selectedKeys.forEach {
    if (it.isAcceptable) {
      val sc = it.channel() as ServerSocketChannel
      val clientChannel = sc.accept() // Returns null if no connection is pending
      if (clientChannel != null) {
        clientChannel.configureBlocking(false)
        clientChannel.register(selector, SelectionKey.OP_READ)
      }
    }
  }
}
Code for setting up a server socket in non-blocking mode and listening for incoming connections

Most of the code is fairly self-explanatory, but a few calls might need some additional explanation. The call to configureBlocking(false) is needed in order to set the socket to non-blocking mode. Without this, we wouldn't be able to register the channel on our Selector.

The register() call adds our serverChannel as a registered channel in our Selector with the accept operation as the one we are interested in. A registration returns a SelectionKey, which we could keep track of here if we wanted to cancel and close the ServerSocketChannel outside of this thread.

Calling select() will block until a channel triggers a registered operation, in this case a client trying to connect to the server socket on port 9191.

Next, we fetch all the SelectionKeys that triggered this select() call to return and iterate over them. For each key we check what non-blocking operation is available at this moment. In this case, it will only be the SelectionKey.OP_ACCEPT, since that is the only thing we have registered.

From the key we fetch the channel and call accept() to receive the SocketChannel for the new client. We make a null check and configure the channel to be non-blocking. Finally, we register the read operation (SelectionKey.OP_READ), since we first want to read data from the client.
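Two details from the walkthrough above can be tried in isolation: a channel in blocking mode is rejected by register(), and the SelectionKey returned by register() can be used to cancel the registration. This is a sketch that again uses a Pipe as a stand-in for a socket; both function names are hypothetical.

```kotlin
import java.nio.channels.IllegalBlockingModeException
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Tries to register a channel that is still in blocking mode.
// Returns true if the registration was rejected.
fun blockingRegistrationIsRejected(selector: Selector): Boolean {
    val pipe = Pipe.open() // both ends start out in blocking mode
    return try {
        pipe.source().register(selector, SelectionKey.OP_READ)
        false
    } catch (e: IllegalBlockingModeException) {
        true
    } finally {
        pipe.source().close()
        pipe.sink().close()
    }
}

// Registers a non-blocking channel and then cancels its key.
// Returns the key's validity before and after cancel().
fun cancelDemo(selector: Selector): Pair<Boolean, Boolean> {
    val pipe = Pipe.open()
    try {
        pipe.source().configureBlocking(false)
        val key = pipe.source().register(selector, SelectionKey.OP_READ)
        val validBefore = key.isValid
        key.cancel() // the selector drops the key during its next selection operation
        return validBefore to key.isValid
    } finally {
        pipe.source().close()
        pipe.sink().close()
    }
}

fun main() {
    Selector.open().use { selector ->
        println("Blocking channel rejected: ${blockingRegistrationIsRejected(selector)}")
        println("Key valid before/after cancel: ${cancelDemo(selector)}")
    }
}
```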

Read from and write to clients

The principle behind our echo server is that for each client we wait for input, read as much as we can, and then write it back on the same socket. This means that we start with registering each SocketChannel for the OP_READ operation. Once the read operation triggers for a channel, we use the ByteBuffer seen on line 2 in the code sample above to fetch the available data and print it to the console.

Next, we make a new registration for the channel, but this time for OP_WRITE, since we want to echo the data back. As a convenience, the register() method takes an optional third parameter that can be any valid object (Any in Kotlin). We take the data we just read (a regular ByteArray) and pass that as our attachment for the registration. This can then be retrieved once the write operation is triggered.

The code for this looks as follows:

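selectedKeys.forEach {
  if (it.isAcceptable) {
    val sc = it.channel() as ServerSocketChannel
    val clientChannel = sc.accept()
    if (clientChannel != null) {
      clientChannel.configureBlocking(false)
      clientChannel.register(selector, SelectionKey.OP_READ)
    }
  }

  if (it.isReadable && it.isReadInteresting) {
    val socket = it.channel() as SocketChannel
    if (socket.isConnected) {
      // Non-blocking read. A return value of -1 (the client closed the
      // connection) is not handled in this simple example.
      socket.read(buffer)
      buffer.flip() // Switch the buffer from being written to being read
      val content = ByteArray(buffer.limit())
      buffer.get(content)
      val text = String(content, Charsets.UTF_8)
      if (text.isNotEmpty()) {
        println("Received: $text")
        // Attach the received bytes so they can be echoed back on write
        socket.register(selector, SelectionKey.OP_WRITE, content)
      }
      // clear() resets both position and limit; rewind() would leave the limit where flip() put it
      buffer.clear()
    }
  }
}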
Code showing how to read data from a client and register for a write operation
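The attachment mechanism used in the OP_WRITE registration can be tried on its own. The sketch below registers a channel with a ByteArray attachment and reads it back from the returned SelectionKey. It uses a Pipe instead of a socket (an assumption for this illustration), and attachmentRoundTrip() is a hypothetical name.

```kotlin
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Registers a channel with an attachment and reads the attachment back from the key.
fun attachmentRoundTrip(payload: ByteArray): ByteArray {
    val selector = Selector.open()
    val pipe = Pipe.open()
    try {
        pipe.source().configureBlocking(false)
        val key = pipe.source().register(selector, SelectionKey.OP_READ, payload)
        // attachment() is untyped, so the caller has to cast it back.
        return key.attachment() as ByteArray
    } finally {
        pipe.source().close()
        pipe.sink().close()
        selector.close()
    }
}

fun main() {
    val payload = "Hello".toByteArray(Charsets.UTF_8)
    val attached = attachmentRoundTrip(payload)
    println("Same object: ${attached === payload}")
    println("As text: ${String(attached, Charsets.UTF_8)}")
}
```

The key holds a reference to the attached object rather than a copy, which is why the cast on the way out has to match the type that was passed in.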

The if statement for reading data looks a bit more complex than the one for accept, so let's focus on that first.

For the server socket, the only valid operation and what we're interested in is accept. We never read or write on the ServerSocketChannel, we only "accept" new clients. This means it will never have any other operation ready.

For SocketChannels we have two potential operations. The properties isReadable and isWritable will almost always be true as long as the client is connected. These properties only signal what is possible. In order to avoid writing when we're not supposed to, or reading before we have written some data back, we must combine the condition with the interest operation.

If you look at the API for SelectionKey you'll find that there is no property called isReadInteresting or isWriteInteresting. This is because these are extension properties I added on the SelectionKey class, which look like this.

val SelectionKey.isReadInteresting
    get() = this.interestOps() and SelectionKey.OP_READ == SelectionKey.OP_READ
val SelectionKey.isWriteInteresting
    get() = this.interestOps() and SelectionKey.OP_WRITE == SelectionKey.OP_WRITE
Extension properties on SelectionKey

If we didn't do this, the check for reading would pass even though we're about to write data. The difference between isReadable and isReadInteresting is that the former might be true even if we haven't registered for that operation.
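To make the two sets concrete: interestOps() holds what we registered for, while isReadable and isWritable are derived from the ready set that the Selector fills in. The sketch below repeats the extension properties (with explicit parentheses) and inspects a freshly registered key. It uses a Pipe as a stand-in for a socket, and inspectFreshKey() is a hypothetical name.

```kotlin
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Same idea as the extension properties in the post, with explicit parentheses.
val SelectionKey.isReadInteresting: Boolean
    get() = (interestOps() and SelectionKey.OP_READ) == SelectionKey.OP_READ
val SelectionKey.isWriteInteresting: Boolean
    get() = (interestOps() and SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE

// Returns (isReadInteresting, isWriteInteresting, isReadable) for a freshly registered key.
fun inspectFreshKey(): Triple<Boolean, Boolean, Boolean> {
    val selector = Selector.open()
    val pipe = Pipe.open()
    try {
        pipe.source().configureBlocking(false)
        val key = pipe.source().register(selector, SelectionKey.OP_READ)
        // No selection operation has run yet, so the ready set is still empty.
        return Triple(key.isReadInteresting, key.isWriteInteresting, key.isReadable)
    } finally {
        pipe.source().close()
        pipe.sink().close()
        selector.close()
    }
}

fun main() {
    val (readInteresting, writeInteresting, readable) = inspectFreshKey()
    println("isReadInteresting=$readInteresting, isWriteInteresting=$writeInteresting, isReadable=$readable")
}
```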

The write operation looks similar to the read operation.

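if (it.isWritable && it.isWriteInteresting) {
  val socket = it.channel() as SocketChannel
  if (socket.isConnected) {
    val content = it.attachment() as ByteArray // The bytes attached when we registered for OP_WRITE
    buffer.put(content)
    println("Wrote ${String(content, Charsets.UTF_8)} on channel")
    buffer.flip() // Prepare the buffer to be drained by write()
    socket.write(buffer)
    socket.register(selector, SelectionKey.OP_READ, null)
    // clear() resets both position and limit; rewind() would leave the limit where flip() put it
    buffer.clear()
  }
}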

Once the write operation is complete, we register the channel again for read operations, without an attachment (same as when we received the new connection on the server socket).

Note how the use of the buffer is different in the read and write cases. When reading, we pass the buffer to SocketChannel.read(). This operation is non-blocking (unlike InputStream.read()). We then flip the buffer and copy the content to a ByteArray, which is then decoded into a String using the UTF-8 encoding.

When writing, we take the ByteArray that was attached to the registration and put it into our ByteBuffer. This will update the position, so we call flip() again to prepare it to be written to our SocketChannel.

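Once we're done reading from or writing to a channel, we reset the Buffer so that it is ready for the next read or write operation from another client. Note that rewind() only sets the position back to 0 and leaves the limit untouched, whereas clear() sets the position to 0 and the limit to the capacity, so clear() is the call that fully resets the Buffer for reuse.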
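The behaviour of the two reset methods on Buffer can be checked with a small sketch: rewind() sets position to 0 and keeps the limit, while clear() sets position to 0 and the limit to the capacity. The simulateRead() helper is a hypothetical stand-in for SocketChannel.read() followed by the flip-and-copy steps.

```kotlin
import java.nio.ByteBuffer

// Simulates one "read" of `bytes` into the shared buffer and returns what was read.
fun simulateRead(buffer: ByteBuffer, bytes: ByteArray): ByteArray {
    buffer.put(bytes)                 // stands in for SocketChannel.read(buffer)
    buffer.flip()                     // limit = bytes written, position = 0
    val content = ByteArray(buffer.limit())
    buffer.get(content)               // position advances up to limit
    return content
}

fun main() {
    val buffer = ByteBuffer.allocate(16)

    simulateRead(buffer, "Hi".toByteArray())
    buffer.rewind()
    println("After rewind(): position=${buffer.position()}, limit=${buffer.limit()}")

    buffer.clear()
    println("After clear():  position=${buffer.position()}, limit=${buffer.limit()}")
}
```

After rewind() the limit is still the size of the previous message, so a following read could not store more bytes than that. After clear() the whole capacity is available again.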

We now have a simple echo server that can handle multiple simultaneous client connections on a single thread, all thanks to the non-blocking I/O API.

But why?

As I mentioned earlier, most developers should never have to write code using this API. The non-blocking API in java.nio is mostly used for implementing high-performance web servers. Most Java-based application servers (Tomcat, Jetty etc.) use this to implement their I/O code.

The most obvious advantage here is that we can keep the number of threads down to a minimum. We no longer need one thread per client connection, since there are no longer any blocking I/O calls involved. In the case of our simple echo server it is sufficient to have only a single thread, since we're not doing any additional processing of the content from the clients.

For more complex scenarios, like web servers and such, there might be multiple threads calling Selector.select() to improve performance, and usually a thread pool that does the actual processing of the channels. This will still require far fewer threads than having a dedicated thread per client, since we will only block on the select() call.

Using non-blocking I/O requires fewer threads than blocking I/O, but this will only become relevant when we have a lot of threads involved. While it would be possible to write a client-side library, like OkHttp or Okio, using non-blocking I/O APIs, it is unlikely that we would have any significant performance gains. Also, since the API is a bit more complicated, it would most likely require much more effort to develop and maintain.

Apart from the reduced thread count, we also have the opportunity to reduce the memory overhead. In our echo server example, we only have a single ByteBuffer for reading from and writing to all the clients. In a more traditional blocking I/O echo server, we would need to have one ByteBuffer for every client.

Note that it would be possible to write a blocking echo server without any additional memory allocations by simply reading one byte at a time and immediately writing it back to the client. However, that would be fairly inefficient in the real world.

You might say that we still allocate a ByteArray for each read operation, which is true. However, these are fairly short lived and only take as much memory as they need. Since the server doesn't know in advance how much data the client is sending, we will always need a large enough buffer to handle the relevant cases. Again, this memory optimisation only becomes relevant in server-side applications with lots of clients.
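For contrast, here is a rough sketch of what the per-client work in a blocking echo server could look like, written against plain InputStream and OutputStream so it can be run without a network. It uses a ByteArray as the per-client buffer (an assumption; a ByteBuffer would work similarly), and both function names are hypothetical.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream

// Blocking echo for ONE client: read() blocks until data arrives, so a server
// would run this on a dedicated thread per client, each with its own buffer.
fun echoBlocking(input: InputStream, output: OutputStream, bufferSize: Int = 1024) {
    val buffer = ByteArray(bufferSize)   // per-client buffer
    while (true) {
        val count = input.read(buffer)   // blocks; -1 means end of stream
        if (count == -1) break
        output.write(buffer, 0, count)
    }
    output.flush()
}

// The variant without a buffer: one byte at a time.
fun echoByteByByte(input: InputStream, output: OutputStream) {
    while (true) {
        val b = input.read()             // blocks; -1 means end of stream
        if (b == -1) break
        output.write(b)
    }
    output.flush()
}

fun main() {
    val out = ByteArrayOutputStream()
    echoBlocking(ByteArrayInputStream("Hello, World!".toByteArray()), out)
    println("Echoed: $out")
}
```

In a real server the streams would come from each client's Socket, and every connected client would hold on to its own thread and buffer for as long as the connection is open.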

Conclusions

The non-blocking API is really powerful, but its use is hard to justify unless you're dealing with server-side code with lots of concurrent clients. Of course, there might be exceptions that apply to client-side code as well.

However, it is important to have an understanding of this technique as a developer, even if you would rarely come across it. Also, the pattern used here is found in other APIs as well. The non-blocking API is basically a form of a reactive approach to I/O. Instead of pulling data from a stream, we have it "pushed" to us as it becomes available.

The echo server described here can be found in this GitHub gist. It is not a perfect example of how to use it, but it is simple enough to get familiar with the APIs and how they work. If you're interested in playing around with the non-blocking I/O API in Java, this can be a good place to start.