Revert "Bug 15187: Index 880 in Zebra the same as Elasticsearch"
[srvgit] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use Koha::Script;
21 use C4::Context;
22 use Getopt::Long qw( GetOptions );
23 use Fcntl qw( LOCK_EX LOCK_NB LOCK_UN );
24 use File::Temp qw( tempdir );
25 use File::Path qw( mkpath rmtree );
26 use C4::Biblio qw( GetXmlBiblio );
27 use C4::AuthoritiesMarc qw( GetAuthority GetAuthorityXML );
28 use C4::Items qw( Item2Marc );
29 use Koha::RecordProcessor;
30 use Koha::Caches;
31 use XML::LibXML;
32
33 use constant LOCK_FILENAME => 'rebuild..LCK';
34
35 # script that checks zebradir structure & create directories & mandatory files if needed
36 #
37 #
38
39 $|=1; # flushes output
40 # If the cron job starts us in an unreadable dir, we will break without
41 # this.
42 chdir $ENV{HOME} if (!(-r '.'));
43 my $daemon_mode;
44 my $daemon_sleep = 5;
45 my $directory;
46 my $nosanitize;
47 my $skip_export;
48 my $keep_export;
49 my $skip_index;
50 my $reset;
51 my $biblios;
52 my $authorities;
53 my $as_xml;
54 my $noshadow;
55 my $want_help;
56 my $process_zebraqueue;
57 my $process_zebraqueue_skip_deletes;
58 my $do_not_clear_zebraqueue;
59 my $length;
60 my $where;
61 my $offset;
62 my $run_as_root;
63 my $run_user = (getpwuid($<))[0];
64 my $wait_for_lock = 0;
65 my $use_flock;
66 my $table = 'biblioitems';
67 my $is_memcached = Koha::Caches->get_instance->memcached_cache;
68
69 my $verbose_logging = 0;
70 my $zebraidx_log_opt = " -v none,fatal,warn ";
71 my $result = GetOptions(
72     'daemon'        => \$daemon_mode,
73     'sleep:i'       => \$daemon_sleep,
74     'd:s'           => \$directory,
75     'r|reset'       => \$reset,
76     's'             => \$skip_export,
77     'k'             => \$keep_export,
78     'I|skip-index'  => \$skip_index,
79     'nosanitize'    => \$nosanitize,
80     'b'             => \$biblios,
81     'w'             => \$noshadow,
82     'a'             => \$authorities,
83     'h|help'        => \$want_help,
84     'x'             => \$as_xml,
85     'y'             => \$do_not_clear_zebraqueue,
86     'z'             => \$process_zebraqueue,
87     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
88     'where:s'       => \$where,
89     'length:i'      => \$length,
90     'offset:i'      => \$offset,
91     'v+'            => \$verbose_logging,
92     'run-as-root'   => \$run_as_root,
93     'wait-for-lock' => \$wait_for_lock,
94     't|table:s'     => \$table,
95 );
96
97 if (not $result or $want_help) {
98     print_usage();
99     exit 0;
100 }
101
102 if ( $as_xml ) {
103     warn "Warning: You passed -x which is already the default and is now deprecated\n";
104     undef $as_xml; # Should not be used later
105 }
106
107 if( not defined $run_as_root and $run_user eq 'root') {
108     my $msg = "Warning: You are running this script as the user 'root'.\n";
109     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
110     $msg   .= "Please do '$0 --help' to see usage.\n";
111     die $msg;
112 }
113
114 if ($process_zebraqueue and ($skip_export or $reset)) {
115     my $msg = "Cannot specify -r or -s if -z is specified\n";
116     $msg   .= "Please do '$0 --help' to see usage.\n";
117     die $msg;
118 }
119
120 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
121     my $msg = "Cannot specify both -y and -z\n";
122     $msg   .= "Please do '$0 --help' to see usage.\n";
123     die $msg;
124 }
125
126 if ($daemon_mode) {
127     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
128     if ($skip_export or $keep_export or $skip_index or
129           $where or $length or $offset) {
130         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
131         $msg   .= "Please do '$0 --help' to see usage.\n";
132         die $msg;
133     }
134     unless ($is_memcached) {
135         warn "Warning: script running in daemon mode, without recommended caching system (memcached).\n";
136     }
137     $authorities = 1;
138     $biblios = 1;
139     $process_zebraqueue = 1;
140 }
141
142 if (not $biblios and not $authorities) {
143     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
144     $msg   .= "Please do '$0 --help' to see usage.\n";
145     die $msg;
146 }
147
148 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio', 'biblio_metadata' );
149 unless ( grep { $_ eq $table } @tables_allowed_for_select ) {
150     die "Cannot specify -t|--table with value '$table'. Only "
151       . ( join ', ', @tables_allowed_for_select )
152       . " are allowed.";
153 }
154
155
156 #  -v is for verbose, which seems backwards here because of how logging is set
157 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
158 if ($verbose_logging >= 2) {
159     $zebraidx_log_opt = '-v none,fatal,warn,all';
160 }
161
162 my $use_tempdir = 0;
163 unless ($directory) {
164     $use_tempdir = 1;
165     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
166 }
167
168
169 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
170 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
171
172 my $kohadir = C4::Context->config('intranetdir');
173
174 my ($biblionumbertagfield,$biblionumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblio.biblionumber" );
175 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblioitems.biblioitemnumber" );
176
177 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
178 <collection xmlns="http://www.loc.gov/MARC21/slim">
179 };
180
181 my $marcxml_close = q{
182 </collection>
183 };
184
185 # Protect again simultaneous update of the zebra index by using a lock file.
186 # Create our own lock directory if it is missing. This should be created
187 # by koha-zebra-ctl.sh or at system installation. If the desired directory
188 # does not exist and cannot be created, we fall back on /tmp - which will
189 # always work.
190
191 my ($lockfile, $LockFH);
192 foreach (
193     C4::Context->config("zebra_lockdir"),
194     '/var/lock/zebra_' . C4::Context->config('database'),
195     '/tmp/zebra_' . C4::Context->config('database')
196 ) {
197     #we try three possibilities (we really want to lock :)
198     next if !$_;
199     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
200     last if defined $LockFH;
201 }
202 if( !defined $LockFH ) {
203     print "WARNING: Could not create lock file $lockfile: $!\n";
204     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
205     print "Verify file permissions for it too.\n";
206     $use_flock = 0; # we disable file locking now and will continue
207                     # without it
208                     # note that this mimics old behavior (before we used
209                     # the lockfile)
210 };
211
212 my $start_time = time();
213 if ( $verbose_logging ) {
214     my $pretty_time = POSIX::strftime("%H:%M:%S",localtime($start_time));
215     print "Zebra configuration information\n";
216     print "================================\n";
217     print "Zebra biblio directory      = $biblioserverdir\n";
218     print "Zebra authorities directory = $authorityserverdir\n";
219     print "Koha directory              = $kohadir\n";
220     print "Lockfile                    = $lockfile\n" if $lockfile;
221     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
222     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
223     print "================================\n";
224     print "Job started: $pretty_time\n";
225 }
226
227 my $tester = XML::LibXML->new();
228 my $dbh;
229
230 # The main work is done here by calling do_one_pass().  We have added locking
231 # avoid race conditions between full rebuilds and incremental updates either from
232 # daemon mode or periodic invocation from cron.  The race can lead to an updated
233 # record being overwritten by a rebuild if the update is applied after the export
234 # by the rebuild and before the rebuild finishes (more likely to affect large
235 # catalogs).
236 #
237 # We have chosen to exit immediately by default if we cannot obtain the lock
238 # to prevent the potential for a infinite backlog from cron invocations, but an
239 # option (wait-for-lock) is provided to let the program wait for the lock.
240 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
241 if ($daemon_mode) {
242     while (1) {
243         # For incremental updates, skip the update if the updates are locked
244         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
245             eval {
246                 $dbh = C4::Context->dbh;
247                 if( zebraqueue_not_empty() ) {
248                     Koha::Caches->flush_L1_caches() if $is_memcached;
249                     do_one_pass();
250                 }
251             };
252             if ($@ && $verbose_logging) {
253                 warn "Warning : $@\n";
254             }
255             _flock($LockFH, LOCK_UN);
256         }
257         sleep $daemon_sleep;
258     }
259 } else {
260     # all one-off invocations
261     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
262     if (_flock($LockFH, $lock_mode)) {
263         $dbh = C4::Context->dbh;
264         do_one_pass();
265         _flock($LockFH, LOCK_UN);
266     } else {
267         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
268     }
269 }
270
271
272 if ( $verbose_logging ) {
273     print "====================\n";
274     print "Indexing complete: ". pretty_time() . "\n";
275     print "====================\n";
276     print "CLEANING\n";
277     print "====================\n";
278 }
279 if ($keep_export) {
280     print "NOTHING cleaned : the export $directory has been kept.\n";
281     print "You can re-run this script with the -s ";
282     if ($use_tempdir) {
283         print " and -d $directory parameters";
284     } else {
285         print "parameter";
286     }
287     print "\n";
288     print "if you just want to rebuild zebra after changing zebra config files\n";
289 } else {
290     unless ($use_tempdir) {
291         # if we're using a temporary directory
292         # created by File::Temp, it will be removed
293         # automatically.
294         rmtree($directory, 0, 1);
295         print "directory $directory deleted\n";
296     }
297 }
298
299 sub do_one_pass {
300     if ($authorities) {
301         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
302     } else {
303         print "skipping authorities\n" if ( $verbose_logging );
304     }
305
306     if ($biblios) {
307         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
308     } else {
309         print "skipping biblios\n" if ( $verbose_logging );
310     }
311 }
312
313 # Check the zebra update queue and return true if there are records to process
314 # This routine will handle each of -ab, -a, or -b, but in practice we force
315 # -ab when in daemon mode.
316 sub zebraqueue_not_empty {
317     my $where_str;
318
319     if ($authorities && $biblios) {
320         $where_str = 'done = 0;';
321     } elsif ($biblios) {
322         $where_str = 'server = "biblioserver" AND done = 0;';
323     } else {
324         $where_str = 'server = "authorityserver" AND done = 0;';
325     }
326     my $query =
327         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
328
329     $query->execute;
330     my $count = $query->fetchrow_arrayref->[0];
331     print "queued records: $count\n" if $verbose_logging > 0;
332     return $count > 0;
333 }
334
335 # This checks to see if the zebra directories exist under the provided path.
336 # If they don't, then zebra is likely to spit the dummy. This returns true
337 # if the directories had to be created, false otherwise.
338 sub check_zebra_dirs {
339     my ($base) = shift() . '/';
340     my $needed_repairing = 0;
341     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
342     foreach my $dir (@dirs) {
343         my $bdir = $base . $dir;
344         if (! -d $bdir) {
345             $needed_repairing = 1;
346             mkdir $bdir || die "Unable to create '$bdir': $!\n";
347             print "$0: needed to create '$bdir'\n";
348         }
349     }
350     return $needed_repairing;
351 }   # ----------  end of subroutine check_zebra_dirs  ----------
352
353 sub index_records {
354     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
355
356     my $num_records_exported = 0;
357     my $records_deleted = {};
358     my $need_reset = check_zebra_dirs($server_dir);
359     if ($need_reset) {
360         print "$0: found broken zebra server directories: forcing a rebuild\n";
361         $reset = 1;
362     }
363     if ($skip_export && $verbose_logging) {
364         print "====================\n";
365         print "SKIPPING $record_type export\n";
366         print "====================\n";
367     } else {
368         if ( $verbose_logging ) {
369             print "====================\n";
370             print "exporting $record_type " . pretty_time() . "\n";
371             print "====================\n";
372         }
373         mkdir "$directory" unless (-d $directory);
374         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
375         if ($process_zebraqueue) {
376             my $entries;
377
378             unless ( $process_zebraqueue_skip_deletes ) {
379                 $entries = select_zebraqueue_records($record_type, 'deleted');
380                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
381                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
382                 mark_zebraqueue_batch_done($entries);
383             }
384
385             $entries = select_zebraqueue_records($record_type, 'updated');
386             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
387             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
388             mark_zebraqueue_batch_done($entries);
389
390         } else {
391             my $sth = select_all_records($record_type);
392             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
393             unless ($do_not_clear_zebraqueue) {
394                 mark_all_zebraqueue_done($record_type);
395             }
396         }
397     }
398
399     #
400     # and reindexing everything
401     #
402     if ($skip_index) {
403         if ($verbose_logging) {
404             print "====================\n";
405             print "SKIPPING $record_type indexing\n";
406             print "====================\n";
407         }
408     } else {
409         if ( $verbose_logging ) {
410             print "====================\n";
411             print "REINDEXING zebra " . pretty_time() . "\n";
412             print "====================\n";
413         }
414         my $record_fmt = 'marcxml';
415         if ($process_zebraqueue) {
416             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
417                 if %$records_deleted;
418             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
419                 if $num_records_exported;
420         } else {
421             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
422                 if ($num_records_exported or $skip_export);
423         }
424     }
425 }
426
427
428 sub select_zebraqueue_records {
429     my ($record_type, $update_type) = @_;
430
431     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
432     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
433
434     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
435                              FROM zebraqueue
436                              WHERE server = ?
437                              AND   operation = ?
438                              AND   done = 0
439                              ORDER BY id DESC");
440     $sth->execute($server, $op);
441     my $entries = $sth->fetchall_arrayref({});
442 }
443
444 sub mark_all_zebraqueue_done {
445     my ($record_type) = @_;
446
447     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
448
449     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
450                              WHERE server = ?
451                              AND done = 0");
452     $sth->execute($server);
453 }
454
455 sub mark_zebraqueue_batch_done {
456     my ($entries) = @_;
457
458     $dbh->{AutoCommit} = 0;
459     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
460     $dbh->commit();
461     foreach my $id (map { $_->{id} } @$entries) {
462         $sth->execute($id);
463     }
464     $dbh->{AutoCommit} = 1;
465 }
466
467 sub select_all_records {
468     my $record_type = shift;
469     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
470 }
471
472 sub select_all_authorities {
473     my $strsth=qq{SELECT authid FROM auth_header};
474     $strsth.=qq{ WHERE $where } if ($where);
475     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
476     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
477     my $sth = $dbh->prepare($strsth);
478     $sth->execute();
479     return $sth;
480 }
481
482 sub select_all_biblios {
483     $table = 'biblioitems'
484       unless grep { $_ eq $table } @tables_allowed_for_select;
485     my $strsth = qq{ SELECT DISTINCT biblionumber FROM $table };
486     $strsth.=qq{ WHERE $where } if ($where);
487     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
488     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
489     my $sth = $dbh->prepare($strsth);
490     $sth->execute();
491     return $sth;
492 }
493
494 sub export_marc_records_from_sth {
495     my ($record_type, $sth, $directory, $nosanitize) = @_;
496
497     my $num_exported = 0;
498     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
499
500     print {$fh} $marcxml_open;
501
502     my $i = 0;
503     my ( $itemtag, $itemsubfield ) = C4::Biblio::GetMarcFromKohaField( "items.itemnumber" );
504     while (my ($record_number) = $sth->fetchrow_array) {
505         print "." if ( $verbose_logging );
506         print "\r$i" unless ($i++ %100 or !$verbose_logging);
507         if ( $nosanitize ) {
508             my $marcxml = $record_type eq 'biblio'
509                           ? GetXmlBiblio( $record_number )
510                           : GetAuthorityXML( $record_number );
511             if ($record_type eq 'biblio'){
512                 my $biblio = Koha::Biblios->find($record_number);
513                 next unless $biblio;
514                 my $items = $biblio->items;
515                 if ($items->count){
516                     my $record = MARC::Record->new;
517                     $record->encoding('UTF-8');
518                     my @itemsrecord;
519                     for my $item ( @{$items->unblessed} ) {
520                         my $record = Item2Marc($item, $record_number);
521                         push @itemsrecord, $record->field($itemtag);
522                     }
523                     $record->insert_fields_ordered(@itemsrecord);
524                     my $itemsxml = $record->as_xml_record();
525                     $marcxml =
526                         substr($marcxml, 0, length($marcxml)-10) .
527                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
528                 }
529             }
530             # extra test to ensure that result is valid XML; otherwise
531             # Zebra won't parse it in DOM mode
532             eval {
533                 my $doc = $tester->parse_string($marcxml);
534             };
535             if ($@) {
536                 warn "Error exporting record $record_number ($record_type): $@\n";
537                 next;
538             }
539             if ( $marcxml ) {
540                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
541                 print {$fh} $marcxml;
542                 $num_exported++;
543             }
544             next;
545         }
546         my ($marc) = get_corrected_marc_record($record_type, $record_number);
547         if (defined $marc) {
548             eval {
549                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
550                 eval {
551                     my $doc = $tester->parse_string($rec);
552                 };
553                 if ($@) {
554                     die "invalid XML: $@";
555                 }
556                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
557                 print {$fh} $rec;
558                 $num_exported++;
559             };
560             if ($@) {
561                 warn "Error exporting record $record_number ($record_type) XML";
562                 warn "... specific error is $@" if $verbose_logging;
563             }
564         }
565     }
566     print "\nRecords exported: $num_exported " . pretty_time() . "\n" if ( $verbose_logging );
567     print {$fh} $marcxml_close;
568
569     close $fh;
570     return $num_exported;
571 }
572
573 sub export_marc_records_from_list {
574     my ($record_type, $entries, $directory, $records_deleted) = @_;
575
576     my $num_exported = 0;
577     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
578
579     print {$fh} $marcxml_open;
580
581     my $i = 0;
582
583     # Skip any deleted records. We check for this anyway, but this reduces error spam
584     my %found = %$records_deleted;
585     foreach my $record_number ( map { $_->{biblio_auth_number} }
586                                 grep { !$found{ $_->{biblio_auth_number} }++ }
587                                 @$entries ) {
588         print "." if ( $verbose_logging );
589         print "\r$i" unless ($i++ %100 or !$verbose_logging);
590         my ($marc) = get_corrected_marc_record($record_type, $record_number);
591         if (defined $marc) {
592             eval {
593                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
594                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
595                 print {$fh} $rec;
596                 $num_exported++;
597             };
598             if ($@) {
599               warn "Error exporting record $record_number ($record_type) XML";
600             }
601         }
602     }
603     print "\nRecords exported: $num_exported " . pretty_time() . "\n" if ( $verbose_logging );
604
605     print {$fh} $marcxml_close;
606
607     close $fh;
608     return $num_exported;
609 }
610
611 sub generate_deleted_marc_records {
612
613     my ($record_type, $entries, $directory) = @_;
614
615     my $records_deleted = {};
616     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
617
618     print {$fh} $marcxml_open;
619
620     my $i = 0;
621     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
622         print "\r$i" unless ($i++ %100 or !$verbose_logging);
623         print "." if ( $verbose_logging );
624
625         my $marc = MARC::Record->new();
626         if ($record_type eq 'biblio') {
627             fix_biblio_ids($marc, $record_number, $record_number);
628         } else {
629             fix_authority_id($marc, $record_number);
630         }
631         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
632             fix_unimarc_100($marc);
633         }
634
635         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
636         # Remove the record's XML header
637         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
638         print {$fh} $rec;
639
640         $records_deleted->{$record_number} = 1;
641     }
642     print "\nRecords exported: $i " . pretty_time() . "\n" if ( $verbose_logging );
643
644     print {$fh} $marcxml_close;
645
646     close $fh;
647     return $records_deleted;
648 }
649
650 sub get_corrected_marc_record {
651     my ( $record_type, $record_number ) = @_;
652
653     my $marc = get_raw_marc_record( $record_type, $record_number );
654
655     if ( defined $marc ) {
656         fix_leader($marc);
657         if ( $record_type eq 'authority' ) {
658             fix_authority_id( $marc, $record_number );
659         }
660         elsif ( $record_type eq 'biblio' ) {
661
662             my @filters;
663             push @filters, 'EmbedItemsAvailability';
664             push @filters, 'EmbedSeeFromHeadings'
665                 if C4::Context->preference('IncludeSeeFromInSearches');
666
667             my $normalizer = Koha::RecordProcessor->new( { filters => \@filters } );
668             $marc = $normalizer->process($marc);
669         }
670         if ( C4::Context->preference("marcflavour") eq "UNIMARC" ) {
671             fix_unimarc_100($marc);
672         }
673     }
674
675     return $marc;
676 }
677
678 sub get_raw_marc_record {
679     my ($record_type, $record_number) = @_;
680
681     my $marc;
682     if ($record_type eq 'biblio') {
683         eval {
684             my $biblio = Koha::Biblios->find($record_number);
685             $marc = $biblio->metadata->record({ embed_items => 1 });
686         };
687         if ($@ || !$marc) {
688             # here we do warn since catching an exception
689             # means that the bib was found but failed
690             # to be parsed
691             warn "error retrieving biblio $record_number";
692             return;
693         }
694     } else {
695         eval { $marc = GetAuthority($record_number); };
696         if ($@) {
697             warn "error retrieving authority $record_number";
698             return;
699         }
700     }
701     return $marc;
702 }
703
704 sub fix_leader {
705     # FIXME - this routine is suspect
706     # It blanks the Leader/00-05 and Leader/12-16 to
707     # force them to be recalculated correct when
708     # the $marc->as_usmarc() or $marc->as_xml() is called.
709     # But why is this necessary?  It would be a serious bug
710     # in MARC::Record (definitely) and MARC::File::XML (arguably)
711     # if they are emitting incorrect leader values.
712     my $marc = shift;
713
714     my $leader = $marc->leader;
715     substr($leader,  0, 5) = '     ';
716     substr($leader, 10, 7) = '22     ';
717     $marc->leader(substr($leader, 0, 24));
718 }
719
720 sub fix_biblio_ids {
721     # FIXME - it is essential to ensure that the biblionumber is present,
722     #         otherwise, Zebra will choke on the record.  However, this
723     #         logic belongs in the relevant C4::Biblio APIs.
724     my $marc = shift;
725     my $biblionumber = shift;
726     my $biblioitemnumber;
727     if (@_) {
728         $biblioitemnumber = shift;
729     } else {
730         my $sth = $dbh->prepare(
731             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
732         $sth->execute($biblionumber);
733         ($biblioitemnumber) = $sth->fetchrow_array;
734         $sth->finish;
735         unless ($biblioitemnumber) {
736             warn "failed to get biblioitemnumber for biblio $biblionumber";
737             return 0;
738         }
739     }
740
741     # FIXME - this is cheating on two levels
742     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
743     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
744     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
745     #
746     # On the other hand, this better for now than what rebuild_zebra.pl used to
747     # do, which was duplicate the code for inserting the biblionumber
748     # and biblioitemnumber
749     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
750
751     return 1;
752 }
753
754 sub fix_authority_id {
755     # FIXME - as with fix_biblio_ids, the authid must be present
756     #         for Zebra's sake.  However, this really belongs
757     #         in C4::AuthoritiesMarc.
758     my ($marc, $authid) = @_;
759     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
760         $marc->delete_field($marc->field('001'));
761         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
762     }
763 }
764
765 sub fix_unimarc_100 {
766     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
767     my $marc = shift;
768
769     my $string;
770     my $length_100a = length($marc->subfield( 100, "a" ));
771     if (  $length_100a and $length_100a == 36 ) {
772         $string = $marc->subfield( 100, "a" );
773         my $f100 = $marc->field(100);
774         $marc->delete_field($f100);
775     }
776     else {
777         $string = POSIX::strftime( "%Y%m%d", localtime );
778         $string =~ s/\-//g;
779         $string = sprintf( "%-*s", 35, $string );
780     }
781     substr( $string, 22, 6, "frey50" );
782     $length_100a = length($marc->subfield( 100, "a" ));
783     unless ( $length_100a and $length_100a == 36 ) {
784         $marc->delete_field($marc->field(100));
785         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
786     }
787 }
788
789 sub do_indexing {
790     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
791
792     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
793     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
794     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
795     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
796
797     $noshadow //= '';
798
799     if ($noshadow or $reset_index) {
800         $noshadow = '-n';
801     }
802
803     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
804     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
805     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
806 }
807
808 sub _flock {
809     # test if flock is present; if so, use it; if not, return true
810     # op refers to the official flock operations including LOCK_EX,
811     # LOCK_UN, etc.
812     # combining LOCK_EX with LOCK_NB returns immediately
813     my ($fh, $op)= @_;
814     if( !defined($use_flock) ) {
815         #check if flock is present; if not, you will have a fatal error
816         my $lock_acquired = eval { flock($fh, $op) };
817         # assuming that $fh and $op are fine(..), an undef $lock_acquired
818         # means no flock
819         $use_flock = defined($lock_acquired) ? 1 : 0;
820         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
821         return 1 if !$use_flock;
822         return $lock_acquired;
823     } else {
824         return 1 if !$use_flock;
825         return flock($fh, $op);
826     }
827 }
828
829 sub _create_lockfile { #returns undef on failure
830     my $dir= shift;
831     unless (-d $dir) {
832         eval { mkpath($dir, 0, oct(755)) };
833         return if $@;
834     }
835     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
836     return ( $fh, $dir.'/'.LOCK_FILENAME );
837 }
838
839 sub pretty_time {
840     use integer;
841     my $now = time;
842     my $elapsed = $now - $start_time;
843     local $_ = $elapsed;
844     my ( $h, $m, $s );
845     $s = $_ % 60;
846     $_ /= 60;
847     $m = $_ % 60;
848     $_ /= 60;
849     $h = $_ % 24;
850
851     my $now_pretty = POSIX::strftime("%H:%M:%S",localtime($now));
852     my $elapsed_pretty = sprintf "[%02d:%02d:%02d]",$h,$m,$s;
853
854     return "$now_pretty $elapsed_pretty";
855 }
856
857 sub print_usage {
858     print <<_USAGE_;
859 $0: reindex MARC bibs and/or authorities in Zebra.
860
861 Use this batch job to reindex all biblio or authority
862 records in your Koha database.
863
864 Parameters:
865
866     -b                      index bibliographic records
867
868     -a                      index authority records
869
870     -daemon                 Run in daemon mode.  The program will loop checking
871                             for entries on the zebraqueue table, processing
872                             them incrementally if present, and then sleep
873                             for a few seconds before repeating the process
874                             Checking the zebraqueue table is done with a cheap
875                             SQL query.  This allows for near realtime update of
876                             the zebra search index with low system overhead.
877                             Use -sleep to control the checking interval.
878
879                             Daemon mode implies -z, -a, -b.  The program will
880                             refuse to start if options are present that do not
881                             make sense while running as an incremental update
882                             daemon (e.g. -r or -offset).
883
884     -sleep 10               Seconds to sleep between checks of the zebraqueue
885                             table in daemon mode.  The default is 5 seconds.
886
887     -z                      select only updated and deleted
888                             records marked in the zebraqueue
889                             table.  Cannot be used with -r
890                             or -s.
891
892     --skip-deletes          only select record updates, not record
893                             deletions, to avoid potential excessive
894                             I/O when zebraidx processes deletions.
895                             If this option is used for normal indexing,
896                             a cronjob should be set up to run
897                             rebuild_zebra.pl -z without --skip-deletes
898                             during off hours.
899                             Only effective with -z.
900
901     -r                      clear Zebra index before
902                             adding records to index. Implies -w.
903
904     -d                      Temporary directory for indexing.
905                             If not specified, one is automatically
906                             created.  The export directory
907                             is automatically deleted unless
908                             you supply the -k switch.
909
910     -k                      Do not delete export directory.
911
912     -s                      Skip export.  Used if you have
913                             already exported the records
914                             in a previous run.
915
916     -nosanitize             export biblio/authority records directly from DB marcxml
917                             field without sanitizing records. It speed up
918                             dump process but could fail if DB contains badly
919                             encoded records. Works only with -x,
920
921     -w                      skip shadow indexing for this batch
922
923     -y                      do NOT clear zebraqueue after indexing; normally,
924                             after doing batch indexing, zebraqueue should be
925                             marked done for the affected record type(s) so that
926                             a running zebraqueue_daemon doesn't try to reindex
927                             the same records - specify -y to override this.
928                             Cannot be used with -z.
929
930     -v                      increase the amount of logging.  Normally only
931                             warnings and errors from the indexing are shown.
932                             Use log level 2 (-v -v) to include all Zebra logs.
933
934     --length   1234         how many biblio you want to export
935     --offset 1243           offset you want to start to
936                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
937                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
938     --where                 let you specify a WHERE query, like itemtype='BOOK'
939                             or something like that
940
941     --run-as-root           explicitily allow script to run as 'root' user
942
943     --wait-for-lock         when not running in daemon mode, the default
944                             behavior is to abort a rebuild if the rebuild
945                             lock is busy.  This option will cause the program
946                             to wait for the lock to free and then continue
947                             processing the rebuild request,
948
949     --table                 specify a table (can be items, biblioitems, biblio, biblio_metadata) to retrieve biblionumber to index.
950                             biblioitems is the default value.
951
952     --help or -h            show this message.
953 _USAGE_
954 }