Project Reactor has doOn Callback operators that we can use to perform custom actions without modifying the elements in the sequence. The doOn Callbacks allow us to peek into the events emitted by the Publisher (Mono or Flux).
We call them the side effect operators since they don’t change the original sequence.
Exploring the doOn Callbacks in Project Reactor
There are many useful doOn Callback operators available in Project Reactor.
In this post, we will explore the most used ones, such as:
- doOnSubscribe() – gets invoked for every new subscription from the Subscriber
- doOnNext() – gets invoked for every element that gets emitted from the Publisher
- doOnComplete() – gets invoked when the Completion signal gets sent from the Publisher
- doOnError() – gets invoked when the Error signal is sent from the Publisher
- doFinally() – gets executed at the end in both scenarios successful and failed.
doOnSubscribe() method
The below example shows how we can peek into the onSubscribe event, which happens when the Subscriber subscribes to the Publisher. In this way, we can get the Subscription object that is being sent from the Publisher.
class ReactiveJavaTutorial { public static void main(String[] args) { Flux.just("data1", "data2", "data3") .doOnSubscribe(subscription -> System.out.println("Subscription: " + subscription)) .subscribe(); } }
doOnNext() method
With this method, we can peek into every data that is being sent from the Publisher.
class ReactiveJavaTutorial { public static void main(String[] args) { Flux.just("data1", "data2", "data3") .doOnNext(data -> System.out.println("Data: " + data)) .subscribe(); } }
doOnComplete() method
We can set some code that will be executed as soon as the Completion signal is sent from the Publisher, using the doOnComplete() method.
class ReactiveJavaTutorial { public static void main(String[] args) { Flux.just("data1", "data2", "data3") .doOnNext(data -> System.out.println("Data: " + data)) .doOnComplete(() -> System.out.println("Publisher sent Completion signal!")) .subscribe(); } }
doOnError() method
The doOnError() method will be executed if some exception occurs and Publisher sends the onError signal to the Subscriber.
class ReactiveJavaTutorial { public static void main(String[] args) { Mono mono = Mono.fromSupplier(() -> { throw new RuntimeException(" an error occurred!"); // invoking the onError signal from the Publisher (Mono) }); mono.doOnError(err -> System.out.println("Error: " + err)).subscribe(); } }
doFinally() method
If we want to get notified in both successful and failed scenarios, we will use the doOnFinally() method.
class ReactiveJavaTutorial { public static void main(String[] args) { // successful scenario Mono.just("data") .doFinally(signal -> System.out.println(signal + " signal received.")) .subscribe(); // failed scenario Mono mono = Mono.fromSupplier(() -> { throw new RuntimeException(" an error occurred!"); // invoking the onError signal from the Publisher (Mono) }); mono.doFinally(signal -> System.out.println(signal + " signal received.")).subscribe(); } }