From 6458d7e0db0f16de2c0fffd5bbd38bbf1d4ca0a5 Mon Sep 17 00:00:00 2001 From: Wojciech Figat Date: Fri, 2 Dec 2022 12:18:08 +0100 Subject: [PATCH] Add support for sending replicated object data in parts --- Source/Engine/Networking/NetworkInternal.h | 2 + Source/Engine/Networking/NetworkManager.cpp | 1 + Source/Engine/Networking/NetworkMessage.h | 6 +- .../Engine/Networking/NetworkReplicator.cpp | 242 ++++++++++++++---- 4 files changed, 197 insertions(+), 54 deletions(-) diff --git a/Source/Engine/Networking/NetworkInternal.h b/Source/Engine/Networking/NetworkInternal.h index d063a0ca3..dd01210a1 100644 --- a/Source/Engine/Networking/NetworkInternal.h +++ b/Source/Engine/Networking/NetworkInternal.h @@ -10,6 +10,7 @@ enum class NetworkMessageIDs : uint8 Handshake, HandshakeReply, ObjectReplicate, + ObjectReplicatePart, ObjectSpawn, ObjectDespawn, ObjectRole, @@ -27,6 +28,7 @@ public: static void NetworkReplicatorPreUpdate(); static void NetworkReplicatorUpdate(); static void OnNetworkMessageObjectReplicate(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); + static void OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); static void OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); static void OnNetworkMessageObjectDespawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); static void OnNetworkMessageObjectRole(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); diff --git a/Source/Engine/Networking/NetworkManager.cpp b/Source/Engine/Networking/NetworkManager.cpp index 72634230a..ef5060e73 100644 --- a/Source/Engine/Networking/NetworkManager.cpp +++ b/Source/Engine/Networking/NetworkManager.cpp @@ -129,6 +129,7 @@ namespace OnNetworkMessageHandshake, OnNetworkMessageHandshakeReply, NetworkInternal::OnNetworkMessageObjectReplicate, + NetworkInternal::OnNetworkMessageObjectReplicatePart, NetworkInternal::OnNetworkMessageObjectSpawn, NetworkInternal::OnNetworkMessageObjectDespawn, NetworkInternal::OnNetworkMessageObjectRole, diff --git a/Source/Engine/Networking/NetworkMessage.h b/Source/Engine/Networking/NetworkMessage.h index 4d0bc4330..fa6119e89 100644 --- a/Source/Engine/Networking/NetworkMessage.h +++ b/Source/Engine/Networking/NetworkMessage.h @@ -69,7 +69,7 @@ public: /// The amount of bytes to write from the bytes pointer. FORCE_INLINE void WriteBytes(uint8* bytes, const int numBytes) { - ASSERT(Position + numBytes < BufferSize); + ASSERT(Position + numBytes <= BufferSize); Platform::MemoryCopy(Buffer + Position, bytes, numBytes); Position += numBytes; Length = Position; @@ -85,7 +85,7 @@ public: /// The minimal amount of bytes that the buffer contains. FORCE_INLINE void ReadBytes(uint8* bytes, const int32 numBytes) { - ASSERT(Position + numBytes < BufferSize); + ASSERT(Position + numBytes <= BufferSize); Platform::MemoryCopy(bytes, Buffer + Position, numBytes); Position += numBytes; } @@ -97,7 +97,7 @@ public: /// Pointer to skipped data beginning. FORCE_INLINE void* SkipBytes(const int32 numBytes) { - ASSERT(Position + numBytes < BufferSize); + ASSERT(Position + numBytes <= BufferSize); byte* result = Buffer + Position; Position += numBytes; return result; diff --git a/Source/Engine/Networking/NetworkReplicator.cpp b/Source/Engine/Networking/NetworkReplicator.cpp index ad1ffebe5..0b3202b60 100644 --- a/Source/Engine/Networking/NetworkReplicator.cpp +++ b/Source/Engine/Networking/NetworkReplicator.cpp @@ -12,7 +12,6 @@ #include "NetworkRpc.h" #include "INetworkSerializable.h" #include "INetworkObject.h" -#include "Engine/Core/Log.h" #include "Engine/Core/Collections/HashSet.h" #include "Engine/Core/Collections/Dictionary.h" #include "Engine/Core/Collections/ChunkedArray.h" @@ -34,6 +33,7 @@ #define NETWORK_REPLICATOR_DEBUG_LOG 0 #if NETWORK_REPLICATOR_DEBUG_LOG +#include "Engine/Core/Log.h" #define NETWORK_REPLICATOR_LOG(messageType, format, ...) LOG(messageType, format, ##__VA_ARGS__) #else #define NETWORK_REPLICATOR_LOG(messageType, format, ...) @@ -47,6 +47,18 @@ PACK_STRUCT(struct NetworkMessageObjectReplicate Guid ParentId; char ObjectTypeName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network) uint16 DataSize; + uint16 PartsCount; + }); + +PACK_STRUCT(struct NetworkMessageObjectReplicatePart + { + NetworkMessageIDs ID = NetworkMessageIDs::ObjectReplicatePart; + uint32 OwnerFrame; + uint16 DataSize; + uint16 PartsCount; + uint16 PartStart; + uint16 PartSize; + Guid ObjectId; // TODO: introduce networked-ids to synchronize unique ids as ushort (less data over network) }); PACK_STRUCT(struct NetworkMessageObjectSpawn @@ -131,6 +143,15 @@ struct Serializer void* Tags[2]; }; +struct ReplicateItem +{ + ScriptingObjectReference Object; + Guid ObjectId; + uint16 PartsLeft; + uint32 OwnerFrame; + Array Data; +}; + struct SpawnItem { ScriptingObjectReference Object; @@ -164,6 +185,7 @@ namespace { CriticalSection ObjectsLock; HashSet Objects; + Array ReplicationParts; Array SpawnQueue; Array DespawnQueue; Array RpcQueue; @@ -462,6 +484,81 @@ void SetupObjectSpawnGroupItem(ScriptingObject* obj, ArrayItems.Add(&spawnItem); } +void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj) +{ + // TODO: implement objects state replication frequency and dirtying +} + +template +ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize) +{ + // Reuse or add part item + ReplicateItem* replicateItem = nullptr; + for (auto& e : ReplicationParts) + { + if (e.OwnerFrame == msgData.OwnerFrame && e.Data.Count() == msgData.DataSize && e.ObjectId == msgData.ObjectId) + { + // Reuse + replicateItem = &e; + break; + } + } + if (!replicateItem) + { + // Add + replicateItem = &ReplicationParts.AddOne(); + replicateItem->ObjectId = msgData.ObjectId; + replicateItem->PartsLeft = msgData.PartsCount; + replicateItem->OwnerFrame = msgData.OwnerFrame; + replicateItem->Data.Resize(msgData.DataSize); + } + + // Copy part data + ASSERT(replicateItem->PartsLeft > 0); + replicateItem->PartsLeft--; + ASSERT(partStart + partSize <= replicateItem->Data.Count()); + const void* partData = event.Message.SkipBytes(partSize); + Platform::MemoryCopy(replicateItem->Data.Get() + partStart, partData, partSize); + + return replicateItem; +} + +void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize) +{ + ScriptingObject* obj = item.Object.Get(); + if (!obj) + return; + + // Skip replication if we own the object (eg. late replication message after ownership change) + if (item.Role == NetworkObjectRole::OwnedAuthoritative) + return; + + // Drop object replication if it has old data (eg. newer message was already processed due to unordered channel usage) + if (item.LastOwnerFrame >= ownerFrame) + return; + item.LastOwnerFrame = ownerFrame; + + // Setup message reading stream + if (CachedReadStream == nullptr) + CachedReadStream = New(); + NetworkStream* stream = CachedReadStream; + stream->Initialize(data, dataSize); + + // Deserialize object + const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false); + if (failed) + { + //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString()); + } + + if (item.AsNetworkObject) + item.AsNetworkObject->OnNetworkDeserialize(); + + // Speed up replication of client-owned objects to other clients from server to reduce lag (data has to go from client to server and then to other clients) + if (NetworkManager::IsServer()) + DirtyObjectImpl(item, obj); +} + #if !COMPILE_WITHOUT_CSHARP #include "Engine/Scripting/ManagedCLR/MUtils.h" @@ -757,11 +854,6 @@ void NetworkReplicator::SetObjectOwnership(ScriptingObject* obj, uint32 ownerCli } } -void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj) -{ - // TODO: implement objects state replication frequency and dirtying -} - void NetworkReplicator::DirtyObject(ScriptingObject* obj) { ScopeLock lock(ObjectsLock); @@ -853,6 +945,7 @@ void NetworkInternal::NetworkReplicatorClear() Objects.Remove(it); } } + RpcQueue.Clear(); SpawnQueue.Clear(); DespawnQueue.Clear(); IdsRemappingTable.Clear(); @@ -1021,6 +1114,31 @@ void NetworkInternal::NetworkReplicatorUpdate() SpawnQueue.Clear(); } + // Apply parts replication + for (int32 i = ReplicationParts.Count() - 1; i >= 0; i--) + { + auto& e = ReplicationParts[i]; + if (e.PartsLeft > 0) + { + // TODO: remove replication items after some TTL to prevent memory leaks + continue; + } + ScriptingObject* obj = e.Object.Get(); + if (obj) + { + auto it = Objects.Find(obj->GetID()); + if (it != Objects.End()) + { + auto& item = it->Item; + + // Replicate from all collected parts data + InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count()); + } + } + + ReplicationParts.RemoveAt(i); + } + // Brute force synchronize all networked objects with clients // TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant) // TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player) @@ -1066,10 +1184,27 @@ void NetworkInternal::NetworkReplicatorUpdate() } GetNetworkName(msgData.ObjectTypeName, obj->GetType().Fullname); msgData.DataSize = size; - // TODO: split object data (eg. more messages) if needed + const uint32 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate); + const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart); + uint32 partsCount = 1; + uint32 dataStart = 0; + uint32 msgDataSize = size; + if (size > msgMaxData) + { + // Send msgMaxData within first message + msgDataSize = msgMaxData; + dataStart += msgMaxData; + + // Send rest of the data in separate parts + partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData); + } + else + dataStart += size; + ASSERT(partsCount <= MAX_uint8) + msgData.PartsCount = partsCount; NetworkMessage msg = peer->BeginSendMessage(); msg.WriteStructure(msgData); - msg.WriteBytes(stream->GetBuffer(), size); + msg.WriteBytes(stream->GetBuffer(), msgDataSize); if (isClient) peer->EndSendMessage(NetworkChannelType::Unreliable, msg); else @@ -1079,6 +1214,27 @@ void NetworkInternal::NetworkReplicatorUpdate() peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets); } + // Send all other parts + for (uint32 partIndex = 1; partIndex < partsCount; partIndex++) + { + NetworkMessageObjectReplicatePart msgDataPart; + msgDataPart.OwnerFrame = msgData.OwnerFrame; + msgDataPart.ObjectId = msgData.ObjectId; + msgDataPart.DataSize = msgData.DataSize; + msgDataPart.PartsCount = msgData.PartsCount; + msgDataPart.PartStart = dataStart; + msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData); + msg = peer->BeginSendMessage(); + msg.WriteStructure(msgDataPart); + msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize); + dataStart += msgDataPart.PartSize; + if (isClient) + peer->EndSendMessage(NetworkChannelType::Unreliable, msg); + else + peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets); + } + ASSERT_LOW_LAYER(dataStart == size); + // TODO: stats for bytes send per object type } } @@ -1136,52 +1292,39 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo if (DespawnedObjects.Contains(msgData.ObjectId)) return; // Skip replicating not-existing objects NetworkReplicatedObject* e = ResolveObject(msgData.ObjectId, msgData.ParentId, msgData.ObjectTypeName); - if (e) + if (!e) + return; + auto& item = *e; + + // Reject event from someone who is not an object owner + if (client && item.OwnerClientId != client->ClientId) + return; + + if (msgData.PartsCount == 1) { - auto& item = *e; - ScriptingObject* obj = item.Object.Get(); - if (!obj) - return; - - // Reject event from someone who is not an object owner - if (client && item.OwnerClientId != client->ClientId) - return; - - // Skip replication if we own the object (eg. late replication message after ownership change) - if (item.Role == NetworkObjectRole::OwnedAuthoritative) - return; - - // Drop object replication if it has old data (eg. newer message was already processed due to unordered channel usage) - if (item.LastOwnerFrame >= msgData.OwnerFrame) - return; - item.LastOwnerFrame = msgData.OwnerFrame; - - // Setup message reading stream - if (CachedReadStream == nullptr) - CachedReadStream = New(); - NetworkStream* stream = CachedReadStream; - stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.DataSize); - - // Deserialize object - const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false); - if (failed) - { - //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString()); - } - - if (item.AsNetworkObject) - item.AsNetworkObject->OnNetworkDeserialize(); - - // Speed up replication of client-owned objects to other clients from server to reduce lag (data has to go from client to server and then to other clients) - if (NetworkManager::IsServer()) - DirtyObjectImpl(item, obj); + // Replicate + InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize); } else { - // TODO: put message to the queue to be resolved later (eg. object replication came before spawn packet) - use TTL to prevent memory overgrowing + // Add to replication from multiple parts + const uint16 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate); + ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData); + replicateItem->Object = e->Object; } } +void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) +{ + NetworkMessageObjectReplicatePart msgData; + event.Message.ReadStructure(msgData); + ScopeLock lock(ObjectsLock); + if (DespawnedObjects.Contains(msgData.ObjectId)) + return; // Skip replicating not-existing objects + + AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize); +} + void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) { NetworkMessageObjectSpawn msgData; @@ -1341,10 +1484,7 @@ void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkCl auto sceneObject = ScriptingObject::Cast(obj); if (sceneObject) { - if (parent && parent - -> - Object.Get() && parent->Object->Is() - ) + if (parent && parent->Object.Get() && parent->Object->Is()) sceneObject->SetParent(parent->Object.As()); else if (auto* parentActor = Scripting::TryFindObject(msgDataItem.ParentId)) sceneObject->SetParent(parentActor);