Ruby 并发和并行:实用教程
已发表: 2022-03-11让我们首先澄清一个在 Ruby 开发人员中非常常见的混淆点; 即:并发和并行不是一回事(即并发!=并行)。
特别是,Ruby并发是指两个任务可以在重叠的时间段内启动、运行和完成。 但是,这并不一定意味着它们会同时运行(例如,单核机器上的多个线程)。 相反,并行性是指两个任务同时运行(例如,多核处理器上的多个线程)。
这里的关键点是并发线程和/或进程不一定会并行运行。
本教程对 Ruby 中可用于并发和并行性的各种技术和方法进行了实践(而不是理论)处理。
有关更多真实世界的 Ruby 示例,请参阅我们关于 Ruby 解释器和运行时的文章。
我们的测试用例
对于一个简单的测试用例,我将创建一个Mailer
类并添加一个 Fibonacci 函数(而不是sleep()
方法)以使每个请求更加占用 CPU,如下所示:
class Mailer def self.deliver(&block) mail = MailBuilder.new(&block).mail mail.send_mail end Mail = Struct.new(:from, :to, :subject, :body) do def send_mail fib(30) puts "Email from: #{from}" puts "Email to : #{to}" puts "Subject : #{subject}" puts "Body : #{body}" end def fib(n) n < 2 ? n : fib(n-1) + fib(n-2) end end class MailBuilder def initialize(&block) @mail = Mail.new instance_eval(&block) end attr_reader :mail %w(from to subject body).each do |m| define_method(m) do |val| @mail.send("#{m}=", val) end end end end
然后我们可以如下调用这个Mailer
类来发送邮件:
Mailer.deliver do from "[email protected]" to "[email protected]" subject "Threading and Forking" body "Some content" end
(注意:此测试用例的源代码可在 github 上找到。)
为了建立比较基准,让我们先做一个简单的基准测试,调用邮件程序 100 次:
puts Benchmark.measure{ 100.times do |i| Mailer.deliver do from "eki_#{i}@eqbalq.com" to "jill_#{i}@example.com" subject "Threading and Forking (#{i})" body "Some content" end end }
这在使用 MRI Ruby 2.0.0p353 的四核处理器上产生了以下结果:
15.250000 0.020000 15.270000 ( 15.304447)
多进程与多线程
在决定是使用多进程还是多线程 Ruby 应用程序时,没有“一刀切”的答案。 下表总结了一些需要考虑的关键因素。
流程 | 线程 |
---|---|
使用更多内存 | 使用更少的内存 |
如果父进程在子进程退出之前就死了,子进程就会变成僵尸进程 | 当进程死亡时所有线程都死亡(没有僵尸的机会) |
由于操作系统需要保存和重新加载所有内容,因此分叉进程切换上下文的成本更高 | 线程的开销要少得多,因为它们共享地址空间和内存 |
分叉的进程被赋予一个新的虚拟内存空间(进程隔离) | 线程共享同一个内存,所以需要控制和处理并发内存问题 |
需要进程间通信 | 可以通过队列和共享内存“通信” |
创建和销毁速度较慢 | 更快地创建和销毁 |
更容易编码和调试 | 编码和调试可能要复杂得多 |
使用多个进程的 Ruby 解决方案示例:
- Resque:一个 Redis 支持的 Ruby 库,用于创建后台作业,将它们放置在多个队列中,并在以后处理它们。
- Unicorn:用于 Rack 应用程序的 HTTP 服务器,旨在仅在低延迟、高带宽连接上为快速客户端提供服务,并利用 Unix/类 Unix 内核中的功能。
使用多线程的 Ruby 解决方案示例:
- Sidekiq:用于 Ruby 的全功能后台处理框架。 它旨在易于与任何现代 Rails 应用程序集成,并且比其他现有解决方案具有更高的性能。
- Puma:为并发而构建的 Ruby Web 服务器。
- Thin:一个非常快速和简单的 Ruby Web 服务器。
多进程
在我们研究 Ruby 多线程选项之前,让我们探索产生多个进程的更简单的路径。
在 Ruby 中, fork()
系统调用用于创建当前进程的“副本”。 这个新进程是在操作系统级别调度的,因此它可以与原始进程同时运行,就像任何其他独立进程一样。 (注意: fork()
是一个 POSIX 系统调用,因此如果您在 Windows 平台上运行 Ruby,则不可用。)
好的,让我们运行我们的测试用例,但这次使用fork()
来使用多个进程:
puts Benchmark.measure{ 100.times do |i| fork do Mailer.deliver do from "eki_#{i}@eqbalq.com" to "jill_#{i}@example.com" subject "Threading and Forking (#{i})" body "Some content" end end end Process.waitall }
( Process.waitall
等待所有子进程退出并返回一个进程状态数组。)
此代码现在产生以下结果(同样,在具有 MRI Ruby 2.0.0p353 的四核处理器上):
0.000000 0.030000 27.000000 ( 3.788106)
不是太寒酸! 我们通过修改几行代码(即使用fork()
)使邮件程序快了约 5 倍。
不过不要太兴奋。 尽管使用 fork 可能很诱人,因为它是 Ruby 并发的简单解决方案,但它有一个主要缺点,即它会消耗大量内存。 分叉有点昂贵,特别是如果您正在使用的 Ruby 解释器没有使用 Copy-on-Write (CoW)。 例如,如果您的应用程序使用 20MB 内存,则将其分叉 100 次可能会消耗多达 2GB 的内存!
此外,虽然多线程也有其自身的复杂性,但在使用fork()
时需要考虑许多复杂性,例如共享文件描述符和信号量(在父子进程和子进程之间),需要通过管道进行通信, 等等。
Ruby 多线程
好的,现在让我们尝试使用 Ruby 多线程技术来加快相同的程序。
单个进程中的多个线程的开销比相应数量的进程要少得多,因为它们共享地址空间和内存。
考虑到这一点,让我们重新审视我们的测试用例,但这次使用 Ruby 的Thread
类:
threads = [] puts Benchmark.measure{ 100.times do |i| threads << Thread.new do Mailer.deliver do from "eki_#{i}@eqbalq.com" to "jill_#{i}@example.com" subject "Threading and Forking (#{i})" body "Some content" end end end threads.map(&:join) }
此代码现在产生以下结果(同样,在具有 MRI Ruby 2.0.0p353 的四核处理器上):
13.710000 0.040000 13.750000 ( 13.740204)
真可惜。 这肯定不是很令人印象深刻! 发生什么了? 为什么这会产生与我们同步运行代码时几乎相同的结果?
答案是全球解释器锁 (GIL) ,它是许多 Ruby 程序员存在的祸根。 多亏了 GIL,CRuby(MRI 实现)并不真正支持线程。
全局解释器锁是计算机语言解释器中使用的一种机制,用于同步线程的执行,以便一次只能执行一个线程。 使用 GIL 的解释器将始终只允许一个线程和一个线程一次执行,即使在多核处理器上运行也是如此。 Ruby MRI 和 CPython 是具有 GIL 的流行解释器的两个最常见示例。
回到我们的问题,我们如何利用 Ruby 中的多线程来提高 GIL 的性能?
好吧,在 MRI (CRuby) 中,不幸的答案是你基本上被卡住了,多线程可以为你做的很少。

但是,对于 IO 繁重的任务(例如,需要经常在网络上等待的任务),没有并行性的 Ruby 并发仍然非常有用。 因此,对于 IO 繁重的任务,线程在 MRI中仍然很有用。 毕竟,线程在多核服务器普及之前就被发明和使用是有原因的。
但话虽如此,如果您可以选择使用 CRuby 以外的版本,则可以使用替代的 Ruby 实现,例如 JRuby 或 Rubinius,因为它们没有 GIL,而且它们确实支持真正的并行 Ruby 线程。
为了证明这一点,下面是我们运行与以前完全相同的线程版本代码时得到的结果,但这次在 JRuby(而不是 CRuby)上运行它:
43.240000 0.140000 43.380000 ( 5.655000)
现在我们正在谈论!
但…
线程不是免费的
多线程性能的提高可能会让人们相信我们可以不断地添加更多线程——基本上是无限的——来让我们的代码运行得越来越快。 如果这是真的,那确实很好,但现实情况是线程不是免费的,所以迟早会耗尽资源。
例如,假设我们希望运行示例邮件程序不是 100 次,而是 10,000 次。 让我们看看发生了什么:
threads = [] puts Benchmark.measure{ 10_000.times do |i| threads << Thread.new do Mailer.deliver do from "eki_#{i}@eqbalq.com" to "jill_#{i}@example.com" subject "Threading and Forking (#{i})" body "Some content" end end end threads.map(&:join) }
繁荣! 在生成大约 2,000 个线程后,我的 OS X 10.8 出现错误:
can't create Thread: Resource temporarily unavailable (ThreadError)
正如预期的那样,我们迟早会开始崩溃或完全耗尽资源。 所以这种方法的可扩展性显然是有限的。
线程池
幸运的是,有更好的方法; 即线程池。
线程池是一组预先实例化的、可重用的线程,可用于根据需要执行工作。 当需要执行大量短任务而不是少量较长任务时,线程池特别有用。 这可以避免产生大量创建线程的开销。
线程池的一个关键配置参数通常是池中的线程数。 这些线程可以一次全部实例化(即,在创建池时)或延迟实例化(即,根据需要,直到创建池中的最大线程数)。
当池被交给要执行的任务时,它会将任务分配给当前空闲的线程之一。 如果没有线程处于空闲状态(并且已经创建了最大线程数),它会等待线程完成其工作并变为空闲状态,然后将任务分配给该线程。
因此,回到我们的示例,我们将从使用Queue
开始(因为它是线程安全的数据类型)并使用线程池的简单实现:
需要“./lib/mailer” 需要“基准” 需要“线程”
POOL_SIZE = 10 jobs = Queue.new 10_0000.times{|i| jobs.push i} workers = (POOL_SIZE).times.map do Thread.new do begin while x = jobs.pop(true) Mailer.deliver do from "eki_#{x}@eqbalq.com" to "jill_#{x}@example.com" subject "Threading and Forking (#{x})" body "Some content" end end rescue ThreadError end end end workers.map(&:join)
在上面的代码中,我们首先为需要执行的作业创建了一个jobs
队列。 我们为此目的使用了Queue
,因为它是线程安全的(因此如果多个线程同时访问它,它将保持一致性),这避免了需要使用互斥锁的更复杂的实现。
然后,我们将邮件程序的 ID 推送到作业队列并创建了 10 个工作线程池。
在每个工作线程中,我们从作业队列中弹出项目。
因此,工作线程的生命周期是不断地等待任务放入作业队列并执行它们。
所以好消息是,这可以正常工作并且可以扩展,没有任何问题。 不幸的是,即使对于我们简单的教程,这也相当复杂。
赛璐珞
多亏了 Ruby Gem 生态系统,多线程的大部分复杂性都被巧妙地封装在许多开箱即用的易于使用的 Ruby Gem 中。
一个很好的例子是赛璐珞,我最喜欢的红宝石之一。 赛璐珞框架是在 Ruby 中实现基于角色的并发系统的一种简单而干净的方法。 赛璐珞使人们能够从并发对象构建并发程序,就像他们从顺序对象构建顺序程序一样容易。
在本文讨论的上下文中,我特别关注池功能,但请帮自己一个忙,并更详细地查看它。 使用赛璐珞,您将能够构建多线程 Ruby 程序,而不必担心诸如死锁之类的讨厌问题,并且您会发现使用其他更复杂的功能(例如 Futures 和 Promises)是微不足道的。
以下是我们的邮件程序的多线程版本使用赛璐珞的简单程度:
require "./lib/mailer" require "benchmark" require "celluloid" class MailWorker include Celluloid def send_email(id) Mailer.deliver do from "eki_#{id}@eqbalq.com" to "jill_#{id}@example.com" subject "Threading and Forking (#{id})" body "Some content" end end end mailer_pool = MailWorker.pool(size: 10) 10_000.times do |i| mailer_pool.async.send_email(i) end
干净、简单、可扩展且强大。 你还能要求什么呢?
后台作业
当然,根据您的操作要求和限制,另一个可能可行的替代方案是使用后台作业。 许多 Ruby Gem 支持后台处理(即,将作业保存在队列中并稍后处理它们而不阻塞当前线程)。 值得注意的例子包括 Sidekiq、Resque、Delayed Job 和 Beanstalkd。
在这篇文章中,我将使用 Sidekiq 和 Redis(一种开源键值缓存和存储)。
首先,让我们安装 Redis 并在本地运行它:
brew install redis redis-server /usr/local/etc/redis.conf
随着我们的本地 Redis 实例运行,让我们看一下使用 Sidekiq 的示例邮件程序 ( mail_worker.rb
) 的一个版本:
require_relative "../lib/mailer" require "sidekiq" class MailWorker include Sidekiq::Worker def perform(id) Mailer.deliver do from "eki_#{id}@eqbalq.com" to "jill_#{id}@example.com" subject "Threading and Forking (#{id})" body "Some content" end end end
我们可以使用mail_worker.rb
文件触发 Sidekiq:
sidekiq -r ./mail_worker.rb
然后来自 IRB:
⇒ irb >> require_relative "mail_worker" => true >> 100.times{|i| MailWorker.perform_async(i)} 2014-12-20T02:42:30Z 46549 TID-ouh10w8gw INFO: Sidekiq client with redis options {} => 100
非常简单。 只需更改工人的数量,它就可以轻松扩展。
另一种选择是使用 Sucker Punch,这是我最喜欢的异步 RoR 处理库之一。 使用 Sucker Punch 的实现将非常相似。 我们只需要包含SuckerPunch::Job
而不是Sidekiq::Worker
和MailWorker.new.async.perform()
而不是MailWorker.perform_async()
。
结论
高并发不仅在 Ruby 中可以实现,而且比您想象的要简单。
一种可行的方法是简单地分叉一个正在运行的进程以增加其处理能力。 另一种技术是利用多线程。 尽管线程比进程更轻量,需要更少的开销,但如果同时启动太多线程,您仍然可能会耗尽资源。 在某些时候,您可能会发现有必要使用线程池。 幸运的是,多线程的许多复杂性都可以通过利用许多可用的 gem 中的任何一个来简化,例如 Celluloid 及其 Actor 模型。
另一种处理耗时过程的方法是使用后台处理。 有许多库和服务允许您在应用程序中实现后台作业。 一些流行的工具包括数据库支持的作业框架和消息队列。
分叉、线程和后台处理都是可行的选择。 关于使用哪一个的决定取决于您的应用程序的性质、您的操作环境和要求。 希望本教程对可用选项提供了有用的介绍。