3 # daemon to watch the zebraqueue and update zebra as needed
7 # find Koha's Perl modules
8 # test carefully before changing this
10 eval { require "$FindBin::Bin/kohalib.pl" };
13 use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream Driver::SysRW);
14 use Unix::Syslog qw(:macros);
19 use C4::AuthoritiesMarc;
25 # wait periods governing connection attempts
# (values are in seconds; after each failed attempt the wait is doubled
# until it reaches $max_connection_wait)
26 my $min_connection_wait = 1; # start off at 1 second
27 my $max_connection_wait = 1024; # max about 17 minutes
29 # keep separate wait period for bib and authority Zebra databases
# (keyed by the server name passed to get_zebra_connection)
30 my %zoom_connection_waits = ();
# current wait before the next database connection attempt
32 my $db_connection_wait = $min_connection_wait;
34 # ZOOM and Z39.50 errors that are potentially
35 # resolvable by connecting again and retrying
# The hash is keyed by numeric error code; zebrado() retries an operation
# only when the code of the caught ZOOM::Exception is a key of this hash.
37 my %retriable_zoom_errors = (
38 10000 => 'ZOOM_ERROR_CONNECT',
39 10001 => 'ZOOM_ERROR_MEMORY',
40 10002 => 'ZOOM_ERROR_ENCODE',
41 10003 => 'ZOOM_ERROR_DECODE',
42 10004 => 'ZOOM_ERROR_CONNECTION_LOST',
43 10005 => 'ZOOM_ERROR_INIT',
44 10006 => 'ZOOM_ERROR_INTERNAL',
45 10007 => 'ZOOM_ERROR_TIMEOUT',
48 # structure to store updates that have
49 # failed and are to be retrieved. The
50 # structure is a hashref of hashrefs,
53 # $postponed_updates->{$server}->{$record_number} = 1;
55 # If an operation is attempted and fails because
56 # of a retriable error (see above), the daemon
57 # will try several times to recover as follows:
59 # 1. close and reopen the connection to the
60 # Zebra server, unless the error was a timeout,
62 # 2. retry the operation
64 # If, after trying this five times, the operation still
65 # fails, the daemon will mark the record number as
66 # postponed, and try to process other entries in
67 # zebraqueue. When an update is postponed, the
68 # error will be reported to syslog.
70 # If more than 100 postponed updates are
71 # accumulated, the daemon will assume that
72 # something is seriously wrong, complain loudly,
73 # and abort. If running under the daemon(1) command,
74 # this means that the daemon will respawn.
# number of updates postponed so far; incremented by postpone_update()
76 my $num_postponed_updates = 0;
# postpone_update() sets ->{$server}->{$record_number} = 1 for each postponed update
77 my $postponed_updates = {};
# upper bound on the attempt loop in zebrado()
79 my $max_operation_attempts = 5;
# postpone_update() logs an "exiting" message once the count exceeds this
80 my $max_postponed_updates = 100;
82 # Zebra connection timeout
# ($zconn_timeout is set as the 'timeout' option on each Zebra connection;
# after a ZOOM timeout error (10007) zebrado() multiplies it by
# $zconn_timeout_multiplier, rounds up, and caps it at $max_zconn_timeout)
83 my $zconn_timeout = 30;
84 my $zconn_timeout_multiplier = 1.5;
85 my $max_zconn_timeout = 120;
# identity string handed to openlog for this daemon's syslog messages
87 my $ident = "Koha Zebraqueue ";
90 Unix::Syslog::openlog $ident, LOG_PID, LOG_LOCAL0;
92 Unix::Syslog::syslog LOG_INFO, "Starting Zebraqueue log at " . scalar localtime(time) . "\n";
96 # Starts the session. Only ever called once; only really used to set an alias
98 my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
# log the start of the session together with its POE session ID
100 my $time = localtime(time);
101 Unix::Syslog::syslog LOG_INFO, "$time POE Session ", $session->ID, " has started.\n";
104 # $kernel->yield('status_check');
# enter the processing loop through the 'sleep' state
105 $kernel->yield('sleep');
110 # can be used to slow down loop execution if needed
111 my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
112 use Time::HiRes qw (sleep);
# pause for 0.01 seconds before the queue is checked again
113 Time::HiRes::sleep(0.01);
# continue with the 'status_check' state
115 $kernel->yield('status_check');
119 # check if we need to do anything, at the moment just checks the zebraqueue, it could check other things too
120 my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
121 my $dbh = get_db_connection();
# count the zebraqueue rows that are not yet marked done
122 my $sth = $dbh->prepare("SELECT count(*) AS opcount FROM zebraqueue WHERE done = 0");
124 my $data = $sth->fetchrow_hashref();
125 if ($data->{'opcount'} > 0) {
126 Unix::Syslog::syslog LOG_INFO, "$data->{'opcount'} operations waiting to be run\n";
# pending work: hand over to the 'do_ops' state
128 $kernel->yield('do_ops');
# go to the 'sleep' state
132 $kernel->yield('sleep');
137 # execute operations waiting in the zebraqueue
138 my ( $kernel, $heap, $session ) = @_[ KERNEL, HEAP, SESSION ];
139 my $dbh = get_db_connection();
# select every queue entry that is not yet marked done
140 my $readsth = $dbh->prepare("SELECT id, biblio_auth_number, operation, server FROM zebraqueue WHERE done = 0");
142 Unix::Syslog::syslog LOG_INFO, "Executing zebra operations\n";
144 ZEBRAQUEUE: while (my $data = $readsth->fetchrow_hashref()) {
145 warn "Inside while loop" if $debug;
147 my $id = $data->{'id'};
148 my $op = $data->{'operation'};
149 $op = 'recordDelete' if $op =~ /delete/i; # delete ops historically have been coded
150 # either delete_record or recordDelete
151 my $record_number = $data->{'biblio_auth_number'};
152 my $server = $data->{'server'};
# skip entries whose record was already postponed for this server
154 next ZEBRAQUEUE if exists $postponed_updates->{$server}->{$record_number};
# deletions are handled by process_delete; process_update handles the other visible path
158 if ($op eq 'recordDelete') {
159 $ok = process_delete($dbh, $server, $record_number);
162 $ok = process_update($dbh, $server, $record_number, $id);
# flag the matching zebraqueue entries as done (see mark_done)
165 mark_done($dbh, $record_number, $op);
# return to the 'status_check' state
169 $kernel->yield('status_check');
# Looks the record up in Zebra and passes what it finds to zebrado() as a recordDelete.
175 my $record_number = shift;
180 warn "Searching for record to delete" if $debug;
181 # 1st read the record in zebra, we have to get it from zebra as it's no longer in the db
182 my $Zconn = get_zebra_connection($server);
# search on the Local-number attribute and take the first hit as raw marcxml
183 my $results = $Zconn->search_pqf( '@attr 1=Local-number '.$record_number);
184 $results->option(elementSetName => 'marcxml');
185 $record = $results->record(0)->raw();
188 # this doesn't exist, so no need to wail on zebra to delete it
189 if ($@->code() eq 13) {
192 # caught a ZOOM::Exception
193 my $error = _format_zoom_error_message($@);
194 warn "ERROR: $error";
197 # then, delete the record
198 warn "Deleting record" if $debug;
199 $ok = zebrado($record, 'recordDelete', $server, $record_number);
# Fetches the MARCXML for the record and passes it to zebrado() as a specialUpdate.
207 my $record_number = shift;
213 warn "Updating record" if $debug;
# bibliographic records are serialized from GetMarcBiblio; authority
# records come from C4::AuthoritiesMarc::GetAuthorityXML
216 if ($server eq "biblioserver") {
217 my $marc = GetMarcBiblio($record_number);
218 $marcxml = $marc->as_xml_record() if $marc;
220 elsif ($server eq "authorityserver") {
221 $marcxml = C4::AuthoritiesMarc::GetAuthorityXML($record_number);
223 # check it's XML, just in case
225 my $hashed = XMLin($marcxml);
226 }; ### is it a proper xml? broken xml may crash ZEBRA- slow but safe
227 ## it's Broken XML-- Should not reach here-- but if it does -lets protect ZEBRA
# log the malformed record and mark this queue row done; done rows are
# excluded by the "done = 0" queries used when reading the queue
229 Unix::Syslog::syslog LOG_ERR, "$server record $record_number is malformed: $@";
230 mark_done_by_id($dbh, $id);
233 # ok, we have everything, do the operation in zebra !
234 $ok = zebrado($marcxml, 'specialUpdate', $server, $record_number);
# Marks the single zebraqueue row with the given id as done (done = 1).
239 sub mark_done_by_id {
242 my $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1 WHERE id = ?");
243 $delsth->execute($id);
# Marks zebraqueue rows for a record as done; which rows depends on the operation.
248 my $record_number = shift;
252 if ($op eq 'recordDelete') {
253 # if it's a deletion, we can delete every request on this biblio : in case the user
254 # made a modification (or item deletion) just before biblio deletion, there are some specialUpdate
255 # that are pending and can't succeed, as we don't have the XML anymore
256 # so, delete everything for this biblionumber
# NOTE(review): the statement below only marks rows whose operation equals $op,
# so pending specialUpdate rows for this record are not touched here, unlike
# what the comment above describes -- confirm which behaviour is intended
257 $delsth = $dbh->prepare_cached("UPDATE zebraqueue SET done=1 WHERE biblio_auth_number = ? and operation = ?");
258 $delsth->execute($record_number, $op);
260 # if it's not a deletion, delete every pending specialUpdate for this biblionumber
261 # in case the user adds a biblio, then X items, before this script runs
262 # this avoids indexing X+1 times where just 1 is enough.
263 $delsth = $dbh->prepare("UPDATE zebraqueue SET done = 1
264 WHERE biblio_auth_number = ? and operation = 'specialUpdate'");
265 $delsth->execute($record_number);
270 ###Accepts a $server variable thus we can use it to update biblios, authorities or other zebra dbs
# Arguments: the record to send, the Zebra action ($op), the server name and the record number.
271 my ($record, $op, $server, $record_number) = @_;
# report a missing source record and postpone the update
274 my $message = "error updating index for $server $record $record_number: no source record";
275 postpone_update($server, $record_number, $message);
# try the operation while fewer than $max_operation_attempts attempts have been made
281 ATTEMPT: while ($attempts < $max_operation_attempts) {
283 warn "Attempt $attempts for $op for $server $record_number" if $debug;
284 my $Zconn = get_zebra_connection($server);
# build a package carrying the action and the record, then send it as an update
286 my $Zpackage = $Zconn->package();
287 $Zpackage->option(action => $op);
288 $Zpackage->option(record => $record);
290 eval { $Zpackage->send("update") };
291 if ($@ && $@->isa("ZOOM::Exception")) {
292 my $message = _format_zoom_error_message($@);
293 my $error = $@->code();
# only error codes listed in %retriable_zoom_errors are retried
294 if (exists $retriable_zoom_errors{$error}) {
295 warn "reattempting operation $op for $server $record_number" if $debug;
296 warn "last Zebra error was $message" if $debug;
297 $Zpackage->destroy();
# 10007 is ZOOM_ERROR_TIMEOUT
299 if ($error == 10007 and $zconn_timeout < $max_zconn_timeout) {
300 # bump up connection timeout
301 $zconn_timeout = POSIX::ceil($zconn_timeout * $zconn_timeout_multiplier);
302 $zconn_timeout = $max_zconn_timeout if $zconn_timeout > $max_zconn_timeout;
303 Unix::Syslog::syslog LOG_INFO, "increased Zebra connection timeout to $zconn_timeout\n";
304 warn "increased Zebra connection timeout to $zconn_timeout" if $debug;
# NOTE(review): presumably reached for errors that are not retriable -- confirm
308 postpone_update($server, $record_number, $message);
# ask Zebra to commit
311 eval { $Zpackage->send('commit'); };
313 # operation succeeded, but commit
314 # did not - we have a problem
315 my $message = _format_zoom_error_message($@);
316 postpone_update($server, $record_number, $message);
# report that the attempts made did not succeed and postpone the update
324 my $message = "Made $attempts attempts to index $server record $record_number without success";
325 postpone_update($server, $record_number, $message);
# Logs $message to syslog at LOG_ERR, records the record number as postponed
# for $server, and counts it against $max_postponed_updates.
331 sub postpone_update {
332 my ($server, $record_number, $message) = @_;
333 warn $message if $debug;
# make sure the syslog message ends with a newline
334 $message .= "\n" unless $message =~ /\n$/;
335 Unix::Syslog::syslog LOG_ERR, $message;
336 $postponed_updates->{$server}->{$record_number} = 1;
338 $num_postponed_updates++;
# over the limit: complain via warn and syslog, then close the log
339 if ($num_postponed_updates > $max_postponed_updates) {
340 warn "exiting, over $max_postponed_updates postponed indexing updates";
341 Unix::Syslog::syslog LOG_ERR, "exiting, over $max_postponed_updates postponed indexing updates";
342 Unix::Syslog::closelog;
# log that the session has stopped, then drop the 'session' entry from the heap
349 my $time = localtime(time);
350 Unix::Syslog::syslog LOG_INFO, "$time Session ", $_[SESSION]->ID, " has stopped.\n";
351 delete $heap->{session};
354 # get a DB connection
# A failed attempt is logged and followed by a sleep of $db_connection_wait
# seconds; that wait doubles until it reaches $max_connection_wait.
355 sub get_db_connection {
358 $db_connection_wait = $min_connection_wait unless defined $db_connection_wait;
361 # note that C4::Context caches the
362 # DB handle; C4::Context->dbh() will
363 # check that handle first before returning
364 # it. If the connection is bad, it
365 # then tries (once) to create a new one.
366 $dbh = C4::Context->dbh();
370 # C4::Context->dbh dies if it cannot
371 # establish a connection
# reset the wait to its minimum (presumably the success path -- confirm)
372 $db_connection_wait = $min_connection_wait;
# report the failure, wait, and lengthen the wait for the next attempt
377 my $error = "failed to connect to DB: $DBI::errstr";
378 warn $error if $debug;
379 Unix::Syslog::syslog LOG_ERR, $error;
380 sleep $db_connection_wait;
381 $db_connection_wait *= 2 unless $db_connection_wait >= $max_connection_wait;
385 # get a Zebra connection
# The retry wait is tracked per server in %zoom_connection_waits and doubles
# after a failed attempt until it reaches $max_connection_wait.
386 sub get_zebra_connection {
389 # start connection retry wait queue if necessary
390 $zoom_connection_waits{$server} = $min_connection_wait unless exists $zoom_connection_waits{$server};
392 # try to connect to Zebra forever until we succeed
394 # what follows assumes that C4::Context->Zconn
395 # makes only one attempt to create a new connection;
396 my $Zconn = C4::Context->Zconn($server, 0, 1, '', 'xml');
# apply the current (possibly increased) timeout to the connection
397 $Zconn->option('timeout' => $zconn_timeout);
399 # it is important to note that if the existing connection
400 # stored by C4::Context has an error (any type of error)
401 # from the last transaction, C4::Context->Zconn closes
402 # it and establishes a new one. Therefore, the
403 # following check will succeed if we have a new, good
404 # connection or we're using a previously established
405 # connection that has experienced no errors.
406 if ($Zconn->errcode() == 0) {
# no error on the connection: reset this server's wait to the minimum
407 $zoom_connection_waits{$server} = $min_connection_wait;
# report the connection error, wait, and lengthen the wait for the next attempt
412 my $error = _format_zoom_error_message($Zconn);
413 warn $error if $debug;
414 Unix::Syslog::syslog LOG_ERR, $error;
415 sleep $zoom_connection_waits{$server};
416 $zoom_connection_waits{$server} *= 2 unless $zoom_connection_waits{$server} >= $max_connection_wait;
420 # given a ZOOM::Exception or
421 # ZOOM::Connection object, generate
422 # a human-readable error message
423 sub _format_zoom_error_message {
# Both branches build "<message> (<diagset> <code>) <addinfo>"; the accessors
# differ: errmsg()/errcode() on a connection, message()/code() on an exception.
427 if (ref($err) eq 'ZOOM::Connection') {
428 $message = $err->errmsg() . " (" . $err->diagset . " " . $err->errcode() . ") " . $err->addinfo();
429 } elsif (ref($err) eq 'ZOOM::Exception') {
430 $message = $err->message() . " (" . $err->diagset . " " . $err->code() . ") " . $err->addinfo();
# create the POE session, mapping state names to their handler subs
435 POE::Session->create(
437 _start => \&handler_start,
438 sleep => \&handler_sleep,
439 status_check => \&handler_check,
441 _stop => \&handler_stop,
# close the syslog connection opened at startup
448 Unix::Syslog::closelog;