From 114828adcbbce683498bcdfdc9d7528151fb5667 Mon Sep 17 00:00:00 2001 From: Wojtek Figat Date: Thu, 30 Oct 2025 17:12:49 +0100 Subject: [PATCH] Refactor NetworkReplicator update into separate function for cleaner code --- .../Engine/Networking/NetworkReplicator.cpp | 437 +++++++++--------- 1 file changed, 226 insertions(+), 211 deletions(-) diff --git a/Source/Engine/Networking/NetworkReplicator.cpp b/Source/Engine/Networking/NetworkReplicator.cpp index fba916891..9fdaf7a97 100644 --- a/Source/Engine/Networking/NetworkReplicator.cpp +++ b/Source/Engine/Networking/NetworkReplicator.cpp @@ -589,7 +589,7 @@ void SendObjectRoleMessage(const NetworkReplicatedObject& item, const NetworkCli msg.WriteNetworkId(objectId); if (NetworkManager::IsClient()) { - NetworkManager::Peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg); + peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg); } else { @@ -598,6 +598,227 @@ void SendObjectRoleMessage(const NetworkReplicatedObject& item, const NetworkCli } } +void SendDespawn(DespawnItem& e) +{ + NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Despawn object ID={}", e.Id.ToString()); + NetworkMessageObjectDespawn msgData; + Guid objectId = e.Id; + { + // Remap local client object ids into server ids + IdsRemappingTable.KeyOf(objectId, &objectId); + } + auto peer = NetworkManager::Peer; + NetworkMessage msg = peer->BeginSendMessage(); + msg.WriteStructure(msgData); + msg.WriteNetworkId(objectId); + BuildCachedTargets(NetworkManager::Clients, e.Targets); + if (NetworkManager::IsClient()) + peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg); + else + peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg, CachedTargets); +} + +void SendReplication(ScriptingObject* obj, NetworkClientsMask targetClients) +{ + auto it = Objects.Find(obj->GetID()); + if (it.IsEnd()) + return; + auto& item = it->Item; + const bool isClient = NetworkManager::IsClient(); + + // Skip serialization of objects that none will receive + if (!isClient) + { + BuildCachedTargets(item, targetClients); + if (CachedTargets.Count() == 0) + return; + } + + if (item.AsNetworkObject) + item.AsNetworkObject->OnNetworkSerialize(); + + // Serialize object + NetworkStream* stream = CachedWriteStream; + stream->Initialize(); + const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, true); + if (failed) + { + //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString()); + return; + } + const uint32 size = stream->GetPosition(); + if (size > MAX_uint16) + { + LOG(Error, "Too much data for object {} replication ({} bytes provided while limit is {}).", item.ToString(), size, MAX_uint16); + return; + } + +#if USE_NETWORK_REPLICATOR_CACHE + // Process replication cache to skip sending object data if it didn't change + if (item.RepCache.Data.Length() == size && + item.RepCache.Mask == targetClients && + Platform::MemoryCompare(item.RepCache.Data.Get(), stream->GetBuffer(), size) == 0) + { + return; + } + item.RepCache.Mask = targetClients; + item.RepCache.Data.Copy(stream->GetBuffer(), size); +#endif + // TODO: use Unreliable for dynamic objects that are replicated every frame? (eg. player state) + constexpr NetworkChannelType repChannel = NetworkChannelType::Reliable; + + // Send object to clients + NetworkMessageObjectReplicate msgData; + msgData.OwnerFrame = NetworkManager::Frame; + Guid objectId = item.ObjectId, parentId = item.ParentId; + { + // Remap local client object ids into server ids + IdsRemappingTable.KeyOf(objectId, &objectId); + IdsRemappingTable.KeyOf(parentId, &parentId); + } + NetworkPeer* peer = NetworkManager::Peer; + NetworkMessage msg = peer->BeginSendMessage(); + msg.WriteStructure(msgData); + msg.WriteNetworkId(objectId); + msg.WriteNetworkId(parentId); + msg.WriteNetworkName(obj->GetType().Fullname); + NetworkMessageObjectReplicatePayload msgDataPayload; + msgDataPayload.DataSize = size; + const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid); + const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectReplicatePayload); + const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart) - networkKeyIdWorstCaseSize; + uint32 partsCount = 1; + uint32 dataStart = 0; + uint32 msgDataSize = size; + if (size > msgMaxData) + { + // Send msgMaxData within first message + msgDataSize = msgMaxData; + dataStart += msgMaxData; + + // Send rest of the data in separate parts + partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData); + } + else + dataStart += size; + ASSERT(partsCount <= MAX_uint8); + msgDataPayload.PartsCount = partsCount; + msgDataPayload.PartSize = msgDataSize; + msg.WriteStructure(msgDataPayload); + msg.WriteBytes(stream->GetBuffer(), msgDataSize); + uint32 dataSize = msgDataSize, messageSize = msg.Length; + if (isClient) + peer->EndSendMessage(repChannel, msg); + else + peer->EndSendMessage(repChannel, msg, CachedTargets); + + // Send all other parts + for (uint32 partIndex = 1; partIndex < partsCount; partIndex++) + { + NetworkMessageObjectReplicatePart msgDataPart; + msgDataPart.OwnerFrame = msgData.OwnerFrame; + msgDataPart.DataSize = msgDataPayload.DataSize; + msgDataPart.PartsCount = msgDataPayload.PartsCount; + msgDataPart.PartStart = dataStart; + msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData); + msg = peer->BeginSendMessage(); + msg.WriteStructure(msgDataPart); + msg.WriteNetworkId(objectId); + msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize); + messageSize += msg.Length; + dataSize += msgDataPart.PartSize; + dataStart += msgDataPart.PartSize; + if (isClient) + peer->EndSendMessage(repChannel, msg); + else + peer->EndSendMessage(repChannel, msg, CachedTargets); + } + ASSERT_LOW_LAYER(dataStart == size); + +#if COMPILE_WITH_PROFILER + // Network stats recording + if (NetworkInternal::EnableProfiling) + { + const Pair name(obj->GetTypeHandle(), StringAnsiView::Empty); + auto& profileEvent = NetworkInternal::ProfilerEvents[name]; + profileEvent.Count++; + profileEvent.DataSize += dataSize; + profileEvent.MessageSize += messageSize; + profileEvent.Receivers += isClient ? 1 : CachedTargets.Count(); + } +#endif +} + +void SendRpc(RpcItem& e) +{ + ScriptingObject* obj = e.Object.Get(); + if (!obj) + return; + auto it = Objects.Find(obj->GetID()); + if (it == Objects.End()) + { +#if !BUILD_RELEASE + if (!DespawnedObjects.Contains(obj->GetID())) + LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), String(e.Name.Second), obj->GetID()); +#endif + return; + } + auto& item = it->Item; + const NetworkManagerMode mode = NetworkManager::Mode; + NetworkPeer* peer = NetworkManager::Peer; + + // Send RPC message + //NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}::{} object ID={}", e.Name.First.ToString(), String(e.Name.Second), item.ToString()); + NetworkMessageObjectRpc msgData; + Guid msgObjectId = item.ObjectId; + Guid msgParentId = item.ParentId; + { + // Remap local client object ids into server ids + IdsRemappingTable.KeyOf(msgObjectId, &msgObjectId); + IdsRemappingTable.KeyOf(msgParentId, &msgParentId); + } + msgData.ArgsSize = (uint16)e.ArgsData.Length(); + NetworkMessage msg = peer->BeginSendMessage(); + msg.WriteStructure(msgData); + msg.WriteNetworkId(msgObjectId); + msg.WriteNetworkId(msgParentId); + msg.WriteNetworkName(obj->GetType().Fullname); + msg.WriteNetworkName(e.Name.First.GetType().Fullname); + msg.WriteNetworkName(e.Name.Second); + msg.WriteBytes(e.ArgsData.Get(), e.ArgsData.Length()); + uint32 dataSize = e.ArgsData.Length(), messageSize = msg.Length, receivers = 0; + NetworkChannelType channel = (NetworkChannelType)e.Info.Channel; + if (e.Info.Server && mode == NetworkManagerMode::Client) + { + // Client -> Server +#if USE_NETWORK_REPLICATOR_LOG + if (e.Targets.Length() != 0) + NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}::{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString()); +#endif + peer->EndSendMessage(channel, msg); + receivers = 1; + } + else if (e.Info.Client && (mode == NetworkManagerMode::Server || mode == NetworkManagerMode::Host)) + { + // Server -> Client(s) + BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, e.Targets, NetworkManager::LocalClientId); + peer->EndSendMessage(channel, msg, CachedTargets); + receivers = CachedTargets.Count(); + } + +#if COMPILE_WITH_PROFILER + // Network stats recording + if (NetworkInternal::EnableProfiling && receivers) + { + auto& profileEvent = NetworkInternal::ProfilerEvents[e.Name]; + profileEvent.Count++; + profileEvent.DataSize += dataSize; + profileEvent.MessageSize += messageSize; + profileEvent.Receivers += receivers; + } +#endif +} + void DeleteNetworkObject(ScriptingObject* obj) { // Remove from the mapping table @@ -1652,9 +1873,6 @@ void NetworkInternal::NetworkReplicatorUpdate() if (Objects.Count() == 0) return; const bool isClient = NetworkManager::IsClient(); - const bool isServer = NetworkManager::IsServer(); - const bool isHost = NetworkManager::IsHost(); - NetworkPeer* peer = NetworkManager::Peer; if (!isClient && NewClients.Count() != 0) { @@ -1694,22 +1912,7 @@ void NetworkInternal::NetworkReplicatorUpdate() PROFILE_CPU_NAMED("DespawnQueue"); for (DespawnItem& e : DespawnQueue) { - // Send despawn message - NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Despawn object ID={}", e.Id.ToString()); - NetworkMessageObjectDespawn msgData; - Guid objectId = e.Id; - { - // Remap local client object ids into server ids - IdsRemappingTable.KeyOf(objectId, &objectId); - } - NetworkMessage msg = peer->BeginSendMessage(); - msg.WriteStructure(msgData); - msg.WriteNetworkId(objectId); - BuildCachedTargets(NetworkManager::Clients, e.Targets); - if (isClient) - peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg); - else - peer->EndSendMessage(NetworkChannelType::ReliableOrdered, msg, CachedTargets); + SendDespawn(e); } DespawnQueue.Clear(); } @@ -1871,136 +2074,11 @@ void NetworkInternal::NetworkReplicatorUpdate() PROFILE_CPU_NAMED("Replication"); if (CachedWriteStream == nullptr) CachedWriteStream = New(); - NetworkStream* stream = CachedWriteStream; - stream->SenderId = NetworkManager::LocalClientId; + CachedWriteStream->SenderId = NetworkManager::LocalClientId; // TODO: use Job System when replicated objects count is large for (auto& e : CachedReplicationResult->_entries) { - ScriptingObject* obj = e.Object; - auto it = Objects.Find(obj->GetID()); - if (it.IsEnd()) - continue; - auto& item = it->Item; - - // Skip serialization of objects that none will receive - if (!isClient) - { - BuildCachedTargets(item, e.TargetClients); - if (CachedTargets.Count() == 0) - continue; - } - - if (item.AsNetworkObject) - item.AsNetworkObject->OnNetworkSerialize(); - - // Serialize object - stream->Initialize(); - const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, true); - if (failed) - { - //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString()); - continue; - } - const uint32 size = stream->GetPosition(); - if (size > MAX_uint16) - { - LOG(Error, "Too much data for object {} replication ({} bytes provided while limit is {}).", item.ToString(), size, MAX_uint16); - continue; - } - -#if USE_NETWORK_REPLICATOR_CACHE - // Process replication cache to skip sending object data if it didn't change - if (item.RepCache.Data.Length() == size && - item.RepCache.Mask == e.TargetClients && - Platform::MemoryCompare(item.RepCache.Data.Get(), stream->GetBuffer(), size) == 0) - { - continue; - } - item.RepCache.Mask = e.TargetClients; - item.RepCache.Data.Copy(stream->GetBuffer(), size); -#endif - // TODO: use Unreliable for dynamic objects that are replicated every frame? (eg. player state) - constexpr NetworkChannelType repChannel = NetworkChannelType::Reliable; - - // Send object to clients - NetworkMessageObjectReplicate msgData; - msgData.OwnerFrame = NetworkManager::Frame; - Guid objectId = item.ObjectId, parentId = item.ParentId; - { - // Remap local client object ids into server ids - IdsRemappingTable.KeyOf(objectId, &objectId); - IdsRemappingTable.KeyOf(parentId, &parentId); - } - NetworkMessage msg = peer->BeginSendMessage(); - msg.WriteStructure(msgData); - msg.WriteNetworkId(objectId); - msg.WriteNetworkId(parentId); - msg.WriteNetworkName(obj->GetType().Fullname); - NetworkMessageObjectReplicatePayload msgDataPayload; - msgDataPayload.DataSize = size; - const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid); - const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectReplicatePayload); - const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart) - networkKeyIdWorstCaseSize; - uint32 partsCount = 1; - uint32 dataStart = 0; - uint32 msgDataSize = size; - if (size > msgMaxData) - { - // Send msgMaxData within first message - msgDataSize = msgMaxData; - dataStart += msgMaxData; - - // Send rest of the data in separate parts - partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData); - } - else - dataStart += size; - ASSERT(partsCount <= MAX_uint8); - msgDataPayload.PartsCount = partsCount; - msgDataPayload.PartSize = msgDataSize; - msg.WriteStructure(msgDataPayload); - msg.WriteBytes(stream->GetBuffer(), msgDataSize); - uint32 dataSize = msgDataSize, messageSize = msg.Length; - if (isClient) - peer->EndSendMessage(repChannel, msg); - else - peer->EndSendMessage(repChannel, msg, CachedTargets); - - // Send all other parts - for (uint32 partIndex = 1; partIndex < partsCount; partIndex++) - { - NetworkMessageObjectReplicatePart msgDataPart; - msgDataPart.OwnerFrame = msgData.OwnerFrame; - msgDataPart.DataSize = msgDataPayload.DataSize; - msgDataPart.PartsCount = msgDataPayload.PartsCount; - msgDataPart.PartStart = dataStart; - msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData); - msg = peer->BeginSendMessage(); - msg.WriteStructure(msgDataPart); - msg.WriteNetworkId(objectId); - msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize); - messageSize += msg.Length; - dataSize += msgDataPart.PartSize; - dataStart += msgDataPart.PartSize; - if (isClient) - peer->EndSendMessage(repChannel, msg); - else - peer->EndSendMessage(repChannel, msg, CachedTargets); - } - ASSERT_LOW_LAYER(dataStart == size); - -#if COMPILE_WITH_PROFILER - // Network stats recording - if (EnableProfiling) - { - const Pair name(obj->GetTypeHandle(), StringAnsiView::Empty); - auto& profileEvent = ProfilerEvents[name]; - profileEvent.Count++; - profileEvent.DataSize += dataSize; - profileEvent.MessageSize += messageSize; - profileEvent.Receivers += isClient ? 1 : CachedTargets.Count(); - } -#endif + SendReplication(e.Object, e.TargetClients); } } @@ -2009,70 +2087,7 @@ void NetworkInternal::NetworkReplicatorUpdate() PROFILE_CPU_NAMED("Rpc"); for (auto& e : RpcQueue) { - ScriptingObject* obj = e.Object.Get(); - if (!obj) - continue; - auto it = Objects.Find(obj->GetID()); - if (it == Objects.End()) - { -#if USE_EDITOR || !BUILD_RELEASE - if (!DespawnedObjects.Contains(obj->GetID())) - LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), String(e.Name.Second), obj->GetID()); -#endif - continue; - } - auto& item = it->Item; - - // Send RPC message - //NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}::{} object ID={}", e.Name.First.ToString(), String(e.Name.Second), item.ToString()); - NetworkMessageObjectRpc msgData; - Guid msgObjectId = item.ObjectId; - Guid msgParentId = item.ParentId; - { - // Remap local client object ids into server ids - IdsRemappingTable.KeyOf(msgObjectId, &msgObjectId); - IdsRemappingTable.KeyOf(msgParentId, &msgParentId); - } - msgData.ArgsSize = (uint16)e.ArgsData.Length(); - NetworkMessage msg = peer->BeginSendMessage(); - msg.WriteStructure(msgData); - msg.WriteNetworkId(msgObjectId); - msg.WriteNetworkId(msgParentId); - msg.WriteNetworkName(obj->GetType().Fullname); - msg.WriteNetworkName(e.Name.First.GetType().Fullname); - msg.WriteNetworkName(e.Name.Second); - msg.WriteBytes(e.ArgsData.Get(), e.ArgsData.Length()); - uint32 dataSize = e.ArgsData.Length(), messageSize = msg.Length, receivers = 0; - NetworkChannelType channel = (NetworkChannelType)e.Info.Channel; - if (e.Info.Server && isClient) - { - // Client -> Server -#if USE_NETWORK_REPLICATOR_LOG - if (e.Targets.Length() != 0) - NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}::{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString()); -#endif - peer->EndSendMessage(channel, msg); - receivers = 1; - } - else if (e.Info.Client && (isServer || isHost)) - { - // Server -> Client(s) - BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, e.Targets, NetworkManager::LocalClientId); - peer->EndSendMessage(channel, msg, CachedTargets); - receivers = CachedTargets.Count(); - } - -#if COMPILE_WITH_PROFILER - // Network stats recording - if (EnableProfiling && receivers) - { - auto& profileEvent = ProfilerEvents[e.Name]; - profileEvent.Count++; - profileEvent.DataSize += dataSize; - profileEvent.MessageSize += messageSize; - profileEvent.Receivers += receivers; - } -#endif + SendRpc(e); } RpcQueue.Clear(); }