Pull to refresh

Структуры данных в memcached/MemcacheDB. Часть 2

Reading time 9 min
Views 3.1K
Продолжение статьи про структуры данных в memcached. В этой завершающей части мы рассмотрим еще три структуры данных: лог событий, массив и таблицу.

Лог событий


Задача


Задача этой структуры данных — хранение событий, произошедших в распределенной системе за последние T секунд. Каждое событие имеет момент времени, когда оно произошло, остальное содержимое события определяется логикой приложения.

Операции над логом событий:

  • добавить сообщение в лог событий (должна быть максимально быстрой);
  • получить события, произошедшие в период времени от Tmin до Tmax (должна быть эффективной, но вызывается реже, чем добавление);


Решение


def time():
    """
    Текущее время в секундах с любого момента времени (например, UNIX Epoch).

    @return: текущее время в секундах
    @rtype: C{int}
    """

class Event:
    """
    Событие, помещаемое в лог событий.
    """

    def when(self):
        """
        Момент времени, когда произошло событие (секунды).
        """

    def serialize(self):
        """
        Сериализовать событие.

        @return: сериализованное представление
        @rtype: C{str}
        """

    @static
    def deserialize(serialized):
        """
        Десериализовать набор событий.

        @param serialized: сериализованное представление одного 
                           или нескольких событий
        @type serialized: C{str}
        @return: массив десериализованных событий
        @rtype: C{list(Event)}
        """

class MCEventLog(MemcacheObject):
    def __init__(self, mc, name, timeChunk=10, numChunks=10):
        """
        Конструктор.

        @param name: имя лога событий
        @type name: C{str}
        @param timeChunk: емкость одного ключа лога в секундах
        @type timeChunk: C{int}
        @param numChunks: число выделяемых ключей под лог
        @type numChunks: C{int}
        """
        super(MCEventLog, self).__init__(mc)
        self.keyTemplate = 'messagelog' + name + '_%d';
        self.timeChunk = timeChunk
        self.numChunks = numChunks

    def put(self, event):
        """
        Поместить событие в лог.

        @param event: событие
        @type event: L{Event}
        """
        serialized = event.serialize()
        key = self.keyTemplate % (event.when() // self.timeChunk % self.numChunks)

        while True:
            try:
                self.mc.append(key, serialized)
                return
            except KeyError:
                pass

            try:
                self.mc.add(key, serialized, self.timeChunk * (self.numChunks-1))
                return
            except KeyError:
                pass

    def fetch(self, first=None, last=None):
        """
        Получить события из лога за указанный период (или все события).

        @param first: минимальное время возвращаемого сообщения
        @type first: C{int}
        @param last: максимальное время возвращаемого сообщения
        @type last: C{int}
        @return: массив событий
        @rtype: C{list(Event)}
        """
        if last is None or last > time():
            last = time()

        if first is None or last < first or 
           (last-first) > self.timeChunk * (self.numChunks-1):
            first = time() — self.timeChunk * (self.numChunks-1)

        firstKey = first / self.timeChunk % self.numChunks
        lastKey = last / self.timeChunk % self.numChunks

        if firstKey < lastKey:
            keyRange = range(firstKey, lastKey+1)
        else:
            keyRange = range(firstKey, self.numChunks) + range(0, lastKey+1)

        keys = [self.keyTemplate % n for n in keyRange]
        result = []
        for key in keys:
            try:
                events = Event.deserialize(self.mc.get(key))
            except KeyError:
                continue

            result.extend(filter(lambda e: e.when() >= first and 
                                                e.when() <= last, l))

        return result


Обсуждение


Основная идея лога событий — кольцевой буфер, состоящий из numChunks ключей в memcached. Каждый ключ активен (то есть дополняется значениями) в течение timeChunk секунд, после чего активным становится следующий ключ (если активным был последний ключ, эта роль переходит к первому ключу). Полный цикл буфера, т.е. период времени между двумя использованиями одного ключа составляет numChunks * timeChunk секунд, а время жизни каждого ключа — (numChunks - 1) * timeChunk секунд, таким образом при любом сдвиге времени создания ключа по модулю timeChunk к моменту времени следующего использования ключ гарантированно будет уничтожен. Таким образом, ёмкость лога событий (или период времени, за который сохраняются события) составляет (numChunks - 1) * timeChunk секунд. Такое разбиение лога на ключи позволяет при получении событий из лога вынимать лишь те ключи, которые соответствуют интересному нам временному отрезку.

Выбор параметров timeChunk и numChunks зависит от применения лога событий: сначала определяется желаемый срок хранения событий, затем по частоте событий выбирается такое значение timeChunk, чтобы размер каждого ключа лога событий был относительно небольшим (например, 10-20Кб). Из этих соображений можно найти значение и второго параметра, numChunks.

В примере используется некоторый класс Event, который обладает единственным интересным для нас свойством — временем, когда произошло событие. В методе put лога событий предполагается, что событие event, переданное в качестве параметра, произошло «недавно», то есть с момента event.when() прошло не более чем (numChunks - 1) * timeChunk секунд (емкость лога). При работе put вычисляется ключ, в который должна быть помещена информация о событии, в соответствие с его временной меткой. После этого с помощью уже знакомой по предыдущим примерам техники ключ либо создается, либо к значению уже существующего ключа дописывается сериализованное представление события.

Метод fetch вычисляет потенциальный набор ключей лога, в которых могут находиться события, произошедшие во временной интервал от first до last. Если временные рамки не заданы, last считается равным текущему моменту времени, а first — моменту времени, отстоящему от текущего на емкость лога. Набор ключей вычисляется с учетом кольцевой структуры метода, после чего выбираются соответствующие ключи, десериализуются последовательно записанные в них события и проводится дополнительная фильтрация на попадание в отрезок [first, last].

Приведенная выше сигнатура метода позволяет последовательными обращениями выводить новые события из лога:

  1. Первый раз вызывается events = fetch(). Вычисляется lastSeen как max(events.when()).
  2. Все последующие обращения выглядят следующим образом: events = fetch(first=lastSeen), при этом
    lastSeen каждый раз перевычисляется.


Массив


Задача 1


В массиве хранится список значений произвольного типа, относительно редко происходит обновление списка, гораздо чаще происходит получение списка целиком.

Операции над массивом:

  • изменить массив (редкая операция);
  • получить массив целиком (частая операция).


Решение 1


def serializeArray(array):
    """
    Сериализовать массив в бинарное представление.
    """

def deserializeArray(str):
    """
    Десериализовать массив из бинарного представления.
    """

class MCArray1(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя массива
        @type name: C{str}
        """
        super(MCArray1, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'array' + name

    def fetch(self):
        """
        Получить текущее значение массива.

        @return: массив
        @rtype: C{list}
        """
        try:
            return deserializeArray(self.mc.get(self.key))
        except KeyError:
            return []

    def change(self, add_elems=[], delete_elems=[]):
        """
        Изменить значение массива, добавив или удалив из него
        элементы.
    
        @param add_elems: элементы, которые надо добавить
        @type add_elems: C{list}
        @param delete_elems: элементы, которые надо удалить
        @type delete_elems: C{list}
        """
        while not self.lock.try_lock():
            pass

        try:
            try:
                array = deserializeArray(self.mc.get(self.key))
            except KeyError:
                array = []
            array = filter(lambda e: e not in delete_elems, array) + add_elems
            self.mc.set(self.key, serializeArray(array), 0)
        finally:
            self.lock.unlock()


Обсуждение 1


Приведенный выше способ решения на самом деле не имеет никакого отношения к массивам, а может быть применен для любой структуры данных. Он основан на модели reader-writer, когда есть много читателей и относительно мало писателей. Читатели в любой момент с помощью метода fetch получают содержимое массива, при этом важно, что «писатель» сhange записывает содержимое одной командой memcached, то есть в силу внутренней атомарности операций get и set в memcached и несмотря на отсутствие синхронизации между методами fetch и сhange, результат fetch всегда будет консистентным: это будет значение до или после очередного изменения. Писатели блокируются от одновременного изменения массива с помощью блокировки MCLock, описанной выше.

В данной ситуации можно было бы избежать использования блокировки и воспользоваться командами gets, cas и add из протокола memcached для того, чтобы гарантировать атомарность изменений с помощью функции change.

Задача 2


Массив хранит список значений некоторого типа, часто происходит операция вида «добавить значение в массив». Относительно редко массив запрашивается целиком. Для простоты реализации в дальнейшем будет рассматриваться массив целых чисел, хотя для решения задачи тип данных не имеет существенного значения.

Операции над массивом:

  • добавить значение в массив (частая операция);
  • получить массив целиком.


Решение 2


def serializeInt(int):
    """
    Сериализовать целое число в бинарное представление (str).
    """

def deserializeIntArray(str):
    """
    Десериализовать массив целых чисел из бинарного представления.
    """

class MCArray2(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя массива
        @type name: C{str}
        """
        super(MCArray2, self).__init__(mc)
        self.key = 'array' + name

    def fetch(self):
        """
        Получить текущее значение массива.

        @return: массив
        @rtype: C{list}
        """
        try:
            return deserializeIntArray(self.mc.get(self.key))
        except KeyError:
            return []

    def add(self, element):
        """
        Добавить элемент в массив.

        @param element: элемент, который необходимо добавить в массив
        @type element: C{int}
        """
        element = serializeInt(element)
        while True:
            try:
                self.mc.append(self.key, element)
            except KeyError:
                return

            try:
                self.mc.add(self.key, element, 0)
            except KeyError:
                return


Обсуждение 2


Эта реализация практически повторяет аналогичный код для лога событий, только упрощенный в силу наличия всего одного ключа. По сравнению с первым вариантом реализации типа данных «массив» уменьшилось число операций memcached, все изменяющие массив процессы могут выполняться без задержек (отсутствие блокировок). Как и в первом варианте, не проверяется наличие дубликатов при добавлении элемента в массив (может быть и хорошо, и плохо, в зависимости от применения).

Возможны следующие улучшения (или расширения) описанного примера:

  • использование нескольких ключей для хранения массива вместо одного, распределение элементов по ключам с использованием хэширования; такой вариант позволит ограничить размер каждого ключа, при условии что массив большой (содержит много элементов);
  • реализация в том же стиле операции удаления элемента из массива, тогда массив можно представить
    как последовательность операций «удалить» и «добавить», например сериализованное представление
    +1 +3 +4 -3 +5 будет после десериализации образовывать массив [1, 4, 5]; при этом как
    операция добавления элемента, так и удаления, будет приводить к дописыванию байт в конец
    сериализованного представления (атомарная операция append).


Таблица


Задача


Необходимо хранить множество строк. Операции над множеством:

  • проверка принадлежности строки множеству (самая частая операция);
  • получение множества целиком, добавление элемента, удаление элемента — редкие операции.


Можно рассматривать данную структуру данных как таблицу, в которой осуществляется быстрый поиск нужной строки. Или как хэш, хранящийся в распределенной памяти.

Решение


def serializeArray(array):
    """
    Сериализовать массив в бинарное представление.
    """

def deserializeArray(str):
    """
    Десериализовать массив из бинарного представления.
    """

class MCTable(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя таблицы
        @type name: C{str}
        """
        super(MCTable, self).__init__(mc)
        self.lock = MCLock(name)
        self.key = 'table' + name

    def has(self, key):
        """
        Проверка наличия ключа в таблице.
        
        @param key: ключ
        @type key: C{str}
        @rtype: C{bool}
        """
        try:
            self.mc.get(self.key + '_v_' + key)
            return True
        except KeyError:
            return False

    def fetch(self):
        """
        Получить целиком значение элементов таблицы.
   
        @return: значение таблицы
        @rtype: C{list(str)}
        """
        try:
            return deserializeArray(self.mc.get(self.key + '_keys'))
        except KeyError:
            pass

    def add(self, key):
        """
        Добавить ключ в таблицу.

        @param key: ключ
        @type key: C{str}
        """
        while not self.lock.try_lock():
            pass

        try:
            try:
                array = deserializeArray(self.mc.get(self.key + '_keys'))
            except KeyError:
                array = []
            if key not in array:
                array.append(key)
            self.mc.set(self.key + '_v_' + key, 1, 0)
            self.mc.set(self.key + '_keys', serializeArray(array), 0)
        finally:
            self.lock.unlock()

    def delete(self, key):
        """
        Удалить ключ из таблицы.

        Реализация аналогична методу add().
        """


Обсуждение


Вообще говоря memcached представляет собой огромную хэш-таблицу, правда в ней отсутствует одна операция, которая необходима для нашей структуры данных: получение списка ключей. Поэтому реализация таблицы использует отдельные ключи для хранения каждого элемента таблицы, и отдельно еще один ключ для хранения списка всех её элементов. Реализация хранения списка всех элементов фактически совпадает с реализацией «массива 1». Для сериализации доступа к списку всех элементов используется блокировка, при этом методы fetch и add не синхронизированы друг с другом, т.к. список всех элементов меняется атомарно и при чтении ключа мы всегда получим некоторое консистентное состояние.

Проверка наличия ключа в таблице выполняется максимально быстро: проверяется наличие соответствующего ключа в memcached. Любое изменение списка элементов всегда происходит одновременно и в ключе, хранящем весь список, и в отдельных ключах для каждого элемента (которые используются только для проверки).

На основе приведенной схемы можно реализовать полноценный хэш, когда для каждого элемента таблицы будет храниться связанное значение, это значение необходимо будет записывать только в отдельные ключи, соответствующие элементам, а список элементов не будет содержать значений.

Заключение


Итак, приведем список «приемов» или «трюков», описанных в данной статье:

  • атомарность операций с помощью memcached (пара add/set и т.п.);
  • блокировки;
  • теневые ключи;
  • кольцевой буфер с автоматическим «отмиранием» ключей;
  • блокировки и модель reader-writer.


В статье не рассматривались вопросы оптимизации, специфичной для memcached, например, использование multi-get запросов. Это делалось сознательно, чтобы не перегружать исходный код и рассказ. Во многих ситуациях приведенные выше примеры следует рассматривать скорее как псевдокод, чем как пример идеальной реализации на Python.

Если Вы нашли ошибку, хотите предложить более ясное, более оптимальное решение поставленным задачам, хотите предложить реализацию для какой-то еще структуры данных — буду рад комментариям и критике.
Tags:
Hubs:
+26
Comments 6
Comments Comments 6

Articles