From 75221baa19b57a161cfc5d457205917214af74e5 Mon Sep 17 00:00:00 2001 From: Wojtek Figat Date: Thu, 15 Sep 2022 13:13:16 +0200 Subject: [PATCH] Improvements --- Source/Engine/Networking/NetworkInternal.h | 11 ++ Source/Engine/Networking/NetworkManager.cpp | 14 +- .../Engine/Networking/NetworkReplicator.cpp | 140 +++++++++++++++--- Source/Engine/Networking/NetworkStream.cpp | 11 +- Source/Engine/Networking/NetworkStream.h | 20 ++- 5 files changed, 165 insertions(+), 31 deletions(-) diff --git a/Source/Engine/Networking/NetworkInternal.h b/Source/Engine/Networking/NetworkInternal.h index 8bbd505bb..4e6f26ceb 100644 --- a/Source/Engine/Networking/NetworkInternal.h +++ b/Source/Engine/Networking/NetworkInternal.h @@ -4,9 +4,20 @@ #include "Types.h" +enum class NetworkMessageIDs : uint8 +{ + None = 0, + Handshake, + HandshakeReply, + ReplicatedObject, + + MAX, +}; + class NetworkInternal { public: static void NetworkReplicatorClear(); static void NetworkReplicatorUpdate(); + static void OnNetworkMessageReplicatedObject(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer); }; diff --git a/Source/Engine/Networking/NetworkManager.cpp b/Source/Engine/Networking/NetworkManager.cpp index cf601f7d7..b1787f3f4 100644 --- a/Source/Engine/Networking/NetworkManager.cpp +++ b/Source/Engine/Networking/NetworkManager.cpp @@ -26,15 +26,6 @@ Delegate NetworkManager::ClientConnecting; Delegate NetworkManager::ClientConnected; Delegate NetworkManager::ClientDisconnected; -enum class NetworkMessageIDs : uint8 -{ - None = 0, - Handshake, - HandshakeReply, - - MAX, -}; - PACK_STRUCT(struct NetworkMessageHandshake { NetworkMessageIDs ID; @@ -120,6 +111,7 @@ namespace nullptr, OnNetworkMessageHandshake, OnNetworkMessageHandshakeReply, + NetworkInternal::OnNetworkMessageReplicatedObject, }; } @@ -245,7 +237,7 @@ bool NetworkManager::StartClient() return true; if (!Peer->Connect()) return true; - LocalClient = New(NetworkConnection{0}); + LocalClient = New(NetworkConnection{ 0 }); return false; } @@ -260,7 +252,7 @@ bool NetworkManager::StartHost() return true; if (!Peer->Listen()) return true; - LocalClient = New(NetworkConnection{0}); + LocalClient = New(NetworkConnection{ 0 }); // Auto-connect host LocalClient->State = NetworkConnectionState::Connected; diff --git a/Source/Engine/Networking/NetworkReplicator.cpp b/Source/Engine/Networking/NetworkReplicator.cpp index 46bfc730e..b988914fd 100644 --- a/Source/Engine/Networking/NetworkReplicator.cpp +++ b/Source/Engine/Networking/NetworkReplicator.cpp @@ -6,6 +6,10 @@ #include "NetworkInternal.h" #include "NetworkStream.h" #include "INetworkSerializable.h" +#include "NetworkMessage.h" +#include "NetworkPeer.h" +#include "NetworkChannelType.h" +#include "NetworkEvent.h" #include "Engine/Core/Log.h" #include "Engine/Core/Collections/HashSet.h" #include "Engine/Platform/CriticalSection.h" @@ -17,12 +21,21 @@ // Enables verbose logging for Network Replicator actions (dev-only) #define NETWORK_REPLICATOR_DEBUG_LOG 1 +PACK_STRUCT(struct NetworkMessageReplicatedObject + { + NetworkMessageIDs ID; + Guid ObjectId; // TODO: introduce networked-ids to synchronize unique ids as ushort (less data over network) + Guid OwnerId; + char ObjectTypeName[128]; // TODO: introduce networked-name to synchronize unique names as ushort (less data over network) + uint16 DataSize; + }); + struct NetworkReplicatedObject { ScriptingObjectReference Object; + Guid ObjectId; Guid OwnerId; #if NETWORK_REPLICATOR_DEBUG_LOG - Guid ObjectId; bool InvalidTypeWarn = false; #endif @@ -36,13 +49,14 @@ struct NetworkReplicatedObject return Object == other; } + bool operator==(const Guid& other) const + { + return ObjectId == other; + } + String ToString() const { -#if NETWORK_REPLICATOR_DEBUG_LOG return ObjectId.ToString(); -#else - return Object.GetID().ToString(); -#endif } }; @@ -55,7 +69,9 @@ namespace { CriticalSection ObjectsLock; HashSet Objects; - NetworkStream* CachedStream = nullptr; + NetworkStream* CachedWriteStream = nullptr; + NetworkStream* CachedReadStream = nullptr; + Array CachedTargets; } class NetworkReplicationService : public EngineService @@ -76,6 +92,25 @@ void NetworkReplicationService::Dispose() NetworkReplicationService NetworkReplicationServiceInstance; +NetworkReplicatedObject* ResolveObject(const Guid& objectId, const Guid& ownerId, char objectTypeName[128]) +{ + const auto it = Objects.Find(objectId); + if (it != Objects.End()) + return &it->Item; + + // TODO: cache objects remapping table to skip this search on 2nd sync + + // Try to use remapped object + const auto ownerIt = Objects.Find(ownerId); + if (ownerIt != Objects.End()) + { + // TODO: find object of given type within owner (only objects that ahs not been sync-replicated yet) + //return &ownerIt->Item; + } + + return nullptr; +} + void NetworkReplicator::AddObject(ScriptingObject* obj, ScriptingObject* owner) { if (!obj || NetworkManager::State == NetworkConnectionState::Offline) @@ -88,9 +123,9 @@ void NetworkReplicator::AddObject(ScriptingObject* obj, ScriptingObject* owner) // Add object to the list NetworkReplicatedObject item; item.Object = obj; + item.ObjectId = obj->GetID(); item.OwnerId = owner->GetID(); #if NETWORK_REPLICATOR_DEBUG_LOG - item.ObjectId = obj->GetID(); LOG(Info, "[NetworkReplicator] Add new object {}:{}, owned by {}:{}", item.ToString(), obj->GetType().ToString(), item.OwnerId.ToString(), owner->GetType().ToString()); #endif Objects.Add(MoveTemp(item)); @@ -106,7 +141,9 @@ void NetworkInternal::NetworkReplicatorClear() #endif Objects.Clear(); Objects.SetCapacity(0); - SAFE_DELETE(CachedStream); + SAFE_DELETE(CachedWriteStream); + SAFE_DELETE(CachedReadStream); + CachedTargets.Resize(0); } void NetworkInternal::NetworkReplicatorUpdate() @@ -115,11 +152,13 @@ void NetworkInternal::NetworkReplicatorUpdate() ScopeLock lock(ObjectsLock); if (Objects.Count() == 0) return; - if (CachedStream == nullptr) - CachedStream = New(); + if (CachedWriteStream == nullptr) + CachedWriteStream = New(); + NetworkStream* stream = CachedWriteStream; + NetworkPeer* peer = NetworkManager::Peer; // TODO: introduce NetworkReplicationHierarchy to optimize objects replication in large worlds (eg. batched culling networked scene objects that are too far from certain client to be relevant) // TODO: per-object sync interval (in frames) - could be scaled by hierarchy (eg. game could slow down sync rate for objects far from player) - // TODO: network authority (eg. object owned by client) + // TODO: network authority (eg. object owned by client that can affect server) if (NetworkManager::IsClient()) { @@ -127,6 +166,17 @@ void NetworkInternal::NetworkReplicatorUpdate() } else { + // Collect clients for replication + CachedTargets.Clear(); + // TODO: per-object relevancy for connected clients (eg. skip replicating actor to far players) + for (const NetworkClient* client : NetworkManager::Clients) + { + if (client->State == NetworkConnectionState::Connected) + { + CachedTargets.Add(client->Connection); + } + } + // Brute force synchronize all networked objects with clients for (auto it = Objects.Begin(); it.IsNotEnd(); ++it) { @@ -144,10 +194,10 @@ void NetworkInternal::NetworkReplicatorUpdate() // Serialize object // TODO: cache per-type serialization thunk to boost CPU performance - CachedStream->Initialize(1024); + stream->Initialize(); if (auto* serializable = ScriptingObject::ToInterface(obj)) { - serializable->Serialize(CachedStream); + serializable->Serialize(stream); } else { @@ -160,14 +210,70 @@ void NetworkInternal::NetworkReplicatorUpdate() #endif continue; } - // TODO: how to serialize object? in memory to MemoryWriteStream? handle both C++ and C# without any memory alloc! - // Brute force object to all clients - for (NetworkClient* client : NetworkManager::Clients) + // Send object to clients { + const uint32 size = stream->GetPosition(); + ASSERT(size <= MAX_uint16) + NetworkMessageReplicatedObject msgData; + msgData.ID = NetworkMessageIDs::ReplicatedObject; + msgData.ObjectId = item.ObjectId; + msgData.OwnerId = item.OwnerId; + // TODO: put timestamp (or server tick number) to prevent applying replicated object changes from previous packet (Unreliable and Unordered channel is used) + const StringAnsiView& objectTypeName = obj->GetType().Fullname; + Platform::MemoryCopy(msgData.ObjectTypeName, objectTypeName.Get(), objectTypeName.Length()); + msgData.ObjectTypeName[objectTypeName.Length()] = 0; + msgData.DataSize = size; // TODO: split object data (eg. more messages) if needed - // TODO: send message from Peer to client->Connection + NetworkMessage msg = peer->BeginSendMessage(); + msg.WriteStructure(msgData); + msg.WriteBytes(stream->GetBuffer(), size); + peer->EndSendMessage(NetworkChannelType::Unreliable, msg, CachedTargets); + + // TODO: stats for bytes send per object type } } } } + +void NetworkInternal::OnNetworkMessageReplicatedObject(NetworkEvent& event, NetworkClient* client, NetworkPeer* peer) +{ + NetworkMessageReplicatedObject msgData; + event.Message.ReadStructure(msgData); + ScopeLock lock(ObjectsLock); + NetworkReplicatedObject* e = ResolveObject(msgData.ObjectId, msgData.OwnerId, msgData.ObjectTypeName); + if (e) + { + auto& item = *e; + ScriptingObject* obj = item.Object.Get(); + if (!obj) + return; + + // Setup message reading stream + if (CachedReadStream == nullptr) + CachedReadStream = New(); + NetworkStream* stream = CachedReadStream; + stream->Initialize(event.Message.Buffer + event.Message.Position, msgData.DataSize); + + // Deserialize object + // TODO: cache per-type serialization thunk to boost CPU performance + if (auto* serializable = ScriptingObject::ToInterface(obj)) + { + serializable->Deserialize(stream); + } + else + { +#if NETWORK_REPLICATOR_DEBUG_LOG + if (!item.InvalidTypeWarn) + { + item.InvalidTypeWarn = true; + LOG(Error, "[NetworkReplicator] Cannot serialize object {} (missing serialization logic)", item.ToString()); + } +#endif + } + } + else + { + // TODO: put message to the queue to be resolved later (eg. object replication came before spawn packet) - use TTL to prevent memory overgrowing + } +} diff --git a/Source/Engine/Networking/NetworkStream.cpp b/Source/Engine/Networking/NetworkStream.cpp index 99e80cbc6..e426629aa 100644 --- a/Source/Engine/Networking/NetworkStream.cpp +++ b/Source/Engine/Networking/NetworkStream.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved. +// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved. #include "NetworkStream.h" @@ -38,6 +38,15 @@ void NetworkStream::Initialize(uint32 minCapacity) _position = _buffer; } +void NetworkStream::Initialize(byte* buffer, uint32 length) +{ + if (_allocated) + Allocator::Free(_buffer); + _position = _buffer = buffer; + _length = length; + _allocated = false; +} + void NetworkStream::Flush() { // Nothing to do diff --git a/Source/Engine/Networking/NetworkStream.h b/Source/Engine/Networking/NetworkStream.h index bbe35b207..9c6472799 100644 --- a/Source/Engine/Networking/NetworkStream.h +++ b/Source/Engine/Networking/NetworkStream.h @@ -1,4 +1,4 @@ -// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved. +// Copyright (c) 2012-2022 Wojciech Figat. All rights reserved. #pragma once @@ -21,10 +21,26 @@ private: public: ~NetworkStream(); + /// + /// Gets the pointer to the native stream memory buffer. + /// + API_PROPERTY() byte* GetBuffer() const + { + return _buffer; + } + /// /// Initializes the stream for writing. Allocates the memory or reuses already existing memory. Resets the current stream position to beginning. /// - API_FUNCTION() void Initialize(uint32 minCapacity); + /// The minimum capacity (in bytes) for the memory buffer used for data storage. + API_FUNCTION() void Initialize(uint32 minCapacity = 1024); + + /// + /// Initializes the stream for reading. + /// + /// The allocated memory. + /// The allocated memory length (bytes count). + API_FUNCTION() void Initialize(byte* buffer, uint32 length); /// /// Writes bytes to the stream