RxJava is a Java implementation of ReactiveX. ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
Callbacks are too low-level and CompletableFuture deals with only a single value; this is where the streaming, event-driven, and declarative API of RxJava comes in.
Observable
Observable is a generalisation of the Observer pattern. It adds a stream-like API with common operators and, across the library, ways to cope with load such as back pressure and throttling.
Java streams fit best when we know up front what data is going to be processed, as in batch processing. Observable is better suited to situations where we do not know what is coming or when, as in on-demand computations.
We create an observable that emits a "Hello World" message. Then we subscribe to that observable and print the message out.
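A minimal sketch of that, assuming RxJava 2 (the `io.reactivex` package used by the other listings in this article). The class name `ObservableHelloWorld` and the helper `hello()` are illustrative names, not part of the library.

```java
import io.reactivex.Observable;

class ObservableHelloWorld {
    // Builds an observable that emits one message and then completes.
    static Observable<String> hello() {
        return Observable.create(emitter -> {
            emitter.onNext("Hello World");
            emitter.onComplete();
        });
    }

    public static void main(String[] args) {
        // The consumer passed to subscribe is called once per emitted item.
        hello().subscribe(System.out::println);
    }
}
```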
Hello World
Flowables
Flowable has an API similar to Observable. The difference is that Observable does not provide reactive-pull back pressure, whereas Flowable does, so prefer Flowable whenever the consumer might not keep up with the producer.
Interval
We can define an interval that emits items on a flowable at a fixed period. In this case, we create an interval and map each tick to a random long, so a random long is emitted every two seconds.
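import io.reactivex.Flowable;
import java.util.Random;
import java.util.concurrent.TimeUnit;

class FlowableInterval {
    public static void main(String[] args) {
        // Emit a tick every two seconds and map each tick to a random long.
        Flowable<Long> flowable = Flowable
                .interval(2, TimeUnit.SECONDS)
                .map(x -> new Random().nextLong());
        // Block the main thread so the program keeps running while items arrive.
        flowable.blockingSubscribe(System.out::println);
    }
}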
When we run that code, we get a random long every two seconds. Just imagine the threads you would have to create and manage without Flowable.
import io.reactivex.Flowable;

class FlowableZip {
    public static void main(String[] args) {
        Flowable<Integer> f1 = Flowable.fromArray(1, 2, 3, 4, 5);
        Flowable<Integer> f2 = Flowable.fromArray(1, 2, 3);
        // Pair items by position and add each pair together.
        Flowable<Integer> sum = f1.zipWith(f2, (x, y) -> x + y);
        sum.blockingSubscribe(System.out::println);
    }
}
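When we run zipWith on two flowables, it pairs their items by position, much like an inner join, and maps each pair using the function provided as the second parameter. The result completes as soon as the shorter flowable does, so the values 4 and 5 from the first flowable are dropped.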
2
4
6
The first example of zipWith is really easy. Let's try something more interesting. Let's create two streams of numbers and then zip them together. Each stream will generate a number every second.
import io.reactivex.Flowable;
import java.util.Random;
import java.util.concurrent.TimeUnit;

class FlowableZipStreams {
    public static void main(String[] args) {
        // Each stream emits a random number between 0 and 9 once per second.
        Flowable<Integer> f1 = Flowable
                .interval(1, TimeUnit.SECONDS)
                .map(x -> new Random().nextInt(10));
        Flowable<Integer> f2 = Flowable
                .interval(1, TimeUnit.SECONDS)
                .map(x -> new Random().nextInt(10));
        Flowable<Integer> sum = f1.zipWith(f2, (x, y) -> x + y);
        sum.blockingSubscribe(System.out::println);
    }
}
When we run this code, each stream generates a number from 0 to 9 every second. The zipWith method then sums each pair of numbers.
10
11
10
15
7
10
Filtering Values
We can use the filter method to filter values coming from a flowable. We are going to filter values that are lower than 3 in the following example.
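A minimal sketch of such a filter, assuming RxJava 2 and assuming that "filter values lower than 3" means keeping only those values. The class name `FlowableFilter` and the helper `lowerThanThree` are illustrative.

```java
import io.reactivex.Flowable;

class FlowableFilter {
    // Keeps only the items for which the predicate returns true.
    static Flowable<Integer> lowerThanThree(Flowable<Integer> source) {
        return source.filter(x -> x < 3);
    }

    public static void main(String[] args) {
        // Prints 1 and 2; the values 3, 4 and 5 are dropped.
        lowerThanThree(Flowable.fromArray(1, 2, 3, 4, 5))
                .blockingSubscribe(System.out::println);
    }
}
```

To drop the low values instead, the predicate would be inverted to `x -> x >= 3`.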
We can create well-defined publishers and subscribers using RxJava. In this example, we are going to publish two messages and then announce that production is complete.
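One way this could look, as a sketch assuming RxJava 2 and the Reactive Streams `Subscriber` interface. The class name `FlowablePublishSubscribe`, the helper `run()`, and the choice of `BackpressureStrategy.BUFFER` are assumptions made for illustration. Signals are collected into a list so that they can be printed or inspected afterwards.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class FlowablePublishSubscribe {
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Publisher: two messages followed by the completion signal.
        Flowable<String> publisher = Flowable.create(emitter -> {
            emitter.onNext("Starting publishing...");
            emitter.onNext("Another message published");
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER);

        // First subscriber: implements every callback of the protocol.
        publisher.subscribe(new Subscriber<String>() {
            @Override public void onSubscribe(Subscription s) {
                log.add("[onSubscribe]");
                s.request(Long.MAX_VALUE); // ask for everything
            }
            @Override public void onNext(String message) {
                log.add("[onNext] " + message);
            }
            @Override public void onError(Throwable t) {
                log.add("[onError] " + t);
            }
            @Override public void onComplete() {
                log.add("[onComplete]");
            }
        });

        // Second subscriber: a single-method consumer that only sees items.
        publisher.subscribe(message -> log.add("[accept] " + message));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```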
Here is the output. The first subscriber, which implements the full set of callbacks, receives the messages and the completion signal. Then a second subscriber, which has only one method, prints the received messages.
[onSubscribe]
[onNext] Starting publishing...
[onNext] Another message published
[onComplete]
[accept] Starting publishing...
[accept] Another message published
Testing
When it comes to testing, some features of Flowable can be tricky to test. There are a couple of methods that help to verify expected behavior.
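The sketch below shows two such helpers, assuming RxJava 2: `test()`, which subscribes a `TestSubscriber` that records every signal, and `TestScheduler`, which supplies virtual time for timed streams. The class and method names (`FlowableTesting`, `evens`, `verifyEvens`, `verifyInterval`) are illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.TimeUnit;

class FlowableTesting {
    // Stream under test: the even numbers between 1 and 6.
    static Flowable<Integer> evens() {
        return Flowable.range(1, 6).filter(x -> x % 2 == 0);
    }

    // Each assert* call throws an AssertionError if the expectation fails.
    static void verifyEvens() {
        evens().test()
                .assertValues(2, 4, 6)
                .assertNoErrors()
                .assertComplete();
    }

    // Virtual time lets us check an interval without actually waiting.
    static void verifyInterval() {
        TestScheduler scheduler = new TestScheduler();
        TestSubscriber<Long> subscriber = Flowable
                .interval(1, TimeUnit.SECONDS, scheduler)
                .test();
        subscriber.assertNoValues();
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        subscriber.assertValues(0L, 1L, 2L);
        subscriber.dispose();
    }

    public static void main(String[] args) {
        verifyEvens();
        verifyInterval();
        System.out.println("All assertions passed");
    }
}
```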
Error handling is one of the most important features we are going to use when working with Flowable. We might need to recover from network errors, for example when a remote service fails and we need to retry a couple of times.
Or we might need to recover when some logic throws an exception, or from system errors such as an exception thrown because the hard drive has run out of space.
The following code retries up to 5 times to recover from the error state. When it fails for the 6th time, it handles the error by returning a human-readable message.
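import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;

class FlowableErrorHandling {
    public static void main(String[] args) {
        // network errors, remote service fails -> we might want to retry
        Flowable<Object> stream = Flowable
                .interval(1, TimeUnit.SECONDS)
                .map(it -> {
                    System.out.println(it);
                    throw new RuntimeException();
                })
                // Resubscribe up to 5 times; each attempt restarts the interval at 0.
                .retry(5)
                // After the final failure, emit a fallback item instead of the error.
                .onErrorReturnItem("Sorry, can't do it, god knows I tried.");
        stream.blockingSubscribe(System.out::println);
    }
}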
Here is what the output of the execution looks like.
0
0
0
0
0
0
Sorry, can't do it, god knows I tried.
Fetching Data
Let's say we want to fetch data from some servers. We start with a list of URLs and want to end up with a list of the HTML fetched from the web servers. We also want to take care of error handling: if a URL does not exist, we want to return a human-readable message.
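A sketch of that pipeline, assuming RxJava 2. To keep it runnable without a network, the actual HTTP call is passed in as a function; `FlowableFetch`, `fetchAll`, the stub fetcher, and the fallback message text are all hypothetical. The error handler is attached to each individual fetch, so one failing URL does not end the whole stream.

```java
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

class FlowableFetch {
    // fetcher stands in for a real HTTP client call and may throw.
    static List<String> fetchAll(List<String> urls, Function<String, String> fetcher) {
        return Flowable.fromIterable(urls)
                // concatMap keeps the results in the same order as the URLs.
                .concatMap(url -> Flowable
                        .fromCallable(() -> fetcher.apply(url))
                        // Replace a failure of this one URL with a readable message.
                        .onErrorReturnItem("Could not fetch " + url))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Stub fetcher: pretends that any URL containing "missing" does not exist.
        Function<String, String> stub = url -> {
            if (url.contains("missing")) {
                throw new IOException("404");
            }
            return "<html>" + url + "</html>";
        };
        fetchAll(Arrays.asList("http://example.com", "http://example.com/missing"), stub)
                .forEach(System.out::println);
    }
}
```

Swapping `concatMap` for `flatMap` combined with `subscribeOn` on the inner flowable would allow the fetches to run concurrently, at the cost of losing the guaranteed ordering.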
Throttling is a mechanism for avoiding memory, network, or CPU issues by reducing the number of operations. In short, it is one of the mechanisms for avoiding system overload.
Let's create a flowable that emits a random integer every 100 milliseconds. That could be too much for our system, and we can handle it by taking only one number every second.
// throttling -> throw away some data to handle the load
Flowable
        .interval(100, TimeUnit.MILLISECONDS)
        .map(x -> new Random().nextInt())
        .sample(1, TimeUnit.SECONDS)
        .blockingSubscribe(System.out::println);
Another way to handle the load is to create a buffer of a certain size, 5 in our case, and then perform an operation on that buffer. We are going to take the average of every 5 values we receive.
// throttling -> get average
Flowable
        .interval(100, TimeUnit.MILLISECONDS)
        .map(x -> new Random().nextInt())
        .buffer(5)
        .map(numbers -> numbers.stream().mapToDouble(d -> d).average().getAsDouble())
        .blockingSubscribe(System.out::println);
Another way to handle a heavy load of values is to take only the first value. We are going to take only the first value received in each one-second window and drop the rest.
// throttling -> throttle first value
Flowable
        .interval(100, TimeUnit.MILLISECONDS)
        .map(x -> new Random().nextInt())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(System.out::println);
Backpressure
With back-pressure we control how much is coming in: the subscriber tells the publisher how many items it is ready to receive.
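As a small illustration, assuming RxJava 2 and the Reactive Streams `Subscriber` interface, the subscriber below requests a fixed number of items and the source emits no more than that. The names `FlowableBackpressure` and `requestOnly` are illustrative, and the subscription is deliberately left open rather than cancelled to keep the sketch short.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class FlowableBackpressure {
    // Subscribes to a large range but asks for only `limit` items.
    static List<Integer> requestOnly(int limit) {
        List<Integer> received = new ArrayList<>();
        Flowable.range(1, 1_000_000).subscribe(new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) {
                // The demand signal: the source may emit at most `limit` items.
                s.request(limit);
            }
            @Override public void onNext(Integer item) {
                received.add(item);
            }
            @Override public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override public void onComplete() {
                // Not reached here, because the range is never fully requested.
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // Prints [1, 2, 3] even though the source could emit a million items.
        System.out.println(requestOnly(3));
    }
}
```

A real subscriber would usually keep the `Subscription` and call `request` again once it has finished processing the current batch.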
There are different types of schedulers available when using Flowable. The choice of scheduler tells the flowable which threading strategy to use for processing the items.
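A brief sketch of how a scheduler might be selected, assuming RxJava 2. `subscribeOn` decides where the source does its work and `observeOn` switches the thread for the operators that follow it. The class and method names are illustrative, and the thread names printed depend on the library version.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

class FlowableSchedulers {
    // Runs the source on the I/O scheduler, intended for blocking work such as network calls.
    static String ioThreadName() {
        return Flowable
                .fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())
                .blockingFirst();
    }

    // Moves downstream processing to the computation scheduler, intended for CPU-bound work.
    static String computationThreadName() {
        return Flowable
                .just(1)
                .observeOn(Schedulers.computation())
                .map(x -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("caller:      " + Thread.currentThread().getName());
        System.out.println("io:          " + ioThreadName());
        System.out.println("computation: " + computationThreadName());
    }
}
```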