adefd1f7eca0c32deb27163a68975a83ccc51c1d
[koha-ffzg.git] / Koha / SearchEngine / Elasticsearch / Indexer.pm
1 package Koha::SearchEngine::Elasticsearch::Indexer;
2
3 # Copyright 2013 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use Carp qw( carp croak );
21 use Modern::Perl;
22 use Try::Tiny qw( catch try );
23 use List::Util qw( any );
24 use base qw(Koha::SearchEngine::Elasticsearch);
25
26 use Koha::Exceptions;
27 use Koha::Exceptions::Elasticsearch;
28 use Koha::SearchEngine::Zebra::Indexer;
29 use Koha::BackgroundJob::UpdateElasticIndex;
30 use C4::AuthoritiesMarc qw//;
31 use C4::Biblio;
32 use C4::Context;
33
34 =head1 NAME
35
36 Koha::SearchEngine::Elasticsearch::Indexer - handles adding new records to the index
37
38 =head1 SYNOPSIS
39
40     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new(
41         { index => Koha::SearchEngine::BIBLIOS_INDEX } );
42     $indexer->drop_index();
43     $indexer->update_index(\@biblionumbers, \@records);
44
45
46 =head1 CONSTANTS
47
48 =over 4
49
50 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_OK>
51
52 Represents an index state where index is created and in a working state.
53
54 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_REINDEX_REQUIRED>
55
56 Not currently used, but could be useful later, for example if can detect when new field or mapping added.
57
58 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_RECREATE_REQUIRED>
59
60 Representings an index state where index needs to be recreated and is not in a working state.
61
62 =back
63
64 =cut
65
66 use constant {
67     INDEX_STATUS_OK => 0,
68     INDEX_STATUS_REINDEX_REQUIRED => 1,
69     INDEX_STATUS_RECREATE_REQUIRED => 2,
70 };
71
72 =head1 FUNCTIONS
73
74 =head2 update_index($biblionums, $records)
75
76     try {
77         $self->update_index($biblionums, $records);
78     } catch {
79         die("Something went wrong trying to update index:" .  $_[0]);
80     }
81
82 Converts C<MARC::Records> C<$records> to Elasticsearch documents and performs
83 an update request for these records on the Elasticsearch index.
84
85 =over 4
86
87 =item C<$biblionums>
88
89 Arrayref of biblio numbers for the C<$records>, the order must be the same as
90 and match up with C<$records>.
91
92 =item C<$records>
93
94 Arrayref of C<MARC::Record>s.
95
96 =back
97
98 =cut
99
100 sub update_index {
101     my ($self, $record_ids, $records) = @_;
102
103     my $index_record_ids = [];
104     unless ( $records && @$records ) {
105         for my $record_id ( sort { $a <=> $b } @$record_ids ) {
106
107             next unless $record_id;
108
109             my $record = $self->_get_record( $record_id );
110             if( $record ){
111                 push @$records, $record;
112                 push @$index_record_ids, $record_id;
113             }
114         }
115     } else {
116         $index_record_ids = $record_ids;
117     }
118
119     my $documents = $self->marc_records_to_documents($records);
120     my @body;
121     for (my $i = 0; $i < scalar @$index_record_ids; $i++) {
122         my $id = $index_record_ids->[$i];
123         my $document = $documents->[$i];
124         push @body, {
125             index => {
126                 _id => "$id"
127             }
128         };
129         push @body, $document;
130     }
131     my $response;
132     if (@body) {
133         try{
134             my $elasticsearch = $self->get_elasticsearch();
135             $response = $elasticsearch->bulk(
136                 index => $self->index_name,
137                 type => 'data', # is just hard coded in Indexer.pm?
138                 body => \@body
139             );
140             if ($response->{errors}) {
141                 carp "One or more ElasticSearch errors occurred when indexing documents";
142             }
143         } catch {
144             Koha::Exceptions::Elasticsearch::BadResponse->throw(
145                 type => $_->{type},
146                 details => $_->{text},
147             );
148         };
149     }
150     return $response;
151 }
152
153 =head2 set_index_status_ok
154
155 Convenience method for setting index status to C<INDEX_STATUS_OK>.
156
157 =cut
158
159 sub set_index_status_ok {
160     my ($self) = @_;
161     $self->index_status(INDEX_STATUS_OK);
162 }
163
164 =head2 is_index_status_ok
165
166 Convenience method for checking if index status is C<INDEX_STATUS_OK>.
167
168 =cut
169
170 sub is_index_status_ok {
171     my ($self) = @_;
172     return $self->index_status == INDEX_STATUS_OK;
173 }
174
175 =head2 set_index_status_reindex_required
176
177 Convenience method for setting index status to C<INDEX_REINDEX_REQUIRED>.
178
179 =cut
180
181 sub set_index_status_reindex_required {
182     my ($self) = @_;
183     $self->index_status(INDEX_STATUS_REINDEX_REQUIRED);
184 }
185
186 =head2 is_index_status_reindex_required
187
188 Convenience method for checking if index status is C<INDEX_STATUS_REINDEX_REQUIRED>.
189
190 =cut
191
192 sub is_index_status_reindex_required {
193     my ($self) = @_;
194     return $self->index_status == INDEX_STATUS_REINDEX_REQUIRED;
195 }
196
197 =head2 set_index_status_recreate_required
198
199 Convenience method for setting index status to C<INDEX_STATUS_RECREATE_REQUIRED>.
200
201 =cut
202
203 sub set_index_status_recreate_required {
204     my ($self) = @_;
205     $self->index_status(INDEX_STATUS_RECREATE_REQUIRED);
206 }
207
208 =head2 is_index_status_recreate_required
209
210 Convenience method for checking if index status is C<INDEX_STATUS_RECREATE_REQUIRED>.
211
212 =cut
213
214 sub is_index_status_recreate_required {
215     my ($self) = @_;
216     return $self->index_status == INDEX_STATUS_RECREATE_REQUIRED;
217 }
218
219 =head2 index_status($status)
220
221 Will either set the current index status to C<$status> and return C<$status>,
222 or return the current index status if called with no arguments.
223
224 =over 4
225
226 =item C<$status>
227
228 Optional argument. If passed will set current index status to C<$status> if C<$status> is
229 a valid status. See L</CONSTANTS>.
230
231 =back
232
233 =cut
234
235 sub index_status {
236     my ($self, $status) = @_;
237     my $key = 'ElasticsearchIndexStatus_' . $self->index;
238
239     if (defined $status) {
240         unless (any { $status == $_ } (
241                 INDEX_STATUS_OK,
242                 INDEX_STATUS_REINDEX_REQUIRED,
243                 INDEX_STATUS_RECREATE_REQUIRED,
244             )
245         ) {
246             Koha::Exception->throw("Invalid index status: $status");
247         }
248         C4::Context->set_preference($key, $status);
249         return $status;
250     }
251     else {
252         return C4::Context->preference($key);
253     }
254 }
255
256 =head2 update_mappings
257
258 Generate Elasticsearch mappings from mappings stored in database and
259 perform a request to update Elasticsearch index mappings. Will throw an
260 error and set index status to C<INDEX_STATUS_RECREATE_REQUIRED> if update
261 failes.
262
263 =cut
264
265 sub update_mappings {
266     my ($self) = @_;
267     my $elasticsearch = $self->get_elasticsearch();
268     my $mappings = $self->get_elasticsearch_mappings();
269
270     try {
271         my $response = $elasticsearch->indices->put_mapping(
272             index => $self->index_name,
273             type => 'data',
274             include_type_name => JSON::true(),
275             body => {
276                 data => $mappings
277             }
278         );
279     } catch {
280         $self->set_index_status_recreate_required();
281         my $reason = $_[0]->{vars}->{body}->{error}->{reason};
282         my $index_name = $self->index_name;
283         Koha::Exception->throw(
284             error => "Unable to update mappings for index \"$index_name\". Reason was: \"$reason\". Index needs to be recreated and reindexed",
285         );
286     };
287     $self->set_index_status_ok();
288 }
289
290 =head2 update_index_background($record_numbers, $server)
291
292 This has exactly the same API as C<update_index> however it'll
293 return immediately. It'll start a background process that does the adding.
294
295 If it fails to add to Elasticsearch then it'll add to a queue that will cause
296 it to be updated by a regular index cron job in the future.
297
298 =cut
299
300 sub update_index_background {
301     my ( $self, $record_numbers, $server ) = @_;
302
303     Koha::BackgroundJob::UpdateElasticIndex->new->enqueue({ record_ids => $record_numbers, record_server => $server });
304 }
305
306 =head2 index_records
307
308 This function takes an array of record numbers and fetches the records to send to update_index
309 for actual indexing.
310
311 If $records parameter is provided the records will be used as-is, this is only utilized for authorities
312 at the moment.
313
314 The other variables are used for parity with Zebra indexing calls. Currently the calls are passed through
315 to Zebra as well.
316
317 =cut
318
319 sub index_records {
320     my ( $self, $record_numbers, $op, $server, $records ) = @_;
321     $record_numbers = [$record_numbers] if ref $record_numbers ne 'ARRAY' && defined $record_numbers;
322     $records = [$records] if ref $records ne 'ARRAY' && defined $records;
323     if ( $op eq 'specialUpdate' ) {
324         if ($records){
325             $self->update_index( $record_numbers, $records );
326         } else {
327             $self->update_index_background( $record_numbers, $server );
328         }
329     }
330     elsif ( $op eq 'recordDelete' ) {
331         $self->delete_index_background( $record_numbers );
332     }
333     #FIXME Current behaviour is to index Zebra when using ES, at some point we should stop
334     Koha::SearchEngine::Zebra::Indexer::index_records( $self, $record_numbers, $op, $server, undef );
335 }
336
337 sub _get_record {
338     my ( $self, $record_id ) = @_;
339     return $self->index eq $Koha::SearchEngine::BIBLIOS_INDEX
340         ? C4::Biblio::GetMarcBiblio({ biblionumber => $record_id, embed_items  => 1 })
341         : C4::AuthoritiesMarc::GetAuthority($record_id);
342 }
343
344 =head2 delete_index($biblionums)
345
346 C<$biblionums> is an arrayref of biblionumbers to delete from the index.
347
348 =cut
349
350 sub delete_index {
351     my ($self, $biblionums) = @_;
352
353     my $elasticsearch = $self->get_elasticsearch();
354     my @body = map { { delete => { _id => "$_" } } } @{$biblionums};
355     my $result = $elasticsearch->bulk(
356         index => $self->index_name,
357         type => 'data',
358         include_type_name => JSON::true(),
359         body => \@body,
360     );
361     if ($result->{errors}) {
362         croak "An Elasticsearch error occurred during bulk delete";
363     }
364 }
365
366 =head2 delete_index_background($biblionums)
367
368 Identical to L</delete_index($biblionums)>
369
370 =cut
371
372 # TODO: Should be made async
373 sub delete_index_background {
374     my $self = shift;
375     $self->delete_index(@_);
376 }
377
378 =head2 drop_index
379
380 Drops the index from the Elasticsearch server.
381
382 =cut
383
384 sub drop_index {
385     my ($self) = @_;
386     if ($self->index_exists) {
387         my $elasticsearch = $self->get_elasticsearch();
388         $elasticsearch->indices->delete(index => $self->index_name);
389         $self->set_index_status_recreate_required();
390     }
391 }
392
393 =head2 create_index
394
395 Creates the index (including mappings) on the Elasticsearch server.
396
397 =cut
398
399 sub create_index {
400     my ($self) = @_;
401     my $settings = $self->get_elasticsearch_settings();
402     my $elasticsearch = $self->get_elasticsearch();
403     $elasticsearch->indices->create(
404         index => $self->index_name,
405         body => {
406             settings => $settings
407         }
408     );
409     $self->update_mappings();
410 }
411
412 =head2 index_exists
413
414 Checks if index has been created on the Elasticsearch server. Returns C<1> or the
415 empty string to indicate whether index exists or not.
416
417 =cut
418
419 sub index_exists {
420     my ($self) = @_;
421     my $elasticsearch = $self->get_elasticsearch();
422     return $elasticsearch->indices->exists(
423         index => $self->index_name,
424     );
425 }
426
427 1;
428
429 __END__
430
431 =head1 AUTHOR
432
433 =over 4
434
435 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
436
437 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
438
439 =back