栈和队列很简单:接口极其有限,并且它们非常紧密地专注于特定用途。并非所有数据结构都那么简单;大多数数据结构都支持多种操作。原则上,这可以带来更多并发机会,但也使保护数据的任务变得更加困难,因为需要考虑多种访问模式。在设计用于并发访问的此类数据结构时,各种可执行操作的精确性非常重要。
为了了解其中涉及的一些问题,让我们看一下查找表的设计。
6.3.1 使用锁编写线程安全的查找表(lookup table)
查找表或字典将一种类型(key type 键类型)的值与相同或不同类型(mapped type 映射类型)的值相关联。一般来说,这种结构背后的目的是允许代码根据给定的键值查询数据。在 C++ 标准库中,此功能由关联容器提供:std::map<>、std::multimap<>、std::unordered_map<> 和 std::unordered_multimap<>。
查找表的使用模式与栈或队列不同。虽然栈或队列上的几乎每个操作都会以某种方式对其进行修改(添加元素或删除元素),但查找表可能很少被修改。列表 3.13 中的简单 DNS 缓存就是这种场景的一个例子,与 std::map<> 相比,它的接口大大减少。正如您在栈和队列中看到的那样,当要从多个线程同时访问数据结构时,标准容器的接口不适合,因为接口设计中存在固有的竞争条件,因此需要将它们删减并修改。
从并发角度来看,std::map<> 接口的最大问题是迭代器(iterators)。尽管迭代器可以在其他线程访问(以及修改)容器(container)时提供安全的访问,这也是个棘手的问题。正确处理迭代器需要您处理诸如另一个线程删除迭代器所引用的元素之类的问题,这可能会相当复杂。对于线程安全查找表接口的第一部分,您将跳过迭代器。鉴于 std::map<>(以及标准库中的其他关联容器)的接口很大程度上基于迭代器,因此需要将它们放在一边并从头开始设计接口。 查找表只有几个基本操作:
■ 添加新的键/值对。
■ 更改与给定键关联的值。
■ 删除键及其关联值。
■ 获取与给定键关联的值(如果有)。
还有一些可能有用的容器相关操作(container-wide operations),例如检查容器是否为空、完整键列表的快照或完整键/值对集的快照。
如果您坚持简单的线程安全准则(例如不返回引用)并在每个成员函数的整个范围内加锁,那么所有这些都是安全的;它们要么出现在另一个线程的某些修改之前,要么出现在它之后。竞争场景的最大可能性是添加新的键/值对时;如果两个线程添加新值,则只有一个线程会成为第一个,因此第二个线程将失败。一种可能性是将添加和更改合并到一个成员函数中,就像列表 3.13 中对 DNS 缓存所做的那样。
从接口的角度来看,唯一有趣的一点是获取关联值的部分(如果有的话)。一种选择是允许用户提供在 key 不存在时返回的“默认”结果: mapped_type get_value(key_type const& key, mapped_type default_value);
在这种情况下,如果未显式提供default_value,则可以使用mapped_type 的默认构造实例。这也可以扩展为返回 std::pair<mapped_type,bool> 而不仅仅是mapped_type 的实例,其中 bool 指示该值是否存在。另一种选择是返回引用该值的智能指针;如果指针值为 NULL,则没有可返回的值。
正如已经提到的,一旦确定了接口,那么(假设没有接口竞争场景)可以通过在每个成员函数周围使用单个互斥体和简单的锁以保护底层数据结构来保证线程安全。然而,单独数据结构读取和修改的接口会浪费并发的可能性。一种选择是使用支持多个读取器线程或单个写入器线程的互斥体,例如列表 3.13 中使用的 boost::shared_mutex。虽然这确实会提高并发访问的可能性,但一次只有一个线程可以修改数据结构。理想中,您希望比这更好。
设计用于细粒度锁定的map数据结构 与第 6.2.3 节中讨论的队列一样,为了允许细粒度锁定,您需要仔细查看数据结构的细节,而不是仅仅包装现有的容器(如std::map<>)。实现关联容器(例如查找表)有三种常见方法:
■ 二叉树,例如红黑树
■ 已排序的数组
■ 哈希表
二叉树并没有提供太多扩展并发机会的空间;每次查找或修改都必须从访问根节点开始,因此必须锁定根节点。尽管当访问线程沿着树向下移动时可以释放该锁,但这并不比整个数据结构中的单个锁好多少。
排序数组更糟糕,因为您无法提前知道给定数据值将位于数组中的哪个位置,因此您需要对整个数组使用单个锁。
这就留下了哈希表。假设数量固定的桶,一个 key 属于哪个桶纯粹取决于 key 的值以及它的哈希函数。这意味着您可以安全地为每个存储桶加锁。如果您再次使用支持多个读取器或单个写入器的互斥锁,则并发机会会增加 N 倍,其中 N 是存储桶的数量。缺点是您需要一个良好的密钥哈希函数。 C++ 标准库为此提供了 std::hash<> 模板。它已经专门提供了基本类型(如 int)和常见类型(如 std::string)的序列化,并且用户可以轻松序列化其他关 key 类型。如果您遵循标准无序容器的准则,并将用于进行散列的函数对象的类型作为模板参数,则用户可以选择是否将 std::hash<> key 类型序列化或者提供一个单独的哈希函数。
那么,让我们看一些代码。线程安全查找表的实现可能是什么样的?这里显示了一种可能性。
Listing 6.11 A thread-safe lookup table
template<typename key,typename value,typename hash="std::hash<Key"> >
class threadsafe_lookup_table
{
private:
class bucket_type
{
private:
typedef std::pair<key,value> bucket_value;
typedef std::list<bucket_value> bucket_data;
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable boost::shared_mutex mutex; - [1]
bucket_iterator find_entry_for(Key const& key) const - [2]
{
return std::find_if(data.begin(),data.end(),
[&](bucket_value const& item)
{return item.first==key;});
}
public:
Value value_for(Key const& key,Value const& default_value) const
{
boost::shared_lock<boost::shared_mutex> lock(mutex); - [3]
bucket_iterator const found_entry=find_entry_for(key);
return (found_entry==data.end())?
default_value:found_entry->second;
}
void add_or_update_mapping(Key const& key,Value const& value)
{
std::unique_lock<boost::shared_mutex> lock(mutex); - [4]
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry==data.end())
{
data.push_back(bucket_value(key,value));
}
else
{
found_entry->second=value;
}
}
void remove_mapping(Key const& key)
{
std::unique_lock<boost::shared_mutex> lock(mutex); - [5]
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry!=data.end())
{
data.erase(found_entry);
}
}
};
std::vector<std::unique_ptr<bucket_type> > buckets; - [6]
Hash hasher;
bucket_type& get_bucket(Key const& key) const - [7]
{
std::size_t const bucket_index=hasher(key)%buckets.size();
return *buckets[bucket_index];
}
public:
typedef Key key_type;
typedef Value mapped_type;
typedef Hash hash_type;
threadsafe_lookup_table(
unsigned num_buckets=19,Hash const& hasher_=Hash()):
buckets(num_buckets),hasher(hasher_)
{
for(unsigned i=0;i<num_buckets;++i) { buckets[i].reset(new bucket_type); } threadsafe_lookup_table(threadsafe_lookup_table const& other)="delete;" threadsafe_lookup_table& operator="(" threadsafe_lookup_table value value_for(key key, default_value="Value())" const return get_bucket(key).value_for(key,default_value); - [8] void add_or_update_mapping(key key,value value) get_bucket(key).add_or_update_mapping(key,value); [9] remove_mapping(key key) get_bucket(key).remove_mapping(key); [10] }; ``` 该实现使用 std::vector<std::unique_ptr<bucket_type>> [6] 来保存桶,这允许在构造函数中指定存储桶的数量。默认为19,为任意素数(prime number);哈希表在桶数为素数时效果最佳。每个存储桶都使用 boost::shared_mutex [1] 的实例进行保护,以允许对每个桶进行多个并发读取或单个修改。
因为桶的数量是固定的,所以 get_bucket() 函数 [7] 可以无锁调用 [8],[9],[10],然后桶的互斥体可以通过函数被共享(read-only)关系[4]或独占(read/write)关系[4],[5]锁定。
使用 find_entry_for() [2] 的三个函数都是用来确定条目是否在桶中。每个存储桶仅包含键/值对的 std::list<>,因此添加和删除条目很容易。
我已经介绍了并发角度,并且所有内容都通过互斥锁进行了适当的保护,那么异常安全性(exception safety)又如何呢? value_for 不会修改任何内容,所以没关系;如果抛出异常,不会影响数据结构。 remove_mapping 通过调用erase来修改列表,但这保证不会抛出异常,所以这是安全的。这留下了 add_or_update_mapping,它 if 的两个分支都可能会抛出异常。 push_back 是异常安全的,如果抛出异常,列表将保持原始状态,因此该分支没问题。唯一的问题是在替换现有值时的赋值;如果赋值抛出异常,那么您希望它原有值不被修改。但是,这不会影响整个数据结构,并且完全是用户提供的类型的属性,因此您可以放心地将其留给用户来处理。
在本节的开头,我提到这种查找表的一个不错的功能是将当前状态的快照检索到例如 std::map<> 中。这将需要锁定整个容器,以确保检索副本的一致性,这需要锁定所有存储桶。由于查找表上的“正常”操作一次只需要锁定一个存储桶,因此这将是唯一需要锁定所有存储桶的操作。因此,如果你每次都以相同的顺序锁定它们(例如,增加存储桶索引),就不会有死锁的机会。下面的列表显示了这样的实现。
**Listing 6.12 Obtaining contents of a threadsafe_lookup_table as a std::map<>**
std::map<key,value> threadsafe_lookup_table::get_map() const { std::vector<std::unique_lockboost::shared_mutex > locks; for(unsigned i=0;i<buckets.size();++i) { locks.push_back( std::unique_lockboost::shared_mutex(buckets[i].mutex)); } std::map<key,value> res; for(unsigned i=0;i<buckets.size();++i) { for(bucket_iterator it="buckets[i].data.begin();" it!="buckets[i].data.end();" ++it) res.insert(*it); } return res; ``` 列表 6.11 中的查找表使用 boost::shared_mutex 单独锁定每一个桶允许每个存储桶上并发读取,增加了整个查找表并发的机会。但是,如果您可以通过更细粒度的锁定来增加存储桶上并发的潜力呢?在下一节中,您将通过使用支持迭代器的线程安全列表容器来实现这一点。 ### 6.3.2 使用锁编写线程安全列表(list) 列表是最基本的数据结构之一,因此编写线程安全的列表应该很简单,不是吗?好吧,这取决于您想要什么组件,并且您需要一个支持迭代器的组件,我避免将其添加到您的map中,因为它太复杂了。 stl 样式迭代器支持的基本问题是迭代器必须保存对容器内部数据结构的某种引用。如果可以从另一个线程修改容器,则该引用必须以某种方式保持有效,这本质上要求迭代器在结构的某些部分上持有锁。鉴于 样式迭代器的生命周期完全不受容器的控制,这是一个坏主意。 另一种方法是提供迭代函数,例如 for_each 作为容器本身的一部分。这让容器完全负责迭代和锁定,但它确实违反了第 3 章中的死锁避免准则。为了让 做任何有用的事情,它必须在持有内部锁的同时调用用户提供的代码。不仅如此,它还必须将对每个节点的引用传递给用户提供代码,以便用户提供的代码能够处理该节点。您可以通过将每个节点的副本传递给用户提供的代码来避免这种情况,但如果数据项目很大,那么成本会很高。 因此,现在您将由用户来确保他们不会因在用户提供操作中获取锁而导致死锁,并且不会因存储锁外部访问的引用而导致数据竞争。对于查找表使用列表的情况,这是完全安全的,因为你知道你不会做任何顽皮的事情。 这就需要您来考虑您的列表提供哪些操作。如果您回头看看列表 和 6.12,您可以看到您需要的操作类型: ■ 将节点添加到列表中。 如果某个节点满足特定条件,则将其从列表中删除。 在列表中查找满足特定条件的节点。 更新满足特定条件的节点。 将列表中的每个节点复制到另一个容器。 为了使其成为一个良好的通用列表容器,添加更多操作(例如位置插入)会很有帮助,但这对于您的查找表来说是不必要的,因此我将其作为读者的练习。 链表细粒度锁定的基本思想是每个节点有一个互斥体。如果列表变大,就会有很多互斥体!这样做的好处是,对列表的不同部分的操作是真正并发的:每个操作仅持有它实际感兴趣的节点上的锁,并在每个节点移动到下一个节点时解锁它。下一个列表显示了此类列表的实现。 listing 6.13 a thread-safe list with iteration support template<typename t> class threadsafe_list { struct node { - [1] std::mutex m; std::shared_ptr<t> data; std::unique_ptr<node> next;
node(): - [2]
next()
{}
node(T const& value): - [3]
data(std::make_shared<t>(value))
{}
};
node head;
public: threadsafe_list() {} ~threadsafe_list() { remove_if([](node const&){return true;}); } threadsafe_list(threadsafe_list const& other)=delete; threadsafe_list& operator=(threadsafe_list const& other)=delete;
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value)); - [4]
std::lock_guard<std::mutex> lk(head.m);
new_node->next=std::move(head.next); - [5]
head.next=std::move(new_node); - [6]
}
template<typename function>
void for_each(Function f) - [7]
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m); - [8]
while(node* const next=current->next.get()) - [9]
{
std::unique_lock<std::mutex> next_lk(next->m); - [10]
lk.unlock(); - [11]
f(*next->data); - [12]
current=next;
lk=std::move(next_lk); - [13]
}
}
template<typename predicate>
std::shared_ptr<t> find_first_if(Predicate p) - [14]
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if(p(*next->data)) - [15]
{
return next->data; - [16]
}
current=next;
lk=std::move(next_lk);
}
return std::shared_ptr<t>();
}
template<typename predicate>
void remove_if(Predicate p) - [17]
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*next->data)) - [18]
{
std::unique_ptr<node> old_next=std::move(current->next);
current->next=std::move(next->next); - [19]
next_lk.unlock();
} - [20]
else
{
lk.unlock(); - [21]
current=next;
lk=std::move(next_lk);
}
}
}
};
列表 6.13 中的 threadsafe_list<> 是一个单链表,其中每个条目都是一个 node 结构 [1]。默认构造的 node 作为列表的 head 头部,其 next 指针为 NULL [2]。使用push_front()函数添加新节点;首先构造一个新节点 [4],它在堆上分配存储的数据[3],同时将next指针保留为NULL。然后,您需要获取 head 节点互斥锁上的锁,以便获取适当的 next 值 [5],并通过设置 head.next 指向新节点将节点插入到列表的前面[6]。到目前为止,一切都很好:您只需锁定一个互斥体即可将新项目添加到列表中,因此不存在死锁的风险。此外,缓慢的内存分配发生在锁之外,因此锁仅保护几个不会失败的指针值的更新。继续迭代函数。
首先,让我们看一下 for_each() [7]。该操作传入某种类型的 Function 来操作列表中的每个元素;与大多数标准库算法一样,它按值获取该函数,并将与真正的函数或带函数调用运算符的类型的对象一起使用。在这种情况下,函数必须接受类型 T 的值作为唯一参数。这里您可以依次手动锁定。首先,您可以锁定head节点上的互斥体[8]。然后就可以安全地获取指向 next 节点的指针(使用 get() 因为您没有获得该指针的所有权)。如果该指针不为 NULL [9],则锁定该节点上的互斥锁 [10] 以处理数据。一旦获得了该节点上的锁,就可以释放前一个节点上的锁 [11] 并调用指定的函数 [12]。函数完成后,您可以将 current 指针更新为刚刚处理的节点,并将锁的所有权从 next_lk 移至 lk [13]。因为 for_each 将每个数据项直接传递给提供的 Function,所以您可以使用它来更新节点(如果需要)或将它们复制到另一个容器中,等等。如果函数表现良好,这是完全安全的,因为保存数据项的节点的互斥体在整个调用过程中都被持有。
find_first_if() [14] 与 for_each() 类似;关键的区别在于,所提供的 Predicate 必须返回 true 来表示匹配,或者返回 false 来表示不匹配 [15]。一旦找到匹配项,您只需返回找到的数据 [16] 而不是继续搜索。您可以使用 for_each() 来完成此操作,但即使找到匹配项,它也会不必要地继续处理列表的其余部分。
remove_if() [17] 略有不同,因为该函数必须实际更新列表;你不能为此使用 for_each() 。如果 Predicate 返回 true [18],则可以通过更新 current->next [19] 从列表中删除该节点。完成此操作后,您可以释放 next 节点上互斥体的锁定。该节点会在被 move 的 std::unique_ptr<node> 超出作用域后被删除 [20]。在这种情况下,您不会更新 current 节点,因为您需要检查新 的next 节点。如果 Predicate 返回 false,您只想像之前一样继续 [21]。
那么,所有这些互斥体是否存在死锁或竞争场景?这里的答案绝对是否定的,只要提供的 Predicate 和 Function 表现良好。迭代始终是一种方式,始终从 head 节点开始,并且始终在释放当前互斥锁之前锁定下一个互斥锁,因此不同线程中不可能有不同的锁定顺序。竞争场景唯一的可能是在remove_if()中删除已删除的节点 [20],因为您在解锁互斥体后执行此操作(销毁锁定的互斥体是未定义的行为)。然而,稍微思考一下就会发现这确实是安全的,因为您仍然持有前一个节点(当前)上的互斥锁,因此没有新线程可以尝试获取您要删除的节点上的锁。
并发的机会怎么样?这种细粒度锁定的全部目的是提高单个互斥体并发的可能性,那么您实现了吗?是的,你有:不同的线程可以同时在列表中的不同节点上工作,无论它们只是使用 for_each() 处理每个节点,使用 find_first_if() 搜索,还是使用remove_if() 删除节点。但由于每个节点的互斥锁必须依次锁定,因此线程之间无法相互传递。如果一个线程花费很长时间处理某一特定节点,则其他线程在到达该特定节点时将不得不等待。
## 6.4 总结
本章首先介绍了设计并发数据结构的含义,并提供了一些指导方针。然后,我们研究了几种常见的数据结构(栈、队列、哈希映射和链表),研究如何应用这些准则以专为并发访问设计的方式实现它们,使用锁来保护数据并防止数据竞争。您现在应该能够查看自己的数据结构的设计,以了解并发机会在哪里以及哪里存在竞争条件的可能性。
在第 7 章中,我们将研究完全避免锁的方法,使用底层原子操作来提供必要的排序约束,同时坚持同一组准则。
</node></node></std::mutex></std::mutex></typename></t></std::mutex></std::mutex></t></typename></std::mutex></std::mutex></typename></std::mutex></node></t></node></t></buckets.size();++i)></key,value></buckets.size();++i)></std::unique_lock<boost::shared_mutex></key,value></num_buckets;++i)></std::unique_ptr<bucket_type></boost::shared_mutex></boost::shared_mutex></boost::shared_mutex></bucket_value></key,value></typename></mapped_type,bool>