Bug 16830: (followup) Remove weird character from warning in rebuild_zebra.pl
[koha-ffzg.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 # This file is part of Koha.
4 #
5 # Koha is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # Koha is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with Koha; if not, see <http://www.gnu.org/licenses>.
17
18 use Modern::Perl;
19
20 use C4::Context;
21 use Getopt::Long;
22 use Fcntl qw(:flock);
23 use File::Temp qw/ tempdir /;
24 use File::Path;
25 use C4::Biblio;
26 use C4::AuthoritiesMarc;
27 use C4::Items;
28 use Koha::RecordProcessor;
29 use XML::LibXML;
30
31 use constant LOCK_FILENAME => 'rebuild..LCK';
32
33 # script that checks zebradir structure & create directories & mandatory files if needed
34 #
35 #
36
37 $|=1; # flushes output
38 # If the cron job starts us in an unreadable dir, we will break without
39 # this.
40 chdir $ENV{HOME} if (!(-r '.'));
41 my $daemon_mode;
42 my $daemon_sleep = 5;
43 my $directory;
44 my $nosanitize;
45 my $skip_export;
46 my $keep_export;
47 my $skip_index;
48 my $reset;
49 my $biblios;
50 my $authorities;
51 my $as_usmarc;
52 my $as_xml;
53 my $noshadow;
54 my $want_help;
55 my $process_zebraqueue;
56 my $process_zebraqueue_skip_deletes;
57 my $do_not_clear_zebraqueue;
58 my $length;
59 my $where;
60 my $offset;
61 my $run_as_root;
62 my $run_user = (getpwuid($<))[0];
63 my $wait_for_lock = 0;
64 my $use_flock;
65 my $table = 'biblioitems';
66
67 my $verbose_logging = 0;
68 my $zebraidx_log_opt = " -v none,fatal,warn ";
69 my $result = GetOptions(
70     'daemon'        => \$daemon_mode,
71     'sleep:i'       => \$daemon_sleep,
72     'd:s'           => \$directory,
73     'r|reset'       => \$reset,
74     's'             => \$skip_export,
75     'k'             => \$keep_export,
76     'I|skip-index'  => \$skip_index,
77     'nosanitize'    => \$nosanitize,
78     'b'             => \$biblios,
79     'noxml'         => \$as_usmarc,
80     'w'             => \$noshadow,
81     'a'             => \$authorities,
82     'h|help'        => \$want_help,
83     'x'             => \$as_xml,
84     'y'             => \$do_not_clear_zebraqueue,
85     'z'             => \$process_zebraqueue,
86     'skip-deletes'  => \$process_zebraqueue_skip_deletes,
87     'where:s'       => \$where,
88     'length:i'      => \$length,
89     'offset:i'      => \$offset,
90     'v+'            => \$verbose_logging,
91     'run-as-root'   => \$run_as_root,
92     'wait-for-lock' => \$wait_for_lock,
93     't|table:s'     => \$table,
94 );
95
96 if (not $result or $want_help) {
97     print_usage();
98     exit 0;
99 }
100
101 if ( $as_xml ) {
102     warn "Warning: You passed -x which is already the default and is now deprecated\n";
103     undef $as_xml; # Should not be used later
104 }
105
106 if( not defined $run_as_root and $run_user eq 'root') {
107     my $msg = "Warning: You are running this script as the user 'root'.\n";
108     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
109     $msg   .= "Please do '$0 --help' to see usage.\n";
110     die $msg;
111 }
112
113 if ( $as_usmarc and $nosanitize ) {
114     my $msg = "Cannot specify both -noxml and -nosanitize\n";
115     $msg   .= "Please do '$0 --help' to see usage.\n";
116     die $msg;
117 }
118
119 if ($process_zebraqueue and ($skip_export or $reset)) {
120     my $msg = "Cannot specify -r or -s if -z is specified\n";
121     $msg   .= "Please do '$0 --help' to see usage.\n";
122     die $msg;
123 }
124
125 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
126     my $msg = "Cannot specify both -y and -z\n";
127     $msg   .= "Please do '$0 --help' to see usage.\n";
128     die $msg;
129 }
130
131 if ($daemon_mode) {
132     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
133     if ($skip_export or $keep_export or $skip_index or
134           $where or $length or $offset) {
135         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
136         $msg   .= "Please do '$0 --help' to see usage.\n";
137         die $msg;
138     }
139     $authorities = 1;
140     $biblios = 1;
141     $process_zebraqueue = 1;
142 }
143
144 if (not $biblios and not $authorities) {
145     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
146     $msg   .= "Please do '$0 --help' to see usage.\n";
147     die $msg;
148 }
149
150 our @tables_allowed_for_select = ( 'biblioitems', 'items', 'biblio' );
151 unless ( grep { /^$table$/ } @tables_allowed_for_select ) {
152     die "Cannot specify -t|--table with value '$table'. Only "
153       . ( join ', ', @tables_allowed_for_select )
154       . " are allowed.";
155 }
156
157
158 #  -v is for verbose, which seems backwards here because of how logging is set
159 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
160 if ($verbose_logging >= 2) {
161     $zebraidx_log_opt = '-v none,fatal,warn,all';
162 }
163
164 my $use_tempdir = 0;
165 unless ($directory) {
166     $use_tempdir = 1;
167     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
168 }
169
170
171 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
172 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
173
174 my $kohadir = C4::Context->config('intranetdir');
175 my $bib_index_mode  = C4::Context->config('zebra_bib_index_mode')  // 'dom';
176 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') // 'dom';
177
178 my $dbh = C4::Context->dbh;
179 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
180 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
181
182 my $marcxml_open = q{<?xml version="1.0" encoding="UTF-8"?>
183 <collection xmlns="http://www.loc.gov/MARC21/slim">
184 };
185
186 my $marcxml_close = q{
187 </collection>
188 };
189
190 # Protect again simultaneous update of the zebra index by using a lock file.
191 # Create our own lock directory if its missing.  This shouild be created
192 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
193 # does not exist and cannot be created, we fall back on /tmp - which will
194 # always work.
195
196 my ($lockfile, $LockFH);
197 foreach (
198     C4::Context->config("zebra_lockdir"),
199     '/var/lock/zebra_' . C4::Context->config('database'),
200     '/tmp/zebra_' . C4::Context->config('database')
201 ) {
202     #we try three possibilities (we really want to lock :)
203     next if !$_;
204     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
205     last if defined $LockFH;
206 }
207 if( !defined $LockFH ) {
208     print "WARNING: Could not create lock file $lockfile: $!\n";
209     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
210     print "Verify file permissions for it too.\n";
211     $use_flock = 0; # we disable file locking now and will continue
212                     # without it
213                     # note that this mimics old behavior (before we used
214                     # the lockfile)
215 };
216
217 if ( $verbose_logging ) {
218     print "Zebra configuration information\n";
219     print "================================\n";
220     print "Zebra biblio directory      = $biblioserverdir\n";
221     print "Zebra authorities directory = $authorityserverdir\n";
222     print "Koha directory              = $kohadir\n";
223     print "Lockfile                    = $lockfile\n" if $lockfile;
224     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
225     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
226     print "================================\n";
227 }
228
229 my $tester = XML::LibXML->new();
230
231 # The main work is done here by calling do_one_pass().  We have added locking
232 # avoid race conditions between full rebuilds and incremental updates either from
233 # daemon mode or periodic invocation from cron.  The race can lead to an updated
234 # record being overwritten by a rebuild if the update is applied after the export
235 # by the rebuild and before the rebuild finishes (more likely to affect large
236 # catalogs).
237 #
238 # We have chosen to exit immediately by default if we cannot obtain the lock
239 # to prevent the potential for a infinite backlog from cron invocations, but an
240 # option (wait-for-lock) is provided to let the program wait for the lock.
241 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
242 if ($daemon_mode) {
243     while (1) {
244         # For incremental updates, skip the update if the updates are locked
245         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
246             do_one_pass() if ( zebraqueue_not_empty() );
247             _flock($LockFH, LOCK_UN);
248         }
249         sleep $daemon_sleep;
250     }
251 } else {
252     # all one-off invocations
253     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
254     if (_flock($LockFH, $lock_mode)) {
255         do_one_pass();
256         _flock($LockFH, LOCK_UN);
257     } else {
258         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
259     }
260 }
261
262
263 if ( $verbose_logging ) {
264     print "====================\n";
265     print "CLEANING\n";
266     print "====================\n";
267 }
268 if ($keep_export) {
269     print "NOTHING cleaned : the export $directory has been kept.\n";
270     print "You can re-run this script with the -s ";
271     if ($use_tempdir) {
272         print " and -d $directory parameters";
273     } else {
274         print "parameter";
275     }
276     print "\n";
277     print "if you just want to rebuild zebra after changing the record.abs\n";
278     print "or another zebra config file\n";
279 } else {
280     unless ($use_tempdir) {
281         # if we're using a temporary directory
282         # created by File::Temp, it will be removed
283         # automatically.
284         rmtree($directory, 0, 1);
285         print "directory $directory deleted\n";
286     }
287 }
288
289 sub do_one_pass {
290     if ($authorities) {
291         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
292     } else {
293         print "skipping authorities\n" if ( $verbose_logging );
294     }
295
296     if ($biblios) {
297         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
298     } else {
299         print "skipping biblios\n" if ( $verbose_logging );
300     }
301 }
302
303 # Check the zebra update queue and return true if there are records to process
304 # This routine will handle each of -ab, -a, or -b, but in practice we force
305 # -ab when in daemon mode.
306 sub zebraqueue_not_empty {
307     my $where_str;
308
309     if ($authorities && $biblios) {
310         $where_str = 'done = 0;';
311     } elsif ($biblios) {
312         $where_str = 'server = "biblioserver" AND done = 0;';
313     } else {
314         $where_str = 'server = "authorityserver" AND done = 0;';
315     }
316     my $query =
317         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
318
319     $query->execute;
320     my $count = $query->fetchrow_arrayref->[0];
321     print "queued records: $count\n" if $verbose_logging > 0;
322     return $count > 0;
323 }
324
325 # This checks to see if the zebra directories exist under the provided path.
326 # If they don't, then zebra is likely to spit the dummy. This returns true
327 # if the directories had to be created, false otherwise.
328 sub check_zebra_dirs {
329     my ($base) = shift() . '/';
330     my $needed_repairing = 0;
331     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
332     foreach my $dir (@dirs) {
333         my $bdir = $base . $dir;
334         if (! -d $bdir) {
335             $needed_repairing = 1;
336             mkdir $bdir || die "Unable to create '$bdir': $!\n";
337             print "$0: needed to create '$bdir'\n";
338         }
339     }
340     return $needed_repairing;
341 }   # ----------  end of subroutine check_zebra_dirs  ----------
342
343 sub index_records {
344     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_usmarc, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
345
346     my $num_records_exported = 0;
347     my $records_deleted = {};
348     my $need_reset = check_zebra_dirs($server_dir);
349     if ($need_reset) {
350         print "$0: found broken zebra server directories: forcing a rebuild\n";
351         $reset = 1;
352     }
353     if ($skip_export && $verbose_logging) {
354         print "====================\n";
355         print "SKIPPING $record_type export\n";
356         print "====================\n";
357     } else {
358         if ( $verbose_logging ) {
359             print "====================\n";
360             print "exporting $record_type\n";
361             print "====================\n";
362         }
363         mkdir "$directory" unless (-d $directory);
364         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
365         if ($process_zebraqueue) {
366             my $entries;
367
368             unless ( $process_zebraqueue_skip_deletes ) {
369                 $entries = select_zebraqueue_records($record_type, 'deleted');
370                 mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
371                 $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_usmarc);
372                 mark_zebraqueue_batch_done($entries);
373             }
374
375             $entries = select_zebraqueue_records($record_type, 'updated');
376             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
377             $num_records_exported = export_marc_records_from_list($record_type,$entries, "$directory/upd_$record_type", $as_usmarc, $records_deleted);
378             mark_zebraqueue_batch_done($entries);
379
380         } else {
381             my $sth = select_all_records($record_type);
382             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_usmarc, $nosanitize);
383             unless ($do_not_clear_zebraqueue) {
384                 mark_all_zebraqueue_done($record_type);
385             }
386         }
387     }
388
389     #
390     # and reindexing everything
391     #
392     if ($skip_index) {
393         if ($verbose_logging) {
394             print "====================\n";
395             print "SKIPPING $record_type indexing\n";
396             print "====================\n";
397         }
398     } else {
399         if ( $verbose_logging ) {
400             print "====================\n";
401             print "REINDEXING zebra\n";
402             print "====================\n";
403         }
404         my $record_fmt = ($as_usmarc) ? 'iso2709' : 'marcxml' ;
405         if ($process_zebraqueue) {
406             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
407                 if %$records_deleted;
408             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
409                 if $num_records_exported;
410         } else {
411             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
412                 if ($num_records_exported or $skip_export);
413         }
414     }
415 }
416
417
418 sub select_zebraqueue_records {
419     my ($record_type, $update_type) = @_;
420
421     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
422     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
423
424     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
425                              FROM zebraqueue
426                              WHERE server = ?
427                              AND   operation = ?
428                              AND   done = 0
429                              ORDER BY id DESC");
430     $sth->execute($server, $op);
431     my $entries = $sth->fetchall_arrayref({});
432 }
433
434 sub mark_all_zebraqueue_done {
435     my ($record_type) = @_;
436
437     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
438
439     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
440                              WHERE server = ?
441                              AND done = 0");
442     $sth->execute($server);
443 }
444
445 sub mark_zebraqueue_batch_done {
446     my ($entries) = @_;
447
448     $dbh->{AutoCommit} = 0;
449     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
450     $dbh->commit();
451     foreach my $id (map { $_->{id} } @$entries) {
452         $sth->execute($id);
453     }
454     $dbh->{AutoCommit} = 1;
455 }
456
457 sub select_all_records {
458     my $record_type = shift;
459     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
460 }
461
462 sub select_all_authorities {
463     my $strsth=qq{SELECT authid FROM auth_header};
464     $strsth.=qq{ WHERE $where } if ($where);
465     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
466     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
467     my $sth = $dbh->prepare($strsth);
468     $sth->execute();
469     return $sth;
470 }
471
472 sub select_all_biblios {
473     $table = 'biblioitems'
474       unless grep { /^$table$/ } @tables_allowed_for_select;
475     my $strsth = qq{ SELECT biblionumber FROM $table };
476     $strsth.=qq{ WHERE $where } if ($where);
477     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
478     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
479     my $sth = $dbh->prepare($strsth);
480     $sth->execute();
481     return $sth;
482 }
483
484 sub export_marc_records_from_sth {
485     my ($record_type, $sth, $directory, $as_usmarc, $nosanitize) = @_;
486
487     my $num_exported = 0;
488     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
489
490     print {$fh} $marcxml_open
491         unless $as_usmarc;
492
493     my $i = 0;
494     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
495     while (my ($record_number) = $sth->fetchrow_array) {
496         print "." if ( $verbose_logging );
497         print "\r$i" unless ($i++ %100 or !$verbose_logging);
498         if ( $nosanitize ) {
499             my $marcxml = $record_type eq 'biblio'
500                           ? GetXmlBiblio( $record_number )
501                           : GetAuthorityXML( $record_number );
502             if ($record_type eq 'biblio'){
503                 my @items = GetItemsInfo($record_number);
504                 if (@items){
505                     my $record = MARC::Record->new;
506                     $record->encoding('UTF-8');
507                     my @itemsrecord;
508                     foreach my $item (@items){
509                         my $record = Item2Marc($item, $record_number);
510                         push @itemsrecord, $record->field($itemtag);
511                     }
512                     $record->insert_fields_ordered(@itemsrecord);
513                     my $itemsxml = $record->as_xml_record();
514                     $marcxml =
515                         substr($marcxml, 0, length($marcxml)-10) .
516                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
517                 }
518             }
519             # extra test to ensure that result is valid XML; otherwise
520             # Zebra won't parse it in DOM mode
521             eval {
522                 my $doc = $tester->parse_string($marcxml);
523             };
524             if ($@) {
525                 warn "Error exporting record $record_number ($record_type): $@\n";
526                 next;
527             }
528             if ( $marcxml ) {
529                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
530                 print {$fh} $marcxml;
531                 $num_exported++;
532             }
533             next;
534         }
535         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
536         if (defined $marc) {
537             eval {
538                 my $rec;
539                 if ($as_usmarc) {
540                     $rec = $marc->as_usmarc();
541                 } else {
542                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
543                     eval {
544                         my $doc = $tester->parse_string($rec);
545                     };
546                     if ($@) {
547                         die "invalid XML: $@";
548                     }
549                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
550                 }
551                 print {$fh} $rec;
552                 $num_exported++;
553             };
554             if ($@) {
555                 warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
556                 warn "... specific error is $@" if $verbose_logging;
557             }
558         }
559     }
560     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
561     print {$fh} $marcxml_close
562         unless $as_usmarc;
563
564     close $fh;
565     return $num_exported;
566 }
567
568 sub export_marc_records_from_list {
569     my ($record_type, $entries, $directory, $as_usmarc, $records_deleted) = @_;
570
571     my $num_exported = 0;
572     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
573
574     print {$fh} $marcxml_open
575         unless $as_usmarc;
576
577     my $i = 0;
578
579     # Skip any deleted records. We check for this anyway, but this reduces error spam
580     my %found = %$records_deleted;
581     foreach my $record_number ( map { $_->{biblio_auth_number} }
582                                 grep { !$found{ $_->{biblio_auth_number} }++ }
583                                 @$entries ) {
584         print "." if ( $verbose_logging );
585         print "\r$i" unless ($i++ %100 or !$verbose_logging);
586         my ($marc) = get_corrected_marc_record($record_type, $record_number, $as_usmarc);
587         if (defined $marc) {
588             eval {
589                 my $rec;
590                 if ( $as_usmarc ) {
591                     $rec = $marc->as_usmarc();
592                 } else {
593                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
594                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
595                 }
596                 print {$fh} $rec;
597                 $num_exported++;
598             };
599             if ($@) {
600               warn "Error exporting record $record_number ($record_type) ".($as_usmarc ? "not XML" : "XML");
601             }
602         }
603     }
604     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
605
606     print {$fh} $marcxml_close
607         unless $as_usmarc;
608
609     close $fh;
610     return $num_exported;
611 }
612
613 sub generate_deleted_marc_records {
614
615     my ($record_type, $entries, $directory, $as_usmarc) = @_;
616
617     my $records_deleted = {};
618     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
619
620     print {$fh} $marcxml_open
621         unless $as_usmarc;
622
623     my $i = 0;
624     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
625         print "\r$i" unless ($i++ %100 or !$verbose_logging);
626         print "." if ( $verbose_logging );
627
628         my $marc = MARC::Record->new();
629         if ($record_type eq 'biblio') {
630             fix_biblio_ids($marc, $record_number, $record_number);
631         } else {
632             fix_authority_id($marc, $record_number);
633         }
634         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
635             fix_unimarc_100($marc);
636         }
637
638         my $rec;
639         if ( $as_usmarc ) {
640             $rec = $marc->as_usmarc();
641         } else {
642             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
643             # Remove the record's XML header
644             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
645         }
646         print {$fh} $rec;
647
648         $records_deleted->{$record_number} = 1;
649     }
650     print "\nRecords exported: $i\n" if ( $verbose_logging );
651
652     print {$fh} $marcxml_close
653         unless $as_usmarc;
654
655     close $fh;
656     return $records_deleted;
657 }
658
659 sub get_corrected_marc_record {
660     my ($record_type, $record_number, $as_usmarc) = @_;
661
662     my $marc = get_raw_marc_record($record_type, $record_number, $as_usmarc);
663
664     if (defined $marc) {
665         fix_leader($marc);
666         if ($record_type eq 'authority') {
667             fix_authority_id($marc, $record_number);
668         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
669             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
670             $marc = $normalizer->process($marc);
671         }
672         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
673             fix_unimarc_100($marc);
674         }
675     }
676
677     return $marc;
678 }
679
680 sub get_raw_marc_record {
681     my ($record_type, $record_number, $as_usmarc) = @_;
682
683     my $marc;
684     if ($record_type eq 'biblio') {
685         if ($as_usmarc) {
686             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
687             $fetch_sth->execute($record_number);
688             if (my ($blob) = $fetch_sth->fetchrow_array) {
689                 $marc = MARC::Record->new_from_usmarc($blob);
690                 unless ($marc) {
691                     warn "error creating MARC::Record from $blob";
692                 }
693             }
694             # failure to find a bib is not a problem -
695             # a delete could have been done before
696             # trying to process a record update
697
698             $fetch_sth->finish();
699             return unless $marc;
700         } else {
701             eval { $marc = GetMarcBiblio($record_number, 1); };
702             if ($@ || !$marc) {
703                 # here we do warn since catching an exception
704                 # means that the bib was found but failed
705                 # to be parsed
706                 warn "error retrieving biblio $record_number";
707                 return;
708             }
709         }
710     } else {
711         eval { $marc = GetAuthority($record_number); };
712         if ($@) {
713             warn "error retrieving authority $record_number";
714             return;
715         }
716     }
717     return $marc;
718 }
719
720 sub fix_leader {
721     # FIXME - this routine is suspect
722     # It blanks the Leader/00-05 and Leader/12-16 to
723     # force them to be recalculated correct when
724     # the $marc->as_usmarc() or $marc->as_xml() is called.
725     # But why is this necessary?  It would be a serious bug
726     # in MARC::Record (definitely) and MARC::File::XML (arguably)
727     # if they are emitting incorrect leader values.
728     my $marc = shift;
729
730     my $leader = $marc->leader;
731     substr($leader,  0, 5) = '     ';
732     substr($leader, 10, 7) = '22     ';
733     $marc->leader(substr($leader, 0, 24));
734 }
735
736 sub fix_biblio_ids {
737     # FIXME - it is essential to ensure that the biblionumber is present,
738     #         otherwise, Zebra will choke on the record.  However, this
739     #         logic belongs in the relevant C4::Biblio APIs.
740     my $marc = shift;
741     my $biblionumber = shift;
742     my $biblioitemnumber;
743     if (@_) {
744         $biblioitemnumber = shift;
745     } else {
746         my $sth = $dbh->prepare(
747             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
748         $sth->execute($biblionumber);
749         ($biblioitemnumber) = $sth->fetchrow_array;
750         $sth->finish;
751         unless ($biblioitemnumber) {
752             warn "failed to get biblioitemnumber for biblio $biblionumber";
753             return 0;
754         }
755     }
756
757     # FIXME - this is cheating on two levels
758     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
759     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
760     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
761     #
762     # On the other hand, this better for now than what rebuild_zebra.pl used to
763     # do, which was duplicate the code for inserting the biblionumber
764     # and biblioitemnumber
765     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
766
767     return 1;
768 }
769
770 sub fix_authority_id {
771     # FIXME - as with fix_biblio_ids, the authid must be present
772     #         for Zebra's sake.  However, this really belongs
773     #         in C4::AuthoritiesMarc.
774     my ($marc, $authid) = @_;
775     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
776         $marc->delete_field($marc->field('001'));
777         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
778     }
779 }
780
781 sub fix_unimarc_100 {
782     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
783     my $marc = shift;
784
785     my $string;
786     my $length_100a = length($marc->subfield( 100, "a" ));
787     if (  $length_100a and $length_100a == 36 ) {
788         $string = $marc->subfield( 100, "a" );
789         my $f100 = $marc->field(100);
790         $marc->delete_field($f100);
791     }
792     else {
793         $string = POSIX::strftime( "%Y%m%d", localtime );
794         $string =~ s/\-//g;
795         $string = sprintf( "%-*s", 35, $string );
796     }
797     substr( $string, 22, 6, "frey50" );
798     $length_100a = length($marc->subfield( 100, "a" ));
799     unless ( $length_100a and $length_100a == 36 ) {
800         $marc->delete_field($marc->field(100));
801         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
802     }
803 }
804
805 sub do_indexing {
806     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
807
808     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
809     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
810     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
811     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
812
813     $noshadow //= '';
814
815     if ($noshadow or $reset_index) {
816         $noshadow = '-n';
817     }
818
819     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
820     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
821     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
822 }
823
824 sub _flock {
825     # test if flock is present; if so, use it; if not, return true
826     # op refers to the official flock operations including LOCK_EX,
827     # LOCK_UN, etc.
828     # combining LOCK_EX with LOCK_NB returns immediately
829     my ($fh, $op)= @_;
830     if( !defined($use_flock) ) {
831         #check if flock is present; if not, you will have a fatal error
832         my $lock_acquired = eval { flock($fh, $op) };
833         # assuming that $fh and $op are fine(..), an undef $lock_acquired
834         # means no flock
835         $use_flock = defined($lock_acquired) ? 1 : 0;
836         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
837         return 1 if !$use_flock;
838         return $lock_acquired;
839     } else {
840         return 1 if !$use_flock;
841         return flock($fh, $op);
842     }
843 }
844
845 sub _create_lockfile { #returns undef on failure
846     my $dir= shift;
847     unless (-d $dir) {
848         eval { mkpath($dir, 0, oct(755)) };
849         return if $@;
850     }
851     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
852     return ( $fh, $dir.'/'.LOCK_FILENAME );
853 }
854
855 sub print_usage {
856     print <<_USAGE_;
857 $0: reindex MARC bibs and/or authorities in Zebra.
858
859 Use this batch job to reindex all biblio or authority
860 records in your Koha database.
861
862 Parameters:
863
864     -b                      index bibliographic records
865
866     -a                      index authority records
867
868     -daemon                 Run in daemon mode.  The program will loop checking
869                             for entries on the zebraqueue table, processing
870                             them incrementally if present, and then sleep
871                             for a few seconds before repeating the process
872                             Checking the zebraqueue table is done with a cheap
873                             SQL query.  This allows for near realtime update of
874                             the zebra search index with low system overhead.
875                             Use -sleep to control the checking interval.
876
877                             Daemon mode implies -z, -a, -b.  The program will
878                             refuse to start if options are present that do not
879                             make sense while running as an incremental update
880                             daemon (e.g. -r or -offset).
881
882     -sleep 10               Seconds to sleep between checks of the zebraqueue
883                             table in daemon mode.  The default is 5 seconds.
884
885     -z                      select only updated and deleted
886                             records marked in the zebraqueue
887                             table.  Cannot be used with -r
888                             or -s.
889
890     --skip-deletes          only select record updates, not record
891                             deletions, to avoid potential excessive
892                             I/O when zebraidx processes deletions.
893                             If this option is used for normal indexing,
894                             a cronjob should be set up to run
895                             rebuild_zebra.pl -z without --skip-deletes
896                             during off hours.
897                             Only effective with -z.
898
899     -r                      clear Zebra index before
900                             adding records to index. Implies -w.
901
902     -d                      Temporary directory for indexing.
903                             If not specified, one is automatically
904                             created.  The export directory
905                             is automatically deleted unless
906                             you supply the -k switch.
907
908     -k                      Do not delete export directory.
909
910     -s                      Skip export.  Used if you have
911                             already exported the records
912                             in a previous run.
913
914     -noxml                  index from ISO MARC blob
915                             instead of MARC XML.  This
916                             option is recommended only
917                             for advanced user.
918
919     -nosanitize             export biblio/authority records directly from DB marcxml
920                             field without sanitizing records. It speed up
921                             dump process but could fail if DB contains badly
922                             encoded records. Works only with -x,
923
924     -w                      skip shadow indexing for this batch
925
926     -y                      do NOT clear zebraqueue after indexing; normally,
927                             after doing batch indexing, zebraqueue should be
928                             marked done for the affected record type(s) so that
929                             a running zebraqueue_daemon doesn't try to reindex
930                             the same records - specify -y to override this.
931                             Cannot be used with -z.
932
933     -v                      increase the amount of logging.  Normally only
934                             warnings and errors from the indexing are shown.
935                             Use log level 2 (-v -v) to include all Zebra logs.
936
937     --length   1234         how many biblio you want to export
938     --offset 1243           offset you want to start to
939                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
940                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
941     --where                 let you specify a WHERE query, like itemtype='BOOK'
942                             or something like that
943
944     --run-as-root           explicitily allow script to run as 'root' user
945
946     --wait-for-lock         when not running in daemon mode, the default
947                             behavior is to abort a rebuild if the rebuild
948                             lock is busy.  This option will cause the program
949                             to wait for the lock to free and then continue
950                             processing the rebuild request,
951
952     --table                 specify a table (can be items, biblioitems or biblio) to retrieve biblionumber to index.
953                             biblioitems is the default value.
954
955     --help or -h            show this message.
956 _USAGE_
957 }