18da8026ada7ad4a13db833396d957b4e9866e28
[srvgit] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use Koha::Script;
21 use C4::Context;
22 use Getopt::Long;
23 use Fcntl qw(:flock);
24 use File::Temp qw/ tempdir /;
25 use File::Path;
26 use C4::Biblio;
27 use C4::AuthoritiesMarc;
28 use C4::Items;
29 use Koha::RecordProcessor;
30 use Koha::Caches;
31 use XML::LibXML;
32
33 use constant LOCK_FILENAME => 'rebuild..LCK';
34
35 # script that checks zebradir structure & create directories & mandatory files if needed
36 #
37 #
38
39 $|=1; # flushes output
40 # If the cron job starts us in an unreadable dir, we will break without
41 # this.
42 chdir $ENV{HOME} if (!(-r '.'));
43 my $daemon_mode;
44 my $daemon_sleep = 5;
45 my $directory;
46 my $nosanitize;
47 my $skip_export;
48 my $keep_export;
49 my $skip_index;
50 my $reset;
51 my $biblios;
52 my $authorities;
53 my $as_xml;
54 my $noshadow;
55 my $want_help;
56 my $process_zebraqueue;
57 my $process_zebraqueue_skip_deletes;
58 my $do_not_clear_zebraqueue;
59 my $length;
60 my $where;
61 my $offset;
62 my $run_as_root;
63 my $run_user = (getpwuid($<))[0];
64 my $wait_for_lock = 0;
65 my $use_flock;
66 my $table = 'biblioitems';
67 my $is_memcached = Koha::Caches->get_instance->memcached_cache;
68
69 my $verbose_logging = 0;
70 my $zebraidx_log_opt = " -v none,fatal,warn ";
71 my $result = GetOptions(
72     'daemon'        => \$daemon_mode,
73     'sleep:i'       => \$daemon_sleep,
74     'd:s'           => \$directory,
75     'r|reset'       => \$reset,
76     's'             => \$skip_export,
77     'k'             => \$keep_export,
78     'I|skip-index'  => \$skip_index,
79     'nosanitize'    => \$nosanitize,
80     'b'             => \$biblios,
81     'w'             => \$noshadow,
82     'a'             => \$authorities,
83     'h|help'        => \$want_help,
84     'x'             => \$as_xml,
85     'y'             => \$do_not_clear_zebraqueue,
86     'z'             => \$process_zebraqueue,
87     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
88     'where:s'       => \$where,
89     'length:i'      => \$length,
90     'offset:i'      => \$offset,
91     'v+'            => \$verbose_logging,
92     'run-as-root'   => \$run_as_root,
93     'wait-for-lock' => \$wait_for_lock,
94     't|table:s'     => \$table,
95 );
96
97 if (not $result or $want_help) {
98     print_usage();
99     exit 0;
100 }
101
102 if ( $as_xml ) {
103     warn "Warning: You passed -x which is already the default and is now deprecated\n";
104     undef $as_xml; # Should not be used later
105 }
106
107 if( not defined $run_as_root and $run_user eq 'root') {
108     my $msg = "Warning: You are running this script as the user 'root'.\n";
109     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
110     $msg   .= "Please do '$0 --help' to see usage.\n";
111     die $msg;
112 }
113
114 if ($process_zebraqueue and ($skip_export or $reset)) {
115     my $msg = "Cannot specify -r or -s if -z is specified\n";
116     $msg   .= "Please do '$0 --help' to see usage.\n";
117     die $msg;
118 }
119
120 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
121     my $msg = "Cannot specify both -y and -z\n";
122     $msg   .= "Please do '$0 --help' to see usage.\n";
123     die $msg;
124 }
125
126 if ($daemon_mode) {
127     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
128     if ($skip_export or $keep_export or $skip_index or
129           $where or $length or $offset) {
130         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
131         $msg   .= "Please do '$0 --help' to see usage.\n";
132         die $msg;
133     }
134     unless ($is_memcached) {
135         warn "Warning: script running in daemon mode, without recommended caching system (memcached).\n";
136     }
137     $authorities = 1;
138     $biblios = 1;
139     $process_zebraqueue = 1;
140 }
141
142 if (not $biblios and not $authorities) {
143     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
144     $msg   .= "Please do '$0 --help' to see usage.\n";
145     die $msg;
146 }
147
148 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio', 'biblio_metadata' );
149 unless ( grep { $_ eq $table } @tables_allowed_for_select ) {
150     die "Cannot specify -t|--table with value '$table'. Only "
151       . ( join ', ', @tables_allowed_for_select )
152       . " are allowed.";
153 }
154
155
156 #  -v is for verbose, which seems backwards here because of how logging is set
157 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
158 if ($verbose_logging >= 2) {
159     $zebraidx_log_opt = '-v none,fatal,warn,all';
160 }
161
162 my $use_tempdir = 0;
163 unless ($directory) {
164     $use_tempdir = 1;
165     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
166 }
167
168
169 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
170 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
171
172 my $kohadir = C4::Context->config('intranetdir');
173
174 my ($biblionumbertagfield,$biblionumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblio.biblionumber" );
175 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = C4::Biblio::GetMarcFromKohaField( "biblioitems.biblioitemnumber" );
176
177 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
178 <collection xmlns="http://www.loc.gov/MARC21/slim">
179 };
180
181 my $marcxml_close = q{
182 </collection>
183 };
184
185 # Protect again simultaneous update of the zebra index by using a lock file.
186 # Create our own lock directory if it is missing. This should be created
187 # by koha-zebra-ctl.sh or at system installation. If the desired directory
188 # does not exist and cannot be created, we fall back on /tmp - which will
189 # always work.
190
191 my ($lockfile, $LockFH);
192 foreach (
193     C4::Context->config("zebra_lockdir"),
194     '/var/lock/zebra_' . C4::Context->config('database'),
195     '/tmp/zebra_' . C4::Context->config('database')
196 ) {
197     #we try three possibilities (we really want to lock :)
198     next if !$_;
199     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
200     last if defined $LockFH;
201 }
202 if( !defined $LockFH ) {
203     print "WARNING: Could not create lock file $lockfile: $!\n";
204     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
205     print "Verify file permissions for it too.\n";
206     $use_flock = 0; # we disable file locking now and will continue
207                     # without it
208                     # note that this mimics old behavior (before we used
209                     # the lockfile)
210 };
211
212 my $start_time = time();
213 if ( $verbose_logging ) {
214     my $pretty_time = POSIX::strftime("%H:%M:%S",localtime($start_time));
215     print "Zebra configuration information\n";
216     print "================================\n";
217     print "Zebra biblio directory      = $biblioserverdir\n";
218     print "Zebra authorities directory = $authorityserverdir\n";
219     print "Koha directory              = $kohadir\n";
220     print "Lockfile                    = $lockfile\n" if $lockfile;
221     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
222     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
223     print "================================\n";
224     print "Job started: $pretty_time\n";
225 }
226
227 my $tester = XML::LibXML->new();
228 my $dbh;
229
230 # The main work is done here by calling do_one_pass().  We have added locking
231 # avoid race conditions between full rebuilds and incremental updates either from
232 # daemon mode or periodic invocation from cron.  The race can lead to an updated
233 # record being overwritten by a rebuild if the update is applied after the export
234 # by the rebuild and before the rebuild finishes (more likely to affect large
235 # catalogs).
236 #
237 # We have chosen to exit immediately by default if we cannot obtain the lock
238 # to prevent the potential for a infinite backlog from cron invocations, but an
239 # option (wait-for-lock) is provided to let the program wait for the lock.
240 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
241 if ($daemon_mode) {
242     while (1) {
243         # For incremental updates, skip the update if the updates are locked
244         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
245             eval {
246                 $dbh = C4::Context->dbh;
247                 if( zebraqueue_not_empty() ) {
248                     Koha::Caches->flush_L1_caches() if $is_memcached;
249                     do_one_pass();
250                 }
251             };
252             if ($@ && $verbose_logging) {
253                 warn "Warning : $@\n";
254             }
255             _flock($LockFH, LOCK_UN);
256         }
257         sleep $daemon_sleep;
258     }
259 } else {
260     # all one-off invocations
261     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
262     if (_flock($LockFH, $lock_mode)) {
263         $dbh = C4::Context->dbh;
264         do_one_pass();
265         _flock($LockFH, LOCK_UN);
266     } else {
267         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
268     }
269 }
270
271
272 if ( $verbose_logging ) {
273     print "====================\n";
274     print "Indexing complete: ". pretty_time() . "\n";
275     print "====================\n";
276     print "CLEANING\n";
277     print "====================\n";
278 }
279 if ($keep_export) {
280     print "NOTHING cleaned : the export $directory has been kept.\n";
281     print "You can re-run this script with the -s ";
282     if ($use_tempdir) {
283         print " and -d $directory parameters";
284     } else {
285         print "parameter";
286     }
287     print "\n";
288     print "if you just want to rebuild zebra after changing zebra config files\n";
289 } else {
290     unless ($use_tempdir) {
291         # if we're using a temporary directory
292         # created by File::Temp, it will be removed
293         # automatically.
294         rmtree($directory, 0, 1);
295         print "directory $directory deleted\n";
296     }
297 }
298
299 sub do_one_pass {
300     if ($authorities) {
301         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
302     } else {
303         print "skipping authorities\n" if ( $verbose_logging );
304     }
305
306     if ($biblios) {
307         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
308     } else {
309         print "skipping biblios\n" if ( $verbose_logging );
310     }
311 }
312
313 # Check the zebra update queue and return true if there are records to process
314 # This routine will handle each of -ab, -a, or -b, but in practice we force
315 # -ab when in daemon mode.
316 sub zebraqueue_not_empty {
317     my $where_str;
318
319     if ($authorities && $biblios) {
320         $where_str = 'done = 0;';
321     } elsif ($biblios) {
322         $where_str = 'server = "biblioserver" AND done = 0;';
323     } else {
324         $where_str = 'server = "authorityserver" AND done = 0;';
325     }
326     my $query =
327         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
328
329     $query->execute;
330     my $count = $query->fetchrow_arrayref->[0];
331     print "queued records: $count\n" if $verbose_logging > 0;
332     return $count > 0;
333 }
334
335 # This checks to see if the zebra directories exist under the provided path.
336 # If they don't, then zebra is likely to spit the dummy. This returns true
337 # if the directories had to be created, false otherwise.
338 sub check_zebra_dirs {
339     my ($base) = shift() . '/';
340     my $needed_repairing = 0;
341     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
342     foreach my $dir (@dirs) {
343         my $bdir = $base . $dir;
344         if (! -d $bdir) {
345             $needed_repairing = 1;
346             mkdir $bdir || die "Unable to create '$bdir': $!\n";
347             print "$0: needed to create '$bdir'\n";
348         }
349     }
350     return $needed_repairing;
351 }   # ----------  end of subroutine check_zebra_dirs  ----------
352
353 sub index_records {
354     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
355
356     my $num_records_exported = 0;
357     my $records_deleted = {};
358     my $need_reset = check_zebra_dirs($server_dir);
359     if ($need_reset) {
360         print "$0: found broken zebra server directories: forcing a rebuild\n";
361         $reset = 1;
362     }
363     if ($skip_export && $verbose_logging) {
364         print "====================\n";
365         print "SKIPPING $record_type export\n";
366         print "====================\n";
367     } else {
368         if ( $verbose_logging ) {
369             print "====================\n";
370             print "exporting $record_type " . pretty_time() . "\n";
371             print "====================\n";
372         }
373         mkdir "$directory" unless (-d $directory);
374         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
375         if ($process_zebraqueue) {
376             my $entries;
377
378             unless ( $process_zebraqueue_skip_deletes ) {
379                 $entries = select_zebraqueue_records($record_type, 'deleted');
380                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
381                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type");
382                 mark_zebraqueue_batch_done($entries);
383             }
384
385             $entries = select_zebraqueue_records($record_type, 'updated');
386             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
387             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $records_deleted);
388             mark_zebraqueue_batch_done($entries);
389
390         } else {
391             my $sth = select_all_records($record_type);
392             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $nosanitize);
393             unless ($do_not_clear_zebraqueue) {
394                 mark_all_zebraqueue_done($record_type);
395             }
396         }
397     }
398
399     #
400     # and reindexing everything
401     #
402     if ($skip_index) {
403         if ($verbose_logging) {
404             print "====================\n";
405             print "SKIPPING $record_type indexing\n";
406             print "====================\n";
407         }
408     } else {
409         if ( $verbose_logging ) {
410             print "====================\n";
411             print "REINDEXING zebra " . pretty_time() . "\n";
412             print "====================\n";
413         }
414         my $record_fmt = 'marcxml';
415         if ($process_zebraqueue) {
416             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
417                 if %$records_deleted;
418             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
419                 if $num_records_exported;
420         } else {
421             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
422                 if ($num_records_exported or $skip_export);
423         }
424     }
425 }
426
427
428 sub select_zebraqueue_records {
429     my ($record_type, $update_type) = @_;
430
431     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
432     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
433
434     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
435                              FROM zebraqueue
436                              WHERE server = ?
437                              AND   operation = ?
438                              AND   done = 0
439                              ORDER BY id DESC");
440     $sth->execute($server, $op);
441     my $entries = $sth->fetchall_arrayref({});
442 }
443
444 sub mark_all_zebraqueue_done {
445     my ($record_type) = @_;
446
447     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
448
449     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
450                              WHERE server = ?
451                              AND done = 0");
452     $sth->execute($server);
453 }
454
455 sub mark_zebraqueue_batch_done {
456     my ($entries) = @_;
457
458     $dbh->{AutoCommit} = 0;
459     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
460     $dbh->commit();
461     foreach my $id (map { $_->{id} } @$entries) {
462         $sth->execute($id);
463     }
464     $dbh->{AutoCommit} = 1;
465 }
466
467 sub select_all_records {
468     my $record_type = shift;
469     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
470 }
471
472 sub select_all_authorities {
473     my $strsth=qq{SELECT authid FROM auth_header};
474     $strsth.=qq{ WHERE $where } if ($where);
475     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
476     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
477     my $sth = $dbh->prepare($strsth);
478     $sth->execute();
479     return $sth;
480 }
481
482 sub select_all_biblios {
483     $table = 'biblioitems'
484       unless grep { $_ eq $table } @tables_allowed_for_select;
485     my $strsth = qq{ SELECT DISTINCT biblionumber FROM $table };
486     $strsth.=qq{ WHERE $where } if ($where);
487     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
488     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
489     my $sth = $dbh->prepare($strsth);
490     $sth->execute();
491     return $sth;
492 }
493
494 sub export_marc_records_from_sth {
495     my ($record_type, $sth, $directory, $nosanitize) = @_;
496
497     my $num_exported = 0;
498     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
499
500     print {$fh} $marcxml_open;
501
502     my $i = 0;
503     my ( $itemtag, $itemsubfield ) = C4::Biblio::GetMarcFromKohaField( "items.itemnumber" );
504     while (my ($record_number) = $sth->fetchrow_array) {
505         print "." if ( $verbose_logging );
506         print "\r$i" unless ($i++ %100 or !$verbose_logging);
507         if ( $nosanitize ) {
508             my $marcxml = $record_type eq 'biblio'
509                           ? GetXmlBiblio( $record_number )
510                           : GetAuthorityXML( $record_number );
511             if ($record_type eq 'biblio'){
512                 my @items = GetItemsInfo($record_number);
513                 if (@items){
514                     my $record = MARC::Record->new;
515                     $record->encoding('UTF-8');
516                     my @itemsrecord;
517                     foreach my $item (@items){
518                         my $record = Item2Marc($item, $record_number);
519                         push @itemsrecord, $record->field($itemtag);
520                     }
521                     $record->insert_fields_ordered(@itemsrecord);
522                     my $itemsxml = $record->as_xml_record();
523                     $marcxml =
524                         substr($marcxml, 0, length($marcxml)-10) .
525                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
526                 }
527             }
528             # extra test to ensure that result is valid XML; otherwise
529             # Zebra won't parse it in DOM mode
530             eval {
531                 my $doc = $tester->parse_string($marcxml);
532             };
533             if ($@) {
534                 warn "Error exporting record $record_number ($record_type): $@\n";
535                 next;
536             }
537             if ( $marcxml ) {
538                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
539                 print {$fh} $marcxml;
540                 $num_exported++;
541             }
542             next;
543         }
544         my ($marc) = get_corrected_marc_record($record_type, $record_number);
545         if (defined $marc) {
546             eval {
547                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
548                 eval {
549                     my $doc = $tester->parse_string($rec);
550                 };
551                 if ($@) {
552                     die "invalid XML: $@";
553                 }
554                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
555                 print {$fh} $rec;
556                 $num_exported++;
557             };
558             if ($@) {
559                 warn "Error exporting record $record_number ($record_type) XML";
560                 warn "... specific error is $@" if $verbose_logging;
561             }
562         }
563     }
564     print "\nRecords exported: $num_exported " . pretty_time() . "\n" if ( $verbose_logging );
565     print {$fh} $marcxml_close;
566
567     close $fh;
568     return $num_exported;
569 }
570
571 sub export_marc_records_from_list {
572     my ($record_type, $entries, $directory, $records_deleted) = @_;
573
574     my $num_exported = 0;
575     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
576
577     print {$fh} $marcxml_open;
578
579     my $i = 0;
580
581     # Skip any deleted records. We check for this anyway, but this reduces error spam
582     my %found = %$records_deleted;
583     foreach my $record_number ( map { $_->{biblio_auth_number} }
584                                 grep { !$found{ $_->{biblio_auth_number} }++ }
585                                 @$entries ) {
586         print "." if ( $verbose_logging );
587         print "\r$i" unless ($i++ %100 or !$verbose_logging);
588         my ($marc) = get_corrected_marc_record($record_type, $record_number);
589         if (defined $marc) {
590             eval {
591                 my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
592                 $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
593                 print {$fh} $rec;
594                 $num_exported++;
595             };
596             if ($@) {
597               warn "Error exporting record $record_number ($record_type) XML";
598             }
599         }
600     }
601     print "\nRecords exported: $num_exported " . pretty_time() . "\n" if ( $verbose_logging );
602
603     print {$fh} $marcxml_close;
604
605     close $fh;
606     return $num_exported;
607 }
608
609 sub generate_deleted_marc_records {
610
611     my ($record_type, $entries, $directory) = @_;
612
613     my $records_deleted = {};
614     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
615
616     print {$fh} $marcxml_open;
617
618     my $i = 0;
619     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
620         print "\r$i" unless ($i++ %100 or !$verbose_logging);
621         print "." if ( $verbose_logging );
622
623         my $marc = MARC::Record->new();
624         if ($record_type eq 'biblio') {
625             fix_biblio_ids($marc, $record_number, $record_number);
626         } else {
627             fix_authority_id($marc, $record_number);
628         }
629         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
630             fix_unimarc_100($marc);
631         }
632
633         my $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
634         # Remove the record's XML header
635         $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
636         print {$fh} $rec;
637
638         $records_deleted->{$record_number} = 1;
639     }
640     print "\nRecords exported: $i " . pretty_time() . "\n" if ( $verbose_logging );
641
642     print {$fh} $marcxml_close;
643
644     close $fh;
645     return $records_deleted;
646 }
647
648 sub get_corrected_marc_record {
649     my ( $record_type, $record_number ) = @_;
650
651     my $marc = get_raw_marc_record( $record_type, $record_number );
652
653     if ( defined $marc ) {
654         fix_leader($marc);
655         if ( $record_type eq 'authority' ) {
656             fix_authority_id( $marc, $record_number );
657         }
658         elsif ( $record_type eq 'biblio' ) {
659
660             my @filters;
661             push @filters, 'EmbedItemsAvailability';
662             push @filters, 'EmbedSeeFromHeadings'
663                 if C4::Context->preference('IncludeSeeFromInSearches');
664
665             my $normalizer = Koha::RecordProcessor->new( { filters => \@filters } );
666             $marc = $normalizer->process($marc);
667         }
668         if ( C4::Context->preference("marcflavour") eq "UNIMARC" ) {
669             fix_unimarc_100($marc);
670         }
671     }
672
673     return $marc;
674 }
675
676 sub get_raw_marc_record {
677     my ($record_type, $record_number) = @_;
678
679     my $marc;
680     if ($record_type eq 'biblio') {
681         eval { $marc = C4::Biblio::GetMarcBiblio({ biblionumber => $record_number, embed_items => 1 }); };
682         if ($@ || !$marc) {
683             # here we do warn since catching an exception
684             # means that the bib was found but failed
685             # to be parsed
686             warn "error retrieving biblio $record_number";
687             return;
688         }
689     } else {
690         eval { $marc = GetAuthority($record_number); };
691         if ($@) {
692             warn "error retrieving authority $record_number";
693             return;
694         }
695     }
696     return $marc;
697 }
698
699 sub fix_leader {
700     # FIXME - this routine is suspect
701     # It blanks the Leader/00-05 and Leader/12-16 to
702     # force them to be recalculated correct when
703     # the $marc->as_usmarc() or $marc->as_xml() is called.
704     # But why is this necessary?  It would be a serious bug
705     # in MARC::Record (definitely) and MARC::File::XML (arguably)
706     # if they are emitting incorrect leader values.
707     my $marc = shift;
708
709     my $leader = $marc->leader;
710     substr($leader,  0, 5) = '     ';
711     substr($leader, 10, 7) = '22     ';
712     $marc->leader(substr($leader, 0, 24));
713 }
714
715 sub fix_biblio_ids {
716     # FIXME - it is essential to ensure that the biblionumber is present,
717     #         otherwise, Zebra will choke on the record.  However, this
718     #         logic belongs in the relevant C4::Biblio APIs.
719     my $marc = shift;
720     my $biblionumber = shift;
721     my $biblioitemnumber;
722     if (@_) {
723         $biblioitemnumber = shift;
724     } else {
725         my $sth = $dbh->prepare(
726             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
727         $sth->execute($biblionumber);
728         ($biblioitemnumber) = $sth->fetchrow_array;
729         $sth->finish;
730         unless ($biblioitemnumber) {
731             warn "failed to get biblioitemnumber for biblio $biblionumber";
732             return 0;
733         }
734     }
735
736     # FIXME - this is cheating on two levels
737     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
738     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
739     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
740     #
741     # On the other hand, this better for now than what rebuild_zebra.pl used to
742     # do, which was duplicate the code for inserting the biblionumber
743     # and biblioitemnumber
744     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
745
746     return 1;
747 }
748
749 sub fix_authority_id {
750     # FIXME - as with fix_biblio_ids, the authid must be present
751     #         for Zebra's sake.  However, this really belongs
752     #         in C4::AuthoritiesMarc.
753     my ($marc, $authid) = @_;
754     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
755         $marc->delete_field($marc->field('001'));
756         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
757     }
758 }
759
760 sub fix_unimarc_100 {
761     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
762     my $marc = shift;
763
764     my $string;
765     my $length_100a = length($marc->subfield( 100, "a" ));
766     if (  $length_100a and $length_100a == 36 ) {
767         $string = $marc->subfield( 100, "a" );
768         my $f100 = $marc->field(100);
769         $marc->delete_field($f100);
770     }
771     else {
772         $string = POSIX::strftime( "%Y%m%d", localtime );
773         $string =~ s/\-//g;
774         $string = sprintf( "%-*s", 35, $string );
775     }
776     substr( $string, 22, 6, "frey50" );
777     $length_100a = length($marc->subfield( 100, "a" ));
778     unless ( $length_100a and $length_100a == 36 ) {
779         $marc->delete_field($marc->field(100));
780         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
781     }
782 }
783
784 sub do_indexing {
785     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
786
787     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
788     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
789     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
790     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
791
792     $noshadow //= '';
793
794     if ($noshadow or $reset_index) {
795         $noshadow = '-n';
796     }
797
798     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
799     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
800     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
801 }
802
803 sub _flock {
804     # test if flock is present; if so, use it; if not, return true
805     # op refers to the official flock operations including LOCK_EX,
806     # LOCK_UN, etc.
807     # combining LOCK_EX with LOCK_NB returns immediately
808     my ($fh, $op)= @_;
809     if( !defined($use_flock) ) {
810         #check if flock is present; if not, you will have a fatal error
811         my $lock_acquired = eval { flock($fh, $op) };
812         # assuming that $fh and $op are fine(..), an undef $lock_acquired
813         # means no flock
814         $use_flock = defined($lock_acquired) ? 1 : 0;
815         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
816         return 1 if !$use_flock;
817         return $lock_acquired;
818     } else {
819         return 1 if !$use_flock;
820         return flock($fh, $op);
821     }
822 }
823
824 sub _create_lockfile { #returns undef on failure
825     my $dir= shift;
826     unless (-d $dir) {
827         eval { mkpath($dir, 0, oct(755)) };
828         return if $@;
829     }
830     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
831     return ( $fh, $dir.'/'.LOCK_FILENAME );
832 }
833
834 sub pretty_time {
835     use integer;
836     my $now = time;
837     my $elapsed = $now - $start_time;
838     local $_ = $elapsed;
839     my ( $h, $m, $s );
840     $s = $_ % 60;
841     $_ /= 60;
842     $m = $_ % 60;
843     $_ /= 60;
844     $h = $_ % 24;
845
846     my $now_pretty = POSIX::strftime("%H:%M:%S",localtime($now));
847     my $elapsed_pretty = sprintf "[%02d:%02d:%02d]",$h,$m,$s;
848
849     return "$now_pretty $elapsed_pretty";
850 }
851
852 sub print_usage {
853     print <<_USAGE_;
854 $0: reindex MARC bibs and/or authorities in Zebra.
855
856 Use this batch job to reindex all biblio or authority
857 records in your Koha database.
858
859 Parameters:
860
861     -b                      index bibliographic records
862
863     -a                      index authority records
864
865     -daemon                 Run in daemon mode.  The program will loop checking
866                             for entries on the zebraqueue table, processing
867                             them incrementally if present, and then sleep
868                             for a few seconds before repeating the process
869                             Checking the zebraqueue table is done with a cheap
870                             SQL query.  This allows for near realtime update of
871                             the zebra search index with low system overhead.
872                             Use -sleep to control the checking interval.
873
874                             Daemon mode implies -z, -a, -b.  The program will
875                             refuse to start if options are present that do not
876                             make sense while running as an incremental update
877                             daemon (e.g. -r or -offset).
878
879     -sleep 10               Seconds to sleep between checks of the zebraqueue
880                             table in daemon mode.  The default is 5 seconds.
881
882     -z                      select only updated and deleted
883                             records marked in the zebraqueue
884                             table.  Cannot be used with -r
885                             or -s.
886
887     --skip-deletes          only select record updates, not record
888                             deletions, to avoid potential excessive
889                             I/O when zebraidx processes deletions.
890                             If this option is used for normal indexing,
891                             a cronjob should be set up to run
892                             rebuild_zebra.pl -z without --skip-deletes
893                             during off hours.
894                             Only effective with -z.
895
896     -r                      clear Zebra index before
897                             adding records to index. Implies -w.
898
899     -d                      Temporary directory for indexing.
900                             If not specified, one is automatically
901                             created.  The export directory
902                             is automatically deleted unless
903                             you supply the -k switch.
904
905     -k                      Do not delete export directory.
906
907     -s                      Skip export.  Used if you have
908                             already exported the records
909                             in a previous run.
910
911     -nosanitize             export biblio/authority records directly from DB marcxml
912                             field without sanitizing records. It speed up
913                             dump process but could fail if DB contains badly
914                             encoded records. Works only with -x,
915
916     -w                      skip shadow indexing for this batch
917
918     -y                      do NOT clear zebraqueue after indexing; normally,
919                             after doing batch indexing, zebraqueue should be
920                             marked done for the affected record type(s) so that
921                             a running zebraqueue_daemon doesn't try to reindex
922                             the same records - specify -y to override this.
923                             Cannot be used with -z.
924
925     -v                      increase the amount of logging.  Normally only
926                             warnings and errors from the indexing are shown.
927                             Use log level 2 (-v -v) to include all Zebra logs.
928
929     --length   1234         how many biblio you want to export
930     --offset 1243           offset you want to start to
931                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
932                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
933     --where                 let you specify a WHERE query, like itemtype='BOOK'
934                             or something like that
935
936     --run-as-root           explicitily allow script to run as 'root' user
937
938     --wait-for-lock         when not running in daemon mode, the default
939                             behavior is to abort a rebuild if the rebuild
940                             lock is busy.  This option will cause the program
941                             to wait for the lock to free and then continue
942                             processing the rebuild request,
943
944     --table                 specify a table (can be items, biblioitems, biblio, biblio_metadata) to retrieve biblionumber to index.
945                             biblioitems is the default value.
946
947     --help or -h            show this message.
948 _USAGE_
949 }