Add NetworkStream::SenderId to detect message sender during object replication or RPC code

This commit is contained in:
Wojtek Figat
2023-04-14 14:25:18 +02:00
parent 4bdeb26e74
commit 62fdfe2519
3 changed files with 22 additions and 10 deletions

View File

@@ -156,7 +156,6 @@ void NetworkTransform::Serialize(NetworkStream* stream)
transform = Transform::Identity; transform = Transform::Identity;
// Encode data // Encode data
const NetworkObjectRole role = NetworkReplicator::GetObjectRole(this);
Data data; Data data;
data.LocalSpace = LocalSpace; data.LocalSpace = LocalSpace;
data.HasSequenceIndex = Mode == ReplicationModes::Prediction; data.HasSequenceIndex = Mode == ReplicationModes::Prediction;

View File

@@ -147,6 +147,7 @@ struct ReplicateItem
Guid ObjectId; Guid ObjectId;
uint16 PartsLeft; uint16 PartsLeft;
uint32 OwnerFrame; uint32 OwnerFrame;
uint32 OwnerClientId;
Array<byte> Data; Array<byte> Data;
}; };
@@ -488,7 +489,7 @@ void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj)
} }
template<typename MessageType> template<typename MessageType>
ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize) ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize, uint32 senderClientId)
{ {
// Reuse or add part item // Reuse or add part item
ReplicateItem* replicateItem = nullptr; ReplicateItem* replicateItem = nullptr;
@@ -508,6 +509,7 @@ ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& ms
replicateItem->ObjectId = msgData.ObjectId; replicateItem->ObjectId = msgData.ObjectId;
replicateItem->PartsLeft = msgData.PartsCount; replicateItem->PartsLeft = msgData.PartsCount;
replicateItem->OwnerFrame = msgData.OwnerFrame; replicateItem->OwnerFrame = msgData.OwnerFrame;
replicateItem->OwnerClientId = senderClientId;
replicateItem->Data.Resize(msgData.DataSize); replicateItem->Data.Resize(msgData.DataSize);
} }
@@ -521,7 +523,7 @@ ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& ms
return replicateItem; return replicateItem;
} }
void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize) void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize, uint32 senderClientId)
{ {
ScriptingObject* obj = item.Object.Get(); ScriptingObject* obj = item.Object.Get();
if (!obj) if (!obj)
@@ -541,6 +543,7 @@ void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, b
CachedReadStream = New<NetworkStream>(); CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream; NetworkStream* stream = CachedReadStream;
stream->Initialize(data, dataSize); stream->Initialize(data, dataSize);
stream->SenderId = senderClientId;
// Deserialize object // Deserialize object
const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false); const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false);
@@ -879,6 +882,7 @@ NetworkStream* NetworkReplicator::BeginInvokeRPC()
if (CachedWriteStream == nullptr) if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>(); CachedWriteStream = New<NetworkStream>();
CachedWriteStream->Initialize(); CachedWriteStream->Initialize();
CachedWriteStream->SenderId = NetworkManager::LocalClientId;
return CachedWriteStream; return CachedWriteStream;
} }
@@ -981,12 +985,9 @@ void NetworkInternal::NetworkReplicatorUpdate()
ScopeLock lock(ObjectsLock); ScopeLock lock(ObjectsLock);
if (Objects.Count() == 0) if (Objects.Count() == 0)
return; return;
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
const bool isClient = NetworkManager::IsClient(); const bool isClient = NetworkManager::IsClient();
const bool isServer = NetworkManager::IsServer(); const bool isServer = NetworkManager::IsServer();
const bool isHost = NetworkManager::IsHost(); const bool isHost = NetworkManager::IsHost();
NetworkStream* stream = CachedWriteStream;
NetworkPeer* peer = NetworkManager::Peer; NetworkPeer* peer = NetworkManager::Peer;
if (!isClient && NewClients.Count() != 0) if (!isClient && NewClients.Count() != 0)
@@ -1137,7 +1138,7 @@ void NetworkInternal::NetworkReplicatorUpdate()
auto& item = it->Item; auto& item = it->Item;
// Replicate from all collected parts data // Replicate from all collected parts data
InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count()); InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count(), e.OwnerClientId);
} }
} }
@@ -1145,6 +1146,10 @@ void NetworkInternal::NetworkReplicatorUpdate()
} }
// Brute force synchronize all networked objects with clients // Brute force synchronize all networked objects with clients
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
NetworkStream* stream = CachedWriteStream;
stream->SenderId = NetworkManager::LocalClientId;
// TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant) // TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant)
// TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player) // TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player)
for (auto it = Objects.Begin(); it.IsNotEnd(); ++it) for (auto it = Objects.Begin(); it.IsNotEnd(); ++it)
@@ -1305,16 +1310,17 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
if (client && item.OwnerClientId != client->ClientId) if (client && item.OwnerClientId != client->ClientId)
return; return;
const uint32 senderClientId = client ? client->ClientId : NetworkManager::LocalClientId;
if (msgData.PartsCount == 1) if (msgData.PartsCount == 1)
{ {
// Replicate // Replicate
InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize); InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize, senderClientId);
} }
else else
{ {
// Add to replication from multiple parts // Add to replication from multiple parts
const uint16 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate); const uint16 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate);
ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData); ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData, senderClientId);
replicateItem->Object = e->Object; replicateItem->Object = e->Object;
} }
} }
@@ -1327,7 +1333,8 @@ void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, N
if (DespawnedObjects.Contains(msgData.ObjectId)) if (DespawnedObjects.Contains(msgData.ObjectId))
return; // Skip replicating not-existing objects return; // Skip replicating not-existing objects
AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize); const uint32 senderClientId = client ? client->ClientId : NetworkManager::LocalClientId;
AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize, senderClientId);
} }
void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
@@ -1620,6 +1627,7 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
if (CachedReadStream == nullptr) if (CachedReadStream == nullptr)
CachedReadStream = New<NetworkStream>(); CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream; NetworkStream* stream = CachedReadStream;
stream->SenderId = client ? client->ClientId : NetworkManager::LocalClientId;
stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize); stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize);
// Execute RPC // Execute RPC

View File

@@ -23,6 +23,11 @@ private:
public: public:
~NetworkStream(); ~NetworkStream();
/// <summary>
/// The ClientId of the network client that is a data sender. Can be used to detect who send the incoming RPC or replication data. Set to the current client when writing data.
/// </summary>
API_FIELD(ReadOnly) uint32 SenderId = 0;
/// <summary> /// <summary>
/// Gets the pointer to the native stream memory buffer. /// Gets the pointer to the native stream memory buffer.
/// </summary> /// </summary>