// Copyright (c) Wojciech Figat. All rights reserved.
#pragma once
#include "Engine/Platform/Platform.h"
#include "Engine/Platform/CriticalSection.h"
#include "Engine/Core/Memory/Allocation.h"
#include "Engine/Core/Math/Math.h"
#include "Engine/Core/Core.h"
///
/// The concurrent data buffer allows to implement asynchronous data writing to the linear buffer by more than one worker thread at once.
/// Supports only value types that don't require constructor/destructor invocation.
///
template
class ConcurrentBuffer
{
friend ConcurrentBuffer;
private:
int64 _count;
int64 _capacity;
T* _data;
CriticalSection _resizeLocker;
public:
///
/// Initializes a new instance of the class.
///
ConcurrentBuffer()
: _count(0)
, _capacity(0)
, _data(nullptr)
{
}
///
/// Initializes a new instance of the class.
///
/// The capacity.
ConcurrentBuffer(int32 capacity)
: _count(0)
, _capacity(capacity)
{
if (_capacity > 0)
_data = (T*)Allocator::Allocate(_capacity * sizeof(T));
else
_data = nullptr;
}
///
/// Finalizes an instance of the class.
///
~ConcurrentBuffer()
{
Allocator::Free(_data);
}
public:
///
/// Gets the amount of the elements in the collection.
///
/// The items count.
FORCE_INLINE int64 Count()
{
return Platform::AtomicRead(&_count);
}
///
/// Get amount of the elements that can be holed by collection without resizing.
///
/// the items capacity.
FORCE_INLINE int64 Capacity() const
{
return _capacity;
}
///
/// Determines whether this collection isn't empty.
///
/// true if this collection has elements; otherwise, false.
FORCE_INLINE bool HasItems() const
{
return _count != 0;
}
///
/// Determines whether this collection is empty.
///
/// true if this collection is empty; otherwise, false.
FORCE_INLINE bool IsEmpty() const
{
return _count == 0;
}
///
/// Gets the pointer to the first element in the collection.
///
/// The allocation start.
FORCE_INLINE T* Get()
{
return _data;
}
///
/// Gets the pointer to the first element in the collection.
///
/// The allocation start.
FORCE_INLINE const T* Get() const
{
return _data;
}
///
/// Gets the last element.
///
/// The last element reference.
FORCE_INLINE T& Last()
{
ASSERT(_count > 0);
return _data[_count - 1];
}
///
/// Gets the last element.
///
/// The last element reference.
FORCE_INLINE const T& Last() const
{
ASSERT(_count > 0);
return _data[_count - 1];
}
///
/// Gets the first element.
///
/// The first element reference.
FORCE_INLINE T& First()
{
ASSERT(_count > 0);
return _data[0];
}
///
/// Gets the first element.
///
/// The first element reference.
FORCE_INLINE const T& First() const
{
ASSERT(_count > 0);
return _data[0];
}
///
/// Get or sets element by the index.
///
/// The index.
/// The item reference.
FORCE_INLINE T& operator[](int64 index)
{
ASSERT(index >= 0 && index < _count);
return _data[index];
}
///
/// Get or sets element by the index.
///
/// The index.
/// The item reference (constant).
FORCE_INLINE const T& operator[](int64 index) const
{
ASSERT(index >= 0 && index < _count);
return _data[index];
}
public:
///
/// Clear the collection but without changing its capacity.
///
FORCE_INLINE void Clear()
{
Platform::InterlockedExchange(&_count, 0);
}
///
/// Releases this buffer data.
///
void Release()
{
_resizeLocker.Lock();
Allocator::Free(_data);
_data = nullptr;
_capacity = 0;
_count = 0;
_resizeLocker.Unlock();
}
///
/// Sets the custom size of the collection. Only for the custom usage with dedicated data.
///
/// The size.
void SetSize(int32 size)
{
ASSERT(size >= 0 && size <= _capacity);
_count = size;
}
///
/// Adds the single item to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once.
///
/// The item to add.
/// The index of the added item.
FORCE_INLINE int64 Add(const T& item)
{
return Add(&item, 1);
}
///
/// Adds the array of items to the collection. Handles automatic buffer resizing. Thread-safe function that can be called from many threads at once.
///
/// The collection of items to add.
/// The items count.
/// The index of the added first item.
int64 Add(const T* items, int32 count)
{
const int64 index = Platform::InterlockedAdd(&_count, (int64)count);
const int64 newCount = index + (int64)count;
EnsureCapacity(newCount);
Memory::CopyItems(_data + index, items, count);
return index;
}
///
/// Adds a collection of items to the collection.
///
/// The collection of items to add.
FORCE_INLINE void Add(ConcurrentBuffer& collection)
{
Add(collection.Get(), collection.Count());
}
///
/// Adds the given amount of items to the collection.
///
/// The items count.
/// The index of the added first item.
int64 AddDefault(int32 count = 1)
{
const int64 index = Platform::InterlockedAdd(&_count, (int64)count);
const int64 newCount = index + (int64)count;
EnsureCapacity(newCount);
Memory::ConstructItems(_data + newCount, count);
return index;
}
///
/// Adds the one item to the collection and returns the reference to it.
///
/// The reference to the added item.
FORCE_INLINE T& AddOne()
{
const int64 index = Platform::InterlockedAdd(&_count, 1);
const int64 newCount = index + 1;
EnsureCapacity(newCount);
Memory::ConstructItems(_data + index, 1);
return _data[index - 1];
}
///
/// Adds the new items to the end of the collection, possibly reallocating the whole collection to fit. The new items will be zeroed.
///
/// Warning! AddZeroed() will create items without calling the constructor and this is not appropriate for item types that require a constructor to function properly.
///
///
/// The number of new items to add.
/// The index of the added first item.
int64 AddZeroed(int32 count = 1)
{
const int64 index = Platform::InterlockedAdd(&_count, 1);
const int64 newCount = index + 1;
EnsureCapacity(newCount);
Platform::MemoryClear(Get() + index, count * sizeof(T));
return _data[index - 1];
}
///
/// Ensures that the buffer has the given the capacity (equal or more). Preserves the existing items by copy operation.
///
/// The minimum capacity.
void EnsureCapacity(int64 minCapacity)
{
// Early out
if (_capacity >= minCapacity)
return;
_resizeLocker.Lock();
// Skip if the other thread performed resizing
if (_capacity >= minCapacity)
{
_resizeLocker.Unlock();
return;
}
// Compute the new capacity
int64 newCapacity = _capacity == 0 ? 8 : Math::RoundUpToPowerOf2(_capacity) * 2;
if (newCapacity < minCapacity)
{
newCapacity = minCapacity;
}
ASSERT(newCapacity > _capacity);
// Allocate new data
T* newData = nullptr;
if (newCapacity > 0)
{
newData = (T*)Allocator::Allocate(newCapacity * sizeof(T));
}
// Check if has data
if (_data)
{
// Check if preserve contents
if (newData && _count > 0)
{
Platform::MemoryCopy(newData, _data, _capacity * sizeof(T));
}
// Delete old data
Allocator::Free(_data);
}
// Set state
_data = newData;
_capacity = newCapacity;
_resizeLocker.Unlock();
}
///
/// Swaps the contents of buffer with the other object without copy operation. Performs fast internal data exchange.
///
/// The other buffer.
void Swap(ConcurrentBuffer& other)
{
const auto count = _count;
const auto capacity = _capacity;
const auto data = _data;
_count = other._count;
_capacity = other._capacity;
_data = other._data;
other._count = count;
other._capacity = capacity;
other._data = data;
}
public:
///
/// Checks if the given element is in the collection.
///
/// The item.
/// true if collection contains the specified item; otherwise, false.
bool Contains(const T& item) const
{
for (int32 i = 0; i < _count; i++)
{
if (_data[i] == item)
{
return true;
}
}
return false;
}
///
/// Searches for the specified object and returns the zero-based index of the first occurrence within the entire collection.
///
/// The item.
/// The zero-based index of the first occurrence of item within the entire collection, if found; otherwise, INVALID_INDEX.
int32 IndexOf(const T& item) const
{
for (int32 i = 0; i < _count; i++)
{
if (_data[i] == item)
{
return i;
}
}
return INVALID_INDEX;
}
};