環境
概要
例えば以下のように配列に対してmapで全要素に対して特定の処理を行いたかったとする。
def check(x) # 例えば3秒かかるとする sleep 3 x.even? end a = [1, 2, 3] results = a.map do |x| check(x) end p results
ただこれだと1要素に対して3秒かかるので合計9秒もかかってしまう。そこまで待てない、高速化したいということでRubyのThreadクラスを利用して並列化してみる。
def check(x) # 例えば3秒かかるとする sleep 3 x.even? end a = [1, 2, 3] threads = a.map do |x| Thread.new do check(x) end end results = threads.map(&:value) p results
こうすれば3秒しかかからない。
Thread.new
は、Rubyのスレッドを生成するメソッドだ。これにより、指定したブロック内の処理を新たなスレッドで実行できる。素晴らしい点は、この処理がメインのスレッドとは別に走るので、複数のスレッドが並行して実行されるということだ。
例えば、元のコードではcheck
メソッドが3秒の待機を伴うため、3つの要素に対して順番に処理をした場合、合計で9秒かかる。しかし、各要素に対してThread.new
を使って、check
メソッドを独立したスレッドで実行することで、全ての処理が並行して行われる。3つのスレッドが同時に動作するので、最初のスレッドが処理に3秒かかり、次の2つのスレッドも同時に開始され、全体の時間は3秒で済む。
つまり、Thread.new
を使うことで、処理を非同期的に行い、待機時間を大幅に短縮できるわけだ。
副作用
ただ、Thread.newがRails内で使われるのはニッチな想定なのか、checkメソッドにモデルに関連する処理があった場合実行すると失敗してしまう。
困った
解決策
ActiveRecord::Base.connection_pool.with_connection
をブロックに挟む
def check(x) # 例えば3秒かかるとする sleep 3 x.even? end a = [1, 2, 3] threads = a.map do |x| Thread.new do ActiveRecord::Base.connection_pool.with_connection do check(x) end end end results = threads.map(&:value) p results
原因
Railsのアプリケーションでは、データベースへの接続はスレッドセーフではない。つまり、各スレッドが個別にデータベース接続を持っていないと、モデルに関連する処理を行った際にデータベースとの接続が正常に行われず、エラーが発生することがある。
ActiveRecord::Base.connection_pool.with_connection
を使うことで、各スレッドが必要な時にデータベース接続プールから接続を取得し、処理が終わったら接続を返すことができる。これにより、スレッド間で接続の競合が解消され、エラーが出なくなる仕組みだ。
要するに、with_connection
を使うことで、データベース接続を安全に管理しつつ、スレッドを活用した処理が可能になるってわけだ。これがあれば、複数のスレッドでのデータベース操作も安心して行えるようになる訳。