Getting Started with the Java Flow API

Exploring the Java Flow API

The Java Flow API, introduced in Java 9, provides a standard and efficient way for components of a reactive system to communicate with each other asynchronously, without requiring them to know each other's implementation details. In this post, we'll explore the key components of the Flow API and show how to use them to implement the Observer pattern in a more standardized way.

Key Components of the Flow API

The Flow API defines four key components: Publisher, Subscriber, Subscription, and Processor.

Publisher

A Publisher is a provider of data items, which publishes them to one or more Subscriber instances. To implement a custom Publisher, you need to implement the java.util.concurrent.Flow.Publisher interface, which defines the following methods:

  • subscribe(Subscriber<? super T> subscriber): Subscribes a Subscriber to this Publisher, returning a new Subscription instance that represents the connection between them.

Subscriber

A Subscriber is a consumer of data items, which receives them from a Publisher. To implement a custom Subscriber, you need to implement the java.util.concurrent.Flow.Subscriber interface, which defines the following methods:

  • onSubscribe(Subscription subscription): Receives a Subscription instance from the Publisher, which allows the Subscriber to request and receive data items from the Publisher.
  • onNext(T item): Receives the next data item from the Publisher.
  • onError(Throwable throwable): Receives an error notification from the Publisher.
  • onComplete(): Receives a completion notification from the Publisher.

Subscription

A Subscription is a connection between a Publisher and a Subscriber, which allows the Subscriber to request and receive data items from the Publisher. To implement a custom Subscription, you need to implement the java.util.concurrent.Flow.Subscription interface, which defines the following methods:

  • request(long n): Requests n data items from the Publisher.
  • cancel(): Cancels the Subscription, terminating the connection between the Publisher and Subscriber.

Processor

A Processor is a component that both subscribes to a Publisher and publishes to a Subscriber, allowing it to transform or filter data items. To implement a custom Processor, you need to implement both the java.util.concurrent.Flow.Publisher and java.util.concurrent.Flow.Subscriber interfaces.

Implementing a Temperature Converter using the Java Flow API

In this tutorial, we'll create a simple temperature converter application that allows users to convert temperature readings between Celsius and Fahrenheit. We'll use the Java Flow API to implement a reactive architecture that allows temperature readings to be published by a Publisher and consumed by multiple Subscriber instances, each of which can perform its own temperature conversion.

Why Use the Java Flow API for a Temperature Converter?

A temperature converter application might seem like a simple problem that can be solved using basic Java programming techniques. However, there are several good reasons to use the Java Flow API:

  • Asynchronous processing: The Java Flow API provides a standardized way to handle asynchronous processing, allowing the temperature readings to be processed in real-time as they arrive, without blocking the main application thread.
  • Reactive architecture: The Java Flow API supports a reactive architecture, which allows multiple Subscriber instances to consume the same data stream simultaneously, each performing its own processing. This makes it easy to extend the application with new functionality in the future.
  • Backpressure: The Java Flow API supports backpressure, which allows the Publisher to control the rate at which temperature readings are published, preventing the Subscriber instances from being overwhelmed with too much data.

Implementing the Temperature Converter

Let's start by defining a simple TemperatureSensor class that generates temperature readings at random intervals and publishes them to a Publisher:


import java.util.Random;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.SubmissionPublisher;

public class TemperatureSensor {

  private final SubmissionPublisher publisher;
  private final Random random;

  public TemperatureSensor() {
    this.publisher = new SubmissionPublisher<>();
    this.random = new Random();
  }

  public void start() {
    Thread thread =
        new Thread(
            () -> {
              while (true) {
                double temperature = random.nextDouble() * 100;
                publisher.submit(temperature);
                sleep(random.nextInt(5000));
              }
            });
    thread.start();
  }

  public void stop() {
    publisher.close();
  }

  public Publisher asPublisher() {
    return publisher;
  }

  private static void sleep(int millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

The TemperatureSensor class uses a SubmissionPublisher to publish temperature readings to a Subscriber. The start method runs a loop that generates a random temperature reading and publishes it to the Subscriber via the SubmissionPublisher. The stop method closes the SubmissionPublisher. Finally, the asPublisher method returns the Publisher instance, which allows other components to subscribe to it.

Next, let's define a TemperatureConverter interface that defines a single method for converting temperature readings between Celsius and Fahrenheit:


public interface TemperatureConverter {
  double convert(double temperature);
}

We'll implement two concrete TemperatureConverter classes: CelsiusConverter, which converts Celsius to Fahrenheit, and FahrenheitConverter, which converts Fahrenheit to Celsius.


public class CelsiusConverter implements TemperatureConverter {

  @Override
  public double convert(double temperature) {
    return temperature * 1.8 + 32;
  }
}

public class FahrenheitConverter implements TemperatureConverter {

  @Override
  public double convert(double temperature) {
    return (temperature - 32) / 1.8;
  }
}

Now, let's define a simple TemperatureDisplay class that subscribes to the TemperatureSensor and prints temperature readings to the console, after converting them to the desired temperature unit:


import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TemperatureDisplay implements Subscriber {

  private final TemperatureConverter converter;

  private Subscription subscription;

  public TemperatureDisplay(TemperatureConverter converter) {
    this.converter = converter;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Double temperature) {
    double converted = converter.convert(temperature);
    System.out.printf("%.2f%s\n", converted, converter instanceof CelsiusConverter ? "F" : "C");
    subscription.request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    throwable.printStackTrace();
  }

  @Override
  public void onComplete() {
    System.out.println("Done");
  }
}

The TemperatureDisplay class implements the Subscriber interface to receive temperature readings from the TemperatureSensor. The constructor takes a TemperatureConverter instance, which is used to convert the temperature readings to the desired unit. The onSubscribe method receives a Subscription instance, which it stores for later use, and requests the first temperature reading by calling subscription.request(1). The onNext method receives each temperature reading, converts it to the desired unit, and prints it to the console with the appropriate unit suffix. Finally, the onError method prints any error that occurs to the console, and the onComplete method indicates that the temperature readings are complete.

Finally, let's see how to use the TemperatureSensor and TemperatureDisplay classes together to implement a simple temperature converter application:


public class Main {
  public static void main(String[] args){
    TemperatureSensor sensor = new TemperatureSensor();
    sensor.start();
    sensor.asPublisher().subscribe(new TemperatureDisplay(new CelsiusConverter()));
    sensor.asPublisher().subscribe(new TemperatureDisplay(new FahrenheitConverter()));
  }
}

The Main class creates a TemperatureSensor instance, starts it by calling sensor.start(), and subscribes two TemperatureDisplay instances to its Publisher, one for Celsius readings and one for Fahrenheit readings.

When you run the Main class, you should see temperature readings printed to the console at random intervals, converted to both Celsius and Fahrenheit:

98.22F
36.05C
13.61C
74.18F
...

And that's it! We've seen how to use the Java Flow API to implement a simple temperature converter application that allows temperature readings to be converted between Celsius and Fahrenheit in real-time, using a reactive architecture that allows multiple Subscriber instances to consume the same data stream.

Comments

Popular posts from this blog

if-else statements

Loops in Java 8