今週はデックの3回目ということで、マルチスレッドで使用するためのキュー、デックを紹介します。

Java Collections Frameworkで定義されているコレクションクラスは基本的にはスレッドセーフではありません。マルチスレッドで使用する場合にはCollectionsクラスのsynchronizedListメソッドやsynchronizedMapメソッドなどを使用して、スレッドセーフなListオブジェクトやMapオブジェクトを生成する必要がありました。

これに対し、J2SE 5.0では並列プログラミングを行うためのConcurrecy Utilitiesが導入されました。

Concurrency Utilitiesによって、非同期処理やアトミック処理などが簡単に行えるようになりました。また、Concurrency Utilitiesではコレクションに対してもマルチスレッドで使うことを前提にしたクラスを定義しています。

たとえば、読み込みは複数のスレッドからできるが、書き込みは1つのスレッドからしかできないjava.util.concurrent.CopyOnWriteArrayListクラスなどが提供されています。

これと同じようにキュー、デックもスレッドセーフなクラスが用意されています。

単にスレッドセーフにするのであれば、java.util.concurrent.ConcurrentLinkedQueueクラスが提供されています。残念ながら、デックでConcurrentLinkedQueueクラスに相当するクラスはありません。

これらに加え、Concurrency Utilitiesでは、処理をブロックするjava.util.concurrency.BlockingQueueインタフェースとjava.util.concurrency.BlockingDequeインタフェースが提供されています。Java SE 6で導入されたのはBlockingDequeインタフェースです。

ブロックするというのはどういうことなのでしょう。BlockingQueueオブジェクトであれば、要素を取り出すときキュー/デックに要素がなければ、要素が追加されるまで処理待ちをします。また、要素追加するときキュー/デックの容量がいっぱいで追加できないのであれば、要素が取り出されるまで処理待ちをします。

このように処理ができるようになるまで処理がブロックされることからBlockingQueue/BlockingDequeという名前になっています。

ブロックするメソッドはタイムアウトするものとしないものの2種類用意されています。表1にBlockingQueueインタフェースのメソッド、表2にBlockingDequeインタフェースのメソッドを示しておきます。

表1 BlockingQueueインタフェースのメソッド
操作 ブロックしない ブロックする
例外をスローする 例外をスローしない タイムアウトなし タイムアウトあり
追加 add(e) offer(e) put(e) offer(e, time, unit)
取り出し remove() poll() take() poll(time, unit)
検査 element() peek() なし なし

 

表2 BlockingDequeインタフェースのメソッド
操作 先頭に対する操作
ブロックしない ブロックする
例外をスローする 例外をスローしない タイムアウトなし タイムアウトあり
追加 addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
取り出し removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
検査 getFirst() peekFirst() なし なし

タイムアウトを指定する場合、列挙型のTimeUnitを使用して時間の単位を指定し、longで量を指定します。

それでは、簡単なサンプルでBlockingQueueインタフェース/BlockingDequeインタフェースの動きを確かめてみましょう。

サンプルのソース BlockingDequeSample.java

BlockingQueueインタフェースでもBlockingDequeueインタフェースでも使い方はほとんど同じなので、ここではBlockingDequeインタフェースを使用します。

BlockingDequeSampleクラスはマルチスレッドで3つのタスクを並列に処理します。

ここではBlockingDequeインタフェースの実装クラスとしてLinkedBlockingDequeクラスを使用しました。また、デックのサイズは2としています。

      queue = new LinkedBlockingDeque<String>(2);

はじめのタスクはデックに要素を追加します。

      Runnable runnable1 = new Runnable() {
          public void run() {
              try {
                  System.out.println("put A");
                  queue.putFirst("A");
                  System.out.println("done put A");
                  System.out.println("put B");
                  queue.putFirst("B");
                  System.out.println("done put B");
                  System.out.println("put C");
                  queue.putFirst("C");
                  System.out.println("done put C");
                  sleep();
                  sleep();
                  System.out.println("put D");
                  queue.putFirst("D");
                  System.out.println("done put D");
              } catch (InterruptedException ex) {}
          }
      };

sleepメソッドは100ミリ秒スリープするというメソッドです。つまり、Cを追加してからDを追加するまでに200ミリ秒間隔があくことを示しています。

次のタスクは200ミリ秒スリープした後に、デックから要素を取り出します。

      Runnable runnable2 = new Runnable() {
          public void run() {
              try {
                  sleep();
                  sleep();
                  System.out.println("    take");
                  System.out.println("    took " + queue.takeFirst());
                  System.out.println("    take");
                  System.out.println("    took " + queue.takeFirst());
              } catch (InterruptedException ex) {}
          }
      };

最後のタスクも100ミリ秒スリープした後に、要素を追加し、さらに100ミリ秒スリープした後に要素を取り出します。

      Runnable runnable3 = new Runnable() {
          public void run() {
              try {
                  sleep();
                  System.out.println("        put E");
                  queue.putFirst("E");
                  System.out.println("        done put E");
                  sleep();
                  System.out.println("        take");
                  System.out.println("        took " + queue.takeFirst());
                  System.out.println("        take");
                  System.out.println("        took " + queue.takeFirst());
                  System.out.println("        take");
                  System.out.println("        took " + queue.take());
              } catch (InterruptedException ex) {}
          }
      };

これらのタスクはスレッドプールを用いて実行しました。

      ExecutorService service = Executors.newFixedThreadPool(3);
      service.submit(runnable1);
      service.submit(runnable2);
      service.submit(runnable3);

さて、実行したらどうなるでしょう?

C:\>java BlockingDequeSample
put A
done put A
put B
done put B
put C
        put E
    take
    took B
done put C
    take
    took C
        done put E
        take
        took E
        take
        took A
        take
put D
done put D
        took D

AとBを追加すると、デックの容量はいっぱいになってしまいます。このため、Cを追加しようとしても、追加できずブロックしてしまいます。また、3つ目のタスクもEを追加しようとしてブロックしてしまいました。

その後、2つ目のタスクがBを取り出します。すると、ブロックしていたCの追加処理が完了します。しかし、まだEの追加処理はブロックしたままです。

続けて、Cが2つ目のタスクによって取り出されます。これで、デックに空きができたため、Eの追加処理が完了します。

その後、3つ目のタスクが要素を取りのぞいていきますが、要素がなくなってしまったため再びブロックしてしまいます。そこで、1つ目のタスクがデックにDを追加することにより、ブロックしていた取り出し処理が完了します。

複雑に見えますが、デックの要素数を考えながら結果を見てみると、すぐに理解できるはずです。

ところで、ブロッキングキュー/デックというのはどういう時に使うのでしょう。

たとえば、スレッドプールでタスクを非同期に実行させる場合などに使用することができます。

ブロッキングキューがなかった時代には、このような処理はwaitとnotifyを使用して実装するしかありませんでした。しかし、この実装は意外と手間がかかります。

BlockingQueueインタフェース/BlockingDequeueインタフェースがあれば、このようにマルチスレッドを使用した非同期処理などを、簡単に実装することができるのです。

なお、今回は使用しませんでしたが、BlockingQueueを実装したクラスとしてLinkedBlockingQueueクラス以外にも、ArrayBlockingQueueクラス、PriorityBlockingQueueクラス、DelayQueueクラス、SynchronousQueueクラスがあります。それぞれ特徴があるので、興味をお持ちの方はぜひ調べてみてはいかがでしょうか。

 

著者紹介 櫻庭祐一

横河電機 ネットワーク開発センタ所属。Java in the Box 主筆

今月の櫻庭

今年の夏はとても暑かったですね。それでも、ようやく秋の気配が聞こえてきました。

秋というと、何はともあれ、食べ物がおいしい季節です。いろいろとおいしいものはありますが、櫻庭が気になるのはキノコ。

というとマツタケのことを想像される方も多いかもしれません。もちろん、マツタケもおいしいのですが、それよりも櫻庭が気になるのはポルチーニなのです。

ポルチーニはイタリアを代表するキノコです。以前は生のポルチーニはなかなか手にいれることができなかったのですが、最近はイタリアから空輸されたものを手に入れることができます。

ポルチーニはそのままソテーして食べてもいいですし、パスタとあえてもおいしいです。櫻庭のお気に入りはポルチーニのリゾット。写真はLa Bettola da Ochiaiのポルチーニのリゾット。これは本当においしかったですよ。

ポルチーニ ポルチーニのリゾット
ポルチーニ ポルチーニのリゾット