В статье ниже я попытаюсь кратко рассказать о том, что такое Tarantool и как начать его использовать в уже существующем проекте если вы программируете на Java. Если же вы программируете на другом языке, то вам могут быть интересны некоторые инструменты доступные в коннекторе, такие как возможность редактирование xlog файлов и создание snap файлов из любых данных. Если вы не знаете, что такое Tarantool, то лучше прочитать этот пост.
Tarantool — это ключ-кортеж хранилище данных. Все данные и индексы хранятся в оперативной памяти. Значения составляют кортеж, далее tuple, кортежи — пространство, далее space, пространства — модель данных. Поддерживаются 3 типа данных: 32 битное без знаковое целоe, 64 битное без знаковое целое и бинарная строка, далее NUM, NUM64 и STR соответственно. Для любого пространства должны быть определены тип и структура первичного индекса, например: HASH по полям 1,2 где 1 — NUM, а 2 — NUM64. Вторичные индексы задаются точно так же как и первичные. DML операции атомарны на уровне кортежа и выполняются только по первичному индексу. Для выполнения нескольких операций атомарно нужно использовать встроенный язык Lua. Сохранность данных обеспечивается путём сохранения снимка текущего состояния, далее snapshot, и бинарного лога, далее xlog. Для хранения кортежей используется slab.
В примерах ниже используется java connector, подробнее о нём можно узнать по адресу dgreenru.github.com/tarantool-java. Последняя стабильная версия на момент написания статьи 0.1.2. Ниже я рассмотрю пример использования дополнительного функционала позволяющего переносить и синхронизировать данные с любыми другими хранилищами.
Первичный индекс id и 2 вторичных уникальных индекса username и email. Из непереносимых по умолчанию мест можно выделить auto_increment и timestamp. Для первого можно использовать хранимую процедуру box.auto_increment, а для второго можно хранить данные в формате yyyyMMddhhmmss или секундах. Если таблица user достаточно небольшая, то можно просто прочитать данные из mysql и вставить в Tarantool Box, на этой задаче я останавливаться не буду, а расскажу, что делать если таблица очень большая, т.е. содержит очень много записей, пусть и каждая из них небольшого размера. Для начала нужно выгрузить данные в удобный для нас формат, желательно не сильно занимая при этом ресурсы сервера.
Скопировав файл на нужный сервер или локальный компьютер, можно приступить к его обработке и конвертации в формат Tarantool Box. В примере ниже, для простоты, не рассматриваются эскейп последовательности. Если у вас в таблицах встречаются символы табуляции, переноса строки, возврата каретки, обратный слэш или поля содержат NULL значения, вам нужно добавить их обработку самостоятельно.
В результате имеем файл
теперь необходимо настроить space 0 соответствующим образом.
Далее нам нужно заменить 00000000000000000001.snap находящегося в папке work_dir из конфигурационного файла на созданный нами файл.
и попробовать запустить сервер
так же посмотрите файл tarantool.log, в случае успешного запуска он будет заканчиваться на строки похожие на приведённые ниже, в случае ошибки, вы сразу увидите причину.
Далее корректность вставки данных можно проверить простым способом
т.е. мы проверили нахождение данных по 3-м ключам, указанным нами в конфиге. Далее можно посмотреть количество потребляемой процессом памяти в системе и отчёт команды show slab в консоли Tarantool Box.
Tarantool Box запущен, теперь нужно позаботится о резервном копирование данных и поддержке таблицы MySQL в актуальном состояние на случай, если какие-то запросы используют данные из неё в своих целях. Это достаточно просто организовать при помощи класса ReplicationClient. Он позволит иметь почти полную резервную копию xlog без использования полноценного slave сервера и организовать обновление таблицы в MySQL без затрат дополнительных ресурсов и времени. Не забудьте указать replication_port в конфиге, чтобы сделать репликацию возможной. Описанный ниже класс сохраняем все логи полученные от сервера в файлы длинной по 50 тыс. записей. Алгоритм работы достаточно простой:
1. поиск уже существующих логов
2. определение максимального lsn
3. подключение на порт репликации
4. транслируем в файл получаемые данные
Логика обновления MySQL в данном коде отсутствует, но её легко реализовать немного изменив цикл в функции main. Сложным место является расширение класса ReplicationClient кодом, который записывает получаемые данные в бинарный лог, расширяя их до формата xlog. На этом месте можно особо не останавливаться, т.к. данный пример скорее заготовка для реального приложения, чем демонстрация использования.
Так же отдельно хотелось бы отметить, что при использовании функционала работы с xlog файлами практически невозможно потерять данные, даже если вы случайно удалили кортеж или очистили целиком space, используя классы XLogReader и XLogWriter вы сможете легко отредактировать xlog.
На этом в принципе всё, ещё раз напоминаю, что более подробно о коннекторе можно узнать по адресу dgreenru.github.com/tarantool-java, исходный код использованных примеров доступен в репозитории на гитхабе.
Tarantool — это ключ-кортеж хранилище данных. Все данные и индексы хранятся в оперативной памяти. Значения составляют кортеж, далее tuple, кортежи — пространство, далее space, пространства — модель данных. Поддерживаются 3 типа данных: 32 битное без знаковое целоe, 64 битное без знаковое целое и бинарная строка, далее NUM, NUM64 и STR соответственно. Для любого пространства должны быть определены тип и структура первичного индекса, например: HASH по полям 1,2 где 1 — NUM, а 2 — NUM64. Вторичные индексы задаются точно так же как и первичные. DML операции атомарны на уровне кортежа и выполняются только по первичному индексу. Для выполнения нескольких операций атомарно нужно использовать встроенный язык Lua. Сохранность данных обеспечивается путём сохранения снимка текущего состояния, далее snapshot, и бинарного лога, далее xlog. Для хранения кортежей используется slab.
В примерах ниже используется java connector, подробнее о нём можно узнать по адресу dgreenru.github.com/tarantool-java. Последняя стабильная версия на момент написания статьи 0.1.2. Ниже я рассмотрю пример использования дополнительного функционала позволяющего переносить и синхронизировать данные с любыми другими хранилищами.
Пример переноса таблицы MySQL в Tarantool Box:
mysql> desc user;
+------------+--------------+------+-----+-------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------+--------------+------+-----+-------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| username | varchar(255) | NO | UNI | NULL | |
| email | varchar(255) | NO | UNI | NULL | |
| enabled | tinyint(1) | NO | | 1 | |
| registered | timestamp | NO | | CURRENT_TIMESTAMP | |
+------------+--------------+------+-----+-------------------+----------------+
5 rows in set
Первичный индекс id и 2 вторичных уникальных индекса username и email. Из непереносимых по умолчанию мест можно выделить auto_increment и timestamp. Для первого можно использовать хранимую процедуру box.auto_increment, а для второго можно хранить данные в формате yyyyMMddhhmmss или секундах. Если таблица user достаточно небольшая, то можно просто прочитать данные из mysql и вставить в Tarantool Box, на этой задаче я останавливаться не буду, а расскажу, что делать если таблица очень большая, т.е. содержит очень много записей, пусть и каждая из них небольшого размера. Для начала нужно выгрузить данные в удобный для нас формат, желательно не сильно занимая при этом ресурсы сервера.
mysql> select * into outfile '/tmp/user' from user;
Query OK, 73890541 rows affected
$ head -1 /tmp/user
1 username email@domain.tld 1 2012-10-14 01:27:05
Скопировав файл на нужный сервер или локальный компьютер, можно приступить к его обработке и конвертации в формат Tarantool Box. В примере ниже, для простоты, не рассматриваются эскейп последовательности. Если у вас в таблицах встречаются символы табуляции, переноса строки, возврата каретки, обратный слэш или поля содержат NULL значения, вам нужно добавить их обработку самостоятельно.
BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream("/tmp/user.gz")), "utf-8"));
SnapshotWriter writer = new SnapshotWriter(new FileOutputStream("/tmp/user.snap").getChannel());
String line = null;
DateFormat indf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
DateFormat outdf = new SimpleDateFormat("yyyyMMddhhmmss");
Pattern pattern = Pattern.compile("\t");
while ((line = reader.readLine()) != null) {
try {
String[] values = pattern.split(line);
if (values.length == 5) {
Integer id = Integer.parseInt(values[0]);
String username = values[1];
String email = values[2];
byte[] enabled = { Byte.valueOf(values[3]) };
Long registered = Long.parseLong(outdf.format(indf.parse(values[4])));
Tuple tuple = new Tuple(5).setInt(0, id).setString(1, username, "UTF-8")
.setString(2, email, "UTF-8").setBytes(3, enabled).setLong(4, registered);
writer.writeRow(0, tuple);
} else {
System.err.println("Line should be splited in 5 parts, but has " + values.length + " for " + line);
}
} catch (Exception e) {
System.err.println("Can't parse line " + line);
e.printStackTrace();
}
}
writer.close();
reader.close();
В результате имеем файл
$ ls -sh /tmp/user.snap
16.1G /tmp/user.snap
теперь необходимо настроить space 0 соответствующим образом.
# Этот параметр ограничивает суммарный размер памяти выделенной под slab блоки.
# Индексы и другие накладные расходы хранятся вне slab,
# поэтому суммарная память процесса может быть до 2х больше.
# В нашем случае логично поставить здесь 24 гигабайта.
slab_alloc_arena = 24
# Так же имеет смысл откорректировать количество записей в одном xlog файле.
rows_per_wal = 500000
# И конечно же конфигурация ключей space 0
space[0].enabled = 1
# id. Чтобы использовать box.auto_increment тип дерева должен быть TREE.
space[0].index[0].type = "TREE"
space[0].index[0].unique = 1
space[0].index[0].key_field[0].fieldno = 0
space[0].index[0].key_field[0].type = "NUM"
#username
space[0].index[1].type = "HASH"
space[0].index[1].unique = 1
space[0].index[1].key_field[0].fieldno = 1
space[0].index[1].key_field[0].type = "STR"
#password
space[0].index[2].type = "HASH"
space[0].index[2].unique = 1
space[0].index[2].key_field[0].fieldno = 2
space[0].index[2].key_field[0].type = "STR"
Далее нам нужно заменить 00000000000000000001.snap находящегося в папке work_dir из конфигурационного файла на созданный нами файл.
$ mv /tmp/user.snap /var/lib/tarantool/00000000000000000001.snap
и попробовать запустить сервер
$ tarantool_box --background
$ ps -C tarantool_box -o pid=,cmd=
8853 tarantool_box: primary pri: 33013 sec: 33014 adm: 33015
так же посмотрите файл tarantool.log, в случае успешного запуска он будет заканчиваться на строки похожие на приведённые ниже, в случае ошибки, вы сразу увидите причину.
1350504007.249 7127 1/sched _ I> Space 0: done
1350504007.249 7127 101/33013/primary _ I> bound to port 33013
1350504007.249 7127 101/33013/primary _ I> I am primary
1350504007.249 7127 102/33014/secondary _ I> bound to port 33014
1350504007.250 7127 103/33015/admin _ I> bound to port 33015
1350504007.251 7127 1/sched _ C> log level 4
1350504007.251 7127 1/sched _ C> entering event loop
Далее корректность вставки данных можно проверить простым способом
$ tarantool -a 127.0.0.1
127.0.0.1> select * from t0 where k0 = 1
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']
127.0.0.1> select * from t0 where k1 = 'username'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']
127.0.0.1> select * from t0 where k2 = 'email@domain.tld'
Select OK, 1 rows affected
[1, 'username', 'email@domain.tld', '\x01', '\x21\x8b\xe4\xc9\x4c\x12']
т.е. мы проверили нахождение данных по 3-м ключам, указанным нами в конфиге. Далее можно посмотреть количество потребляемой процессом памяти в системе и отчёт команды show slab в консоли Tarantool Box.
Tarantool Box запущен, теперь нужно позаботится о резервном копирование данных и поддержке таблицы MySQL в актуальном состояние на случай, если какие-то запросы используют данные из неё в своих целях. Это достаточно просто организовать при помощи класса ReplicationClient. Он позволит иметь почти полную резервную копию xlog без использования полноценного slave сервера и организовать обновление таблицы в MySQL без затрат дополнительных ресурсов и времени. Не забудьте указать replication_port в конфиге, чтобы сделать репликацию возможной. Описанный ниже класс сохраняем все логи полученные от сервера в файлы длинной по 50 тыс. записей. Алгоритм работы достаточно простой:
1. поиск уже существующих логов
2. определение максимального lsn
3. подключение на порт репликации
4. транслируем в файл получаемые данные
Логика обновления MySQL в данном коде отсутствует, но её легко реализовать немного изменив цикл в функции main. Сложным место является расширение класса ReplicationClient кодом, который записывает получаемые данные в бинарный лог, расширяя их до формата xlog. На этом месте можно особо не останавливаться, т.к. данный пример скорее заготовка для реального приложения, чем демонстрация использования.
public class Backup {
protected DecimalFormat xlogNameFormat = new DecimalFormat("00000000000000000000");
protected String folder;
protected FileChannel xlogChannel;
protected int row;
protected int limit = 50000;
protected long lsn = 0L;
protected ReplicationClient client;
protected XLogWriter writer;
public void setLimit(int limit) {
this.limit = limit;
}
public Backup(String folder, String host, int port) throws IOException {
this.folder = folder;
}
protected void getLatestLSN(String folder) throws IOException, FileNotFoundException {
final File backupFolder = new File(folder);
String[] xlogs = backupFolder.list(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.endsWith(".xlog");
}
});
boolean hasLogs = xlogs != null && xlogs.length > 0;
if (hasLogs) {
Arrays.sort(xlogs);
XLogReader reader = new XLogReader(new FileInputStream(folder + "/" + xlogs[xlogs.length - 1]).getChannel());
XLogEntry xlogEntry = null;
while ((xlogEntry = reader.nextEntry()) != null) {
lsn = xlogEntry.header.lsn;
}
reader.close();
}
}
public void start() throws IOException {
getLatestLSN(folder);
System.out.println("Planning to start from lsn: " + lsn);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
try {
synchronized (this) {
close();
}
} catch (IOException e) {
throw new IllegalStateException("Can't close xlog", e);
}
}
}));
final ByteBuffer rowStartMarker = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(Const.ROW_START_MARKER);
client = new ReplicationClient(SocketChannel.open(new InetSocketAddress("127.0.0.1", 33016)), lsn + 1L) {
@Override
protected ByteBuffer readBody(Header header) throws IOException {
if (Backup.this.xlogChannel == null) {
Backup.this.xlogChannel = nextFile(folder);
}
ByteBuffer body = super.readBody(header);
this.header.flip();
rowStartMarker.flip();
synchronized (Backup.this) {
while (rowStartMarker.hasRemaining())
Backup.this.xlogChannel.write(rowStartMarker);
while (this.header.hasRemaining())
Backup.this.xlogChannel.write(this.header);
while (body.hasRemaining())
Backup.this.xlogChannel.write(body);
Backup.this.xlogChannel.force(false);
body.flip();
}
return body;
}
};
}
public XLogEntry nextEntry() throws IOException {
XLogEntry entry = client.nextEntry();
lsn = entry.header.lsn;
if (++row >= limit) {
close();
xlogChannel = nextFile(folder);
row = 0;
}
return entry;
}
protected FileChannel nextFile(String folder) throws IOException {
String fileName = folder + "/" + xlogNameFormat.format(lsn + 1L) + ".xlog";
new File(fileName).createNewFile();
FileChannel channel = new FileOutputStream(fileName, true).getChannel();
writer = new XLogWriter(channel);
return channel;
}
public void close() throws IOException {
if (writer != null) {
writer.close();
}
}
public static void main(String[] args) throws IOException {
final Backup backup = new Backup("/home/dgreen/backup", "localhost", 33016);
backup.start();
XLogEntry entry = null;
while ((entry = backup.nextEntry()) != null) {
StringBuilder pk = new StringBuilder();
for (int i = 0; i < entry.tuple.size(); i++) {
if (pk.length() > 0) {
pk.append(" - ");
}
switch (entry.tuple.getBytes(i).length) {
case 4:
pk.append(String.valueOf(entry.tuple.getInt(i)));
break;
case 8:
pk.append(String.valueOf(entry.tuple.getLong(i)));
break;
default:
pk.append(entry.tuple.getString(i, "UTF-8"));
}
}
switch (entry.op) {
case Update.OP_CODE:
System.out.println("Got update on #" + pk.toString());
break;
case Insert.OP_CODE:
System.out.println("Got insert " + pk.toString());
break;
case Delete.OP_CODE:
System.out.println("Got delete of #" + pk.toString());
break;
default:
System.out.println("Got unknown op " + entry.op + " " + pk.toString());
break;
}
}
}
}
Так же отдельно хотелось бы отметить, что при использовании функционала работы с xlog файлами практически невозможно потерять данные, даже если вы случайно удалили кортеж или очистили целиком space, используя классы XLogReader и XLogWriter вы сможете легко отредактировать xlog.
На этом в принципе всё, ещё раз напоминаю, что более подробно о коннекторе можно узнать по адресу dgreenru.github.com/tarantool-java, исходный код использованных примеров доступен в репозитории на гитхабе.