From 7c3c4f1a63123d602afe05481d6fc25389146f94 Mon Sep 17 00:00:00 2001 From: Wojtek Figat Date: Thu, 30 Oct 2025 22:40:23 +0100 Subject: [PATCH] Add Network RPC messages splitting for large arguments payloads #3776 --- Source/Engine/Networking/NetworkInternal.h | 4 +- Source/Engine/Networking/NetworkManager.cpp | 1 + .../Engine/Networking/NetworkReplicator.cpp | 354 +++++++++++------- 3 files changed, 213 insertions(+), 146 deletions(-) diff --git a/Source/Engine/Networking/NetworkInternal.h b/Source/Engine/Networking/NetworkInternal.h index 7eb4f7d52..2aade3811 100644 --- a/Source/Engine/Networking/NetworkInternal.h +++ b/Source/Engine/Networking/NetworkInternal.h @@ -8,7 +8,7 @@ #endif // Internal version number of networking implementation. Updated once engine changes serialization or connection rules. -#define NETWORK_PROTOCOL_VERSION 4 +#define NETWORK_PROTOCOL_VERSION 5 // Enables encoding object ids and typenames via uint32 keys rather than full data send. #define USE_NETWORK_KEYS 1 @@ -29,6 +29,7 @@ enum class NetworkMessageIDs : uint8 ObjectDespawn, ObjectRole, ObjectRpc, + ObjectRpcPart, MAX, }; @@ -48,6 +49,7 @@ public: static void OnNetworkMessageObjectDespawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); static void OnNetworkMessageObjectRole(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); static void OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); + static void OnNetworkMessageObjectRpcPart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); #if COMPILE_WITH_PROFILER diff --git a/Source/Engine/Networking/NetworkManager.cpp b/Source/Engine/Networking/NetworkManager.cpp index 784bbf51e..36ff94c5c 100644 --- a/Source/Engine/Networking/NetworkManager.cpp +++ b/Source/Engine/Networking/NetworkManager.cpp @@ -391,6 +391,7 @@ namespace NetworkInternal::OnNetworkMessageObjectDespawn, NetworkInternal::OnNetworkMessageObjectRole, NetworkInternal::OnNetworkMessageObjectRpc, + NetworkInternal::OnNetworkMessageObjectRpcPart, }; } diff --git a/Source/Engine/Networking/NetworkReplicator.cpp b/Source/Engine/Networking/NetworkReplicator.cpp index 9fdaf7a97..c584d3526 100644 --- a/Source/Engine/Networking/NetworkReplicator.cpp +++ b/Source/Engine/Networking/NetworkReplicator.cpp @@ -55,14 +55,14 @@ PACK_STRUCT(struct NetworkMessageObjectReplicate uint32 OwnerFrame; }); -PACK_STRUCT(struct NetworkMessageObjectReplicatePayload +PACK_STRUCT(struct NetworkMessageObjectPartPayload { uint16 DataSize; uint16 PartsCount; uint16 PartSize; }); -PACK_STRUCT(struct NetworkMessageObjectReplicatePart +PACK_STRUCT(struct NetworkMessageObjectPart { NetworkMessageIDs ID = NetworkMessageIDs::ObjectReplicatePart; uint32 OwnerFrame; @@ -111,7 +111,7 @@ PACK_STRUCT(struct NetworkMessageObjectRole PACK_STRUCT(struct NetworkMessageObjectRpc { NetworkMessageIDs ID = NetworkMessageIDs::ObjectRpc; - uint16 ArgsSize; + uint32 OwnerFrame; }); struct NetworkReplicatedObject @@ -182,13 +182,14 @@ struct Serializer void* Tags[2]; }; -struct ReplicateItem +struct PartsItem { ScriptingObjectReference Object; Guid ObjectId; uint16 PartsLeft; uint32 OwnerFrame; uint32 OwnerClientId; + const void* Tag; Array Data; }; @@ -220,7 +221,7 @@ struct DespawnItem DataContainer Targets; }; -struct RpcItem +struct RpcSendItem { ScriptingObjectReference Object; NetworkRpcName Name; @@ -233,11 +234,12 @@ namespace { CriticalSection ObjectsLock; HashSet Objects; - Array ReplicationParts; + Array ReplicationParts; + Array RpcParts; Array SpawnParts; Array SpawnQueue; Array DespawnQueue; - Array RpcQueue; + Array RpcQueue; Dictionary IdsRemappingTable; NetworkStream* CachedWriteStream = nullptr; NetworkStream* CachedReadStream = nullptr; @@ -251,6 +253,7 @@ namespace #endif Array DespawnedObjects; uint32 SpawnId = 0; + uint32 RpcId = 0; #if USE_EDITOR void OnScriptsReloading() @@ -505,6 +508,76 @@ void SetupObjectSpawnMessageItem(SpawnItem* e, NetworkMessage& msg) msg.WriteStructure(msgDataItem); } +void SendInParts(NetworkPeer* peer, NetworkChannelType channel, const byte* data, const uint16 dataSize, NetworkMessage& msg, const NetworkRpcName& name, bool toServer, const Guid& objectId, uint32 ownerFrame, NetworkMessageIDs partId) +{ + NetworkMessageObjectPartPayload msgDataPayload; + msgDataPayload.DataSize = dataSize; + const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid); + const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectPartPayload); + const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectPart) - networkKeyIdWorstCaseSize; + uint32 partsCount = 1; + uint32 dataStart = 0; + uint32 msgDataSize = dataSize; + if (dataSize > msgMaxData) + { + // Send msgMaxData within first message + msgDataSize = msgMaxData; + dataStart += msgMaxData; + + // Send rest of the data in separate parts + partsCount += Math::DivideAndRoundUp(dataSize - dataStart, partMaxData); + + // TODO: promote channel to Ordered when using parts? + } + else + dataStart += dataSize; + ASSERT(partsCount <= MAX_uint8); + msgDataPayload.PartsCount = partsCount; + msgDataPayload.PartSize = msgDataSize; + msg.WriteStructure(msgDataPayload); + msg.WriteBytes(data, msgDataSize); + uint32 messageSize = msg.Length; + if (toServer) + peer->EndSendMessage(channel, msg); + else + peer->EndSendMessage(channel, msg, CachedTargets); + + // Send all other parts + for (uint32 partIndex = 1; partIndex < partsCount; partIndex++) + { + NetworkMessageObjectPart msgDataPart; + msgDataPart.ID = partId; + msgDataPart.OwnerFrame = ownerFrame; + msgDataPart.DataSize = msgDataPayload.DataSize; + msgDataPart.PartsCount = msgDataPayload.PartsCount; + msgDataPart.PartStart = dataStart; + msgDataPart.PartSize = Math::Min(dataSize - dataStart, partMaxData); + msg = peer->BeginSendMessage(); + msg.WriteStructure(msgDataPart); + msg.WriteNetworkId(objectId); + msg.WriteBytes(data + msgDataPart.PartStart, msgDataPart.PartSize); + messageSize += msg.Length; + dataStart += msgDataPart.PartSize; + if (toServer) + peer->EndSendMessage(channel, msg); + else + peer->EndSendMessage(channel, msg, CachedTargets); + } + ASSERT_LOW_LAYER(dataStart == dataSize); + +#if COMPILE_WITH_PROFILER + // Network stats recording + if (NetworkInternal::EnableProfiling) + { + auto& profileEvent = NetworkInternal::ProfilerEvents[name]; + profileEvent.Count++; + profileEvent.DataSize += dataSize; + profileEvent.MessageSize += messageSize; + profileEvent.Receivers += toServer ? 1 : CachedTargets.Count(); + } +#endif +} + void SendObjectSpawnMessage(const SpawnGroup& group, const Array& clients) { PROFILE_CPU(); @@ -682,74 +755,11 @@ void SendReplication(ScriptingObject* obj, NetworkClientsMask targetClients) msg.WriteNetworkId(objectId); msg.WriteNetworkId(parentId); msg.WriteNetworkName(obj->GetType().Fullname); - NetworkMessageObjectReplicatePayload msgDataPayload; - msgDataPayload.DataSize = size; - const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid); - const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectReplicatePayload); - const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart) - networkKeyIdWorstCaseSize; - uint32 partsCount = 1; - uint32 dataStart = 0; - uint32 msgDataSize = size; - if (size > msgMaxData) - { - // Send msgMaxData within first message - msgDataSize = msgMaxData; - dataStart += msgMaxData; - - // Send rest of the data in separate parts - partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData); - } - else - dataStart += size; - ASSERT(partsCount <= MAX_uint8); - msgDataPayload.PartsCount = partsCount; - msgDataPayload.PartSize = msgDataSize; - msg.WriteStructure(msgDataPayload); - msg.WriteBytes(stream->GetBuffer(), msgDataSize); - uint32 dataSize = msgDataSize, messageSize = msg.Length; - if (isClient) - peer->EndSendMessage(repChannel, msg); - else - peer->EndSendMessage(repChannel, msg, CachedTargets); - - // Send all other parts - for (uint32 partIndex = 1; partIndex < partsCount; partIndex++) - { - NetworkMessageObjectReplicatePart msgDataPart; - msgDataPart.OwnerFrame = msgData.OwnerFrame; - msgDataPart.DataSize = msgDataPayload.DataSize; - msgDataPart.PartsCount = msgDataPayload.PartsCount; - msgDataPart.PartStart = dataStart; - msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData); - msg = peer->BeginSendMessage(); - msg.WriteStructure(msgDataPart); - msg.WriteNetworkId(objectId); - msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize); - messageSize += msg.Length; - dataSize += msgDataPart.PartSize; - dataStart += msgDataPart.PartSize; - if (isClient) - peer->EndSendMessage(repChannel, msg); - else - peer->EndSendMessage(repChannel, msg, CachedTargets); - } - ASSERT_LOW_LAYER(dataStart == size); - -#if COMPILE_WITH_PROFILER - // Network stats recording - if (NetworkInternal::EnableProfiling) - { - const Pair name(obj->GetTypeHandle(), StringAnsiView::Empty); - auto& profileEvent = NetworkInternal::ProfilerEvents[name]; - profileEvent.Count++; - profileEvent.DataSize += dataSize; - profileEvent.MessageSize += messageSize; - profileEvent.Receivers += isClient ? 1 : CachedTargets.Count(); - } -#endif + const NetworkRpcName name(obj->GetTypeHandle(), StringAnsiView::Empty); + SendInParts(peer, repChannel, stream->GetBuffer(), size, msg, name, isClient, objectId, msgData.OwnerFrame, NetworkMessageIDs::ObjectReplicatePart); } -void SendRpc(RpcItem& e) +void SendRpc(RpcSendItem& e) { ScriptingObject* obj = e.Object.Get(); if (!obj) @@ -759,64 +769,60 @@ void SendRpc(RpcItem& e) { #if !BUILD_RELEASE if (!DespawnedObjects.Contains(obj->GetID())) - LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), String(e.Name.Second), obj->GetID()); + LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), e.Name.Second.ToString(), obj->GetID()); #endif return; } auto& item = it->Item; + if (e.ArgsData.Length() > MAX_uint16) + { + LOG(Error, "Too much data for object RPC method '{}.{}' on object '{}' ({} bytes provided while limit is {}).", e.Name.First.ToString(), e.Name.Second.ToString(), obj->GetID(), e.ArgsData.Length(), MAX_uint16); + return; + } const NetworkManagerMode mode = NetworkManager::Mode; NetworkPeer* peer = NetworkManager::Peer; - // Send RPC message - //NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}::{} object ID={}", e.Name.First.ToString(), String(e.Name.Second), item.ToString()); - NetworkMessageObjectRpc msgData; - Guid msgObjectId = item.ObjectId; - Guid msgParentId = item.ParentId; - { - // Remap local client object ids into server ids - IdsRemappingTable.KeyOf(msgObjectId, &msgObjectId); - IdsRemappingTable.KeyOf(msgParentId, &msgParentId); - } - msgData.ArgsSize = (uint16)e.ArgsData.Length(); - NetworkMessage msg = peer->BeginSendMessage(); - msg.WriteStructure(msgData); - msg.WriteNetworkId(msgObjectId); - msg.WriteNetworkId(msgParentId); - msg.WriteNetworkName(obj->GetType().Fullname); - msg.WriteNetworkName(e.Name.First.GetType().Fullname); - msg.WriteNetworkName(e.Name.Second); - msg.WriteBytes(e.ArgsData.Get(), e.ArgsData.Length()); - uint32 dataSize = e.ArgsData.Length(), messageSize = msg.Length, receivers = 0; - NetworkChannelType channel = (NetworkChannelType)e.Info.Channel; + bool toServer; if (e.Info.Server && mode == NetworkManagerMode::Client) { // Client -> Server #if USE_NETWORK_REPLICATOR_LOG if (e.Targets.Length() != 0) - NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}::{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString()); + NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}.{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString()); #endif - peer->EndSendMessage(channel, msg); - receivers = 1; + toServer = true; } else if (e.Info.Client && (mode == NetworkManagerMode::Server || mode == NetworkManagerMode::Host)) { // Server -> Client(s) BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, e.Targets, NetworkManager::LocalClientId); - peer->EndSendMessage(channel, msg, CachedTargets); - receivers = CachedTargets.Count(); + if (CachedTargets.IsEmpty()) + return; + toServer = false; } + else + return; -#if COMPILE_WITH_PROFILER - // Network stats recording - if (NetworkInternal::EnableProfiling && receivers) + // Send RPC message + //NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}.{} object ID={}", e.Name.First.ToString(), e.Name.Second.ToString(), item.ToString()); + NetworkMessageObjectRpc msgData; + msgData.OwnerFrame = ++RpcId; + Guid objectId = item.ObjectId; + Guid parentId = item.ParentId; { - auto& profileEvent = NetworkInternal::ProfilerEvents[e.Name]; - profileEvent.Count++; - profileEvent.DataSize += dataSize; - profileEvent.MessageSize += messageSize; - profileEvent.Receivers += receivers; + // Remap local client object ids into server ids + IdsRemappingTable.KeyOf(objectId, &objectId); + IdsRemappingTable.KeyOf(parentId, &parentId); } -#endif + NetworkMessage msg = peer->BeginSendMessage(); + msg.WriteStructure(msgData); + msg.WriteNetworkId(objectId); + msg.WriteNetworkId(parentId); + msg.WriteNetworkName(obj->GetType().Fullname); + msg.WriteNetworkName(e.Name.First.GetType().Fullname); + msg.WriteNetworkName(e.Name.Second); + NetworkChannelType channel = (NetworkChannelType)e.Info.Channel; + SendInParts(peer, channel, e.ArgsData.Get(), e.ArgsData.Length(), msg, e.Name, toServer, objectId, msgData.OwnerFrame, NetworkMessageIDs::ObjectRpcPart); } void DeleteNetworkObject(ScriptingObject* obj) @@ -929,38 +935,43 @@ FORCE_INLINE void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject Hierarchy->DirtyObject(obj); } -ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId) +PartsItem* AddPartsItem(Array& items, NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId) { // Reuse or add part item - ReplicateItem* replicateItem = nullptr; - for (auto& e : ReplicationParts) + PartsItem* item = nullptr; + for (auto& e : items) { if (e.OwnerFrame == ownerFrame && e.Data.Count() == dataSize && e.ObjectId == objectId) { // Reuse - replicateItem = &e; + item = &e; break; } } - if (!replicateItem) + if (!item) { // Add - replicateItem = &ReplicationParts.AddOne(); - replicateItem->ObjectId = objectId; - replicateItem->PartsLeft = partsCount; - replicateItem->OwnerFrame = ownerFrame; - replicateItem->OwnerClientId = senderClientId; - replicateItem->Data.Resize(dataSize); + item = &items.AddOne(); + item->ObjectId = objectId; + item->PartsLeft = partsCount; + item->OwnerFrame = ownerFrame; + item->OwnerClientId = senderClientId; + item->Data.Resize(dataSize); } // Copy part data - ASSERT(replicateItem->PartsLeft > 0); - replicateItem->PartsLeft--; - ASSERT(partStart + partSize <= replicateItem->Data.Count()); + ASSERT(item->PartsLeft > 0); + item->PartsLeft--; + ASSERT(partStart + partSize <= item->Data.Count()); const void* partData = event.Message.SkipBytes(partSize); - Platform::MemoryCopy(replicateItem->Data.Get() + partStart, partData, partSize); + Platform::MemoryCopy(item->Data.Get() + partStart, partData, partSize); - return replicateItem; + return item; +} + +FORCE_INLINE PartsItem* AddObjectReplicateItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId) +{ + return AddPartsItem(ReplicationParts, event, ownerFrame, partsCount, dataSize, objectId, partStart, partSize, senderClientId); } void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize, uint32 senderClientId) @@ -1008,6 +1019,24 @@ void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, b DirtyObjectImpl(item, obj); } +FORCE_INLINE PartsItem* AddObjectRpcItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId) +{ + return AddPartsItem(RpcParts, event, ownerFrame, partsCount, dataSize, objectId, partStart, partSize, senderClientId); +} + +void InvokeObjectRpc(const NetworkRpcInfo* info, byte* data, uint32 dataSize, uint32 senderClientId, ScriptingObject* obj) +{ + // Setup message reading stream + if (CachedReadStream == nullptr) + CachedReadStream = New(); + NetworkStream* stream = CachedReadStream; + stream->SenderId = senderClientId; + stream->Initialize(data, dataSize); + + // Execute RPC + info->Execute(obj, stream, info->Tag); +} + void InvokeObjectSpawn(const NetworkMessageObjectSpawn& msgData, const Guid& prefabId, const NetworkMessageObjectSpawnItem* msgDataItems) { ScopeLock lock(ObjectsLock); @@ -2033,6 +2062,7 @@ void NetworkInternal::NetworkReplicatorUpdate() } } + // TODO: remove items from RpcParts after some TTL to reduce memory usage // TODO: remove items from SpawnParts after some TTL to reduce memory usage // Replicate all owned networked objects with other clients or server @@ -2100,7 +2130,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo { PROFILE_CPU(); NetworkMessageObjectReplicate msgData; - NetworkMessageObjectReplicatePayload msgDataPayload; + NetworkMessageObjectPartPayload msgDataPayload; Guid objectId, parentId; StringAnsiView objectTypeName; event.Message.ReadStructure(msgData); @@ -2110,7 +2140,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo event.Message.ReadStructure(msgDataPayload); ScopeLock lock(ObjectsLock); if (DespawnedObjects.Contains(objectId)) - return; // Skip replicating not-existing objects + return; // Skip replicating non-existing objects NetworkReplicatedObject* e = ResolveObject(objectId, parentId, objectTypeName); if (!e) return; @@ -2129,7 +2159,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo else { // Add to replication from multiple parts - ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId); + PartsItem* replicateItem = AddObjectReplicateItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId); replicateItem->Object = e->Object; } } @@ -2137,13 +2167,13 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) { PROFILE_CPU(); - NetworkMessageObjectReplicatePart msgData; + NetworkMessageObjectPart msgData; Guid objectId; event.Message.ReadStructure(msgData); event.Message.ReadNetworkId(objectId); ScopeLock lock(ObjectsLock); if (DespawnedObjects.Contains(objectId)) - return; // Skip replicating not-existing objects + return; // Skip replicating non-existing objects const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId; AddObjectReplicateItem(event, msgData.OwnerFrame, msgData.PartsCount, msgData.DataSize, objectId, msgData.PartStart, msgData.PartSize, senderClientId); @@ -2303,14 +2333,16 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie { PROFILE_CPU(); NetworkMessageObjectRpc msgData; - Guid msgObjectId, msgParentId; + NetworkMessageObjectPartPayload msgDataPayload; + Guid objectId, parentId; StringAnsiView objectTypeName, rpcTypeName, rpcName; event.Message.ReadStructure(msgData); - event.Message.ReadNetworkId(msgObjectId); - event.Message.ReadNetworkId(msgParentId); + event.Message.ReadNetworkId(objectId); + event.Message.ReadNetworkId(parentId); event.Message.ReadNetworkName(objectTypeName); event.Message.ReadNetworkName(rpcTypeName); event.Message.ReadNetworkName(rpcName); + event.Message.ReadStructure(msgDataPayload); ScopeLock lock(ObjectsLock); // Find RPC info @@ -2320,11 +2352,11 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(name); if (!info) { - NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown RPC {}::{} for object {}", String(rpcTypeName), String(rpcName), msgObjectId); + NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown RPC {}::{} for object {}", String(rpcTypeName), String(rpcName), objectId); return; } - NetworkReplicatedObject* e = ResolveObject(msgObjectId, msgParentId, objectTypeName); + NetworkReplicatedObject* e = ResolveObject(objectId, parentId, objectTypeName); if (e) { auto& item = *e; @@ -2344,18 +2376,50 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie return; } - // Setup message reading stream - if (CachedReadStream == nullptr) - CachedReadStream = New(); - NetworkStream* stream = CachedReadStream; - stream->SenderId = client ? client->ClientId : NetworkManager::ServerClientId; - stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize); - - // Execute RPC - info->Execute(obj, stream, info->Tag); + const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId; + if (msgDataPayload.PartsCount == 1) + { + // Call RPC + InvokeObjectRpc(info, event.Message.Buffer + event.Message.Position, msgDataPayload.DataSize, senderClientId, obj); + } + else + { + // Add to RPC from multiple parts + PartsItem* rpcItem = AddObjectRpcItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId); + rpcItem->Object = e->Object; + rpcItem->Tag = info; + } } else if (info->Channel != static_cast(NetworkChannelType::Unreliable) && info->Channel != static_cast(NetworkChannelType::UnreliableOrdered)) { - NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", msgObjectId, String(rpcTypeName), String(rpcName)); + NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", objectId, String(rpcTypeName), String(rpcName)); + } +} + +void NetworkInternal::OnNetworkMessageObjectRpcPart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) +{ + PROFILE_CPU(); + NetworkMessageObjectPart msgData; + Guid objectId; + event.Message.ReadStructure(msgData); + event.Message.ReadNetworkId(objectId); + ScopeLock lock(ObjectsLock); + if (DespawnedObjects.Contains(objectId)) + return; // Skip replicating non-existing objects + + const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId; + PartsItem* rpcItem = AddObjectRpcItem(event, msgData.OwnerFrame, msgData.PartsCount, msgData.DataSize, objectId, msgData.PartStart, msgData.PartSize, senderClientId); + if (rpcItem && rpcItem->PartsLeft == 0) + { + // Got all parts so invoke RPC + ScriptingObject* obj = rpcItem->Object.Get(); + if (obj) + { + InvokeObjectRpc((const NetworkRpcInfo*)rpcItem->Tag, rpcItem->Data.Get(), rpcItem->Data.Count(), rpcItem->OwnerClientId, obj); + } + + // Remove item + int32 partIndex = (int32)((RpcParts.Get() - rpcItem) / sizeof(rpcItem)); + RpcParts.RemoveAt(partIndex); } }