Mirror of Svelto.ECS because we're a fan of it
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

164 lines
5.2KB

  1. using System;
  2. using Svelto.DataStructures;
  3. namespace Svelto.ECS
  4. {
  5. /// <summary>
  6. /// Do not use this class in place of a normal polling.
  7. /// I eventually realised than in ECS no form of communication other than polling entity components can exist.
  8. /// Using groups, you can have always an optimal set of entity components to poll, so EntityStreams must be used
  9. /// only if:
  10. /// - you want to polling engine to be able to track all the entity changes happening in between polls and not
  11. /// just the current state
  12. /// - you want a thread-safe way to read entity states, which includes all the state changes and not the last
  13. /// one only
  14. /// - you want to communicate between EnginesRoots
  15. /// </summary>
  16. class EntitiesStream : IDisposable
  17. {
  18. internal Consumer<T> GenerateConsumer<T>(string name, uint capacity) where T : unmanaged, IEntityStruct
  19. {
  20. if (_streams.ContainsKey(TypeRefWrapper<T>.wrapper) == false) _streams[TypeRefWrapper<T>.wrapper] = new EntityStream<T>();
  21. return (_streams[TypeRefWrapper<T>.wrapper] as EntityStream<T>).GenerateConsumer(name, capacity);
  22. }
  23. public Consumer<T> GenerateConsumer<T>(ExclusiveGroup group, string name, uint capacity)
  24. where T : unmanaged, IEntityStruct
  25. {
  26. if (_streams.ContainsKey(TypeRefWrapper<T>.wrapper) == false) _streams[TypeRefWrapper<T>.wrapper] = new EntityStream<T>();
  27. return (_streams[TypeRefWrapper<T>.wrapper] as EntityStream<T>).GenerateConsumer(group, name, capacity);
  28. }
  29. internal void PublishEntity<T>(ref T entity, EGID egid) where T : unmanaged, IEntityStruct
  30. {
  31. if (_streams.TryGetValue(TypeRefWrapper<T>.wrapper, out var typeSafeStream))
  32. (typeSafeStream as EntityStream<T>).PublishEntity(ref entity, egid);
  33. else
  34. Console.LogDebug("No Consumers are waiting for this entity to change ", typeof(T));
  35. }
  36. readonly ThreadSafeDictionary<RefWrapper<Type>, ITypeSafeStream> _streams =
  37. new ThreadSafeDictionary<RefWrapper<Type>, ITypeSafeStream>();
  38. public void Dispose()
  39. {
  40. _streams.Clear();
  41. }
  42. }
  43. interface ITypeSafeStream
  44. {}
  45. class EntityStream<T> : ITypeSafeStream where T : unmanaged, IEntityStruct
  46. {
  47. public void PublishEntity(ref T entity, EGID egid)
  48. {
  49. for (int i = 0; i < _consumers.Count; i++)
  50. {
  51. if (_consumers[i]._hasGroup)
  52. {
  53. if (egid.groupID == _consumers[i]._group)
  54. {
  55. _consumers[i].Enqueue(entity, egid);
  56. }
  57. }
  58. else
  59. {
  60. _consumers[i].Enqueue(entity, egid);
  61. }
  62. }
  63. }
  64. public Consumer<T> GenerateConsumer(string name, uint capacity)
  65. {
  66. var consumer = new Consumer<T>(name, capacity, this);
  67. _consumers.Add(consumer);
  68. return consumer;
  69. }
  70. public Consumer<T> GenerateConsumer(ExclusiveGroup group, string name, uint capacity)
  71. {
  72. var consumer = new Consumer<T>(group, name, capacity, this);
  73. _consumers.Add(consumer);
  74. return consumer;
  75. }
  76. public void RemoveConsumer(Consumer<T> consumer)
  77. {
  78. _consumers.UnorderedRemove(consumer);
  79. }
  80. readonly FasterListThreadSafe<Consumer<T>> _consumers = new FasterListThreadSafe<Consumer<T>>();
  81. }
  82. public struct Consumer<T> : IDisposable where T : unmanaged, IEntityStruct
  83. {
  84. internal Consumer(string name, uint capacity, EntityStream<T> stream):this()
  85. {
  86. #if DEBUG && !PROFILER
  87. _name = name;
  88. #endif
  89. _ringBuffer = new RingBuffer<ValueTuple<T, EGID>>((int) capacity,
  90. #if DEBUG && !PROFILER
  91. _name
  92. #else
  93. string.Empty
  94. #endif
  95. );
  96. _stream = stream;
  97. }
  98. internal Consumer(ExclusiveGroup group, string name, uint capacity, EntityStream<T> stream) : this(name,
  99. capacity, stream)
  100. {
  101. _group = group;
  102. _hasGroup = true;
  103. }
  104. internal void Enqueue(in T entity, in EGID egid)
  105. {
  106. _ringBuffer.Enqueue((entity, egid));
  107. }
  108. public bool TryDequeue(out T entity)
  109. {
  110. var tryDequeue = _ringBuffer.TryDequeue(out var values);
  111. entity = values.Item1;
  112. return tryDequeue;
  113. }
  114. public bool TryDequeue(out T entity, out EGID id)
  115. {
  116. var tryDequeue = _ringBuffer.TryDequeue(out var values);
  117. entity = values.Item1;
  118. id = values.Item2;
  119. return tryDequeue;
  120. }
  121. public void Flush() { _ringBuffer.Reset(); }
  122. public void Dispose() { _stream.RemoveConsumer(this); }
  123. public uint Count() { return (uint) _ringBuffer.Count; }
  124. readonly RingBuffer<ValueTuple<T, EGID>> _ringBuffer;
  125. readonly EntityStream<T> _stream;
  126. internal readonly ExclusiveGroup _group;
  127. internal readonly bool _hasGroup;
  128. #if DEBUG && !PROFILER
  129. readonly string _name;
  130. #endif
  131. }
  132. }