Going reactive with PostgreSQL, Apache AGE and MyBatis

In my previous posts I described how to use MyBatis to interact with Apache AGE, the Graph database extension for PostgreSQL. In this post we will see how to write a reactive API using Spring WebFlux and MyBatis.

Traditional Spring Web APIs

If you have played with the Spring ecosystem and the Spring Boot framework, you are probably used to the "traditional" way of building web applications and APIs, which is based on the Spring Web library built on top of Java Servlets.

I call these traditional because Servlets have been the standard for almost two decades, and this is still the default way of building web applications in Java, whether they run on Tomcat or on a Java Enterprise Edition application server like WebLogic or WebSphere.

There is nothing wrong with developing applications using these building blocks, but in recent years the technology has changed a lot: processors no longer scale "vertically" (by increasing the frequency) but "horizontally" (by increasing the number of cores). This means that the way we build applications needs to change to better exploit the power of these new CPUs.

In addition to this, servlets are still built on top of a mostly synchronous stack, so every request that reaches the application server keeps one thread busy until a response is sent back to the user. Servlet 3.0 and 3.1 were an evolution toward asynchronous execution, but most of the business logic invoked by the servlets is often still synchronous. This means that the application keeps threads busy while processing requests, which limits the number of calls the application server can handle and increases the resource utilization on the hardware.

The reactive evolution

In recent years the reactive approach has been proposed, and a few solutions have emerged to overcome this problem, increasing the number of parallel calls while improving the overall utilization of the available processing power. Examples of these libraries are Eclipse Vert.x, RxJava and Spring's own Project Reactor.

The latter is now part of the Spring Boot framework and you can choose which kind of application to build by swapping the starter dependencies of your project and following the different development paths related to the Web (Servlet) and WebFlux (Reactive) approaches.

All of these libraries are built on top of reactive streams, a paradigm that changes the way information is processed. In a reactive stream, you reason in terms of flow, the stream itself, and every computation should be thought of as processing one or more elements of a stream through Producers and Consumers.

In a stream context, you can have functions returning zero, one or N elements. In Project Reactor the basic types to work with a stream are Mono<T> and Flux<T>, with Mono being a stream of zero or one element, and Flux being a stream of zero to N elements.

A stream should be thought of as a pipeline, where elements enter on one side, are filtered and processed in one or more steps, and then exit from the stream on the other side.

The main concept to keep in mind is that nothing happens inside the stream if no one subscribes to its output: if you build the chain of calls to services and databases but never subscribe to the result, nothing gets invoked or processed.
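As a minimal Project Reactor sketch of both ideas, the pipeline and its lazy execution, consider the following example (not part of the demo project):

import reactor.core.publisher.Flux;

public class PipelineExample {

    public static void main(String[] args) {
        // Declaring the pipeline only describes the processing steps,
        // nothing is executed at this point
        Flux<String> greetings = Flux.just("World", "Fabio")
                .map(name -> "Hello " + name)
                .filter(greeting -> greeting.startsWith("Hello"));

        // Only when someone subscribes do the elements start flowing
        // through the pipeline, one after the other
        greetings.subscribe(System.out::println);
    }
}
A minimal reactive pipeline with Project Reactor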

This approach is more declarative, meaning that you declare the way every item of the stream should be processed by defining the steps of the pipeline that the item travels through, from the Producer to the Consumer.

Another important concept is that the steps of the computation are not bound to a thread, and the reactive stream is built to use only a small number of threads. This means that when a call reaches the endpoint, the application server (typically Netty or Undertow instead of the classic Tomcat) takes the request and routes it to the business logic stream, then frees the thread, which is immediately able to serve another request. When the backend business logic starts producing a response, the stream pipeline is followed backwards until a result is ready to be served back to the caller.

This way of processing, in addition to better resource utilization, enables features like backpressure: consumers signal how much data they are able to handle, so when the server reaches its limits it can slow down or reject new work instead of being overwhelmed, preventing overload and denial of service.
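Backpressure is worth a small, self-contained sketch (again, not part of the demo project): with Reactor's BaseSubscriber the consumer explicitly tells the producer how many elements it is ready to receive.

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureExample {

    public static void main(String[] args) {
        Flux.range(1, 100)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        // ask the producer for only 10 elements up front
                        request(10);
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        System.out.println("Processing " + value);
                        // request one more element only when we are ready for it
                        request(1);
                    }
                });
    }
}
A minimal backpressure example with BaseSubscriber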

Reactive Hello World with Spring Boot

To create a reactive Spring Boot application you can start from the Spring Initializr by selecting the Spring Reactive Web dependency.

Creating a Spring Reactive Web application

Then, by pressing the Generate button at the bottom of the page, you will get a template project ready to be imported into your IDE of choice.

To add a simple HelloController to the project, just create a new class like the following:

import java.util.Optional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class HelloController {

    @GetMapping("/")
    public Mono<String> hello(String name) {
        // The greeting is wrapped in a Mono and emitted when a client subscribes
        return Mono.just("Hello ".concat(Optional.ofNullable(name).orElse("World!")));
    }
}
The HelloController

As you can see, the controller is very similar to a standard Spring @RestController, with the only difference being that it returns a Mono<String> instead of a plain String.

If you then start the project and point your browser to http://localhost:8080/?name=Fabio you should get the greeting Hello Fabio, or if you skip the name parameter you will get the Hello World! response.
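You can also test it from the command line with curl (quoting the URL so the shell does not interpret the question mark):

curl "http://localhost:8080/?name=Fabio"
Hello Fabio

curl http://localhost:8080/
Hello World!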

Reactive DB access with R2DBC

When building a reactive application, as discussed, the entire processing chain should be reactive, meaning that no blocking calls should be used. This means that if you need to access a database in a reactive stream, you should not use JDBC, because it is built on a synchronous stack.

To solve this issue, the R2DBC project was created to provide a new stack to interact with databases in a reactive fashion. The main databases are now supported, so your application can talk to MySQL, MariaDB, PostgreSQL, SQL Server, Oracle Database and others in a reactive way.
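Just to give an idea of what the plain R2DBC SPI looks like before any mapping library gets involved, here is a minimal sketch (not part of the demo project) that queries the person table we will create later in this post, assuming a local PostgreSQL instance with the default postgres/postgres credentials:

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PlainR2dbcExample {

    public static void main(String[] args) {
        ConnectionFactory factory = ConnectionFactories.get(
                "r2dbc:postgresql://postgres:postgres@localhost:5432/postgres");

        // usingWhen opens the connection, runs the query and closes the
        // connection when the resulting stream terminates
        Flux<String> names = Flux.usingWhen(
                Mono.from(factory.create()),
                connection -> Flux.from(
                                connection.createStatement("SELECT name FROM person").execute())
                        .flatMap(result -> result.map((row, meta) -> row.get("name", String.class))),
                Connection::close);

        // Blocking here only for the sake of a standalone example;
        // in a reactive application you would return the Flux instead
        names.doOnNext(System.out::println).blockLast();
    }
}
Querying PostgreSQL with the plain R2DBC SPI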

MyBatis in a reactive way

As discussed in other posts, my library of choice when dealing with databases is MyBatis, but as of now it doesn't support reactive database access out of the box. To enable support for reactive streams, I found a compatibility library that does just that: it adds support for both R2DBC and Spring.

GitHub - chenggangpro/reactive-mybatis-support: reactive MyBatis support for reactive projects using R2DBC

The good part of this library is that you can keep using MyBatis with almost no changes to the standard approach.

The differences are in the application.properties settings and in the signatures of Mappers and TypeHandlers; the rest is pretty much unchanged.

To add the library to your project (I use Gradle, but of course it works the same with Maven) you just have to add the mybatis-r2dbc and mybatis-r2dbc-spring dependencies to the dependencies block of the build.gradle file:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'pro.chenggang:mybatis-r2dbc:1.0.8.RELEASE'
    implementation 'pro.chenggang:mybatis-r2dbc-spring:1.0.8.RELEASE'
    // an R2DBC driver for PostgreSQL (e.g. org.postgresql:r2dbc-postgresql) must also be on the runtime classpath
}
The build.gradle file for the project

Then in the application.properties file add the configurations for the database connection and mapper location.

spring.r2dbc.mybatis.r2dbc-url=r2dbc:postgresql://localhost:5432/postgres
spring.r2dbc.mybatis.username=postgres
spring.r2dbc.mybatis.password=postgres

r2dbc.mybatis.mapper-locations=classpath:mappers/*.xml
The application.properties file for the project

These are the main requirements to get MyBatis to work with reactive applications. After completing these steps we can create our mappers and start using them.

In our example we will build a couple of calls to retrieve data from a PostgreSQL database through Spring WebFlux.

The example refers to a SQL table named person that is defined as follows:

CREATE TABLE person (
    id UUID NOT NULL,
    name VARCHAR(100) NOT NULL,
    age INTEGER NOT NULL,
    PRIMARY KEY (id)
);
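The corresponding Person Java bean, which will be referenced by the mapper below as net.fabiomarini.r2dbcdemo.model.Person, is not shown in the original snippets; a minimal sketch, assuming a plain bean whose properties follow the column names (with the uuid column exposed as a java.util.UUID), could look like this:

import java.util.UUID;

public class Person {

    private UUID id;

    private String name;

    private Integer age;

    public UUID getId() {
        return id;
    }

    public void setId(UUID id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}
A possible Person class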

Let's start with the mapper XML declaration.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE mapper
 PUBLIC "-//ibatis.apache.org//DTD Mapper 3.0//EN"
 "http://ibatis.apache.org/dtd/ibatis-3-mapper.dtd">

<mapper namespace="net.fabiomarini.r2dbcdemo.mapper.PersonMapper">

	<select id="findAllByName" resultType="net.fabiomarini.r2dbcdemo.model.Person">
		select id, name, age
		from person
		<where>
			<if test="name != null and name != ''">name = #{name}</if>
		</where>
        <if test="maxItems != null">limit #{maxItems}</if>
	</select>
	<select id="countByName" resultType="java.lang.Long">
		select count(*)
		from person
		<where>
			<if test="name != null and name != ''">name = #{name}</if>
		</where>
	</select>
</mapper>
The PersonMapper XML declaration

Nothing too fancy here: one query retrieves the records from the person table, optionally filtered by name and limited to maxItems rows, and the other counts the records matching the same filter.

These queries can be invoked through the Mapper interface named PersonMapper, shown below.

@Mapper
public interface PersonMapper {

    Flux<Person> findAllByName(@Param("name") String name,
                               @Param("maxItems") Integer maxItems
    );

    Mono<Long> countByName(@Param("name") String name);
}
The PersonMapper Mapper interface

Here the only difference from a standard Mapper interface is the usage of the Flux<Person> and Mono<Long> return types. The parameters are passed the same way as in standard MyBatis.

Now to expose the queries to the world via REST APIs we need to declare a @RestController that calls the mapper methods. Here is the example controller:

@RestController("/persons")
public class PersonController {

    private final PersonMapper personMapper;

    public DemoController(PersonMapper personMapper) {
        this.personMapper = personMapper;
    }

    @GetMapping("/")
    public Flux<Person> getPersons(String name) {
        return personMapper.findAllByName(name, null);
    }

    @GetMapping("/paged")
    public Mono<SizedResponse<List<Person>>> getPagedPersons(String name, Integer maxItems) {
        return Mono.zip(
            personMapper.findAllByName(name, maxItems).collectList(),
            personMapper.countByName(name)
        ).map(o -> new SizedResponse<>(o.getT1(), o.getT2()));
    }

}
The PersonController REST Controller

In this controller we can see a couple of examples of basic reactive APIs: the getPersons method calls the mapper, passing it the name parameter, and returns the list of results, if any.

The second method, getPagedPersons, does a similar job, but it uses the Mono.zip method to bind together two different calls: one to the findAllByName query, which returns a page of at most maxItems results, and the other to the countByName query, which returns the total number of records matching the filter. The two results are then collected inside a SizedResponse custom object that contains both the data and the total count.

public class SizedResponse<T> {

    private final T data;

    private final Long count;

    public SizedResponse(T data, Long count) {
        this.data = data;
        this.count = count;
    }

    public T getData() {
        return data;
    }

    public Long getCount() {
        return count;
    }
}
The SizedResponse object
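Calling the paged endpoint then returns both the page of data and the total count in a single JSON document. Schematically (this is not actual output; the values depend on the content of your person table), the response of a call like the following has this shape:

curl "http://localhost:8080/persons/paged?name=Fabio&maxItems=10"
{
    "data": [
        { "id": "...", "name": "Fabio", "age": ... },
        ...
    ],
    "count": <total number of rows matching the name filter>
}
The shape of the SizedResponse returned by /persons/paged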

Interacting with Apache AGE in a reactive API

Now that we have seen how to make MyBatis work inside a Spring WebFlux application, let's see how to interact with Apache AGE in the same application.

First we need to define a graph in AGE and create a couple of Item nodes inside it.

SELECT create_graph('playground');

SELECT * from cypher('playground', $$
  CREATE (i:Item { code: '1001', name: 'Item 1'})
  RETURN i
$$) as (a agtype);

SELECT * from cypher('playground', $$
  CREATE (i:Item { code: '1002', name: 'Item 2'})
  RETURN i
$$) as (a agtype);

SELECT * from cypher('playground', $$
  CREATE (i:Item { code: '1003', name: 'Item 3'})
  RETURN i
$$) as (a agtype);

Then we define our XML Mapper with the query for MyBatis:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!DOCTYPE mapper
 PUBLIC "-//ibatis.apache.org//DTD Mapper 3.0//EN"
 "http://ibatis.apache.org/dtd/ibatis-3-mapper.dtd">

<mapper namespace="net.fabiomarini.r2dbcdemo.mapper.ItemMapper">

	<select id="findAll" resultType="net.fabiomarini.r2dbcdemo.model.Item">
		select * from ag_catalog.cypher('playground', $$
			MATCH (i:Item)
			<if test="!code.isNull()">WHERE i.code = ${code}</if>
			RETURN id(i), properties(i)
		<if test="maxItems > 0">LIMIT ${maxItems}</if>
		$$) as (id bigint, properties ag_catalog.agtype)
	</select>

</mapper>
The ItemMapper XML query definition

As you can see, we have a simple Cypher query that looks for nodes labelled Item and, if we pass a code, filters the nodes by the given code. In addition, if we pass a valid maxItems parameter, we also limit the number of returned items to the given value.

This is quite standard MyBatis XML, aside from the parameters, which are handled with the ${} placeholder through the AgtypeWrapper built in a previous article, which you can find in my age-utils repository on GitHub.

The mapper interface for the previous query is the following

@Mapper
public interface ItemMapper {

    Flux<Item> findAll(
        @Param("code") AgtypeWrapper<String> code,
        @Param("maxItems") AgtypeWrapper<Integer> maxItems
    );

}
The ItemMapper interface

Here you can see that the mapper returns a Flux<Item>, where Item is defined as an extension of a GraphItem<ItemProperties> object

public class Item extends GraphItem<ItemProperties> {

}
The Item class

and the GraphItem<T> class, which is also part of the age-utils repository, is defined as follows:

public class GraphItem<T> {
    private long id;
    private T properties;

    public GraphItem() {
    }

    public long getId() {
        return this.id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public T getProperties() {
        return this.properties;
    }

    public void setProperties(T properties) {
        this.properties = properties;
    }
}
The GraphItem class

while the ItemProperties type is a Java bean representing the properties of an Item node.

public class ItemProperties {

    private String code;

    private String name;

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
The ItemProperties class

The call to the API is then managed through an ItemController like the following:

@RestController("/items")
public class ItemController {

    private final ItemMapper itemMapper;

    public DemoController(ItemMapper itemMapper) {
        this.itemMapper = itemMapper;
    }

    @GetMapping("/")
    public Flux<Item> getItems(String code, Integer count) {
        return itemMapper.findAll(
                AgtypeWrapper.from(code),
                AgtypeWrapper.from(Optional.ofNullable(count).orElse(5))
        );
    }

}
The ItemController REST Controller

The getItems method takes two optional parameters that are passed to the mapper via the AgtypeWrapper, which handles the conversion to make them compatible with AGE and the Cypher query.

Handling the result types of reactive queries

If you try to run the previous code right now, you will get an error from the application, because MyBatis doesn't yet know how to handle the results of the query and map them to an Item. To fix this issue we need to use the JsonTypeHandler defined in a previous article and extend it to make it compatible with the mybatis-r2dbc library.

Just as a reference, the JsonTypeHandler is defined as follows:

public class JsonTypeHandler<T> extends BaseTypeHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(JsonTypeHandler.class);
    private static final ThreadLocal<ObjectMapper> objectMapper = ThreadLocal.withInitial(() -> {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        return objectMapper;
    });

    @Override
    public void setNonNullParameter(PreparedStatement preparedStatement, int i, T t, JdbcType jdbcType) throws SQLException {
        try {
            preparedStatement.setString(i, objectMapper.get().writeValueAsString(t));
        } catch (JsonProcessingException e) {
            throw new SQLException(e);
        }
    }

    @Override
    public T getNullableResult(ResultSet resultSet, String columnName) throws SQLException {
        String jsonString = resultSet.getString(columnName);
        return readFromString(jsonString);
    }

    @Override
    public T getNullableResult(ResultSet resultSet, int columnIdx) throws SQLException {
        String jsonString = resultSet.getString(columnIdx);
        return readFromString(jsonString);
    }

    @Override
    public T getNullableResult(CallableStatement callableStatement, int columnIdx) throws SQLException {
        String jsonString = callableStatement.getString(columnIdx);
        return readFromString(jsonString);
    }

    protected T readFromString(String jsonString) {
        if (jsonString == null) {
            return null;
        }
        try {
            return (T) objectMapper.get().readValue(jsonString, new TypeReference<T>() {});
        } catch (Exception e) {
            logger.error("Error converting JSON value", e);
            return null;
        }
    }
}
The JsonTypeHandler class

It is just a standard MyBatis type handler that leverages the Jackson library to convert a JSON object to the corresponding Java type.

The reactive library adds a layer on top of standard TypeHandlers so we need to extend the previous class to make it compatible with the new reactive paradigm.

public abstract class JsonR2dbcTypeHandlerAdapter<T> extends JsonTypeHandler<T> implements R2dbcTypeHandlerAdapter<T> {

    @Override
    public abstract Class<T> adaptClazz();

    @Override
    public void setParameter(Statement statement, ParameterHandlerContext parameterHandlerContext, T parameter
    ) {
        statement.bind(parameterHandlerContext.getIndex(), parameter.toString());
    }

    @Override
    public T getResult(Row row, RowMetadata rowMetadata, String columnName) {
        String value = row.get(columnName, String.class);
        return value != null ? readFromString(value) : null;
    }

    @Override
    public T getResult(Row row, RowMetadata rowMetadata, int columnIndex) {
        String value = row.get(columnIndex, String.class);
        return value != null ? readFromString(value) : null;
    }

}
The JsonR2dbcTypeHandlerAdapter class

This is the JsonR2dbcTypeHandlerAdapter class that extends the previous JsonTypeHandler, and as you can see it is abstract, because to make it work correctly with mybatis-r2dbc we need to implement a type handler adapter for each specific Java type.

I extended the JsonTypeHandler instead of creating a different object for two main reasons: to reuse code as much as possible and because mybatis-r2dbc still leverages TypeHandlers in addition to the R2dbcTypeHandlerAdapter if you use result maps.

We don't need to handle the entire Item type, just the ItemProperties, because the result class is built through reflection at the property level, so we only need to tell the library how to manage the internal property types when they are not basic Java types.

The final implementation that is able to handle our ItemProperties type is the following:

public class ItemPropertiesTypeHandlerAdapter extends JsonR2dbcTypeHandlerAdapter<ItemProperties> {
    @Override
    public Class<ItemProperties> adaptClazz() {
        return ItemProperties.class;
    }
}
The ItemPropertiesTypeHandlerAdapter class

As you can see, it just extends the JsonR2dbcTypeHandlerAdapter, specifying our custom type. We only need to override the adaptClazz method because it is abstract in the parent class.

With all of this in place we can now call our API and get the results:

curl http://localhost:8080/items/\?code\=1001
[{
    "id":844424930131976,
    "properties":{
        "code":"1001",
        "name":"Item 1"
    }
}]

I hope you find this article useful for getting started with both MyBatis and Spring WebFlux, and for playing with Apache AGE in your reactive applications.