mike-neckのブログ

Java or Groovy or Swift or Golang

Java 8 Streamの挙動を調べたった

表記の件について、JavaListから作るStreamは操作中に変更をするとその後の挙動についてなんら結果を保証しないので、ListからStreamを作った場合は、変更をしないか、あるいは不可変なListに変換してから作るのが良いです。

と、書いておいてからなのですが、こんなツイート見かけました。

peekListの要素を削除するというやってはいけないパターンです。

@Test
public void intList() {
    List<Integer> list = IntStream.range(0, 10)
            .boxed().collect(toList());
    list.stream()
            .sorted()
            .peek(list::remove)
            .forEach(System.out::println);
}

これを実行するとどうなるでしょうか?

  1. ConcurrentModificationException
  2. 0〜9が表示される
  3. 0〜10が表示される
  4. 何も表示されない

ConcurrentModificationException

さて、上記のコードを以下のように変更するとConcurrentModificationExceptionが発生します。

@Test
public void sample() {
    List<Integer> list = IntStream.range(0, 9)
            .boxed()
            .collect(toList());
    list.stream()
            .limit(7)
            .peek(list::remove)
            .forEach(System.out::println);
}

実行結果

0

java.util.ConcurrentModificationException
    at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1353)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:529)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:516)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)

この差は何によるのでしょうか?

Sink

ポイントはSinkというConsumerを拡張したインターフェースの実装にあります。

Streamは各操作ごとにオブジェクトが作成されていて、それがリンクドリスト状に連なっていきます。それをストリーム・パイプラインと呼びます。そして、終端操作(forEachcollectなど)が呼び出された時に、ストリーム・パイプラインの中でデータやデータ量などの情報を運ぶのがSinkです。Sinkは状態を二つ持っています。実行可能状態と初期状態の二つです。初期状態→begin(int)(操作開始)→実行可能→accept(T)(操作)→end()(操作終了)→初期状態の順で状態が変わります。Sink自体はインターフェースなのでそのように呼び出すという契約となっています。

ここで各操作のクラスが実装しなければならないメソッドに次のメソッドがあります。

Sink<P_OUT> opWrapSink(int flags, Sink<R> sink);

このメソッドの引数のSink<R>は次の操作、戻り値のSink<P_OUT>は実装するメソッドが提供する操作です。

普通のSinkの実装

ではlimitメソッドでは次のような実装になっています。

ReferencePipeline リンク

@Override
public final Stream<P_OUT> limit(long maxSize) {
    if (maxSize < 0)
        throw new IllegalArgumentException(Long.toString(maxSize));
    return SliceOps.makeRef(this, 0, maxSize);
}

SliceOps.javaというのがlimit操作を提供するクラスです。

その中で、opWrapSinkメソッドをこのように実装しています。

@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
    return new Sink.ChainedReference<T, T>(sink) {
        long n = skip;
        long m = limit >= 0 ? limit : Long.MAX_VALUE;
        @Override
        public void begin(long size) {
            downstream.begin(calcSize(size, skip, m));
        }
        @Override
        public void accept(T t) {
            if (n == 0) {
                if (m > 0) {
                    m--;
                    downstream.accept(t);
                }
            }
            else {
                n--;
            }
        }
        @Override
        public boolean cancellationRequested() {
            return m == 0 || downstream.cancellationRequested();
        }
    };
}

ここで実装されるSinkacceptで次の操作(downstream)の操作が呼び出されています。ほぼほとんどのSinkの実装はこのようになっています。

sortedでのSinkの実装

一方、sorted実装は次のようになっています。

@Override
public final Stream<P_OUT> sorted() {
    return SortedOps.makeRef(this);
}

SortedOps#makeRef次のようになっています

static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    return new OfRef<>(upstream);
}

SortedOps.OfRefというのがsort操作を提供するクラスです。

その中で、opWrapSinkメソッドはこのように実装されています

@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    Objects.requireNonNull(sink);
    // If the input is already naturally sorted and this operation
    // also naturally sorted then this is a no-op
    if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
        return sink;
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedRefSortingSink<>(sink, comparator);
    else
        return new RefSortingSink<>(sink, comparator);
}

ここで、flagsの値にはCollection#spliterator()リンク -> Spliterators#spliterator(Collection, int)リンク -> IteratorSpliterator(Collection, int)リンクと辿って行くと、List#streamから生成されるStreamの特性にSpliterator.SIZEDSpliterator.SUBSIZEDが与えられていることがわかります。

また、コードのコメントを信じると(あかん…)、まだソートされていないので、StreamOpFlag.SORTED.isKnown(flags)falseになると考えられるので(もっとソース嫁)、SizedRefSortingSinkが実装クラスであることがわかります。

そして、SizedRefSortingSinkクラスは次のように実装されています。

private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
    private T[] array;
    private int offset;
    SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }
    @Override
    @SuppressWarnings("unchecked")
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        array = (T[]) new Object[(int) size];
    }
    @Override
    public void end() {
        Arrays.sort(array, 0, offset, comparator);
        downstream.begin(offset);
        if (!cancellationWasRequested) {
            for (int i = 0; i < offset; i++)
                downstream.accept(array[i]);
        }
        else {
            for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
                downstream.accept(array[i]);
        }
        downstream.end();
        array = null;
    }
    @Override
    public void accept(T t) {
        array[offset++] = t;
    }
}

ここで注目すべきはacceptメソッドであって、このメソッドではdownstreamの操作は行われません。downstreamの操作はendメソッドで一気に行われます。ここで先ほど書いたSinkの状態を思い出してください。このSizedRefSortingSinkの操作終了(end())が呼び出されて初めて次の操作が開始するのです。したがって、sorted以降のストリーム・パイプラインで記述された操作は、sortedが終了してから初めて行われるのです。そして、mapにおけるSinkの実装で記述したようにacceptメソッドで次の操作(downstream)のacceptが呼び出されるようになっていますので、sorted以降のストリーム・パイプラインの一連の操作はarrayの順番で呼び出されるようになります。すると、最初に引用したツイートのような事象も納得が行くようになります。

Streamの実行

Streamの操作は次のように実行されます(詳しくはForEachOps.ForEachOpコードや、ReduceOpsコードや、ArrayList.ArrayListSpliteratorコードHashMap.KeySpliteratorコードなどを参照)。

  1. 終端操作に与えられた操作をラップしたSinkが作成される
  2. 一つ前の操作のopWrapSinkSinkをラップする(操作を合成する)
  3. すべての中間操作のopWrapSinkSinkをラップする(操作を合成する)
  4. 3.で得たSink(全操作合成済み)のbegin(long)を呼び出す(すべての操作のSinkに波及する)
  5. ソースのSpliterator#tryAdvance(Consumer)trueを返し続ける間、3.で得たSink(全操作合成済み)を渡して処理をおこなう
  6. tryAdvanceでは元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationExceptionを投げる
  7. tryAdvanceですべての要素が処理されたか検査して、すべて操作された場合はfalseを返す
  8. 3.で得たSink(全操作合成済み)のend()を呼び出す

一方でsortedが挟まった場合のStreamは次のように実行されます。

  1. 上記の3.で得たSinkbegin(long)を呼び出す(sortedbeginの波及が止まる)
  2. ソースのSpliterator#tryAdvance(Consumer)trueを返し続ける間、上記の3.で得たSinkを渡して処理をおこなう(sortedで処理が中断される)
  3. tryAdvanceでは元のコレクションに変更がないか検査して、変更があった場合はConcurrentModificationExceptionを投げる
  4. tryAdvanceですべての要素が処理されたか検査して、すべて操作された場合はfalseを返す
  5. 上記の3.で得たSinkend()を呼び出す
  6. sortedend()にて並び替えが行われる
  7. sorted以降のSinkbegin()が呼び出される
  8. 並べ替えが完了した要素の順番でSink#acceptが適用される
  9. sorted以降のSinkend()が呼び出される

Sinkの実装によりストリームのソースのtryAdvance(正確には変更の検査)が呼び出される順番と操作の順番が入れ替わるために、冒頭の引用のような状況が発生するわけです。

まとめ

  • ストリームのメソッド一つ一つでStreamを実装したクラス(正確にはAbstractPipelineを継承したクラス)が作成される
  • Sinkというインターフェースにすべての操作が合成されていく
  • ソース(Spliterator)のtryAdvanceSinkが渡されることで要素1つずつが順番に実行される
  • sortedされるとそれ以降の操作順の制御はSinkがおこなう

前々からStreamはどのように実装されているのか、どのように遅延評価されているのか興味があったので、いい勉強になった。特にストリーム・パイプラインが双方向リンクドリストで実装されているのがわかったのが収穫だった。というか、今なら遅延評価するオレオレOptionalが書ける気がする。