Pull to refresh

Сигналы на c#

Reading time4 min
Views14K
Доброго времени суток Хабр. Вдохновленный моделью синхронизации потоков в go и сигналов в QT появилась идея реализовать нечто подобное на c#.

image

Если интересно, прошу под кат.

В данный момент синхронизация потоков в c# вызывает некоторые затруднения, в частности передача примитивов синхронизации между объектами Вашего приложения и поддержка этого всего в дальнейшем.

Текущая модель с Task и IAsyncResult а так же TPL в целом решают все проблемы при должном проектировании но хотелось создать простой класс через который можно будет отправлять и принимать сигналы с блокировкой потока.

В общем в голове созрел некий интерфейс:

	public interface ISignal<T> : IDisposable
	{
		void Send(T signal);

		T Receive();

		T Receive(int timeOut);
	}

, где T — сущность которую необходимо передать получателю.

Пример вызова:

		[TestMethod]
		public void ExampleTest()
		{
			var signal = SignalFactory.GetInstanse<string>();
			var task1 = Task.Factory.StartNew(() => // старт потока
			{
				Thread.Sleep(1000);
				signal.Send("Some message");
			});
			// блокировка текущего потока
			string message = signal.Receive();
			Debug.WriteLine(message);
		}

Для получения объекта сигнала создадим фабрику.

	public static class SignalFactory 
	{
		public static ISignal<T> GetInstanse<T>()
		{
			return new Signal<T>();
		}

		public static ISignal<T> GetInstanse<T>(string name)
		{
			return new CrossProcessSignal<T>(name);
		}
	}

Signal — internal класс для синхронизации внутри одного процесса. Для синхронизации необходима ссылка на объект.

CrossProcessSignal — internal класс который может синхронизировать потоки в отдельных процессах(но об этом чуть позже).

Теперь о реализации Signal


Первое, что приходит на ум, в Receive блокировать выполнение потока с помощью Semaphore а в методе Send вызывать Release() этого семафора с количеством блокированных потоков.
После разблокировки потоков возвращать результат из поля класса T buffer. Но мы не знаем какое количество потоков будет висеть в Receive и нет гарантии что к вызову Release не подбежит еще пара потоков.

В качестве примитива синхронизации был выбран AutoResetEvent. Для каждого нового потока будет создаваться свой AutoResetEvent, хранить все это добро мы будем в словаре Dictionary<int,AutoResetEvent> где ключ это id потока.

Собственно поля класса выглядят так:

private T buffer;

Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();

private volatile object sync = new object();

private bool isDisposabled = false;

Объект sync будет нам необходим при вызове Send, дабы несколько потоков не начали перетирать буфер.

isDisposabled флаг указывающий был ли вызван Dispose(), если не вызван то вызываем его в деструкторе.

public void Dispose()
{
	foreach(var resetEvent in events.Values)
	{
		resetEvent.Dispose();
	}
	isDisposabled = true;
}
~Signal()
{
	if (!isDisposabled)
	{
		Dispose();
	}
}

Теперь о методе Receive.

		public T Receive()
		{
			var waiter = GetEvents();
			waiter.WaitOne();
			waiter.Reset();
			return buffer;
		}

GetEvents() достает из словаря AutoResetEvent если есть, если нет то создает новый и кладет его в словарь.

waiter.WaitOne() блокировка потока до ожидания сигнала.

waiter.Reset() сброс текущего состояния AutoResetEvent. Следующий вызов WaitOne приведет к блокировке потока.

Осталось только вызвать метод Set для каждого AutoResetEvent.

public void Send(T signal)
{
	lock (sync)
	{
		buffer = signal;
		foreach(var autoResetEvent in events.Values)
		{
			autoResetEvent.Set();
		}
	}
}

Проверить данную модель можно тестом:

Тест
private void SendTest(string name = "")
{
	ISignal<string> signal;
	if (string.IsNullOrEmpty(name))
	{
		 signal = SignalFactory.GetInstanse<string>(); // создаем локальный сигнал
	}
	else
	{
		signal = SignalFactory.GetInstanse<string>(name);
	}

	var task1 = Task.Factory.StartNew(() => // старт потока
	{
		for (int i = 0; i < 10; i++)
		{
			// блокировка потока, ожидание сигнала
			var message = signal.Receive();
			Debug.WriteLine($"Thread 1 {message}");
		}
		});
	var task2 = Task.Factory.StartNew(() => // старт потока
	{
		for (int i = 0; i < 10; i++)
		{
			// блокировка потока, ожидание сигнала
			var message = signal.Receive();
			Debug.WriteLine($"Thread 2 {message}");
		}
	});

	for (int i = 0; i < 10; i++)
	{
		// отправка сигнала ожидающим потокам.
		signal.Send($"Ping {i}");
		Thread.Sleep(50);
	}

}

Листинг класса Signal
using System.Collections.Generic;
using System.Threading;

namespace Signal
{
	internal class Signal<T> : ISignal<T>
	{
		private T buffer;

		Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();

		private volatile object sync = new object();

		private bool isDisposabled = false;

		~Signal()
		{
			if (!isDisposabled)
			{
				Dispose();
			}
		}

		public T Receive()
		{
			var waiter = GetEvents();
			waiter.WaitOne();
			waiter.Reset();
			return buffer;
		}

		public T Receive(int timeOut)
		{
			var waiter = GetEvents();
			waiter.WaitOne(timeOut);
			waiter.Reset();
			return buffer;
		}

		public void Send(T signal)
		{
			lock (sync)
			{
				buffer = signal;
				foreach(var autoResetEvent in events.Values)
				{
					autoResetEvent.Set();
				}
			}
		}

		private AutoResetEvent GetEvents()
		{
			var threadId = Thread.CurrentThread.ManagedThreadId;
			AutoResetEvent autoResetEvent;
			if (!events.ContainsKey(threadId))
			{
				autoResetEvent = new AutoResetEvent(false);
				events.Add(threadId, autoResetEvent);
			}
			else
			{
				autoResetEvent = events[threadId];
			}
			return autoResetEvent;
		}

		public void Dispose()
		{
			foreach(var resetEvent in events.Values)
			{
				resetEvent.Dispose();
			}
			isDisposabled = true;
		}
	}
}

Данной реализации есть куда расти в плане надежности. В исходниках есть межпроцессорная реализация этой идеи с передачей сигнала через shared memory, если будет интересно могу написать об этом отдельную статью.

Исходники на Гитхабе
Tags:
Hubs:
+7
Comments42

Articles

Change theme settings