seefood_diet / Assets / Plugins / UniRx / Scripts / Subjects / ReplaySubject.cs
ReplaySubject.cs
Raw
using System;
using System.Collections.Generic;
using UniRx.InternalUtil;

namespace UniRx
{
    public sealed class ReplaySubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
    {
        object observerLock = new object();

        bool isStopped;
        bool isDisposed;
        Exception lastError;
        IObserver<T> outObserver = EmptyObserver<T>.Instance;

        readonly int bufferSize;
        readonly TimeSpan window;
        readonly DateTimeOffset startTime;
        readonly IScheduler scheduler;
        Queue<TimeInterval<T>> queue = new Queue<TimeInterval<T>>();


        public ReplaySubject()
            : this(int.MaxValue, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        public ReplaySubject(IScheduler scheduler)
            : this(int.MaxValue, TimeSpan.MaxValue, scheduler)
        {
        }

        public ReplaySubject(int bufferSize)
            : this(bufferSize, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        public ReplaySubject(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        public ReplaySubject(TimeSpan window)
            : this(int.MaxValue, window, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        public ReplaySubject(TimeSpan window, IScheduler scheduler)
            : this(int.MaxValue, window, scheduler)
        {
        }

        // full constructor
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize");
            if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            this.bufferSize = bufferSize;
            this.window = window;
            this.scheduler = scheduler;
            startTime = scheduler.Now;
        }

        void Trim()
        {
            var elapsedTime = Scheduler.Normalize(scheduler.Now - startTime);

            while (queue.Count > bufferSize)
            {
                queue.Dequeue();
            }
            while (queue.Count > 0 && elapsedTime.Subtract(queue.Peek().Interval).CompareTo(window) > 0)
            {
                queue.Dequeue();
            }
        }

        public void OnCompleted()
        {
            IObserver<T> old;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                old = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                Trim();
            }

            old.OnCompleted();
        }

        public void OnError(Exception error)
        {
            if (error == null) throw new ArgumentNullException("error");

            IObserver<T> old;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                old = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                lastError = error;
                Trim();
            }

            old.OnError(error);
        }

        public void OnNext(T value)
        {
            IObserver<T> current;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                // enQ
                queue.Enqueue(new TimeInterval<T>(value, scheduler.Now - startTime));
                Trim();

                current = outObserver;
            }

            current.OnNext(value);
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            var ex = default(Exception);
            var subscription = default(Subscription);

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (!isStopped)
                {
                    var listObserver = outObserver as ListObserver<T>;
                    if (listObserver != null)
                    {
                        outObserver = listObserver.Add(observer);
                    }
                    else
                    {
                        var current = outObserver;
                        if (current is EmptyObserver<T>)
                        {
                            outObserver = observer;
                        }
                        else
                        {
                            outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
                        }
                    }

                    subscription = new Subscription(this, observer);
                }

                ex = lastError;
                Trim();
                foreach (var item in queue)
                {
                    observer.OnNext(item.Value);
                }
            }

            if (subscription != null)
            {
                return subscription;
            }
            else if (ex != null)
            {
                observer.OnError(ex);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        public void Dispose()
        {
            lock (observerLock)
            {
                isDisposed = true;
                outObserver = DisposedObserver<T>.Instance;
                lastError = null;
                queue = null;
            }
        }

        void ThrowIfDisposed()
        {
            if (isDisposed) throw new ObjectDisposedException("");
        }

        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }

        class Subscription : IDisposable
        {
            readonly object gate = new object();
            ReplaySubject<T> parent;
            IObserver<T> unsubscribeTarget;

            public Subscription(ReplaySubject<T> parent, IObserver<T> unsubscribeTarget)
            {
                this.parent = parent;
                this.unsubscribeTarget = unsubscribeTarget;
            }

            public void Dispose()
            {
                lock (gate)
                {
                    if (parent != null)
                    {
                        lock (parent.observerLock)
                        {
                            var listObserver = parent.outObserver as ListObserver<T>;
                            if (listObserver != null)
                            {
                                parent.outObserver = listObserver.Remove(unsubscribeTarget);
                            }
                            else
                            {
                                parent.outObserver = EmptyObserver<T>.Instance;
                            }

                            unsubscribeTarget = null;
                            parent = null;
                        }
                    }
                }
            }
        }
    }
}