85574cefc0609296c6442b77b346ad89b329dc7f
[srvgit] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use C4::Heading;
30
31 use Carp;
32 use Clone qw(clone);
33 use JSON;
34 use Modern::Perl;
35 use Readonly;
36 use Search::Elasticsearch;
37 use Try::Tiny;
38 use YAML::Syck;
39
40 use List::Util qw( sum0 reduce );
41 use MARC::File::XML;
42 use MIME::Base64;
43 use Encode qw(encode);
44 use Business::ISBN;
45
46 __PACKAGE__->mk_ro_accessors(qw( index ));
47 __PACKAGE__->mk_accessors(qw( sort_fields ));
48
49 # Constants to refer to the standard index names
50 Readonly our $BIBLIOS_INDEX     => 'biblios';
51 Readonly our $AUTHORITIES_INDEX => 'authorities';
52
53 =head1 NAME
54
55 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
56
57 =head1 ACCESSORS
58
59 =over 4
60
61 =item index
62
63 The name of the index to use, generally 'biblios' or 'authorities'.
64
65 =back
66
67 =head1 FUNCTIONS
68
69 =cut
70
71 sub new {
72     my $class = shift @_;
73     my $self = $class->SUPER::new(@_);
74     # Check for a valid index
75     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
76     return $self;
77 }
78
79 =head2 get_elasticsearch
80
81     my $elasticsearch_client = $self->get_elasticsearch();
82
83 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
84 instance level and will be reused if method is called multiple times.
85
86 =cut
87
88 sub get_elasticsearch {
89     my $self = shift @_;
90     unless (defined $self->{elasticsearch}) {
91         my $conf = $self->get_elasticsearch_params();
92         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
93     }
94     return $self->{elasticsearch};
95 }
96
97 =head2 get_elasticsearch_params
98
99     my $params = $self->get_elasticsearch_params();
100
101 This provides a hashref that contains the parameters for connecting to the
102 ElasicSearch servers, in the form:
103
104     {
105         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
106         'index_name' => 'koha_instance_index',
107     }
108
109 This is configured by the following in the C<config> block in koha-conf.xml:
110
111     <elasticsearch>
112         <server>127.0.0.1:9200</server>
113         <server>anotherserver:9200</server>
114         <index_name>koha_instance</index_name>
115     </elasticsearch>
116
117 =cut
118
119 sub get_elasticsearch_params {
120     my ($self) = @_;
121
122     # Copy the hash so that we're not modifying the original
123     my $conf = C4::Context->config('elasticsearch');
124     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
125     my $es = { %{ $conf } };
126
127     # Helpfully, the multiple server lines end up in an array for us anyway
128     # if there are multiple ones, but not if there's only one.
129     my $server = $es->{server};
130     delete $es->{server};
131     if ( ref($server) eq 'ARRAY' ) {
132
133         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
134         $es->{nodes} = $server;
135     }
136     elsif ($server) {
137         $es->{nodes} = [$server];
138     }
139     else {
140         die "No elasticsearch servers were specified in koha-conf.xml.\n";
141     }
142     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
143       if ( !$es->{index_name} );
144     # Append the name of this particular index to our namespace
145     $es->{index_name} .= '_' . $self->index;
146
147     $es->{key_prefix} = 'es_';
148     $es->{cxn_pool} //= 'Static';
149     $es->{request_timeout} //= 60;
150
151     return $es;
152 }
153
154 =head2 get_elasticsearch_settings
155
156     my $settings = $self->get_elasticsearch_settings();
157
158 This provides the settings provided to Elasticsearch when an index is created.
159 These can do things like define tokenization methods.
160
161 A hashref containing the settings is returned.
162
163 =cut
164
165 sub get_elasticsearch_settings {
166     my ($self) = @_;
167
168     # Use state to speed up repeated calls
169     state $settings = undef;
170     if (!defined $settings) {
171         my $config_file = C4::Context->config('elasticsearch_index_config');
172         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
173         $settings = LoadFile( $config_file );
174     }
175
176     return $settings;
177 }
178
179 =head2 get_elasticsearch_mappings
180
181     my $mappings = $self->get_elasticsearch_mappings();
182
183 This provides the mappings that get passed to Elasticsearch when an index is
184 created.
185
186 =cut
187
188 sub get_elasticsearch_mappings {
189     my ($self) = @_;
190
191     # Use state to speed up repeated calls
192     state %all_mappings;
193     state %sort_fields;
194
195     if (!defined $all_mappings{$self->index}) {
196         $sort_fields{$self->index} = {};
197         # Clone the general mapping to break ties with the original hash
198         my $mappings = {
199             data => clone(_get_elasticsearch_field_config('general', ''))
200         };
201         my $marcflavour = lc C4::Context->preference('marcflavour');
202         $self->_foreach_mapping(
203             sub {
204                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
205                 return if $marc_type ne $marcflavour;
206                 # TODO if this gets any sort of complexity to it, it should
207                 # be broken out into its own function.
208
209                 # TODO be aware of date formats, but this requires pre-parsing
210                 # as ES will simply reject anything with an invalid date.
211                 my $es_type = 'text';
212                 if ($type eq 'boolean') {
213                     $es_type = 'boolean';
214                 } elsif ($type eq 'number' || $type eq 'sum') {
215                     $es_type = 'integer';
216                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
217                     $es_type = 'stdno';
218                 }
219
220                 if ($search) {
221                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
222                 }
223
224                 if ($facet) {
225                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
226                 }
227                 if ($suggestible) {
228                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
229                 }
230                 # Sort is a bit special as it can be true, false, undef.
231                 # We care about "true" or "undef",
232                 # "undef" means to do the default thing, which is make it sortable.
233                 if (!defined $sort || $sort) {
234                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
235                     $sort_fields{$self->index}{$name} = 1;
236                 }
237             }
238         );
239         $all_mappings{$self->index} = $mappings;
240     }
241     $self->sort_fields(\%{$sort_fields{$self->index}});
242
243     return $all_mappings{$self->index};
244 }
245
246 =head2 _get_elasticsearch_field_config
247
248 Get the Elasticsearch field config for the given purpose and data type.
249
250 $mapping = _get_elasticsearch_field_config('search', 'text');
251
252 =cut
253
254 sub _get_elasticsearch_field_config {
255
256     my ( $purpose, $type ) = @_;
257
258     # Use state to speed up repeated calls
259     state $settings = undef;
260     if (!defined $settings) {
261         my $config_file = C4::Context->config('elasticsearch_field_config');
262         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
263         $settings = LoadFile( $config_file );
264     }
265
266     if (!defined $settings->{$purpose}) {
267         die "Field purpose $purpose not defined in field config";
268     }
269     if ($type eq '') {
270         return $settings->{$purpose};
271     }
272     if (defined $settings->{$purpose}{$type}) {
273         return $settings->{$purpose}{$type};
274     }
275     if (defined $settings->{$purpose}{'default'}) {
276         return $settings->{$purpose}{'default'};
277     }
278     return;
279 }
280
281 =head2 _load_elasticsearch_mappings
282
283 Load Elasticsearch mappings in the format of mappings.yaml.
284
285 $indexes = _load_elasticsearch_mappings();
286
287 =cut
288
289 sub _load_elasticsearch_mappings {
290     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
291     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
292     return LoadFile( $mappings_yaml );
293 }
294
295 sub reset_elasticsearch_mappings {
296     my ( $self ) = @_;
297     my $indexes = $self->_load_elasticsearch_mappings();
298
299     Koha::SearchMarcMaps->delete;
300     Koha::SearchFields->delete;
301
302     while ( my ( $index_name, $fields ) = each %$indexes ) {
303         while ( my ( $field_name, $data ) = each %$fields ) {
304
305             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
306
307             # Set default values
308             $sf_params{staff_client} //= 1;
309             $sf_params{opac} //= 1;
310
311             $sf_params{name} = $field_name;
312
313             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
314
315             my $mappings = $data->{mappings};
316             for my $mapping ( @$mappings ) {
317                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
318                     index_name => $index_name,
319                     marc_type => $mapping->{marc_type},
320                     marc_field => $mapping->{marc_field}
321                 });
322                 $search_field->add_to_search_marc_maps($marc_field, {
323                     facet => $mapping->{facet} || 0,
324                     suggestible => $mapping->{suggestible} || 0,
325                     sort => $mapping->{sort},
326                     search => $mapping->{search} // 1
327                 });
328             }
329         }
330     }
331 }
332
333 # This overrides the accessor provided by Class::Accessor so that if
334 # sort_fields isn't set, then it'll generate it.
335 sub sort_fields {
336     my $self = shift;
337     if (@_) {
338         $self->_sort_fields_accessor(@_);
339         return;
340     }
341     my $val = $self->_sort_fields_accessor();
342     return $val if $val;
343
344     # This will populate the accessor as a side effect
345     $self->get_elasticsearch_mappings();
346     return $self->_sort_fields_accessor();
347 }
348
349 =head2 _process_mappings($mappings, $data, $record_document, $altscript)
350
351     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
352
353 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
354 Since we group all mappings by MARC field targets C<$mappings> will contain
355 all targets for C<$data> and thus we need to fetch the MARC field only once.
356 C<$mappings> will be applied to C<$record_document> and new field values added.
357 The method has no return value.
358
359 =over 4
360
361 =item C<$mappings>
362
363 Arrayref of mappings containing arrayrefs in the format
364 [C<$target>, C<$options>] where C<$target> is the name of the target field and
365 C<$options> is a hashref containing processing directives for this particular
366 mapping.
367
368 =item C<$data>
369
370 The source data from a MARC record field.
371
372 =item C<$record_document>
373
374 Hashref representing the Elasticsearch document on which mappings should be
375 applied.
376
377 =item C<$altscript>
378
379 A boolean value indicating whether an alternate script presentation is being
380 processed.
381
382 =back
383
384 =cut
385
386 sub _process_mappings {
387     my ($_self, $mappings, $data, $record_document, $altscript) = @_;
388     foreach my $mapping (@{$mappings}) {
389         my ($target, $options) = @{$mapping};
390
391         # Don't process sort fields for alternate scripts
392         my $sort = $target =~ /__sort$/;
393         if ($sort && $altscript) {
394             next;
395         }
396
397         # Copy (scalar) data since can have multiple targets
398         # with differing options for (possibly) mutating data
399         # so need a different copy for each
400         my $_data = $data;
401         $record_document->{$target} //= [];
402         if (defined $options->{substr}) {
403             my ($start, $length) = @{$options->{substr}};
404             $_data = length($data) > $start ? substr $data, $start, $length : '';
405         }
406         if (defined $options->{value_callbacks}) {
407             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
408         }
409         if (defined $options->{property}) {
410             $_data = {
411                 $options->{property} => $_data
412             }
413         }
414         push @{$record_document->{$target}}, $_data;
415     }
416 }
417
418 =head2 marc_records_to_documents($marc_records)
419
420     my $record_documents = $self->marc_records_to_documents($marc_records);
421
422 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
423
424 Returns array of hash references, representing Elasticsearch documents,
425 acceptable as body payload in C<Search::Elasticsearch> requests.
426
427 =over 4
428
429 =item C<$marc_documents>
430
431 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
432
433 =back
434
435 =cut
436
437 sub marc_records_to_documents {
438     my ($self, $records) = @_;
439     my $rules = $self->_get_marc_mapping_rules();
440     my $control_fields_rules = $rules->{control_fields};
441     my $data_fields_rules = $rules->{data_fields};
442     my $marcflavour = lc C4::Context->preference('marcflavour');
443     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
444
445     my @record_documents;
446
447     foreach my $record (@{$records}) {
448         my $record_document = {};
449         my $mappings = $rules->{leader};
450         if ($mappings) {
451             $self->_process_mappings($mappings, $record->leader(), $record_document, 0);
452         }
453         foreach my $field ($record->fields()) {
454             if ($field->is_control_field()) {
455                 my $mappings = $control_fields_rules->{$field->tag()};
456                 if ($mappings) {
457                     $self->_process_mappings($mappings, $field->data(), $record_document, 0);
458                 }
459             }
460             else {
461                 my $tag = $field->tag();
462                 # Handle alternate scripts in MARC 21
463                 my $altscript = 0;
464                 if ($marcflavour eq 'marc21' && $tag eq '880') {
465                     my $sub6 = $field->subfield('6');
466                     if ($sub6 =~ /^(...)-\d+/) {
467                         $tag = $1;
468                         $altscript = 1;
469                     }
470                 }
471
472                 my $data_field_rules = $data_fields_rules->{$tag};
473                 if ($data_field_rules) {
474                     my $subfields_mappings = $data_field_rules->{subfields};
475                     my $wildcard_mappings = $subfields_mappings->{'*'};
476                     foreach my $subfield ($field->subfields()) {
477                         my ($code, $data) = @{$subfield};
478                         my $mappings = $subfields_mappings->{$code} // [];
479                         if ($wildcard_mappings) {
480                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
481                         }
482                         if (@{$mappings}) {
483                             $self->_process_mappings($mappings, $data, $record_document, $altscript);
484                         }
485                         if ( defined @{$mappings}[0] && grep /match-heading/, @{@{$mappings}[0]} ){
486                             # Used by the authority linker the match-heading field requires a specific syntax
487                             # that is specified in C4/Heading
488                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
489                             next unless $heading;
490                             push @{$record_document->{'match-heading'}}, $heading->search_form;
491                         }
492                     }
493
494                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
495                     if ($subfields_join_mappings) {
496                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
497                             # Map each subfield to values, remove empty values, join with space
498                             my $data = join(
499                                 ' ',
500                                 grep(
501                                     $_,
502                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
503                                 )
504                             );
505                             if ($data) {
506                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, $altscript);
507                             }
508                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
509                                 # Used by the authority linker the match-heading field requires a specific syntax
510                                 # that is specified in C4/Heading
511                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
512                                 next unless $heading;
513                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
514                             }
515                         }
516                     }
517                 }
518             }
519         }
520         foreach my $field (keys %{$rules->{defaults}}) {
521             unless (defined $record_document->{$field}) {
522                 $record_document->{$field} = $rules->{defaults}->{$field};
523             }
524         }
525         foreach my $field (@{$rules->{sum}}) {
526             if (defined $record_document->{$field}) {
527                 # TODO: validate numeric? filter?
528                 # TODO: Or should only accept fields without nested values?
529                 # TODO: Quick and dirty, improve if needed
530                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
531             }
532         }
533         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
534         foreach my $field (@{$rules->{isbn}}) {
535             if (defined $record_document->{$field}) {
536                 my @isbns = ();
537                 foreach my $input_isbn (@{$record_document->{$field}}) {
538                     my $isbn = Business::ISBN->new($input_isbn);
539                     if (defined $isbn && $isbn->is_valid) {
540                         my $isbn13 = $isbn->as_isbn13->as_string;
541                         push @isbns, $isbn13;
542                         $isbn13 =~ s/\-//g;
543                         push @isbns, $isbn13;
544
545                         my $isbn10 = $isbn->as_isbn10;
546                         if ($isbn10) {
547                             $isbn10 = $isbn10->as_string;
548                             push @isbns, $isbn10;
549                             $isbn10 =~ s/\-//g;
550                             push @isbns, $isbn10;
551                         }
552                     } else {
553                         push @isbns, $input_isbn;
554                     }
555                 }
556                 $record_document->{$field} = \@isbns;
557             }
558         }
559
560         # Remove duplicate values and collapse sort fields
561         foreach my $field (keys %{$record_document}) {
562             if (ref($record_document->{$field}) eq 'ARRAY') {
563                 @{$record_document->{$field}} = do {
564                     my %seen;
565                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
566                 };
567                 if ($field =~ /__sort$/) {
568                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
569                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
570                 }
571             }
572         }
573
574         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
575         $record->encoding('UTF-8');
576         if ($use_array) {
577             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
578             $record_document->{'marc_format'} = 'ARRAY';
579         } else {
580             my @warnings;
581             {
582                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
583                 local $SIG{__WARN__} = sub {
584                     push @warnings, $_[0];
585                 };
586                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
587             }
588             if (@warnings) {
589                 # Suppress warnings if record length exceeded
590                 unless (substr($record->leader(), 0, 5) eq '99999') {
591                     foreach my $warning (@warnings) {
592                         carp $warning;
593                     }
594                 }
595                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
596                 $record_document->{'marc_format'} = 'MARCXML';
597             }
598             else {
599                 $record_document->{'marc_format'} = 'base64ISO2709';
600             }
601         }
602         push @record_documents, $record_document;
603     }
604     return \@record_documents;
605 }
606
607 =head2 _marc_to_array($record)
608
609     my @fields = _marc_to_array($record)
610
611 Convert a MARC::Record to an array modeled after MARC-in-JSON
612 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
613
614 =over 4
615
616 =item C<$record>
617
618 A MARC::Record object
619
620 =back
621
622 =cut
623
624 sub _marc_to_array {
625     my ($self, $record) = @_;
626
627     my $data = {
628         leader => $record->leader(),
629         fields => []
630     };
631     for my $field ($record->fields()) {
632         my $tag = $field->tag();
633         if ($field->is_control_field()) {
634             push @{$data->{fields}}, {$tag => $field->data()};
635         } else {
636             my $subfields = ();
637             foreach my $subfield ($field->subfields()) {
638                 my ($code, $contents) = @{$subfield};
639                 push @{$subfields}, {$code => $contents};
640             }
641             push @{$data->{fields}}, {
642                 $tag => {
643                     ind1 => $field->indicator(1),
644                     ind2 => $field->indicator(2),
645                     subfields => $subfields
646                 }
647             };
648         }
649     }
650     return $data;
651 }
652
653 =head2 _array_to_marc($data)
654
655     my $record = _array_to_marc($data)
656
657 Convert an array modeled after MARC-in-JSON to a MARC::Record
658
659 =over 4
660
661 =item C<$data>
662
663 An array modeled after MARC-in-JSON
664 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
665
666 =back
667
668 =cut
669
670 sub _array_to_marc {
671     my ($self, $data) = @_;
672
673     my $record = MARC::Record->new();
674
675     $record->leader($data->{leader});
676     for my $field (@{$data->{fields}}) {
677         my $tag = (keys %{$field})[0];
678         $field = $field->{$tag};
679         my $marc_field;
680         if (ref($field) eq 'HASH') {
681             my @subfields;
682             foreach my $subfield (@{$field->{subfields}}) {
683                 my $code = (keys %{$subfield})[0];
684                 push @subfields, $code;
685                 push @subfields, $subfield->{$code};
686             }
687             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
688         } else {
689             $marc_field = MARC::Field->new($tag, $field)
690         }
691         $record->append_fields($marc_field);
692     }
693 ;
694     return $record;
695 }
696
697 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
698
699     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
700
701 Get mappings, an internal data structure later used by
702 L<_process_mappings($mappings, $data, $record_document, $altscript)> to process MARC target
703 data for a MARC mapping.
704
705 The returned C<$mappings> is not to to be confused with mappings provided by
706 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
707 provided by C<_foreach_mapping> and expands it to this internal data structure.
708 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
709 is then applied to each MARC target (leader, control field data, subfield or
710 joined subfields) and integrated into the mapping rules data structure used in
711 C<marc_records_to_documents> to transform MARC records into Elasticsearch
712 documents.
713
714 =over 4
715
716 =item C<$facet>
717
718 Boolean indicating whether to create a facet field for this mapping.
719
720 =item C<$suggestible>
721
722 Boolean indicating whether to create a suggestion field for this mapping.
723
724 =item C<$sort>
725
726 Boolean indicating whether to create a sort field for this mapping.
727
728 =item C<$search>
729
730 Boolean indicating whether to create a search field for this mapping.
731
732 =item C<$target_name>
733
734 Elasticsearch document target field name.
735
736 =item C<$target_type>
737
738 Elasticsearch document target field type.
739
740 =item C<$range>
741
742 An optional range as a string in the format "<START>-<END>" or "<START>",
743 where "<START>" and "<END>" are integers specifying a range that will be used
744 for extracting a substring from MARC data as Elasticsearch field target value.
745
746 The first character position is "0", and the range is inclusive,
747 so "0-2" means the first three characters of MARC data.
748
749 If only "<START>" is provided only one character at position "<START>" will
750 be extracted.
751
752 =back
753
754 =cut
755
756 sub _field_mappings {
757     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
758     my %mapping_defaults = ();
759     my @mappings;
760
761     my $substr_args = undef;
762     if (defined $range) {
763         # TODO: use value_callback instead?
764         my ($start, $end) = map(int, split /-/, $range, 2);
765         $substr_args = [$start];
766         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
767     }
768     my $default_options = {};
769     if ($substr_args) {
770         $default_options->{substr} = $substr_args;
771     }
772
773     # TODO: Should probably have per type value callback/hook
774     # but hard code for now
775     if ($target_type eq 'boolean') {
776         $default_options->{value_callbacks} //= [];
777         push @{$default_options->{value_callbacks}}, sub {
778             my ($value) = @_;
779             # Trim whitespace at both ends
780             $value =~ s/^\s+|\s+$//g;
781             return $value ? 'true' : 'false';
782         };
783     }
784
785     if ($search) {
786         my $mapping = [$target_name, $default_options];
787         push @mappings, $mapping;
788     }
789
790     my @suffixes = ();
791     push @suffixes, 'facet' if $facet;
792     push @suffixes, 'suggestion' if $suggestible;
793     push @suffixes, 'sort' if !defined $sort || $sort;
794
795     foreach my $suffix (@suffixes) {
796         my $mapping = ["${target_name}__$suffix"];
797         # TODO: Hack, fix later in less hideous manner
798         if ($suffix eq 'suggestion') {
799             push @{$mapping}, {%{$default_options}, property => 'input'};
800         }
801         else {
802             push @{$mapping}, $default_options;
803         }
804         push @mappings, $mapping;
805     }
806     return @mappings;
807 };
808
809 =head2 _get_marc_mapping_rules
810
811     my $mapping_rules = $self->_get_marc_mapping_rules()
812
813 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
814
815 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
816 each call to C<MARC::Record>->field) we create an optimized structure of mapping
817 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
818
819 We can then iterate through all MARC fields for each record and apply all relevant
820 rules once per fields instead of retreiving fields multiple times for each mapping rule
821 which is terribly slow.
822
823 =cut
824
825 # TODO: This structure can be used for processing multiple MARC::Records so is currently
826 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
827 # memory cache which it is currently not. The performance gain of caching
828 # would probably be marginal, but to do this could be a further improvement.
829
830 sub _get_marc_mapping_rules {
831     my ($self) = @_;
832     my $marcflavour = lc C4::Context->preference('marcflavour');
833     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
834     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
835     my $rules = {
836         'leader' => [],
837         'control_fields' => {},
838         'data_fields' => {},
839         'sum' => [],
840         'isbn' => [],
841         'defaults' => {}
842     };
843
844     $self->_foreach_mapping(sub {
845         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
846         return if $marc_type ne $marcflavour;
847
848         if ($type eq 'sum') {
849             push @{$rules->{sum}}, $name;
850             push @{$rules->{sum}}, $name."__sort" if $sort;
851         }
852         elsif ($type eq 'isbn') {
853             push @{$rules->{isbn}}, $name;
854         }
855         elsif ($type eq 'boolean') {
856             # boolean gets special handling, if value doesn't exist for a field,
857             # it is set to false
858             $rules->{defaults}->{$name} = 'false';
859         }
860
861         if ($marc_field =~ $field_spec_regexp) {
862             my $field_tag = $1;
863
864             my @subfields;
865             my @subfield_groups;
866             # Parse and separate subfields form subfield groups
867             if (defined $2) {
868                 my $subfield_group = '';
869                 my $open_group = 0;
870
871                 foreach my $token (split //, $2) {
872                     if ($token eq "(") {
873                         if ($open_group) {
874                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
875                                 "Unmatched opening parenthesis for $marc_field"
876                             );
877                         }
878                         else {
879                             $open_group = 1;
880                         }
881                     }
882                     elsif ($token eq ")") {
883                         if ($open_group) {
884                             if ($subfield_group) {
885                                 push @subfield_groups, $subfield_group;
886                                 $subfield_group = '';
887                             }
888                             $open_group = 0;
889                         }
890                         else {
891                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
892                                 "Unmatched closing parenthesis for $marc_field"
893                             );
894                         }
895                     }
896                     elsif ($open_group) {
897                         $subfield_group .= $token;
898                     }
899                     else {
900                         push @subfields, $token;
901                     }
902                 }
903             }
904             else {
905                 push @subfields, '*';
906             }
907
908             my $range = defined $3 ? $3 : undef;
909             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
910             if ($field_tag < 10) {
911                 $rules->{control_fields}->{$field_tag} //= [];
912                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
913             }
914             else {
915                 $rules->{data_fields}->{$field_tag} //= {};
916                 foreach my $subfield (@subfields) {
917                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
918                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
919                 }
920                 foreach my $subfield_group (@subfield_groups) {
921                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
922                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
923                 }
924             }
925         }
926         elsif ($marc_field =~ $leader_regexp) {
927             my $range = defined $1 ? $1 : undef;
928             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
929             push @{$rules->{leader}}, @mappings;
930         }
931         else {
932             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
933                 "Invalid MARC field expression: $marc_field"
934             );
935         }
936     });
937     return $rules;
938 }
939
940 =head2 _foreach_mapping
941
942     $self->_foreach_mapping(
943         sub {
944             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
945                 $marc_field )
946               = @_;
947             return unless $marc_type eq 'marc21';
948             print "Data comes from: " . $marc_field . "\n";
949         }
950     );
951
952 This allows you to apply a function to each entry in the elasticsearch mappings
953 table, in order to build the mappings for whatever is needed.
954
955 In the provided function, the files are:
956
957 =over 4
958
959 =item C<$name>
960
961 The field name for elasticsearch (corresponds to the 'mapping' column in the
962 database.
963
964 =item C<$type>
965
966 The type for this value, e.g. 'string'.
967
968 =item C<$facet>
969
970 True if this value should be facetised. This only really makes sense if the
971 field is understood by the facet processing code anyway.
972
973 =item C<$sort>
974
975 True if this is a field that a) needs special sort handling, and b) if it
976 should be sorted on. False if a) but not b). Undef if not a). This allows,
977 for example, author to be sorted on but not everything marked with "author"
978 to be included in that sort.
979
980 =item C<$marc_type>
981
982 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
983 'unimarc', 'normarc'.
984
985 =item C<$marc_field>
986
987 A string that describes the MARC field that contains the data to extract.
988 These are of a form suited to Catmandu's MARC fixers.
989
990 =back
991
992 =cut
993
994 sub _foreach_mapping {
995     my ( $self, $sub ) = @_;
996
997     # TODO use a caching framework here
998     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
999         {
1000             'search_marc_map.index_name' => $self->index,
1001         },
1002         {   join => { search_marc_to_fields => 'search_marc_map' },
1003             '+select' => [
1004                 'search_marc_to_fields.facet',
1005                 'search_marc_to_fields.suggestible',
1006                 'search_marc_to_fields.sort',
1007                 'search_marc_to_fields.search',
1008                 'search_marc_map.marc_type',
1009                 'search_marc_map.marc_field',
1010             ],
1011             '+as'     => [
1012                 'facet',
1013                 'suggestible',
1014                 'sort',
1015                 'search',
1016                 'marc_type',
1017                 'marc_field',
1018             ],
1019         }
1020     );
1021
1022     while ( my $search_field = $search_fields->next ) {
1023         $sub->(
1024             # Force lower case on indexed field names for case insensitive
1025             # field name searches
1026             lc($search_field->name),
1027             $search_field->type,
1028             $search_field->get_column('facet'),
1029             $search_field->get_column('suggestible'),
1030             $search_field->get_column('sort'),
1031             $search_field->get_column('search'),
1032             $search_field->get_column('marc_type'),
1033             $search_field->get_column('marc_field'),
1034         );
1035     }
1036 }
1037
1038 =head2 process_error
1039
1040     die process_error($@);
1041
1042 This parses an Elasticsearch error message and produces a human-readable
1043 result from it. This result is probably missing all the useful information
1044 that you might want in diagnosing an issue, so the warning is also logged.
1045
1046 Note that currently the resulting message is not internationalised. This
1047 will happen eventually by some method or other.
1048
1049 =cut
1050
1051 sub process_error {
1052     my ($self, $msg) = @_;
1053
1054     warn $msg; # simple logging
1055
1056     # This is super-primitive
1057     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1058
1059     return "Unable to perform your search. Please try again.\n";
1060 }
1061
1062 =head2 _read_configuration
1063
1064     my $conf = _read_configuration();
1065
1066 Reads the I<configuration file> and returns a hash structure with the
1067 configuration information. It raises an exception if mandatory entries
1068 are missing.
1069
1070 The hashref structure has the following form:
1071
1072     {
1073         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1074         'index_name' => 'koha_instance',
1075     }
1076
1077 This is configured by the following in the C<config> block in koha-conf.xml:
1078
1079     <elasticsearch>
1080         <server>127.0.0.1:9200</server>
1081         <server>anotherserver:9200</server>
1082         <index_name>koha_instance</index_name>
1083     </elasticsearch>
1084
1085 =cut
1086
1087 sub _read_configuration {
1088
1089     my $configuration;
1090
1091     my $conf = C4::Context->config('elasticsearch');
1092     Koha::Exceptions::Config::MissingEntry->throw(
1093         "Missing 'elasticsearch' block in config file")
1094       unless defined $conf;
1095
1096     if ( $conf && $conf->{server} ) {
1097         my $nodes = $conf->{server};
1098         if ( ref($nodes) eq 'ARRAY' ) {
1099             $configuration->{nodes} = $nodes;
1100         }
1101         else {
1102             $configuration->{nodes} = [$nodes];
1103         }
1104     }
1105     else {
1106         Koha::Exceptions::Config::MissingEntry->throw(
1107             "Missing 'server' entry in config file for elasticsearch");
1108     }
1109
1110     if ( defined $conf->{index_name} ) {
1111         $configuration->{index_name} = $conf->{index_name};
1112     }
1113     else {
1114         Koha::Exceptions::Config::MissingEntry->throw(
1115             "Missing 'index_name' entry in config file for elasticsearch");
1116     }
1117
1118     return $configuration;
1119 }
1120
1121 =head2 get_facetable_fields
1122
1123 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1124
1125 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1126
1127 =cut
1128
1129 sub get_facetable_fields {
1130     my ($self) = @_;
1131
1132     # These should correspond to the ES field names, as opposed to the CCL
1133     # things that zebra uses.
1134     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1135     my @faceted_fields = Koha::SearchFields->search(
1136         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1137     );
1138     my @not_faceted_fields = Koha::SearchFields->search(
1139         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1140     );
1141     # This could certainly be improved
1142     return ( @faceted_fields, @not_faceted_fields );
1143 }
1144
1145 1;
1146
1147 __END__
1148
1149 =head1 AUTHOR
1150
1151 =over 4
1152
1153 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1154
1155 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1156
1157 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1158
1159 =back
1160
1161 =cut