Bug 30778: Remove ModBiblioInBatch
[srvgit] / C4 / ImportBatch.pm
1 package C4::ImportBatch;
2
3 # Copyright (C) 2007 LibLime, 2012 C & P Bibliography Services
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use strict;
21 use warnings;
22
23 use C4::Context;
24 use C4::Koha qw( GetNormalizedISBN );
25 use C4::Biblio qw(
26     AddBiblio
27     DelBiblio
28     GetMarcFromKohaField
29     GetXmlBiblio
30     ModBiblio
31     TransformMarcToKoha
32 );
33 use C4::Items qw( AddItemFromMarc ModItemFromMarc );
34 use C4::Charset qw( MarcToUTF8Record SetUTF8Flag StripNonXmlChars );
35 use C4::AuthoritiesMarc qw( AddAuthority GuessAuthTypeCode GetAuthorityXML ModAuthority DelAuthority );
36 use C4::MarcModificationTemplates qw( ModifyRecordWithTemplate );
37 use Koha::Items;
38 use Koha::Plugins::Handler;
39 use Koha::Logger;
40
41 our (@ISA, @EXPORT_OK);
42 BEGIN {
43     require Exporter;
44     @ISA       = qw(Exporter);
45     @EXPORT_OK = qw(
46       GetZ3950BatchId
47       GetWebserviceBatchId
48       GetImportRecordMarc
49       GetImportRecordMarcXML
50       GetRecordFromImportBiblio
51       AddImportBatch
52       GetImportBatch
53       AddAuthToBatch
54       AddBiblioToBatch
55       AddItemsToImportBiblio
56       ModAuthorityInBatch
57
58       BatchStageMarcRecords
59       BatchFindDuplicates
60       BatchCommitRecords
61       BatchRevertRecords
62       CleanBatch
63       DeleteBatch
64
65       GetAllImportBatches
66       GetStagedWebserviceBatches
67       GetImportBatchRangeDesc
68       GetNumberOfNonZ3950ImportBatches
69       GetImportBiblios
70       GetImportRecordsRange
71       GetItemNumbersFromImportBatch
72
73       GetImportBatchStatus
74       SetImportBatchStatus
75       GetImportBatchOverlayAction
76       SetImportBatchOverlayAction
77       GetImportBatchNoMatchAction
78       SetImportBatchNoMatchAction
79       GetImportBatchItemAction
80       SetImportBatchItemAction
81       GetImportBatchMatcher
82       SetImportBatchMatcher
83       GetImportRecordOverlayStatus
84       SetImportRecordOverlayStatus
85       GetImportRecordStatus
86       SetImportRecordStatus
87       SetMatchedBiblionumber
88       GetImportRecordMatches
89       SetImportRecordMatches
90
91       RecordsFromMARCXMLFile
92       RecordsFromISO2709File
93       RecordsFromMarcPlugin
94     );
95 }
96
97 =head1 NAME
98
99 C4::ImportBatch - manage batches of imported MARC records
100
101 =head1 SYNOPSIS
102
103 use C4::ImportBatch;
104
105 =head1 FUNCTIONS
106
107 =head2 GetZ3950BatchId
108
109   my $batchid = GetZ3950BatchId($z3950server);
110
111 Retrieves the ID of the import batch for the Z39.50
112 reservoir for the given target.  If necessary,
113 creates the import batch.
114
115 =cut
116
117 sub GetZ3950BatchId {
118     my ($z3950server) = @_;
119
120     my $dbh = C4::Context->dbh;
121     my $sth = $dbh->prepare("SELECT import_batch_id FROM import_batches
122                              WHERE  batch_type = 'z3950'
123                              AND    file_name = ?");
124     $sth->execute($z3950server);
125     my $rowref = $sth->fetchrow_arrayref();
126     $sth->finish();
127     if (defined $rowref) {
128         return $rowref->[0];
129     } else {
130         my $batch_id = AddImportBatch( {
131                 overlay_action => 'create_new',
132                 import_status => 'staged',
133                 batch_type => 'z3950',
134                 file_name => $z3950server,
135             } );
136         return $batch_id;
137     }
138     
139 }
140
141 =head2 GetWebserviceBatchId
142
143   my $batchid = GetWebserviceBatchId();
144
145 Retrieves the ID of the import batch for webservice.
146 If necessary, creates the import batch.
147
148 =cut
149
150 my $WEBSERVICE_BASE_QRY = <<EOQ;
151 SELECT import_batch_id FROM import_batches
152 WHERE  batch_type = 'webservice'
153 AND    import_status = 'staged'
154 EOQ
155 sub GetWebserviceBatchId {
156     my ($params) = @_;
157
158     my $dbh = C4::Context->dbh;
159     my $sql = $WEBSERVICE_BASE_QRY;
160     my @args;
161     foreach my $field (qw(matcher_id overlay_action nomatch_action item_action)) {
162         if (my $val = $params->{$field}) {
163             $sql .= " AND $field = ?";
164             push @args, $val;
165         }
166     }
167     my $id = $dbh->selectrow_array($sql, undef, @args);
168     return $id if $id;
169
170     $params->{batch_type} = 'webservice';
171     $params->{import_status} = 'staged';
172     return AddImportBatch($params);
173 }
174
175 =head2 GetImportRecordMarc
176
177   my ($marcblob, $encoding) = GetImportRecordMarc($import_record_id);
178
179 =cut
180
181 sub GetImportRecordMarc {
182     my ($import_record_id) = @_;
183
184     my $dbh = C4::Context->dbh;
185     my ( $marc, $encoding ) = $dbh->selectrow_array(q|
186         SELECT marc, encoding
187         FROM import_records
188         WHERE import_record_id = ?
189     |, undef, $import_record_id );
190
191     return $marc, $encoding;
192 }
193
194 sub GetRecordFromImportBiblio {
195     my ( $import_record_id, $embed_items ) = @_;
196
197     my ($marc) = GetImportRecordMarc($import_record_id);
198     my $record = MARC::Record->new_from_usmarc($marc);
199
200     EmbedItemsInImportBiblio( $record, $import_record_id ) if $embed_items;
201
202     return $record;
203 }
204
205 sub EmbedItemsInImportBiblio {
206     my ( $record, $import_record_id ) = @_;
207     my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
208     my $dbh = C4::Context->dbh;
209     my $import_items = $dbh->selectall_arrayref(q|
210         SELECT import_items.marcxml
211         FROM import_items
212         WHERE import_record_id = ?
213     |, { Slice => {} }, $import_record_id );
214     my @item_fields;
215     for my $import_item ( @$import_items ) {
216         my $item_marc = MARC::Record::new_from_xml($import_item->{marcxml}, 'UTF-8');
217         push @item_fields, $item_marc->field($itemtag);
218     }
219     $record->append_fields(@item_fields);
220     return $record;
221 }
222
223 =head2 GetImportRecordMarcXML
224
225   my $marcxml = GetImportRecordMarcXML($import_record_id);
226
227 =cut
228
229 sub GetImportRecordMarcXML {
230     my ($import_record_id) = @_;
231
232     my $dbh = C4::Context->dbh;
233     my $sth = $dbh->prepare("SELECT marcxml FROM import_records WHERE import_record_id = ?");
234     $sth->execute($import_record_id);
235     my ($marcxml) = $sth->fetchrow();
236     $sth->finish();
237     return $marcxml;
238
239 }
240
241 =head2 AddImportBatch
242
243   my $batch_id = AddImportBatch($params_hash);
244
245 =cut
246
247 sub AddImportBatch {
248     my ($params) = @_;
249
250     my (@fields, @vals);
251     foreach (qw( matcher_id template_id branchcode
252                  overlay_action nomatch_action item_action
253                  import_status batch_type file_name comments record_type )) {
254         if (exists $params->{$_}) {
255             push @fields, $_;
256             push @vals, $params->{$_};
257         }
258     }
259     my $dbh = C4::Context->dbh;
260     $dbh->do("INSERT INTO import_batches (".join( ',', @fields).")
261                                   VALUES (".join( ',', map '?', @fields).")",
262              undef,
263              @vals);
264     return $dbh->{'mysql_insertid'};
265 }
266
267 =head2 GetImportBatch 
268
269   my $row = GetImportBatch($batch_id);
270
271 Retrieve a hashref of an import_batches row.
272
273 =cut
274
275 sub GetImportBatch {
276     my ($batch_id) = @_;
277
278     my $dbh = C4::Context->dbh;
279     my $sth = $dbh->prepare_cached("SELECT b.*, p.name as profile FROM import_batches b LEFT JOIN import_batch_profiles p ON p.id = b.profile_id WHERE import_batch_id = ?");
280     $sth->bind_param(1, $batch_id);
281     $sth->execute();
282     my $result = $sth->fetchrow_hashref;
283     $sth->finish();
284     return $result;
285
286 }
287
288 =head2 AddBiblioToBatch 
289
290   my $import_record_id = AddBiblioToBatch($batch_id, $record_sequence, 
291                 $marc_record, $encoding, $update_counts);
292
293 =cut
294
295 sub AddBiblioToBatch {
296     my $batch_id = shift;
297     my $record_sequence = shift;
298     my $marc_record = shift;
299     my $encoding = shift;
300     my $update_counts = @_ ? shift : 1;
301
302     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'biblio', $encoding, C4::Context->preference('marcflavour'));
303     _add_biblio_fields($import_record_id, $marc_record);
304     _update_batch_record_counts($batch_id) if $update_counts;
305     return $import_record_id;
306 }
307
308 =head2 AddAuthToBatch
309
310   my $import_record_id = AddAuthToBatch($batch_id, $record_sequence,
311                 $marc_record, $encoding, $update_counts, [$marc_type]);
312
313 =cut
314
315 sub AddAuthToBatch {
316     my $batch_id = shift;
317     my $record_sequence = shift;
318     my $marc_record = shift;
319     my $encoding = shift;
320     my $update_counts = @_ ? shift : 1;
321     my $marc_type = shift || C4::Context->preference('marcflavour');
322
323     $marc_type = 'UNIMARCAUTH' if $marc_type eq 'UNIMARC';
324
325     my $import_record_id = _create_import_record($batch_id, $record_sequence, $marc_record, 'auth', $encoding, $marc_type);
326     _add_auth_fields($import_record_id, $marc_record);
327     _update_batch_record_counts($batch_id) if $update_counts;
328     return $import_record_id;
329 }
330
331 =head2 ModAuthInBatch
332
333   ModAuthInBatch($import_record_id, $marc_record);
334
335 =cut
336
337 sub ModAuthInBatch {
338     my ($import_record_id, $marc_record) = @_;
339
340     my $marcflavour = C4::Context->preference('marcflavour');
341     _update_import_record_marc($import_record_id, $marc_record, $marcflavour eq 'UNIMARC' ? 'UNIMARCAUTH' : 'USMARC');
342
343 }
344
345 =head2 BatchStageMarcRecords
346
347 ( $batch_id, $num_records, $num_items, @invalid_records ) =
348   BatchStageMarcRecords(
349     $record_type,                $encoding,
350     $marc_records,               $file_name,
351     $marc_modification_template, $comments,
352     $branch_code,                $parse_items,
353     $leave_as_staging,           $progress_interval,
354     $progress_callback
355   );
356
357 =cut
358
359 sub BatchStageMarcRecords {
360     my $record_type = shift;
361     my $encoding = shift;
362     my $marc_records = shift;
363     my $file_name = shift;
364     my $marc_modification_template = shift;
365     my $comments = shift;
366     my $branch_code = shift;
367     my $parse_items = shift;
368     my $leave_as_staging = shift;
369
370     # optional callback to monitor status 
371     # of job
372     my $progress_interval = 0;
373     my $progress_callback = undef;
374     if ($#_ == 1) {
375         $progress_interval = shift;
376         $progress_callback = shift;
377         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
378         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
379     } 
380     
381     my $batch_id = AddImportBatch( {
382             overlay_action => 'create_new',
383             import_status => 'staging',
384             batch_type => 'batch',
385             file_name => $file_name,
386             comments => $comments,
387             record_type => $record_type,
388         } );
389     if ($parse_items) {
390         SetImportBatchItemAction($batch_id, 'always_add');
391     } else {
392         SetImportBatchItemAction($batch_id, 'ignore');
393     }
394
395
396     my $marc_type = C4::Context->preference('marcflavour');
397     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
398     my @invalid_records = ();
399     my $num_valid = 0;
400     my $num_items = 0;
401     # FIXME - for now, we're dealing only with bibs
402     my $rec_num = 0;
403     foreach my $marc_record (@$marc_records) {
404         $rec_num++;
405         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
406             &$progress_callback($rec_num);
407         }
408
409         ModifyRecordWithTemplate( $marc_modification_template, $marc_record ) if ( $marc_modification_template );
410
411         my $import_record_id;
412         if (scalar($marc_record->fields()) == 0) {
413             push @invalid_records, $marc_record;
414         } else {
415
416             # Normalize the record so it doesn't have separated diacritics
417             SetUTF8Flag($marc_record);
418
419             $num_valid++;
420             if ($record_type eq 'biblio') {
421                 $import_record_id = AddBiblioToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0);
422                 if ($parse_items) {
423                     my @import_items_ids = AddItemsToImportBiblio($batch_id, $import_record_id, $marc_record, 0);
424                     $num_items += scalar(@import_items_ids);
425                 }
426             } elsif ($record_type eq 'auth') {
427                 $import_record_id = AddAuthToBatch($batch_id, $rec_num, $marc_record, $encoding, int(rand(99999)), 0, $marc_type);
428             }
429         }
430     }
431     unless ($leave_as_staging) {
432         SetImportBatchStatus($batch_id, 'staged');
433     }
434     # FIXME branch_code, number of bibs, number of items
435     _update_batch_record_counts($batch_id);
436     return ($batch_id, $num_valid, $num_items, @invalid_records);
437 }
438
439 =head2 AddItemsToImportBiblio
440
441   my @import_items_ids = AddItemsToImportBiblio($batch_id, 
442                 $import_record_id, $marc_record, $update_counts);
443
444 =cut
445
446 sub AddItemsToImportBiblio {
447     my $batch_id = shift;
448     my $import_record_id = shift;
449     my $marc_record = shift;
450     my $update_counts = @_ ? shift : 0;
451
452     my @import_items_ids = ();
453    
454     my $dbh = C4::Context->dbh; 
455     my ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
456     foreach my $item_field ($marc_record->field($item_tag)) {
457         my $item_marc = MARC::Record->new();
458         $item_marc->leader("00000    a              "); # must set Leader/09 to 'a'
459         $item_marc->append_fields($item_field);
460         $marc_record->delete_field($item_field);
461         my $sth = $dbh->prepare_cached("INSERT INTO import_items (import_record_id, status, marcxml)
462                                         VALUES (?, ?, ?)");
463         $sth->bind_param(1, $import_record_id);
464         $sth->bind_param(2, 'staged');
465         $sth->bind_param(3, $item_marc->as_xml("USMARC"));
466         $sth->execute();
467         push @import_items_ids, $dbh->{'mysql_insertid'};
468         $sth->finish();
469     }
470
471     if ($#import_items_ids > -1) {
472         _update_batch_record_counts($batch_id) if $update_counts;
473         _update_import_record_marc($import_record_id, $marc_record, C4::Context->preference('marcflavour'));
474     }
475     return @import_items_ids;
476 }
477
478 =head2 BatchFindDuplicates
479
480   my $num_with_matches = BatchFindDuplicates($batch_id, $matcher,
481              $max_matches, $progress_interval, $progress_callback);
482
483 Goes through the records loaded in the batch and attempts to 
484 find duplicates for each one.  Sets the matching status 
485 of each record to "no_match" or "auto_match" as appropriate.
486
487 The $max_matches parameter is optional; if it is not supplied,
488 it defaults to 10.
489
490 The $progress_interval and $progress_callback parameters are 
491 optional; if both are supplied, the sub referred to by
492 $progress_callback will be invoked every $progress_interval
493 records using the number of records processed as the 
494 singular argument.
495
496 =cut
497
498 sub BatchFindDuplicates {
499     my $batch_id = shift;
500     my $matcher = shift;
501     my $max_matches = @_ ? shift : 10;
502
503     # optional callback to monitor status 
504     # of job
505     my $progress_interval = 0;
506     my $progress_callback = undef;
507     if ($#_ == 1) {
508         $progress_interval = shift;
509         $progress_callback = shift;
510         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
511         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
512     }
513
514     my $dbh = C4::Context->dbh;
515
516     my $sth = $dbh->prepare("SELECT import_record_id, record_type, marc
517                              FROM import_records
518                              WHERE import_batch_id = ?");
519     $sth->execute($batch_id);
520     my $num_with_matches = 0;
521     my $rec_num = 0;
522     while (my $rowref = $sth->fetchrow_hashref) {
523         $rec_num++;
524         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
525             &$progress_callback($rec_num);
526         }
527         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
528         my @matches = ();
529         if (defined $matcher) {
530             @matches = $matcher->get_matches($marc_record, $max_matches);
531         }
532         if (scalar(@matches) > 0) {
533             $num_with_matches++;
534             SetImportRecordMatches($rowref->{'import_record_id'}, @matches);
535             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'auto_match');
536         } else {
537             SetImportRecordMatches($rowref->{'import_record_id'}, ());
538             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'no_match');
539         }
540     }
541     $sth->finish();
542     return $num_with_matches;
543 }
544
545 =head2 BatchCommitRecords
546
547   my ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored) =
548         BatchCommitRecords($batch_id, $framework,
549         $progress_interval, $progress_callback);
550
551 =cut
552
553 sub BatchCommitRecords {
554     my $batch_id = shift;
555     my $framework = shift;
556
557     # optional callback to monitor status 
558     # of job
559     my $progress_interval = 0;
560     my $progress_callback = undef;
561     if ($#_ == 1) {
562         $progress_interval = shift;
563         $progress_callback = shift;
564         $progress_interval = 0 unless $progress_interval =~ /^\d+$/ and $progress_interval > 0;
565         $progress_interval = 0 unless 'CODE' eq ref $progress_callback;
566     }
567
568     my $record_type;
569     my $num_added = 0;
570     my $num_updated = 0;
571     my $num_items_added = 0;
572     my $num_items_replaced = 0;
573     my $num_items_errored = 0;
574     my $num_ignored = 0;
575     # commit (i.e., save, all records in the batch)
576     SetImportBatchStatus($batch_id, 'importing');
577     my $overlay_action = GetImportBatchOverlayAction($batch_id);
578     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
579     my $item_action = GetImportBatchItemAction($batch_id);
580     my $item_tag;
581     my $item_subfield;
582     my $dbh = C4::Context->dbh;
583     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marc, encoding
584                              FROM import_records
585                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
586                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
587                              WHERE import_batch_id = ?");
588     $sth->execute($batch_id);
589     my $marcflavour = C4::Context->preference('marcflavour');
590
591     my $userenv = C4::Context->userenv;
592     my $logged_in_patron = Koha::Patrons->find( $userenv->{number} );
593
594     my $rec_num = 0;
595     while (my $rowref = $sth->fetchrow_hashref) {
596         $record_type = $rowref->{'record_type'};
597         $rec_num++;
598         if ($progress_interval and (0 == ($rec_num % $progress_interval))) {
599             &$progress_callback($rec_num);
600         }
601         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'imported') {
602             $num_ignored++;
603             next;
604         }
605
606         my $marc_type;
607         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
608             $marc_type = 'UNIMARCAUTH';
609         } elsif ($marcflavour eq 'UNIMARC') {
610             $marc_type = 'UNIMARC';
611         } else {
612             $marc_type = 'USMARC';
613         }
614         my $marc_record = MARC::Record->new_from_usmarc($rowref->{'marc'});
615
616         if ($record_type eq 'biblio') {
617             # remove any item tags - rely on BatchCommitItems
618             ($item_tag,$item_subfield) = &GetMarcFromKohaField( "items.itemnumber" );
619             foreach my $item_field ($marc_record->field($item_tag)) {
620                 $marc_record->delete_field($item_field);
621             }
622         }
623
624         my ($record_result, $item_result, $record_match) =
625             _get_commit_action($overlay_action, $nomatch_action, $item_action, 
626                                $rowref->{'overlay_status'}, $rowref->{'import_record_id'}, $record_type);
627
628         my $recordid;
629         my $query;
630         if ($record_result eq 'create_new') {
631             $num_added++;
632             if ($record_type eq 'biblio') {
633                 my $biblioitemnumber;
634                 ($recordid, $biblioitemnumber) = AddBiblio($marc_record, $framework);
635                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
636                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
637                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
638                     $num_items_added += $bib_items_added;
639                     $num_items_replaced += $bib_items_replaced;
640                     $num_items_errored += $bib_items_errored;
641                 }
642             } else {
643                 $recordid = AddAuthority($marc_record, undef, GuessAuthTypeCode($marc_record));
644                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
645             }
646             my $sth = $dbh->prepare_cached($query);
647             $sth->execute($recordid, $rowref->{'import_record_id'});
648             $sth->finish();
649             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
650         } elsif ($record_result eq 'replace') {
651             $num_updated++;
652             $recordid = $record_match;
653             my $oldxml;
654             if ($record_type eq 'biblio') {
655                 my $oldbiblio = Koha::Biblios->find( $recordid );
656                 $oldxml = GetXmlBiblio($recordid);
657
658                 # remove item fields so that they don't get
659                 # added again if record is reverted
660                 # FIXME: GetXmlBiblio output should not contain item info any more! So the next foreach should not be needed. Does not hurt either; may remove old 952s that should not have been there anymore.
661                 my $old_marc = MARC::Record->new_from_xml(StripNonXmlChars($oldxml), 'UTF-8', $rowref->{'encoding'}, $marc_type);
662                 foreach my $item_field ($old_marc->field($item_tag)) {
663                     $old_marc->delete_field($item_field);
664                 }
665                 $oldxml = $old_marc->as_xml($marc_type);
666
667                 my $context = { source => 'batchimport' };
668                 if ($logged_in_patron) {
669                     $context->{categorycode} = $logged_in_patron->categorycode;
670                     $context->{userid} = $logged_in_patron->userid;
671                 }
672
673                 ModBiblio($marc_record, $recordid, $oldbiblio->frameworkcode, {
674                     overlay_context => $context
675                 });
676                 $query = "UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"; # FIXME call SetMatchedBiblionumber instead
677
678                 if ($item_result eq 'create_new' || $item_result eq 'replace') {
679                     my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
680                     $num_items_added += $bib_items_added;
681                     $num_items_replaced += $bib_items_replaced;
682                     $num_items_errored += $bib_items_errored;
683                 }
684             } else {
685                 $oldxml = GetAuthorityXML($recordid);
686
687                 ModAuthority($recordid, $marc_record, GuessAuthTypeCode($marc_record));
688                 $query = "UPDATE import_auths SET matched_authid = ? WHERE import_record_id = ?";
689             }
690             my $sth = $dbh->prepare_cached("UPDATE import_records SET marcxml_old = ? WHERE import_record_id = ?");
691             $sth->execute($oldxml, $rowref->{'import_record_id'});
692             $sth->finish();
693             my $sth2 = $dbh->prepare_cached($query);
694             $sth2->execute($recordid, $rowref->{'import_record_id'});
695             $sth2->finish();
696             SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
697             SetImportRecordStatus($rowref->{'import_record_id'}, 'imported');
698         } elsif ($record_result eq 'ignore') {
699             $recordid = $record_match;
700             $num_ignored++;
701             if ($record_type eq 'biblio' and defined $recordid and ( $item_result eq 'create_new' || $item_result eq 'replace' ) ) {
702                 my ($bib_items_added, $bib_items_replaced, $bib_items_errored) = BatchCommitItems($rowref->{'import_record_id'}, $recordid, $item_result);
703                 $num_items_added += $bib_items_added;
704          $num_items_replaced += $bib_items_replaced;
705                 $num_items_errored += $bib_items_errored;
706                 # still need to record the matched biblionumber so that the
707                 # items can be reverted
708                 my $sth2 = $dbh->prepare_cached("UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?"); # FIXME call SetMatchedBiblionumber instead
709                 $sth2->execute($recordid, $rowref->{'import_record_id'});
710                 SetImportRecordOverlayStatus($rowref->{'import_record_id'}, 'match_applied');
711             }
712             SetImportRecordStatus($rowref->{'import_record_id'}, 'ignored');
713         }
714     }
715     $sth->finish();
716     SetImportBatchStatus($batch_id, 'imported');
717     return ($num_added, $num_updated, $num_items_added, $num_items_replaced, $num_items_errored, $num_ignored);
718 }
719
720 =head2 BatchCommitItems
721
722   ($num_items_added, $num_items_errored) = 
723          BatchCommitItems($import_record_id, $biblionumber);
724
725 =cut
726
727 sub BatchCommitItems {
728     my ( $import_record_id, $biblionumber, $action ) = @_;
729
730     my $dbh = C4::Context->dbh;
731
732     my $num_items_added = 0;
733     my $num_items_errored = 0;
734     my $num_items_replaced = 0;
735
736     my $sth = $dbh->prepare( "
737         SELECT import_items_id, import_items.marcxml, encoding
738         FROM import_items
739         JOIN import_records USING (import_record_id)
740         WHERE import_record_id = ?
741         ORDER BY import_items_id
742     " );
743     $sth->bind_param( 1, $import_record_id );
744     $sth->execute();
745
746     while ( my $row = $sth->fetchrow_hashref() ) {
747         my $item_marc = MARC::Record->new_from_xml( StripNonXmlChars( $row->{'marcxml'} ), 'UTF-8', $row->{'encoding'} );
748
749         # Delete date_due subfield as to not accidentally delete item checkout due dates
750         my ( $MARCfield, $MARCsubfield ) = GetMarcFromKohaField( 'items.onloan' );
751         $item_marc->field($MARCfield)->delete_subfield( code => $MARCsubfield );
752
753         my $item = TransformMarcToKoha( $item_marc );
754
755         my $duplicate_barcode = exists( $item->{'barcode'} ) && Koha::Items->find({ barcode => $item->{'barcode'} });
756         my $duplicate_itemnumber = exists( $item->{'itemnumber'} );
757
758         my $updsth = $dbh->prepare("UPDATE import_items SET status = ?, itemnumber = ?, import_error = ? WHERE import_items_id = ?");
759         if ( $action eq "replace" && $duplicate_itemnumber ) {
760             # Duplicate itemnumbers have precedence, that way we can update barcodes by overlaying
761             ModItemFromMarc( $item_marc, $biblionumber, $item->{itemnumber} );
762             $updsth->bind_param( 1, 'imported' );
763             $updsth->bind_param( 2, $item->{itemnumber} );
764             $updsth->bind_param( 3, undef );
765             $updsth->bind_param( 4, $row->{'import_items_id'} );
766             $updsth->execute();
767             $updsth->finish();
768             $num_items_replaced++;
769         } elsif ( $action eq "replace" && $duplicate_barcode ) {
770             my $itemnumber = $duplicate_barcode->itemnumber;
771             ModItemFromMarc( $item_marc, $biblionumber, $itemnumber );
772             $updsth->bind_param( 1, 'imported' );
773             $updsth->bind_param( 2, $item->{itemnumber} );
774             $updsth->bind_param( 3, undef );
775             $updsth->bind_param( 4, $row->{'import_items_id'} );
776             $updsth->execute();
777             $updsth->finish();
778             $num_items_replaced++;
779         } elsif ($duplicate_barcode) {
780             $updsth->bind_param( 1, 'error' );
781             $updsth->bind_param( 2, undef );
782             $updsth->bind_param( 3, 'duplicate item barcode' );
783             $updsth->bind_param( 4, $row->{'import_items_id'} );
784             $updsth->execute();
785             $num_items_errored++;
786         } else {
787             # Remove the itemnumber if it exists, we want to create a new item
788             my ( $itemtag, $itemsubfield ) = GetMarcFromKohaField( "items.itemnumber" );
789             $item_marc->field($itemtag)->delete_subfield( code => $itemsubfield );
790
791             my ( $item_biblionumber, $biblioitemnumber, $itemnumber ) = AddItemFromMarc( $item_marc, $biblionumber );
792             if( $itemnumber ) {
793                 $updsth->bind_param( 1, 'imported' );
794                 $updsth->bind_param( 2, $itemnumber );
795                 $updsth->bind_param( 3, undef );
796                 $updsth->bind_param( 4, $row->{'import_items_id'} );
797                 $updsth->execute();
798                 $updsth->finish();
799                 $num_items_added++;
800             }
801         }
802     }
803
804     return ( $num_items_added, $num_items_replaced, $num_items_errored );
805 }
806
807 =head2 BatchRevertRecords
808
809   my ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, 
810       $num_ignored) = BatchRevertRecords($batch_id);
811
812 =cut
813
814 sub BatchRevertRecords {
815     my $batch_id = shift;
816
817     my $logger = Koha::Logger->get( { category => 'C4.ImportBatch' } );
818
819     $logger->trace("C4::ImportBatch::BatchRevertRecords( $batch_id )");
820
821     my $record_type;
822     my $num_deleted = 0;
823     my $num_errors = 0;
824     my $num_reverted = 0;
825     my $num_ignored = 0;
826     my $num_items_deleted = 0;
827     # commit (i.e., save, all records in the batch)
828     SetImportBatchStatus($batch_id, 'reverting');
829     my $overlay_action = GetImportBatchOverlayAction($batch_id);
830     my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
831     my $dbh = C4::Context->dbh;
832     my $sth = $dbh->prepare("SELECT import_records.import_record_id, record_type, status, overlay_status, marcxml_old, encoding, matched_biblionumber, matched_authid
833                              FROM import_records
834                              LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
835                              LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
836                              WHERE import_batch_id = ?");
837     $sth->execute($batch_id);
838     my $marc_type;
839     my $marcflavour = C4::Context->preference('marcflavour');
840     while (my $rowref = $sth->fetchrow_hashref) {
841         $record_type = $rowref->{'record_type'};
842         if ($rowref->{'status'} eq 'error' or $rowref->{'status'} eq 'reverted') {
843             $num_ignored++;
844             next;
845         }
846         if ($marcflavour eq 'UNIMARC' && $record_type eq 'auth') {
847             $marc_type = 'UNIMARCAUTH';
848         } elsif ($marcflavour eq 'UNIMARC') {
849             $marc_type = 'UNIMARC';
850         } else {
851             $marc_type = 'USMARC';
852         }
853
854         my $record_result = _get_revert_action($overlay_action, $rowref->{'overlay_status'}, $rowref->{'status'});
855
856         if ($record_result eq 'delete') {
857             my $error = undef;
858             if  ($record_type eq 'biblio') {
859                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
860                 $error = DelBiblio($rowref->{'matched_biblionumber'});
861             } else {
862                 DelAuthority({ authid => $rowref->{'matched_authid'} });
863             }
864             if (defined $error) {
865                 $num_errors++;
866             } else {
867                 $num_deleted++;
868                 SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
869             }
870         } elsif ($record_result eq 'restore') {
871             $num_reverted++;
872             my $old_record = MARC::Record->new_from_xml(StripNonXmlChars($rowref->{'marcxml_old'}), 'UTF-8', $rowref->{'encoding'}, $marc_type);
873             if ($record_type eq 'biblio') {
874                 my $biblionumber = $rowref->{'matched_biblionumber'};
875                 my $oldbiblio = Koha::Biblios->find( $biblionumber );
876
877                 $logger->info("C4::ImportBatch::BatchRevertRecords: Biblio record $biblionumber does not exist, restoration of this record was skipped") unless $oldbiblio;
878                 next unless $oldbiblio; # Record has since been deleted. Deleted records should stay deleted.
879
880                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
881                 ModBiblio($old_record, $biblionumber, $oldbiblio->frameworkcode);
882             } else {
883                 my $authid = $rowref->{'matched_authid'};
884                 ModAuthority($authid, $old_record, GuessAuthTypeCode($old_record));
885             }
886             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
887         } elsif ($record_result eq 'ignore') {
888             if ($record_type eq 'biblio') {
889                 $num_items_deleted += BatchRevertItems($rowref->{'import_record_id'}, $rowref->{'matched_biblionumber'});
890             }
891             SetImportRecordStatus($rowref->{'import_record_id'}, 'reverted');
892         }
893         my $query;
894         if ($record_type eq 'biblio') {
895             # remove matched_biblionumber only if there is no 'imported' item left
896             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?"; # FIXME Remove me
897             $query = "UPDATE import_biblios SET matched_biblionumber = NULL WHERE import_record_id = ?  AND NOT EXISTS (SELECT * FROM import_items WHERE import_items.import_record_id=import_biblios.import_record_id and status='imported')";
898         } else {
899             $query = "UPDATE import_auths SET matched_authid = NULL WHERE import_record_id = ?";
900         }
901         my $sth2 = $dbh->prepare_cached($query);
902         $sth2->execute($rowref->{'import_record_id'});
903     }
904
905     $sth->finish();
906     SetImportBatchStatus($batch_id, 'reverted');
907     return ($num_deleted, $num_errors, $num_reverted, $num_items_deleted, $num_ignored);
908 }
909
910 =head2 BatchRevertItems
911
912   my $num_items_deleted = BatchRevertItems($import_record_id, $biblionumber);
913
914 =cut
915
916 sub BatchRevertItems {
917     my ($import_record_id, $biblionumber) = @_;
918
919     my $dbh = C4::Context->dbh;
920     my $num_items_deleted = 0;
921
922     my $sth = $dbh->prepare_cached("SELECT import_items_id, itemnumber
923                                    FROM import_items
924                                    JOIN items USING (itemnumber)
925                                    WHERE import_record_id = ?");
926     $sth->bind_param(1, $import_record_id);
927     $sth->execute();
928     while (my $row = $sth->fetchrow_hashref()) {
929         my $item = Koha::Items->find($row->{itemnumber});
930         if ($item->safe_delete){
931             my $updsth = $dbh->prepare("UPDATE import_items SET status = ? WHERE import_items_id = ?");
932             $updsth->bind_param(1, 'reverted');
933             $updsth->bind_param(2, $row->{'import_items_id'});
934             $updsth->execute();
935             $updsth->finish();
936             $num_items_deleted++;
937         }
938         else {
939             next;
940         }
941     }
942     $sth->finish();
943     return $num_items_deleted;
944 }
945
946 =head2 CleanBatch
947
948   CleanBatch($batch_id)
949
950 Deletes all staged records from the import batch
951 and sets the status of the batch to 'cleaned'.  Note
952 that deleting a stage record does *not* affect
953 any record that has been committed to the database.
954
955 =cut
956
957 sub CleanBatch {
958     my $batch_id = shift;
959     return unless defined $batch_id;
960
961     C4::Context->dbh->do('DELETE FROM import_records WHERE import_batch_id = ?', {}, $batch_id);
962     SetImportBatchStatus($batch_id, 'cleaned');
963 }
964
965 =head2 DeleteBatch
966
967   DeleteBatch($batch_id)
968
969 Deletes the record from the database. This can only be done
970 once the batch has been cleaned.
971
972 =cut
973
974 sub DeleteBatch {
975     my $batch_id = shift;
976     return unless defined $batch_id;
977
978     my $dbh = C4::Context->dbh;
979     my $sth = $dbh->prepare('DELETE FROM import_batches WHERE import_batch_id = ?');
980     $sth->execute( $batch_id );
981 }
982
983 =head2 GetAllImportBatches
984
985   my $results = GetAllImportBatches();
986
987 Returns a references to an array of hash references corresponding
988 to all import_batches rows (of batch_type 'batch'), sorted in 
989 ascending order by import_batch_id.
990
991 =cut
992
993 sub  GetAllImportBatches {
994     my $dbh = C4::Context->dbh;
995     my $sth = $dbh->prepare_cached("SELECT * FROM import_batches
996                                     WHERE batch_type IN ('batch', 'webservice')
997                                     ORDER BY import_batch_id ASC");
998
999     my $results = [];
1000     $sth->execute();
1001     while (my $row = $sth->fetchrow_hashref) {
1002         push @$results, $row;
1003     }
1004     $sth->finish();
1005     return $results;
1006 }
1007
1008 =head2 GetStagedWebserviceBatches
1009
1010   my $batch_ids = GetStagedWebserviceBatches();
1011
1012 Returns a references to an array of batch id's
1013 of batch_type 'webservice' that are not imported
1014
1015 =cut
1016
1017 my $PENDING_WEBSERVICE_BATCHES_QRY = <<EOQ;
1018 SELECT import_batch_id FROM import_batches
1019 WHERE batch_type = 'webservice'
1020 AND import_status = 'staged'
1021 EOQ
1022 sub  GetStagedWebserviceBatches {
1023     my $dbh = C4::Context->dbh;
1024     return $dbh->selectcol_arrayref($PENDING_WEBSERVICE_BATCHES_QRY);
1025 }
1026
1027 =head2 GetImportBatchRangeDesc
1028
1029   my $results = GetImportBatchRangeDesc($offset, $results_per_group);
1030
1031 Returns a reference to an array of hash references corresponding to
1032 import_batches rows (sorted in descending order by import_batch_id)
1033 start at the given offset.
1034
1035 =cut
1036
1037 sub GetImportBatchRangeDesc {
1038     my ($offset, $results_per_group) = @_;
1039
1040     my $dbh = C4::Context->dbh;
1041     my $query = "SELECT b.*, p.name as profile FROM import_batches b
1042                                     LEFT JOIN import_batch_profiles p
1043                                     ON b.profile_id = p.id
1044                                     WHERE b.batch_type IN ('batch', 'webservice')
1045                                     ORDER BY b.import_batch_id DESC";
1046     my @params;
1047     if ($results_per_group){
1048         $query .= " LIMIT ?";
1049         push(@params, $results_per_group);
1050     }
1051     if ($offset){
1052         $query .= " OFFSET ?";
1053         push(@params, $offset);
1054     }
1055     my $sth = $dbh->prepare_cached($query);
1056     $sth->execute(@params);
1057     my $results = $sth->fetchall_arrayref({});
1058     $sth->finish();
1059     return $results;
1060 }
1061
1062 =head2 GetItemNumbersFromImportBatch
1063
1064   my @itemsnos = GetItemNumbersFromImportBatch($batch_id);
1065
1066 =cut
1067
1068 sub GetItemNumbersFromImportBatch {
1069     my ($batch_id) = @_;
1070     my $dbh = C4::Context->dbh;
1071     my $sql = q|
1072 SELECT itemnumber FROM import_items
1073 INNER JOIN items USING (itemnumber)
1074 INNER JOIN import_records USING (import_record_id)
1075 WHERE import_batch_id = ?|;
1076     my  $sth = $dbh->prepare( $sql );
1077     $sth->execute($batch_id);
1078     my @items ;
1079     while ( my ($itm) = $sth->fetchrow_array ) {
1080         push @items, $itm;
1081     }
1082     return @items;
1083 }
1084
1085 =head2 GetNumberOfImportBatches
1086
1087   my $count = GetNumberOfImportBatches();
1088
1089 =cut
1090
1091 sub GetNumberOfNonZ3950ImportBatches {
1092     my $dbh = C4::Context->dbh;
1093     my $sth = $dbh->prepare("SELECT COUNT(*) FROM import_batches WHERE batch_type != 'z3950'");
1094     $sth->execute();
1095     my ($count) = $sth->fetchrow_array();
1096     $sth->finish();
1097     return $count;
1098 }
1099
1100 =head2 GetImportBiblios
1101
1102   my $results = GetImportBiblios($importid);
1103
1104 =cut
1105
1106 sub GetImportBiblios {
1107     my ($import_record_id) = @_;
1108
1109     my $dbh = C4::Context->dbh;
1110     my $query = "SELECT * FROM import_biblios WHERE import_record_id = ?";
1111     return $dbh->selectall_arrayref(
1112         $query,
1113         { Slice => {} },
1114         $import_record_id
1115     );
1116
1117 }
1118
1119 =head2 GetImportRecordsRange
1120
1121   my $results = GetImportRecordsRange($batch_id, $offset, $results_per_group);
1122
1123 Returns a reference to an array of hash references corresponding to
1124 import_biblios/import_auths/import_records rows for a given batch
1125 starting at the given offset.
1126
1127 =cut
1128
1129 sub GetImportRecordsRange {
1130     my ( $batch_id, $offset, $results_per_group, $status, $parameters ) = @_;
1131
1132     my $dbh = C4::Context->dbh;
1133
1134     my $order_by = $parameters->{order_by} || 'import_record_id';
1135     ( $order_by ) = grep( { $_ eq $order_by } qw( import_record_id title status overlay_status ) ) ? $order_by : 'import_record_id';
1136
1137     my $order_by_direction =
1138       uc( $parameters->{order_by_direction} // 'ASC' ) eq 'DESC' ? 'DESC' : 'ASC';
1139
1140     $order_by .= " $order_by_direction, authorized_heading" if $order_by eq 'title';
1141
1142     my $query = "SELECT title, author, isbn, issn, authorized_heading, import_records.import_record_id,
1143                                            record_sequence, status, overlay_status,
1144                                            matched_biblionumber, matched_authid, record_type
1145                                     FROM   import_records
1146                                     LEFT JOIN import_auths ON (import_records.import_record_id=import_auths.import_record_id)
1147                                     LEFT JOIN import_biblios ON (import_records.import_record_id=import_biblios.import_record_id)
1148                                     WHERE  import_batch_id = ?";
1149     my @params;
1150     push(@params, $batch_id);
1151     if ($status) {
1152         $query .= " AND status=?";
1153         push(@params,$status);
1154     }
1155
1156     $query.=" ORDER BY $order_by $order_by_direction";
1157
1158     if($results_per_group){
1159         $query .= " LIMIT ?";
1160         push(@params, $results_per_group);
1161     }
1162     if($offset){
1163         $query .= " OFFSET ?";
1164         push(@params, $offset);
1165     }
1166     my $sth = $dbh->prepare_cached($query);
1167     $sth->execute(@params);
1168     my $results = $sth->fetchall_arrayref({});
1169     $sth->finish();
1170     return $results;
1171
1172 }
1173
1174 =head2 GetBestRecordMatch
1175
1176   my $record_id = GetBestRecordMatch($import_record_id);
1177
1178 =cut
1179
1180 sub GetBestRecordMatch {
1181     my ($import_record_id) = @_;
1182
1183     my $dbh = C4::Context->dbh;
1184     my $sth = $dbh->prepare("SELECT candidate_match_id
1185                              FROM   import_record_matches
1186                              JOIN   import_records ON ( import_record_matches.import_record_id = import_records.import_record_id )
1187                              LEFT JOIN biblio ON ( candidate_match_id = biblio.biblionumber )
1188                              LEFT JOIN auth_header ON ( candidate_match_id = auth_header.authid )
1189                              WHERE  import_record_matches.import_record_id = ? AND
1190                              (  (import_records.record_type = 'biblio' AND biblio.biblionumber IS NOT NULL) OR
1191                                 (import_records.record_type = 'auth' AND auth_header.authid IS NOT NULL) )
1192                              AND chosen = 1
1193                              ORDER BY score DESC, candidate_match_id DESC");
1194     $sth->execute($import_record_id);
1195     my ($record_id) = $sth->fetchrow_array();
1196     $sth->finish();
1197     return $record_id;
1198 }
1199
1200 =head2 GetImportBatchStatus
1201
1202   my $status = GetImportBatchStatus($batch_id);
1203
1204 =cut
1205
1206 sub GetImportBatchStatus {
1207     my ($batch_id) = @_;
1208
1209     my $dbh = C4::Context->dbh;
1210     my $sth = $dbh->prepare("SELECT import_status FROM import_batches WHERE import_batch_id = ?");
1211     $sth->execute($batch_id);
1212     my ($status) = $sth->fetchrow_array();
1213     $sth->finish();
1214     return $status;
1215
1216 }
1217
1218 =head2 SetImportBatchStatus
1219
1220   SetImportBatchStatus($batch_id, $new_status);
1221
1222 =cut
1223
1224 sub SetImportBatchStatus {
1225     my ($batch_id, $new_status) = @_;
1226
1227     my $dbh = C4::Context->dbh;
1228     my $sth = $dbh->prepare("UPDATE import_batches SET import_status = ? WHERE import_batch_id = ?");
1229     $sth->execute($new_status, $batch_id);
1230     $sth->finish();
1231
1232 }
1233
1234 =head2 SetMatchedBiblionumber
1235
1236   SetMatchedBiblionumber($import_record_id, $biblionumber);
1237
1238 =cut
1239
1240 sub SetMatchedBiblionumber {
1241     my ($import_record_id, $biblionumber) = @_;
1242
1243     my $dbh = C4::Context->dbh;
1244     $dbh->do(
1245         q|UPDATE import_biblios SET matched_biblionumber = ? WHERE import_record_id = ?|,
1246         undef, $biblionumber, $import_record_id
1247     );
1248 }
1249
1250 =head2 GetImportBatchOverlayAction
1251
1252   my $overlay_action = GetImportBatchOverlayAction($batch_id);
1253
1254 =cut
1255
1256 sub GetImportBatchOverlayAction {
1257     my ($batch_id) = @_;
1258
1259     my $dbh = C4::Context->dbh;
1260     my $sth = $dbh->prepare("SELECT overlay_action FROM import_batches WHERE import_batch_id = ?");
1261     $sth->execute($batch_id);
1262     my ($overlay_action) = $sth->fetchrow_array();
1263     $sth->finish();
1264     return $overlay_action;
1265
1266 }
1267
1268
1269 =head2 SetImportBatchOverlayAction
1270
1271   SetImportBatchOverlayAction($batch_id, $new_overlay_action);
1272
1273 =cut
1274
1275 sub SetImportBatchOverlayAction {
1276     my ($batch_id, $new_overlay_action) = @_;
1277
1278     my $dbh = C4::Context->dbh;
1279     my $sth = $dbh->prepare("UPDATE import_batches SET overlay_action = ? WHERE import_batch_id = ?");
1280     $sth->execute($new_overlay_action, $batch_id);
1281     $sth->finish();
1282
1283 }
1284
1285 =head2 GetImportBatchNoMatchAction
1286
1287   my $nomatch_action = GetImportBatchNoMatchAction($batch_id);
1288
1289 =cut
1290
1291 sub GetImportBatchNoMatchAction {
1292     my ($batch_id) = @_;
1293
1294     my $dbh = C4::Context->dbh;
1295     my $sth = $dbh->prepare("SELECT nomatch_action FROM import_batches WHERE import_batch_id = ?");
1296     $sth->execute($batch_id);
1297     my ($nomatch_action) = $sth->fetchrow_array();
1298     $sth->finish();
1299     return $nomatch_action;
1300
1301 }
1302
1303
1304 =head2 SetImportBatchNoMatchAction
1305
1306   SetImportBatchNoMatchAction($batch_id, $new_nomatch_action);
1307
1308 =cut
1309
1310 sub SetImportBatchNoMatchAction {
1311     my ($batch_id, $new_nomatch_action) = @_;
1312
1313     my $dbh = C4::Context->dbh;
1314     my $sth = $dbh->prepare("UPDATE import_batches SET nomatch_action = ? WHERE import_batch_id = ?");
1315     $sth->execute($new_nomatch_action, $batch_id);
1316     $sth->finish();
1317
1318 }
1319
1320 =head2 GetImportBatchItemAction
1321
1322   my $item_action = GetImportBatchItemAction($batch_id);
1323
1324 =cut
1325
1326 sub GetImportBatchItemAction {
1327     my ($batch_id) = @_;
1328
1329     my $dbh = C4::Context->dbh;
1330     my $sth = $dbh->prepare("SELECT item_action FROM import_batches WHERE import_batch_id = ?");
1331     $sth->execute($batch_id);
1332     my ($item_action) = $sth->fetchrow_array();
1333     $sth->finish();
1334     return $item_action;
1335
1336 }
1337
1338
1339 =head2 SetImportBatchItemAction
1340
1341   SetImportBatchItemAction($batch_id, $new_item_action);
1342
1343 =cut
1344
1345 sub SetImportBatchItemAction {
1346     my ($batch_id, $new_item_action) = @_;
1347
1348     my $dbh = C4::Context->dbh;
1349     my $sth = $dbh->prepare("UPDATE import_batches SET item_action = ? WHERE import_batch_id = ?");
1350     $sth->execute($new_item_action, $batch_id);
1351     $sth->finish();
1352
1353 }
1354
1355 =head2 GetImportBatchMatcher
1356
1357   my $matcher_id = GetImportBatchMatcher($batch_id);
1358
1359 =cut
1360
1361 sub GetImportBatchMatcher {
1362     my ($batch_id) = @_;
1363
1364     my $dbh = C4::Context->dbh;
1365     my $sth = $dbh->prepare("SELECT matcher_id FROM import_batches WHERE import_batch_id = ?");
1366     $sth->execute($batch_id);
1367     my ($matcher_id) = $sth->fetchrow_array();
1368     $sth->finish();
1369     return $matcher_id;
1370
1371 }
1372
1373
1374 =head2 SetImportBatchMatcher
1375
1376   SetImportBatchMatcher($batch_id, $new_matcher_id);
1377
1378 =cut
1379
1380 sub SetImportBatchMatcher {
1381     my ($batch_id, $new_matcher_id) = @_;
1382
1383     my $dbh = C4::Context->dbh;
1384     my $sth = $dbh->prepare("UPDATE import_batches SET matcher_id = ? WHERE import_batch_id = ?");
1385     $sth->execute($new_matcher_id, $batch_id);
1386     $sth->finish();
1387
1388 }
1389
1390 =head2 GetImportRecordOverlayStatus
1391
1392   my $overlay_status = GetImportRecordOverlayStatus($import_record_id);
1393
1394 =cut
1395
1396 sub GetImportRecordOverlayStatus {
1397     my ($import_record_id) = @_;
1398
1399     my $dbh = C4::Context->dbh;
1400     my $sth = $dbh->prepare("SELECT overlay_status FROM import_records WHERE import_record_id = ?");
1401     $sth->execute($import_record_id);
1402     my ($overlay_status) = $sth->fetchrow_array();
1403     $sth->finish();
1404     return $overlay_status;
1405
1406 }
1407
1408
1409 =head2 SetImportRecordOverlayStatus
1410
1411   SetImportRecordOverlayStatus($import_record_id, $new_overlay_status);
1412
1413 =cut
1414
1415 sub SetImportRecordOverlayStatus {
1416     my ($import_record_id, $new_overlay_status) = @_;
1417
1418     my $dbh = C4::Context->dbh;
1419     my $sth = $dbh->prepare("UPDATE import_records SET overlay_status = ? WHERE import_record_id = ?");
1420     $sth->execute($new_overlay_status, $import_record_id);
1421     $sth->finish();
1422
1423 }
1424
1425 =head2 GetImportRecordStatus
1426
1427   my $status = GetImportRecordStatus($import_record_id);
1428
1429 =cut
1430
1431 sub GetImportRecordStatus {
1432     my ($import_record_id) = @_;
1433
1434     my $dbh = C4::Context->dbh;
1435     my $sth = $dbh->prepare("SELECT status FROM import_records WHERE import_record_id = ?");
1436     $sth->execute($import_record_id);
1437     my ($status) = $sth->fetchrow_array();
1438     $sth->finish();
1439     return $status;
1440
1441 }
1442
1443
1444 =head2 SetImportRecordStatus
1445
1446   SetImportRecordStatus($import_record_id, $new_status);
1447
1448 =cut
1449
1450 sub SetImportRecordStatus {
1451     my ($import_record_id, $new_status) = @_;
1452
1453     my $dbh = C4::Context->dbh;
1454     my $sth = $dbh->prepare("UPDATE import_records SET status = ? WHERE import_record_id = ?");
1455     $sth->execute($new_status, $import_record_id);
1456     $sth->finish();
1457
1458 }
1459
1460 =head2 GetImportRecordMatches
1461
1462   my $results = GetImportRecordMatches($import_record_id, $best_only);
1463
1464 =cut
1465
1466 sub GetImportRecordMatches {
1467     my $import_record_id = shift;
1468     my $best_only = @_ ? shift : 0;
1469
1470     my $dbh = C4::Context->dbh;
1471     # FIXME currently biblio only
1472     my $sth = $dbh->prepare_cached("SELECT title, author, biblionumber,
1473                                     candidate_match_id, score, record_type,
1474                                     chosen
1475                                     FROM import_records
1476                                     JOIN import_record_matches USING (import_record_id)
1477                                     LEFT JOIN biblio ON (biblionumber = candidate_match_id)
1478                                     WHERE import_record_id = ?
1479                                     ORDER BY score DESC, biblionumber DESC");
1480     $sth->bind_param(1, $import_record_id);
1481     my $results = [];
1482     $sth->execute();
1483     while (my $row = $sth->fetchrow_hashref) {
1484         if ($row->{'record_type'} eq 'auth') {
1485             $row->{'authorized_heading'} = C4::AuthoritiesMarc::GetAuthorizedHeading( { authid => $row->{'candidate_match_id'} } );
1486         }
1487         next if ($row->{'record_type'} eq 'biblio' && not $row->{'biblionumber'});
1488         push @$results, $row;
1489         last if $best_only;
1490     }
1491     $sth->finish();
1492
1493     return $results;
1494     
1495 }
1496
1497 =head2 SetImportRecordMatches
1498
1499   SetImportRecordMatches($import_record_id, @matches);
1500
1501 =cut
1502
1503 sub SetImportRecordMatches {
1504     my $import_record_id = shift;
1505     my @matches = @_;
1506
1507     my $dbh = C4::Context->dbh;
1508     my $delsth = $dbh->prepare("DELETE FROM import_record_matches WHERE import_record_id = ?");
1509     $delsth->execute($import_record_id);
1510     $delsth->finish();
1511
1512     my $sth = $dbh->prepare("INSERT INTO import_record_matches (import_record_id, candidate_match_id, score, chosen)
1513                                     VALUES (?, ?, ?, ?)");
1514     my $chosen = 1; #The first match is defaulted to be chosen
1515     foreach my $match (@matches) {
1516         $sth->execute($import_record_id, $match->{'record_id'}, $match->{'score'}, $chosen);
1517         $chosen = 0; #After the first we do not default to other matches
1518     }
1519 }
1520
1521 =head2 RecordsFromISO2709File
1522
1523     my ($errors, $records) = C4::ImportBatch::RecordsFromISO2709File($input_file, $record_type, $encoding);
1524
1525 Reads ISO2709 binary porridge from the given file and creates MARC::Record-objects out of it.
1526
1527 @PARAM1, String, absolute path to the ISO2709 file.
1528 @PARAM2, String, see stage_file.pl
1529 @PARAM3, String, should be utf8
1530
1531 Returns two array refs.
1532
1533 =cut
1534
1535 sub RecordsFromISO2709File {
1536     my ($input_file, $record_type, $encoding) = @_;
1537     my @errors;
1538
1539     my $marc_type = C4::Context->preference('marcflavour');
1540     $marc_type .= 'AUTH' if ($marc_type eq 'UNIMARC' && $record_type eq 'auth');
1541
1542     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1543     my @marc_records;
1544     $/ = "\035";
1545     while (<$fh>) {
1546         s/^\s+//;
1547         s/\s+$//;
1548         next unless $_; # skip if record has only whitespace, as might occur
1549                         # if file includes newlines between each MARC record
1550         my ($marc_record, $charset_guessed, $char_errors) = MarcToUTF8Record($_, $marc_type, $encoding);
1551         push @marc_records, $marc_record;
1552         if ($charset_guessed ne $encoding) {
1553             push @errors,
1554                 "Unexpected charset $charset_guessed, expecting $encoding";
1555         }
1556     }
1557     close $fh;
1558     return ( \@errors, \@marc_records );
1559 }
1560
1561 =head2 RecordsFromMARCXMLFile
1562
1563     my ($errors, $records) = C4::ImportBatch::RecordsFromMARCXMLFile($input_file, $encoding);
1564
1565 Creates MARC::Record-objects out of the given MARCXML-file.
1566
1567 @PARAM1, String, absolute path to the ISO2709 file.
1568 @PARAM2, String, should be utf8
1569
1570 Returns two array refs.
1571
1572 =cut
1573
1574 sub RecordsFromMARCXMLFile {
1575     my ( $filename, $encoding ) = @_;
1576     my $batch = MARC::File::XML->in( $filename );
1577     my ( @marcRecords, @errors, $record );
1578     do {
1579         eval { $record = $batch->next( $encoding ); };
1580         if ($@) {
1581             push @errors, $@;
1582         }
1583         push @marcRecords, $record if $record;
1584     } while( $record );
1585     return (\@errors, \@marcRecords);
1586 }
1587
1588 =head2 RecordsFromMarcPlugin
1589
1590     Converts text of input_file into array of MARC records with to_marc plugin
1591
1592 =cut
1593
1594 sub RecordsFromMarcPlugin {
1595     my ($input_file, $plugin_class, $encoding) = @_;
1596     my ( $text, @return );
1597     return \@return if !$input_file || !$plugin_class;
1598
1599     # Read input file
1600     open my $fh, '<', $input_file or die "$0: cannot open input file $input_file: $!\n";
1601     $/ = "\035";
1602     while (<$fh>) {
1603         s/^\s+//;
1604         s/\s+$//;
1605         next unless $_;
1606         $text .= $_;
1607     }
1608     close $fh;
1609
1610     # Convert to large MARC blob with plugin
1611     $text = Koha::Plugins::Handler->run({
1612         class  => $plugin_class,
1613         method => 'to_marc',
1614         params => { data => $text },
1615     }) if $text;
1616
1617     # Convert to array of MARC records
1618     if( $text ) {
1619         my $marc_type = C4::Context->preference('marcflavour');
1620         foreach my $blob ( split(/\x1D/, $text) ) {
1621             next if $blob =~ /^\s*$/;
1622             my ($marcrecord) = MarcToUTF8Record($blob, $marc_type, $encoding);
1623             push @return, $marcrecord;
1624         }
1625     }
1626     return \@return;
1627 }
1628
1629 # internal functions
1630
1631 sub _create_import_record {
1632     my ($batch_id, $record_sequence, $marc_record, $record_type, $encoding, $marc_type) = @_;
1633
1634     my $dbh = C4::Context->dbh;
1635     my $sth = $dbh->prepare("INSERT INTO import_records (import_batch_id, record_sequence, marc, marcxml, marcxml_old,
1636                                                          record_type, encoding)
1637                                     VALUES (?, ?, ?, ?, ?, ?, ?)");
1638     $sth->execute($batch_id, $record_sequence, $marc_record->as_usmarc(), $marc_record->as_xml($marc_type), '',
1639                   $record_type, $encoding);
1640     my $import_record_id = $dbh->{'mysql_insertid'};
1641     $sth->finish();
1642     return $import_record_id;
1643 }
1644
1645 sub _update_import_record_marc {
1646     my ($import_record_id, $marc_record, $marc_type) = @_;
1647
1648     my $dbh = C4::Context->dbh;
1649     my $sth = $dbh->prepare("UPDATE import_records SET marc = ?, marcxml = ?
1650                              WHERE  import_record_id = ?");
1651     $sth->execute($marc_record->as_usmarc(), $marc_record->as_xml($marc_type), $import_record_id);
1652     $sth->finish();
1653 }
1654
1655 sub _add_auth_fields {
1656     my ($import_record_id, $marc_record) = @_;
1657
1658     my $controlnumber;
1659     if ($marc_record->field('001')) {
1660         $controlnumber = $marc_record->field('001')->data();
1661     }
1662     my $authorized_heading = C4::AuthoritiesMarc::GetAuthorizedHeading({ record => $marc_record });
1663     my $dbh = C4::Context->dbh;
1664     my $sth = $dbh->prepare("INSERT INTO import_auths (import_record_id, control_number, authorized_heading) VALUES (?, ?, ?)");
1665     $sth->execute($import_record_id, $controlnumber, $authorized_heading);
1666     $sth->finish();
1667 }
1668
1669 sub _add_biblio_fields {
1670     my ($import_record_id, $marc_record) = @_;
1671
1672     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1673     my $dbh = C4::Context->dbh;
1674     # FIXME no controlnumber, originalsource
1675     $isbn = C4::Koha::GetNormalizedISBN($isbn);
1676     my $sth = $dbh->prepare("INSERT INTO import_biblios (import_record_id, title, author, isbn, issn) VALUES (?, ?, ?, ?, ?)");
1677     $sth->execute($import_record_id, $title, $author, $isbn, $issn) or die $sth->errstr;
1678     $sth->finish();
1679                 
1680 }
1681
1682 sub _update_biblio_fields {
1683     my ($import_record_id, $marc_record) = @_;
1684
1685     my ($title, $author, $isbn, $issn) = _parse_biblio_fields($marc_record);
1686     my $dbh = C4::Context->dbh;
1687     # FIXME no controlnumber, originalsource
1688     # FIXME 2 - should regularize normalization of ISBN wherever it is done
1689     $isbn =~ s/\(.*$//;
1690     $isbn =~ tr/ -_//;
1691     $isbn = uc $isbn;
1692     my $sth = $dbh->prepare("UPDATE import_biblios SET title = ?, author = ?, isbn = ?, issn = ?
1693                              WHERE  import_record_id = ?");
1694     $sth->execute($title, $author, $isbn, $issn, $import_record_id);
1695     $sth->finish();
1696 }
1697
1698 sub _parse_biblio_fields {
1699     my ($marc_record) = @_;
1700
1701     my $dbh = C4::Context->dbh;
1702     my $bibliofields = TransformMarcToKoha($marc_record, '');
1703     return ($bibliofields->{'title'}, $bibliofields->{'author'}, $bibliofields->{'isbn'}, $bibliofields->{'issn'});
1704
1705 }
1706
1707 sub _update_batch_record_counts {
1708     my ($batch_id) = @_;
1709
1710     my $dbh = C4::Context->dbh;
1711     my $sth = $dbh->prepare_cached("UPDATE import_batches SET
1712                                         num_records = (
1713                                             SELECT COUNT(*)
1714                                             FROM import_records
1715                                             WHERE import_batch_id = import_batches.import_batch_id),
1716                                         num_items = (
1717                                             SELECT COUNT(*)
1718                                             FROM import_records
1719                                             JOIN import_items USING (import_record_id)
1720                                             WHERE import_batch_id = import_batches.import_batch_id
1721                                             AND record_type = 'biblio')
1722                                     WHERE import_batch_id = ?");
1723     $sth->bind_param(1, $batch_id);
1724     $sth->execute();
1725     $sth->finish();
1726 }
1727
1728 sub _get_commit_action {
1729     my ($overlay_action, $nomatch_action, $item_action, $overlay_status, $import_record_id, $record_type) = @_;
1730     
1731     if ($record_type eq 'biblio') {
1732         my ($bib_result, $bib_match, $item_result);
1733
1734         $bib_match = GetBestRecordMatch($import_record_id);
1735         if ($overlay_status ne 'no_match' && defined($bib_match)) {
1736
1737             $bib_result = $overlay_action;
1738
1739             if($item_action eq 'always_add' or $item_action eq 'add_only_for_matches'){
1740                 $item_result = 'create_new';
1741             } elsif($item_action eq 'replace'){
1742                 $item_result = 'replace';
1743             } else {
1744                 $item_result = 'ignore';
1745             }
1746
1747         } else {
1748             $bib_result = $nomatch_action;
1749             $item_result = ($item_action eq 'always_add' or $item_action eq 'add_only_for_new') ? 'create_new' : 'ignore';
1750         }
1751         return ($bib_result, $item_result, $bib_match);
1752     } else { # must be auths
1753         my ($auth_result, $auth_match);
1754
1755         $auth_match = GetBestRecordMatch($import_record_id);
1756         if ($overlay_status ne 'no_match' && defined($auth_match)) {
1757             $auth_result = $overlay_action;
1758         } else {
1759             $auth_result = $nomatch_action;
1760         }
1761
1762         return ($auth_result, undef, $auth_match);
1763
1764     }
1765 }
1766
1767 sub _get_revert_action {
1768     my ($overlay_action, $overlay_status, $status) = @_;
1769
1770     my $bib_result;
1771
1772     if ($status eq 'ignored') {
1773         $bib_result = 'ignore';
1774     } else {
1775         if ($overlay_action eq 'create_new') {
1776             $bib_result = 'delete';
1777         } else {
1778             $bib_result = ($overlay_status eq 'match_applied') ? 'restore' : 'delete';
1779         }
1780     }
1781     return $bib_result;
1782 }
1783
1784 1;
1785 __END__
1786
1787 =head1 AUTHOR
1788
1789 Koha Development Team <http://koha-community.org/>
1790
1791 Galen Charlton <galen.charlton@liblime.com>
1792
1793 =cut