Bug 11078: QA Follow-up for missing file permissions on lockfile
[koha-ffzg.git] / misc / migration_tools / rebuild_zebra.pl
1 #!/usr/bin/perl
2
3 use strict;
4 #use warnings; FIXME - Bug 2505
5
6 use C4::Context;
7 use Getopt::Long;
8 use Fcntl qw(:flock);
9 use File::Temp qw/ tempdir /;
10 use File::Path;
11 use C4::Biblio;
12 use C4::AuthoritiesMarc;
13 use C4::Items;
14 use Koha::RecordProcessor;
15 use XML::LibXML;
16
17 use constant LOCK_FILENAME => 'rebuild..LCK';
18
19 # script that checks zebradir structure & create directories & mandatory files if needed
20 #
21 #
22
23 $|=1; # flushes output
24 # If the cron job starts us in an unreadable dir, we will break without
25 # this.
26 chdir $ENV{HOME} if (!(-r '.'));
27 my $daemon_mode;
28 my $daemon_sleep = 5;
29 my $directory;
30 my $nosanitize;
31 my $skip_export;
32 my $keep_export;
33 my $skip_index;
34 my $reset;
35 my $biblios;
36 my $authorities;
37 my $noxml;
38 my $noshadow;
39 my $want_help;
40 my $as_xml;
41 my $process_zebraqueue;
42 my $do_not_clear_zebraqueue;
43 my $length;
44 my $where;
45 my $offset;
46 my $run_as_root;
47 my $run_user = (getpwuid($<))[0];
48 my $wait_for_lock = 0;
49 my $use_flock;
50
51 my $verbose_logging = 0;
52 my $zebraidx_log_opt = " -v none,fatal,warn ";
53 my $result = GetOptions(
54     'daemon'        => \$daemon_mode,
55     'sleep:i'       => \$daemon_sleep,
56     'd:s'           => \$directory,
57     'r|reset'       => \$reset,
58     's'             => \$skip_export,
59     'k'             => \$keep_export,
60     'I|skip-index'  => \$skip_index,
61     'nosanitize'    => \$nosanitize,
62     'b'             => \$biblios,
63     'noxml'         => \$noxml,
64     'w'             => \$noshadow,
65     'a'             => \$authorities,
66     'h|help'        => \$want_help,
67     'x'             => \$as_xml,
68     'y'             => \$do_not_clear_zebraqueue,
69     'z'             => \$process_zebraqueue,
70     'where:s'       => \$where,
71     'length:i'      => \$length,
72     'offset:i'      => \$offset,
73     'v+'            => \$verbose_logging,
74     'run-as-root'   => \$run_as_root,
75     'wait-for-lock' => \$wait_for_lock,
76 );
77
78 if (not $result or $want_help) {
79     print_usage();
80     exit 0;
81 }
82
83 if( not defined $run_as_root and $run_user eq 'root') {
84     my $msg = "Warning: You are running this script as the user 'root'.\n";
85     $msg   .= "If this is intentional you must explicitly specify this using the -run-as-root switch\n";
86     $msg   .= "Please do '$0 --help' to see usage.\n";
87     die $msg;
88 }
89
90 if ( !$as_xml and $nosanitize ) {
91     my $msg = "Cannot specify both -no_xml and -nosanitize\n";
92     $msg   .= "Please do '$0 --help' to see usage.\n";
93     die $msg;
94 }
95
96 if ($process_zebraqueue and ($skip_export or $reset)) {
97     my $msg = "Cannot specify -r or -s if -z is specified\n";
98     $msg   .= "Please do '$0 --help' to see usage.\n";
99     die $msg;
100 }
101
102 if ($process_zebraqueue and $do_not_clear_zebraqueue) {
103     my $msg = "Cannot specify both -y and -z\n";
104     $msg   .= "Please do '$0 --help' to see usage.\n";
105     die $msg;
106 }
107
108 if ($reset) {
109     $noshadow = 1;
110 }
111
112 if ($noshadow) {
113     $noshadow = ' -n ';
114 }
115
116 if ($daemon_mode) {
117     # incompatible flags handled above: help, reset, and do_not_clear_zebraqueue
118     if ($skip_export or $keep_export or $skip_index or
119           $where or $length or $offset) {
120         my $msg = "Cannot specify -s, -k, -I, -where, -length, or -offset with -daemon.\n";
121         $msg   .= "Please do '$0 --help' to see usage.\n";
122         die $msg;
123     }
124     $authorities = 1;
125     $biblios = 1;
126     $process_zebraqueue = 1;
127 }
128
129 if (not $biblios and not $authorities) {
130     my $msg = "Must specify -b or -a to reindex bibs or authorities\n";
131     $msg   .= "Please do '$0 --help' to see usage.\n";
132     die $msg;
133 }
134
135
136 #  -v is for verbose, which seems backwards here because of how logging is set
137 #    on the CLI of zebraidx.  It works this way.  The default is to not log much
138 if ($verbose_logging >= 2) {
139     $zebraidx_log_opt = '-v none,fatal,warn,all';
140 }
141
142 my $use_tempdir = 0;
143 unless ($directory) {
144     $use_tempdir = 1;
145     $directory = tempdir(CLEANUP => ($keep_export ? 0 : 1));
146 }
147
148
149 my $biblioserverdir = C4::Context->zebraconfig('biblioserver')->{directory};
150 my $authorityserverdir = C4::Context->zebraconfig('authorityserver')->{directory};
151
152 my $kohadir = C4::Context->config('intranetdir');
153 my $bib_index_mode = C4::Context->config('zebra_bib_index_mode') || 'grs1';
154 my $auth_index_mode = C4::Context->config('zebra_auth_index_mode') || 'dom';
155
156 my $dbh = C4::Context->dbh;
157 my ($biblionumbertagfield,$biblionumbertagsubfield) = &GetMarcFromKohaField("biblio.biblionumber","");
158 my ($biblioitemnumbertagfield,$biblioitemnumbertagsubfield) = &GetMarcFromKohaField("biblioitems.biblioitemnumber","");
159
160 # Protect again simultaneous update of the zebra index by using a lock file.
161 # Create our own lock directory if its missing.  This shouild be created
162 # by koha-zebra-ctl.sh or at system installation.  If the desired directory
163 # does not exist and cannot be created, we fall back on /tmp - which will
164 # always work.
165
166 my ($lockfile, $LockFH);
167 foreach( C4::Context->config("zebra_lockdir"), "/var/lock/zebra_".C4::Context->config('database'), "/tmp/zebra_".C4::Context->config('database') ) {
168     #we try three possibilities (we really want to lock :)
169     next if !$_;
170     ($LockFH, $lockfile) = _create_lockfile($_.'/rebuild');
171     last if defined $LockFH;
172 }
173 if( !defined $LockFH ) {
174     print "WARNING: Could not create lock file $lockfile: $!\n";
175     print "Please check your koha-conf.xml for ZEBRA_LOCKDIR.\n";
176     print "Verify file permissions for it too.\n";
177     $use_flock=0; #we disable file locking now and will continue without it
178         #note that this mimics old behavior (before we used the lockfile)
179 };
180
181 if ( $verbose_logging ) {
182     print "Zebra configuration information\n";
183     print "================================\n";
184     print "Zebra biblio directory      = $biblioserverdir\n";
185     print "Zebra authorities directory = $authorityserverdir\n";
186     print "Koha directory              = $kohadir\n";
187     print "Lockfile                    = $lockfile\n" if $lockfile;
188     print "BIBLIONUMBER in :     $biblionumbertagfield\$$biblionumbertagsubfield\n";
189     print "BIBLIOITEMNUMBER in : $biblioitemnumbertagfield\$$biblioitemnumbertagsubfield\n";
190     print "================================\n";
191 }
192
193 my $tester = XML::LibXML->new();
194
195 # The main work is done here by calling do_one_pass().  We have added locking
196 # avoid race conditions between Full rebuilds and incremental updates either from
197 # daemon mode or periodic invocation from cron.  The race can lead to an updated
198 # record being overwritten by a rebuild if the update is applied after the export
199 # by the rebuild and before the rebuild finishes (more likely to effect large
200 # catalogs).
201 #
202 # We have chosen to exit immediately by default if we cannot obtain the lock
203 # to prevent the potential for a infinite backlog from cron invocations, but an
204 # option (wait-for-lock) is provided to let the program wait for the lock.
205 # See http://bugs.koha-community.org/bugzilla3/show_bug.cgi?id=11078 for details.
206 if ($daemon_mode) {
207     while (1) {
208         # For incremental updates, skip the update if the updates are locked
209         if (_flock($LockFH, LOCK_EX|LOCK_NB)) {
210             do_one_pass() if ( zebraqueue_not_empty() );
211             _flock($LockFH, LOCK_UN);
212         }
213         sleep $daemon_sleep;
214     }
215 } else {
216     # all one-off invocations
217     my $lock_mode = ($wait_for_lock) ? LOCK_EX : LOCK_EX|LOCK_NB;
218     if (_flock($LockFH, $lock_mode)) {
219         do_one_pass();
220         _flock($LockFH, LOCK_UN);
221     }
222     else {
223         print "Skipping rebuild/update because flock failed on $lockfile: $!\n";
224     }
225 }
226
227
228 if ( $verbose_logging ) {
229     print "====================\n";
230     print "CLEANING\n";
231     print "====================\n";
232 }
233 if ($keep_export) {
234     print "NOTHING cleaned : the export $directory has been kept.\n";
235     print "You can re-run this script with the -s ";
236     if ($use_tempdir) {
237         print " and -d $directory parameters";
238     } else {
239         print "parameter";
240     }
241     print "\n";
242     print "if you just want to rebuild zebra after changing the record.abs\n";
243     print "or another zebra config file\n";
244 } else {
245     unless ($use_tempdir) {
246         # if we're using a temporary directory
247         # created by File::Temp, it will be removed
248         # automatically.
249         rmtree($directory, 0, 1);
250         print "directory $directory deleted\n";
251     }
252 }
253
254 sub do_one_pass {
255     if ($authorities) {
256         index_records('authority', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $authorityserverdir);
257     } else {
258         print "skipping authorities\n" if ( $verbose_logging );
259     }
260
261     if ($biblios) {
262         index_records('biblio', $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $biblioserverdir);
263     } else {
264         print "skipping biblios\n" if ( $verbose_logging );
265     }
266 }
267
268 # Check the zebra update queue and return true if there are records to process
269 # This routine will handle each of -ab, -a, or -b, but in practice we force
270 # -ab when in daemon mode.
271 sub zebraqueue_not_empty {
272     my $where_str;
273
274     if ($authorities && $biblios) {
275         $where_str = 'done = 0;';
276     } elsif ($biblios) {
277         $where_str = 'server = "biblioserver" AND done = 0;';
278     } else {
279         $where_str = 'server = "authorityserver" AND done = 0;';
280     }
281     my $query =
282         $dbh->prepare('SELECT COUNT(*) FROM zebraqueue WHERE ' . $where_str );
283
284     $query->execute;
285     my $count = $query->fetchrow_arrayref->[0];
286     print "queued records: $count\n" if $verbose_logging > 0;
287     return $count > 0;
288 }
289
290 # This checks to see if the zebra directories exist under the provided path.
291 # If they don't, then zebra is likely to spit the dummy. This returns true
292 # if the directories had to be created, false otherwise.
293 sub check_zebra_dirs {
294     my ($base) = shift() . '/';
295     my $needed_repairing = 0;
296     my @dirs = ( '', 'key', 'register', 'shadow', 'tmp' );
297     foreach my $dir (@dirs) {
298         my $bdir = $base . $dir;
299         if (! -d $bdir) {
300             $needed_repairing = 1;
301             mkdir $bdir || die "Unable to create '$bdir': $!\n";
302             print "$0: needed to create '$bdir'\n";
303         }
304     }
305     return $needed_repairing;
306 }   # ----------  end of subroutine check_zebra_dirs  ----------
307
308 sub index_records {
309     my ($record_type, $directory, $skip_export, $skip_index, $process_zebraqueue, $as_xml, $noxml, $nosanitize, $do_not_clear_zebraqueue, $verbose_logging, $zebraidx_log_opt, $server_dir) = @_;
310
311     my $num_records_exported = 0;
312     my $records_deleted;
313     my $need_reset = check_zebra_dirs($server_dir);
314     if ($need_reset) {
315         print "$0: found broken zebra server directories: forcing a rebuild\n";
316         $reset = 1;
317     }
318     if ($skip_export && $verbose_logging) {
319         print "====================\n";
320         print "SKIPPING $record_type export\n";
321         print "====================\n";
322     } else {
323         if ( $verbose_logging ) {
324             print "====================\n";
325             print "exporting $record_type\n";
326             print "====================\n";
327         }
328         mkdir "$directory" unless (-d $directory);
329         mkdir "$directory/$record_type" unless (-d "$directory/$record_type");
330         if ($process_zebraqueue) {
331             my $entries = select_zebraqueue_records($record_type, 'deleted');
332             mkdir "$directory/del_$record_type" unless (-d "$directory/del_$record_type");
333             $records_deleted = generate_deleted_marc_records($record_type, $entries, "$directory/del_$record_type", $as_xml);
334             mark_zebraqueue_batch_done($entries);
335             $entries = select_zebraqueue_records($record_type, 'updated');
336             mkdir "$directory/upd_$record_type" unless (-d "$directory/upd_$record_type");
337             $num_records_exported = export_marc_records_from_list($record_type,
338                                                                   $entries, "$directory/upd_$record_type", $as_xml, $noxml, $records_deleted);
339             mark_zebraqueue_batch_done($entries);
340         } else {
341             my $sth = select_all_records($record_type);
342             $num_records_exported = export_marc_records_from_sth($record_type, $sth, "$directory/$record_type", $as_xml, $noxml, $nosanitize);
343             unless ($do_not_clear_zebraqueue) {
344                 mark_all_zebraqueue_done($record_type);
345             }
346         }
347     }
348
349     #
350     # and reindexing everything
351     #
352     if ($skip_index) {
353         if ($verbose_logging) {
354             print "====================\n";
355             print "SKIPPING $record_type indexing\n";
356             print "====================\n";
357         }
358     } else {
359         if ( $verbose_logging ) {
360             print "====================\n";
361             print "REINDEXING zebra\n";
362             print "====================\n";
363         }
364         my $record_fmt = ($as_xml) ? 'marcxml' : 'iso2709' ;
365         if ($process_zebraqueue) {
366             do_indexing($record_type, 'adelete', "$directory/del_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
367                 if %$records_deleted;
368             do_indexing($record_type, 'update', "$directory/upd_$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
369                 if $num_records_exported;
370         } else {
371             do_indexing($record_type, 'update', "$directory/$record_type", $reset, $noshadow, $record_fmt, $zebraidx_log_opt)
372                 if ($num_records_exported or $skip_export);
373         }
374     }
375 }
376
377
378 sub select_zebraqueue_records {
379     my ($record_type, $update_type) = @_;
380
381     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
382     my $op = ($update_type eq 'deleted') ? 'recordDelete' : 'specialUpdate';
383
384     my $sth = $dbh->prepare("SELECT id, biblio_auth_number
385                              FROM zebraqueue
386                              WHERE server = ?
387                              AND   operation = ?
388                              AND   done = 0
389                              ORDER BY id DESC");
390     $sth->execute($server, $op);
391     my $entries = $sth->fetchall_arrayref({});
392 }
393
394 sub mark_all_zebraqueue_done {
395     my ($record_type) = @_;
396
397     my $server = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
398
399     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1
400                              WHERE server = ?
401                              AND done = 0");
402     $sth->execute($server);
403 }
404
405 sub mark_zebraqueue_batch_done {
406     my ($entries) = @_;
407
408     $dbh->{AutoCommit} = 0;
409     my $sth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
410     $dbh->commit();
411     foreach my $id (map { $_->{id} } @$entries) {
412         $sth->execute($id);
413     }
414     $dbh->{AutoCommit} = 1;
415 }
416
417 sub select_all_records {
418     my $record_type = shift;
419     return ($record_type eq 'biblio') ? select_all_biblios() : select_all_authorities();
420 }
421
422 sub select_all_authorities {
423     my $strsth=qq{SELECT authid FROM auth_header};
424     $strsth.=qq{ WHERE $where } if ($where);
425     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
426     $strsth.=qq{ LIMIT $offset,$length } if ($length && $offset);
427     my $sth = $dbh->prepare($strsth);
428     $sth->execute();
429     return $sth;
430 }
431
432 sub select_all_biblios {
433     my $strsth = qq{ SELECT biblionumber FROM biblioitems };
434     $strsth.=qq{ WHERE $where } if ($where);
435     $strsth.=qq{ LIMIT $length } if ($length && !$offset);
436     $strsth.=qq{ LIMIT $offset,$length } if ($offset);
437     my $sth = $dbh->prepare($strsth);
438     $sth->execute();
439     return $sth;
440 }
441
442 sub include_xml_wrapper {
443     my $as_xml = shift;
444     my $record_type = shift;
445
446     return 0 unless $as_xml;
447     return 1 if $record_type eq 'biblio' and $bib_index_mode eq 'dom';
448     return 1 if $record_type eq 'authority' and $auth_index_mode eq 'dom';
449     return 0;
450
451 }
452
453 sub export_marc_records_from_sth {
454     my ($record_type, $sth, $directory, $as_xml, $noxml, $nosanitize) = @_;
455
456     my $num_exported = 0;
457     open my $fh, '>:encoding(UTF-8) ', "$directory/exported_records" or die $!;
458     if (include_xml_wrapper($as_xml, $record_type)) {
459         # include XML declaration and root element
460         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
461     }
462     my $i = 0;
463     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField("items.itemnumber",'');
464     while (my ($record_number) = $sth->fetchrow_array) {
465         print "." if ( $verbose_logging );
466         print "\r$i" unless ($i++ %100 or !$verbose_logging);
467         if ( $nosanitize ) {
468             my $marcxml = $record_type eq 'biblio'
469                           ? GetXmlBiblio( $record_number )
470                           : GetAuthorityXML( $record_number );
471             if ($record_type eq 'biblio'){
472                 my @items = GetItemsInfo($record_number);
473                 if (@items){
474                     my $record = MARC::Record->new;
475                     $record->encoding('UTF-8');
476                     my @itemsrecord;
477                     foreach my $item (@items){
478                         my $record = Item2Marc($item, $record_number);
479                         push @itemsrecord, $record->field($itemtag);
480                     }
481                     $record->insert_fields_ordered(@itemsrecord);
482                     my $itemsxml = $record->as_xml_record();
483                     $marcxml =
484                         substr($marcxml, 0, length($marcxml)-10) .
485                         substr($itemsxml, index($itemsxml, "</leader>\n", 0) + 10);
486                 }
487             }
488             # extra test to ensure that result is valid XML; otherwise
489             # Zebra won't parse it in DOM mode
490             eval {
491                 my $doc = $tester->parse_string($marcxml);
492             };
493             if ($@) {
494                 warn "Error exporting record $record_number ($record_type): $@\n";
495                 next;
496             }
497             if ( $marcxml ) {
498                 $marcxml =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
499                 print {$fh} $marcxml;
500                 $num_exported++;
501             }
502             next;
503         }
504         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
505         if (defined $marc) {
506             eval {
507                 my $rec;
508                 if ($as_xml) {
509                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
510                     eval {
511                         my $doc = $tester->parse_string($rec);
512                     };
513                     if ($@) {
514                         die "invalid XML: $@";
515                     }
516                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
517                 } else {
518                     $rec = $marc->as_usmarc();
519                 }
520                 print {$fh} $rec;
521                 $num_exported++;
522             };
523             if ($@) {
524                 warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
525                 warn "... specific error is $@" if $verbose_logging;
526             }
527         }
528     }
529     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
530     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
531     close $fh;
532     return $num_exported;
533 }
534
535 sub export_marc_records_from_list {
536     my ($record_type, $entries, $directory, $as_xml, $noxml, $records_deleted) = @_;
537
538     my $num_exported = 0;
539     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
540     if (include_xml_wrapper($as_xml, $record_type)) {
541         # include XML declaration and root element
542         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
543     }
544     my $i = 0;
545
546     # Skip any deleted records. We check for this anyway, but this reduces error spam
547     my %found = %$records_deleted;
548     foreach my $record_number ( map { $_->{biblio_auth_number} }
549                                 grep { !$found{ $_->{biblio_auth_number} }++ }
550                                 @$entries ) {
551         print "." if ( $verbose_logging );
552         print "\r$i" unless ($i++ %100 or !$verbose_logging);
553         my ($marc) = get_corrected_marc_record($record_type, $record_number, $noxml);
554         if (defined $marc) {
555             eval {
556                 my $rec;
557                 if ($as_xml) {
558                     $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
559                     $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
560                 } else {
561                     $rec = $marc->as_usmarc();
562                 }
563                 print {$fh} $rec;
564                 $num_exported++;
565             };
566             if ($@) {
567               warn "Error exporting record $record_number ($record_type) ".($noxml ? "not XML" : "XML");
568             }
569         }
570     }
571     print "\nRecords exported: $num_exported\n" if ( $verbose_logging );
572     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
573     close $fh;
574     return $num_exported;
575 }
576
577 sub generate_deleted_marc_records {
578     my ($record_type, $entries, $directory, $as_xml) = @_;
579
580     my $records_deleted = {};
581     open my $fh, '>:encoding(UTF-8)', "$directory/exported_records" or die $!;
582     if (include_xml_wrapper($as_xml, $record_type)) {
583         # include XML declaration and root element
584         print {$fh} '<?xml version="1.0" encoding="UTF-8"?><collection>';
585     }
586     my $i = 0;
587     foreach my $record_number (map { $_->{biblio_auth_number} } @$entries ) {
588         print "\r$i" unless ($i++ %100 or !$verbose_logging);
589         print "." if ( $verbose_logging );
590
591         my $marc = MARC::Record->new();
592         if ($record_type eq 'biblio') {
593             fix_biblio_ids($marc, $record_number, $record_number);
594         } else {
595             fix_authority_id($marc, $record_number);
596         }
597         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
598             fix_unimarc_100($marc);
599         }
600
601         my $rec;
602         if ($as_xml) {
603             $rec = $marc->as_xml_record(C4::Context->preference('marcflavour'));
604             $rec =~ s!<\?xml version="1.0" encoding="UTF-8"\?>\n!!;
605         } else {
606             $rec = $marc->as_usmarc();
607         }
608         print {$fh} $rec;
609
610         $records_deleted->{$record_number} = 1;
611     }
612     print "\nRecords exported: $i\n" if ( $verbose_logging );
613     print {$fh} '</collection>' if (include_xml_wrapper($as_xml, $record_type));
614     close $fh;
615     return $records_deleted;
616
617
618 }
619
620 sub get_corrected_marc_record {
621     my ($record_type, $record_number, $noxml) = @_;
622
623     my $marc = get_raw_marc_record($record_type, $record_number, $noxml);
624
625     if (defined $marc) {
626         fix_leader($marc);
627         if ($record_type eq 'authority') {
628             fix_authority_id($marc, $record_number);
629         } elsif ($record_type eq 'biblio' && C4::Context->preference('IncludeSeeFromInSearches')) {
630             my $normalizer = Koha::RecordProcessor->new( { filters => 'EmbedSeeFromHeadings' } );
631             $marc = $normalizer->process($marc);
632         }
633         if (C4::Context->preference("marcflavour") eq "UNIMARC") {
634             fix_unimarc_100($marc);
635         }
636     }
637
638     return $marc;
639 }
640
641 sub get_raw_marc_record {
642     my ($record_type, $record_number, $noxml) = @_;
643
644     my $marc;
645     if ($record_type eq 'biblio') {
646         if ($noxml) {
647             my $fetch_sth = $dbh->prepare_cached("SELECT marc FROM biblioitems WHERE biblionumber = ?");
648             $fetch_sth->execute($record_number);
649             if (my ($blob) = $fetch_sth->fetchrow_array) {
650                 $marc = MARC::Record->new_from_usmarc($blob);
651                 unless ($marc) {
652                     warn "error creating MARC::Record from $blob";
653                 }
654             }
655             # failure to find a bib is not a problem -
656             # a delete could have been done before
657             # trying to process a record update
658
659             $fetch_sth->finish();
660             return unless $marc;
661         } else {
662             eval { $marc = GetMarcBiblio($record_number, 1); };
663             if ($@ || !$marc) {
664                 # here we do warn since catching an exception
665                 # means that the bib was found but failed
666                 # to be parsed
667                 warn "error retrieving biblio $record_number";
668                 return;
669             }
670         }
671     } else {
672         eval { $marc = GetAuthority($record_number); };
673         if ($@) {
674             warn "error retrieving authority $record_number";
675             return;
676         }
677     }
678     return $marc;
679 }
680
681 sub fix_leader {
682     # FIXME - this routine is suspect
683     # It blanks the Leader/00-05 and Leader/12-16 to
684     # force them to be recalculated correct when
685     # the $marc->as_usmarc() or $marc->as_xml() is called.
686     # But why is this necessary?  It would be a serious bug
687     # in MARC::Record (definitely) and MARC::File::XML (arguably)
688     # if they are emitting incorrect leader values.
689     my $marc = shift;
690
691     my $leader = $marc->leader;
692     substr($leader,  0, 5) = '     ';
693     substr($leader, 10, 7) = '22     ';
694     $marc->leader(substr($leader, 0, 24));
695 }
696
697 sub fix_biblio_ids {
698     # FIXME - it is essential to ensure that the biblionumber is present,
699     #         otherwise, Zebra will choke on the record.  However, this
700     #         logic belongs in the relevant C4::Biblio APIs.
701     my $marc = shift;
702     my $biblionumber = shift;
703     my $biblioitemnumber;
704     if (@_) {
705         $biblioitemnumber = shift;
706     } else {
707         my $sth = $dbh->prepare(
708             "SELECT biblioitemnumber FROM biblioitems WHERE biblionumber=?");
709         $sth->execute($biblionumber);
710         ($biblioitemnumber) = $sth->fetchrow_array;
711         $sth->finish;
712         unless ($biblioitemnumber) {
713             warn "failed to get biblioitemnumber for biblio $biblionumber";
714             return 0;
715         }
716     }
717
718     # FIXME - this is cheating on two levels
719     # 1. C4::Biblio::_koha_marc_update_bib_ids is meant to be an internal function
720     # 2. Making sure that the biblionumber and biblioitemnumber are correct and
721     #    present in the MARC::Record object ought to be part of GetMarcBiblio.
722     #
723     # On the other hand, this better for now than what rebuild_zebra.pl used to
724     # do, which was duplicate the code for inserting the biblionumber
725     # and biblioitemnumber
726     C4::Biblio::_koha_marc_update_bib_ids($marc, '', $biblionumber, $biblioitemnumber);
727
728     return 1;
729 }
730
731 sub fix_authority_id {
732     # FIXME - as with fix_biblio_ids, the authid must be present
733     #         for Zebra's sake.  However, this really belongs
734     #         in C4::AuthoritiesMarc.
735     my ($marc, $authid) = @_;
736     unless ($marc->field('001') and $marc->field('001')->data() eq $authid){
737         $marc->delete_field($marc->field('001'));
738         $marc->insert_fields_ordered(MARC::Field->new('001',$authid));
739     }
740 }
741
742 sub fix_unimarc_100 {
743     # FIXME - again, if this is necessary, it belongs in C4::AuthoritiesMarc.
744     my $marc = shift;
745
746     my $string;
747     if ( length($marc->subfield( 100, "a" )) == 36 ) {
748         $string = $marc->subfield( 100, "a" );
749         my $f100 = $marc->field(100);
750         $marc->delete_field($f100);
751     }
752     else {
753         $string = POSIX::strftime( "%Y%m%d", localtime );
754         $string =~ s/\-//g;
755         $string = sprintf( "%-*s", 35, $string );
756     }
757     substr( $string, 22, 6, "frey50" );
758     unless ( length($marc->subfield( 100, "a" )) == 36 ) {
759         $marc->delete_field($marc->field(100));
760         $marc->insert_grouped_field(MARC::Field->new( 100, "", "", "a" => $string ));
761     }
762 }
763
764 sub do_indexing {
765     my ($record_type, $op, $record_dir, $reset_index, $noshadow, $record_format, $zebraidx_log_opt) = @_;
766
767     my $zebra_server  = ($record_type eq 'biblio') ? 'biblioserver' : 'authorityserver';
768     my $zebra_db_name = ($record_type eq 'biblio') ? 'biblios' : 'authorities';
769     my $zebra_config  = C4::Context->zebraconfig($zebra_server)->{'config'};
770     my $zebra_db_dir  = C4::Context->zebraconfig($zebra_server)->{'directory'};
771
772     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name init") if $reset_index;
773     system("zebraidx -c $zebra_config $zebraidx_log_opt $noshadow -g $record_format -d $zebra_db_name $op $record_dir");
774     system("zebraidx -c $zebra_config $zebraidx_log_opt -g $record_format -d $zebra_db_name commit") unless $noshadow;
775
776 }
777
778 sub _flock {
779 # test if flock is present; if so, use it; if not, return true
780 # op refers to the official flock operations incl LOCK_EX, LOCK_UN, etc.
781 # combining LOCK_EX with LOCK_NB returns immediately
782     my ($fh, $op)= @_;
783     if( !defined($use_flock) ) {
784         #check if flock is present; if not, you will have a fatal error
785         my $i=eval { flock($fh, $op) };
786         #assuming that $fh and $op are fine(..), an undef i means no flock
787         $use_flock= defined($i)? 1: 0;
788         print "Warning: flock could not be used!\n" if $verbose_logging && !$use_flock;
789         return 1 if !$use_flock;
790         return $i;
791     }
792     else {
793         return 1 if !$use_flock;
794         return flock($fh, $op);
795     }
796 }
797
798 sub _create_lockfile { #returns undef on failure
799     my $dir= shift;
800     unless (-d $dir) {
801         eval { mkpath($dir, 0, oct(755)) };
802         return if $@;
803     }
804     return if !open my $fh, q{>}, $dir.'/'.LOCK_FILENAME;
805     return ( $fh, $dir.'/'.LOCK_FILENAME );
806 }
807
808 sub print_usage {
809     print <<_USAGE_;
810 $0: reindex MARC bibs and/or authorities in Zebra.
811
812 Use this batch job to reindex all biblio or authority
813 records in your Koha database.
814
815 Parameters:
816
817     -b                      index bibliographic records
818
819     -a                      index authority records
820
821     -daemon                 Run in daemon mode.  The program will loop checking
822                             for entries on the zebraqueue table, processing
823                             them incrementally if present, and then sleep
824                             for a few seconds before repeating the process
825                             Checking the zebraqueue table is done with a cheap
826                             SQL query.  This allows for near realtime update of
827                             the zebra search index with low system overhead.
828                             Use -sleep to control the checking interval.
829
830                             Daemon mode implies -z, -a, -b.  The program will
831                             refuse to start if options are present that do not
832                             make sense while running as an incremental update
833                             daemon (e.g. -r or -offset).
834
835     -sleep 10               Seconds to sleep between checks of the zebraqueue
836                             table in daemon mode.  The default is 5 seconds.
837
838     -z                      select only updated and deleted
839                             records marked in the zebraqueue
840                             table.  Cannot be used with -r
841                             or -s.
842
843     -r                      clear Zebra index before
844                             adding records to index. Implies -w.
845
846     -d                      Temporary directory for indexing.
847                             If not specified, one is automatically
848                             created.  The export directory
849                             is automatically deleted unless
850                             you supply the -k switch.
851
852     -k                      Do not delete export directory.
853
854     -s                      Skip export.  Used if you have
855                             already exported the records
856                             in a previous run.
857
858     -noxml                  index from ISO MARC blob
859                             instead of MARC XML.  This
860                             option is recommended only
861                             for advanced user.
862
863     -x                      export and index as xml instead of is02709 (biblios only).
864                             use this if you might have records > 99,999 chars,
865
866     -nosanitize             export biblio/authority records directly from DB marcxml
867                             field without sanitizing records. It speed up
868                             dump process but could fail if DB contains badly
869                             encoded records. Works only with -x,
870
871     -w                      skip shadow indexing for this batch
872
873     -y                      do NOT clear zebraqueue after indexing; normally,
874                             after doing batch indexing, zebraqueue should be
875                             marked done for the affected record type(s) so that
876                             a running zebraqueue_daemon doesn't try to reindex
877                             the same records - specify -y to override this.
878                             Cannot be used with -z.
879
880     -v                      increase the amount of logging.  Normally only
881                             warnings and errors from the indexing are shown.
882                             Use log level 2 (-v -v) to include all Zebra logs.
883
884     --length   1234         how many biblio you want to export
885     --offset 1243           offset you want to start to
886                                 example: --offset 500 --length=500 will result in a LIMIT 500,1000 (exporting 1000 records, starting by the 500th one)
887                                 note that the numbers are NOT related to biblionumber, that's the intended behaviour.
888     --where                 let you specify a WHERE query, like itemtype='BOOK'
889                             or something like that
890
891     --run-as-root           explicitily allow script to run as 'root' user
892
893     --wait-for-lock         when not running in daemon mode, the default
894                             behavior is to abort a rebuild if the rebuild
895                             lock is busy.  This option will cause the program
896                             to wait for the lock to free and then continue
897                             processing the rebuild request,
898
899     --help or -h            show this message.
900 _USAGE_
901 }