Chapter 4. Dealing with Persistence, State, and Clients
There is no serious enterprise-grade application without persistent data. And the same is true for microservices-based applications. Event sourcing and CQRS are fundamental concepts behind Lagom’s support for services that store information. To understand these concepts, it is recommended that you read Lagom’s CQRS and Event Sourcing documentation.
When using event sourcing, all changes are captured as domain events, which are immutable facts of things that have happened. The persistent equivalent is called an AggregateRoot (PersistentEntity). This is a cluster of domain objects that can be treated as a single unit. An example may be a piece of cargo and its transport legs, which will be separate objects, but it’s useful to treat the cargo (together with its transport legs) as a single aggregate. The aggregate can reply to queries for a specific identifier but it cannot be used for serving queries that span more than one aggregate. Therefore, you need to create another view of the data that is tailored to the queries that the service provides (see Figure 4-1). This separation of the write-side and the read-side of the persistent data is often referred to as the CQRS pattern.
To implement persistence in Lagom you have to implement a class that extends PersistentEntity<Command, Event, State>.
If you are familiar with JPA, you might want to consider a PersistentEntity as a mixture between a data access object (DAO) and a JPA @Entity. But there are important differences.
A JPA entity is loaded from the database wherever it is needed, so there may be many Java object instances with the same entity identifier. In contrast, there is only one instance of PersistentEntity with a given identifier. With JPA you typically store only the current state, and the history of how that state was reached is not captured.
You interact with a PersistentEntity by sending command messages to it. Commands are processed sequentially, one at a time, for a specific entity instance. A command may result in state changes that are persisted as events, representing the effect of the command. The current state is not stored for every change, since it can be derived from the events. These events are only ever appended to storage; nothing is ever mutated, which allows for very high transaction rates and efficient replication.
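The idea of deriving state from an append-only list of events can be illustrated without Lagom at all. The following is a minimal plain-Java sketch, not the Lagom API: the class EventSourcingSketch, the CargoRenamed event, and the rename command are hypothetical names invented for this illustration, and the synchronized methods merely stand in for Lagom's guarantee that an entity handles one command at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration in plain Java; none of these names are Lagom API.
final class EventSourcingSketch {

    // An immutable fact: the cargo was given a new name.
    static final class CargoRenamed {
        final String newName;
        CargoRenamed(String newName) { this.newName = newName; }
    }

    // Immutable state; applying an event returns a new instance.
    static final class CargoState {
        final String name;
        final int version;
        CargoState(String name, int version) {
            this.name = name;
            this.version = version;
        }
        CargoState apply(CargoRenamed event) {
            return new CargoState(event.newName, version + 1);
        }
    }

    // Append-only journal: events are added, never updated or removed.
    private final List<CargoRenamed> journal = new ArrayList<>();
    private CargoState state = new CargoState("", 0);

    // Command handling: validate, append the event, then apply it to the state.
    synchronized boolean rename(String newName) {
        if (newName == null || newName.isEmpty()) {
            return false; // a rejected command produces no event
        }
        CargoRenamed event = new CargoRenamed(newName);
        journal.add(event);
        state = state.apply(event);
        return true;
    }

    synchronized CargoState currentState() { return state; }

    // Read-only copy of the journal.
    synchronized List<CargoRenamed> events() {
        return Collections.unmodifiableList(new ArrayList<>(journal));
    }

    // Recovery: fold all events over the empty state.
    static CargoState replay(List<CargoRenamed> events) {
        CargoState s = new CargoState("", 0);
        for (CargoRenamed e : events) {
            s = s.apply(e);
        }
        return s;
    }
}
```

Replaying the journal with EventSourcingSketch.replay(entity.events()) yields the same state that the entity holds in memory, which is why the current state does not need to be stored on every change.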
With all this knowledge, the remaining lines of code in the RegistrationServiceImpl just got a little clearer. In Lagom, the way to send commands to a PersistentEntity is by using a PersistentEntityRef, which needs to be looked up via the PersistentEntityRegistry. This means that CargoEntity is the PersistentEntity and RegisterCargo is the command we want to send:
// Look up the CargoEntity for the given ID.
PersistentEntityRef<RegistrationCommand> ref =
    persistentEntityRegistry.refFor(CargoEntity.class, request.getId());
// Tell the entity to use the Cargo information in the request.
return ref.ask(RegisterCargo.of(request));
We made it all the way through the service implementation down to the CargoEntity. This is an event-sourced entity. It has a state, CargoState, which stores information about the registered cargo. It can also receive commands that are defined in the RegistrationCommand interface and translate them into events that are defined in the RegistrationEvent interface.
As mentioned before, this is the place where commands are translated into events. And as such, the PersistentEntity has to define a behavior for every command it understands. A behavior is defined by registering command and event handlers:
public class CargoEntity extends
    PersistentEntity<RegistrationCommand, RegistrationEvent, CargoState> {

    @Override
    public Behavior initialBehavior(Optional<CargoState> snapshotState) {
        // command and event handlers
        return b.build();
    }
}
If you look at the supported commands, you will find only one, RegisterCargo. It is sent down from the UI when a user adds a new cargo. By convention, the commands should be inner classes of the interface, which makes it simple to get a complete picture of what commands an entity supports. Commands are also immutable objects:
public interface RegistrationCommand extends Jsonable {

    @Value.Immutable
    @ImmutableStyle
    @JsonDeserialize(as = RegisterCargo.class)
    public interface AbstractRegisterCargo extends RegistrationCommand,
        CompressedJsonable, PersistentEntity.ReplyType<Done> {

        @Value.Parameter
        Cargo getCargo();
    }
}
Commands get translated to events, and it’s the events that get persisted. Each event has an event handler registered for it, and an event handler simply applies an event to the current state. This is done when the event is first created, and it is done again when the entity is loaded from the database: each event is replayed to re-create the state of the entity. The RegistrationEvent interface defines all the events supported by the CargoEntity. In our case it is exactly one event, CargoRegistered:
public interface RegistrationEvent
    extends Jsonable, AggregateEvent<RegistrationEvent> {

    @Value.Immutable
    @ImmutableStyle
    @JsonDeserialize(as = CargoRegistered.class)
    interface AbstractCargoRegistered extends RegistrationEvent {

        @Override
        default public AggregateEventTag<RegistrationEvent> aggregateTag() {
            return RegistrationEventTag.INSTANCE;
        }

        @Value.Parameter
        String getId();

        @Value.Parameter
        Cargo getCargo();
    }
}
This event is emitted when a RegisterCargo command is received. Events and commands are nothing more than immutable objects. Let’s add the different behaviors to handle the command and trigger events.
Behavior is defined using a behavior builder. The behavior builder starts with a state. If this entity supports snapshotting (an optimization that allows the state itself to be persisted, combining many events into one), then the passed-in snapshotState may have a value that can be used. Otherwise, the default state is a dummy cargo with an id of empty string:
@Override
public Behavior initialBehavior(Optional<CargoState> snapshotState) {
    BehaviorBuilder b = newBehaviorBuilder(snapshotState.orElse(
        CargoState.builder()
            .cargo(Cargo.builder()
                .id("")
                .description("")
                .destination("")
                .name("")
                .owner("")
                .build())
            .timestamp(LocalDateTime.now())
            .build()));
    //...
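To make the role of the optional snapshot concrete, here is a small plain-Java sketch of recovery with and without a snapshot. It is an illustration only: SnapshotSketch and recover are hypothetical names, the state is reduced to a simple counter, and Lagom performs the equivalent steps internally rather than through code like this.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration in plain Java; Lagom handles recovery internally.
final class SnapshotSketch {

    // The state is just a running count of registered cargo.
    // With a snapshot, only the events recorded after it are replayed;
    // without one, recovery starts from the default (empty) state.
    static int recover(Optional<Integer> snapshot, List<Integer> eventsSinceSnapshot) {
        int state = snapshot.orElse(0);
        for (int delta : eventsSinceSnapshot) {
            state += delta;
        }
        return state;
    }
}
```

Both paths arrive at the same state; the snapshot only shortens the list of events that has to be replayed.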
The functions that process incoming commands are registered in the behavior using setCommandHandler of the BehaviorBuilder. We start with the initial RegisterCargo command. The command handler validates the command payload (in this case it only checks if the cargo has a name set) and emits the CargoRegistered event with the new payload. A command handler returns a persist directive that defines what event or events, if any, to persist. This example uses the thenPersist directive, which only stores a single event:
//...
b.setCommandHandler(RegisterCargo.class, (cmd, ctx) -> {
    if (cmd.getCargo().getName() == null
            || cmd.getCargo().getName().equals("")) {
        ctx.invalidCommand("Name must be defined");
        return ctx.done();
    }

    final CargoRegistered cargoRegistered = CargoRegistered.builder()
        .cargo(cmd.getCargo())
        .id(entityId())
        .build();

    return ctx.thenPersist(cargoRegistered,
        evt -> ctx.reply(Done.getInstance()));
});
When an event has been persisted successfully, the current state is updated by applying the event to it. The functions for updating the state are registered with the setEventHandler method of the BehaviorBuilder. The event handler returns the new state. The state must be immutable, so you return a new instance of the state:
b.setEventHandler(CargoRegistered.class,
    // We simply update the current
    // state to use the new cargo payload
    // and update the timestamp
    evt -> state()
        .withCargo(evt.getCargo())
        .withTimestamp(LocalDateTime.now())
);
Note
Event handlers typically only update the state, but they may also change the behavior of the entity, in the sense that new functions for processing commands and events may be defined. Learn more about this in the PersistentEntity documentation.
We successfully persisted an entity. Let’s finish the example and see how it is displayed to the user. The getLiveRegistrations() service call subscribes to the topic that was created in the register() service call before and returns the received content:
@Override
public ServiceCall<NotUsed, NotUsed, Source<Cargo, ?>> getLiveRegistrations() {
    return (id, req) -> {
        PubSubRef<Cargo> topic = topics.refFor(TopicId.of(Cargo.class, "topic"));
        return CompletableFuture.completedFuture(topic.subscriber());
    };
}
To see the consumer side, you have to look into the front-end project and open the ReactJS application in main.jsx. The createCargoStream() function points to the API endpoint and the live cargo events are published to the cargoNodes function and rendered accordingly (see Figure 4-2).
One last step in this example is to add a REST-based API to expose all the persisted cargo to an external system. While persistent entities are used for holding the state of individual entities (and to work with them you need to know the identifier of an entity), the readAll (select *) is a different use case. It needs another view of the persisted data, one that is tailored to the queries the service provides. Lagom has support for populating this read-side view of the data and also for building queries on the read-side.
We start with the service implementation again. The CassandraSession is injected in the constructor of the implementation class. CassandraSession provides several methods in different flavors for executing queries. All methods are nonblocking and they return a CompletionStage or a Source. The statements are expressed in Cassandra Query Language (CQL) syntax:
@Override
public ServiceCall<NotUsed, NotUsed, PSequence<Cargo>> getAllRegistrations() {
    return (userId, req) -> {
        CompletionStage<PSequence<Cargo>> result = db.selectAll(
                "SELECT cargoid,"
                + "name, description, owner,"
                + "destination FROM cargo")
            .thenApply(rows -> {
                List<Cargo> cargos = rows.stream().map(row -> Cargo.of(
                        row.getString("cargoid"),
                        row.getString("name"),
                        row.getString("description"),
                        row.getString("owner"),
                        row.getString("destination")))
                    .collect(Collectors.toList());
                return TreePVector.from(cargos);
            });
        return result;
    };
}
Before the query side actually works, we need to work out a way to transform the events generated by the persistent entity into database tables. This is done with a CassandraReadSideProcessor:
public class CargoEventProcessor
    extends CassandraReadSideProcessor<RegistrationEvent> {

    @Override
    public AggregateEventTag<RegistrationEvent> aggregateTag() {
        return RegistrationEventTag.INSTANCE;
    }

    @Override
    public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
        // TODO prepare statements, fetch offset
        return noOffset();
    }

    @Override
    public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
        // TODO define event handlers
        return builder.build();
    }
}
To make the events available for read-side processing, the events must implement the aggregateTag method of the AggregateEvent interface to define which events belong together. Typically, you define this aggregateTag on the top-level event type of a PersistentEntity class. Note that this is also used to create read-side views that span multiple PersistentEntities:
public class RegistrationEventTag {

    public static final AggregateEventTag<RegistrationEvent> INSTANCE =
        AggregateEventTag.of(RegistrationEvent.class);
}
Finally, the RegistrationEvent also needs to extend the AggregateEvent<E> interface. Now, we’re ready to implement the remaining methods of the CargoEventProcessor.
Tables and prepared statements need to be created first. After that, it has to be decided how to process existing entity events, which is the primary purpose of the prepare method. Each event is associated with a unique offset, a time-based UUID. The offset is a parameter to the event handler for each event and should typically be stored so that it can be retrieved with a select statement in the prepare method. You can use the CassandraSession to get the stored offset.
Composing all of the described asynchronous CompletionStage tasks for this example looks like this:
@Override
public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
    return prepareCreateTables(session).thenCompose(a ->
        prepareWriteCargo(session).thenCompose(b ->
        prepareWriteOffset(session).thenCompose(c ->
        selectOffset(session))));
}
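The nested thenCompose calls run the steps strictly one after another: each stage starts only when the previous one has completed. The following standalone sketch uses only java.util.concurrent to show that ordering. The class ComposeSketch, its step method, and the recorded log are hypothetical stand-ins for the Cassandra calls and are not part of Lagom.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration; step() stands in for an asynchronous Cassandra call.
final class ComposeSketch {

    // Records the order in which the steps actually ran.
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Runs asynchronously and records its name when it executes.
    CompletionStage<String> step(String name) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(name);
            return name;
        });
    }

    // Stand-in for reading the stored offset; here nothing has been stored yet.
    CompletionStage<Optional<String>> selectOffset() {
        log.add("selectOffset");
        return CompletableFuture.completedFuture(Optional.<String>empty());
    }

    // Same shape as the prepare method: each step starts only after
    // the previous stage has completed.
    CompletionStage<Optional<String>> prepare() {
        return step("createTables").thenCompose(a ->
            step("prepareWriteCargo").thenCompose(b ->
            step("prepareWriteOffset").thenCompose(c ->
            selectOffset())));
    }
}
```

Calling prepare().toCompletableFuture().join() in this sketch blocks until the whole chain has finished, which is convenient in a test but should be avoided inside a nonblocking service call.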
Starting with the table preparation for the read-side is simple. Use the CassandraSession to create the two tables:
private CompletionStage<Done> prepareCreateTables(CassandraSession session) {
    return session.executeCreateTable(
            "CREATE TABLE IF NOT EXISTS cargo ("
            + "cargoId text, name text, description text,"
            + "owner text, destination text,"
            + "PRIMARY KEY (cargoId, destination))")
        .thenCompose(a -> session.executeCreateTable(
            "CREATE TABLE IF NOT EXISTS cargo_offset ("
            + "partition int, offset timeuuid, "
            + "PRIMARY KEY (partition))"));
}
The same can be done with the prepared statements. This is the example for inserting new cargo into the cargo table:
private CompletionStage<Done> prepareWriteCargo(CassandraSession session) {
    return session.prepare(
            "INSERT INTO cargo"
            + "(cargoId, name, description, "
            + "owner,destination) VALUES (?, ?,?,?,?)")
        .thenApply(ps -> {
            setWriteCargo(ps);
            return Done.getInstance();
        });
}
The last missing piece is the event handler. Whenever a CargoRegistered event is received, it should be persisted into the table. The events are processed by event handlers that are defined in the method defineEventHandlers, one handler for each event class. A handler is a BiFunction that takes the event and the offset as parameters and returns zero or more bound statements that will be executed before processing the next event:
@Override
public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
    builder.setEventHandler(CargoRegistered.class,
        this::processCargoRegistered);
    return builder.build();
}

private CompletionStage<List<BoundStatement>> processCargoRegistered(
        CargoRegistered event, UUID offset) {
    // bind the prepared statement
    BoundStatement bindWriteCargo = writeCargo.bind();
    // insert values into prepared statement
    bindWriteCargo.setString("cargoId", event.getCargo().getId());
    bindWriteCargo.setString("name", event.getCargo().getName());
    bindWriteCargo.setString("description",
        event.getCargo().getDescription());
    bindWriteCargo.setString("owner", event.getCargo().getOwner());
    bindWriteCargo.setString("destination",
        event.getCargo().getDestination());
    // bind the offset prepared statement
    BoundStatement bindWriteOffset = writeOffset.bind(offset);
    return completedStatements(
        Arrays.asList(bindWriteCargo, bindWriteOffset));
}
In this example we add one row to the cargo table and update the current offset for each RegistrationEvent.
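Why the offset is stored alongside each row becomes clearer with a small model of the read-side. The sketch below is plain Java and does not use Lagom or Cassandra: ReadSideSketch, its in-memory map, and the long offset (a simplification of the time-based UUID) are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a read-side processor; not the Lagom API.
final class ReadSideSketch {

    // Simplified event: a long offset stands in for the time-based UUID.
    static final class CargoRegistered {
        final long offset;
        final String cargoId;
        final String name;
        CargoRegistered(long offset, String cargoId, String name) {
            this.offset = offset;
            this.cargoId = cargoId;
            this.name = name;
        }
    }

    // Stand-ins for the cargo table and the cargo_offset table.
    private final Map<String, String> cargoTable = new LinkedHashMap<>();
    private long lastOffset = -1L;

    // Events arrive in order; anything at or before the stored offset
    // has already been applied and is skipped.
    void process(List<CargoRegistered> events) {
        for (CargoRegistered event : events) {
            if (event.offset <= lastOffset) {
                continue;
            }
            cargoTable.put(event.cargoId, event.name);
            lastOffset = event.offset;
        }
    }

    Map<String, String> rows() {
        return Collections.unmodifiableMap(cargoTable);
    }

    long lastOffset() {
        return lastOffset;
    }
}
```

If the processor is restarted and the event stream is delivered again from the beginning, the stored offset lets it skip what it has already written and continue with the first unseen event.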
Note
It is safe to keep state in variables of the enclosing class and update it from the event handlers. The events are processed sequentially, one at a time. An example of such state could be values for calculating a moving average.
If there is a failure when executing the statements the processor will be restarted after a backoff delay. This delay is increased exponentially in the case of repeated failures.
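As a rough picture of what an exponentially increasing restart delay looks like, consider the following sketch. It is not Lagom's implementation: BackoffSketch, delayFor, the doubling factor, and the 3-second and 30-second bounds used in the note below are all assumptions chosen for illustration.

```java
import java.time.Duration;

// Hypothetical illustration of a capped exponential backoff.
final class BackoffSketch {

    // Doubles the delay once per previous failed attempt, never exceeding maxDelay.
    static Duration delayFor(int attempt, Duration minDelay, Duration maxDelay) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long max = maxDelay.toMillis();
        long delay = Math.min(minDelay.toMillis(), max);
        for (int i = 0; i < attempt && delay < max; i++) {
            // Guard against overflow before doubling.
            delay = (delay > max / 2) ? max : delay * 2;
        }
        return Duration.ofMillis(delay);
    }
}
```

With a minimum of 3 seconds and a maximum of 30 seconds, successive attempts wait 3, 6, 12, and 24 seconds, and every attempt after that waits 30 seconds.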
There is another tool that can be used if you want to do something else with the events other than updating tables in a database. You can get a stream of the persistent events with the eventStream method of the PersistentEntityRegistry.
You have already seen the service implementation that queries the database. Let’s try out the API endpoint and get a list of all the registered cargo in the system by curling it:
curl http://localhost:9000/api/registration/all
[
  {
    "id":"522871",
    "name":"TEST",
    "description":"TEST",
    "owner":"TEST",
    "destination":"TEST"
  },
  {
    "id":"623410",
    "name":"SECOND",
    "description":"SECOND",
    "owner":"SECOND",
    "destination":"SECOND"
  }
]
Consuming Services
We’ve seen how to define service descriptors and implement them, but now we need to consume services. The service descriptor contains everything Lagom needs to know to invoke a service. Consequently, Lagom is able to implement service descriptor interfaces for you.
The first thing necessary to consume a service is to bind it, so that Lagom can provide an implementation for your application to use. We’ve done that with the service before. Let’s add a client call from the shipping-impl to the registration-api and validate a piece of cargo before we add a leg in the shipping-impl:
public class ShippingServiceModule extends AbstractModule
    implements ServiceGuiceSupport {

    @Override
    protected void configure() {
        bindServices(serviceBinding(ShippingService.class,
            ShippingServiceImpl.class));
        bindClient(RegistrationService.class);
    }
}
Make sure to also add the dependency between both projects in the build.sbt file by adding .dependsOn(registrationApi) to the shipping-impl project.
Having bound the client, you can now have it injected into any Lagom component using the @Inject annotation. In this example it is injected into the ShippingServiceImpl:
public class ShippingServiceImpl implements ShippingService {

    private final RegistrationService registrationService;

    @Inject
    public ShippingServiceImpl(PersistentEntityRegistry persistentEntityRegistry,
                               RegistrationService registrationService) {
        this.registrationService = registrationService;
        //...
    }
The service can be used to validate a cargo ID in the shipping-impl before adding a leg:
@Override
public ServiceCall<String, Leg, Done> addLeg() {
    return (id, request) -> {
        // Ask the registration service for the cargo referenced by the leg.
        CompletionStage<Cargo> response = registrationService
            .getRegistration()
            .invoke(request.getCargoId(), NotUsed.getInstance());

        PersistentEntityRef<ShippingCommand> itinerary =
            persistentEntityRegistry.refFor(ItineraryEntity.class, id);

        // Add the leg only after the cargo lookup has completed successfully.
        return response.thenCompose(cargo ->
            itinerary.ask(AddLeg.of(request)));
    };
}
All service calls with Lagom service clients are by default using circuit breakers. Circuit breakers are used and configured on the client side, but the granularity and configuration identifiers are defined by the service provider. By default, one circuit breaker instance is used for all calls (methods) to another service. It is possible to set a unique circuit breaker identifier for each method to use a separate circuit breaker instance for each method. It is also possible to group related methods by using the same identifier on several methods. You can find more information about how to configure the circuit breaker in the Lagom documentation.
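For readers who have not met the pattern before, the following plain-Java sketch shows the basic state machine behind a circuit breaker. It is not Lagom's circuit breaker and none of its names come from Lagom: CircuitBreakerSketch, its constructor parameters, and the explicit nowMillis argument (used instead of a real clock to keep the example deterministic) are assumptions made for illustration.

```java
import java.util.function.Supplier;

// Hypothetical, simplified circuit breaker; not the Lagom implementation.
final class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int maxFailures;
    private final long resetTimeoutMillis;
    private State state = State.CLOSED;
    private int failures = 0;
    private long openedAt = 0L;

    CircuitBreakerSketch(int maxFailures, long resetTimeoutMillis) {
        this.maxFailures = maxFailures;
        this.resetTimeoutMillis = resetTimeoutMillis;
    }

    synchronized State state() {
        return state;
    }

    // Holding the lock during the call keeps the sketch short; a real
    // implementation would not block other callers like this.
    synchronized <T> T call(Supplier<T> remoteCall, long nowMillis) {
        if (state == State.OPEN) {
            if (nowMillis - openedAt >= resetTimeoutMillis) {
                state = State.HALF_OPEN; // let one trial call through
            } else {
                throw new IllegalStateException("circuit breaker is open");
            }
        }
        try {
            T result = remoteCall.get();
            failures = 0;
            state = State.CLOSED;
            return result;
        } catch (RuntimeException e) {
            failures++;
            if (state == State.HALF_OPEN || failures >= maxFailures) {
                state = State.OPEN; // fail fast until the reset timeout passes
                openedAt = nowMillis;
            }
            throw e;
        }
    }
}
```

After maxFailures consecutive failures the breaker opens and rejects calls immediately. Once the reset timeout has elapsed, a single trial call decides whether it closes again or reopens.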