corosync  2.3.2
totemsrp.c
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2009 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch. 4-5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - encryption of message contents with NSS
45  * - authentication of message contents with HMAC/SHA1
46  * - token hold mode where the token doesn't rotate on an unused ring - reduces CPU
47  * usage on a 1.6 GHz Xeon from 35% to less than 0.1% as measured by top
48  */
49 
50 #include <config.h>
51 
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77 
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81 
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85 
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88 
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92 
93 #include "cs_queue.h"
94 
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99 #define MAXIOVS 5
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000 /* bytes */
102 #define LEAVE_DUMMY_NODEID 0
103 
104 /*
105  * Rollover handling:
106  * SEQNO_START_MSG is the starting sequence number after a new configuration
107  * This should remain zero, unless testing overflow in which case
108  * 0x7ffff000 and 0xfffff000 are good starting values.
109  *
110  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
111  * for a token. This should remain zero, unless testing overflow in which
112  * case 0x7fffff00 or 0xffffff00 are good starting values.
113  */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
116 
117 /*
118  * These can be used to test different rollover points
119  * #define SEQNO_START_MSG 0xfffffe00
120  * #define SEQNO_START_TOKEN 0xfffffe00
121  */
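/*
 * Illustrative sketch (not a line of totemsrp.c): the rollover starting points
 * above only work because sequence numbers are compared with wrap-around
 * ("serial number") arithmetic rather than a plain '<'.  One common way to
 * express such a comparison for 32-bit unsigned sequence numbers, with a
 * hypothetical helper name (the helper actually used in this file is
 * sq_lt_compare() from corosync/sq.h, whose implementation may differ):
 *
 *   static inline int seq_lt (unsigned int a, unsigned int b)
 *   {
 *       // a is "before" b if the signed distance a - b is negative;
 *       // this stays correct across the 0xffffffff -> 0x0 rollover
 *       return ((int)(a - b)) < 0;
 *   }
 *
 *   // e.g. seq_lt (0xfffffff0, 0x00000010) is true after a rollover
 */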
122 
123 /*
124  * These can be used to test the error recovery algorithms
125  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
126  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
127  * #define TEST_DROP_MCAST_PERCENTAGE 50
128  * #define TEST_RECOVERY_MSG_COUNT 300
129  */
130 
131 /*
132  * we compare the endian detector of incoming messages to determine if their
133  * endianness is different - if so, convert them
134  *
135  * do not change
136  */
137 #define ENDIAN_LOCAL 0xff22
138 
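/*
 * Illustrative sketch (not a line of totemsrp.c): every sender writes
 * ENDIAN_LOCAL into message_header.endian_detector.  A receiver with the
 * opposite byte order reads the field back byte-swapped (0x22ff), which is
 * how the message handlers below decide that an *_endian_convert() pass is
 * required.  The handler below is hypothetical:
 *
 *   static void example_receive (const struct message_header *hdr)
 *   {
 *       if (hdr->endian_detector != ENDIAN_LOCAL) {
 *           // sender used the other byte order: byte-swap the message
 *           // with the matching *_endian_convert() routine before use
 *       }
 *   }
 */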
139 enum message_type {
140  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
141  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
142  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
143  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
144  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
145  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
146 };
147 
148 enum encapsulation_type {
149  MESSAGE_ENCAPSULATED = 1,
150  MESSAGE_NOT_ENCAPSULATED = 2
151 };
153 /*
154  * New membership algorithm local variables
155  */
156 struct srp_addr {
157  uint8_t no_addrs;
158  struct totem_ip_address addr[INTERFACE_MAX];
159 };
160 
161 
162 struct consensus_list_item {
163  struct srp_addr addr;
164  int set;
165 };
166 
167 
168 struct token_callback_instance {
169  struct list_head list;
170  int (*callback_fn) (enum totem_callback_token_type type, const void *);
171  enum totem_callback_token_type callback_type;
172  int delete;
173  void *data;
174 };
175 
176 
177 struct totemsrp_socket {
178  int mcast;
179  int token;
180 };
181 
182 struct message_header {
183  char type;
184  char encapsulated;
185  unsigned short endian_detector;
186  unsigned int nodeid;
187 } __attribute__((packed));
188 
189 
190 struct mcast {
191  struct message_header header;
192  struct srp_addr system_from;
193  unsigned int seq;
194  int this_seqno;
195  struct memb_ring_id ring_id;
196  unsigned int node_id;
197  int guarantee;
198 } __attribute__((packed));
199 
200 
201 struct rtr_item {
202  struct memb_ring_id ring_id;
203  unsigned int seq;
204 }__attribute__((packed));
205 
206 
207 struct orf_token {
208  struct message_header header;
209  unsigned int seq;
210  unsigned int token_seq;
211  unsigned int aru;
212  unsigned int aru_addr;
213  struct memb_ring_id ring_id;
214  unsigned int backlog;
215  unsigned int fcc;
216  int retrans_flg;
217  int rtr_list_entries;
218  struct rtr_item rtr_list[0];
219 }__attribute__((packed));
220 
221 
222 struct memb_join {
223  struct message_header header;
224  struct srp_addr system_from;
225  unsigned int proc_list_entries;
226  unsigned int failed_list_entries;
227  unsigned long long ring_seq;
228  unsigned char end_of_memb_join[0];
229 /*
230  * These parts of the data structure are dynamic:
231  * struct srp_addr proc_list[];
232  * struct srp_addr failed_list[];
233  */
234 } __attribute__((packed));
235 
236 
237 struct memb_merge_detect {
238  struct message_header header;
239  struct srp_addr system_from;
240  struct memb_ring_id ring_id;
241 } __attribute__((packed));
242 
243 
244 struct token_hold_cancel {
245  struct message_header header;
246  struct memb_ring_id ring_id;
247 } __attribute__((packed));
248 
249 
250 struct memb_commit_token_memb_entry {
251  struct memb_ring_id ring_id;
252  unsigned int aru;
253  unsigned int high_delivered;
254  unsigned int received_flg;
255 }__attribute__((packed));
256 
257 
258 struct memb_commit_token {
259  struct message_header header;
260  unsigned int token_seq;
261  struct memb_ring_id ring_id;
262  unsigned int retrans_flg;
263  int memb_index;
264  int addr_entries;
265  unsigned char end_of_commit_token[0];
266 /*
267  * These parts of the data structure are dynamic:
268  *
269  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
270  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
271  */
272 }__attribute__((packed));
273 
274 struct message_item {
275  struct mcast *mcast;
276  unsigned int msg_len;
277 };
278 
279 struct sort_queue_item {
280  struct mcast *mcast;
281  unsigned int msg_len;
282 };
283 
284 enum memb_state {
285  MEMB_STATE_OPERATIONAL = 1,
286  MEMB_STATE_GATHER = 2,
287  MEMB_STATE_COMMIT = 3,
288  MEMB_STATE_RECOVERY = 4
289 };
290 
293 
295 
296  /*
297  * Flow control mcasts and remcasts on last and current orf_token
298  */
300 
302 
304 
306 
308 
309  struct srp_addr my_id;
310 
312 
314 
316 
318 
320 
322 
324 
326 
328 
330 
332 
334 
336 
338 
340 
342 
344 
346 
347  unsigned int my_last_aru;
348 
350 
352 
353  unsigned int my_high_seq_received;
354 
355  unsigned int my_install_seq;
356 
358 
360 
362 
364 
366 
367  /*
368  * Queues used to order, deliver, and recover messages
369  */
371 
373 
375 
377 
379 
380  /*
381  * Received up to and including
382  */
383  unsigned int my_aru;
384 
385  unsigned int my_high_delivered;
386 
388 
390 
392 
394 
395  unsigned int my_token_seq;
396 
397  /*
398  * Timers
399  */
400  qb_loop_timer_handle timer_pause_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_timeout;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
436  int level,
437  int subsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
450 
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
465 
467  int waiting_trans_ack);
468 
470 
472 
473  unsigned long long token_ring_id_seq;
474 
475  unsigned int last_released;
476 
477  unsigned int set_aru;
478 
480 
482 
484 
485  unsigned int my_last_seq;
486 
487  struct timeval tv_old;
488 
490 
492 
493  unsigned int use_heartbeat;
494 
495  unsigned int my_trc;
496 
497  unsigned int my_pbl;
498 
499  unsigned int my_cbl;
500 
501  uint64_t pause_timestamp;
502 
504 
506 
508 
510 
512 
515  char commit_token_storage[40000];
516 };
517 
518 struct message_handlers {
519  int count;
520  int (*handler_functions[6]) (
521  struct totemsrp_instance *instance,
522  const void *msg,
523  size_t msg_len,
524  int endian_conversion_needed);
525 };
526 
527 /*
528  * forward decls
529  */
530 static int message_handler_orf_token (
531  struct totemsrp_instance *instance,
532  const void *msg,
533  size_t msg_len,
534  int endian_conversion_needed);
535 
536 static int message_handler_mcast (
537  struct totemsrp_instance *instance,
538  const void *msg,
539  size_t msg_len,
540  int endian_conversion_needed);
541 
542 static int message_handler_memb_merge_detect (
543  struct totemsrp_instance *instance,
544  const void *msg,
545  size_t msg_len,
546  int endian_conversion_needed);
547 
548 static int message_handler_memb_join (
549  struct totemsrp_instance *instance,
550  const void *msg,
551  size_t msg_len,
552  int endian_conversion_needed);
553 
554 static int message_handler_memb_commit_token (
555  struct totemsrp_instance *instance,
556  const void *msg,
557  size_t msg_len,
558  int endian_conversion_needed);
559 
560 static int message_handler_token_hold_cancel (
561  struct totemsrp_instance *instance,
562  const void *msg,
563  size_t msg_len,
564  int endian_conversion_needed);
565 
566 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
567 
568 static unsigned int main_msgs_missing (void);
569 
570 static void main_token_seqid_get (
571  const void *msg,
572  unsigned int *seqid,
573  unsigned int *token_is);
574 
575 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
576 
577 static void srp_addr_to_nodeid (
578  unsigned int *nodeid_out,
579  struct srp_addr *srp_addr_in,
580  unsigned int entries);
581 
582 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
583 
584 static void memb_leave_message_send (struct totemsrp_instance *instance);
585 
586 static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
587 
588 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
589 static void memb_state_gather_enter (struct totemsrp_instance *instance, int gather_from);
590 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
591 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
592  int fcc_mcasts_allowed);
593 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
594 
595 static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
596  const struct memb_ring_id *ring_id);
597 static void target_set_completed (void *context);
598 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
599 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
600 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
601 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
602 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
603 static int token_hold_cancel_send (struct totemsrp_instance *instance);
604 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
605 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
606 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
607 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
608 static void memb_merge_detect_endian_convert (
609  const struct memb_merge_detect *in,
610  struct memb_merge_detect *out);
611 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
612 static void timer_function_orf_token_timeout (void *data);
613 static void timer_function_pause_timeout (void *data);
614 static void timer_function_heartbeat_timeout (void *data);
615 static void timer_function_token_retransmit_timeout (void *data);
616 static void timer_function_token_hold_retransmit_timeout (void *data);
617 static void timer_function_merge_detect_timeout (void *data);
618 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
619 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
620 
621 void main_deliver_fn (
622  void *context,
623  const void *msg,
624  unsigned int msg_len);
625 
626 void main_iface_change_fn (
627  void *context,
628  const struct totem_ip_address *iface_address,
629  unsigned int iface_no);
630 
631 struct message_handlers totemsrp_message_handlers = {
632  6,
633  {
634  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
635  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
636  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
637  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
638  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
639  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
640  }
641 };
642 
643 static const char *rundir = NULL;
644 
645 #define log_printf(level, format, args...) \
646 do { \
647  instance->totemsrp_log_printf ( \
648  level, instance->totemsrp_subsys_id, \
649  __FUNCTION__, __FILE__, __LINE__, \
650  format, ##args); \
651 } while (0)
652 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
653 do { \
654  char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
655  const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
656  instance->totemsrp_log_printf ( \
657  level, instance->totemsrp_subsys_id, \
658  __FUNCTION__, __FILE__, __LINE__, \
659  fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
660  } while(0)
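/*
 * Typical use of the two macros above (a sketch, not lines from this file;
 * the totemsrp_log_level_* members are assumptions):
 *
 *   log_printf (instance->totemsrp_log_level_notice,
 *       "entering OPERATIONAL state.");
 *
 *   LOGSYS_PERROR (errno, instance->totemsrp_log_level_error,
 *       "Could not create directory %s", rundir);
 *
 * Both expand to a call through instance->totemsrp_log_printf with the
 * calling function, file and line filled in automatically.
 */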
661 
662 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
663 {
664  memset (instance, 0, sizeof (struct totemsrp_instance));
665 
666  list_init (&instance->token_callback_received_listhead);
667 
668  list_init (&instance->token_callback_sent_listhead);
669 
670  instance->my_received_flg = 1;
671 
672  instance->my_token_seq = SEQNO_START_TOKEN - 1;
673 
675 
676  instance->set_aru = -1;
677 
678  instance->my_aru = SEQNO_START_MSG;
679 
681 
683 
684  instance->orf_token_discard = 0;
685 
686  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
687 
688  instance->my_id.no_addrs = INTERFACE_MAX;
689 
690  instance->waiting_trans_ack = 1;
691 }
692 
693 static void main_token_seqid_get (
694  const void *msg,
695  unsigned int *seqid,
696  unsigned int *token_is)
697 {
698  const struct orf_token *token = msg;
699 
700  *seqid = 0;
701  *token_is = 0;
702  if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
703  *seqid = token->token_seq;
704  *token_is = 1;
705  }
706 }
707 
708 static unsigned int main_msgs_missing (void)
709 {
710 // TODO
711  return (0);
712 }
713 
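/*
 * Pause handling (summary of the code below): timer_function_pause_timeout()
 * refreshes instance->pause_timestamp every token_timeout / 5 ms via
 * reset_pause_timeout().  If pause_flush() then finds that more than
 * token_timeout / 2 ms have passed since that refresh, the process (or the
 * whole machine) was not scheduled for a long time, so the membership
 * traffic that accumulated while paused is drained rather than being
 * treated as current.
 */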
714 static int pause_flush (struct totemsrp_instance *instance)
715 {
716  uint64_t now_msec;
717  uint64_t timestamp_msec;
718  int res = 0;
719 
720  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
721  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
722 
723  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
725  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
726  /*
727  * -1 indicates an error from recvmsg
728  */
729  do {
731  } while (res == -1);
732  }
733  return (res);
734 }
735 
736 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
737 {
738  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
739  uint32_t time_now;
740  unsigned long long nano_secs = qb_util_nano_current_get ();
741 
742  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
743 
744  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
745  /* increment the latest token index */
746  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
747  instance->stats.latest_token = 0;
748  else
749  instance->stats.latest_token++;
750 
751  if (instance->stats.earliest_token == instance->stats.latest_token) {
752  /* we have filled up the array, start overwriting */
753  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
754  instance->stats.earliest_token = 0;
755  else
756  instance->stats.earliest_token++;
757 
758  instance->stats.token[instance->stats.earliest_token].rx = 0;
759  instance->stats.token[instance->stats.earliest_token].tx = 0;
760  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
761  }
762 
763  instance->stats.token[instance->stats.latest_token].rx = time_now;
764  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
765  } else {
766  instance->stats.token[instance->stats.latest_token].tx = time_now;
767  }
768  return 0;
769 }
770 
771 /*
772  * Exported interfaces
773  */
774 int totemsrp_initialize (
775  qb_loop_t *poll_handle,
776  void **srp_context,
777  struct totem_config *totem_config,
778  totempg_stats_t *stats,
779 
780  void (*deliver_fn) (
781  unsigned int nodeid,
782  const void *msg,
783  unsigned int msg_len,
784  int endian_conversion_required),
785 
786  void (*confchg_fn) (
787  enum totem_configuration_type configuration_type,
788  const unsigned int *member_list, size_t member_list_entries,
789  const unsigned int *left_list, size_t left_list_entries,
790  const unsigned int *joined_list, size_t joined_list_entries,
791  const struct memb_ring_id *ring_id),
792  void (*waiting_trans_ack_cb_fn) (
793  int waiting_trans_ack))
794 {
795  struct totemsrp_instance *instance;
796  unsigned int res;
797 
798  instance = malloc (sizeof (struct totemsrp_instance));
799  if (instance == NULL) {
800  goto error_exit;
801  }
802 
803  rundir = getenv ("COROSYNC_RUN_DIR");
804  if (rundir == NULL) {
805  rundir = LOCALSTATEDIR "/lib/corosync";
806  }
807 
808  res = mkdir (rundir, 0700);
809  if (res == -1 && errno != EEXIST) {
810  goto error_destroy;
811  }
812 
813  res = chdir (rundir);
814  if (res == -1) {
815  goto error_destroy;
816  }
817 
818  totemsrp_instance_initialize (instance);
819 
820  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
821  instance->totemsrp_waiting_trans_ack_cb_fn (1);
822 
823  stats->srp = &instance->stats;
824  instance->stats.latest_token = 0;
825  instance->stats.earliest_token = 0;
826 
827  instance->totem_config = totem_config;
828 
829  /*
830  * Configure logging
831  */
840 
841  /*
842  * Initialize local variables for totemsrp
843  */
844  totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
845 
846  /*
847  * Display totem configuration
848  */
850  "Token Timeout (%d ms) retransmit timeout (%d ms)",
851  totem_config->token_timeout, totem_config->token_retransmit_timeout);
853  "token hold (%d ms) retransmits before loss (%d retrans)",
854  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
856  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
857  totem_config->join_timeout,
858  totem_config->send_join_timeout,
859  totem_config->consensus_timeout,
860 
861  totem_config->merge_timeout);
863  "downcheck (%d ms) fail to recv const (%d msgs)",
864  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
866  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
867 
869  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
870  totem_config->window_size, totem_config->max_messages);
871 
873  "missed count const (%d messages)",
874  totem_config->miss_count_const);
875 
877  "send threads (%d threads)", totem_config->threads);
879  "RRP token expired timeout (%d ms)",
880  totem_config->rrp_token_expired_timeout);
882  "RRP token problem counter (%d ms)",
883  totem_config->rrp_problem_count_timeout);
885  "RRP threshold (%d problem count)",
886  totem_config->rrp_problem_count_threshold);
888  "RRP multicast threshold (%d problem count)",
889  totem_config->rrp_problem_count_mcast_threshold);
891  "RRP automatic recovery check timeout (%d ms)",
892  totem_config->rrp_autorecovery_check_timeout);
894  "RRP mode set to %s.", instance->totem_config->rrp_mode);
895 
897  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
899  "max_network_delay (%d ms)", totem_config->max_network_delay);
900 
901 
902  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
903  sizeof (struct message_item), instance->threaded_mode_enabled);
904 
905  sq_init (&instance->regular_sort_queue,
906  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
907 
908  sq_init (&instance->recovery_sort_queue,
909  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
910 
911  instance->totemsrp_poll_handle = poll_handle;
912 
913  instance->totemsrp_deliver_fn = deliver_fn;
914 
915  instance->totemsrp_confchg_fn = confchg_fn;
916  instance->use_heartbeat = 1;
917 
918  timer_function_pause_timeout (instance);
919 
920  if ( totem_config->heartbeat_failures_allowed == 0 ) {
922  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
923  instance->use_heartbeat = 0;
924  }
925 
926  if (instance->use_heartbeat) {
927  instance->heartbeat_timeout
928  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
929  + totem_config->max_network_delay;
930 
931  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
933  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
934  instance->heartbeat_timeout,
935  totem_config->token_timeout);
937  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
939  "heartbeat timeout should be less than the token timeout. HeartBeat is Disabled !!");
940  instance->use_heartbeat = 0;
941  }
942  else {
944  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
945  }
946  }
947 
949  poll_handle,
950  &instance->totemrrp_context,
951  totem_config,
952  stats->srp,
953  instance,
956  main_token_seqid_get,
957  main_msgs_missing,
958  target_set_completed);
959 
960  /*
961  * Must have net_mtu adjusted by totemrrp_initialize first
962  */
963  cs_queue_init (&instance->new_message_queue,
965  sizeof (struct message_item), instance->threaded_mode_enabled);
966 
967  cs_queue_init (&instance->new_message_queue_trans,
969  sizeof (struct message_item), instance->threaded_mode_enabled);
970 
972  &instance->token_recv_event_handle,
974  0,
975  token_event_stats_collector,
976  instance);
978  &instance->token_sent_event_handle,
980  0,
981  token_event_stats_collector,
982  instance);
983  *srp_context = instance;
984  return (0);
985 
986 error_destroy:
987  free (instance);
988 
989 error_exit:
990  return (-1);
991 }
992 
993 void totemsrp_finalize (
994  void *srp_context)
995 {
996  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
997 
998 
999  memb_leave_message_send (instance);
1000  totemrrp_finalize (instance->totemrrp_context);
1001  cs_queue_free (&instance->new_message_queue);
1002  cs_queue_free (&instance->new_message_queue_trans);
1003  cs_queue_free (&instance->retrans_message_queue);
1004  sq_free (&instance->regular_sort_queue);
1005  sq_free (&instance->recovery_sort_queue);
1006  free (instance);
1007 }
1008 
1009 /*
1010  * Return configured interfaces. interfaces is an array of totem_ip addresses allocated by the caller,
1011  * with interfaces_size items. iface_count is the final number of interfaces filled in by this
1012  * function.
1013  *
1014  * Returns 0 on success, -2 if the interfaces array is not big enough, and -1 if the
1015  * nodeid was not found.
1016  */
1017 int totemsrp_ifaces_get (
1018  void *srp_context,
1019  unsigned int nodeid,
1020  struct totem_ip_address *interfaces,
1021  unsigned int interfaces_size,
1022  char ***status,
1023  unsigned int *iface_count)
1024 {
1025  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1026  int res = 0;
1027  unsigned int found = 0;
1028  unsigned int i;
1029 
1030  for (i = 0; i < instance->my_memb_entries; i++) {
1031  if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1032  found = 1;
1033  break;
1034  }
1035  }
1036 
1037  if (found) {
1038  *iface_count = instance->totem_config->interface_count;
1039 
1040  if (interfaces_size >= *iface_count) {
1041  memcpy (interfaces, instance->my_memb_list[i].addr,
1042  sizeof (struct totem_ip_address) * *iface_count);
1043  } else {
1044  res = -2;
1045  }
1046 
1047  goto finish;
1048  }
1049 
1050  for (i = 0; i < instance->my_left_memb_entries; i++) {
1051  if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1052  found = 1;
1053  break;
1054  }
1055  }
1056 
1057  if (found) {
1058  *iface_count = instance->totem_config->interface_count;
1059 
1060  if (interfaces_size >= *iface_count) {
1061  memcpy (interfaces, instance->my_left_memb_list[i].addr,
1062  sizeof (struct totem_ip_address) * *iface_count);
1063  } else {
1064  res = -2;
1065  }
1066  } else {
1067  res = -1;
1068  }
1069 
1070 finish:
1071  totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1072  return (res);
1073 }
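/*
 * Caller-side sketch (hypothetical, not from this file), sizing the array
 * from INTERFACE_MAX and checking the return codes documented above:
 *
 *   struct totem_ip_address ifaces[INTERFACE_MAX];
 *   char **status;
 *   unsigned int iface_count;
 *
 *   int res = totemsrp_ifaces_get (srp_context, nodeid,
 *       ifaces, INTERFACE_MAX, &status, &iface_count);
 *   if (res == -2) {
 *       // array smaller than totem_config->interface_count
 *   } else if (res == -1) {
 *       // nodeid not found in the current or left membership lists
 *   }
 */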
1074 
1075 int totemsrp_crypto_set (
1076  void *srp_context,
1077  const char *cipher_type,
1078  const char *hash_type)
1079 {
1080  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1081  int res;
1082 
1083  res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1084 
1085  return (res);
1086 }
1087 
1088 
1089 unsigned int totemsrp_my_nodeid_get (
1090  void *srp_context)
1091 {
1092  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1093  unsigned int res;
1094 
1095  res = instance->totem_config->interfaces[0].boundto.nodeid;
1096 
1097  return (res);
1098 }
1099 
1100 int totemsrp_my_family_get (
1101  void *srp_context)
1102 {
1103  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1104  int res;
1105 
1106  res = instance->totem_config->interfaces[0].boundto.family;
1107 
1108  return (res);
1109 }
1110 
1111 
1112 int totemsrp_ring_reenable (
1113  void *srp_context)
1114 {
1115  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1116 
1118  instance->totem_config->interface_count);
1119 
1120  return (0);
1121 }
1122 
1123 
1124 /*
1125  * Set operations for use by the membership algorithm
1126  */
1127 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1128 {
1129  unsigned int i;
1130  unsigned int res;
1131 
1132  for (i = 0; i < 1; i++) {
1133  res = totemip_equal (&a->addr[i], &b->addr[i]);
1134  if (res == 0) {
1135  return (0);
1136  }
1137  }
1138  return (1);
1139 }
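/*
 * Note: the loop above only runs for i < 1, so equality is decided by the
 * first (interface 0) address alone; the remaining INTERFACE_MAX - 1
 * addresses of an srp_addr take no part in the comparison.
 */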
1140 
1141 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1142 {
1143  unsigned int i;
1144 
1145  dest->no_addrs = src->no_addrs;
1146 
1147  for (i = 0; i < INTERFACE_MAX; i++) {
1148  totemip_copy (&dest->addr[i], &src->addr[i]);
1149  }
1150 }
1151 
1152 static void srp_addr_to_nodeid (
1153  unsigned int *nodeid_out,
1154  struct srp_addr *srp_addr_in,
1155  unsigned int entries)
1156 {
1157  unsigned int i;
1158 
1159  for (i = 0; i < entries; i++) {
1160  nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1161  }
1162 }
1163 
1164 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1165 {
1166  int i;
1167 
1168  for (i = 0; i < INTERFACE_MAX; i++) {
1169  totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1170  }
1171 }
1172 
1173 static void memb_consensus_reset (struct totemsrp_instance *instance)
1174 {
1175  instance->consensus_list_entries = 0;
1176 }
1177 
1178 static void memb_set_subtract (
1179  struct srp_addr *out_list, int *out_list_entries,
1180  struct srp_addr *one_list, int one_list_entries,
1181  struct srp_addr *two_list, int two_list_entries)
1182 {
1183  int found = 0;
1184  int i;
1185  int j;
1186 
1187  *out_list_entries = 0;
1188 
1189  for (i = 0; i < one_list_entries; i++) {
1190  for (j = 0; j < two_list_entries; j++) {
1191  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1192  found = 1;
1193  break;
1194  }
1195  }
1196  if (found == 0) {
1197  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1198  *out_list_entries = *out_list_entries + 1;
1199  }
1200  found = 0;
1201  }
1202 }
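/*
 * Worked example (hypothetical values): with
 *   one_list = { A, B, C } and two_list = { B },
 * memb_set_subtract() produces out_list = { A, C }, i.e. every entry of
 * one_list that has no srp_addr_equal() match in two_list.  This is how
 * memb_consensus_agreed() below derives the token membership as
 * my_proc_list minus my_failed_list.
 */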
1203 
1204 /*
1205  * Set consensus for a specific processor
1206  */
1207 static void memb_consensus_set (
1208  struct totemsrp_instance *instance,
1209  const struct srp_addr *addr)
1210 {
1211  int found = 0;
1212  int i;
1213 
1214  if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1215  return;
1216 
1217  for (i = 0; i < instance->consensus_list_entries; i++) {
1218  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1219  found = 1;
1220  break; /* found entry */
1221  }
1222  }
1223  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1224  instance->consensus_list[i].set = 1;
1225  if (found == 0) {
1226  instance->consensus_list_entries++;
1227  }
1228  return;
1229 }
1230 
1231 /*
1232  * Is consensus set for a specific processor
1233  */
1234 static int memb_consensus_isset (
1235  struct totemsrp_instance *instance,
1236  const struct srp_addr *addr)
1237 {
1238  int i;
1239 
1240  for (i = 0; i < instance->consensus_list_entries; i++) {
1241  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1242  return (instance->consensus_list[i].set);
1243  }
1244  }
1245  return (0);
1246 }
1247 
1248 /*
1249  * Is consensus agreed upon based upon consensus database
1250  */
1251 static int memb_consensus_agreed (
1252  struct totemsrp_instance *instance)
1253 {
1254  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1255  int token_memb_entries = 0;
1256  int agreed = 1;
1257  int i;
1258 
1259  memb_set_subtract (token_memb, &token_memb_entries,
1260  instance->my_proc_list, instance->my_proc_list_entries,
1261  instance->my_failed_list, instance->my_failed_list_entries);
1262 
1263  for (i = 0; i < token_memb_entries; i++) {
1264  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1265  agreed = 0;
1266  break;
1267  }
1268  }
1269 
1270  if (agreed && instance->failed_to_recv == 1) {
1271  /*
1272  * Both nodes agreed on our failure. We don't care how many proc list items are left because we
1273  * will create a single ring anyway.
1274  */
1275 
1276  return (agreed);
1277  }
1278 
1279  assert (token_memb_entries >= 1);
1280 
1281  return (agreed);
1282 }
1283 
1284 static void memb_consensus_notset (
1285  struct totemsrp_instance *instance,
1286  struct srp_addr *no_consensus_list,
1287  int *no_consensus_list_entries,
1288  struct srp_addr *comparison_list,
1289  int comparison_list_entries)
1290 {
1291  int i;
1292 
1293  *no_consensus_list_entries = 0;
1294 
1295  for (i = 0; i < instance->my_proc_list_entries; i++) {
1296  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1297  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1298  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1299  }
1300  }
1301 }
1302 
1303 /*
1304  * Is set1 equal to set2? Entries can be in different orders.
1305  */
1306 static int memb_set_equal (
1307  struct srp_addr *set1, int set1_entries,
1308  struct srp_addr *set2, int set2_entries)
1309 {
1310  int i;
1311  int j;
1312 
1313  int found = 0;
1314 
1315  if (set1_entries != set2_entries) {
1316  return (0);
1317  }
1318  for (i = 0; i < set2_entries; i++) {
1319  for (j = 0; j < set1_entries; j++) {
1320  if (srp_addr_equal (&set1[j], &set2[i])) {
1321  found = 1;
1322  break;
1323  }
1324  }
1325  if (found == 0) {
1326  return (0);
1327  }
1328  found = 0;
1329  }
1330  return (1);
1331 }
1332 
1333 /*
1334  * Is subset fully contained in fullset
1335  */
1336 static int memb_set_subset (
1337  const struct srp_addr *subset, int subset_entries,
1338  const struct srp_addr *fullset, int fullset_entries)
1339 {
1340  int i;
1341  int j;
1342  int found = 0;
1343 
1344  if (subset_entries > fullset_entries) {
1345  return (0);
1346  }
1347  for (i = 0; i < subset_entries; i++) {
1348  for (j = 0; j < fullset_entries; j++) {
1349  if (srp_addr_equal (&subset[i], &fullset[j])) {
1350  found = 1;
1351  }
1352  }
1353  if (found == 0) {
1354  return (0);
1355  }
1356  found = 0;
1357  }
1358  return (1);
1359 }
1360 /*
1361  * merge subset into fullset taking care not to add duplicates
1362  */
1363 static void memb_set_merge (
1364  const struct srp_addr *subset, int subset_entries,
1365  struct srp_addr *fullset, int *fullset_entries)
1366 {
1367  int found = 0;
1368  int i;
1369  int j;
1370 
1371  for (i = 0; i < subset_entries; i++) {
1372  for (j = 0; j < *fullset_entries; j++) {
1373  if (srp_addr_equal (&fullset[j], &subset[i])) {
1374  found = 1;
1375  break;
1376  }
1377  }
1378  if (found == 0) {
1379  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1380  *fullset_entries = *fullset_entries + 1;
1381  }
1382  found = 0;
1383  }
1384  return;
1385 }
1386 
1387 static void memb_set_and_with_ring_id (
1388  struct srp_addr *set1,
1389  struct memb_ring_id *set1_ring_ids,
1390  int set1_entries,
1391  struct srp_addr *set2,
1392  int set2_entries,
1393  struct memb_ring_id *old_ring_id,
1394  struct srp_addr *and,
1395  int *and_entries)
1396 {
1397  int i;
1398  int j;
1399  int found = 0;
1400 
1401  *and_entries = 0;
1402 
1403  for (i = 0; i < set2_entries; i++) {
1404  for (j = 0; j < set1_entries; j++) {
1405  if (srp_addr_equal (&set1[j], &set2[i])) {
1406  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1407  found = 1;
1408  }
1409  break;
1410  }
1411  }
1412  if (found) {
1413  srp_addr_copy (&and[*and_entries], &set1[j]);
1414  *and_entries = *and_entries + 1;
1415  }
1416  found = 0;
1417  }
1418  return;
1419 }
1420 
1421 #ifdef CODE_COVERAGE
1422 static void memb_set_print (
1423  char *string,
1424  struct srp_addr *list,
1425  int list_entries)
1426 {
1427  int i;
1428  int j;
1429  printf ("List '%s' contains %d entries:\n", string, list_entries);
1430 
1431  for (i = 0; i < list_entries; i++) {
1432  printf ("Address %d with %d rings\n", i, list[i].no_addrs);
1433  for (j = 0; j < list[i].no_addrs; j++) {
1434  printf ("\tiface %d %s\n", j, totemip_print (&list[i].addr[j]));
1435  printf ("\tfamily %d\n", list[i].addr[j].family);
1436  }
1437  }
1438 }
1439 #endif
1440 
1441 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1442 {
1443  assert (instance != NULL);
1444  return totemrrp_buffer_alloc (instance->totemrrp_context);
1445 }
1446 
1447 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1448 {
1449  assert (instance != NULL);
1450  totemrrp_buffer_release (instance->totemrrp_context, ptr);
1451 }
1452 
1453 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1454 {
1455  qb_loop_timer_del (instance->totemsrp_poll_handle,
1457  qb_loop_timer_add (instance->totemsrp_poll_handle,
1458  QB_LOOP_MED,
1459  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1460  (void *)instance,
1461  timer_function_token_retransmit_timeout,
1463 
1464 }
1465 
1466 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1467 {
1468  if (instance->my_merge_detect_timeout_outstanding == 0) {
1469  qb_loop_timer_add (instance->totemsrp_poll_handle,
1470  QB_LOOP_MED,
1471  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1472  (void *)instance,
1473  timer_function_merge_detect_timeout,
1474  &instance->timer_merge_detect_timeout);
1475 
1477  }
1478 }
1479 
1480 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1481 {
1482  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1484 }
1485 
1486 /*
1487  * ring_state_* is used to save and restore the sort queue
1488  * state when a recovery operation fails (and enters gather)
1489  */
1490 static void old_ring_state_save (struct totemsrp_instance *instance)
1491 {
1492  if (instance->old_ring_state_saved == 0) {
1493  instance->old_ring_state_saved = 1;
1494  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1495  sizeof (struct memb_ring_id));
1496  instance->old_ring_state_aru = instance->my_aru;
1499  "Saving state aru %x high seq received %x",
1500  instance->my_aru, instance->my_high_seq_received);
1501  }
1502 }
1503 
1504 static void old_ring_state_restore (struct totemsrp_instance *instance)
1505 {
1506  instance->my_aru = instance->old_ring_state_aru;
1509  "Restoring instance->my_aru %x my high seq received %x",
1510  instance->my_aru, instance->my_high_seq_received);
1511 }
1512 
1513 static void old_ring_state_reset (struct totemsrp_instance *instance)
1514 {
1516  "Resetting old ring state");
1517  instance->old_ring_state_saved = 0;
1518 }
1519 
1520 static void reset_pause_timeout (struct totemsrp_instance *instance)
1521 {
1522  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1523  qb_loop_timer_add (instance->totemsrp_poll_handle,
1524  QB_LOOP_MED,
1525  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1526  (void *)instance,
1527  timer_function_pause_timeout,
1528  &instance->timer_pause_timeout);
1529 }
1530 
1531 static void reset_token_timeout (struct totemsrp_instance *instance) {
1532  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1533  qb_loop_timer_add (instance->totemsrp_poll_handle,
1534  QB_LOOP_MED,
1535  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1536  (void *)instance,
1537  timer_function_orf_token_timeout,
1538  &instance->timer_orf_token_timeout);
1539 }
1540 
1541 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1542  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1543  qb_loop_timer_add (instance->totemsrp_poll_handle,
1544  QB_LOOP_MED,
1545  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1546  (void *)instance,
1547  timer_function_heartbeat_timeout,
1548  &instance->timer_heartbeat_timeout);
1549 }
1550 
1551 
1552 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1553  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1554 }
1555 
1556 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1557  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1558 }
1559 
1560 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1561 {
1562  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1563 }
1564 
1565 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1566 {
1567  qb_loop_timer_add (instance->totemsrp_poll_handle,
1568  QB_LOOP_MED,
1569  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1570  (void *)instance,
1571  timer_function_token_hold_retransmit_timeout,
1573 }
1574 
1575 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1576 {
1577  qb_loop_timer_del (instance->totemsrp_poll_handle,
1579 }
1580 
1581 static void memb_state_consensus_timeout_expired (
1582  struct totemsrp_instance *instance)
1583 {
1584  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1585  int no_consensus_list_entries;
1586 
1587  instance->stats.consensus_timeouts++;
1588  if (memb_consensus_agreed (instance)) {
1589  memb_consensus_reset (instance);
1590 
1591  memb_consensus_set (instance, &instance->my_id);
1592 
1593  reset_token_timeout (instance); // REVIEWED
1594  } else {
1595  memb_consensus_notset (
1596  instance,
1597  no_consensus_list,
1598  &no_consensus_list_entries,
1599  instance->my_proc_list,
1600  instance->my_proc_list_entries);
1601 
1602  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1603  instance->my_failed_list, &instance->my_failed_list_entries);
1604  memb_state_gather_enter (instance, 0);
1605  }
1606 }
1607 
1608 static void memb_join_message_send (struct totemsrp_instance *instance);
1609 
1610 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1611 
1612 /*
1613  * Timers used for various states of the membership algorithm
1614  */
1615 static void timer_function_pause_timeout (void *data)
1616 {
1617  struct totemsrp_instance *instance = data;
1618 
1619  instance->pause_timestamp = qb_util_nano_current_get ();
1620  reset_pause_timeout (instance);
1621 }
1622 
1623 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1624 {
1625  old_ring_state_restore (instance);
1626  memb_state_gather_enter (instance, 5);
1627  instance->stats.recovery_token_lost++;
1628 }
1629 
1630 static void timer_function_orf_token_timeout (void *data)
1631 {
1632  struct totemsrp_instance *instance = data;
1633 
1634  switch (instance->memb_state) {
1637  "The token was lost in the OPERATIONAL state.");
1639  "A processor failed, forming new configuration.");
1641  memb_state_gather_enter (instance, 2);
1642  instance->stats.operational_token_lost++;
1643  break;
1644 
1645  case MEMB_STATE_GATHER:
1647  "The consensus timeout expired.");
1648  memb_state_consensus_timeout_expired (instance);
1649  memb_state_gather_enter (instance, 3);
1650  instance->stats.gather_token_lost++;
1651  break;
1652 
1653  case MEMB_STATE_COMMIT:
1655  "The token was lost in the COMMIT state.");
1656  memb_state_gather_enter (instance, 4);
1657  instance->stats.commit_token_lost++;
1658  break;
1659 
1660  case MEMB_STATE_RECOVERY:
1662  "The token was lost in the RECOVERY state.");
1663  memb_recovery_state_token_loss (instance);
1664  instance->orf_token_discard = 1;
1665  break;
1666  }
1667 }
1668 
1669 static void timer_function_heartbeat_timeout (void *data)
1670 {
1671  struct totemsrp_instance *instance = data;
1673  "HeartBeat timer expired. Invoking token loss mechanism in state %d", instance->memb_state);
1674  timer_function_orf_token_timeout(data);
1675 }
1676 
1677 static void memb_timer_function_state_gather (void *data)
1678 {
1679  struct totemsrp_instance *instance = data;
1680 
1681  switch (instance->memb_state) {
1683  case MEMB_STATE_RECOVERY:
1684  assert (0); /* this should never happen */
1685  break;
1686  case MEMB_STATE_GATHER:
1687  case MEMB_STATE_COMMIT:
1688  memb_join_message_send (instance);
1689 
1690  /*
1691  * Restart the join timeout
1692  */
1693  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1694 
1695  qb_loop_timer_add (instance->totemsrp_poll_handle,
1696  QB_LOOP_MED,
1697  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1698  (void *)instance,
1699  memb_timer_function_state_gather,
1701  break;
1702  }
1703 }
1704 
1705 static void memb_timer_function_gather_consensus_timeout (void *data)
1706 {
1707  struct totemsrp_instance *instance = data;
1708  memb_state_consensus_timeout_expired (instance);
1709 }
1710 
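/*
 * During recovery, messages that originated on the old ring are re-sent on
 * the new ring wrapped inside an extra mcast header, marked with
 * header.encapsulated == MESSAGE_ENCAPSULATED.  The function below walks the
 * recovery sort queue and unwraps each such message; roughly:
 *
 *   +---------------------+--------------------------------------+
 *   | outer struct mcast  | original message (struct mcast +     |
 *   | (new ring id)       | payload, carrying the old ring id)   |
 *   +---------------------+--------------------------------------+
 *
 * so the inner message begins sizeof (struct mcast) bytes into the buffer,
 * which is exactly the pointer arithmetic used for regular_message_item.
 */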
1711 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1712 {
1713  unsigned int i;
1714  struct sort_queue_item *recovery_message_item;
1715  struct sort_queue_item regular_message_item;
1716  unsigned int range = 0;
1717  int res;
1718  void *ptr;
1719  struct mcast *mcast;
1720 
1722  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1723 
1724  range = instance->my_aru - SEQNO_START_MSG;
1725  /*
1726  * Move messages from recovery to regular sort queue
1727  */
1728 // todo should i be initialized to 0 or 1 ?
1729  for (i = 1; i <= range; i++) {
1730  res = sq_item_get (&instance->recovery_sort_queue,
1731  i + SEQNO_START_MSG, &ptr);
1732  if (res != 0) {
1733  continue;
1734  }
1735  recovery_message_item = ptr;
1736 
1737  /*
1738  * Convert recovery message into regular message
1739  */
1740  mcast = recovery_message_item->mcast;
1741  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1742  /*
1743  * Message is a recovery message encapsulated
1744  * in a new ring message
1745  */
1746  regular_message_item.mcast =
1747  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1748  regular_message_item.msg_len =
1749  recovery_message_item->msg_len - sizeof (struct mcast);
1750  mcast = regular_message_item.mcast;
1751  } else {
1752  /*
1753  * TODO this case shouldn't happen
1754  */
1755  continue;
1756  }
1757 
1759  "comparing if ring id is for this processor's old ring seqno %d",
1760  mcast->seq);
1761 
1762  /*
1763  * Only add this message to the regular sort
1764  * queue if it was originated with the same ring
1765  * id as the previous ring
1766  */
1767  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1768  sizeof (struct memb_ring_id)) == 0) {
1769 
1770  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1771  if (res == 0) {
1772  sq_item_add (&instance->regular_sort_queue,
1773  &regular_message_item, mcast->seq);
1774  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1775  instance->old_ring_state_high_seq_received = mcast->seq;
1776  }
1777  }
1778  } else {
1780  "-not adding msg with seq no %x", mcast->seq);
1781  }
1782  }
1783 }
1784 
1785 /*
1786  * Change states in the state machine of the membership algorithm
1787  */
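/*
 * A rough summary of the state machine implemented by the *_enter()
 * functions below and the token loss handling above (a reading of this
 * file, not an addition to the protocol):
 *
 *   OPERATIONAL --token loss (and other triggers)--> GATHER
 *   GATHER      --consensus reached, commit token--> COMMIT
 *   COMMIT      --commit token retained once more--> RECOVERY
 *   RECOVERY    --recovery completed---------------> OPERATIONAL
 *
 * Token loss in GATHER, COMMIT or RECOVERY falls back to GATHER (see
 * timer_function_orf_token_timeout()).
 */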
1788 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1789 {
1790  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1791  int joined_list_entries = 0;
1792  unsigned int aru_save;
1793  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1794  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1795  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1796  unsigned int left_list[PROCESSOR_COUNT_MAX];
1797  unsigned int i;
1798  unsigned int res;
1799  char left_node_msg[1024];
1800  char joined_node_msg[1024];
1801 
1802  memb_consensus_reset (instance);
1803 
1804  old_ring_state_reset (instance);
1805 
1806  deliver_messages_from_recovery_to_regular (instance);
1807 
1809  "Delivering to app %x to %x",
1810  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1811 
1812  aru_save = instance->my_aru;
1813  instance->my_aru = instance->old_ring_state_aru;
1814 
1815  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1816 
1817  /*
1818  * Calculate joined and left list
1819  */
1820  memb_set_subtract (instance->my_left_memb_list,
1821  &instance->my_left_memb_entries,
1822  instance->my_memb_list, instance->my_memb_entries,
1823  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1824 
1825  memb_set_subtract (joined_list, &joined_list_entries,
1826  instance->my_new_memb_list, instance->my_new_memb_entries,
1827  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1828 
1829  /*
1830  * Install new membership
1831  */
1832  instance->my_memb_entries = instance->my_new_memb_entries;
1833  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1834  sizeof (struct srp_addr) * instance->my_memb_entries);
1835  instance->last_released = 0;
1836  instance->my_set_retrans_flg = 0;
1837 
1838  /*
1839  * Deliver transitional configuration to application
1840  */
1841  srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1842  instance->my_left_memb_entries);
1843  srp_addr_to_nodeid (trans_memb_list_totemip,
1844  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1846  trans_memb_list_totemip, instance->my_trans_memb_entries,
1847  left_list, instance->my_left_memb_entries,
1848  0, 0, &instance->my_ring_id);
1849  instance->waiting_trans_ack = 1;
1850  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1851 
1852 // TODO we need to filter to ensure we only deliver those
1853 // messages which are part of instance->my_deliver_memb
1854  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1855 
1856  instance->my_aru = aru_save;
1857 
1858  /*
1859  * Deliver regular configuration to application
1860  */
1861  srp_addr_to_nodeid (new_memb_list_totemip,
1862  instance->my_new_memb_list, instance->my_new_memb_entries);
1863  srp_addr_to_nodeid (joined_list_totemip, joined_list,
1864  joined_list_entries);
1866  new_memb_list_totemip, instance->my_new_memb_entries,
1867  0, 0,
1868  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1869 
1870  /*
1871  * The recovery sort queue now becomes the regular
1872  * sort queue. It is necessary to copy the state
1873  * into the regular sort queue.
1874  */
1875  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1876  instance->my_last_aru = SEQNO_START_MSG;
1877 
1878  /* When making my_proc_list smaller, ensure that the
1879  * now non-used entries are zeroed out. There are some suspect
1880  * asserts that assume there are always 2 entries in the list.
1881  * These fail when my_proc_list is reduced to 1 entry (and the
1882  * valid [0] entry is the same as the 'unused' [1] entry).
1883  */
1884  memset(instance->my_proc_list, 0,
1885  sizeof (struct srp_addr) * instance->my_proc_list_entries);
1886 
1887  instance->my_proc_list_entries = instance->my_new_memb_entries;
1888  memcpy (instance->my_proc_list, instance->my_new_memb_list,
1889  sizeof (struct srp_addr) * instance->my_memb_entries);
1890 
1891  instance->my_failed_list_entries = 0;
1892  /*
1893  * TODO Not exactly to spec
1894  *
1895  * At the entry to this function all messages without a gap are
1896  * delivered.
1897  *
1898  * This code throws away messages from the last gap in the sort queue
1899  * to my_high_seq_received.
1900  *
1901  * What should really happen is we should deliver all messages up to
1902  * a gap, then deliver the transitional configuration, then deliver
1903  * the messages between the first gap and my_high_seq_received, then
1904  * deliver a regular configuration, then deliver the regular
1905  * configuration.
1906  *
1907  * Unfortunately totempg doesn't appear to like this operating mode
1908  * which needs more inspection
1909  */
1910  i = instance->my_high_seq_received + 1;
1911  do {
1912  void *ptr;
1913 
1914  i -= 1;
1915  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
1916  if (i == 0) {
1917  break;
1918  }
1919  } while (res);
1920 
1921  instance->my_high_delivered = i;
1922 
1923  for (i = 0; i <= instance->my_high_delivered; i++) {
1924  void *ptr;
1925 
1926  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
1927  if (res == 0) {
1928  struct sort_queue_item *regular_message;
1929 
1930  regular_message = ptr;
1931  free (regular_message->mcast);
1932  }
1933  }
1934  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
1935  instance->last_released = instance->my_high_delivered;
1936 
1937  if (joined_list_entries) {
1938  int sptr = 0;
1939  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
1940  for (i=0; i< joined_list_entries; i++) {
1941  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %d", joined_list_totemip[i]);
1942  }
1943  }
1944  else {
1945  joined_node_msg[0] = '\0';
1946  }
1947 
1948  if (instance->my_left_memb_entries) {
1949  int sptr = 0;
1950  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
1951  for (i=0; i< instance->my_left_memb_entries; i++) {
1952  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %d", left_list[i]);
1953  }
1954  }
1955  else {
1956  left_node_msg[0] = '\0';
1957  }
1958 
1960  "entering OPERATIONAL state.");
1962  "A new membership (%s:%lld) was formed. Members%s%s",
1963  totemip_print (&instance->my_ring_id.rep),
1964  instance->my_ring_id.seq,
1965  joined_node_msg,
1966  left_node_msg);
1967  instance->memb_state = MEMB_STATE_OPERATIONAL;
1968 
1969  instance->stats.operational_entered++;
1970  instance->stats.continuous_gather = 0;
1971 
1972  instance->my_received_flg = 1;
1973 
1974  reset_pause_timeout (instance);
1975 
1976  /*
1977  * Save ring id information from this configuration to determine
1978  * which processors are transitioning from old regular configuration
1979  * in to new regular configuration on the next configuration change
1980  */
1981  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1982  sizeof (struct memb_ring_id));
1983 
1984  return;
1985 }
1986 
1987 static void memb_state_gather_enter (
1988  struct totemsrp_instance *instance,
1989  int gather_from)
1990 {
1991  instance->orf_token_discard = 1;
1992 
1993  memb_set_merge (
1994  &instance->my_id, 1,
1995  instance->my_proc_list, &instance->my_proc_list_entries);
1996 
1997  memb_join_message_send (instance);
1998 
1999  /*
2000  * Restart the join timeout
2001  */
2002  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2003 
2004  qb_loop_timer_add (instance->totemsrp_poll_handle,
2005  QB_LOOP_MED,
2006  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2007  (void *)instance,
2008  memb_timer_function_state_gather,
2010 
2011  /*
2012  * Restart the consensus timeout
2013  */
2014  qb_loop_timer_del (instance->totemsrp_poll_handle,
2016 
2017  qb_loop_timer_add (instance->totemsrp_poll_handle,
2018  QB_LOOP_MED,
2019  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2020  (void *)instance,
2021  memb_timer_function_gather_consensus_timeout,
2023 
2024  /*
2025  * Cancel the token loss and token retransmission timeouts
2026  */
2027  cancel_token_retransmit_timeout (instance); // REVIEWED
2028  cancel_token_timeout (instance); // REVIEWED
2029  cancel_merge_detect_timeout (instance);
2030 
2031  memb_consensus_reset (instance);
2032 
2033  memb_consensus_set (instance, &instance->my_id);
2034 
2036  "entering GATHER state from %d.", gather_from);
2037 
2038  instance->memb_state = MEMB_STATE_GATHER;
2039  instance->stats.gather_entered++;
2040 
2041  if (gather_from == 3) {
2042  /*
2043  * State 3 means gather, so we are continuously gathering.
2044  */
2045  instance->stats.continuous_gather++;
2046  }
2047 
2048  return;
2049 }
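/*
 * The gather_from argument is used here only for the "entering GATHER state
 * from %d." log line and, when it equals 3, for the continuous_gather
 * statistic.  Values passed by the call sites above: 0 (consensus timeout),
 * 2 (token lost in OPERATIONAL), 3 (token lost in GATHER), 4 (token lost in
 * COMMIT) and 5 (token lost in RECOVERY).
 */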
2050 
2051 static void timer_function_token_retransmit_timeout (void *data);
2052 
2053 static void target_set_completed (
2054  void *context)
2055 {
2056  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
2057 
2058  memb_state_commit_token_send (instance);
2059 
2060 }
2061 
2062 static void memb_state_commit_enter (
2063  struct totemsrp_instance *instance)
2064 {
2065  old_ring_state_save (instance);
2066 
2067  memb_state_commit_token_update (instance);
2068 
2069  memb_state_commit_token_target_set (instance);
2070 
2071  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2072 
2073  instance->memb_timer_state_gather_join_timeout = 0;
2074 
2075  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2076 
2077  instance->memb_timer_state_gather_consensus_timeout = 0;
2078 
2079  memb_ring_id_set_and_store (instance, &instance->commit_token->ring_id);
2080 
2081  instance->token_ring_id_seq = instance->my_ring_id.seq;
2082 
2084  "entering COMMIT state.");
2085 
2086  instance->memb_state = MEMB_STATE_COMMIT;
2087  reset_token_retransmit_timeout (instance); // REVIEWED
2088  reset_token_timeout (instance); // REVIEWED
2089 
2090  instance->stats.commit_entered++;
2091  instance->stats.continuous_gather = 0;
2092 
2093  /*
2094  * reset all flow control variables since we are starting a new ring
2095  */
2096  instance->my_trc = 0;
2097  instance->my_pbl = 0;
2098  instance->my_cbl = 0;
2099  /*
2100  * commit token sent after callback that token target has been set
2101  */
2102 }
2103 
2104 static void memb_state_recovery_enter (
2105  struct totemsrp_instance *instance,
2106  struct memb_commit_token *commit_token)
2107 {
2108  int i;
2109  int local_received_flg = 1;
2110  unsigned int low_ring_aru;
2111  unsigned int range = 0;
2112  unsigned int messages_originated = 0;
2113  const struct srp_addr *addr;
2114  struct memb_commit_token_memb_entry *memb_list;
2115  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2116 
2117  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2118  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2119 
2121  "entering RECOVERY state.");
2122 
2123  instance->orf_token_discard = 0;
2124 
2125  instance->my_high_ring_delivered = 0;
2126 
2127  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2128  cs_queue_reinit (&instance->retrans_message_queue);
2129 
2130  low_ring_aru = instance->old_ring_state_high_seq_received;
2131 
2132  memb_state_commit_token_send_recovery (instance, commit_token);
2133 
2134  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2135 
2136  /*
2137  * Build regular configuration
2138  */
2139  totemrrp_processor_count_set (
2140  instance->totemrrp_context,
2141  commit_token->addr_entries);
2142 
2143  /*
2144  * Build transitional configuration
2145  */
2146  for (i = 0; i < instance->my_new_memb_entries; i++) {
2147  memcpy (&my_new_memb_ring_id_list[i],
2148  &memb_list[i].ring_id,
2149  sizeof (struct memb_ring_id));
2150  }
2151  memb_set_and_with_ring_id (
2152  instance->my_new_memb_list,
2153  my_new_memb_ring_id_list,
2154  instance->my_new_memb_entries,
2155  instance->my_memb_list,
2156  instance->my_memb_entries,
2157  &instance->my_old_ring_id,
2158  instance->my_trans_memb_list,
2159  &instance->my_trans_memb_entries);
2160 
2161  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2163  "TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
2164  }
2165  for (i = 0; i < instance->my_new_memb_entries; i++) {
2167  "position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
2169  "previous ring seq %llx rep %s",
2170  memb_list[i].ring_id.seq,
2171  totemip_print (&memb_list[i].ring_id.rep));
2172 
2174  "aru %x high delivered %x received flag %d",
2175  memb_list[i].aru,
2176  memb_list[i].high_delivered,
2177  memb_list[i].received_flg);
2178 
2179  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2180  }
2181  /*
2182  * Determine if any received flag is false
2183  */
2184  for (i = 0; i < commit_token->addr_entries; i++) {
2185  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2186  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2187 
2188  memb_list[i].received_flg == 0) {
2189  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2190  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2191  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2192  local_received_flg = 0;
2193  break;
2194  }
2195  }
2196  if (local_received_flg == 1) {
2197  goto no_originate;
2198  } /* Else originate messages if we should */
2199 
2200  /*
2201  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2202  */
2203  for (i = 0; i < commit_token->addr_entries; i++) {
2204  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2205  instance->my_deliver_memb_list,
2206  instance->my_deliver_memb_entries) &&
2207 
2208  memcmp (&instance->my_old_ring_id,
2209  &memb_list[i].ring_id,
2210  sizeof (struct memb_ring_id)) == 0) {
2211 
2212  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2213 
2214  low_ring_aru = memb_list[i].aru;
2215  }
2216  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2217  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2218  }
2219  }
2220  }
2221 
2222  /*
2223  * Copy all old ring messages to instance->retrans_message_queue
2224  */
2225  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2226  if (range == 0) {
2227  /*
2228  * No messages to copy
2229  */
2230  goto no_originate;
2231  }
2232  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2233 
2235  "copying all old ring messages from %x-%x.",
2236  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2237 
2238  for (i = 1; i <= range; i++) {
2239  struct sort_queue_item *sort_queue_item;
2240  struct message_item message_item;
2241  void *ptr;
2242  int res;
2243 
2244  res = sq_item_get (&instance->regular_sort_queue,
2245  low_ring_aru + i, &ptr);
2246  if (res != 0) {
2247  continue;
2248  }
2249  sort_queue_item = ptr;
2250  messages_originated++;
2251  memset (&message_item, 0, sizeof (struct message_item));
2252  // TODO LEAK
2253  message_item.mcast = totemsrp_buffer_alloc (instance);
2254  assert (message_item.mcast);
2255  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2256  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2257  message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
2258  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2259  assert (message_item.mcast->header.nodeid);
2260  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2261  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2262  sizeof (struct memb_ring_id));
2263  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2264  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2265  sort_queue_item->mcast,
2266  sort_queue_item->msg_len);
2267  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2268  }
2270  "Originated %d messages in RECOVERY.", messages_originated);
2271  goto originated;
2272 
2273 no_originate:
2275  "Did not need to originate any messages in recovery.");
2276 
2277 originated:
2278  instance->my_aru = SEQNO_START_MSG;
2279  instance->my_aru_count = 0;
2280  instance->my_seq_unchanged = 0;
2281  instance->my_high_delivered = SEQNO_START_MSG;
2282  instance->my_install_seq = SEQNO_START_MSG;
2283  instance->last_released = SEQNO_START_MSG;
2284 
2285  reset_token_timeout (instance); // REVIEWED
2286  reset_token_retransmit_timeout (instance); // REVIEWED
2287 
2288  instance->memb_state = MEMB_STATE_RECOVERY;
2289  instance->stats.recovery_entered++;
2290  instance->stats.continuous_gather = 0;
2291 
2292  return;
2293 }
2294 
2295 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2296 {
2297  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2298 
2299  token_hold_cancel_send (instance);
2300 
2301  return;
2302 }
2303 
2304 int totemsrp_mcast (
2305  void *srp_context,
2306  struct iovec *iovec,
2307  unsigned int iov_len,
2308  int guarantee)
2309 {
2310  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2311  int i;
2312  struct message_item message_item;
2313  char *addr;
2314  unsigned int addr_idx;
2315  struct cs_queue *queue_use;
2316 
2317  if (instance->waiting_trans_ack) {
2318  queue_use = &instance->new_message_queue_trans;
2319  } else {
2320  queue_use = &instance->new_message_queue;
2321  }
2322 
2323  if (cs_queue_is_full (queue_use)) {
2324  log_printf (instance->totemsrp_log_level_debug, "queue full");
2325  return (-1);
2326  }
2327 
2328  memset (&message_item, 0, sizeof (struct message_item));
2329 
2330  /*
2331  * Allocate pending item
2332  */
2333  message_item.mcast = totemsrp_buffer_alloc (instance);
2334  if (message_item.mcast == 0) {
2335  goto error_mcast;
2336  }
2337 
2338  /*
2339  * Set mcast header
2340  */
2341  memset(message_item.mcast, 0, sizeof (struct mcast));
2342  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2343  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2345  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2346  assert (message_item.mcast->header.nodeid);
2347 
2348  message_item.mcast->guarantee = guarantee;
2349  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2350 
2351  addr = (char *)message_item.mcast;
2352  addr_idx = sizeof (struct mcast);
2353  for (i = 0; i < iov_len; i++) {
2354  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2355  addr_idx += iovec[i].iov_len;
2356  }
2357 
2358  message_item.msg_len = addr_idx;
2359 
2360  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2361  instance->stats.mcast_tx++;
2362  cs_queue_item_add (queue_use, &message_item);
2363 
2364  return (0);
2365 
2366 error_mcast:
2367  return (-1);
2368 }
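/*
 * Editorial sketch, not part of the original totemsrp.c: the mcast path above
 * flattens the caller's iovec array into one contiguous frame behind a fixed
 * header.  A standalone illustration with a hypothetical header type:
 */
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

struct example_header {
	unsigned int type;
	unsigned int len;
};

int main (void)
{
	char part1[] = "hello ";
	char part2[] = "world";
	struct iovec iov[2] = {
		{ .iov_base = part1, .iov_len = sizeof (part1) - 1 },
		{ .iov_base = part2, .iov_len = sizeof (part2) },	/* keep the NUL */
	};
	char frame[256];
	struct example_header hdr;
	unsigned int idx = sizeof (struct example_header);
	unsigned int i;

	/* copy each fragment directly after the header, accumulating the offset */
	for (i = 0; i < 2; i++) {
		memcpy (&frame[idx], iov[i].iov_base, iov[i].iov_len);
		idx += iov[i].iov_len;
	}
	hdr.type = 1;
	hdr.len = idx;
	memcpy (frame, &hdr, sizeof (hdr));

	printf ("%u byte frame, payload: %s\n", idx, frame + sizeof (hdr));
	return 0;
}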
2369 
2370 /*
2371  * Determine if there is room to queue a new message
2372  */
2373 int totemsrp_avail (void *srp_context)
2374 {
2375  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2376  int avail;
2377  struct cs_queue *queue_use;
2378 
2379  if (instance->waiting_trans_ack) {
2380  queue_use = &instance->new_message_queue_trans;
2381  } else {
2382  queue_use = &instance->new_message_queue;
2383  }
2384  cs_queue_avail (queue_use, &avail);
2385 
2386  return (avail);
2387 }
2388 
2389 /*
2390  * ORF Token Management
2391  */
2392 /*
2393  * Recast message to mcast group if it is available
2394  */
2395 static int orf_token_remcast (
2396  struct totemsrp_instance *instance,
2397  int seq)
2398 {
2399  struct sort_queue_item *sort_queue_item;
2400  int res;
2401  void *ptr;
2402 
2403  struct sq *sort_queue;
2404 
2405  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2406  sort_queue = &instance->recovery_sort_queue;
2407  } else {
2408  sort_queue = &instance->regular_sort_queue;
2409  }
2410 
2411  res = sq_in_range (sort_queue, seq);
2412  if (res == 0) {
2413  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2414  return (-1);
2415  }
2416 
2417  /*
2418  * Get RTR item at seq, if not available, return
2419  */
2420  res = sq_item_get (sort_queue, seq, &ptr);
2421  if (res != 0) {
2422  return -1;
2423  }
2424 
2425  sort_queue_item = ptr;
2426 
2427  totemrrp_mcast_noflush_send (
2428  instance->totemrrp_context,
2429  sort_queue_item->mcast,
2430  sort_queue_item->msg_len);
2431 
2432  return (0);
2433 }
2434 
2435 
2436 /*
2437  * Free all freeable messages from ring
2438  */
2439 static void messages_free (
2440  struct totemsrp_instance *instance,
2441  unsigned int token_aru)
2442 {
2443  struct sort_queue_item *regular_message;
2444  unsigned int i;
2445  int res;
2446  int log_release = 0;
2447  unsigned int release_to;
2448  unsigned int range = 0;
2449 
2450  release_to = token_aru;
2451  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2452  release_to = instance->my_last_aru;
2453  }
2454  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2455  release_to = instance->my_high_delivered;
2456  }
2457 
2458  /*
2459  * Ensure we don't try to release before an already released point
2460  */
2461  if (sq_lt_compare (release_to, instance->last_released)) {
2462  return;
2463  }
2464 
2465  range = release_to - instance->last_released;
2466  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2467 
2468  /*
2469  * Release retransmit list items if group aru indicates they are transmitted
2470  */
2471  for (i = 1; i <= range; i++) {
2472  void *ptr;
2473 
2474  res = sq_item_get (&instance->regular_sort_queue,
2475  instance->last_released + i, &ptr);
2476  if (res == 0) {
2477  regular_message = ptr;
2478  totemsrp_buffer_release (instance, regular_message->mcast);
2479  }
2480  sq_items_release (&instance->regular_sort_queue,
2481  instance->last_released + i);
2482 
2483  log_release = 1;
2484  }
2485  instance->last_released += range;
2486 
2487  if (log_release) {
2489  "releasing messages up to and including %x", release_to);
2490  }
2491 }
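/*
 * Editorial sketch, not part of the original totemsrp.c: releases above are
 * guarded with sq_lt_compare(), a wrap-aware "less than" on 32-bit sequence
 * numbers.  This shows only the idea behind such a comparison, not the sq.h
 * implementation:
 */
#include <stdint.h>
#include <stdio.h>

static int seq_lt (uint32_t a, uint32_t b)
{
	uint32_t d = b - a;	/* unsigned wrap-around gives the forward distance */

	return (d != 0 && d < 0x80000000u);	/* a is older if b lies ahead by less than half the space */
}

int main (void)
{
	printf ("%d\n", seq_lt (5, 10));		/* 1: 5 is older than 10 */
	printf ("%d\n", seq_lt (0xfffffff0u, 3));	/* 1: still older across the wrap */
	printf ("%d\n", seq_lt (10, 5));		/* 0 */
	return 0;
}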
2492 
2493 static void update_aru (
2494  struct totemsrp_instance *instance)
2495 {
2496  unsigned int i;
2497  int res;
2498  struct sq *sort_queue;
2499  unsigned int range;
2500  unsigned int my_aru_saved = 0;
2501 
2502  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2503  sort_queue = &instance->recovery_sort_queue;
2504  } else {
2505  sort_queue = &instance->regular_sort_queue;
2506  }
2507 
2508  range = instance->my_high_seq_received - instance->my_aru;
2509 
2510  my_aru_saved = instance->my_aru;
2511  for (i = 1; i <= range; i++) {
2512 
2513  void *ptr;
2514 
2515  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2516  /*
2517  * If hole, stop updating aru
2518  */
2519  if (res != 0) {
2520  break;
2521  }
2522  }
2523  instance->my_aru += i - 1;
2524 }
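/*
 * Editorial sketch, not part of the original totemsrp.c: update_aru() walks
 * the sort queue upward from the current aru and stops at the first missing
 * sequence number.  A standalone illustration with a plain array standing in
 * for the sort queue:
 */
#include <stdio.h>

int main (void)
{
	int received[8] = { 1, 1, 1, 0, 1, 1, 0, 0 };	/* seq 3 is missing */
	unsigned int aru = 0;			/* all-received-up-to this seqno */
	unsigned int high_seq_received = 5;
	unsigned int i;

	for (i = aru + 1; i <= high_seq_received; i++) {
		if (!received[i]) {
			break;			/* hole: cannot advance past it */
		}
		aru = i;
	}
	printf ("aru advanced to %u\n", aru);	/* prints 2 */
	return 0;
}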
2525 
2526 /*
2527  * Multicasts pending messages onto the ring (requires orf_token possession)
2528  */
2529 static int orf_token_mcast (
2530  struct totemsrp_instance *instance,
2531  struct orf_token *token,
2532  int fcc_mcasts_allowed)
2533 {
2534  struct message_item *message_item = 0;
2535  struct cs_queue *mcast_queue;
2536  struct sq *sort_queue;
2537  struct sort_queue_item sort_queue_item;
2538  struct mcast *mcast;
2539  unsigned int fcc_mcast_current;
2540 
2541  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2542  mcast_queue = &instance->retrans_message_queue;
2543  sort_queue = &instance->recovery_sort_queue;
2544  reset_token_retransmit_timeout (instance); // REVIEWED
2545  } else {
2546  if (instance->waiting_trans_ack) {
2547  mcast_queue = &instance->new_message_queue_trans;
2548  } else {
2549  mcast_queue = &instance->new_message_queue;
2550  }
2551 
2552  sort_queue = &instance->regular_sort_queue;
2553  }
2554 
2555  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2556  if (cs_queue_is_empty (mcast_queue)) {
2557  break;
2558  }
2559  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2560 
2561  message_item->mcast->seq = ++token->seq;
2562  message_item->mcast->this_seqno = instance->global_seqno++;
2563 
2564  /*
2565  * Build IO vector
2566  */
2567  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2568  sort_queue_item.mcast = message_item->mcast;
2569  sort_queue_item.msg_len = message_item->msg_len;
2570 
2571  mcast = sort_queue_item.mcast;
2572 
2573  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2574 
2575  /*
2576  * Add message to retransmit queue
2577  */
2578  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2579 
2580  totemrrp_mcast_noflush_send (
2581  instance->totemrrp_context,
2582  message_item->mcast,
2583  message_item->msg_len);
2584 
2585  /*
2586  * Delete item from pending queue
2587  */
2588  cs_queue_item_remove (mcast_queue);
2589 
2590  /*
2591  * If messages mcasted, deliver any new messages to totempg
2592  */
2593  instance->my_high_seq_received = token->seq;
2594  }
2595 
2596  update_aru (instance);
2597 
2598  /*
2599  * Return 1 if more messages are available for single node clusters
2600  */
2601  return (fcc_mcast_current);
2602 }
2603 
2604 /*
2605  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2606  * Modifies orf_token's rtr list to include retransmits required by this processor
2607  */
2608 static int orf_token_rtr (
2609  struct totemsrp_instance *instance,
2610  struct orf_token *orf_token,
2611  unsigned int *fcc_allowed)
2612 {
2613  unsigned int res;
2614  unsigned int i, j;
2615  unsigned int found;
2616  struct sq *sort_queue;
2617  struct rtr_item *rtr_list;
2618  unsigned int range = 0;
2619  char retransmit_msg[1024];
2620  char value[64];
2621 
2622  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2623  sort_queue = &instance->recovery_sort_queue;
2624  } else {
2625  sort_queue = &instance->regular_sort_queue;
2626  }
2627 
2628  rtr_list = &orf_token->rtr_list[0];
2629 
2630  strcpy (retransmit_msg, "Retransmit List: ");
2631  if (orf_token->rtr_list_entries) {
2633  "Retransmit List %d", orf_token->rtr_list_entries);
2634  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2635  sprintf (value, "%x ", rtr_list[i].seq);
2636  strcat (retransmit_msg, value);
2637  }
2638  strcat (retransmit_msg, "");
2640  "%s", retransmit_msg);
2641  }
2642 
2643  /*
2644  * Retransmit messages on orf_token's RTR list from RTR queue
2645  */
2646  for (instance->fcc_remcast_current = 0, i = 0;
2647  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2648 
2649  /*
2650  * If this retransmit request isn't from this configuration,
2651  * try next rtr entry
2652  */
2653  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2654  sizeof (struct memb_ring_id)) != 0) {
2655 
2656  i += 1;
2657  continue;
2658  }
2659 
2660  res = orf_token_remcast (instance, rtr_list[i].seq);
2661  if (res == 0) {
2662  /*
2663  * Multicasted message, so no need to copy to new retransmit list
2664  */
2665  orf_token->rtr_list_entries -= 1;
2666  assert (orf_token->rtr_list_entries >= 0);
2667  memmove (&rtr_list[i], &rtr_list[i + 1],
2668  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2669 
2670  instance->stats.mcast_retx++;
2671  instance->fcc_remcast_current++;
2672  } else {
2673  i += 1;
2674  }
2675  }
2676  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2677 
2678  /*
2679  * Add messages to retransmit to RTR list
2680  * but only retry if there is room in the retransmit list
2681  */
2682 
2683  range = orf_token->seq - instance->my_aru;
2684  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2685 
2686  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2687  (i <= range); i++) {
2688 
2689  /*
2690  * Ensure message is within the sort queue range
2691  */
2692  res = sq_in_range (sort_queue, instance->my_aru + i);
2693  if (res == 0) {
2694  break;
2695  }
2696 
2697  /*
2698  * Find if a message is missing from this processor
2699  */
2700  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2701  if (res == 0) {
2702  /*
2703  * Determine how many times we have missed receiving
2704  * this sequence number. sq_item_miss_count increments
2705  * a counter for the sequence number. The miss count
2706  * will be returned and compared. This allows time for
2707  * delayed multicast messages to be received before
2708  * declaring the message is missing and requesting a
2709  * retransmit.
2710  */
2711  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2712  if (res < instance->totem_config->miss_count_const) {
2713  continue;
2714  }
2715 
2716  /*
2717  * Determine if missing message is already in retransmit list
2718  */
2719  found = 0;
2720  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2721  if (instance->my_aru + i == rtr_list[j].seq) {
2722  found = 1;
2723  }
2724  }
2725  if (found == 0) {
2726  /*
2727  * Missing message not found in current retransmit list so add it
2728  */
2729  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2730  &instance->my_ring_id, sizeof (struct memb_ring_id));
2731  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2732  orf_token->rtr_list_entries++;
2733  }
2734  }
2735  }
2736  return (instance->fcc_remcast_current);
2737 }
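/*
 * Editorial sketch, not part of the original totemsrp.c: a sequence number is
 * only added to the retransmit list after it has been seen missing at least
 * miss_count_const times, which gives delayed multicasts time to arrive.  A
 * standalone illustration; MISS_COUNT_CONST and the seqno are made up:
 */
#include <stdio.h>
#include <string.h>

#define MISS_COUNT_CONST 5

int main (void)
{
	unsigned int miss_count[16];
	unsigned int seq = 7;
	unsigned int pass;

	memset (miss_count, 0, sizeof (miss_count));
	for (pass = 1; pass <= 8; pass++) {
		if (++miss_count[seq] >= MISS_COUNT_CONST) {
			printf ("pass %u: request retransmit of seq %u\n", pass, seq);
		} else {
			printf ("pass %u: seq %u missing, keep waiting\n", pass, seq);
		}
	}
	return 0;
}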
2738 
2739 static void token_retransmit (struct totemsrp_instance *instance)
2740 {
2741  totemrrp_token_send (instance->totemrrp_context,
2742  instance->orf_token_retransmit,
2743  instance->orf_token_retransmit_size);
2744 }
2745 
2746 /*
2747  * If neither a multicast nor a token has been received within the
2748  * token retransmit timeout, retransmit the token to the next
2749  * processor.
2750  */
2751 static void timer_function_token_retransmit_timeout (void *data)
2752 {
2753  struct totemsrp_instance *instance = data;
2754 
2755  switch (instance->memb_state) {
2756  case MEMB_STATE_GATHER:
2757  break;
2758  case MEMB_STATE_COMMIT:
2759  case MEMB_STATE_OPERATIONAL:
2760  case MEMB_STATE_RECOVERY:
2761  token_retransmit (instance);
2762  reset_token_retransmit_timeout (instance); // REVIEWED
2763  break;
2764  }
2765 }
2766 
2767 static void timer_function_token_hold_retransmit_timeout (void *data)
2768 {
2769  struct totemsrp_instance *instance = data;
2770 
2771  switch (instance->memb_state) {
2772  case MEMB_STATE_GATHER:
2773  break;
2774  case MEMB_STATE_COMMIT:
2775  break;
2776  case MEMB_STATE_OPERATIONAL:
2777  case MEMB_STATE_RECOVERY:
2778  token_retransmit (instance);
2779  break;
2780  }
2781 }
2782 
2783 static void timer_function_merge_detect_timeout(void *data)
2784 {
2785  struct totemsrp_instance *instance = data;
2786 
2787  instance->my_merge_detect_timeout_outstanding = 0;
2788 
2789  switch (instance->memb_state) {
2790  case MEMB_STATE_OPERATIONAL:
2791  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2792  memb_merge_detect_transmit (instance);
2793  }
2794  break;
2795  case MEMB_STATE_GATHER:
2796  case MEMB_STATE_COMMIT:
2797  case MEMB_STATE_RECOVERY:
2798  break;
2799  }
2800 }
2801 
2802 /*
2803  * Send orf_token to next member (requires orf_token)
2804  */
2805 static int token_send (
2806  struct totemsrp_instance *instance,
2807  struct orf_token *orf_token,
2808  int forward_token)
2809 {
2810  int res = 0;
2811  unsigned int orf_token_size;
2812 
2813  orf_token_size = sizeof (struct orf_token) +
2814  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2815 
2816  orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
2817  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2818  instance->orf_token_retransmit_size = orf_token_size;
2819  assert (orf_token->header.nodeid);
2820 
2821  if (forward_token == 0) {
2822  return (0);
2823  }
2824 
2825  totemrrp_token_send (instance->totemrrp_context,
2826  orf_token,
2827  orf_token_size);
2828 
2829  return (res);
2830 }
2831 
2832 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2833 {
2834  struct token_hold_cancel token_hold_cancel;
2835 
2836  /*
2837  * Only cancel if the token is currently held
2838  */
2839  if (instance->my_token_held == 0) {
2840  return (0);
2841  }
2842  instance->my_token_held = 0;
2843 
2844  /*
2845  * Build message
2846  */
2847  token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
2848  token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
2849  token_hold_cancel.header.encapsulated = 0;
2850  token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
2851  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
2852  sizeof (struct memb_ring_id));
2853  assert (token_hold_cancel.header.nodeid);
2854 
2855  instance->stats.token_hold_cancel_tx++;
2856 
2857  totemrrp_mcast_flush_send (instance->totemrrp_context, &token_hold_cancel,
2858  sizeof (struct token_hold_cancel));
2859 
2860  return (0);
2861 }
2862 
2863 static int orf_token_send_initial (struct totemsrp_instance *instance)
2864 {
2865  struct orf_token orf_token;
2866  int res;
2867 
2868  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
2869  orf_token.header.endian_detector = ENDIAN_LOCAL;
2870  orf_token.header.encapsulated = 0;
2871  orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
2872  assert (orf_token.header.nodeid);
2873  orf_token.seq = SEQNO_START_MSG;
2874  orf_token.token_seq = SEQNO_START_TOKEN;
2875  orf_token.retrans_flg = 1;
2876  instance->my_set_retrans_flg = 1;
2877  instance->stats.orf_token_tx++;
2878 
2879  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
2880  orf_token.retrans_flg = 0;
2881  instance->my_set_retrans_flg = 0;
2882  } else {
2883  orf_token.retrans_flg = 1;
2884  instance->my_set_retrans_flg = 1;
2885  }
2886 
2887  orf_token.aru = 0;
2888  orf_token.aru = SEQNO_START_MSG - 1;
2889  orf_token.aru_addr = instance->my_id.addr[0].nodeid;
2890 
2891  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2892  orf_token.fcc = 0;
2893  orf_token.backlog = 0;
2894 
2895  orf_token.rtr_list_entries = 0;
2896 
2897  res = token_send (instance, &orf_token, 1);
2898 
2899  return (res);
2900 }
2901 
2902 static void memb_state_commit_token_update (
2903  struct totemsrp_instance *instance)
2904 {
2905  struct srp_addr *addr;
2906  struct memb_commit_token_memb_entry *memb_list;
2907  unsigned int high_aru;
2908  unsigned int i;
2909 
2910  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
2911  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
2912 
2913  memcpy (instance->my_new_memb_list, addr,
2914  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
2915 
2916  instance->my_new_memb_entries = instance->commit_token->addr_entries;
2917 
2918  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
2919  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
2920 
2921  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
2922  /*
2923  * TODO high delivered is really instance->my_aru, but with safe this
2924  * could change?
2925  */
2926  instance->my_received_flg =
2927  (instance->my_aru == instance->my_high_seq_received);
2928 
2929  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
2930 
2931  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
2932  /*
2933  * find high aru up to current memb_index for all matching ring ids
2934  * if any ring id matching memb_index has aru less than high aru, set
2935  * received flag for that entry to false
2936  */
2937  high_aru = memb_list[instance->commit_token->memb_index].aru;
2938  for (i = 0; i <= instance->commit_token->memb_index; i++) {
2939  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
2940  &memb_list[i].ring_id,
2941  sizeof (struct memb_ring_id)) == 0) {
2942 
2943  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
2944  high_aru = memb_list[i].aru;
2945  }
2946  }
2947  }
2948 
2949  for (i = 0; i <= instance->commit_token->memb_index; i++) {
2950  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
2951  &memb_list[i].ring_id,
2952  sizeof (struct memb_ring_id)) == 0) {
2953 
2954  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
2955  memb_list[i].received_flg = 0;
2956  if (i == instance->commit_token->memb_index) {
2957  instance->my_received_flg = 0;
2958  }
2959  }
2960  }
2961  }
2962 
2963  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
2964  instance->commit_token->memb_index += 1;
2965  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
2966  assert (instance->commit_token->header.nodeid);
2967 }
2968 
2969 static void memb_state_commit_token_target_set (
2970  struct totemsrp_instance *instance)
2971 {
2972  struct srp_addr *addr;
2973  unsigned int i;
2974 
2975  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
2976 
2977  for (i = 0; i < instance->totem_config->interface_count; i++) {
2978  totemrrp_token_target_set (
2979  instance->totemrrp_context,
2980  &addr[instance->commit_token->memb_index %
2981  instance->commit_token->addr_entries].addr[i],
2982  i);
2983  }
2984 }
2985 
2986 static int memb_state_commit_token_send_recovery (
2987  struct totemsrp_instance *instance,
2988  struct memb_commit_token *commit_token)
2989 {
2990  unsigned int commit_token_size;
2991 
2992  commit_token->token_seq++;
2993  commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
2994  commit_token_size = sizeof (struct memb_commit_token) +
2995  ((sizeof (struct srp_addr) +
2996  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
2997  /*
2998  * Make a copy for retransmission if necessary
2999  */
3000  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3001  instance->orf_token_retransmit_size = commit_token_size;
3002 
3003  instance->stats.memb_commit_token_tx++;
3004 
3005  totemrrp_token_send (instance->totemrrp_context,
3006  commit_token,
3007  commit_token_size);
3008 
3009  /*
3010  * Request retransmission of the commit token in case it is lost
3011  */
3012  reset_token_retransmit_timeout (instance);
3013  return (0);
3014 }
3015 
3016 static int memb_state_commit_token_send (
3017  struct totemsrp_instance *instance)
3018 {
3019  unsigned int commit_token_size;
3020 
3021  instance->commit_token->token_seq++;
3022  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3023  commit_token_size = sizeof (struct memb_commit_token) +
3024  ((sizeof (struct srp_addr) +
3025  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3026  /*
3027  * Make a copy for retransmission if necessary
3028  */
3029  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3030  instance->orf_token_retransmit_size = commit_token_size;
3031 
3032  instance->stats.memb_commit_token_tx++;
3033 
3034  totemrrp_token_send (instance->totemrrp_context,
3035  instance->commit_token,
3036  commit_token_size);
3037 
3038  /*
3039  * Request retransmission of the commit token in case it is lost
3040  */
3041  reset_token_retransmit_timeout (instance);
3042  return (0);
3043 }
3044 
3045 
3046 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3047 {
3048  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3049  int token_memb_entries = 0;
3050  int i;
3051  struct totem_ip_address *lowest_addr;
3052 
3053  memb_set_subtract (token_memb, &token_memb_entries,
3054  instance->my_proc_list, instance->my_proc_list_entries,
3055  instance->my_failed_list, instance->my_failed_list_entries);
3056 
3057  /*
3058  * find representative by searching for smallest identifier
3059  */
3060 
3061  lowest_addr = &token_memb[0].addr[0];
3062  for (i = 1; i < token_memb_entries; i++) {
3063  if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3064  totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3065  }
3066  }
3067  return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3068 }
3069 
3070 static int srp_addr_compare (const void *a, const void *b)
3071 {
3072  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3073  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3074 
3075  return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3076 }
3077 
3078 static void memb_state_commit_token_create (
3079  struct totemsrp_instance *instance)
3080 {
3081  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3082  struct srp_addr *addr;
3083  struct memb_commit_token_memb_entry *memb_list;
3084  int token_memb_entries = 0;
3085 
3087  "Creating commit token because I am the rep.");
3088 
3089  memb_set_subtract (token_memb, &token_memb_entries,
3090  instance->my_proc_list, instance->my_proc_list_entries,
3091  instance->my_failed_list, instance->my_failed_list_entries);
3092 
3093  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3094  instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
3095  instance->commit_token->header.endian_detector = ENDIAN_LOCAL;
3096  instance->commit_token->header.encapsulated = 0;
3097  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3098  assert (instance->commit_token->header.nodeid);
3099 
3100  totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3101 
3102  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3103 
3104  /*
3105  * This qsort is necessary to ensure the commit token traverses
3106  * the ring in the proper order
3107  */
3108  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3109  srp_addr_compare);
3110 
3111  instance->commit_token->memb_index = 0;
3112  instance->commit_token->addr_entries = token_memb_entries;
3113 
3114  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3115  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3116 
3117  memcpy (addr, token_memb,
3118  token_memb_entries * sizeof (struct srp_addr));
3119  memset (memb_list, 0,
3120  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3121 }
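/*
 * Editorial sketch, not part of the original totemsrp.c: the commit token is
 * a fixed header followed by addr_entries address records and then
 * addr_entries per-member records, all addressed from end_of_commit_token.
 * A standalone illustration of that trailer layout with hypothetical types:
 */
#include <stdio.h>
#include <stdlib.h>

struct addr_entry { unsigned int nodeid; };
struct memb_entry { unsigned int aru; unsigned int received_flg; };
struct commit_hdr { int memb_index; int addr_entries; char end[]; };

int main (void)
{
	int n = 3;
	size_t size = sizeof (struct commit_hdr) +
		n * (sizeof (struct addr_entry) + sizeof (struct memb_entry));
	struct commit_hdr *tok = calloc (1, size);
	struct addr_entry *addr;
	struct memb_entry *memb;
	int i;

	if (tok == NULL) {
		return 1;
	}
	addr = (struct addr_entry *)tok->end;		/* address list first ... */
	memb = (struct memb_entry *)(addr + n);		/* ... member records directly after it */
	tok->addr_entries = n;
	for (i = 0; i < n; i++) {
		addr[i].nodeid = 100 + i;
		memb[i].aru = i;
	}
	printf ("token size %zu, last nodeid %u, last aru %u\n",
		size, addr[n - 1].nodeid, memb[n - 1].aru);
	free (tok);
	return 0;
}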
3122 
3123 static void memb_join_message_send (struct totemsrp_instance *instance)
3124 {
3125  char memb_join_data[40000];
3126  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3127  char *addr;
3128  unsigned int addr_idx;
3129 
3130  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3131  memb_join->header.endian_detector = ENDIAN_LOCAL;
3132  memb_join->header.encapsulated = 0;
3133  memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
3134  assert (memb_join->header.nodeid);
3135 
3136  memb_join->ring_seq = instance->my_ring_id.seq;
3137  memb_join->proc_list_entries = instance->my_proc_list_entries;
3138  memb_join->failed_list_entries = instance->my_failed_list_entries;
3139  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3140 
3141  /*
3142  * This mess adds the joined and failed processor lists into the join
3143  * message
3144  */
3145  addr = (char *)memb_join;
3146  addr_idx = sizeof (struct memb_join);
3147  memcpy (&addr[addr_idx],
3148  instance->my_proc_list,
3149  instance->my_proc_list_entries *
3150  sizeof (struct srp_addr));
3151  addr_idx +=
3152  instance->my_proc_list_entries *
3153  sizeof (struct srp_addr);
3154  memcpy (&addr[addr_idx],
3155  instance->my_failed_list,
3156  instance->my_failed_list_entries *
3157  sizeof (struct srp_addr));
3158  addr_idx +=
3159  instance->my_failed_list_entries *
3160  sizeof (struct srp_addr);
3161 
3162 
3163  if (instance->totem_config->send_join_timeout) {
3164  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3165  }
3166 
3167  instance->stats.memb_join_tx++;
3168 
3169  totemrrp_mcast_flush_send (
3170  instance->totemrrp_context,
3171  memb_join,
3172  addr_idx);
3173 }
3174 
3175 static void memb_leave_message_send (struct totemsrp_instance *instance)
3176 {
3177  char memb_join_data[40000];
3178  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3179  char *addr;
3180  unsigned int addr_idx;
3181  int active_memb_entries;
3182  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3183 
3185  "sending join/leave message");
3186 
3187  /*
3188  * add us to the failed list, and remove us from
3189  * the members list
3190  */
3191  memb_set_merge(
3192  &instance->my_id, 1,
3193  instance->my_failed_list, &instance->my_failed_list_entries);
3194 
3195  memb_set_subtract (active_memb, &active_memb_entries,
3196  instance->my_proc_list, instance->my_proc_list_entries,
3197  &instance->my_id, 1);
3198 
3199 
3200  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3201  memb_join->header.endian_detector = ENDIAN_LOCAL;
3202  memb_join->header.encapsulated = 0;
3203  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3204 
3205  memb_join->ring_seq = instance->my_ring_id.seq;
3206  memb_join->proc_list_entries = active_memb_entries;
3207  memb_join->failed_list_entries = instance->my_failed_list_entries;
3208  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3209  memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
3210 
3211  // TODO: CC Maybe use the actual join send routine.
3212  /*
3213  * This mess adds the joined and failed processor lists into the join
3214  * message
3215  */
3216  addr = (char *)memb_join;
3217  addr_idx = sizeof (struct memb_join);
3218  memcpy (&addr[addr_idx],
3219  active_memb,
3220  active_memb_entries *
3221  sizeof (struct srp_addr));
3222  addr_idx +=
3223  active_memb_entries *
3224  sizeof (struct srp_addr);
3225  memcpy (&addr[addr_idx],
3226  instance->my_failed_list,
3227  instance->my_failed_list_entries *
3228  sizeof (struct srp_addr));
3229  addr_idx +=
3230  instance->my_failed_list_entries *
3231  sizeof (struct srp_addr);
3232 
3233 
3234  if (instance->totem_config->send_join_timeout) {
3235  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3236  }
3237  instance->stats.memb_join_tx++;
3238 
3239  totemrrp_mcast_flush_send (
3240  instance->totemrrp_context,
3241  memb_join,
3242  addr_idx);
3243 }
3244 
3245 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3246 {
3247  struct memb_merge_detect memb_merge_detect;
3248 
3249  memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3250  memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
3251  memb_merge_detect.header.encapsulated = 0;
3252  memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
3253  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3254  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3255  sizeof (struct memb_ring_id));
3256  assert (memb_merge_detect.header.nodeid);
3257 
3258  instance->stats.memb_merge_detect_tx++;
3259  totemrrp_mcast_flush_send (instance->totemrrp_context,
3260  &memb_merge_detect,
3261  sizeof (struct memb_merge_detect));
3262 }
3263 
3264 static void memb_ring_id_create_or_load (
3265  struct totemsrp_instance *instance,
3266  struct memb_ring_id *memb_ring_id)
3267 {
3268  int fd;
3269  int res = 0;
3270  char filename[PATH_MAX];
3271 
3272  snprintf (filename, sizeof(filename), "%s/ringid_%s",
3273  rundir, totemip_print (&instance->my_id.addr[0]));
3274  fd = open (filename, O_RDONLY, 0700);
3275  /*
3276  * If file can be opened and read, read the ring id
3277  */
3278  if (fd != -1) {
3279  res = read (fd, &memb_ring_id->seq, sizeof (uint64_t));
3280  close (fd);
3281  }
3282  /*
3283  * If file could not be opened or read, create a new ring id
3284  */
3285  if ((fd == -1) || (res != sizeof (uint64_t))) {
3286  memb_ring_id->seq = 0;
3287  umask(0);
3288  fd = open (filename, O_CREAT|O_RDWR, 0700);
3289  if (fd != -1) {
3290  res = write (fd, &memb_ring_id->seq, sizeof (uint64_t));
3291  close (fd);
3292  if (res == -1) {
3293  LOGSYS_PERROR (errno, instance->totemsrp_log_level_warning,
3294  "Couldn't write ringid file '%s'", filename);
3295  }
3296  } else {
3297  LOGSYS_PERROR (errno, instance->totemsrp_log_level_warning,
3298  "Couldn't create ringid file '%s'", filename);
3299  }
3300  }
3301 
3302  totemip_copy(&memb_ring_id->rep, &instance->my_id.addr[0]);
3303  assert (!totemip_zero_check(&memb_ring_id->rep));
3304  instance->token_ring_id_seq = memb_ring_id->seq;
3305 }
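/*
 * Editorial sketch, not part of the original totemsrp.c: the ring id sequence
 * is a 64-bit counter persisted in a small state file and recreated as zero
 * when the file is missing or short.  A standalone illustration; the path is
 * made up:
 */
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main (void)
{
	const char *path = "/tmp/example_ringid";
	uint64_t seq = 0;
	int fd = open (path, O_RDONLY);

	if (fd == -1 || read (fd, &seq, sizeof (seq)) != sizeof (seq)) {
		/* missing or unreadable: start from zero and create the file */
		seq = 0;
		if (fd != -1) {
			close (fd);
		}
		fd = open (path, O_CREAT | O_WRONLY, 0600);
		if (fd != -1 && write (fd, &seq, sizeof (seq)) != sizeof (seq)) {
			perror ("write");
		}
	}
	if (fd != -1) {
		close (fd);
	}
	printf ("ring seq loaded: %llu\n", (unsigned long long)seq);
	return 0;
}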
3306 
3307 static void memb_ring_id_set_and_store (
3308  struct totemsrp_instance *instance,
3309  const struct memb_ring_id *ring_id)
3310 {
3311  char filename[256];
3312  int fd;
3313  int res;
3314 
3315  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3316 
3317  snprintf (filename, sizeof(filename), "%s/ringid_%s",
3318  rundir, totemip_print (&instance->my_id.addr[0]));
3319 
3320  fd = open (filename, O_WRONLY, 0777);
3321  if (fd == -1) {
3322  fd = open (filename, O_CREAT|O_RDWR, 0777);
3323  }
3324  if (fd == -1) {
3325  LOGSYS_PERROR(errno, instance->totemsrp_log_level_warning,
3326  "Couldn't store new ring id %llx to stable storage",
3327  instance->my_ring_id.seq);
3328  assert (0);
3329  return;
3330  }
3332  "Storing new sequence id for ring %llx", instance->my_ring_id.seq);
3333  //assert (fd > 0);
3334  res = write (fd, &instance->my_ring_id.seq, sizeof (unsigned long long));
3335  assert (res == sizeof (unsigned long long));
3336  close (fd);
3337 }
3338 
3339 int totemsrp_callback_token_create (
3340  void *srp_context,
3341  void **handle_out,
3342  enum totem_callback_token_type type,
3343  int delete,
3344  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3345  const void *data)
3346 {
3347  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3348  struct token_callback_instance *callback_handle;
3349 
3350  token_hold_cancel_send (instance);
3351 
3352  callback_handle = malloc (sizeof (struct token_callback_instance));
3353  if (callback_handle == 0) {
3354  return (-1);
3355  }
3356  *handle_out = (void *)callback_handle;
3357  list_init (&callback_handle->list);
3358  callback_handle->callback_fn = callback_fn;
3359  callback_handle->data = (void *) data;
3360  callback_handle->callback_type = type;
3361  callback_handle->delete = delete;
3362  switch (type) {
3363  case TOTEM_CALLBACK_TOKEN_RECEIVED:
3364  list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3365  break;
3366  case TOTEM_CALLBACK_TOKEN_SENT:
3367  list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3368  break;
3368  break;
3369  }
3370 
3371  return (0);
3372 }
3373 
3374 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3375 {
3376  struct token_callback_instance *h;
3377 
3378  if (*handle_out) {
3379  h = (struct token_callback_instance *)*handle_out;
3380  list_del (&h->list);
3381  free (h);
3382  h = NULL;
3383  *handle_out = 0;
3384  }
3385 }
3386 
3387 static void token_callbacks_execute (
3388  struct totemsrp_instance *instance,
3389  enum totem_callback_token_type type)
3390 {
3391  struct list_head *list;
3392  struct list_head *list_next;
3393  struct list_head *callback_listhead = 0;
3394  struct token_callback_instance *token_callback_instance;
3395  int res;
3396  int del;
3397 
3398  switch (type) {
3399  case TOTEM_CALLBACK_TOKEN_RECEIVED:
3400  callback_listhead = &instance->token_callback_received_listhead;
3401  break;
3402  case TOTEM_CALLBACK_TOKEN_SENT:
3403  callback_listhead = &instance->token_callback_sent_listhead;
3404  break;
3405  default:
3406  assert (0);
3407  }
3408 
3409  for (list = callback_listhead->next; list != callback_listhead;
3410  list = list_next) {
3411 
3412  token_callback_instance = list_entry (list, struct token_callback_instance, list);
3413 
3414  list_next = list->next;
3415  del = token_callback_instance->delete;
3416  if (del == 1) {
3417  list_del (list);
3418  }
3419 
3420  res = token_callback_instance->callback_fn (
3421  token_callback_instance->callback_type,
3422  token_callback_instance->data);
3423  /*
3424  * This callback failed to execute, try it again on the next token
3425  */
3426  if (res == -1 && del == 1) {
3427  list_add (list, callback_listhead);
3428  } else if (del) {
3429  free (token_callback_instance);
3430  }
3431  }
3432 }
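/*
 * Editorial sketch, not part of the original totemsrp.c: the callback walk
 * above saves list->next before a one-shot entry may be unlinked and freed,
 * so deletion during iteration is safe.  A standalone illustration using a
 * simple singly linked list instead of corosync's list.h:
 */
#include <stdio.h>
#include <stdlib.h>

struct cb {
	struct cb *next;
	int oneshot;
	int id;
};

int main (void)
{
	struct cb *head = NULL, *cur, *next, **prev;
	int i;

	for (i = 3; i >= 1; i--) {		/* build list 1 -> 2 -> 3 */
		cur = malloc (sizeof (*cur));
		cur->id = i;
		cur->oneshot = (i == 2);	/* entry 2 removes itself after firing */
		cur->next = head;
		head = cur;
	}

	for (prev = &head, cur = head; cur != NULL; cur = next) {
		next = cur->next;		/* capture before a possible free */
		printf ("callback %d fired\n", cur->id);
		if (cur->oneshot) {
			*prev = next;		/* unlink, then free the current node */
			free (cur);
		} else {
			prev = &cur->next;
		}
	}
	for (cur = head; cur != NULL; cur = next) {
		next = cur->next;
		printf ("callback %d still registered\n", cur->id);
		free (cur);
	}
	return 0;
}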
3433 
3434 /*
3435  * Flow control functions
3436  */
3437 static unsigned int backlog_get (struct totemsrp_instance *instance)
3438 {
3439  unsigned int backlog = 0;
3440  struct cs_queue *queue_use = NULL;
3441 
3442  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3443  if (instance->waiting_trans_ack) {
3444  queue_use = &instance->new_message_queue_trans;
3445  } else {
3446  queue_use = &instance->new_message_queue;
3447  }
3448  } else
3449  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3450  queue_use = &instance->retrans_message_queue;
3451  }
3452 
3453  if (queue_use != NULL) {
3454  backlog = cs_queue_used (queue_use);
3455  }
3456 
3457  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3458  return (backlog);
3459 }
3460 
3461 static int fcc_calculate (
3462  struct totemsrp_instance *instance,
3463  struct orf_token *token)
3464 {
3465  unsigned int transmits_allowed;
3466  unsigned int backlog_calc;
3467 
3468  transmits_allowed = instance->totem_config->max_messages;
3469 
3470  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3471  transmits_allowed = instance->totem_config->window_size - token->fcc;
3472  }
3473 
3474  instance->my_cbl = backlog_get (instance);
3475 
3476  /*
3477  * Only do the backlog calculation if there is a backlog, otherwise
3478  * we would divide by zero
3479  */
3480  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3481  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3482  (token->backlog + instance->my_cbl - instance->my_pbl);
3483  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3484  transmits_allowed = backlog_calc;
3485  }
3486  }
3487 
3488  return (transmits_allowed);
3489 }
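/*
 * Editorial sketch, not part of the original totemsrp.c: a worked run of the
 * flow-control window above with made-up numbers.  Start from max_messages,
 * clamp to the unused part of the shared window (window_size - token->fcc),
 * then optionally clamp again by this node's share of the global backlog:
 */
#include <stdio.h>

int main (void)
{
	unsigned int max_messages = 17, window_size = 50;
	unsigned int token_fcc = 40, token_backlog = 30;
	unsigned int my_pbl = 10, my_cbl = 20;	/* previous / current backlog */
	unsigned int allowed = max_messages;
	unsigned int backlog_calc;

	if (allowed > window_size - token_fcc) {
		allowed = window_size - token_fcc;		/* 50 - 40 = 10 */
	}
	if (token_backlog + my_cbl - my_pbl) {			/* 30 + 20 - 10 = 40 */
		backlog_calc = (window_size * my_pbl) /
			(token_backlog + my_cbl - my_pbl);	/* 500 / 40 = 12 */
		if (backlog_calc > 0 && allowed > backlog_calc) {
			allowed = backlog_calc;			/* 10 <= 12, keep 10 */
		}
	}
	printf ("transmits allowed this rotation: %u\n", allowed);	/* 10 */
	return 0;
}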
3490 
3491 /*
3492  * don't overflow the RTR sort queue
3493  */
3494 static void fcc_rtr_limit (
3495  struct totemsrp_instance *instance,
3496  struct orf_token *token,
3497  unsigned int *transmits_allowed)
3498 {
3499  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3500  check -= (*transmits_allowed + instance->totem_config->window_size);
3501  assert (check >= 0);
3502  if (sq_lt_compare (instance->last_released +
3503  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3504  instance->totem_config->window_size,
3505 
3506  token->seq)) {
3507 
3508  *transmits_allowed = 0;
3509  }
3510 }
3511 
3512 static void fcc_token_update (
3513  struct totemsrp_instance *instance,
3514  struct orf_token *token,
3515  unsigned int msgs_transmitted)
3516 {
3517  token->fcc += msgs_transmitted - instance->my_trc;
3518  token->backlog += instance->my_cbl - instance->my_pbl;
3519  instance->my_trc = msgs_transmitted;
3520  instance->my_pbl = instance->my_cbl;
3521 }
3522 
3523 /*
3524  * Message Handlers
3525  */
3526 
3527 unsigned long long int tv_old;
3528 /*
3529  * message handler called when TOKEN message type received
3530  */
3531 static int message_handler_orf_token (
3532  struct totemsrp_instance *instance,
3533  const void *msg,
3534  size_t msg_len,
3535  int endian_conversion_needed)
3536 {
3537  char token_storage[1500];
3538  char token_convert[1500];
3539  struct orf_token *token = NULL;
3540  int forward_token;
3541  unsigned int transmits_allowed;
3542  unsigned int mcasted_retransmit;
3543  unsigned int mcasted_regular;
3544  unsigned int last_aru;
3545 
3546 #ifdef GIVEINFO
3547  unsigned long long tv_current;
3548  unsigned long long tv_diff;
3549 
3550  tv_current = qb_util_nano_current_get ();
3551  tv_diff = tv_current - tv_old;
3552  tv_old = tv_current;
3553 
3555  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3556 #endif
3557 
3558  if (instance->orf_token_discard) {
3559  return (0);
3560  }
3561 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3562  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3563  return (0);
3564  }
3565 #endif
3566 
3567  if (endian_conversion_needed) {
3568  orf_token_endian_convert ((struct orf_token *)msg,
3569  (struct orf_token *)token_convert);
3570  msg = (struct orf_token *)token_convert;
3571  }
3572 
3573  /*
3574  * Make copy of token and retransmit list in case we have
3575  * to flush incoming messages from the kernel queue
3576  */
3577  token = (struct orf_token *)token_storage;
3578  memcpy (token, msg, sizeof (struct orf_token));
3579  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3580  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3581 
3582 
3583  /*
3584  * Handle merge detection timeout
3585  */
3586  if (token->seq == instance->my_last_seq) {
3587  start_merge_detect_timeout (instance);
3588  instance->my_seq_unchanged += 1;
3589  } else {
3590  cancel_merge_detect_timeout (instance);
3591  cancel_token_hold_retransmit_timeout (instance);
3592  instance->my_seq_unchanged = 0;
3593  }
3594 
3595  instance->my_last_seq = token->seq;
3596 
3597 #ifdef TEST_RECOVERY_MSG_COUNT
3598  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3599  return (0);
3600  }
3601 #endif
3602 
3604 
3605  /*
3606  * Determine if we should hold (in reality drop) the token
3607  */
3608  instance->my_token_held = 0;
3609  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3610  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3611  instance->my_token_held = 1;
3612  } else
3613  if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3614  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3615  instance->my_token_held = 1;
3616  }
3617 
3618  /*
3619  * Hold onto token when there is no activity on ring and
3620  * this processor is the ring rep
3621  */
3622  forward_token = 1;
3623  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
3624  if (instance->my_token_held) {
3625  forward_token = 0;
3626  }
3627  }
3628 
3629  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3630 
3631  switch (instance->memb_state) {
3632  case MEMB_STATE_COMMIT:
3633  /* Discard token */
3634  break;
3635 
3636  case MEMB_STATE_OPERATIONAL:
3637  messages_free (instance, token->aru);
3638  /*
3639  * Do NOT add break, this case should also execute code in gather case.
3640  */
3641 
3642  case MEMB_STATE_GATHER:
3643  /*
3644  * DO NOT add break, we use different free mechanism in recovery state
3645  */
3646 
3647  case MEMB_STATE_RECOVERY:
3648  /*
3649  * Discard tokens from another configuration
3650  */
3651  if (memcmp (&token->ring_id, &instance->my_ring_id,
3652  sizeof (struct memb_ring_id)) != 0) {
3653 
3654  if ((forward_token)
3655  && instance->use_heartbeat) {
3656  reset_heartbeat_timeout(instance);
3657  }
3658  else {
3659  cancel_heartbeat_timeout(instance);
3660  }
3661 
3662  return (0); /* discard token */
3663  }
3664 
3665  /*
3666  * Discard retransmitted tokens
3667  */
3668  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3669  return (0); /* discard token */
3670  }
3671  last_aru = instance->my_last_aru;
3672  instance->my_last_aru = token->aru;
3673 
3674  transmits_allowed = fcc_calculate (instance, token);
3675  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3676 
3677  fcc_rtr_limit (instance, token, &transmits_allowed);
3678  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3679 /*
3680 if (mcasted_regular) {
3681 printf ("mcasted regular %d\n", mcasted_regular);
3682 printf ("token seq %d\n", token->seq);
3683 }
3684 */
3685  fcc_token_update (instance, token, mcasted_retransmit +
3686  mcasted_regular);
3687 
3688  if (sq_lt_compare (instance->my_aru, token->aru) ||
3689  instance->my_id.addr[0].nodeid == token->aru_addr ||
3690  token->aru_addr == 0) {
3691 
3692  token->aru = instance->my_aru;
3693  if (token->aru == token->seq) {
3694  token->aru_addr = 0;
3695  } else {
3696  token->aru_addr = instance->my_id.addr[0].nodeid;
3697  }
3698  }
3699  if (token->aru == last_aru && token->aru_addr != 0) {
3700  instance->my_aru_count += 1;
3701  } else {
3702  instance->my_aru_count = 0;
3703  }
3704 
3705  /*
3706  * We really don't follow the specification here. In the specification, OTHER nodes
3707  * detect the failure of one node (based on aru_count) and my_id IS NEVER added
3708  * to the failed list (so a node never marks itself as failed)
3709  */
3710  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3711  token->aru_addr == instance->my_id.addr[0].nodeid) {
3712 
3714  "FAILED TO RECEIVE");
3715 
3716  instance->failed_to_recv = 1;
3717 
3718  memb_set_merge (&instance->my_id, 1,
3719  instance->my_failed_list,
3720  &instance->my_failed_list_entries);
3721 
3722  memb_state_gather_enter (instance, 6);
3723  } else {
3724  instance->my_token_seq = token->token_seq;
3725  token->token_seq += 1;
3726 
3727  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3728  /*
3729  * instance->my_aru == instance->my_high_seq_received means this processor
3730  * has recovered all messages it can recover
3731  * (ie: its retrans queue is empty)
3732  */
3733  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
3734 
3735  if (token->retrans_flg == 0) {
3736  token->retrans_flg = 1;
3737  instance->my_set_retrans_flg = 1;
3738  }
3739  } else
3740  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
3741  token->retrans_flg = 0;
3742  instance->my_set_retrans_flg = 0;
3743  }
3745  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
3746  token->retrans_flg, instance->my_set_retrans_flg,
3747  cs_queue_is_empty (&instance->retrans_message_queue),
3748  instance->my_retrans_flg_count, token->aru);
3749  if (token->retrans_flg == 0) {
3750  instance->my_retrans_flg_count += 1;
3751  } else {
3752  instance->my_retrans_flg_count = 0;
3753  }
3754  if (instance->my_retrans_flg_count == 2) {
3755  instance->my_install_seq = token->seq;
3756  }
3758  "install seq %x aru %x high seq received %x",
3759  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
3760  if (instance->my_retrans_flg_count >= 2 &&
3761  instance->my_received_flg == 0 &&
3762  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
3763  instance->my_received_flg = 1;
3764  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
3765  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
3766  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
3767  }
3768  if (instance->my_retrans_flg_count >= 3 &&
3769  sq_lte_compare (instance->my_install_seq, token->aru)) {
3770  instance->my_rotation_counter += 1;
3771  } else {
3772  instance->my_rotation_counter = 0;
3773  }
3774  if (instance->my_rotation_counter == 2) {
3776  "retrans flag count %x token aru %x install seq %x aru %x %x",
3777  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
3778  instance->my_aru, token->seq);
3779 
3780  memb_state_operational_enter (instance);
3781  instance->my_rotation_counter = 0;
3782  instance->my_retrans_flg_count = 0;
3783  }
3784  }
3785 
3787  token_send (instance, token, forward_token);
3788 
3789 #ifdef GIVEINFO
3790  tv_current = qb_util_nano_current_get ();
3791  tv_diff = tv_current - tv_old;
3792  tv_old = tv_current;
3794  "I held %0.4f ms",
3795  ((float)tv_diff) / 1000000.0);
3796 #endif
3797  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3798  messages_deliver_to_app (instance, 0,
3799  instance->my_high_seq_received);
3800  }
3801 
3802  /*
3803  * Deliver messages after token has been transmitted
3804  * to improve performance
3805  */
3806  reset_token_timeout (instance); // REVIEWED
3807  reset_token_retransmit_timeout (instance); // REVIEWED
3808  if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
3809  instance->my_token_held == 1) {
3810 
3811  start_token_hold_retransmit_timeout (instance);
3812  }
3813 
3814  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
3815  }
3816  break;
3817  }
3818 
3819  if ((forward_token)
3820  && instance->use_heartbeat) {
3821  reset_heartbeat_timeout(instance);
3822  }
3823  else {
3824  cancel_heartbeat_timeout(instance);
3825  }
3826 
3827  return (0);
3828 }
3829 
3830 static void messages_deliver_to_app (
3831  struct totemsrp_instance *instance,
3832  int skip,
3833  unsigned int end_point)
3834 {
3835  struct sort_queue_item *sort_queue_item_p;
3836  unsigned int i;
3837  int res;
3838  struct mcast *mcast_in;
3839  struct mcast mcast_header;
3840  unsigned int range = 0;
3841  int endian_conversion_required;
3842  unsigned int my_high_delivered_stored = 0;
3843 
3844 
3845  range = end_point - instance->my_high_delivered;
3846 
3847  if (range) {
3849  "Delivering %x to %x", instance->my_high_delivered,
3850  end_point);
3851  }
3852  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
3853  my_high_delivered_stored = instance->my_high_delivered;
3854 
3855  /*
3856  * Deliver messages in order from rtr queue to pending delivery queue
3857  */
3858  for (i = 1; i <= range; i++) {
3859 
3860  void *ptr = 0;
3861 
3862  /*
3863  * If out of range of sort queue, stop assembly
3864  */
3865  res = sq_in_range (&instance->regular_sort_queue,
3866  my_high_delivered_stored + i);
3867  if (res == 0) {
3868  break;
3869  }
3870 
3871  res = sq_item_get (&instance->regular_sort_queue,
3872  my_high_delivered_stored + i, &ptr);
3873  /*
3874  * If hole, stop assembly
3875  */
3876  if (res != 0 && skip == 0) {
3877  break;
3878  }
3879 
3880  instance->my_high_delivered = my_high_delivered_stored + i;
3881 
3882  if (res != 0) {
3883  continue;
3884 
3885  }
3886 
3887  sort_queue_item_p = ptr;
3888 
3889  mcast_in = sort_queue_item_p->mcast;
3890  assert (mcast_in != (struct mcast *)0xdeadbeef);
3891 
3892  endian_conversion_required = 0;
3893  if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
3894  endian_conversion_required = 1;
3895  mcast_endian_convert (mcast_in, &mcast_header);
3896  } else {
3897  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
3898  }
3899 
3900  /*
3901  * Skip messages not originated in instance->my_deliver_memb
3902  */
3903  if (skip &&
3904  memb_set_subset (&mcast_header.system_from,
3905  1,
3906  instance->my_deliver_memb_list,
3907  instance->my_deliver_memb_entries) == 0) {
3908 
3909  instance->my_high_delivered = my_high_delivered_stored + i;
3910 
3911  continue;
3912  }
3913 
3914  /*
3915  * Message found
3916  */
3918  "Delivering MCAST message with seq %x to pending delivery queue",
3919  mcast_header.seq);
3920 
3921  /*
3922  * Message is locally originated multicast
3923  */
3924  instance->totemsrp_deliver_fn (
3925  mcast_header.header.nodeid,
3926  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
3927  sort_queue_item_p->msg_len - sizeof (struct mcast),
3928  endian_conversion_required);
3929  }
3930 }
3931 
3932 /*
3933  * recv message handler called when MCAST message type received
3934  */
3935 static int message_handler_mcast (
3936  struct totemsrp_instance *instance,
3937  const void *msg,
3938  size_t msg_len,
3939  int endian_conversion_needed)
3940 {
3941  struct sort_queue_item sort_queue_item;
3942  struct sq *sort_queue;
3943  struct mcast mcast_header;
3944 
3945 
3946  if (endian_conversion_needed) {
3947  mcast_endian_convert (msg, &mcast_header);
3948  } else {
3949  memcpy (&mcast_header, msg, sizeof (struct mcast));
3950  }
3951 
3952  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
3953  sort_queue = &instance->recovery_sort_queue;
3954  } else {
3955  sort_queue = &instance->regular_sort_queue;
3956  }
3957 
3958  assert (msg_len <= FRAME_SIZE_MAX);
3959 
3960 #ifdef TEST_DROP_MCAST_PERCENTAGE
3961  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
3962  return (0);
3963  }
3964 #endif
3965 
3966  /*
3967  * If the message is foreign execute the switch below
3968  */
3969  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
3970  sizeof (struct memb_ring_id)) != 0) {
3971 
3972  switch (instance->memb_state) {
3973  case MEMB_STATE_OPERATIONAL:
3974  memb_set_merge (
3975  &mcast_header.system_from, 1,
3976  instance->my_proc_list, &instance->my_proc_list_entries);
3977  memb_state_gather_enter (instance, 7);
3978  break;
3979 
3980  case MEMB_STATE_GATHER:
3981  if (!memb_set_subset (
3982  &mcast_header.system_from,
3983  1,
3984  instance->my_proc_list,
3985  instance->my_proc_list_entries)) {
3986 
3987  memb_set_merge (&mcast_header.system_from, 1,
3988  instance->my_proc_list, &instance->my_proc_list_entries);
3989  memb_state_gather_enter (instance, 8);
3990  return (0);
3991  }
3992  break;
3993 
3994  case MEMB_STATE_COMMIT:
3995  /* discard message */
3996  instance->stats.rx_msg_dropped++;
3997  break;
3998 
3999  case MEMB_STATE_RECOVERY:
4000  /* discard message */
4001  instance->stats.rx_msg_dropped++;
4002  break;
4003  }
4004  return (0);
4005  }
4006 
4007  log_printf (instance->totemsrp_log_level_trace,
4008  "Received ringid(%s:%lld) seq %x",
4009  totemip_print (&mcast_header.ring_id.rep),
4010  mcast_header.ring_id.seq,
4011  mcast_header.seq);
4012 
4013  /*
4014  * Add mcast message to rtr queue if not already in rtr queue
4015  * otherwise free io vectors
4016  */
4017  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4018  sq_in_range (sort_queue, mcast_header.seq) &&
4019  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4020 
4021  /*
4022  * Allocate new multicast memory block
4023  */
4024 // TODO LEAK
4025  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4026  if (sort_queue_item.mcast == NULL) {
4027  return (-1); /* error here is corrected by the algorithm */
4028  }
4029  memcpy (sort_queue_item.mcast, msg, msg_len);
4030  sort_queue_item.msg_len = msg_len;
4031 
4032  if (sq_lt_compare (instance->my_high_seq_received,
4033  mcast_header.seq)) {
4034  instance->my_high_seq_received = mcast_header.seq;
4035  }
4036 
4037  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4038  }
4039 
4040  update_aru (instance);
4041  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4042  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4043  }
4044 
4045 /* TODO remove from retrans message queue for old ring in recovery state */
4046  return (0);
4047 }
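/*
 * Illustrative sketch, not part of totemsrp.c: a multicast is copied into the
 * sort queue above only when its length is sane, its sequence number falls
 * inside the queue window, and the slot is still free, which silently drops
 * duplicates and stale retransmissions.  toy_should_queue is a hypothetical
 * predicate over such a window.
 */
static int toy_should_queue (
	unsigned int msg_len,
	unsigned int frame_size_max,
	int seq_in_range,
	int slot_in_use)
{
	return (msg_len > 0 && msg_len <= frame_size_max &&
		seq_in_range && slot_in_use == 0);
}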
4048 
4049 static int message_handler_memb_merge_detect (
4050  struct totemsrp_instance *instance,
4051  const void *msg,
4052  size_t msg_len,
4053  int endian_conversion_needed)
4054 {
4055  struct memb_merge_detect memb_merge_detect;
4056 
4057 
4058  if (endian_conversion_needed) {
4059  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4060  } else {
4061  memcpy (&memb_merge_detect, msg,
4062  sizeof (struct memb_merge_detect));
4063  }
4064 
4065  /*
4066  * do nothing if this is a merge detect from this configuration
4067  */
4068  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4069  sizeof (struct memb_ring_id)) == 0) {
4070 
4071  return (0);
4072  }
4073 
4074  /*
4075  * Execute merge operation
4076  */
4077  switch (instance->memb_state) {
4078  case MEMB_STATE_OPERATIONAL:
4079  memb_set_merge (&memb_merge_detect.system_from, 1,
4080  instance->my_proc_list, &instance->my_proc_list_entries);
4081  memb_state_gather_enter (instance, 9);
4082  break;
4083 
4084  case MEMB_STATE_GATHER:
4085  if (!memb_set_subset (
4086  &memb_merge_detect.system_from,
4087  1,
4088  instance->my_proc_list,
4089  instance->my_proc_list_entries)) {
4090 
4091  memb_set_merge (&memb_merge_detect.system_from, 1,
4092  instance->my_proc_list, &instance->my_proc_list_entries);
4093  memb_state_gather_enter (instance, 10);
4094  return (0);
4095  }
4096  break;
4097 
4098  case MEMB_STATE_COMMIT:
4099  /* do nothing in commit */
4100  break;
4101 
4102  case MEMB_STATE_RECOVERY:
4103  /* do nothing in recovery */
4104  break;
4105  }
4106  return (0);
4107 }
4108 
4109 static void memb_join_process (
4110  struct totemsrp_instance *instance,
4111  const struct memb_join *memb_join)
4112 {
4113  struct srp_addr *proc_list;
4114  struct srp_addr *failed_list;
4115  int gather_entered = 0;
4116  int fail_minus_memb_entries = 0;
4117  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4118 
4119  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4120  failed_list = proc_list + memb_join->proc_list_entries;
4121 
4122 /*
4123  memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
4124  memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
4125  memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4126  memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4127 -*/
4128 
4129  if (memb_set_equal (proc_list,
4130  memb_join->proc_list_entries,
4131  instance->my_proc_list,
4132  instance->my_proc_list_entries) &&
4133 
4134  memb_set_equal (failed_list,
4135  memb_join->failed_list_entries,
4136  instance->my_failed_list,
4137  instance->my_failed_list_entries)) {
4138 
4139  memb_consensus_set (instance, &memb_join->system_from);
4140 
4141  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4142  instance->failed_to_recv = 0;
4143  srp_addr_copy (&instance->my_proc_list[0],
4144  &instance->my_id);
4145  instance->my_proc_list_entries = 1;
4146  instance->my_failed_list_entries = 0;
4147 
4148  memb_state_commit_token_create (instance);
4149 
4150  memb_state_commit_enter (instance);
4151  return;
4152  }
4153  if (memb_consensus_agreed (instance) &&
4154  memb_lowest_in_config (instance)) {
4155 
4156  memb_state_commit_token_create (instance);
4157 
4158  memb_state_commit_enter (instance);
4159  } else {
4160  goto out;
4161  }
4162  } else
4163  if (memb_set_subset (proc_list,
4164  memb_join->proc_list_entries,
4165  instance->my_proc_list,
4166  instance->my_proc_list_entries) &&
4167 
4168  memb_set_subset (failed_list,
4169  memb_join->failed_list_entries,
4170  instance->my_failed_list,
4171  instance->my_failed_list_entries)) {
4172 
4173  goto out;
4174  } else
4175  if (memb_set_subset (&memb_join->system_from, 1,
4176  instance->my_failed_list, instance->my_failed_list_entries)) {
4177 
4178  goto out;
4179  } else {
4180  memb_set_merge (proc_list,
4181  memb_join->proc_list_entries,
4182  instance->my_proc_list, &instance->my_proc_list_entries);
4183 
4184  if (memb_set_subset (
4185  &instance->my_id, 1,
4186  failed_list, memb_join->failed_list_entries)) {
4187 
4188  memb_set_merge (
4189  &memb_join->system_from, 1,
4190  instance->my_failed_list, &instance->my_failed_list_entries);
4191  } else {
4192  if (memb_set_subset (
4193  &memb_join->system_from, 1,
4194  instance->my_memb_list,
4195  instance->my_memb_entries)) {
4196 
4197  if (memb_set_subset (
4198  &memb_join->system_from, 1,
4199  instance->my_failed_list,
4200  instance->my_failed_list_entries) == 0) {
4201 
4202  memb_set_merge (failed_list,
4203  memb_join->failed_list_entries,
4204  instance->my_failed_list, &instance->my_failed_list_entries);
4205  } else {
4206  memb_set_subtract (fail_minus_memb,
4207  &fail_minus_memb_entries,
4208  failed_list,
4209  memb_join->failed_list_entries,
4210  instance->my_memb_list,
4211  instance->my_memb_entries);
4212 
4213  memb_set_merge (fail_minus_memb,
4214  fail_minus_memb_entries,
4215  instance->my_failed_list,
4216  &instance->my_failed_list_entries);
4217  }
4218  }
4219  }
4220  memb_state_gather_enter (instance, 11);
4221  gather_entered = 1;
4222  }
4223 
4224 out:
4225  if (gather_entered == 0 &&
4226  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4227 
4228  memb_state_gather_enter (instance, 12);
4229  }
4230 }
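/*
 * Illustrative sketch, not part of totemsrp.c: memb_join_process is built from
 * set operations (equal, subset, merge, subtract) over small arrays of
 * processor identifiers.  The hypothetical toy_subset below shows the shape of
 * such a subset test using plain node ids instead of struct srp_addr.
 */
static int toy_subset (
	const unsigned int *sub, int sub_entries,
	const unsigned int *super, int super_entries)
{
	int i, j, found;

	for (i = 0; i < sub_entries; i++) {
		found = 0;
		for (j = 0; j < super_entries; j++) {
			if (sub[i] == super[j]) {
				found = 1;
				break;
			}
		}
		if (found == 0) {
			return (0);	/* an element of sub is missing from super */
		}
	}
	return (1);
}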
4231 
4232 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4233 {
4234  int i;
4235  struct srp_addr *in_proc_list;
4236  struct srp_addr *in_failed_list;
4237  struct srp_addr *out_proc_list;
4238  struct srp_addr *out_failed_list;
4239 
4240  out->header.type = in->header.type;
4241  out->header.endian_detector = ENDIAN_LOCAL;
4242  out->header.nodeid = swab32 (in->header.nodeid);
4243  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4244  out->proc_list_entries = swab32 (in->proc_list_entries);
4245  out->failed_list_entries = swab32 (in->failed_list_entries);
4246  out->ring_seq = swab64 (in->ring_seq);
4247 
4248  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4249  in_failed_list = in_proc_list + out->proc_list_entries;
4250  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4251  out_failed_list = out_proc_list + out->proc_list_entries;
4252 
4253  for (i = 0; i < out->proc_list_entries; i++) {
4254  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4255  }
4256  for (i = 0; i < out->failed_list_entries; i++) {
4257  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4258  }
4259 }
4260 
4261 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4262 {
4263  int i;
4264  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4265  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4266  struct memb_commit_token_memb_entry *in_memb_list;
4267  struct memb_commit_token_memb_entry *out_memb_list;
4268 
4269  out->header.type = in->header.type;
4270  out->header.endian_detector = ENDIAN_LOCAL;
4271  out->header.nodeid = swab32 (in->header.nodeid);
4272  out->token_seq = swab32 (in->token_seq);
4273  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4274  out->ring_id.seq = swab64 (in->ring_id.seq);
4275  out->retrans_flg = swab32 (in->retrans_flg);
4276  out->memb_index = swab32 (in->memb_index);
4277  out->addr_entries = swab32 (in->addr_entries);
4278 
4279  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4280  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4281  for (i = 0; i < out->addr_entries; i++) {
4282  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4283 
4284  /*
4285  * Only convert the memb entry if it has been set
4286  */
4287  if (in_memb_list[i].ring_id.rep.family != 0) {
4288  totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4289  &in_memb_list[i].ring_id.rep);
4290 
4291  out_memb_list[i].ring_id.seq =
4292  swab64 (in_memb_list[i].ring_id.seq);
4293  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4294  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4295  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4296  }
4297  }
4298 }
4299 
4300 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4301 {
4302  int i;
4303 
4304  out->header.type = in->header.type;
4305  out->header.endian_detector = ENDIAN_LOCAL;
4306  out->header.nodeid = swab32 (in->header.nodeid);
4307  out->seq = swab32 (in->seq);
4308  out->token_seq = swab32 (in->token_seq);
4309  out->aru = swab32 (in->aru);
4310  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4311  out->aru_addr = swab32(in->aru_addr);
4312  out->ring_id.seq = swab64 (in->ring_id.seq);
4313  out->fcc = swab32 (in->fcc);
4314  out->backlog = swab32 (in->backlog);
4315  out->retrans_flg = swab32 (in->retrans_flg);
4316  out->rtr_list_entries = swab32 (in->rtr_list_entries);
4317  for (i = 0; i < out->rtr_list_entries; i++) {
4318  totemip_copy_endian_convert (&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
4319  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4320  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4321  }
4322 }
4323 
4324 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4325 {
4326  out->header.type = in->header.type;
4327  out->header.endian_detector = ENDIAN_LOCAL;
4328  out->header.nodeid = swab32 (in->header.nodeid);
4329  out->header.encapsulated = in->header.encapsulated;
4330 
4331  out->seq = swab32 (in->seq);
4332  out->this_seqno = swab32 (in->this_seqno);
4333  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4334  out->ring_id.seq = swab64 (in->ring_id.seq);
4335  out->node_id = swab32 (in->node_id);
4336  out->guarantee = swab32 (in->guarantee);
4337  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4338 }
4339 
4340 static void memb_merge_detect_endian_convert (
4341  const struct memb_merge_detect *in,
4342  struct memb_merge_detect *out)
4343 {
4344  out->header.type = in->header.type;
4345  out->header.endian_detector = ENDIAN_LOCAL;
4346  out->header.nodeid = swab32 (in->header.nodeid);
4347  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4348  out->ring_id.seq = swab64 (in->ring_id.seq);
4349  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4350 }
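/*
 * Illustrative sketch, not part of totemsrp.c: every header carries a fixed
 * 16-bit pattern written in the sender's native byte order.  A receiver that
 * sees anything other than its own ENDIAN_LOCAL value knows the sender's byte
 * order differs and calls the matching *_endian_convert routine above.
 * toy_needs_conversion is a hypothetical helper.
 */
static int toy_needs_conversion (unsigned short endian_detector)
{
	return (endian_detector != ENDIAN_LOCAL);
}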
4351 
4352 static int message_handler_memb_join (
4353  struct totemsrp_instance *instance,
4354  const void *msg,
4355  size_t msg_len,
4356  int endian_conversion_needed)
4357 {
4358  const struct memb_join *memb_join;
4359  struct memb_join *memb_join_convert = alloca (msg_len);
4360 
4361  if (endian_conversion_needed) {
4362  memb_join = memb_join_convert;
4363  memb_join_endian_convert (msg, memb_join_convert);
4364 
4365  } else {
4366  memb_join = msg;
4367  }
4368  /*
4369  * If the process paused because it wasn't scheduled in a timely
4370  * fashion, flush the join messages because they may be queued
4371  * entries
4372  */
4373  if (pause_flush (instance)) {
4374  return (0);
4375  }
4376 
4377  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4378  instance->token_ring_id_seq = memb_join->ring_seq;
4379  }
4380  switch (instance->memb_state) {
4381  case MEMB_STATE_OPERATIONAL:
4382  memb_join_process (instance, memb_join);
4383  break;
4384 
4385  case MEMB_STATE_GATHER:
4386  memb_join_process (instance, memb_join);
4387  break;
4388 
4389  case MEMB_STATE_COMMIT:
4390  if (memb_set_subset (&memb_join->system_from,
4391  1,
4392  instance->my_new_memb_list,
4393  instance->my_new_memb_entries) &&
4394 
4395  memb_join->ring_seq >= instance->my_ring_id.seq) {
4396 
4397  memb_join_process (instance, memb_join);
4398  memb_state_gather_enter (instance, 13);
4399  }
4400  break;
4401 
4402  case MEMB_STATE_RECOVERY:
4403  if (memb_set_subset (&memb_join->system_from,
4404  1,
4405  instance->my_new_memb_list,
4406  instance->my_new_memb_entries) &&
4407 
4408  memb_join->ring_seq >= instance->my_ring_id.seq) {
4409 
4410  memb_join_process (instance, memb_join);
4411  memb_recovery_state_token_loss (instance);
4412  memb_state_gather_enter (instance, 14);
4413  }
4414  break;
4415  }
4416  return (0);
4417 }
4418 
4419 static int message_handler_memb_commit_token (
4420  struct totemsrp_instance *instance,
4421  const void *msg,
4422  size_t msg_len,
4423  int endian_conversion_needed)
4424 {
4425  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4426  struct memb_commit_token *memb_commit_token;
4427  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4428  int sub_entries;
4429 
4430  struct srp_addr *addr;
4431 
4432  log_printf (instance->totemsrp_log_level_debug,
4433  "got commit token");
4434 
4435  if (endian_conversion_needed) {
4436  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4437  } else {
4438  memcpy (memb_commit_token_convert, msg, msg_len);
4439  }
4440  memb_commit_token = memb_commit_token_convert;
4441  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4442 
4443 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4444  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4445  return (0);
4446  }
4447 #endif
4448  switch (instance->memb_state) {
4449  case MEMB_STATE_OPERATIONAL:
4450  /* discard token */
4451  break;
4452 
4453  case MEMB_STATE_GATHER:
4454  memb_set_subtract (sub, &sub_entries,
4455  instance->my_proc_list, instance->my_proc_list_entries,
4456  instance->my_failed_list, instance->my_failed_list_entries);
4457 
4458  if (memb_set_equal (addr,
4459  memb_commit_token->addr_entries,
4460  sub,
4461  sub_entries) &&
4462 
4463  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4464  memcpy (instance->commit_token, memb_commit_token, msg_len);
4465  memb_state_commit_enter (instance);
4466  }
4467  break;
4468 
4469  case MEMB_STATE_COMMIT:
4470  /*
4471  * If retransmitted commit tokens are sent on this ring
4472  * filter them out and only enter recovery once the
4473  * commit token has traversed the array. This is
4474  * determined by :
4475  * memb_commit_token->memb_index == memb_commit_token->addr_entries
4476  */
4477  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4478  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4479  memb_state_recovery_enter (instance, memb_commit_token);
4480  }
4481  break;
4482 
4483  case MEMB_STATE_RECOVERY:
4484  if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4485  log_printf (instance->totemsrp_log_level_debug,
4486  "Sending initial ORF token");
4487 
4488  // TODO convert instead of initiate
4489  orf_token_send_initial (instance);
4490  reset_token_timeout (instance); // REVIEWED
4491  reset_token_retransmit_timeout (instance); // REVIEWED
4492  }
4493  break;
4494  }
4495  return (0);
4496 }
4497 
4498 static int message_handler_token_hold_cancel (
4499  struct totemsrp_instance *instance,
4500  const void *msg,
4501  size_t msg_len,
4502  int endian_conversion_needed)
4503 {
4504  const struct token_hold_cancel *token_hold_cancel = msg;
4505 
4506  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4507  sizeof (struct memb_ring_id)) == 0) {
4508 
4509  instance->my_seq_unchanged = 0;
4510  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
4511  timer_function_token_retransmit_timeout (instance);
4512  }
4513  }
4514  return (0);
4515 }
4516 
4517 void main_deliver_fn (
4518  void *context,
4519  const void *msg,
4520  unsigned int msg_len)
4521 {
4522  struct totemsrp_instance *instance = context;
4523  const struct message_header *message_header = msg;
4524 
4525  if (msg_len < sizeof (struct message_header)) {
4526  log_printf (instance->totemsrp_log_level_security,
4527  "Received message is too short... ignoring %u.",
4528  (unsigned int)msg_len);
4529  return;
4530  }
4531 
4532  switch (message_header->type) {
4533  case MESSAGE_TYPE_ORF_TOKEN:
4534  instance->stats.orf_token_rx++;
4535  break;
4536  case MESSAGE_TYPE_MCAST:
4537  instance->stats.mcast_rx++;
4538  break;
4539  case MESSAGE_TYPE_MEMB_MERGE_DETECT:
4540  instance->stats.memb_merge_detect_rx++;
4541  break;
4542  case MESSAGE_TYPE_MEMB_JOIN:
4543  instance->stats.memb_join_rx++;
4544  break;
4545  case MESSAGE_TYPE_MEMB_COMMIT_TOKEN:
4546  instance->stats.memb_commit_token_rx++;
4547  break;
4548  case MESSAGE_TYPE_TOKEN_HOLD_CANCEL:
4549  instance->stats.token_hold_cancel_rx++;
4550  break;
4551  default:
4552  log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
4553 printf ("wrong message type\n");
4554  instance->stats.rx_msg_dropped++;
4555  return;
4556  }
4557  /*
4558  * Handle incoming message
4559  */
4560  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
4561  instance,
4562  msg,
4563  msg_len,
4564  message_header->endian_detector != ENDIAN_LOCAL);
4565 }
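/*
 * Illustrative sketch, not part of totemsrp.c: main_deliver_fn dispatches on
 * the message type byte through a fixed table of handler functions, one entry
 * per message type, as the call through
 * totemsrp_message_handlers.handler_functions[] above shows.  The toy_* names
 * below are hypothetical.
 */
typedef int (*toy_handler_fn) (
	void *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int toy_dispatch (
	toy_handler_fn handlers[],
	int type,
	void *instance,
	const void *msg,
	size_t msg_len,
	int needs_conversion)
{
	return (handlers[type] (instance, msg, msg_len, needs_conversion));
}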
4566 
4567 void main_iface_change_fn (
4568  void *context,
4569  const struct totem_ip_address *iface_addr,
4570  unsigned int iface_no)
4571 {
4572  struct totemsrp_instance *instance = context;
4573  int i;
4574 
4575  totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
4576  assert (instance->my_id.addr[iface_no].nodeid);
4577 
4578  totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);
4579 
4580  if (instance->iface_changes++ == 0) {
4581  memb_ring_id_create_or_load (instance, &instance->my_ring_id);
4582  log_printf (
4583  instance->totemsrp_log_level_debug,
4584  "Created or loaded sequence id %llx.%s for this ring.",
4585  instance->my_ring_id.seq,
4586  totemip_print (&instance->my_ring_id.rep));
4587 
4588  if (instance->totemsrp_service_ready_fn) {
4589  instance->totemsrp_service_ready_fn ();
4590  }
4591 
4592  }
4593 
4594  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
4595  totemsrp_member_add (instance,
4596  &instance->totem_config->interfaces[iface_no].member_list[i],
4597  iface_no);
4598  }
4599 
4600  if (instance->iface_changes >= instance->totem_config->interface_count) {
4601  memb_state_gather_enter (instance, 15);
4602  }
4603 }
4604 
4605 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
4606  totem_config->net_mtu -= sizeof (struct mcast);
4607 }
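/*
 * Illustrative sketch, not part of totemsrp.c: the adjustment above reserves
 * sizeof (struct mcast) bytes of every frame for this layer's own header, so
 * the budget handed to the layers below is the configured network MTU minus
 * that header.  toy_payload_max is a hypothetical helper.
 */
static unsigned int toy_payload_max (unsigned int configured_net_mtu)
{
	return (configured_net_mtu - (unsigned int)sizeof (struct mcast));
}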
4608 
4609 void totemsrp_service_ready_register (
4610  void *context,
4611  void (*totem_service_ready) (void))
4612 {
4613  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4614 
4615  instance->totemsrp_service_ready_fn = totem_service_ready;
4616 }
4617 
4618 int totemsrp_member_add (
4619  void *context,
4620  const struct totem_ip_address *member,
4621  int ring_no)
4622 {
4623  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4624  int res;
4625 
4626  res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);
4627 
4628  return (res);
4629 }
4630 
4631 int totemsrp_member_remove (
4632  void *context,
4633  const struct totem_ip_address *member,
4634  int ring_no)
4635 {
4636  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4637  int res;
4638 
4639  res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);
4640 
4641  return (res);
4642 }
4643 
4644 void totemsrp_threaded_mode_enable (void *context)
4645 {
4646  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4647 
4648  instance->threaded_mode_enabled = 1;
4649 }
4650 
4651 void totemsrp_trans_ack (void *context)
4652 {
4653  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4654 
4655  instance->waiting_trans_ack = 0;
4656  instance->totemsrp_waiting_trans_ack_cb_fn (0);
4657 }