先週はノンブロッキングI/Oがどういうものかを解説し,ベンチマークを行ってみました。今週は実際にコードを書いてみましょう。
ノンブロッキングI/Oが真価を発揮するのはサーバーなので,ここでもサーバーに関して解説します。
Selectorクラス
図1 Selectorクラスと関連するクラス |
---|
ノンブロッキングの主役となるのが,先週言及したjava.nio.channels.Selectorクラスです。
主役がSelectorクラスだとしたら,脇役は?
脇役として登場するのはjava.nio.channels.SelectableChannelクラスです。そして,黒子としてjava.nio.channels.SelectionKeyクラスがいます。
Selectorクラスは入出力に関する操作を監視するためのクラスです。監視する対象であるチャネルがSelectableChannelクラスになります。
SelectableChannelクラスは抽象クラスであり,派生クラスとしてServerSocketChannelクラスとSocketChannelクラスが提供されています。
Selectorオブジェクトで入出力操作を監視するためには,監視対象のSelectableChannelオブジェクトを登録する必要があります。このとき,Selectorオブジェクトが直接SelectableChannelオブジェクトを保持するのではなく,間にSelectionKeyオブジェクトを介在させます。
これをクラス図で表したものを図1に示します。
Selectorオブジェクトの生成はstaticメソッドのopenメソッドを使用します。
そして,SelectorオブジェクトにSelectableChannelオブジェクトを登録するときには,SelectableChannelクラスのregisterメソッドを使用します。Selectorオブジェクトに登録するのだから,感覚的には逆の気がしますが,こういうものなのでしょう。
ServerSocketChannelオブジェクトに対応するServerSocketオブジェクトを取得し,バインドを行います。registerメソッドの第1引数はSelectorオブジェクト,第2引数は監視する操作を指定します。
監視できる操作は4種類あり,SelectionKeyクラスで定義されています。
操作名 | 説明 |
---|---|
OP_ACCESS | サーバー・ソケットに対するアクセプト操作 |
OP_CONNECT | ソケットの接続操作 |
OP_READ | 読み込み操作 |
OP_WRITE | 書き込み操作 |
例えば,サーバー・ソケットに対するアクセプトを監視する場合は以下のコードの(1)のように記述し,読み込みを監視するのであれば(2)のように記述します。
// (1)サーバー・ソケットに対するアクセプトを監視 SelectionKey acceptKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT); // (2)ソケットに対する読み込みを監視 SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
registerメソッドの戻り値はSelectorオブジェクトとSelectableChannelオブジェクトの間を介在するSelectionKeyオブジェクトです。
ここで,気を付けなくてはならないのは,SelectionKeyクラスとSelectableChannelクラスは一対一の関係にあるということです。
例えば,次のようなコードを記述したとします。このとき,SelectionKeyオブジェクトとSelectableChannelオブジェクトは一対一なので,key1とkey2は同一オブジェクトになります。そして,監視対象となる操作は上書きされてOP_WRITEだけになってしまいます。
SelectionKey key1 = channel.register(selector, SelectionKey.OP_READ); SelectionKey key2 = channel.register(selector, SelectionKey.OP_WRITE);
複数の操作を監視するには,操作のORを取ります。読み込みと書き込みの両方を監視するのであればSelectionKey.OP_READ | SelectionKey.OP_WRITEと記述します。
また,SelectionKey#interestOpメソッドを使用して,監視する操作を後から変更することもできます。
SelectorオブジェクトにSelectableChannelオブジェクトを登録した後の処理は次のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// 操作の監視 while(selector.select() > 0) { // 行われた操作の一覧を取得 Set |
はじめに,selectorメソッドで操作の監視を行います(2行目)。何らかの操作が可能になるまで,selectorメソッドはブロックします。
操作が可能になったSelectionKeyオブジェクトは5行目に示したように,Selector#selectedKeysメソッドで取得することができます。戻り値はSetオブジェクトなので,Iteratorインタフェースを使用して,SelectionKeyオブジェクトを取り出します(7行目)。
取り出したSelectionKeyに応じた処理を行うわけですが,その前にIterator#removeメソッドをコールして,SelectionKeyオブジェクトを削除しておきます(14行目)。こうしておかないと,操作に対する処理が行われていないと判断されてしまうためです。
最後にSelectionKeyに割り当てられている操作に対応した処理を行います。そのためには,SelctionKeyオブジェクトと対応したSelecatbleChannelオブジェクトが必要です。SelectableChannelオブジェクトの取得には17行目で行っているようにchannelメソッドを使用します。
SelectionKeyオブジェクトがどの操作に対応しているかは,SelectionKeyクラスの次のメソッドで調べることができます。
- isAcceptableメソッド
- isConnectableメソッド
- isReadableメソッド
- isWritableメソッド
また,SelectionKeyオブジェクトが妥当であるかどうかを調べるためにはisValidメソッドを使用します。
24行目から32行目までのif文がif ... else if ...ではなく,単独のif文になっているのは,読み込みと書き込みが同時に可能になる場合があるからです。
Selectorクラスを使用する場合の注意点
Selectorクラスで書き込みを監視する場合,注意が必要です。
SelectorオブジェクトへのSelectableChannelオブジェクトへの登録は,実際に書き込みを行うときに登録するようにします。
入力がない状態ではソケットへの書き込みが可能なので,selectメソッドはブロックすることなく,すぐに返ります。ここで,書き込むデータがあれば問題ではありません。
しかし,書き込みを行わなければ,このループは単なる無限ループになってしまいCPUの負荷が異常に高くなってしまいます。
これを防ぐために,書き込みを行う直前にSelectorオブジェクトに登録し,書き込みが終了したらSelectionKey#cancelメソッドをコールしてSelectorから登録を削除するようにします。
読み込みと書き込みの両方を行う場合も同様です。はじめに登録するときはOP_READで登録を行います。書き込みが必要になった場面で,以下のようにinterstOpsメソッドでOP_WRITEも合わせて登録します。
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
書き込みが終了したら,再びinterstOpsメソッドを使用してOP_READだけを監視するようにします。
また,大量の書き込みを行うと,通信状態や受信側の処理能力により,一時的に書き込み不可の状態になることもあります。したがって,大量の書き込みを一度に行うのではなく,書き込み可能かどうかをチェックしながら複数回に分けて書き込む必要があります。
サンプルを作ってみる
ここまで,Selectorクラスの使い方について説明しましたが,実際にコードがないと実感がわかないですよね。
そこで,簡単なサンプルを作ってみることにしましょう。単にクライアントから送信されたデータをそのままクライアントに送り返すサーバーです。
サンプルは三つのクラスと一つのインタフェースから構成されます。メインとなるクラスがEchoBackServerクラスです。残りのクラスがアクセプトを行うAcceptHandlerクラスと,読み書きを行うIOHandlerクラスです。AcceptHandlerクラスとIOHandlerクラスはHandlerインタフェースをインプリメントしています。
サンプルのソース
それではメインのクラスとなるEchoBackServerクラスから見ていきましょう。
New I/Oを使用した入出力処理はチャネルを用いて行いますが,これはソケット通信においても同様です。
ソケット通信ではjava.nio.channels.SocketChannelクラスとjava.nio.channels.ServerSocketChannelクラスを使用します。サーバーは名前の通りServerSocketChannelクラスを使用します。
java.nio.channels.FileChannelクラスとは異なり,ServerSocketChannelクラスは直接オープンすることが可能です。
しかし,ServerSocketChannelクラスではアドレスとのバインディングを行うことができません。そこで,socketメソッドを使用してServerSocketChannelオブジェクトに対応するServerSocketオブジェクトを取得し,バインドを行います。
// ソケット・チャネルを生成・設定 serverChannel = ServerSocketChannel.open(); serverChannel.socket().setReuseAddress(true); serverChannel.socket().bind(new InetSocketAddress(port));
このまま serverChannel を使用すれば,ブロッキングモードで通信を行うことができます。ノンブロッキングモードにするにはconfigureBlockingメソッドを使用します。引数がbooleanで,値がfalseの場合ノンブロッキングモードになります。
configureBlockingメソッドはServerSocketChannelクラスのスーパークラスであるSelectableChannelクラスで定義されたメソッドです。
// ノンブロッキングモードに設定 serverChannel.configureBlocking(false);
ここまで来て,やっとSelectorクラスの登場です。Selectorクラスも,openメソッドを使用してオブジェクトを生成します。そして,serverChannelをselectorに登録します。このとき監視する操作はアクセプト操作なのでOP_ACCEPTを指定します。
// セレクタの生成 selector = Selector.open(); // ソケット・チャネルをセレクタに登録 serverChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptHandler());
registerメソッドの第3引数には任意のオブジェクトを指定できます。ここで指定したオブジェクトはSelectionKey#attachmentメソッドで取り出すことができます。なぜこんなオブジェクトを使うのかは,もう少し後で説明しましょう。
selectorにSelectableChannelオブジェクトの登録を行った後は,監視を行いましょう。
// セレクタによる監視 while(selector.select() > 0) { // SelectionKeyを取り出す Setkeys = selector.selectedKeys(); for (Iterator it = keys.iterator(); it.hasNext(); ) { SelectionKey key = it.next(); it.remove(); // アタッチしたオブジェクトに処理を委譲 Handler handler = (Handler)key.attachment(); handler.handle(key); } }
前述した説明ではSelectionKeyオブジェクトを取り出して,if文で処理を分岐させていましたが,ここではif文が見あたりません。
ここで使用しているのが,先ほどregisterメソッドの第3引数で指定したオブジェクトです。SelectionKeyオブジェクトにこのようなオブジェクトをアタッチできることを利用して,if文を削除しています。
ただし,OP_READとOP_WRITEを同時に監視している場合などは,アタッチしたオブジェクトの中でif文を使用する必要があります。この例は読み書きを行っているIOHandlerクラスのところで説明します。
ここで使用しているHandlerインタフェースはhandleメソッドが定義されているだけです。HandlerインタフェースをインプリメントしたAcceptHandlerの中身は次のようになっています。
public void handle(SelectionKey key) throws ClosedChannelException, IOException { ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel(); // アクセプト処理 SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); // 入出力用のハンドラを生成し,アタッチする // 監視する操作は読み込みのみ IOHandler handler = new IOHandler(); channel.register(key.selector(), SelectionKey.OP_READ, handler); }
SelectionKeyオブジェクトであるkeyからSelectableChannelオブジェクトをまず取り出します。
ここではアクセプト処理だということはわかっているので,ServerSocketChannelクラスにキャストします。そして,acceptメソッドをコールしてアクセプトを行います。acceptメソッドの戻り値はSocketChannelオブジェクトなので,これもSelectorオブジェクトに登録します。
このとき,先ほどと同様にSelectionKeyオブジェクトにオブジェクトをアタッチしておきます。ここでアタッチするオブジェクトはIOHandlerクラスのオブジェクトです。
このSocketChannelオブジェクトは読み込み・書き込みの両方の操作を行うのですが,前述したように書き込み操作の監視は必要なときだけにします。そのため,registerメソッドではOP_READだけになっています。
次にアタッチしたIOHandlerを見ていきましょう。
IOHandlerクラスは読みこんだデータを保持しておき,書き込み可能状態になったら保持したデータを書き込むという処理を行っています。このデータの保持用にListオブジェクトをフィールドに持ちます。もちろん,読み書きするデータはByteBufferクラスを使用します。
private Listbuffers; public IOHandler() { // 読みこんだデータを格納するためのリストの初期化 buffers = new ArrayList (); }
IOHandlerクラスのhandleメソッドでは読み込み処理と書き込み処理の振り分けを行います。
public void handle(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); // 読み込み可であれば,読み込みを行う if (key.isReadable()) { read(key); } // 書き込み可であれば,書き込みを行う if (key.isWritable() && key.isValid()) { write(key); } }
前述したように,読み込みと書き込みは同時に可能になることがあるため,if...else if...を使用せずに,独立した二つのif文で記述してあります。
読み込みは次のように行います。
private void read(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); // 読み込み用のバッファの生成 ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); // 読み込み channel.read(buffer); // フリップしてからリストに追加 buffer.flip(); buffers.add(buffer); if (key.interestOps() != (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) { // 読み込み操作に対する監視を行う key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } }
New I/OのチャネルはすべてByteBufferを使用して読み書きを行うので,ここでも読み込み用のByteBufferオブジェクトを生成します。
読み込みを行った後は,flipメソッドをコールして,すぐに書き込みできるようにしておきます。チャネルを用いた読み書きに関しては先月の解説をご参照ください。
データを読み込んだので,エコーバックするための書き込みができる状態になりました。そこで,最後に監視する操作にOP_WRITEも加えておきます。
そして,書き込みを行うwriteメソッドを次に示します。
private void write(SelectionKey key) throws ClosedChannelException, IOException { SocketChannel channel = (SocketChannel)key.channel(); if (!buffers.isEmpty()) { // リストが空でなければ,先頭のバッファを取り出し // 書き込みを行う ByteBuffer buffer = buffers.get(0); channel.write(buffer); // 書き込みが終わったバッファは削除する buffers.remove(0); } else { // 書き込むデータがなければ,書き込み操作の監視をやめる key.interestOps(SelectionKey.OP_READ); } }
書き込むデータが保持されているbuffersから,先頭のByteBufferオブジェクトを取り出し,書き込みを行います。書き込みが終わったデータはbuffersから削除しておきます。
ここで,buffersにまだデータが残っていたとしても,連続して書き込みを行わず,再び書き込み可能状態であるかどうかを調べるためにメソッドを抜けるようにします。
前述したように,書き込むデータがない場合に書き込みの監視をすると,不必要にCPUの負荷を高くしてしまいます。そこで,データがなければ書き込みの監視をやめて,読み込みだけを監視するようにします。
New I/O のまとめ
6回にわたって,New I/Oの解説を行ってきましたが,いかがだったでしょうか。
今までのストリームに比べると,チャネルとバッファを使うことは確かに面倒かもしれません。しかし,パフォーマンスの向上には目をみはるものがあります。特に大量のI/O処理を行う場合やスケーラビリティが求められる場合にNew I/Oは真価を発揮します。
New I/Oを使用しているシステムも最近では徐々に増えてきています。Tomcat 5.5系はNew I/Oを使用していることを存じの方も多いのではないでしょうか。
最近話題になっているのがGlassfishに含まれているHTTPサーバー/ServletコンテナのGrizzlyです。昨年のJavaOneのGrizzlyのセッション資料によると,JSPの処理をCで記述されたHTTPサーバーより高速に行えることが示されています。
大量のI/O処理やスケーラビリティを求められている場合は,New I/Oを検討してみてはいかがでしょうか。それだけの価値はありますよ。
著者紹介 櫻庭祐一 横河電機の研究部門に勤務。同氏のJavaプログラマ向け情報ページ「Java in the Box」はあまりに有名 |