diff --git a/Source/Engine/Networking/NetworkInternal.h b/Source/Engine/Networking/NetworkInternal.h
index d063a0ca3..dd01210a1 100644
--- a/Source/Engine/Networking/NetworkInternal.h
+++ b/Source/Engine/Networking/NetworkInternal.h
@@ -10,6 +10,7 @@ enum class NetworkMessageIDs : uint8
Handshake,
HandshakeReply,
ObjectReplicate,
+ ObjectReplicatePart,
ObjectSpawn,
ObjectDespawn,
ObjectRole,
@@ -27,6 +28,7 @@ public:
static void NetworkReplicatorPreUpdate();
static void NetworkReplicatorUpdate();
static void OnNetworkMessageObjectReplicate(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
+ static void OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectDespawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRole(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
diff --git a/Source/Engine/Networking/NetworkManager.cpp b/Source/Engine/Networking/NetworkManager.cpp
index 72634230a..ef5060e73 100644
--- a/Source/Engine/Networking/NetworkManager.cpp
+++ b/Source/Engine/Networking/NetworkManager.cpp
@@ -129,6 +129,7 @@ namespace
OnNetworkMessageHandshake,
OnNetworkMessageHandshakeReply,
NetworkInternal::OnNetworkMessageObjectReplicate,
+ NetworkInternal::OnNetworkMessageObjectReplicatePart,
NetworkInternal::OnNetworkMessageObjectSpawn,
NetworkInternal::OnNetworkMessageObjectDespawn,
NetworkInternal::OnNetworkMessageObjectRole,
diff --git a/Source/Engine/Networking/NetworkMessage.h b/Source/Engine/Networking/NetworkMessage.h
index 4d0bc4330..fa6119e89 100644
--- a/Source/Engine/Networking/NetworkMessage.h
+++ b/Source/Engine/Networking/NetworkMessage.h
@@ -69,7 +69,7 @@ public:
/// The amount of bytes to write from the bytes pointer.
FORCE_INLINE void WriteBytes(uint8* bytes, const int numBytes)
{
- ASSERT(Position + numBytes < BufferSize);
+ ASSERT(Position + numBytes <= BufferSize);
Platform::MemoryCopy(Buffer + Position, bytes, numBytes);
Position += numBytes;
Length = Position;
@@ -85,7 +85,7 @@ public:
/// The minimal amount of bytes that the buffer contains.
FORCE_INLINE void ReadBytes(uint8* bytes, const int32 numBytes)
{
- ASSERT(Position + numBytes < BufferSize);
+ ASSERT(Position + numBytes <= BufferSize);
Platform::MemoryCopy(bytes, Buffer + Position, numBytes);
Position += numBytes;
}
@@ -97,7 +97,7 @@ public:
/// Pointer to skipped data beginning.
FORCE_INLINE void* SkipBytes(const int32 numBytes)
{
- ASSERT(Position + numBytes < BufferSize);
+ ASSERT(Position + numBytes <= BufferSize);
byte* result = Buffer + Position;
Position += numBytes;
return result;
diff --git a/Source/Engine/Networking/NetworkReplicator.cpp b/Source/Engine/Networking/NetworkReplicator.cpp
index ad1ffebe5..0b3202b60 100644
--- a/Source/Engine/Networking/NetworkReplicator.cpp
+++ b/Source/Engine/Networking/NetworkReplicator.cpp
@@ -12,7 +12,6 @@
#include "NetworkRpc.h"
#include "INetworkSerializable.h"
#include "INetworkObject.h"
-#include "Engine/Core/Log.h"
#include "Engine/Core/Collections/HashSet.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Core/Collections/ChunkedArray.h"
@@ -34,6 +33,7 @@
#define NETWORK_REPLICATOR_DEBUG_LOG 0
#if NETWORK_REPLICATOR_DEBUG_LOG
+#include "Engine/Core/Log.h"
#define NETWORK_REPLICATOR_LOG(messageType, format, ...) LOG(messageType, format, ##__VA_ARGS__)
#else
#define NETWORK_REPLICATOR_LOG(messageType, format, ...)
@@ -47,6 +47,18 @@ PACK_STRUCT(struct NetworkMessageObjectReplicate
Guid ParentId;
char ObjectTypeName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network)
uint16 DataSize;
+ uint16 PartsCount;
+ });
+
+PACK_STRUCT(struct NetworkMessageObjectReplicatePart
+ {
+ NetworkMessageIDs ID = NetworkMessageIDs::ObjectReplicatePart;
+ uint32 OwnerFrame;
+ uint16 DataSize;
+ uint16 PartsCount;
+ uint16 PartStart;
+ uint16 PartSize;
+ Guid ObjectId; // TODO: introduce networked-ids to synchronize unique ids as ushort (less data over network)
});
PACK_STRUCT(struct NetworkMessageObjectSpawn
@@ -131,6 +143,15 @@ struct Serializer
void* Tags[2];
};
+struct ReplicateItem
+{
+ ScriptingObjectReference Object;
+ Guid ObjectId;
+ uint16 PartsLeft;
+ uint32 OwnerFrame;
+ Array Data;
+};
+
struct SpawnItem
{
ScriptingObjectReference Object;
@@ -164,6 +185,7 @@ namespace
{
CriticalSection ObjectsLock;
HashSet Objects;
+ Array ReplicationParts;
Array SpawnQueue;
Array DespawnQueue;
Array RpcQueue;
@@ -462,6 +484,81 @@ void SetupObjectSpawnGroupItem(ScriptingObject* obj, ArrayItems.Add(&spawnItem);
}
+void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj)
+{
+ // TODO: implement objects state replication frequency and dirtying
+}
+
+template
+ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize)
+{
+ // Reuse or add part item
+ ReplicateItem* replicateItem = nullptr;
+ for (auto& e : ReplicationParts)
+ {
+ if (e.OwnerFrame == msgData.OwnerFrame && e.Data.Count() == msgData.DataSize && e.ObjectId == msgData.ObjectId)
+ {
+ // Reuse
+ replicateItem = &e;
+ break;
+ }
+ }
+ if (!replicateItem)
+ {
+ // Add
+ replicateItem = &ReplicationParts.AddOne();
+ replicateItem->ObjectId = msgData.ObjectId;
+ replicateItem->PartsLeft = msgData.PartsCount;
+ replicateItem->OwnerFrame = msgData.OwnerFrame;
+ replicateItem->Data.Resize(msgData.DataSize);
+ }
+
+ // Copy part data
+ ASSERT(replicateItem->PartsLeft > 0);
+ replicateItem->PartsLeft--;
+ ASSERT(partStart + partSize <= replicateItem->Data.Count());
+ const void* partData = event.Message.SkipBytes(partSize);
+ Platform::MemoryCopy(replicateItem->Data.Get() + partStart, partData, partSize);
+
+ return replicateItem;
+}
+
+void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize)
+{
+ ScriptingObject* obj = item.Object.Get();
+ if (!obj)
+ return;
+
+ // Skip replication if we own the object (eg. late replication message after ownership change)
+ if (item.Role == NetworkObjectRole::OwnedAuthoritative)
+ return;
+
+ // Drop object replication if it has old data (eg. newer message was already processed due to unordered channel usage)
+ if (item.LastOwnerFrame >= ownerFrame)
+ return;
+ item.LastOwnerFrame = ownerFrame;
+
+ // Setup message reading stream
+ if (CachedReadStream == nullptr)
+ CachedReadStream = New();
+ NetworkStream* stream = CachedReadStream;
+ stream->Initialize(data, dataSize);
+
+ // Deserialize object
+ const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false);
+ if (failed)
+ {
+ //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString());
+ }
+
+ if (item.AsNetworkObject)
+ item.AsNetworkObject->OnNetworkDeserialize();
+
+ // Speed up replication of client-owned objects to other clients from server to reduce lag (data has to go from client to server and then to other clients)
+ if (NetworkManager::IsServer())
+ DirtyObjectImpl(item, obj);
+}
+
#if !COMPILE_WITHOUT_CSHARP
#include "Engine/Scripting/ManagedCLR/MUtils.h"
@@ -757,11 +854,6 @@ void NetworkReplicator::SetObjectOwnership(ScriptingObject* obj, uint32 ownerCli
}
}
-void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj)
-{
- // TODO: implement objects state replication frequency and dirtying
-}
-
void NetworkReplicator::DirtyObject(ScriptingObject* obj)
{
ScopeLock lock(ObjectsLock);
@@ -853,6 +945,7 @@ void NetworkInternal::NetworkReplicatorClear()
Objects.Remove(it);
}
}
+ RpcQueue.Clear();
SpawnQueue.Clear();
DespawnQueue.Clear();
IdsRemappingTable.Clear();
@@ -1021,6 +1114,31 @@ void NetworkInternal::NetworkReplicatorUpdate()
SpawnQueue.Clear();
}
+ // Apply parts replication
+ for (int32 i = ReplicationParts.Count() - 1; i >= 0; i--)
+ {
+ auto& e = ReplicationParts[i];
+ if (e.PartsLeft > 0)
+ {
+ // TODO: remove replication items after some TTL to prevent memory leaks
+ continue;
+ }
+ ScriptingObject* obj = e.Object.Get();
+ if (obj)
+ {
+ auto it = Objects.Find(obj->GetID());
+ if (it != Objects.End())
+ {
+ auto& item = it->Item;
+
+ // Replicate from all collected parts data
+ InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count());
+ }
+ }
+
+ ReplicationParts.RemoveAt(i);
+ }
+
// Brute force synchronize all networked objects with clients
// TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant)
// TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player)
@@ -1066,10 +1184,27 @@ void NetworkInternal::NetworkReplicatorUpdate()
}
GetNetworkName(msgData.ObjectTypeName, obj->GetType().Fullname);
msgData.DataSize = size;
- // TODO: split object data (eg. more messages) if needed
+ const uint32 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate);
+ const uint32 partMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicatePart);
+ uint32 partsCount = 1;
+ uint32 dataStart = 0;
+ uint32 msgDataSize = size;
+ if (size > msgMaxData)
+ {
+ // Send msgMaxData within first message
+ msgDataSize = msgMaxData;
+ dataStart += msgMaxData;
+
+ // Send rest of the data in separate parts
+ partsCount += Math::DivideAndRoundUp(size - dataStart, partMaxData);
+ }
+ else
+ dataStart += size;
+ ASSERT(partsCount <= MAX_uint8)
+ msgData.PartsCount = partsCount;
NetworkMessage msg = peer->BeginSendMessage();
msg.WriteStructure(msgData);
- msg.WriteBytes(stream->GetBuffer(), size);
+ msg.WriteBytes(stream->GetBuffer(), msgDataSize);
if (isClient)
peer->EndSendMessage(NetworkChannelType::Unreliable, msg);
else
@@ -1079,6 +1214,27 @@ void NetworkInternal::NetworkReplicatorUpdate()
peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets);
}
+ // Send all other parts
+ for (uint32 partIndex = 1; partIndex < partsCount; partIndex++)
+ {
+ NetworkMessageObjectReplicatePart msgDataPart;
+ msgDataPart.OwnerFrame = msgData.OwnerFrame;
+ msgDataPart.ObjectId = msgData.ObjectId;
+ msgDataPart.DataSize = msgData.DataSize;
+ msgDataPart.PartsCount = msgData.PartsCount;
+ msgDataPart.PartStart = dataStart;
+ msgDataPart.PartSize = Math::Min(size - dataStart, partMaxData);
+ msg = peer->BeginSendMessage();
+ msg.WriteStructure(msgDataPart);
+ msg.WriteBytes(stream->GetBuffer() + msgDataPart.PartStart, msgDataPart.PartSize);
+ dataStart += msgDataPart.PartSize;
+ if (isClient)
+ peer->EndSendMessage(NetworkChannelType::Unreliable, msg);
+ else
+ peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets);
+ }
+ ASSERT_LOW_LAYER(dataStart == size);
+
// TODO: stats for bytes send per object type
}
}
@@ -1136,52 +1292,39 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
if (DespawnedObjects.Contains(msgData.ObjectId))
return; // Skip replicating not-existing objects
NetworkReplicatedObject* e = ResolveObject(msgData.ObjectId, msgData.ParentId, msgData.ObjectTypeName);
- if (e)
+ if (!e)
+ return;
+ auto& item = *e;
+
+ // Reject event from someone who is not an object owner
+ if (client && item.OwnerClientId != client->ClientId)
+ return;
+
+ if (msgData.PartsCount == 1)
{
- auto& item = *e;
- ScriptingObject* obj = item.Object.Get();
- if (!obj)
- return;
-
- // Reject event from someone who is not an object owner
- if (client && item.OwnerClientId != client->ClientId)
- return;
-
- // Skip replication if we own the object (eg. late replication message after ownership change)
- if (item.Role == NetworkObjectRole::OwnedAuthoritative)
- return;
-
- // Drop object replication if it has old data (eg. newer message was already processed due to unordered channel usage)
- if (item.LastOwnerFrame >= msgData.OwnerFrame)
- return;
- item.LastOwnerFrame = msgData.OwnerFrame;
-
- // Setup message reading stream
- if (CachedReadStream == nullptr)
- CachedReadStream = New();
- NetworkStream* stream = CachedReadStream;
- stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.DataSize);
-
- // Deserialize object
- const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false);
- if (failed)
- {
- //NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot serialize object {} of type {} (missing serialization logic)", item.ToString(), obj->GetType().ToString());
- }
-
- if (item.AsNetworkObject)
- item.AsNetworkObject->OnNetworkDeserialize();
-
- // Speed up replication of client-owned objects to other clients from server to reduce lag (data has to go from client to server and then to other clients)
- if (NetworkManager::IsServer())
- DirtyObjectImpl(item, obj);
+ // Replicate
+ InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize);
}
else
{
- // TODO: put message to the queue to be resolved later (eg. object replication came before spawn packet) - use TTL to prevent memory overgrowing
+ // Add to replication from multiple parts
+ const uint16 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate);
+ ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData);
+ replicateItem->Object = e->Object;
}
}
+void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
+{
+ NetworkMessageObjectReplicatePart msgData;
+ event.Message.ReadStructure(msgData);
+ ScopeLock lock(ObjectsLock);
+ if (DespawnedObjects.Contains(msgData.ObjectId))
+ return; // Skip replicating not-existing objects
+
+ AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize);
+}
+
void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
{
NetworkMessageObjectSpawn msgData;
@@ -1341,10 +1484,7 @@ void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkCl
auto sceneObject = ScriptingObject::Cast(obj);
if (sceneObject)
{
- if (parent && parent
- ->
- Object.Get() && parent->Object->Is()
- )
+ if (parent && parent->Object.Get() && parent->Object->Is())
sceneObject->SetParent(parent->Object.As());
else if (auto* parentActor = Scripting::TryFindObject(msgDataItem.ParentId))
sceneObject->SetParent(parentActor);