# along with Koha; if not, see <http://www.gnu.org/licenses>.
use Modern::Perl;
-use JSON qw( decode_json encode_json );
-use Encode qw( encode_utf8 );
+use JSON;
use Carp qw( croak );
use Net::Stomp;
use Try::Tiny qw( catch try );
use C4::Context;
use Koha::DateUtils qw( dt_from_string );
use Koha::Exceptions;
+use Koha::Plugins;
+use Koha::Exceptions::BackgroundJob;
use base qw( Koha::Object );
sub enqueue {
my ( $self, $params ) = @_;
- my $job_type = $self->job_type;
- my $job_size = $params->{job_size};
- my $job_args = $params->{job_args};
+ my $job_type = $self->job_type;
+ my $job_size = $params->{job_size};
+ my $job_args = $params->{job_args};
+ my $job_context = $params->{job_context} // C4::Context->userenv;
+ my $job_queue = $params->{job_queue} // 'default';
+ my $json = $self->json;
+
+ my $borrowernumber = (C4::Context->userenv) ? C4::Context->userenv->{number} : undef;
+ $job_context->{interface} = C4::Context->interface;
+ my $json_context = $json->encode($job_context);
+ my $json_args = $json->encode($job_args);
- my $borrowernumber = C4::Context->userenv->{number}; # FIXME Handle non GUI calls
- my $json_args = encode_json $job_args;
$self->set(
{
status => 'new',
type => $job_type,
+ queue => $job_queue,
size => $job_size,
data => $json_args,
+ context => $json_context,
enqueued_on => dt_from_string,
borrowernumber => $borrowernumber,
}
)->store;
$job_args->{job_id} = $self->id;
- $json_args = encode_json $job_args;
+ my $conn;
+ try {
+ $conn = $self->connect;
+ } catch {
+ warn "Cannot connect to broker " . $_;
+ };
+ return unless $conn;
+
+ $json_args = $json->encode($job_args);
try {
- my $conn = $self->connect;
# This namespace is wrong, it must be a vhost instead.
# But to do so it needs to be created on the server => much more work when a new Koha instance is created.
# Also, here we just want the Koha instance's name, but it's not in the config...
# Picking a random id (memcached_namespace) from the config
my $namespace = C4::Context->config('memcached_namespace');
- $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_type), body => $json_args } )
+ $conn->send_with_receipt( { destination => sprintf("/queue/%s-%s", $namespace, $job_queue), body => $json_args } )
or Koha::Exceptions::Exception->throw('Job has not been enqueued');
} catch {
$self->status('failed')->store;
$args ||= {};
+ if ( $self->context ) {
+ my $context = $self->json->decode($self->context);
+ C4::Context->_new_userenv(-1);
+ C4::Context->interface( $context->{interface} );
+ C4::Context->set_userenv(
+ $context->{number}, $context->{id},
+ $context->{cardnumber}, $context->{firstname},
+ $context->{surname}, $context->{branch},
+ $context->{branchname}, $context->{flags},
+ $context->{emailaddress}, undef,
+ $context->{desk_id}, $context->{desk_name},
+ $context->{register_id}, $context->{register_name}
+ );
+ }
+ else {
+ Koha::Logger->get->warn("A background job didn't have context defined (" . $self->id . ")");
+ }
+
return $derived_class->process( $args );
}
+=head3 start
+
+ $self->start;
+
+Marks the job as started.
+
+=cut
+
+sub start {
+ my ($self) = @_;
+
+ Koha::Exceptions::BackgroundJob::InconsistentStatus->throw(
+ current_status => $self->status,
+ expected_status => 'new'
+ ) unless $self->status eq 'new';
+
+ return $self->set(
+ {
+ started_on => \'NOW()',
+ progress => 0,
+ status => 'started',
+ }
+ )->store;
+}
+
+=head3 step
+
+ $self->step;
+
+Makes the job record a step has taken place.
+
+=cut
+
+sub step {
+ my ($self) = @_;
+
+ Koha::Exceptions::BackgroundJob::InconsistentStatus->throw(
+ current_status => $self->status,
+ expected_status => 'started'
+ ) unless $self->status eq 'started';
+
+ # reached the end of the tasks already
+ Koha::Exceptions::BackgroundJob::StepOutOfBounds->throw()
+ unless $self->progress < $self->size;
+
+ return $self->progress( $self->progress + 1 )->store;
+}
+
+=head3 finish
+
+ $self->finish;
+
+Makes the job record as finished. If the job status is I<cancelled>, it is kept.
+
+=cut
+
+sub finish {
+ my ( $self, $data ) = @_;
+
+ $self->status('finished') unless $self->status eq 'cancelled';
+
+ return $self->set(
+ {
+ ended_on => \'NOW()',
+ data => $self->json->encode($data),
+ }
+ )->store;
+}
+
+=head3 json
+
+ my $JSON_object = $self->json;
+
+Returns a JSON object with utf8 disabled. Encoding to UTF-8 should be
+done later.
+
+=cut
+
+sub json {
+ my ( $self ) = @_;
+ $self->{_json} //= JSON->new->utf8(0); # TODO Should we allow_nonref ?
+ return $self->{_json};
+}
+
+=head3 decoded_data
+
+ my $job_data = $self->decoded_data;
+
+Returns the decoded JSON contents from $self->data.
+
+=cut
+
+sub decoded_data {
+ my ($self) = @_;
+
+ return $self->data ? $self->json->decode( $self->data ) : undef;
+}
+
+=head3 set_encoded_data
+
+ $self->set_encoded_data( $data );
+
+Serializes I<$data> as a JSON string and sets the I<data> attribute with it.
+
+=cut
+
+sub set_encoded_data {
+ my ( $self, $data ) = @_;
+
+ return $self->data( $data ? $self->json->encode($data) : undef );
+}
+
=head3 job_type
Return the job type of the job. Must be a string.
my ( $self ) = @_;
my @messages;
- my $data_dump = decode_json encode_utf8 $self->data;
+ my $data_dump = $self->json->decode($self->data);
if ( exists $data_dump->{messages} ) {
@messages = @{ $data_dump->{messages} };
}
sub report {
my ( $self ) = @_;
- my $data_dump = decode_json encode_utf8 $self->data;
+ my $data_dump = $self->json->decode($self->data);
return $data_dump->{report} || {};
}
=head3 type_to_class_mapping
+ my $mapping = Koha::BackgrounJob->new->type_to_class_mapping;
+
+Returns the available types to class mappings.
+
=cut
sub type_to_class_mapping {
+ my ($self) = @_;
+
+ my $plugins_mapping = ( C4::Context->config("enable_plugins") ) ? $self->plugin_types_to_classes : {};
+
+ return ($plugins_mapping)
+ ? { %{ $self->core_types_to_classes }, %$plugins_mapping }
+ : $self->core_types_to_classes;
+}
+
+=head3 core_types_to_classes
+
+ my $mappings = Koha::BackgrounJob->new->core_types_to_classes
+
+Returns the core background jobs types to class mappings.
+
+=cut
+
+sub core_types_to_classes {
return {
batch_authority_record_deletion => 'Koha::BackgroundJob::BatchDeleteAuthority',
batch_authority_record_modification => 'Koha::BackgroundJob::BatchUpdateAuthority',
batch_item_record_deletion => 'Koha::BackgroundJob::BatchDeleteItem',
batch_item_record_modification => 'Koha::BackgroundJob::BatchUpdateItem',
batch_hold_cancel => 'Koha::BackgroundJob::BatchCancelHold',
+ update_elastic_index => 'Koha::BackgroundJob::UpdateElasticIndex',
+ update_holds_queue_for_biblios => 'Koha::BackgroundJob::BatchUpdateBiblioHoldsQueue',
};
}
+=head3 plugin_types_to_classes
+
+ my $mappings = Koha::BackgroundJob->new->plugin_types_to_classes
+
+Returns the plugin-defined background jobs types to class mappings.
+
+=cut
+
+sub plugin_types_to_classes {
+ my ($self) = @_;
+
+ unless ( exists $self->{_plugin_mapping} ) {
+ my @plugins = Koha::Plugins->new()->GetPlugins( { method => 'background_tasks', } );
+
+ foreach my $plugin (@plugins) {
+
+ my $tasks = $plugin->background_tasks;
+ my $metadata = $plugin->get_metadata;
+
+ unless ( $metadata->{namespace} ) {
+ Koha::Logger->get->warn(
+ q{A plugin includes the 'background_tasks' method, }
+ . q{but doesn't provide the required 'namespace' }
+ . qq{method ($plugin->{class})} );
+ next;
+ }
+
+ my $namespace = $metadata->{namespace};
+
+ foreach my $type ( keys %{$tasks} ) {
+ my $class = $tasks->{$type};
+
+ # skip if conditions not met
+ next unless $type and $class;
+
+ my $key = "plugin_$namespace" . "_$type";
+
+ $self->{_plugin_mapping}->{$key} = $tasks->{$type};
+ }
+ }
+ }
+
+ return $self->{_plugin_mapping};
+}
+
=head3 _type
=cut