Merge remote-tracking branch 'origin/master' into 1.6

This commit is contained in:
Wojtek Figat
2023-04-17 13:18:08 +02:00
39 changed files with 954 additions and 362 deletions

View File

@@ -156,7 +156,6 @@ void NetworkTransform::Serialize(NetworkStream* stream)
transform = Transform::Identity;
// Encode data
const NetworkObjectRole role = NetworkReplicator::GetObjectRole(this);
Data data;
data.LocalSpace = LocalSpace;
data.HasSequenceIndex = Mode == ReplicationModes::Prediction;

View File

@@ -236,6 +236,16 @@ NetworkClient* NetworkManager::GetClient(const NetworkConnection& connection)
return nullptr;
}
NetworkClient* NetworkManager::GetClient(uint32 clientId)
{
for (NetworkClient* client : Clients)
{
if (client->ClientId == clientId)
return client;
}
return nullptr;
}
bool NetworkManager::StartServer()
{
PROFILE_CPU();

View File

@@ -153,6 +153,13 @@ public:
/// <returns>Found client or null.</returns>
API_FUNCTION() static NetworkClient* GetClient(API_PARAM(Ref) const NetworkConnection& connection);
/// <summary>
/// Gets the network client with a given identifier. Returns null if failed to find it.
/// </summary>
/// <param name="clientId">Network client identifier (synchronized on all peers).</param>
/// <returns>Found client or null.</returns>
API_FUNCTION() static NetworkClient* GetClient(uint32 clientId);
public:
/// <summary>
/// Starts the network in server mode. Returns true if failed (eg. invalid config).

View File

@@ -29,12 +29,12 @@
#include "Engine/Threading/Threading.h"
#include "Engine/Threading/ThreadLocal.h"
// Enables verbose logging for Network Replicator actions (dev-only)
#define NETWORK_REPLICATOR_DEBUG_LOG 0
#if NETWORK_REPLICATOR_DEBUG_LOG
#if !BUILD_RELEASE
bool NetworkReplicator::EnableLog = false;
#include "Engine/Core/Log.h"
#define NETWORK_REPLICATOR_LOG(messageType, format, ...) LOG(messageType, format, ##__VA_ARGS__)
#include "Engine/Content/Content.h"
#define NETWORK_REPLICATOR_LOG(messageType, format, ...) if (NetworkReplicator::EnableLog) { LOG(messageType, format, ##__VA_ARGS__); }
#define USE_NETWORK_REPLICATOR_LOG 1
#else
#define NETWORK_REPLICATOR_LOG(messageType, format, ...)
#endif
@@ -107,10 +107,15 @@ struct NetworkReplicatedObject
uint32 OwnerClientId;
uint32 LastOwnerFrame = 0;
NetworkObjectRole Role;
uint8 Spawned = false;
uint8 Spawned : 1;
DataContainer<uint32> TargetClientIds;
INetworkObject* AsNetworkObject;
NetworkReplicatedObject()
{
Spawned = 0;
}
bool operator==(const NetworkReplicatedObject& other) const
{
return Object == other.Object;
@@ -149,6 +154,7 @@ struct ReplicateItem
Guid ObjectId;
uint16 PartsLeft;
uint32 OwnerFrame;
uint32 OwnerClientId;
Array<byte> Data;
};
@@ -179,6 +185,7 @@ struct RpcItem
NetworkRpcName Name;
NetworkRpcInfo Info;
BytesContainer ArgsData;
DataContainer<uint32> Targets;
};
namespace
@@ -330,6 +337,46 @@ void BuildCachedTargets(const Array<NetworkClient*>& clients, const DataContaine
}
}
void BuildCachedTargets(const Array<NetworkClient*>& clients, const DataContainer<uint32>& clientIds1, const Span<uint32>& clientIds2, const uint32 excludedClientId = NetworkManager::ServerClientId)
{
CachedTargets.Clear();
if (clientIds1.IsValid())
{
if (clientIds2.IsValid())
{
for (const NetworkClient* client : clients)
{
if (client->State == NetworkConnectionState::Connected && client->ClientId != excludedClientId)
{
for (int32 i = 0; i < clientIds1.Length(); i++)
{
if (clientIds1[i] == client->ClientId)
{
for (int32 j = 0; j < clientIds2.Length(); j++)
{
if (clientIds2[j] == client->ClientId)
{
CachedTargets.Add(client->Connection);
break;
}
}
break;
}
}
}
}
}
else
{
BuildCachedTargets(clients, clientIds1, excludedClientId);
}
}
else
{
BuildCachedTargets(clients, clientIds2, excludedClientId);
}
}
FORCE_INLINE void BuildCachedTargets(const NetworkReplicatedObject& item)
{
// By default send object to all connected clients excluding the owner but with optional TargetClientIds list
@@ -484,13 +531,43 @@ void SetupObjectSpawnGroupItem(ScriptingObject* obj, Array<SpawnGroup, InlinedAl
group->Items.Add(&spawnItem);
}
void FindObjectsForSpawn(SpawnGroup& group, ChunkedArray<SpawnItem, 256>& spawnItems, ScriptingObject* obj)
{
// Add any registered network objects
auto it = Objects.Find(obj->GetID());
if (it != Objects.End())
{
auto& item = it->Item;
if (!item.Spawned)
{
// One of the parents of this object is being spawned so spawn it too
item.Spawned = true;
auto& spawnItem = spawnItems.AddOne();
spawnItem.Object = obj;
spawnItem.Targets.Link(item.TargetClientIds);
spawnItem.OwnerClientId = item.OwnerClientId;
spawnItem.Role = item.Role;
group.Items.Add(&spawnItem);
}
}
// Iterate over children
if (auto* actor = ScriptingObject::Cast<Actor>(obj))
{
for (auto* script : actor->Scripts)
FindObjectsForSpawn(group, spawnItems, script);
for (auto* child : actor->Children)
FindObjectsForSpawn(group, spawnItems, child);
}
}
void DirtyObjectImpl(NetworkReplicatedObject& item, ScriptingObject* obj)
{
// TODO: implement objects state replication frequency and dirtying
}
template<typename MessageType>
ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize)
ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& msgData, uint16 partStart, uint16 partSize, uint32 senderClientId)
{
// Reuse or add part item
ReplicateItem* replicateItem = nullptr;
@@ -510,6 +587,7 @@ ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& ms
replicateItem->ObjectId = msgData.ObjectId;
replicateItem->PartsLeft = msgData.PartsCount;
replicateItem->OwnerFrame = msgData.OwnerFrame;
replicateItem->OwnerClientId = senderClientId;
replicateItem->Data.Resize(msgData.DataSize);
}
@@ -523,7 +601,7 @@ ReplicateItem* AddObjectReplicateItem(NetworkEvent& event, const MessageType& ms
return replicateItem;
}
void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize)
void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, byte* data, uint32 dataSize, uint32 senderClientId)
{
ScriptingObject* obj = item.Object.Get();
if (!obj)
@@ -543,6 +621,7 @@ void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, b
CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream;
stream->Initialize(data, dataSize);
stream->SenderId = senderClientId;
// Deserialize object
const bool failed = NetworkReplicator::InvokeSerializer(obj->GetTypeHandle(), obj, stream, false);
@@ -559,6 +638,11 @@ void InvokeObjectReplication(NetworkReplicatedObject& item, uint32 ownerFrame, b
DirtyObjectImpl(item, obj);
}
NetworkRpcParams::NetworkRpcParams(const NetworkStream* stream)
: SenderId(stream->SenderId)
{
}
#if !COMPILE_WITHOUT_CSHARP
#include "Engine/Scripting/ManagedCLR/MUtils.h"
@@ -600,9 +684,9 @@ void NetworkReplicator::AddRPC(const ScriptingTypeHandle& typeHandle, const Stri
NetworkRpcInfo::RPCsTable[rpcName] = rpcInfo;
}
void NetworkReplicator::CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream)
void NetworkReplicator::CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream, MonoArray* targetIds)
{
EndInvokeRPC(obj, type, GetCSharpCachedName(name), argsStream);
EndInvokeRPC(obj, type, GetCSharpCachedName(name), argsStream, MUtils::ToSpan<uint32>(targetIds));
}
StringAnsiView NetworkReplicator::GetCSharpCachedName(const StringAnsiView& name)
@@ -881,10 +965,11 @@ NetworkStream* NetworkReplicator::BeginInvokeRPC()
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
CachedWriteStream->Initialize();
CachedWriteStream->SenderId = NetworkManager::LocalClientId;
return CachedWriteStream;
}
void NetworkReplicator::EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream)
void NetworkReplicator::EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream, Span<uint32> targetIds)
{
const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(NetworkRpcName(type, name));
if (!info || !obj || NetworkManager::IsOffline())
@@ -895,8 +980,8 @@ void NetworkReplicator::EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHa
rpc.Name.First = type;
rpc.Name.Second = name;
rpc.Info = *info;
const Span<byte> argsData(argsStream->GetBuffer(), argsStream->GetPosition());
rpc.ArgsData.Copy(argsData);
rpc.ArgsData.Copy(Span<byte>(argsStream->GetBuffer(), argsStream->GetPosition()));
rpc.Targets.Copy(targetIds);
#if USE_EDITOR || !BUILD_RELEASE
auto it = Objects.Find(obj->GetID());
if (it == Objects.End())
@@ -983,12 +1068,9 @@ void NetworkInternal::NetworkReplicatorUpdate()
ScopeLock lock(ObjectsLock);
if (Objects.Count() == 0)
return;
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
const bool isClient = NetworkManager::IsClient();
const bool isServer = NetworkManager::IsServer();
const bool isHost = NetworkManager::IsHost();
NetworkStream* stream = CachedWriteStream;
NetworkPeer* peer = NetworkManager::Peer;
if (!isClient && NewClients.Count() != 0)
@@ -1114,9 +1196,16 @@ void NetworkInternal::NetworkReplicatorUpdate()
}
// Spawn groups of objects
ChunkedArray<SpawnItem, 256> spawnItems;
for (SpawnGroup& g : spawnGroups)
{
// Include any added objects within spawn group that were not spawned manually (eg. AddObject for script/actor attached to spawned actor)
ScriptingObject* groupRoot = g.Items[0]->Object.Get();
FindObjectsForSpawn(g, spawnItems, groupRoot);
SendObjectSpawnMessage(g, NetworkManager::Clients);
spawnItems.Clear();
}
SpawnQueue.Clear();
}
@@ -1139,7 +1228,7 @@ void NetworkInternal::NetworkReplicatorUpdate()
auto& item = it->Item;
// Replicate from all collected parts data
InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count());
InvokeObjectReplication(item, e.OwnerFrame, e.Data.Get(), e.Data.Count(), e.OwnerClientId);
}
}
@@ -1147,6 +1236,10 @@ void NetworkInternal::NetworkReplicatorUpdate()
}
// Brute force synchronize all networked objects with clients
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
NetworkStream* stream = CachedWriteStream;
stream->SenderId = NetworkManager::LocalClientId;
// TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant)
// TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player)
for (auto it = Objects.Begin(); it.IsNotEnd(); ++it)
@@ -1276,12 +1369,16 @@ void NetworkInternal::NetworkReplicatorUpdate()
if (e.Info.Server && isClient)
{
// Client -> Server
#if USE_NETWORK_REPLICATOR_LOG
if (e.Targets.Length() != 0)
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Server RPC '{}::{}' called with non-empty list of targets is not supported (only server will receive it)", e.Name.First.ToString(), e.Name.Second.ToString());
#endif
peer->EndSendMessage(channel, msg);
}
else if (e.Info.Client && (isServer || isHost))
{
// Server -> Client(s)
BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, NetworkManager::LocalClientId);
BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, e.Targets, NetworkManager::LocalClientId);
peer->EndSendMessage(channel, msg, CachedTargets);
}
}
@@ -1307,16 +1404,17 @@ void NetworkInternal::OnNetworkMessageObjectReplicate(NetworkEvent& event, Netwo
if (client && item.OwnerClientId != client->ClientId)
return;
const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId;
if (msgData.PartsCount == 1)
{
// Replicate
InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize);
InvokeObjectReplication(item, msgData.OwnerFrame, event.Message.Buffer + event.Message.Position, msgData.DataSize, senderClientId);
}
else
{
// Add to replication from multiple parts
const uint16 msgMaxData = peer->Config.MessageSize - sizeof(NetworkMessageObjectReplicate);
ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData);
ReplicateItem* replicateItem = AddObjectReplicateItem(event, msgData, 0, msgMaxData, senderClientId);
replicateItem->Object = e->Object;
}
}
@@ -1329,7 +1427,8 @@ void NetworkInternal::OnNetworkMessageObjectReplicatePart(NetworkEvent& event, N
if (DespawnedObjects.Contains(msgData.ObjectId))
return; // Skip replicating not-existing objects
AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize);
const uint32 senderClientId = client ? client->ClientId : NetworkManager::ServerClientId;
AddObjectReplicateItem(event, msgData, msgData.PartStart, msgData.PartSize, senderClientId);
}
void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
@@ -1465,10 +1564,6 @@ void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkCl
if (!obj->IsRegistered())
obj->RegisterObject();
const NetworkReplicatedObject* parent = ResolveObject(msgDataItem.ParentId);
if (!parent && msgDataItem.ParentId.IsValid())
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Failed to find object {} as parent to spawned object", msgDataItem.ParentId.ToString());
}
// Add object to the list
NetworkReplicatedObject item;
@@ -1499,6 +1594,21 @@ void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkCl
sceneObject->SetParent(parent->Object.As<Actor>());
else if (auto* parentActor = Scripting::TryFindObject<Actor>(msgDataItem.ParentId))
sceneObject->SetParent(parentActor);
else if (msgDataItem.ParentId.IsValid())
{
#if USE_NETWORK_REPLICATOR_LOG
// Ignore case when parent object in a message was a scene (eg. that is already unloaded on a client)
AssetInfo assetInfo;
if (!Content::GetAssetInfo(msgDataItem.ParentId, assetInfo) || assetInfo.TypeName == TEXT("FlaxEngine.SceneAsset"))
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Failed to find object {} as parent to spawned object", msgDataItem.ParentId.ToString());
}
#endif
}
}
else if (!parent && msgDataItem.ParentId.IsValid())
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Failed to find object {} as parent to spawned object", msgDataItem.ParentId.ToString());
}
if (item.AsNetworkObject)
@@ -1602,7 +1712,7 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(name);
if (!info)
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", msgData.ObjectId, String(msgData.RpcTypeName), String(msgData.RpcName));
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown RPC {}::{} for object {}", String(msgData.RpcTypeName), String(msgData.RpcName), msgData.ObjectId);
return;
}
@@ -1622,6 +1732,7 @@ void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClie
if (CachedReadStream == nullptr)
CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream;
stream->SenderId = client ? client->ClientId : NetworkManager::ServerClientId;
stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize);
// Execute RPC

View File

@@ -119,10 +119,11 @@ namespace FlaxEngine.Networking
/// <param name="type">The RPC type.</param>
/// <param name="name">The RPC name.</param>
/// <param name="argsStream">The RPC serialized arguments stream returned from BeginInvokeRPC.</param>
/// <param name="targetIds">Optional list with network client IDs that should receive RPC. Empty to send on all clients. Ignored by Server RPCs.</param>
[Unmanaged]
public static void EndInvokeRPC(Object obj, Type type, string name, NetworkStream argsStream)
public static void EndInvokeRPC(Object obj, Type type, string name, NetworkStream argsStream, uint[] targetIds = null)
{
Internal_CSharpEndInvokeRPC(FlaxEngine.Object.GetUnmanagedPtr(obj), type, name, FlaxEngine.Object.GetUnmanagedPtr(argsStream));
Internal_CSharpEndInvokeRPC(FlaxEngine.Object.GetUnmanagedPtr(obj), type, name, FlaxEngine.Object.GetUnmanagedPtr(argsStream), targetIds);
}
/// <summary>

View File

@@ -3,6 +3,7 @@
#pragma once
#include "Types.h"
#include "Engine/Core/Types/Span.h"
#include "Engine/Scripting/ScriptingObject.h"
#include "Engine/Scripting/ScriptingType.h"
@@ -34,6 +35,13 @@ API_CLASS(static, Namespace = "FlaxEngine.Networking") class FLAXENGINE_API Netw
typedef void (*SerializeFunc)(void* instance, NetworkStream* stream, void* tag);
public:
#if !BUILD_RELEASE
/// <summary>
/// Enables verbose logging of the networking runtime. Can be used to debug problems of missing RPC invoke or object replication issues.
/// </summary>
API_FIELD() static bool EnableLog;
#endif
/// <summary>
/// Adds the network replication serializer for a given type.
/// </summary>
@@ -168,13 +176,14 @@ public:
/// <param name="type">The RPC type.</param>
/// <param name="name">The RPC name.</param>
/// <param name="argsStream">The RPC serialized arguments stream returned from BeginInvokeRPC.</param>
static void EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream);
/// <param name="targetIds">Optional list with network client IDs that should receive RPC. Empty to send on all clients. Ignored by Server RPCs.</param>
static void EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream, Span<uint32> targetIds = Span<uint32>());
private:
#if !COMPILE_WITHOUT_CSHARP
API_FUNCTION(NoProxy) static void AddSerializer(const ScriptingTypeHandle& typeHandle, const Function<void(void*, void*)>& serialize, const Function<void(void*, void*)>& deserialize);
API_FUNCTION(NoProxy) static void AddRPC(const ScriptingTypeHandle& typeHandle, const StringAnsiView& name, const Function<void(void*, void*)>& execute, bool isServer, bool isClient, NetworkChannelType channel);
API_FUNCTION(NoProxy) static void CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream);
API_FUNCTION(NoProxy) static void CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream, MonoArray* targetIds);
static StringAnsiView GetCSharpCachedName(const StringAnsiView& name);
#endif
};

View File

@@ -5,6 +5,7 @@
#include "Engine/Core/Log.h"
#include "Engine/Core/Types/StringView.h"
#include "Engine/Core/Types/Pair.h"
#include "Engine/Core/Types/Span.h"
#include "Engine/Core/Collections/Array.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Scripting/ScriptingType.h"
@@ -12,6 +13,24 @@
class NetworkStream;
// Additional context parameters for Network RPC execution (eg. to identify who sends the data).
API_STRUCT(NoDefault, Namespace="FlaxEngine.Networking") struct FLAXENGINE_API NetworkRpcParams
{
DECLARE_SCRIPTING_TYPE_MINIMAL(NetworkRpcParams);
NetworkRpcParams() = default;
NetworkRpcParams(const NetworkStream* stream);
/// <summary>
/// The ClientId of the network client that is a data sender. Can be used to detect who send the incoming RPC or replication data. Ignored when sending data.
/// </summary>
API_FIELD() uint32 SenderId = 0;
/// <summary>
/// The list of ClientId of the network clients that should receive RPC. Can be used to send RPC to a specific client(s). Ignored when receiving data.
/// </summary>
API_FIELD() Span<uint32> TargetIds;
};
// Network RPC identifier name (pair of type and function name)
typedef Pair<ScriptingTypeHandle, StringAnsiView> NetworkRpcName;

View File

@@ -23,6 +23,11 @@ private:
public:
~NetworkStream();
/// <summary>
/// The ClientId of the network client that is a data sender. Can be used to detect who send the incoming RPC or replication data. Set to the current client when writing data.
/// </summary>
API_FIELD(ReadOnly) uint32 SenderId = 0;
/// <summary>
/// Gets the pointer to the native stream memory buffer.
/// </summary>

View File

@@ -17,3 +17,4 @@ struct NetworkConnection;
struct NetworkMessage;
struct NetworkConfig;
struct NetworkDriverStats;
struct NetworkRpcParams;