# You should have received a copy of the GNU General Public License
# along with Koha; if not, see <http://www.gnu.org/licenses>.
+=head1 NAME
+
+background_jobs_worker.pl - Worker script that will process background jobs
+
+=head1 SYNOPSIS
+
+./background_jobs_worker.pl [--queue QUEUE]
+
+=head1 DESCRIPTION
+
+This script will connect to the Stomp server (RabbitMQ) and subscribe to the queues passed in parameter (or the 'default' queue),
+or if a Stomp server is not active it will poll the database every 10s for new jobs in the passed queue.
+
+You can specify some queues only (using --queue, which is repeatable) if you want to run several workers that will handle their own jobs.
+
+=head1 OPTIONS
+
+=over
+
+=item B<--queue>
+
+Repeatable. Give the job queues this worker will process.
+
+The different values available are:
+
+ default
+ long_tasks
+
+=back
+
+=cut
+
use Modern::Perl;
-use JSON qw( encode_json decode_json );
+use JSON qw( decode_json );
+use Try::Tiny qw( catch try );
+use Pod::Usage;
+use Getopt::Long;
-use Koha::BackgroundJob::BatchUpdateBiblio;
-use Koha::BackgroundJob;
+use Koha::BackgroundJobs;
-my $conn = Koha::BackgroundJob->connect;
+my ( $help, @queues );
+GetOptions(
+ 'h|help' => \$help,
+ 'queue=s' => \@queues,
+) || pod2usage(1);
-my $job_type = 'batch_biblio_record_modification';
+pod2usage(0) if $help;
-$conn->subscribe({ destination => $job_type, ack => 'client' });
-while (1) {
- my $frame = $conn->receive_frame;
- if ( !defined $frame ) {
- # maybe log connection problems
- next; # will reconnect automatically
+unless (@queues) {
+ push @queues, 'default';
+}
+
+my $conn;
+try {
+ $conn = Koha::BackgroundJob->connect;
+} catch {
+ warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
+};
+
+if ( $conn ) {
+ # FIXME cf note in Koha::BackgroundJob about $namespace
+ my $namespace = C4::Context->config('memcached_namespace');
+ for my $queue (@queues) {
+ $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $queue), ack => 'client' });
}
+}
+while (1) {
+ if ( $conn ) {
+ my $frame = $conn->receive_frame;
+ if ( !defined $frame ) {
+ # maybe log connection problems
+ next; # will reconnect automatically
+ }
- my $body = $frame->body;
- my $args = decode_json($body);
+ my $body = $frame->body;
+ my $args = decode_json($body); # TODO Should this be from_json? Check utf8 flag.
- my $success = Koha::BackgroundJob::BatchUpdateBiblio->process( $args );
+ # FIXME This means we need to have create the DB entry before
+ # It could work in a first step, but then we will want to handle job that will be created from the message received
+ my $job = Koha::BackgroundJobs->find($args->{job_id});
- $conn->ack( { frame => $frame } ); # FIXME depending on $success?
+ process_job( $job, $args );
+ $conn->ack( { frame => $frame } ); # FIXME depending on success?
+
+ } else {
+ my $jobs = Koha::BackgroundJobs->search({ status => 'new', queue => \@queues });
+ while ( my $job = $jobs->next ) {
+ my $args = $job->json->decode($job->data);
+ process_job( $job, { job_id => $job->id, %$args } );
+ }
+ sleep 10;
+ }
}
$conn->disconnect;
+
+sub process_job {
+ my ( $job, $args ) = @_;
+
+ my $pid;
+ if ( $pid = fork ) {
+ wait;
+ return;
+ }
+
+ die "fork failed!" unless defined $pid;
+
+ $job->process( $args );
+
+ exit;
+}