Java SE 9の新機能としてはあまり目立ってはいませんが、非同期処理ライブラリの「Concurrency Utilities」にも新しい機能が導入されています。Concurrency Utilitesの新機能はJEP 266 More Concurrency Updateで提案されています。導入される主な機能は以下の2つです。

  • Reactive Streamsの導入
  • CompletableFutureクラスの拡充

 前者のReactive Streamsは新しい概念の導入になります。一方、後者のCompletableFutureクラスは様々なメソッドが追加されているものの、基本的な使い方はこれまでと同じです。そこで、今回は前者のReactive Streamsについて紹介していきます。

Reactive Streams

 Reactive Streamsは、非同期ストリーム処理を実現するため、Reactive Streams Special Interest Group(SIG)が策定した仕様です。Javaだけでなく、JavaScriptや.NET向けのインタフェースも定義されています。

 Reactie Streamsを採用しているライブラリとして、RxJavaAkka Streamsなどがあります。Java SE 9では、このReactive Streamsとして策定されたインタフェースを取り込んでいます*1

 Reactive Streamsは「リアクティブプログラミング」の一種としてとらえることができます。リアクティブプログラミングは、データの流れ(ストリーム)に着目したプログラミングスタイルです。一般に、「Publish-Subscribe」モデルを採用してデータストリームを扱います。

 Publish-Subscribeモデルでは、データを作成(生産)してストリームとして流すPublisherと、流れてきたデータを処理(消費)するSubscriberとの間でやり取りします。リアクティブプログラミングでは非同期処理が必須というわけではありませんが、データの生産側と消費側が疎結合になるモデルなため、非同期処理にへの移行が容易になります。

*1 ストリームといってはいますが、java.util.stream.Streamインタフェースとは別ものなので、ご注意ください。

Publish-Subscribeモデルの処理の流れ

 Reactive Streamsでの処理の流れを説明する前に、まず一般的なPublish-Subscribeモデルでの処理の流れを説明しておきましょう。

 図1に正常時の処理の流れをUMLのシーケンス図で示しました。

図1●Publish-Subscribeモデルの処理の流れ
図1●Publish-Subscribeモデルの処理の流れ

 まず、SubscriberはPublisherに購読(subscribe)の登録を行います。すると、Publisherはデータを生成しSubscriberに通知します。データの生成が続く限り、通知を繰り返します。

 データの生成が完了したら、PublisherはSubscriberにその旨を通知します。また、図1には示していませんが、途中でエラーが発生した場合、Subscriberにエラーを通知します。図1では、Publisher自身がデータを生成しているように記述していますが、このほかにもGUIでのイベントやファイルの読み込み、ネットワークを介した通信といった様々なデータを扱えます。さらに、SubscriberとPublisherを組み合わせ、受け取ったデータを再加工して他のSubscriberに通知することもよく行われます。