Add Network RPC messages splitting for large arguments payloads

#3776
This commit is contained in:
Wojtek Figat
2025-10-30 22:40:23 +01:00
parent 114828adcb
commit 7c3c4f1a63
3 changed files with 213 additions and 146 deletions

View File

@@ -8,7 +8,7 @@
#endif
// Internal version number of networking implementation. Updated once engine changes serialization or connection rules.
#define NETWORK_PROTOCOL_VERSION 4
#define NETWORK_PROTOCOL_VERSION 5
// Enables encoding object ids and typenames via uint32 keys rather than full data send.
#define USE_NETWORK_KEYS 1
@@ -29,6 +29,7 @@ enum class NetworkMessageIDs : uint8
ObjectDespawn,
ObjectRole,
ObjectRpc,
ObjectRpcPart,
MAX,
};
@@ -48,6 +49,7 @@ public:
static void OnNetworkMessageObjectDespawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRole(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRpcPart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
#if COMPILE_WITH_PROFILER

View File

@@ -391,6 +391,7 @@ namespace
NetworkInternal::OnNetworkMessageObjectDespawn,
NetworkInternal::OnNetworkMessageObjectRole,
NetworkInternal::OnNetworkMessageObjectRpc,
NetworkInternal::OnNetworkMessageObjectRpcPart,
};
}

View File

@@ -55,14 +55,14 @@ PACK_STRUCT(struct NetworkMessageObjectReplicate
uint32 OwnerFrame;
});
PACK_STRUCT(struct NetworkMessageObjectReplicatePayload
PACK_STRUCT(struct NetworkMessageObjectPartPayload
{
uint16 DataSize;
uint16 PartsCount;
uint16 PartSize;
});
PACK_STRUCT(struct NetworkMessageObjectReplicatePart
PACK_STRUCT(struct NetworkMessageObjectPart
{
NetworkMessageIDs ID = NetworkMessageIDs::ObjectReplicatePart;
uint32 OwnerFrame;
@@ -111,7 +111,7 @@ PACK_STRUCT(struct NetworkMessageObjectRole
PACK_STRUCT(struct NetworkMessageObjectRpc
{
NetworkMessageIDs ID = NetworkMessageIDs::ObjectRpc;
uint16 ArgsSize;
uint32 OwnerFrame;
});
struct NetworkReplicatedObject
@@ -182,13 +182,14 @@ struct Serializer
void* Tags[2];
};
struct ReplicateItem
struct PartsItem
{
ScriptingObjectReference<ScriptingObject> Object;
Guid ObjectId;
uint16 PartsLeft;
uint32 OwnerFrame;
uint32 OwnerClientId;
const void* Tag;
Array<byte> Data;
};
@@ -220,7 +221,7 @@ struct DespawnItem
DataContainer<uint32> Targets;
};
struct RpcItem
struct RpcSendItem
{
ScriptingObjectReference<ScriptingObject> Object;
NetworkRpcName Name;
@@ -233,11 +234,12 @@ namespace
{
CriticalSection ObjectsLock;
HashSet<NetworkReplicatedObject> Objects;
Array<ReplicateItem> ReplicationParts;
Array<PartsItem> ReplicationParts;
Array<PartsItem> RpcParts;
Array<SpawnItemParts> SpawnParts;
Array<SpawnItem> SpawnQueue;
Array<DespawnItem> DespawnQueue;
Array<RpcItem> RpcQueue;
Array<RpcSendItem> RpcQueue;
Dictionary<Guid, Guid> IdsRemappingTable;
NetworkStream* CachedWriteStream = nullptr;
NetworkStream* CachedReadStream = nullptr;
@@ -251,6 +253,7 @@ namespace
#endif
Array<Guid> DespawnedObjects;
uint32 SpawnId = 0;
uint32 RpcId = 0;
#if USE_EDITOR
void OnScriptsReloading()
@@ -505,6 +508,76 @@ void SetupObjectSpawnMessageItem(SpawnItem* e, NetworkMessage& msg)
msg.WriteStructure(msgDataItem);
}
void SendInParts(NetworkPeer* peer, NetworkChannelType channel, const byte* data, const uint16 dataSize, NetworkMessage& msg, const NetworkRpcName& name, bool toServer, const Guid& objectId, uint32 ownerFrame, NetworkMessageIDs partId)
{
NetworkMessageObjectPartPayload msgDataPayload;
msgDataPayload.DataSize = dataSize;
const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid);
const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectPartPayload);
const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectPart) - networkKeyIdWorstCaseSize;
uint32 partsCount = 1;
uint32 dataStart = 0;
uint32 msgDataSize = dataSize;
if (dataSize > msgMaxData)
{
// Send msgMaxData within first message
msgDataSize = msgMaxData;
dataStart += msgMaxData;
// Send rest of the data in separate parts
partsCount += Math::DivideAndRoundUp(dataSize - dataStart, partMaxData);
// TODO: promote channel to Ordered when using parts?
}
else
dataStart += dataSize;
ASSERT(partsCount <= MAX_uint8);
msgDataPayload.PartsCount = partsCount;
msgDataPayload.PartSize = msgDataSize;
msg.WriteStructure(msgDataPayload);
msg.WriteBytes(data, msgDataSize);
uint32 messageSize = msg.Length;
if (toServer)
peer->EndSendMessage(channel, msg);
else
peer->EndSendMessage(channel, msg, CachedTargets);
// Send all other parts
for (uint32 partIndex = 1; partIndex < partsCount; partIndex++)
{
NetworkMessageObjectPart msgDataPart;
msgDataPart.ID = partId;
msgDataPart.OwnerFrame = ownerFrame;
msgDataPart.DataSize = msgDataPayload.DataSize;
msgDataPart.PartsCount = msgDataPayload.PartsCount;
msgDataPart.PartStart = dataStart;
msgDataPart.PartSize = Math::Min(dataSize - dataStart, partMaxData);
msg = peer->BeginSendMessage();
msg.WriteStructure(msgDataPart);
msg.WriteNetworkId(objectId);
msg.WriteBytes(data + msgDataPart.PartStart, msgDataPart.PartSize);
messageSize += msg.Length;
dataStart += msgDataPart.PartSize;
if (toServer)
peer->EndSendMessage(channel, msg);
else
peer->EndSendMessage(channel, msg, CachedTargets);
}
ASSERT_LOW_LAYER(dataStart == dataSize);
#if COMPILE_WITH_PROFILER
// Network stats recording
if (NetworkInternal::EnableProfiling)
{
auto& profileEvent = NetworkInternal::ProfilerEvents[name];
profileEvent.Count++;
profileEvent.DataSize += dataSize;
profileEvent.MessageSize += messageSize;
profileEvent.Receivers += toServer ? 1 : CachedTargets.Count();
}
#endif
}
void SendObjectSpawnMessage(const SpawnGroup& group, const Array<NetworkClient*>& clients)
{
PROFILE_CPU();
@@ -682,74 +755,11 @@ void SendReplication(ScriptingObject* obj, NetworkClientsMask targetClients)
msg.WriteNetworkId(objectId);
msg.WriteNetworkId(parentId);
msg.WriteNetworkName(obj->GetType().Fullname);
NetworkMessageObjectReplicatePayload msgDataPayload;
msgDataPayload.DataSize = size;
const uint32 networkKeyIdWorstCaseSize = sizeof(uint32) + sizeof(Guid);
const uint32 msgMaxData = peer->Config.MessageSize - msg.Position - sizeof(NetworkMessageObjectReplicatePayload);
const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart) - networkKeyIdWorstCaseSize;
uint32 partsCount = 1;
uint32 dataStart = 0;
uint32 msgDataSize = size;
if (size > msgMaxData)
{
// Send msgMaxData within first message
msgDataSize = msgMaxData;
dataStart += msgMaxData;
// Send rest of the data in separate parts
partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData);
}
else
dataStart += size;
ASSERT(partsCount <= MAX_uint8);
msgDataPayload.PartsCount = partsCount;
msgDataPayload.PartSize = msgDataSize;
msg.WriteStructure(msgDataPayload);
msg.WriteBytes(stream->GetBuffer(), msgDataSize);
uint32 dataSize = msgDataSize, messageSize = msg.Length;
if (isClient)
peer->EndSendMessage(repChannel, msg);
else
peer->EndSendMessage(repChannel, msg, CachedTargets);
// Send all other parts
for (uint32 partIndex = 1; partIndex < partsCount; partIndex++)
{
NetworkMessageObjectReplicatePart msgDataPart;
msgDataPart.OwnerFrame = msgData.OwnerFrame;
msgDataPart.DataSize = msgDataPayload.DataSize;
msgDataPart.PartsCount = msgDataPayload.PartsCount;
msgDataPart.PartStart = dataStart;
msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData);
msg = peer->BeginSendMessage();
msg.WriteStructure(msgDataPart);
msg.WriteNetworkId(objectId);
msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize);
messageSize += msg.Length;
dataSize += msgDataPart.PartSize;
dataStart += msgDataPart.PartSize;
if (isClient)
peer->EndSendMessage(repChannel, msg);
else
peer->EndSendMessage(repChannel, msg, CachedTargets);
}
ASSERT_LOW_LAYER(dataStart == size);
#if COMPILE_WITH_PROFILER
// Network stats recording
if (NetworkInternal::EnableProfiling)
{
const Pair<ScriptingTypeHandle, StringAnsiView> name(obj->GetTypeHandle(), StringAnsiView::Empty);
auto& profileEvent = NetworkInternal::ProfilerEvents[name];
profileEvent.Count++;
profileEvent.DataSize += dataSize;
profileEvent.MessageSize += messageSize;
profileEvent.Receivers += isClient ? 1 : CachedTargets.Count();
}
#endif
const NetworkRpcName name(obj->GetTypeHandle(), StringAnsiView::Empty);
SendInParts(peer, repChannel, stream->GetBuffer(), size, msg, name, isClient, objectId, msgData.OwnerFrame, NetworkMessageIDs::ObjectReplicatePart);
}
void SendRpc(RpcItem& e)
void SendRpc(RpcSendItem& e)
{
ScriptingObject* obj = e.Object.Get();
if (!obj)
@@ -759,64 +769,60 @@ void SendRpc(RpcItem& e)
{
#if !BUILD_RELEASE
if (!DespawnedObjects.Contains(obj->GetID()))
LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), String(e.Name.Second), obj->GetID());
LOG(Error, "Cannot invoke RPC method '{0}.{1}' on object '{2}' that is not registered in networking (use 'NetworkReplicator.AddObject').", e.Name.First.ToString(), e.Name.Second.ToString(), obj->GetID());
#endif
return;
}
auto& item = it->Item;
if (e.ArgsData.Length() > MAX_uint16)
{
LOG(Error, "Too much data for object RPC method '{}.{}' on object '{}' ({} bytes provided while limit is {}).", e.Name.First.ToString(), e.Name.Second.ToString(), obj->GetID(), e.ArgsData.Length(), MAX_uint16);
return;
}
const NetworkManagerMode mode = NetworkManager::Mode;
NetworkPeer* peer = NetworkManager::Peer;
// Send RPC message
//NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}::{} object ID={}", e.Name.First.ToString(), String(e.Name.Second), item.ToString());
NetworkMessageObjectRpc msgData;
Guid msgObjectId = item.ObjectId;
Guid msgParentId = item.ParentId;
{
// Remap local client object ids into server ids
IdsRemappingTable.KeyOf(msgObjectId, &msgObjectId);
IdsRemappingTable.KeyOf(msgParentId, &msgParentId);
}
msgData.ArgsSize = (uint16)e.ArgsData.Length();
NetworkMessage msg = peer->BeginSendMessage();
msg.WriteStructure(msgData);
msg.WriteNetworkId(msgObjectId);
msg.WriteNetworkId(msgParentId);
msg.WriteNetworkName(obj->GetType().Fullname);
msg.WriteNetworkName(e.Name.First.GetType().Fullname);
msg.WriteNetworkName(e.Name.Second);
msg.WriteBytes(e.ArgsData.Get(), e.ArgsData.Length());
uint32 dataSize = e.ArgsData.Length(), messageSize = msg.Length, receivers = 0;
NetworkChannelType channel = (NetworkChannelType)e.Info.Channel;
bool toServer;
if (e.Info.Server && mode == NetworkManagerMode::Client)
{
// Client -> Server
#if USE_NETWORK_REPLICATOR_LOG
if (e.Targets.Length() != 0)
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}::{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString());
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}.{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString());
#endif
peer->EndSendMessage(channel, msg);
receivers = 1;
toServer = true;
}
else if (e.Info.Client && (mode == NetworkManagerMode::Server || mode == NetworkManagerMode::Host))
{
// Server -> Client(s)
BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, e.Targets, NetworkManager::LocalClientId);
peer->EndSendMessage(channel, msg, CachedTargets);
receivers = CachedTargets.Count();
if (CachedTargets.IsEmpty())
return;
toServer = false;
}
else
return;
#if COMPILE_WITH_PROFILER
// Network stats recording
if (NetworkInternal::EnableProfiling && receivers)
// Send RPC message
//NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}.{} object ID={}", e.Name.First.ToString(), e.Name.Second.ToString(), item.ToString());
NetworkMessageObjectRpc msgData;
msgData.OwnerFrame = ++RpcId;
Guid objectId = item.ObjectId;
Guid parentId = item.ParentId;
{
auto& profileEvent = NetworkInternal::ProfilerEvents[e.Name];
profileEvent.Count++;
profileEvent.DataSize += dataSize;
profileEvent.MessageSize += messageSize;
profileEvent.Receivers += receivers;
// Remap local client object ids into server ids
IdsRemappingTable.KeyOf(objectId, &objectId);
IdsRemappingTable.KeyOf(parentId, &parentId);
}
#endif
NetworkMessage msg = peer->BeginSendMessage();
msg.WriteStructure(msgData);
msg.WriteNetworkId(objectId);
msg.WriteNetworkId(parentId);
msg.WriteNetworkName(obj->GetType().Fullname);
msg.WriteNetworkName(e.Name.First.GetType().Fullname);
msg.WriteNetworkName(e.Name.Second);
NetworkChannelType channel = (NetworkChannelType)e.Info.Channel;
SendInParts(peer, channel, e.ArgsData.Get(), e.ArgsData.Length(), msg, e.Name, toServer, objectId, msgData.OwnerFrame, NetworkMessageIDs::ObjectRpcPart);
}
void DeleteNetworkObject(ScriptingObject* obj)
@@ -929,38 +935,43 @@ FORCE_INLINE void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject
Hierarchy->DirtyObject(obj);
}
ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId)
PartsItem* AddPartsItem(Array<PartsItem>& items, NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId)
{
// Reuse or add part item
ReplicateItem* replicateItem = nullptr;
for (auto& e : ReplicationParts)
PartsItem* item = nullptr;
for (auto& e : items)
{
if (e.OwnerFrame == ownerFrame && e.Data.Count() == dataSize && e.ObjectId == objectId)
{
// Reuse
replicateItem = &e;
item = &e;
break;
}
}
if (!replicateItem)
if (!item)
{
// Add
replicateItem = &ReplicationParts.AddOne();
replicateItem->ObjectId = objectId;
replicateItem->PartsLeft = partsCount;
replicateItem->OwnerFrame = ownerFrame;
replicateItem->OwnerClientId = senderClientId;
replicateItem->Data.Resize(dataSize);
item = &items.AddOne();
item->ObjectId = objectId;
item->PartsLeft = partsCount;
item->OwnerFrame = ownerFrame;
item->OwnerClientId = senderClientId;
item->Data.Resize(dataSize);
}
// Copy part data
ASSERT(replicateItem->PartsLeft > 0);
replicateItem->PartsLeft--;
ASSERT(partStart + partSize <= replicateItem->Data.Count());
ASSERT(item->PartsLeft > 0);
item->PartsLeft--;
ASSERT(partStart + partSize <= item->Data.Count());
const void* partData = event.Message.SkipBytes(partSize);
Platform::MemoryCopy(replicateItem->Data.Get() + partStart, partData, partSize);
Platform::MemoryCopy(item->Data.Get() + partStart, partData, partSize);
return replicateItem;
return item;
}
FORCE_INLINE PartsItem* AddObjectReplicateItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId)
{
return AddPartsItem(ReplicationParts, event, ownerFrame, partsCount, dataSize, objectId, partStart, partSize, senderClientId);
}
void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize, uint32 senderClientId)
@@ -1008,6 +1019,24 @@ void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, b
DirtyObjectImpl(item, obj);
}
FORCE_INLINE PartsItem* AddObjectRpcItem(NetworkEvent& event, uint32 ownerFrame, uint16 partsCount, uint16 dataSize, const Guid& objectId, uint16 partStart, uint16 partSize, uint32 senderClientId)
{
return AddPartsItem(RpcParts, event, ownerFrame, partsCount, dataSize, objectId, partStart, partSize, senderClientId);
}
void InvokeObjectRpc(const NetworkRpcInfo* info, byte* data, uint32 dataSize, uint32 senderClientId, ScriptingObject* obj)
{
// Setup message reading stream
if (CachedReadStream == nullptr)
CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream;
stream->SenderId = senderClientId;
stream->Initialize(data, dataSize);
// Execute RPC
info->Execute(obj, stream, info->Tag);
}
void InvokeObjectSpawn(const NetworkMessageObjectSpawn& msgData, const Guid& prefabId, const NetworkMessageObjectSpawnItem* msgDataItems)
{
ScopeLock lock(ObjectsLock);
@@ -2033,6 +2062,7 @@ void NetworkInternal::NetworkReplicatorUpdate()
}
}
// TODO: remove items from RpcParts after some TTL to reduce memory usage
// TODO: remove items from SpawnParts after some TTL to reduce memory usage
// Replicate all owned networked objects with other clients or server
@@ -2100,7 +2130,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
{
PROFILE_CPU();
NetworkMessageObjectReplicate msgData;
NetworkMessageObjectReplicatePayload msgDataPayload;
NetworkMessageObjectPartPayload msgDataPayload;
Guid objectId, parentId;
StringAnsiView objectTypeName;
event.Message.ReadStructure(msgData);
@@ -2110,7 +2140,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
event.Message.ReadStructure(msgDataPayload);
ScopeLock lock(ObjectsLock);
if (DespawnedObjects.Contains(objectId))
return; // Skip replicating not-existing objects
return; // Skip replicating non-existing objects
NetworkReplicatedObject* e = ResolveObject(objectId, parentId, objectTypeName);
if (!e)
return;
@@ -2129,7 +2159,7 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
else
{
// Add to replication from multiple parts
ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId);
PartsItem* replicateItem = AddObjectReplicateItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId);
replicateItem->Object = e->Object;
}
}
@@ -2137,13 +2167,13 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
{
PROFILE_CPU();
NetworkMessageObjectReplicatePart msgData;
NetworkMessageObjectPart msgData;
Guid objectId;
event.Message.ReadStructure(msgData);
event.Message.ReadNetworkId(objectId);
ScopeLock lock(ObjectsLock);
if (DespawnedObjects.Contains(objectId))
return; // Skip replicating not-existing objects
return; // Skip replicating non-existing objects
const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId;
AddObjectReplicateItem(event, msgData.OwnerFrame, msgData.PartsCount, msgData.DataSize, objectId, msgData.PartStart, msgData.PartSize, senderClientId);
@@ -2303,14 +2333,16 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
{
PROFILE_CPU();
NetworkMessageObjectRpc msgData;
Guid msgObjectId, msgParentId;
NetworkMessageObjectPartPayload msgDataPayload;
Guid objectId, parentId;
StringAnsiView objectTypeName, rpcTypeName, rpcName;
event.Message.ReadStructure(msgData);
event.Message.ReadNetworkId(msgObjectId);
event.Message.ReadNetworkId(msgParentId);
event.Message.ReadNetworkId(objectId);
event.Message.ReadNetworkId(parentId);
event.Message.ReadNetworkName(objectTypeName);
event.Message.ReadNetworkName(rpcTypeName);
event.Message.ReadNetworkName(rpcName);
event.Message.ReadStructure(msgDataPayload);
ScopeLock lock(ObjectsLock);
// Find RPC info
@@ -2320,11 +2352,11 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(name);
if (!info)
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown RPC {}::{} for object {}", String(rpcTypeName), String(rpcName), msgObjectId);
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown RPC {}::{} for object {}", String(rpcTypeName), String(rpcName), objectId);
return;
}
NetworkReplicatedObject* e = ResolveObject(msgObjectId, msgParentId, objectTypeName);
NetworkReplicatedObject* e = ResolveObject(objectId, parentId, objectTypeName);
if (e)
{
auto& item = *e;
@@ -2344,18 +2376,50 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
return;
}
// Setup message reading stream
if (CachedReadStream == nullptr)
CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream;
stream->SenderId = client ? client->ClientId : NetworkManager::ServerClientId;
stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize);
// Execute RPC
info->Execute(obj, stream, info->Tag);
const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId;
if (msgDataPayload.PartsCount == 1)
{
// Call RPC
InvokeObjectRpc(info, event.Message.Buffer + event.Message.Position, msgDataPayload.DataSize, senderClientId, obj);
}
else
{
// Add to RPC from multiple parts
PartsItem* rpcItem = AddObjectRpcItem(event, msgData.OwnerFrame, msgDataPayload.PartsCount, msgDataPayload.DataSize, objectId, 0, msgDataPayload.PartSize, senderClientId);
rpcItem->Object = e->Object;
rpcItem->Tag = info;
}
}
else if (info->Channel != static_cast<uint8>(NetworkChannelType::Unreliable) && info->Channel != static_cast<uint8>(NetworkChannelType::UnreliableOrdered))
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", msgObjectId, String(rpcTypeName), String(rpcName));
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", objectId, String(rpcTypeName), String(rpcName));
}
}
void NetworkInternal::OnNetworkMessageObjectRpcPart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
{
PROFILE_CPU();
NetworkMessageObjectPart msgData;
Guid objectId;
event.Message.ReadStructure(msgData);
event.Message.ReadNetworkId(objectId);
ScopeLock lock(ObjectsLock);
if (DespawnedObjects.Contains(objectId))
return; // Skip replicating non-existing objects
const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId;
PartsItem* rpcItem = AddObjectRpcItem(event, msgData.OwnerFrame, msgData.PartsCount, msgData.DataSize, objectId, msgData.PartStart, msgData.PartSize, senderClientId);
if (rpcItem && rpcItem->PartsLeft == 0)
{
// Got all parts so invoke RPC
ScriptingObject* obj = rpcItem->Object.Get();
if (obj)
{
InvokeObjectRpc((const NetworkRpcInfo*)rpcItem->Tag, rpcItem->Data.Get(), rpcItem->Data.Count(), rpcItem->OwnerClientId, obj);
}
// Remove item
int32 partIndex = (int32)((RpcParts.Get() - rpcItem) / sizeof(rpcItem));
RpcParts.RemoveAt(partIndex);
}
}