Consuming Reactive Streams

How to use server push with reactive streams.

When building the user interface with Vaadin Flow, you can use reactive streams to allow a background job to update the user interface. Producing such streams is covered in the Producing Reactive Streams documentation page.

This page covers subscribing to reactive streams, handling errors, and buffering high-frequency updates.

Types of Subscriptions

Background jobs typically use cold streams for output. A cold stream starts emitting values only when a client subscribes to it, and then completes.

Broadcasts typically use hot streams for output. A hot stream emits values regardless of whether a client is subscribed or not. A subscriber only receives the values that were emitted while it was subscribed.

In your user interfaces, you typically don’t need to worry about unsubscribing from cold streams, as they’re often short-lived. However, if you subscribe to a hot stream, it’s important that you unsubscribe when you no longer need it.
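The difference between the two kinds of stream can be modelled without any reactive library. The following is a minimal plain-JDK sketch of the semantics only, not Reactor code: the class HotColdSketch, its nested HotSource, and all method names are hypothetical and exist purely for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical model of cold vs. hot semantics; not part of Reactor or Vaadin.
final class HotColdSketch {

    /** Cold: emission starts when a subscriber arrives, and each subscriber sees every value. */
    static void subscribeCold(List<String> values, Consumer<String> subscriber) {
        values.forEach(subscriber); // replayed from the start for this subscriber, then done
    }

    /** Hot: values are emitted whether or not anybody is subscribed. */
    static final class HotSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        /** Registers a subscriber and returns a handle that unsubscribes it again. */
        Runnable subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
            return () -> subscribers.remove(subscriber);
        }

        /** Delivers the value to the current subscribers only; it is not stored. */
        void emit(String value) {
            subscribers.forEach(s -> s.accept(value));
        }
    }
}
```

In this model, a subscriber that joins the HotSource after a value was emitted never sees that value, and a subscriber that forgets to run its unsubscribe handle stays in the list indefinitely. That second point is why unsubscribing from hot streams matters.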

Subscribing

You can use both Flux and Mono. You subscribe to either by calling the subscribe() method, which takes a callback that is called for each emitted value. You should implement the callback as a private method, and then wrap it inside UI.accessLater() when you subscribe.

For example, a method for handling successful completion could look like this:

Source code
Java
private void onJobCompleted(String result) {
    Notification.show("Job completed: " + result);
}

The UI.accessLater() method is explained in the Pushing UI Updates documentation page.

In the following example, a background job returns a Mono<String>. The stream is cold, so you don’t need to unsubscribe explicitly from it, as this happens once the Mono has completed. The job is started by a button click listener.

Source code
Java
button.addClickListener(clickEvent -> {
    var ui = UI.getCurrentOrThrow();
    service.startBackgroundJob()
        // Second argument is the detach handler; null means none is used
        .subscribe(ui.accessLater(this::onJobCompleted, null));
});

In the following example, a Flux<ChatMessage> is used to receive chat messages. The stream is hot, so you have to subscribe to it when the component is attached, and unsubscribe when it’s detached:

Source code
Java
private void onMessageReceived(ChatMessage message) {
    // Add the message to a message list
}

@Override
protected void onAttach(AttachEvent attachEvent) {
    var ui = attachEvent.getUI();
    var subscription = chatService.messages()
        .subscribe(ui.accessLater(this::onMessageReceived, null));
    addDetachListener(detachEvent -> {
        detachEvent.unregisterListener();
        subscription.dispose();
    });
}

For a hot stream that several users share, you can instead feed the stream into a shared signal once, in a singleton bean, and let each view bind to it. This removes the per-view subscribe/unsubscribe and UI.accessLater() bookkeeping:

Source code
Java
@Component // Singleton: subscribed once for the whole application
public class ChatMessages {
    private final SharedListSignal<ChatMessage> messages =
            new SharedListSignal<>(ChatMessage.class);

    ChatMessages(ChatService chatService) {
        chatService.messages().subscribe(messages::insertLast); // (1)
    }

    public SharedListSignal<ChatMessage> messages() {
        return messages;
    }
}

(1) The bean holds a single subscription for all users. Views bind to messages() with bindChildren(), and Flow pushes new messages to every connected user automatically.

See Broadcasting to All Users for the matching view code. Note that the buffering guidance below still applies: signals don’t rate-limit updates, so buffer a high-frequency stream before feeding it into the signal.

Handling Errors

In a reactive stream, an error is a terminal event: once it is signalled, the stream terminates and emits no more values. If you’re dealing with a hot stream, you should therefore consider resubscribing to it as part of error recovery.

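You can use the doOnError() method to attach a callback that’s called if an error occurs. As with the success callback, you should declare a private method and wrap it inside UI.accessLater(). Note that doOnError() only observes the error and doesn’t consume it, so the error still reaches the subscriber. To handle it there instead, pass the error callback as the second argument of subscribe().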

For example, a method for handling errors could look like this:

Source code
Java
private void onJobFailed(Throwable error) {
    Notification.show("Job failed: " + error.getMessage());
}

In the following example, a button click listener starts a new background job, and uses the callback method to handle any errors that may occur:

Source code
Java
button.addClickListener(clickEvent -> {
    var ui = UI.getCurrentOrThrow();
    service.startBackgroundJob()
        .doOnError(ui.accessLater(this::onJobFailed, null))
        .subscribe(ui.accessLater(this::onJobCompleted, null));
});

Buffering

You shouldn’t push updates to the browser more than 2 to 4 times per second. If your Flux emits events faster than that, you should buffer them and update the user interface in batches. Flux has built-in support for this through the buffer() method.

In the following example, the buffered stream collects events for 250 milliseconds before it emits them as a batch, which caps the rate at four batches per second. Because of this, the user interface receives a List<Event> instead of a single Event:

Source code
Java
private Flux<Event> eventStream() {
    ...
}

public Flux<List<Event>> bufferedEventStream() {
    return eventStream().buffer(Duration.ofMillis(250));
}

You can also do the buffering in the user interface, before you subscribe to the stream.

In the following example, the user interface component subscribes to the buffered stream when it’s attached, and unsubscribes when it’s detached:

Source code
Java
@Override
protected void onAttach(AttachEvent attachEvent) {
    var subscription = myService.eventStream()
        .buffer(Duration.ofMillis(250))
        .subscribe(attachEvent.getUI().accessLater((eventList) -> {
            // Update your UI here
        }, null));
    addDetachListener(detachEvent -> {
        detachEvent.unregisterListener();
        subscription.dispose();
    });
}
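To make the batching idea concrete, here is a plain-JDK sketch of what a time-windowed buffer does conceptually: the producer adds events at any rate, and a timer hands over everything collected so far once per window. This is an illustration under stated assumptions, not how Reactor implements buffer(). The class EventBatcher and its methods are hypothetical, and skipping empty windows is a choice made in this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper that models time-windowed batching; not a Reactor or Vaadin class.
final class EventBatcher<T> {
    private final Object lock = new Object();
    private List<T> pending = new ArrayList<>();

    /** Called by the producer for every event, at any rate and from any thread. */
    void add(T event) {
        synchronized (lock) {
            pending.add(event);
        }
    }

    /** Returns everything collected since the last call and starts a new, empty batch. */
    List<T> drain() {
        synchronized (lock) {
            List<T> batch = pending;
            pending = new ArrayList<>();
            return batch;
        }
    }

    /** Delivers at most one batch per window, for example every 250 ms. */
    ScheduledExecutorService start(long windowMillis, Consumer<List<T>> onBatch) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            List<T> batch = drain();
            if (!batch.isEmpty()) { // sketch-specific choice: do not deliver empty windows
                onBatch.accept(batch);
            }
        }, windowMillis, windowMillis, TimeUnit.MILLISECONDS);
        return scheduler; // the caller shuts this down when the consumer goes away
    }
}
```

With a 250 ms window, three events that arrive within the same window reach the consumer as one list, so the user interface is updated once instead of three times.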