Pull to refresh

Pooling соединений к базе данных на node.js

Reading time 7 min
Views 12K
В этой статье я опишу две абстракции-классы, написанные средствами nodejs, которые предоставляют функционал распределения запросов по открытым каналам (tcp-socket). При этом учитывается общая загруженность системы и, если каналов не хватает, открываются новые, по мере уменьшения общего количества запросов — «лишние» каналы закрываются.

Этот клиент можно использовать для распределения запросов по каналам, которые представляют собой по сути net.Socket. Для этого нужно внести изменения в метод по открытию и закрытию канала, добавлению запроса в канал.

В примере, который я опишу, используется библиотека pg, предоставляющая функционал по открытию сокетов к серверу с базой данных. При этом дефолтовое управление пулом коннектов, предоставляемое библиотекой, никак не используется.

Для начала рассмотрим класс Connect, с помощью которого будет осуществляться управление одной сущностью — коннектом:

Класс Connect
/* Конструктор класса коннект, в качестве аргумента строка формата "pg://USER:PASSWORD@HOST:PORT/DATABASE" */
function Connect(connString) {
    // сохраняем параметры в свойстве объекта
    this._connString = connString;

    // свойство отвечающее, за запуск обработки запросов
    this._isRun = false;

    // максимальное количество запросов помещенных в сокет, после которого будет вызвано событие "maxCount"
    this._maxQueryCount = 100;

    // служебное свойство, используемое в методе _nextTick
    this._worked = false;

    // количество запросов, висящих на коннекте
    this._queryCount = 0;

    // "движок" класса
    this._emitter  =  new (require('events').EventEmitter);

    // делаем "селфи"
    var self = this;

    // на открытие коннекта создаем обработчик "open", в котором регистрируем массив коннектов
    this._emitter.on('open', function() {
        self._arrayQuery = [];
    });

    // на событие ошибки будет сгенерирована ошибка, которая если не навесить обработчик, повалит выполнение скрипта
    this._emitter.on('error', function(err) {
        throw err;
    });

    // на событие достижения лимита этого коннекта, пометим его флагом
    this._emitter.on('maxCount', function(message) {
        self._setMax = true;
    });

    // при создании экземпляра класса открываем коннект до базы, здесь может быть открытие любого коннекта,
    // который представляет собой по сути net.Socket
    pg.connect(this._connString, function(err, client, done) {
        if (err) {
            return self._emitter.emit('error', err);
        }

        // запишем в "внутреннее" свойство ссылку на клиент, который общается с базой
        self._client = client;

        // "мягкое закрытие" клиента
        self._done = done;

        // вызываем событие готовности (передаем событие далее по цепочке)
        self._emitter.emit('open');
    });
}

/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Connect.prototype.on = function(typeEvent, func) {
    if(typeEvent == 'error') {
        // если это обработчик на ошибки подменяем стандартный обработчик пользовательским
        this._emitter.removeAllListeners('error');
    }

    this._emitter.addListener(typeEvent, func);
};

/* метод, которые запускает работу по обработке запросов */
Connect.prototype.start = function() {
    this._isRun = true;
    this._nextTick();
};

/* метод, которые останавливает работу по обработке запросов */
Connect.prototype.stop = function() {
    this._isRun = false;
};

/* метод, возвращающий состоянии коннекта (заполнен оли он) */
Connect.prototype.isFull = function() {
    return this._setMax;
};

/*
    метод, закрывающий мягко коннект
    (т.е. если на коннекте висят запросы, программа дождется их выполнения и закроет коннект)
*/
Connect.prototype.close = function () {
    if(this._done) {
        this._emitter.emit('close');
        this._done();
    } else {
        this._emitter.emit('error', new Error('connect is not active'));
    }
};

/* метод, возвращающий массив обрабатываемых запросов */
Connect.prototype.queryQueue = function () {
    return this._arrayQuery;
};

/*
    главный рабочий метод класса - добавление запроса.
    В качестве аргументов сам запрос в виде строки, параметры запроса, коллбэк на завершении запроса
*/
Connect.prototype.addQuery = function (query, params, cb) {
    if(!(typeof query == 'string')) {
        return this._emitter.emit('error', new Error('not valid query'));
    }

    if( !(typeof params == "object") || !(params instanceof Array) ) {
        return this._emitter.emit('error', new Error('not valid argument'));
    }

    this._queryCount++;
    this._arrayQuery.push({ query: query, params: params, callback: cb });

    if(this._queryCount > this._maxQueryCount) {
        this._emitter.emit('maxCount', 'in queue added too many requests, the waiting time increases');
    }

    this._nextTick();
};

/* метод по манипулированию максимальным количеством запросов в коннекте */
Connect.prototype.maxQueryCount = function (count) {
    if(count) {
        this._maxQueryCount = count;
    } else {
        return this._maxQueryCount;
    }
};

/* возвращает количество обрабатываемых запросов */
Connect.prototype.queryCount = function () {
    return this._queryCount;
};

/*
    внутренний метод класса, ответственный за выполнение запросов,
    в данном случае реализован вариант, когда все запросы сразу отправляются
    к базе, возможна реализация в случае с последовательным выполнением
    запросы хранятся во внутреннем буффере (массиве _arrayQuery)
    и отправляются к базе по мере выполнения предыдущего
*/
Connect.prototype._nextTick = function() {
    var self = this;
    if(this._worked) {
        return;
    }

    while(this._isRun && this._arrayQuery.length>0) {
        this._worked = true;
        var el = this._arrayQuery.shift();

        // здесь используется синтаксис библиотеки pg, к которой мы привязаны
        this._client.query(el.query, el.params, function(err, result) {
            self._queryCount--;
            if(err) {
                return el.callback(err);
            }
            el.callback(null, result);

            if(self._queryCount==0) {
                self._emitter.emit('drain');
                self._setMax = false;
            }

        })
    }

    this._worked = false;
};       




Теперь непосредственно класс Balanсer, который будет управлять нашими коннектами: открывать новые, закрывать лишние, распределять между ними запросы, предоставлять единый вход для сервиса

Класс Balancer
/* конструктор класса, который будет распределять запросы */
function Balancer(minCountConnect, maxCountConnect) {
    // записываем в свойство максимальный предел открытых коннектов до базы
    this._maxCountConnect = maxCountConnect;

    // записываем в свойство минимальный предел открытых коннектов до базы
    this._minCountConnect = minCountConnect;

    // массив коннектов
    this._connectArray = [];

    // закрываемые коннекты
    this._closedConnect = [];

    // массив задач
    this._taskArray = [];

    // служебный флаг
    this._run = false;

    // движок класса
    this._emitter = new (require('events').EventEmitter);

    // запускаем инициализацию
    this._init();
}

/* метод инициализации класса, открывающий коннекты последовательно, один за другим */
Balancer.prototype._init = function() {
    this._cursor = 0;
    this.activQuery = 0;
    var self = this;

    var i=0;

    // рекурсивный вызов функции, добавляющей новый коннект
    var cycle = function() {
        i++;
        if(i<self._minCountConnect) {
            self._addNewConnect(cycle);
        }   else {
            self._emitter.emit('ready');
        }
    };

    this._addNewConnect(cycle);
};

/* собственно метод, открывающий соединение, используем класс коннекта */
Balancer.prototype._addNewConnect = function(cb) {
    var self = this;

    var connect = new GPSconnect(connString);
    connect.on('open', function() {
        self._connectArray.push(connect);
        cb();
    });
};

/* метод, по проверке "загруженности" коннекта и возвращающий индекс коннекта */
Balancer.prototype._cycle = function(pos) {
    for (var i=pos;i<this._connectArray.length;i++) {
        if( !(this._connectArray[i].isFull()) )
            break;
    }
    return i;
};

/* метод, заполняющий коннект запросами */
Balancer.prototype._next = function(connect, el) {
    connect.addQuery(el.query, el.params, el.cb);
    connect.start();
    this._distribution();
};


/*
    Главный метод класса - распределяет запросы между коннектами.
    Распределение проходит по принципу "Round-robin" с проверкой на загруженность коннекта.
    Это нужно в случае, если какой то запрос оказался "тяжелым",
    чтобы снять нагрузку с этого коннекта и перераспределить запросы на другие коннекты
    код оформлен конечно криво, надеюсь в скором времени поправить
*/
Balancer.prototype._distribution = function() {
    if(this._taskArray.length>0) {
        var el = this._taskArray.shift();
        this._cursor = this._cycle(this._cursor);
        var self = this;

        if(this._cursor<this._connectArray.length) {
            var connect = this._connectArray[this._cursor];
            this._next(connect, el);
            this._cursor++;

        }   else {
            this._cursor = this._cycle(0);

            if(this._cursor<this._connectArray.length) {
                var connect = this._connectArray[this._cursor];
                this._next(connect, el);
                this._cursor++;
            } else if( this._connectArray.length<this._maxCountConnect) {
                self._addNewConnect(function() {
                    self._cursor = self._connectArray.length-1;
                    var connect = self._connectArray[self._cursor];
                    self._next(connect, el);
                });
            } else {
                for (var i=0;i<this._connectArray.length;i++) {
                    if( this.activQuery/this._connectArray.length > this._connectArray[i].queryCount() ) {
                        break;
                    }
                }
                if(i==this._connectArray.length) {
                    i = 0;
                }
                this._cursor = i;

                var connect = this._connectArray[this._cursor];
                this._next(connect, el);
            }
        }
    }   else {
        this._run = false;
    }
};

/* метод, который предоставляет функционал по "навешиванию" обработчиков на события */
Balancer.prototype.on = function(typeEvent, func) {
    this._emitter.addListener(typeEvent, func);
};


/*
    метод, вызываемый для проверки количества открытых коннектов, и если необходимости в таком количестве нет
    "лишние" коннекты исключается из системы распределения
*/
Balancer.prototype._removeLoad = function() {
    var self = this;

    var temp = this._connectArray[0].maxQueryCount().toFixed();
    var currentCount = (this.activQuery/temp < this._minCountConnect) ? this._minCountConnect : temp;

    if(currentCount< this._connectArray.length ) {
        while( this._connectArray.length  != currentCount  ) {
            var poppedConnect = this._connectArray.pop();
            if(poppedConnect.queryCount()==0) {
                poppedConnect.close();
            }   else {
                poppedConnect.index = self._closedConnect.length;
                poppedConnect.on('drain', function() {
                    poppedConnect.close();
                    self._closedConnect.slice(poppedConnect.index, 1);
                });
                self._closedConnect.push(poppedConnect);
            }
        }
    }
};

/*
    Cобственно метод, который предоставляет вход-трубу, через который добавляются все запросы.
    Параметр tube, возможно использовать для дифференсации запросов между собой,
    пока он никак не используется.
*/
Balancer.prototype.addQuery = function(tube, query, params, cb) {
    this.activQuery++;
    var self = this;

    this._removeLoad();
    var wrappCb = function() {
        self.activQuery--;
        cb.apply(this, arguments);
    };

    this._taskArray.push({ query: query, params: params, cb: wrappCb });
    if(!this._run) {
        this._run = true;
        this._distribution();
    }
};



Как все это проверить? Для тестирования я использую запрос «select pg_sleep(1)», который выполняется 1 секунду и имитирует запрос к базе.

10 000 таких запросов обрабатывались балансировщиком ~101590 ms, при максимальном количестве запросов на коннект равным 100 и границах общего количества каналов=сокетов от 10 до 100.

Использованный скрипт:

var balancer = new Balancer(10,100);
balancer.on('ready', function() {

    var y=0;
    var time = +new Date();
    for(var i=0;i<10000; i++) {
        balancer.addQuery('gps', 'select pg_sleep(1)', [], function(err, result) {
            if(err) console.log(err);
            y++;
            if(y==10000) {
                console.log(balancer._connectArray.length);
                console.log(+new Date()-time);
            }
        });
    }
});


Все исходники доступны на гитхабе.
Клиент еще, конечно, сырой, многое нужно допилить/переписать, так что прошу сильно не ругать. Если нужно, могу заняться им плотнее.
Tags:
Hubs:
+16
Comments 16
Comments Comments 16

Articles