RxJava is the Java implementation of ReactiveX, an API for composing asynchronous and event-based programs by using observable sequences.
Callbacks are too low level and CompletableFuture deals with only a single value; RxJava fills that gap with a streaming, event-driven and declarative API.
Observable
Observable is a generalisation of the Observer pattern. It adds a stream-like API with common operators, and ways to cope with fast producers, such as throttling (reactive pull backpressure itself is left to Flowable).
Java streams should be used when we know what data is going to be processed, as in batch processing. Observable is better suited to situations where we do not know what is coming, as in on-demand computation.
We create an observable that emits a "Hello World" message. Then we subscribe to that observable and print the message out.
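The original listing is not reproduced here, so the following is a minimal sketch of the step just described. It assumes RxJava 3 package names (`io.reactivex.rxjava3`); the class and method names are made up for illustration.

```java
import io.reactivex.rxjava3.core.Observable;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class HelloWorld {
    // Builds an observable that emits one message and then completes.
    static Observable<String> hello() {
        return Observable.<String>create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onComplete();
        });
    }

    public static void main(String[] args) {
        // Subscribing triggers the emission; the consumer prints each item.
        hello().subscribe(System.out::println); // prints "Hello World"
    }
}
```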
Flowables
Flowable has an API similar to Observable. The difference is that Observable does not provide reactive pull backpressure, while Flowable does, so Flowable should be used whenever the consumer may not keep up with the producer.
Interval
We can define an interval that triggers repeated emissions on a flowable. In this case, we create an interval that emits a random long every two seconds.
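A minimal sketch of such an interval, assuming RxJava 3 package names. The helper `randomLongs` and the `take(5)` limit are illustrative additions so that the program terminates; without the limit the stream would keep emitting.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.Random;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class IntervalExample {
    // Emits a fresh random long on every tick of the interval.
    static Flowable<Long> randomLongs(long period, TimeUnit unit) {
        Random random = new Random();
        return Flowable.interval(period, unit).map(tick -> random.nextLong());
    }

    public static void main(String[] args) {
        randomLongs(2, TimeUnit.SECONDS)
            .take(5) // assumption: stop after five values so the demo ends
            .blockingSubscribe(System.out::println);
    }
}
```

The interval runs on a background scheduler, so `blockingSubscribe` is used here to keep the main thread alive until the values have been printed.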
When we run that code, we get a random long every two seconds. Just imagine the threads you would have to create and manage yourself without Flowable.
Zip
Zip combines two flowables into one.
When we run zipWith on two flowables, it pairs their items by position, much like an inner join on the item index, and maps each pair using the function provided as the second parameter.
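The pairing behaviour can be sketched as follows, assuming RxJava 3 package names. The sample values are invented for illustration; note that the unmatched third letter is dropped because the second flowable has only two items.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class ZipBasics {
    // Pairs each letter with the number at the same position.
    static List<String> pairUp() {
        Flowable<String> letters = Flowable.just("a", "b", "c");
        Flowable<Integer> numbers = Flowable.just(1, 2);
        return letters
            .zipWith(numbers, (letter, number) -> letter + number)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(pairUp()); // prints [a1, b2]
    }
}
```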
The first example of zipWith is really easy. Let's try something more interesting: we create two streams of numbers and zip them together. Each stream generates a number every two seconds.
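A minimal sketch of the two timed streams, assuming RxJava 3 package names. The period is a parameter (a hypothetical choice) so the same code can be run faster than the two seconds used in the text; `take(11)` limits each stream to the values 0 to 10.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class ZipSums {
    // Zips two interval streams and emits the sum of each pair.
    static Flowable<Long> sums(long period, TimeUnit unit) {
        Flowable<Long> first = Flowable.interval(period, unit).take(11);
        Flowable<Long> second = Flowable.interval(period, unit).take(11);
        return first.zipWith(second, (a, b) -> a + b);
    }

    public static void main(String[] args) {
        // Prints 0, 2, 4, ... 20, one value roughly every two seconds.
        sums(2, TimeUnit.SECONDS).blockingSubscribe(System.out::println);
    }
}
```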
When we run this code, each stream generates the numbers from 0 to 10. The zipWith method then emits the sum of each pair of numbers.
Filtering Values
We can use the filter method to filter values coming from a flowable. In the following example, we filter values that are lower than 3.
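A minimal sketch of filtering, assuming RxJava 3 package names. It reads the description as keeping only the values below 3, which is an assumption, and the input range 0 to 5 is invented for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class FilterExample {
    // Keeps only the values that satisfy the predicate (here: below 3).
    static List<Integer> belowThree() {
        return Flowable.range(0, 6) // emits 0, 1, 2, 3, 4, 5
            .filter(n -> n < 3)
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(belowThree()); // prints [0, 1, 2]
    }
}
```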
Here is the output when we run the code.
Publish & Subscribe
We can create well-defined publishers and subscribers using RxJava. In this example, we publish two messages and then announce that production is complete.
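The original listing is missing, so this is a reconstruction sketch, assuming RxJava 3 package names. The messages and the `[onSubscribe]`, `[onNext]`, `[onComplete]` and `[accept]` labels are taken from the output shown in this article; the class name, the `BUFFER` strategy and the `[onError]` label are assumptions.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class PublishSubscribe {
    // Publishes two messages and then signals completion.
    static Flowable<String> publisher() {
        return Flowable.<String>create(emitter -> {
            emitter.onNext("Starting publishing...");
            emitter.onNext("Another message published");
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER); // assumption: buffer if the consumer is slow
    }

    public static void main(String[] args) {
        Flowable<String> messages = publisher();

        // First subscriber: implements every callback of the Subscriber interface.
        messages.subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) {
                System.out.println("[onSubscribe]");
                s.request(Long.MAX_VALUE); // ask for everything the publisher has
            }
            @Override public void onNext(String message) {
                System.out.println("[onNext] " + message);
            }
            @Override public void onError(Throwable error) {
                System.out.println("[onError] " + error.getMessage());
            }
            @Override public void onComplete() {
                System.out.println("[onComplete]");
            }
        });

        // Second subscriber: a Consumer with a single accept method.
        messages.subscribe(new Consumer<String>() {
            @Override public void accept(String message) {
                System.out.println("[accept] " + message);
            }
        });
    }
}
```

Because `Flowable.create` produces a cold source, each subscriber triggers its own run of the emitter, which is why both subscribers see both messages.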
Here is the output. The first subscriber receives the messages and the completion signal. Then another subscriber, which implements only one method, prints the received messages.
Testing
When it comes to testing, some features of Flowable can be tricky to test. There are a couple of methods that help to verify the expected behavior.
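One possible shape of such a test, assuming RxJava 3, where `Flowable.test()` returns a `TestSubscriber` with assertion methods. The flowable under test (`evens`) is a made-up example, not the one from the original article.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class FlowableTesting {
    // Illustrative flowable under test: the even numbers between 1 and 6.
    static Flowable<Integer> evens() {
        return Flowable.range(1, 6).filter(n -> n % 2 == 0);
    }

    // Throws an AssertionError if the flowable misbehaves.
    static void verify() {
        TestSubscriber<Integer> subscriber = evens().test();
        subscriber.assertValues(2, 4, 6); // exact items, in order
        subscriber.assertComplete();      // onComplete was signalled
        subscriber.assertNoErrors();      // onError was never signalled
    }

    public static void main(String[] args) {
        verify();
        System.out.println("All assertions passed");
    }
}
```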
Error Handling
Error handling is one of the most important features we are going to use when working with Flowable. We might need to recover from network errors, for example when a remote service fails and we need to retry a couple of times.
Or we might need to recover when some logic throws an exception, or from system errors such as an exception thrown because the hard drive has run out of space.
The following code retries up to 5 times to recover from the error state. When it fails for the 6th time, it handles the error by returning a human-readable message.
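A minimal sketch of that retry-then-fallback behaviour, assuming RxJava 3 package names. The always-failing source, the attempt counter and the fallback text are invented for illustration; `retry(5)` resubscribes up to five times, which gives six attempts in total.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class ErrorHandling {
    static final String FALLBACK = "Sorry, the service is unavailable. Please try again later.";

    // A source that fails on every attempt, wrapped with retry and a fallback item.
    static Flowable<String> unreliable(AtomicInteger attempts) {
        return Flowable.<String>fromCallable(() -> {
            int attempt = attempts.incrementAndGet();
            System.out.println("Attempt " + attempt);
            throw new IOException("Service unavailable");
        })
        .retry(5)                       // resubscribe up to 5 times after a failure
        .onErrorReturnItem(FALLBACK);   // after the 6th failure, emit a readable message
    }

    public static void main(String[] args) {
        unreliable(new AtomicInteger()).blockingSubscribe(System.out::println);
    }
}
```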
Here is how the output of the execution looks.
Fetching Data
Let's say we want to fetch data from some servers. We start with a list of URLs and want to end up with a list of the HTML fetched from the web servers. We also want to take care of error handling: if a URL does not exist, we want to return a human-readable message.
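A sketch of one way to do this, assuming RxJava 3 package names and Java 9 or later. The `Fetcher` interface, the helper names and the second URL are hypothetical. The error is handled per URL here, so one missing page does not stop the remaining ones; the original code may instead have handled the error once for the whole stream.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class FetchExample {
    // Hypothetical abstraction so the fetch step can be replaced, e.g. in tests.
    interface Fetcher {
        String fetch(String url) throws Exception;
    }

    // Downloads the body of a URL as text using only the JDK.
    static String httpGet(String url) throws Exception {
        try (InputStream in = URI.create(url).toURL().openStream()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    // Maps every URL to its HTML, or to a readable message if fetching fails.
    static List<String> fetchAll(List<String> urls, Fetcher fetcher) {
        return Flowable.fromIterable(urls)
            .concatMap(url -> Flowable.just(url)
                .map(fetcher::fetch)
                .onErrorReturn(error -> "Could not fetch " + url + ": " + error))
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // The second URL is a placeholder that is expected not to resolve.
        List<String> urls = List.of("https://www.google.com", "https://example.invalid/missing");
        fetchAll(urls, FetchExample::httpGet).forEach(System.out::println);
    }
}
```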
When we run the code, we get the HTML from Google and then we get an exception because the second page does not exist.
Throttling
Throttling is a mechanism to avoid memory, network or CPU issues by reducing the number of operations. In short, it is one of the mechanisms for avoiding system overload.
Let's create a flowable that emits a random integer every 100 milliseconds. That could be a lot for our system, and we can handle it by taking only one number every second.
Another way to handle the load is to create a buffer of a certain size, 5 in our case, and then operate on that buffer. We are going to take the average of every 5 values we receive.
Another way to handle a big load of values is to take only the first value. We are going to keep only the first value received in each one-second window.
Backpressure
With backpressure, we control how much data is coming in.
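The idea can be sketched with a subscriber that pulls items in small batches through `Subscription.request`, assuming RxJava 3 package names. The class name, batch sizes and source range are invented for illustration; the source only emits what has been requested.

```java
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class BackpressureExample {
    // Pulls `batches` batches of `batchSize` items from a source of 100 numbers.
    static List<Integer> takeInBatches(int batchSize, int batches) {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 100).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;
            private int inBatch = 0;
            private int batchesLeft = batches;

            @Override public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(batchSize); // ask only for the first batch
            }
            @Override public void onNext(Integer value) {
                received.add(value);
                if (++inBatch == batchSize) {
                    inBatch = 0;
                    if (--batchesLeft > 0) {
                        subscription.request(batchSize); // ready for the next batch
                    } else {
                        subscription.cancel(); // we have seen enough
                    }
                }
            }
            @Override public void onError(Throwable error) {
                error.printStackTrace();
            }
            @Override public void onComplete() {
                // not reached here: the subscription is cancelled first
            }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(takeInBatches(3, 2)); // prints [1, 2, 3, 4, 5, 6]
    }
}
```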
Scheduler
There are different types of schedulers available when using Flowable. The scheduler we choose tells the flowable which strategy it should use for processing the items.
When we use the trampoline scheduler, we see the same thread 5 times.
When we use the io scheduler and do some IO operations, we see that IO threads were created.
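A minimal sketch comparing the two schedulers, assuming RxJava 3 package names. The helper `threadNames` is made up for illustration; it records the thread that handles each of 5 items. The exact thread names depend on the RxJava version, so none are claimed here.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

// Hypothetical class name; assumes RxJava 3 on the classpath.
class SchedulerExample {
    // Returns the name of the thread that processed each of the 5 items.
    static List<String> threadNames(Scheduler scheduler) {
        return Flowable.range(1, 5)
            .subscribeOn(scheduler)
            .map(i -> Thread.currentThread().getName())
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // Trampoline runs the work on the calling thread, so the same name appears 5 times.
        System.out.println("trampoline: " + threadNames(Schedulers.trampoline()));
        // The io scheduler hands the work to one of its own worker threads.
        System.out.println("io:         " + threadNames(Schedulers.io()));
    }
}
```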
Output of the Publish & Subscribe example:
[onSubscribe]
[onNext] Starting publishing...
[onNext] Another message published
[onComplete]
[accept] Starting publishing...
[accept] Another message published
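Code for the Throttling examples (each snippet keeps running until the program is stopped):
import io.reactivex.rxjava3.core.Flowable; // assumes RxJava 3; RxJava 2 uses io.reactivex.Flowable
import java.util.Random;
import java.util.concurrent.TimeUnit;

// throttling -> throw away some data to handle the load:
// keep only the latest value seen in each one-second window
Flowable
    .interval(100, TimeUnit.MILLISECONDS)
    .map(x -> new Random().nextInt())
    .sample(1, TimeUnit.SECONDS)
    .blockingSubscribe(System.out::println);

// throttling -> get average:
// collect values into lists of 5 and emit the average of each list
Flowable
    .interval(100, TimeUnit.MILLISECONDS)
    .map(x -> new Random().nextInt())
    .buffer(5)
    .map(numbers -> numbers.stream().mapToDouble(d -> d).average().getAsDouble())
    .blockingSubscribe(System.out::println);

// throttling -> throttle first value:
// keep only the first value seen in each one-second window
Flowable
    .interval(100, TimeUnit.MILLISECONDS)
    .map(x -> new Random().nextInt())
    .throttleFirst(1, TimeUnit.SECONDS)
    .blockingSubscribe(System.out::println);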