/// Поток независимый от пула потоков для скорости и независимости от алгоримов управления пулом
///
public abstract class ThreadTickObserver : IObserver
{
protected bool finishTickWorker = false;
protected Task tickWorker;
protected ManualResetEventSlim _newTickEvent;
protected ConcurrentQueue
protected int tickCount = 0;
public ThreadTickObserver()
{
_TickQueue = new ConcurrentQueue
_newTickEvent = new ManualResetEventSlim();
tickWorker = new Task(TaskFunc);
tickWorker.Start();
// ожидаем запуска , а то бывает не получает время на закуск
while (tickWorker.Status != TaskStatus.Running) Thread.Sleep(100);
}
public void TaskFunc()
{
Tick tick = null;
while (!finishTickWorker)
{
_newTickEvent.Wait();
while(_TickQueue.TryDequeue(out tick))
TickAction(tick);
_newTickEvent.Reset();
}
}
protected abstract void TickAction(Tick tick);
public virtual void OnNext(Tick tick)
{
tickCount++;
_TickQueue.Enqueue(tick);
_newTickEvent.Set();
}
public virtual void OnError(Exception error)
{
}
public virtual void OnCompleted()
{
if (tickWorker != null)
{
finishTickWorker = true;
_newTickEvent.Set();
tickWorker.Wait();
}
}
public void Dispose()
{
OnCompleted();
}
}