Reactive Streams Over the Network With RSocket
RSocket is a protocol that allows you to reactively stream data over the network. One of the benefits of RSocket is that the header of the frame itself is being sent in binary. This reduces the overal network payload and decreases network latency.
SETTING UP YOUR PRODUCER
The first step to set up our project is to head over to Spring Initializr and select the RSocket dependency (
spring-boot-starter-rsocket). Just like with R2DBC, this feature relies on Spring boot 2.2.x, so make sure to select that as well. Additionally, I’ll use Lombok, but you don’t need it.
If you prefer to set up your project manually, you can do so by adding the following dependency:
The next step is to decide on which port you want to run your RSocket server. You can change this by setting the
In this example, I’ll run the application on port 8000.
CREATING A CONTROLLER
Working with RSocket is similar to working with other messaging protocols, first you have to create a class representing the data you want to transfer (eg. a DTO). For example:
After that, we can create a controller and define the endpoints we want to provide by using the
Now that we’ve defined our producer, we’re basically ready to send some messages into the world!
SETTING UP YOUR CUSTOMER
Setting up the consumer happens in a similar way. For the consumer you also need a Spring boot project containing the RSocket dependency (
The next step is to set up our
RSocket client, for example:
As you can see in the code above, we’re telling RSocket that we’ll be sending JSON payloads and that we should connect to port 8000. By using
PayloadDecoder.ZERO_COPY we tell the RSocket client that the incoming payloads won’t be copied, which will increase the performance as mentioned in the RSocket documentation.
Additionally, we’re using the
cache() operator so that the cold observable turns into a hot one, which means that if multiple beans autowire and subscribe to this
RSocket reactive stream, the upstream source will only be created once and cached for all other subscribers. The benefit of that is that we’re only creating one
After that, we should wrap the
RSocket instance within Spring’s
RSocketRequester, which provides a more fluent API for requesting data from RSocket. To do that, I’m going to create the following method:
The parameters provided to this method are the
RSocket reactive stream we created in our previous method, and
RSocketStrategies, which is a bean created by the RSocket autoconfiguration.
The reason we’re wrapping
RSocket is because RSocket does support reactive streams, but doesn’t contain the types introduced by Project Reactor, such as
Flux, additionally, we would have to do the mapping to
PersonMessage objects by ourselves.
With the wrapper on the other hand, we could write our code like this:
CONNECTING TO THE CUSTOMER
Now that we’ve defined all the building blocks to connect to our RSocket server, we could write an
ApplicationRunner that fetches the data. For example:
Theoretically, this code should work. One issue with this code is that Spring will kill the application as soon as the main thread is no longer occupied. Considering that we’re using reactive streams, which are non-blocking and asynchronous by nature, the application would be killed before we even obtain a single object.
To solve this issue, we can use a
CountDownLatch, set it to 1 entry, and to count down to zero as soon as the the reactive stream is complete.
As long as the
CountDownLatch doesn’t count down to zero, the application will keep running.
If we run both applications now, we’ll see that the
PersonMessage objects appear in the console of the consumer application.
CBOR INSTEAD OF JSON
As you’ve seen in the previous section, we’ve set up the RSocket client to work over TCP, and to request and parse the body as JSON. While RSocket defined the structure of the frame (which should be binary), you’re free to send any body you’d like.
Another choice for encoding and decoding objects is the use of CBOR or the Concise Binary Object Representation. CBOR is loosely based on JSON, but provides a more concise format.
To make this work on the consumer-end, we have to change the mediatypes to
application/cbor. For example:
Additionally, we should change the mimetype in the wrapper as well:
On the producer-end, nothing has to change, since the initial request made by the consumer includes the requested mimetype, and the producer is already setup to support CBOR out of the box.
If you run the application again, you shouldn’t be surprised that there’s no difference at all. Under the hood however, we’re now encoding messages as CBOR, and no longer as JSON.
With RSocket, we have a proper alternative to WebSockets or Server Sent Events when it comes to sending data reactively over the network. The support within Spring boot is already working properly to cover most aspects, though it hasn’t matured yet.
As usual, the code can be found on GitHub. There are a few differences between this project and the code mentioned in this tutorial, as the GitHub project contains both the producer and the consumer within the same Maven project. Additionally, the data comes from a database rather than using