// Copyright (c) 2012-2020 Wojciech Figat. All rights reserved. #pragma once #include "Engine/Platform/Platform.h" #include "Engine/Platform/CriticalSection.h" #include "Engine/Core/Memory/Allocation.h" #include "Engine/Core/Math/Math.h" #include "Engine/Core/Core.h" /// /// The concurrent data buffer allows to implement asynchronous data writing to the linear buffer by more than one worker thread at once. /// Supports only value types that don't require constructor/destructor invocation. /// template class ConcurrentBuffer { friend ConcurrentBuffer; private: int64 _count; int64 _capacity; T* _data; CriticalSection _resizeLocker; public: /// /// Initializes a new instance of the class. /// ConcurrentBuffer() : _count(0) , _capacity(0) , _data(nullptr) { } /// /// Initializes a new instance of the class. /// /// The capacity. ConcurrentBuffer(int32 capacity) : _count(0) , _capacity(capacity) { if (_capacity > 0) _data = (T*)Allocator::Allocate(_capacity * sizeof(T)); else _data = nullptr; } /// /// Finalizes an instance of the class. /// ~ConcurrentBuffer() { Allocator::Free(_data); } public: /// /// Gets the amount of the elements in the collection. /// /// The items count. FORCE_INLINE int64 Count() { return Platform::AtomicRead(&_count); } /// /// Get amount of the elements that can be holed by collection without resizing. /// /// the items capacity. FORCE_INLINE int64 Capacity() const { return _capacity; } /// /// Determines whether this collection isn't empty. /// /// true if this collection has elements; otherwise, false. FORCE_INLINE bool HasItems() const { return _count != 0; } /// /// Determines whether this collection is empty. /// /// true if this collection is empty; otherwise, false. FORCE_INLINE bool IsEmpty() const { return _count == 0; } /// /// Gets the pointer to the first element in the collection. /// /// The allocation start. FORCE_INLINE T* Get() { return _data; } /// /// Gets the pointer to the first element in the collection. /// /// The allocation start. FORCE_INLINE const T* Get() const { return _data; } /// /// Gets the last element. /// /// The last element reference. FORCE_INLINE T& Last() { ASSERT(_count > 0); return _data[_count - 1]; } /// /// Gets the last element. /// /// The last element reference. FORCE_INLINE const T& Last() const { ASSERT(_count > 0); return _data[_count - 1]; } /// /// Gets the first element. /// /// The first element reference. FORCE_INLINE T& First() { ASSERT(_count > 0); return _data[0]; } /// /// Gets the first element. /// /// The first element reference. FORCE_INLINE const T& First() const { ASSERT(_count > 0); return _data[0]; } /// /// Get or sets element by the index. /// /// The index. /// The item reference. FORCE_INLINE T& operator[](int64 index) { ASSERT(index >= 0 && index < _count); return _data[index]; } /// /// Get or sets element by the index. /// /// The index. /// The item reference (constant). FORCE_INLINE const T& operator[](int64 index) const { ASSERT(index >= 0 && index < _count); return _data[index]; } public: /// /// Clear the collection but without changing its capacity. /// FORCE_INLINE void Clear() { Platform::InterlockedExchange(&_count, 0); } /// /// Releases this buffer data. /// void Release() { _resizeLocker.Lock(); Allocator::Free(_data); _data = nullptr; _capacity = 0; _count = 0; _resizeLocker.Unlock(); } /// /// Sets the custom size of the collection. Only for the custom usage with dedicated data. /// /// The size. void SetSize(int32 size) { ASSERT(size >= 0 && size <= _capacity); _count = size; } /// /// Adds the single item to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once. /// /// The item to add. /// The index of the added item. FORCE_INLINE int64 Add(const T& item) { return Add(&item, 1); } /// /// Adds the array of items to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once. /// /// The collection of items to add. /// The items count. /// The index of the added first item. int64 Add(const T* items, int32 count) { const int64 index = Platform::InterlockedAdd(&_count, (int64)count); const int64 newCount = index + (int64)count; EnsureCapacity(newCount); Memory::CopyItems(_data + index, items, count); return index; } // Add collection of items to the collection // @param collection Array with the items to add FORCE_INLINE void Add(ConcurrentBuffer& collection) { Add(collection.Get(), collection.Count()); } /// /// Adds the given amount of items to the collection. /// /// The items count. /// The index of the added first item. int64 AddDefault(int32 count = 1) { const int64 index = Platform::InterlockedAdd(&_count, (int64)count); const int64 newCount = index + (int64)count; EnsureCapacity(newCount); Memory::ConstructItems(_data + newCount, count); return index; } /// /// Adds the one item to the collection and returns the reference to it. /// /// The reference to the added item. FORCE_INLINE T& AddOne() { const int64 index = Platform::InterlockedAdd(&_count, 1); const int64 newCount = index + 1; EnsureCapacity(newCount); Memory::ConstructItems(_data + index, 1); return _data[index - 1]; } /// /// Adds the new items to the end of the collection, possibly reallocating the whole collection to fit. The new items will be zeroed. /// /// Warning! AddZeroed() will create items without calling the constructor and this is not appropriate for item types that require a constructor to function properly. /// /// /// The number of new items to add. /// The index of the added first item. int64 AddZeroed(int32 count = 1) { const int64 index = Platform::InterlockedAdd(&_count, 1); const int64 newCount = index + 1; EnsureCapacity(newCount); Platform::MemoryClear(Get() + index, count * sizeof(T)); return _data[index - 1]; } /// /// Ensures that the buffer has the given the capacity (equal or more). Preserves the existing items by copy operation. /// /// The minimum capacity. void EnsureCapacity(int64 minCapacity) { // Early out if (_capacity >= minCapacity) return; _resizeLocker.Lock(); // Skip if the other thread performed resizing if (_capacity >= minCapacity) { _resizeLocker.Unlock(); return; } // Compute the new capacity int64 newCapacity = _capacity == 0 ? 8 : Math::RoundUpToPowerOf2(_capacity) * 2; if (newCapacity < minCapacity) { newCapacity = minCapacity; } ASSERT(newCapacity > _capacity); // Allocate new data T* newData = nullptr; if (newCapacity > 0) { newData = (T*)Allocator::Allocate(newCapacity * sizeof(T)); } // Check if has data if (_data) { // Check if preserve contents if (newData && _count > 0) { Platform::MemoryCopy(newData, _data, _capacity * sizeof(T)); } // Delete old data Allocator::Free(_data); } // Set state _data = newData; _capacity = newCapacity; _resizeLocker.Unlock(); } /// /// Swaps the contents of buffer with the other object without copy operation. Performs fast internal data exchange. /// /// The other buffer. void Swap(ConcurrentBuffer& other) { const auto count = _count; const auto capacity = _capacity; const auto data = _data; _count = other._count; _capacity = other._capacity; _data = other._data; other._count = count; other._capacity = capacity; other._data = data; } public: /// /// Checks if the given element is in the collection. /// /// The item. /// true if collection contains the specified item; otherwise, false. bool Contains(const T& item) const { for (int32 i = 0; i < _count; i++) { if (_data[i] == item) { return true; } } return false; } /// /// Searches for the specified object and returns the zero-based index of the first occurrence within the entire collection. /// /// The item. /// The zero-based index of the first occurrence of itm within the entire collection, if found; otherwise, INVALID_INDEX. int32 IndexOf(const T& item) const { for (int32 i = 0; i < _count; i++) { if (_data[i] == item) { return i; } } return INVALID_INDEX; } };