Pull to refresh

Преобразование асинхронных методов в синхронные с возвращением результата в Java

Reading time 3 min
Views 8.1K
В данной статье я бы хотел обсудить с уважаемым сообществом методы синхронизации потоков в Java. Чтобы не увязать в теории, попробуем рассмотреть синхронизацию потоков в задаче преобразования асинхронных методов в синхронные.

Дано


  • JavaSE 6+
  • библиотека с асинхронным методом

void A.asyncMethod(Callback callback);

  • метод, который нужно переопределить, и вернуть из него результат

@Override
Object overridenMethod() {
	return syncMethod();
}


Задача


Преобразовать асинхронный вызов метода в синхронный с возвращением результата.

Решение


Создать метод-обертку для преобразования асинхронного вызова в синхронный.

Метод 1 — блокировка на объекте

Object syncMethod() {
	final Queue<Object> queue = new LinkedList<Object>();
	try {
		A.asyncMethod(new Callback() {
			@Override
			public void onFinish(Object o) {
				synchronized (queue) {
					queue.add(o);
					queue.notify();
				}
			}
		});
		synchronized (queue) {
			while (queue.size() == 0) {
				queue.wait();
			}
			return queue.poll();
		}
	} catch (InterruptedException e) {
		return null;
	}
}

Создаем очередь для возвращения результата. Вызываем асинхронный метод и ждем его выполнения, если у нас еще нет результата в очереди. По завершению выполнения метода, запихиваем объект в очередь и оповещаем ожидающий поток о том что можно продолжить исполнение. Возвращаем объект из очереди. Блокировка проводится на объекте очереди.

Метод 2 — с помощью CountDownLatch

Object syncMethod() {
	final CountDownLatch done = new CountDownLatch(1);
	final Queue<Object> queue = new LinkedList<Object>();
	try {
		A.asyncMethod(new Callback() {
			@Override
			public void onFinish(Object o) {
				try {
					queue.add(o);
				} finally {
					done.countDown();
				}
			}
		});
		done.await();
		return queue.poll();
	} catch (InterruptedException e) {
		return null;
	}
}

Вместо блокировки на объекте используем объект класса CountDownLatch. При его создании задаем начальное значение счетчика 1. После вызова метода ожидаем пока значение счетчика будет 0. По завершению асинхронного метода понижаем счетчик, он становится 0, главная нить просыпается и мы возвращаем объект из очереди.

Метод 3 — с помощью CyclicBarrier

Object syncMethod() {
	final CyclicBarrier barrier = new CyclicBarrier(2);
	final Queue<Object> queue = new LinkedList<Object>();
	try {
		A.asyncMethod(new Callback() {
			@Override
			public void onFinish(Object o) {
				try {
					queue.add(o);
				} finally {
					try {
						barrier.await();
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
					} 
					catch (BrokenBarrierException e) {}
				}
			}
		});
		barrier.await();
		return queue.poll();
	} catch (InterruptedException e) {
		return null;
	} catch (BrokenBarrierException e) {
		return null;
	}
}

При создании экземпляра класса CyclicBarrier указывается количество нитей, которые одновременно могут пройти этот барьер. После вызова асинхронного метода ждем второй нити, которая вызовет barrier.await(). По завершению метода, нить вызвавшая тот же метод у барьера, становится второй, барьер достигает своего ограничения и восстанавливает выполнение всех приостановенных нитей.

Метод 4 — с помощью метода take интерфейса BlockingQueue

Object syncMethod() {
	final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
	try {
		A.asyncMethod(new Callback() {
			@Override
			public void onFinish(Object o) {
				try {
					queue.put(o);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
		return queue.take();
	} catch (InterruptedException e) {
		return null;
	}
}

За замечание спасибо apangin. Метод take интерфейса блокирующей очереди (BlockingQueue) при отсутствии елементов в ней останавливает нить и ожидает появления элементов в ней. При появлении элемента, нить восстанавливается и продолжает своё исполнение.

Метод 5 — с помощью Exchanger

Object syncMethod() {
	final Exchanger<Object> exchanger = new Exchanger<Object>();
	try {
		A.asyncMethod(new Callback() {
			@Override
			public void onFinish(Object o) {
				try {
					exchanger.exchange(o);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
		return exchanger.exchange(null);
	} catch (InterruptedException e) {
		return null;
	}
}

Спасибо ashofthedream за метод. Объект класса Exchanger останавливает текущую нить при первом вызове метода exchange до тех пор пока из другой нити с ним не захотят «поменяться» чем-либо. После обмена обе нити, из которых вызваны методы, продолжат своё выполнение.

Итого


В ходе данной статьи мы узнали, на примере практической задачи преобразования асинхронных методов в синхронные, средства для синхронизации потоков в Java 6.
Дополнения, здравая критика и обсуждение недостатков/достоинств этих методов приветствуются.
Tags:
Hubs:
+1
Comments 39
Comments Comments 39

Articles