# along with Koha; if not, see <http://www.gnu.org/licenses>.
use Modern::Perl;
-use JSON qw( decode_json encode_json );
-use Encode qw( encode_utf8 );
+use JSON;
use Carp qw( croak );
use Net::Stomp;
use Try::Tiny qw( catch try );
use C4::Context;
use Koha::DateUtils qw( dt_from_string );
use Koha::Exceptions;
-use Koha::BackgroundJob::BatchUpdateBiblio;
-use Koha::BackgroundJob::BatchUpdateAuthority;
-use Koha::BackgroundJob::BatchUpdateItem;
-use Koha::BackgroundJob::BatchDeleteBiblio;
-use Koha::BackgroundJob::BatchDeleteAuthority;
-use Koha::BackgroundJob::BatchDeleteItem;
-use Koha::BackgroundJob::BatchCancelHold;
+use Koha::Plugins;
+use Koha::Exceptions::BackgroundJob;
use base qw( Koha::Object );
);
Consumer:
-Koha::BackgrounJobs->find($job_id)->process;
+Koha::BackgroundJobs->find($job_id)->process;
See also C<misc/background_jobs_worker.pl> for a full example
=head1 API
sub enqueue {
my ( $self, $params ) = @_;
- my $job_type = $self->job_type;
- my $job_size = $params->{job_size};
- my $job_args = $params->{job_args};
-
- my $borrowernumber = C4::Context->userenv->{number}; # FIXME Handle non GUI calls
- my $json_args = encode_json $job_args;
- my $job_id;
- $self->_result->result_source->schema->txn_do(
- sub {
- $self->set(
- {
- status => 'new',
- type => $job_type,
- size => $job_size,
- data => $json_args,
- enqueued_on => dt_from_string,
- borrowernumber => $borrowernumber,
- }
- )->store;
-
- $job_id = $self->id;
- $job_args->{job_id} = $job_id;
- $json_args = encode_json $job_args;
-
- try {
- my $conn = $self->connect;
- # This namespace is wrong, it must be a vhost instead.
- # But to do so it needs to be created on the server => much more work when a new Koha instance is created.
- # Also, here we just want the Koha instance's name, but it's not in the config...
- # Picking a random id (memcached_namespace) from the config
- my $namespace = C4::Context->config('memcached_namespace');
- $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
- or Koha::Exceptions::Exception->throw('Job has not been enqueued');
- } catch {
- if ( ref($_) eq 'Koha::Exceptions::Exception' ) {
- $_->rethrow;
- } else {
- warn sprintf "The job has not been sent to the message broker: (%s)", $_;
- }
- };
+ my $job_type = $self->job_type;
+ my $job_size = $params->{job_size};
+ my $job_args = $params->{job_args};
+ my $job_context = $params->{job_context} // C4::Context->userenv;
+ my $job_queue = $params->{job_queue} // 'default';
+ my $json = $self->json;
+
+ my $borrowernumber = (C4::Context->userenv) ? C4::Context->userenv->{number} : undef;
+ $job_context->{interface} = C4::Context->interface;
+ my $json_context = $json->encode($job_context);
+ my $json_args = $json->encode($job_args);
+
+ $self->set(
+ {
+ status => 'new',
+ type => $job_type,
+ queue => $job_queue,
+ size => $job_size,
+ data => $json_args,
+ context => $json_context,
+ enqueued_on => dt_from_string,
+ borrowernumber => $borrowernumber,
}
- );
+ )->store;
- return $job_id;
+ $job_args->{job_id} = $self->id;
+
+ my $conn;
+ try {
+ $conn = $self->connect;
+ } catch {
+ warn "Cannot connect to broker " . $_;
+ };
+ return unless $conn;
+
+ $json_args = $json->encode($job_args);
+ try {
+ # This namespace is wrong, it must be a vhost instead.
+ # But to do so it needs to be created on the server => much more work when a new Koha instance is created.
+ # Also, here we just want the Koha instance's name, but it's not in the config...
+ # Picking a random id (memcached_namespace) from the config
+ my $namespace = C4::Context->config('memcached_namespace');
+ $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_queue), body => $json_args } )
+ or Koha::Exceptions::Exception->throw('Job has not been enqueued');
+ } catch {
+ $self->status('failed')->store;
+ if ( ref($_) eq 'Koha::Exceptions::Exception' ) {
+ $_->rethrow;
+ } else {
+ warn sprintf "The job has not been sent to the message broker: (%s)", $_;
+ }
+ };
+
+ return $self->id;
}
=head3 process
$args ||= {};
- return $derived_class->process({job_id => $self->id, %$args});
+ if ( $self->context ) {
+ my $context = $self->json->decode($self->context);
+ C4::Context->_new_userenv(-1);
+ C4::Context->interface( $context->{interface} );
+ C4::Context->set_userenv(
+ $context->{number}, $context->{id},
+ $context->{cardnumber}, $context->{firstname},
+ $context->{surname}, $context->{branch},
+ $context->{branchname}, $context->{flags},
+ $context->{emailaddress}, undef,
+ $context->{desk_id}, $context->{desk_name},
+ $context->{register_id}, $context->{register_name}
+ );
+ }
+ else {
+ Koha::Logger->get->warn("A background job didn't have context defined (" . $self->id . ")");
+ }
+
+ return $derived_class->process( $args );
+}
+
+=head3 start
+
+ $self->start;
+
+Marks the job as started.
+
+=cut
+
+sub start {
+ my ($self) = @_;
+
+ Koha::Exceptions::BackgroundJob::InconsistentStatus->throw(
+ current_status => $self->status,
+ expected_status => 'new'
+ ) unless $self->status eq 'new';
+
+ return $self->set(
+ {
+ started_on => \'NOW()',
+ progress => 0,
+ status => 'started',
+ }
+ )->store;
+}
+
+=head3 step
+
+ $self->step;
+
+Makes the job record a step has taken place.
+
+=cut
+
+sub step {
+ my ($self) = @_;
+
+ Koha::Exceptions::BackgroundJob::InconsistentStatus->throw(
+ current_status => $self->status,
+ expected_status => 'started'
+ ) unless $self->status eq 'started';
+
+ # reached the end of the tasks already
+ Koha::Exceptions::BackgroundJob::StepOutOfBounds->throw()
+ unless $self->progress < $self->size;
+
+ return $self->progress( $self->progress + 1 )->store;
+}
+
+=head3 finish
+
+ $self->finish;
+
+Makes the job record as finished. If the job status is I<cancelled>, it is kept.
+
+=cut
+
+sub finish {
+ my ( $self, $data ) = @_;
+
+ $self->status('finished') unless $self->status eq 'cancelled' or $self->status eq 'failed';
+
+ return $self->set(
+ {
+ ended_on => \'NOW()',
+ data => $self->json->encode($data),
+ }
+ )->store;
+}
+
+=head3 json
+
+ my $JSON_object = $self->json;
+
+Returns a JSON object with utf8 disabled. Encoding to UTF-8 should be
+done later.
+
+=cut
+
+sub json {
+ my ( $self ) = @_;
+ $self->{_json} //= JSON->new->utf8(0); # TODO Should we allow_nonref ?
+ return $self->{_json};
+}
+
+=head3 decoded_data
+
+ my $job_data = $self->decoded_data;
+
+Returns the decoded JSON contents from $self->data.
+
+=cut
+
+sub decoded_data {
+ my ($self) = @_;
+
+ return $self->data ? $self->json->decode( $self->data ) : undef;
+}
+
+=head3 set_encoded_data
+
+ $self->set_encoded_data( $data );
+
+Serializes I<$data> as a JSON string and sets the I<data> attribute with it.
+
+=cut
+
+sub set_encoded_data {
+ my ( $self, $data ) = @_;
+
+ return $self->data( $data ? $self->json->encode($data) : undef );
}
=head3 job_type
my ( $self ) = @_;
my @messages;
- my $data_dump = decode_json encode_utf8 $self->data;
+ my $data_dump = $self->json->decode($self->data);
if ( exists $data_dump->{messages} ) {
@messages = @{ $data_dump->{messages} };
}
sub report {
my ( $self ) = @_;
- my $data_dump = decode_json encode_utf8 $self->data;
+ my $data_dump = $self->json->decode($self->data);
return $data_dump->{report} || {};
}
my $derived_class = $self->_derived_class;
- return $derived_class->additional_report({job_id => $self->id});
+ return $derived_class->additional_report;
}
=head3 cancel
my $class = $self->type_to_class_mapping->{$job_type};
- Koha::Exceptions::Exception->throw($job_type . ' is not a valid job_type')
+ Koha::Exception->throw($job_type . ' is not a valid job_type')
unless $class;
- return $class->new;
+ eval "require $class";
+ return $class->_new_from_dbic( $self->_result );
}
=head3 type_to_class_mapping
+ my $mapping = Koha::BackgroundJob->new->type_to_class_mapping;
+
+Returns the available types to class mappings.
+
=cut
sub type_to_class_mapping {
+ my ($self) = @_;
+
+ my $plugins_mapping = ( C4::Context->config("enable_plugins") ) ? $self->plugin_types_to_classes : {};
+
+ return ($plugins_mapping)
+ ? { %{ $self->core_types_to_classes }, %$plugins_mapping }
+ : $self->core_types_to_classes;
+}
+
+=head3 core_types_to_classes
+
+ my $mappings = Koha::BackgroundJob->new->core_types_to_classes
+
+Returns the core background jobs types to class mappings.
+
+=cut
+
+sub core_types_to_classes {
return {
batch_authority_record_deletion => 'Koha::BackgroundJob::BatchDeleteAuthority',
batch_authority_record_modification => 'Koha::BackgroundJob::BatchUpdateAuthority',
batch_item_record_deletion => 'Koha::BackgroundJob::BatchDeleteItem',
batch_item_record_modification => 'Koha::BackgroundJob::BatchUpdateItem',
batch_hold_cancel => 'Koha::BackgroundJob::BatchCancelHold',
+ update_elastic_index => 'Koha::BackgroundJob::UpdateElasticIndex',
+ update_holds_queue_for_biblios => 'Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue',
+ stage_marc_for_import => 'Koha::BackgroundJob::StageMARCForImport',
+ marc_import_commit_batch => 'Koha::BackgroundJob::MARCImportCommitBatch',
+ marc_import_revert_batch => 'Koha::BackgroundJob::MARCImportRevertBatch',
+ };
+}
+
+=head3 plugin_types_to_classes
+
+ my $mappings = Koha::BackgroundJob->new->plugin_types_to_classes
+
+Returns the plugin-defined background jobs types to class mappings.
+
+=cut
+
+sub plugin_types_to_classes {
+ my ($self) = @_;
+
+ unless ( exists $self->{_plugin_mapping} ) {
+ my @plugins = Koha::Plugins->new()->GetPlugins( { method => 'background_tasks', } );
+
+ foreach my $plugin (@plugins) {
+
+ my $tasks = $plugin->background_tasks;
+ my $metadata = $plugin->get_metadata;
+
+ unless ( $metadata->{namespace} ) {
+ Koha::Logger->get->warn(
+ q{A plugin includes the 'background_tasks' method, }
+ . q{but doesn't provide the required 'namespace' }
+ . qq{method ($plugin->{class})} );
+ next;
+ }
+
+ my $namespace = $metadata->{namespace};
+
+ foreach my $type ( keys %{$tasks} ) {
+ my $class = $tasks->{$type};
+
+ # skip if conditions not met
+ next unless $type and $class;
+
+ my $key = "plugin_$namespace" . "_$type";
+
+ $self->{_plugin_mapping}->{$key} = $tasks->{$type};
+ }
+ }
+ }
+
+ return $self->{_plugin_mapping};
+}
+
+=head3 to_api
+
+ my $json = $job->to_api;
+
+Overloaded method that returns a JSON representation of the Koha::BackgroundJob object,
+suitable for API output.
+
+=cut
+
+sub to_api {
+ my ( $self, $params ) = @_;
+
+ my $json = $self->SUPER::to_api( $params );
+
+ $json->{context} = $self->json->decode($self->context)
+ if defined $self->context;
+ $json->{data} = $self->decoded_data;
+
+ return $json;
+}
+
+=head3 to_api_mapping
+
+This method returns the mapping for representing a Koha::BackgroundJob object
+on the API.
+
+=cut
+
+sub to_api_mapping {
+ return {
+ id => 'job_id',
+ borrowernumber => 'patron_id',
+ ended_on => 'ended_date',
+ enqueued_on => 'enqueued_date',
+ started_on => 'started_date',
};
}