0c9b3ff380fa8909a9030d67e866ac28d794da23
[srvgit] / Koha / SearchEngine / Elasticsearch / Indexer.pm
1 package Koha::SearchEngine::Elasticsearch::Indexer;
2
3 # Copyright 2013 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use Carp;
21 use Modern::Perl;
22 use Try::Tiny;
23 use List::Util qw(any);
24 use base qw(Koha::SearchEngine::Elasticsearch);
25 use Data::Dumper;
26
27 use Koha::Exceptions;
28 use Koha::Exceptions::Elasticsearch;
29 use Koha::SearchEngine::Zebra::Indexer;
30 use C4::AuthoritiesMarc qw//;
31 use C4::Biblio;
32 use C4::Context;
33
34 =head1 NAME
35
36 Koha::SearchEngine::Elasticsearch::Indexer - handles adding new records to the index
37
38 =head1 SYNOPSIS
39
40     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new(
41         { index => Koha::SearchEngine::BIBLIOS_INDEX } );
42     $indexer->drop_index();
43     $indexer->update_index(\@biblionumbers, \@records);
44
45
46 =head1 CONSTANTS
47
48 =over 4
49
50 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_OK>
51
52 Represents an index state where index is created and in a working state.
53
54 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_REINDEX_REQUIRED>
55
56 Not currently used, but could be useful later, for example if can detect when new field or mapping added.
57
58 =item C<Koha::SearchEngine::Elasticsearch::Indexer::INDEX_STATUS_RECREATE_REQUIRED>
59
60 Representings an index state where index needs to be recreated and is not in a working state.
61
62 =back
63
64 =cut
65
66 use constant {
67     INDEX_STATUS_OK => 0,
68     INDEX_STATUS_REINDEX_REQUIRED => 1,
69     INDEX_STATUS_RECREATE_REQUIRED => 2,
70 };
71
72 =head1 FUNCTIONS
73
74 =head2 update_index($biblionums, $records)
75
76     try {
77         $self->update_index($biblionums, $records);
78     } catch {
79         die("Something went wrong trying to update index:" .  $_[0]);
80     }
81
82 Converts C<MARC::Records> C<$records> to Elasticsearch documents and performs
83 an update request for these records on the Elasticsearch index.
84
85 =over 4
86
87 =item C<$biblionums>
88
89 Arrayref of biblio numbers for the C<$records>, the order must be the same as
90 and match up with C<$records>.
91
92 =item C<$records>
93
94 Arrayref of C<MARC::Record>s.
95
96 =back
97
98 =cut
99
100 sub update_index {
101     my ($self, $biblionums, $records) = @_;
102
103     my $documents = $self->marc_records_to_documents($records);
104     my @body;
105     for (my $i = 0; $i < scalar @$biblionums; $i++) {
106         my $id = $biblionums->[$i];
107         my $document = $documents->[$i];
108         push @body, {
109             index => {
110                 _id => "$id"
111             }
112         };
113         push @body, $document;
114     }
115     my $response;
116     if (@body) {
117         try{
118             my $elasticsearch = $self->get_elasticsearch();
119             $response = $elasticsearch->bulk(
120                 index => $self->index_name,
121                 type => 'data', # is just hard coded in Indexer.pm?
122                 body => \@body
123             );
124             if ($response->{errors}) {
125                 carp "One or more ElasticSearch errors occurred when indexing documents";
126             }
127         } catch {
128             Koha::Exceptions::Elasticsearch::BadResponse->throw(
129                 type => $_->{type},
130                 details => $_->{text},
131             );
132         };
133     }
134     return $response;
135 }
136
137 =head2 set_index_status_ok
138
139 Convenience method for setting index status to C<INDEX_STATUS_OK>.
140
141 =cut
142
143 sub set_index_status_ok {
144     my ($self) = @_;
145     $self->index_status(INDEX_STATUS_OK);
146 }
147
148 =head2 is_index_status_ok
149
150 Convenience method for checking if index status is C<INDEX_STATUS_OK>.
151
152 =cut
153
154 sub is_index_status_ok {
155     my ($self) = @_;
156     return $self->index_status == INDEX_STATUS_OK;
157 }
158
159 =head2 set_index_status_reindex_required
160
161 Convenience method for setting index status to C<INDEX_REINDEX_REQUIRED>.
162
163 =cut
164
165 sub set_index_status_reindex_required {
166     my ($self) = @_;
167     $self->index_status(INDEX_STATUS_REINDEX_REQUIRED);
168 }
169
170 =head2 is_index_status_reindex_required
171
172 Convenience method for checking if index status is C<INDEX_STATUS_REINDEX_REQUIRED>.
173
174 =cut
175
176 sub is_index_status_reindex_required {
177     my ($self) = @_;
178     return $self->index_status == INDEX_STATUS_REINDEX_REQUIRED;
179 }
180
181 =head2 set_index_status_recreate_required
182
183 Convenience method for setting index status to C<INDEX_STATUS_RECREATE_REQUIRED>.
184
185 =cut
186
187 sub set_index_status_recreate_required {
188     my ($self) = @_;
189     $self->index_status(INDEX_STATUS_RECREATE_REQUIRED);
190 }
191
192 =head2 is_index_status_recreate_required
193
194 Convenience method for checking if index status is C<INDEX_STATUS_RECREATE_REQUIRED>.
195
196 =cut
197
198 sub is_index_status_recreate_required {
199     my ($self) = @_;
200     return $self->index_status == INDEX_STATUS_RECREATE_REQUIRED;
201 }
202
203 =head2 index_status($status)
204
205 Will either set the current index status to C<$status> and return C<$status>,
206 or return the current index status if called with no arguments.
207
208 =over 4
209
210 =item C<$status>
211
212 Optional argument. If passed will set current index status to C<$status> if C<$status> is
213 a valid status. See L</CONSTANTS>.
214
215 =back
216
217 =cut
218
219 sub index_status {
220     my ($self, $status) = @_;
221     my $key = 'ElasticsearchIndexStatus_' . $self->index;
222
223     if (defined $status) {
224         unless (any { $status == $_ } (
225                 INDEX_STATUS_OK,
226                 INDEX_STATUS_REINDEX_REQUIRED,
227                 INDEX_STATUS_RECREATE_REQUIRED,
228             )
229         ) {
230             Koha::Exceptions::Exception->throw("Invalid index status: $status");
231         }
232         C4::Context->set_preference($key, $status);
233         return $status;
234     }
235     else {
236         return C4::Context->preference($key);
237     }
238 }
239
240 =head2 update_mappings
241
242 Generate Elasticsearch mappings from mappings stored in database and
243 perform a request to update Elasticsearch index mappings. Will throw an
244 error and set index status to C<INDEX_STATUS_RECREATE_REQUIRED> if update
245 failes.
246
247 =cut
248
249 sub update_mappings {
250     my ($self) = @_;
251     my $elasticsearch = $self->get_elasticsearch();
252     my $mappings = $self->get_elasticsearch_mappings();
253
254     foreach my $type (keys %{$mappings}) {
255         try {
256             my $response = $elasticsearch->indices->put_mapping(
257                 index => $self->index_name,
258                 type => $type,
259                 body => {
260                     $type => $mappings->{$type}
261                 }
262             );
263         } catch {
264             $self->set_index_status_recreate_required();
265             my $reason = $_[0]->{vars}->{body}->{error}->{reason};
266             my $index_name = $self->index_name;
267             Koha::Exceptions::Exception->throw(
268                 error => "Unable to update mappings for index \"$index_name\". Reason was: \"$reason\". Index needs to be recreated and reindexed",
269             );
270         };
271     }
272     $self->set_index_status_ok();
273 }
274
275 =head2 update_index_background($biblionums, $records)
276
277 This has exactly the same API as C<update_index> however it'll
278 return immediately. It'll start a background process that does the adding.
279
280 If it fails to add to Elasticsearch then it'll add to a queue that will cause
281 it to be updated by a regular index cron job in the future.
282
283 =cut
284
285 # TODO implement in the future - I don't know the best way of doing this yet.
286 # If fork: make sure process group is changed so apache doesn't wait for us.
287
288 sub update_index_background {
289     my $self = shift;
290     $self->update_index(@_);
291 }
292
293 =head2 index_records
294
295 This function takes an array of record numbers and fetches the records to send to update_index
296 for actual indexing.
297
298 If $records parameter is provided the records will be used as-is, this is only utilized for authorities
299 at the moment.
300
301 The other variables are used for parity with Zebra indexing calls. Currently the calls are passed through
302 to Zebra as well.
303
304 =cut
305
306 sub index_records {
307     my ( $self, $record_numbers, $op, $server, $records ) = @_;
308     $record_numbers = [$record_numbers] if ref $record_numbers ne 'ARRAY' && defined $record_numbers;
309     $records = [$records] if ref $records ne 'ARRAY' && defined $records;
310     if ( $op eq 'specialUpdate' ) {
311         my $index_record_numbers;
312         if ($records){
313             $index_record_numbers = $record_numbers;
314         } else {
315             foreach my $record_number ( @$record_numbers ){
316                 my $record = _get_record( $record_number, $server );
317                 if( $record ){
318                     push @$records, $record;
319                     push @$index_record_numbers, $record_number;
320                 }
321             }
322         }
323         $self->update_index_background( $index_record_numbers, $records ) if $index_record_numbers && $records;
324     }
325     elsif ( $op eq 'recordDelete' ) {
326         $self->delete_index_background( $record_numbers );
327     }
328     #FIXME Current behaviour is to index Zebra when using ES, at some point we should stop
329     Koha::SearchEngine::Zebra::Indexer::index_records( $self, $record_numbers, $op, $server, undef );
330 }
331
332 sub _get_record {
333     my ( $id, $server ) = @_;
334     return $server eq 'biblioserver'
335         ? C4::Biblio::GetMarcBiblio({ biblionumber => $id, embed_items  => 1 })
336         : C4::AuthoritiesMarc::GetAuthority($id);
337 }
338
339 =head2 delete_index($biblionums)
340
341 C<$biblionums> is an arrayref of biblionumbers to delete from the index.
342
343 =cut
344
345 sub delete_index {
346     my ($self, $biblionums) = @_;
347
348     my $elasticsearch = $self->get_elasticsearch();
349     my @body = map { { delete => { _id => "$_" } } } @{$biblionums};
350     my $result = $elasticsearch->bulk(
351         index => $self->index_name,
352         type => 'data',
353         body => \@body,
354     );
355     if ($result->{errors}) {
356         croak "An Elasticsearch error occurred during bulk delete";
357     }
358 }
359
360 =head2 delete_index_background($biblionums)
361
362 Identical to L</delete_index($biblionums)>
363
364 =cut
365
366 # TODO: Should be made async
367 sub delete_index_background {
368     my $self = shift;
369     $self->delete_index(@_);
370 }
371
372 =head2 drop_index
373
374 Drops the index from the Elasticsearch server.
375
376 =cut
377
378 sub drop_index {
379     my ($self) = @_;
380     if ($self->index_exists) {
381         my $elasticsearch = $self->get_elasticsearch();
382         $elasticsearch->indices->delete(index => $self->index_name);
383         $self->set_index_status_recreate_required();
384     }
385 }
386
387 =head2 create_index
388
389 Creates the index (including mappings) on the Elasticsearch server.
390
391 =cut
392
393 sub create_index {
394     my ($self) = @_;
395     my $settings = $self->get_elasticsearch_settings();
396     my $elasticsearch = $self->get_elasticsearch();
397     $elasticsearch->indices->create(
398         index => $self->index_name,
399         body => {
400             settings => $settings
401         }
402     );
403     $self->update_mappings();
404 }
405
406 =head2 index_exists
407
408 Checks if index has been created on the Elasticsearch server. Returns C<1> or the
409 empty string to indicate whether index exists or not.
410
411 =cut
412
413 sub index_exists {
414     my ($self) = @_;
415     my $elasticsearch = $self->get_elasticsearch();
416     return $elasticsearch->indices->exists(
417         index => $self->index_name,
418     );
419 }
420
421 1;
422
423 __END__
424
425 =head1 AUTHOR
426
427 =over 4
428
429 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
430
431 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
432
433 =back