Bug 30903: (follow-up) Fix error message class
[koha-ffzg.git] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::SearchEngine;
39 use Koha::SearchEngine::Indexer;
40 use Koha::Plugins::Handler;
41 use Koha::Logger;
42
43 our (@ISA, @EXPORT_OK);
44 BEGIN {
45     require Exporter;
46     @ISA       = qw(Exporter);
47     @EXPORT_OK = qw(
48       GetZ3950BatchId
49       GetWebserviceBatchId
50       GetImportRecordMarc
51       GetImportRecordMarcXML
52       GetRecordFromImportBiblio
53       AddImportBatch
54       GetImportBatch
55       AddAuthToBatch
56       AddBiblioToBatch
57       AddItemsToImportBiblio
58       ModAuthorityInBatch
59
60       BatchStageMarcRecords
61       BatchFindDuplicates
62       BatchCommitRecords
63       BatchRevertRecords
64       CleanBatch
65       DeleteBatch
66
67       GetAllImportBatches
68       GetStagedWebserviceBatches
69       GetImportBatchRangeDesc
70       GetNumberOfNonZ3950ImportBatches
71       GetImportBiblios
72       GetImportRecordsRange
73       GetItemNumbersFromImportBatch
74
75       GetImportBatchStatus
76       SetImportBatchStatus
77       GetImportBatchOverlayAction
78       SetImportBatchOverlayAction
79       GetImportBatchNoMatchAction
80       SetImportBatchNoMatchAction
81       GetImportBatchItemAction
82       SetImportBatchItemAction
83       GetImportBatchMatcher
84       SetImportBatchMatcher
85       GetImportRecordOverlayStatus
86       SetImportRecordOverlayStatus
87       GetImportRecordStatus
88       SetImportRecordStatus
89       SetMatchedBiblionumber
90       GetImportRecordMatches
91       SetImportRecordMatches
92
93       RecordsFromMARCXMLFile
94       RecordsFromISO2709File
95       RecordsFromMarcPlugin
96     );
97 }
98
99 =head1 NAME
100
101 C4::ImportBatch - manage batches of imported MARC records
102
103 =head1 SYNOPSIS
104
105 use C4::ImportBatch;
106
107 =head1 FUNCTIONS
108
109 =head2 GetZ3950BatchId
110
111   my $batchid = GetZ3950BatchId($z3950server);
112
113 Retrieves the ID of the import batch for the Z39.50
114 reservoir for the given target.  If necessary,
115 creates the import batch.
116
117 =cut
118
119 sub GetZ3950BatchId {
120     my ($z3950server) = @_;
121
122     my $dbh = C4::Context->dbh;
123     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
124                              WHERE  batch_type = 'z3950'
125                              AND    file_name = ?");
126     $sth->execute($z3950server);
127     my $rowref = $sth->fetchrow_arrayref();
128     $sth->finish();
129     if (defined $rowref) {
130         return $rowref->[0];
131     } else {
132         my $batch_id = AddImportBatch( {
133                 overlay_action => 'create_new',
134                 import_status => 'staged',
135                 batch_type => 'z3950',
136                 file_name => $z3950server,
137             } );
138         return $batch_id;
139     }
140     
141 }
142
143 =head2 GetWebserviceBatchId
144
145   my $batchid = GetWebserviceBatchId();
146
147 Retrieves the ID of the import batch for webservice.
148 If necessary, creates the import batch.
149
150 =cut
151
152 my $WEBSERVICE_BASE_QRY = <<EOQ;
153 SELECT import_batch_id FROM import_batches
154 WHERE  batch_type = 'webservice'
155 AND    import_status = 'staged'
156 EOQ
157 sub GetWebserviceBatchId {
158     my ($params) = @_;
159
160     my $dbh = C4::Context->dbh;
161     my $sql = $WEBSERVICE_BASE_QRY;
162     my @args;
163     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
164         if (my $val = $params->{$field}) {
165             $sql .= " AND $field = ?";
166             push @args, $val;
167         }
168     }
169     my $id = $dbh->selectrow_array($sql, undef, @args);
170     return $id if $id;
171
172     $params->{batch_type} = 'webservice';
173     $params->{import_status} = 'staged';
174     return AddImportBatch($params);
175 }
176
177 =head2 GetImportRecordMarc
178
179   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
180
181 =cut
182
183 sub GetImportRecordMarc {
184     my ($import_record_id) = @_;
185
186     my $dbh = C4::Context->dbh;
187     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
188         SELECT marc, encoding
189         FROM import_records
190         WHERE import_record_id = ?
191     |, undef, $import_record_id );
192
193     return $marc, $encoding;
194 }
195
196 sub GetRecordFromImportBiblio {
197     my ( $import_record_id, $embed_items ) = @_;
198
199     my ($marc) = GetImportRecordMarc($import_record_id);
200     my $record = MARC::Record->new_from_usmarc($marc);
201
202     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
203
204     return $record;
205 }
206
207 sub EmbedItemsInImportBiblio {
208     my ( $record, $import_record_id ) = @_;
209     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
210     my $dbh = C4::Context->dbh;
211     my $import_items = $dbh->selectall_arrayref(q|
212         SELECT import_items.marcxml
213         FROM import_items
214         WHERE import_record_id = ?
215     |, { Slice => {} }, $import_record_id );
216     my @item_fields;
217     for my $import_item ( @$import_items ) {
218         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
219         push @item_fields, $item_marc->field($itemtag);
220     }
221     $record->append_fields(@item_fields);
222     return $record;
223 }
224
225 =head2 GetImportRecordMarcXML
226
227   my $marcxml = GetImportRecordMarcXML($import_record_id);
228
229 =cut
230
231 sub GetImportRecordMarcXML {
232     my ($import_record_id) = @_;
233
234     my $dbh = C4::Context->dbh;
235     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
236     $sth->execute($import_record_id);
237     my ($marcxml) = $sth->fetchrow();
238     $sth->finish();
239     return $marcxml;
240
241 }
242
243 =head2 AddImportBatch
244
245   my $batch_id = AddImportBatch($params_hash);
246
247 =cut
248
249 sub AddImportBatch {
250     my ($params) = @_;
251
252     my (@fields, @vals);
253     foreach (qw( matcher_id template_id branchcode
254                  overlay_action nomatch_action item_action
255                  import_status batch_type file_name comments record_type )) {
256         if (exists $params->{$_}) {
257             push @fields, $_;
258             push @vals, $params->{$_};
259         }
260     }
261     my $dbh = C4::Context->dbh;
262     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
263                                   VALUES (".join( ',', map '?', @fields).")",
264              undef,
265              @vals);
266     return $dbh->{'mysql_insertid'};
267 }
268
269 =head2 GetImportBatch 
270
271   my $row = GetImportBatch($batch_id);
272
273 Retrieve a hashref of an import_batches row.
274
275 =cut
276
277 sub GetImportBatch {
278     my ($batch_id) = @_;
279
280     my $dbh = C4::Context->dbh;
281     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
282     $sth->bind_param(1, $batch_id);
283     $sth->execute();
284     my $result = $sth->fetchrow_hashref;
285     $sth->finish();
286     return $result;
287
288 }
289
290 =head2 AddBiblioToBatch 
291
292   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
293                 $marc_record, $encoding, $update_counts);
294
295 =cut
296
297 sub AddBiblioToBatch {
298     my $batch_id = shift;
299     my $record_sequence = shift;
300     my $marc_record = shift;
301     my $encoding = shift;
302     my $update_counts = @_ ? shift : 1;
303
304     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
305     _add_biblio_fields($import_record_id, $marc_record);
306     _update_batch_record_counts($batch_id) if $update_counts;
307     return $import_record_id;
308 }
309
310 =head2 AddAuthToBatch
311
312   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
313                 $marc_record, $encoding, $update_counts, [$marc_type]);
314
315 =cut
316
317 sub AddAuthToBatch {
318     my $batch_id = shift;
319     my $record_sequence = shift;
320     my $marc_record = shift;
321     my $encoding = shift;
322     my $update_counts = @_ ? shift : 1;
323     my $marc_type = shift || C4::Context->preference('marcflavour');
324
325     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
326
327     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
328     _add_auth_fields($import_record_id, $marc_record);
329     _update_batch_record_counts($batch_id) if $update_counts;
330     return $import_record_id;
331 }
332
333 =head2 BatchStageMarcRecords
334
335 ( $batch_id, $num_records, $num_items, @invalid_records ) =
336   BatchStageMarcRecords(
337     $record_type,                $encoding,
338     $marc_records,               $file_name,
339     $marc_modification_template, $comments,
340     $branch_code,                $parse_items,
341     $leave_as_staging,           $progress_interval,
342     $progress_callback
343   );
344
345 =cut
346
347 sub BatchStageMarcRecords {
348     my $record_type = shift;
349     my $encoding = shift;
350     my $marc_records = shift;
351     my $file_name = shift;
352     my $marc_modification_template = shift;
353     my $comments = shift;
354     my $branch_code = shift;
355     my $parse_items = shift;
356     my $leave_as_staging = shift;
357
358     # optional callback to monitor status 
359     # of job
360     my $progress_interval = 0;
361     my $progress_callback = undef;
362     if ($#_ == 1) {
363         $progress_interval = shift;
364         $progress_callback = shift;
365         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
366         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
367     } 
368     
369     my $batch_id = AddImportBatch( {
370             overlay_action => 'create_new',
371             import_status => 'staging',
372             batch_type => 'batch',
373             file_name => $file_name,
374             comments => $comments,
375             record_type => $record_type,
376         } );
377     if ($parse_items) {
378         SetImportBatchItemAction($batch_id, 'always_add');
379     } else {
380         SetImportBatchItemAction($batch_id, 'ignore');
381     }
382
383
384     my $marc_type = C4::Context->preference('marcflavour');
385     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
386     my @invalid_records = ();
387     my $num_valid = 0;
388     my $num_items = 0;
389     # FIXME - for now, we're dealing only with bibs
390     my $rec_num = 0;
391     foreach my $marc_record (@$marc_records) {
392         $rec_num++;
393         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
394             &$progress_callback($rec_num);
395         }
396
397         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
398
399         my $import_record_id;
400         if (scalar($marc_record->fields()) == 0) {
401             push @invalid_records, $marc_record;
402         } else {
403
404             # Normalize the record so it doesn't have separated diacritics
405             SetUTF8Flag($marc_record);
406
407             $num_valid++;
408             if ($record_type eq 'biblio') {
409                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
410                 if ($parse_items) {
411                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
412                     $num_items += scalar(@import_items_ids);
413                 }
414             } elsif ($record_type eq 'auth') {
415                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
416             }
417         }
418     }
419     unless ($leave_as_staging) {
420         SetImportBatchStatus($batch_id, 'staged');
421     }
422     # FIXME branch_code, number of bibs, number of items
423     _update_batch_record_counts($batch_id);
424     return ($batch_id, $num_valid, $num_items, @invalid_records);
425 }
426
427 =head2 AddItemsToImportBiblio
428
429   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
430                 $import_record_id, $marc_record, $update_counts);
431
432 =cut
433
434 sub AddItemsToImportBiblio {
435     my $batch_id = shift;
436     my $import_record_id = shift;
437     my $marc_record = shift;
438     my $update_counts = @_ ? shift : 0;
439
440     my @import_items_ids = ();
441    
442     my $dbh = C4::Context->dbh; 
443     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
444     foreach my $item_field ($marc_record->field($item_tag)) {
445         my $item_marc = MARC::Record->new();
446         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
447         $item_marc->append_fields($item_field);
448         $marc_record->delete_field($item_field);
449         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
450                                         VALUES (?, ?, ?)");
451         $sth->bind_param(1, $import_record_id);
452         $sth->bind_param(2, 'staged');
453         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
454         $sth->execute();
455         push @import_items_ids, $dbh->{'mysql_insertid'};
456         $sth->finish();
457     }
458
459     if ($#import_items_ids > -1) {
460         _update_batch_record_counts($batch_id) if $update_counts;
461         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
462     }
463     return @import_items_ids;
464 }
465
466 =head2 BatchFindDuplicates
467
468   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
469              $max_matches, $progress_interval, $progress_callback);
470
471 Goes through the records loaded in the batch and attempts to 
472 find duplicates for each one.  Sets the matching status 
473 of each record to "no_match" or "auto_match" as appropriate.
474
475 The $max_matches parameter is optional; if it is not supplied,
476 it defaults to 10.
477
478 The $progress_interval and $progress_callback parameters are 
479 optional; if both are supplied, the sub referred to by
480 $progress_callback will be invoked every $progress_interval
481 records using the number of records processed as the 
482 singular argument.
483
484 =cut
485
486 sub BatchFindDuplicates {
487     my $batch_id = shift;
488     my $matcher = shift;
489     my $max_matches = @_ ? shift : 10;
490
491     # optional callback to monitor status 
492     # of job
493     my $progress_interval = 0;
494     my $progress_callback = undef;
495     if ($#_ == 1) {
496         $progress_interval = shift;
497         $progress_callback = shift;
498         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
499         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
500     }
501
502     my $dbh = C4::Context->dbh;
503
504     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
505                              FROM import_records
506                              WHERE import_batch_id = ?");
507     $sth->execute($batch_id);
508     my $num_with_matches = 0;
509     my $rec_num = 0;
510     while (my $rowref = $sth->fetchrow_hashref) {
511         $rec_num++;
512         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
513             &$progress_callback($rec_num);
514         }
515         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
516         my @matches = ();
517         if (defined $matcher) {
518             @matches = $matcher->get_matches($marc_record, $max_matches);
519         }
520         if (scalar(@matches) > 0) {
521             $num_with_matches++;
522             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
523             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
524         } else {
525             SetImportRecordMatches($rowref->{'import_record_id'}, ());
526             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
527         }
528     }
529     $sth->finish();
530     return $num_with_matches;
531 }
532
533 =head2 BatchCommitRecords
534
535   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
536         BatchCommitRecords($batch_id, $framework,
537         $progress_interval, $progress_callback);
538
539 =cut
540
541 sub BatchCommitRecords {
542     my $batch_id = shift;
543     my $framework = shift;
544
545     my $schema = Koha::Database->schema;
546
547     # optional callback to monitor status 
548     # of job
549     my $progress_interval = 0;
550     my $progress_callback = undef;
551     if ($#_ == 1) {
552         $progress_interval = shift;
553         $progress_callback = shift;
554         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
555         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
556     }
557
558     my $record_type;
559     my $num_added = 0;
560     my $num_updated = 0;
561     my $num_items_added = 0;
562     my $num_items_replaced = 0;
563     my $num_items_errored = 0;
564     my $num_ignored = 0;
565     # commit (i.e., save, all records in the batch)
566     SetImportBatchStatus($batch_id, 'importing');
567     my $overlay_action = GetImportBatchOverlayAction($batch_id);
568     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
569     my $item_action = GetImportBatchItemAction($batch_id);
570     my $item_tag;
571     my $item_subfield;
572     my $dbh = C4::Context->dbh;
573     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
574                              FROM import_records
575                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
576                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
577                              WHERE import_batch_id = ?");
578     $sth->execute($batch_id);
579     my $marcflavour = C4::Context->preference('marcflavour');
580
581     my $userenv = C4::Context->userenv;
582     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
583
584     my $rec_num = 0;
585     my @biblio_ids;
586     $schema->txn_begin; # We commit in a transaction
587     while (my $rowref = $sth->fetchrow_hashref) {
588         $record_type = $rowref->{'record_type'};
589
590         $rec_num++;
591
592         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
593             # report progress and commit
594             $schema->txn_commit;
595             &$progress_callback( $rec_num );
596             $schema->txn_begin;
597         }
598         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
599             $num_ignored++;
600             next;
601         }
602
603         my $marc_type;
604         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
605             $marc_type = 'UNIMARCAUTH';
606         } elsif ($marcflavour eq 'UNIMARC') {
607             $marc_type = 'UNIMARC';
608         } else {
609             $marc_type = 'USMARC';
610         }
611         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
612
613         if ($record_type eq 'biblio') {
614             # remove any item tags - rely on _batchCommitItems
615             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
616             foreach my $item_field ($marc_record->field($item_tag)) {
617                 $marc_record->delete_field($item_field);
618             }
619         }
620
621         my ($record_result, $item_result, $record_match) =
622             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
623                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
624
625         my $recordid;
626         my $query;
627         if ($record_result eq 'create_new') {
628             $num_added++;
629             if ($record_type eq 'biblio') {
630                 my $biblioitemnumber;
631                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework, { skip_record_index => 1 });
632                 push @biblio_ids, $recordid;
633                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
634                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
635                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result, $biblioitemnumber);
636                     $num_items_added += $bib_items_added;
637                     $num_items_replaced += $bib_items_replaced;
638                     $num_items_errored += $bib_items_errored;
639                 }
640             } else {
641                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
642                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
643             }
644             my $sth = $dbh->prepare_cached($query);
645             $sth->execute($recordid, $rowref->{'import_record_id'});
646             $sth->finish();
647             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
648         } elsif ($record_result eq 'replace') {
649             $num_updated++;
650             $recordid = $record_match;
651             my $oldxml;
652             if ($record_type eq 'biblio') {
653                 my $oldbiblio = Koha::Biblios->find( $recordid );
654                 $oldxml = GetXmlBiblio($recordid);
655
656                 # remove item fields so that they don't get
657                 # added again if record is reverted
658                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
659                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
660                 foreach my $item_field ($old_marc->field($item_tag)) {
661                     $old_marc->delete_field($item_field);
662                 }
663                 $oldxml = $old_marc->as_xml($marc_type);
664
665                 my $context = { source => 'batchimport' };
666                 if ($logged_in_patron) {
667                     $context->{categorycode} = $logged_in_patron->categorycode;
668                     $context->{userid} = $logged_in_patron->userid;
669                 }
670
671                 ModBiblio(
672                     $marc_record,
673                     $recordid,
674                     $oldbiblio->frameworkcode,
675                     {
676                         overlay_context   => $context,
677                         skip_record_index => 1
678                     }
679                 );
680                 push @biblio_ids, $recordid;
681                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
682
683                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
684                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
685                     $num_items_added += $bib_items_added;
686                     $num_items_replaced += $bib_items_replaced;
687                     $num_items_errored += $bib_items_errored;
688                 }
689             } else {
690                 $oldxml = GetAuthorityXML($recordid);
691
692                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
693                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
694             }
695             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
696             $sth->execute($oldxml, $rowref->{'import_record_id'});
697             $sth->finish();
698             my $sth2 = $dbh->prepare_cached($query);
699             $sth2->execute($recordid, $rowref->{'import_record_id'});
700             $sth2->finish();
701             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
702             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
703         } elsif ($record_result eq 'ignore') {
704             $recordid = $record_match;
705             $num_ignored++;
706             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
707                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = _batchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
708                 push @biblio_ids, $recordid if $bib_items_added || $bib_items_replaced;
709                 $num_items_added += $bib_items_added;
710          $num_items_replaced += $bib_items_replaced;
711                 $num_items_errored += $bib_items_errored;
712                 # still need to record the matched biblionumber so that the
713                 # items can be reverted
714                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
715                 $sth2->execute($recordid, $rowref->{'import_record_id'});
716                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
717             }
718             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
719         }
720     }
721     $schema->txn_commit; # Commit final records that may not have hit callback threshold
722     $sth->finish();
723
724     if ( @biblio_ids ) {
725         my $indexer = Koha::SearchEngine::Indexer->new({ index => $Koha::SearchEngine::BIBLIOS_INDEX });
726         $indexer->index_records( \@biblio_ids, "specialUpdate", "biblioserver" );
727     }
728
729     SetImportBatchStatus($batch_id, 'imported');
730     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
731 }
732
733 =head2 _batchCommitItems
734
735   ($num_items_added, $num_items_errored) = 
736          _batchCommitItems($import_record_id, $biblionumber, [$action, $biblioitemnumber]);
737
738 Private function for batch committing item changes. We do not trigger a re-index here, that is left to the caller.
739
740 =cut
741
742 sub _batchCommitItems {
743     my ( $import_record_id, $biblionumber, $action, $biblioitemnumber ) = @_;
744
745     my $dbh = C4::Context->dbh;
746
747     my $num_items_added = 0;
748     my $num_items_errored = 0;
749     my $num_items_replaced = 0;
750
751     my $sth = $dbh->prepare( "
752         SELECT import_items_id, import_items.marcxml, encoding
753         FROM import_items
754         JOIN import_records USING (import_record_id)
755         WHERE import_record_id = ?
756         ORDER BY import_items_id
757     " );
758     $sth->bind_param( 1, $import_record_id );
759     $sth->execute();
760
761     while ( my $row = $sth->fetchrow_hashref() ) {
762         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
763
764         # Delete date_due subfield as to not accidentally delete item checkout due dates
765         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
766         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
767
768         my $item = TransformMarcToKoha({ record => $item_marc, kohafields => ['items.barcode','items.itemnumber'] });
769
770         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
771         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
772
773         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
774         if ( $action eq "replace" && $duplicate_itemnumber ) {
775             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
776             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber}, { skip_record_index => 1 } );
777             $updsth->bind_param( 1, 'imported' );
778             $updsth->bind_param( 2, $item->{itemnumber} );
779             $updsth->bind_param( 3, undef );
780             $updsth->bind_param( 4, $row->{'import_items_id'} );
781             $updsth->execute();
782             $updsth->finish();
783             $num_items_replaced++;
784         } elsif ( $action eq "replace" && $duplicate_barcode ) {
785             my $itemnumber = $duplicate_barcode->itemnumber;
786             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber, { skip_record_index => 1 } );
787             $updsth->bind_param( 1, 'imported' );
788             $updsth->bind_param( 2, $item->{itemnumber} );
789             $updsth->bind_param( 3, undef );
790             $updsth->bind_param( 4, $row->{'import_items_id'} );
791             $updsth->execute();
792             $updsth->finish();
793             $num_items_replaced++;
794         } elsif ($duplicate_barcode) {
795             $updsth->bind_param( 1, 'error' );
796             $updsth->bind_param( 2, undef );
797             $updsth->bind_param( 3, 'duplicate item barcode' );
798             $updsth->bind_param( 4, $row->{'import_items_id'} );
799             $updsth->execute();
800             $num_items_errored++;
801         } else {
802             # Remove the itemnumber if it exists, we want to create a new item
803             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
804             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
805
806             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber, { biblioitemnumber => $biblioitemnumber, skip_record_index => 1 } );
807             if( $itemnumber ) {
808                 $updsth->bind_param( 1, 'imported' );
809                 $updsth->bind_param( 2, $itemnumber );
810                 $updsth->bind_param( 3, undef );
811                 $updsth->bind_param( 4, $row->{'import_items_id'} );
812                 $updsth->execute();
813                 $updsth->finish();
814                 $num_items_added++;
815             }
816         }
817     }
818
819     return ( $num_items_added, $num_items_replaced, $num_items_errored );
820 }
821
822 =head2 BatchRevertRecords
823
824   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
825       $num_ignored) = BatchRevertRecords($batch_id);
826
827 =cut
828
829 sub BatchRevertRecords {
830     my $batch_id = shift;
831
832     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
833
834     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
835
836     my $record_type;
837     my $num_deleted = 0;
838     my $num_errors = 0;
839     my $num_reverted = 0;
840     my $num_ignored = 0;
841     my $num_items_deleted = 0;
842     # commit (i.e., save, all records in the batch)
843     SetImportBatchStatus($batch_id, 'reverting');
844     my $overlay_action = GetImportBatchOverlayAction($batch_id);
845     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
846     my $dbh = C4::Context->dbh;
847     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
848                              FROM import_records
849                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
850                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
851                              WHERE import_batch_id = ?");
852     $sth->execute($batch_id);
853     my $marc_type;
854     my $marcflavour = C4::Context->preference('marcflavour');
855     while (my $rowref = $sth->fetchrow_hashref) {
856         $record_type = $rowref->{'record_type'};
857         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
858             $num_ignored++;
859             next;
860         }
861         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
862             $marc_type = 'UNIMARCAUTH';
863         } elsif ($marcflavour eq 'UNIMARC') {
864             $marc_type = 'UNIMARC';
865         } else {
866             $marc_type = 'USMARC';
867         }
868
869         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
870
871         if ($record_result eq 'delete') {
872             my $error = undef;
873             if  ($record_type eq 'biblio') {
874                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
875                 $error = DelBiblio($rowref->{'matched_biblionumber'});
876             } else {
877                 DelAuthority({ authid => $rowref->{'matched_authid'} });
878             }
879             if (defined $error) {
880                 $num_errors++;
881             } else {
882                 $num_deleted++;
883                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
884             }
885         } elsif ($record_result eq 'restore') {
886             $num_reverted++;
887             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
888             if ($record_type eq 'biblio') {
889                 my $biblionumber = $rowref->{'matched_biblionumber'};
890                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
891
892                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
893                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
894
895                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
896                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
897             } else {
898                 my $authid = $rowref->{'matched_authid'};
899                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
900             }
901             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
902         } elsif ($record_result eq 'ignore') {
903             if ($record_type eq 'biblio') {
904                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
905             }
906             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
907         }
908         my $query;
909         if ($record_type eq 'biblio') {
910             # remove matched_biblionumber only if there is no 'imported' item left
911             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
912             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
913         } else {
914             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
915         }
916         my $sth2 = $dbh->prepare_cached($query);
917         $sth2->execute($rowref->{'import_record_id'});
918     }
919
920     $sth->finish();
921     SetImportBatchStatus($batch_id, 'reverted');
922     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
923 }
924
925 =head2 BatchRevertItems
926
927   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
928
929 =cut
930
931 sub BatchRevertItems {
932     my ($import_record_id, $biblionumber) = @_;
933
934     my $dbh = C4::Context->dbh;
935     my $num_items_deleted = 0;
936
937     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
938                                    FROM import_items
939                                    JOIN items USING (itemnumber)
940                                    WHERE import_record_id = ?");
941     $sth->bind_param(1, $import_record_id);
942     $sth->execute();
943     while (my $row = $sth->fetchrow_hashref()) {
944         my $item = Koha::Items->find($row->{itemnumber});
945         if ($item->safe_delete){
946             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
947             $updsth->bind_param(1, 'reverted');
948             $updsth->bind_param(2, $row->{'import_items_id'});
949             $updsth->execute();
950             $updsth->finish();
951             $num_items_deleted++;
952         }
953         else {
954             next;
955         }
956     }
957     $sth->finish();
958     return $num_items_deleted;
959 }
960
961 =head2 CleanBatch
962
963   CleanBatch($batch_id)
964
965 Deletes all staged records from the import batch
966 and sets the status of the batch to 'cleaned'.  Note
967 that deleting a stage record does *not* affect
968 any record that has been committed to the database.
969
970 =cut
971
972 sub CleanBatch {
973     my $batch_id = shift;
974     return unless defined $batch_id;
975
976     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
977     SetImportBatchStatus($batch_id, 'cleaned');
978 }
979
980 =head2 DeleteBatch
981
982   DeleteBatch($batch_id)
983
984 Deletes the record from the database. This can only be done
985 once the batch has been cleaned.
986
987 =cut
988
989 sub DeleteBatch {
990     my $batch_id = shift;
991     return unless defined $batch_id;
992
993     my $dbh = C4::Context->dbh;
994     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
995     $sth->execute( $batch_id );
996 }
997
998 =head2 GetAllImportBatches
999
1000   my $results = GetAllImportBatches();
1001
1002 Returns a references to an array of hash references corresponding
1003 to all import_batches rows (of batch_type 'batch'), sorted in 
1004 ascending order by import_batch_id.
1005
1006 =cut
1007
1008 sub  GetAllImportBatches {
1009     my $dbh = C4::Context->dbh;
1010     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
1011                                     WHERE batch_type IN ('batch', 'webservice')
1012                                     ORDER BY import_batch_id ASC");
1013
1014     my $results = [];
1015     $sth->execute();
1016     while (my $row = $sth->fetchrow_hashref) {
1017         push @$results, $row;
1018     }
1019     $sth->finish();
1020     return $results;
1021 }
1022
1023 =head2 GetStagedWebserviceBatches
1024
1025   my $batch_ids = GetStagedWebserviceBatches();
1026
1027 Returns a references to an array of batch id's
1028 of batch_type 'webservice' that are not imported
1029
1030 =cut
1031
1032 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1033 SELECT import_batch_id FROM import_batches
1034 WHERE batch_type = 'webservice'
1035 AND import_status = 'staged'
1036 EOQ
1037 sub  GetStagedWebserviceBatches {
1038     my $dbh = C4::Context->dbh;
1039     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1040 }
1041
1042 =head2 GetImportBatchRangeDesc
1043
1044   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1045
1046 Returns a reference to an array of hash references corresponding to
1047 import_batches rows (sorted in descending order by import_batch_id)
1048 start at the given offset.
1049
1050 =cut
1051
1052 sub GetImportBatchRangeDesc {
1053     my ($offset, $results_per_group) = @_;
1054
1055     my $dbh = C4::Context->dbh;
1056     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1057                                     LEFT JOIN import_batch_profiles p
1058                                     ON b.profile_id = p.id
1059                                     WHERE b.batch_type IN ('batch', 'webservice')
1060                                     ORDER BY b.import_batch_id DESC";
1061     my @params;
1062     if ($results_per_group){
1063         $query .= " LIMIT ?";
1064         push(@params, $results_per_group);
1065     }
1066     if ($offset){
1067         $query .= " OFFSET ?";
1068         push(@params, $offset);
1069     }
1070     my $sth = $dbh->prepare_cached($query);
1071     $sth->execute(@params);
1072     my $results = $sth->fetchall_arrayref({});
1073     $sth->finish();
1074     return $results;
1075 }
1076
1077 =head2 GetItemNumbersFromImportBatch
1078
1079   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1080
1081 =cut
1082
1083 sub GetItemNumbersFromImportBatch {
1084     my ($batch_id) = @_;
1085     my $dbh = C4::Context->dbh;
1086     my $sql = q|
1087 SELECT itemnumber FROM import_items
1088 INNER JOIN items USING (itemnumber)
1089 INNER JOIN import_records USING (import_record_id)
1090 WHERE import_batch_id = ?|;
1091     my  $sth = $dbh->prepare( $sql );
1092     $sth->execute($batch_id);
1093     my @items ;
1094     while ( my ($itm) = $sth->fetchrow_array ) {
1095         push @items, $itm;
1096     }
1097     return @items;
1098 }
1099
1100 =head2 GetNumberOfImportBatches
1101
1102   my $count = GetNumberOfImportBatches();
1103
1104 =cut
1105
1106 sub GetNumberOfNonZ3950ImportBatches {
1107     my $dbh = C4::Context->dbh;
1108     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1109     $sth->execute();
1110     my ($count) = $sth->fetchrow_array();
1111     $sth->finish();
1112     return $count;
1113 }
1114
1115 =head2 GetImportBiblios
1116
1117   my $results = GetImportBiblios($importid);
1118
1119 =cut
1120
1121 sub GetImportBiblios {
1122     my ($import_record_id) = @_;
1123
1124     my $dbh = C4::Context->dbh;
1125     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1126     return $dbh->selectall_arrayref(
1127         $query,
1128         { Slice => {} },
1129         $import_record_id
1130     );
1131
1132 }
1133
1134 =head2 GetImportRecordsRange
1135
1136   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1137
1138 Returns a reference to an array of hash references corresponding to
1139 import_biblios/import_auths/import_records rows for a given batch
1140 starting at the given offset.
1141
1142 =cut
1143
1144 sub GetImportRecordsRange {
1145     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1146
1147     my $dbh = C4::Context->dbh;
1148
1149     my $order_by = $parameters->{order_by} || 'import_record_id';
1150     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1151
1152     my $order_by_direction =
1153       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1154
1155     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1156
1157     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1158                                            record_sequence, status, overlay_status,
1159                                            matched_biblionumber, matched_authid, record_type
1160                                     FROM   import_records
1161                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1162                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1163                                     WHERE  import_batch_id = ?";
1164     my @params;
1165     push(@params, $batch_id);
1166     if ($status) {
1167         $query .= " AND status=?";
1168         push(@params,$status);
1169     }
1170
1171     $query.=" ORDER BY $order_by $order_by_direction";
1172
1173     if($results_per_group){
1174         $query .= " LIMIT ?";
1175         push(@params, $results_per_group);
1176     }
1177     if($offset){
1178         $query .= " OFFSET ?";
1179         push(@params, $offset);
1180     }
1181     my $sth = $dbh->prepare_cached($query);
1182     $sth->execute(@params);
1183     my $results = $sth->fetchall_arrayref({});
1184     $sth->finish();
1185     return $results;
1186
1187 }
1188
1189 =head2 GetBestRecordMatch
1190
1191   my $record_id = GetBestRecordMatch($import_record_id);
1192
1193 =cut
1194
1195 sub GetBestRecordMatch {
1196     my ($import_record_id) = @_;
1197
1198     my $dbh = C4::Context->dbh;
1199     my $sth = $dbh->prepare("SELECT candidate_match_id
1200                              FROM   import_record_matches
1201                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1202                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1203                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1204                              WHERE  import_record_matches.import_record_id = ? AND
1205                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1206                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1207                              AND chosen = 1
1208                              ORDER BY score DESC, candidate_match_id DESC");
1209     $sth->execute($import_record_id);
1210     my ($record_id) = $sth->fetchrow_array();
1211     $sth->finish();
1212     return $record_id;
1213 }
1214
1215 =head2 GetImportBatchStatus
1216
1217   my $status = GetImportBatchStatus($batch_id);
1218
1219 =cut
1220
1221 sub GetImportBatchStatus {
1222     my ($batch_id) = @_;
1223
1224     my $dbh = C4::Context->dbh;
1225     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1226     $sth->execute($batch_id);
1227     my ($status) = $sth->fetchrow_array();
1228     $sth->finish();
1229     return $status;
1230
1231 }
1232
1233 =head2 SetImportBatchStatus
1234
1235   SetImportBatchStatus($batch_id, $new_status);
1236
1237 =cut
1238
1239 sub SetImportBatchStatus {
1240     my ($batch_id, $new_status) = @_;
1241
1242     my $dbh = C4::Context->dbh;
1243     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1244     $sth->execute($new_status, $batch_id);
1245     $sth->finish();
1246
1247 }
1248
1249 =head2 SetMatchedBiblionumber
1250
1251   SetMatchedBiblionumber($import_record_id, $biblionumber);
1252
1253 =cut
1254
1255 sub SetMatchedBiblionumber {
1256     my ($import_record_id, $biblionumber) = @_;
1257
1258     my $dbh = C4::Context->dbh;
1259     $dbh->do(
1260         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1261         undef, $biblionumber, $import_record_id
1262     );
1263 }
1264
1265 =head2 GetImportBatchOverlayAction
1266
1267   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1268
1269 =cut
1270
1271 sub GetImportBatchOverlayAction {
1272     my ($batch_id) = @_;
1273
1274     my $dbh = C4::Context->dbh;
1275     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1276     $sth->execute($batch_id);
1277     my ($overlay_action) = $sth->fetchrow_array();
1278     $sth->finish();
1279     return $overlay_action;
1280
1281 }
1282
1283
1284 =head2 SetImportBatchOverlayAction
1285
1286   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1287
1288 =cut
1289
1290 sub SetImportBatchOverlayAction {
1291     my ($batch_id, $new_overlay_action) = @_;
1292
1293     my $dbh = C4::Context->dbh;
1294     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1295     $sth->execute($new_overlay_action, $batch_id);
1296     $sth->finish();
1297
1298 }
1299
1300 =head2 GetImportBatchNoMatchAction
1301
1302   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1303
1304 =cut
1305
1306 sub GetImportBatchNoMatchAction {
1307     my ($batch_id) = @_;
1308
1309     my $dbh = C4::Context->dbh;
1310     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1311     $sth->execute($batch_id);
1312     my ($nomatch_action) = $sth->fetchrow_array();
1313     $sth->finish();
1314     return $nomatch_action;
1315
1316 }
1317
1318
1319 =head2 SetImportBatchNoMatchAction
1320
1321   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1322
1323 =cut
1324
1325 sub SetImportBatchNoMatchAction {
1326     my ($batch_id, $new_nomatch_action) = @_;
1327
1328     my $dbh = C4::Context->dbh;
1329     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1330     $sth->execute($new_nomatch_action, $batch_id);
1331     $sth->finish();
1332
1333 }
1334
1335 =head2 GetImportBatchItemAction
1336
1337   my $item_action = GetImportBatchItemAction($batch_id);
1338
1339 =cut
1340
1341 sub GetImportBatchItemAction {
1342     my ($batch_id) = @_;
1343
1344     my $dbh = C4::Context->dbh;
1345     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1346     $sth->execute($batch_id);
1347     my ($item_action) = $sth->fetchrow_array();
1348     $sth->finish();
1349     return $item_action;
1350
1351 }
1352
1353
1354 =head2 SetImportBatchItemAction
1355
1356   SetImportBatchItemAction($batch_id, $new_item_action);
1357
1358 =cut
1359
1360 sub SetImportBatchItemAction {
1361     my ($batch_id, $new_item_action) = @_;
1362
1363     my $dbh = C4::Context->dbh;
1364     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1365     $sth->execute($new_item_action, $batch_id);
1366     $sth->finish();
1367
1368 }
1369
1370 =head2 GetImportBatchMatcher
1371
1372   my $matcher_id = GetImportBatchMatcher($batch_id);
1373
1374 =cut
1375
1376 sub GetImportBatchMatcher {
1377     my ($batch_id) = @_;
1378
1379     my $dbh = C4::Context->dbh;
1380     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1381     $sth->execute($batch_id);
1382     my ($matcher_id) = $sth->fetchrow_array();
1383     $sth->finish();
1384     return $matcher_id;
1385
1386 }
1387
1388
1389 =head2 SetImportBatchMatcher
1390
1391   SetImportBatchMatcher($batch_id, $new_matcher_id);
1392
1393 =cut
1394
1395 sub SetImportBatchMatcher {
1396     my ($batch_id, $new_matcher_id) = @_;
1397
1398     my $dbh = C4::Context->dbh;
1399     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1400     $sth->execute($new_matcher_id, $batch_id);
1401     $sth->finish();
1402
1403 }
1404
1405 =head2 GetImportRecordOverlayStatus
1406
1407   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1408
1409 =cut
1410
1411 sub GetImportRecordOverlayStatus {
1412     my ($import_record_id) = @_;
1413
1414     my $dbh = C4::Context->dbh;
1415     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1416     $sth->execute($import_record_id);
1417     my ($overlay_status) = $sth->fetchrow_array();
1418     $sth->finish();
1419     return $overlay_status;
1420
1421 }
1422
1423
1424 =head2 SetImportRecordOverlayStatus
1425
1426   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1427
1428 =cut
1429
1430 sub SetImportRecordOverlayStatus {
1431     my ($import_record_id, $new_overlay_status) = @_;
1432
1433     my $dbh = C4::Context->dbh;
1434     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1435     $sth->execute($new_overlay_status, $import_record_id);
1436     $sth->finish();
1437
1438 }
1439
1440 =head2 GetImportRecordStatus
1441
1442   my $status = GetImportRecordStatus($import_record_id);
1443
1444 =cut
1445
1446 sub GetImportRecordStatus {
1447     my ($import_record_id) = @_;
1448
1449     my $dbh = C4::Context->dbh;
1450     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1451     $sth->execute($import_record_id);
1452     my ($status) = $sth->fetchrow_array();
1453     $sth->finish();
1454     return $status;
1455
1456 }
1457
1458
1459 =head2 SetImportRecordStatus
1460
1461   SetImportRecordStatus($import_record_id, $new_status);
1462
1463 =cut
1464
1465 sub SetImportRecordStatus {
1466     my ($import_record_id, $new_status) = @_;
1467
1468     my $dbh = C4::Context->dbh;
1469     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1470     $sth->execute($new_status, $import_record_id);
1471     $sth->finish();
1472
1473 }
1474
1475 =head2 GetImportRecordMatches
1476
1477   my $results = GetImportRecordMatches($import_record_id, $best_only);
1478
1479 =cut
1480
1481 sub GetImportRecordMatches {
1482     my $import_record_id = shift;
1483     my $best_only = @_ ? shift : 0;
1484
1485     my $dbh = C4::Context->dbh;
1486     # FIXME currently biblio only
1487     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1488                                     candidate_match_id, score, record_type,
1489                                     chosen
1490                                     FROM import_records
1491                                     JOIN import_record_matches USING (import_record_id)
1492                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1493                                     WHERE import_record_id = ?
1494                                     ORDER BY score DESC, biblionumber DESC");
1495     $sth->bind_param(1, $import_record_id);
1496     my $results = [];
1497     $sth->execute();
1498     while (my $row = $sth->fetchrow_hashref) {
1499         if ($row->{'record_type'} eq 'auth') {
1500             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1501         }
1502         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1503         push @$results, $row;
1504         last if $best_only;
1505     }
1506     $sth->finish();
1507
1508     return $results;
1509     
1510 }
1511
1512 =head2 SetImportRecordMatches
1513
1514   SetImportRecordMatches($import_record_id, @matches);
1515
1516 =cut
1517
1518 sub SetImportRecordMatches {
1519     my $import_record_id = shift;
1520     my @matches = @_;
1521
1522     my $dbh = C4::Context->dbh;
1523     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1524     $delsth->execute($import_record_id);
1525     $delsth->finish();
1526
1527     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1528                                     VALUES (?, ?, ?, ?)");
1529     my $chosen = 1; #The first match is defaulted to be chosen
1530     foreach my $match (@matches) {
1531         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1532         $chosen = 0; #After the first we do not default to other matches
1533     }
1534 }
1535
1536 =head2 RecordsFromISO2709File
1537
1538     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1539
1540 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1541
1542 @PARAM1, String, absolute path to the ISO2709 file.
1543 @PARAM2, String, see stage_file.pl
1544 @PARAM3, String, should be utf8
1545
1546 Returns two array refs.
1547
1548 =cut
1549
1550 sub RecordsFromISO2709File {
1551     my ($input_file, $record_type, $encoding) = @_;
1552     my @errors;
1553
1554     my $marc_type = C4::Context->preference('marcflavour');
1555     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1556
1557     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1558     my @marc_records;
1559     $/ = "\035";
1560     while (<$fh>) {
1561         s/^\s+//;
1562         s/\s+$//;
1563         next unless $_; # skip if record has only whitespace, as might occur
1564                         # if file includes newlines between each MARC record
1565         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1566         push @marc_records, $marc_record;
1567         if ($charset_guessed ne $encoding) {
1568             push @errors,
1569                 "Unexpected charset $charset_guessed, expecting $encoding";
1570         }
1571     }
1572     close $fh;
1573     return ( \@errors, \@marc_records );
1574 }
1575
1576 =head2 RecordsFromMARCXMLFile
1577
1578     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1579
1580 Creates MARC::Record-objects out of the given MARCXML-file.
1581
1582 @PARAM1, String, absolute path to the ISO2709 file.
1583 @PARAM2, String, should be utf8
1584
1585 Returns two array refs.
1586
1587 =cut
1588
1589 sub RecordsFromMARCXMLFile {
1590     my ( $filename, $encoding ) = @_;
1591     my $batch = MARC::File::XML->in( $filename );
1592     my ( @marcRecords, @errors, $record );
1593     do {
1594         eval { $record = $batch->next( $encoding ); };
1595         if ($@) {
1596             push @errors, $@;
1597         }
1598         push @marcRecords, $record if $record;
1599     } while( $record );
1600     return (\@errors, \@marcRecords);
1601 }
1602
1603 =head2 RecordsFromMarcPlugin
1604
1605     Converts text of input_file into array of MARC records with to_marc plugin
1606
1607 =cut
1608
1609 sub RecordsFromMarcPlugin {
1610     my ($input_file, $plugin_class, $encoding) = @_;
1611     my ( $text, @return );
1612     return \@return if !$input_file || !$plugin_class;
1613
1614     # Read input file
1615     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1616     $/ = "\035";
1617     while (<$fh>) {
1618         s/^\s+//;
1619         s/\s+$//;
1620         next unless $_;
1621         $text .= $_;
1622     }
1623     close $fh;
1624
1625     # Convert to large MARC blob with plugin
1626     $text = Koha::Plugins::Handler->run({
1627         class  => $plugin_class,
1628         method => 'to_marc',
1629         params => { data => $text },
1630     }) if $text;
1631
1632     # Convert to array of MARC records
1633     if( $text ) {
1634         my $marc_type = C4::Context->preference('marcflavour');
1635         foreach my $blob ( split(/\x1D/, $text) ) {
1636             next if $blob =~ /^\s*$/;
1637             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1638             push @return, $marcrecord;
1639         }
1640     }
1641     return \@return;
1642 }
1643
1644 # internal functions
1645
1646 sub _create_import_record {
1647     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1648
1649     my $dbh = C4::Context->dbh;
1650     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1651                                                          record_type, encoding)
1652                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1653     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1654                   $record_type, $encoding);
1655     my $import_record_id = $dbh->{'mysql_insertid'};
1656     $sth->finish();
1657     return $import_record_id;
1658 }
1659
1660 sub _update_import_record_marc {
1661     my ($import_record_id, $marc_record, $marc_type) = @_;
1662
1663     my $dbh = C4::Context->dbh;
1664     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1665                              WHERE  import_record_id = ?");
1666     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1667     $sth->finish();
1668 }
1669
1670 sub _add_auth_fields {
1671     my ($import_record_id, $marc_record) = @_;
1672
1673     my $controlnumber;
1674     if ($marc_record->field('001')) {
1675         $controlnumber = $marc_record->field('001')->data();
1676     }
1677     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1678     my $dbh = C4::Context->dbh;
1679     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1680     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1681     $sth->finish();
1682 }
1683
1684 sub _add_biblio_fields {
1685     my ($import_record_id, $marc_record) = @_;
1686
1687     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1688     my $dbh = C4::Context->dbh;
1689     # FIXME no controlnumber, originalsource
1690     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1691     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1692     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1693     $sth->finish();
1694                 
1695 }
1696
1697 sub _update_biblio_fields {
1698     my ($import_record_id, $marc_record) = @_;
1699
1700     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1701     my $dbh = C4::Context->dbh;
1702     # FIXME no controlnumber, originalsource
1703     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1704     $isbn =~ s/\(.*$//;
1705     $isbn =~ tr/ -_//;
1706     $isbn = uc $isbn;
1707     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1708                              WHERE  import_record_id = ?");
1709     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1710     $sth->finish();
1711 }
1712
1713 sub _parse_biblio_fields {
1714     my ($marc_record) = @_;
1715
1716     my $dbh = C4::Context->dbh;
1717     my $bibliofields = TransformMarcToKoha({ record => $marc_record, kohafields => ['biblio.title','biblio.author','biblioitems.isbn','biblioitems.issn'] });
1718     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1719
1720 }
1721
1722 sub _update_batch_record_counts {
1723     my ($batch_id) = @_;
1724
1725     my $dbh = C4::Context->dbh;
1726     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1727                                         num_records = (
1728                                             SELECT COUNT(*)
1729                                             FROM import_records
1730                                             WHERE import_batch_id = import_batches.import_batch_id),
1731                                         num_items = (
1732                                             SELECT COUNT(*)
1733                                             FROM import_records
1734                                             JOIN import_items USING (import_record_id)
1735                                             WHERE import_batch_id = import_batches.import_batch_id
1736                                             AND record_type = 'biblio')
1737                                     WHERE import_batch_id = ?");
1738     $sth->bind_param(1, $batch_id);
1739     $sth->execute();
1740     $sth->finish();
1741 }
1742
1743 sub _get_commit_action {
1744     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1745     
1746     if ($record_type eq 'biblio') {
1747         my ($bib_result, $bib_match, $item_result);
1748
1749         $bib_match = GetBestRecordMatch($import_record_id);
1750         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1751
1752             $bib_result = $overlay_action;
1753
1754             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1755                 $item_result = 'create_new';
1756             } elsif($item_action eq 'replace'){
1757                 $item_result = 'replace';
1758             } else {
1759                 $item_result = 'ignore';
1760             }
1761
1762         } else {
1763             $bib_result = $nomatch_action;
1764             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1765         }
1766         return ($bib_result, $item_result, $bib_match);
1767     } else { # must be auths
1768         my ($auth_result, $auth_match);
1769
1770         $auth_match = GetBestRecordMatch($import_record_id);
1771         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1772             $auth_result = $overlay_action;
1773         } else {
1774             $auth_result = $nomatch_action;
1775         }
1776
1777         return ($auth_result, undef, $auth_match);
1778
1779     }
1780 }
1781
1782 sub _get_revert_action {
1783     my ($overlay_action, $overlay_status, $status) = @_;
1784
1785     my $bib_result;
1786
1787     if ($status eq 'ignored') {
1788         $bib_result = 'ignore';
1789     } else {
1790         if ($overlay_action eq 'create_new') {
1791             $bib_result = 'delete';
1792         } else {
1793             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1794         }
1795     }
1796     return $bib_result;
1797 }
1798
1799 1;
1800 __END__
1801
1802 =head1 AUTHOR
1803
1804 Koha Development Team <http://koha-community.org/>
1805
1806 Galen Charlton <galen.charlton@liblime.com>
1807
1808 =cut