diff --git a/Source/Engine/Core/Collections/Dictionary.h b/Source/Engine/Core/Collections/Dictionary.h index e2f5f0ed6..e18a5b999 100644 --- a/Source/Engine/Core/Collections/Dictionary.h +++ b/Source/Engine/Core/Collections/Dictionary.h @@ -4,6 +4,9 @@ #include "HashSetBase.h" +template +class ConcurrentDictionary; + /// /// Describes single portion of space for the key and value pair in a hash map. /// @@ -13,6 +16,7 @@ struct DictionaryBucket friend Memory; friend HashSetBase; friend Dictionary; + friend ConcurrentDictionary; /// The key. KeyType Key; diff --git a/Source/Engine/Threading/ConcurrentDictionary.h b/Source/Engine/Threading/ConcurrentDictionary.h new file mode 100644 index 000000000..22395d798 --- /dev/null +++ b/Source/Engine/Threading/ConcurrentDictionary.h @@ -0,0 +1,316 @@ +// Copyright (c) Wojciech Figat. All rights reserved. + +#pragma once + +#include "Engine/Core/Collections/Dictionary.h" +#include "Engine/Platform/CriticalSection.h" + +/// +/// Template for unordered dictionary with mapped key with value pairs that supports asynchronous data reading and writing. +/// +/// The type of the keys in the dictionary. +/// The type of the values in the dictionary. +/// The type of memory allocator. +template +class ConcurrentDictionary : Dictionary +{ + friend ConcurrentDictionary; +public: + typedef Dictionary Base; + typedef DictionaryBucket Bucket; + using AllocationData = typename AllocationType::template Data; + using AllocationTag = typename AllocationType::Tag; + +private: + mutable volatile int64 _threadsReading = 0; + volatile int64 _threadsWriting = 0; + CriticalSection _locker; + +public: + /// + /// Initializes an empty without reserving any space. + /// + ConcurrentDictionary() + { + } + + /// + /// Initializes an empty without reserving any space. + /// + /// The custom allocation tag. + ConcurrentDictionary(AllocationTag tag) + : Base(tag) + { + } + + /// + /// Finalizes an instance of the class. + /// + ~ConcurrentDictionary() + { + Clear(); + } + +public: + /// + /// Gets the amount of the elements in the collection. + /// + int32 Count() const + { + Reader reader(this); + return Base::_elementsCount; + } + + /// + /// Gets the amount of the elements that can be contained by the collection. + /// + int32 Capacity() const + { + Reader reader(this); + return Base::_size; + } + + /// + /// Tries to get element with given key. + /// + /// The key of the element. + /// The result value. + /// True if element of given key has been found, otherwise false. + template + bool TryGet(const KeyComparableType& key, ValueType& result) const + { + Reader reader(this); + typename Base::FindPositionResult pos; + Base::FindPosition(key, pos); + if (pos.ObjectIndex != -1) + result = Base::_allocation.Get()[pos.ObjectIndex].Value; + return pos.ObjectIndex != -1; + } + +public: + /// + /// Adds a pair of key and value to the collection. + /// + /// The key. + /// The value. + /// True if added element, otherwise false if it already exists (or other thread added it). + template + bool Add(const KeyComparableType& key, const ValueType& value) + { + Writer writer(this); + Bucket* bucket = Base::OnAdd(key, false, true); + if (bucket) + bucket->Occupy(key, value); + return bucket != nullptr; + } + + /// + /// Removes element with a specified key. + /// + /// The element key to remove. + /// True if item was removed from collection, otherwise false. + template + bool Remove(const KeyComparableType& key) + { + Writer writer(this); + return Base::Remove(key); + } + +public: + /// + /// Removes all elements from the collection. + /// + void Clear() + { + Writer writer(this); + Base::Clear(); + } + +public: + /// + /// The read-only dictionary collection iterator. + /// + struct ConstIterator : Base::IteratorBase + { + friend ConcurrentDictionary; + public: + ConstIterator(const ConcurrentDictionary* collection, const int32 index) + : Base::IteratorBase(collection, index) + { + if (collection) + collection->BeginRead(); + } + + ConstIterator(const ConstIterator& i) + : Base::IteratorBase(i._collection, i._index) + { + if (i.collection) + i.collection->BeginRead(); + } + + ConstIterator(ConstIterator&& i) noexcept + : Base::IteratorBase(i._collection, i._index) + { + i._collection = nullptr; + } + + ~ConstIterator() + { + if (this->_collection) + ((ConcurrentDictionary*)this->_collection)->EndRead(); + } + + public: + FORCE_INLINE bool operator!() const + { + return !(bool)*this; + } + + FORCE_INLINE bool operator==(const ConstIterator& v) const + { + return this->_index == v._index && this->_collection == v._collection; + } + + FORCE_INLINE bool operator!=(const ConstIterator& v) const + { + return this->_index != v._index || this->_collection != v._collection; + } + + ConstIterator& operator=(const ConstIterator& v) + { + this->_collection = v._collection; + this->_index = v._index; + return *this; + } + + ConstIterator& operator=(ConstIterator&& v) noexcept + { + this->_collection = v._collection; + this->_index = v._index; + v._collection = nullptr; + return *this; + } + + ConstIterator& operator++() + { + this->Next(); + return *this; + } + + ConstIterator operator++(int) const + { + ConstIterator i = *this; + i.Next(); + return i; + } + + ConstIterator& operator--() + { + this->Prev(); + return *this; + } + + ConstIterator operator--(int) const + { + ConstIterator i = *this; + i.Prev(); + return i; + } + }; + + ConstIterator begin() const + { + ConstIterator i(this, -1); + ++i; + return i; + } + + FORCE_INLINE ConstIterator end() const + { + return ConstIterator(this, Base::_size); + } + +private: + void BeginWrite() + { + Platform::InterlockedIncrement(&_threadsWriting); + + // Wait for all reads to end + RETRY: + while (Platform::AtomicRead(&_threadsReading)) + Platform::Yield(); + + // Thread-safe writing + _locker.Lock(); + if (Platform::AtomicRead(&_threadsReading)) + { + // Other reader entered during mutex locking so give them a chance to transition into active-waiting + _locker.Unlock(); + goto RETRY; + } + } + + void EndWrite() + { + _locker.Unlock(); + Platform::InterlockedDecrement(&_threadsWriting); + } + + void BeginRead() const + { + RETRY: + Platform::InterlockedIncrement(&_threadsReading); + + // Check if any thread is writing (or is about to write) + if (Platform::AtomicRead(&_threadsWriting) != 0) + { + // Wait for all writes to end + Platform::InterlockedDecrement(&_threadsReading); + while (Platform::AtomicRead(&_threadsWriting)) + Platform::Yield(); + + // Try again + goto RETRY; + } + } + + void EndRead() const + { + Platform::InterlockedDecrement(&_threadsReading); + } + +private: + // Utility for methods that read-write state. + struct Writer + { + ConcurrentDictionary* _collection; + + Writer(ConcurrentDictionary* collection) + : _collection(collection) + { + _collection->BeginWrite(); + } + + ~Writer() + { + _collection->EndWrite(); + } + }; + + // Utility for methods that read-only state. + struct Reader + { + const ConcurrentDictionary* _collection; + + Reader(const ConcurrentDictionary* collection) + : _collection(collection) + { + _collection->BeginRead(); + } + + ~Reader() + { + _collection->EndRead(); + } + }; +};