《Designing Data Intensive Applications》读书笔记 - 批处理

Unix 工具批处理

这一章先以 Unix 命令处理日志分析为例,介绍 Unix 设计的哲学,之后介绍 MapReduce 时会比较它们的相似之处。

下面这个命令是一个简单的日志分析,统计访问量最多的前 5 个 URL:

1
2
3
4
5
6
cat /var/log/nginx/access.log |
awk '{print $7}' |
sort |
uniq -c |
sort -r -n |
head -n 5

同时与 Ruby 脚本的实现代码对比,Unix 链式管道更为简洁。另外一个大的不同时,Ruby 脚本使用了一个哈希表,而 Unix 命令使用了排序。
如果数据量较小,所有的数据都能够放入内存,哈希表的性能更好。如果数据量非常大,不能完全放入内容,排序的方法就更合适。
sort 命令可以自动处理大量数据,超出内存会使用磁盘并且利用多核并行排序。

Unix 哲学:每个程序只做一件事,但是做的很好;程序之间通过管道连接;统一的文本接口;逻辑和 I/O 分离。

MapReduce 和分布式文件系统

MapReduce 有点像 Unix 工具,只不过可能分布在上千台机器上。每个 MapReduce 任务就像单个 Unix 命令,接收一个或多个输入,产生一个或多个输出,通常不会修改输入。

MapReduce 读写分布式文件系统上的文件,Hadoop 的 MapReduce 文件系统是 HDFS,基于无共享原则。 每台机器上运行一个守护进程,对外暴露网络服务,允许其它节点访问存储其上的文件。一个中心化的 NameNode 记录文件的位置。
考虑故障,每个文件会有多个副本。

MapReduce 作业执行

以上面的日志分析为例,一个 MapReduce 作业步骤:

  1. 读取输入文件,将其分成记录,每个记录对应一行日志。
  2. 调用 mapper 函数提取记录中的 URL 作为 key, value 为空
  3. 排序所有的键值对
  4. 调用 reducer 函数处理排序后的键值对

分布式执行
和 Unix 命令行最大的不同是 MapReduce 可以在多个节点上并行执行。map 任务的数量通常由输入文件块的数量决定,而 reduce 任务的数量由用户指定。框架会确保相同 key 的记录会被发送到同一个 reduce 任务。
键值对需要进行排序,但是数据量可能非常大,无法在单台机器上排序,需要分阶段排序。首先,map 任务会对输出进行排序,基于 key 的哈希将输出分为多个分区(对应 reducer),每个分区写入到单独的文件中。
在 mapper 完成任务之后, reducer 开始下载排序后的文件并进行合并同时保留了原来的顺序。

MapReduce 工作流
单个 MapReduce 可以解决的问题有限,像上面的日志分析例子,单个任务只能决定每个 URL 的访问次数,如果需要找到最热门的 URL,需要另外一个 MapReduce 任务。所以,将多个 MapReduce 任务组合成一个工作流是很常见的。
有点像 Unix 中的管道了。

Reduce-Side Join and Grouping

数据库的 join 操作非常常见,比如一个数据集是用户信息,另一个数据集是用户的日志,需要将两个数据集合并到一起。

Sort-merge joins
两个 mapper 任务,一个处理用户信息,一个处理日志,输出的键值对都是用户 ID 和信息。在 reduce 阶段,将两个数据集合并到一起,这个过程叫做 sort-merge join。

mapper 就像发送信息到 reducer,而 key 就像地址。

处理倾斜
如果存在热点,比如一个用户有大量的日志,这个用户的数据会集中在一个 reducer 上,就会导致这个 reducer 的负载很高。
一种方法是,将热点分散到多个 reducer 上。 Hive 使用的是另一种方法,需要先标识出热点,然后在执行 join 时,使用 map-side join。

Map-Side Join

Broadcast hash join
简单来说就是如果用户数据量比较小,直接在 map 阶段将用户数据加载到内存中,然后在 map 阶段就可以进行 join 操作。

Partitioned hash join
如果数据量比较大,不能放入内存,可以将用户数据分区,每个分区对应一个 reducer,然后在 map 阶段将用户数据发送到对应的 reducer 上。

应用

一是建立索引,二是机器学习分类或推荐系统。

与分布式数据库的比较

可以说 MapReduce 更为灵活,存储可以是任意格式数据。另外,MapReduce 是针对频繁故障设计,倒不是硬件太不可靠,而是需要任意终止任务。

MapReduce 之上

MapReduce 很容易理解,但是使用起来非常麻烦,所以有了很多的抽象实现。

中间状态的物化

MapReduce 任务之间的数据传递是通过文件系统,每个任务都会将输出写入到文件系统,下一个任务再读取。和开头的 Unix 例子相比,这样的不足:

  1. 作业只有在所有先行作业完成之后才能开始
  2. 不必要的 Mapper
  3. 存储这些中间状态意味着在节点之间复制

保存这些中间状态的一个好处是容错,如果一个任务失败,可以重新执行。 Flink 和 Spark 避免将中间状态写入文件系统,在机器出现故障时,需要重新计算,这就需要框架追踪数据时如何计算的。

高级 API

高级语言像 Pig 和 Hive,可以将 SQL 转换为 MapReduce 任务。这些语言提供了更高级的抽象,比如 join 操作,分组,过滤等。

声明式查询语言
查询优化器可以利用列式存储只读取需要的列。

总结

这一章感觉内容并不深,尤其是已经对 MapReduce 有所了解的情况。最有意思或许是将其与 Unix 命令行做比较,让人更有启发。

《Designing Data Intensive Applications》读书笔记 - 数据库复制

这一章讲数据库复制(Replication),目标很简单就是保存数据副本在多个机器上,但是实现却没那么容易。首先需要数据复制的几个原因:

  1. 数据中心地理上更靠近用户
  2. 增强可用性,即便部分服务器节点失败,整个系统依然可用
  3. 提高读取的吞吐量
阅读更多

《The DynamoDB Book》读书笔记

同事推荐的一本书,只有英文电子版。作者是Alex DeBrie,之前介绍过,是单表设计的推崇者。

这本书前面部分几个章节介绍 DynamoDB 的基本概念,后面部分是一些实际的设计案例。

阅读更多

DynamoDB 单表设计的优势与考量

大多数开发都有关系数据库设计经验,在初次使用 DynamoDB 设计数据模型的时候,很容易陷入关系数据库的思维陷阱, 不自觉的遵守关系数据库设计的范式, 尝试将数据模型规范化,每个实体或实体关系都有对应的单独的表,通常称之为多表设计。
与之对应的是,将所有实体和实体关系都存储在同一张表中,毕竟 DynamoDB 是 Schemaless 的数据库,称之为单表设计。
这儿要强调的是,这两种设计只是极端的两点。可能也不是一个合适的命名,因为在实际应用中,单表设计并不意味着只能有一张表。
在两个极端之间,单表设计更倾向于将相关实体存入在同一张表中,多表设计则倾向将不同实体类型存入不同的表中。

官方文档中,单表和多表设计比较时也较为推荐单表设计。本文就来根据实际经验,讨论下实际实践中单表设计的优势。
我们自己的项目采用的是单表设计,很大程度上受 《The DynamoDB Book》影响,作者 Alex DeBrie 是单表设计的推崇者。当然,我们项目中已经有十几张表,尽管我们已经尽量将相关实体存入同一张表中。

阅读更多

AWS Connect 转接最近通话的客服

需求

最近接到一个需求,需要将客户来电转接到最近与客户通话的客服。这个需求很容易理解,
客户可能因为各种各样的原因中断通话,再次来电很可能是因为同一个诉求,比如保险索赔,可能需要多次来回沟通。
将通话转给同一个客服,客服可以接着继续处理而不用熟悉客户场景,这样做能够提高处理效率。
尽管这个需求看起来很基础,但是并没有一个开箱可用的方案。我们的呼叫中心是 Amazon Connect,不过并没有启用 Profile,一些方案也不能采用。

阅读更多

AWS client getaddrinfo EMFILE issue

最近,在我们系统中引入了 AWS Cloud Map 作为我们的服务发现系统。部署几周后没有问题,今天突然抛出错误,日志显示错误 getaddrinfo EMFILE events.ap-southeast-2.amazonaws.com
当然,并非所有请求都触发了此错误,只是在高流量时段才出现了这个错误。

阅读更多

如何防止重复处理 SQS 消息

问题

一般来说在我们的系统中,消息处理必须保证幂等性,以防止消息重复处理。在我们的系统中,下面两种情况可能导致相同消息被重复处理:

  1. 调度器和消息生产者:调度器或消息生产者可能会被多次触发,比如时不时有些任务因为超时而被多次触发。
  2. 队列管理:如果一个 Lambda 实例处理消息超时,另一个实例可能会在 visibility timeout 设置不合适的情况下得到重新处理相同消息的机会。

如果消息被多次处理,我们可能会向客户发送重复的电子邮件和短信,甚至礼品卡都可能重复发送。所以,我们需要一个通用的机制来确保相同消息不会被多次处理。

阅读更多