Pull to refresh

Использование LevelDB

Reading time 8 min
Views 41K
Столкнулся с ситуацией, когда мои коллеги для организации локального персистентного key-value хранилища используют SQLite, MemcacheDB, Redis игнорируя встраиваемые хранилища такие как LevelDB, Sophia, HamsterDB и т.д.

Я разбил статью на две части:
  1. небольшое введение в api LevelDB;
  2. использование LevelDB, для хранения временных рядов.



LevelDB и его API


Некоторые свойства LevelDB:
  • хранилище типа ключ-значение;
  • ключ и значение это произвольный массив байт;
  • данные хранятся упорядоченно, порядок можно задавать;
  • прямой и обратный итератор для обхода данных;
  • множественное атомарное обновление;
  • поддержка снимков;
  • сжатие данных через Snappy.


Открытие и закрытие

Открытие:
#include <assert>
#include "leveldb/db.h"

leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
assert(status.ok());


Закрытие:
delete db;


Опции:
Имя Описание Значение по умолчанию
comparator компаратор задающий порядок ключей BytewiseComparator, использует внутри себя memcmp
create_if_missing создать базу, если отсутствует false
error_if_exists выкинуть ошибку, если база существует false
paranoid_checks агрессивная проверка целостности базы false
env окружение через которое будет производится операции ввода/вывода Env::Default()
write_buffer_size размер буфера на запись 4MB
max_open_files количество открытых файлов 1000
block_cache использовать специальный кеш для блоков NULL, создает и использует внутренний кеш объемом 8MB
block_size приблизительный объем пользовательских данных в блоке 4K
compression сжатие блока kSnappyCompression
filter_policy фильтр(Блума) для уменьшения операций чтения с диска. NULL


Slice

Slice это структура представляющая ключ и значение в ряде методов. Slice содержит в себе указатель на данные и размер данных, при этом Slice не содержит в себе буфера для данных, поэтому нельзя допускать таких ситуаций:
leveldb::Slice slice;
if (...) {
  std::string str = ...;
  slice = str;
}
Use(slice);

leveldb::Slice имеет ряд конструкторов для удобства использования:
Slice(const char* d, size_t n) : data_(d), size_(n) { }
Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
Slice(const char* s) : data_(s), size_(strlen(s)) { }

И методы для получения данных
const char* leveldb::Slice::data() const;
char leveldb::Slice::operator[](size_t n) const;
std::string leveldb::Slice::ToString() const;


Статус


Для информирования о возможных ошибках большинство функций в LevelDB возвращают статус.
По статусу можно проверить выполнилась функция успешно и получить текстовое описание ошибки.
leveldb::Status s = ...;
if (!s.ok()) cerr << s.ToString() << endl;


Чтение, запись, удаление

Сигнатуры Put, Get, Delete:
Status leveldb::DB::Put(const WriteOptions& options, const Slice& key, const Slice& value);
Status leveldb::DB::Get(const ReadOptions& options, const Slice& key, std::string* value);
Status leveldb::DB::Delete(const WriteOptions& options, const Slice& key);

Пример использования:
std::string value;
leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value);
if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);


Итератор


Итератор представлен классом leveldb::Iterator и имеет следующий интерфейс:
bool leveldb::Iterator::Valid() const;
void leveldb::Iterator::SeekToFirst();
void leveldb::Iterator::SeekToLast();
void leveldb::Iterator::Seek(const Slice& target);
void leveldb::Iterator::Next();
void leveldb::Iterator::Prev();
Slice leveldb::Iterator::key() const;
Slice leveldb::Iterator::value() const;
Status leveldb::Iterator::status() const;


Интерфейс итератора предоставляет методы для последовательного и произвольного
доступа. Последовательный доступ может осуществлятся как в прямом, так и в обратном
направлении.

leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
  cout << it->key().ToString() << ": "  << it->value().ToString() << endl;
}
assert(it->status().ok());  // Check for any errors found during the scan
delete it;

for (it->Seek(start);
     it->Valid() && it->key().ToString() < limit;
     it->Next()) {
  ...
} 

for (it->SeekToLast(); it->Valid(); it->Prev()) {
  ...
}


Использование LevelDB, для хранения временных рядов.



Моя работа связана с системами мониторинга, и поэтому появились временные ряды и time series database.

Ограничимся двумя операциями:
  • добавление данных;
  • последовательное чтение интервала.


Схема данных


Для хранение метрик в хранилище типа ключ-значение используют следующую схему: ключ=метрика+временная метка+теги(теги опцианальны).
Подобным образом устроена OpenTSDB, работающая поверх HBase.
Внутри OpenTSDB есть схема uid'ов метрик и схема данных. Такой же принцып будет задействован и здесь.

Одна база будет использоваться для хранения идентификаторов метрик. Ключем тут будет число в size_t, значением строка в стиле Си.

Втроая база будет исопльзоваться под данные, ключем тут будет структура вида:
struct Key
{
    size_t muid;
    time_t timestamp;
};

значение будет хранится в виде double. Тут на полную катушку используется тот факт, что ключ и значение в LevelDB массив байт,
а значит можем использвать простые структуры данных без всякой сериализации.

Интерфейс хранилища


#pragma once

#include <ctime>
#include <cstdint>

#include <memory>
#include <unordered_map>

namespace leveldb
{
    class DB;
    class Iterator;
    class Slice;
    class Comparator;
    class Cache;
}

/*!
 * Хранилище метрик
 */
class Storage
{
public:
    class Iterator;
    typedef size_t MetricUid;

    /*!
     * Представление ключа
     */
    struct Key
    {
        MetricUid muid;   //!< uid метрики
        time_t timestamp; //!< время
    };

    /*!
     * Конструктор
     * @param dir каталог для размещения базы данных и базы метрик
     * @param cacheSizeMb размер блока кеша
     */
    Storage(const std::string& dir, size_t cacheSizeMb = 16);

    /*!
     * @brief Добавление метрики.
     * @param name имя метрики
     * @return уникальный идентификатор метрики
     *
     * Добавляет метрику в базу UID'ов и возвращает UID метрики.
     * Если метрика уже была добавлена, то возращает UID метрики
     */
    MetricUid addMetric(const std::string& name);

    /*!
     * Записать значение
     * @param muid идентификатор метрики
     * @param timestamp временная точка
     * @param value значение
     * @return true если нет ошибок
     */
    bool put(MetricUid muid, time_t timestamp, double value);

    /*!
     * Получить итератор для интервала значений метрики
     * @param muid идентификатор метрики
     * @param from начало интервала
     * @param to конец интервала
     * @return итератор
     */
    Iterator get(MetricUid muid, time_t from, time_t to);

    Storage(const Storage&) = delete;
    Storage& operator=(const Storage&) = delete;

private:

    /*!
     * Инициализация базы uid метрик
     */
    void initUID();

    /*!
     * Инициализация данных
     */
    void initData();

private:

    /*!
     * Текущий индекс для UID
     */
    MetricUid m_currentIndx;

    /*!
     * Базовый каталог
     */
    std::string m_dir;

    /*!
     * Размер блока кеша
     */
    size_t m_cacheSizeMb;

    /*!
     * Кеш для данных
     */
    std::shared_ptr<leveldb::Cache> m_dataCache;

    /*!
     * База UID'ов
     */
    std::shared_ptr<leveldb::DB> m_uid;

    /*!
     * База измерений
     */
    std::shared_ptr<leveldb::DB> m_data;

    /*!
     * Мэп метрика -> uid
     */
    std::unordered_map<std::string, MetricUid> m_metric2uid;
};

/*!
 * Итератор для обхода последовательности данных
 */
class Storage::Iterator
{
public:
    typedef std::tuple<time_t, double> Value;
    typedef std::shared_ptr<leveldb::Iterator> IteratorPrivate;

    Iterator();

    Iterator(const IteratorPrivate& iter, const Key& limit);

    /*!
     * Проверка итератора на валидность
     * @return true если итератор валиден
     */
    bool valid() const;

    /*!
     * Получить значение
     * @return кортеж <время, значение>
     */
    Value value() const;

    /*!
     * Переход к следующему элементу
     */
    void next();

private:

    IteratorPrivate m_iter; //!< итератор LevelDB
    Key m_limit; //!< ключ для ограничения последовательности справа
};


Конструктор Storage принимает путь к каталогу, где будут размещатся база с uid и база с данными, размер блока кеша.

Реализация


Начнем с компаратора, т.к. memcmp не подходит для сравнения чисел. Благодаря использования структуры в качестве ключа, код прост и читаем:
namespace
{
    class TimeMeasurementComporator: public leveldb::Comparator
    {
    public:
        int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const
        {
            const char* dataA = a.data();
            const char* dataB = b.data();
            const Storage::Key* keyA =
                    reinterpret_cast<const Storage::Key*>(dataA);
            const Storage::Key* keyB =
                    reinterpret_cast<const Storage::Key*>(dataB);
            if (keyA->muid < keyB->muid)
            {
                return -1;
            }
            else if (keyA->muid > keyB->muid)
            {
                return 1;
            }

            if (keyA->timestamp < keyB->timestamp)
            {
                return -1;
            }
            else if (keyA->timestamp > keyB->timestamp)
            {
                return 1;
            }

            return 0;
        }

        // Ignore the following methods for now:
        const char* Name() const
        {
            return "TimeMeasurementComporator";
        }
        void FindShortestSeparator(std::string*, const leveldb::Slice&) const
        {
        }
        void FindShortSuccessor(std::string*) const
        {
        }
    };
    TimeMeasurementComporator GLOBAL_COMPORATOR;
}


Дальше инициализация/создание базы под данные:

void Storage::initData()
{
    DB* data;
    Options options;
    options.create_if_missing = true;
    options.compression = kNoCompression;
    options.comparator = &GLOBAL_COMPORATOR;

    if (m_cacheSizeMb)
    {
        options.block_cache = leveldb::NewLRUCache(m_cacheSizeMb * 1048576);
        m_dataCache.reset(options.block_cache);
    }

    Status status = DB::Open(options, m_dir + "/data", &data);
    if (!status.ok())
    {
        LOG(ERROR)<<"Error opening database "<<status.ToString();
        exit(1);
    }
    m_data.reset(data);
}

В опциях передается глобальный компаратор, и отключено сжатие т.к. LelelDB собирается без Snappy.

Инициализация базы с идентификаторами метрик:
void Storage::initUID()
{
    Options options;
    options.create_if_missing = true;
    options.compression = kNoCompression;

    DB* cfg;
    Status status = DB::Open(options, m_dir + "/conf", &cfg);
    if (!status.ok())
    {
        LOG(ERROR)<<"Error opening database "<<status.ToString();
        exit(1);
    }
    m_uid.reset(cfg);

    std::unique_ptr<leveldb::Iterator> it(
            m_uid->NewIterator(leveldb::ReadOptions()));

    for (it->SeekToFirst(); it->Valid(); it->Next())
    {
        const size_t* index = reinterpret_cast<const size_t*>(it->key().data());
        m_metric2uid[it->value().ToString()] = *index;
        m_currentIndx = *index;
    }
}

Тут происходит инициализация базы и заполнения отображения метрики в UID.

Добавление данных довольно простое:
Storage::MetricUid Storage::addMetric(const std::string& name)
{
    auto result = m_metric2uid.find(name);
    if (result != m_metric2uid.end())
    {
        return result->second;
    }
    ++m_currentIndx;
    m_metric2uid[name] = m_currentIndx;
    const auto s = m_uid->Put(WriteOptions(),
            Slice(reinterpret_cast<const char*>(&m_currentIndx), sizeof(m_currentIndx)),
            name);

    if (!s.ok())
    {
        LOG(ERROR)<<"Error put "<<s.ToString();
    }

    return m_currentIndx;
}

bool Storage::put(MetricUid muid, time_t timestamp, double value)
{
    const Key key = {muid, timestamp};

    const auto s = m_data->Put(WriteOptions(),
            Slice(reinterpret_cast<const char*>(&key), sizeof(key)),
            Slice(reinterpret_cast<char*>(&value), sizeof(value)));

    if (!s.ok())
    {
        LOG(ERROR)<<"Error put "<<s.ToString();
    }

    return s.ok();
}


Получение данных реализовано посредством создание обертки над итератором LevelDB:
Storage::Iterator Storage::get(MetricUid muid, time_t from, time_t to)
{
    const Key begin = {muid, from};
    const Key end = { muid, to };

    Storage::Iterator::IteratorPrivate iter(m_data->NewIterator(ReadOptions()));
    iter->Seek(Slice(reinterpret_cast<const char*>(&begin),
                     sizeof(begin)));
    return Storage::Iterator(iter, end);
}

Storage::Iterator::Iterator():
        m_iter(nullptr)
{
    memset(&m_limit, 0, sizeof(m_limit));
}


Storage::Iterator::Iterator(const IteratorPrivate& iter, const Key& limit) :
        m_iter(iter),
        m_limit(limit)
{
}

bool Storage::Iterator::valid() const
{
    if(!m_iter)
    {
        return false;
    }

    const Slice right(reinterpret_cast<const char*>(&m_limit),
                      sizeof(m_limit));

    return m_iter->Valid() &&
           (GLOBAL_COMPORATOR.Compare(m_iter->key(),right) < 0);
}

Storage::Iterator::Value Storage::Iterator::value() const
{
    if(!m_iter)
    {
        return Value(0,0);
    }

    const Key* data =reinterpret_cast<const Key*>(m_iter->key().data());
    double val = *reinterpret_cast<const double*>(m_iter->value().data());
    return Value(data->timestamp, val);
}

void Storage::Iterator::next()
{
    if(m_iter && m_iter->Valid())
    {
        m_iter->Next();
    }
}


Исходные коды прототипа находится на GitHub.

Из интересного:
Обзор популярных современных алгоритмов хранения данных на диске: LevelDB, TokuDB, LMDB, Sophia
Глубокое погружение в дисковые структуры данных, B-деревья, LSM-деревья и фрактальные деревья
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+12
Comments 14
Comments Comments 14

Articles