74250f3e48fa19d50b2c73de49df5d4c6e3c3ef7
[srvgit] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31 use C4::AuthoritiesMarc;
32
33 use Carp;
34 use Clone qw(clone);
35 use JSON;
36 use Modern::Perl;
37 use Readonly;
38 use Search::Elasticsearch;
39 use Try::Tiny;
40 use YAML::XS;
41
42 use List::Util qw( sum0 reduce all );
43 use MARC::File::XML;
44 use MIME::Base64;
45 use Encode qw(encode);
46 use Business::ISBN;
47 use Scalar::Util qw(looks_like_number);
48
49 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
50 __PACKAGE__->mk_accessors(qw( sort_fields ));
51
52 # Constants to refer to the standard index names
53 Readonly our $BIBLIOS_INDEX     => 'biblios';
54 Readonly our $AUTHORITIES_INDEX => 'authorities';
55
56 =head1 NAME
57
58 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59
60 =head1 ACCESSORS
61
62 =over 4
63
64 =item index
65
66 The name of the index to use, generally 'biblios' or 'authorities'.
67
68 =item index_name
69
70 The Elasticsearch index name with Koha instance prefix.
71
72 =back
73
74
75 =head1 FUNCTIONS
76
77 =cut
78
79 sub new {
80     my $class = shift @_;
81     my ($params) = @_;
82
83     # Check for a valid index
84     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
85     my $config = _read_configuration();
86     $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87
88     my $self = $class->SUPER::new(@_);
89     return $self;
90 }
91
92 =head2 get_elasticsearch
93
94     my $elasticsearch_client = $self->get_elasticsearch();
95
96 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
97 instance level and will be reused if method is called multiple times.
98
99 =cut
100
101 sub get_elasticsearch {
102     my $self = shift @_;
103     unless (defined $self->{elasticsearch}) {
104         $self->{elasticsearch} = Search::Elasticsearch->new(
105             $self->get_elasticsearch_params()
106         );
107     }
108     return $self->{elasticsearch};
109 }
110
111 =head2 get_elasticsearch_params
112
113     my $params = $self->get_elasticsearch_params();
114
115 This provides a hashref that contains the parameters for connecting to the
116 ElasicSearch servers, in the form:
117
118     {
119         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
120         'index_name' => 'koha_instance_index',
121     }
122
123 This is configured by the following in the C<config> block in koha-conf.xml:
124
125     <elasticsearch>
126         <server>127.0.0.1:9200</server>
127         <server>anotherserver:9200</server>
128         <index_name>koha_instance</index_name>
129     </elasticsearch>
130
131 =cut
132
133 sub get_elasticsearch_params {
134     my ($self) = @_;
135
136     my $conf;
137     try {
138         $conf = _read_configuration();
139     } catch {
140         if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
141             croak($_->message);
142         }
143     };
144
145     return $conf
146 }
147
148 =head2 get_elasticsearch_settings
149
150     my $settings = $self->get_elasticsearch_settings();
151
152 This provides the settings provided to Elasticsearch when an index is created.
153 These can do things like define tokenization methods.
154
155 A hashref containing the settings is returned.
156
157 =cut
158
159 sub get_elasticsearch_settings {
160     my ($self) = @_;
161
162     # Use state to speed up repeated calls
163     state $settings = undef;
164     if (!defined $settings) {
165         my $config_file = C4::Context->config('elasticsearch_index_config');
166         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
167         $settings = YAML::XS::LoadFile( $config_file );
168     }
169
170     return $settings;
171 }
172
173 =head2 get_elasticsearch_mappings
174
175     my $mappings = $self->get_elasticsearch_mappings();
176
177 This provides the mappings that get passed to Elasticsearch when an index is
178 created.
179
180 =cut
181
182 sub get_elasticsearch_mappings {
183     my ($self) = @_;
184
185     # Use state to speed up repeated calls
186     state %all_mappings;
187     state %sort_fields;
188
189     if (!defined $all_mappings{$self->index}) {
190         $sort_fields{$self->index} = {};
191         # Clone the general mapping to break ties with the original hash
192         my $mappings = {
193             data => clone(_get_elasticsearch_field_config('general', ''))
194         };
195         my $marcflavour = lc C4::Context->preference('marcflavour');
196         $self->_foreach_mapping(
197             sub {
198                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
199                 return if $marc_type ne $marcflavour;
200                 # TODO if this gets any sort of complexity to it, it should
201                 # be broken out into its own function.
202
203                 # TODO be aware of date formats, but this requires pre-parsing
204                 # as ES will simply reject anything with an invalid date.
205                 my $es_type = 'text';
206                 if ($type eq 'boolean') {
207                     $es_type = 'boolean';
208                 } elsif ($type eq 'number' || $type eq 'sum') {
209                     $es_type = 'integer';
210                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211                     $es_type = 'stdno';
212                 } elsif ($type eq 'year') {
213                     $es_type = 'year';
214                 }
215
216                 if ($search) {
217                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
218                 }
219
220                 if ($facet) {
221                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
222                 }
223                 if ($suggestible) {
224                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225                 }
226                 # Sort is a bit special as it can be true, false, undef.
227                 # We care about "true" or "undef",
228                 # "undef" means to do the default thing, which is make it sortable.
229                 if (!defined $sort || $sort) {
230                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
231                     $sort_fields{$self->index}{$name} = 1;
232                 }
233             }
234         );
235         $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
236         $all_mappings{$self->index} = $mappings;
237     }
238     $self->sort_fields(\%{$sort_fields{$self->index}});
239     return $all_mappings{$self->index};
240 }
241
242 =head2 raw_elasticsearch_mappings
243
244 Return elasticsearch mapping as it is in database.
245 marc_type: marc21|unimarc|normarc
246
247 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
248
249 =cut
250
251 sub raw_elasticsearch_mappings {
252     my ( $marc_type ) = @_;
253
254     my $schema = Koha::Database->new()->schema();
255
256     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
257
258     my $mappings = {};
259     while ( my $search_field = $search_fields->next ) {
260
261         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
262             { search_field_id => $search_field->id },
263             {
264                 join     => 'search_marc_map',
265                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
266             }
267         );
268
269         while ( my $marc_to_field = $marc_to_fields->next ) {
270
271             my $marc_map = $marc_to_field->search_marc_map;
272
273             next if $marc_type && $marc_map->marc_type ne $marc_type;
274
275             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
276             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
277             $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
278             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
279             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
280             $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282
283             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284                 {
285                     facet   => $marc_to_field->facet || '',
286                     marc_type => $marc_map->marc_type,
287                     marc_field => $marc_map->marc_field,
288                     sort        => $marc_to_field->sort,
289                     suggestible => $marc_to_field->suggestible || ''
290                 });
291
292         }
293     }
294
295     return $mappings;
296 }
297
298 =head2 _get_elasticsearch_field_config
299
300 Get the Elasticsearch field config for the given purpose and data type.
301
302 $mapping = _get_elasticsearch_field_config('search', 'text');
303
304 =cut
305
306 sub _get_elasticsearch_field_config {
307
308     my ( $purpose, $type ) = @_;
309
310     # Use state to speed up repeated calls
311     state $settings = undef;
312     if (!defined $settings) {
313         my $config_file = C4::Context->config('elasticsearch_field_config');
314         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
315         local $YAML::XS::Boolean = 'JSON::PP';
316         $settings = YAML::XS::LoadFile( $config_file );
317     }
318
319     if (!defined $settings->{$purpose}) {
320         die "Field purpose $purpose not defined in field config";
321     }
322     if ($type eq '') {
323         return $settings->{$purpose};
324     }
325     if (defined $settings->{$purpose}{$type}) {
326         return $settings->{$purpose}{$type};
327     }
328     if (defined $settings->{$purpose}{'default'}) {
329         return $settings->{$purpose}{'default'};
330     }
331     return;
332 }
333
334 =head2 _load_elasticsearch_mappings
335
336 Load Elasticsearch mappings in the format of mappings.yaml.
337
338 $indexes = _load_elasticsearch_mappings();
339
340 =cut
341
342 sub _load_elasticsearch_mappings {
343     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
344     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
345     return YAML::XS::LoadFile( $mappings_yaml );
346 }
347
348 sub reset_elasticsearch_mappings {
349     my ( $self ) = @_;
350     my $indexes = $self->_load_elasticsearch_mappings();
351
352     Koha::SearchMarcMaps->delete;
353     Koha::SearchFields->delete;
354
355     while ( my ( $index_name, $fields ) = each %$indexes ) {
356         while ( my ( $field_name, $data ) = each %$fields ) {
357
358             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
359
360             # Set default values
361             $sf_params{staff_client} //= 1;
362             $sf_params{opac} //= 1;
363
364             $sf_params{name} = $field_name;
365
366             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
367
368             my $mappings = $data->{mappings};
369             for my $mapping ( @$mappings ) {
370                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
371                     index_name => $index_name,
372                     marc_type => $mapping->{marc_type},
373                     marc_field => $mapping->{marc_field}
374                 });
375                 $search_field->add_to_search_marc_maps($marc_field, {
376                     facet => $mapping->{facet} || 0,
377                     suggestible => $mapping->{suggestible} || 0,
378                     sort => $mapping->{sort} // 1,
379                     search => $mapping->{search} // 1
380                 });
381             }
382         }
383     }
384
385     $self->clear_search_fields_cache();
386
387     # FIXME return the mappings?
388 }
389
390 # This overrides the accessor provided by Class::Accessor so that if
391 # sort_fields isn't set, then it'll generate it.
392 sub sort_fields {
393     my $self = shift;
394     if (@_) {
395         $self->_sort_fields_accessor(@_);
396         return;
397     }
398     my $val = $self->_sort_fields_accessor();
399     return $val if $val;
400
401     # This will populate the accessor as a side effect
402     $self->get_elasticsearch_mappings();
403     return $self->_sort_fields_accessor();
404 }
405
406 =head2 _process_mappings($mappings, $data, $record_document, $meta)
407
408     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
409
410 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
411 Since we group all mappings by MARC field targets C<$mappings> will contain
412 all targets for C<$data> and thus we need to fetch the MARC field only once.
413 C<$mappings> will be applied to C<$record_document> and new field values added.
414 The method has no return value.
415
416 =over 4
417
418 =item C<$mappings>
419
420 Arrayref of mappings containing arrayrefs in the format
421 [C<$target>, C<$options>] where C<$target> is the name of the target field and
422 C<$options> is a hashref containing processing directives for this particular
423 mapping.
424
425 =item C<$data>
426
427 The source data from a MARC record field.
428
429 =item C<$record_document>
430
431 Hashref representing the Elasticsearch document on which mappings should be
432 applied.
433
434 =item C<$meta>
435
436 A hashref containing metadata useful for enforcing per mapping rules. For
437 example for providing extra context for mapping options, or treating mapping
438 targets differently depending on type (sort, search, facet etc). Combining
439 this metadata with the mapping options and metadata allows us to mutate the
440 data per mapping, or even replace it with other data retrieved from the
441 metadata context.
442
443 Current properties are:
444
445 C<altscript>: A boolean value indicating whether an alternate script presentation is being
446 processed.
447
448 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
449 'subfield' or 'subfields_group'.
450
451 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
452
453 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
454 is 'subfields_group'.
455
456 C<field>: The original C<MARC::Record> object.
457
458 =back
459
460 =cut
461
462 sub _process_mappings {
463     my ($_self, $mappings, $data, $record_document, $meta) = @_;
464     foreach my $mapping (@{$mappings}) {
465         my ($target, $options) = @{$mapping};
466
467         # Don't process sort fields for alternate scripts
468         my $sort = $target =~ /__sort$/;
469         if ($sort && $meta->{altscript}) {
470             next;
471         }
472
473         # Copy (scalar) data since can have multiple targets
474         # with differing options for (possibly) mutating data
475         # so need a different copy for each
476         my $data_copy = $data;
477         if (defined $options->{substr}) {
478             my ($start, $length) = @{$options->{substr}};
479             $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
480         }
481
482         # Add data to values array for callbacks processing
483         my $values = [$data_copy];
484
485         # Value callbacks takes subfield data (or values from previous
486         # callbacks) as argument, and returns a possibly different list of values.
487         # Note that the returned list may also be empty.
488         if (defined $options->{value_callbacks}) {
489             foreach my $callback (@{$options->{value_callbacks}}) {
490                 # Pass each value to current callback which returns a list
491                 # (scalar is fine too) resulting either in a list or
492                 # a list of lists that will be flattened by perl.
493                 # The next callback will receive the possibly expanded list of values.
494                 $values = [ map { $callback->($_) } @{$values} ];
495             }
496         }
497
498         # Skip mapping if all values has been removed
499         next unless @{$values};
500
501         if (defined $options->{property}) {
502             $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
503         }
504         if (defined $options->{nonfiling_characters_indicator}) {
505             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
506             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
507             # Nonfiling chars does not make sense for multiple values
508             # Only apply on first element
509             $values->[0] = substr $values->[0], $nonfiling_chars;
510         }
511
512         $values = [ grep(!/^$/, @{$values}) ];
513
514         $record_document->{$target} //= [];
515         push @{$record_document->{$target}}, @{$values};
516     }
517 }
518
519 =head2 marc_records_to_documents($marc_records)
520
521     my $record_documents = $self->marc_records_to_documents($marc_records);
522
523 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
524
525 Returns array of hash references, representing Elasticsearch documents,
526 acceptable as body payload in C<Search::Elasticsearch> requests.
527
528 =over 4
529
530 =item C<$marc_documents>
531
532 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
533
534 =back
535
536 =cut
537
538 sub marc_records_to_documents {
539     my ($self, $records) = @_;
540     my $rules = $self->_get_marc_mapping_rules();
541     my $control_fields_rules = $rules->{control_fields};
542     my $data_fields_rules = $rules->{data_fields};
543     my $marcflavour = lc C4::Context->preference('marcflavour');
544     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
545
546     my @record_documents;
547
548     my %auth_match_headings;
549     if( $self->index eq 'authorities' ){
550         my @auth_types = Koha::Authority::Types->search();
551         %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
552     }
553
554     foreach my $record (@{$records}) {
555         my $record_document = {};
556
557         if ( $self->index eq 'authorities' ){
558             my $authtypecode = GuessAuthTypeCode( $record );
559             if( $authtypecode ){
560                 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
561                     my $field = $record->field( $auth_match_headings{ $authtypecode } );
562                     my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
563                     push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
564                 }
565             } else {
566                 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
567             }
568         }
569
570         my $mappings = $rules->{leader};
571         if ($mappings) {
572             $self->_process_mappings($mappings, $record->leader(), $record_document, {
573                     altscript => 0,
574                     data_source => 'leader'
575                 }
576             );
577         }
578         foreach my $field ($record->fields()) {
579             if ($field->is_control_field()) {
580                 my $mappings = $control_fields_rules->{$field->tag()};
581                 if ($mappings) {
582                     $self->_process_mappings($mappings, $field->data(), $record_document, {
583                             altscript => 0,
584                             data_source => 'control_field',
585                             field => $field
586                         }
587                     );
588                 }
589             }
590             else {
591                 my $tag = $field->tag();
592                 # Handle alternate scripts in MARC 21
593                 my $altscript = 0;
594                 if ($marcflavour eq 'marc21' && $tag eq '880') {
595                     my $sub6 = $field->subfield('6');
596                     if ($sub6 =~ /^(...)-\d+/) {
597                         $tag = $1;
598                         $altscript = 1;
599                     }
600                 }
601
602                 my $data_field_rules = $data_fields_rules->{$tag};
603                 if ($data_field_rules) {
604                     my $subfields_mappings = $data_field_rules->{subfields};
605                     my $wildcard_mappings = $subfields_mappings->{'*'};
606                     foreach my $subfield ($field->subfields()) {
607                         my ($code, $data) = @{$subfield};
608                         my $mappings = $subfields_mappings->{$code} // [];
609                         if ($wildcard_mappings) {
610                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
611                         }
612                         if (@{$mappings}) {
613                             $self->_process_mappings($mappings, $data, $record_document, {
614                                     altscript => $altscript,
615                                     data_source => 'subfield',
616                                     code => $code,
617                                     field => $field
618                                 }
619                             );
620                         }
621                     }
622
623                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
624                     if ($subfields_join_mappings) {
625                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
626                             my $data_field = $field->clone; #copy field to preserve for alt scripts
627                             $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
628                             my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
629                             if ($data) {
630                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
631                                         altscript => $altscript,
632                                         data_source => 'subfields_group',
633                                         codes => $subfields_group,
634                                         field => $field
635                                     }
636                                 );
637                             }
638                         }
639                     }
640                 }
641             }
642         }
643         foreach my $field (keys %{$rules->{defaults}}) {
644             unless (defined $record_document->{$field}) {
645                 $record_document->{$field} = $rules->{defaults}->{$field};
646             }
647         }
648         foreach my $field (@{$rules->{sum}}) {
649             if (defined $record_document->{$field}) {
650                 # TODO: validate numeric? filter?
651                 # TODO: Or should only accept fields without nested values?
652                 # TODO: Quick and dirty, improve if needed
653                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
654             }
655         }
656         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
657         foreach my $field (@{$rules->{isbn}}) {
658             if (defined $record_document->{$field}) {
659                 my @isbns = ();
660                 foreach my $input_isbn (@{$record_document->{$field}}) {
661                     my $isbn = Business::ISBN->new($input_isbn);
662                     if (defined $isbn && $isbn->is_valid) {
663                         my $isbn13 = $isbn->as_isbn13->as_string;
664                         push @isbns, $isbn13;
665                         $isbn13 =~ s/\-//g;
666                         push @isbns, $isbn13;
667
668                         my $isbn10 = $isbn->as_isbn10;
669                         if ($isbn10) {
670                             $isbn10 = $isbn10->as_string;
671                             push @isbns, $isbn10;
672                             $isbn10 =~ s/\-//g;
673                             push @isbns, $isbn10;
674                         }
675                     } else {
676                         push @isbns, $input_isbn;
677                     }
678                 }
679                 $record_document->{$field} = \@isbns;
680             }
681         }
682
683         # Remove duplicate values and collapse sort fields
684         foreach my $field (keys %{$record_document}) {
685             if (ref($record_document->{$field}) eq 'ARRAY') {
686                 @{$record_document->{$field}} = do {
687                     my %seen;
688                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
689                 };
690                 if ($field =~ /__sort$/) {
691                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
692                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
693                 }
694             }
695         }
696
697         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
698         $record->encoding('UTF-8');
699         if ($use_array) {
700             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
701             $record_document->{'marc_format'} = 'ARRAY';
702         } else {
703             my @warnings;
704             {
705                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
706                 local $SIG{__WARN__} = sub {
707                     push @warnings, $_[0];
708                 };
709                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
710             }
711             if (@warnings) {
712                 # Suppress warnings if record length exceeded
713                 unless (substr($record->leader(), 0, 5) eq '99999') {
714                     foreach my $warning (@warnings) {
715                         carp $warning;
716                     }
717                 }
718                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
719                 $record_document->{'marc_format'} = 'MARCXML';
720             }
721             else {
722                 $record_document->{'marc_format'} = 'base64ISO2709';
723             }
724         }
725         push @record_documents, $record_document;
726     }
727     return \@record_documents;
728 }
729
730 =head2 _marc_to_array($record)
731
732     my @fields = _marc_to_array($record)
733
734 Convert a MARC::Record to an array modeled after MARC-in-JSON
735 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
736
737 =over 4
738
739 =item C<$record>
740
741 A MARC::Record object
742
743 =back
744
745 =cut
746
747 sub _marc_to_array {
748     my ($self, $record) = @_;
749
750     my $data = {
751         leader => $record->leader(),
752         fields => []
753     };
754     for my $field ($record->fields()) {
755         my $tag = $field->tag();
756         if ($field->is_control_field()) {
757             push @{$data->{fields}}, {$tag => $field->data()};
758         } else {
759             my $subfields = ();
760             foreach my $subfield ($field->subfields()) {
761                 my ($code, $contents) = @{$subfield};
762                 push @{$subfields}, {$code => $contents};
763             }
764             push @{$data->{fields}}, {
765                 $tag => {
766                     ind1 => $field->indicator(1),
767                     ind2 => $field->indicator(2),
768                     subfields => $subfields
769                 }
770             };
771         }
772     }
773     return $data;
774 }
775
776 =head2 _array_to_marc($data)
777
778     my $record = _array_to_marc($data)
779
780 Convert an array modeled after MARC-in-JSON to a MARC::Record
781
782 =over 4
783
784 =item C<$data>
785
786 An array modeled after MARC-in-JSON
787 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
788
789 =back
790
791 =cut
792
793 sub _array_to_marc {
794     my ($self, $data) = @_;
795
796     my $record = MARC::Record->new();
797
798     $record->leader($data->{leader});
799     for my $field (@{$data->{fields}}) {
800         my $tag = (keys %{$field})[0];
801         $field = $field->{$tag};
802         my $marc_field;
803         if (ref($field) eq 'HASH') {
804             my @subfields;
805             foreach my $subfield (@{$field->{subfields}}) {
806                 my $code = (keys %{$subfield})[0];
807                 push @subfields, $code;
808                 push @subfields, $subfield->{$code};
809             }
810             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
811         } else {
812             $marc_field = MARC::Field->new($tag, $field)
813         }
814         $record->append_fields($marc_field);
815     }
816 ;
817     return $record;
818 }
819
820 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
821
822     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
823
824 Get mappings, an internal data structure later used by
825 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
826 data for a MARC mapping.
827
828 The returned C<$mappings> is not to to be confused with mappings provided by
829 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
830 provided by C<_foreach_mapping> and expands it to this internal data structure.
831 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
832 is then applied to each MARC target (leader, control field data, subfield or
833 joined subfields) and integrated into the mapping rules data structure used in
834 C<marc_records_to_documents> to transform MARC records into Elasticsearch
835 documents.
836
837 =over 4
838
839 =item C<$facet>
840
841 Boolean indicating whether to create a facet field for this mapping.
842
843 =item C<$suggestible>
844
845 Boolean indicating whether to create a suggestion field for this mapping.
846
847 =item C<$sort>
848
849 Boolean indicating whether to create a sort field for this mapping.
850
851 =item C<$search>
852
853 Boolean indicating whether to create a search field for this mapping.
854
855 =item C<$target_name>
856
857 Elasticsearch document target field name.
858
859 =item C<$target_type>
860
861 Elasticsearch document target field type.
862
863 =item C<$range>
864
865 An optional range as a string in the format "<START>-<END>" or "<START>",
866 where "<START>" and "<END>" are integers specifying a range that will be used
867 for extracting a substring from MARC data as Elasticsearch field target value.
868
869 The first character position is "0", and the range is inclusive,
870 so "0-2" means the first three characters of MARC data.
871
872 If only "<START>" is provided only one character at position "<START>" will
873 be extracted.
874
875 =back
876
877 =cut
878
879 sub _field_mappings {
880     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
881     my %mapping_defaults = ();
882     my @mappings;
883
884     my $substr_args = undef;
885     if (defined $range) {
886         # TODO: use value_callback instead?
887         my ($start, $end) = map(int, split /-/, $range, 2);
888         $substr_args = [$start];
889         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
890     }
891     my $default_options = {};
892     if ($substr_args) {
893         $default_options->{substr} = $substr_args;
894     }
895
896     # TODO: Should probably have per type value callback/hook
897     # but hard code for now
898     if ($target_type eq 'boolean') {
899         $default_options->{value_callbacks} //= [];
900         push @{$default_options->{value_callbacks}}, sub {
901             my ($value) = @_;
902             # Trim whitespace at both ends
903             $value =~ s/^\s+|\s+$//g;
904             return $value ? 'true' : 'false';
905         };
906     }
907     elsif ($target_type eq 'year') {
908         $default_options->{value_callbacks} //= [];
909         # Only accept years containing digits and "u"
910         push @{$default_options->{value_callbacks}}, sub {
911             my ($value) = @_;
912             # Replace "u" with "0" for sorting
913             return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
914         };
915     }
916
917     if ($search) {
918         my $mapping = [$target_name, $default_options];
919         push @mappings, $mapping;
920     }
921
922     my @suffixes = ();
923     push @suffixes, 'facet' if $facet;
924     push @suffixes, 'suggestion' if $suggestible;
925     push @suffixes, 'sort' if !defined $sort || $sort;
926
927     foreach my $suffix (@suffixes) {
928         my $mapping = ["${target_name}__$suffix"];
929         # TODO: Hack, fix later in less hideous manner
930         if ($suffix eq 'suggestion') {
931             push @{$mapping}, {%{$default_options}, property => 'input'};
932         }
933         else {
934             # Important! Make shallow clone, or we end up with the same hashref
935             # shared by all mappings
936             push @{$mapping}, {%{$default_options}};
937         }
938         push @mappings, $mapping;
939     }
940     return @mappings;
941 };
942
943 =head2 _get_marc_mapping_rules
944
945     my $mapping_rules = $self->_get_marc_mapping_rules()
946
947 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
948
949 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
950 each call to C<MARC::Record>->field) we create an optimized structure of mapping
951 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
952
953 We can then iterate through all MARC fields for each record and apply all relevant
954 rules once per fields instead of retreiving fields multiple times for each mapping rule
955 which is terribly slow.
956
957 =cut
958
959 # TODO: This structure can be used for processing multiple MARC::Records so is currently
960 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
961 # memory cache which it is currently not. The performance gain of caching
962 # would probably be marginal, but to do this could be a further improvement.
963
964 sub _get_marc_mapping_rules {
965     my ($self) = @_;
966     my $marcflavour = lc C4::Context->preference('marcflavour');
967     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
968     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
969     my $rules = {
970         'leader' => [],
971         'control_fields' => {},
972         'data_fields' => {},
973         'sum' => [],
974         'isbn' => [],
975         'defaults' => {}
976     };
977
978     $self->_foreach_mapping(sub {
979         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
980         return if $marc_type ne $marcflavour;
981
982         if ($type eq 'sum') {
983             push @{$rules->{sum}}, $name;
984             push @{$rules->{sum}}, $name."__sort" if $sort;
985         }
986         elsif ($type eq 'isbn') {
987             push @{$rules->{isbn}}, $name;
988         }
989         elsif ($type eq 'boolean') {
990             # boolean gets special handling, if value doesn't exist for a field,
991             # it is set to false
992             $rules->{defaults}->{$name} = 'false';
993         }
994
995         if ($marc_field =~ $field_spec_regexp) {
996             my $field_tag = $1;
997
998             my @subfields;
999             my @subfield_groups;
1000             # Parse and separate subfields form subfield groups
1001             if (defined $2) {
1002                 my $subfield_group = '';
1003                 my $open_group = 0;
1004
1005                 foreach my $token (split //, $2) {
1006                     if ($token eq "(") {
1007                         if ($open_group) {
1008                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1009                                 "Unmatched opening parenthesis for $marc_field"
1010                             );
1011                         }
1012                         else {
1013                             $open_group = 1;
1014                         }
1015                     }
1016                     elsif ($token eq ")") {
1017                         if ($open_group) {
1018                             if ($subfield_group) {
1019                                 push @subfield_groups, $subfield_group;
1020                                 $subfield_group = '';
1021                             }
1022                             $open_group = 0;
1023                         }
1024                         else {
1025                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1026                                 "Unmatched closing parenthesis for $marc_field"
1027                             );
1028                         }
1029                     }
1030                     elsif ($open_group) {
1031                         $subfield_group .= $token;
1032                     }
1033                     else {
1034                         push @subfields, $token;
1035                     }
1036                 }
1037             }
1038             else {
1039                 push @subfields, '*';
1040             }
1041
1042             my $range = defined $3 ? $3 : undef;
1043             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1044             if ($field_tag < 10) {
1045                 $rules->{control_fields}->{$field_tag} //= [];
1046                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1047             }
1048             else {
1049                 $rules->{data_fields}->{$field_tag} //= {};
1050                 foreach my $subfield (@subfields) {
1051                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1052                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1053                 }
1054                 foreach my $subfield_group (@subfield_groups) {
1055                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1056                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1057                 }
1058             }
1059         }
1060         elsif ($marc_field =~ $leader_regexp) {
1061             my $range = defined $1 ? $1 : undef;
1062             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1063             push @{$rules->{leader}}, @mappings;
1064         }
1065         else {
1066             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1067                 "Invalid MARC field expression: $marc_field"
1068             );
1069         }
1070     });
1071
1072     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1073     if ($marcflavour eq 'marc21') {
1074         # Nonfiling characters processing for sort fields
1075         my %title_fields;
1076         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1077             # Format is: nonfiling characters indicator => field names list
1078             %title_fields = (
1079                 1 => [130, 630, 730, 740],
1080                 2 => [222, 240, 242, 243, 245, 440, 830]
1081             );
1082         }
1083         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1084             %title_fields = (
1085                 1 => [730],
1086                 2 => [130, 430, 530]
1087             );
1088         }
1089         foreach my $indicator (keys %title_fields) {
1090             foreach my $field_tag (@{$title_fields{$indicator}}) {
1091                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1092                 foreach my $mapping (@{$mappings}) {
1093                     if ($mapping->[0] =~ /__sort$/) {
1094                         # Mark this as to be processed for nonfiling characters indicator
1095                         # later on in _process_mappings
1096                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1097                     }
1098                 }
1099             }
1100         }
1101     }
1102
1103     return $rules;
1104 }
1105
1106 =head2 _foreach_mapping
1107
1108     $self->_foreach_mapping(
1109         sub {
1110             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1111                 $marc_field )
1112               = @_;
1113             return unless $marc_type eq 'marc21';
1114             print "Data comes from: " . $marc_field . "\n";
1115         }
1116     );
1117
1118 This allows you to apply a function to each entry in the elasticsearch mappings
1119 table, in order to build the mappings for whatever is needed.
1120
1121 In the provided function, the files are:
1122
1123 =over 4
1124
1125 =item C<$name>
1126
1127 The field name for elasticsearch (corresponds to the 'mapping' column in the
1128 database.
1129
1130 =item C<$type>
1131
1132 The type for this value, e.g. 'string'.
1133
1134 =item C<$facet>
1135
1136 True if this value should be facetised. This only really makes sense if the
1137 field is understood by the facet processing code anyway.
1138
1139 =item C<$sort>
1140
1141 True if this is a field that a) needs special sort handling, and b) if it
1142 should be sorted on. False if a) but not b). Undef if not a). This allows,
1143 for example, author to be sorted on but not everything marked with "author"
1144 to be included in that sort.
1145
1146 =item C<$marc_type>
1147
1148 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1149 'unimarc', 'normarc'.
1150
1151 =item C<$marc_field>
1152
1153 A string that describes the MARC field that contains the data to extract.
1154
1155 =back
1156
1157 =cut
1158
1159 sub _foreach_mapping {
1160     my ( $self, $sub ) = @_;
1161
1162     # TODO use a caching framework here
1163     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1164         {
1165             'search_marc_map.index_name' => $self->index,
1166         },
1167         {   join => { search_marc_to_fields => 'search_marc_map' },
1168             '+select' => [
1169                 'search_marc_to_fields.facet',
1170                 'search_marc_to_fields.suggestible',
1171                 'search_marc_to_fields.sort',
1172                 'search_marc_to_fields.search',
1173                 'search_marc_map.marc_type',
1174                 'search_marc_map.marc_field',
1175             ],
1176             '+as'     => [
1177                 'facet',
1178                 'suggestible',
1179                 'sort',
1180                 'search',
1181                 'marc_type',
1182                 'marc_field',
1183             ],
1184         }
1185     );
1186
1187     while ( my $search_field = $search_fields->next ) {
1188         $sub->(
1189             # Force lower case on indexed field names for case insensitive
1190             # field name searches
1191             lc($search_field->name),
1192             $search_field->type,
1193             $search_field->get_column('facet'),
1194             $search_field->get_column('suggestible'),
1195             $search_field->get_column('sort'),
1196             $search_field->get_column('search'),
1197             $search_field->get_column('marc_type'),
1198             $search_field->get_column('marc_field'),
1199         );
1200     }
1201 }
1202
1203 =head2 process_error
1204
1205     die process_error($@);
1206
1207 This parses an Elasticsearch error message and produces a human-readable
1208 result from it. This result is probably missing all the useful information
1209 that you might want in diagnosing an issue, so the warning is also logged.
1210
1211 Note that currently the resulting message is not internationalised. This
1212 will happen eventually by some method or other.
1213
1214 =cut
1215
1216 sub process_error {
1217     my ($self, $msg) = @_;
1218
1219     warn $msg; # simple logging
1220
1221     # This is super-primitive
1222     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1223
1224     return "Unable to perform your search. Please try again.\n";
1225 }
1226
1227 =head2 _read_configuration
1228
1229     my $conf = _read_configuration();
1230
1231 Reads the I<configuration file> and returns a hash structure with the
1232 configuration information. It raises an exception if mandatory entries
1233 are missing.
1234
1235 The hashref structure has the following form:
1236
1237     {
1238         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1239         'index_name' => 'koha_instance',
1240     }
1241
1242 This is configured by the following in the C<config> block in koha-conf.xml:
1243
1244     <elasticsearch>
1245         <server>127.0.0.1:9200</server>
1246         <server>anotherserver:9200</server>
1247         <index_name>koha_instance</index_name>
1248     </elasticsearch>
1249
1250 =cut
1251
1252 sub _read_configuration {
1253
1254     my $configuration;
1255
1256     my $conf = C4::Context->config('elasticsearch');
1257     unless ( defined $conf ) {
1258         Koha::Exceptions::Config::MissingEntry->throw(
1259             "Missing <elasticsearch> entry in koha-conf.xml"
1260         );
1261     }
1262
1263     if ( $conf && $conf->{server} ) {
1264         my $nodes = $conf->{server};
1265         if ( ref($nodes) eq 'ARRAY' ) {
1266             $configuration->{nodes} = $nodes;
1267         }
1268         else {
1269             $configuration->{nodes} = [$nodes];
1270         }
1271     }
1272     else {
1273         Koha::Exceptions::Config::MissingEntry->throw(
1274             "Missing <elasticsearch>/<server> entry in koha-conf.xml"
1275         );
1276     }
1277
1278     if ( defined $conf->{index_name} ) {
1279         $configuration->{index_name} = $conf->{index_name};
1280     }
1281     else {
1282         Koha::Exceptions::Config::MissingEntry->throw(
1283             "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
1284         );
1285     }
1286
1287     $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1288
1289     $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1290
1291     return $configuration;
1292 }
1293
1294 =head2 get_facetable_fields
1295
1296 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1297
1298 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1299
1300 =cut
1301
1302 sub get_facetable_fields {
1303     my ($self) = @_;
1304
1305     # These should correspond to the ES field names, as opposed to the CCL
1306     # things that zebra uses.
1307     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1308     my @faceted_fields = Koha::SearchFields->search(
1309         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1310     );
1311     my @not_faceted_fields = Koha::SearchFields->search(
1312         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1313     );
1314     # This could certainly be improved
1315     return ( @faceted_fields, @not_faceted_fields );
1316 }
1317
1318 =head2 clear_search_fields_cache
1319
1320 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1321
1322 Clear cached values for ES search fields
1323
1324 =cut
1325
1326 sub clear_search_fields_cache {
1327
1328     my $cache = Koha::Caches->get_instance();
1329     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1330     $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1331     $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1332     $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1333
1334 }
1335
1336 1;
1337
1338 __END__
1339
1340 =head1 AUTHOR
1341
1342 =over 4
1343
1344 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1345
1346 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1347
1348 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1349
1350 =back
1351
1352 =cut