Solving a parallel streams puzzler in Java 8
Master simple guidelines for implementing the Streams API in practice
In this post, you will learn about a different approach from traditional Java programming that helps you to use parallel streams the right way in your code.
Let’s say that you need to process the values of a list of transactions by accumulating them into a particular bank account. The class Account below provides three simple methods: one to process a transaction, one to add a certain amount to the total balance, and one to return the total balance.
class Account {
    private long total = 0;

    public void process(Transaction transaction) {
        add(transaction.getValue());
    }

    public void add(long amount) {
        total += amount;
    }

    public long getAvailableAmount() {
        return total;
    }
}
If you have a list of transaction objects available, you can simply iterate through the list and process each transaction one by one using your bank account:
Account myAccount = getBankAccountWithId(1337);
for (Transaction transaction : transactions) {
    myAccount.process(transaction);
}
System.out.println(myAccount.getAvailableAmount());
Inspired by the code above, you may be tempted to use the Streams API to solve this problem as follows:
transactions.stream()
            .forEach(myAccount::process);
System.out.println(myAccount.getAvailableAmount());
What’s the problem here? You are modifying the state of the account using an inherently sequential approach (i.e., you are iteratively updating its state).
But what happens if we run this code in parallel using parallel streams? Let’s use a generated sample of transactions to test:
List<Transaction> transactions
    = LongStream.rangeClosed(0, 1_000)
                .mapToObj(Transaction::new)
                .collect(toList());
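The Transaction class itself is not shown in this post. A minimal sketch that is consistent with how it is used above (a constructor taking a long and a getValue() accessor) might look like the following. The field name and immutability are assumptions made for illustration. The snippet above also presumably relies on a static import of java.util.stream.Collectors.toList.

```java
// Hypothetical minimal Transaction; the original class is not shown in this post.
class Transaction {
    // Assumed to be immutable so instances can be shared safely between threads.
    private final long value;

    public Transaction(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}
```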
You can now run the code in parallel and print the output:
transactions.parallelStream()
            .forEach(myAccount::process);
System.out.println(myAccount.getAvailableAmount());
You will get different results on different runs. For example, when we ran it three times:
The total balance is 448181
The total balance is 421258
The total balance is 398291
This is very far off the correct result, which is 500500! In fact, what is happening is that you have a data race on each access to the field total. Multiple threads are trying to read, modify, and update the shared state of the bank account at the same time. As a consequence, they are stepping on each other’s toes, which leads to unpredictable outputs.
What’s the solution? You may be tempted to simply refactor the add method to be synchronized. But this is a bad solution because it adds thread contention: threads have to wait for one another to release the lock before they can proceed. Although using AtomicLong doesn’t require a lock, every thread still updates the same shared variable, so the same principle remains: you want to let threads work independently without waiting on one another.
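For illustration only, here is a minimal sketch of the AtomicLong variant. AtomicAccount and AtomicAccountDemo are hypothetical names that do not appear in the original code. The sketch produces the correct total, but all threads still funnel their updates through one shared counter, which is exactly the contention you want to avoid.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

// Hypothetical variant of Account, introduced only for illustration.
class AtomicAccount {
    private final AtomicLong total = new AtomicLong();

    public void add(long amount) {
        // Atomic read-modify-write: no lost updates,
        // but every thread contends on the same variable.
        total.addAndGet(amount);
    }

    public long getAvailableAmount() {
        return total.get();
    }
}

class AtomicAccountDemo {
    // Adds the values 0..1000 to a fresh account from a parallel stream.
    static long run() {
        AtomicAccount account = new AtomicAccount();
        LongStream.rangeClosed(0, 1_000)
                  .parallel()
                  .forEach(account::add);
        return account.getAvailableAmount();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 500500
    }
}
```

The result is now deterministic, but the stream still works through a side effect on shared state rather than letting each thread compute an independent partial result.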
The Streams API is designed to work correctly under certain guidelines. In practice, to benefit from parallelism, the operations you pass to a stream must not change the state of shared objects (such operations are called side-effect-free). Provided you follow this guideline, the internal implementation of parallel streams cleverly splits the data, assigns different parts to independent threads, and merges the partial results into the final result. A more idiomatic way to solve the initial problem is as follows:
long sum = transactions.parallelStream()
                       .mapToLong(Transaction::getValue)
                       .sum();
myAccount.add(sum);
System.out.println(myAccount.getAvailableAmount());
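To make the split-and-merge idea more explicit, the sketch below writes the same kind of sum as a reduction with an identity value and an associative combining function. LongStream.sum() is documented as a special case of such a reduction. ReductionDemo and its method names are hypothetical, and the sketch works on plain Long values rather than Transaction objects to stay self-contained.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class ReductionDemo {
    // Builds the sample values 0..1000, mirroring the generated transactions above.
    static List<Long> sampleValues() {
        return LongStream.rangeClosed(0, 1_000)
                         .boxed()
                         .collect(Collectors.toList());
    }

    // Each thread sums its own chunk starting from the identity 0,
    // and the partial sums are then merged with Long::sum.
    // No shared mutable state is involved.
    static long total(List<Long> values) {
        return values.parallelStream()
                     .reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        System.out.println(total(sampleValues())); // prints 500500
    }
}
```

Because addition is associative and 0 is its identity, the result does not depend on how the data is split across threads.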
While it may look appealing to use parallel streams because it is very simple to do so (after all, it’s only a parallel() or parallelStream() call), code that works sequentially often does not work as expected in parallel. In addition, using parallel streams doesn’t guarantee that the code will run any faster (it may actually run slower!). There are many caveats to consider, including the computation cost per element, the size of the data, and the characteristics of the data source. The code for the above sample is available here.
You can learn more about recommended Java 8 techniques and guidelines in our one-day online course Refactoring Legacy Code with Java 8 on July 19th, 2016.