JDK Stream реверс-инженеринг (реконструкция)

a09d6b7c9ecd1fd143dfd97411141677

Привет all!

Вступление

Приглянулась мне однажды идея реверс-инженеринга (реконструкции) StreamAPI из JDK8. Что и как из этого вышло опишу далее.

Ссылки

Актуальный репозиторий

Ветка актуальная для статьи

Вкратце

StreamAPI — это технология появившаяся в Java 8 позволяющая манипулировать данными в наборах (напр. коллекциями) в функциональном стиле (лямбда-выражениями). Более подробно про стримы можно почитать тут.

Зачем и для чего

Для обучения, для более глубокого понимания исследуемой темы. В процессе написания данной статьи было получено немало опыта, как по стримам, так и в целом по Java и программированию. Поэтому эта риверсия является неплохой обучающей практикой. Я бы порекомендовал и новичкам и тем кто уровнем повыше выполнить самостоятельную реализацию стрима. Поверьте, независимо от вашего опыта, вас ждут открытия :)

Название

Название было выбрано следующим образом: Stream Reversed → StreamRe → StreamEr → Streamer

Возможные возможности/невозможности

Поскольку это обучающий реверс-инженеринг, целью которого является максимально понятно реализовать уже реализованное, то не стоит ожидать от стримера эффективного расхода памяти и быстродействия уровня Enterprise. Более того, в конце статьи я приведу пример сравнения производительности разных стримов, в котором стример уступает оригиналу из JDK по быстродействию.

Так же из реализации была исключена возможность распараллеливания стримов (Parallel stream), т.к. приемлемая реализация этого подхода потребует иных принципов построения и выходит за рамки этого материала.

Описанная тут реализация сохраняет следующие преимущества гибкости StreamAPI:

  • чтения данных не происходит до вызова одного из терминальных методов. Т.е., пока мы собираем стример, мы никак не влияем на источник данных, не читаем его, а только формируем набор правил, по которым будет работать конвеер стрима, когда он будет запущен вызовом одного из терминальных методов.

  • после вызова терминального метода, данные последовательно читаются из родительского набора и так же последовательно проходят по цепочке операций, не накапливаясь при этом в коллекциях, массивах и т.д. Конечно, за исключением метода сортировки, который по очевидным причинам требует предварительного накопления данных.

Spliterator vs. Iterator

Для того чтобы Stream мог функционировать, ему необходим источник данных. Стандартная реализация JDK (далее «оригинал»), под капотом, для чтения источника, использует сплитератор — Spliterator.

Основная задача сплитератора — разделять данные на блоки, т.е. порцировать их. Порцирование используется «оригиналом» для воможности распараллеливания стримов, когда разные порции обрабатываются разными потоками. Более подробно, о сплитераторах можно почитать тут.

Поскольку мы не будем реализовывать parallel для стримера, то и в разделении данных на блоки у нас тоже нет необходимости. Для простоты примера хватит итераторa — Iterator, поэтому «под капотом» именно через него и будем получать данные из родительского источника.

Жизненный цикл (внутренние состояния)

Жизненный цикл стримера я разделил на три состояния:

  • Ожидание (WAITING) — начальное состояние стримера. В этом состоянии экземпляры создаются. Пока стример находится в этом состоянии, мы можем конструировать его из операций и вызвать один из терминальных методов когда потребуется «включить конвеер».

  • В работе (OPERATED) — в это состояние стример переходит после вызова любого терминального метода. Это состояние означает, что либо стример находится в работе — «конвеер запущен», либо готов к запуску, т.е. уже сконструирован, а значит вызовы как конвеерных так и терминальных методов более невозможны.

  • Завершен (CLOSED) — Это состояние означает, что стример завершил выполнение работы, ссылка на внешний итератор-источник обнулена (RefCount для GC).В это состояние стример переходит после того как:

    • завершилась работа любого из терминальных методов. Даже если в источнике остались данные. Например findFirst() вернул «первый» элемент. Данные возможно еще остались, но стример отработал свою задачу и может освободить не используемые ссылки.

    • В источнике закончились данные — hasNext() изменил свое состояние с true на false.

    • Извне, был вызван метод явного закрытия — close () и при этом стример находился в WAITING состоянии. Данное условие (WAITING) является обязательным, поскольку мы не можем по запросу завершать работающий стрим. Таковы правила, далее я это рассмотрю.

Подготовка

Разделим методы, стрима которые будем реализовывать на три группы:

Порождающие (factory): empty, of, generate, iterate, concat

Промежуточные (intermediate)/конвеерные: peek, onClose, distinct, filter, skip, limit, sorted, map, mapToInt, mapToLong, mapToDouble, flatMap, flatMapToInt, flatMapToLong, flatMapToDouble

Завершающие (terminal): spliterator, parallel, unordered, forEachOrdered, collect, min, max, reduce, count, forEach, allMatch, anyMatch, noneMatch, findFirst, findAny, iterator, toArray

Прочие: close, isParallel, sequential

Итак, создадим проект с начальной структурой и классом Streamer, реализующий интерфейс java.util.stream.Stream. Позволим IDE сгенерировать пустую реализацию всех методов (перечислены выше). Сгенерированные методы заглушим при помощи UnsupportedOperationException.

В итоге должно получиться примерно так.

Так же, сразу напишем реализацию простых методов — «однострочников» чтобы более к ним не возвращаться.

@Override
public Optional findFirst() {
    return findAny(); //поскольку у нас упорядоченный стрим, то первый элемент 				(First) и есть "произвольный" (Any)
}

@Override
public boolean isParallel() {
    return false; //мы не поддерживаем параллелизм, поэтому всегда false
}

@Override
public Stream sequential() {
    return this; //мы "последовательны", поэтому вернем себя же
}

@Override
public void forEachOrdered(Consumer action) {
    forEach(action); //опять же, мы упорядочены источником, поэтому в нашем 							случае forEach и forEachOrdered эквивалентны
}

@Override
public Spliterator spliterator() {
    return Spliterators.spliteratorUnknownSize(this.iterator(), Spliterator.ORDERED); //создадим сплитератор на основе «внутреннего» 												итератора
}

@Override
public Stream unordered() {
    return this; //так же, можно вернуть себя
}

Создание экземпляров

Под капотом, экземпляры будут создаваться единственым закрытым (private) конструктором, который в качестве аргумента принимает внешний итератор-источник. Этот итератор и будем использовать в качестве источника данных. Клиенты же, как и в оригинале, будут получать экземпляры стримера из статических фабрик. Стоит добавить, что к статической фабрике of () я дополнительно добавил перегруженные методы получения экземпляров Streamer из коллекций, перечисляемых (Iterable) типов, и непосредственно из самих Iterator`ов.

Примеры порождения стримера:

package pw.komarov.streamer;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class StreamerInstancesCreationExamples {
    public static void main(String[] args) {
        Streamer.empty(); //пустой

        Streamer.of(new Object()); //единичный объект
        Streamer.of(new Integer[]{1, 4, 8, 17}); //массив
        Streamer.of(Arrays.asList(7.34, 9, 18.7, 3)); //Iterable (List)
        Streamer.of("Foo", "Bar", "Juice", "hello", "streamer"); //из констант

        //Infinite
        Streamer.generate(() -> ThreadLocalRandom.current().nextInt()); //бесконечный (рэндом-число)
        Streamer.generate(() ->
            {
                List strings = Arrays.asList("randomly", "returned", "string", "value");
                return strings.get(ThreadLocalRandom.current().nextInt(strings.size()));
            }); //рэндом значение

        Streamer.iterate(100, (i) -> i * 2); //последовательность {100,200,400.........n}
    }
}

Методы generate() и iterate() порождают бесконечный стрим, который на каждом шаге получает значение из бесконечного итератора, у которого hasNext() всегда == true и «заглушен» метод forEachRemaining():

private static abstract class AbstractInfiniteIterator implements Iterator {
    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public void forEachRemaining(Consumer consumer) {
        throw new UnsupportedOperationException();
    }
}

Итератор для generate():

private static class InfiniteGenerator extends AbstractInfiniteIterator {
    private final Supplier supplier;
    
    InfiniteGenerator(Supplier supplier) {
        this.supplier = supplier;
    }

    @Override
    public E next() {
        return supplier.get();
    }
}

Далее, сначала создаем экземпляр этого генерирующего итератора, и затем из него стример:

public static  Streamer generate(Supplier supplier) {
    return of(new InfiniteGenerator<>(supplier));
}

Похожим образом реализован и iterate():

public static class InfiniteIterator extends AbstractInfiniteIterator {
    private E value; //значение предыдущего шага, при первом вызове — initial

    private final UnaryOperator unaryOperator; //клиенсткая функция 							генерации значения

    InfiniteIterator(E initial, UnaryOperator unaryOperator) {
        this.value = initial;
        this.unaryOperator = unaryOperator;
    }

    @Override
    public E next() {
        E prev = this.value;
        this.value = unaryOperator.apply(prev);
        return prev;
    }
}

public static  Streamer iterate(E initial, UnaryOperator unaryOperator {
    return of(new InfiniteIterator<>(initial, unaryOperator));
}

В итоге должно получиться примерно так.

Закрытие/завершение

Опишем два метода закрытия/завершения стримера. Первый — internalClose() для внутреннего использования. Вызывать его будем когда работа стримера логически завершена. Например закончились данные в источнике или завершена работа одного из терминальных методов. В общем, в тех случаях, когда использование стримера более невозможно. Этот метод будет так же обнулять ссылки на внешние ресурсы (чтобы уменьшить RefCount для GC) и переводить стример в CLOSED состояние.

Второй метод — внешнего закрытия, реализует close() интерфейса AutoCloseable. Фактически же, завершает стример только из состояния WAITING. Это сделано для того, чтобы внешний вызов не мог повлиять на работу выполняющегося стрима. Так работает оригинал. На мой взгляд это поведение не логично. И вот почему… Предположим, что стрим выполняет тяжеловесную операцию одним из терминальных методов. В какой то момент (к примеру, пользователь запросил отмену действия), мы понимаем что больше не нуждаемся в этой тяжеловесной работе и хотим ее принудительно прекратить. Стрим исполняется в другом потоке, но у нас есть указатель на этот стрим. Вызываем close () в надежде прекратить выполнение операции, но он продолжает работать как ни в чем небывало… А жаль… Ведь так хотелось… :).

Второй важной частью работы этого метода является вызов пользовательских onClose последовательностей. Но и тут скрывается подвох. В оригинальном стриме эти onClose выполняются только в случае явного вызова метода close (). Т.е. если стрим завершил работу, допустим найдено искомое (min, max и т.д.), то onClose будут просто проигнорированы, а ведь возможно там были важные финализаторы… При описанном поведении инструмент предоставляемый методом onClose () вообще не представляет практической ценности, поскольку те же самые операции можно вызвать «вручную» из клиентского кода, после вызова close () например. Можно будет даже более гибко обработать возможные исключения.

Ну что же, имеем то, что имеем… поэтому для поддержания совместимости реализуем эти особенности в том же виде:

private enum State {WAITING, OPERATED, CLOSED}

private State state = State.WAITING;

private final List onCloseSequences = new LinkedList<>();

@Override
public void close() {
    if (state == State.WAITING)
        internalClose();

    //обработаем (выполним) клиентские onClose последовательности...
    RuntimeException rte = null;
    for (Iterator iterator = onCloseSequences.iterator(); 					iterator.hasNext(); ) {
        Runnable runnable = iterator.next();
        try {
            runnable.run();
        } catch (RuntimeException e) {
            if (rte == null) //если это первое исключение в цепочке...
                rte = e; //...сохраним его
            else //если не первое...
                rte.addSuppressed(e); //...сохраним его в suppressed первого
        } finally {
            iterator.remove();
        }
    }

    if (rte != null)
        throw rte;
}

private void internalClose() {
    externalIterator = null;

    state = State.CLOSED;
}

private void throwIfNotWaiting() {
    if (state != State.WAITING)
        throw new IllegalStateException("stream has already been operated upon 		or closed");
}

@Override
public Stream onClose(Runnable closeHandler) {
    throwIfNotWaiting();

    onCloseSequences.add(closeHandler);

    return this;
}

Контракт onClose () для стрима гласит, что первое исключение погашается и сохраняется, прочие исключение (если они есть), добавляются в suppressed первого. И если было первое, то оно и бросается после выполнения всех onClose`ов. Этот контракт так же сохранен в реализации приведенной выше.

Расстановка по шаблону

Ранее мы реализовали метод проверки текущего состояния стримера, который бросает IllegalStateException если стример не в WAITING состоянии. Теперь пришло время его расставить в места где это нужно. А нужно это сделать во всех терминальных и конвеерных методах, кроме «однострочников» описанных ранее, т.к. они все равно ссылаются на эти методы.

Поскольку конвеерные методы будут работать по принципу Builder`a — иметь возможность телескопического построения (прим.: object.method1().method2().method3().methodN()…), то каждый из этих методов должен возвращать экземпляр себя. В итоге шаблон конвеерного метода приобрел такой вид:

{
    throwIfNotWaiting();
        
          //todo: тут будет создание и добавление операций

    return this;
}

Каждый терминальный метод должен переводить стример из WAITING в OPERATED состояние, а по завершению работы — корректно закрывать его. Резюмируя вышесказанное, «шаблон» терминального метода приобретает такой вид:

{
    throwIfNotWaiting(); //IllegalStateException если пытаемся использовать 				запущенный или завершенный стример

    state = State.OPERATED; //переведем в OPERATED

	try {
	    ;//todo: терминальные операции…
	} finally {
	    internalClose(); //выполним завершение
	}
    throw new UnsupportedOperationException("will be soon");  //чтобы не забыть 					про return :)
}

В итоге получилось так.

Промежуточные операции (intermediate/conveyor)

Ну вот мы и подошли к логике работы стримера. Как известно, стрим состоит из набора операций, которые последовательно применяются к данным которые представлены этому стриму. Поставим вопрос, как будем хранить и как будем «строить» наборы этих операций? Тут все очень просто.

Для обозначения самой операции, объявим интерфейс:

private interface IntermediateOperation {}

Набор операций — список элементов этого интерфейса:

private List intermediateOperations = new LinkedList<>();

А добавлять в этот список конкретные операции будем из конвеерных методов.

Из всех конвеерных операций стрима выделим отдельную группу — фильтрующие операции. Это операции, которые на основании некоторого условия (предиката), зависящего от типа операции, определяют — пройдет ли элемент данных далее по конвееру или будет отброшен на текущем шаге. Вот список всех конвеерные методов, относящихся к фильтрующим операциям: skip(), limit(), distinct(), filter().

Для обозначения этих операций, объявим еще один интерфейс:

private interface FilteringOperation extends IntermediateOperation, 	Predicate {}

Predicate является функциональным интерфейсом (FunctionalInterface, подробнее https://habr.com/ru/post/512730/), и его функциональный метод — boolean test(). Реализацией этого метода в конкретной операции мы и будем определять, пройдет ли элемент по конвееру дальше, или будет «отброшен».

Вот так будет выглядеть класс конкретной операции (в приведенном случае skip):

private static class SkipOperation implements FilteringOperation {
    private final long totalCount; //количество элементов которые требуется 								"пропустить"
    private long processedCount; //количество уже "пропущеных" элементов 								текущей операцией
    
    SkipOperation(long totalCount) {
        this.totalCount = totalCount;
    }

    @Override
    public boolean test(Object o) {
        if (processedCount < totalCount) {
            processedCount++;
            
            return true; //пропустим элемент далее
        }

        return false; //отбросим/отфильтруем элемент
    }
}

@Override
public Stream skip(long n) {
    throwIfNotWaiting(); //проверим текущее состояние

    intermediateOperations.add(new SkipOperation(n)); //создадим Skip-операцию, 								и добавим ее в список операций.

    return this; //вернем экземпляр «себя» для возможности телескопического 											построения
}

По такому же принципу реализуем добавление остальных фильтрующих операций:

//limit()
private long filteredByLimit; //количество "отсеяных" limit'ом элементов

private class LimitOperation implements FilteringOperation {
    private final long maxSize; //собственно и есть лимит

    LimitOperation(long maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public boolean test(Object o) {
        return maxSize < ++filteredByLimit;
    }
}

@Override
public Stream limit(long maxSize) {
    throwIfNotWaiting();

    intermediateOperations.add(new LimitOperation(maxSize));

    return this;
}

//distinct()

private static class DistinctOperation implements FilteringOperation {
    private Set objects = new HashSet<>();
    
    @Override
    public boolean test(Object o) {
        return !objects.add(o);
    }
}

@Override
public Stream distinct() {
    throwIfNotWaiting();

    intermediateOperations.add(new DistinctOperation());

    return this;
}

private static class FilterOperation implements FilteringOperation {
    private final Predicate predicate;

    public FilterOperation(Predicate predicate) {
        this.predicate = predicate;
    }

    @Override
    public boolean test(T t) {
        return !predicate.test(t);
    }
}

@Override
public Stream filter(Predicate predicate) {
    throwIfNotWaiting();

    intermediateOperations.add(new FilterOperation<>(predicate));

    return this;
}

Не фильтрующие:

//sorted()
public static class SortedOperation implements IntermediateOperation {
    private final Comparator comparator;

    public SortedOperation() {
        this.comparator = null;
    }

    public SortedOperation(Comparator comparator) {
        this.comparator = comparator;
    }
}

@Override
public Stream sorted() {
    throwIfNotWaiting();

    intermediateOperations.add(new SortedOperation<>());

    return this;
}

@Override
public Stream sorted(Comparator comparator) {
    throwIfNotWaiting();

    intermediateOperations.add(new SortedOperation<>(comparator));

    return this;
}

//map()
private static class MapOperation implements IntermediateOperation {
    private final Function function;
    MapOperation(Function function) {
        this.function = function;
    }
}

@SuppressWarnings("unchecked")
@Override
public  Stream map(Function mapper) {
    throwIfNotWaiting();

    intermediateOperations.add(new MapOperation<>(mapper));

    return (Streamer) this;
}

//flatMap()
private static class FlatMapOperation implements IntermediateOperation {
    private final Function> function;
	    FlatMapOperation(Function> 								function) {
        this.function = function;
    }
}

@SuppressWarnings("unchecked")
@Override
public  Stream flatMap(Function> 				mapper) {
    throwIfNotWaiting();

    intermediateOperations.add(new FlatMapOperation<>(mapper));

    return (Streamer) this;
}

Отдельно тут стоит отметить peek. peek-непьющий трудовик. peek-операции, как и onClose, будем хранить в отдельном списке, без классов-оберток над ним, т.к. peek хоть и Intermediate операции, но работают немного по другому принципу — peek-последовательности выполняются ВСЕ, РАЗОМ, ПОСЛЕ вычисления КАЖДОГО элемента.

//peek()

private final List> peekSequences = new LinkedList<>();

@Override
public Stream peek(Consumer action) {
    throwIfNotWaiting();

    peekSequences.add(action);

    return this;
}

В итоге.

Конвеерная логика

Теперь реализуем главное — логику работы «конвеера». «На пальцах» можно описать работу этого механизма примерно так: внешний итератор (externalIterator) получает элемент от источника, затем он проходит (или не проходит) по конвееру и передается запросившему клиенту через «обратный» (streamerIterator) итератор. Похоже на систему водопровода — когда вода подается в систему насосом (насос тут externalIterator, качает речную воду), проходит по трубам, фильтрам (конвеер), которые отсеивают нежелательные элементы, и подается потребителю по средствам открытия крана. Кран для потребителя — streamerIterator.

Если где-то, на этом пути элемент был отброшен (отфильтрован), то из источника будет запрошен следующий. И так далее. До тех пор, пока либо в итераторе-источнике не закончатся данные и в этом случае мы так же сообщим потребителю: «данных для Вас больше нет» (hasNext() == false), либо представим этот элемент потребителю.

Реализация конвеера (водопровода):

private class StreamerIterator implements Iterator {
    private Boolean hasNext;
    private T next;

    @Override
    public boolean hasNext() {
        if (hasNext == null) {

            calcNextAndHasNext();

            if (!hasNext && state != State.CLOSED) //если нет больше данных...
                internalClose(); //...завершим
        }

        return hasNext;
    }

    @Override
    public T next() {
        if (!hasNext()) //запросили, но нет больше элементов?...
            throw new NoSuchElementException(); //... получите exception

        hasNext = null; //переведем состояние hasNext в «неизвестно»

        return next;
    }

    private void calcNextAndHasNext() { //метод расчитывающий внутренние			закрытые поля next и hasNext на основании наличия опционала из getNext()
        Optional opt = getNext(intermediateOperations);

        //noinspection OptionalAssignedToNull
        hasNext = opt != null; //если опционал не null — естьСледующий = true
        if (hasNext) //а если следующий есть — то...
            next = opt.orElse(null); //… это значение из опционала (если 						значения в опционале нет — то оно null
    }
	
    //водопровод:
    @SuppressWarnings("unchecked")
    private Optional getNext(List operations) {
        T next = null;
        boolean terminated = false;
        boolean hasNext = externalIterator.hasNext();

        while (hasNext && !terminated) {
            next = externalIterator.next();
            boolean filtered = false;
            for (IntermediateOperation operation : operations) //пройдем по 												всем операциям
                if (operation instanceof FilteringOperation) { //если операция 										- фильтрующая...
                    if (!filtered) { //… и не была отфильтрована ранее
                        filtered = ((FilteringOperation) 								operation).test(next); //фильтруем? (test`ом)
                        if (filtered && operation instanceof LimitOperation)
                            terminated = true; //а если была отфильтрована 					лимитной — то еще и прервем while
                    }
                } else if (operation instanceof MapOperation) //если map-												операция
                    next = (T) ((MapOperation)operation).function.apply(next);
                else
                    throw new UnsupportedOperationException("getNext(): " + 						operation.getClass().getSimpleName()); 													//неизвестная
            if (!filtered)
                break;
            else
                hasNext = externalIterator.hasNext();
        }

        if (hasNext && !terminated) {
						//применим к полученному в итоге значению peek-операции
            for (Consumer peekSequence : peekSequences)
                peekSequence.accept(next);

            return Optional.ofNullable(next);
        }

        //noinspection OptionalAssignedToNull
        return null;
    }
}

Объявим переменную (а это — кран водопровода):

private final StreamerIterator streamerIterator = new StreamerIterator();

P.S. Тут есть один нюанс. Optional у меня может быть null`ом. Да может, и это его логичное на мой взгляд применение. Не нашли значение — опционал == null, нашли значение, опционал его содержит (даже если оно null), иначе какая польза от этого опционала? Да никакой! К тому же, такое его использование осуществляется только внутри закрытых методов, а значит не нарушает никаких внешних соглашений. Но, в своем рабочем коде я использую свой класс NullableOptional<>. Помимо того что он может быть EMPTY (в случаях когда значение не найдено), в нем еще есть и некий сахар, например elseIf(), которого мне переодически нехватает в JDK Optional<> как дополнение для ifPresent(). К сожалению Optional<> объявлен как final и поэтому мой NullableOptional растет отдельной иерархией. Если кому интересно, можете глянуть (покрытие unit-тестами прилагается):

https://github.com/koma1/Streamer/compare/NullableOptional

Наладочный пуск

Настало время выполнить первый тестовый запуск. Для этого добавим реализацию двух терминальных методов: iterator() и forEach():

@Override
public Iterator iterator() {
    throwIfNotWaiting(); //бросим исключение если не в WAITING состоянии

    state = State.OPERATED; //сменим состояние на OPERATED

    return streamerIterator; //вернем «внутренний» итератор - (кран)
}

@Override
public void forEach(Consumer action) {
    throwIfNotWaiting();

    state = State.OPERATED;

    while (streamerIterator.hasNext()) //пока в «кране» есть вода...
        action.accept(streamerIterator.next());//...применим action к этой воде
}

Коммитушка

Протестируем:

final Stream stream =
    Streamer
        .of(108, 5, 12, 11, 4, 9, 7, 5) //инстанциируем стример из набора 												констант
            .distinct() //(108, 5, 12, 11, 4, 9, 7, [5]) - отбросим дубли
            .skip(1)    //([108], 5, 12, 11, 4, 9, 7) - отбросим из начала - 										один элемент
            .limit(6)   //(5, 12, 11, 4, 9, 7) - лимитируем выборку в шесть 											элементов
            .limit(5)   //(5, 12, 11, 4, 9, [7]) - их всего шесть,,,, 									значит лимитируем в пять :)
            .map(i -> i == 11 ? 12 : i) //(5, 12, [11]->12, 4, 9) //там где 								значение == 11, заменим на 12, в 								других случаях - оставим как есть
            .distinct() //(5, 12, [12], 4, 9) //повторно отсеим новые дубли
            .map(i -> (i & 1) == 1 ? i * 2 : i) //([5]->10, 12, 4, [9]->18) 									//каждое нечетное умножим на 									два, остальные оставим как есть
            .skip(1) //([10], 12, 4, 18) - отбросим из начала - один элемент
            .map(String::valueOf) //("12", "4", "18") - преобразуем в строковые 						(изменится и тип стрима, поэтому он объявлен 						как )
            .map(s -> s.equals("12") ? "twelve" : s.equals("18") ? "eighteen" : String.format("(%s)unknown", s)) //то, что знаем, преобразуем в строки
        ; 					

stream.forEach(System.out::println); //("twelve", "(4)unknown", "eighteen")

Версия на github`e

А вот и результат его выполнения:

twelve
(4)unknown
eighteen

Process finished with exit code 0

Вполне ожидаемый. Для проверки можно применить «хитрость», заменить Streamer.of на Stream.of и посмотреть как отработает «оригинал». Результат в обоих случаях должен быть одинаковый. Ну вот мы и реализовали большинство конвеерных методов и два терминальных, которых достаточно для проверки работы стримера. Из конвеерных, пока не реализованы: sorted() и мэпперы (mapTo…, flatMap (), flatMapTo…). Эти методы имеют некоторые особенности, поэтому рассмотрим их реализацию отдельно.

sorted ()

Как следует из названия — данный метод сортирует данные в стриме. Делает он это Comparator`ом представленным в аргументе, либо компаратаром для представленного в стриме типа (он должен быть Comparable с собой же, иначе — ClassCastException).

Особенностью работы данного метода является то, что операция сортировки не является последовательной, т.е. требует предварительного накопления всех имеющихся в источнике данных.

Рассмотрим условный пример:

	streamer.iterate(…)
			  .limit(…) // 1.1
		.sorted() //1
			  .distinct(…) //2.1
			  .filter(…) //2.2
		.sorted() //2
			  .map(…) //3.1
			  .distinct(…) //3.2
		.sorted() //3
			  .skip(…)   //n.1
			  .filter(…)  //n.2

В этом примере операции можно разделить на три условных блока с сортировкой в конце каждого (последние операции «n.1» и «n.2» не замыкаются сортировкой, поэтому не входят в условный блок). Для того, чтобы выполнить этот пример мы должны пойти примерно следующим путем:

  1. вычитать весь «внешний источник-итератор» в какой либо контейнер (коллекция, массив, файл и т.д.)

  2. «Прогнать» элементы получившегося контейнера, последовательно по конвееру операций текущего «условного блока»

  3. отсортировать этот контейнер с применением указанного в сорировщике компаратора (либо компаратором по умолчанию в случае отсутствия первого).

  4. заменить указатель внешнего итератора-источника, на итератор этой отсортированной коллекции. Теперь наш источник, это нами же порожденный внутренний контейнер.

  5. сменить «условный блок» на следующий и выполнить данный алгоритм заново, с пункта №1. Если это был последний «условный блок», то считать выполнение сортировок оконченой и вернуть управление конвееру с итератором, указывающим на итератор отсортированный ранее коллекции.

Если тоже самое выразить языком кода, то у меня получился следующий набор изменений:

//sorted()
private int sortedCount; //поле стримера, хранит кол-во операций сортировки

...
  
intermediateOperations.add(...);
sortedCount++;

...

@Override
public boolean hasNext() {
    if (hasNext == null) {

        if (sortedCount > 0) //если есть сортировки...
            calculateSorted(); //… выполним сначала их...

        calcNextAndHasNext();
...
@SuppressWarnings({"OptionalAssignedToNull","unchecked"})
private void calculateSorted() {
    for (int i = 1; i <= sortedCount; i++) { //цикл по «условным блокам» 
        final List localOperations = new LinkedList<>();
        SortedOperation sortedOperation = null;
        for (Iterator itr = intermediateOperations.iterator(); itr.hasNext(); ) {
            IntermediateOperation operation = itr.next();
            try {
                if (operation instanceof SortedOperation) { //если операция 						сортировки — выделим этот блок...
                    sortedOperation = (SortedOperation) operation;
                    break;
                } else
                    localOperations.add(operation);
            } finally {
                itr.remove();
            }
        }

        //на основании «условного блока» соберем коллекцию
        final List data = new ArrayList<>();
        Optional nextOpt;
        do {
            nextOpt = getNext(localOperations);
            if (nextOpt != null)
                data.add(nextOpt.orElse(null));
        } while (nextOpt != null);

        //отсортируем получившийся список...
        if (sortedOperation != null)
            data.sort(sortedOperation.comparator);

        //подменим итератор
        externalIterator = data.iterator();
    }
}

diff commit`a

В этом коммите так же изменен и StressRunner — добавлено несколько сортировок.

flatMap ()

Этот метод порождает новый стрим, «раскрывая/разворачивая/расхлопывая» элементы родительского стрима. Схематично это можно отобразить так:

	- element1
		- subelement1_FROM_element1
		- subelement2_FROM_element1
		- subelement3_FROM_element1
	- element2
		- subelement4_FROM_element2
		- subelement5_FROM_element2

Простейший способ реализации может выглядеть так:

@Override
public  Stream flatMap(Function> mapper) {
    Objects.requireNonNull(mapper);

    throwIfNotWaiting();

    Stream result = Stream.empty();

    for (T t : this)
        result = Stream.concat(result, mapper.apply(t));

    return result;
}

Это решение не совсем корректно. В нем порождаемый стрим не является последовательным. Данные в нем накапливаются и затем соединяются (конкатенируются) в общий итоговый результат. Это наглядно демонстрирует пример.

Запустив его, мы увидим единоразовый, массовый «выброс» результата в консоль. Но нас это не устраивает, мы предпочитаем «последовательность» «массовости». Исправить это можно реализовав например такую идею: Опишем класс итератора результатирующего типа —Iterator. На вход ему будем передавать итератор текущего стрима (назовем его OfT) и клиентскую функцию mapper которая будет раскладывать элементы полученные из — OfT на элементы подмножества — ofR, которые и будем по одному возвращать клиенту. Звучит запутанно, не совсем понятно, поэтому лучше смотреть код:

//flatMap()

@Override
public  Stream flatMap(Function> 					mapper) {
    Objects.requireNonNull(mapper);

    class IteratorOfR implements Iterator {
        private final Iterator OfT = Streamer.this.iterator(); 					//родительский итератор (содержит элементы множества, которые 					будем раскладывать)
        private Iterator ofR; //элементы подмножества Stream, 								текущего элемента из ofT, 							которые и будем возвращать конечному клиенту
        
        @Override
        public R next() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofR.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofR == null || !ofR.hasNext()) && OfT.hasNext()) //если ofR 								не задан (напр.: первый запрос), 								или в ofR отсутствуют элементы и 								при этом есть что раскладывать в 								родительском (ofT)...
                ofR = mapper.apply(OfT.next()).iterator(); //...разложим 								элемент из ofT в подмножество ofR
            return ofR != null && ofR.hasNext();
        }
    }

    return Streamer.of(new IteratorOfR());
}

Кстати, IteratorOfR я решил сделать вложенным (enclosure) классом, так как его использование за пределами метода flatMap () не предполагается.

https://github.com/koma1/Streamer/commit/51dc50c2c3ff9c429d1ed7b3046081e8e664f396

[flat]MapTo{Int/Long/Double}()

Этот набор методов стоит рассмотреть отдельно. Все они возвращают стрим одного из трех типов: IntStream, LongStream, DoubleStream. Спецификой данных типов стримов, является то, что они оперируют примитивами, а не обертками. Изза этого, например IntStream гораздо быстрее работает с числами, чем Stream, ведь первый работает со значением, а второй с оберткой. Реализация этих методов схожа с реализацией flatMap() описанной выше. Тут стоит правда отметить, что поскольку mapTo[Int/Long/Double]() возвращают указанные выше типы стримов, то принцип их реализации немного отличается от обычного map() и более похож на реализацию flatMap(), за тем исключением что элементы родительского стрима не раскладываются на подмножества, а модифицируются соответствующим mapper`ом и возвращаются по одному. Звучит запутанно, смотрим код:

@Override
public IntStream mapToInt(ToIntFunction mapper) {

    Objects.requireNonNull(mapper);

    class OfInt implements PrimitiveIterator.OfInt {

        @Override
        public int nextInt() {
            return mapper.applyAsInt(streamerIterator.next());
        }
        @Override
        public boolean hasNext() {
            return streamerIterator.hasNext();
        }
    }

    return StreamSupport
            .intStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfInt(),
                            0),
                    false);
}

Оставшиеся методы …mapTo…() сделаны схожим образом, поэтому просто приведу их код:

@Override
public LongStream mapToLong(ToLongFunction mapper) {
    Objects.requireNonNull(mapper);

    class OfLong implements PrimitiveIterator.OfLong {
        private final Iterator ofT = Streamer.this.iterator();

        @Override
        public long nextLong() {
            return mapper.applyAsLong(ofT.next());
        }

        @Override
        public boolean hasNext() {
            return ofT.hasNext();
        }
    }

    return StreamSupport
            .longStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfLong(),
                            0),
                    false);
}

@Override
public DoubleStream mapToDouble(ToDoubleFunction mapper) {
    Objects.requireNonNull(mapper);

    class OfDouble implements PrimitiveIterator.OfDouble {
        private final Iterator ofT = Streamer.this.iterator();

        @Override
        public double nextDouble() {
            return mapper.applyAsDouble(ofT.next());
        }

        @Override
        public boolean hasNext() {
            return ofT.hasNext();
        }
    }

    return StreamSupport
            .doubleStream(
                    Spliterators.spliteratorUnknownSize(
                            new OfDouble(),
                            0),
                    false);
}

@Override
public IntStream flatMapToInt(Function mapper) {
    Objects.requireNonNull(mapper);

    class OfInt implements PrimitiveIterator.OfInt {
        private final Iterator ofT = Streamer.this.iterator();
        private PrimitiveIterator.OfInt ofInt;


        @Override
        public int nextInt() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofInt.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofInt == null || !ofInt.hasNext()) && ofT.hasNext())
                ofInt = mapper.apply(ofT.next()).iterator();

            return ofInt != null && ofInt.hasNext();
        }
    }

    return StreamSupport.intStream(
            Spliterators.spliteratorUnknownSize(new OfInt(), 0),
            false
    );
}

@Override
public LongStream flatMapToLong(Function 							mapper) {
    Objects.requireNonNull(mapper);

    class OfLong implements PrimitiveIterator.OfLong {

        private final Iterator ofT = Streamer.this.iterator();
        private PrimitiveIterator.OfLong ofLong;

        @Override
        public long nextLong() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofLong.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofLong == null || !ofLong.hasNext()) && ofT.hasNext())
                ofLong = mapper.apply(ofT.next()).iterator();

            return ofLong != null && ofLong.hasNext();
        }
    }

    return StreamSupport.longStream(
            Spliterators.spliteratorUnknownSize(new OfLong(), 0),
            false
    );
}

@Override
public DoubleStream flatMapToDouble(Function 						mapper) {
    Objects.requireNonNull(mapper);

    class OfDouble implements PrimitiveIterator.OfDouble {
        private final Iterator ofT = Streamer.this.iterator();

        private PrimitiveIterator.OfDouble ofDouble;

        @Override
        public double nextDouble() {
            if (!hasNext())
                throw new NoSuchElementException();

            return ofDouble.next();
        }

        @Override
        public boolean hasNext() {
            while ((ofDouble == null || !ofDouble.hasNext()) && ofT.hasNext())
                ofDouble = mapper.apply(ofT.next()).iterator();

            return ofDouble != null && ofDouble.hasNext();
        }
    }

    return StreamSupport.doubleStream(
            Spliterators.spliteratorUnknownSize(new OfDouble(), 0),
            false
    );
}

Терминальные методы

Все терминальные методы стримера работают по общей последовательности действий: 1 — проверка корректности переданных аргументов (как правило — «не null»); 2 — проверка текущего состояния — должно быть WAITING, если ДА, то переводим стример в OPERATED, в противном случае бросаем исключение; 3 — выполнение требуемой логики метода и возврат результата; 4 — завершение стримера. Шаблон описанный выше выглядит так:

public ... someTerminalMethod(... args) {
    Objects.requireNonNull(args); //1 — проверка аргументов

    throwIfNotWaitingOrSetOperated(); //2 — проверка и переключение состояния

    try {
		... //3 - выполнение требуемых действий...
    } finally {
        internalClose(); //4 — завершение стримера
    }
}

Еще, тут стоит добавить, что внутри терминальных методов мы «вращаем» streamerIterator. Напомню — это наш «кран» из «водопроводной», а не «речной воды».

P.S., так же, во всех конвеерных методах я заменил возвращаемый тип со Stream на Streamer т.к. это помогает избежать ненужных приведений типов в клиентском коде.

diff

P.P. S. так же, в этом коммите я покрыл тестами терминальные методы. Отчасти, логику работы этих методов можно понять по этим тестам.

Баги

В процессе написания этой публикации были обнаружены некоторые баги. Например такой код приведет к ClassCastException:

    Streamer streamer = Streamer.of("10", "5", "15");
    streamer.map(Integer::valueOf); //CCE не тут...
    streamer.forEach(System.out::println); //← а тут!

Это происходит потому, что стример объявлен как Streamer, но после вызова map() происходит смена типа на Streamer. В forEach(), параметр action объявлен как Consumersuper T>. На этапе компиляции, компилятор неявно добавит приведение к типу: (String)streamerIterator.next() в методе forEach(), которое и приведет к CCE в момент выполнения.

Чтобы решить эту проблему, можно воспользоваться способом, которым реализован flatMap () — возвращать экземпляр нового стримера, который связан с текущим, а текущий переводить в OPERATED. Похожим образом работает и оригинальный стрим, но подробнее об этом поговорим позже. Для внедрения исправления, необходимо всего лишь заменить:

© Habrahabr.ru