Pull to refresh

Пишем свой Spliterator

Reading time11 min
Views51K
Многие из вас уже попробовали на вкус Stream API — потоки Java 8. Наверняка у некоторых возникло желание не только пользоваться готовыми потоками от коллекций, массивов, случайных чисел, но и создать какой-то принципиально новый поток. Для этого вам потребуется написать свой сплитератор. Spliterator — это начинка потока, публичная часть его внутренней логики. В этой статье я расскажу, как и зачем я писал сплитератор.

Что же такое сплитератор


Сплитератор — это интерфейс, который содержит 8 методов, причём четыре из них уже имеют реализацию по умолчанию. Оставшиеся методы — это tryAdvance, trySplit, estimateSize и characteristics. Существуют также специальные модификации сплитератора для примитивных типов int, long и double: они добавляют несколько дополнительных методов, чтобы избежать боксинга. Сплитератор похож на обычный итератор. Основное отличие — умение разделиться (split) на две части — лежит в основе параллельной работы потоков. Также в целях оптимизации сплитератор имеет ряд флагов-характеристик и может сообщить точно или приблизительно свой размер. Наконец, сплитератор никогда не модифицирует источник данных: у него нет метода remove как у итератора. Рассмотрим методы подробнее:

  • tryAdvance(Consumer) — объединение методов итератора hasNext() и next(). Если у сплитератора есть следующий элемент, он должен вызвать переданную функцию с этим элементом и вернуть true, иначе функцию не вызывать и вернуть false.
  • trySplit() — попытаться поделиться надвое. Метод возвращает новый сплитератор, который будет пробегать по первой половине исходного набора данных, при этом сам текущий сплитератор перепрыгивает на вторую половину. Лучше всего, когда половины примерно равны, но это не обязательно. Особенно неравномерно делятся сплитераторы с бесконечным набором данных: после деления один из сплитераторов обрабатывает конечный объём, а второй остаётся бесконечным. Метод trySplit() имеет законное право не делиться и вернуть null (не случайно там try). Обычно это делается, когда в текущем сплитераторе осталось мало данных (скажем, только один элемент).
  • characteristics() — возвращает битовую маску характеристик сплитератора. Их на данный момент восемь:
    1. ORDERED — если порядок данных имеет значение. К примеру, сплитератор от HashSet не имеет этой характеристики, потому что порядок данных в HashSet зависит от реализации. Отсутствие этой характеристики автоматически переведёт параллельный поток в неупорядоченный режим, благодаря чему он сможет работать быстрее. Раз в источнике данных порядка не было, то и дальше можно за ним не следить.
    2. DISTINCT — если элементы заведомо уникальны. Любой Set или поток после операции distinct() создаёт сплитератор с такой характеристикой. Например, операция distinct() на потоке из Set выполняться не будет вообще и, стало быть, времени лишнего не займёт.
    3. SORTED — если элементы сортированы. В таком случае обязательно вернуть и ORDERED и переопределить метод getComparator(), вернув компаратор сортировки или null для «естественного порядка». Сортированные коллекции (например, TreeSet) создают сплитератор с такой характеристикой, и с ней потоковая операция sorted() может быть пропущена.
    4. SIZED — если известно точное количество элементов сплитератора. Такую характеристику возвращают сплитераторы всех коллекций. После некоторых потоковых операций (например, map() или sorted()) она сохраняется, а после других (скажем, filter() или distinct()) — теряется. Она полезна для сортировки или, скажем, операции toArray(): можно заранее выделить массив нужного размера, а не гадать, сколько элементов понадобится.
    5. SUBSIZED — если известно, что все дочерние сплитераторы также будут знать свой размер. Эту характеристику возвращает сплитератор от ArrayList, потому что при делении он просто разбивает диапазон значений на два диапазона известной длины. А вот HashSet её не вернёт, потому что он разбивает хэш-таблицу, для которой не известно, сколько содержится элементов в каждой половине. Соответственно дочерние сплитераторы уже не будут возвращать и SIZED.
    6. NONNULL — если известно, что среди элементов нет null. Эту характеристику возвращает, например, сплитератор, созданный ConcurrentSkipListSet: в эту структуру данных null поместить нельзя. Также её возвращают все сплитераторы, созданные на примитивных типах.
    7. IMMUTABLE — если известно, что источник данных в процессе обхода заведомо не может измениться. Сплитераторы от обычных коллекций такую характеристику не возвращают, но её выдаёт, например, сплитератор от Collections.singletonList(), потому что этот список изменить нельзя.
    8. CONCURRENT — если известно, что сплитератор остаётся рабочим после любых изменений источника. Такую характеристику сообщают сплитераторы коллекций из java.util.concurrent. Если сплитератор не имеет характеристик IMMUTABLE и CONCURRENT, то хорошо бы заставить его работать в fail-fast режиме, чтобы он кидал ConcurrentModificationException, если заметит, что источник изменился.
    Насколько мне известно, последние три характеристики сейчас потоками никак не используются (в том числе в коде Java 9).
  • estimateSize() — метод должен возвращать количество оставшихся элементов для SIZED-сплитераторов и как можно более точную оценку в остальных случаях. Например, если мы создадим сплитератор от HashSet и разделим его с помощью trySplit(), estimateSize() будет возвращать половину от исходного размера коллекции, хотя реальное количество элементов в половине хэш-таблицы может отличаться. Если элементов бесконечное количество или посчитать их слишком трудозатратно, можно вернуть Long.MAX_VALUE.

Создать поток по имеющемуся сплитератору очень легко — надо вызвать StreamSupport.stream().

Когда сплитератор писать не надо


Главное понимать, что сам по себе сплитератор вам не нужен, вам нужен поток. Если вы можете создать поток, используя существующий функционал, то стоит сделать именно так. К примеру, хотите вы подружить потоки с XML DOM и создавать поток по NodeList. Стандартного такого метода нет, но его легко написать без дополнительных сплитераторов:

public class XmlStream {
	static Stream<Node> of(NodeList list) {
		return IntStream.range(0, list.getLength()).mapToObj(list::item);
	}
}

Аналогично можно добавить потоки к любой нестандартной коллекции (ещё пример — org.json.JSONArray), которая умеет быстро вернуть длину и элемент по порядковому номеру.

Если вам трудно или лень писать trySplit, лучше не пишите сплитератор вообще. Вот один товарищ пишет библиотеку protonpack, полностью игнорируя существование параллельных потоков. Он написал много сплитераторов, которые вообще не умеют делиться. Сплитератор, который вообще не делится — это плохой, негодный сплитератор. Не делайте так. В данном случае лучше написать обычный итератор и создать по нему сплитератор с помощью методов Spliterators.spliterator или, если вам заранее неизвестен размер коллекции, то Spliterators.spliteratorUnknownSize. Эти методы имеют хоть какую-то эвристику для деления: они обходят часть итератора, вычитывая его в массив и создавая новый сплитератор для этого массива. Если в потоке будет дальше длительная операция, то распараллеливание всё равно ускорит работу.

Если вы реализуете стандартный интерфейс Iterable или Collection, то вам совершенно бесплатно выдаётся default-метод spliterator(). Стоит, конечно, посмотреть, нельзя ли его улучшить. Так или иначе, свои сплитераторы требуется писать весьма редко. Это может пригодиться, если вы разрабатываете свою структуру данных (например, коллекцию на примитивах, как делает leventov).

И всё-таки напишем


Мы напишем новый сплитератор для решения такой задачи: по заданному потоку создать поток пар из соседних значений исходного потока. Так как общепринятого типа для представления пары значений в Java нет и возможных вариантов слишком много (использовать массив из двух значений, список из двух значений, Map.Entry с одинаковым типом ключа и значения и т. д.), мы отдадим это на откуп пользователю: пусть сам решает, как объединить два значения. То есть мы хотим создать метод с такой сигнатурой:

public static <T, R> Stream<R> pairMap(Stream<T> stream, BiFunction<T, T, R> mapper) {...}

С его помощью можно использовать любой тип для представления пары. Например, если мы хотим Map.Entry:

public static <T> Stream<Map.Entry<T, T>> pairs(Stream<T> stream) {
    return pairMap(stream, AbstractMap.SimpleImmutableEntry<T, T>::new);
}

А можно вообще сразу вычислить что-нибудь интересное, не складывая пары в промежуточный контейнер:

public static Stream<Integer> diff(Stream<Integer> stream) {
    return pairMap(stream, (a, b) -> b - a);
}

Этот метод по потоку целых чисел вернёт поток разностей соседних элементов. Как нетрудно догадаться, в итоговом потоке будет на один элемент меньше, чем в исходном.

Мы хотим, чтобы наш pairMap выглядел как обычная промежуточная (intermediate) операция, то есть фактически никаких вычислений производиться не должно, пока дело не дойдёт до терминальной операции. Для этого надо взять spliterator у входного потока, но ничего с ним не делать, пока нас не попросят. Ещё одна маленькая, но важная вещь: при закрытии нового потока через close() надо закрыть исходный поток. В итоге наш метод может выглядеть так:

public static <T, R> Stream<R> pairMap(Stream<T> stream, BiFunction<T, T, R> mapper) {
    return StreamSupport.stream(new PairSpliterator<>(mapper, stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

Исходный поток после вызова метода spliterator() становится «использованным», с ним больше каши не сваришь. Но это нормально: так происходит со всеми промежуточными потоками, когда вы добавляете новую операцию. Метод Stream.concat(), склеивающий два потока, выглядит примерно так же. Осталось написать сам PairSpliterator.

Переходим к сути дела


Самое простое — это написать метод characteristics(). Часть характеристик наследуется у исходного сплитератора, но необходимо сбросить NONNULL, DISTINCT и SORTED: мы не можем гарантировать этих характеристик после применения произвольной mapper-функции:

public int characteristics() {
    return source.characteristics() & (SIZED | SUBSIZED | CONCURRENT | IMMUTABLE | ORDERED);
}

Реализация метода tryAdvance должна быть довольно простой. Надо лишь вычитывать данные из исходного сплитератора, запоминая предыдущий элемент в промежуточном буфере, и вызывать для последней пары mapper. Стоит только помнить, находимся мы в начале потока или нет. Для этого пригодится булева переменная hasPrev, указывающая, есть ли у нас предыдущее значение.

Метод trySplit лучше всего реализовать, вызвав trySplit у исходного сплитератора. Основная сложность тут — обработать пару на стыке двух разделённых кусков исходного потока. Эту пару должен обработать сплитератор, который обходит первую половину. Соответственно, он должен хранить первое значение из второй половины и, когда доберётся до конца, сработать ещё раз, подав его в mapper вместе с последним своим значением.

Разобравшись с этим, напишем конструкторы:

class PairSpliterator<T, R> implements Spliterator<R> {
    Spliterator<T> source;
    boolean hasLast, hasPrev;
    private T cur;
    private final T last;
    private final BiFunction<T, T, R> mapper;

    public PairSpliterator(BiFunction<T, T, R> mapper, Spliterator<T> source) {
        this(mapper, source, null, false, null, false);
    }

    public PairSpliterator(BiFunction<T, T, R> mapper, Spliterator<T> source, T prev, boolean hasPrev, T last,
            boolean hasLast) {
        this.source = source; // исходный сплитератор
        this.hasLast = hasLast; // есть ли дополнительный элемент в конце (первый из следующего куска)
        this.hasPrev = hasPrev; // известен ли предыдущий элемент
        this.cur = prev; // предыдущий элемент
        this.last = last; // дополнительный элемент в конце
        this.mapper = mapper;
    }
    // ...
}

Метод tryAdvance (вместо лямбды для передачи в исходный tryAdvance воспользуемся ссылкой на сеттер):

void setCur(T t) {
    cur = t;
}

@Override
public boolean tryAdvance(Consumer<? super R> action) {
    if (!hasPrev) { // мы в самом начале: считаем один элемент из источника
        if (!source.tryAdvance(this::setCur)) {
            return false; // источник вообще пустой — выходим
        }
        hasPrev = true;
    }
    T prev = cur; // запоминаем предыдущий элемент
    if (!source.tryAdvance(this::setCur)) { // вычитываем следующий из источника
        if (!hasLast)
            return false; // совсем всё закончилось — выходим
        hasLast = false; // обрабатываем пару на стыке двух кусков
        cur = last;
    }
    action.accept(mapper.apply(prev, cur)); // передаём в action результат mapper'а
    return true;
}

А вот и метод trySplit():

public Spliterator<R> trySplit() {
    Spliterator<T> prefixSource = source.trySplit(); // пытаемся разделить источник
    if (prefixSource == null)
        return null; // не вышло — тогда мы сами тоже не делимся
    T prev = cur; // это последний считанный до сих пор элемент, если он вообще был
    if (!source.tryAdvance(this::setCur)) { // вычитываем первый элемент второй половины
        source = prefixSource; // вторая половина источника оказалась пустой — смысла делиться нет
        return null;
    }
    boolean oldHasPrev = hasPrev;
    hasPrev = true; // теперь текущий сплитератор обходит вторую половину, а для первой создаём новый
    return new PairSpliterator<>(mapper, prefixSource, prev, oldHasPrev, cur, true);
}

Написать estimateSize() несложно: если исходный сплитератор способен оценить свой размер, надо лишь проверить флаги и подправить его на единичку туда или обратно:

public long estimateSize() {
    long size = source.estimateSize();
    if (size == Long.MAX_VALUE) // источник не смог оценить свой размер — мы тоже не можем
        return size;
    if (hasLast) // этот сплитератор будет обрабатывать дополнительную пару на стыке кусков
        size++;
    if (!hasPrev && size > 0) // этот сплитератор ещё не вычитал первый элемент
        size--;
    return size;
}

В подобном виде этот сплитератор и попал в мою библиотеку StreamEx. Отличие только в том, что потребовалось сделать версии для примитивных типов, ну и pairMap — это не статический метод.

Всё это, небось, сильно тормозит?


Со скоростью всё не так плохо. Возьмём для примера вот такую задачу со StackOverflow: из заданного набора чисел Integer оставить только те, которые меньше следующего за ними числа, и сохранить результат в новый список. Сама по себе задача очень простая, поэтому существенная часть времени будет уходить на оверхед. Можно предложить две наивные реализации: через итератор (будет работать с любой коллекцией) и через доступ по номеру элемента (будет работать только со списком с быстрым случайным доступом). Вот вариант с итератором (naiveIterator):

List<Integer> result = new ArrayList<>();
Integer last = null;
for (Integer cur : input) {
    if (last != null && last < cur)
        result.add(last);
    last = cur;
}

А вот со случайным доступом (naiveGet):

List<Integer> result = new ArrayList<>();
for (int i = 0; i < input.size() - 1; i++) {
    Integer cur = input.get(i), next = input.get(i + 1);
    if (cur < next)
        result.add(cur);
}

Решение с помощью библиотеки StreamEx очень компактно и работает с любым источником данных (streamEx):

List<Integer> result = StreamEx.of(input).pairMap((a, b) -> a < b ? a : null).nonNull().toList();

Комментаторами было предложено ещё три работающих решения. Наибольшее число голосов набрало более-менее традиционное, которому на входе требуется список со случайным доступом (назовём это решение stream):

List<Integer> result = IntStream.range(0, input.size() - 1).filter(i -> input.get(i) < input.get(i + 1)).mapToObj(input::get)
                .collect(Collectors.toList());

Следующее — это reduce с побочным эффектом, который не параллелится (reduce):

List<Integer> result = new ArrayList<>();
input.stream().reduce((a, b) -> {
    if (a < b)
        result.add(a);
    return b;
});

И последнее — это свой коллектор, который также не параллелится (collector):

public static Collector<Integer, ?, List<Integer>> collectPrecedingValues() {
    int[] holder = { Integer.MAX_VALUE };
    return Collector.of(ArrayList::new, (l, elem) -> {
        if (holder[0] < elem)
            l.add(holder[0]);
        holder[0] = elem;
    }, (l1, l2) -> {
        throw new UnsupportedOperationException("Don't run in parallel");
    });
}

List<Integer> result = input.stream().collect(collectPrecedingValues());

В сравнение также попадают распараллеленные версии stream и streamEx. Опыт будем проводить на массивах случайных целых чисел длиной n = 10 000, 100 000 и 1 000 000 элементов (в результат попадёт порядка половины). Полный код JMH-бенчмарка здесь. Проверено, что все алгоритмы выдают одинаковый результирующий массив.

Замеры проводились на четырёхъядерном Core-i5. Результаты выглядят так (все времена в микросекундах на операцию, меньше — лучше):
Алгоритм n = 10 000 n = 100 000 n = 1 000 000
naiveIterator   97.7   904.0 10592.7
naiveGet   99.8 1084.4 11424.2
collector 112.5 1404.9 14387.2
reduce 112.1 1139.5 12001.5
stream 146.4 1624.1 16600.9
streamEx 115.2 1247.1 12967.0
streamParallel   56.9   582.3   6120.5
streamExParallel   53.4   516.7   5353.4
Видно, что версия с pairMap (streamEx) обгоняет и традиционный потоковый вариант (stream), и версию с коллектором, уступая только неправильному reduce. При этом параллельная версия streamEx также быстрее параллельной версии stream и существенно обгоняет все последовательные версии даже для небольшого набора данных. Это согласуется с эмпирическим правилом из Stream Parallel Guidance: имеет смысл параллелить задачу, если она выполняется не менее 100 микросекунд.

Если вы хотите создавать свои потоки, помните, что от хорошего сплитератора зависит, как будет параллелиться ваша задача. Если вы не хотите заморачиваться с делением, не пишите сплитератор вообще, а воспользуйтесь утилитными методами. Также не стоит писать новый сплитератор, если возможно создать поток, используя существующий функционал JDK. Если у вас сплитератор хороший, то даже не очень сложная задача может ускориться при параллельной обработке.
Tags:
Hubs:
+21
Comments34

Articles