TheSchwartz でジョブを登録順に処理する方法(手抜き)
perl でジョブキューの仕組みをお手軽に導入したい場合は、TheSchwartz を使っている方が多いのではないでしょうか?
とあるシステムで、各々の処理が比較的時間がかかるので、ジョブキューの仕組みが必須ではあるが、処理順序をジョブキューに投入した順序を守って処理する必要がありました。TheSchwartz は残念ながら投入したジョブ順序で処理されません。ぐぐって検索してみたのですが、同様の悩みを解決できずに困ってる人は結構いるみたい。でもその解決方法は示されていない。
そんな状況で困ったのでソースを読んでみました。オプションで処理順序をジョブ投入順にすることはできないみたいです。
というわけで、シーケンシャルに処理するための patch を記述することにしました。
改変した TheSchwartz を作っているアプリ配下の lib 内に設置して、改変した TheSchwartz が呼び出されるようにしています。
シーケンシャルに処理するための patch (手抜き)
patch ファイルはこちら: patch.diff
patch の内容は下記の通りです。
--- TheSchwartz.pm.org 2012-03-10 10:00:51.000000000 +0900 +++ TheSchwartz.pm 2012-03-10 10:01:19.000000000 +0900 @@ -185,7 +185,7 @@ @options }, { limit => $limit, ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) + direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) }); } else { push @jobs, $driver->search('TheSchwartz::Job' => { @@ -193,7 +193,7 @@ @options }, { limit => $limit, ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) + direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) } ); } @@ -240,7 +240,7 @@ coalesce => { op => $op, value => $coval }, }, { limit => $FIND_JOB_BATCH_SIZE, ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) + direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) } ); }; @@ -283,7 +283,7 @@ grabbed_until => \ "<= $unixtime", }, { limit => $FIND_JOB_BATCH_SIZE, ( $client->prioritize ? ( sort => 'priority', - direction => 'descend' ) : () ) + direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) } ); }; @@ -314,7 +314,7 @@ my $driver = $client->driver_for($hashdsn); ## Got some jobs! Randomize them to avoid contention between workers. - my @jobs = shuffle(@_); + my @jobs = @_; JOB: while (my $job = shift @jobs) {
まずは TheSchwartz のジョブを取り出すまでの処理の流れを把握してみましょう。各自が作るであろう worker.pl からの処理の流れは下記の示すとおりです。
patch の内容を説明しますと、ジョブを登録すると、実際には job テーブルにジョブが insert されていきます。worker がジョブを取り出す時には、find_job_for_workers() メソッド内で priority オプションでソートしてジョブをとりだしています。それらのコードは TheSchwartz.pm に記述されています。また、TheSchwartz は DBI で直接 SQL を記述せず、ORマッパーのひとつの Data::ObjectDriver を使って記述されています。
今回は手抜きなので priority オプションがない場合に、プライマリーキーの jobid で昇順にソートして取り出すように改変しました。昇順なので、1,2,3 と投入した順でジョブを取り出せます。解析が適当なので、find_job_for_workers() メソッド以外にも、同じように job テーブルに対する SQL に該当する部分に、全部 patch を当てました。
if ($arg->{want_handle}) { push @jobs, map { my $handle = TheSchwartz::JobHandle->new({ dsn_hashed => $hashdsn, client => $client, jobid => $_->jobid }); $_->handle($handle); $_; } $driver->search('TheSchwartz::Job' => { funcid => $funcid, @options }, { limit => $limit, ( $client->prioritize ? ( sort => 'priority', direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) }); } else { push @jobs, $driver->search('TheSchwartz::Job' => { funcid => $funcid, @options }, { limit => $limit, ( $client->prioritize ? ( sort => 'priority', direction => 'descend' ) : ( sort => 'jobid', direction => 'ascend' ) ) } ); }
次に、_grab_a_job() という関数を見てみます。ここで何故かとりだしたジョブをシャッフルしています。コメントが挿入されていて下記のように記述されています。
worker 間での競合を避けるためにシャッフルしていますが、ここのシャッフルをコメントアウトします。worker を多重化しなければ、これでも問題は発生しないはずです。検証した範囲では、YourWorker.pl で worker を1つしか作らなければ問題なくジョブ登録順に処理できています。
このpatchを適用すれば、workerを多重化できない欠点がありますが、要件は満たすことができるようになると思います。
※TheSchwartz をあまり深追いしていないので、こんなことしちゃイカン!ってことしてる可能性はあります・・・
コメントやシェアをお願いします!