Add network RPCs

This commit is contained in:
Wojciech Figat
2022-11-16 14:25:12 +01:00
parent 91ff0f76f8
commit efb48697fa
9 changed files with 560 additions and 37 deletions

View File

@@ -13,6 +13,7 @@ enum class NetworkMessageIDs : uint8
ObjectSpawn,
ObjectDespawn,
ObjectRole,
ObjectRpc,
MAX,
};
@@ -29,4 +30,5 @@ public:
static void OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectDespawn(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRole(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
static void OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
};

View File

@@ -132,6 +132,7 @@ namespace
NetworkInternal::OnNetworkMessageObjectSpawn,
NetworkInternal::OnNetworkMessageObjectDespawn,
NetworkInternal::OnNetworkMessageObjectRole,
NetworkInternal::OnNetworkMessageObjectRpc,
};
}

View File

@@ -9,6 +9,7 @@
#include "NetworkPeer.h"
#include "NetworkChannelType.h"
#include "NetworkEvent.h"
#include "NetworkRpc.h"
#include "INetworkSerializable.h"
#include "INetworkObject.h"
#include "Engine/Core/Log.h"
@@ -71,6 +72,15 @@ PACK_STRUCT(struct NetworkMessageObjectRole
uint32 OwnerClientId;
});
PACK_STRUCT(struct NetworkMessageObjectRpc
{
NetworkMessageIDs ID = NetworkMessageIDs::ObjectRpc;
Guid ObjectId;
char RpcTypeName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network)
char RpcName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network)
uint16 ArgsSize;
});
struct NetworkReplicatedObject
{
ScriptingObjectReference<ScriptingObject> Object;
@@ -128,18 +138,30 @@ struct SpawnItem
NetworkObjectRole Role;
};
struct RpcItem
{
ScriptingObjectReference<ScriptingObject> Object;
NetworkRpcName Name;
NetworkRpcInfo Info;
BytesContainer ArgsData;
};
namespace
{
CriticalSection ObjectsLock;
HashSet<NetworkReplicatedObject> Objects;
Array<SpawnItem> SpawnQueue;
Array<Guid> DespawnQueue;
Array<RpcItem> RpcQueue;
Dictionary<Guid, Guid> IdsRemappingTable;
NetworkStream* CachedWriteStream = nullptr;
NetworkStream* CachedReadStream = nullptr;
Array<NetworkClient*> NewClients;
Array<NetworkConnection> CachedTargets;
Dictionary<ScriptingTypeHandle, Serializer> SerializersTable;
#if !COMPILE_WITHOUT_CSHARP
Dictionary<StringAnsiView, StringAnsi*> CSharpCachedNames;
#endif
}
class NetworkReplicationService : public EngineService
@@ -156,6 +178,9 @@ public:
void NetworkReplicationService::Dispose()
{
NetworkInternal::NetworkReplicatorClear();
#if !COMPILE_WITHOUT_CSHARP
CSharpCachedNames.ClearDelete();
#endif
}
NetworkReplicationService NetworkReplicationServiceInstance;
@@ -270,6 +295,12 @@ FORCE_INLINE void BuildCachedTargets(const NetworkReplicatedObject& item)
BuildCachedTargets(NetworkManager::Clients, item.TargetClientIds, item.OwnerClientId);
}
FORCE_INLINE void GetNetworkName(char buffer[128], const StringAnsiView& name)
{
Platform::MemoryCopy(buffer, name.Get(), name.Length());
buffer[name.Length()] = 0;
}
void SendObjectSpawnMessage(const NetworkReplicatedObject& item, ScriptingObject* obj)
{
NetworkMessageObjectSpawn msgData;
@@ -291,9 +322,7 @@ void SendObjectSpawnMessage(const NetworkReplicatedObject& item, ScriptingObject
msgData.PrefabObjectID = objScene->GetPrefabObjectID();
}
msgData.OwnerClientId = item.OwnerClientId;
const StringAnsiView& objectTypeName = obj->GetType().Fullname;
Platform::MemoryCopy(msgData.ObjectTypeName, objectTypeName.Get(), objectTypeName.Length());
msgData.ObjectTypeName[objectTypeName.Length()] = 0;
GetNetworkName(msgData.ObjectTypeName, obj->GetType().Fullname);
auto* peer = NetworkManager::Peer;
NetworkMessage msg = peer->BeginSendMessage();
msg.WriteStructure(msgData);
@@ -368,6 +397,48 @@ void NetworkReplicator::AddSerializer(const ScriptingTypeHandle& typeHandle, con
AddSerializer(typeHandle, INetworkSerializable_Managed, INetworkSerializable_Managed, (void*)*(SerializeFunc*)&serialize, (void*)*(SerializeFunc*)&deserialize);
}
void RPC_Execute_Managed(ScriptingObject* obj, NetworkStream* stream, void* tag)
{
auto signature = (Function<void(void*, void*)>::Signature)tag;
signature(obj, stream);
}
void NetworkReplicator::AddRPC(const ScriptingTypeHandle& typeHandle, const StringAnsiView& name, const Function<void(void*, void*)>& execute, bool isServer, bool isClient, NetworkChannelType channel)
{
if (!typeHandle)
return;
const NetworkRpcName rpcName(typeHandle, GetCSharpCachedName(name));
NetworkRpcInfo rpcInfo;
rpcInfo.Server = isServer;
rpcInfo.Client = isClient;
rpcInfo.Channel = (uint8)channel;
rpcInfo.Invoke = nullptr; // C# RPCs invoking happens on C# side (build-time code generation)
rpcInfo.Execute = RPC_Execute_Managed;
rpcInfo.Tag = (void*)*(SerializeFunc*)&execute;
// Add to the global RPCs table
NetworkRpcInfo::RPCsTable[rpcName] = rpcInfo;
}
void NetworkReplicator::CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream)
{
EndInvokeRPC(obj, type, GetCSharpCachedName(name), argsStream);
}
StringAnsiView NetworkReplicator::GetCSharpCachedName(const StringAnsiView& name)
{
// Cache method name on a heap to support C# hot-reloads (also glue code from C# passes view to the stack-only text so cache it here)
StringAnsi* result;
if (!CSharpCachedNames.TryGet(name, result))
{
result = New<StringAnsi>(name);
CSharpCachedNames.Add(StringAnsiView(*result), result);
}
return StringAnsiView(*result);
}
#endif
void NetworkReplicator::AddSerializer(const ScriptingTypeHandle& typeHandle, SerializeFunc serialize, SerializeFunc deserialize, void* serializeTag, void* deserializeTag)
@@ -610,6 +681,32 @@ void NetworkReplicator::DirtyObject(ScriptingObject* obj)
// TODO: implement objects state replication frequency and dirtying
}
Dictionary<NetworkRpcName, NetworkRpcInfo> NetworkRpcInfo::RPCsTable;
NetworkStream* NetworkReplicator::BeginInvokeRPC()
{
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
CachedWriteStream->Initialize();
return CachedWriteStream;
}
void NetworkReplicator::EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream)
{
const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(NetworkRpcName(type, name));
if (!info || !obj)
return;
ObjectsLock.Lock();
auto& rpc = RpcQueue.AddOne();
rpc.Object = obj;
rpc.Name.First = type;
rpc.Name.Second = name;
rpc.Info = *info;
const Span<byte> argsData(argsStream->GetBuffer(), argsStream->GetPosition());
rpc.ArgsData.Copy(argsData);
ObjectsLock.Unlock();
}
void NetworkInternal::NetworkReplicatorClientConnected(NetworkClient* client)
{
ScopeLock lock(ObjectsLock);
@@ -685,6 +782,7 @@ void NetworkInternal::NetworkReplicatorUpdate()
if (CachedWriteStream == nullptr)
CachedWriteStream = New<NetworkStream>();
const bool isClient = NetworkManager::IsClient();
const bool isServer = NetworkManager::IsServer();
NetworkStream* stream = CachedWriteStream;
NetworkPeer* peer = NetworkManager::Peer;
@@ -763,6 +861,8 @@ void NetworkInternal::NetworkReplicatorUpdate()
for (auto& e : SpawnQueue)
{
ScriptingObject* obj = e.Object.Get();
if (!obj)
continue;
auto it = Objects.Find(obj->GetID());
if (it == Objects.End())
{
@@ -848,9 +948,7 @@ void NetworkInternal::NetworkReplicatorUpdate()
IdsRemappingTable.KeyOf(msgData.ObjectId, &msgData.ObjectId);
IdsRemappingTable.KeyOf(msgData.ParentId, &msgData.ParentId);
}
const StringAnsiView& objectTypeName = obj->GetType().Fullname;
Platform::MemoryCopy(msgData.ObjectTypeName, objectTypeName.Get(), objectTypeName.Length());
msgData.ObjectTypeName[objectTypeName.Length()] = 0;
GetNetworkName(msgData.ObjectTypeName, obj->GetType().Fullname);
msgData.DataSize = size;
// TODO: split object data (eg. more messages) if needed
NetworkMessage msg = peer->BeginSendMessage();
@@ -869,6 +967,47 @@ void NetworkInternal::NetworkReplicatorUpdate()
}
}
// Invoke RPCs
for (auto& e : RpcQueue)
{
ScriptingObject* obj = e.Object.Get();
if (!obj)
continue;
auto it = Objects.Find(obj->GetID());
if (it == Objects.End())
continue;
auto& item = it->Item;
// Send despawn message
//NETWORK_REPLICATOR_LOG(Info, "[NetworkReplicator] Rpc {}::{} object ID={}", e.Name.First.ToString(), String(e.Name.Second), item.ToString());
NetworkMessageObjectRpc msgData;
msgData.ObjectId = item.ObjectId;
if (isClient)
{
// Remap local client object ids into server ids
IdsRemappingTable.KeyOf(msgData.ObjectId, &msgData.ObjectId);
}
GetNetworkName(msgData.RpcTypeName, e.Name.First.GetType().Fullname);
GetNetworkName(msgData.RpcName, e.Name.Second);
msgData.ArgsSize = (uint16)e.ArgsData.Length();
NetworkMessage msg = peer->BeginSendMessage();
msg.WriteStructure(msgData);
msg.WriteBytes(e.ArgsData.Get(), e.ArgsData.Length());
NetworkChannelType channel = (NetworkChannelType)e.Info.Channel;
if (e.Info.Server && isClient)
{
// Client -> Server
peer->EndSendMessage(channel, msg);
}
else if (e.Info.Client && isServer)
{
// Server -> Client(s)
BuildCachedTargets(item);
peer->EndSendMessage(channel, msg, CachedTargets);
}
}
RpcQueue.Clear();
// Clear networked objects mapping table
Scripting::ObjectsLookupIdMapping.Set(nullptr);
}
@@ -1019,7 +1158,7 @@ void NetworkInternal::OnNetworkMessageObjectSpawn(NetworkEvent& event, NetworkCl
else
{
// Spawn object
const ScriptingTypeHandle objectType = Scripting::FindScriptingType(StringAnsiView(msgData.ObjectTypeName));
const ScriptingTypeHandle objectType = Scripting::FindScriptingType(msgData.ObjectTypeName);
obj = ScriptingObject::NewObject(objectType);
if (!obj)
{
@@ -1139,3 +1278,54 @@ void NetworkInternal::OnNetworkMessageObjectRole(NetworkEvent& event, NetworkCli
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object role update {}", msgData.ObjectId);
}
}
void NetworkInternal::OnNetworkMessageObjectRpc(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
{
NetworkMessageObjectRpc msgData;
event.Message.ReadStructure(msgData);
ScopeLock lock(ObjectsLock);
NetworkReplicatedObject* e = ResolveObject(msgData.ObjectId);
if (e)
{
auto& item = *e;
ScriptingObject* obj = item.Object.Get();
if (!obj)
return;
// Find RPC info
NetworkRpcName name;
name.First = Scripting::FindScriptingType(msgData.RpcTypeName);
name.Second = msgData.RpcName;
const NetworkRpcInfo* info = NetworkRpcInfo::RPCsTable.TryGet(name);
if (!info)
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", msgData.ObjectId, String(msgData.RpcTypeName), String(msgData.RpcName));
return;
}
// Validate RPC
if (info->Server && NetworkManager::IsClient())
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot invoke server RPC {}::{} on client", String(msgData.RpcTypeName), String(msgData.RpcName));
return;
}
if (info->Client && NetworkManager::IsServer())
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Cannot invoke client RPC {}::{} on server", String(msgData.RpcTypeName), String(msgData.RpcName));
return;
}
// Setup message reading stream
if (CachedReadStream == nullptr)
CachedReadStream = New<NetworkStream>();
NetworkStream* stream = CachedReadStream;
stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.ArgsSize);
// Execute RPC
info->Execute(obj, stream, info->Tag);
}
else
{
NETWORK_REPLICATOR_LOG(Error, "[NetworkReplicator] Unknown object {} RPC {}::{}", msgData.ObjectId, String(msgData.RpcTypeName), String(msgData.RpcName));
}
}

View File

@@ -30,6 +30,16 @@ namespace FlaxEngine.Networking
/// <param name="streamPtr">var stream = (NetworkStream)Object.FromUnmanagedPtr(streamPtr)</param>
public delegate void SerializeFunc(IntPtr instancePtr, IntPtr streamPtr);
/// <summary>
/// Network RPC executing delegate.
/// </summary>
/// <remarks>
/// Use Object.FromUnmanagedPtr(objPtr/streamPtr) to get object or NetworkStream from raw native pointers.
/// </remarks>
/// <param name="instancePtr">var instance = Object.FromUnmanagedPtr(instancePtr)</param>
/// <param name="streamPtr">var stream = (NetworkStream)Object.FromUnmanagedPtr(streamPtr)</param>
public delegate void ExecuteRPCFunc(IntPtr instancePtr, IntPtr streamPtr);
/// <summary>
/// Registers a new serialization methods for a given C# type.
/// </summary>
@@ -93,5 +103,35 @@ namespace FlaxEngine.Networking
}
return Internal_InvokeSerializer(type, instance, FlaxEngine.Object.GetUnmanagedPtr(stream), serialize);
}
/// <summary>
/// Ends invoking the RPC.
/// </summary>
/// <param name="obj">The target object to invoke RPC.</param>
/// <param name="type">The RPC type.</param>
/// <param name="name">The RPC name.</param>
/// <param name="argsStream">The RPC serialized arguments stream returned from BeginInvokeRPC.</param>
[Unmanaged]
public static void EndInvokeRPC(Object obj, Type type, string name, NetworkStream argsStream)
{
Internal_CSharpEndInvokeRPC(FlaxEngine.Object.GetUnmanagedPtr(obj), type, name, FlaxEngine.Object.GetUnmanagedPtr(argsStream));
}
/// <summary>
/// Registers a RPC method for a given C# method.
/// </summary>
/// <param name="type">The C# type (FlaxEngine.Object).</param>
/// <param name="name">The RPC method name (from that type).</param>
/// <param name="execute">Function to call for RPC execution.</param>
/// <param name="isServer">Server RPC.</param>
/// <param name="isClient">Client RPC.</param>
/// <param name="channel">Network channel to use for RPC transport.</param>
[Unmanaged]
public static void AddRPC(Type type, string name, ExecuteRPCFunc execute, bool isServer = true, bool isClient = false, NetworkChannelType channel = NetworkChannelType.ReliableOrdered)
{
if (!typeof(FlaxEngine.Object).IsAssignableFrom(type))
throw new ArgumentException("Not supported type for RPC. Only FlaxEngine.Object types are valid.");
Internal_AddRPC(type, name, Marshal.GetFunctionPointerForDelegate(execute), isServer, isClient, channel);
}
}
}

View File

@@ -154,8 +154,27 @@ public:
/// <param name="obj">The network object.</param>
API_FUNCTION() static void DirtyObject(ScriptingObject* obj);
public:
/// <summary>
/// Begins invoking the RPC and returns the Network Stream to serialize parameters to.
/// </summary>
/// <returns>Network Stream to write RPC parameters to.</returns>
API_FUNCTION() static NetworkStream* BeginInvokeRPC();
/// <summary>
/// Ends invoking the RPC.
/// </summary>
/// <param name="obj">The target object to invoke RPC.</param>
/// <param name="type">The RPC type.</param>
/// <param name="name">The RPC name.</param>
/// <param name="argsStream">The RPC serialized arguments stream returned from BeginInvokeRPC.</param>
static void EndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream);
private:
#if !COMPILE_WITHOUT_CSHARP
API_FUNCTION(NoProxy) static void AddSerializer(const ScriptingTypeHandle& type, const Function<void(void*, void*)>& serialize, const Function<void(void*, void*)>& deserialize);
API_FUNCTION(NoProxy) static void AddSerializer(const ScriptingTypeHandle& typeHandle, const Function<void(void*, void*)>& serialize, const Function<void(void*, void*)>& deserialize);
API_FUNCTION(NoProxy) static void AddRPC(const ScriptingTypeHandle& typeHandle, const StringAnsiView& name, const Function<void(void*, void*)>& execute, bool isServer, bool isClient, NetworkChannelType channel);
API_FUNCTION(NoProxy) static void CSharpEndInvokeRPC(ScriptingObject* obj, const ScriptingTypeHandle& type, const StringAnsiView& name, NetworkStream* argsStream);
static StringAnsiView GetCSharpCachedName(const StringAnsiView& name);
#endif
};

View File

@@ -0,0 +1,76 @@
// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Core/Types/StringView.h"
#include "Engine/Core/Types/Pair.h"
#include "Engine/Core/Collections/Array.h"
#include "Engine/Core/Collections/Dictionary.h"
#include "Engine/Scripting/ScriptingType.h"
class NetworkStream;
// Network RPC identifier name (pair of type and function name)
typedef Pair<ScriptingTypeHandle, StringAnsiView> NetworkRpcName;
// Network RPC descriptor
struct FLAXENGINE_API NetworkRpcInfo
{
uint8 Server : 1;
uint8 Client : 1;
uint8 Channel : 4;
void (*Execute)(ScriptingObject* obj, NetworkStream* stream, void* tag);
void (*Invoke)(ScriptingObject* obj, void** args);
void* Tag;
/// <summary>
/// Global table for registered RPCs. Key: pair of type, RPC name. Value: RPC descriptor.
/// </summary>
static Dictionary<NetworkRpcName, NetworkRpcInfo> RPCsTable;
};
// Gets the pointer to the RPC argument into the args buffer
template<typename T>
FORCE_INLINE void NetworkRpcInitArg(Array<void*, FixedAllocation<16>>& args, const T& v)
{
args.Add((void*)&v);
}
// Gets the pointers to the RPC arguments into the args buffer
template<typename T, typename... Params>
FORCE_INLINE void NetworkRpcInitArg(Array<void*, FixedAllocation<16>>& args, const T& first, Params&... params)
{
NetworkRpcInitArg(args, first);
NetworkRpcInitArg(args, Forward<Params>(params)...);
}
// Network RPC implementation (placed in the beginning of the method body)
#define NETWORK_RPC_IMPL(type, name, ...) \
{ \
const NetworkRpcInfo& rpcInfo = NetworkRpcInfo::RPCsTable[NetworkRpcName(type::TypeInitializer, StringAnsiView(#name))]; \
const NetworkManagerMode networkMode = NetworkManager::Mode; \
if ((rpcInfo.Server && networkMode == NetworkManagerMode::Client) || (rpcInfo.Client && networkMode != NetworkManagerMode::Client)) \
{ \
Array<void*, FixedAllocation<16>> args; \
NetworkRpcInitArg(args, __VA_ARGS__); \
rpcInfo.Invoke(this, args.Get()); \
if (rpcInfo.Server && networkMode == NetworkManagerMode::Client) \
return; \
if (rpcInfo.Client && networkMode == NetworkManagerMode::Server) \
return; \
} \
}
// Network RPC override implementation (placed in the beginning of the overriden method body - after call to the base class method)
#define NETWORK_RPC_OVERRIDE_IMPL(type, name, ...) \
{ \
const NetworkRpcInfo& rpcInfo = NetworkRpcInfo::RPCsTable[NetworkRpcName(type::TypeInitializer, StringAnsiView(#name))]; \
const NetworkManagerMode networkMode = NetworkManager::Mode; \
if ((rpcInfo.Server && networkMode == NetworkManagerMode::Client) || (rpcInfo.Client && networkMode != NetworkManagerMode::Client)) \
{ \
if (rpcInfo.Server && networkMode == NetworkManagerMode::Client) \
return; \
if (rpcInfo.Client && networkMode == NetworkManagerMode::Server) \
return; \
} \
}