Bug 27783: Replace --job-type by --queue
authorJulian Maurice <julian.maurice@biblibre.com>
Wed, 30 Mar 2022 12:21:50 +0000 (14:21 +0200)
committerFridolin Somers <fridolin.somers@biblibre.com>
Wed, 13 Apr 2022 13:55:40 +0000 (15:55 +0200)
This patch adds a new column background_jobs.queue, which default to
'default'
By default, new jobs are enqueued into this default queue, and the
background job worker will subscribe to the default queue unless told
otherwise by the --job-queue option

The new job UpdateElasticIndex is automatically enqueued in another
queue named 'index'
So you can have 'default' worker with
    misc/background_jobs_worker.pl
and a dedicated indexing worker with
    misc/background_jobs_worker.pl --queue index

This is to address bug 27344 comment #15

Signed-off-by: Martin Renvoize <martin.renvoize@ptfs-europe.com>
Signed-off-by: Kyle M Hall <kyle@bywatersolutions.com>
Signed-off-by: Martin Renvoize <martin.renvoize@ptfs-europe.com>
Signed-off-by: Fridolin Somers <fridolin.somers@biblibre.com>
Koha/BackgroundJob.pm
misc/background_jobs_worker.pl

index b22da6d..9d05f17 100644 (file)
@@ -97,6 +97,7 @@ sub enqueue {
     my $job_type = $self->job_type;
     my $job_size = $params->{job_size};
     my $job_args = $params->{job_args};
+    my $job_queue = $params->{job_queue} // 'default';
 
     my $borrowernumber = (C4::Context->userenv) ? C4::Context->userenv->{number} : undef;
     my $json_args = encode_json $job_args;
@@ -105,6 +106,7 @@ sub enqueue {
         {
             status         => 'new',
             type           => $job_type,
+            queue          => $job_queue,
             size           => $job_size,
             data           => $json_args,
             enqueued_on    => dt_from_string,
@@ -129,7 +131,7 @@ sub enqueue {
         # Also, here we just want the Koha instance's name, but it's not in the config...
         # Picking a random id (memcached_namespace) from the config
         my $namespace = C4::Context->config('memcached_namespace');
-        $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
+        $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_queue), body => $json_args } )
           or Koha::Exceptions::Exception->throw('Job has not been enqueued');
     } catch {
         $self->status('failed')->store;
index b503ebd..c9495d0 100755 (executable)
@@ -21,26 +21,27 @@ background_jobs_worker.pl - Worker script that will process background jobs
 
 =head1 SYNOPSIS
 
-./background_jobs_worker.pl [--job-type]
+./background_jobs_worker.pl [--job-queue QUEUE]
 
 =head1 DESCRIPTION
 
-This script will connect to the Stomp server (RabbitMQ) and subscribe to the different destination queues available.
-You can specify some queues only (using --job-type) if you want to run several workers that will handle their own jobs.
+This script will connect to the Stomp server (RabbitMQ) and subscribe to the queues passed in parameter (or the 'default' queue),
+or if a Stomp server is not active it will poll the database every 10s for new jobs in the passed queue.
+
+You can specify some queues only (using --job-queue, which is repeatable) if you want to run several workers that will handle their own jobs.
 
 =head1 OPTIONS
 
 =over
 
-=item B<--job-type>
+=item B<--job-queue>
 
-Give the job types this worker will process.
+Repeatable. Give the job queues this worker will process.
 
 The different values available are:
 
-    batch_biblio_record_modification
-    batch_authority_record_modification
-    update_elastic_index
+    default
+    index
 
 =back
 
@@ -54,14 +55,18 @@ use Getopt::Long;
 
 use Koha::BackgroundJobs;
 
-my ( $help, @job_types );
+my ( $help, @queues );
 GetOptions(
     'h|help' => \$help,
-    'job-type:s' => \@job_types,
+    'job-queue=s' => \@queues,
 ) || pod2usage(1);
 
 pod2usage(0) if $help;
 
+unless (@queues) {
+    push @queues, 'default';
+}
+
 my $conn;
 try {
     $conn = Koha::BackgroundJob->connect;
@@ -69,31 +74,11 @@ try {
     warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
 };
 
-my @available_job_types = qw(
-    batch_biblio_record_modification
-    batch_authority_record_modification
-    batch_item_record_modification
-    batch_biblio_record_deletion
-    batch_authority_record_deletion
-    batch_item_record_deletion
-    batch_hold_cancel
-    update_elastic_index
-);
-
-if ( @job_types ) {
-    for my $job_type ( @job_types ) {
-        pod2usage( -verbose => 1, -msg => sprintf "You specify an invalid --job-type value: %s\n", $job_type )
-            unless grep { $_ eq $job_type } @available_job_types;
-    }
-} else {
-    @job_types = @available_job_types;
-}
-
 if ( $conn ) {
     # FIXME cf note in Koha::BackgroundJob about $namespace
     my $namespace = C4::Context->config('memcached_namespace');
-    for my $job_type ( @job_types ) {
-        $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $job_type), ack => 'client' });
+    for my $queue (@queues) {
+        $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $queue), ack => 'client' });
     }
 }
 while (1) {
@@ -115,7 +100,7 @@ while (1) {
         $conn->ack( { frame => $frame } ); # FIXME depending on success?
 
     } else {
-        my $jobs = Koha::BackgroundJobs->search({ status => 'new' });
+        my $jobs = Koha::BackgroundJobs->search({ status => 'new', queue => \@queues });
         while ( my $job = $jobs->next ) {
             my $args = decode_json($job->data);
             process_job( $job, { job_id => $job->id, %$args } );