Bug 23137: Move cache flushing to the method
[srvgit] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
29 use Koha::Caches;
30 use C4::Heading;
31
32 use Carp;
33 use Clone qw(clone);
34 use JSON;
35 use Modern::Perl;
36 use Readonly;
37 use Search::Elasticsearch;
38 use Try::Tiny;
39 use YAML::Syck;
40
41 use List::Util qw( sum0 reduce );
42 use MARC::File::XML;
43 use MIME::Base64;
44 use Encode qw(encode);
45 use Business::ISBN;
46 use Scalar::Util qw(looks_like_number);
47
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
50
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX     => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
54
55 =head1 NAME
56
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
58
59 =head1 ACCESSORS
60
61 =over 4
62
63 =item index
64
65 The name of the index to use, generally 'biblios' or 'authorities'.
66
67 =back
68
69 =head1 FUNCTIONS
70
71 =cut
72
73 sub new {
74     my $class = shift @_;
75     my $self = $class->SUPER::new(@_);
76     # Check for a valid index
77     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
78     return $self;
79 }
80
81 =head2 get_elasticsearch
82
83     my $elasticsearch_client = $self->get_elasticsearch();
84
85 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
86 instance level and will be reused if method is called multiple times.
87
88 =cut
89
90 sub get_elasticsearch {
91     my $self = shift @_;
92     unless (defined $self->{elasticsearch}) {
93         my $conf = $self->get_elasticsearch_params();
94         $self->{elasticsearch} = Search::Elasticsearch->new($conf);
95     }
96     return $self->{elasticsearch};
97 }
98
99 =head2 get_elasticsearch_params
100
101     my $params = $self->get_elasticsearch_params();
102
103 This provides a hashref that contains the parameters for connecting to the
104 ElasicSearch servers, in the form:
105
106     {
107         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108         'index_name' => 'koha_instance_index',
109     }
110
111 This is configured by the following in the C<config> block in koha-conf.xml:
112
113     <elasticsearch>
114         <server>127.0.0.1:9200</server>
115         <server>anotherserver:9200</server>
116         <index_name>koha_instance</index_name>
117     </elasticsearch>
118
119 =cut
120
121 sub get_elasticsearch_params {
122     my ($self) = @_;
123
124     # Copy the hash so that we're not modifying the original
125     my $conf = C4::Context->config('elasticsearch');
126     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127     my $es = { %{ $conf } };
128
129     # Helpfully, the multiple server lines end up in an array for us anyway
130     # if there are multiple ones, but not if there's only one.
131     my $server = $es->{server};
132     delete $es->{server};
133     if ( ref($server) eq 'ARRAY' ) {
134
135         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136         $es->{nodes} = $server;
137     }
138     elsif ($server) {
139         $es->{nodes} = [$server];
140     }
141     else {
142         die "No elasticsearch servers were specified in koha-conf.xml.\n";
143     }
144     die "No elasticsearch index_name was specified in koha-conf.xml.\n"
145       if ( !$es->{index_name} );
146     # Append the name of this particular index to our namespace
147     $es->{index_name} .= '_' . $self->index;
148
149     $es->{key_prefix} = 'es_';
150     $es->{cxn_pool} //= 'Static';
151     $es->{request_timeout} //= 60;
152
153     return $es;
154 }
155
156 =head2 get_elasticsearch_settings
157
158     my $settings = $self->get_elasticsearch_settings();
159
160 This provides the settings provided to Elasticsearch when an index is created.
161 These can do things like define tokenization methods.
162
163 A hashref containing the settings is returned.
164
165 =cut
166
167 sub get_elasticsearch_settings {
168     my ($self) = @_;
169
170     # Use state to speed up repeated calls
171     state $settings = undef;
172     if (!defined $settings) {
173         my $config_file = C4::Context->config('elasticsearch_index_config');
174         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
175         $settings = LoadFile( $config_file );
176     }
177
178     return $settings;
179 }
180
181 =head2 get_elasticsearch_mappings
182
183     my $mappings = $self->get_elasticsearch_mappings();
184
185 This provides the mappings that get passed to Elasticsearch when an index is
186 created.
187
188 =cut
189
190 sub get_elasticsearch_mappings {
191     my ($self) = @_;
192
193     # Use state to speed up repeated calls
194     state %all_mappings;
195     state %sort_fields;
196
197     if (!defined $all_mappings{$self->index}) {
198         $sort_fields{$self->index} = {};
199         # Clone the general mapping to break ties with the original hash
200         my $mappings = {
201             data => clone(_get_elasticsearch_field_config('general', ''))
202         };
203         my $marcflavour = lc C4::Context->preference('marcflavour');
204         $self->_foreach_mapping(
205             sub {
206                 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
207                 return if $marc_type ne $marcflavour;
208                 # TODO if this gets any sort of complexity to it, it should
209                 # be broken out into its own function.
210
211                 # TODO be aware of date formats, but this requires pre-parsing
212                 # as ES will simply reject anything with an invalid date.
213                 my $es_type = 'text';
214                 if ($type eq 'boolean') {
215                     $es_type = 'boolean';
216                 } elsif ($type eq 'number' || $type eq 'sum') {
217                     $es_type = 'integer';
218                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
219                     $es_type = 'stdno';
220                 }
221
222                 if ($search) {
223                     $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
224                 }
225
226                 if ($facet) {
227                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
228                 }
229                 if ($suggestible) {
230                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
231                 }
232                 # Sort is a bit special as it can be true, false, undef.
233                 # We care about "true" or "undef",
234                 # "undef" means to do the default thing, which is make it sortable.
235                 if (!defined $sort || $sort) {
236                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
237                     $sort_fields{$self->index}{$name} = 1;
238                 }
239             }
240         );
241         $all_mappings{$self->index} = $mappings;
242     }
243     $self->sort_fields(\%{$sort_fields{$self->index}});
244
245     return $all_mappings{$self->index};
246 }
247
248 =head2 raw_elasticsearch_mappings
249
250 Return elasticsearch mapping as it is in database.
251 marc_type: marc21|unimarc|normarc
252
253 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
254
255 =cut
256
257 sub raw_elasticsearch_mappings {
258     my ( $marc_type ) = @_;
259
260     my $schema = Koha::Database->new()->schema();
261
262     my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
263
264     my $mappings = {};
265     while ( my $search_field = $search_fields->next ) {
266
267         my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
268             { search_field_id => $search_field->id },
269             {
270                 join     => 'search_marc_map',
271                 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
272             }
273         );
274
275         while ( my $marc_to_field = $marc_to_fields->next ) {
276
277             my $marc_map = $marc_to_field->search_marc_map;
278
279             next if $marc_type && $marc_map->marc_type ne $marc_type;
280
281             $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
282             $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
283             $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284             $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
285
286             push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
287                 {
288                     facet   => $marc_to_field->facet || '',
289                     marc_type => $marc_map->marc_type,
290                     marc_field => $marc_map->marc_field,
291                     sort        => $marc_to_field->sort,
292                     suggestible => $marc_to_field->suggestible || ''
293                 });
294
295         }
296     }
297
298     return $mappings;
299 }
300
301 =head2 _get_elasticsearch_field_config
302
303 Get the Elasticsearch field config for the given purpose and data type.
304
305 $mapping = _get_elasticsearch_field_config('search', 'text');
306
307 =cut
308
309 sub _get_elasticsearch_field_config {
310
311     my ( $purpose, $type ) = @_;
312
313     # Use state to speed up repeated calls
314     state $settings = undef;
315     if (!defined $settings) {
316         my $config_file = C4::Context->config('elasticsearch_field_config');
317         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
318         $settings = LoadFile( $config_file );
319     }
320
321     if (!defined $settings->{$purpose}) {
322         die "Field purpose $purpose not defined in field config";
323     }
324     if ($type eq '') {
325         return $settings->{$purpose};
326     }
327     if (defined $settings->{$purpose}{$type}) {
328         return $settings->{$purpose}{$type};
329     }
330     if (defined $settings->{$purpose}{'default'}) {
331         return $settings->{$purpose}{'default'};
332     }
333     return;
334 }
335
336 =head2 _load_elasticsearch_mappings
337
338 Load Elasticsearch mappings in the format of mappings.yaml.
339
340 $indexes = _load_elasticsearch_mappings();
341
342 =cut
343
344 sub _load_elasticsearch_mappings {
345     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
346     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
347     return LoadFile( $mappings_yaml );
348 }
349
350 sub reset_elasticsearch_mappings {
351     my ( $self ) = @_;
352     my $indexes = $self->_load_elasticsearch_mappings();
353
354     Koha::SearchMarcMaps->delete;
355     Koha::SearchFields->delete;
356
357     while ( my ( $index_name, $fields ) = each %$indexes ) {
358         while ( my ( $field_name, $data ) = each %$fields ) {
359
360             my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
361
362             # Set default values
363             $sf_params{staff_client} //= 1;
364             $sf_params{opac} //= 1;
365
366             $sf_params{name} = $field_name;
367
368             my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
369
370             my $mappings = $data->{mappings};
371             for my $mapping ( @$mappings ) {
372                 my $marc_field = Koha::SearchMarcMaps->find_or_create({
373                     index_name => $index_name,
374                     marc_type => $mapping->{marc_type},
375                     marc_field => $mapping->{marc_field}
376                 });
377                 $search_field->add_to_search_marc_maps($marc_field, {
378                     facet => $mapping->{facet} || 0,
379                     suggestible => $mapping->{suggestible} || 0,
380                     sort => $mapping->{sort},
381                     search => $mapping->{search} // 1
382                 });
383             }
384         }
385     }
386
387     my $cache = Koha::Caches->get_instance();
388     $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
389     $cache->clear_from_cache('elasticsearch_search_fields_opac');
390
391     # FIXME return the mappings?
392 }
393
394 # This overrides the accessor provided by Class::Accessor so that if
395 # sort_fields isn't set, then it'll generate it.
396 sub sort_fields {
397     my $self = shift;
398     if (@_) {
399         $self->_sort_fields_accessor(@_);
400         return;
401     }
402     my $val = $self->_sort_fields_accessor();
403     return $val if $val;
404
405     # This will populate the accessor as a side effect
406     $self->get_elasticsearch_mappings();
407     return $self->_sort_fields_accessor();
408 }
409
410 =head2 _process_mappings($mappings, $data, $record_document, $meta)
411
412     $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
413
414 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
415 Since we group all mappings by MARC field targets C<$mappings> will contain
416 all targets for C<$data> and thus we need to fetch the MARC field only once.
417 C<$mappings> will be applied to C<$record_document> and new field values added.
418 The method has no return value.
419
420 =over 4
421
422 =item C<$mappings>
423
424 Arrayref of mappings containing arrayrefs in the format
425 [C<$target>, C<$options>] where C<$target> is the name of the target field and
426 C<$options> is a hashref containing processing directives for this particular
427 mapping.
428
429 =item C<$data>
430
431 The source data from a MARC record field.
432
433 =item C<$record_document>
434
435 Hashref representing the Elasticsearch document on which mappings should be
436 applied.
437
438 =item C<$meta>
439
440 A hashref containing metadata useful for enforcing per mapping rules. For
441 example for providing extra context for mapping options, or treating mapping
442 targets differently depending on type (sort, search, facet etc). Combining
443 this metadata with the mapping options and metadata allows us to mutate the
444 data per mapping, or even replace it with other data retrieved from the
445 metadata context.
446
447 Current properties are:
448
449 C<altscript>: A boolean value indicating whether an alternate script presentation is being
450 processed.
451
452 C<data_source>: The source of the $<data> argument. Possible values are: 'leader', 'control_field',
453 'subfield' or 'subfields_group'.
454
455 C<code>: The code of the subfield C<$data> was retrieved, if C<data_source> is 'subfield'.
456
457 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
458 is 'subfields_group'.
459
460 C<field>: The original C<MARC::Record> object.
461
462 =back
463
464 =cut
465
466 sub _process_mappings {
467     my ($_self, $mappings, $data, $record_document, $meta) = @_;
468     foreach my $mapping (@{$mappings}) {
469         my ($target, $options) = @{$mapping};
470
471         # Don't process sort fields for alternate scripts
472         my $sort = $target =~ /__sort$/;
473         if ($sort && $meta->{altscript}) {
474             next;
475         }
476
477         # Copy (scalar) data since can have multiple targets
478         # with differing options for (possibly) mutating data
479         # so need a different copy for each
480         my $_data = $data;
481         $record_document->{$target} //= [];
482         if (defined $options->{substr}) {
483             my ($start, $length) = @{$options->{substr}};
484             $_data = length($data) > $start ? substr $data, $start, $length : '';
485         }
486         if (defined $options->{value_callbacks}) {
487             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
488         }
489         if (defined $options->{property}) {
490             $_data = {
491                 $options->{property} => $_data
492             }
493         }
494         if (defined $options->{nonfiling_characters_indicator}) {
495             my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
496             $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
497             if ($nonfiling_chars) {
498                 $_data = substr $_data, $nonfiling_chars;
499             }
500         }
501         push @{$record_document->{$target}}, $_data;
502     }
503 }
504
505 =head2 marc_records_to_documents($marc_records)
506
507     my $record_documents = $self->marc_records_to_documents($marc_records);
508
509 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
510
511 Returns array of hash references, representing Elasticsearch documents,
512 acceptable as body payload in C<Search::Elasticsearch> requests.
513
514 =over 4
515
516 =item C<$marc_documents>
517
518 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
519
520 =back
521
522 =cut
523
524 sub marc_records_to_documents {
525     my ($self, $records) = @_;
526     my $rules = $self->_get_marc_mapping_rules();
527     my $control_fields_rules = $rules->{control_fields};
528     my $data_fields_rules = $rules->{data_fields};
529     my $marcflavour = lc C4::Context->preference('marcflavour');
530     my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
531
532     my @record_documents;
533
534     foreach my $record (@{$records}) {
535         my $record_document = {};
536         my $mappings = $rules->{leader};
537         if ($mappings) {
538             $self->_process_mappings($mappings, $record->leader(), $record_document, {
539                     altscript => 0,
540                     data_source => 'leader'
541                 }
542             );
543         }
544         foreach my $field ($record->fields()) {
545             if ($field->is_control_field()) {
546                 my $mappings = $control_fields_rules->{$field->tag()};
547                 if ($mappings) {
548                     $self->_process_mappings($mappings, $field->data(), $record_document, {
549                             altscript => 0,
550                             data_source => 'control_field',
551                             field => $field
552                         }
553                     );
554                 }
555             }
556             else {
557                 my $tag = $field->tag();
558                 # Handle alternate scripts in MARC 21
559                 my $altscript = 0;
560                 if ($marcflavour eq 'marc21' && $tag eq '880') {
561                     my $sub6 = $field->subfield('6');
562                     if ($sub6 =~ /^(...)-\d+/) {
563                         $tag = $1;
564                         $altscript = 1;
565                     }
566                 }
567
568                 my $data_field_rules = $data_fields_rules->{$tag};
569                 if ($data_field_rules) {
570                     my $subfields_mappings = $data_field_rules->{subfields};
571                     my $wildcard_mappings = $subfields_mappings->{'*'};
572                     foreach my $subfield ($field->subfields()) {
573                         my ($code, $data) = @{$subfield};
574                         my $mappings = $subfields_mappings->{$code} // [];
575                         if ($wildcard_mappings) {
576                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
577                         }
578                         if (@{$mappings}) {
579                             $self->_process_mappings($mappings, $data, $record_document, {
580                                     altscript => $altscript,
581                                     data_source => 'subfield',
582                                     code => $code,
583                                     field => $field
584                                 }
585                             );
586                         }
587                         if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
588                             # Used by the authority linker the match-heading field requires a specific syntax
589                             # that is specified in C4/Heading
590                             my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
591                             next unless $heading;
592                             push @{$record_document->{'match-heading'}}, $heading->search_form;
593                         }
594                     }
595
596                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
597                     if ($subfields_join_mappings) {
598                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
599                             # Map each subfield to values, remove empty values, join with space
600                             my $data = join(
601                                 ' ',
602                                 grep(
603                                     $_,
604                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
605                                 )
606                             );
607                             if ($data) {
608                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
609                                         altscript => $altscript,
610                                         data_source => 'subfields_group',
611                                         codes => $subfields_group,
612                                         field => $field
613                                     }
614                                 );
615                             }
616                             if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
617                                 # Used by the authority linker the match-heading field requires a specific syntax
618                                 # that is specified in C4/Heading
619                                 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
620                                 next unless $heading;
621                                 push @{$record_document->{'match-heading'}}, $heading->search_form;
622                             }
623                         }
624                     }
625                 }
626             }
627         }
628         foreach my $field (keys %{$rules->{defaults}}) {
629             unless (defined $record_document->{$field}) {
630                 $record_document->{$field} = $rules->{defaults}->{$field};
631             }
632         }
633         foreach my $field (@{$rules->{sum}}) {
634             if (defined $record_document->{$field}) {
635                 # TODO: validate numeric? filter?
636                 # TODO: Or should only accept fields without nested values?
637                 # TODO: Quick and dirty, improve if needed
638                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
639             }
640         }
641         # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
642         foreach my $field (@{$rules->{isbn}}) {
643             if (defined $record_document->{$field}) {
644                 my @isbns = ();
645                 foreach my $input_isbn (@{$record_document->{$field}}) {
646                     my $isbn = Business::ISBN->new($input_isbn);
647                     if (defined $isbn && $isbn->is_valid) {
648                         my $isbn13 = $isbn->as_isbn13->as_string;
649                         push @isbns, $isbn13;
650                         $isbn13 =~ s/\-//g;
651                         push @isbns, $isbn13;
652
653                         my $isbn10 = $isbn->as_isbn10;
654                         if ($isbn10) {
655                             $isbn10 = $isbn10->as_string;
656                             push @isbns, $isbn10;
657                             $isbn10 =~ s/\-//g;
658                             push @isbns, $isbn10;
659                         }
660                     } else {
661                         push @isbns, $input_isbn;
662                     }
663                 }
664                 $record_document->{$field} = \@isbns;
665             }
666         }
667
668         # Remove duplicate values and collapse sort fields
669         foreach my $field (keys %{$record_document}) {
670             if (ref($record_document->{$field}) eq 'ARRAY') {
671                 @{$record_document->{$field}} = do {
672                     my %seen;
673                     grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
674                 };
675                 if ($field =~ /__sort$/) {
676                     # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
677                     $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
678                 }
679             }
680         }
681
682         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
683         $record->encoding('UTF-8');
684         if ($use_array) {
685             $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
686             $record_document->{'marc_format'} = 'ARRAY';
687         } else {
688             my @warnings;
689             {
690                 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
691                 local $SIG{__WARN__} = sub {
692                     push @warnings, $_[0];
693                 };
694                 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
695             }
696             if (@warnings) {
697                 # Suppress warnings if record length exceeded
698                 unless (substr($record->leader(), 0, 5) eq '99999') {
699                     foreach my $warning (@warnings) {
700                         carp $warning;
701                     }
702                 }
703                 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
704                 $record_document->{'marc_format'} = 'MARCXML';
705             }
706             else {
707                 $record_document->{'marc_format'} = 'base64ISO2709';
708             }
709         }
710         push @record_documents, $record_document;
711     }
712     return \@record_documents;
713 }
714
715 =head2 _marc_to_array($record)
716
717     my @fields = _marc_to_array($record)
718
719 Convert a MARC::Record to an array modeled after MARC-in-JSON
720 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
721
722 =over 4
723
724 =item C<$record>
725
726 A MARC::Record object
727
728 =back
729
730 =cut
731
732 sub _marc_to_array {
733     my ($self, $record) = @_;
734
735     my $data = {
736         leader => $record->leader(),
737         fields => []
738     };
739     for my $field ($record->fields()) {
740         my $tag = $field->tag();
741         if ($field->is_control_field()) {
742             push @{$data->{fields}}, {$tag => $field->data()};
743         } else {
744             my $subfields = ();
745             foreach my $subfield ($field->subfields()) {
746                 my ($code, $contents) = @{$subfield};
747                 push @{$subfields}, {$code => $contents};
748             }
749             push @{$data->{fields}}, {
750                 $tag => {
751                     ind1 => $field->indicator(1),
752                     ind2 => $field->indicator(2),
753                     subfields => $subfields
754                 }
755             };
756         }
757     }
758     return $data;
759 }
760
761 =head2 _array_to_marc($data)
762
763     my $record = _array_to_marc($data)
764
765 Convert an array modeled after MARC-in-JSON to a MARC::Record
766
767 =over 4
768
769 =item C<$data>
770
771 An array modeled after MARC-in-JSON
772 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
773
774 =back
775
776 =cut
777
778 sub _array_to_marc {
779     my ($self, $data) = @_;
780
781     my $record = MARC::Record->new();
782
783     $record->leader($data->{leader});
784     for my $field (@{$data->{fields}}) {
785         my $tag = (keys %{$field})[0];
786         $field = $field->{$tag};
787         my $marc_field;
788         if (ref($field) eq 'HASH') {
789             my @subfields;
790             foreach my $subfield (@{$field->{subfields}}) {
791                 my $code = (keys %{$subfield})[0];
792                 push @subfields, $code;
793                 push @subfields, $subfield->{$code};
794             }
795             $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
796         } else {
797             $marc_field = MARC::Field->new($tag, $field)
798         }
799         $record->append_fields($marc_field);
800     }
801 ;
802     return $record;
803 }
804
805 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
806
807     my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
808
809 Get mappings, an internal data structure later used by
810 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
811 data for a MARC mapping.
812
813 The returned C<$mappings> is not to to be confused with mappings provided by
814 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
815 provided by C<_foreach_mapping> and expands it to this internal data structure.
816 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
817 is then applied to each MARC target (leader, control field data, subfield or
818 joined subfields) and integrated into the mapping rules data structure used in
819 C<marc_records_to_documents> to transform MARC records into Elasticsearch
820 documents.
821
822 =over 4
823
824 =item C<$facet>
825
826 Boolean indicating whether to create a facet field for this mapping.
827
828 =item C<$suggestible>
829
830 Boolean indicating whether to create a suggestion field for this mapping.
831
832 =item C<$sort>
833
834 Boolean indicating whether to create a sort field for this mapping.
835
836 =item C<$search>
837
838 Boolean indicating whether to create a search field for this mapping.
839
840 =item C<$target_name>
841
842 Elasticsearch document target field name.
843
844 =item C<$target_type>
845
846 Elasticsearch document target field type.
847
848 =item C<$range>
849
850 An optional range as a string in the format "<START>-<END>" or "<START>",
851 where "<START>" and "<END>" are integers specifying a range that will be used
852 for extracting a substring from MARC data as Elasticsearch field target value.
853
854 The first character position is "0", and the range is inclusive,
855 so "0-2" means the first three characters of MARC data.
856
857 If only "<START>" is provided only one character at position "<START>" will
858 be extracted.
859
860 =back
861
862 =cut
863
864 sub _field_mappings {
865     my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
866     my %mapping_defaults = ();
867     my @mappings;
868
869     my $substr_args = undef;
870     if (defined $range) {
871         # TODO: use value_callback instead?
872         my ($start, $end) = map(int, split /-/, $range, 2);
873         $substr_args = [$start];
874         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
875     }
876     my $default_options = {};
877     if ($substr_args) {
878         $default_options->{substr} = $substr_args;
879     }
880
881     # TODO: Should probably have per type value callback/hook
882     # but hard code for now
883     if ($target_type eq 'boolean') {
884         $default_options->{value_callbacks} //= [];
885         push @{$default_options->{value_callbacks}}, sub {
886             my ($value) = @_;
887             # Trim whitespace at both ends
888             $value =~ s/^\s+|\s+$//g;
889             return $value ? 'true' : 'false';
890         };
891     }
892
893     if ($search) {
894         my $mapping = [$target_name, $default_options];
895         push @mappings, $mapping;
896     }
897
898     my @suffixes = ();
899     push @suffixes, 'facet' if $facet;
900     push @suffixes, 'suggestion' if $suggestible;
901     push @suffixes, 'sort' if !defined $sort || $sort;
902
903     foreach my $suffix (@suffixes) {
904         my $mapping = ["${target_name}__$suffix"];
905         # TODO: Hack, fix later in less hideous manner
906         if ($suffix eq 'suggestion') {
907             push @{$mapping}, {%{$default_options}, property => 'input'};
908         }
909         else {
910             # Important! Make shallow clone, or we end up with the same hashref
911             # shared by all mappings
912             push @{$mapping}, {%{$default_options}};
913         }
914         push @mappings, $mapping;
915     }
916     return @mappings;
917 };
918
919 =head2 _get_marc_mapping_rules
920
921     my $mapping_rules = $self->_get_marc_mapping_rules()
922
923 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
924
925 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
926 each call to C<MARC::Record>->field) we create an optimized structure of mapping
927 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
928
929 We can then iterate through all MARC fields for each record and apply all relevant
930 rules once per fields instead of retreiving fields multiple times for each mapping rule
931 which is terribly slow.
932
933 =cut
934
935 # TODO: This structure can be used for processing multiple MARC::Records so is currently
936 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
937 # memory cache which it is currently not. The performance gain of caching
938 # would probably be marginal, but to do this could be a further improvement.
939
940 sub _get_marc_mapping_rules {
941     my ($self) = @_;
942     my $marcflavour = lc C4::Context->preference('marcflavour');
943     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
944     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
945     my $rules = {
946         'leader' => [],
947         'control_fields' => {},
948         'data_fields' => {},
949         'sum' => [],
950         'isbn' => [],
951         'defaults' => {}
952     };
953
954     $self->_foreach_mapping(sub {
955         my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
956         return if $marc_type ne $marcflavour;
957
958         if ($type eq 'sum') {
959             push @{$rules->{sum}}, $name;
960             push @{$rules->{sum}}, $name."__sort" if $sort;
961         }
962         elsif ($type eq 'isbn') {
963             push @{$rules->{isbn}}, $name;
964         }
965         elsif ($type eq 'boolean') {
966             # boolean gets special handling, if value doesn't exist for a field,
967             # it is set to false
968             $rules->{defaults}->{$name} = 'false';
969         }
970
971         if ($marc_field =~ $field_spec_regexp) {
972             my $field_tag = $1;
973
974             my @subfields;
975             my @subfield_groups;
976             # Parse and separate subfields form subfield groups
977             if (defined $2) {
978                 my $subfield_group = '';
979                 my $open_group = 0;
980
981                 foreach my $token (split //, $2) {
982                     if ($token eq "(") {
983                         if ($open_group) {
984                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
985                                 "Unmatched opening parenthesis for $marc_field"
986                             );
987                         }
988                         else {
989                             $open_group = 1;
990                         }
991                     }
992                     elsif ($token eq ")") {
993                         if ($open_group) {
994                             if ($subfield_group) {
995                                 push @subfield_groups, $subfield_group;
996                                 $subfield_group = '';
997                             }
998                             $open_group = 0;
999                         }
1000                         else {
1001                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1002                                 "Unmatched closing parenthesis for $marc_field"
1003                             );
1004                         }
1005                     }
1006                     elsif ($open_group) {
1007                         $subfield_group .= $token;
1008                     }
1009                     else {
1010                         push @subfields, $token;
1011                     }
1012                 }
1013             }
1014             else {
1015                 push @subfields, '*';
1016             }
1017
1018             my $range = defined $3 ? $3 : undef;
1019             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1020             if ($field_tag < 10) {
1021                 $rules->{control_fields}->{$field_tag} //= [];
1022                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1023             }
1024             else {
1025                 $rules->{data_fields}->{$field_tag} //= {};
1026                 foreach my $subfield (@subfields) {
1027                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1028                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1029                 }
1030                 foreach my $subfield_group (@subfield_groups) {
1031                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1032                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1033                 }
1034             }
1035         }
1036         elsif ($marc_field =~ $leader_regexp) {
1037             my $range = defined $1 ? $1 : undef;
1038             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1039             push @{$rules->{leader}}, @mappings;
1040         }
1041         else {
1042             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1043                 "Invalid MARC field expression: $marc_field"
1044             );
1045         }
1046     });
1047
1048     # Marc-flavour specific rule tweaks, could/should also provide hook for this
1049     if ($marcflavour eq 'marc21') {
1050         # Nonfiling characters processing for sort fields
1051         my %title_fields;
1052         if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1053             # Format is: nonfiling characters indicator => field names list
1054             %title_fields = (
1055                 1 => [130, 630, 730, 740],
1056                 2 => [222, 240, 242, 243, 245, 440, 830]
1057             );
1058         }
1059         elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1060             %title_fields = (
1061                 1 => [730],
1062                 2 => [130, 430, 530]
1063             );
1064         }
1065         foreach my $indicator (keys %title_fields) {
1066             foreach my $field_tag (@{$title_fields{$indicator}}) {
1067                 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1068                 foreach my $mapping (@{$mappings}) {
1069                     if ($mapping->[0] =~ /__sort$/) {
1070                         # Mark this as to be processed for nonfiling characters indicator
1071                         # later on in _process_mappings
1072                         $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1073                     }
1074                 }
1075             }
1076         }
1077     }
1078
1079     return $rules;
1080 }
1081
1082 =head2 _foreach_mapping
1083
1084     $self->_foreach_mapping(
1085         sub {
1086             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
1087                 $marc_field )
1088               = @_;
1089             return unless $marc_type eq 'marc21';
1090             print "Data comes from: " . $marc_field . "\n";
1091         }
1092     );
1093
1094 This allows you to apply a function to each entry in the elasticsearch mappings
1095 table, in order to build the mappings for whatever is needed.
1096
1097 In the provided function, the files are:
1098
1099 =over 4
1100
1101 =item C<$name>
1102
1103 The field name for elasticsearch (corresponds to the 'mapping' column in the
1104 database.
1105
1106 =item C<$type>
1107
1108 The type for this value, e.g. 'string'.
1109
1110 =item C<$facet>
1111
1112 True if this value should be facetised. This only really makes sense if the
1113 field is understood by the facet processing code anyway.
1114
1115 =item C<$sort>
1116
1117 True if this is a field that a) needs special sort handling, and b) if it
1118 should be sorted on. False if a) but not b). Undef if not a). This allows,
1119 for example, author to be sorted on but not everything marked with "author"
1120 to be included in that sort.
1121
1122 =item C<$marc_type>
1123
1124 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1125 'unimarc', 'normarc'.
1126
1127 =item C<$marc_field>
1128
1129 A string that describes the MARC field that contains the data to extract.
1130 These are of a form suited to Catmandu's MARC fixers.
1131
1132 =back
1133
1134 =cut
1135
1136 sub _foreach_mapping {
1137     my ( $self, $sub ) = @_;
1138
1139     # TODO use a caching framework here
1140     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1141         {
1142             'search_marc_map.index_name' => $self->index,
1143         },
1144         {   join => { search_marc_to_fields => 'search_marc_map' },
1145             '+select' => [
1146                 'search_marc_to_fields.facet',
1147                 'search_marc_to_fields.suggestible',
1148                 'search_marc_to_fields.sort',
1149                 'search_marc_to_fields.search',
1150                 'search_marc_map.marc_type',
1151                 'search_marc_map.marc_field',
1152             ],
1153             '+as'     => [
1154                 'facet',
1155                 'suggestible',
1156                 'sort',
1157                 'search',
1158                 'marc_type',
1159                 'marc_field',
1160             ],
1161         }
1162     );
1163
1164     while ( my $search_field = $search_fields->next ) {
1165         $sub->(
1166             # Force lower case on indexed field names for case insensitive
1167             # field name searches
1168             lc($search_field->name),
1169             $search_field->type,
1170             $search_field->get_column('facet'),
1171             $search_field->get_column('suggestible'),
1172             $search_field->get_column('sort'),
1173             $search_field->get_column('search'),
1174             $search_field->get_column('marc_type'),
1175             $search_field->get_column('marc_field'),
1176         );
1177     }
1178 }
1179
1180 =head2 process_error
1181
1182     die process_error($@);
1183
1184 This parses an Elasticsearch error message and produces a human-readable
1185 result from it. This result is probably missing all the useful information
1186 that you might want in diagnosing an issue, so the warning is also logged.
1187
1188 Note that currently the resulting message is not internationalised. This
1189 will happen eventually by some method or other.
1190
1191 =cut
1192
1193 sub process_error {
1194     my ($self, $msg) = @_;
1195
1196     warn $msg; # simple logging
1197
1198     # This is super-primitive
1199     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1200
1201     return "Unable to perform your search. Please try again.\n";
1202 }
1203
1204 =head2 _read_configuration
1205
1206     my $conf = _read_configuration();
1207
1208 Reads the I<configuration file> and returns a hash structure with the
1209 configuration information. It raises an exception if mandatory entries
1210 are missing.
1211
1212 The hashref structure has the following form:
1213
1214     {
1215         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1216         'index_name' => 'koha_instance',
1217     }
1218
1219 This is configured by the following in the C<config> block in koha-conf.xml:
1220
1221     <elasticsearch>
1222         <server>127.0.0.1:9200</server>
1223         <server>anotherserver:9200</server>
1224         <index_name>koha_instance</index_name>
1225     </elasticsearch>
1226
1227 =cut
1228
1229 sub _read_configuration {
1230
1231     my $configuration;
1232
1233     my $conf = C4::Context->config('elasticsearch');
1234     Koha::Exceptions::Config::MissingEntry->throw(
1235         "Missing 'elasticsearch' block in config file")
1236       unless defined $conf;
1237
1238     if ( $conf && $conf->{server} ) {
1239         my $nodes = $conf->{server};
1240         if ( ref($nodes) eq 'ARRAY' ) {
1241             $configuration->{nodes} = $nodes;
1242         }
1243         else {
1244             $configuration->{nodes} = [$nodes];
1245         }
1246     }
1247     else {
1248         Koha::Exceptions::Config::MissingEntry->throw(
1249             "Missing 'server' entry in config file for elasticsearch");
1250     }
1251
1252     if ( defined $conf->{index_name} ) {
1253         $configuration->{index_name} = $conf->{index_name};
1254     }
1255     else {
1256         Koha::Exceptions::Config::MissingEntry->throw(
1257             "Missing 'index_name' entry in config file for elasticsearch");
1258     }
1259
1260     return $configuration;
1261 }
1262
1263 =head2 get_facetable_fields
1264
1265 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1266
1267 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1268
1269 =cut
1270
1271 sub get_facetable_fields {
1272     my ($self) = @_;
1273
1274     # These should correspond to the ES field names, as opposed to the CCL
1275     # things that zebra uses.
1276     my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1277     my @faceted_fields = Koha::SearchFields->search(
1278         { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1279     );
1280     my @not_faceted_fields = Koha::SearchFields->search(
1281         { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1282     );
1283     # This could certainly be improved
1284     return ( @faceted_fields, @not_faceted_fields );
1285 }
1286
1287 1;
1288
1289 __END__
1290
1291 =head1 AUTHOR
1292
1293 =over 4
1294
1295 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1296
1297 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1298
1299 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
1300
1301 =back
1302
1303 =cut