Non-blocking I/O
In this post I'll explain what non-blocking I/O is, how you can use it, and when it might be useful. The code examples will be in Kotlin and running on the JVM, but the principles behind this would work regardless of programming language as this is usually supported through the kernel of your OS.
In an earlier post I explained why it is important to separate I/O work from CPU-intensive work. I hinted then about non-blocking I/O and how that would change the behaviour of threads from the traditional blocking I/O operations.
In this post I'll explain what non-blocking I/O is, how you can use it, and when it might be useful. The code examples will be in Kotlin and running on the JVM, but the principles behind this would work regardless of programming language as this is usually supported through the kernel of your OS.
Why blocking can be bad
Defining blocking and non-blocking function calls can be a bit tricky. While time is a factor, it isn't necessarily so that a function that takes a long time is blocking while a fast running call is non-blocking. Reading a single byte from a file on a modern hard drive is usually really fast, but it is still a blocking call. Iterating over a huge array can take much longer, but is usually not considered a blocking call.
In the context of this post, I would like to define blocking calls as those that block outside of what the developer can affect, while a non-blocking call is the opposite. The developer can't change the fact that there will be IOwait states when reading a single byte from a file, but she can optimise the iteration of the huge array and there are no IOwaits involved.
Blocking is not always a bad thing. Often we need this as it is the only way to know when an operation is completed and there is a result to process. But having too many blocking calls means we need a dedicated thread for each of them. That can lead to resource waste and potentially slowing down the application when it needs context switching between thousands of running threads.
We will rarely find that non-blocking I/O is useful in client-side applications, especially when it comes to the type of I/O we usually do (HTTP calls, database queries, etc.). Non-blocking I/O is much more useful in server-side code when dealing with potentially thousands of parallel client requests.
A non-blocking echo server
The simplest possible server we can implement is an echo server, where a client connects to a TCP socket and transmits a piece of text which is then echoed back from the server on the same socket. I will show the code for this using Kotlin targeting the JVM platform. The principals about how this works is the same regardless of which programming language you would use, as long as the underlying platform supports non-blocking I/O.
java.nio.Buffer
One of the core components of the non-blocking API in Java is the Buffer
class. You might have been using this already in Android applications, especially if you've been writing code for the camera, or used RenderScript or OpenGL. A Buffer
is a container for data of a primitive type. You can find subclasses for all the Java primitives (ByteBuffer
, IntBuffer
, DoubleBuffer
etc.) in the same package. You can think of the Buffer
class as an array with some fancy utility functions. The difference between a regular Java array and a Buffer
is that the later can be mapped directly to a piece of native memory instead of being allocated in the JVM heap. This is important for the non-blocking I/O APIs (as well as APIs such as OpenGL or RenderScript) as they need direct native buffers to work efficiently.
A Buffer
has three important properties you need to understand;
capacity
- Indicates how many elements (bytes, ints, doubles etc.) the buffer can contain.position
- The position in the buffer where the next read or write will happen.limit
- Indicates the index of the first element in the buffer which does not contain valid data. This will always be equal or higher than theposition
and equal or lower than thecapacity
.
This means that when you read an element from a Buffer
, you will get the next valid element at the position
index. After that read operation, the position
is incremented by one. You can continue to read (or write) until position
is equal to limit
. Usually you only need to keep track of the limit
and position
, as capacity
is static and will never change. A newly allocated Buffer
has the limit
set to its capacity
and the position
to 0
.
The way you usually use a Buffer
is by first writing data to it and then read from it (or the inverse). The following code demonstrates how this works.
import java.nio.CharBuffer
const val HELLO_WORLD = "Hello, World!"
fun main() {
val buffer = CharBuffer.allocate(128)
println("Buffer before put: capacity: ${buffer.capacity()}, " +
"position: ${buffer.position()}, limit: ${buffer.limit()}")
buffer.put(HELLO_WORLD.toCharArray())
println("Buffer after put: capacity: ${buffer.capacity()}, " +
"position: ${buffer.position()}, limit: ${buffer.limit()}")
buffer.flip()
println("Buffer after flip: capacity: ${buffer.capacity()}, " +
"position: ${buffer.position()}, limit: ${buffer.limit()}")
val content = CharArray(buffer.limit())
buffer.get(content)
println("Buffer after get: capacity: ${buffer.capacity()}, " +
"position: ${buffer.position()}, limit: ${buffer.limit()}")
println("Content read: ${content.joinToString("")}")
}
If you run the code above it will result in the following printout:
Buffer before put: capacity: 128, position: 0, limit: 128
Buffer after put: capacity: 128, position: 13, limit: 128
Buffer after flip: capacity: 128, position: 0, limit: 13
Buffer after get: capacity: 128, position: 13, limit: 13
Content read: Hello, World!
This is a simple (and naive) example of how the Buffer
works and how to use it. The limit
and position
properties of a Buffer
lets you easily write and read the content, regardless of how much data it is (as long as it is within its capacity). If you would do this with a regular array, you would need to keep track of how much data you have occupied yourself.
The usefulness of Buffer
will become more apparent in the next section where we start using the actual non-blocking I/O APIs.
Channels and Selectors
The two most important concepts in the non-blocking I/O API in Java are SelectableChannel
and Selector
. For a networking server, the channel is equivalent to the Socket
or ServerSocket
. Once you have a channel, you can register it on a Selector
for certain operations. For instance, SocketChannel
have three possible operations: connect, read, and write. When the read operation is available on a SocketChannel
, it means there is data available to be consumed. This is different from the traditional read()
function on an InputStream
, as it won't block the calling thread.
The Selector
allows you to listen for to the available operations on multiple channels. Every time the select()
method on the Selector
returns with a value larger than 0
we have a number of channels available with operations ready to be performed.
The select()
method found in the Selector
is also a blocking call. In the echo-server we will see next, this is the only blocking call in the code. This call is not blocking on an I/O operation though, but is instead waiting for the registered channels to signal that an interesting operation is available. There is also a selectNow()
method that is non-blocking and returns the number of available channels at that moment (which will usually be 0
), but we won't be using that here.
Server socket and listening for clients
The following code shows how to create a server socket and wait for incoming connection using this non-blocking I/O API.
val selector = Selector.open()
val buffer = ByteBuffer.allocate(1024) // Unused for now
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
serverChannel.bind(InetSocketAddress(9191))
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
var running = true;
while(running) {
val channelCount = selector.select() // This will block!
val selectedKeys = selector.selectedKeys()
selectedKeys.forEach {
if (it.isAcceptable) {
val sc = it.channel() as ServerSocketChannel
val clientChannel = sc.accept()
if (clientChannel != null) {
clientChannel.configureBlocking(false)
clientChannel.register(selector, SelectionKey.OP_READ)
}
}
}
}
Most of the code is fairly self explanatory, but a few calls might need some additional explanation. The call to configureBlocking(false)
is needed in order to set the socket to a non-blocking mode. Without this, we wouldn't be able to register the channel on our Selector
.
The register()
call adds our serverChannel
as a registered channel in our Selector
with the accept operation as the one we are interested in. A registration returns a SelectionKey
, which we could keep track of here if want wanted to cancel and close the ServerSocketChannel
outside of this thread.
Calling select()
will block until a channel triggers a registered operation, in this case a client is trying to connect to the server socket on port 9191
.
Next, we fetch all the SelectionKey
s that triggered this select()
call to return and iterate over them. For each key we check what non-blocking operation is available at this moment. In this case, it will only be the SelectionKey.OP_ACCEPT
, since that is the only thing we have registered.
From the key we fetch the channel and call accept()
to receive the SocketChannel
for the new client. We make a null
check and configure the channel to be non-blocking. Finally, we register the read operation (SelectionKey.OP_READ
), since we first want to read data from the client.
Read from and write to clients
The principle behind our echo server is that for each client we wait for input, read as much as we can, and then write it back on the same socket. This means that we start with registering each SocketChannel
for the OP_READ
operation. Once the read operation triggers for a channel, we use the ByteBuffer
seen on line 2 in the code sample above to fetch the available data and print it to the console.
Next, we make a new registration for the channel, but this time for OP_WRITE
, since we want to echo the data back. As a convenience, the register()
method takes an optional third parameter that can be any valid object (Any
in Kotlin). We take the data we just read (a regular ByteArray
) and pass that as our attachment for the registration. This can then be retrieved once the write operation is triggered.
The code for this looks as follows:
selectedKeys.forEach {
if (it.isAcceptable) {
val sc = it.channel() as ServerSocketChannel
val clientChannel = sc.accept()
if (clientChannel != null) {
clientChannel.configureBlocking(false)
clientChannel.register(selector, SelectionKey.OP_READ)
}
}
if (it.isReadable && it.isReadInteresting) {
val socket = it.channel as SocketChannel
if (socket.isConnected) {
socket.read(buffer)
buffer.flip()
val content == ByteArray(buffer.limit())
buffer.get(content)
val text = String(content, Charsets.UTF8)
if (text.isNotEmpty()) {
println("Received: $text")
socket.register(selector, SelectionKEy.OP_WRITE, content)
}
buffer.rewind()
}
}
}
The if
statement for reading data looks a bit more complex than the one for accept, so let's focus on that first.
For the server socket, the only valid operation and what we're interested in is accept. We never read or write on the ServerSocketChannel
, we only "accept" new clients. This means it will never have any other operation ready.
For SocketChannel
s we have two potential operations. The properties isReadable
and isWritable
will almsot always be true as long as the client is connected. These properties only signal what is possible. In order to avoid writing when we're not supposed to or read before we written some data back, we must combine the condition with the interest operation.
If you look at the API for SelectionKey
you'll find that there is no property called isReadInteresting
or isWriteInteresting
. This is because these are extension properties I added on the SelectionKey
class, which look like this.
val SelectionKey.isReadInteresting
get() = this.interestOps() and SelectionKey.OP_READ == SelectionKey.OP_READ
val SelectionKey.isWriteInteresting
get() = this.interestOps() and SelectionKey.OP_WRITE == SelectionKey.OP_WRITE
If we didn't do this, the check for reading would pass even though we're about to write data. The difference between isReadable
and isReadInteresting
is that the earlier might be true even if we haven't registered for that operation.
The write operation looks similar to the read operation.
if (it.isWritable && it.isWriteInteresting) {
val socket = it.channel() as SocketChannel
if (socket.isConnected) {
val content = it.attachment() as ByteArray
buffer.put(content)
println("Wrote ${String(content, Charsets.UTF_8)} on channel")
buffer.flip()
socket.write(buffer)
socket.register(selector, SelectionKey.OP_READ, null)
buffer.rewind()
}
}
Once the write operation is complete, we register the channel again for read operations, without an attachment (same as when we received the new connection on the server socket).
Note how the use of the buffer
is different in the read and write cases. When reading, we pass the buffer
to SocketChannel.read()
. This operation is non-blocking (compared to InputStream.read()
). We then flip the buffer and copy the content to a ByteArray
which is then passed to a String
with the UTF-8 encoding.
When writing, we take the ByteArray
that was attached to the registration and put it into our ByteBuffer
. This will update the position
, so we call flip()
again to prepare it to be written to our SocketChannel
.
Once we're done with the reading from or writing to a channel , we call rewind()
to reset the position
and limit
of the Buffer
. This makes the Buffer
ready for the next read or write operation from another client.
We now have a simple echo server that can handle multiple simultaneous client connections on a single thread, all thanks to the non-blocking I/O API.
But why?
As I mentioned earlier, most developers should never have to write code using this API. The non-blocking API in java.nio
is mostly used for implementing high-performance web servers. Most Java-based application servers (Tomcat, Jetty etc.) uses this to implement their I/O code.
The most obvious advantage here is that we can keep the number of threads down to a minimum. We no longer need one thread per client connection, since there is no longer any blocking calls involved. In the case of our simple echo server it is sufficient to have only a single thread, since we're not doing any additional processing of the content from the clients.
For more complex scenarios, like web servers and such, there might be multiple threads calling Selector.select()
to improve performance, and usually a thread pool that does the actual processing of the channels. This will still require much fewer threads than having a dedicated thread per client, since we will only block on the select()
call.
Using non-blocking I/O requires fewer threads than blocking I/O, but this will only become relevant when we have a lot of threads involved. While it would be possible to write a client-side library, like OkHttp or Okio, using non-blocking I/O APIs, it is unlikely that we would have any significant performance gains. Also, since the API is a bit more complicated, it would most likely require much more effort to develop and maintain.
Apart from the reduced thread count, we also have the opportunity to reduce the memory overhead. In our echo server example, we only have a single ByteBuffer
for reading from and writing to all the clients. In a more traditional blocking I/O echo server, we would need to have one ByteBuffer
for every client.
Note that it would be possible to write a blocking echo server without any additional memory allocations by simply reading one byte at a time and immediately write it back to the client. However, that would be fairly inefficient in the real world.
You might say that we still allocate a ByteArray
for each read operation, which is true. However, these are fairly short lived and only take as much memory as they need. Since the server doesn't know in advance how much data the client is sending, we will always need a large enough buffer to handle the relevant cases. Again, this memory optimisation only becomes relevant in server-side application with lots of clients.
Conclusions
The non-blocking API is really powerful, but doesn't motivate the usage unless you're dealing with server-side code with lots of concurrent clients. Of course, there might be exception that apply to client-side code as well.
However, it is important to have an understanding of this technique as a developer, even if you rarely would come across it. Also, the pattern used here is found in other APIs as well. The non-blocking API is basically a form of a reactive approach to I/O. Instead of pulling data from a stream, we have it "pushed" to us as it becomes available.
The echo server described here can be found in this GitHub gist. It is not a perfect example of how to use it, but it is simple enough to get familiar with the APIs and how they work. If you're interested in playing around with the non-blocking I/O API in Java, this can be a good place to start.