序言
游戏中有许多种管理对象的方式。而 ECS (Entity Component System) 能够脱颖而出,被广泛使用,其中经历了一定的发展过程。
最早也是最朴素的一种方式,是通过继承来表达GameObject之间的关系,传统的OOP设计思想。不同的GameObject通过继承来获得特定的功能,但这种方式显然存在问题,无法实现功能之间的任意组合,使用多重继承会导致代码混乱,难以维护;此外,也无法实现功能的动态添加和删除。
还有一种经典的方式是 GC (Gameobject Component) 模式,也是Unity早期使用的模式。软件开发中强调组合优于继承 ,故由GameObject作为统一的容器,通过插入各种组件Component来实现不同的功能。实践证明,用这种方式组织代码,清晰易读,解决了上一种方法的问题。(注意此处的Component与ECS中的Component不是一个概念,此处的Component包含了所有处理逻辑。)
但是随着游戏的体量越来越大,这种方法中暴露出性能上的缺陷。每个Component由所属的GameObject管理,在内存上是离散存储的,因此每次需要需要遍历某一类组件时(如对于渲染系统和物理系统),必须遍历所有GameObject,如果该GO有对应组件,再跳转到组件所在内存。内存分散会导致缓存命中率低,程序性能下降;同时,查找特定Component的效率也较低。
ECS 架构由 GC 模式改进而来。ECS本身运用了DOP(Data Oriented Programming,面向数据编程)的思想,其应用也不止于游戏开发。在这种架构中,Entity作为纯ID代表一个GameObject,Component作为纯数据,在内存中连续存储,而System不含数据,是处理特定种类Component的逻辑。通过将Entity的ID映射到不同Component,来表达GO持有不同Component这个概念。
Unity的DOTS技术栈以及后来的Entities,Unreal的Mass框架都采用了ECS架构。至于ECS具体是怎么实现的,关键在于Component数据的组织方式,目前常用的方式有3种:
基于大矩阵
最简单直接的方法,将所有Component存在一个“二维数组”矩阵中(如下图),实际上每一行是单独一个数组,存放特定类型的所有Component数据,列坐标则对应了Entity ID,为了加速查找,这种模式通常会为每个Entity维护一个位掩码(bitmask),表明该Entity具有的Component类型。然而这种模式的问题很多,一是浪费了很多空间,每个Entity不管有没有这类Component,都要开辟大小等同于所有Component的大小之和的空间,二是位掩码只支持有限种类的Component,在大型项目中,若有几百种Component,该方法就不适用了。
Github上的开源项目EntityX 采用了这种实现,该项目或许是第一个C++实现的ECS项目。
基于 SparseSet
SparseSet(稀疏集)可以理解为一种在特定条件下优于哈希表的数据结构,该模式将每种Component存储在一个SparseSet中,实现高效的遍历和查询。在后文中会详细介绍。
开源项目EnTT 采用了这种实现,本文也参考了该项目的所有者skypjack的博客内容 (有中文翻译版本 ),以及他的演讲 。
基于 Archetype
Archetype(原型)类似OOP中的一个类,在ECS框架中表示一系列Component类型的组合,Component数据存放在Archetype管理的内存块中,实现多组件查询的最优性能。后文也会详细介绍。
Unity和Unreal采用的ECS架构模式都是基于Archetype的。
以上三种方式中,第一种的缺点较明显(可以优化),而后两种在不同情况下各有利弊,本文详细介绍后两种的具体实现。
GAMES104中提到关于DOP的几个准则:
要处理的数据尽量紧邻
访问时尽量顺序存取
分配/释放内存时尽量整体分配/释放
在实现ECS系统的过程中,我们正是围绕这几个准则来提高性能的。
基于SparseSet的实现
SparseSet数据结构
SparseSet是一个可以在严格的O ( 1 ) O(1) O ( 1 ) 时间复杂度内完成查找,插入和删除的数据结构,并且所存储的实际数据在内存中紧密排列,但要求键值只能是非负整数。
经典的SparseSet由一个密集数组(dense array)和一个稀疏数组(sparse array)两部分构成。
如图,密集数组中存放着实际元素的ID,且紧密排列;稀疏数组中对应实际元素ID的下标位置处,存放该实际元素ID在密集数组中的下标索引。例如图中:
1 2 3 dense[0 ] = 2 , sparse[2 ] = 0 ; dense[1 ] = 7 , sparse[7 ] = 1 ; dense[2 ] = 0 , sparse[0 ] = 2 ;
由于数组索引只能是非负整数,因此SparseSet只适用于存储ID为非负整数的元素。实际应用中,可以让密集数组只存ID,用另一个数组存储实际元素的值(如ECS中Component的实际数据),并与密集数组一一对应,这样就相当于每个component数据都有了自己的ID。
密度数组可以方便地进行排序,因此SparseSet支持排序。
插入操作
要插入一个元素,将该元素置于密集数组末端,在Sparse数组中更新对应位置的索引即可。注意我们讨论的SparseSet不考虑重复ID的元素,支持重复元素的SparseSet实现略有不同,考虑到实际场景中EntityID不会重复,此处不再详细展开。
删除操作
要删除指定ID的元素,首先在稀疏数组中下标为ID的位置处,找到实际元素在密集数组中的下标,记为x,随后将该位置置空。在密集数组中,交换x下标的元素与末尾元素(这样是为了保持密集数组的紧密排列),删除末尾元素(交换后即为指定元素),并更新交换后x下标的元素在稀疏数组中的索引。以上图为例,删除ID为7的元素的操作为:
1 2 3 4 5 int x = sparse[7 ]; sparse[7 ] = null; sparse[dense.back ()] = x; swap (dense[x], dense.back ());dense.pop_back ();
“置空”所赋的null
值,可定义为元素ID类型的最大值,一般不设为0,因为0是密集数组中最常用的下标。
一些特殊情况不再赘述。
查找操作
一开始将稀疏数组中的元素都设为null
,每次查找只需要访问稀疏数组中ID对应的下标是否为null,即可判断有无某个元素。
遍历操作
要遍历SparseSet维护的所有元素,只需要遍历密集数组即可,由于内存上连续,SparseSet的遍历性能优秀。
SparseSet的改进──分页
传统的SparseSet有一个问题,就是稀疏数组需要开得很大,足够覆盖所有可能的元素ID,倘若元素ID排到了100000,那么稀疏数组就要占据超过100000个ID类型大小的内存,这还只是一个SparseSet,显然空间浪费很大。
一个解决办法是,将一个固定的稀疏数组拆分成多页,每一页存储固定数目个索引,在需要用到ID对应那页时再动态开辟内存。例如,要存储ID为100000-100200的元素,则稀疏数组中不用开辟前100000个元素ID对应的页,只需要用100000左右对应的那一页,就可以存储需要用到的ID了。
实际应用中,每页存储的数目可设为1024或4096等,匹配操作系统内存页大小。
分页的弊端是在访问稀疏数组时,会产生一些间接跳转,降低效率,不过这部分影响很小,可以暂不考虑。
上文提到的ECS的“大矩阵”模型也可以通过分页的方法来优化空间浪费的问题。
当然,分页并不是解决空间占用的唯一方法,事实上,在ECS系统中,往往有EntityID的回收机制,两种优化可以一并使用,来达到最优效果。
代码实现
该实现是已经结合了EntityID 回收机制的版本,后文有具体介绍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 #include <limits> struct EntityID { uint32_t index : 24 = 0 ; uint32_t version : 8 = 0 ; bool operator ==(const EntityID &other) const { return index == other.index && version == other.version; } }; template <typename Component, size_t PageSize>class SparseSet final { public : Component &Add (EntityID id, Component &&component) { if (Contain (id)) { auto &index = Index (id); Version (id) = id.version; entities[index] = id; components[index] = std::forward<Component>(component); return components[index]; } else { entities.push_back (id); components.emplace_back (std::forward<Component>(component)); Index (id) = entities.size () - 1 ; Version (id) = id.version; return components.back (); } } Component &Get (EntityID id) { return components[Index (id)]; } void Remove (EntityID id) { if (!Contain (id)) return ; if (Version (id) != id.version) return ; auto &idx = Index (id); if (idx == entities.size () - 1 ) { entities.pop_back (); components.pop_back (); } else { auto last = entities.back (); Index (last) = idx; std::swap (entities[idx], entities.back ()); components[idx] = std::move (components.back ()); entities.pop_back (); components.pop_back (); } idx = null; Version (id) = 0 ; if (IsPageEmpty (id.index / PageSize)) { RemovePage (id.index / PageSize); } } bool Contain (EntityID id) const { size_t pageIndex = id.index / PageSize; size_t offset = id.index % PageSize; if (pageIndex >= sparsePages.size () || !sparsePages[pageIndex]) return false ; return sparsePages[pageIndex]->indices.at (offset) != null; } void Clear () { entities.clear (); components.clear (); sparsePages.clear (); } std::vector<EntityID> entities; std::vector<Component> components; struct Page { std::array<uint8_t , PageSize> versions; std::array<uint32_t , PageSize> indices; }; std::vector<std::unique_ptr<Page>> sparsePages; private : static constexpr uint32_t null = std::numeric_limits<uint32_t >::max (); uint32_t &Index (EntityID id) ; { size_t pageIndex = id.index / PageSize; size_t offset = id.index % PageSize; if (pageIndex >= sparsePages.size () || !sparsePages[pageIndex]) { AllocatePage (pageIndex); } return sparsePages[pageIndex]->indices.at (offset); } uint8_t &Version (EntityID id) { size_t pageIndex = id.index / PageSize; size_t offset = id.index % PageSize; if (pageIndex >= sparsePages.size () || !sparsePages[pageIndex]) { AllocatePage (pageIndex); } return sparsePages[pageIndex]->versions.at (offset); } void AllocatePage (size_t pageIndex) { if (pageIndex >= sparsePages.size ()) { sparsePages.resize (pageIndex + 1 ); } sparsePages[pageIndex] = std::make_unique <Page>(); auto &page = sparsePages[pageIndex]; page->indices.fill (null); page->versions.fill (0 ); } void RemovePage (size_t pageIndex) { sparsePages[pageIndex] = nullptr ; } bool IsPageEmpty (size_t pageIndex) const { if (pageIndex >= sparsePages.size () || !sparsePages[pageIndex]) { return true ; } auto &page = sparsePages[pageIndex]; return std::all_of (page->indices.begin (), page->indices.end (), [](uint32_t id) { return id == null; }); } };
唯一标识符
在实现完整的ECS系统之前,我们需要实现一个基础功能,如何给每个Component类型分配一个从零开始递增的ID?
我们可以通过C++的一点模板技巧来实现,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 class IndexGetter { public : template <typename T> static uint32_t Get () { static uint32_t id = curIdx++; return id; } private : inline static uint32_t curIdx = 0 ; };
基本思想很简单。维护一个共享计数器curIdx
,每次为新类型生成ID时,该计数器都自增。对于每个新的T
类型,维护一个id
固定为当前计数器的值,后续再访问IndexGetter::Get<T>
时,返回的就一直是这个值了。
EntityID回收机制
另外一个基础功能是为每个Entity分配从零开始递增的ID。这个功能可以简单实现,维护一个计数器,每次调用Get
时递增:
1 2 3 4 5 6 7 8 9 10 class IDGenerator final { public : static uint32_t Get () { return curIdx++; } private : inline static uint32_t curIdx = 0 ; };
问题在于EntityID只会不断递增,导致游戏运行时间长时,EntityID的数字很大,一旦用稀疏的数据结构存储(如SparseSet)时,就导致空间浪费。除了分页的方法之外,复用EntityID也是很重要的一个方法。
具体来说,当一个Entity被销毁之后,我们可以记录其ID,在后续创建新的Entity时复用这个ID。那么如何判断复用的这个ID的Entity与之前销毁的那个Entity是不同的呢?我们引入版本号,每次复用时,ID部分不变,版本号+1,这样就做出区分了。
为了不增大整体EntityID的占用大小(假设是uint32_t
),我们可以用C++的位域来方便地存储在同样大小的4字节内:
1 2 3 4 5 6 7 8 9 struct EntityID { uint32_t index : 24 ; uint32_t version : 8 ; bool operator ==(const EntityID &other) const { return index == other.index && version == other.version; } };
用一个uint32_t
的24位存真正的ID,8位存版本号,用位域的方法省去了位运算操作,更加直观。
之后我们创建EntityManager
类专门控制EntityID的创建与回收,维护一个全局的freeList来存储可以复用的ID。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class EntityManager final { public : static EntityID Create () { if (!freeList.empty ()) { EntityID id = freeList.back (); freeList.pop_back (); id.version++; return id; } return EntityID{curIdx++, 0 }; } static void Destroy (EntityID id) { freeList.push_back (id); } private : inline static std::vector<EntityID> freeList; inline static uint32_t curIdx = 0 ; };
实现细节
我们定义一个World
类用于存储ECS相关的所有数据,由于SparseSet存储的是一个特定类型的Component的数据,我们需要进行一层封装,命名为ComponentData
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 struct IComponentData { virtual ~IComponentData () = default ; virtual void Remove (EntityID entity) = 0 ; }; template <typename T>struct ComponentData final : IComponentData { virtual void Remove (EntityID entity) final override { sparseSet.Remove (entity); } SparseSet<T, 4096 > sparseSet; };
维护一个哈希表componentMap
,存储ComponentID
(即上文所说的唯一标识符)与具体的ComponentData
的映射;还要维护另一个哈希表entities
,存储EntityID
与ComponentID
的关联。
1 2 std::unordered_map<ComponentID, std::unique_ptr<IComponentData>> componentMap; std::unordered_map<EntityID, std::vector<ComponentID>> entities;
接下来实现核心功能,给定Entity添加一个组件AddComponent<T>
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 template <typename T>T &cWorld::AddComponent (EntityID entity, T &&component) { auto cid = IndexGetter::Get <T>(); entities[entity].push_back (cid); auto it = componentMap.find (cid); if (it == componentMap.end ()) { componentMap[cid] = std::make_unique<ComponentData<T>>(); } auto componentData = static_cast <ComponentData<T> *>(componentMap[cid].get ()); return componentData->sparseSet.Add (entity, std::forward<T>(component)); }
给定Entity删除一个组件RemoveComponent<T>
方法:
1 2 3 4 5 6 7 8 9 10 11 12 template <typename T>void World::RemoveComponent (EntityID entity) { auto cid = IndexGetter::Get <T>(); auto it = componentMap.find (cid); if (it != componentMap.end ()) { entities[entity].erase (std::find (entities[entity].begin (), entities[entity].end (), cid)); auto componentData = static_cast <ComponentData<T> *>(componentMap[cid].get ()); componentData->sparseSet.Remove (entity); } }
判断给定Entity是否具有某些类型的组件HasComponent<T...>
(支持同时传多个类型),用到了C++的模板参数包的功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 bool World::HasComponent (EntityID entity) { return HasComponentRecursive <Components...>(entity); } template <typename T, typename ... Remains>bool World::HasComponentRecursive (EntityID entity) { bool hasCurrentComponent = false ; auto cid = IndexGetter::Get <T>(); if (auto it = componentMap.find (cid); it != componentMap.end ()) { auto componentData = static_cast <ComponentData<T> *>(it->second.get ()); hasCurrentComponent = componentData->sparseSet.Contain (entity); } if (!hasCurrentComponent) return false ; if constexpr (sizeof ...(Remains) != 0 ) { return HasComponentRecursive <Remains...>(entity); } else { return true ; } }
由AddComponent
方法,进一步可以实现传入Component数据,直接生成Entity:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 template <typename ... Components>EntityID World::SpawnEntity (Components &&...components) { EntityID entity = EntityManager::Create (); SpawnEntityRecursive (entity, std::forward<Components>(components)...); return entity; } template <typename T, typename ... Remains>void World::SpawnEntityRecursive (EntityID entity, T &&component, Remains &&...remains) { AddComponent (entity, std::forward<T>(component)); if constexpr (sizeof ...(remains) != 0 ) { SpawnEntityRecursive (entity, std::forward<Remains>(remains)...); } }
删除给定的Entity(包括其关联的所有组件),这里就用到了ComponentData
中的虚函数,因为这里没有给定类型,无法通过指针转换来获取具体的Component类型,必须通过虚函数来访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 void World::DestroyEntity (EntityID entity) { if (auto it = entities.find (entity); it != entities.end () && it->first.version == entity.version) { for (auto cid : it->second) { componentMap[cid]->Remove (entity); } EntityManager::Destroy (it->first); entities.erase (it); } }
最后,也是最重要的是查询遍历 功能,获取具有给定种类的Component的所有Entity:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 template <typename ... Components>std::vector<EntityID> World::Query () { return QueryRecursive <Components...>(); } template <typename T, typename ... Remains>std::vector<EntityID> World::QueryRecursive () { std::vector<EntityID> outEntities; auto cid = IndexGetter::Get <T>(); auto it = componentMap.find (cid); if (it == componentMap.end ()) return outEntities; auto componentData = static_cast <ComponentData<T> *>(it->second.get ()); for (auto entity : componentData->sparseSet.entities) { if constexpr (sizeof ...(Remains) == 0 ) { outEntities.push_back (entity); } else { if (HasComponent <Remains...>(entity)) outEntities.push_back (entity); } } return outEntities; }
由于SparseSet单独存储每类Component,该方式实现的ECS系统查询单组件时效率达到最高,而一旦涉及到多组件查询,其效率就不及基于Archetype的ECS实现了。单组件查询时,所有数据紧密排列在密度数组中,System处理时将cache miss降到最低。而多组件查询实际是遍历第一个要求的Component对应的SparseSet,然后判断当前Entity有无剩下要求的组件,导致System在访问对应Component的数据时,只有第一个Component的数据是顺序的(虽然也会不连续),其余Component的数据其实是随机访问的,cache miss的概率较高。
优化的方法是,利用SparseSet可排序的性质,维护一个分组,将特定Component组合的entity在每个对应的SparseSet中都放在最前面,具体思路可参考EnTT 库的所有者的博客 。
使用方式
一个典型的ECS程序如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 struct RenderComponent { int someData; float someOtherData; }; struct PhysicsComponent { int someData; } int main () { World world; EntityID entity = world.SpawnEntity (RenderComponent{123 , 1.0f }); world.AddComponent (entity, PhysicsComponent{456 }); for (EntityID id : world.Query <RenderComponent, PhysicsComponent>()) { auto & renderComp = world.GetComponent <RenderComponent>(); } world.DestroyEntity (entity); return 0 ; }
如果觉得这样的接口显得怪异,可以仿照传统的GameObject Component模式设计接口,创建Entity
类,持有一个所在World
的引用,进行一层封装后,可以对Entity
类型进行组件的增删和查询。
注意头文件循环包含的问题,可以将之前的World
改为基类BasicWorld
,其上再做一层包装,SpawnEntity
时返回的是包装后的Entity
,就可完全隐藏Entity作为纯ID的实现。
至此,读者是否发现,ECS中的System,文章完全没有提及,事实上,System作为纯逻辑的部分,自然融入到程序中即可,正如示例程序中,遍历Query
得到的结果,对相应的组件数据进行处理,这就是System的功能。
基于Archetype的实现
基于Archetype实现ECS在难度上比SparseSet高不少,主要原因是对内存的管理更加硬核。Archetype需要管理多个Component的数据混杂的数据块,由于Component类型不确定,故简单用std::vector
存储不方便。我们采用分块(Chunk)存储,这也是现代ECS主流采取的方式。
Chunk
如这张Unity官方的图所示,每个Archetype管理若干个Chunk,这些Chunk中存放着相同类型组件的Entity的Component数据,在Chunk内部,数据的组织方式是 SoA (Structure of Arrays) 而非 AoS, SoA的数据访问效率往往比AoS高。
一般一个Chunk的大小固定为16KB或其倍数,因为大部分CPU的L1都是16KB的倍数,随着技术发展,这一数字可能会变大。
问题来了,如何在有限的一块内存中,排布好几个不同Component类型的数组呢?这涉及到对类型大小的计算和偏移量的计算。
内存对齐
内存对齐是C++的一种内存布局优化机制,确保数据在内存中的起始地址是其自身大小的整数倍。一方面,这是由于大多数CPU访问对齐的数据更快,另一方面,ARM架构等一些架构无法读取非对齐数据。
内存对齐的规则:
基本数据类型的对齐等于其大小
结构体的对齐等于成员中的最大对齐
例如:
1 2 3 4 5 6 7 struct data { char a; int b; };
可以使用alignof(T)
来获取T
类型的对齐要求。
Chunk的内存布局
Chunk中的数据分为EntityID的数组,以及各个Component的数组。首先,要计算出一个Chunk能放下多少个该类Entity的数据。假设已经有了ComponentLayout
,记录该Archetype每个组件的大小和对齐要求,我们现在要求出数组的偏移量。
1 2 3 4 5 6 7 struct ComponentLayout { size_t offset; size_t alignment; size_t size; }; std::unordered_map<ComponentID, ComponentLayout> componentLayouts;
我们通过以下方式来计算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 size_t AlignUp (size_t offset, size_t alignment) { return (offset + alignment - 1 ) & ~(alignment - 1 ); } void CalculateLayout () { constexpr size_t kChunkSize = Chunk::kSize; size_t perEntitySize = sizeof (EntityID); for (const auto &[_, layout] : componentLayouts) { perEntitySize += layout.size; } size_t estimatedCapacity = std::max (1ul , kChunkSize / perEntitySize); size_t finalCapacity = 0 ; for (int i = 0 ; i < 5 ; i++) { size_t currentOffset = 0 ; size_t requiredSpace = 0 ; size_t entityArraySize = sizeof (EntityID) * estimatedCapacity; requiredSpace = currentOffset + entityArraySize; currentOffset = requiredSpace; for (const auto &[_, layout] : componentLayouts) { currentOffset = AlignUp (currentOffset, layout.alignment); size_t compArraySize = layout.size * estimatedCapacity; requiredSpace = currentOffset + compArraySize; currentOffset = requiredSpace; } if (requiredSpace <= kChunkSize) { finalCapacity = estimatedCapacity; estimatedCapacity++; } else { break ; } } chunkCapacity = std::max (1ul , finalCapacity); size_t currentOffset = 0 ; currentOffset += sizeof (EntityID) * chunkCapacity; for (auto &[_, layout] : componentLayouts) { currentOffset = AlignUp (currentOffset, layout.alignment); layout.offset = currentOffset; currentOffset += layout.size * chunkCapacity; } }
这部分计算可以在创建Archetype时进行,并且在Archetype中管理计算结果,该Archetype下的每个Chunk都可以共享。
之后每次需要访问Chunk内数组,就获取对应的offset
,用reinterpret_cast
强转buffer + offset
来获取。指定index
为要访问的元素下标,则可以用加上index * size
的方法来跳转。
Chunk的初步实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 class Chunk { public : static constexpr size_t kSize = 16384 ; explicit Chunk (Archetype *archetype) : archetype(archetype), entityCount(0 ), capacity(archetype->chunkCapacity) { buffer = static_cast <uint8_t *>(operator new (kSize, std::align_val_t {64 })); if (!buffer) throw std::bad_alloc (); } ~Chunk () { if (buffer) { operator delete (buffer, std::align_val_t {64 }) ; } } Chunk (const Chunk &) = delete ; Chunk &operator =(const Chunk &) = delete ; EntityID *GetEntityArray () const { return reinterpret_cast <EntityID *>(buffer); } template <typename T> T *GetComponentArray () const { auto cid = IndexGetter::Get <T>(); return reinterpret_cast <T *>(buffer + archetype->componentLayouts[cid].offset); } uint8_t *GetComponentArray (ComponentID cid) const { if (!archetype->componentLayouts.contains (cid)) { throw std::runtime_error ("Archetype does not contain component" ); } return buffer + archetype->componentLayouts[cid].offset; } bool HasSpace () const { return entityCount < capacity; } template <typename ... Components> size_t AddEntity (EntityID entity, Components &&...components) { if (!HasSpace ()) { throw std::runtime_error ("Chunk is full" ); } const size_t index = entityCount++; EntityID *entities = GetEntityArray (); entities[index] = entity; CopyComponentDataRecursive (index, std::forward<Components>(components)...); return index; } template <typename T, typename ... Remains> void Chunk::CopyComponentDataRecursive (size_t index, T &&component, Remains &&...remains) { auto cid = IndexGetter::Get <T>(); uint8_t *compArray = buffer + archetype->componentLayouts[cid].offset; size_t compSize = archetype->componentLayouts[cid].size; T *dest = reinterpret_cast <T *>(compArray + index * compSize); *dest = std::forward<T>(component); if constexpr (sizeof ...(Remains) != 0 ) CopyComponentDataRecursive (index, std::forward<Remains>(remains)...) ; } size_t GetEntityCount () const { return entityCount; } private : uint8_t *buffer = nullptr ; Archetype *archetype = nullptr ; size_t entityCount; size_t capacity; };
Archetype
正如Component有属于自己的唯一ID,如何给Archetype生成一个专属ID呢,我采用的方法是给所有的ComponentID算一个哈希值,作为唯一标识符。
1 2 3 4 5 6 7 8 9 constexpr ArchetypeID HashComponentTypes (std::vector<ComponentID> &componentTypes) { ArchetypeID hash = 0 ; for (ComponentID id : componentTypes) { hash = (hash ^ id) * 16777619 ; } return hash; }
然而,光有哈希值仍不行,Archetype之间不仅要判断相等,还要判断包含关系(这时位掩码的优势体现出来了),因此必须再维护一个ComponentID的列表,用于后续查询。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class Archetype { public : Archetype (size_t hash, std::unordered_map<ComponentID, ComponentLayout> &layouts) : hash (hash), componentLayouts (layouts) { CalculateLayout (); for (auto &[type, _] : layouts) { componentTypes.push_back (type); } } Archetype () = default ; template <typename ... Components> std::tuple<Chunk *, size_t > AddEntity (EntityID entity, Components &&...components) { Chunk *targetChunk = nullptr ; for (auto &chunk : chunks) { if (chunk->HasSpace ()) { targetChunk = chunk.get (); break ; } } if (!targetChunk) { chunks.push_back (std::make_unique <Chunk>(this )); targetChunk = chunks.back ().get (); } auto index = targetChunk->AddEntity (entity, std::forward<Components>(components)...); return {targetChunk, index}; } std::vector<EntityID> GetEntities () const { std::vector<EntityID> outEntities; for (auto &chunk : chunks) { EntityID *entityArray = chunk->GetEntityArray (); for (int i = 0 ; i < chunk->GetEntityCount (); i++) { outEntities.push_back (entityArray[i]); } } return outEntities; } static constexpr size_t CHUNK_SIZE = 16 * 1024 ; std::vector<std::unique_ptr<Chunk>> chunks; ArchetypeID hash; std::vector<ComponentID> componentTypes; std::unordered_map<ComponentID, ComponentLayout> componentLayouts; size_t chunkCapacity = 0 ; };
实现ECS功能
基于SparseSet的实现中,我们维护了EntityID到其所有的ComponentID的映射,又得益于SparseSetO ( 1 ) O(1) O ( 1 ) 的查找,可以迅速访问对应数据。而在基于Archetype的实现中,我们要实现相同的功能,需要维护EntityID到一个三元组<Archetype *, Chunk *, index>
的映射,表示该Entity的相关组件数据存放在哪个Archetype下的哪个Chunk的第几号位置。
1 2 std::unordered_map<EntityID, std::tuple<Archetype *, Chunk *, size_t >> entities; std::unordered_map<ArchetypeID, Archetype> archetypes;
当然,World
中还要存放所有已有的Archetype。
仿照第一种实现,我们需要实现同样的功能,然而各功能的复杂度截然不同。
由易到难,GetComponent
和HasComponent
较简单,通过记录好的全局信息查询即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 template <typename T>T &GetComponent (EntityID entity) { auto [archetype, chunk, index] = entities[entity]; return chunk->GetComponentArray <T>()[index]; } template <typename T>bool HasComponent (EntityID entity) { if (auto it = entities.find (entity); it == entities.end ()) return false ; auto [archetype, chunk, index] = entities[entity]; auto cid = IndexGetter<Component>::Get <T>(); return archetype->componentLayouts.contains (cid); }
再实现SpawnEntity
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 EntityID SpawnEntity (Components &&...components) { EntityID entity = EntityManager::Create (); std::vector<ComponentID> componentTypes = GetComponentTypes <Components...>(); size_t hash = HashComponentTypes (componentTypes); if (auto it = archetypes.find (hash); it == archetypes.end ()) { std::unordered_map<ComponentID, ComponentLayout> componentLayouts = CalculateLayout <Components...>(); CreateArchetype (hash, componentLayouts); } auto [chunk, index] = archetypes[hash].AddEntity (entity, std::forward<Components>(components)...); entities[entity] = {&archetypes[hash], chunk, index}; return entity; } void CreateArchetype (size_t hash, std::unordered_map<ComponentID, ComponentLayout> componentLayouts) { archetypes.try_emplace (hash, hash, componentLayouts); }
接着是销毁Entity,Chunk内的删除需要swap然后删除数组末尾元素(同样是为了紧密排布),因此Chunk删除元素后需要告知它移动了哪个元素的位置,以便World
中的映射进行相应更新。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 void DestroyEntity (EntityID entity) { if (auto it = entities.find (entity); it != entities.end () && it->first.version == entity.version) { auto [archetype, chunk, index] = entities[entity]; auto changedEntity = chunk->RemoveEntity (index); if (changedEntity.index != 0xffffff ) { entities[changedEntity] = {archetype, chunk, index}; } entities.erase (entity); EntityManager::Destroy (entity); } } EntityID Chunk::RemoveEntity (size_t index) { const size_t lastIndex = --entityCount; EntityID *entityArray = GetEntityArray (); if (index == lastIndex || lastIndex == 0 ) { return EntityID{0xffffff , 0xff }; } entityArray[index] = entityArray[lastIndex]; for (auto type : archetype->componentTypes) { uint8_t *compData = buffer + archetype->componentLayouts[type].offset; size_t compSize = archetype->componentLayouts[type].size; void *src = compData + lastIndex * compSize; void *dest = compData + index * compSize; memcpy (dest, src, compSize); } return entityArray[lastIndex]; }
最困难的是增加和删除Component操作,因为Chunk中只能存一类Entity的数据,增删Component后Entity所属的Archetype直接变了,必须将相应组件全部搬到另一个Archetype的Chunk中。
这也表明Archetype的ECS系统不适合频繁变动组件的情形,它更偏向于引擎的底层架构,如对于基本的渲染或者物理这些几乎不需要增删的Component,使用起来效果很好。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 template <typename T>void AddComponent (EntityID entity, T &&component) { auto cid = IndexGetter<Component>::Get <T>(); auto [archetype, chunk, index] = entities[entity]; auto newComponentTypes = archetype->componentTypes; newComponentTypes.push_back (cid); size_t hash = HashComponentTypes (newComponentTypes); if (auto it = archetypes.find (hash); it == archetypes.end ()) { auto newComponentLayouts = archetype->componentLayouts; newComponentLayouts[cid] = {0 , alignof (T), sizeof (T)}; CreateArchetype (hash, newComponentLayouts); } Archetype *destArchetype = &archetypes[hash]; auto [destChunk, destIndex] = destArchetype->AllocateEntity (entity); CopyComponents (archetype, chunk, index, destArchetype, destChunk, destIndex); destChunk->CopyComponentData (destIndex, std::forward<T>(component)); chunk->RemoveEntity (index); entities[entity] = {destArchetype, destChunk, destIndex}; } void CopyComponents (Archetype *src, Chunk *srcChunk, size_t srcIndex, Archetype *dest, Chunk *destChunk, size_t destIndex) { for (auto &[type, _] : src->componentLayouts) { if (dest->componentLayouts.contains (type)) { size_t compSize = src->componentLayouts[type].size; uint8_t *srcData = srcChunk->GetComponentArray (type) + compSize * srcIndex; uint8_t *destData = destChunk->GetComponentArray (type) + compSize * destIndex; memcpy (destData, srcData, compSize); } } }
删除Component的思路类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 template <typename T>void RemoveComponent (EntityID entity) { auto cid = IndexGetter<Component>::Get <T>(); auto [archetype, chunk, index] = entities[entity]; auto newComponentTypes = archetype->componentTypes; std::erase (newComponentTypes, cid); size_t hash = HashComponentTypes (newComponentTypes); if (auto it = archetypes.find (hash); it == archetypes.end ()) { auto newComponentLayouts = archetype->componentLayouts; newComponentLayouts.erase (cid); CreateArchetype (hash, newComponentLayouts); } Archetype *destArchetype = &archetypes[hash]; auto [destChunk, destIndex] = destArchetype->AllocateEntity (entity); CopyComponents (archetype, chunk, index, destArchetype, destChunk, destIndex); chunk->RemoveEntity (index); entities[entity] = {destArchetype, destChunk, destIndex}; }
最后是核心的查询遍历功能,也是Archetype优势的最大体现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 template <typename ... Components>std::vector<EntityID> Query () { std::vector<EntityID> outEntities; auto componentTypes = GetComponentTypes <Components...>(); for (auto &[_, archetype] : archetypes) { bool match = true ; for (auto componentType : componentTypes) { if (!archetype.componentLayouts.contains (componentType)) { match = false ; break ; } } if (match) { auto archetypeEntities = archetype.GetEntities (); outEntities.insert (outEntities.end (), archetypeEntities.begin (), archetypeEntities.end ()); } } return outEntities; }
可以看到性能瓶颈在于需要遍历所有已有Archetypes并筛选符合要求的,可以通过位掩码来优化,此处不再展开。
但是无论如何,Archetype查找后的遍历性能达到最高,在同一Archetype内连续访问,cache命中率高。
基准测试
通过以下的程序进行Benchmark,每组测试1000次取平均。其中第一组采用离散的内存分配(new/delete),模拟传统的GameObject-Component模型,第二、第三组分别采用基于SparseSet和基于Archetype的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 struct Physics { float force; float x, y, z; }; struct Shader { int vs, fs; }; struct Transform { float x, y, z; }; World world; for (int i = 0 ; i < SIZE; i++){ auto entity = world.SpawnEntity <Transform>(Transform{1.0f * i, 2.0f * i, 3.0f * i}); if (i % 2 ) { entity.AddComponent <Shader>(Shader{i, 2 * i}); } if (i % 3 ) { entity.AddComponent <Physics>(Physics{0.5f * i, 1.0f * i, 2.0f * i, 3.0f * i}); } if (i % 6 ) { entity.Destroy (); } } for (auto e : world.Query <Shader, Physics>()){ e.GetComponent <Shader>().vs++; e.GetComponent <Physics>().force += 1 ; }
结果如下表:
SIZE
无优化
SparseSet实现
Archetype实现
1000
8830 μs
1515 μs
1582 μs
10000
47108 μs
14689 μs
16730 μs
100000
2930805 μs
146138 μs
164415 μs
共享数据
在核心的ECS之外,有时我们需要存储一些特殊的Component,这些Component全局只有一份数据,可能会被多个System使用。这种共享数据我们定义为Resource
,由于只有一份,不用考虑内存布局的问题了。我们直接使用一个哈希表维护所有的Resource
。
与正常的Component做出区别,Resource有自己的一套ID,我们可以复用IndexGetter
来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 std::unordered_map<ResourceID, ResourceInfo> resources; struct ResourceInfo { void *resource; std::function<void *(void )> create; std::function<void (void *)> destroy; ResourceInfo () = default ; ResourceInfo (std::function<void *(void )> create, const std::function<void (void *)> destroy) : create (create), destroy (destroy) { if (!create) assert (false ); if (!destroy) assert (false ); } ~ResourceInfo () { destroy (resource); } };
在World
类中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 template <typename T>T &World::SetResource (T &&resource) { auto index = IndexGetter<Resource>::Get <T>(); if (auto it = resources.find (index); it != resources.end ()) { auto &resourceInfo = it->second; *static_cast <T *>(resources[index].resource) = std::forward<T>(resource); } else { auto defaultCreate = []() -> void * { return new T; }; auto defaultDestroy = [](void *elem) { delete static_cast <T *>(elem); }; resources.try_emplace (index, defaultCreate, defaultDestroy); resources[index].resource = resources[index].create (); *static_cast <T *>(resources[index].resource) = std::forward<T>(resource); } return *static_cast <T *>(resources[index].resource); } template <typename T>void World::RemoveResource () { auto index = IndexGetter<Resource>::Get <T>(); if (auto it = resources.find (index); it != resources.end ()) { it->second.destroy (); it->second.resource = nullptr ; } } template <typename T>bool World::HasResource () const { auto index = IndexGetter<Resource>::Get <T>(); auto it = resources.find (index); return it != resources.end () && it->second.resource; } template <typename T>T &World::GetResource () const { auto index = IndexGetter<Resource>::Get <T>(); return *static_cast <T *>(resources.at (index).resource); }
参考资料: