Bug 19893: Restore and fix removed tests
[srvgit] / Koha / SearchEngine / Elasticsearch / Indexer.pm
1 package Koha::SearchEngine::Elasticsearch::Indexer;
2
3 # Copyright 2013 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use Carp;
21 use Modern::Perl;
22 use Try::Tiny;
23 use List::Util qw(any);
24 use base qw(Koha::SearchEngine::Elasticsearch);
25 use Data::Dumper;
26
27 # For now just marc, but we can do anything here really
28 use Catmandu::Importer::MARC;
29 use Catmandu::Store::ElasticSearch;
30
31 use Koha::Exceptions;
32 use C4::Context;
33
34 Koha::SearchEngine::Elasticsearch::Indexer->mk_accessors(qw( store ));
35
36 =head1 NAME
37
38 Koha::SearchEngine::Elasticsearch::Indexer - handles adding new records to the index
39
40 =head1 SYNOPSIS
41
42     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new(
43         { index => Koha::SearchEngine::BIBLIOS_INDEX } );
44     $indexer->drop_index();
45     $indexer->update_index(\@biblionumbers, \@records);
46
47 =head1 FUNCTIONS
48
49 =head2 $indexer->update_index($biblionums, $records);
50
51 C<$biblionums> is an arrayref containing the biblionumbers for the records.
52
53 C<$records> is an arrayref containing the L<MARC::Record>s themselves.
54
55 The values in the arrays must match up, and the 999$c value in the MARC record
56 will be rewritten using the values in C<$biblionums> to ensure they are correct.
57 If C<$biblionums> is C<undef>, this won't happen, but you should be sure that
58 999$c is correct on your own then.
59
60 Note that this will modify the original record if C<$biblionums> is supplied.
61 If that's a problem, clone them first.
62
63 =cut
64
65 use constant {
66     INDEX_STATUS_OK => 0,
67     INDEX_STATUS_REINDEX_REQUIRED => 1, # Not currently used, but could be useful later, for example if can detect when new field or mapping added
68     INDEX_STATUS_RECREATE_REQUIRED => 2,
69 };
70
71 sub update_index {
72     my ($self, $biblionums, $records) = @_;
73
74     # TODO should have a separate path for dealing with a large number
75     # of records at once where we use the bulk update functions in ES.
76     if ($biblionums) {
77         $self->_sanitise_records($biblionums, $records);
78     }
79
80     $self->bulk_index($records);
81     return 1;
82 }
83
84 sub bulk_index {
85     my ($self, $records) = @_;
86     my $conf = $self->get_elasticsearch_params();
87     my $elasticsearch = $self->get_elasticsearch();
88     my $documents = $self->marc_records_to_documents($records);
89     my @body;
90
91     foreach my $document_info (@{$documents}) {
92         my ($id, $document) = @{$document_info};
93         push @body, {
94             index => {
95                 _id => $id
96             }
97         };
98         push @body, $document;
99     }
100     if (@body) {
101         my $response = $elasticsearch->bulk(
102             index => $conf->{index_name},
103             type => 'data', # is just hard coded in Indexer.pm?
104             body => \@body
105         );
106     }
107     # TODO: handle response
108     return 1;
109 }
110
111 sub index_status_ok {
112     my ($self, $set) = @_;
113     return defined $set ?
114         $self->index_status(INDEX_STATUS_OK) :
115         $self->index_status == INDEX_STATUS_OK;
116 }
117
118 sub index_status_reindex_required {
119     my ($self, $set) = @_;
120     return defined $set ?
121         $self->index_status(INDEX_STATUS_REINDEX_REQUIRED) :
122         $self->index_status == INDEX_STATUS_REINDEX_REQUIRED;
123 }
124
125 sub index_status_recreate_required {
126     my ($self, $set) = @_;
127     return defined $set ?
128         $self->index_status(INDEX_STATUS_RECREATE_REQUIRED) :
129         $self->index_status == INDEX_STATUS_RECREATE_REQUIRED;
130 }
131
132 sub index_status {
133     my ($self, $status) = @_;
134     my $key = 'ElasticsearchIndexStatus_' . $self->index;
135
136     if (defined $status) {
137         unless (any { $status == $_ } (
138                 INDEX_STATUS_OK,
139                 INDEX_STATUS_REINDEX_REQUIRED,
140                 INDEX_STATUS_RECREATE_REQUIRED,
141             )
142         ) {
143             Koha::Exceptions::Exception->throw("Invalid index status: $status");
144         }
145         C4::Context->set_preference($key, $status);
146         return $status;
147     }
148     else {
149         return C4::Context->preference($key);
150     }
151 }
152
153 sub update_mappings {
154     my ($self) = @_;
155     my $conf = $self->get_elasticsearch_params();
156     my $elasticsearch = $self->get_elasticsearch();
157     my $mappings = $self->get_elasticsearch_mappings();
158
159     foreach my $type (keys %{$mappings}) {
160         try {
161             my $response = $elasticsearch->indices->put_mapping(
162                 index => $conf->{index_name},
163                 type => $type,
164                 body => {
165                     $type => $mappings->{$type}
166                 }
167             );
168         } catch {
169             $self->index_status_recreate_required(1);
170             my $reason = $_[0]->{vars}->{body}->{error}->{reason};
171             Koha::Exceptions::Exception->throw(
172                 error => "Unable to update mappings for index \"$conf->{index_name}\". Reason was: \"$reason\". Index needs to be recreated and reindexed",
173             );
174         };
175     }
176     $self->index_status_ok(1);
177 }
178
179 =head2 $indexer->update_index_background($biblionums, $records)
180
181 This has exactly the same API as C<update_index_background> however it'll
182 return immediately. It'll start a background process that does the adding.
183
184 If it fails to add to Elasticsearch then it'll add to a queue that will cause
185 it to be updated by a regular index cron job in the future.
186
187 # TODO implement in the future - I don't know the best way of doing this yet.
188 # If fork: make sure process group is changed so apache doesn't wait for us.
189
190 =cut
191
192 sub update_index_background {
193     my $self = shift;
194     $self->update_index(@_);
195 }
196
197 =head2 $indexer->delete_index($biblionums)
198
199 C<$biblionums> is an arrayref of biblionumbers to delete from the index.
200
201 =cut
202
203 sub delete_index {
204     my ($self, $biblionums) = @_;
205
206     if ( !$self->store ) {
207         my $params  = $self->get_elasticsearch_params();
208         $self->store(
209             Catmandu::Store::ElasticSearch->new(
210                 %$params,
211                 index_settings => $self->get_elasticsearch_settings(),
212                 index_mappings => $self->get_elasticsearch_mappings(),
213             )
214         );
215     }
216     $self->store->bag->delete($_) foreach @$biblionums;
217     $self->store->bag->commit;
218 }
219
220 =head2 $indexer->delete_index_background($biblionums)
221
222 Identical to L<delete_index>, this will return immediately and start a
223 background process to do the actual deleting.
224
225 =cut
226
227 # TODO implement in the future
228
229 sub delete_index_background {
230     my $self = shift;
231     $self->delete_index(@_);
232 }
233
234 =head2 $indexer->drop_index();
235
236 Drops the index from the elasticsearch server.
237
238 =cut
239
240 sub drop_index {
241     my ($self) = @_;
242     if ($self->index_exists) {
243         my $conf = $self->get_elasticsearch_params();
244         my $elasticsearch = $self->get_elasticsearch();
245         $elasticsearch->indices->delete(index => $conf->{index_name});
246         $self->index_status_recreate_required(1);
247     }
248 }
249
250 sub create_index {
251     my ($self) = @_;
252     my $conf = $self->get_elasticsearch_params();
253     my $settings = $self->get_elasticsearch_settings();
254     my $elasticsearch = $self->get_elasticsearch();
255     $elasticsearch->indices->create(
256         index => $conf->{index_name},
257         body => {
258             settings => $settings
259         }
260     );
261     $self->update_mappings();
262 }
263
264 sub index_exists {
265     my ($self) = @_;
266     my $conf = $self->get_elasticsearch_params();
267     my $elasticsearch = $self->get_elasticsearch();
268     return $elasticsearch->indices->exists(
269         index => $conf->{index_name},
270     );
271 }
272
273 sub _sanitise_records {
274     my ($self, $biblionums, $records) = @_;
275
276     confess "Unequal number of values in \$biblionums and \$records." if (@$biblionums != @$records);
277
278     my $c = @$biblionums;
279     for (my $i=0; $i<$c; $i++) {
280         my $bibnum = $biblionums->[$i];
281         my $rec = $records->[$i];
282         # I've seen things you people wouldn't believe. Attack ships on fire
283         # off the shoulder of Orion. I watched C-beams glitter in the dark near
284         # the Tannhauser gate. MARC records where 999$c doesn't match the
285         # biblionumber column. All those moments will be lost in time... like
286         # tears in rain...
287         if ( $rec ) {
288             $rec->delete_fields($rec->field('999'));
289             $rec->append_fields(MARC::Field->new('999','','','c' => $bibnum, 'd' => $bibnum));
290         }
291     }
292 }
293
294 1;
295
296 __END__
297
298 =head1 AUTHOR
299
300 =over 4
301
302 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
303
304 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
305
306 =back