1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
37 use Search::Elasticsearch;
41 use List::Util qw( sum0 reduce );
44 use Encode qw(encode);
46 use Scalar::Util qw(looks_like_number);
48 __PACKAGE__->mk_ro_accessors(qw( index ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
75 my $self = $class->SUPER::new(@_);
76 # Check for a valid index
77 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $self->index;
81 =head2 get_elasticsearch
83 my $elasticsearch_client = $self->get_elasticsearch();
85 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
86 instance level and will be reused if the method is called multiple times.
90 sub get_elasticsearch {
92 unless (defined $self->{elasticsearch}) {
93 my $conf = $self->get_elasticsearch_params();
94 $self->{elasticsearch} = Search::Elasticsearch->new($conf);
96 return $self->{elasticsearch};
99 =head2 get_elasticsearch_params
101 my $params = $self->get_elasticsearch_params();
103 This provides a hashref that contains the parameters for connecting to the
104 Elasticsearch servers, in the form:
107 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
108 'index_name' => 'koha_instance_index',
111 This is configured by the following in the C<config> block in koha-conf.xml:
114 <server>127.0.0.1:9200</server>
115 <server>anotherserver:9200</server>
116 <index_name>koha_instance</index_name>
121 sub get_elasticsearch_params {
124 # Copy the hash so that we're not modifying the original
125 my $conf = C4::Context->config('elasticsearch');
126 die "No 'elasticsearch' block is defined in koha-conf.xml.\n" if ( !$conf );
127 my $es = { %{ $conf } };
129 # Helpfully, the multiple server lines end up in an array for us anyway
130 # if there are multiple ones, but not if there's only one.
131 my $server = $es->{server};
132 delete $es->{server};
133 if ( ref($server) eq 'ARRAY' ) {
135 # store it called 'nodes' (which is used by newer Search::Elasticsearch)
136 $es->{nodes} = $server;
139 $es->{nodes} = [$server];
142 die "No elasticsearch servers were specified in koha-conf.xml.\n";
144 die "No elasticsearch index_name was specified in koha-conf.xml.\n"
145 if ( !$es->{index_name} );
146 # Append the name of this particular index to our namespace
147 $es->{index_name} .= '_' . $self->index;
149 $es->{key_prefix} = 'es_';
150 $es->{cxn_pool} //= 'Static';
151 $es->{request_timeout} //= 60;
156 =head2 get_elasticsearch_settings
158 my $settings = $self->get_elasticsearch_settings();
160 This provides the settings provided to Elasticsearch when an index is created.
161 These can do things like define tokenization methods.
163 A hashref containing the settings is returned.
167 sub get_elasticsearch_settings {
170 # Use state to speed up repeated calls
171 state $settings = undef;
172 if (!defined $settings) {
173 my $config_file = C4::Context->config('elasticsearch_index_config');
174 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
175 $settings = LoadFile( $config_file );
181 =head2 get_elasticsearch_mappings
183 my $mappings = $self->get_elasticsearch_mappings();
185 This provides the mappings that get passed to Elasticsearch when an index is
190 sub get_elasticsearch_mappings {
193 # Use state to speed up repeated calls
197 if (!defined $all_mappings{$self->index}) {
198 $sort_fields{$self->index} = {};
199 # Clone the general mapping to break ties with the original hash
201 data => clone(_get_elasticsearch_field_config('general', ''))
203 my $marcflavour = lc C4::Context->preference('marcflavour');
204 $self->_foreach_mapping(
206 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
207 return if $marc_type ne $marcflavour;
208 # TODO if this gets any sort of complexity to it, it should
209 # be broken out into its own function.
211 # TODO be aware of date formats, but this requires pre-parsing
212 # as ES will simply reject anything with an invalid date.
213 my $es_type = 'text';
214 if ($type eq 'boolean') {
215 $es_type = 'boolean';
216 } elsif ($type eq 'number' || $type eq 'sum') {
217 $es_type = 'integer';
218 } elsif ($type eq 'isbn' || $type eq 'stdno') {
223 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
227 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
230 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
232 # Sort is a bit special as it can be true, false, undef.
233 # We care about "true" or "undef",
234 # "undef" means to do the default thing, which is make it sortable.
235 if (!defined $sort || $sort) {
236 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
237 $sort_fields{$self->index}{$name} = 1;
241 $all_mappings{$self->index} = $mappings;
243 $self->sort_fields(\%{$sort_fields{$self->index}});
245 return $all_mappings{$self->index};
248 =head2 raw_elasticsearch_mappings
250 Return elasticsearch mapping as it is in database.
251 marc_type: marc21|unimarc|normarc
253 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
257 sub raw_elasticsearch_mappings {
258 my ( $marc_type ) = @_;
260 my $schema = Koha::Database->new()->schema();
262 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
265 while ( my $search_field = $search_fields->next ) {
267 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
268 { search_field_id => $search_field->id },
270 join => 'search_marc_map',
271 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
275 while ( my $marc_to_field = $marc_to_fields->next ) {
277 my $marc_map = $marc_to_field->search_marc_map;
279 next if $marc_type && $marc_map->marc_type ne $marc_type;
281 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
282 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
283 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
284 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
286 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
288 facet => $marc_to_field->facet || '',
289 marc_type => $marc_map->marc_type,
290 marc_field => $marc_map->marc_field,
291 sort => $marc_to_field->sort,
292 suggestible => $marc_to_field->suggestible || ''
301 =head2 _get_elasticsearch_field_config
303 Get the Elasticsearch field config for the given purpose and data type.
305 $mapping = _get_elasticsearch_field_config('search', 'text');
309 sub _get_elasticsearch_field_config {
311 my ( $purpose, $type ) = @_;
313 # Use state to speed up repeated calls
314 state $settings = undef;
315 if (!defined $settings) {
316 my $config_file = C4::Context->config('elasticsearch_field_config');
317 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
318 $settings = LoadFile( $config_file );
321 if (!defined $settings->{$purpose}) {
322 die "Field purpose $purpose not defined in field config";
325 return $settings->{$purpose};
327 if (defined $settings->{$purpose}{$type}) {
328 return $settings->{$purpose}{$type};
330 if (defined $settings->{$purpose}{'default'}) {
331 return $settings->{$purpose}{'default'};
336 =head2 _load_elasticsearch_mappings
338 Load Elasticsearch mappings in the format of mappings.yaml.
340 $indexes = _load_elasticsearch_mappings();
344 sub _load_elasticsearch_mappings {
345 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
346 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
347 return LoadFile( $mappings_yaml );
350 sub reset_elasticsearch_mappings {
352 my $indexes = $self->_load_elasticsearch_mappings();
354 Koha::SearchMarcMaps->delete;
355 Koha::SearchFields->delete;
357 while ( my ( $index_name, $fields ) = each %$indexes ) {
358 while ( my ( $field_name, $data ) = each %$fields ) {
360 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order /;
363 $sf_params{staff_client} //= 1;
364 $sf_params{opac} //= 1;
366 $sf_params{name} = $field_name;
368 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
370 my $mappings = $data->{mappings};
371 for my $mapping ( @$mappings ) {
372 my $marc_field = Koha::SearchMarcMaps->find_or_create({
373 index_name => $index_name,
374 marc_type => $mapping->{marc_type},
375 marc_field => $mapping->{marc_field}
377 $search_field->add_to_search_marc_maps($marc_field, {
378 facet => $mapping->{facet} || 0,
379 suggestible => $mapping->{suggestible} || 0,
380 sort => $mapping->{sort},
381 search => $mapping->{search} // 1
387 my $cache = Koha::Caches->get_instance();
388 $cache->clear_from_cache('elasticsearch_search_fields_staff_client');
389 $cache->clear_from_cache('elasticsearch_search_fields_opac');
391 # FIXME return the mappings?
394 # This overrides the accessor provided by Class::Accessor so that if
395 # sort_fields isn't set, then it'll generate it.
399 $self->_sort_fields_accessor(@_);
402 my $val = $self->_sort_fields_accessor();
405 # This will populate the accessor as a side effect
406 $self->get_elasticsearch_mappings();
407 return $self->_sort_fields_accessor();
410 =head2 _process_mappings($mappings, $data, $record_document, $meta)
412 $self->_process_mappings($mappings, $marc_field_data, $record_document, 0)
414 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
415 Since we group all mappings by MARC field targets C<$mappings> will contain
416 all targets for C<$data> and thus we need to fetch the MARC field only once.
417 C<$mappings> will be applied to C<$record_document> and new field values added.
418 The method has no return value.
424 Arrayref of mappings containing arrayrefs in the format
425 [C<$target>, C<$options>] where C<$target> is the name of the target field and
426 C<$options> is a hashref containing processing directives for this particular
431 The source data from a MARC record field.
433 =item C<$record_document>
435 Hashref representing the Elasticsearch document on which mappings should be
440 A hashref containing metadata useful for enforcing per mapping rules. For
441 example for providing extra context for mapping options, or treating mapping
442 targets differently depending on type (sort, search, facet etc). Combining
443 this metadata with the mapping options and metadata allows us to mutate the
444 data per mapping, or even replace it with other data retrieved from the
447 Current properties are:
449 C<altscript>: A boolean value indicating whether an alternate script presentation is being
452 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
453 'subfield' or 'subfields_group'.
455 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
457 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
458 is 'subfields_group'.
460 C<field>: The original C<MARC::Field> object.
466 sub _process_mappings {
467 my ($_self, $mappings, $data, $record_document, $meta) = @_;
468 foreach my $mapping (@{$mappings}) {
469 my ($target, $options) = @{$mapping};
471 # Don't process sort fields for alternate scripts
472 my $sort = $target =~ /__sort$/;
473 if ($sort && $meta->{altscript}) {
477 # Copy (scalar) data since can have multiple targets
478 # with differing options for (possibly) mutating data
479 # so need a different copy for each
481 $record_document->{$target} //= [];
482 if (defined $options->{substr}) {
483 my ($start, $length) = @{$options->{substr}};
484 $_data = length($data) > $start ? substr $data, $start, $length : '';
486 if (defined $options->{value_callbacks}) {
487 $_data = reduce { $b->($a) } ($_data, @{$options->{value_callbacks}});
489 if (defined $options->{property}) {
491 $options->{property} => $_data
494 if (defined $options->{nonfiling_characters_indicator}) {
495 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
496 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
497 if ($nonfiling_chars) {
498 $_data = substr $_data, $nonfiling_chars;
501 push @{$record_document->{$target}}, $_data;
505 =head2 marc_records_to_documents($marc_records)
507 my $record_documents = $self->marc_records_to_documents($marc_records);
509 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
511 Returns array of hash references, representing Elasticsearch documents,
512 acceptable as body payload in C<Search::Elasticsearch> requests.
516 =item C<$marc_records>
518 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
524 sub marc_records_to_documents {
525 my ($self, $records) = @_;
526 my $rules = $self->_get_marc_mapping_rules();
527 my $control_fields_rules = $rules->{control_fields};
528 my $data_fields_rules = $rules->{data_fields};
529 my $marcflavour = lc C4::Context->preference('marcflavour');
530 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
532 my @record_documents;
534 foreach my $record (@{$records}) {
535 my $record_document = {};
536 my $mappings = $rules->{leader};
538 $self->_process_mappings($mappings, $record->leader(), $record_document, {
540 data_source => 'leader'
544 foreach my $field ($record->fields()) {
545 if ($field->is_control_field()) {
546 my $mappings = $control_fields_rules->{$field->tag()};
548 $self->_process_mappings($mappings, $field->data(), $record_document, {
550 data_source => 'control_field',
557 my $tag = $field->tag();
558 # Handle alternate scripts in MARC 21
560 if ($marcflavour eq 'marc21' && $tag eq '880') {
561 my $sub6 = $field->subfield('6');
562 if ($sub6 =~ /^(...)-\d+/) {
568 my $data_field_rules = $data_fields_rules->{$tag};
569 if ($data_field_rules) {
570 my $subfields_mappings = $data_field_rules->{subfields};
571 my $wildcard_mappings = $subfields_mappings->{'*'};
572 foreach my $subfield ($field->subfields()) {
573 my ($code, $data) = @{$subfield};
574 my $mappings = $subfields_mappings->{$code} // [];
575 if ($wildcard_mappings) {
576 $mappings = [@{$mappings}, @{$wildcard_mappings}];
579 $self->_process_mappings($mappings, $data, $record_document, {
580 altscript => $altscript,
581 data_source => 'subfield',
587 if ( @{$mappings} && grep { $_->[0] eq 'match-heading'} @{$mappings} ){
588 # Used by the authority linker the match-heading field requires a specific syntax
589 # that is specified in C4/Heading
590 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
591 next unless $heading;
592 push @{$record_document->{'match-heading'}}, $heading->search_form;
596 my $subfields_join_mappings = $data_field_rules->{subfields_join};
597 if ($subfields_join_mappings) {
598 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
599 # Map each subfield to values, remove empty values, join with space
604 map { join(' ', $field->subfield($_)) } split(//, $subfields_group)
608 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
609 altscript => $altscript,
610 data_source => 'subfields_group',
611 codes => $subfields_group,
616 if ( grep { $_->[0] eq 'match-heading' } @{$subfields_join_mappings->{$subfields_group}} ){
617 # Used by the authority linker the match-heading field requires a specific syntax
618 # that is specified in C4/Heading
619 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
620 next unless $heading;
621 push @{$record_document->{'match-heading'}}, $heading->search_form;
628 foreach my $field (keys %{$rules->{defaults}}) {
629 unless (defined $record_document->{$field}) {
630 $record_document->{$field} = $rules->{defaults}->{$field};
633 foreach my $field (@{$rules->{sum}}) {
634 if (defined $record_document->{$field}) {
635 # TODO: validate numeric? filter?
636 # TODO: Or should only accept fields without nested values?
637 # TODO: Quick and dirty, improve if needed
638 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
641 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
642 foreach my $field (@{$rules->{isbn}}) {
643 if (defined $record_document->{$field}) {
645 foreach my $input_isbn (@{$record_document->{$field}}) {
646 my $isbn = Business::ISBN->new($input_isbn);
647 if (defined $isbn && $isbn->is_valid) {
648 my $isbn13 = $isbn->as_isbn13->as_string;
649 push @isbns, $isbn13;
651 push @isbns, $isbn13;
653 my $isbn10 = $isbn->as_isbn10;
655 $isbn10 = $isbn10->as_string;
656 push @isbns, $isbn10;
658 push @isbns, $isbn10;
661 push @isbns, $input_isbn;
664 $record_document->{$field} = \@isbns;
668 # Remove duplicate values and collapse sort fields
669 foreach my $field (keys %{$record_document}) {
670 if (ref($record_document->{$field}) eq 'ARRAY') {
671 @{$record_document->{$field}} = do {
673 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
675 if ($field =~ /__sort$/) {
676 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
677 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
682 # TODO: Perhaps should check if $records_document non empty, but really should never be the case
683 $record->encoding('UTF-8');
685 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
686 $record_document->{'marc_format'} = 'ARRAY';
690 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
691 local $SIG{__WARN__} = sub {
692 push @warnings, $_[0];
694 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
697 # Suppress warnings if record length exceeded
698 unless (substr($record->leader(), 0, 5) eq '99999') {
699 foreach my $warning (@warnings) {
703 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
704 $record_document->{'marc_format'} = 'MARCXML';
707 $record_document->{'marc_format'} = 'base64ISO2709';
710 push @record_documents, $record_document;
712 return \@record_documents;
715 =head2 _marc_to_array($record)
717 my @fields = _marc_to_array($record)
719 Convert a MARC::Record to an array modeled after MARC-in-JSON
720 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
726 A MARC::Record object
733 my ($self, $record) = @_;
736 leader => $record->leader(),
739 for my $field ($record->fields()) {
740 my $tag = $field->tag();
741 if ($field->is_control_field()) {
742 push @{$data->{fields}}, {$tag => $field->data()};
745 foreach my $subfield ($field->subfields()) {
746 my ($code, $contents) = @{$subfield};
747 push @{$subfields}, {$code => $contents};
749 push @{$data->{fields}}, {
751 ind1 => $field->indicator(1),
752 ind2 => $field->indicator(2),
753 subfields => $subfields
761 =head2 _array_to_marc($data)
763 my $record = _array_to_marc($data)
765 Convert an array modeled after MARC-in-JSON to a MARC::Record
771 An array modeled after MARC-in-JSON
772 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
779 my ($self, $data) = @_;
781 my $record = MARC::Record->new();
783 $record->leader($data->{leader});
784 for my $field (@{$data->{fields}}) {
785 my $tag = (keys %{$field})[0];
786 $field = $field->{$tag};
788 if (ref($field) eq 'HASH') {
790 foreach my $subfield (@{$field->{subfields}}) {
791 my $code = (keys %{$subfield})[0];
792 push @subfields, $code;
793 push @subfields, $subfield->{$code};
795 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
797 $marc_field = MARC::Field->new($tag, $field)
799 $record->append_fields($marc_field);
805 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
807 my @mappings = _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
809 Get mappings, an internal data structure later used by
810 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
811 data for a MARC mapping.
813 The returned C<$mappings> is not to be confused with mappings provided by
814 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
815 provided by C<_foreach_mapping> and expands it to this internal data structure.
816 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
817 is then applied to each MARC target (leader, control field data, subfield or
818 joined subfields) and integrated into the mapping rules data structure used in
819 C<marc_records_to_documents> to transform MARC records into Elasticsearch
826 Boolean indicating whether to create a facet field for this mapping.
828 =item C<$suggestible>
830 Boolean indicating whether to create a suggestion field for this mapping.
834 Boolean indicating whether to create a sort field for this mapping.
838 Boolean indicating whether to create a search field for this mapping.
840 =item C<$target_name>
842 Elasticsearch document target field name.
844 =item C<$target_type>
846 Elasticsearch document target field type.
850 An optional range as a string in the format "<START>-<END>" or "<START>",
851 where "<START>" and "<END>" are integers specifying a range that will be used
852 for extracting a substring from MARC data as Elasticsearch field target value.
854 The first character position is "0", and the range is inclusive,
855 so "0-2" means the first three characters of MARC data.
857 If only "<START>" is provided only one character at position "<START>" will
864 sub _field_mappings {
865 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
866 my %mapping_defaults = ();
869 my $substr_args = undef;
870 if (defined $range) {
871 # TODO: use value_callback instead?
872 my ($start, $end) = map(int, split /-/, $range, 2);
873 $substr_args = [$start];
874 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
876 my $default_options = {};
878 $default_options->{substr} = $substr_args;
881 # TODO: Should probably have per type value callback/hook
882 # but hard code for now
883 if ($target_type eq 'boolean') {
884 $default_options->{value_callbacks} //= [];
885 push @{$default_options->{value_callbacks}}, sub {
887 # Trim whitespace at both ends
888 $value =~ s/^\s+|\s+$//g;
889 return $value ? 'true' : 'false';
894 my $mapping = [$target_name, $default_options];
895 push @mappings, $mapping;
899 push @suffixes, 'facet' if $facet;
900 push @suffixes, 'suggestion' if $suggestible;
901 push @suffixes, 'sort' if !defined $sort || $sort;
903 foreach my $suffix (@suffixes) {
904 my $mapping = ["${target_name}__$suffix"];
905 # TODO: Hack, fix later in less hideous manner
906 if ($suffix eq 'suggestion') {
907 push @{$mapping}, {%{$default_options}, property => 'input'};
910 # Important! Make shallow clone, or we end up with the same hashref
911 # shared by all mappings
912 push @{$mapping}, {%{$default_options}};
914 push @mappings, $mapping;
919 =head2 _get_marc_mapping_rules
921 my $mapping_rules = $self->_get_marc_mapping_rules()
923 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
925 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
926 each call to C<MARC::Record>->field) we create an optimized structure of mapping
927 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
929 We can then iterate through all MARC fields for each record and apply all relevant
930 rules once per field instead of retrieving fields multiple times for each mapping rule
931 which is terribly slow.
935 # TODO: This structure can be used for processing multiple MARC::Records so is currently
936 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
937 # memory cache which it is currently not. The performance gain of caching
938 # would probably be marginal, but to do this could be a further improvement.
940 sub _get_marc_mapping_rules {
942 my $marcflavour = lc C4::Context->preference('marcflavour');
943 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
944 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
947 'control_fields' => {},
954 $self->_foreach_mapping(sub {
955 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
956 return if $marc_type ne $marcflavour;
958 if ($type eq 'sum') {
959 push @{$rules->{sum}}, $name;
960 push @{$rules->{sum}}, $name."__sort" if $sort;
962 elsif ($type eq 'isbn') {
963 push @{$rules->{isbn}}, $name;
965 elsif ($type eq 'boolean') {
966 # boolean gets special handling, if value doesn't exist for a field,
968 $rules->{defaults}->{$name} = 'false';
971 if ($marc_field =~ $field_spec_regexp) {
976 # Parse and separate subfields from subfield groups
978 my $subfield_group = '';
981 foreach my $token (split //, $2) {
984 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
985 "Unmatched opening parenthesis for $marc_field"
992 elsif ($token eq ")") {
994 if ($subfield_group) {
995 push @subfield_groups, $subfield_group;
996 $subfield_group = '';
1001 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1002 "Unmatched closing parenthesis for $marc_field"
1006 elsif ($open_group) {
1007 $subfield_group .= $token;
1010 push @subfields, $token;
1015 push @subfields, '*';
1018 my $range = defined $3 ? $3 : undef;
1019 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1020 if ($field_tag < 10) {
1021 $rules->{control_fields}->{$field_tag} //= [];
1022 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1025 $rules->{data_fields}->{$field_tag} //= {};
1026 foreach my $subfield (@subfields) {
1027 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1028 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1030 foreach my $subfield_group (@subfield_groups) {
1031 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1032 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1036 elsif ($marc_field =~ $leader_regexp) {
1037 my $range = defined $1 ? $1 : undef;
1038 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1039 push @{$rules->{leader}}, @mappings;
1042 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1043 "Invalid MARC field expression: $marc_field"
1048 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1049 if ($marcflavour eq 'marc21') {
1050 # Nonfiling characters processing for sort fields
1052 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1053 # Format is: nonfiling characters indicator => field names list
1055 1 => [130, 630, 730, 740],
1056 2 => [222, 240, 242, 243, 245, 440, 830]
1059 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1062 2 => [130, 430, 530]
1065 foreach my $indicator (keys %title_fields) {
1066 foreach my $field_tag (@{$title_fields{$indicator}}) {
1067 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1068 foreach my $mapping (@{$mappings}) {
1069 if ($mapping->[0] =~ /__sort$/) {
1070 # Mark this as to be processed for nonfiling characters indicator
1071 # later on in _process_mappings
1072 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1082 =head2 _foreach_mapping
1084 $self->_foreach_mapping(
1086 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1089 return unless $marc_type eq 'marc21';
1090 print "Data comes from: " . $marc_field . "\n";
1094 This allows you to apply a function to each entry in the elasticsearch mappings
1095 table, in order to build the mappings for whatever is needed.
1097 In the provided function, the arguments are:
1103 The field name for elasticsearch (corresponds to the 'mapping' column in the
1108 The type for this value, e.g. 'string'.
1112 True if this value should be facetised. This only really makes sense if the
1113 field is understood by the facet processing code anyway.
1117 True if this is a field that a) needs special sort handling, and b) if it
1118 should be sorted on. False if a) but not b). Undef if not a). This allows,
1119 for example, author to be sorted on but not everything marked with "author"
1120 to be included in that sort.
1124 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1125 'unimarc', 'normarc'.
1127 =item C<$marc_field>
1129 A string that describes the MARC field that contains the data to extract.
1130 These are of a form suited to Catmandu's MARC fixers.
1136 sub _foreach_mapping {
1137 my ( $self, $sub ) = @_;
1139 # TODO use a caching framework here
1140 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
1142 'search_marc_map.index_name' => $self->index,
1144 { join => { search_marc_to_fields => 'search_marc_map' },
1146 'search_marc_to_fields.facet',
1147 'search_marc_to_fields.suggestible',
1148 'search_marc_to_fields.sort',
1149 'search_marc_to_fields.search',
1150 'search_marc_map.marc_type',
1151 'search_marc_map.marc_field',
1164 while ( my $search_field = $search_fields->next ) {
1166 # Force lower case on indexed field names for case insensitive
1167 # field name searches
1168 lc($search_field->name),
1169 $search_field->type,
1170 $search_field->get_column('facet'),
1171 $search_field->get_column('suggestible'),
1172 $search_field->get_column('sort'),
1173 $search_field->get_column('search'),
1174 $search_field->get_column('marc_type'),
1175 $search_field->get_column('marc_field'),
1180 =head2 process_error
1182 die process_error($@);
1184 This parses an Elasticsearch error message and produces a human-readable
1185 result from it. This result is probably missing all the useful information
1186 that you might want in diagnosing an issue, so the warning is also logged.
1188 Note that currently the resulting message is not internationalised. This
1189 will happen eventually by some method or other.
1194 my ($self, $msg) = @_;
1196 warn $msg; # simple logging
1198 # This is super-primitive
1199 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
1201 return "Unable to perform your search. Please try again.\n";
1204 =head2 _read_configuration
1206 my $conf = _read_configuration();
1208 Reads the I<configuration file> and returns a hash structure with the
1209 configuration information. It raises an exception if mandatory entries
1212 The hashref structure has the following form:
1215 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1216 'index_name' => 'koha_instance',
1219 This is configured by the following in the C<config> block in koha-conf.xml:
1222 <server>127.0.0.1:9200</server>
1223 <server>anotherserver:9200</server>
1224 <index_name>koha_instance</index_name>
1229 sub _read_configuration {
1233 my $conf = C4::Context->config('elasticsearch');
1234 Koha::Exceptions::Config::MissingEntry->throw(
1235 "Missing 'elasticsearch' block in config file")
1236 unless defined $conf;
1238 if ( $conf && $conf->{server} ) {
1239 my $nodes = $conf->{server};
1240 if ( ref($nodes) eq 'ARRAY' ) {
1241 $configuration->{nodes} = $nodes;
1244 $configuration->{nodes} = [$nodes];
1248 Koha::Exceptions::Config::MissingEntry->throw(
1249 "Missing 'server' entry in config file for elasticsearch");
1252 if ( defined $conf->{index_name} ) {
1253 $configuration->{index_name} = $conf->{index_name};
1256 Koha::Exceptions::Config::MissingEntry->throw(
1257 "Missing 'index_name' entry in config file for elasticsearch");
1260 return $configuration;
1263 =head2 get_facetable_fields
1265 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1267 Returns the list of Koha::SearchFields marked to be faceted in the ES configuration
1271 sub get_facetable_fields {
1274 # These should correspond to the ES field names, as opposed to the CCL
1275 # things that zebra uses.
1276 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
1277 my @faceted_fields = Koha::SearchFields->search(
1278 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
1280 my @not_faceted_fields = Koha::SearchFields->search(
1281 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1283 # This could certainly be improved
1284 return ( @faceted_fields, @not_faceted_fields );
1295 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1297 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1299 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>