1 package Koha::SearchEngine::Elasticsearch;
3 # Copyright 2015 Catalyst IT
5 # This file is part of Koha.
7 # Koha is free software; you can redistribute it and/or modify it under the
8 # terms of the GNU General Public License as published by the Free Software
9 # Foundation; either version 3 of the License, or (at your option) any later
12 # Koha is distributed in the hope that it will be useful, but WITHOUT ANY
13 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
14 # A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License along
17 # with Koha; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 use base qw(Class::Accessor);
25 use Koha::Exceptions::Config;
26 use Koha::SearchFields;
27 use Koha::SearchMarcMaps;
33 use Search::Elasticsearch;
37 use List::Util qw( sum0 reduce );
40 use Encode qw(encode);
# Accessors generated through Class::Accessor: 'index' is read-only,
# 'sort_fields' is read/write.
__PACKAGE__->mk_ro_accessors('index');
__PACKAGE__->mk_accessors('sort_fields');

# Constants to refer to the standard index names
Readonly our $BIBLIOS_INDEX     => 'biblios';
Readonly our $AUTHORITIES_INDEX => 'authorities';
51 Koha::SearchEngine::Elasticsearch - Base module for things using elasticsearch
59 The name of the index to use, generally 'biblios' or 'authorities'.
# Constructor: delegates to the parent class constructor, then insists that
# the resulting object carries an index name.
#
# Throws Koha::Exceptions::MissingParameter when no index name was provided.
#
# NOTE(review): the 'sub new' line is absent from the reviewed listing; the
# name is inferred from the SUPER::new call - confirm.
sub new {
    my ( $class, @args ) = @_;

    my $self = $class->SUPER::new(@args);

    # Check for a valid index
    if ( !$self->index ) {
        Koha::Exceptions::MissingParameter->throw('No index name provided');
    }

    return $self;
}
75 =head2 get_elasticsearch
77 my $elasticsearch_client = $self->get_elasticsearch();
79 Returns a C<Search::Elasticsearch> client. The client is cached on a C<Koha::SearchEngine::Elasticsearch>
80 instance level and will be reused if method is called multiple times.
# Returns the Search::Elasticsearch client for this instance, creating it on
# first use and keeping it in $self->{elasticsearch} for later calls.
sub get_elasticsearch {
    my ($self) = @_;

    # Reuse the client built by an earlier call on this instance.
    return $self->{elasticsearch} if defined $self->{elasticsearch};

    my $conf = $self->get_elasticsearch_params();

    # NOTE(review): the argument list is truncated in the reviewed listing;
    # confirm whether further constructor options belong here.
    $self->{elasticsearch} = Search::Elasticsearch->new(
        client => "5_0::Direct",
        nodes  => $conf->{nodes},
    );

    return $self->{elasticsearch};
}
98 =head2 get_elasticsearch_params
100 my $params = $self->get_elasticsearch_params();
102 This provides a hashref that contains the parameters for connecting to the
103 Elasticsearch servers, in the form:
106 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
107 'index_name' => 'koha_instance_index',
110 This is configured by the following in the C<config> block in koha-conf.xml:
113 <server>127.0.0.1:9200</server>
114 <server>anotherserver:9200</server>
115 <index_name>koha_instance</index_name>
# Builds the connection parameters for this index from the 'elasticsearch'
# block of the Koha configuration.
#
# Returns a hashref holding every configured key except 'server', plus:
#   nodes      - arrayref of the configured server(s)
#   index_name - configured name with '_' and this object's index appended
#   key_prefix - always 'es_'
#
# Dies with a plain string when the block, the servers or the index name
# are missing.
sub get_elasticsearch_params {
    my ($self) = @_;

    my $conf = C4::Context->config('elasticsearch');
    die "No 'elasticsearch' block is defined in koha-conf.xml.\n" unless $conf;

    # Work on a shallow copy so that we're not modifying the original
    my %es = %{$conf};

    # Helpfully, the multiple server lines end up in an array for us anyway
    # if there are multiple ones, but not if there's only one.
    my $server = delete $es{server};
    if ( ref($server) eq 'ARRAY' ) {
        # store it called 'nodes' (which is used by newer Search::Elasticsearch)
        $es{nodes} = $server;
    }
    elsif ($server) {
        $es{nodes} = [$server];
    }
    else {
        die "No elasticsearch servers were specified in koha-conf.xml.\n";
    }

    die "No elasticserver index_name was specified in koha-conf.xml.\n"
        unless $es{index_name};

    # Append the name of this particular index to our namespace
    $es{index_name} = $es{index_name} . '_' . $self->index;
    $es{key_prefix} = 'es_';

    return \%es;
}
152 =head2 get_elasticsearch_settings
154 my $settings = $self->get_elasticsearch_settings();
156 This provides the settings provided to Elasticsearch when an index is created.
157 These can do things like define tokenization methods.
159 A hashref containing the settings is returned.
# Returns the index settings loaded from the YAML file named by the
# 'elasticsearch_index_config' configuration entry, falling back to
# <intranetdir>/admin/searchengine/elasticsearch/index_config.yaml.
# The parsed structure is kept in a state variable, so the file is read at
# most once per process.
sub get_elasticsearch_settings {
    my ($self) = @_;

    # Use state to speed up repeated calls
    state $settings;
    return $settings if defined $settings;

    my $config_file = C4::Context->config('elasticsearch_index_config')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/index_config.yaml';
    $settings = LoadFile($config_file);

    return $settings;
}
177 =head2 get_elasticsearch_mappings
179 my $mappings = $self->get_elasticsearch_mappings();
181 This provides the mappings that get passed to Elasticsearch when an index is
# Builds (once per index name, cached in state variables) the mappings
# structure for this index from the 'general' field configuration and the
# mappings delivered by _foreach_mapping for the current MARC flavour.
# As a side effect the sort_fields accessor is set to the hashref of
# sortable field names collected for this index.
#
# Returns the mappings hashref.
sub get_elasticsearch_mappings {
    my ($self) = @_;

    # Use state to speed up repeated calls
    state %all_mappings;
    state %sort_fields;

    my $index = $self->index;

    unless ( defined $all_mappings{$index} ) {
        $sort_fields{$index} = {};
        my $mappings = {
            data => scalar _get_elasticsearch_mapping( 'general', '' )
        };
        my $marcflavour = lc C4::Context->preference('marcflavour');

        # Search field type => Elasticsearch type; anything else is 'text'.
        # TODO be aware of date formats, but this requires pre-parsing
        # as ES will simply reject anything with an invalid date.
        # NOTE(review): the value for isbn/stdno is not visible in the
        # reviewed listing; 'stdno' is assumed - confirm.
        my %es_type_for = (
            boolean => 'boolean',
            number  => 'integer',
            sum     => 'integer',
            isbn    => 'stdno',
            stdno   => 'stdno',
        );

        $self->_foreach_mapping(
            sub {
                my ( $name, $type, $facet, $suggestible, $sort, $marc_type ) = @_;
                return if $marc_type ne $marcflavour;

                # TODO if this gets any sort of complexity to it, it should
                # be broken out into its own function.
                my $es_type = $es_type_for{$type} // 'text';

                $mappings->{data}{properties}{$name} =
                    _get_elasticsearch_mapping( 'search', $es_type );

                $mappings->{data}{properties}{ $name . '__facet' } =
                    _get_elasticsearch_mapping( 'facet', $es_type )
                    if $facet;

                $mappings->{data}{properties}{ $name . '__suggestion' } =
                    _get_elasticsearch_mapping( 'suggestible', $es_type )
                    if $suggestible;

                # Sort is a bit special as it can be true, false, undef.
                # We care about "true" or "undef",
                # "undef" means to do the default thing, which is make it sortable.
                if ( $sort // 1 ) {
                    $mappings->{data}{properties}{ $name . '__sort' } =
                        _get_elasticsearch_mapping( 'sort', $es_type );
                    $sort_fields{$index}{$name} = 1;
                }
            }
        );
        $all_mappings{$index} = $mappings;
    }

    $self->sort_fields( $sort_fields{$index} );

    return $all_mappings{$index};
}
241 =head2 _get_elasticsearch_mapping
243 Get the Elasticsearch mappings for the given purpose and data type.
245 $mapping = _get_elasticsearch_mapping('search', 'text');
# Looks up the field configuration for a purpose ('general', 'search',
# 'facet', 'suggestible', 'sort', ...) and a data type. The configuration is
# read from the YAML file named by 'elasticsearch_field_config' (default:
# <intranetdir>/admin/searchengine/elasticsearch/field_config.yaml) and
# cached in a state variable.
#
# Returns the whole entry for the purpose when $type is the empty string,
# otherwise the entry for $type, otherwise the purpose's 'default' entry,
# otherwise nothing. Dies when the purpose itself is not configured.
sub _get_elasticsearch_mapping {
    my ( $purpose, $type ) = @_;

    # Use state to speed up repeated calls
    state $settings;
    unless ( defined $settings ) {
        my $config_file = C4::Context->config('elasticsearch_field_config')
            || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/field_config.yaml';
        $settings = LoadFile($config_file);
    }

    die "Field purpose $purpose not defined in field config"
        unless defined $settings->{$purpose};

    my $for_purpose = $settings->{$purpose};
    return $for_purpose if $type eq '';

    # Exact type first, then the purpose-wide default.
    for my $key ( $type, 'default' ) {
        return $for_purpose->{$key} if defined $for_purpose->{$key};
    }

    return;
}
# Loads the default index mappings from the YAML file named by
# 'elasticsearch_index_mappings' (default:
# <intranetdir>/admin/searchengine/elasticsearch/mappings.yaml) and makes
# sure a search field, and a MARC map linked to it, exists for every entry.
#
# The single argument is accepted but not used by this sub.
sub reset_elasticsearch_mappings {
    my ( $reset_fields ) = @_;

    my $mappings_yaml = C4::Context->config('elasticsearch_index_mappings')
        || C4::Context->config('intranetdir') . '/admin/searchengine/elasticsearch/mappings.yaml';
    my $indexes = LoadFile( $mappings_yaml );

    for my $index_name ( keys %{$indexes} ) {
        my $fields = $indexes->{$index_name};

        for my $field_name ( keys %{$fields} ) {
            my $data     = $fields->{$field_name};
            my $mappings = $data->{mappings};

            my $search_field = Koha::SearchFields->find_or_create(
                { name => $field_name, label => $data->{label}, type => $data->{type} },
                { key => 'name' }
            );

            for my $mapping ( @{$mappings} ) {
                my $marc_field = Koha::SearchMarcMaps->find_or_create(
                    {
                        index_name => $index_name,
                        marc_type  => $mapping->{marc_type},
                        marc_field => $mapping->{marc_field},
                    }
                );
                $search_field->add_to_search_marc_maps(
                    $marc_field,
                    {
                        facet       => $mapping->{facet}       || 0,
                        suggestible => $mapping->{suggestible} || 0,
                        sort        => $mapping->{sort},
                    }
                );
            }
        }
    }
}
# This overrides the accessor provided by Class::Accessor so that if
# sort_fields isn't set, then it'll generate it.
#
# With arguments it stores them through the underlying accessor and returns
# nothing; without arguments it returns the stored value, calling
# get_elasticsearch_mappings first when nothing has been stored yet.
#
# NOTE(review): the 'sub sort_fields' line and the two early returns are
# absent from the reviewed listing and were restored from context - confirm.
sub sort_fields {
    my ( $self, @new_value ) = @_;

    if (@new_value) {
        $self->_sort_fields_accessor(@new_value);
        return;
    }

    my $val = $self->_sort_fields_accessor();
    unless ($val) {
        # This will populate the accessor as a side effect
        $self->get_elasticsearch_mappings();
        $val = $self->_sort_fields_accessor();
    }

    return $val;
}
312 =head2 _process_mappings($mappings, $data, $record_document)
314 $self->_process_mappings($mappings, $marc_field_data, $record_document)
316 Process all C<$mappings> targets operating on a specific MARC field C<$data>.
317 Since we group all mappings by MARC field targets C<$mappings> will contain
318 all targets for C<$data> and thus we need to fetch the MARC field only once.
319 C<$mappings> will be applied to C<$record_document> and new field values added.
320 The method has no return value.
326 Arrayref of mappings containing arrayrefs in the format
327 [C<$target>, C<$options>] where C<$target> is the name of the target field and
328 C<$options> is a hashref containing processing directives for this particular
333 The source data from a MARC record field.
335 =item C<$record_document>
337 Hashref representing the Elasticsearch document on which mappings should be
# Applies every mapping in $mappings to one piece of MARC data and appends
# the resulting value to @{ $record_document->{$target} }.
#
# $mappings        - arrayref of [ $target, $options ] pairs
# $data            - the source string
# $record_document - hashref that receives the values
#
# Options handled, in this order:
#   substr          => [ $start, $length ]  take a substring of $data
#                      (empty string when $data is not longer than $start)
#   value_callbacks => [ coderefs ]         each receives the current value
#                      and returns the next one
#   property        => $key                 wrap the value as { $key => value }
sub _process_mappings {
    my ($_self, $mappings, $data, $record_document) = @_;

    for my $mapping (@{$mappings}) {
        my ($target, $options) = @{$mapping};

        # Copy (scalar) data since can have multiple targets
        # with differing options for (possibly) mutating data
        # so need a different copy for each
        my $value = $data;
        $record_document->{$target} //= [];

        if (defined $options->{substr}) {
            my ($offset, $count) = @{$options->{substr}};
            $value = length($data) > $offset ? substr($data, $offset, $count) : '';
        }

        if (defined $options->{value_callbacks}) {
            # Feed the value through each callback in turn.
            for my $callback (@{$options->{value_callbacks}}) {
                $value = $callback->($value);
            }
        }

        if (defined $options->{property}) {
            $value = { $options->{property} => $value };
        }

        push @{$record_document->{$target}}, $value;
    }
}
369 =head2 marc_records_to_documents($marc_records)
371 my $record_documents = $self->marc_records_to_documents($marc_records);
373 Using mappings stored in database convert C<$marc_records> to Elasticsearch documents.
375 Returns a reference to an array of C<[$id, $document]> pairs, where C<$document> is a hash reference
376 representing an Elasticsearch document, acceptable as body payload in C<Search::Elasticsearch> requests.
380 =item C<$marc_records>
382 Reference to array of C<MARC::Record> objects to be converted to Elasticsearch documents.
# Converts MARC records into Elasticsearch documents using the rules from
# _get_marc_mapping_rules.
#
# $records - arrayref of record objects (the code calls leader, fields,
#            subfield, encoding, as_usmarc and as_xml_record on them)
#
# Returns an arrayref of [ $id, $document ] pairs, where $id is subfield
# 999$c of the record and $document is the hashref built for it.
sub marc_records_to_documents {
    my ($self, $records) = @_;

    my $rules                = $self->_get_marc_mapping_rules();
    my $control_fields_rules = $rules->{control_fields};
    my $data_fields_rules    = $rules->{data_fields};
    my $marcflavour          = lc C4::Context->preference('marcflavour');

    my @record_documents;

    for my $record (@{$records}) {
        my $record_document = {};

        if (my $leader_mappings = $rules->{leader}) {
            $self->_process_mappings($leader_mappings, $record->leader(), $record_document);
        }

        FIELD:
        for my $field ($record->fields()) {
            if ($field->is_control_field()) {
                my $control_mappings = $control_fields_rules->{$field->tag()};
                $self->_process_mappings($control_mappings, $field->data(), $record_document)
                    if $control_mappings;
                next FIELD;
            }

            my $data_field_rules = $data_fields_rules->{$field->tag()};
            next FIELD unless $data_field_rules;

            # Individual subfields; '*' rules apply to every subfield code.
            my $subfields_mappings = $data_field_rules->{subfields};
            my $wildcard_mappings  = $subfields_mappings->{'*'};
            for my $subfield ($field->subfields()) {
                my ($code, $value) = @{$subfield};
                my @targets = @{ $subfields_mappings->{$code} // [] };
                push @targets, @{$wildcard_mappings} if $wildcard_mappings;
                $self->_process_mappings(\@targets, $value, $record_document)
                    if @targets;
            }

            my $subfields_join_mappings = $data_field_rules->{subfields_join};
            next FIELD unless $subfields_join_mappings;

            for my $subfields_group (keys %{$subfields_join_mappings}) {
                # Map each subfield to values, remove empty values, join with space
                my @parts =
                    grep { $_ }
                    map  { join(' ', $field->subfield($_)) }
                    split(//, $subfields_group);
                my $joined = join(' ', @parts);
                $self->_process_mappings($subfields_join_mappings->{$subfields_group}, $joined, $record_document)
                    if $joined;
            }
        }

        # Targets that received no value fall back to their default.
        for my $name (keys %{$rules->{defaults}}) {
            $record_document->{$name} //= $rules->{defaults}->{$name};
        }

        for my $name (@{$rules->{sum}}) {
            next unless defined $record_document->{$name};

            # TODO: validate numeric? filter?
            # TODO: Or should only accept fields without nested values?
            # TODO: Quick and dirty, improve if needed
            my @addends = grep { !ref($_) && m/\d+(\.\d+)?/ } @{$record_document->{$name}};
            $record_document->{$name} = sum0(@addends);
        }

        # TODO: Perhaps should check if $records_document non empty, but really should never be the case
        $record->encoding('UTF-8');

        my @warnings;
        {
            # Temporarily intercept all warn signals (MARC::Record carps when record length > 99999)
            local $SIG{__WARN__} = sub {
                push @warnings, $_[0];
            };
            $record_document->{'marc_data'} = encode_base64(encode('UTF-8', $record->as_usmarc()));
        }

        if (@warnings) {
            # Suppress warnings if record length exceeded
            # NOTE(review): the statement that re-emits the warnings is not
            # visible in the reviewed listing; built-in warn is used here.
            unless (substr($record->leader(), 0, 5) eq '99999') {
                warn $_ for @warnings;
            }
            # Serialisation produced warnings: store the record as MARCXML.
            $record_document->{'marc_data'}   = $record->as_xml_record($marcflavour);
            $record_document->{'marc_format'} = 'MARCXML';
        }
        else {
            $record_document->{'marc_format'} = 'base64ISO2709';
        }

        my $id = $record->subfield('999', 'c');
        push @record_documents, [$id, $record_document];
    }

    return \@record_documents;
}
488 =head2 _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
490 my @mappings = _field_mappings($facet, $suggestible, $sort, $target_name, $target_type, $range)
492 Get mappings, an internal data structure later used by
493 L<_process_mappings($mappings, $data, $record_document)> to process MARC target
494 data for a MARC mapping.
496 The returned C<$mappings> is not to be confused with mappings provided by
497 C<_foreach_mapping>, rather this sub accepts properties from a mapping as
498 provided by C<_foreach_mapping> and expands it to this internal data structure.
499 In the caller context (C<_get_marc_mapping_rules>) the returned C<@mappings>
500 is then applied to each MARC target (leader, control field data, subfield or
501 joined subfields) and integrated into the mapping rules data structure used in
502 C<marc_records_to_documents> to transform MARC records into Elasticsearch
509 Boolean indicating whether to create a facet field for this mapping.
511 =item C<$suggestible>
513 Boolean indicating whether to create a suggestion field for this mapping.
517 Boolean indicating whether to create a sort field for this mapping.
519 =item C<$target_name>
521 Elasticsearch document target field name.
523 =item C<$target_type>
525 Elasticsearch document target field type.
529 An optional range as a string in the format "<START>-<END>" or "<START>",
530 where "<START>" and "<END>" are integers specifying a range that will be used
531 for extracting a substring from MARC data as Elasticsearch field target value.
533 The first character position is "0", and the range is inclusive,
534 so "0-2" means the first three characters of MARC data.
536 If only "<START>" is provided only one character at position "<START>" will
# Expands the properties of one search field mapping into the internal
# [ $target, $options ] pairs consumed by _process_mappings.
#
# $facet, $suggestible - booleans: also emit "<name>__facet" /
#                        "<name>__suggestion" targets
# $sort                - undef or true: also emit a "<name>__sort" target
# $target_name         - Elasticsearch document field name
# $target_type         - field type; 'boolean' adds a normalising callback
# $range               - optional "<START>" or "<START>-<END>" (0-based,
#                        inclusive) substring specification
#
# Returns the list of mappings; the plain target always comes first.
sub _field_mappings {
    my ($_self, $facet, $suggestible, $sort, $target_name, $target_type, $range) = @_;

    my $default_options = {};

    # Test for definedness, not truth: "0" is a valid range (the first
    # character) and must not be mistaken for "no range given".
    if (defined $range) {
        # TODO: use value_callback instead?
        my ($start, $end) = map { int } split /-/, $range, 2;
        my $length = defined $end ? $end - $start + 1 : 1;
        $default_options->{substr} = [$start, $length];
    }

    # TODO: Should probably have per type value callback/hook
    # but hard code for now
    if ($target_type eq 'boolean') {
        $default_options->{value_callbacks} //= [];
        push @{$default_options->{value_callbacks}}, sub {
            my ($value) = @_;
            # Trim whitespace at both ends
            $value =~ s/^\s+|\s+$//g;
            return $value ? 'true' : 'false';
        };
    }

    my @mappings = ( [$target_name, $default_options] );

    my @suffixes;
    push @suffixes, 'facet'      if $facet;
    push @suffixes, 'suggestion' if $suggestible;
    push @suffixes, 'sort'       if !defined $sort || $sort;

    foreach my $suffix (@suffixes) {
        # TODO: Hack, fix later in less hideous manner
        # Suggestion targets wrap their value as { input => ... }.
        my $options = $suffix eq 'suggestion'
            ? { %{$default_options}, property => 'input' }
            : $default_options;
        push @mappings, ["${target_name}__$suffix", $options];
    }

    return @mappings;
}
594 =head2 _get_marc_mapping_rules
596 my $mapping_rules = $self->_get_marc_mapping_rules()
598 Generates rules from mappings stored in database for MARC records to Elasticsearch JSON document conversion.
600 Since field retrieval is slow in C<MARC::Record> (all fields are iterated through for
601 each call to C<MARC::Record>->field) we create an optimized structure of mapping
602 rules keyed by MARC field tags holding all the mapping rules for that particular tag.
604 We can then iterate through all MARC fields for each record and apply all relevant
605 rules once per field instead of retrieving fields multiple times for each mapping rule
606 which is terribly slow.
610 # TODO: This structure can be used for processing multiple MARC::Records so is currently
611 # rebuilt for each batch. Since it is cacheable it could also be stored in an in
612 # memory cache which it is currently not. The performance gain of caching
613 # would probably be marginal, but to do this could be a further improvement.
# Builds the rule structure used by marc_records_to_documents from the
# mappings delivered by _foreach_mapping for the current MARC flavour.
#
# Returns a hashref with these keys:
#   leader         - arrayref of mappings applied to the leader
#   control_fields - { tag => [ mappings ] } for tags below 010
#   data_fields    - { tag => { subfields      => { code  => [ mappings ] },
#                               subfields_join => { codes => [ mappings ] } } }
#                    ('*' is used as the code when no subfields are given)
#   sum            - names of fields of type 'sum'
#   defaults       - { name => 'false' } for fields of type 'boolean'
#
# Throws Koha::Exceptions::Elasticsearch::MARCFieldExprParseError for a MARC
# field expression that cannot be parsed, including unbalanced parentheses.
sub _get_marc_mapping_rules {
    my ($self) = @_;
    my $marcflavour = lc C4::Context->preference('marcflavour');
    my $field_spec_regexp = qr/^([0-9]{3})([()0-9a-z]+)?(?:_\/(\d+(?:-\d+)?))?$/;
    my $leader_regexp = qr/^leader(?:_\/(\d+(?:-\d+)?))?$/;
    my $rules = {
        'leader' => [],
        'control_fields' => {},
        'data_fields' => {},
        'sum' => [],
        'defaults' => {}
    };

    $self->_foreach_mapping(sub {
        my ($name, $type, $facet, $suggestible, $sort, $marc_type, $marc_field) = @_;
        return if $marc_type ne $marcflavour;

        if ($type eq 'sum') {
            push @{$rules->{sum}}, $name;
        }
        elsif ($type eq 'boolean') {
            # boolean gets special handling, if value doesn't exist for a field,
            # it is set to false
            $rules->{defaults}->{$name} = 'false';
        }

        if ($marc_field =~ $field_spec_regexp) {
            # Copy the captures at once so that later pattern operations
            # cannot disturb them.
            my ($field_tag, $subfield_spec, $range) = ($1, $2, $3);

            my @subfields;
            my @subfield_groups;
            # Parse and separate subfields from subfield groups
            if (defined $subfield_spec) {
                my $subfield_group = '';
                my $open_group = 0;

                foreach my $token (split //, $subfield_spec) {
                    if ($token eq "(") {
                        if ($open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched opening parenthesis for $marc_field"
                            );
                        }
                        $open_group = 1;
                    }
                    elsif ($token eq ")") {
                        if (!$open_group) {
                            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                                "Unmatched closing parenthesis for $marc_field"
                            );
                        }
                        # Compare by length: a group holding only subfield
                        # "0" is valid but false in boolean context.
                        if (length $subfield_group) {
                            push @subfield_groups, $subfield_group;
                            $subfield_group = '';
                        }
                        $open_group = 0;
                    }
                    elsif ($open_group) {
                        $subfield_group .= $token;
                    }
                    else {
                        push @subfields, $token;
                    }
                }

                # A group still open at the end of the expression would
                # otherwise be dropped without any error.
                if ($open_group) {
                    Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                        "Unmatched opening parenthesis for $marc_field"
                    );
                }
            }
            else {
                push @subfields, '*';
            }

            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);

            if ($field_tag < 10) {
                $rules->{control_fields}->{$field_tag} //= [];
                push @{$rules->{control_fields}->{$field_tag}}, @mappings;
            }
            else {
                $rules->{data_fields}->{$field_tag} //= {};
                foreach my $subfield (@subfields) {
                    $rules->{data_fields}->{$field_tag}->{subfields}->{$subfield} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields}->{$subfield}}, @mappings;
                }
                foreach my $subfield_group (@subfield_groups) {
                    $rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group} //= [];
                    push @{$rules->{data_fields}->{$field_tag}->{subfields_join}->{$subfield_group}}, @mappings;
                }
            }
        }
        elsif ($marc_field =~ $leader_regexp) {
            my $range = $1;
            my @mappings = $self->_field_mappings($facet, $suggestible, $sort, $name, $type, $range);
            push @{$rules->{leader}}, @mappings;
        }
        else {
            Koha::Exceptions::Elasticsearch::MARCFieldExprParseError->throw(
                "Invalid MARC field expression: $marc_field"
            );
        }
    });

    return $rules;
}
721 =head2 _foreach_mapping
723 $self->_foreach_mapping(
725 my ( $name, $type, $facet, $suggestible, $sort, $marc_type,
728 return unless $marc_type eq 'marc21';
729 print "Data comes from: " . $marc_field . "\n";
733 This allows you to apply a function to each entry in the elasticsearch mappings
734 table, in order to build the mappings for whatever is needed.
736 In the provided function, the fields are:
742 The field name for elasticsearch (corresponds to the 'mapping' column in the
747 The type for this value, e.g. 'string'.
751 True if this value should be facetised. This only really makes sense if the
752 field is understood by the facet processing code anyway.
756 True if this is a field that a) needs special sort handling, and b) if it
757 should be sorted on. False if a) but not b). Undef if not a). This allows,
758 for example, author to be sorted on but not everything marked with "author"
759 to be included in that sort.
763 A string that indicates the MARC type that this mapping is for, e.g. 'marc21',
764 'unimarc', 'normarc'.
768 A string that describes the MARC field that contains the data to extract.
769 These are of a form suited to Catmandu's MARC fixers.
# Calls $sub once for every search field mapping stored for this index,
# passing: name, type, facet, suggestible, sort, marc_type, marc_field.
#
# NOTE(review): the '+select'/'+as' structure and the name/type arguments
# are not visible in the reviewed listing; they are restored from the
# get_column calls and from how callers unpack the arguments - confirm.
sub _foreach_mapping {
    my ( $self, $sub ) = @_;

    # Columns pulled in from the joined tables, as [ source column => alias ].
    my @joined_columns = (
        [ 'search_marc_to_fields.facet'       => 'facet' ],
        [ 'search_marc_to_fields.suggestible' => 'suggestible' ],
        [ 'search_marc_to_fields.sort'        => 'sort' ],
        [ 'search_marc_map.marc_type'         => 'marc_type' ],
        [ 'search_marc_map.marc_field'        => 'marc_field' ],
    );

    # TODO use a caching framework here
    my $search_fields = Koha::Database->schema->resultset('SearchField')->search(
        { 'search_marc_map.index_name' => $self->index },
        {
            join      => { search_marc_to_fields => 'search_marc_map' },
            '+select' => [ map { $_->[0] } @joined_columns ],
            '+as'     => [ map { $_->[1] } @joined_columns ],
        }
    );

    while ( my $search_field = $search_fields->next ) {
        $sub->(
            $search_field->name,
            $search_field->type,
            map { $search_field->get_column( $_->[1] ) } @joined_columns
        );
    }
}
816 die $self->process_error($@);
818 This parses an Elasticsearch error message and produces a human-readable
819 result from it. This result is probably missing all the useful information
820 that you might want in diagnosing an issue, so the warning is also logged.
822 Note that currently the resulting message is not internationalised. This
823 will happen eventually by some method or other.
# Logs the raw message with warn and returns a short human-readable
# replacement: one text for messages mentioning "ParseException", a generic
# one for everything else.
#
# NOTE(review): the 'sub process_error' line is absent from the reviewed
# listing; the name is taken from the POD synopsis - confirm.
sub process_error {
    my ($self, $msg) = @_;

    warn $msg; # simple logging

    # This is super-primitive
    my $is_parse_error = $msg =~ /ParseException/;

    return $is_parse_error
        ? "Unable to understand your search query, please rephrase and try again.\n"
        : "Unable to perform your search. Please try again.\n";
}
838 =head2 _read_configuration
840 my $conf = _read_configuration();
842 Reads the I<configuration file> and returns a hash structure with the
843 configuration information. It raises an exception if mandatory entries
846 The hashref structure has the following form:
849 'nodes' => ['127.0.0.1:9200', 'anotherserver:9200'],
850 'index_name' => 'koha_instance',
853 This is configured by the following in the C<config> block in koha-conf.xml:
856 <server>127.0.0.1:9200</server>
857 <server>anotherserver:9200</server>
858 <index_name>koha_instance</index_name>
# Reads the 'elasticsearch' block of the Koha configuration.
#
# Returns a hashref with:
#   nodes      - arrayref of the configured server(s)
#   index_name - the configured index name, unchanged
#
# Throws Koha::Exceptions::Config::MissingEntry when the block, the
# 'server' entry or the 'index_name' entry is missing.
sub _read_configuration {
    my $conf = C4::Context->config('elasticsearch');

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'elasticsearch' block in config file")
        unless defined $conf;

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'server' entry in config file for elasticsearch")
        unless $conf && $conf->{server};

    # A single <server> entry arrives as a scalar, several as an arrayref.
    my $nodes = $conf->{server};
    my $configuration = {
        nodes => ref($nodes) eq 'ARRAY' ? $nodes : [$nodes],
    };

    Koha::Exceptions::Config::MissingEntry->throw(
        "Missing 'index_name' entry in config file for elasticsearch")
        unless defined $conf->{index_name};

    $configuration->{index_name} = $conf->{index_name};

    return $configuration;
}
905 =item Chris Cormack C<< <chrisc@catalyst.net.nz> >>
907 =item Robin Sheat C<< <robin@catalyst.net.nz> >>
909 =item Jonathan Druart C<< <jonathan.druart@bugs.koha-community.org> >>