表記の件について、JavaのList
から作るStream
は操作中に変更をするとその後の挙動についてなんら結果を保証しないので、List
からStream
を作った場合は、変更をしないか、あるいは不可変なList
に変換してから作るのが良いです。
と、書いておいてからなのですが、こんなツイート見かけました。
これは思わぬ結果
ステートフルな中間操作のsorted()かますと
最終的にListは空になるけどforEachで全て出力される。
なんでだろう?
これで今日も眠れないw pic.twitter.com/864kLHyK8K
— Yucchi (@Yucchi_jp) May 8, 2015
peek
でList
の要素を削除するというやってはいけないパターンです。
@Test public void intList() { List<Integer> list = IntStream.range(0, 10) .boxed().collect(toList()); list.stream() .sorted() .peek(list::remove) .forEach(System.out::println); }
これを実行するとどうなるでしょうか?
ConcurrentModificationException
- 0〜9が表示される
- 0〜10が表示される
- 何も表示されない
ConcurrentModificationException
さて、上記のコードを以下のように変更するとConcurrentModificationException
が発生します。
@Test public void sample() { List<Integer> list = IntStream.range(0, 9) .boxed() .collect(toList()); list.stream() .limit(7) .peek(list::remove) .forEach(System.out::println); }
実行結果
0 java.util.ConcurrentModificationException at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1353) at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:529) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:516) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
この差は何によるのでしょうか?
Sink
ポイントはSink
というConsumer
を拡張したインターフェースの実装にあります。
Stream
は各操作ごとにオブジェクトが作成されていて、それがリンクドリスト状に連なっていきます。それをストリーム・パイプラインと呼びます。そして、終端操作(forEach
やcollect
など)が呼び出された時に、ストリーム・パイプラインの中でデータやデータ量などの情報を運ぶのがSink
です。Sink
は状態を二つ持っています。実行可能状態と初期状態の二つです。初期状態→begin(int)
(操作開始)→実行可能→accept(T)
(操作)→end()
(操作終了)→初期状態の順で状態が変わります。Sink
自体はインターフェースなのでそのように呼び出すという契約となっています。
ここで各操作のクラスが実装しなければならないメソッドに次のメソッドがあります。
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink);
このメソッドの引数のSink<R>
は次の操作、戻り値のSink<P_OUT>
は実装するメソッドが提供する操作です。
普通のSink
の実装
ではlimit
メソッドでは次のような実装になっています。
ReferencePipeline
リンク
@Override public final Stream<P_OUT> limit(long maxSize) { if (maxSize < 0) throw new IllegalArgumentException(Long.toString(maxSize)); return SliceOps.makeRef(this, 0, maxSize); }
SliceOps.java
というのがlimit
操作を提供するクラスです。
その中で、opWrapSink
メソッドをこのように実装しています。
@Override Sink<T> opWrapSink(int flags, Sink<T> sink) { return new Sink.ChainedReference<T, T>(sink) { long n = skip; long m = limit >= 0 ? limit : Long.MAX_VALUE; @Override public void begin(long size) { downstream.begin(calcSize(size, skip, m)); } @Override public void accept(T t) { if (n == 0) { if (m > 0) { m--; downstream.accept(t); } } else { n--; } } @Override public boolean cancellationRequested() { return m == 0 || downstream.cancellationRequested(); } }; }
ここで実装されるSink
のaccept
で次の操作(downstream
)の操作が呼び出されています。ほぼほとんどのSink
の実装はこのようになっています。
sorted
でのSink
の実装
一方、sorted
の実装は次のようになっています。
@Override public final Stream<P_OUT> sorted() { return SortedOps.makeRef(this); }
SortedOps#makeRef
は次のようになっています。
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) { return new OfRef<>(upstream); }
SortedOps.OfRef
というのがsort
操作を提供するクラスです。
その中で、opWrapSink
メソッドはこのように実装されています。
@Override public Sink<T> opWrapSink(int flags, Sink<T> sink) { Objects.requireNonNull(sink); // If the input is already naturally sorted and this operation // also naturally sorted then this is a no-op if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) return sink; else if (StreamOpFlag.SIZED.isKnown(flags)) return new SizedRefSortingSink<>(sink, comparator); else return new RefSortingSink<>(sink, comparator); }
ここで、flags
の値にはCollection#spliterator()
リンク -> Spliterators#spliterator(Collection, int)
リンク -> IteratorSpliterator(Collection, int)
リンクと辿って行くと、List#stream
から生成されるStream
の特性にSpliterator.SIZED
とSpliterator.SUBSIZED
が与えられていることがわかります。
また、コードのコメントを信じると(あかん…)、まだソートされていないので、StreamOpFlag.SORTED.isKnown(flags)
はfalse
になると考えられるので(もっとソース嫁)、SizedRefSortingSink
が実装クラスであることがわかります。
そして、SizedRefSortingSink
クラスは次のように実装されています。
private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> { private T[] array; private int offset; SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override @SuppressWarnings("unchecked") public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); array = (T[]) new Object[(int) size]; } @Override public void end() { Arrays.sort(array, 0, offset, comparator); downstream.begin(offset); if (!cancellationWasRequested) { for (int i = 0; i < offset; i++) downstream.accept(array[i]); } else { for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) downstream.accept(array[i]); } downstream.end(); array = null; } @Override public void accept(T t) { array[offset++] = t; } }
ここで注目すべきはaccept
メソッドであって、このメソッドではdownstream
の操作は行われません。downstream
の操作はend
メソッドで一気に行われます。ここで先ほど書いたSink
の状態を思い出してください。このSizedRefSortingSink
の操作終了(end()
)が呼び出されて初めて次の操作が開始するのです。したがって、sorted
以降のストリーム・パイプラインで記述された操作は、sorted
が終了してから初めて行われるのです。そして、map
におけるSink
の実装で記述したようにaccept
メソッドで次の操作(downstream
)のaccept
が呼び出されるようになっていますので、sorted
以降のストリーム・パイプラインの一連の操作はarray
の順番で呼び出されるようになります。すると、最初に引用したツイートのような事象も納得が行くようになります。
Stream
の実行
Stream
の操作は次のように実行されます(詳しくはForEachOps.ForEachOp
のコードや、ReduceOps
のコードや、ArrayList.ArrayListSpliterator
のコードやHashMap.KeySpliterator
のコードなどを参照)。
- 終端操作に与えられた操作をラップした
Sink
が作成される - 一つ前の操作の
opWrapSink
でSink
をラップする(操作を合成する) - すべての中間操作の
opWrapSink
でSink
をラップする(操作を合成する) - 3.で得た
Sink
(全操作合成済み)のbegin(long)
を呼び出す(すべての操作のSink
に波及する) - ソースの
Spliterator#tryAdvance(Consumer)
がtrue
を返し続ける間、3.で得たSink
(全操作合成済み)を渡して処理をおこなう tryAdvance
では元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationException
を投げるtryAdvance
ですべての要素が処理されたか検査して、すべて操作された場合はfalse
を返す- 3.で得た
Sink
(全操作合成済み)のend()
を呼び出す
一方でsorted
が挟まった場合のStream
は次のように実行されます。
- 上記の3.で得た
Sink
のbegin(long)
を呼び出す(sorted
でbegin
の波及が止まる) - ソースの
Spliterator#tryAdvance(Consumer)
がtrue
を返し続ける間、上記の3.で得たSink
を渡して処理をおこなう(sorted
で処理が中断される) tryAdvance
では元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationException
を投げるtryAdvance
ですべての要素が処理されたか検査して、すべて操作された場合はfalse
を返す- 上記の3.で得た
Sink
のend()
を呼び出す sorted
のend()
にて並び替えが行われるsorted
以降のSink
のbegin()
が呼び出される- 並べ替えが完了した要素の順番で
Sink#accept
が適用される sorted
以降のSink
のend()
が呼び出される
Sink
の実装によりストリームのソースのtryAdvance
(正確には変更の検査)が呼び出される順番と操作の順番が入れ替わるために、冒頭の引用のような状況が発生するわけです。
まとめ
- ストリームのメソッド一つ一つで
Stream
を実装したクラス(正確にはAbstractPipeline
を継承したクラス)が作成される Sink
というインターフェースにすべての操作が合成されていく- ソース(
Spliterator
)のtryAdvance
にSink
が渡されることで要素1つずつが順番に実行される sorted
されるとそれ以降の操作順の制御はSink
がおこなう
前々からStream
はどのように実装されているのか、どのように遅延評価されているのか興味があったので、いい勉強になった。特にストリーム・パイプラインが双方向リンクドリストで実装されているのがわかったのが収穫だった。というか、今なら遅延評価するオレオレOptional
が書ける気がする。