Pull to refresh

Структуры данных в memcached/MemcacheDB. Часть 1

Reading time 8 min
Views 4.8K
Достаточно часто нам приходится хранить данные в memcached или MemcacheDB. Это могут быть относительно простые данные, например, закэшированные выборки из базы данных, а иногда необходимо хранить и обрабатывать более сложные структуры данных, которые обновляются одновременно из нескольких процессов, обеспечивать быстрое чтение данных и т.п. Реализация таких структур данных уже не укладывается в комбинацию команд memcached get/set. В данной статье будут описаны способы хранения некоторых структур данных в memcached с примерами кода и описанием основных идей.

Memcached и MemcacheDB в данной статье рассматриваются вместе, потому что имеют общий интерфейс доступа и логика работы большей части структур данных будет одинаковой, далее будем называть их просто «memcached». Зачем нам нужно хранить структуры данных в memcached? Чаще всего для распределенного доступа к данным из разных процессов, с разных серверов и т.п. А иногда для решения задачи хранения данных достаточно интерфейса, предоставляемого MemcacheDB, и необходимость в использовании СУБД отпадает.

Иногда проект разрабатывается изначально для нераспределенного случая (работа в рамках одного сервера), однако предполагая будущую необходимость масштабирования, лучше использовать сразу такие алгоритмы и структуры данных, которые могут обеспечить легкое масштабирование. Например, даже если данные будут храниться просто в памяти процесса, но интерфейс к доступа к ним повторяет семантику memcached, то при переходе к распределенной и масштабируемой архитектуре достаточно будет заменить обращения к внутреннему хранилищу на обращения к серверу (или кластеру серверов) memcached.

Всего будет рассмотрено 6 структур данных:
  1. блокировка;
  2. счетчик;
  3. счетчик посетителей;
  4. лог событий;
  5. массив;
  6. таблица.


C первой по третью в этой части, а с четвертой по шестую — в следующей.

Примеры кода будут написаны на Python, в них будет использоваться следующий
интерфейс доступа к memcached:

class Memcache(object):
    def get(self, key):
        """
        Получить значение ключа.

        @param key: ключ
        @type key: C{str}
        @return: значение ключа
        @raises KeyError: ключ отсутствует
        """

    def add(self, key, value, expire=0):
        """
        Установить значение ключа, если он еще не существует.
    
        @param key: ключ
        @type key: C{str}
        @param value: значение ключа
        @param expire: "срок годности" ключа в секундах (0 — бесконечно)
        @type expire: C{int}
        @raises KeyError: ключ уже существует
        """

    def incr(self, key, value=1):
        """
        Увеличить на единицу значение ключа.

        @param key: ключ
        @type key: C{str}
        @param value: инкремент
        @type value: C{int}
        @return: новое значение ключа (после увеличения)
        @rtype: C{int}
        @raises KeyError: ключ не существует
        """

    def append(self, key, value):
        """
        Добавить суффикс к значению существующего ключа.
    
        @param key: ключ
        @type key: C{str}
        @param value: суффикс
        @raises KeyError: ключ не существует
        """

    def delete(self, key):
        """
        Удалить ключ.

        @param key: ключ
        @type key: C{str}
        """

Все наши структуры данных будут наследниками базового класса следующего вида:

class MemcacheObject(object):
    def __init__(self, mc):
        """
        Конструктор.

        @param mc: драйвер memcached
        @type mc: L{Memcache}
        """
        self.mc = mc


Блокировка


Задача


Блокировка (mutex, или в данной ситуации скорее spinlock) — это не совсем структура данных, но она может быть использована как «кирпичик» при построении сложных структур данных в memcached. Блокировка используется для исключения одновременного доступа к некоторым ключам, возможности блокировки отсутствуют в API memcached, поэтому она эмулируется с помощью других операций.

Итак, блокировка определяется своим именем, работает распределённо (то есть из любого процесса, имеющего доступ к memcached), имеет автоматическое «умирание» (на случай, если процесс, поставивший блокировку, не сможет её снять, например, по причине аварийного завершения).

Операции над блокировкой:

  • попробовать заблокироваться (try-lock);
  • разблокировать (unlock).


Решение


class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name

    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.

        @param value: инкремент
        @type value: C{int}
        """
        while True:
            try:
                self.mc.incr(self.key, value)
                return
            except KeyError:
                pass

            try:
                self.mc.add(self.key, value, 0)
                return
            except KeyError:
                pass

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        """
        try:
            return self.mc.get(self.key)
        except KeyError:
            return 0


Обсуждение


Корректность работы блокировки основывается на операции `add`, которая гарантирует что ровно один процесс сможет «первым» установить значение ключа, все остальные процессы получат ошибку. Приведенный выше класс может быть обернут более удобно, например, для with-обработчика Python. Более подробно вопрос блокировок обсуждался в посте про одновременное перестроение кэшей.

Атомарный счетчик


Задача


Необходимо вычислять количество событий, которые происходят распределённо на разных серверах, также получать текущее значение счетчика. При этом счетчик не должен «терять» события и начислять «лишние» значения.

Тип данных: целое число.

Начальное значение: ноль.

Операции:

  • увеличить значение счетчика на указанное значение;
  • получить текущее значение счетчика.


Решение


def time():
    """
    Текущее время в секундах с любого момента времени (например, UNIX Epoch).

    @return: текущее время в секундах
    @rtype: C{int}
    """

class MCVisitorCounter(MemcacheObject):
    def __init__(self, mc, name, T):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        @param T: период счетчика, секунды
        @type T: C{int}
        """
        super(MCVisitorCounter, self).__init__(mc)
        self.keys = ('counter' + name + '_0', 'counter' + name + '_1')

    def _active(self):
        """
        Получить индекс текущего счетчика.

        @return: 0 или 1
        """
        return 0 if (time() % (2*T)) < T else 1

    def increment(self):
        """
        Увеличить значение счетчика.
        """
        active = self._active()
        while True:
            try:
                self.mc.incr(self.keys[1-active], 1)
                return
            except KeyError:
                pass

            try:
                self.mc.add(self.keys[1-active], 1, 2*T)   # строка 43
                return
            except KeyError:
                pass

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        """
        try:
            return self.mc.get(self.keys[self._active()])
        except KeyError:
            return 0


Обсуждение


Реализация счетчика достаточно очевидная, самый сложный момент — это «вечный» цикл в методе increment. Он необходим для корректной инициализации значения счетчика в условиях конкурентного доступа к нему. Если операция incr завершилась ошибкой отсутствия ключа, нам необходимо создать ключ счетчика, но этот код могут одновременно выполнять несколько процессов, тогда одному из них удастся add, а второй получит ошибку и инкрементирует уже созданный счетчик с помощью incr. Зацикливание невозможно, т.к. одна из операций incr или add должна быть успешной: после создания ключа в memcached операция incr будет успешной всё время существования ключа.

Как быть с ситуацией, когда ключ со счетчиком из memcached пропадет? Возможны две ситуации, когда ключ исчезнет:
  1. memcached не хватает памяти для размещения других ключей;
  2. процесс memcached аварийно завершился (сервер «упал»).


(В случае MemcacheDB, настроенной репликации и т.п. падение сервера не должно приводить к потере ключей.) В первом случае надо лишь правильно обслуживать memcached — памяти должно хватать для счетчиков, иначе мы начинаем терять значения, а это неприемлемо в данной ситуации (но совершенно нормально, например, для хранения кэшей в memcached). Второй случай всегда возможен, если такая ситуация часто возникает, можно дублировать счетчики в нескольких memcached-серверах.

Счетчик посетителей


Задача


Необходимо вычислить количество уникальных обращений к сайту в течение некоторого периода T. Например, T может быть равно 5 минутам. Пусть мы можем сказать, когда было последнее обращение данного посетителя к сайту — более T минут назад или менее (то есть является ли оно уникальным в течение периода T).

Операции над счетчиком:

  • увеличить значение счетчика на единицу (обращение уникального в течение периода T посетителя);
  • получить текущее число посетителей.


Решение


def time():
    """
    Текущее время в секундах с любого момента времени (например, UNIX Epoch).

    @return: текущее время в секундах
    @rtype: C{int}
    """

class MCVisitorCounter(MemcacheObject):
    def __init__(self, mc, name, T):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        @param T: период счетчика, секунды
        @type T: C{int}
        """
        super(MCVisitorCounter, self).__init__(mc)
        self.keys = ('counter' + name + '_0', 'counter' + name + '_1')

    def _active(self):
        """
        Получить индекс текущего счетчика.

        @return: 0 или 1
        """
        return 0 if (time() % (2*T)) < T else 1

    def increment(self):
        """
        Увеличить значение счетчика.
        """
        active = self._active()
        while True:
            try:
                self.mc.incr(self.keys[1-active], 1)
                return
            except KeyError:
                pass

            try:
                self.mc.add(self.keys[1-active], 1, 2*T)   # строка 43
                return
            except KeyError:
                pass

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        """
        try:
            return self.mc.get(self.keys[self._active()])
        except KeyError:
            return 0


Обсуждение


Работа счетчика посетителей

Основная структура работы со счетчиком в memcached в данной задаче повторяет решение задачи атомарного счетчика. Базовая идея — использование теневого и текущего счетчика: теневой счетчик получает обновления (увеличивается), а текущий используется для получения значения числа посетителей (его значение было накоплено, когда она был теневым). Каждый период T текущий и теневой счетчик меняются местами. Ключ теневого счетчика создается со сроком годности 2*T секунд, то есть его время жизни — T секунд, пока он был теневым (и увеличивался), и Т секунд, пока его значение использовалось для чтения. К моменту того, как этот ключ снова станет теневым счетчиком, он должен быть уничтожен memcached, т.к. срок его годности истек. Идея теневого и текущего счетчика напоминает, например, двойную буферизацию при выводе на экран в играх.

Необходимо обратить внимание на функцию _active(): она реализована именно через вычисление остатка от деления текущего времени в секундах, а не, например, через периодическое (раз в Т секунд) смену значения active, т.к. если время на всех серверах синхронизировано с разумной точностью, то и результат функции _active() будет на всех серверах одинаковым, что важно для корректного распределенного функционирования данной структуры данных.

Описанный подход будет работать корректно только при достаточно большом количестве посетителей, когда временной интервал между очередной сменой значения функции _active() и обращением к функции increment() будет как можно меньше. То есть когда increment() вызывается часто, то есть когда посетителей достаточно много, например, 1000 человек при периоде Т=3 минуты. Иначе создание и уничтожение ключей не будет синхронизировано с моментом смены значения _active() и начнут пропадать посетители, накопленные в теневом счетчике, который будет уничтожен в процессе накопления значений и создан заново. В этой ситуации его время жизни 2*T будет «сдвинуто» относительно моментов 0, T функции time() % (2*T). В силу того, что memcached рассматривает срок годности ключей дискретно относительно секунд (с точностью не более одной секунды), любой сдвиг времени жизни ключа может составлять 1,2,...,T-1 секунду (отсутствие сдвига — 0 секунд). Дискретность срока годности означает, что если ключ был создан в 25 секунд 31 миллисекунду со сроком годности 10 секунд, то он будет уничтожен на 35-й (или 36-й) секунде 0 миллисекунд (а не 31-й миллисекунде). Для компенсация сдвига в целое число секунд необходимо исправить строчку 43 примера следующим образом:

self.mc.add(self.keys[1-active], 1, 2*T - time() % T)


Значение счетчика посетителей не изменяется в течение периода времени T, для более приятного отображения можно к значению счетчика при выводе добавить нормально распределенную величину с небольшой дисперсией и математическим ожиданием около 5% от значения счетчика.

Немного более сложный вариант такого счетчика (с интерполяцией во времени), но с точно такой же идеей был описан в посте про атомарность операции и счетчики в memcached.

UPD: Вторая часть статьи.
Tags:
Hubs:
+47
Comments 23
Comments Comments 23

Articles