Pull to refresh

Паттерны использования Riak

Reading time 7 min
Views 3.6K
Riak это NoSQL решение, честная DHT (key/value storage) с дополнительными возможностями для разруливания конфликтов.

У распределенной хеш таблицы есть как плюсы, так и минусы. DHT хорошо масштабируется, но возможны потери данных из-за конфликтов конкурентного доступа, рассмотрим следующий пример:

client a: def o-value = DHT.get("some-key");
client a: def a-value = changeValue(o-value);
client b: def o-value = DHT.get("some-key");
client a: DHT.put("some-key", a-value);
client b: def b-value = changeValue(o-value);
client b: DHT.put("some-key", b-value);


Получилось, что клиент b переписал данные клиента a и никто об этом не знает (ни a, ни b, ни тот, кто прочтет данные по этому ключу позже).

Так как многие NoSQL базы данных в своей основе имеют DHT, интересно смотреть как они пытаются решить проблему конкурентного доступа.

Например, MongoDB использует compare-and-swap стратегию: с каждым документом (значением) храниться его версия, при обновлении указывается версия «предка» измененного документа, если в базе в момент обновления храниться предок, то обновление проходит, если нет, то нет: обновляющая сторона получает сообщение, и пытается провести обновление снова — аналог STM. Такой подход хорошо работает с шардами, но плохо с репликацией.

Riak решает проблему конкурентного доступа подобно системам контроля версий, он, как бы, сохраняет конфликтные версии в разных бранчах, предоставляя программе при следующей выборке провести merge. Такой подход позволяет разрешать конфликты, связанные не только с конкурентным доступом, но и с времянной изолированостью части кластера (partition tolerance: кластер машин может распаться на две части, обе части будут работать и смогут без проблем объединиться в будущем).

Riak накладыват больше условий на разработку, но обеспечивает масштабируемость и надежность данных при работе с большим объемом информации. Статья опишет, как «обойти» ограничения Riak при разработке типичных web приложений.

Блог


Рассмотрим первый примитив, реализованный на базе Riak — append-only список небольшого размера.

Представим, что мы пишем блог, к каждому посту которого будет не очень много комментариев и комментарий нельзя изменять после добавления. В этом случае, разумно в качестве значения хранить весь пост с комментариями, так операция чтения будет проходить за O(1). Опишем схему для данных:

public class Post {
    public static class Comment implements Comparable<Comment> {
        public String text;
        public int ts; //timestamp
        /* equals,  hashCode, compareTo*/
    }
    public String text;
    public List<Comment> comments;
}

Теперь представим, что два пользователя «одновременно» добавили комментарий:

client a: def o-post = DHT.get("post/13");
client a: def a-post = addComment(o-value, "забыл хабракат и все, что после него");
client b: def o-post = DHT.get("some-key");
client a: DHT.put("post/13", a-post);
client b: def b-post = addComment(o-post, "автор посчитал это очевидным");
client b: DHT.put("post/13", b-post);


Теперь в базе с id «post/13» хранятся две записи; и первый, кто обратиться по этому ключу, получит их обе и должен будет самостоятельно их смерджить. Для простоты, предположим, что пост редактировать нельзя, поэтому подойдет пост из любой «ветки», а так как комментарии могут только добавляться, то списки комментариев к обоим постам имеют общий префикс, следовательно, нужно его выделить и создать новый список из префикса, его дополнения первого списка и его дополнения второго списка. Операция мерджа будет следущая:

public static Post merge(Post a, Post b) {
    Post c = new Post();
    c.text = a.text;
    c.comments = Mergers.mergeLists(a.comments, b.comments);
    return c;
}

Где mergeLists определена следующем образом:

public static <T extends Comparable<T>> List<T> mergeLists(List<T> a, List<T> b) {
    List<T> result = new ArrayList<T>();
    List<T> rest   = new ArrayList<T>();
    int max = Math.min(a.size(), b.size());
    int i=0;
    // выделяем общий префикс
    for(;i<max && a.get(i).equals(b.get(i));i++) {
        result.add(a.get(i));
    }
    // собираем хвосты
    for(int j=i;j<a.size();j++) {
        rest.add(a.get(j));
    }
    for(int j=i;j<b.size();j++) {
        rest.add(b.get(j));
    }
    // сортируем хвосты
    Collections.sort(rest);
    // добовляем хвост
    for(T item : rest) {
        result.add(item);
    }
    return result;
}

Очевидно, что mergeLists очень похожа на объединение множеств, а следовательно, если какой-то элемент был в a или b, то он будет и в результирующем списке, следовательно, при слиянии нет потерь данных. Получается, что сейчас мы научились писать в список в Riak, избегая проблем конкурентного доступа.

Если нужно смерджить несколько постов, то используем merge внутри fold комбинатора.

Оповещения (сообщения, обновления)


Следующий примитив, который мы рассмотрим будет списком фиксированного размера с возможностью удаления. В случае, когда он достигает максимального размера, из него выкидывается какой-нибудь элемент (например, самый старый, хотя в распределенных системах это понятие весьма условное). Так как размер списка фиксированный, будем хранить весь список опять как одно значение.

На этот примитив хорошо ложатся всевозможные уведомления. Во-первых, уведомление об событии намного менее важно, чем само событие; во-вторых, вряд ли пользователь хочет видеть уведомление о старом событии, если он давно не заходил в систему; в третьих, если пользователь уже изучает информацию о событии, то уведомление стоит удалить.

В отличии от предыдущей схемы, где в качестве значения хранился один список объектов, здесь будут храниться два списка: список «оповещений» и список удаленных «оповещений». В случае мержа соответствующие списки будут объединяться и, таким образом, удаленный объект останется в списке удаленных объектов, а добавленный в списке добавленных (естественно после мержа из добавленных нужно будет вычесть удаленные). Запишем более формально:

image

Проблема в том, что при таком определении операций, наши списки неограничено растут, хотя реализуют необходимый примитив. Попробуем ограничить эти списки: если список добавленных объектов вырос до максимума — переносим любой объект в список удаленных, если вырос список удаленных — удаляем из него какой-нибудь объект, опять чуть более формально:

image

После каждой операции add, delete и merge нужно выполнять операцию ram. Понятно, что граничив длинну списков мы в чем-то потеряли и, скорее всего, преобрели нежелательное поведение. Попробуем это померить. В нашем случае (когда пропажа объектов штатное дело) единственным нежелательным поведением является появление уже удаленного оповещения. Для измерения этого показателя я смоделировал процесс и произвел серию наблюдений. Вполне очевидно, что количество ошибок должно зависить от длинны списка и от произведения длительности обработки запроса на частоту запросов (это действительно так, я проверял), назовем этот параметр ключевым. Ниже идут несколько графиков, по которым можно понять динамику:

Процент ошибок записи от длинны списка

График отражает долю изменений списка, после которых удаленное «оповещение» вновь появлялось как новое, в зависимости от максимального числа элементов в списке. Ключевой параметр был фиксирован (0.8 и 2), что соответсвует примерно 8 запросам в сек и 20 запросам в сек. Ниже будет написано, что это не так мало, как кажется.
image

Процент ошибок записи от ключевого параметра

На графике отображается динамика процента ошибок в зависимости от ключевого параметра при фиксированной длинне списка (30 и 130 элементов соответственно).
image

1% зона ошибок

По оси абсцисс отложен ключевой параметр, красная линия отвечает за значение 1; по оси ординат отложена длинна списка, красные линии отвечают за 100, 200 и 300 элементов. Черным отмечена зона параметров, при которых ошибка менее 1%.

image

Почему 10 запросов в сек это не так мало, как кажется. Во-первых, учитываются только запросы на запись, во-вторых, это не общее количество запросов, а кол-ко запросов к одному объекту. В случае если мы проектируем, например, google+, то 10 запросов в сек это не кол-во обращений ко всему гуглу, а прогнозируемое частота комментариев к одной записи.

Поток (стена в вконтакте или лента в твиттер)


Последний паттерн в этой статье — большой список, с возможностью добавлять записи в конец, читать записи как с конца, так и с начала, а так же получать за один запрос несколько записей. Предполагается, что удалений из списка будет очень мало.

Этот примитив хорошо описывает ленту в твиттере, а так же стены в социальных сетях, но для него можно придумать и другие применения.

В отличие от предыдущих схем, которые описывались одной парой ключ-значение, этот паттерн описывается несколькими — служебной парой ключ-значением с информацией о начале и конце списка, а так же собственно чанками данных, в которых хранится отрезок фиксированного размера из списка. Определим дата модель, а так же операцию merge для каждого типа данных:

class Info {
    public String key;

    public String prefix;
    public int lastChunk = 0;

    public String getChunkKey(int chunk) {
        return prefix + chunk;
    }

    public Info mergeWith(Info brother) {
        Info info = new Info();
        info.key = key;
        info.prefix = prefix;
        info.lastChunk = Math.max(lastChunk, brother.lastChunk);
        return info;
    }
}

class Chunk<T extends Comparable<T>> {
    public String key;

    List<T> added = new ArrayList<T>();
    List<T> deleted = new ArrayList<T>();

    public void add(T obj) {
        added.add(obj);
    }

    public void delete(T obj) {
        deleted.add(obj);
    }

    public Iterable<T> getData() {
        List<T> data = new ArrayList<T>(added);
        data.removeAll(deleted);
        return data;
    }

    public Chunk<T> mergeWith(Chunk<T> brother) {
        Chunk<T> chunk = new Chunk();
        chunk.key = key;
        chunk.added = mergeLists(added, brother.added);
        chunk.deleted = mergeLists(deleted, brother.deleted);
        return chunk;
    }
}


Думаю, операции очевидны. Если хотим добавить элемент в список:
  • достаем info, которая соответствует каждому списку
  • определяем и достаем последний чанк
  • если он не полон добавляем запись и сохраняем чанк
  • иначе создаем новый чанк, добавляем запись и сохраняем его; кроме того правим «указатель» на последний чанк в info и так же сохраняем
Так как обход потока происходит последовательно (twitter, стена...) чанк за чанком, то при частом удалении есть вероятность находить пустые чанки; тогда потребуется брать следующий чанк (или следующие), чтобы вернуть данные. Получается, что при очень частом удалении длительность операции получения данных нельзя оценить, но при редком удалении, она O(1).

Заключение


Как видно из статьи, на Riak можно положить распространенные в web схемы работы с данными и впоследствии получить безболезненное распределение этих данных по нескольким узлам. Это достигается это за счет того набора примитивов, которые предоставляет Riak программисту.

Мне подход Riak понравился из-за прозрачного подхода к разрешению конфликтов и гибкости при выборе ограничений (CAP) при каждом запросе.
Tags:
Hubs:
+18
Comments 5
Comments Comments 5

Articles