Bug 19893: Add code review fixes
[srvgit] / Koha / SearchEngine / Elasticsearch.pm
1 package Koha::SearchEngine::Elasticsearch;
2
3 # Copyright 2015 Catalyst IT
4 #
5 # This file is part of Koha.
6 #
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
10 # version.
11 #
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19
20 use base qw(Class::Accessor);
21
22 use C4::Context;
23
24 use Koha::Database;
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
28
29 use Carp;
30 use JSON;
31 use Modern::Perl;
32 use Readonly;
33 use Search::Elasticsearch;
34 use Try::Tiny;
35 use YAML::Syck;
36
37 use List::Util qw( sum0 reduce );
38 use MARC::File::XML;
39 use MIME::Base64;
40 use Encode qw(encode);
41
42 __PACKAGE__->mk_ro_accessors(qw( index ));
43 __PACKAGE__->mk_accessors(qw( sort_fields ));
44
45 # Constants to refer to the standard index names
46 Readonly our $BIBLIOS_INDEX     => 'biblios';
47 Readonly our $AUTHORITIES_INDEX => 'authorities';
48
49 =head1 NAME
50
51 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
52
53 =head1 ACCESSORS
54
55 =over 4
56
57 =item index
58
59 The name of the index to use, generally 'biblios' or 'authorities'.
60
61 =back
62
63 =head1 FUNCTIONS
64
65 =cut
66
67 sub new {
68     my $class = shift @_;
69     my $self = $class->SUPER::new(@_);
70     # Check for a valid index
71     Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
72     return $self;
73 }
74
75 =head2 get_elasticsearch
76
77     my $elasticsearch_client = $self->get_elasticsearch();
78
79 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::ElasticSearch>
80 instance level and will be reused if method is called multiple times.
81
82 =cut
83
84 sub get_elasticsearch {
85     my $self = shift @_;
86     unless (defined $self->{elasticsearch}) {
87         my $conf = $self->get_elasticsearch_params();
88         $self->{elasticsearch} = Search::Elasticsearch->new(
89             client => "5_0::Direct",
90             nodes => $conf->{nodes},
91             cxn_pool => 'Sniff',
92             request_timeout => 60
93         );
94     }
95     return $self->{elasticsearch};
96 }
97
98 =head2 get_elasticsearch_params
99
100     my $params = $self->get_elasticsearch_params();
101
102 This provides a hashref that contains the parameters for connecting to the
103 ElasicSearch servers, in the form:
104
105     {
106         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107         'index_name' => 'koha_instance_index',
108     }
109
110 This is configured by the following in the C<config> block in koha-conf.xml:
111
112     <elasticsearch>
113         <server>127.0.0.1:9200</server>
114         <server>anotherserver:9200</server>
115         <index_name>koha_instance</index_name>
116     </elasticsearch>
117
118 =cut
119
120 sub get_elasticsearch_params {
121     my ($self) = @_;
122
123     # Copy the hash so that we're not modifying the original
124     my $conf = C4::Context->config('elasticsearch');
125     die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
126     my $es = { %{ $conf } };
127
128     # Helpfully, the multiple server lines end up in an array for us anyway
129     # if there are multiple ones, but not if there's only one.
130     my $server = $es->{server};
131     delete $es->{server};
132     if ( ref($server) eq 'ARRAY' ) {
133
134         # store it called 'nodes' (which is used by newer Search::Elasticsearch)
135         $es->{nodes} = $server;
136     }
137     elsif ($server) {
138         $es->{nodes} = [$server];
139     }
140     else {
141         die "No elasticsearch servers were specified in koha-conf.xml.\n";
142     }
143     die "No elasticserver index_name was specified in koha-conf.xml.\n"
144       if ( !$es->{index_name} );
145     # Append the name of this particular index to our namespace
146     $es->{index_name} .= '_' . $self->index;
147
148     $es->{key_prefix} = 'es_';
149     return $es;
150 }
151
152 =head2 get_elasticsearch_settings
153
154     my $settings = $self->get_elasticsearch_settings();
155
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
158
159 A hashref containing the settings is returned.
160
161 =cut
162
163 sub get_elasticsearch_settings {
164     my ($self) = @_;
165
166     # Use state to speed up repeated calls
167     state $settings = undef;
168     if (!defined $settings) {
169         my $config_file = C4::Context->config('elasticsearch_index_config');
170         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
171         $settings = LoadFile( $config_file );
172     }
173
174     return $settings;
175 }
176
177 =head2 get_elasticsearch_mappings
178
179     my $mappings = $self->get_elasticsearch_mappings();
180
181 This provides the mappings that get passed to Elasticsearch when an index is
182 created.
183
184 =cut
185
186 sub get_elasticsearch_mappings {
187     my ($self) = @_;
188
189     # Use state to speed up repeated calls
190     state %all_mappings;
191     state %sort_fields;
192
193     if (!defined $all_mappings{$self->index}) {
194         $sort_fields{$self->index} = {};
195         my $mappings = {
196             data => scalar _get_elasticsearch_mapping('general', '')
197         };
198         my $marcflavour = lc C4::Context->preference('marcflavour');
199         $self->_foreach_mapping(
200             sub {
201                 my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
202                 return if $marc_type ne $marcflavour;
203                 # TODO if this gets any sort of complexity to it, it should
204                 # be broken out into its own function.
205
206                 # TODO be aware of date formats, but this requires pre-parsing
207                 # as ES will simply reject anything with an invalid date.
208                 my $es_type = 'text';
209                 if ($type eq 'boolean') {
210                     $es_type = 'boolean';
211                 } elsif ($type eq 'number' || $type eq 'sum') {
212                     $es_type = 'integer';
213                 } elsif ($type eq 'isbn' || $type eq 'stdno') {
214                     $es_type = 'stdno';
215                 }
216
217                 $mappings->{data}{properties}{$name} = _get_elasticsearch_mapping('search', $es_type);
218
219                 if ($facet) {
220                     $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_mapping('facet', $es_type);
221                 }
222                 if ($suggestible) {
223                     $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_mapping('suggestible', $es_type);
224                 }
225                 # Sort is a bit special as it can be true, false, undef.
226                 # We care about "true" or "undef",
227                 # "undef" means to do the default thing, which is make it sortable.
228                 if (!defined $sort || $sort) {
229                     $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_mapping('sort', $es_type);
230                     $sort_fields{$self->index}{$name} = 1;
231                 }
232             }
233         );
234         $all_mappings{$self->index} = $mappings;
235     }
236     $self->sort_fields(\%{$sort_fields{$self->index}});
237
238     return $all_mappings{$self->index};
239 }
240
241 =head2 _get_elasticsearch_mapping
242
243 Get the Elasticsearch mappings for the given purpose and data type.
244
245 $mapping = _get_elasticsearch_mapping('search', 'text');
246
247 =cut
248
249 sub _get_elasticsearch_mapping {
250
251     my ( $purpose, $type ) = @_;
252
253     # Use state to speed up repeated calls
254     state $settings = undef;
255     if (!defined $settings) {
256         my $config_file = C4::Context->config('elasticsearch_field_config');
257         $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
258         $settings = LoadFile( $config_file );
259     }
260
261     if (!defined $settings->{$purpose}) {
262         die "Field purpose $purpose not defined in field config";
263     }
264     if ($type eq '') {
265         return $settings->{$purpose};
266     }
267     if (defined $settings->{$purpose}{$type}) {
268         return $settings->{$purpose}{$type};
269     }
270     if (defined $settings->{$purpose}{'default'}) {
271         return $settings->{$purpose}{'default'};
272     }
273     return;
274 }
275
276 sub reset_elasticsearch_mappings {
277     my ( $reset_fields ) = @_;
278     my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
279     $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
280     my $indexes = LoadFile( $mappings_yaml );
281
282     while ( my ( $index_name, $fields ) = each %$indexes ) {
283         while ( my ( $field_name, $data ) = each %$fields ) {
284             my $field_type = $data->{type};
285             my $field_label = $data->{label};
286             my $mappings = $data->{mappings};
287             my $search_field = Koha::SearchFields->find_or_create({ name => $field_name, label => $field_label, type => $field_type }, { key => 'name' });
288             for my $mapping ( @$mappings ) {
289                 my $marc_field = Koha::SearchMarcMaps->find_or_create({ index_name => $index_name, marc_type => $mapping->{marc_type}, marc_field => $mapping->{marc_field} });
290                 $search_field->add_to_search_marc_maps($marc_field, { facet => $mapping->{facet} || 0, suggestible => $mapping->{suggestible} || 0, sort => $mapping->{sort} } );
291             }
292         }
293     }
294 }
295
296 # This overrides the accessor provided by Class::Accessor so that if
297 # sort_fields isn't set, then it'll generate it.
298 sub sort_fields {
299     my $self = shift;
300     if (@_) {
301         $self->_sort_fields_accessor(@_);
302         return;
303     }
304     my $val = $self->_sort_fields_accessor();
305     return $val if $val;
306
307     # This will populate the accessor as a side effect
308     $self->get_elasticsearch_mappings();
309     return $self->_sort_fields_accessor();
310 }
311
312 =head2 _process_mappings($mappings, $data, $record_document)
313
314     $self->_process_mappings($mappings, $marc_field_data, $record_document)
315
316 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
317 Since we group all mappings by MARC field targets C<$mappings> will contain
318 all targets for C<$data> and thus we need to fetch the MARC field only once.
319 C<$mappings> will be applied to C<$record_document> and new field values added.
320 The method has no return value.
321
322 =over 4
323
324 =item C<$mappings>
325
326 Arrayref of mappings containing arrayrefs in the format
327 [C<$taget>, C<$options>] where C<$target> is the name of the target field and
328 C<$options> is a hashref containing processing directives for this particular
329 mapping.
330
331 =item C<$data>
332
333 The source data from a MARC record field.
334
335 =item C<$record_document>
336
337 Hashref representing the Elasticsearch document on which mappings should be
338 applied.
339
340 =back
341
342 =cut
343
344 sub _process_mappings {
345     my ($_self, $mappings, $data, $record_document) = @_;
346     foreach my $mapping (@{$mappings}) {
347         my ($target, $options) = @{$mapping};
348         # Copy (scalar) data since can have multiple targets
349         # with differing options for (possibly) mutating data
350         # so need a different copy for each
351         my $_data = $data;
352         $record_document->{$target} //= [];
353         if (defined $options->{substr}) {
354             my ($start, $length) = @{$options->{substr}};
355             $_data = length($data) > $start ? substr $data, $start, $length : '';
356         }
357         if (defined $options->{value_callbacks}) {
358             $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
359         }
360         if (defined $options->{property}) {
361             $_data = {
362                 $options->{property} => $_data
363             }
364         }
365         push @{$record_document->{$target}}, $_data;
366     }
367 }
368
369 =head2 marc_records_to_documents($marc_records)
370
371     my @record_documents = $self->marc_records_to_documents($marc_records);
372
373 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
374
375 Returns array of hash references, representing Elasticsearch documents,
376 acceptable as body payload in C<Search::Elasticsearch> requests.
377
378 =over 4
379
380 =item C<$marc_documents>
381
382 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
383
384 =back
385
386 =cut
387
388 sub marc_records_to_documents {
389     my ($self, $records) = @_;
390     my $rules = $self->_get_marc_mapping_rules();
391     my $control_fields_rules = $rules->{control_fields};
392     my $data_fields_rules = $rules->{data_fields};
393     my $marcflavour = lc C4::Context->preference('marcflavour');
394
395     my @record_documents;
396
397     foreach my $record (@{$records}) {
398         my $record_document = {};
399         my $mappings = $rules->{leader};
400         if ($mappings) {
401             $self->_process_mappings($mappings, $record->leader(), $record_document);
402         }
403         foreach my $field ($record->fields()) {
404             if($field->is_control_field()) {
405                 my $mappings = $control_fields_rules->{$field->tag()};
406                 if ($mappings) {
407                     $self->_process_mappings($mappings, $field->data(), $record_document);
408                 }
409             }
410             else {
411                 my $data_field_rules = $data_fields_rules->{$field->tag()};
412
413                 if ($data_field_rules) {
414                     my $subfields_mappings = $data_field_rules->{subfields};
415                     my $wildcard_mappings = $subfields_mappings->{'*'};
416                     foreach my $subfield ($field->subfields()) {
417                         my ($code, $data) = @{$subfield};
418                         my $mappings = $subfields_mappings->{$code} // [];
419                         if ($wildcard_mappings) {
420                             $mappings = [@{$mappings}, @{$wildcard_mappings}];
421                         }
422                         if (@{$mappings}) {
423                             $self->_process_mappings($mappings, $data, $record_document);
424                         }
425                     }
426
427                     my $subfields_join_mappings = $data_field_rules->{subfields_join};
428                     if ($subfields_join_mappings) {
429                         foreach my $subfields_group (keys %{$subfields_join_mappings}) {
430                             # Map each subfield to values, remove empty values, join with space
431                             my $data = join(
432                                 ' ',
433                                 grep(
434                                     $_,
435                                     map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
436                                 )
437                             );
438                             if ($data) {
439                                 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document);
440                             }
441                         }
442                     }
443                 }
444             }
445         }
446         foreach my $field (keys %{$rules->{defaults}}) {
447             unless (defined $record_document->{$field}) {
448                 $record_document->{$field} = $rules->{defaults}->{$field};
449             }
450         }
451         foreach my $field (@{$rules->{sum}}) {
452             if (defined $record_document->{$field}) {
453                 # TODO: validate numeric? filter?
454                 # TODO: Or should only accept fields without nested values?
455                 # TODO: Quick and dirty, improve if needed
456                 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
457             }
458         }
459         # TODO: Perhaps should check if $records_document non empty, but really should never be the case
460         $record->encoding('UTF-8');
461         my @warnings;
462         {
463             # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
464             local $SIG{__WARN__} = sub {
465                 push @warnings, $_[0];
466             };
467             $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
468         }
469         if (@warnings) {
470             # Suppress warnings if record length exceeded
471             unless (substr($record->leader(), 0, 5) eq '99999') {
472                 foreach my $warning (@warnings) {
473                     carp $warning;
474                 }
475             }
476             $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
477             $record_document->{'marc_format'} = 'MARCXML';
478         }
479         else {
480             $record_document->{'marc_format'} = 'base64ISO2709';
481         }
482         my $id = $record->subfield('999', 'c');
483         push @record_documents, [$id, $record_document];
484     }
485     return \@record_documents;
486 }
487
488 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
489
490     my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
491
492 Get mappings, an internal data structure later used by
493 L<_process_mappings($mappings, $data, $record_document)> to process MARC target
494 data for a MARC mapping.
495
496 The returned C<$mappings> is not to to be confused with mappings provided by
497 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
498 provided by C<_foreach_mapping> and expands it to this internal data stucture.
499 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
500 is then applied to each MARC target (leader, control field data, subfield or
501 joined subfields) and integrated into the mapping rules data structure used in
502 C<marc_records_to_documents> to transform MARC records into Elasticsearch
503 documents.
504
505 =over 4
506
507 =item C<$facet>
508
509 Boolean indicating whether to create a facet field for this mapping.
510
511 =item C<$suggestible>
512
513 Boolean indicating whether to create a suggestion field for this mapping.
514
515 =item C<$sort>
516
517 Boolean indicating whether to create a sort field for this mapping.
518
519 =item C<$target_name>
520
521 Elasticsearch document target field name.
522
523 =item C<$target_type>
524
525 Elasticsearch document target field type.
526
527 =item C<$range>
528
529 An optional range as a string in the format "<START>-<END>" or "<START>",
530 where "<START>" and "<END>" are integers specifying a range that will be used
531 for extracting a substring from MARC data as Elasticsearch field target value.
532
533 The first character position is "1", and the range is inclusive,
534 so "1-3" means the first three characters of MARC data.
535
536 If only "<START>" is provided only one character at position "<START>" will
537 be extracted.
538
539 =back
540
541 =cut
542
543 sub _field_mappings {
544     my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;
545     my %mapping_defaults = ();
546     my @mappings;
547
548     my $substr_args = undef;
549     if ($range) {
550         # TODO: use value_callback instead?
551         my ($start, $end) = map(int, split /-/, $range, 2);
552         $substr_args = [$start];
553         push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
554     }
555     my $default_options = {};
556     if ($substr_args) {
557         $default_options->{substr} = $substr_args;
558     }
559
560     # TODO: Should probably have per type value callback/hook
561     # but hard code for now
562     if ($target_type eq 'boolean') {
563         $default_options->{value_callbacks} //= [];
564         push @{$default_options->{value_callbacks}}, sub {
565             my ($value) = @_;
566             # Trim whitespace at both ends
567             $value =~ s/^\s+|\s+$//g;
568             return $value ? 'true' : 'false';
569         };
570     }
571
572     my $mapping = [$target_name, $default_options];
573     push @mappings, $mapping;
574
575     my @suffixes = ();
576     push @suffixes, 'facet' if $facet;
577     push @suffixes, 'suggestion' if $suggestible;
578     push @suffixes, 'sort' if !defined $sort || $sort;
579
580     foreach my $suffix (@suffixes) {
581         my $mapping = ["${target_name}__$suffix"];
582         # TODO: Hack, fix later in less hideous manner
583         if ($suffix eq 'suggestion') {
584             push @{$mapping}, {%{$default_options}, property => 'input'};
585         }
586         else {
587             push @{$mapping}, $default_options;
588         }
589         push @mappings, $mapping;
590     }
591     return @mappings;
592 };
593
594 =head2 _get_marc_mapping_rules
595
596     my $mapping_rules = $self->_get_marc_mapping_rules()
597
598 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
599
600 Since field retrieval is slow in C<MARC::Records> (all fields are itereted through for
601 each call to C<MARC::Record>->field) we create an optimized structure of mapping
602 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
603
604 We can then iterate through all MARC fields for each record and apply all relevant
605 rules once per fields instead of retreiving fields multiple times for each mapping rule
606 which is terribly slow.
607
608 =cut
609
610 # TODO: This structure can be used for processing multiple MARC::Records so is currently
611 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
612 # memory cache which it is currently not. The performance gain of caching
613 # would probably be marginal, but to do this could be a further improvement.
614
615 sub _get_marc_mapping_rules {
616     my ($self) = @_;
617     my $marcflavour = lc C4::Context->preference('marcflavour');
618     my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
619     my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
620     my $rules = {
621         'leader' => [],
622         'control_fields' => {},
623         'data_fields' => {},
624         'sum' => [],
625         'defaults' => {}
626     };
627
628     $self->_foreach_mapping(sub {
629         my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
630         return if $marc_type ne $marcflavour;
631
632         if ($type eq 'sum') {
633             push @{$rules->{sum}}, $name;
634         }
635         elsif ($type eq 'boolean') {
636             # boolean gets special handling, if value doesn't exist for a field,
637             # it is set to false
638             $rules->{defaults}->{$name} = 'false';
639         }
640
641         if ($marc_field =~ $field_spec_regexp) {
642             my $field_tag = $1;
643
644             my @subfields;
645             my @subfield_groups;
646             # Parse and separate subfields form subfield groups
647             if (defined $2) {
648                 my $subfield_group = '';
649                 my $open_group = 0;
650
651                 foreach my $token (split //, $2) {
652                     if ($token eq "(") {
653                         if ($open_group) {
654                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
655                                 "Unmatched opening parenthesis for $marc_field"
656                             );
657                         }
658                         else {
659                             $open_group = 1;
660                         }
661                     }
662                     elsif ($token eq ")") {
663                         if ($open_group) {
664                             if ($subfield_group) {
665                                 push @subfield_groups, $subfield_group;
666                                 $subfield_group = '';
667                             }
668                             $open_group = 0;
669                         }
670                         else {
671                             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
672                                 "Unmatched closing parenthesis for $marc_field"
673                             );
674                         }
675                     }
676                     elsif ($open_group) {
677                         $subfield_group .= $token;
678                     }
679                     else {
680                         push @subfields, $token;
681                     }
682                 }
683             }
684             else {
685                 push @subfields, '*';
686             }
687
688             my $range = defined $3 ? $3 : undef;
689             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
690
691             if ($field_tag < 10) {
692                 $rules->{control_fields}->{$field_tag} //= [];
693                 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
694             }
695             else {
696                 $rules->{data_fields}->{$field_tag} //= {};
697                 foreach my $subfield (@subfields) {
698                     $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
699                     push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
700                 }
701                 foreach my $subfield_group (@subfield_groups) {
702                     $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
703                     push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
704                 }
705             }
706         }
707         elsif ($marc_field =~ $leader_regexp) {
708             my $range = defined $1 ? $1 : undef;
709             my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
710             push @{$rules->{leader}}, @mappings;
711         }
712         else {
713             Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
714                 "Invalid MARC field expression: $marc_field"
715             );
716         }
717     });
718     return $rules;
719 }
720
721 =head2 _foreach_mapping
722
723     $self->_foreach_mapping(
724         sub {
725             my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
726                 $marc_field )
727               = @_;
728             return unless $marc_type eq 'marc21';
729             print "Data comes from: " . $marc_field . "\n";
730         }
731     );
732
733 This allows you to apply a function to each entry in the elasticsearch mappings
734 table, in order to build the mappings for whatever is needed.
735
736 In the provided function, the files are:
737
738 =over 4
739
740 =item C<$name>
741
742 The field name for elasticsearch (corresponds to the 'mapping' column in the
743 database.
744
745 =item C<$type>
746
747 The type for this value, e.g. 'string'.
748
749 =item C<$facet>
750
751 True if this value should be facetised. This only really makes sense if the
752 field is understood by the facet processing code anyway.
753
754 =item C<$sort>
755
756 True if this is a field that a) needs special sort handling, and b) if it
757 should be sorted on. False if a) but not b). Undef if not a). This allows,
758 for example, author to be sorted on but not everything marked with "author"
759 to be included in that sort.
760
761 =item C<$marc_type>
762
763 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
764 'unimarc', 'normarc'.
765
766 =item C<$marc_field>
767
768 A string that describes the MARC field that contains the data to extract.
769 These are of a form suited to Catmandu's MARC fixers.
770
771 =back
772
773 =cut
774
775 sub _foreach_mapping {
776     my ( $self, $sub ) = @_;
777
778     # TODO use a caching framework here
779     my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
780         {
781             'search_marc_map.index_name' => $self->index,
782         },
783         {   join => { search_marc_to_fields => 'search_marc_map' },
784             '+select' => [
785                 'search_marc_to_fields.facet',
786                 'search_marc_to_fields.suggestible',
787                 'search_marc_to_fields.sort',
788                 'search_marc_map.marc_type',
789                 'search_marc_map.marc_field',
790             ],
791             '+as'     => [
792                 'facet',
793                 'suggestible',
794                 'sort',
795                 'marc_type',
796                 'marc_field',
797             ],
798         }
799     );
800
801     while ( my $search_field = $search_fields->next ) {
802         $sub->(
803             $search_field->name,
804             $search_field->type,
805             $search_field->get_column('facet'),
806             $search_field->get_column('suggestible'),
807             $search_field->get_column('sort'),
808             $search_field->get_column('marc_type'),
809             $search_field->get_column('marc_field'),
810         );
811     }
812 }
813
814 =head2 process_error
815
816     die process_error($@);
817
818 This parses an Elasticsearch error message and produces a human-readable
819 result from it. This result is probably missing all the useful information
820 that you might want in diagnosing an issue, so the warning is also logged.
821
822 Note that currently the resulting message is not internationalised. This
823 will happen eventually by some method or other.
824
825 =cut
826
827 sub process_error {
828     my ($self, $msg) = @_;
829
830     warn $msg; # simple logging
831
832     # This is super-primitive
833     return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
834
835     return "Unable to perform your search. Please try again.\n";
836 }
837
838 =head2 _read_configuration
839
840     my $conf = _read_configuration();
841
842 Reads the I<configuration file> and returns a hash structure with the
843 configuration information. It raises an exception if mandatory entries
844 are missing.
845
846 The hashref structure has the following form:
847
848     {
849         'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
850         'index_name' => 'koha_instance',
851     }
852
853 This is configured by the following in the C<config> block in koha-conf.xml:
854
855     <elasticsearch>
856         <server>127.0.0.1:9200</server>
857         <server>anotherserver:9200</server>
858         <index_name>koha_instance</index_name>
859     </elasticsearch>
860
861 =cut
862
863 sub _read_configuration {
864
865     my $configuration;
866
867     my $conf = C4::Context->config('elasticsearch');
868     Koha::Exceptions::Config::MissingEntry->throw(
869         "Missing 'elasticsearch' block in config file")
870       unless defined $conf;
871
872     if ( $conf && $conf->{server} ) {
873         my $nodes = $conf->{server};
874         if ( ref($nodes) eq 'ARRAY' ) {
875             $configuration->{nodes} = $nodes;
876         }
877         else {
878             $configuration->{nodes} = [$nodes];
879         }
880     }
881     else {
882         Koha::Exceptions::Config::MissingEntry->throw(
883             "Missing 'server' entry in config file for elasticsearch");
884     }
885
886     if ( defined $conf->{index_name} ) {
887         $configuration->{index_name} = $conf->{index_name};
888     }
889     else {
890         Koha::Exceptions::Config::MissingEntry->throw(
891             "Missing 'index_name' entry in config file for elasticsearch");
892     }
893
894     return $configuration;
895 }
896
897 1;
898
899 __END__
900
901 =head1 AUTHOR
902
903 =over 4
904
905 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
906
907 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
908
909 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>
910
911 =back
912
913 =cut