Improvements
This commit is contained in:
@@ -4,9 +4,20 @@
|
||||
|
||||
#include "Types.h"
|
||||
|
||||
enum class NetworkMessageIDs : uint8
|
||||
{
|
||||
None = 0,
|
||||
Handshake,
|
||||
HandshakeReply,
|
||||
ReplicatedObject,
|
||||
|
||||
MAX,
|
||||
};
|
||||
|
||||
class NetworkInternal
|
||||
{
|
||||
public:
|
||||
static void NetworkReplicatorClear();
|
||||
static void NetworkReplicatorUpdate();
|
||||
static void OnNetworkMessageReplicatedObject(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer);
|
||||
};
|
||||
|
||||
@@ -26,15 +26,6 @@ Delegate<NetworkClientConnectionData&> NetworkManager::ClientConnecting;
|
||||
Delegate<NetworkClient*> NetworkManager::ClientConnected;
|
||||
Delegate<NetworkClient*> NetworkManager::ClientDisconnected;
|
||||
|
||||
enum class NetworkMessageIDs : uint8
|
||||
{
|
||||
None = 0,
|
||||
Handshake,
|
||||
HandshakeReply,
|
||||
|
||||
MAX,
|
||||
};
|
||||
|
||||
PACK_STRUCT(struct NetworkMessageHandshake
|
||||
{
|
||||
NetworkMessageIDs ID;
|
||||
@@ -120,6 +111,7 @@ namespace
|
||||
nullptr,
|
||||
OnNetworkMessageHandshake,
|
||||
OnNetworkMessageHandshakeReply,
|
||||
NetworkInternal::OnNetworkMessageReplicatedObject,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -245,7 +237,7 @@ bool NetworkManager::StartClient()
|
||||
return true;
|
||||
if (!Peer->Connect())
|
||||
return true;
|
||||
LocalClient = New<NetworkClient>(NetworkConnection{0});
|
||||
LocalClient = New<NetworkClient>(NetworkConnection{ 0 });
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -260,7 +252,7 @@ bool NetworkManager::StartHost()
|
||||
return true;
|
||||
if (!Peer->Listen())
|
||||
return true;
|
||||
LocalClient = New<NetworkClient>(NetworkConnection{0});
|
||||
LocalClient = New<NetworkClient>(NetworkConnection{ 0 });
|
||||
|
||||
// Auto-connect host
|
||||
LocalClient->State = NetworkConnectionState::Connected;
|
||||
|
||||
@@ -6,6 +6,10 @@
|
||||
#include "NetworkInternal.h"
|
||||
#include "NetworkStream.h"
|
||||
#include "INetworkSerializable.h"
|
||||
#include "NetworkMessage.h"
|
||||
#include "NetworkPeer.h"
|
||||
#include "NetworkChannelType.h"
|
||||
#include "NetworkEvent.h"
|
||||
#include "Engine/Core/Log.h"
|
||||
#include "Engine/Core/Collections/HashSet.h"
|
||||
#include "Engine/Platform/CriticalSection.h"
|
||||
@@ -17,12 +21,21 @@
|
||||
// Enables verbose logging for Network Replicator actions (dev-only)
|
||||
#define NETWORK_REPLICATOR_DEBUG_LOG 1
|
||||
|
||||
PACK_STRUCT(struct NetworkMessageReplicatedObject
|
||||
{
|
||||
NetworkMessageIDs ID;
|
||||
Guid ObjectId; // TODO: introduce networked-ids to synchronize unique ids as ushort (less data over network)
|
||||
Guid OwnerId;
|
||||
char ObjectTypeName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network)
|
||||
uint16 DataSize;
|
||||
});
|
||||
|
||||
struct NetworkReplicatedObject
|
||||
{
|
||||
ScriptingObjectReference<ScriptingObject> Object;
|
||||
Guid ObjectId;
|
||||
Guid OwnerId;
|
||||
#if NETWORK_REPLICATOR_DEBUG_LOG
|
||||
Guid ObjectId;
|
||||
bool InvalidTypeWarn = false;
|
||||
#endif
|
||||
|
||||
@@ -36,13 +49,14 @@ struct NetworkReplicatedObject
|
||||
return Object == other;
|
||||
}
|
||||
|
||||
bool operator==(const Guid& other) const
|
||||
{
|
||||
return ObjectId == other;
|
||||
}
|
||||
|
||||
String ToString() const
|
||||
{
|
||||
#if NETWORK_REPLICATOR_DEBUG_LOG
|
||||
return ObjectId.ToString();
|
||||
#else
|
||||
return Object.GetID().ToString();
|
||||
#endif
|
||||
}
|
||||
};
|
||||
|
||||
@@ -55,7 +69,9 @@ namespace
|
||||
{
|
||||
CriticalSection ObjectsLock;
|
||||
HashSet<NetworkReplicatedObject> Objects;
|
||||
NetworkStream* CachedStream = nullptr;
|
||||
NetworkStream* CachedWriteStream = nullptr;
|
||||
NetworkStream* CachedReadStream = nullptr;
|
||||
Array<NetworkConnection> CachedTargets;
|
||||
}
|
||||
|
||||
class NetworkReplicationService : public EngineService
|
||||
@@ -76,6 +92,25 @@ void NetworkReplicationService::Dispose()
|
||||
|
||||
NetworkReplicationService NetworkReplicationServiceInstance;
|
||||
|
||||
NetworkReplicatedObject* ResolveObject(const Guid& objectId, const Guid& ownerId, char objectTypeName[128])
|
||||
{
|
||||
const auto it = Objects.Find(objectId);
|
||||
if (it != Objects.End())
|
||||
return &it->Item;
|
||||
|
||||
// TODO: cache objects remapping table to skip this search on 2nd sync
|
||||
|
||||
// Try to use remapped object
|
||||
const auto ownerIt = Objects.Find(ownerId);
|
||||
if (ownerIt != Objects.End())
|
||||
{
|
||||
// TODO: find object of given type within owner (only objects that ahs not been sync-replicated yet)
|
||||
//return &ownerIt->Item;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void NetworkReplicator::AddObject(ScriptingObject* obj, ScriptingObject* owner)
|
||||
{
|
||||
if (!obj || NetworkManager::State == NetworkConnectionState::Offline)
|
||||
@@ -88,9 +123,9 @@ void NetworkReplicator::AddObject(ScriptingObject* obj, ScriptingObject* owner)
|
||||
// Add object to the list
|
||||
NetworkReplicatedObject item;
|
||||
item.Object = obj;
|
||||
item.ObjectId = obj->GetID();
|
||||
item.OwnerId = owner->GetID();
|
||||
#if NETWORK_REPLICATOR_DEBUG_LOG
|
||||
item.ObjectId = obj->GetID();
|
||||
LOG(Info, "[NetworkReplicator] Add new object {}:{}, owned by {}:{}", item.ToString(), obj->GetType().ToString(), item.OwnerId.ToString(), owner->GetType().ToString());
|
||||
#endif
|
||||
Objects.Add(MoveTemp(item));
|
||||
@@ -106,7 +141,9 @@ void NetworkInternal::NetworkReplicatorClear()
|
||||
#endif
|
||||
Objects.Clear();
|
||||
Objects.SetCapacity(0);
|
||||
SAFE_DELETE(CachedStream);
|
||||
SAFE_DELETE(CachedWriteStream);
|
||||
SAFE_DELETE(CachedReadStream);
|
||||
CachedTargets.Resize(0);
|
||||
}
|
||||
|
||||
void NetworkInternal::NetworkReplicatorUpdate()
|
||||
@@ -115,11 +152,13 @@ void NetworkInternal::NetworkReplicatorUpdate()
|
||||
ScopeLock lock(ObjectsLock);
|
||||
if (Objects.Count() == 0)
|
||||
return;
|
||||
if (CachedStream == nullptr)
|
||||
CachedStream = New<NetworkStream>();
|
||||
if (CachedWriteStream == nullptr)
|
||||
CachedWriteStream = New<NetworkStream>();
|
||||
NetworkStream* stream = CachedWriteStream;
|
||||
NetworkPeer* peer = NetworkManager::Peer;
|
||||
// TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant)
|
||||
// TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player)
|
||||
// TODO: network authority (eg. object owned by client)
|
||||
// TODO: network authority (eg. object owned by client that can affect server)
|
||||
|
||||
if (NetworkManager::IsClient())
|
||||
{
|
||||
@@ -127,6 +166,17 @@ void NetworkInternal::NetworkReplicatorUpdate()
|
||||
}
|
||||
else
|
||||
{
|
||||
// Collect clients for replication
|
||||
CachedTargets.Clear();
|
||||
// TODO: per-object relevancy for connected clients (eg. skip replicating actor to far players)
|
||||
for (const NetworkClient* client : NetworkManager::Clients)
|
||||
{
|
||||
if (client->State == NetworkConnectionState::Connected)
|
||||
{
|
||||
CachedTargets.Add(client->Connection);
|
||||
}
|
||||
}
|
||||
|
||||
// Brute force synchronize all networked objects with clients
|
||||
for (auto it = Objects.Begin(); it.IsNotEnd(); ++it)
|
||||
{
|
||||
@@ -144,10 +194,10 @@ void NetworkInternal::NetworkReplicatorUpdate()
|
||||
|
||||
// Serialize object
|
||||
// TODO: cache per-type serialization thunk to boost CPU performance
|
||||
CachedStream->Initialize(1024);
|
||||
stream->Initialize();
|
||||
if (auto* serializable = ScriptingObject::ToInterface<INetworkSerializable>(obj))
|
||||
{
|
||||
serializable->Serialize(CachedStream);
|
||||
serializable->Serialize(stream);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -160,14 +210,70 @@ void NetworkInternal::NetworkReplicatorUpdate()
|
||||
#endif
|
||||
continue;
|
||||
}
|
||||
// TODO: how to serialize object? in memory to MemoryWriteStream? handle both C++ and C# without any memory alloc!
|
||||
|
||||
// Brute force object to all clients
|
||||
for (NetworkClient* client : NetworkManager::Clients)
|
||||
// Send object to clients
|
||||
{
|
||||
const uint32 size = stream->GetPosition();
|
||||
ASSERT(size <= MAX_uint16)
|
||||
NetworkMessageReplicatedObject msgData;
|
||||
msgData.ID = NetworkMessageIDs::ReplicatedObject;
|
||||
msgData.ObjectId = item.ObjectId;
|
||||
msgData.OwnerId = item.OwnerId;
|
||||
// TODO: put timestamp (or server tick number) to prevent applying replicated object changes from previous packet (Unreliable and Unordered channel is used)
|
||||
const StringAnsiView& objectTypeName = obj->GetType().Fullname;
|
||||
Platform::MemoryCopy(msgData.ObjectTypeName, objectTypeName.Get(), objectTypeName.Length());
|
||||
msgData.ObjectTypeName[objectTypeName.Length()] = 0;
|
||||
msgData.DataSize = size;
|
||||
// TODO: split object data (eg. more messages) if needed
|
||||
// TODO: send message from Peer to client->Connection
|
||||
NetworkMessage msg = peer->BeginSendMessage();
|
||||
msg.WriteStructure(msgData);
|
||||
msg.WriteBytes(stream->GetBuffer(), size);
|
||||
peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets);
|
||||
|
||||
// TODO: stats for bytes send per object type
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NetworkInternal::OnNetworkMessageReplicatedObject(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer)
|
||||
{
|
||||
NetworkMessageReplicatedObject msgData;
|
||||
event.Message.ReadStructure(msgData);
|
||||
ScopeLock lock(ObjectsLock);
|
||||
NetworkReplicatedObject* e = ResolveObject(msgData.ObjectId, msgData.OwnerId, msgData.ObjectTypeName);
|
||||
if (e)
|
||||
{
|
||||
auto& item = *e;
|
||||
ScriptingObject* obj = item.Object.Get();
|
||||
if (!obj)
|
||||
return;
|
||||
|
||||
// Setup message reading stream
|
||||
if (CachedReadStream == nullptr)
|
||||
CachedReadStream = New<NetworkStream>();
|
||||
NetworkStream* stream = CachedReadStream;
|
||||
stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.DataSize);
|
||||
|
||||
// Deserialize object
|
||||
// TODO: cache per-type serialization thunk to boost CPU performance
|
||||
if (auto* serializable = ScriptingObject::ToInterface<INetworkSerializable>(obj))
|
||||
{
|
||||
serializable->Deserialize(stream);
|
||||
}
|
||||
else
|
||||
{
|
||||
#if NETWORK_REPLICATOR_DEBUG_LOG
|
||||
if (!item.InvalidTypeWarn)
|
||||
{
|
||||
item.InvalidTypeWarn = true;
|
||||
LOG(Error, "[NetworkReplicator] Cannot serialize object {} (missing serialization logic)", item.ToString());
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: put message to the queue to be resolved later (eg. object replication came before spawn packet) - use TTL to prevent memory overgrowing
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved.
|
||||
// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved.
|
||||
|
||||
#include "NetworkStream.h"
|
||||
|
||||
@@ -38,6 +38,15 @@ void NetworkStream::Initialize(uint32 minCapacity)
|
||||
_position = _buffer;
|
||||
}
|
||||
|
||||
void NetworkStream::Initialize(byte* buffer, uint32 length)
|
||||
{
|
||||
if (_allocated)
|
||||
Allocator::Free(_buffer);
|
||||
_position = _buffer = buffer;
|
||||
_length = length;
|
||||
_allocated = false;
|
||||
}
|
||||
|
||||
void NetworkStream::Flush()
|
||||
{
|
||||
// Nothing to do
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved.
|
||||
// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved.
|
||||
|
||||
#pragma once
|
||||
|
||||
@@ -21,10 +21,26 @@ private:
|
||||
public:
|
||||
~NetworkStream();
|
||||
|
||||
/// <summary>
|
||||
/// Gets the pointer to the native stream memory buffer.
|
||||
/// </summary>
|
||||
API_PROPERTY() byte* GetBuffer() const
|
||||
{
|
||||
return _buffer;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes the stream for writing. Allocates the memory or reuses already existing memory. Resets the current stream position to beginning.
|
||||
/// </summary>
|
||||
API_FUNCTION() void Initialize(uint32 minCapacity);
|
||||
/// <param name="minCapacity">The minimum capacity (in bytes) for the memory buffer used for data storage.</param>
|
||||
API_FUNCTION() void Initialize(uint32 minCapacity = 1024);
|
||||
|
||||
/// <summary>
|
||||
/// Initializes the stream for reading.
|
||||
/// </summary>
|
||||
/// <param name="buffer">The allocated memory.</param>
|
||||
/// <param name="length">The allocated memory length (bytes count).</param>
|
||||
API_FUNCTION() void Initialize(byte* buffer, uint32 length);
|
||||
|
||||
/// <summary>
|
||||
/// Writes bytes to the stream
|
||||
|
||||
Reference in New Issue
Block a user