Pull to refresh

Java EE, JCA и jNode 2.X announce

Reading time12 min
Views11K

Доброго времени суток, %username%.
Скажу сразу, на 99% данный пост про Java EE Connector Architecture, с примерами кода. Откуда взялся 1% про Fidonet вы поймете в самом конце.

Резюме для ленивых
JMS и JCA — родственники, входящие принимает MessageDrivenBean, исходящие отправляются через ConnectionFactory.
Минимальный пакет для входящего соединения — 4 класса, для исходящего — 8 классов и настройка адаптера на стороне сервера приложений.
Дальше — только подробности и боль


Для начала — история вопроса и решение бизнес-задачи.

Постановка задачи


Мне поставили задачу об интеграции существующей бизнес-системы («система А» ) с другой системой, которая была разработана много лет назад и понимает только собственный протокол передачи данных («система B»). Модифицировать чужие системы нельзя, соответственно задача свелась к написанию некой шины/прокси. Интеграция состоит в передаче туда-сюда сообщений с конвертацией их в процессе из одного формата в другой.

Система «А» имела много современных механизмов интеграции, самым простым для использования были признаны веб-сервисы. Под это дело был оперативно запилен стандартный интеграционный скелет для JEE — JAX-WS+EJB+JMS для гарантированной доставки сообщения.
А вот для работы с системой «B» стандартных средств не было. Робкие попытки поработать с сетью из контекста EJB успехом не увенчались, гугл подсказал два варианта решения проблемы: костылить сервлеты для работы с non-http или написать JCA-адаптер. Понятно, что был выбран второй путь — с JCA я до этого не работал, а узнать что-то новое всегда интересно.

Исследование


Начав копать гугл, я достаточно сильно обломался. Везде писали, ЧТО именно нужно сделать ( коннектор, менеджер, адаптер итд ), но почти нигде не писали, КАК это сделать. Стандартный способ «посмотреть чужой код и понять процесс» дал сбой — чужого кода был такой мизер, что понять что-то мне не представлялось возможным.

Спасли меня две вещи: JSR 322 и единственный нагугленный адаптер на google code. Собственно, это и стало отправной точкой — задеплоив примеры из jca-sockets и открыв pdf, я начал разбираться и путем научного тыка понимать, как оно собственно работает.

Потратив около 16 часов на исследование и эксперименты, я выяснил следующее:

JCA-модуль имеет внутри себя две независимых части: «Входящие» и «Исходящие». Эти части могут быть как вместе, так и по-отдельности. Более того, их может быть несколько. Сам модуль регистритуется классом, реализующим javax.resource.spi.ResourceAdapter и указанным в META-INF/ra.xml, при этом ResourceAdapter нужен в первую очередь для работы с Входящими; Для Исходящих адаптер ничего не делает и его скелет можно даже не заполнять.

Входящие


Входящий канал биндится к MessageEndpoint'у ( обычно это @MessageDrivenBean; да-да, JCA это кишки JMS ) и активируется ActivationSpec'ом.
META-INF/ra.xml — описание ResourceAdapter'а и inbound потоков
ra.xml
<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
           http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
           version="1.7" metadata-complete="true">

   <vendor-name>xxx-services</vendor-name>
   <eis-type>FidoNet</eis-type>
   <resourceadapter-version>2.5</resourceadapter-version>
   <resourceadapter>
<!-- Класс, который реализует javax.resource.spi.ResourceAdapter; config-property - поля, должны быть доступны через геттеры/сеттеры -->
      <resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class>
      <config-property>
         <config-property-name>version</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value>
      </config-property>
<!-- Описание входящего потока -->
      <inbound-resourceadapter>
         <messageadapter>
            <messagelistener>
<!-- Интерфейс, который должен реализовать @MessageDrivenBean для того, чтоб получать сообщения с этого адаптера -->
               <messagelistener-type>in.fidonode.binkp.ra.BinkpMessageListener</messagelistener-type>
               <activationspec>
<!-- Класс-холдер параметров, которая передается через @ActivationConfigProperty, должны быть доступны геттеры и сеттеры параметров -->
                  <activationspec-class>in.fidonode.binkp.ra.BinkpActivationSpec</activationspec-class>
<!-- Список обязательных параметров -->
                  <required-config-property>
                     <config-property-name>listenPort</config-property-name>
                  </required-config-property>
<!-- Описание параметров параметров -->
                  <config-property>
                     <config-property-name>listenPort</config-property-name>
                     <config-property-type>java.lang.Integer</config-property-type>
                     <config-property-value>24554</config-property-value>
                  </config-property>
               </activationspec>
            </messagelistener>
         </messageadapter>
      </inbound-resourceadapter>
   </resourceadapter>
</connector>



Интерфейс BinkpMessageListener — для клиентов и должен быть в classpath;

Приведу его тут:
public interface BinkpMessageListener {
	public void onMessage(FidoMessage message);
}


Теперь рассмотрим простейшую реализацию ResourceAdapter
BinkpServerResourceAdapter.java
public class BinkpServerResourceAdapter implements ResourceAdapter, Serializable {

	private static final long serialVersionUID = 1L;
	private static Logger log = Logger.getLogger(BinkpServerResourceAdapter.class
			.getName());
	private ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint> activationMap = 
			new ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint>();
	private BootstrapContext ctx;
	private String version;

	@Override
	public void endpointActivation(MessageEndpointFactory endpointFactory,
			ActivationSpec spec) throws ResourceException {
		BinkpEndpoint activation = new BinkpEndpoint(ctx.getWorkManager(),
				(BinkpActivationSpec) spec, endpointFactory);
		activationMap.put((BinkpActivationSpec) spec, activation);
		activation.start();

		log.info("endpointActivation(" + activation + ")");

	}
	@Override
	public void endpointDeactivation(MessageEndpointFactory endpointFactory,
			ActivationSpec spec) {
		BinkpEndpoint activation = activationMap.remove(spec);
		if (activation != null)
			activation.stop();

		log.info("endpointDeactivation(" + activation + ")");

	}
	@Override
	public void start(BootstrapContext ctx)
			throws ResourceAdapterInternalException {
		this.ctx = ctx;
		log.info("start()");

	}
	@Override
	public void stop() {
		for (BinkpEndpoint act : activationMap.values()) {
			act.stop();
		}
		activationMap.clear();
		log.info("stop()");
	}

	@Override
	public XAResource[] getXAResources(ActivationSpec[] arg0)
			throws ResourceException {
		return null;
	}

	public String getVersion() {
		return version;
	}

	public void setVersion(String version) {
		this.version = version;
	}

}



Что тут происходит? При загрузке JCA-модуля создается экземпляр класса BinkpServerResourceAdapter, у него заполняются параметры ( в данном случае — поле version) и вызывается метод start().
На самом деле, внутри метода start() можно делать много всего, но в данном примере мы просто сохраняем контекст для получения из него в дальнейшем WorkManager'а.

Когда сервер приложений находит @MessageDrivenBean, он пытается найти адаптер, который отправляет сообщения на тот интерфейс, который реализует бин. Для JMS это MessageListener, у нас это BinkpMessageListener. Создается ActivationSpec ( у нас это BinkpActivationSpec, реализующий javax.resource.spi.ActivationSpec), поля в котором заполняются согласно данным в activationConfig, создается MessageEndpointFactory и вызывается ResourceAdapter.endpointActivation(). В этой функции необходимо создать тот «сервер», который будет принимать входящие соединения, будь то tcp/ip сервер или поток для работы с unix-socket, создать на основе того конфига который был в MDB. Класс BinkpEndpoint — это и есть тот самый «сервер».
BinkpEndpoint.java
public class BinkpEndpoint implements Work, FidoMessageListener {
	private static final Logger logger = Logger.getLogger(BinkpEndpoint.class
			.getName());
	private BinkpServer server;
	private final WorkManager workManager;
	private final MessageEndpointFactory messageEndpointFactory;

	public BinkpEndpoint(WorkManager workManager,
			BinkpActivationSpec activationSpec,
			MessageEndpointFactory messageEndpointFactory) {
		this.workManager = workManager;
		this.messageEndpointFactory = messageEndpointFactory;
		server = new BinkpServer(activationSpec.getListenPort(), this);
	}

	public void start() throws ResourceException {
		workManager.scheduleWork(this);
	}


	public void stop() {
			if (server != null) {
				server.stop();
			}
	}
	/** из FidoMessageListener **/
	@Override
	public Message incomingMessage(FidoMessage message) {
			String message = msg.encode();
			BinkpMessageListener listener = (BinkpMessageListener) messageEndpointFactory
					.createEndpoint(null);
			listener.onMessage(message);
	}
	/** из Work **/
	@Override
	public void run() {
		server.start();

	}
	/** из Work **/
	@Override
	public void release() {
		stop();
	}

}



Можно заметить, что везде фигурируют некие endpoint'ы. У меня был с этим некоторый затык, поэтому расшифрую:
Endpoint — это то, что слушает «входящий» поток. Именно к нему относятся функции endpointActication
MessageEndpoint — экземпляр MDB, который обрабатывает то или иное сообщение. Получается вызовом MessageEndpointFactory.createEndpoint() ( Эту функцию нельзя вызывать из основного треда ). Он легко кастится к интерфейсу MDB.

Собственно, все. Реализацию BinkpServer за ненадобностью опущу, но принцип должен быть понятен, минимальный «Входящий» JCA делается из четырех классов ( ResourceAdapter, MessageListener, ActivationSpec, Endpoint )

Создание Endpoint'а и обработка входящих:
@MessageDriven(messageListenerInterface = BinkpMessageListener.class, 
activationConfig = { @ActivationConfigProperty(propertyName = "listenPort", propertyValue = "24554") })
public class ReceiveMessageBean implements BinkpMessageListener {

	@Override
	public void onMessage(FidoMessage msg) {
		// do smth with mesaage
	}
}


Исходящие



А вот тут — все веселее, минимальный «Исходящий» JCA делается аж из 8 классов, что в 2 раза больше чем «Входящий». Но давайте по-порядку.

META-INF/ra.xml — описание ResourceAdapter'а и outbound потоков

ra.xml
<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
           http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd"
           version="1.7" metadata-complete="true">

   <vendor-name>xxx-services</vendor-name>
   <eis-type>FidoNet</eis-type>
   <resourceadapter-version>2.5</resourceadapter-version>
   <resourceadapter>
<!-- Класс, который реализует javax.resource.spi.ResourceAdapter; config-property - поля, должны быть доступны через геттеры/сеттеры -->
      <resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class>
      <config-property>
         <config-property-name>version</config-property-name>
         <config-property-type>java.lang.String</config-property-type>
         <config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value>
      </config-property>
<!-- Описание исходящего потока. Всегда ПЕРЕД входящим -->
 <outbound-resourceadapter>
         <connection-definition>
<!-- Фабрика с которой будет работать JEE-сервер, она создает фабрику для создания соединений -->
            <managedconnectionfactory-class>in.fidonode.binkp.ra.ManagedConnectionFactory</managedconnectionfactory-class>
<!-- Фабика соединений, которая будет отдаваться по запросу. Интерфейс должен быть у клиента в classpath -->
            <connectionfactory-interface>in.fidonode.binkp.ra.ConnectionFactory</connectionfactory-interface>
            <connectionfactory-impl-class>in.fidonode.binkp.ra.ConnectionFactoryImpl</connectionfactory-impl-class>
<!-- Соединение, которое будет отдавать фабрика соединений. Интерфейс должен быть у клиента в classpath -->
            <connection-interface>in.fidonode.binkp.ra.Connection</connection-interface>
            <connection-impl-class>in.fidonode.binkp.ra.ConnectionImpl</connection-impl-class>
         </connection-definition>
<!-- Про транзакции и аутентификацию есть отдельный талмуд, я этот момент пропустил -->
         <transaction-support>NoTransaction</transaction-support>
         <reauthentication-support>false</reauthentication-support>
      </outbound-resourceadapter>
<!-- Описание входящего потока -->
      <inbound-resourceadapter>
<!-- ... -->
      </inbound-resourceadapter>
   </resourceadapter>
</connector>



Интерфейсы Connection и ConnectionFactory — для клиентов и должны быть в classpath. Сразу приведу их тут, там ничего интересного.

BinkpClient приводить не буду :-)
public interface Connection {
	public BinkpClient connect(String hostname, int port);
}
public interface ConnectionFactory {
	public Connection createConnection();
}


Соединения бывают Managed и Unmanaged. Первые — со свистелками, listener'ами и другим, вторые — без.
Класс, реализующий ManagedConnectionFactory, должен уметь создавать оба типа соединений.
ManagedConnectionFactory.java
public class ManagedConnectionFactory implements
		javax.resource.spi.ManagedConnectionFactory {
	private PrintWriter logwriter;
	private static final long serialVersionUID = 1L;

	/**
	 * Создание фабрики для unmanaged-соединений
	 */
	@Override
	public Object createConnectionFactory() throws ResourceException {
		return new ConnectionFactoryImpl();
	}

	/**
	 * Создание managed-фабрики для managed-connection
	 */
	@Override
	public Object createConnectionFactory(ConnectionManager cxManager)
			throws ResourceException {
		return new ManagedConnectionFactoryImpl(this, cxManager);
	}

	/**
	 * Создание managed-соединения
	 */
	@Override
	public ManagedConnection createManagedConnection(Subject subject,
			ConnectionRequestInfo cxRequestInfo) throws ResourceException {
		return new in.fidonode.binkp.ra.ManagedConnection();
	}

	@Override
	public PrintWriter getLogWriter() throws ResourceException {
		return logwriter;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public ManagedConnection matchManagedConnections(Set connectionSet,
			Subject subject, ConnectionRequestInfo cxRequestInfo)
			throws ResourceException {
		ManagedConnection result = null;
		Iterator it = connectionSet.iterator();
		while (result == null && it.hasNext()) {
			ManagedConnection mc = (ManagedConnection) it.next();
			if (mc instanceof in.fidonode.binkp.ra.ManagedConnection) {
				result = mc;
			}

		}
		return result;
	}

	@Override
	public void setLogWriter(PrintWriter out) throws ResourceException {
		logwriter = out;

	}

}



Когда приложение запрашивает у JEE-сервера тот или иной коннектор, сервер приложений просит ManagedConnectionFactory создать ConnectionFactory и отдает его приложению.

Как можно заметить, ConnectionFactory тоже бывает Managed и Unmanaged. В принципе все это можно свести к одному классу, но это сильно зависит от того, что именно и как мы передаем, есть ли там транзакции итд.
ConnectionFactoryIml просто делает new ConnectionImpl(), а вот ManagedConnectionFactoryImpl чуть посложнее:

ManagedConnectionFactoryImpl.java
public class ManagedConnectionFactoryImpl implements ConnectionFactory {
	private ManagedConnectionFactory factory;
	private ConnectionManager manager;

	public ManagedConnectionFactoryImpl(ManagedConnectionFactory factory,
			ConnectionManager manager) {
		super();
		this.factory = factory;
		this.manager = manager;
	}
/** создает managed-соединение через родителя-ManagedConnectionFactory **/
	@Override
	public Connection createConnection() {
		try {
			return (Connection) manager.allocateConnection(factory, null);
		} catch (ResourceException e) {
			return null;
		}
	}

}



ManagedConnection, реализующий javax.resource.spi.ManagedConnection — это обертка для интерфейса Connection, которая как-раз добавляет свистелок и listener'ов. Именно этот класс возвращает ManagedConnectionFactory.createManagedConnection(), которую мы вызываем при создании соединения из ManagedConnectionFactoryImpl.createConnection() через ConnectionManager.allocateConnection()

ManagedConnection.java
public class ManagedConnection implements javax.resource.spi.ManagedConnection {
	private PrintWriter logWriter;
	private Connection connection;
	private List<ConnectionEventListener> listeners;

	public ManagedConnection() {
		listeners = Collections
				.synchronizedList(new ArrayList<ConnectionEventListener>());
	}

	@Override
	public void associateConnection(Object connection) throws ResourceException {
		if (connection != null && connection instanceof Connection) {
			this.connection = (Connection) connection;
		}
	}

	@Override
	public Object getConnection(Subject subject,
			ConnectionRequestInfo cxRequestInfo) throws ResourceException {
		if (connection == null) {
			connection = new ManagedConnectionImpl();
		}
		return connection;
	}

	@Override
	public void cleanup() throws ResourceException {

	}

	@Override
	public void destroy() throws ResourceException {

	}

	@Override
	public PrintWriter getLogWriter() throws ResourceException {
		return logWriter;
	}

	@Override
	public ManagedConnectionMetaData getMetaData() throws ResourceException {
		throw new NotSupportedException();
	}

	@Override
	public XAResource getXAResource() throws ResourceException {
		throw new NotSupportedException();
	}

	@Override
	public LocalTransaction getLocalTransaction() throws ResourceException {
		return null;
	}

	@Override
	public void setLogWriter(PrintWriter out) throws ResourceException {
		logWriter = out;

	}

	@Override
	public void addConnectionEventListener(ConnectionEventListener listener) {
		if (listener != null) {
			listeners.add(listener);
		}
	}

	@Override
	public void removeConnectionEventListener(ConnectionEventListener listener) {
		if (listener != null) {
			listeners.remove(listener);
		}
	}
}



Ну вот, теперь мы подошли к самому простому — реализации соединения :-)
public class ConnectionImpl implements Connection {

	@Override
	public BinkpClient connect(String hostname, int port) {
		return new BinkpClient(hostname, port);
	}

}


Итоговая цепочка вызовов для установления исходящего соединения
ManagedConnectionFactory.createConnectionFactory()
->ManagedConnectionFactoryImpl.createConnection()
-->СonnectionManager.allocateConnection()
--->ManagedConnectionFactory.createManagedConnection()
---->ManagedConnection.getConnection()
----->ManagedConnectionImpl.connect()

Ну и не забудьте настроить сервер приложений для работы с этим адаптером, ну и jndi указать.

Код для вызова:
	private BinkpClient createBinkpClient(String host, int port) {
		ConnectionFactory cf = ((ConnectionFactory) new InitialContext().lookup("java:eis/BinkpConnectionFactory"));
		Connection conn = cf.getConnection();
		return conn.connect(host, port);
	}

А причем тут Фидо?



А почти непричем. Дело в том, что изначальная задача была вовсе не о binkp, но она была рабочей, а значит попадала под NDA. Поэтому, разобравшись с JCA и решив что нужно написать статью на Хабре ( кстати, оглядываясь назад я начинаю понимать, почему никто такую статью еще не написал. И это еще без транзакций! ), я разморозил старую идею — форк jnode для JEE-серверов, для запуска ноды в виде одного ear. В свое время именно знаний JCA мне не хватило для того, чтобы запустить проект :-)

Под это все я написал вышеприведенные примеры, и они даже заработали. Так что если вы хотите потренироваться в java ee вообще и рефакторинге из java se в частности — пишите письма и коммитьте код. Да, пойнтов все еще беру.

Спасибо за внимание, оставайтесь с нами. Опечатки можно писать в комментариях, я даже не сомневаюсь что их тут десятки.
Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
+8
Comments5

Articles