JAVA

индекс
157,35

Java: Socks 4 Proxy работа с неблокирующими сокетами

Начиная с версии 1.4 в j2se появился package java.nio, который позволяет работать с сокетами в неблокирующем режиме, что зачастую повышает производительность, упрощает код и даёт дополнительные возможности и функционал. А начиная с версии j2se 1.6 на серверах под упралением ОС линукс(kernel 2.6) реализация класса Selector выполнена с использованием epoll, что обеспечивает максимально возможную производительность.

В примере описанном ниже я постараюсь продемонстрироватьь основные принципе работы с неблокирующими сокетами, на примере вполне реальной задачи – реализации Socks 4 прокси сервер.

Во время жизни с неблокирующим сокетом может приключиться всякое, а именно

ServerSocketChannel
  • OP_ACCEPT – входящее соединение

SocketChannel
  • OP_READ – на соске данные или дисконнект
  • OP_WRITE – соска готова к записи или дисконнект
  • OP_CONNECT – соединение или установлено или нет



Выбираются сокеты на которых что-то случилось при помощи одного из методов
  • select() – блокирующий метод, просыпается по событию или по wakeUp()
  • select(long) – та же тема только с таймаутом
  • selectNow() – ну и неблокирующий вариант


В нашем случае прокся штука пассивная, поэтому нам больше подходит базовый блокирующий select().
После этого нужно запросить у селектора ключи которые проявили активность за последнюю выборку и используя методы isAcceptable(), isReadable(), isWriteable(), isConnectable() узнать что с ними произошло.

Основной алгоритм работы нашего прокси сервера такой:
  1. Принимаем соединение
  2. Парсим заголовок (для упрощения этого шага мы предполагаем что размер заголовка всегда меньше размера буфера)
  3. Устанавливаем соединение с целью
  4. Отвечаем клиенту что всё ОК
  5. Проксируем
  6. Закрываем соединения


Чтобы избежать проблем с полными сокет буферами проксировать будем следующим образом:
Пусть у нас два конца A и B при этом A.in=B.out и наоборот, следовательно A.interestOps()|OP_READ!= B.interestOps()|OP_WRITE (чтобы один буфер одновременно не использовался двумя каналами).
После того как одна из сторон закроет соединение, надо дописать данные из буфера второй стороне и закрыть соединение.

Ну и собственно сам код, функции старался расположить в порядке действий для упрощения понимания алгоритма, комментарии прилагаются.
package ru.habrahabr;
 
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
 
/**
 * Класс реализующий простой неблокирующий Socks 4 Proxy Сервер Реализуюший
 * только команду connect
 * 
 * @author dgreen
 * @date 19.09.2009
 * 
 */

public class Socks4Proxy implements Runnable {
    int bufferSize = 8192;
    /**
     * Порт
     */

    int port;
    /**
     * Хост
     */

    String host;
 
    /**
     * Дополнительная информация цепляемая к каждому ключу {@link SelectionKey}
     * 
     * @author dgreen
     * @date 19.09.2009
     * 
     */

    static class Attachment {
        /**
         * Буфер для чтения, в момент проксирования становится буфером для
         * записи для ключа хранимого в peer
         * 
         * ВАЖНО: При парсинге Socks4 заголовком мы предполагаем что размер
         * буфера, больше чем размер нормального заголовка, у браузера Mozilla
         * Firefox, размер заголовка равен 12 байт 1 версия + 1 команда + 2 порт +
         * 4 ip + 3 id (MOZ) + 1 \0
         */

 
        ByteBuffer in;
        /**
         * Буфер для записи, в момент проксирования равен буферу для чтения для
         * ключа хранимого в peer
         */

        ByteBuffer out;
        /**
         * Куда проксируем
         */

        SelectionKey peer;
 
    }
 
    /**
     * так выглядит ответ ОК или Сервис предоставлен
     */

    static final byte[] OK = new byte[] { 0x00, 0x5a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 };
 
    /**
     * Сердце неблокирующего сервера, практически не меняется от приложения к
     * приложению, разве что при использование неблокирующего сервера в
     * многопоточном приложение, и работе с ключами из других потоков, надо
     * будет добавить некий KeyChangeRequest, но нам в этом приложение это без
     * надобности
     */

    @Override
    public void run() {
        try {
            // Создаём Selector
            Selector selector = SelectorProvider.provider().openSelector();
            // Открываем серверный канал
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // Убираем блокировку
            serverChannel.configureBlocking(false);
            // Вешаемся на порт
            serverChannel.socket().bind(new InetSocketAddress(host, port));
            // Регистрация в селекторе
            serverChannel.register(selector, serverChannel.validOps());
            // Основной цикл работу неблокирующего сервер
            // Этот цикл будет одинаковым для практически любого неблокирующего
            // сервера
            while (selector.select() > -1) {
                // Получаем ключи на которых произошли события в момент
                // последней выборки
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isValid()) {
                        // Обработка всех возможнных событий ключа
                        try {
                            if (key.isAcceptable()) {
                                // Принимаем соединение
                                accept(key);
                            } else if (key.isConnectable()) {
                                // Устанавливаем соединение
                                connect(key);
                            } else if (key.isReadable()) {
                                // Читаем данные
                                read(key);
                            } else if (key.isWritable()) {
                                // Пишем данные
                                write(key);
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            close(key);
                        }
                    }
                }
            }
 
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalStateException(e);
        }
    }
 
    /**
     * Функция принимает соединение, регистрирует ключ с интересуемым действием
     * чтение данных (OP_READ)
     * 
     * @param key
     *            ключ на котором произошло событие
     * @throws IOException
     * @throws ClosedChannelException
     */

    private void accept(SelectionKey key) throws IOExceptionClosedChannelException {
        // Приняли
        SocketChannel newChannel = ((ServerSocketChannel) key.channel()).accept();
        // Неблокирующий
        newChannel.configureBlocking(false);
        // Регистрируем в селекторе
        newChannel.register(key.selector()SelectionKey.OP_READ);
    }
 
    /**
     * Читаем данные доступные в данный момент. Функция бывает в двух состояних -
     * чтение заголовка запроса и непосредственного проксирование
     * 
     * @param key
     *            ключ на котором произошло событие
     * @throws IOException
     * @throws UnknownHostException
     * @throws ClosedChannelException
     */

    private void read(SelectionKey key) throws IOExceptionUnknownHostExceptionClosedChannelException {
        SocketChannel channel = ((SocketChannel) key.channel());
        Attachment attachment = ((Attachment) key.attachment());
        if (attachment == null) {
            // Лениво инициализируем буферы
            key.attach(attachment = new Attachment());
            attachment.in = ByteBuffer.allocate(bufferSize);
        }
        if (channel.read(attachment.in) < 1) {
            // -1 - разрыв 0 - нету места в буфере, такое может быть только если
            // заголовок превысил размер буфера
            close(key);
        } else if (attachment.peer == null) {
            // если нету второго конца :) стало быть мы читаем заголовок
            readHeader(key, attachment);
        } else {
            // ну а если мы проксируем, то добавляем ко второму концу интерес
            // записать
            attachment.peer.interestOps(attachment.peer.interestOps() | SelectionKey.OP_WRITE);
            // а у первого убираем интерес прочитать, т.к пока не записали
            // текущие данные, читать ничего не будем
            key.interestOps(key.interestOps() ^ SelectionKey.OP_READ);
            // готовим буфер для записи
            attachment.in.flip();
        }
    }
 
    private void readHeader(SelectionKey key, Attachment attachment) throws IllegalStateExceptionIOException,
            UnknownHostExceptionClosedChannelException {
        byte[] ar = attachment.in.array();
        if (ar[attachment.in.position() - 1] == 0 ) {
            // Если последний байт \0 это конец ID пользователя.
            if (ar[0 ] !4 && ar[1] !1 || attachment.in.position() < 8) {
                // Простенькая проверка на версию протокола и на валидность
                // команды,
                // Мы поддерживаем только conect
                throw new IllegalStateException("Bad Request");
            } else {
                // Создаём соединение
                SocketChannel peer = SocketChannel.open();
                peer.configureBlocking(false);
                // Получаем из пакета адрес и порт
                byte[] addr = new byte[] { ar[4], ar[5], ar[6], ar[7] };
                int p = (((0xFF & ar[2]) << 8) + (0xFF & ar[3]));
                // Начинаем устанавливать соединение
                peer.connect(new InetSocketAddress(InetAddress.getByAddress(addr), p));
                // Регистрация в селекторе
                SelectionKey peerKey = peer.register(key.selector()SelectionKey.OP_CONNECT);
                // Глушим запрашивающее соединение
                key.interestOps(0 );
                // Обмен ключами :)
                attachment.peer = peerKey;
                Attachment peerAttachemtn = new Attachment();
                peerAttachemtn.peer = key;
                peerKey.attach(peerAttachemtn);
                // Очищаем буфер с заголовками
                attachment.in.clear();
            }
        }
    }
 
    /**
     * Запись данных из буфера
     * 
     * @param key
     * @throws IOException
     */

    private void write(SelectionKey key) throws IOException {
        // Закрывать сокет надо только записав все данные
        SocketChannel channel = ((SocketChannel) key.channel());
        Attachment attachment = ((Attachment) key.attachment());
        if (channel.write(attachment.out) == -1) {
            close(key);
        } else if (attachment.out.remaining() == 0 ) {
            if (attachment.peer == null) {
                // Дописали что было в буфере и закрываемся
                close(key);
            } else {
                // если всё записано, чистим буфер
                attachment.out.clear();
                // Добавялем ко второму концу интерес на чтение
                attachment.peer.interestOps(attachment.peer.interestOps() | SelectionKey.OP_READ);
                // А у своего убираем интерес на запись
                key.interestOps(key.interestOps() ^ SelectionKey.OP_WRITE);
            }
        }
    }
 
    /**
     * Завершаем соединение
     * 
     * @param key
     *            ключ на котором произошло событие
     * @throws IOException
     */

    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = ((SocketChannel) key.channel());
        Attachment attachment = ((Attachment) key.attachment());
        // Завершаем соединение
        channel.finishConnect();
        // Создаём буфер и отвечаем OK
        attachment.in = ByteBuffer.allocate(bufferSize);
        attachment.in.put(OK).flip();
        attachment.out = ((Attachment) attachment.peer.attachment()).in;
        ((Attachment) attachment.peer.attachment()).out = attachment.in;
        // Ставим второму концу флаги на на запись и на чтение
        // как только она запишет OK, переключит второй конец на чтение и все
        // будут счастливы
        attachment.peer.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
        key.interestOps(0 );
    }
 
    /**
     * No Comments
     * 
     * @param key
     * @throws IOException
     */

    private void close(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        SelectionKey peerKey = ((Attachment) key.attachment()).peer;
        if (peerKey !null) {
            ((Attachment)peerKey.attachment()).peer=null;
            if((peerKey.interestOps()&SelectionKey.OP_WRITE)==0 ) {
                ((Attachment)peerKey.attachment()).out.flip();
            }
            peerKey.interestOps(SelectionKey.OP_WRITE);
        }
    }
 
    public static void main(String[] args) {
        Socks4Proxy server = new Socks4Proxy();
        server.host = "127.0.0.1";
        server.port = 1080;
        server.run();
    }
}


Далее открываем ваш любимый браузер, выбираем socks 4 proxy вводим 127.0.0.1:1080 и проверяем работоспособность.
+32
25 сентября 2009, 21:19
30

комментарии (26)

+3
dgreen #
Была бы карма :) запостил бы в Java
+1
dive #
теперь попробуйте.
спасибо за статью.
+3
dgreen #
:) ну постараюсь в ближайщих статьях, осветить создание своего сокет-сервера делающего чего-нить полезное, с неблокирующим коннектором и многопоточными воркерами :)
–5
dotCypress #
Джава — красивый язык, но мне как опологету C# ненравится стиль оформления.
Я даже в свободное время игрушку написал для смей мобилы )
+1
akuznetsov #
Спасибо за статью. На сколько я понял у вас только один поток отвечает за работу сокетами. Что бы увеличить их число нужна ли какая либо дополнительная синхронизация?
+2
dgreen #
Selector и должен работать в одном потоке, ключам даже interestOps поменять из другого нельзя.
Пример использования нескольких потоков, планируется в следующем посте, обычно использую блокирующую очередь и N воркеров для этой задачи.

Если предложите что полезного бы сервер мог делать, сделаю пример на базе этих полезных действий.
0
akuznetsov #
Ну на пример кеш, со стандартным набором команд: положить, взять, удалить, для простого типа String. На политики очистки кеша можно не заморачиваться.
0
dgreen #
Ну тогда хранилище с простым протоколом для хранения текстовых данных,
данные будут складываться например в lucene с возможность полнотекстового поиска.

Команда на положить:

>key \n
data data data\n
data data data\n
data 123 data data\n
\n

команда на поискать.

?123 data\n

и дальше соответственно ответ.

Поисковые задачи будут соответственно распаралелены.

0
akuznetsov #
Для примера достаточно складывать просто в ConcurrentMap, и доставать только по ключу. Если понадобиться можно легко будет перейти на другое хранилище.
0
dgreen #
Вообще для операции проксирования, одного потока вполне достаточно, т.к. все методы неблокируюшие, процессор тратится только на работу с доступными данными, размер которых на одной итерации ограничен размером сокет буфера или байтбуфера в который происходит чтение. Скорее всего такой сервер сможет достаточно безпроблемно прокачать 100мБ канал.

+1
insa #
Раз у нас Java >= 1.5, то вот это безобразие…

Iterator <SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();

}

… наверно можно в одну строчку переписать?
for(SelectionKey key: selector.selectedKeys()) {...}

// -----------8K-------------

Byte[] ar;
int p = (((0xFF & ar[2]) << 8) + (0xFF & ar[3]));

«О амперсанд, безсмысленный и беспощадный!»
Разве в Byte может быть что-то болшее чем 0xFF?

int p = (ar[2] << 8) | ar[3];

// -----------8K-------------

Attachment peerAttachemtn // — опечатка

// -----------8K-------------

p.s. вообще такие вещи проще писать на чистом Си с использованием libevent.
+3
Nevil #
>Iterator iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();

Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException (http://www.java2s.com/Code/Java/Collections-Data-Structure/IterateaCollectionandremoveanitemExceptionwrongversion.htm)
Для этих целей и нужен iterator
0
Nuru #
Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException ну для этих целей ещё можно проходится по коллекции в обратном направлении. Помогает
0
Nuru #
Блин, только проснулся) Сделаем по — другому…

>>Нельзя, при одновременном проходе по коллеции и удалении элементов из нее цикл foreach вызовет ConcurrentModificationException
Ну, для этих целей ещё можно проходится по коллекции в обратном направлении. Помогает
0
dgreen #
А можно пример для Сета :)
0
Nuru #
for (int i = collection.size() — 1; i > 0; i--)
collection.remove(i);
0
dgreen #
java.sun.com/javase/6/docs/api/java/util/Set.html
не вижу remove по индексу, в сете вообще нету индексов :) и правда ещё не проснулся :)
0
Nuru #
>>А можно пример для Сета :)

Омг)))) Я понял о чем вы говорите))))) Так, я пошёл делать себе кофе )
+1
mraleph #
амперсанд тут по делу… рекомендую выполнить

byte x = (byte)0xFF;
byte y = (byte)0xFF;
System.out.println (((0xFF & x) << 8) + (0xFF & y));
System.out.println ((x << 8) | y);

и удивиться. дело в том, что в Java побитовые операции применяются только к интам и лонгам (в байткоде не предусмотрены побитовые операции для байтов), поэтому перед применением операции сдвига происходит знаковое расширение байта до инта…
0
insa #
Спасибо. Крутое западло, не знал.
+3
dborovikov #
Отличная статья! Хочу лишь заметить, что для реальных задачь все же лучше использовать Apache Mina — обертку над NIO.
+1
johndow #
Я смотрел на MINA, на Grizzly и Netty. Остановился в конце-концов на Netty.
0
zubrabubra #
Зачем Runnable? Если нужен все таки поток, то вы не через run() start-уйте ;-)
Надеюсь у вас появится возможность сделать сравнение linux kern2.6 vs kern2.4 vs xp.
+1
maxcom #
accept лучше делать в отдельном треде классическим блокирующим способом, lattency от этого сильно выигрывает
0
dgreen #
Не поспоришь, конечно же аццептить будет намного быстрее, только это может вызвать усложнение кода при регистрации сокета в селекторе, скорее всего это операция возможна только в том же треде, где происходит select(), по крайней мере с установкой interstOps всё именно так и приходится кидать ChangeRequest в очередь на выполнение и кричать selector.wakeUp(), т.е. после accept скорее всего надо будет сделать тоже самое, только RegisterRequest.
0
maxcom #
да, но выигрышь того стоит. Плюс еще в acceptor'е можно разбрасывать соединения на несколько реакторов (чтобы лучше загружать ядра)

Только зарегистрированные пользователи могут оставлять комментарии. Войдите, пожалуйста.