using System;
using Svelto.DataStructures;

namespace Svelto.ECS
{
    /// <summary>
    /// Do not use this class in place of normal polling.
    /// I eventually realised that in ECS no form of communication other than polling entity components can exist.
    /// Using groups, you can always have an optimal set of entity components to poll, so EntityStreams must be
    /// used only if:
    /// - you want the polling engine to be able to track all the entity changes happening in between polls and
    ///   not just the current state
    /// - you want a thread-safe way to read entity states, which includes all the state changes and not the last
    ///   one only
    /// - you want to communicate between EnginesRoots
    /// </summary>
    class EntitiesStream : IDisposable
    {
        /// <summary>
        /// Creates a consumer that receives every published entity of type <typeparamref name="T"/>,
        /// whatever its group. The stream for <typeparamref name="T"/> is created on first use.
        /// </summary>
        /// <param name="name">Name forwarded to the consumer (only stored in DEBUG, non-PROFILER builds).</param>
        /// <param name="capacity">Capacity passed to the consumer's ring buffer.</param>
        /// <returns>The newly registered consumer.</returns>
        internal Consumer<T> GenerateConsumer<T>(string name, uint capacity) where T : unmanaged, IEntityStruct
        {
            return GetOrCreateStream<T>().GenerateConsumer(name, capacity);
        }

        /// <summary>
        /// Creates a consumer that only receives published entities of type <typeparamref name="T"/>
        /// whose EGID group matches <paramref name="group"/>. The stream is created on first use.
        /// </summary>
        /// <param name="group">Group the consumer is restricted to.</param>
        /// <param name="name">Name forwarded to the consumer (only stored in DEBUG, non-PROFILER builds).</param>
        /// <param name="capacity">Capacity passed to the consumer's ring buffer.</param>
        /// <returns>The newly registered consumer.</returns>
        public Consumer<T> GenerateConsumer<T>(ExclusiveGroup group, string name, uint capacity)
            where T : unmanaged, IEntityStruct
        {
            return GetOrCreateStream<T>().GenerateConsumer(group, name, capacity);
        }

        /// <summary>
        /// Forwards <paramref name="entity"/> to the stream registered for <typeparamref name="T"/>.
        /// If no stream exists for that type (no consumer was ever generated), a debug message is logged
        /// and the entity is dropped.
        /// </summary>
        /// <param name="entity">Entity struct to publish.</param>
        /// <param name="egid">Identifier of the published entity.</param>
        internal void PublishEntity<T>(ref T entity, EGID egid) where T : unmanaged, IEntityStruct
        {
            if (_streams.TryGetValue(TypeRefWrapper<T>.wrapper, out var typeSafeStream))
            {
                // Streams are stored under the key of their own T, so a direct cast is used:
                // a mismatch would surface as InvalidCastException instead of a NullReferenceException.
                ((EntityStream<T>) typeSafeStream).PublishEntity(ref entity, egid);
            }
            else
            {
                Console.LogDebug("No Consumers are waiting for this entity to change ", typeof(T));
            }
        }

        /// <summary>Removes every registered stream.</summary>
        public void Dispose()
        {
            _streams.Clear();
        }

        /// <summary>
        /// Returns the stream registered for <typeparamref name="T"/>, registering a new one first when
        /// none exists. The value is read back from the dictionary after the insert, exactly as the two
        /// public overloads did before this lookup was factored out.
        /// </summary>
        EntityStream<T> GetOrCreateStream<T>() where T : unmanaged, IEntityStruct
        {
            var key = TypeRefWrapper<T>.wrapper;

            if (_streams.ContainsKey(key) == false)
            {
                _streams[key] = new EntityStream<T>();
            }

            return (EntityStream<T>) _streams[key];
        }

        // One stream per entity struct type.
        // NOTE(review): the key type is assumed to be RefWrapper<Type>, i.e. the type of
        // TypeRefWrapper<T>.wrapper - confirm against Svelto.DataStructures.
        readonly ThreadSafeDictionary<RefWrapper<Type>, ITypeSafeStream> _streams =
            new ThreadSafeDictionary<RefWrapper<Type>, ITypeSafeStream>();
    }

    /// <summary>
    /// Marker interface that lets streams of different entity types share one dictionary.
    /// </summary>
    interface ITypeSafeStream
    {}

    /// <summary>
    /// Holds the consumers interested in entities of type <typeparamref name="T"/> and fans every
    /// published entity out to them.
    /// </summary>
    class EntityStream<T> : ITypeSafeStream where T : unmanaged, IEntityStruct
    {
        /// <summary>
        /// Enqueues <paramref name="entity"/> on every consumer that has no group filter, and on every
        /// group-filtered consumer whose group equals the group of <paramref name="egid"/>.
        /// </summary>
        /// <param name="entity">Entity struct to publish.</param>
        /// <param name="egid">Identifier of the published entity.</param>
        public void PublishEntity(ref T entity, EGID egid)
        {
            for (int i = 0; i < _consumers.Count; i++)
            {
                // Read the element once so the group check and the enqueue act on the same consumer.
                var consumer = _consumers[i];

                if (consumer._hasGroup == false || egid.groupID == consumer._group)
                {
                    consumer.Enqueue(entity, egid);
                }
            }
        }

        /// <summary>Creates and registers a consumer without a group filter.</summary>
        /// <param name="name">Name forwarded to the consumer.</param>
        /// <param name="capacity">Capacity passed to the consumer's ring buffer.</param>
        /// <returns>The newly registered consumer.</returns>
        public Consumer<T> GenerateConsumer(string name, uint capacity)
        {
            var consumer = new Consumer<T>(name, capacity, this);

            _consumers.Add(consumer);

            return consumer;
        }

        /// <summary>Creates and registers a consumer restricted to <paramref name="group"/>.</summary>
        /// <param name="group">Group the consumer is restricted to.</param>
        /// <param name="name">Name forwarded to the consumer.</param>
        /// <param name="capacity">Capacity passed to the consumer's ring buffer.</param>
        /// <returns>The newly registered consumer.</returns>
        public Consumer<T> GenerateConsumer(ExclusiveGroup group, string name, uint capacity)
        {
            var consumer = new Consumer<T>(group, name, capacity, this);

            _consumers.Add(consumer);

            return consumer;
        }

        /// <summary>
        /// Unregisters <paramref name="consumer"/>. The removal is unordered, so the relative order of
        /// the remaining consumers is not guaranteed.
        /// </summary>
        public void RemoveConsumer(Consumer<T> consumer)
        {
            _consumers.UnorderedRemove(consumer);
        }

        readonly FasterListThreadSafe<Consumer<T>> _consumers = new FasterListThreadSafe<Consumer<T>>();
    }

    /// <summary>
    /// Handle used to read the entities of type <typeparamref name="T"/> published on a stream.
    /// Published entities are buffered, together with their EGID, in a ring buffer until dequeued.
    /// Disposing the consumer unregisters it from the stream that created it.
    /// </summary>
    public struct Consumer<T> : IDisposable where T : unmanaged, IEntityStruct
    {
        /// <summary>Creates a consumer without a group filter.</summary>
        /// <param name="name">Name of the consumer; only stored in DEBUG, non-PROFILER builds.</param>
        /// <param name="capacity">Capacity passed to the ring buffer.</param>
        /// <param name="stream">Stream this consumer is removed from on <see cref="Dispose"/>.</param>
        internal Consumer(string name, uint capacity, EntityStream<T> stream) : this()
        {
#if DEBUG && !PROFILER
            _name = name;
#endif
            // The ring buffer is given the consumer name in debug builds and an empty string otherwise.
            _ringBuffer = new RingBuffer<ValueTuple<T, EGID>>((int) capacity,
#if DEBUG && !PROFILER
                _name
#else
                string.Empty
#endif
            );

            _stream = stream;
        }

        /// <summary>Creates a consumer that is restricted to <paramref name="group"/>.</summary>
        /// <param name="group">Group the consumer is restricted to.</param>
        /// <param name="name">Name of the consumer; only stored in DEBUG, non-PROFILER builds.</param>
        /// <param name="capacity">Capacity passed to the ring buffer.</param>
        /// <param name="stream">Stream this consumer is removed from on <see cref="Dispose"/>.</param>
        internal Consumer(ExclusiveGroup group, string name, uint capacity, EntityStream<T> stream)
            : this(name, capacity, stream)
        {
            _group    = group;
            _hasGroup = true;
        }

        /// <summary>Buffers a published entity together with its identifier.</summary>
        internal void Enqueue(in T entity, in EGID egid)
        {
            _ringBuffer.Enqueue((entity, egid));
        }

        /// <summary>Tries to read the next buffered entity.</summary>
        /// <param name="entity">The dequeued entity; its content is only meaningful when true is returned.</param>
        /// <returns>Whatever the ring buffer's TryDequeue reports.</returns>
        public bool TryDequeue(out T entity)
        {
            var dequeued = _ringBuffer.TryDequeue(out var values);

            entity = values.Item1;

            return dequeued;
        }

        /// <summary>Tries to read the next buffered entity together with its identifier.</summary>
        /// <param name="entity">The dequeued entity; its content is only meaningful when true is returned.</param>
        /// <param name="id">The dequeued EGID; its content is only meaningful when true is returned.</param>
        /// <returns>Whatever the ring buffer's TryDequeue reports.</returns>
        public bool TryDequeue(out T entity, out EGID id)
        {
            var dequeued = _ringBuffer.TryDequeue(out var values);

            entity = values.Item1;
            id     = values.Item2;

            return dequeued;
        }

        /// <summary>Resets the ring buffer.</summary>
        public void Flush()
        {
            _ringBuffer.Reset();
        }

        /// <summary>Unregisters this consumer from the stream that created it.</summary>
        public void Dispose()
        {
            _stream.RemoveConsumer(this);
        }

        /// <summary>Returns the ring buffer's current Count, cast to uint.</summary>
        public uint Count()
        {
            return (uint) _ringBuffer.Count;
        }

        readonly RingBuffer<ValueTuple<T, EGID>> _ringBuffer;
        readonly EntityStream<T>                 _stream;

        internal readonly ExclusiveGroup _group;
        internal readonly bool           _hasGroup;

#if DEBUG && !PROFILER
        readonly string _name;
#endif
    }
}