my $job_type = $self->job_type;
my $job_size = $params->{job_size};
my $job_args = $params->{job_args};
+ my $job_queue = $params->{job_queue} // 'default';
my $borrowernumber = (C4::Context->userenv) ? C4::Context->userenv->{number} : undef;
my $json_args = encode_json $job_args;
{
status => 'new',
type => $job_type,
+ queue => $job_queue,
size => $job_size,
data => $json_args,
enqueued_on => dt_from_string,
# Also, here we just want the Koha instance's name, but it's not in the config...
# Picking a random id (memcached_namespace) from the config
my $namespace = C4::Context->config('memcached_namespace');
- $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
+ $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_queue), body => $json_args } )
or Koha::Exceptions::Exception->throw('Job has not been enqueued');
} catch {
$self->status('failed')->store;
=head1 SYNOPSIS
-./background_jobs_worker.pl [--job-type]
+./background_jobs_worker.pl [--job-queue QUEUE]
=head1 DESCRIPTION
-This script will connect to the Stomp server (RabbitMQ) and subscribe to the different destination queues available.
-You can specify some queues only (using --job-type) if you want to run several workers that will handle their own jobs.
+This script will connect to the Stomp server (RabbitMQ) and subscribe to the queues passed in parameter (or the 'default' queue),
+or if a Stomp server is not active it will poll the database every 10s for new jobs in the passed queue.
+
+You can specify some queues only (using --job-queue, which is repeatable) if you want to run several workers that will handle their own jobs.
=head1 OPTIONS
=over
-=item B<--job-type>
+=item B<--job-queue>
-Give the job types this worker will process.
+Repeatable. Give the job queues this worker will process.
The different values available are:
- batch_biblio_record_modification
- batch_authority_record_modification
- update_elastic_index
+ default
+ index
=back
use Koha::BackgroundJobs;
-my ( $help, @job_types );
+my ( $help, @queues );
GetOptions(
'h|help' => \$help,
- 'job-type:s' => \@job_types,
+ 'job-queue=s' => \@queues,
) || pod2usage(1);
pod2usage(0) if $help;
+unless (@queues) {
+ push @queues, 'default';
+}
+
my $conn;
try {
$conn = Koha::BackgroundJob->connect;
warn sprintf "Cannot connect to the message broker, the jobs will be processed anyway (%s)", $_;
};
-my @available_job_types = qw(
- batch_biblio_record_modification
- batch_authority_record_modification
- batch_item_record_modification
- batch_biblio_record_deletion
- batch_authority_record_deletion
- batch_item_record_deletion
- batch_hold_cancel
- update_elastic_index
-);
-
-if ( @job_types ) {
- for my $job_type ( @job_types ) {
- pod2usage( -verbose => 1, -msg => sprintf "You specify an invalid --job-type value: %s\n", $job_type )
- unless grep { $_ eq $job_type } @available_job_types;
- }
-} else {
- @job_types = @available_job_types;
-}
-
if ( $conn ) {
# FIXME cf note in Koha::BackgroundJob about $namespace
my $namespace = C4::Context->config('memcached_namespace');
- for my $job_type ( @job_types ) {
- $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $job_type), ack => 'client' });
+ for my $queue (@queues) {
+ $conn->subscribe({ destination => sprintf("/queue/%s-%s", $namespace, $queue), ack => 'client' });
}
}
while (1) {
$conn->ack( { frame => $frame } ); # FIXME depending on success?
} else {
- my $jobs = Koha::BackgroundJobs->search({ status => 'new' });
+ my $jobs = Koha::BackgroundJobs->search({ status => 'new', queue => \@queues });
while ( my $job = $jobs->next ) {
my $args = decode_json($job->data);
process_job( $job, { job_id => $job->id, %$args } );