1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it
8 # under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # Koha is distributed in the hope that it will be useful, but
13 # WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with Koha; if not, see <http://www.gnu.org/licenses>.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::Exceptions::Elasticsearch;
27 use Koha::SearchFields;
28 use Koha::SearchMarcMaps;
31 use C4::AuthoritiesMarc qw( GuessAuthTypeCode );
33 use Carp qw( carp croak );
34 use Clone qw( clone );
36 use Readonly qw( Readonly );
37 use Search::Elasticsearch;
38 use Try::Tiny qw( catch try );
41 use List::Util qw( sum0 );
43 use MIME::Base64 qw( encode_base64 );
44 use Encode qw( encode );
46 use Scalar::Util qw( looks_like_number );
48 __PACKAGE__->mk_ro_accessors(qw( index index_name ));
49 __PACKAGE__->mk_accessors(qw( sort_fields ));
51 # Constants to refer to the standard index names
52 Readonly our $BIBLIOS_INDEX => 'biblios';
53 Readonly our $AUTHORITIES_INDEX => 'authorities';
57 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
65 The name of the index to use, generally 'biblios' or 'authorities'.
69 The Elasticsearch index name with Koha instance prefix.
82 # Check for a valid index
83 Koha::Exceptions::MissingParameter->throw('No index name provided') unless $params->{index};
84 my $config = _read_configuration();
85 $params->{index_name} = $config->{index_name} . '_' . $params->{index};
87 my $self = $class->SUPER::new(@_);
91 =head2 get_elasticsearch
93 my $elasticsearch_client = $self->get_elasticsearch();
95 Returns a C<Search::Elasticsearch> client. The client is cached at the C<Koha::SearchEngine::Elasticsearch>
96 instance level and will be reused if the method is called multiple times.
100 sub get_elasticsearch {
102 unless (defined $self->{elasticsearch}) {
103 $self->{elasticsearch} = Search::Elasticsearch->new(
104 $self->get_elasticsearch_params()
107 return $self->{elasticsearch};
110 =head2 get_elasticsearch_params
112 my $params = $self->get_elasticsearch_params();
114 This provides a hashref that contains the parameters for connecting to the
115 Elasticsearch servers, in the form:
118 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
119 'index_name' => 'koha_instance_index',
122 This is configured by the following in the C<config> block in koha-conf.xml:
125 <server>127.0.0.1:9200</server>
126 <server>anotherserver:9200</server>
127 <index_name>koha_instance</index_name>
132 sub get_elasticsearch_params {
137 $conf = _read_configuration();
139 if ( ref($_) eq 'Koha::Exceptions::Config::MissingEntry' ) {
147 =head2 get_elasticsearch_settings
149 my $settings = $self->get_elasticsearch_settings();
151 This provides the settings provided to Elasticsearch when an index is created.
152 These can do things like define tokenization methods.
154 A hashref containing the settings is returned.
158 sub get_elasticsearch_settings {
161 # Use state to speed up repeated calls
162 state $settings = undef;
163 if (!defined $settings) {
164 my $config_file = C4::Context->config('elasticsearch_index_config');
165 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
166 $settings = YAML::XS::LoadFile( $config_file );
172 =head2 get_elasticsearch_mappings
174 my $mappings = $self->get_elasticsearch_mappings();
176 This provides the mappings that get passed to Elasticsearch when an index is
181 sub get_elasticsearch_mappings {
184 # Use state to speed up repeated calls
188 if (!defined $all_mappings{$self->index}) {
189 $sort_fields{$self->index} = {};
190 # Clone the general mapping to break ties with the original hash
192 data => clone(_get_elasticsearch_field_config('general', ''))
194 my $marcflavour = lc C4::Context->preference('marcflavour');
195 $self->_foreach_mapping(
197 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type ) = @_;
198 return if $marc_type ne $marcflavour;
199 # TODO if this gets any sort of complexity to it, it should
200 # be broken out into its own function.
202 # TODO be aware of date formats, but this requires pre-parsing
203 # as ES will simply reject anything with an invalid date.
204 my $es_type = 'text';
205 if ($type eq 'boolean') {
206 $es_type = 'boolean';
207 } elsif ($type eq 'number' || $type eq 'sum') {
208 $es_type = 'integer';
209 } elsif ($type eq 'isbn' || $type eq 'stdno') {
211 } elsif ($type eq 'year') {
216 $mappings->{data}{properties}{$name} = _get_elasticsearch_field_config('search', $es_type);
220 $mappings->{data}{properties}{ $name . '__facet' } = _get_elasticsearch_field_config('facet', $es_type);
223 $mappings->{data}{properties}{ $name . '__suggestion' } = _get_elasticsearch_field_config('suggestible', $es_type);
225 # Sort is a bit special as it can be true, false, undef.
226 # We care about "true" or "undef",
227 # "undef" means to do the default thing, which is make it sortable.
228 if (!defined $sort || $sort) {
229 $mappings->{data}{properties}{ $name . '__sort' } = _get_elasticsearch_field_config('sort', $es_type);
230 $sort_fields{$self->index}{$name} = 1;
234 $mappings->{data}{properties}{ 'match-heading' } = _get_elasticsearch_field_config('search', 'text') if $self->index eq 'authorities';
235 $all_mappings{$self->index} = $mappings;
237 $self->sort_fields(\%{$sort_fields{$self->index}});
238 return $all_mappings{$self->index};
241 =head2 raw_elasticsearch_mappings
243 Returns the Elasticsearch mappings as they are stored in the database.
244 marc_type: marc21|unimarc|normarc
246 $raw_mappings = raw_elasticsearch_mappings( $marc_type )
250 sub raw_elasticsearch_mappings {
251 my ( $marc_type ) = @_;
253 my $schema = Koha::Database->new()->schema();
255 my $search_fields = Koha::SearchFields->search({}, { order_by => { -asc => 'name' } });
258 while ( my $search_field = $search_fields->next ) {
260 my $marc_to_fields = $schema->resultset('SearchMarcToField')->search(
261 { search_field_id => $search_field->id },
263 join => 'search_marc_map',
264 order_by => { -asc => ['search_marc_map.marc_type','search_marc_map.marc_field'] }
268 while ( my $marc_to_field = $marc_to_fields->next ) {
270 my $marc_map = $marc_to_field->search_marc_map;
272 next if $marc_type && $marc_map->marc_type ne $marc_type;
274 $mappings->{ $marc_map->index_name }{ $search_field->name }{label} = $search_field->label;
275 $mappings->{ $marc_map->index_name }{ $search_field->name }{type} = $search_field->type;
276 $mappings->{ $marc_map->index_name }{ $search_field->name }{mandatory} = $search_field->mandatory;
277 $mappings->{ $marc_map->index_name }{ $search_field->name }{facet_order} = $search_field->facet_order if defined $search_field->facet_order;
278 $mappings->{ $marc_map->index_name }{ $search_field->name }{weight} = $search_field->weight if defined $search_field->weight;
279 $mappings->{ $marc_map->index_name }{ $search_field->name }{opac} = $search_field->opac if defined $search_field->opac;
280 $mappings->{ $marc_map->index_name }{ $search_field->name }{staff_client} = $search_field->staff_client if defined $search_field->staff_client;
282 push (@{ $mappings->{ $marc_map->index_name }{ $search_field->name }{mappings} },
284 facet => $marc_to_field->facet || '',
285 marc_type => $marc_map->marc_type,
286 marc_field => $marc_map->marc_field,
287 sort => $marc_to_field->sort,
288 suggestible => $marc_to_field->suggestible || ''
297 =head2 _get_elasticsearch_field_config
299 Get the Elasticsearch field config for the given purpose and data type.
301 $mapping = _get_elasticsearch_field_config('search', 'text');
305 sub _get_elasticsearch_field_config {
307 my ( $purpose, $type ) = @_;
309 # Use state to speed up repeated calls
310 state $settings = undef;
311 if (!defined $settings) {
312 my $config_file = C4::Context->config('elasticsearch_field_config');
313 $config_file ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
314 local $YAML::XS::Boolean = 'JSON::PP';
315 $settings = YAML::XS::LoadFile( $config_file );
318 if (!defined $settings->{$purpose}) {
319 die "Field purpose $purpose not defined in field config";
322 return $settings->{$purpose};
324 if (defined $settings->{$purpose}{$type}) {
325 return $settings->{$purpose}{$type};
327 if (defined $settings->{$purpose}{'default'}) {
328 return $settings->{$purpose}{'default'};
333 =head2 _load_elasticsearch_mappings
335 Load Elasticsearch mappings in the format of mappings.yaml.
337 $indexes = _load_elasticsearch_mappings();
341 sub _load_elasticsearch_mappings {
342 my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings');
343 $mappings_yaml ||= C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
344 return YAML::XS::LoadFile( $mappings_yaml );
347 sub reset_elasticsearch_mappings {
349 my $indexes = $self->_load_elasticsearch_mappings();
351 Koha::SearchMarcMaps->delete;
352 Koha::SearchFields->delete;
354 while ( my ( $index_name, $fields ) = each %$indexes ) {
355 while ( my ( $field_name, $data ) = each %$fields ) {
357 my %sf_params = map { $_ => $data->{$_} } grep { exists $data->{$_} } qw/ type label weight staff_client opac facet_order mandatory/;
360 $sf_params{staff_client} //= 1;
361 $sf_params{opac} //= 1;
363 $sf_params{name} = $field_name;
365 my $search_field = Koha::SearchFields->find_or_create( \%sf_params, { key => 'name' } );
367 my $mappings = $data->{mappings};
368 for my $mapping ( @$mappings ) {
369 my $marc_field = Koha::SearchMarcMaps->find_or_create({
370 index_name => $index_name,
371 marc_type => $mapping->{marc_type},
372 marc_field => $mapping->{marc_field}
374 $search_field->add_to_search_marc_maps($marc_field, {
375 facet => $mapping->{facet} || 0,
376 suggestible => $mapping->{suggestible} || 0,
377 sort => $mapping->{sort} // 1,
378 search => $mapping->{search} // 1
384 $self->clear_search_fields_cache();
386 # FIXME return the mappings?
389 # This overrides the accessor provided by Class::Accessor so that if
390 # sort_fields isn't set, then it'll generate it.
394 $self->_sort_fields_accessor(@_);
397 my $val = $self->_sort_fields_accessor();
400 # This will populate the accessor as a side effect
401 $self->get_elasticsearch_mappings();
402 return $self->_sort_fields_accessor();
405 =head2 _process_mappings($mappings, $data, $record_document, $meta)
407 $self->_process_mappings($mappings, $marc_field_data, $record_document, $meta)
409 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
410 Since we group all mappings by MARC field targets C<$mappings> will contain
411 all targets for C<$data> and thus we need to fetch the MARC field only once.
412 C<$mappings> will be applied to C<$record_document> and new field values added.
413 The method has no return value.
419 Arrayref of mappings containing arrayrefs in the format
420 [C<$target>, C<$options>] where C<$target> is the name of the target field and
421 C<$options> is a hashref containing processing directives for this particular
426 The source data from a MARC record field.
428 =item C<$record_document>
430 Hashref representing the Elasticsearch document on which mappings should be
435 A hashref containing metadata useful for enforcing per mapping rules. For
436 example for providing extra context for mapping options, or treating mapping
437 targets differently depending on type (sort, search, facet etc). Combining
438 this metadata with the mapping options allows us to mutate the
439 data per mapping, or even replace it with other data retrieved from the
442 Current properties are:
444 C<altscript>: A boolean value indicating whether an alternate script presentation is being
447 C<data_source>: The source of the C<$data> argument. Possible values are: 'leader', 'control_field',
448 'subfield' or 'subfields_group'.
450 C<code>: The code of the subfield from which C<$data> was retrieved, if C<data_source> is 'subfield'.
452 C<codes>: Subfield codes of the subfields group from which C<$data> was retrieved, if C<data_source>
453 is 'subfields_group'.
455 C<field>: The original C<MARC::Field> object.
461 sub _process_mappings {
462 my ($_self, $mappings, $data, $record_document, $meta) = @_;
463 foreach my $mapping (@{$mappings}) {
464 my ($target, $options) = @{$mapping};
466 # Don't process sort fields for alternate scripts
467 my $sort = $target =~ /__sort$/;
468 if ($sort && $meta->{altscript}) {
472 # Copy (scalar) data since can have multiple targets
473 # with differing options for (possibly) mutating data
474 # so need a different copy for each
475 my $data_copy = $data;
476 if (defined $options->{substr}) {
477 my ($start, $length) = @{$options->{substr}};
478 $data_copy = length($data) > $start ? substr $data_copy, $start, $length : '';
481 # Add data to values array for callbacks processing
482 my $values = [$data_copy];
484 # Value callbacks take subfield data (or values from previous
485 # callbacks) as argument, and returns a possibly different list of values.
486 # Note that the returned list may also be empty.
487 if (defined $options->{value_callbacks}) {
488 foreach my $callback (@{$options->{value_callbacks}}) {
489 # Pass each value to current callback which returns a list
490 # (scalar is fine too) resulting either in a list or
491 # a list of lists that will be flattened by perl.
492 # The next callback will receive the possibly expanded list of values.
493 $values = [ map { $callback->($_) } @{$values} ];
497 # Skip mapping if all values have been removed
498 next unless @{$values};
500 if (defined $options->{property}) {
501 $values = [ map { { $options->{property} => $_ } if $_} @{$values} ];
503 if (defined $options->{nonfiling_characters_indicator}) {
504 my $nonfiling_chars = $meta->{field}->indicator($options->{nonfiling_characters_indicator});
505 $nonfiling_chars = looks_like_number($nonfiling_chars) ? int($nonfiling_chars) : 0;
506 # Nonfiling chars do not make sense for multiple values
507 # Only apply on first element
508 $values->[0] = substr $values->[0], $nonfiling_chars;
511 $values = [ grep(!/^$/, @{$values}) ];
513 $record_document->{$target} //= [];
514 push @{$record_document->{$target}}, @{$values};
518 =head2 marc_records_to_documents($marc_records)
520 my $record_documents = $self->marc_records_to_documents($marc_records);
522 Using mappings stored in the database, convert C<$marc_records> to Elasticsearch documents.
524 Returns a reference to an array of hash references, representing Elasticsearch documents,
525 acceptable as body payload in C<Search::Elasticsearch> requests.
529 =item C<$marc_records>
531 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
537 sub marc_records_to_documents {
538 my ($self, $records) = @_;
539 my $rules = $self->_get_marc_mapping_rules();
540 my $control_fields_rules = $rules->{control_fields};
541 my $data_fields_rules = $rules->{data_fields};
542 my $marcflavour = lc C4::Context->preference('marcflavour');
543 my $use_array = C4::Context->preference('ElasticsearchMARCFormat') eq 'ARRAY';
545 my @record_documents;
547 my %auth_match_headings;
548 if( $self->index eq 'authorities' ){
549 my @auth_types = Koha::Authority::Types->search();
550 %auth_match_headings = map { $_->authtypecode => $_->auth_tag_to_report } @auth_types;
553 foreach my $record (@{$records}) {
554 my $record_document = {};
556 if ( $self->index eq 'authorities' ){
557 my $authtypecode = GuessAuthTypeCode( $record );
559 if( $authtypecode !~ m/_SUBD/ ){ #Subdivision records will not be used for linking and so don't require match-heading to be built
560 my $field = $record->field( $auth_match_headings{ $authtypecode } );
561 my $heading = C4::Heading->new_from_field( $field, undef, 1 ); #new auth heading
562 push @{$record_document->{'match-heading'}}, $heading->search_form if $heading;
565 warn "Cannot determine authority type for record: " . $record->field('001')->as_string;
569 my $mappings = $rules->{leader};
571 $self->_process_mappings($mappings, $record->leader(), $record_document, {
573 data_source => 'leader'
577 foreach my $field ($record->fields()) {
578 if ($field->is_control_field()) {
579 my $mappings = $control_fields_rules->{$field->tag()};
581 $self->_process_mappings($mappings, $field->data(), $record_document, {
583 data_source => 'control_field',
590 my $tag = $field->tag();
591 # Handle alternate scripts in MARC 21
593 if ($marcflavour eq 'marc21' && $tag eq '880') {
594 my $sub6 = $field->subfield('6');
595 if ($sub6 =~ /^(...)-\d+/) {
601 my $data_field_rules = $data_fields_rules->{$tag};
602 if ($data_field_rules) {
603 my $subfields_mappings = $data_field_rules->{subfields};
604 my $wildcard_mappings = $subfields_mappings->{'*'};
605 foreach my $subfield ($field->subfields()) {
606 my ($code, $data) = @{$subfield};
607 my $mappings = $subfields_mappings->{$code} // [];
608 if ($wildcard_mappings) {
609 $mappings = [@{$mappings}, @{$wildcard_mappings}];
612 $self->_process_mappings($mappings, $data, $record_document, {
613 altscript => $altscript,
614 data_source => 'subfield',
622 my $subfields_join_mappings = $data_field_rules->{subfields_join};
623 if ($subfields_join_mappings) {
624 foreach my $subfields_group (keys %{$subfields_join_mappings}) {
625 my $data_field = $field->clone; #copy field to preserve for alt scripts
626 $data_field->delete_subfield(match => qr/^$/); #remove empty subfields, otherwise they are printed as a space
627 my $data = $data_field->as_string( $subfields_group ); #get values for subfields as a combined string, preserving record order
629 $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $data, $record_document, {
630 altscript => $altscript,
631 data_source => 'subfields_group',
632 codes => $subfields_group,
642 foreach my $field (keys %{$rules->{defaults}}) {
643 unless (defined $record_document->{$field}) {
644 $record_document->{$field} = $rules->{defaults}->{$field};
647 foreach my $field (@{$rules->{sum}}) {
648 if (defined $record_document->{$field}) {
649 # TODO: validate numeric? filter?
650 # TODO: Or should only accept fields without nested values?
651 # TODO: Quick and dirty, improve if needed
652 $record_document->{$field} = sum0(grep { !ref($_) && m/\d+(\.\d+)?/} @{$record_document->{$field}});
655 # Index all applicable ISBN forms (ISBN-10 and ISBN-13 with and without dashes)
656 foreach my $field (@{$rules->{isbn}}) {
657 if (defined $record_document->{$field}) {
659 foreach my $input_isbn (@{$record_document->{$field}}) {
660 my $isbn = Business::ISBN->new($input_isbn);
661 if (defined $isbn && $isbn->is_valid) {
662 my $isbn13 = $isbn->as_isbn13->as_string;
663 push @isbns, $isbn13;
665 push @isbns, $isbn13;
667 my $isbn10 = $isbn->as_isbn10;
669 $isbn10 = $isbn10->as_string;
670 push @isbns, $isbn10;
672 push @isbns, $isbn10;
675 push @isbns, $input_isbn;
678 $record_document->{$field} = \@isbns;
682 # Remove duplicate values and collapse sort fields
683 foreach my $field (keys %{$record_document}) {
684 if (ref($record_document->{$field}) eq 'ARRAY') {
685 @{$record_document->{$field}} = do {
687 grep { !$seen{ref($_) eq 'HASH' && defined $_->{input} ? $_->{input} : $_}++ } @{$record_document->{$field}};
689 if ($field =~ /__sort$/) {
690 # Make sure to keep the sort field length sensible. 255 was chosen as a nice round value.
691 $record_document->{$field} = [substr(join(' ', @{$record_document->{$field}}), 0, 255)];
696 # TODO: Perhaps should check if $record_document is non-empty, but really should never be the case
697 $record->encoding('UTF-8');
699 $record_document->{'marc_data_array'} = $self->_marc_to_array($record);
700 $record_document->{'marc_format'} = 'ARRAY';
704 # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
705 local $SIG{__WARN__} = sub {
706 push @warnings, $_[0];
708 $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
711 # Suppress warnings if record length exceeded
712 unless (substr($record->leader(), 0, 5) eq '99999') {
713 foreach my $warning (@warnings) {
717 $record_document->{'marc_data'} = $record->as_xml_record($marcflavour);
718 $record_document->{'marc_format'} = 'MARCXML';
721 $record_document->{'marc_format'} = 'base64ISO2709';
724 push @record_documents, $record_document;
726 return \@record_documents;
729 =head2 _marc_to_array($record)
731 my $data = $self->_marc_to_array($record)
733 Convert a MARC::Record to an array modeled after MARC-in-JSON
734 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
740 A MARC::Record object
747 my ($self, $record) = @_;
750 leader => $record->leader(),
753 for my $field ($record->fields()) {
754 my $tag = $field->tag();
755 if ($field->is_control_field()) {
756 push @{$data->{fields}}, {$tag => $field->data()};
759 foreach my $subfield ($field->subfields()) {
760 my ($code, $contents) = @{$subfield};
761 push @{$subfields}, {$code => $contents};
763 push @{$data->{fields}}, {
765 ind1 => $field->indicator(1),
766 ind2 => $field->indicator(2),
767 subfields => $subfields
775 =head2 _array_to_marc($data)
777 my $record = $self->_array_to_marc($data)
779 Convert an array modeled after MARC-in-JSON to a MARC::Record
785 An array modeled after MARC-in-JSON
786 (see https://github.com/marc4j/marc4j/wiki/MARC-in-JSON-Description)
793 my ($self, $data) = @_;
795 my $record = MARC::Record->new();
797 $record->leader($data->{leader});
798 for my $field (@{$data->{fields}}) {
799 my $tag = (keys %{$field})[0];
800 $field = $field->{$tag};
802 if (ref($field) eq 'HASH') {
804 foreach my $subfield (@{$field->{subfields}}) {
805 my $code = (keys %{$subfield})[0];
806 push @subfields, $code;
807 push @subfields, $subfield->{$code};
809 $marc_field = MARC::Field->new($tag, $field->{ind1}, $field->{ind2}, @subfields);
811 $marc_field = MARC::Field->new($tag, $field)
813 $record->append_fields($marc_field);
819 =head2 _field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
821 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $target_name, $target_type, $range)
823 Get mappings, an internal data structure later used by
824 L<_process_mappings($mappings, $data, $record_document, $meta)> to process MARC target
825 data for a MARC mapping.
827 The returned C<$mappings> is not to be confused with mappings provided by
828 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
829 provided by C<_foreach_mapping> and expands it to this internal data structure.
830 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
831 is then applied to each MARC target (leader, control field data, subfield or
832 joined subfields) and integrated into the mapping rules data structure used in
833 C<marc_records_to_documents> to transform MARC records into Elasticsearch
840 Boolean indicating whether to create a facet field for this mapping.
842 =item C<$suggestible>
844 Boolean indicating whether to create a suggestion field for this mapping.
848 Boolean indicating whether to create a sort field for this mapping.
852 Boolean indicating whether to create a search field for this mapping.
854 =item C<$target_name>
856 Elasticsearch document target field name.
858 =item C<$target_type>
860 Elasticsearch document target field type.
864 An optional range as a string in the format "<START>-<END>" or "<START>",
865 where "<START>" and "<END>" are integers specifying a range that will be used
866 for extracting a substring from MARC data as Elasticsearch field target value.
868 The first character position is "0", and the range is inclusive,
869 so "0-2" means the first three characters of MARC data.
871 If only "<START>" is provided only one character at position "<START>" will
878 sub _field_mappings {
879 my ($_self, $facet, $suggestible, $sort, $search, $target_name, $target_type, $range) = @_;
880 my %mapping_defaults = ();
883 my $substr_args = undef;
884 if (defined $range) {
885 # TODO: use value_callback instead?
886 my ($start, $end) = map(int, split /-/, $range, 2);
887 $substr_args = [$start];
888 push @{$substr_args}, (defined $end ? $end - $start + 1 : 1);
890 my $default_options = {};
892 $default_options->{substr} = $substr_args;
895 # TODO: Should probably have per type value callback/hook
896 # but hard code for now
897 if ($target_type eq 'boolean') {
898 $default_options->{value_callbacks} //= [];
899 push @{$default_options->{value_callbacks}}, sub {
901 # Trim whitespace at both ends
902 $value =~ s/^\s+|\s+$//g;
903 return $value ? 'true' : 'false';
906 elsif ($target_type eq 'year') {
907 $default_options->{value_callbacks} //= [];
908 # Only accept years containing digits and "u"
909 push @{$default_options->{value_callbacks}}, sub {
911 # Replace "u" with "0" for sorting
912 return map { s/[u\s]/0/gr } ( $value =~ /[0-9u\s]{4}/g );
917 my $mapping = [$target_name, $default_options];
918 push @mappings, $mapping;
922 push @suffixes, 'facet' if $facet;
923 push @suffixes, 'suggestion' if $suggestible;
924 push @suffixes, 'sort' if !defined $sort || $sort;
926 foreach my $suffix (@suffixes) {
927 my $mapping = ["${target_name}__$suffix"];
928 # TODO: Hack, fix later in less hideous manner
929 if ($suffix eq 'suggestion') {
930 push @{$mapping}, {%{$default_options}, property => 'input'};
933 # Important! Make shallow clone, or we end up with the same hashref
934 # shared by all mappings
935 push @{$mapping}, {%{$default_options}};
937 push @mappings, $mapping;
942 =head2 _get_marc_mapping_rules
944 my $mapping_rules = $self->_get_marc_mapping_rules()
946 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
948 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
949 each call to C<MARC::Record>->field) we create an optimized structure of mapping
950 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
952 We can then iterate through all MARC fields for each record and apply all relevant
953 rules once per field instead of retrieving fields multiple times for each mapping rule
954 which is terribly slow.
958 # TODO: This structure can be used for processing multiple MARC::Records so is currently
959 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
960 # memory cache which it is currently not. The performance gain of caching
961 # would probably be marginal, but to do this could be a further improvement.
963 sub _get_marc_mapping_rules {
965 my $marcflavour = lc C4::Context->preference('marcflavour');
966 my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-zA-Z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
967 my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
970 'control_fields' => {},
977 $self->_foreach_mapping(sub {
978 my ($name, $type, $facet, $suggestible, $sort, $search, $marc_type, $marc_field) = @_;
979 return if $marc_type ne $marcflavour;
981 if ($type eq 'sum') {
982 push @{$rules->{sum}}, $name;
983 push @{$rules->{sum}}, $name."__sort" if $sort;
985 elsif ($type eq 'isbn') {
986 push @{$rules->{isbn}}, $name;
988 elsif ($type eq 'boolean') {
989 # boolean gets special handling, if value doesn't exist for a field,
991 $rules->{defaults}->{$name} = 'false';
994 if ($marc_field =~ $field_spec_regexp) {
999 # Parse and separate subfields from subfield groups
1001 my $subfield_group = '';
1004 foreach my $token (split //, $2) {
1005 if ($token eq "(") {
1007 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1008 "Unmatched opening parenthesis for $marc_field"
1015 elsif ($token eq ")") {
1017 if ($subfield_group) {
1018 push @subfield_groups, $subfield_group;
1019 $subfield_group = '';
1024 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1025 "Unmatched closing parenthesis for $marc_field"
1029 elsif ($open_group) {
1030 $subfield_group .= $token;
1033 push @subfields, $token;
1038 push @subfields, '*';
1041 my $range = defined $3 ? $3 : undef;
1042 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1043 if ($field_tag < 10) {
1044 $rules->{control_fields}->{$field_tag} //= [];
1045 push @{$rules->{control_fields}->{$field_tag}}, @mappings;
1048 $rules->{data_fields}->{$field_tag} //= {};
1049 foreach my $subfield (@subfields) {
1050 $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
1051 push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
1053 foreach my $subfield_group (@subfield_groups) {
1054 $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
1055 push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
1059 elsif ($marc_field =~ $leader_regexp) {
1060 my $range = defined $1 ? $1 : undef;
1061 my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $search, $name, $type, $range);
1062 push @{$rules->{leader}}, @mappings;
1065 Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
1066 "Invalid MARC field expression: $marc_field"
1071 # Marc-flavour specific rule tweaks, could/should also provide hook for this
1072 if ($marcflavour eq 'marc21') {
1073 # Nonfiling characters processing for sort fields
1075 if ($self->index eq $Koha::SearchEngine::BIBLIOS_INDEX) {
1076 # Format is: nonfiling characters indicator => field names list
1078 1 => [130, 630, 730, 740],
1079 2 => [222, 240, 242, 243, 245, 440, 830]
1082 elsif ($self->index eq $Koha::SearchEngine::AUTHORITIES_INDEX) {
1085 2 => [130, 430, 530]
1088 foreach my $indicator (keys %title_fields) {
1089 foreach my $field_tag (@{$title_fields{$indicator}}) {
1090 my $mappings = $rules->{data_fields}->{$field_tag}->{subfields}->{a} // [];
1091 foreach my $mapping (@{$mappings}) {
1092 if ($mapping->[0] =~ /__sort$/) {
1093 # Mark this as to be processed for nonfiling characters indicator
1094 # later on in _process_mappings
1095 $mapping->[1]->{nonfiling_characters_indicator} = $indicator;
1105 =head2 _foreach_mapping
1107 $self->_foreach_mapping(
1109 my ( $name, $type, $facet, $suggestible, $sort, $search, $marc_type,
1112 return unless $marc_type eq 'marc21';
1113 print "Data comes from: " . $marc_field . "\n";
1117 This allows you to apply a function to each entry in the elasticsearch mappings
1118 table, in order to build the mappings for whatever is needed.
1120 In the provided function, the arguments are:
1126 The field name for elasticsearch (corresponds to the 'mapping' column in the
1131 The type for this value, e.g. 'string'.
1135 True if this value should be facetised. This only really makes sense if the
1136 field is understood by the facet processing code anyway.
1140 True if this is a field that a) needs special sort handling, and b) if it
1141 should be sorted on. False if a) but not b). Undef if not a). This allows,
1142 for example, author to be sorted on but not everything marked with "author"
1143 to be included in that sort.
1147 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
1148 'unimarc', 'normarc'.
1150 =item C<$marc_field>
1152 A string that describes the MARC field that contains the data to extract.
# Look up the search-field mappings stored for this object's index and walk
# them row by row, producing one list of values per mapping.
#   $self - object providing ->index (the index name used as the filter)
#   $sub  - value supplied by the caller; presumably the code reference that
#           receives the per-row values listed at the end of this sub
#           (NOTE(review): the invocation line is not visible here - confirm).
1158 sub _foreach_mapping {
1159 my ( $self, $sub ) = @_;
1161 # TODO use a caching framework here
# Search the 'SearchField' result set of the Koha database schema.
1162 my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
# Condition: only rows whose joined MARC map belongs to this object's index.
1164 'search_marc_map.index_name' => $self->index,
# Join SearchField -> search_marc_to_fields -> search_marc_map so that the
# qualified columns named below can be referenced.
1166 { join => { search_marc_to_fields => 'search_marc_map' },
# Columns taken from the joined tables; the same six names are read back with
# get_column() in the loop below.
1168 'search_marc_to_fields.facet',
1169 'search_marc_to_fields.suggestible',
1170 'search_marc_to_fields.sort',
1171 'search_marc_to_fields.search',
1172 'search_marc_map.marc_type',
1173 'search_marc_map.marc_field',
# Iterate the result set one row at a time via ->next.
1186 while ( my $search_field = $search_fields->next ) {
# Values produced for each row, in this order: lower-cased name, type, facet,
# suggestible, sort, search, marc_type, marc_field (eight values).
1188 # Force lower case on indexed field names for case insensitive
1189 # field name searches
1190 lc($search_field->name),
1191 $search_field->type,
1192 $search_field->get_column('facet'),
1193 $search_field->get_column('suggestible'),
1194 $search_field->get_column('sort'),
1195 $search_field->get_column('search'),
1196 $search_field->get_column('marc_type'),
1197 $search_field->get_column('marc_field'),
1202 =head2 process_error
1204 die process_error($@);
1206 This parses an Elasticsearch error message and produces a human-readable
1207 result from it. This result is probably missing all the useful information
1208 that you might want in diagnosing an issue, so the warning is also logged.
1210 Note that currently the resulting message is not internationalised. This
1211 will happen eventually by some method or other.
# Turn a raw error message into a short user-facing string.
#   $msg - the error text to inspect (it is matched as a string below)
# Returns one of two fixed messages, each terminated by a newline.
1216 my ($self, $msg) = @_;
# Emit the untouched message through warn so the original detail is kept.
1218 warn $msg; # simple logging
1220 # This is super-primitive
# A message containing "ParseException" is reported as a query the user
# should rephrase.
1221 return "Unable to understand your search query, please rephrase and try again.\n" if $msg =~ /ParseException/;
# Every other message falls through to the generic failure text.
1223 return "Unable to perform your search. Please try again.\n";
1226 =head2 _read_configuration
1228 my $conf = _read_configuration();
1230 Reads the I<configuration file> and returns a hash structure with the
1231 configuration information. It raises an exception if mandatory entries
1234 The hashref structure has the following form:
1237 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
1238 'index_name' => 'koha_instance',
1241 This is configured by the following in the C<config> block in koha-conf.xml:
1244 <server>127.0.0.1:9200</server>
1245 <server>anotherserver:9200</server>
1246 <index_name>koha_instance</index_name>
# Build the Elasticsearch configuration structure from the 'elasticsearch'
# entry returned by C4::Context->config.
# Returns $configuration with these keys filled in below:
#   nodes      - array reference of server addresses
#   index_name - value of the index_name entry
#   cxn_pool   - value of the cxn_pool entry, or 'Static' when it is undefined
#   trace_to   - only set when the trace_to entry is defined
# Throws Koha::Exceptions::Config::MissingEntry with the messages shown below
# for the elasticsearch, server and index_name entries.
1251 sub _read_configuration {
1255 my $conf = C4::Context->config('elasticsearch');
# No 'elasticsearch' entry at all: nothing else can be read.
1256 unless ( defined $conf ) {
1257 Koha::Exceptions::Config::MissingEntry->throw(
1258 "Missing <elasticsearch> entry in koha-conf.xml"
# The server entry may be an array reference or a single value; either way
# 'nodes' ends up holding an array reference.
1262 if ( $conf && $conf->{server} ) {
1263 my $nodes = $conf->{server};
1264 if ( ref($nodes) eq 'ARRAY' ) {
1265 $configuration->{nodes} = $nodes;
# Single value: wrap it in a one-element array reference.
1268 $configuration->{nodes} = [$nodes];
# Exception for a missing server entry.
1272 Koha::Exceptions::Config::MissingEntry->throw(
1273 "Missing <elasticsearch>/<server> entry in koha-conf.xml"
# index_name is copied through unchanged when defined.
1277 if ( defined $conf->{index_name} ) {
1278 $configuration->{index_name} = $conf->{index_name};
# Exception for a missing index_name entry.
1281 Koha::Exceptions::Config::MissingEntry->throw(
1282 "Missing <elasticsearch>/<index_name> entry in koha-conf.xml",
# Optional settings: cxn_pool falls back to 'Static'; trace_to is copied only
# when it is defined.
1286 $configuration->{cxn_pool} = $conf->{cxn_pool} // 'Static';
1288 $configuration->{trace_to} = $conf->{trace_to} if defined $conf->{trace_to};
1290 return $configuration;
1293 =head2 get_facetable_fields
1295 my @facetable_fields = Koha::SearchEngine::Elasticsearch->get_facetable_fields();
1297 Returns the Koha::SearchFields usable as facets: those with a facet_order set (ordered by it), followed by those without one
# Return the search fields from a fixed list of names, split into two groups:
# first those with a facet_order set (sorted by facet_order), then those whose
# facet_order is undef.
1301 sub get_facetable_fields {
1304 # These should correspond to the ES field names, as opposed to the CCL
1305 # things that zebra uses.
# Only fields with one of these names are ever returned.
1306 my @search_field_names = qw( author itype location su-geo title-series subject ccode holdingbranch homebranch ln );
# Group 1: named fields whose facet_order is not NULL, ordered by facet_order.
1307 my @faceted_fields = Koha::SearchFields->search(
1308 { name => { -in => \@search_field_names }, facet_order => { '!=' => undef } }, { order_by => ['facet_order'] }
# Group 2: named fields whose facet_order is NULL.
# NOTE(review): ordering by facet_order looks redundant here, since every
# matching row has it undefined - confirm before relying on the order.
1310 my @not_faceted_fields = Koha::SearchFields->search(
1311 { name => { -in => \@search_field_names }, facet_order => undef }, { order_by => ['facet_order'] }
1313 # This could certainly be improved
# Flat list: group 1 followed by group 2.
1314 return ( @faceted_fields, @not_faceted_fields );
1317 =head2 clear_search_fields_cache
1319 Koha::SearchEngine::Elasticsearch->clear_search_fields_cache();
1321 Clear cached values for ES search fields
# Remove the four cached search-field entries from the Koha cache: one per
# interface (staff client, OPAC) and per index (biblios, authorities).
1325 sub clear_search_fields_cache {
# Cache handle obtained from Koha::Caches.
1327 my $cache = Koha::Caches->get_instance();
# Each key is cleared individually; the key names are fixed strings.
1328 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_biblios');
1329 $cache->clear_from_cache('elasticsearch_search_fields_opac_biblios');
1330 $cache->clear_from_cache('elasticsearch_search_fields_staff_client_authorities');
1331 $cache->clear_from_cache('elasticsearch_search_fields_opac_authorities');
1343 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
1345 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
1347 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>