3b25709a76f90c9233de0261b61a4335b59c1829
[srvgit] / misc / search_tools / rebuild_elasticsearch.pl
1 #!/usr/bin/perl
2
3 # This inserts records from a Koha database into elastic search
4
5 # Copyright 2014 Catalyst IT
6 #
7 # This file is part of Koha.
8 #
9 # Koha is free software; you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # Koha is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with Koha; if not, see <http://www.gnu.org/licenses>.
21
22 =head1 NAME
23
24 rebuild_elasticsearch.pl - inserts records from a Koha database into Elasticsearch
25
26 =head1 SYNOPSIS
27
28 B<rebuild_elasticsearch.pl>
29 [B<-c|--commit>=C<count>]
30 [B<-d|--delete>]
31 [B<-r|--reset>]
32 [B<-a|--authorities>]
33 [B<-b|--biblios>]
34 [B<--desc>]
35 [B<-bn|--bnumber>]
36 [B<-ai|--authid>]
37 [B<-p|--processes>]
38 [B<-v|--verbose>]
39 [B<-h|--help>]
40 [B<--man>]
41
42 =head1 DESCRIPTION
43
44 Inserts records from a Koha database into Elasticsearch.
45
46 =head1 OPTIONS
47
48 =over
49
50 =item B<-c|--commit>=C<count>
51
52 Specify how many records will be batched up before they're added to Elasticsearch.
53 Higher should be faster, but will cause more RAM usage. Default is 5000.
54
55 =item B<-d|--delete>
56
57 Delete the index and recreate it before indexing.
58
59 =item B<-r|--reset>
60
61 Reload mappings from files (specified in koha-conf.xml) before indexing.
62 Implies --delete.
63
64 =item B<-a|--authorities>
65
66 Index the authorities only. Combining this with B<-b> is the same as
67 specifying neither and so both get indexed.
68
69 =item B<-b|--biblios>
70
71 Index the biblios only. Combining this with B<-a> is the same as
72 specifying neither and so both get indexed.
73
74 =item B<--desc>
75
76 Index the records in descending id order. Intended to index newer record before older records.
77 Default is to index in ascending order.
78 Does not work with --bnumber or --authid
79
80 =item B<-bn|--bnumber>
81
82 Only index the supplied biblionumber, mostly for testing purposes. May be
83 repeated.
84
85 =item B<-ai|--authid>
86
87 Only index the supplied authority id, mostly for testing purposes. May be
88 repeated.
89
90 =item B<-p|--processes>
91
92 Number of processes to use for indexing. This can be used to do more indexing
93 work in parallel on multicore systems. By default, a single process is used.
94
95 =item B<-v|--verbose>
96
97 By default, this program only emits warnings and errors. This makes it talk
98 more. Add more to make it even more wordy, in particular when debugging.
99
100 =item B<-h|--help>
101
102 Help!
103
104 =item B<--man>
105
106 Full documentation.
107
108 =back
109
110 =head1 IMPLEMENTATION
111
112 =cut
113
114 use autodie;
115 use Getopt::Long;
116 use Koha::Script;
117 use C4::Context;
118 use Koha::MetadataRecord::Authority;
119 use Koha::BiblioUtils;
120 use Koha::SearchEngine::Elasticsearch;
121 use Koha::SearchEngine::Elasticsearch::Indexer;
122 use MARC::Field;
123 use MARC::Record;
124 use Modern::Perl;
125 use Pod::Usage;
126 use Try::Tiny;
127
128 my $verbose = 0;
129 my $commit = 5000;
130 my ($delete, $reset, $help, $man, $processes);
131 my ($index_biblios, $index_authorities);
132 my (@biblionumbers,@authids);
133 my $desc;
134
135 $|=1; # flushes output
136
137 GetOptions(
138     'c|commit=i'    => \$commit,
139     'd|delete'      => \$delete,
140     'r|reset'       => \$reset,
141     'a|authorities' => \$index_authorities,
142     'b|biblios'     => \$index_biblios,
143     'desc'          => \$desc,
144     'bn|bnumber=i'  => \@biblionumbers,
145     'ai|authid=i'   => \@authids,
146     'p|processes=i' => \$processes,
147     'v|verbose+'    => \$verbose,
148     'h|help'        => \$help,
149     'man'           => \$man,
150 );
151
152 # Default is to do both
153 unless ($index_authorities || $index_biblios) {
154     $index_authorities = $index_biblios = 1;
155 }
156
157 if ($processes && ( @biblionumbers || @authids) ) {
158     die "Argument p|processes cannot be combined with bn|bnumber or ai|authid";
159 }
160
161 pod2usage(1) if $help;
162 pod2usage( -exitstatus => 0, -verbose => 2 ) if $man;
163
164 _sanity_check();
165
166 if ($reset){
167     Koha::SearchEngine::Elasticsearch->reset_elasticsearch_mappings;
168     $delete = 1;
169 }
170
171 _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, $delete) if ($index_biblios);
172 _verify_index_state($Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX, $delete) if ($index_authorities);
173
174 my $slice_index = 0;
175 my $slice_count = ( $processes //= 1 );
176 my %iterator_options;
177
178 if ($slice_count > 1) {
179     # Fire up child processes for processing slices from 2 on. This main process will handle slice 1.
180     $slice_index = 0;
181     for (my $proc = 1; $proc < $slice_count; $proc++) {
182         my $pid = fork();
183         die "Failed to fork a child process\n" unless defined $pid;
184         if ($pid == 0) {
185             # Child process, give it a slice to process
186             $slice_index = $proc;
187             last;
188         }
189     }
190     # Fudge the commit count a bit to spread out the Elasticsearch commits
191     $commit *= 1 + 0.10 * $slice_index;
192     $commit = int( $commit );
193     _log(1, "Processing slice @{[$slice_index + 1]} of $slice_count\n");
194     $iterator_options{slice} = { index => $slice_index, count => $slice_count };
195 }
196
197 if( $desc ){
198     $iterator_options{desc} = 1;
199 }
200
201 my $next;
202 if ($index_biblios) {
203     _log(1, "Indexing biblios\n");
204     if (@biblionumbers) {
205         $next = sub {
206             my $r = shift @biblionumbers;
207             return () unless defined $r;
208             return ($r, Koha::BiblioUtils->get_from_biblionumber($r, item_data => 1 ));
209         };
210     } else {
211         my $records = Koha::BiblioUtils->get_all_biblios_iterator(%iterator_options);
212         $next = sub {
213             $records->next();
214         }
215     }
216     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
217 }
218 if ($index_authorities) {
219     _log(1, "Indexing authorities\n");
220     if (@authids) {
221         $next = sub {
222             my $r = shift @authids;
223             return () unless defined $r;
224             my $a = Koha::MetadataRecord::Authority->get_from_authid($r);
225             return ($r, $a);
226         };
227     } else {
228         my $records = Koha::MetadataRecord::Authority->get_all_authorities_iterator(%iterator_options);
229         $next = sub {
230             $records->next();
231         }
232     }
233     _do_reindex($next, $Koha::SearchEngine::Elasticsearch::AUTHORITIES_INDEX);
234 }
235
236 if ($slice_index == 0) {
237     # Main process, wait for children
238     for (my $proc = 1; $proc < $processes; $proc++) {
239         wait();
240     }
241 }
242
243 =head1 INTERNAL METHODS
244
245 =head2 _verify_index_state
246
247     _verify_index_state($Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX, 1);
248
249 Checks the index state and recreates it if requested.
250
251 =cut
252
253 sub _verify_index_state {
254     my ( $index_name, $recreate ) = @_;
255
256     _log(1, "Checking state of $index_name index\n");
257     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
258
259     if ($recreate) {
260         _log(1, "Dropping and recreating $index_name index\n");
261         $indexer->drop_index() if $indexer->index_exists();
262         $indexer->create_index();
263     }
264     elsif (!$indexer->index_exists) {
265         # Create index if does not exist
266         $indexer->create_index();
267     } elsif ($indexer->is_index_status_ok) {
268         # Update mapping unless index is some kind of problematic state
269         $indexer->update_mappings();
270     } elsif ($indexer->is_index_status_recreate_required) {
271         warn qq/Index "$index_name" has status "recreate required", suggesting it should be recreated/;
272     }
273 }
274
275 =head2 _do_reindex
276
277     _do_reindex($callback, $Koha::SearchEngine::Elasticsearch::BIBLIOS_INDEX);
278
279 Does the actual reindexing. $callback is a function that always returns the next record.
280 For each index we iterate through the records, committing at specified count
281
282 =cut
283
284 sub _do_reindex {
285     my ( $next, $index_name ) = @_;
286
287     my $indexer = Koha::SearchEngine::Elasticsearch::Indexer->new( { index => $index_name } );
288
289     my $count        = 0;
290     my $commit_count = $commit;
291     my ( @id_buffer, @commit_buffer );
292     while ( my $record = $next->() ) {
293         my $id     = $record->id // $record->authid;
294         my $record = $record->record;
295         $count++;
296         if ( $verbose == 1 ) {
297             _log( 1, "$count records processed\n" ) if ( $count % 1000 == 0);
298         } else {
299             _log( 2, "$id\n" );
300         }
301
302         push @id_buffer,     $id;
303         push @commit_buffer, $record;
304         if ( !( --$commit_count ) ) {
305             _log( 1, "Committing $commit records...\n" );
306             my $response;
307             try{
308                 $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
309                 _handle_response($response);
310                 _log( 1, "Commit complete\n" );
311             } catch {
312                 _log(1,"Elasticsearch exception thrown: ".$_->type."\n");
313                 _log(2,"Details: ".$_->details."\n");
314             };
315             $commit_count  = $commit;
316             @id_buffer     = ();
317             @commit_buffer = ();
318         }
319     }
320
321     # There are probably uncommitted records
322     _log( 1, "Committing final records...\n" );
323     my $response = $indexer->update_index( \@id_buffer, \@commit_buffer );
324     _handle_response($response);
325     _log( 1, "Total $count records indexed\n" );
326 }
327
328 =head2 _sanity_check
329
330     _sanity_check();
331
332 Checks some basic stuff to ensure that it's sane before we start.
333
334 =cut
335
336 sub _sanity_check {
337     # Do we have an elasticsearch block defined?
338     my $conf = C4::Context->config('elasticsearch');
339     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
340 }
341
342 =head2 _handle_response
343
344 Parse the return from update_index and display errors depending on verbosity of the script
345
346 =cut
347
348 sub _handle_response {
349     my ($response) = @_;
350     if( $response->{errors} eq 'true' ){
351         _log( 1, "There were errors during indexing\n" );
352         if ( $verbose > 1 ){
353             foreach my $item (@{$response->{items}}){
354                 next unless defined $item->{index}->{error};
355                 print "Record #" . $item->{index}->{_id} . " " .
356                       $item->{index}->{error}->{reason} . " (" . $item->{index}->{error}->{type} . ") : " .
357                       $item->{index}->{error}->{caused_by}->{type} . " (" . $item->{index}->{error}->{caused_by}->{reason} . ")\n";
358             }
359         }
360     }
361 }
362
363 =head2 _log
364
365     _log($level, "Message\n");
366
367 Output progress information.
368
369 Will output the message if verbosity level is set to $level or more. Will not
370 include a trailing newline automatically.
371
372 =cut
373
374 sub _log {
375     my ($level, $msg) = @_;
376
377     print "[$$] $msg" if ($verbose >= $level);
378 }