corosync  2.3.4
totemsrp.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2009 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch4,5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - encryption of message contents with nss
45  * - authentication of message contents with SHA1/HMAC
46  * - token hold mode where token doesn't rotate on unused ring - reduces cpu
47  * usage on 1.6ghz xeon from 35% to less than 0.1% as measured by top
48  */
49 
50 #include <config.h>
51 
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77 
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81 
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85 
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88 
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92 
93 #include "cs_queue.h"
94 
#define LOCALHOST_IP inet_addr("127.0.0.1")
#define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
#define MAXIOVS 5
#define RETRANSMIT_ENTRIES_MAX 30
#define TOKEN_SIZE_MAX 64000 /* bytes */
#define LEAVE_DUMMY_NODEID 0 /* node id that memb_consensus_set() never records */

/*
 * Rollover handling:
 * SEQNO_START_MSG is the starting sequence number after a new configuration.
 * This should remain zero, unless testing overflow in which case
 * 0x7ffff000 and 0xfffff000 are good starting values.
 *
 * SEQNO_START_TOKEN is the starting sequence number after a new configuration
 * for a token. This should remain zero, unless testing overflow in which
 * case 0x7fffff00 or 0xffffff00 are good starting values.
 */
#define SEQNO_START_MSG 0x0
#define SEQNO_START_TOKEN 0x0

/*
 * These can be used to test different rollover points
 * #define SEQNO_START_MSG 0xfffffe00
 * #define SEQNO_START_TOKEN 0xfffffe00
 */

/*
 * These can be used to test the error recovery algorithms
 * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
 * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
 * #define TEST_DROP_MCAST_PERCENTAGE 50
 * #define TEST_RECOVERY_MSG_COUNT 300
 */

/*
 * Incoming messages are compared against this value to determine whether
 * their endianness differs from ours - if so they are converted.
 *
 * Do not change.
 */
#define ENDIAN_LOCAL 0xff22
138 
140  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
141  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
142  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
143  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
144  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
145  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
146 };
147 
151 };
152 
153 /*
154  * New membership algorithm local variables
155  */
157  struct srp_addr addr;
158  int set;
159 };
160 
161 
163  struct list_head list;
164  int (*callback_fn) (enum totem_callback_token_type type, const void *);
166  int delete;
167  void *data;
168 };
169 
170 
172  int mcast;
173  int token;
174 };
175 
/*
 * Header found at the start of every totemsrp wire message.
 * Packed, so the structure has no padding between fields.
 */
struct message_header {
	char type;                      /* compared against MESSAGE_TYPE_* values */
	char encapsulated;
	unsigned short endian_detector; /* see ENDIAN_LOCAL */
	unsigned int nodeid;
} __attribute__((packed));
182 
183 
184 struct mcast {
187  unsigned int seq;
190  unsigned int node_id;
192 } __attribute__((packed));
193 
194 
195 struct rtr_item {
197  unsigned int seq;
198 }__attribute__((packed));
199 
200 
201 struct orf_token {
203  unsigned int seq;
204  unsigned int token_seq;
205  unsigned int aru;
206  unsigned int aru_addr;
208  unsigned int backlog;
209  unsigned int fcc;
212  struct rtr_item rtr_list[0];
213 }__attribute__((packed));
214 
215 
216 struct memb_join {
219  unsigned int proc_list_entries;
220  unsigned int failed_list_entries;
221  unsigned long long ring_seq;
222  unsigned char end_of_memb_join[0];
223 /*
224  * These parts of the data structure are dynamic:
225  * struct srp_addr proc_list[];
226  * struct srp_addr failed_list[];
227  */
228 } __attribute__((packed));
229 
230 
235 } __attribute__((packed));
236 
237 
241 } __attribute__((packed));
242 
243 
246  unsigned int aru;
247  unsigned int high_delivered;
248  unsigned int received_flg;
249 }__attribute__((packed));
250 
251 
254  unsigned int token_seq;
256  unsigned int retrans_flg;
259  unsigned char end_of_commit_token[0];
260 /*
261  * These parts of the data structure are dynamic:
262  *
263  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
264  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
265  */
266 }__attribute__((packed));
267 
/*
 * Queue element for a multicast message: the message pointer plus the
 * message length (msg_len).
 */
struct message_item {
	struct mcast *mcast;
	unsigned int msg_len;
};
272 
274  struct mcast *mcast;
275  unsigned int msg_len;
276 };
277 
283 };
284 
287 
289 
290  /*
291  * Flow control mcasts and remcasts on last and current orf_token
292  */
294 
296 
298 
300 
302 
303  struct srp_addr my_id;
304 
306 
308 
310 
312 
314 
316 
318 
320 
322 
324 
326 
328 
330 
332 
334 
336 
338 
340 
341  unsigned int my_last_aru;
342 
344 
346 
347  unsigned int my_high_seq_received;
348 
349  unsigned int my_install_seq;
350 
352 
354 
356 
358 
360 
361  /*
362  * Queues used to order, deliver, and recover messages
363  */
365 
367 
369 
371 
373 
374  /*
375  * Received up to and including
376  */
377  unsigned int my_aru;
378 
379  unsigned int my_high_delivered;
380 
382 
384 
386 
388 
389  unsigned int my_token_seq;
390 
391  /*
392  * Timers
393  */
394  qb_loop_timer_handle timer_pause_timeout;
395 
396  qb_loop_timer_handle timer_orf_token_timeout;
397 
399 
401 
402  qb_loop_timer_handle timer_merge_detect_timeout;
403 
405 
407 
408  qb_loop_timer_handle memb_timer_state_commit_timeout;
409 
410  qb_loop_timer_handle timer_heartbeat_timeout;
411 
412  /*
413  * Function and data used to log messages
414  */
416 
418 
420 
422 
424 
426 
428 
430  int level,
431  int sybsys,
432  const char *function,
433  const char *file,
434  int line,
435  const char *format, ...)__attribute__((format(printf, 6, 7)));;
436 
438 
439 //TODO struct srp_addr next_memb;
440 
442 
444 
446  unsigned int nodeid,
447  const void *msg,
448  unsigned int msg_len,
449  int endian_conversion_required);
450 
452  enum totem_configuration_type configuration_type,
453  const unsigned int *member_list, size_t member_list_entries,
454  const unsigned int *left_list, size_t left_list_entries,
455  const unsigned int *joined_list, size_t joined_list_entries,
456  const struct memb_ring_id *ring_id);
457 
459 
461  int waiting_trans_ack);
462 
464  struct memb_ring_id *memb_ring_id,
465  const struct totem_ip_address *addr);
466 
468  const struct memb_ring_id *memb_ring_id,
469  const struct totem_ip_address *addr);
470 
472 
474 
475  unsigned long long token_ring_id_seq;
476 
477  unsigned int last_released;
478 
479  unsigned int set_aru;
480 
482 
484 
486 
487  unsigned int my_last_seq;
488 
489  struct timeval tv_old;
490 
492 
494 
495  unsigned int use_heartbeat;
496 
497  unsigned int my_trc;
498 
499  unsigned int my_pbl;
500 
501  unsigned int my_cbl;
502 
503  uint64_t pause_timestamp;
504 
506 
508 
510 
512 
514 
517  char commit_token_storage[40000];
518 };
519 
521  int count;
522  int (*handler_functions[6]) (
523  struct totemsrp_instance *instance,
524  const void *msg,
525  size_t msg_len,
526  int endian_conversion_needed);
527 };
528 
547 };
548 
549 const char* gather_state_from_desc [] = {
550  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
551  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
552  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
553  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
554  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
555  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
556  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
557  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
558  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
559  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
560  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
561  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
562  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
563  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
564  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
565  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
566 };
567 
/*
 * Forward declarations of functions defined later in this file.
 */
static int message_handler_orf_token (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int message_handler_mcast (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int message_handler_memb_merge_detect (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int message_handler_memb_join (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int message_handler_memb_commit_token (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static int message_handler_token_hold_cancel (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int endian_conversion_needed);

static void totemsrp_instance_initialize (struct totemsrp_instance *instance);

static unsigned int main_msgs_missing (void);

static void main_token_seqid_get (
	const void *msg,
	unsigned int *seqid,
	unsigned int *token_is);

static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);

static void srp_addr_to_nodeid (
	unsigned int *nodeid_out,
	struct srp_addr *srp_addr_in,
	unsigned int entries);

static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);

static void memb_leave_message_send (struct totemsrp_instance *instance);

static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
/* Parameter was previously misspelled "oken"; the name has no effect on callers. */
static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *token,
	int fcc_mcasts_allowed);
static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);

static void memb_ring_id_set (struct totemsrp_instance *instance,
	const struct memb_ring_id *ring_id);
static void target_set_completed (void *context);
static void memb_state_commit_token_update (struct totemsrp_instance *instance);
static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
static int memb_state_commit_token_send (struct totemsrp_instance *instance);
static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
static void memb_state_commit_token_create (struct totemsrp_instance *instance);
static int token_hold_cancel_send (struct totemsrp_instance *instance);
static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
static void memb_merge_detect_endian_convert (
	const struct memb_merge_detect *in,
	struct memb_merge_detect *out);
static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
static void timer_function_orf_token_timeout (void *data);
static void timer_function_pause_timeout (void *data);
static void timer_function_heartbeat_timeout (void *data);
static void timer_function_token_retransmit_timeout (void *data);
static void timer_function_token_hold_retransmit_timeout (void *data);
static void timer_function_merge_detect_timeout (void *data);
static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
static const char* gsfrom_to_msg(enum gather_state_from gsfrom);

void main_deliver_fn (
	void *context,
	const void *msg,
	unsigned int msg_len);
665 
667  void *context,
668  const struct totem_ip_address *iface_address,
669  unsigned int iface_no);
670 
672  6,
673  {
674  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
675  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
676  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
677  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
678  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
679  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
680  }
681 };
682 
/*
 * Log through the instance's logging callback (instance->totemsrp_log_printf).
 * The message is tagged with the instance's subsystem id and the calling
 * function, file and line.
 *
 * Both macros require a variable named `instance` (struct totemsrp_instance *)
 * to be in scope at the call site.
 *
 * The do { } while (0) wrapper deliberately has NO trailing semicolon. The
 * caller's own ';' then completes a single statement, which keeps the macro
 * safe inside an unbraced if/else. With a trailing ';' the expansion becomes
 * two statements and a following `else` fails to compile.
 */
#define log_printf(level, format, args...)				\
do {									\
	instance->totemsrp_log_printf (					\
		level, instance->totemsrp_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,			\
		format, ##args);					\
} while (0)

/*
 * Same as log_printf, but appends the text of error number err_num
 * (resolved with qb_strerror_r) and the number itself to the message.
 */
#define LOGSYS_PERROR(err_num, level, fmt, args...)			\
do {									\
	char _error_str[LOGSYS_MAX_PERROR_MSG_LEN];			\
	const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
	instance->totemsrp_log_printf (					\
		level, instance->totemsrp_subsys_id,			\
		__FUNCTION__, __FILE__, __LINE__,			\
		fmt ": %s (%d)\n", ##args, _error_ptr, err_num);	\
} while (0)
699 
700 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
701 {
702  if (0 <= gsfrom && gsfrom <= TOTEMSRP_GSFROM_MAX) {
703  return gather_state_from_desc[gsfrom];
704  }
705  else {
706  return "UNKNOWN";
707  }
708 }
709 
710 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
711 {
712  memset (instance, 0, sizeof (struct totemsrp_instance));
713 
714  list_init (&instance->token_callback_received_listhead);
715 
716  list_init (&instance->token_callback_sent_listhead);
717 
718  instance->my_received_flg = 1;
719 
720  instance->my_token_seq = SEQNO_START_TOKEN - 1;
721 
723 
724  instance->set_aru = -1;
725 
726  instance->my_aru = SEQNO_START_MSG;
727 
729 
731 
732  instance->orf_token_discard = 0;
733 
734  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
735 
736  instance->my_id.no_addrs = INTERFACE_MAX;
737 
738  instance->waiting_trans_ack = 1;
739 }
740 
741 static void main_token_seqid_get (
742  const void *msg,
743  unsigned int *seqid,
744  unsigned int *token_is)
745 {
746  const struct orf_token *token = msg;
747 
748  *seqid = 0;
749  *token_is = 0;
750  if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
751  *seqid = token->token_seq;
752  *token_is = 1;
753  }
754 }
755 
/*
 * Callback reporting how many messages are missing.
 * Not implemented yet: it always reports zero.
 */
static unsigned int main_msgs_missing (void)
{
	/* TODO: compute the real number of missing messages. */
	const unsigned int missing_count = 0;

	return missing_count;
}
761 
762 static int pause_flush (struct totemsrp_instance *instance)
763 {
764  uint64_t now_msec;
765  uint64_t timestamp_msec;
766  int res = 0;
767 
768  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
769  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
770 
771  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
773  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
774  /*
775  * -1 indicates an error from recvmsg
776  */
777  do {
779  } while (res == -1);
780  }
781  return (res);
782 }
783 
784 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
785 {
786  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
787  uint32_t time_now;
788  unsigned long long nano_secs = qb_util_nano_current_get ();
789 
790  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
791 
792  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
793  /* incr latest token the index */
794  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
795  instance->stats.latest_token = 0;
796  else
797  instance->stats.latest_token++;
798 
799  if (instance->stats.earliest_token == instance->stats.latest_token) {
800  /* we have filled up the array, start overwriting */
801  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
802  instance->stats.earliest_token = 0;
803  else
804  instance->stats.earliest_token++;
805 
806  instance->stats.token[instance->stats.earliest_token].rx = 0;
807  instance->stats.token[instance->stats.earliest_token].tx = 0;
808  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
809  }
810 
811  instance->stats.token[instance->stats.latest_token].rx = time_now;
812  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
813  } else {
814  instance->stats.token[instance->stats.latest_token].tx = time_now;
815  }
816  return 0;
817 }
818 
819 /*
820  * Exported interfaces
821  */
823  qb_loop_t *poll_handle,
824  void **srp_context,
825  struct totem_config *totem_config,
827 
828  void (*deliver_fn) (
829  unsigned int nodeid,
830  const void *msg,
831  unsigned int msg_len,
832  int endian_conversion_required),
833 
834  void (*confchg_fn) (
835  enum totem_configuration_type configuration_type,
836  const unsigned int *member_list, size_t member_list_entries,
837  const unsigned int *left_list, size_t left_list_entries,
838  const unsigned int *joined_list, size_t joined_list_entries,
839  const struct memb_ring_id *ring_id),
840  void (*waiting_trans_ack_cb_fn) (
841  int waiting_trans_ack))
842 {
843  struct totemsrp_instance *instance;
844 
845  instance = malloc (sizeof (struct totemsrp_instance));
846  if (instance == NULL) {
847  goto error_exit;
848  }
849 
850  totemsrp_instance_initialize (instance);
851 
852  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
853  instance->totemsrp_waiting_trans_ack_cb_fn (1);
854 
855  stats->srp = &instance->stats;
856  instance->stats.latest_token = 0;
857  instance->stats.earliest_token = 0;
858 
859  instance->totem_config = totem_config;
860 
861  /*
862  * Configure logging
863  */
872 
873  /*
874  * Configure totem store and load functions
875  */
877  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
878 
879  /*
880  * Initialize local variables for totemsrp
881  */
882  totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
883 
884  /*
885  * Display totem configuration
886  */
888  "Token Timeout (%d ms) retransmit timeout (%d ms)",
889  totem_config->token_timeout, totem_config->token_retransmit_timeout);
891  "token hold (%d ms) retransmits before loss (%d retrans)",
892  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
894  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
895  totem_config->join_timeout,
896  totem_config->send_join_timeout,
897  totem_config->consensus_timeout,
898 
899  totem_config->merge_timeout);
901  "downcheck (%d ms) fail to recv const (%d msgs)",
902  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
904  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
905 
907  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
908  totem_config->window_size, totem_config->max_messages);
909 
911  "missed count const (%d messages)",
912  totem_config->miss_count_const);
913 
915  "send threads (%d threads)", totem_config->threads);
917  "RRP token expired timeout (%d ms)",
918  totem_config->rrp_token_expired_timeout);
920  "RRP token problem counter (%d ms)",
921  totem_config->rrp_problem_count_timeout);
923  "RRP threshold (%d problem count)",
924  totem_config->rrp_problem_count_threshold);
926  "RRP multicast threshold (%d problem count)",
927  totem_config->rrp_problem_count_mcast_threshold);
929  "RRP automatic recovery check timeout (%d ms)",
930  totem_config->rrp_autorecovery_check_timeout);
932  "RRP mode set to %s.", instance->totem_config->rrp_mode);
933 
935  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
937  "max_network_delay (%d ms)", totem_config->max_network_delay);
938 
939 
940  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
941  sizeof (struct message_item), instance->threaded_mode_enabled);
942 
943  sq_init (&instance->regular_sort_queue,
944  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
945 
946  sq_init (&instance->recovery_sort_queue,
947  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
948 
949  instance->totemsrp_poll_handle = poll_handle;
950 
951  instance->totemsrp_deliver_fn = deliver_fn;
952 
953  instance->totemsrp_confchg_fn = confchg_fn;
954  instance->use_heartbeat = 1;
955 
956  timer_function_pause_timeout (instance);
957 
958  if ( totem_config->heartbeat_failures_allowed == 0 ) {
960  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
961  instance->use_heartbeat = 0;
962  }
963 
964  if (instance->use_heartbeat) {
965  instance->heartbeat_timeout
966  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
967  + totem_config->max_network_delay;
968 
969  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
971  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
972  instance->heartbeat_timeout,
973  totem_config->token_timeout);
975  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
977  "heartbeat timeout should be less than the token timeout. HeartBeat is Diabled !!");
978  instance->use_heartbeat = 0;
979  }
980  else {
982  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
983  }
984  }
985 
987  poll_handle,
988  &instance->totemrrp_context,
989  totem_config,
990  stats->srp,
991  instance,
994  main_token_seqid_get,
995  main_msgs_missing,
996  target_set_completed);
997 
998  /*
999  * Must have net_mtu adjusted by totemrrp_initialize first
1000  */
1001  cs_queue_init (&instance->new_message_queue,
1003  sizeof (struct message_item), instance->threaded_mode_enabled);
1004 
1005  cs_queue_init (&instance->new_message_queue_trans,
1007  sizeof (struct message_item), instance->threaded_mode_enabled);
1008 
1010  &instance->token_recv_event_handle,
1012  0,
1013  token_event_stats_collector,
1014  instance);
1016  &instance->token_sent_event_handle,
1018  0,
1019  token_event_stats_collector,
1020  instance);
1021  *srp_context = instance;
1022  return (0);
1023 
1024 error_exit:
1025  return (-1);
1026 }
1027 
1029  void *srp_context)
1030 {
1031  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1032 
1033 
1034  memb_leave_message_send (instance);
1035  totemrrp_finalize (instance->totemrrp_context);
1036  cs_queue_free (&instance->new_message_queue);
1037  cs_queue_free (&instance->new_message_queue_trans);
1038  cs_queue_free (&instance->retrans_message_queue);
1039  sq_free (&instance->regular_sort_queue);
1040  sq_free (&instance->recovery_sort_queue);
1041  free (instance);
1042 }
1043 
1044 /*
1045  * Return configured interfaces. interfaces is array of totem_ip addresses allocated by caller,
1046  * with interaces_size number of items. iface_count is final number of interfaces filled by this
1047  * function.
1048  *
1049  * Function returns 0 on success, otherwise if interfaces array is not big enough, -2 is returned,
1050  * and if interface was not found, -1 is returned.
1051  */
1053  void *srp_context,
1054  unsigned int nodeid,
1055  struct totem_ip_address *interfaces,
1056  unsigned int interfaces_size,
1057  char ***status,
1058  unsigned int *iface_count)
1059 {
1060  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1061  int res = 0;
1062  unsigned int found = 0;
1063  unsigned int i;
1064 
1065  for (i = 0; i < instance->my_memb_entries; i++) {
1066  if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1067  found = 1;
1068  break;
1069  }
1070  }
1071 
1072  if (found) {
1073  *iface_count = instance->totem_config->interface_count;
1074 
1075  if (interfaces_size >= *iface_count) {
1076  memcpy (interfaces, instance->my_memb_list[i].addr,
1077  sizeof (struct totem_ip_address) * *iface_count);
1078  } else {
1079  res = -2;
1080  }
1081 
1082  goto finish;
1083  }
1084 
1085  for (i = 0; i < instance->my_left_memb_entries; i++) {
1086  if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1087  found = 1;
1088  break;
1089  }
1090  }
1091 
1092  if (found) {
1093  *iface_count = instance->totem_config->interface_count;
1094 
1095  if (interfaces_size >= *iface_count) {
1096  memcpy (interfaces, instance->my_left_memb_list[i].addr,
1097  sizeof (struct totem_ip_address) * *iface_count);
1098  } else {
1099  res = -2;
1100  }
1101  } else {
1102  res = -1;
1103  }
1104 
1105 finish:
1106  totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1107  return (res);
1108 }
1109 
1111  void *srp_context,
1112  const char *cipher_type,
1113  const char *hash_type)
1114 {
1115  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1116  int res;
1117 
1118  res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1119 
1120  return (res);
1121 }
1122 
1123 
1125  void *srp_context)
1126 {
1127  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1128  unsigned int res;
1129 
1130  res = instance->totem_config->interfaces[0].boundto.nodeid;
1131 
1132  return (res);
1133 }
1134 
1136  void *srp_context)
1137 {
1138  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1139  int res;
1140 
1141  res = instance->totem_config->interfaces[0].boundto.family;
1142 
1143  return (res);
1144 }
1145 
1146 
1148  void *srp_context)
1149 {
1150  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1151 
1153  instance->totem_config->interface_count);
1154 
1155  return (0);
1156 }
1157 
1158 
1159 /*
1160  * Set operations for use by the membership algorithm
1161  */
1162 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1163 {
1164  unsigned int i;
1165  unsigned int res;
1166 
1167  for (i = 0; i < 1; i++) {
1168  res = totemip_equal (&a->addr[i], &b->addr[i]);
1169  if (res == 0) {
1170  return (0);
1171  }
1172  }
1173  return (1);
1174 }
1175 
1176 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1177 {
1178  unsigned int i;
1179 
1180  dest->no_addrs = src->no_addrs;
1181 
1182  for (i = 0; i < INTERFACE_MAX; i++) {
1183  totemip_copy (&dest->addr[i], &src->addr[i]);
1184  }
1185 }
1186 
1187 static void srp_addr_to_nodeid (
1188  unsigned int *nodeid_out,
1189  struct srp_addr *srp_addr_in,
1190  unsigned int entries)
1191 {
1192  unsigned int i;
1193 
1194  for (i = 0; i < entries; i++) {
1195  nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1196  }
1197 }
1198 
1199 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1200 {
1201  int i;
1202 
1203  for (i = 0; i < INTERFACE_MAX; i++) {
1204  totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1205  }
1206 }
1207 
1208 static void memb_consensus_reset (struct totemsrp_instance *instance)
1209 {
1210  instance->consensus_list_entries = 0;
1211 }
1212 
1213 static void memb_set_subtract (
1214  struct srp_addr *out_list, int *out_list_entries,
1215  struct srp_addr *one_list, int one_list_entries,
1216  struct srp_addr *two_list, int two_list_entries)
1217 {
1218  int found = 0;
1219  int i;
1220  int j;
1221 
1222  *out_list_entries = 0;
1223 
1224  for (i = 0; i < one_list_entries; i++) {
1225  for (j = 0; j < two_list_entries; j++) {
1226  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1227  found = 1;
1228  break;
1229  }
1230  }
1231  if (found == 0) {
1232  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1233  *out_list_entries = *out_list_entries + 1;
1234  }
1235  found = 0;
1236  }
1237 }
1238 
1239 /*
1240  * Set consensus for a specific processor
1241  */
1242 static void memb_consensus_set (
1243  struct totemsrp_instance *instance,
1244  const struct srp_addr *addr)
1245 {
1246  int found = 0;
1247  int i;
1248 
1249  if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1250  return;
1251 
1252  for (i = 0; i < instance->consensus_list_entries; i++) {
1253  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1254  found = 1;
1255  break; /* found entry */
1256  }
1257  }
1258  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1259  instance->consensus_list[i].set = 1;
1260  if (found == 0) {
1261  instance->consensus_list_entries++;
1262  }
1263  return;
1264 }
1265 
1266 /*
1267  * Is consensus set for a specific processor
1268  */
1269 static int memb_consensus_isset (
1270  struct totemsrp_instance *instance,
1271  const struct srp_addr *addr)
1272 {
1273  int i;
1274 
1275  for (i = 0; i < instance->consensus_list_entries; i++) {
1276  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1277  return (instance->consensus_list[i].set);
1278  }
1279  }
1280  return (0);
1281 }
1282 
1283 /*
1284  * Is consensus agreed upon based upon consensus database
1285  */
1286 static int memb_consensus_agreed (
1287  struct totemsrp_instance *instance)
1288 {
1289  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1290  int token_memb_entries = 0;
1291  int agreed = 1;
1292  int i;
1293 
1294  memb_set_subtract (token_memb, &token_memb_entries,
1295  instance->my_proc_list, instance->my_proc_list_entries,
1296  instance->my_failed_list, instance->my_failed_list_entries);
1297 
1298  for (i = 0; i < token_memb_entries; i++) {
1299  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1300  agreed = 0;
1301  break;
1302  }
1303  }
1304 
1305  if (agreed && instance->failed_to_recv == 1) {
1306  /*
1307  * Both nodes agreed on our failure. We don't care how many proc list items left because we
1308  * will create single ring anyway.
1309  */
1310 
1311  return (agreed);
1312  }
1313 
1314  assert (token_memb_entries >= 1);
1315 
1316  return (agreed);
1317 }
1318 
1319 static void memb_consensus_notset (
1320  struct totemsrp_instance *instance,
1321  struct srp_addr *no_consensus_list,
1322  int *no_consensus_list_entries,
1323  struct srp_addr *comparison_list,
1324  int comparison_list_entries)
1325 {
1326  int i;
1327 
1328  *no_consensus_list_entries = 0;
1329 
1330  for (i = 0; i < instance->my_proc_list_entries; i++) {
1331  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1332  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1333  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1334  }
1335  }
1336 }
1337 
1338 /*
1339  * Is set1 equal to set2 Entries can be in different orders
1340  */
1341 static int memb_set_equal (
1342  struct srp_addr *set1, int set1_entries,
1343  struct srp_addr *set2, int set2_entries)
1344 {
1345  int i;
1346  int j;
1347 
1348  int found = 0;
1349 
1350  if (set1_entries != set2_entries) {
1351  return (0);
1352  }
1353  for (i = 0; i < set2_entries; i++) {
1354  for (j = 0; j < set1_entries; j++) {
1355  if (srp_addr_equal (&set1[j], &set2[i])) {
1356  found = 1;
1357  break;
1358  }
1359  }
1360  if (found == 0) {
1361  return (0);
1362  }
1363  found = 0;
1364  }
1365  return (1);
1366 }
1367 
1368 /*
1369  * Is subset fully contained in fullset
1370  */
1371 static int memb_set_subset (
1372  const struct srp_addr *subset, int subset_entries,
1373  const struct srp_addr *fullset, int fullset_entries)
1374 {
1375  int i;
1376  int j;
1377  int found = 0;
1378 
1379  if (subset_entries > fullset_entries) {
1380  return (0);
1381  }
1382  for (i = 0; i < subset_entries; i++) {
1383  for (j = 0; j < fullset_entries; j++) {
1384  if (srp_addr_equal (&subset[i], &fullset[j])) {
1385  found = 1;
1386  }
1387  }
1388  if (found == 0) {
1389  return (0);
1390  }
1391  found = 0;
1392  }
1393  return (1);
1394 }
1395 /*
1396  * merge subset into fullset taking care not to add duplicates
1397  */
1398 static void memb_set_merge (
1399  const struct srp_addr *subset, int subset_entries,
1400  struct srp_addr *fullset, int *fullset_entries)
1401 {
1402  int found = 0;
1403  int i;
1404  int j;
1405 
1406  for (i = 0; i < subset_entries; i++) {
1407  for (j = 0; j < *fullset_entries; j++) {
1408  if (srp_addr_equal (&fullset[j], &subset[i])) {
1409  found = 1;
1410  break;
1411  }
1412  }
1413  if (found == 0) {
1414  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1415  *fullset_entries = *fullset_entries + 1;
1416  }
1417  found = 0;
1418  }
1419  return;
1420 }
1421 
1422 static void memb_set_and_with_ring_id (
1423  struct srp_addr *set1,
1424  struct memb_ring_id *set1_ring_ids,
1425  int set1_entries,
1426  struct srp_addr *set2,
1427  int set2_entries,
1428  struct memb_ring_id *old_ring_id,
1429  struct srp_addr *and,
1430  int *and_entries)
1431 {
1432  int i;
1433  int j;
1434  int found = 0;
1435 
1436  *and_entries = 0;
1437 
1438  for (i = 0; i < set2_entries; i++) {
1439  for (j = 0; j < set1_entries; j++) {
1440  if (srp_addr_equal (&set1[j], &set2[i])) {
1441  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1442  found = 1;
1443  }
1444  break;
1445  }
1446  }
1447  if (found) {
1448  srp_addr_copy (&and[*and_entries], &set1[j]);
1449  *and_entries = *and_entries + 1;
1450  }
1451  found = 0;
1452  }
1453  return;
1454 }
1455 
#ifdef CODE_COVERAGE
/*
 * Debug helper: print the list name, its entry count and, for every entry,
 * each interface address and address family to stdout.
 */
static void memb_set_print (
	char *string,
	struct srp_addr *list,
	int list_entries)
{
	int entry;

	printf ("List '%s' contains %d entries:\n", string, list_entries);

	for (entry = 0; entry < list_entries; entry++) {
		struct srp_addr *cur = &list[entry];
		int iface;

		printf ("Address %d with %d rings\n", entry, cur->no_addrs);
		for (iface = 0; iface < cur->no_addrs; iface++) {
			printf ("\tiface %d %s\n", iface, totemip_print (&cur->addr[iface]));
			printf ("\tfamily %d\n", cur->addr[iface].family);
		}
	}
}
#endif
1475 
1476 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1477 {
1478  assert (instance != NULL);
1479  return totemrrp_buffer_alloc (instance->totemrrp_context);
1480 }
1481 
1482 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1483 {
1484  assert (instance != NULL);
1485  totemrrp_buffer_release (instance->totemrrp_context, ptr);
1486 }
1487 
/*
 * Restart the token retransmit timer.
 *
 * The pending timer is removed with qb_loop_timer_del(). A new one is then
 * added with qb_loop_timer_add(); it calls
 * timer_function_token_retransmit_timeout after
 * totem_config->token_retransmit_timeout, scaled by QB_TIME_NS_IN_MSEC.
 */
1488 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1489 {
1490  qb_loop_timer_del (instance->totemsrp_poll_handle,
1492  qb_loop_timer_add (instance->totemsrp_poll_handle,
1493  QB_LOOP_MED,
1494  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1495  (void *)instance,
1496  timer_function_token_retransmit_timeout,
1498 
1499 }
1500 
/*
 * Arm the merge-detect timer unless my_merge_detect_timeout_outstanding is
 * non-zero.
 *
 * The timer fires timer_function_merge_detect_timeout after
 * totem_config->merge_timeout, scaled by QB_TIME_NS_IN_MSEC.
 */
1501 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1502 {
1503  if (instance->my_merge_detect_timeout_outstanding == 0) {
1504  qb_loop_timer_add (instance->totemsrp_poll_handle,
1505  QB_LOOP_MED,
1506  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1507  (void *)instance,
1508  timer_function_merge_detect_timeout,
1509  &instance->timer_merge_detect_timeout);
1510 
1512  }
1513 }
1514 
/*
 * Cancel the merge-detect timer (instance->timer_merge_detect_timeout).
 */
1515 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1516 {
1517  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1519 }
1520 
1521 /*
1522  * ring_state_* is used to save and restore the sort queue
1523  * state when a recovery operation fails (and enters gather)
1524  */
/*
 * Snapshot the current ring id into my_old_ring_id and my_aru into
 * old_ring_state_aru.
 *
 * Only the first call takes effect; later calls do nothing until
 * old_ring_state_reset() clears old_ring_state_saved.
 */
1525 static void old_ring_state_save (struct totemsrp_instance *instance)
1526 {
1527  if (instance->old_ring_state_saved == 0) {
1528  instance->old_ring_state_saved = 1;
1529  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1530  sizeof (struct memb_ring_id));
1531  instance->old_ring_state_aru = instance->my_aru;
1534  "Saving state aru %x high seq received %x",
1535  instance->my_aru, instance->my_high_seq_received);
1536  }
1537 }
1538 
/*
 * Roll my_aru back to the old_ring_state_aru value captured by
 * old_ring_state_save(), and log the restored values.
 */
1539 static void old_ring_state_restore (struct totemsrp_instance *instance)
1540 {
1541  instance->my_aru = instance->old_ring_state_aru;
1544  "Restoring instance->my_aru %x my high seq received %x",
1545  instance->my_aru, instance->my_high_seq_received);
1546 }
1547 
/*
 * Clear old_ring_state_saved so the next old_ring_state_save() takes a
 * fresh snapshot.
 */
1548 static void old_ring_state_reset (struct totemsrp_instance *instance)
1549 {
1551  "Resetting old ring state");
1552  instance->old_ring_state_saved = 0;
1553 }
1554 
1555 static void reset_pause_timeout (struct totemsrp_instance *instance)
1556 {
1557  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1558  qb_loop_timer_add (instance->totemsrp_poll_handle,
1559  QB_LOOP_MED,
1560  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1561  (void *)instance,
1562  timer_function_pause_timeout,
1563  &instance->timer_pause_timeout);
1564 }
1565 
1566 static void reset_token_timeout (struct totemsrp_instance *instance) {
1567  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1568  qb_loop_timer_add (instance->totemsrp_poll_handle,
1569  QB_LOOP_MED,
1570  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1571  (void *)instance,
1572  timer_function_orf_token_timeout,
1573  &instance->timer_orf_token_timeout);
1574 }
1575 
1576 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1577  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1578  qb_loop_timer_add (instance->totemsrp_poll_handle,
1579  QB_LOOP_MED,
1580  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1581  (void *)instance,
1582  timer_function_heartbeat_timeout,
1583  &instance->timer_heartbeat_timeout);
1584 }
1585 
1586 
1587 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1588  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1589 }
1590 
1591 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1592  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1593 }
1594 
1595 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1596 {
1597  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1598 }
1599 
/*
 * Arm the token hold retransmit timer.
 *
 * It fires timer_function_token_hold_retransmit_timeout after
 * totem_config->token_hold_timeout, scaled by QB_TIME_NS_IN_MSEC. Unlike the
 * reset_* helpers, no pending timer is deleted first.
 */
1600 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1601 {
1602  qb_loop_timer_add (instance->totemsrp_poll_handle,
1603  QB_LOOP_MED,
1604  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1605  (void *)instance,
1606  timer_function_token_hold_retransmit_timeout,
1608 }
1609 
/*
 * Cancel the token hold retransmit timer armed by
 * start_token_hold_retransmit_timeout().
 */
1610 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1611 {
1612  qb_loop_timer_del (instance->totemsrp_poll_handle,
1614 }
1615 
1616 static void memb_state_consensus_timeout_expired (
1617  struct totemsrp_instance *instance)
1618 {
1619  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1620  int no_consensus_list_entries;
1621 
1622  instance->stats.consensus_timeouts++;
1623  if (memb_consensus_agreed (instance)) {
1624  memb_consensus_reset (instance);
1625 
1626  memb_consensus_set (instance, &instance->my_id);
1627 
1628  reset_token_timeout (instance); // REVIEWED
1629  } else {
1630  memb_consensus_notset (
1631  instance,
1632  no_consensus_list,
1633  &no_consensus_list_entries,
1634  instance->my_proc_list,
1635  instance->my_proc_list_entries);
1636 
1637  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1638  instance->my_failed_list, &instance->my_failed_list_entries);
1639  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1640  }
1641 }
1642 
1643 static void memb_join_message_send (struct totemsrp_instance *instance);
1644 
1645 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1646 
1647 /*
1648  * Timers used for various states of the membership algorithm
1649  */
1650 static void timer_function_pause_timeout (void *data)
1651 {
1652  struct totemsrp_instance *instance = data;
1653 
1654  instance->pause_timestamp = qb_util_nano_current_get ();
1655  reset_pause_timeout (instance);
1656 }
1657 
1658 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1659 {
1660  old_ring_state_restore (instance);
1661  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1662  instance->stats.recovery_token_lost++;
1663 }
1664 
/*
 * Token-loss timer callback. What it does depends on memb_state:
 *  - OPERATIONAL or COMMIT: the token was lost; re-enter GATHER.
 *  - GATHER: handled as a consensus timeout, then GATHER is re-entered.
 *  - RECOVERY: restore the old ring state, re-enter GATHER and set
 *    orf_token_discard.
 * Every case increments its matching stats.*_token_lost counter.
 */
1665 static void timer_function_orf_token_timeout (void *data)
1666 {
1667  struct totemsrp_instance *instance = data;
1668 
1669  switch (instance->memb_state) {
1672  "The token was lost in the OPERATIONAL state.");
1674  "A processor failed, forming new configuration.");
1676  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1677  instance->stats.operational_token_lost++;
1678  break;
1679 
1680  case MEMB_STATE_GATHER:
1682  "The consensus timeout expired.");
/*
 * NOTE(review): when consensus is not agreed,
 * memb_state_consensus_timeout_expired() itself calls
 * memb_state_gather_enter(). GATHER is then entered twice (one join message
 * per entry). Confirm this is intended.
 */
1683  memb_state_consensus_timeout_expired (instance);
1684  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1685  instance->stats.gather_token_lost++;
1686  break;
1687 
1688  case MEMB_STATE_COMMIT:
1690  "The token was lost in the COMMIT state.");
1691  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1692  instance->stats.commit_token_lost++;
1693  break;
1694 
1695  case MEMB_STATE_RECOVERY:
1697  "The token was lost in the RECOVERY state.");
1698  memb_recovery_state_token_loss (instance);
1699  instance->orf_token_discard = 1;
1700  break;
1701  }
1702 }
1703 
/*
 * Heartbeat timer callback.
 *
 * Logs the current memb_state and then runs exactly the same handling as a
 * token-loss timeout (timer_function_orf_token_timeout).
 */
1704 static void timer_function_heartbeat_timeout (void *data)
1705 {
1706  struct totemsrp_instance *instance = data;
1708  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1709  timer_function_orf_token_timeout(data);
1710 }
1711 
/*
 * Join timer callback.
 *
 * In GATHER and COMMIT it resends the join message and re-arms itself for
 * totem_config->join_timeout (scaled by QB_TIME_NS_IN_MSEC). Firing in
 * RECOVERY is treated as a bug (assert).
 */
1712 static void memb_timer_function_state_gather (void *data)
1713 {
1714  struct totemsrp_instance *instance = data;
1715 
1716  switch (instance->memb_state) {
1718  case MEMB_STATE_RECOVERY:
1719  assert (0); /* this should never happen */
1720  break;
1721  case MEMB_STATE_GATHER:
1722  case MEMB_STATE_COMMIT:
1723  memb_join_message_send (instance);
1724 
1725  /*
1726  * Restart the join timeout
1727  */
1728  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1729 
1730  qb_loop_timer_add (instance->totemsrp_poll_handle,
1731  QB_LOOP_MED,
1732  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1733  (void *)instance,
1734  memb_timer_function_state_gather,
1736  break;
1737  }
1738 }
1739 
/*
 * Consensus timer callback (armed when GATHER is entered). All of the work
 * is done by memb_state_consensus_timeout_expired().
 */
static void memb_timer_function_gather_consensus_timeout (void *data)
{
	memb_state_consensus_timeout_expired ((struct totemsrp_instance *)data);
}
1745 
/*
 * Move old-ring messages from the recovery sort queue into the regular sort
 * queue, for sequence numbers SEQNO_START_MSG+1 .. my_aru.
 *
 * Only MESSAGE_ENCAPSULATED recovery messages are handled. The inner mcast,
 * located right after the outer struct mcast header, is what gets queued,
 * and only when both of these hold:
 *  - its ring id equals my_old_ring_id;
 *  - its seq slot in the regular queue is free.
 * old_ring_state_high_seq_received is raised to the highest seq added.
 */
1746 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1747 {
1748  unsigned int i;
1749  struct sort_queue_item *recovery_message_item;
1750  struct sort_queue_item regular_message_item;
1751  unsigned int range = 0;
1752  int res;
1753  void *ptr;
1754  struct mcast *mcast;
1755 
1757  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1758 
1759  range = instance->my_aru - SEQNO_START_MSG;
1760  /*
1761  * Move messages from recovery to regular sort queue
1762  */
1763 // i runs 1..range, i.e. seq SEQNO_START_MSG+1 .. my_aru, matching the range logged above
1764  for (i = 1; i <= range; i++) {
1765  res = sq_item_get (&instance->recovery_sort_queue,
1766  i + SEQNO_START_MSG, &ptr);
1767  if (res != 0) {
1768  continue;
1769  }
1770  recovery_message_item = ptr;
1771 
1772  /*
1773  * Convert recovery message into regular message
1774  */
1775  mcast = recovery_message_item->mcast;
1776  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1777  /*
1778  * Message is a recovery message encapsulated
1779  * in a new ring message
1780  */
1781  regular_message_item.mcast =
1782  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1783  regular_message_item.msg_len =
1784  recovery_message_item->msg_len - sizeof (struct mcast);
1785  mcast = regular_message_item.mcast;
1786  } else {
1787  /*
1788  * TODO this case shouldn't happen
1789  */
1790  continue;
1791  }
1792 
1794  "comparing if ring id is for this processors old ring seqno %d",
1795  mcast->seq);
1796 
1797  /*
1798  * Only add this message to the regular sort
1799  * queue if it was originated with the same ring
1800  * id as the previous ring
1801  */
1802  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1803  sizeof (struct memb_ring_id)) == 0) {
1804 
1805  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1806  if (res == 0) {
/*
 * regular_message_item is a single stack variable reused on every
 * iteration, and its mcast points into the recovery buffer.
 * NOTE(review): assumes sq_item_add() copies the item rather than keeping
 * this address; confirm in the sort queue implementation.
 */
1807  sq_item_add (&instance->regular_sort_queue,
1808  &regular_message_item, mcast->seq);
1809  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1810  instance->old_ring_state_high_seq_received = mcast->seq;
1811  }
1812  }
1813  } else {
1815  "-not adding msg with seq no %x", mcast->seq);
1816  }
1817  }
1818 }
1819 
1820 /*
1821  * Change states in the state machine of the membership algorithm
1822  */
/*
 * Enter the OPERATIONAL state. In order, this:
 *  - delivers pending old-ring messages;
 *  - computes the joined and left lists;
 *  - installs my_new_memb_list as the membership;
 *  - delivers the transitional and then the regular configuration, to RRP
 *    and to the application;
 *  - turns the recovery sort queue into the regular one;
 *  - resets my_proc_list and my_failed_list;
 *  - re-arms the pause timer;
 *  - records my_ring_id as my_old_ring_id for the next configuration change.
 */
1823 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1824 {
1825  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1826  int joined_list_entries = 0;
1827  unsigned int aru_save;
1828  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1829  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1830  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1831  unsigned int left_list[PROCESSOR_COUNT_MAX];
1832  unsigned int i;
1833  unsigned int res;
1834  char left_node_msg[1024];
1835  char joined_node_msg[1024];
1836 
1837  memb_consensus_reset (instance);
1838 
1839  old_ring_state_reset (instance);
1840 
1841  deliver_messages_from_recovery_to_regular (instance);
1842 
1844  "Delivering to app %x to %x",
1845  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1846 
/*
 * Deliver old-ring messages with my_aru temporarily set to the saved
 * old-ring aru. The current value is restored from aru_save further down.
 */
1847  aru_save = instance->my_aru;
1848  instance->my_aru = instance->old_ring_state_aru;
1849 
1850  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1851 
1852  /*
1853  * Calculate joined and left list
1854  */
1855  memb_set_subtract (instance->my_left_memb_list,
1856  &instance->my_left_memb_entries,
1857  instance->my_memb_list, instance->my_memb_entries,
1858  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1859 
1860  memb_set_subtract (joined_list, &joined_list_entries,
1861  instance->my_new_memb_list, instance->my_new_memb_entries,
1862  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1863 
1864  /*
1865  * Install new membership
1866  */
1867  instance->my_memb_entries = instance->my_new_memb_entries;
1868  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1869  sizeof (struct srp_addr) * instance->my_memb_entries);
1870  instance->last_released = 0;
1871  instance->my_set_retrans_flg = 0;
1872 
1873  /*
1874  * Inform RRP about transitional change
1875  */
1877  instance->totemrrp_context,
1879  instance->my_trans_memb_list, instance->my_trans_memb_entries,
1880  instance->my_left_memb_list, instance->my_left_memb_entries,
1881  NULL, 0,
1882  &instance->my_ring_id);
1883  /*
1884  * Deliver transitional configuration to application
1885  */
1886  srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1887  instance->my_left_memb_entries);
1888  srp_addr_to_nodeid (trans_memb_list_totemip,
1889  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1891  trans_memb_list_totemip, instance->my_trans_memb_entries,
1892  left_list, instance->my_left_memb_entries,
1893  0, 0, &instance->my_ring_id);
1894  instance->waiting_trans_ack = 1;
1895  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1896 
1897 // TODO we need to filter to ensure we only deliver those
1898 // messages which are part of instance->my_deliver_memb
1899  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1900 
1901  instance->my_aru = aru_save;
1902 
1903  /*
1904  * Inform RRP about regular membership change
1905  */
1907  instance->totemrrp_context,
1909  instance->my_new_memb_list, instance->my_new_memb_entries,
1910  NULL, 0,
1911  joined_list, joined_list_entries,
1912  &instance->my_ring_id);
1913  /*
1914  * Deliver regular configuration to application
1915  */
1916  srp_addr_to_nodeid (new_memb_list_totemip,
1917  instance->my_new_memb_list, instance->my_new_memb_entries);
1918  srp_addr_to_nodeid (joined_list_totemip, joined_list,
1919  joined_list_entries);
1921  new_memb_list_totemip, instance->my_new_memb_entries,
1922  0, 0,
1923  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1924 
1925  /*
1926  * The recovery sort queue now becomes the regular
1927  * sort queue. It is necessary to copy the state
1928  * into the regular sort queue.
1929  */
1930  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1931  instance->my_last_aru = SEQNO_START_MSG;
1932 
1933  /* When making my_proc_list smaller, ensure that the
1934  * now non-used entries are zero-ed out. There are some suspect
1935  * assert's that assume that there is always 2 entries in the list.
1936  * These fail when my_proc_list is reduced to 1 entry (and the
1937  * valid [0] entry is the same as the 'unused' [1] entry).
1938  */
1939  memset(instance->my_proc_list, 0,
1940  sizeof (struct srp_addr) * instance->my_proc_list_entries);
1941 
1942  instance->my_proc_list_entries = instance->my_new_memb_entries;
/*
 * my_memb_entries was set to my_new_memb_entries above, so exactly
 * my_proc_list_entries entries are copied here.
 */
1943  memcpy (instance->my_proc_list, instance->my_new_memb_list,
1944  sizeof (struct srp_addr) * instance->my_memb_entries);
1945 
1946  instance->my_failed_list_entries = 0;
1947  /*
1948  * TODO Not exactly to spec
1949  *
1950  * At the entry to this function all messages without a gap are
1951  * delivered.
1952  *
1953  * This code throws away messages from the last gap in the sort queue
1954  * to my_high_seq_received
1955  *
1956  * What should really happen is we should deliver all messages up to
1957  * a gap, then deliver the transitional configuration, then deliver
1958  * the messages between the first gap and my_high_seq_received, then
1959  * deliver a regular configuration, then deliver the regular
1960  * configuration
1961  *
1962  * Unfortunately totempg doesn't appear to like this operating mode
1963  * which needs more inspection
1964  */
/*
 * Walk down from my_high_seq_received to the highest seq that is present in
 * regular_sort_queue (sq_item_get() == 0), stopping at 0.
 */
1965  i = instance->my_high_seq_received + 1;
1966  do {
1967  void *ptr;
1968 
1969  i -= 1;
1970  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
1971  if (i == 0) {
1972  break;
1973  }
1974  } while (res);
1975 
1976  instance->my_high_delivered = i;
1977 
/*
 * Free the mcast buffer of every queued item 0..my_high_delivered, then
 * release those sort queue slots.
 */
1978  for (i = 0; i <= instance->my_high_delivered; i++) {
1979  void *ptr;
1980 
1981  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
1982  if (res == 0) {
1983  struct sort_queue_item *regular_message;
1984 
1985  regular_message = ptr;
1986  free (regular_message->mcast);
1987  }
1988  }
1989  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
1990  instance->last_released = instance->my_high_delivered;
1991 
/*
 * Build the " joined: ..." / " left: ..." suffixes for the membership log
 * line.
 *
 * NOTE(review): snprintf() returns the length it would have written. If a
 * node list ever exceeds the 1024-byte buffers, sptr passes the buffer size:
 *  - `sizeof(...)-sptr` then wraps to a huge size_t;
 *  - `buf+sptr` then points past the end, so the next call writes out of
 *    bounds.
 * Consider stopping once sptr >= sizeof(buf). Separately, %d is used for
 * unsigned node ids (%u would match).
 */
1992  if (joined_list_entries) {
1993  int sptr = 0;
1994  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
1995  for (i=0; i< joined_list_entries; i++) {
1996  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %d", joined_list_totemip[i]);
1997  }
1998  }
1999  else {
2000  joined_node_msg[0] = '\0';
2001  }
2002 
2003  if (instance->my_left_memb_entries) {
2004  int sptr = 0;
2005  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2006  for (i=0; i< instance->my_left_memb_entries; i++) {
2007  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %d", left_list[i]);
2008  }
2009  }
2010  else {
2011  left_node_msg[0] = '\0';
2012  }
2013 
2015  "entering OPERATIONAL state.");
2017  "A new membership (%s:%lld) was formed. Members%s%s",
2018  totemip_print (&instance->my_ring_id.rep),
2019  instance->my_ring_id.seq,
2020  joined_node_msg,
2021  left_node_msg);
2022  instance->memb_state = MEMB_STATE_OPERATIONAL;
2023 
2024  instance->stats.operational_entered++;
2025  instance->stats.continuous_gather = 0;
2026 
2027  instance->my_received_flg = 1;
2028 
2029  reset_pause_timeout (instance);
2030 
2031  /*
2032  * Save ring id information from this configuration to determine
2033  * which processors are transitioning from old regular configuration
2034  * in to new regular configuration on the next configuration change
2035  */
2036  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2037  sizeof (struct memb_ring_id));
2038 
2039  return;
2040 }
2041 
/*
 * Enter (or re-enter) the GATHER state. This:
 *  - makes sure my_id is in my_proc_list;
 *  - sends a join message;
 *  - restarts the join and consensus timers;
 *  - cancels the token, token-retransmit and merge-detect timers;
 *  - restarts consensus tracking with only my_id marked;
 *  - updates the gather statistics.
 * gather_from is the reason for entering GATHER; it is logged via
 * gsfrom_to_msg().
 */
2042 static void memb_state_gather_enter (
2043  struct totemsrp_instance *instance,
2044  enum gather_state_from gather_from)
2045 {
/*
 * Set here and cleared again on entry to RECOVERY.
 * NOTE(review): presumably makes ORF token processing drop tokens
 * meanwhile; confirm.
 */
2046  instance->orf_token_discard = 1;
2047 
2048  memb_set_merge (
2049  &instance->my_id, 1,
2050  instance->my_proc_list, &instance->my_proc_list_entries);
2051 
2052  memb_join_message_send (instance);
2053 
2054  /*
2055  * Restart the join timeout
2056  */
2057  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2058 
2059  qb_loop_timer_add (instance->totemsrp_poll_handle,
2060  QB_LOOP_MED,
2061  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2062  (void *)instance,
2063  memb_timer_function_state_gather,
2065 
2066  /*
2067  * Restart the consensus timeout
2068  */
2069  qb_loop_timer_del (instance->totemsrp_poll_handle,
2071 
2072  qb_loop_timer_add (instance->totemsrp_poll_handle,
2073  QB_LOOP_MED,
2074  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2075  (void *)instance,
2076  memb_timer_function_gather_consensus_timeout,
2078 
2079  /*
2080  * Cancel the token loss, token retransmission and merge detect timeouts
2081  */
2082  cancel_token_retransmit_timeout (instance); // REVIEWED
2083  cancel_token_timeout (instance); // REVIEWED
2084  cancel_merge_detect_timeout (instance);
2085 
2086  memb_consensus_reset (instance);
2087 
2088  memb_consensus_set (instance, &instance->my_id);
2089 
2091  "entering GATHER state from %d(%s).",
2092  gather_from, gsfrom_to_msg(gather_from));
2093 
2094  instance->memb_state = MEMB_STATE_GATHER;
2095  instance->stats.gather_entered++;
2096 
2097  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2098  /*
2099  * This reason is used when the token timer fires while already in GATHER, so we are continuously gathering.
2100  */
2101  instance->stats.continuous_gather++;
2102  }
2103 
2104  return;
2105 }
2106 
static void timer_function_token_retransmit_timeout (void *data);

/*
 * Callback run once the commit token target has been set. It sends the
 * commit token. `context` is the owning struct totemsrp_instance.
 */
static void target_set_completed (
	void *context)
{
	struct totemsrp_instance *instance = context;

	memb_state_commit_token_send (instance);
}
2117 
/*
 * Enter the COMMIT state. This:
 *  - saves the old ring state (only the first save since the last reset
 *    takes effect);
 *  - updates the commit token and sets its target;
 *  - stops the gather join and consensus timers;
 *  - adopts the commit token's ring id and passes it to
 *    instance->memb_ring_id_store;
 *  - restarts the token retransmit and token-loss timers;
 *  - zeroes the flow-control counters.
 * The commit token itself is sent later, from the callback that runs once
 * the token target has been set.
 */
2118 static void memb_state_commit_enter (
2119  struct totemsrp_instance *instance)
2120 {
2121  old_ring_state_save (instance);
2122 
2123  memb_state_commit_token_update (instance);
2124 
2125  memb_state_commit_token_target_set (instance);
2126 
2127  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2128 
2130 
2131  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2132 
2134 
2135  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2136  instance->memb_ring_id_store (&instance->my_ring_id, &instance->my_id.addr[0]);
2137 
2138  instance->token_ring_id_seq = instance->my_ring_id.seq;
2139 
2141  "entering COMMIT state.");
2142 
2143  instance->memb_state = MEMB_STATE_COMMIT;
2144  reset_token_retransmit_timeout (instance); // REVIEWED
2145  reset_token_timeout (instance); // REVIEWED
2146 
2147  instance->stats.commit_entered++;
2148  instance->stats.continuous_gather = 0;
2149 
2150  /*
2151  * reset all flow control variables since we are starting a new ring
2152  */
2153  instance->my_trc = 0;
2154  instance->my_pbl = 0;
2155  instance->my_cbl = 0;
2156  /*
2157  * commit token sent after callback that token target has been set
2158  */
2159 }
2160 
2161 static void memb_state_recovery_enter (
2162  struct totemsrp_instance *instance,
2164 {
2165  int i;
2166  int local_received_flg = 1;
2167  unsigned int low_ring_aru;
2168  unsigned int range = 0;
2169  unsigned int messages_originated = 0;
2170  const struct srp_addr *addr;
2171  struct memb_commit_token_memb_entry *memb_list;
2172  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2173 
2174  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2175  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2176 
2178  "entering RECOVERY state.");
2179 
2180  instance->orf_token_discard = 0;
2181 
2182  instance->my_high_ring_delivered = 0;
2183 
2184  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2185  cs_queue_reinit (&instance->retrans_message_queue);
2186 
2187  low_ring_aru = instance->old_ring_state_high_seq_received;
2188 
2189  memb_state_commit_token_send_recovery (instance, commit_token);
2190 
2191  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2192 
2193  /*
2194  * Build regular configuration
2195  */
2197  instance->totemrrp_context,
2198  commit_token->addr_entries);
2199 
2200  /*
2201  * Build transitional configuration
2202  */
2203  for (i = 0; i < instance->my_new_memb_entries; i++) {
2204  memcpy (&my_new_memb_ring_id_list[i],
2205  &memb_list[i].ring_id,
2206  sizeof (struct memb_ring_id));
2207  }
2208  memb_set_and_with_ring_id (
2209  instance->my_new_memb_list,
2210  my_new_memb_ring_id_list,
2211  instance->my_new_memb_entries,
2212  instance->my_memb_list,
2213  instance->my_memb_entries,
2214  &instance->my_old_ring_id,
2215  instance->my_trans_memb_list,
2216  &instance->my_trans_memb_entries);
2217 
2218  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2220  "TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
2221  }
2222  for (i = 0; i < instance->my_new_memb_entries; i++) {
2224  "position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
2226  "previous ring seq %llx rep %s",
2227  memb_list[i].ring_id.seq,
2228  totemip_print (&memb_list[i].ring_id.rep));
2229 
2231  "aru %x high delivered %x received flag %d",
2232  memb_list[i].aru,
2233  memb_list[i].high_delivered,
2234  memb_list[i].received_flg);
2235 
2236  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2237  }
2238  /*
2239  * Determine if any received flag is false
2240  */
2241  for (i = 0; i < commit_token->addr_entries; i++) {
2242  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2243  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2244 
2245  memb_list[i].received_flg == 0) {
2246  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2247  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2248  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2249  local_received_flg = 0;
2250  break;
2251  }
2252  }
2253  if (local_received_flg == 1) {
2254  goto no_originate;
2255  } /* Else originate messages if we should */
2256 
2257  /*
2258  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2259  */
2260  for (i = 0; i < commit_token->addr_entries; i++) {
2261  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2262  instance->my_deliver_memb_list,
2263  instance->my_deliver_memb_entries) &&
2264 
2265  memcmp (&instance->my_old_ring_id,
2266  &memb_list[i].ring_id,
2267  sizeof (struct memb_ring_id)) == 0) {
2268 
2269  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2270 
2271  low_ring_aru = memb_list[i].aru;
2272  }
2273  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2274  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2275  }
2276  }
2277  }
2278 
2279  /*
2280  * Copy all old ring messages to instance->retrans_message_queue
2281  */
2282  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2283  if (range == 0) {
2284  /*
2285  * No messages to copy
2286  */
2287  goto no_originate;
2288  }
2289  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2290 
2292  "copying all old ring messages from %x-%x.",
2293  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2294 
2295  for (i = 1; i <= range; i++) {
2297  struct message_item message_item;
2298  void *ptr;
2299  int res;
2300 
2301  res = sq_item_get (&instance->regular_sort_queue,
2302  low_ring_aru + i, &ptr);
2303  if (res != 0) {
2304  continue;
2305  }
2306  sort_queue_item = ptr;
2307  messages_originated++;
2308  memset (&message_item, 0, sizeof (struct message_item));
2309  // TODO LEAK
2310  message_item.mcast = totemsrp_buffer_alloc (instance);
2311  assert (message_item.mcast);
2313  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2315  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2316  assert (message_item.mcast->header.nodeid);
2318  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2319  sizeof (struct memb_ring_id));
2320  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2321  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2322  sort_queue_item->mcast,
2323  sort_queue_item->msg_len);
2324  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2325  }
2327  "Originated %d messages in RECOVERY.", messages_originated);
2328  goto originated;
2329 
2330 no_originate:
2332  "Did not need to originate any messages in recovery.");
2333 
2334 originated:
2335  instance->my_aru = SEQNO_START_MSG;
2336  instance->my_aru_count = 0;
2337  instance->my_seq_unchanged = 0;
2339  instance->my_install_seq = SEQNO_START_MSG;
2340  instance->last_released = SEQNO_START_MSG;
2341 
2342  reset_token_timeout (instance); // REVIEWED
2343  reset_token_retransmit_timeout (instance); // REVIEWED
2344 
2345  instance->memb_state = MEMB_STATE_RECOVERY;
2346  instance->stats.recovery_entered++;
2347  instance->stats.continuous_gather = 0;
2348 
2349  return;
2350 }
2351 
2352 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2353 {
2354  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2355 
2356  token_hold_cancel_send (instance);
2357 
2358  return;
2359 }
2360 
/*
 * NOTE(review): the declarator line that precedes this parameter list
 * (original listing line 2361) is missing from this chunk, so the
 * function's name and return type are not visible here; the body
 * returns 0 or -1.
 *
 * Builds an mcast message from the caller's iovec and appends it to a
 * pending queue.  orf_token_mcast() later drains the same queues when the
 * token is held.  Returns 0 when the message was queued, -1 when the
 * queue is full or no buffer could be allocated.
 *
 * guarantee is copied verbatim into the mcast header.
 */
2362  void *srp_context,
2363  struct iovec *iovec,
2364  unsigned int iov_len,
2365  int guarantee)
2366 {
2367  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2368  int i;
2369  struct message_item message_item;
2370  char *addr;
2371  unsigned int addr_idx;
2372  struct cs_queue *queue_use;
2373 
 /* While waiting_trans_ack is set, new messages go to a separate queue. */
2374  if (instance->waiting_trans_ack) {
2375  queue_use = &instance->new_message_queue_trans;
2376  } else {
2377  queue_use = &instance->new_message_queue;
2378  }
2379 
2380  if (cs_queue_is_full (queue_use)) {
2381  log_printf (instance->totemsrp_log_level_debug, "queue full");
2382  return (-1);
2383  }
2384 
2385  memset (&message_item, 0, sizeof (struct message_item));
2386 
2387  /*
2388  * Allocate pending item
2389  */
2390  message_item.mcast = totemsrp_buffer_alloc (instance);
2391  if (message_item.mcast == 0) {
2392  goto error_mcast;
2393  }
2394 
2395  /*
2396  * Set mcast header
2397  */
2398  memset(message_item.mcast, 0, sizeof (struct mcast));
2399  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2400  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
 /*
  * NOTE(review): original listing line 2401 is absent from this chunk;
  * confirm against the full source which header field it sets.
  */
2402  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2403  assert (message_item.mcast->header.nodeid);
2404 
2405  message_item.mcast->guarantee = guarantee;
2406  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2407 
 /*
  * The payload is the concatenation of all iovec entries, placed directly
  * after the mcast header.
  * NOTE(review): nothing here checks that sizeof (struct mcast) plus the
  * total iov length fits in the buffer returned by totemsrp_buffer_alloc();
  * presumably callers enforce a maximum message size - confirm.  Also note
  * that `i` is int while iov_len is unsigned.
  */
2408  addr = (char *)message_item.mcast;
2409  addr_idx = sizeof (struct mcast);
2410  for (i = 0; i < iov_len; i++) {
2411  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2412  addr_idx += iovec[i].iov_len;
2413  }
2414 
 /* msg_len covers the header plus the payload. */
2415  message_item.msg_len = addr_idx;
2416 
2417  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2418  instance->stats.mcast_tx++;
2419  cs_queue_item_add (queue_use, &message_item);
2420 
2421  return (0);
2422 
2423 error_mcast:
2424  return (-1);
2425 }
2426 
2427 /*
2428  * Determine if there is room to queue a new message
2429  */
2430 int totemsrp_avail (void *srp_context)
2431 {
2432  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2433  int avail;
2434  struct cs_queue *queue_use;
2435 
2436  if (instance->waiting_trans_ack) {
2437  queue_use = &instance->new_message_queue_trans;
2438  } else {
2439  queue_use = &instance->new_message_queue;
2440  }
2441  cs_queue_avail (queue_use, &avail);
2442 
2443  return (avail);
2444 }
2445 
2446 /*
2447  * ORF Token Management
2448  */
2449 /*
2450  * Recast message to mcast group if it is available
 *
 * Looks up sequence number seq in the active sort queue (the recovery
 * sort queue in MEMB_STATE_RECOVERY, otherwise the regular sort queue)
 * and, if a message is stored there, hands it back to the transport.
 *
 * Returns 0 when the message was found and sent, -1 when seq is outside
 * the queue's range or the slot is empty.
2451  */
2452 static int orf_token_remcast (
2453  struct totemsrp_instance *instance,
2454  int seq)
2455 {
2456  struct sort_queue_item *sort_queue_item;
2457  int res;
2458  void *ptr;
2459 
2460  struct sq *sort_queue;
2461 
2462  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2463  sort_queue = &instance->recovery_sort_queue;
2464  } else {
2465  sort_queue = &instance->regular_sort_queue;
2466  }
2467 
2468  res = sq_in_range (sort_queue, seq);
2469  if (res == 0) {
2470  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2471  return (-1);
2472  }
2473 
2474  /*
2475  * Get RTR item at seq, if not available, return
2476  */
2477  res = sq_item_get (sort_queue, seq, &ptr);
2478  if (res != 0) {
2479  return -1;
2480  }
2481 
2482  sort_queue_item = ptr;
2483 
 /*
  * NOTE(review): the transport call that takes the arguments below
  * (original listing line 2484) is missing from this chunk.
  */
2485  instance->totemrrp_context,
2486  sort_queue_item->mcast,
2487  sort_queue_item->msg_len);
2488 
2489  return (0);
2490 }
2491 
2492 
2493 /*
2494  * Free all freeable messages from ring
 *
 * token_aru: the aru value carried by the token.
 *
 * Messages are released up to release_to, which is the lowest (under
 * sq_lt_compare) of token_aru, instance->my_last_aru and
 * instance->my_high_delivered.  For every sequence number after
 * instance->last_released up to release_to, the stored message buffer
 * (if any) goes back through totemsrp_buffer_release() and the
 * regular sort-queue slot is released.  last_released then equals
 * release_to.
2495  */
2496 static void messages_free (
2497  struct totemsrp_instance *instance,
2498  unsigned int token_aru)
2499 {
2500  struct sort_queue_item *regular_message;
2501  unsigned int i;
2502  int res;
2503  int log_release = 0;
2504  unsigned int release_to;
2505  unsigned int range = 0;
2506 
2507  release_to = token_aru;
2508  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2509  release_to = instance->my_last_aru;
2510  }
2511  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2512  release_to = instance->my_high_delivered;
2513  }
2514 
2515  /*
2516  * Ensure we dont try release before an already released point
2517  */
2518  if (sq_lt_compare (release_to, instance->last_released)) {
2519  return;
2520  }
2521 
 /* Number of slots to release; must fit in the retransmit queue window. */
2522  range = release_to - instance->last_released;
2523  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2524 
2525  /*
2526  * Release retransmit list items if group aru indicates they are transmitted
2527  */
2528  for (i = 1; i <= range; i++) {
2529  void *ptr;
2530 
2531  res = sq_item_get (&instance->regular_sort_queue,
2532  instance->last_released + i, &ptr);
 /* Holes have no buffer to free, but their slot is still released. */
2533  if (res == 0) {
2534  regular_message = ptr;
2535  totemsrp_buffer_release (instance, regular_message->mcast);
2536  }
2537  sq_items_release (&instance->regular_sort_queue,
2538  instance->last_released + i);
2539 
2540  log_release = 1;
2541  }
2542  instance->last_released += range;
2543 
2544  if (log_release) {
 /*
  * NOTE(review): the logging call that takes this format string
  * (original listing line 2545) is missing from this chunk.
  */
2546  "releasing messages up to and including %x", release_to);
2547  }
2548 }
2549 
2550 static void update_aru (
2551  struct totemsrp_instance *instance)
2552 {
2553  unsigned int i;
2554  int res;
2555  struct sq *sort_queue;
2556  unsigned int range;
2557  unsigned int my_aru_saved = 0;
2558 
2559  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2560  sort_queue = &instance->recovery_sort_queue;
2561  } else {
2562  sort_queue = &instance->regular_sort_queue;
2563  }
2564 
2565  range = instance->my_high_seq_received - instance->my_aru;
2566 
2567  my_aru_saved = instance->my_aru;
2568  for (i = 1; i <= range; i++) {
2569 
2570  void *ptr;
2571 
2572  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2573  /*
2574  * If hole, stop updating aru
2575  */
2576  if (res != 0) {
2577  break;
2578  }
2579  }
2580  instance->my_aru += i - 1;
2581 }
2582 
2583 /*
2584  * Multicasts pending messages onto the ring (requires orf_token possession)
 *
 * token:              the ORF token; token->seq is advanced once per message.
 * fcc_mcasts_allowed: upper bound on messages sent in this call.
 *
 * In MEMB_STATE_RECOVERY the source is retrans_message_queue and messages
 * are stored in the recovery sort queue (the token retransmit timer is also
 * reset).  Otherwise the source is new_message_queue_trans while
 * waiting_trans_ack is set, else new_message_queue, and messages are stored
 * in the regular sort queue.  After the loop, update_aru() is run.
2585  */
2586 static int orf_token_mcast (
2587  struct totemsrp_instance *instance,
2588  struct orf_token *token,
2589  int fcc_mcasts_allowed)
2590 {
2591  struct message_item *message_item = 0;
2592  struct cs_queue *mcast_queue;
2593  struct sq *sort_queue;
2594  struct sort_queue_item sort_queue_item;
2595  struct mcast *mcast;
2596  unsigned int fcc_mcast_current;
2597 
2598  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2599  mcast_queue = &instance->retrans_message_queue;
2600  sort_queue = &instance->recovery_sort_queue;
2601  reset_token_retransmit_timeout (instance); // REVIEWED
2602  } else {
2603  if (instance->waiting_trans_ack) {
2604  mcast_queue = &instance->new_message_queue_trans;
2605  } else {
2606  mcast_queue = &instance->new_message_queue;
2607  }
2608 
2609  sort_queue = &instance->regular_sort_queue;
2610  }
2611 
2612  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2613  if (cs_queue_is_empty (mcast_queue)) {
2614  break;
2615  }
2616  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2617 
 /* Stamp the ring-wide sequence number taken from the token and a local running counter. */
2618  message_item->mcast->seq = ++token->seq;
2619  message_item->mcast->this_seqno = instance->global_seqno++;
2620 
2621  /*
2622  * Build IO vector
2623  */
2624  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2625  sort_queue_item.mcast = message_item->mcast;
2626  sort_queue_item.msg_len = message_item->msg_len;
2627 
2628  mcast = sort_queue_item.mcast;
2629 
2630  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2631 
2632  /*
2633  * Add message to retransmit queue
2634  */
2635  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2636 
 /*
  * NOTE(review): the transport call that takes the arguments below
  * (original listing line 2637) is missing from this chunk.
  */
2638  instance->totemrrp_context,
2639  message_item->mcast,
2640  message_item->msg_len);
2641 
2642  /*
2643  * Delete item from pending queue
2644  */
2645  cs_queue_item_remove (mcast_queue);
2646 
2647  /*
2648  * If messages mcasted, deliver any new messages to totempg
 * NOTE(review): the statement below only records token->seq as
 * my_high_seq_received; no delivery happens here.
2649  */
2650  instance->my_high_seq_received = token->seq;
2651  }
2652 
2653  update_aru (instance);
2654 
2655  /*
2656  * Returns the number of messages multicast in this call (fcc_mcast_current)
2657  */
2658  return (fcc_mcast_current);
2659 }
2660 
2661 /*
2662  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2663  * Modifies orf_token's rtr to include retransmits required by this process
 *
 * fcc_allowed (in/out): on entry, the maximum number of retransmissions;
 *                       on exit, it is reduced by the number actually sent.
 *
 * Phase 1: each rtr_list entry for the current ring id that
 * orf_token_remcast() can serve is sent and removed from the list.
 * Entries for other ring ids, and entries that cannot be served, are kept.
 * Phase 2: sequence numbers after my_aru up to orf_token->seq that are
 * missing locally are appended to the list, while room remains below
 * RETRANSMIT_ENTRIES_MAX.  A number is appended only once its miss count
 * reaches miss_count_const and it is not already listed.
 *
 * Returns the number of messages retransmitted (instance->fcc_remcast_current).
2664  */
2665 static int orf_token_rtr (
2666  struct totemsrp_instance *instance,
2667  struct orf_token *orf_token,
2668  unsigned int *fcc_allowed)
2669 {
2670  unsigned int res;
2671  unsigned int i, j;
2672  unsigned int found;
2673  struct sq *sort_queue;
2674  struct rtr_item *rtr_list;
2675  unsigned int range = 0;
2676  char retransmit_msg[1024];
2677  char value[64];
2678 
2679  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2680  sort_queue = &instance->recovery_sort_queue;
2681  } else {
2682  sort_queue = &instance->regular_sort_queue;
2683  }
2684 
2685  rtr_list = &orf_token->rtr_list[0];
2686 
 /*
  * Build a log line listing every requested sequence number.
  * NOTE(review): the logging calls that take the format strings below
  * (original listing lines 2689 and 2696) are missing from this chunk.
  * Each "%x " adds at most 9 characters; this fits in 1024 bytes only if
  * rtr_list_entries stays small (presumably <= RETRANSMIT_ENTRIES_MAX) - confirm.
  * The trailing strcat of "" is a no-op.
  */
2687  strcpy (retransmit_msg, "Retransmit List: ");
2688  if (orf_token->rtr_list_entries) {
2690  "Retransmit List %d", orf_token->rtr_list_entries);
2691  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2692  sprintf (value, "%x ", rtr_list[i].seq);
2693  strcat (retransmit_msg, value);
2694  }
2695  strcat (retransmit_msg, "");
2697  "%s", retransmit_msg);
2698  }
2699 
2700  /*
2701  * Retransmit messages on orf_token's RTR list from RTR queue
 * i only advances when the entry is kept; a served entry is removed
 * by shifting the remaining entries down over it.
2702  */
2703  for (instance->fcc_remcast_current = 0, i = 0;
2704  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2705 
2706  /*
2707  * If this retransmit request isn't from this configuration,
2708  * try next rtr entry
2709  */
2710  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2711  sizeof (struct memb_ring_id)) != 0) {
2712 
2713  i += 1;
2714  continue;
2715  }
2716 
2717  res = orf_token_remcast (instance, rtr_list[i].seq);
2718  if (res == 0) {
2719  /*
2720  * Multicasted message, so no need to copy to new retransmit list
2721  */
2722  orf_token->rtr_list_entries -= 1;
2723  assert (orf_token->rtr_list_entries >= 0);
2724  memmove (&rtr_list[i], &rtr_list[i + 1],
2725  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2726 
2727  instance->stats.mcast_retx++;
2728  instance->fcc_remcast_current++;
2729  } else {
2730  i += 1;
2731  }
2732  }
2733  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2734 
2735  /*
2736  * Add messages to retransmit to RTR list
2737  * but only retry if there is room in the retransmit list
2738  */
2739 
2740  range = orf_token->seq - instance->my_aru;
2741  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2742 
2743  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2744  (i <= range); i++) {
2745 
2746  /*
2747  * Ensure message is within the sort queue range
2748  */
2749  res = sq_in_range (sort_queue, instance->my_aru + i);
2750  if (res == 0) {
2751  break;
2752  }
2753 
2754  /*
2755  * Find if a message is missing from this processor
2756  */
2757  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2758  if (res == 0) {
2759  /*
2760  * Determine how many times we have missed receiving
2761  * this sequence number. sq_item_miss_count increments
2762  * a counter for the sequence number. The miss count
2763  * will be returned and compared. This allows time for
2764  * delayed multicast messages to be received before
2765  * declaring the message is missing and requesting a
2766  * retransmit.
2767  */
2768  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2769  if (res < instance->totem_config->miss_count_const) {
2770  continue;
2771  }
2772 
2773  /*
2774  * Determine if missing message is already in retransmit list
 * (the scan does not stop at the first match; the result is the same)
2775  */
2776  found = 0;
2777  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2778  if (instance->my_aru + i == rtr_list[j].seq) {
2779  found = 1;
2780  }
2781  }
2782  if (found == 0) {
2783  /*
2784  * Missing message not found in current retransmit list so add it
2785  */
2786  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2787  &instance->my_ring_id, sizeof (struct memb_ring_id));
2788  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2789  orf_token->rtr_list_entries++;
2790  }
2791  }
2792  }
2793  return (instance->fcc_remcast_current);
2794 }
2795 
/*
 * Resend the last saved token copy: orf_token_retransmit_size bytes of
 * instance->orf_token_retransmit.  In this chunk that buffer is filled by
 * token_send() and the commit-token send routines.
 * NOTE(review): the transport call that takes the arguments below
 * (original listing line 2798) is missing from this chunk.
 */
2796 static void token_retransmit (struct totemsrp_instance *instance)
2797 {
2799  instance->orf_token_retransmit,
2800  instance->orf_token_retransmit_size);
2801 }
2802 
2803 /*
2804  * Retransmit the regular token if no mcast or token has
2805  * been received in retransmit token period retransmit
2806  * the token to the next processor
 *
 * Timer callback; data is the totemsrp_instance.  It does nothing in
 * MEMB_STATE_GATHER.  In the states that reach token_retransmit(), it
 * resends the saved token and re-arms this timer.
2807  */
2808 static void timer_function_token_retransmit_timeout (void *data)
2809 {
2810  struct totemsrp_instance *instance = data;
2811 
2812  switch (instance->memb_state) {
2813  case MEMB_STATE_GATHER:
2814  break;
2815  case MEMB_STATE_COMMIT:
 /*
  * NOTE(review): original listing line 2816 is missing from this chunk.
  * As shown, MEMB_STATE_COMMIT falls through to the resend below.
  */
2817  case MEMB_STATE_RECOVERY:
2818  token_retransmit (instance);
2819  reset_token_retransmit_timeout (instance); // REVIEWED
2820  break;
2821  }
2822 }
2823 
/*
 * Token-hold retransmit timer callback; data is the totemsrp_instance.
 * It resends the saved token via token_retransmit() and, unlike the token
 * retransmit timeout, does not re-arm any timer.  GATHER and COMMIT do nothing.
 */
2824 static void timer_function_token_hold_retransmit_timeout (void *data)
2825 {
2826  struct totemsrp_instance *instance = data;
2827 
2828  switch (instance->memb_state) {
2829  case MEMB_STATE_GATHER:
2830  break;
2831  case MEMB_STATE_COMMIT:
2832  break;
 /*
  * NOTE(review): original listing line 2833 (presumably another case
  * label sharing the resend below) is missing from this chunk - confirm.
  */
2834  case MEMB_STATE_RECOVERY:
2835  token_retransmit (instance);
2836  break;
2837  }
2838 }
2839 
/*
 * Merge-detect timer callback; data is the totemsrp_instance.
 * When this processor's first interface address equals the current ring's
 * representative (my_ring_id.rep), it sends a merge-detect message via
 * memb_merge_detect_transmit().  GATHER, COMMIT and RECOVERY do nothing.
 * NOTE(review): original listing lines 2844 (a statement before the
 * switch) and 2847 (the case label guarding the rep check) are missing
 * from this chunk - confirm against the full source.
 */
2840 static void timer_function_merge_detect_timeout(void *data)
2841 {
2842  struct totemsrp_instance *instance = data;
2843 
2845 
2846  switch (instance->memb_state) {
2848  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2849  memb_merge_detect_transmit (instance);
2850  }
2851  break;
2852  case MEMB_STATE_GATHER:
2853  case MEMB_STATE_COMMIT:
2854  case MEMB_STATE_RECOVERY:
2855  break;
2856  }
2857 }
2858 
2859 /*
2860  * Send orf_token to next member (requires orf_token)
 *
 * The token is stamped with our nodeid, and its header plus
 * rtr_list_entries retransmit items are saved into
 * instance->orf_token_retransmit so token_retransmit() can resend them.
 * When forward_token is 0, the copy is saved but nothing is sent, and 0
 * is returned.  Otherwise the result of the send is returned.
 * NOTE(review): the saved copy grows with rtr_list_entries and no size
 * check is visible here; presumably orf_token_retransmit is sized for the
 * maximum list - confirm.
2861  */
2862 static int token_send (
2863  struct totemsrp_instance *instance,
2864  struct orf_token *orf_token,
2865  int forward_token)
2866 {
2867  int res = 0;
2868  unsigned int orf_token_size;
2869 
2870  orf_token_size = sizeof (struct orf_token) +
2871  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2872 
2873  orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
2874  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2875  instance->orf_token_retransmit_size = orf_token_size;
2876  assert (orf_token->header.nodeid);
2877 
2878  if (forward_token == 0) {
2879  return (0);
2880  }
2881 
 /*
  * NOTE(review): the line that starts this send call (original listing
  * line 2882, which also assigns res) is missing from this chunk.
  */
2883  orf_token,
2884  orf_token_size);
2885 
2886  return (res);
2887 }
2888 
/*
 * If this processor is currently holding the token (my_token_held), clear
 * that flag and send a token-hold-cancel message stamped with the current
 * ring id.  Always returns 0.
 * NOTE(review): the declaration of token_hold_cancel (line 2891), its
 * header setup (lines 2904-2907) and the head of the send call
 * (line 2914) are missing from this chunk.
 */
2889 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2890 {
2892 
2893  /*
2894  * Only cancel if the token is currently held
2895  */
2896  if (instance->my_token_held == 0) {
2897  return (0);
2898  }
2899  instance->my_token_held = 0;
2900 
2901  /*
2902  * Build message
2903  */
2908  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
2909  sizeof (struct memb_ring_id));
2910  assert (token_hold_cancel.header.nodeid);
2911 
2912  instance->stats.token_hold_cancel_tx++;
2913 
2915  sizeof (struct token_hold_cancel));
2916 
2917  return (0);
2918 }
2919 
2920 static int orf_token_send_initial (struct totemsrp_instance *instance)
2921 {
2922  struct orf_token orf_token;
2923  int res;
2924 
2925  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
2926  orf_token.header.endian_detector = ENDIAN_LOCAL;
2927  orf_token.header.encapsulated = 0;
2928  orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
2929  assert (orf_token.header.nodeid);
2930  orf_token.seq = SEQNO_START_MSG;
2931  orf_token.token_seq = SEQNO_START_TOKEN;
2932  orf_token.retrans_flg = 1;
2933  instance->my_set_retrans_flg = 1;
2934  instance->stats.orf_token_tx++;
2935 
2936  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
2937  orf_token.retrans_flg = 0;
2938  instance->my_set_retrans_flg = 0;
2939  } else {
2940  orf_token.retrans_flg = 1;
2941  instance->my_set_retrans_flg = 1;
2942  }
2943 
2944  orf_token.aru = 0;
2945  orf_token.aru = SEQNO_START_MSG - 1;
2946  orf_token.aru_addr = instance->my_id.addr[0].nodeid;
2947 
2948  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2949  orf_token.fcc = 0;
2950  orf_token.backlog = 0;
2951 
2952  orf_token.rtr_list_entries = 0;
2953 
2954  res = token_send (instance, &orf_token, 1);
2955 
2956  return (res);
2957 }
2958 
2959 static void memb_state_commit_token_update (
2960  struct totemsrp_instance *instance)
2961 {
2962  struct srp_addr *addr;
2963  struct memb_commit_token_memb_entry *memb_list;
2964  unsigned int high_aru;
2965  unsigned int i;
2966 
2967  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
2968  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
2969 
2970  memcpy (instance->my_new_memb_list, addr,
2971  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
2972 
2973  instance->my_new_memb_entries = instance->commit_token->addr_entries;
2974 
2975  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
2976  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
2977 
2978  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
2979  /*
2980  * TODO high delivered is really instance->my_aru, but with safe this
2981  * could change?
2982  */
2983  instance->my_received_flg =
2984  (instance->my_aru == instance->my_high_seq_received);
2985 
2986  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
2987 
2988  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
2989  /*
2990  * find high aru up to current memb_index for all matching ring ids
2991  * if any ring id matching memb_index has aru less then high aru set
2992  * received flag for that entry to false
2993  */
2994  high_aru = memb_list[instance->commit_token->memb_index].aru;
2995  for (i = 0; i <= instance->commit_token->memb_index; i++) {
2996  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
2997  &memb_list[i].ring_id,
2998  sizeof (struct memb_ring_id)) == 0) {
2999 
3000  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3001  high_aru = memb_list[i].aru;
3002  }
3003  }
3004  }
3005 
3006  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3007  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3008  &memb_list[i].ring_id,
3009  sizeof (struct memb_ring_id)) == 0) {
3010 
3011  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3012  memb_list[i].received_flg = 0;
3013  if (i == instance->commit_token->memb_index) {
3014  instance->my_received_flg = 0;
3015  }
3016  }
3017  }
3018  }
3019 
3020  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3021  instance->commit_token->memb_index += 1;
3022  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3023  assert (instance->commit_token->header.nodeid);
3024 }
3025 
/*
 * For each configured interface i, pass the interface-i address of the
 * member at position memb_index (wrapped modulo addr_entries) in the
 * commit token's address list, together with i, to the transport.
 * NOTE(review): the called function's name (original listing line 3035)
 * is missing from this chunk.  addr_entries == 0 would divide by zero;
 * presumably it is never zero here - confirm.
 */
3026 static void memb_state_commit_token_target_set (
3027  struct totemsrp_instance *instance)
3028 {
3029  struct srp_addr *addr;
3030  unsigned int i;
3031 
3032  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3033 
3034  for (i = 0; i < instance->totem_config->interface_count; i++) {
3036  instance->totemrrp_context,
3037  &addr[instance->commit_token->memb_index %
3038  instance->commit_token->addr_entries].addr[i],
3039  i);
3040  }
3041 }
3042 
/*
 * Send the caller-supplied commit_token, which is not necessarily
 * instance->commit_token.
 *
 * It bumps token_seq, stamps our nodeid, saves a copy in
 * orf_token_retransmit for token_retransmit(), counts the send, transmits
 * the token and re-arms the token retransmit timer.  Always returns 0.
 * The token size is the header plus addr_entries (srp_addr + memb entry) pairs.
 * NOTE(review): the transport call head (original listing line 3062) is
 * missing from this chunk.  Apart from the token source, this body matches
 * memb_state_commit_token_send().
 */
3043 static int memb_state_commit_token_send_recovery (
3044  struct totemsrp_instance *instance,
3045  struct memb_commit_token *commit_token)
3046 {
3047  unsigned int commit_token_size;
3048 
3049  commit_token->token_seq++;
3050  commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3051  commit_token_size = sizeof (struct memb_commit_token) +
3052  ((sizeof (struct srp_addr) +
3053  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3054  /*
3055  * Make a copy for retransmission if necessary
3056  */
3057  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3058  instance->orf_token_retransmit_size = commit_token_size;
3059 
3060  instance->stats.memb_commit_token_tx++;
3061 
3063  commit_token,
3064  commit_token_size);
3065 
3066  /*
3067  * Request retransmission of the commit token in case it is lost
3068  */
3069  reset_token_retransmit_timeout (instance);
3070  return (0);
3071 }
3072 
/*
 * Send instance->commit_token.
 *
 * It bumps token_seq, stamps our nodeid, saves a copy in
 * orf_token_retransmit for token_retransmit(), counts the send, transmits
 * the token and re-arms the token retransmit timer.  Always returns 0.
 * NOTE(review): the transport call head (original listing line 3091) is
 * missing from this chunk.  Apart from the token source, this body
 * matches memb_state_commit_token_send_recovery().
 */
3073 static int memb_state_commit_token_send (
3074  struct totemsrp_instance *instance)
3075 {
3076  unsigned int commit_token_size;
3077 
3078  instance->commit_token->token_seq++;
3079  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3080  commit_token_size = sizeof (struct memb_commit_token) +
3081  ((sizeof (struct srp_addr) +
3082  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3083  /*
3084  * Make a copy for retransmission if necessary
3085  */
3086  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3087  instance->orf_token_retransmit_size = commit_token_size;
3088 
3089  instance->stats.memb_commit_token_tx++;
3090 
3092  instance->commit_token,
3093  commit_token_size);
3094 
3095  /*
3096  * Request retransmission of the commit token in case it is lost
3097  */
3098  reset_token_retransmit_timeout (instance);
3099  return (0);
3100 }
3101 
3102 
3103 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3104 {
3105  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3106  int token_memb_entries = 0;
3107  int i;
3108  struct totem_ip_address *lowest_addr;
3109 
3110  memb_set_subtract (token_memb, &token_memb_entries,
3111  instance->my_proc_list, instance->my_proc_list_entries,
3112  instance->my_failed_list, instance->my_failed_list_entries);
3113 
3114  /*
3115  * find representative by searching for smallest identifier
3116  */
3117 
3118  lowest_addr = &token_memb[0].addr[0];
3119  for (i = 1; i < token_memb_entries; i++) {
3120  if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3121  totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3122  }
3123  }
3124  return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3125 }
3126 
3127 static int srp_addr_compare (const void *a, const void *b)
3128 {
3129  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3130  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3131 
3132  return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3133 }
3134 
/*
 * Create a fresh commit token in instance->commit_token.  Per the log
 * message, this runs when this processor is the representative.
 *
 * Members are my_proc_list minus my_failed_list, sorted by
 * srp_addr_compare.  The ring id has us as rep and seq
 * token_ring_id_seq + 4.  memb_index starts at 0.  The member addresses
 * follow the fixed header, and the per-member entries after them are zeroed.
 * NOTE(review): only sizeof (struct memb_commit_token) bytes are cleared by
 * the memset; the trailing arrays are written explicitly below.  Original
 * listing lines 3143 (log call head) and 3151-3152 (further header setup)
 * are missing from this chunk.
 */
3135 static void memb_state_commit_token_create (
3136  struct totemsrp_instance *instance)
3137 {
3138  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3139  struct srp_addr *addr;
3140  struct memb_commit_token_memb_entry *memb_list;
3141  int token_memb_entries = 0;
3142 
3144  "Creating commit token because I am the rep.");
3145 
3146  memb_set_subtract (token_memb, &token_memb_entries,
3147  instance->my_proc_list, instance->my_proc_list_entries,
3148  instance->my_failed_list, instance->my_failed_list_entries);
3149 
3150  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3153  instance->commit_token->header.encapsulated = 0;
3154  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3155  assert (instance->commit_token->header.nodeid);
3156 
3157  totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3158 
3159  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3160 
3161  /*
3162  * This qsort is necessary to ensure the commit token traverses
3163  * the ring in the proper order
3164  */
3165  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3166  srp_addr_compare);
3167 
3168  instance->commit_token->memb_index = 0;
3169  instance->commit_token->addr_entries = token_memb_entries;
3170 
 /* The addresses follow the header, and the per-member entries follow the addresses. */
3171  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3172  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3173 
3174  memcpy (addr, token_memb,
3175  token_memb_entries * sizeof (struct srp_addr));
3176  memset (memb_list, 0,
3177  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3178 }
3179 
/*
 * Build and send a membership join message.
 *
 * The message carries our nodeid, current ring seq and system_from,
 * followed by my_proc_list and then my_failed_list (each as srp_addr arrays).
 * If send_join_timeout is nonzero, the caller first sleeps for a random
 * time below send_join_timeout * 1000 microseconds.
 * NOTE(review): the message is assembled in a fixed 40000-byte stack
 * buffer and no check is visible that the header plus both lists fit;
 * presumably the lists are bounded by PROCESSOR_COUNT_MAX - confirm.  The
 * char buffer is reinterpreted as struct memb_join, which is only safe if
 * that struct has no stricter alignment requirement - confirm.  The send
 * call head (original listing line 3226) is missing from this chunk.
 */
3180 static void memb_join_message_send (struct totemsrp_instance *instance)
3181 {
3182  char memb_join_data[40000];
3183  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3184  char *addr;
3185  unsigned int addr_idx;
3186 
3187  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3188  memb_join->header.endian_detector = ENDIAN_LOCAL;
3189  memb_join->header.encapsulated = 0;
3190  memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
3191  assert (memb_join->header.nodeid);
3192 
3193  memb_join->ring_seq = instance->my_ring_id.seq;
3194  memb_join->proc_list_entries = instance->my_proc_list_entries;
3195  memb_join->failed_list_entries = instance->my_failed_list_entries;
3196  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3197 
3198  /*
3199  * This mess adds the joined and failed processor lists into the join
3200  * message
3201  */
3202  addr = (char *)memb_join;
3203  addr_idx = sizeof (struct memb_join);
3204  memcpy (&addr[addr_idx],
3205  instance->my_proc_list,
3206  instance->my_proc_list_entries *
3207  sizeof (struct srp_addr));
3208  addr_idx +=
3209  instance->my_proc_list_entries *
3210  sizeof (struct srp_addr);
3211  memcpy (&addr[addr_idx],
3212  instance->my_failed_list,
3213  instance->my_failed_list_entries *
3214  sizeof (struct srp_addr));
3215  addr_idx +=
3216  instance->my_failed_list_entries *
3217  sizeof (struct srp_addr);
3218 
3219 
 /* Randomized delay; usleep() blocks the calling thread. */
3220  if (instance->totem_config->send_join_timeout) {
3221  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3222  }
3223 
3224  instance->stats.memb_join_tx++;
3225 
3227  instance->totemrrp_context,
3228  memb_join,
3229  addr_idx);
3230 }
3231 
/*
 * Announce that this processor is leaving, using a join message.
 *
 * Side effect: instance->my_id is merged into instance->my_failed_list.
 * The message lists my_proc_list without ourselves as the proc list and the
 * updated failed list.  Both header.nodeid and system_from.addr[0].nodeid
 * are set to LEAVE_DUMMY_NODEID.  The same optional random delay as for a
 * normal join is applied.
 * NOTE(review): active_memb_entries is not initialized before
 * memb_set_subtract(), whereas memb_lowest_in_config() initializes its
 * counterpart to 0; presumably memb_set_subtract() always sets it -
 * confirm.  The same unchecked 40000-byte buffer as memb_join_message_send
 * is used.  Original listing lines 3241 and 3296 (call heads) are missing
 * from this chunk.
 */
3232 static void memb_leave_message_send (struct totemsrp_instance *instance)
3233 {
3234  char memb_join_data[40000];
3235  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3236  char *addr;
3237  unsigned int addr_idx;
3238  int active_memb_entries;
3239  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3240 
3242  "sending join/leave message");
3243 
3244  /*
3245  * add us to the failed list, and remove us from
3246  * the members list
3247  */
3248  memb_set_merge(
3249  &instance->my_id, 1,
3250  instance->my_failed_list, &instance->my_failed_list_entries);
3251 
3252  memb_set_subtract (active_memb, &active_memb_entries,
3253  instance->my_proc_list, instance->my_proc_list_entries,
3254  &instance->my_id, 1);
3255 
3256 
3257  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3258  memb_join->header.endian_detector = ENDIAN_LOCAL;
3259  memb_join->header.encapsulated = 0;
3260  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3261 
3262  memb_join->ring_seq = instance->my_ring_id.seq;
3263  memb_join->proc_list_entries = active_memb_entries;
3264  memb_join->failed_list_entries = instance->my_failed_list_entries;
3265  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3266  memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
3267 
3268  // TODO: CC Maybe use the actual join send routine.
3269  /*
3270  * This mess adds the joined and failed processor lists into the join
3271  * message
3272  */
3273  addr = (char *)memb_join;
3274  addr_idx = sizeof (struct memb_join);
3275  memcpy (&addr[addr_idx],
3276  active_memb,
3277  active_memb_entries *
3278  sizeof (struct srp_addr));
3279  addr_idx +=
3280  active_memb_entries *
3281  sizeof (struct srp_addr);
3282  memcpy (&addr[addr_idx],
3283  instance->my_failed_list,
3284  instance->my_failed_list_entries *
3285  sizeof (struct srp_addr));
3286  addr_idx +=
3287  instance->my_failed_list_entries *
3288  sizeof (struct srp_addr);
3289 
3290 
3291  if (instance->totem_config->send_join_timeout) {
3292  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3293  }
3294  instance->stats.memb_join_tx++;
3295 
3297  instance->totemrrp_context,
3298  memb_join,
3299  addr_idx);
3300 }
3301 
/*
 * Send a merge-detect message carrying our system_from address and the
 * current ring id, and count it in stats.memb_merge_detect_tx.
 * NOTE(review): the message variable's declaration and header setup
 * (original listing lines 3304-3309) and the send call head
 * (lines 3316-3317) are missing from this chunk.
 */
3302 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3303 {
3305 
3310  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3311  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3312  sizeof (struct memb_ring_id));
3313  assert (memb_merge_detect.header.nodeid);
3314 
3315  instance->stats.memb_merge_detect_tx++;
3318  sizeof (struct memb_merge_detect));
3319 }
3320 
3321 static void memb_ring_id_set (
3322  struct totemsrp_instance *instance,
3323  const struct memb_ring_id *ring_id)
3324 {
3325 
3326  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3327 }
3328 
/*
 * NOTE(review): the declarator line preceding this parameter list
 * (original listing line 3329) is missing from this chunk; the body
 * returns 0 or -1.
 *
 * Registers a token callback.  Any held token is released first via
 * token_hold_cancel_send().  A heap-allocated token_callback_instance
 * records callback_fn, data, type and the delete flag, and is returned
 * through *handle_out.  The instance is then linked onto either
 * token_callback_received_listhead or token_callback_sent_listhead
 * depending on type.  Returns -1 if malloc fails, otherwise 0.
 * NOTE(review): the case labels (lines 3353 and 3356) are missing here,
 * so the type-to-list mapping is not visible.  As shown, the switch has no
 * default, so a type matching neither label would leave the handle
 * allocated but unlinked - confirm callers never pass such a type.
 */
3330  void *srp_context,
3331  void **handle_out,
3332  enum totem_callback_token_type type,
3333  int delete,
3334  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3335  const void *data)
3336 {
3337  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3338  struct token_callback_instance *callback_handle;
3339 
3340  token_hold_cancel_send (instance);
3341 
3342  callback_handle = malloc (sizeof (struct token_callback_instance));
3343  if (callback_handle == 0) {
3344  return (-1);
3345  }
3346  *handle_out = (void *)callback_handle;
3347  list_init (&callback_handle->list);
3348  callback_handle->callback_fn = callback_fn;
3349  callback_handle->data = (void *) data;
3350  callback_handle->callback_type = type;
3351  callback_handle->delete = delete;
3352  switch (type) {
3354  list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3355  break;
3357  list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3358  break;
3359  }
3360 
3361  return (0);
3362 }
3363 
3364 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3365 {
3366  struct token_callback_instance *h;
3367 
3368  if (*handle_out) {
3369  h = (struct token_callback_instance *)*handle_out;
3370  list_del (&h->list);
3371  free (h);
3372  h = NULL;
3373  *handle_out = 0;
3374  }
3375 }
3376 
/*
 * Run every callback registered for the given token event type.
 *
 * Entries registered with delete == 1 are one-shot:
 * - they are unlinked before their callback runs;
 * - they are freed afterwards;
 * - if the callback returns -1 instead, the entry is put back on the
 *   list and retried on the next token.
 *
 * Entries with delete == 0 stay registered and are never freed here.
 * Any type other than the two handled cases triggers assert (0).
 *
 * NOTE(review): the entry is unlinked only when delete == 1, but it is
 * freed whenever delete is non-zero. A delete value other than 0/1
 * would free an entry that is still linked. Confirm callers only pass
 * 0 or 1.
 */
3377 static void token_callbacks_execute (
3378  struct totemsrp_instance *instance,
3379  enum totem_callback_token_type type)
3380 {
3381  struct list_head *list;
3382  struct list_head *list_next;
3383  struct list_head *callback_listhead = 0;
 /* NOTE(review): the declaration of token_callback_instance and the two
  * case labels below are missing from this listing */
3385  int res;
3386  int del;
3387 
3388  switch (type) {
3390  callback_listhead = &instance->token_callback_received_listhead;
3391  break;
3393  callback_listhead = &instance->token_callback_sent_listhead;
3394  break;
3395  default:
3396  assert (0);
3397  }
3398 
3399  for (list = callback_listhead->next; list != callback_listhead;
3400  list = list_next) {
3401 
3402  token_callback_instance = list_entry (list, struct token_callback_instance, list);
3403 
 /* Save the successor first: the current entry may be unlinked,
  * re-linked or freed below */
3404  list_next = list->next;
3405  del = token_callback_instance->delete;
3406  if (del == 1) {
3407  list_del (list);
3408  }
3409 
3410  res = token_callback_instance->callback_fn (
3411  token_callback_instance->callback_type,
3412  token_callback_instance->data);
3413  /*
3414  * This callback failed to execute, try it again on the next token
3415  */
3416  if (res == -1 && del == 1) {
3417  list_add (list, callback_listhead);
3418  } else if (del) {
3419  free (token_callback_instance);
3420  }
3421  }
3422 }
3423 
3424 /*
3425  * Flow control functions
3426  */
3427 static unsigned int backlog_get (struct totemsrp_instance *instance)
3428 {
3429  unsigned int backlog = 0;
3430  struct cs_queue *queue_use = NULL;
3431 
3432  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3433  if (instance->waiting_trans_ack) {
3434  queue_use = &instance->new_message_queue_trans;
3435  } else {
3436  queue_use = &instance->new_message_queue;
3437  }
3438  } else
3439  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3440  queue_use = &instance->retrans_message_queue;
3441  }
3442 
3443  if (queue_use != NULL) {
3444  backlog = cs_queue_used (queue_use);
3445  }
3446 
3447  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3448  return (backlog);
3449 }
3450 
3451 static int fcc_calculate (
3452  struct totemsrp_instance *instance,
3453  struct orf_token *token)
3454 {
3455  unsigned int transmits_allowed;
3456  unsigned int backlog_calc;
3457 
3458  transmits_allowed = instance->totem_config->max_messages;
3459 
3460  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3461  transmits_allowed = instance->totem_config->window_size - token->fcc;
3462  }
3463 
3464  instance->my_cbl = backlog_get (instance);
3465 
3466  /*
3467  * Only do backlog calculation if there is a backlog otherwise
3468  * we would result in div by zero
3469  */
3470  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3471  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3472  (token->backlog + instance->my_cbl - instance->my_pbl);
3473  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3474  transmits_allowed = backlog_calc;
3475  }
3476  }
3477 
3478  return (transmits_allowed);
3479 }
3480 
3481 /*
3482  * don't overflow the RTR sort queue
3483  */
3484 static void fcc_rtr_limit (
3485  struct totemsrp_instance *instance,
3486  struct orf_token *token,
3487  unsigned int *transmits_allowed)
3488 {
3489  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3490  check -= (*transmits_allowed + instance->totem_config->window_size);
3491  assert (check >= 0);
3492  if (sq_lt_compare (instance->last_released +
3493  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3494  instance->totem_config->window_size,
3495 
3496  token->seq)) {
3497 
3498  *transmits_allowed = 0;
3499  }
3500 }
3501 
3502 static void fcc_token_update (
3503  struct totemsrp_instance *instance,
3504  struct orf_token *token,
3505  unsigned int msgs_transmitted)
3506 {
3507  token->fcc += msgs_transmitted - instance->my_trc;
3508  token->backlog += instance->my_cbl - instance->my_pbl;
3509  instance->my_trc = msgs_transmitted;
3510  instance->my_pbl = instance->my_cbl;
3511 }
3512 
3513 /*
3514  * Message Handlers
3515  */
3516 
/*
 * Timestamp of the previous token event, as returned by
 * qb_util_nano_current_get(). The GIVEINFO code divides differences by
 * 1000000.0 to print milliseconds.
 * Within this listing it is only used by the GIVEINFO diagnostics in
 * message_handler_orf_token().
 * NOTE(review): has external linkage; could probably be static - confirm
 * nothing outside this file references it.
 */
3517 unsigned long long int tv_old;
3518 /*
3519  * message handler called when TOKEN message type received
 *
 * Works on a private copy of the token, endian-converted if needed.
 * - COMMIT discards the token.
 * - OPERATIONAL first frees messages up to token->aru, then shares the
 *   GATHER/RECOVERY path.
 *
 * The shared path:
 * - drops tokens from another ring and retransmitted tokens;
 * - applies flow control, then retransmits and multicasts messages;
 * - updates the token's aru/aru_addr;
 * - detects "failed to receive";
 * - drives the recovery retrans-flag/install logic;
 * - hands the token to token_send() and restarts the token timers.
 *
 * All visible return paths return 0.
3520  */
3521 static int message_handler_orf_token (
3522  struct totemsrp_instance *instance,
3523  const void *msg,
3524  size_t msg_len,
3525  int endian_conversion_needed)
3526 {
 /* NOTE(review): assumes sizeof (struct orf_token) plus
  * RETRANSMIT_ENTRIES_MAX rtr items fits in 1500 bytes - confirm */
3527  char token_storage[1500];
3528  char token_convert[1500];
3529  struct orf_token *token = NULL;
3530  int forward_token;
3531  unsigned int transmits_allowed;
3532  unsigned int mcasted_retransmit;
3533  unsigned int mcasted_regular;
3534  unsigned int last_aru;
3535 
3536 #ifdef GIVEINFO
3537  unsigned long long tv_current;
3538  unsigned long long tv_diff;
3539 
3540  tv_current = qb_util_nano_current_get ();
3541  tv_diff = tv_current - tv_old;
3542  tv_old = tv_current;
3543 
3545  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3546 #endif
3547 
3548  if (instance->orf_token_discard) {
3549  return (0);
3550  }
3551 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3552  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3553  return (0);
3554  }
3555 #endif
3556 
3557  if (endian_conversion_needed) {
3558  orf_token_endian_convert ((struct orf_token *)msg,
3559  (struct orf_token *)token_convert);
3560  msg = (struct orf_token *)token_convert;
3561  }
3562 
3563  /*
3564  * Make copy of token and retransmit list in case we have
3565  * to flush incoming messages from the kernel queue
3566  */
 /* NOTE(review): always copies RETRANSMIT_ENTRIES_MAX rtr items,
  * regardless of msg_len or token->rtr_list_entries */
3567  token = (struct orf_token *)token_storage;
3568  memcpy (token, msg, sizeof (struct orf_token));
3569  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3570  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3571 
3572 
3573  /*
3574  * Handle merge detection timeout
3575  */
 /* my_seq_unchanged counts consecutive tokens whose seq did not move */
3576  if (token->seq == instance->my_last_seq) {
3577  start_merge_detect_timeout (instance);
3578  instance->my_seq_unchanged += 1;
3579  } else {
3580  cancel_merge_detect_timeout (instance);
3581  cancel_token_hold_retransmit_timeout (instance);
3582  instance->my_seq_unchanged = 0;
3583  }
3584 
3585  instance->my_last_seq = token->seq;
3586 
3587 #ifdef TEST_RECOVERY_MSG_COUNT
3588  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3589  return (0);
3590  }
3591 #endif
3592 
 /* NOTE(review): one source line (3593) is missing from this listing here */
3594 
3595  /*
3596  * Determine if we should hold (in reality drop) the token
3597  */
 /* The ring representative holds after more than seqno_unchanged_const
  * unchanged tokens; other members mark it held at exactly that count */
3598  instance->my_token_held = 0;
3599  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3600  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3601  instance->my_token_held = 1;
3602  } else
3603  if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
3604  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3605  instance->my_token_held = 1;
3606  }
3607 
3608  /*
3609  * Hold onto token when there is no activity on ring and
3610  * this processor is the ring rep
3611  */
3612  forward_token = 1;
3613  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
3614  if (instance->my_token_held) {
3615  forward_token = 0;
3616  }
3617  }
3618 
3619  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3620 
3621  switch (instance->memb_state) {
3622  case MEMB_STATE_COMMIT:
3623  /* Discard token */
3624  break;
3625 
 /* NOTE(review): the case label on the missing line (3626) is presumably
  * MEMB_STATE_OPERATIONAL */
3627  messages_free (instance, token->aru);
3628  /*
3629  * Do NOT add break, this case should also execute code in gather case.
3630  */
3631 
3632  case MEMB_STATE_GATHER:
3633  /*
3634  * DO NOT add break, we use different free mechanism in recovery state
3635  */
3636 
3637  case MEMB_STATE_RECOVERY:
3638  /*
3639  * Discard tokens from another configuration
3640  */
 /* Before dropping, update the heartbeat timer the same way the end of
  * this function does */
3641  if (memcmp (&token->ring_id, &instance->my_ring_id,
3642  sizeof (struct memb_ring_id)) != 0) {
3643 
3644  if ((forward_token)
3645  && instance->use_heartbeat) {
3646  reset_heartbeat_timeout(instance);
3647  }
3648  else {
3649  cancel_heartbeat_timeout(instance);
3650  }
3651 
3652  return (0); /* discard token */
3653  }
3654 
3655  /*
3656  * Discard retransmitted tokens
3657  */
 /* NOTE(review): this early return skips the heartbeat timer update at
  * the end of the function */
3658  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3659  return (0); /* discard token */
3660  }
3661  last_aru = instance->my_last_aru;
3662  instance->my_last_aru = token->aru;
3663 
 /* Flow control: compute the budget, retransmit first, then send new messages */
3664  transmits_allowed = fcc_calculate (instance, token);
3665  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3666 
 /* A held token is released when there is retransmit work */
3667  if (instance->my_token_held == 1 &&
3668  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3669  instance->my_token_held = 0;
3670  forward_token = 1;
3671  }
3672 
3673  fcc_rtr_limit (instance, token, &transmits_allowed);
3674  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3675 /*
3676 if (mcasted_regular) {
3677 printf ("mcasted regular %d\n", mcasted_regular);
3678 printf ("token seq %d\n", token->seq);
3679 }
3680 */
3681  fcc_token_update (instance, token, mcasted_retransmit +
3682  mcasted_regular);
3683 
 /*
  * Lower the token's aru to ours if ours is behind. Also take the aru
  * over if this node set aru_addr last, or if nobody has set it
  * (aru_addr == 0). aru_addr is cleared once the aru has caught up
  * with seq.
  */
3684  if (sq_lt_compare (instance->my_aru, token->aru) ||
3685  instance->my_id.addr[0].nodeid == token->aru_addr ||
3686  token->aru_addr == 0) {
3687 
3688  token->aru = instance->my_aru;
3689  if (token->aru == token->seq) {
3690  token->aru_addr = 0;
3691  } else {
3692  token->aru_addr = instance->my_id.addr[0].nodeid;
3693  }
3694  }
 /* Count consecutive rotations in which the aru is stuck with an owner */
3695  if (token->aru == last_aru && token->aru_addr != 0) {
3696  instance->my_aru_count += 1;
3697  } else {
3698  instance->my_aru_count = 0;
3699  }
3700 
3701  /*
3702  * We really don't follow specification there. In specification, OTHER nodes
3703  * detect failure of one node (based on aru_count) and my_id IS NEVER added
3704  * to failed list (so node never mark itself as failed)
3705  */
3706  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3707  token->aru_addr == instance->my_id.addr[0].nodeid) {
3708 
3710  "FAILED TO RECEIVE");
3711 
3712  instance->failed_to_recv = 1;
3713 
3714  memb_set_merge (&instance->my_id, 1,
3715  instance->my_failed_list,
3716  &instance->my_failed_list_entries);
3717 
3718  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
3719  } else {
 /* Remember this token's sequence and advance it before sending */
3720  instance->my_token_seq = token->token_seq;
3721  token->token_seq += 1;
3722 
3723  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3724  /*
3725  * instance->my_aru == instance->my_high_seq_received means this processor
3726  * has recovered all messages it can recover
3727  * (ie: its retrans queue is empty)
 *
 * The code below tests the retrans queue directly. While this node
 * still has queued messages it raises retrans_flg. The node that raised
 * the flag clears it once its own queue has drained.
3728  */
3729  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
3730 
3731  if (token->retrans_flg == 0) {
3732  token->retrans_flg = 1;
3733  instance->my_set_retrans_flg = 1;
3734  }
3735  } else
3736  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
3737  token->retrans_flg = 0;
3738  instance->my_set_retrans_flg = 0;
3739  }
3741  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
3742  token->retrans_flg, instance->my_set_retrans_flg,
3743  cs_queue_is_empty (&instance->retrans_message_queue),
3744  instance->my_retrans_flg_count, token->aru);
 /*
  * Recovery completion:
  * - my_retrans_flg_count counts consecutive tokens with retrans_flg
  *   clear; at a count of 2, install_seq is taken from token->seq.
  * - Once this node's aru reaches install_seq, the transitional
  *   membership becomes the delivery membership.
  * - After two further rotations with count >= 3 and the token aru at
  *   or past install_seq, OPERATIONAL is entered.
  */
3745  if (token->retrans_flg == 0) {
3746  instance->my_retrans_flg_count += 1;
3747  } else {
3748  instance->my_retrans_flg_count = 0;
3749  }
3750  if (instance->my_retrans_flg_count == 2) {
3751  instance->my_install_seq = token->seq;
3752  }
3754  "install seq %x aru %x high seq received %x",
3755  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
3756  if (instance->my_retrans_flg_count >= 2 &&
3757  instance->my_received_flg == 0 &&
3758  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
3759  instance->my_received_flg = 1;
3760  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
3761  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
3762  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
3763  }
3764  if (instance->my_retrans_flg_count >= 3 &&
3765  sq_lte_compare (instance->my_install_seq, token->aru)) {
3766  instance->my_rotation_counter += 1;
3767  } else {
3768  instance->my_rotation_counter = 0;
3769  }
3770  if (instance->my_rotation_counter == 2) {
3772  "retrans flag count %x token aru %x install seq %x aru %x %x",
3773  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
3774  instance->my_aru, token->seq);
3775 
3776  memb_state_operational_enter (instance);
3777  instance->my_rotation_counter = 0;
3778  instance->my_retrans_flg_count = 0;
3779  }
3780  }
3781 
 /* NOTE(review): one source line (3782) before token_send is missing from this listing */
3783  token_send (instance, token, forward_token);
3784 
3785 #ifdef GIVEINFO
3786  tv_current = qb_util_nano_current_get ();
3787  tv_diff = tv_current - tv_old;
3788  tv_old = tv_current;
3790  "I held %0.4f ms",
3791  ((float)tv_diff) / 1000000.0);
3792 #endif
3793  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3794  messages_deliver_to_app (instance, 0,
3795  instance->my_high_seq_received);
3796  }
3797 
3798  /*
3799  * Deliver messages after token has been transmitted
3800  * to improve performance
 * (this refers to the messages_deliver_to_app() call above; the
 * token timers are restarted below)
3801  */
3802  reset_token_timeout (instance); // REVIEWED
3803  reset_token_retransmit_timeout (instance); // REVIEWED
 /* Only the ring representative arms the token-hold retransmit timer */
3804  if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
3805  instance->my_token_held == 1) {
3806 
3807  start_token_hold_retransmit_timeout (instance);
3808  }
3809 
3810  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
3811  }
3812  break;
3813  }
3814 
 /* Heartbeat timer: restarted while the token is forwarded and
  * heartbeats are enabled, cancelled otherwise */
3815  if ((forward_token)
3816  && instance->use_heartbeat) {
3817  reset_heartbeat_timeout(instance);
3818  }
3819  else {
3820  cancel_heartbeat_timeout(instance);
3821  }
3822 
3823  return (0);
3824 }
3825 
/*
 * Hand messages from regular_sort_queue to totemsrp_deliver_fn in
 * sequence order, from my_high_delivered + 1 up to end_point.
 *
 * skip == 0: stop at the first missing sequence number (a hole).
 * skip != 0: step over holes. Also skip messages whose sender is not in
 *            my_deliver_memb_list. my_high_delivered still advances
 *            past both.
 *
 * Delivery also stops at the first sequence number outside the sort
 * queue's range. The span end_point - my_high_delivered must be below
 * QUEUE_RTR_ITEMS_SIZE_MAX (asserted).
 *
 * NOTE(review): the two log_printf head lines (3844, 3913) are missing
 * from this listing.
 */
3826 static void messages_deliver_to_app (
3827  struct totemsrp_instance *instance,
3828  int skip,
3829  unsigned int end_point)
3830 {
3831  struct sort_queue_item *sort_queue_item_p;
3832  unsigned int i;
3833  int res;
3834  struct mcast *mcast_in;
3835  struct mcast mcast_header;
3836  unsigned int range = 0;
3837  int endian_conversion_required;
3838  unsigned int my_high_delivered_stored = 0;
3839 
3840 
 /* Unsigned subtraction, so sequence-number wrap-around is handled */
3841  range = end_point - instance->my_high_delivered;
3842 
3843  if (range) {
3845  "Delivering %x to %x", instance->my_high_delivered,
3846  end_point);
3847  }
3848  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
 /* Loop indices are relative to the value on entry, because
  * my_high_delivered is advanced inside the loop */
3849  my_high_delivered_stored = instance->my_high_delivered;
3850 
3851  /*
3852  * Deliver messages in order from rtr queue to pending delivery queue
3853  */
3854  for (i = 1; i <= range; i++) {
3855 
3856  void *ptr = 0;
3857 
3858  /*
3859  * If out of range of sort queue, stop assembly
3860  */
3861  res = sq_in_range (&instance->regular_sort_queue,
3862  my_high_delivered_stored + i);
3863  if (res == 0) {
3864  break;
3865  }
3866 
3867  res = sq_item_get (&instance->regular_sort_queue,
3868  my_high_delivered_stored + i, &ptr);
3869  /*
3870  * If hole, stop assembly
3871  */
3872  if (res != 0 && skip == 0) {
3873  break;
3874  }
3875 
3876  instance->my_high_delivered = my_high_delivered_stored + i;
3877 
 /* Only reached with skip != 0: step over the hole */
3878  if (res != 0) {
3879  continue;
3880 
3881  }
3882 
3883  sort_queue_item_p = ptr;
3884 
3885  mcast_in = sort_queue_item_p->mcast;
3886  assert (mcast_in != (struct mcast *)0xdeadbeef);
3887 
 /* Messages are stored as received. Convert the header into a local
  * copy if the sender's byte order differs, and pass that fact on to
  * the deliver callback. */
3888  endian_conversion_required = 0;
3889  if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
3890  endian_conversion_required = 1;
3891  mcast_endian_convert (mcast_in, &mcast_header);
3892  } else {
3893  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
3894  }
3895 
3896  /*
3897  * Skip messages not originated in instance->my_deliver_memb
3898  */
3899  if (skip &&
3900  memb_set_subset (&mcast_header.system_from,
3901  1,
3902  instance->my_deliver_memb_list,
3903  instance->my_deliver_memb_entries) == 0) {
3904 
3905  instance->my_high_delivered = my_high_delivered_stored + i;
3906 
3907  continue;
3908  }
3909 
3910  /*
3911  * Message found
3912  */
3914  "Delivering MCAST message with seq %x to pending delivery queue",
3915  mcast_header.seq);
3916 
3917  /*
3918  * Message is locally originated multicast
3919  */
 /* The payload follows the struct mcast header */
3920  instance->totemsrp_deliver_fn (
3921  mcast_header.header.nodeid,
3922  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
3923  sort_queue_item_p->msg_len - sizeof (struct mcast),
3924  endian_conversion_required);
3925  }
3926 }
3927 
3928 /*
3929  * recv message handler called when MCAST message type received
3930  */
/*
 * Receive handler for MCAST messages.
 *
 * Encapsulated messages go to recovery_sort_queue; all others go to
 * regular_sort_queue.
 *
 * A message whose ring id differs from ours is never stored:
 * - OPERATIONAL (presumably; case label missing from this listing):
 *   merge the sender into my_proc_list and enter GATHER;
 * - GATHER: the same, but only if the sender is not already in
 *   my_proc_list;
 * - COMMIT / RECOVERY: drop the message and count it in
 *   stats.rx_msg_dropped.
 *
 * A message from our own ring is copied into a new buffer and added to
 * the sort queue if its seq is in range and not already present.
 * my_high_seq_received is advanced accordingly. The aru is then updated,
 * and in OPERATIONAL state deliverable messages go to the application.
 *
 * Returns -1 only when the buffer allocation fails, otherwise 0.
 */
3931 static int message_handler_mcast (
3932  struct totemsrp_instance *instance,
3933  const void *msg,
3934  size_t msg_len,
3935  int endian_conversion_needed)
3936 {
3937  struct sort_queue_item sort_queue_item;
3938  struct sq *sort_queue;
3939  struct mcast mcast_header;
3940 
3941 
 /* Work on a local-byte-order copy of the header; the stored message
  * itself is kept as received */
3942  if (endian_conversion_needed) {
3943  mcast_endian_convert (msg, &mcast_header);
3944  } else {
3945  memcpy (&mcast_header, msg, sizeof (struct mcast));
3946  }
3947 
3948  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
3949  sort_queue = &instance->recovery_sort_queue;
3950  } else {
3951  sort_queue = &instance->regular_sort_queue;
3952  }
3953 
3954  assert (msg_len <= FRAME_SIZE_MAX);
3955 
3956 #ifdef TEST_DROP_MCAST_PERCENTAGE
3957  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
3958  return (0);
3959  }
3960 #endif
3961 
3962  /*
3963  * If the message is foreign execute the switch below
3964  */
3965  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
3966  sizeof (struct memb_ring_id)) != 0) {
3967 
3968  switch (instance->memb_state) {
3970  memb_set_merge (
3971  &mcast_header.system_from, 1,
3972  instance->my_proc_list, &instance->my_proc_list_entries);
3973  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
3974  break;
3975 
3976  case MEMB_STATE_GATHER:
3977  if (!memb_set_subset (
3978  &mcast_header.system_from,
3979  1,
3980  instance->my_proc_list,
3981  instance->my_proc_list_entries)) {
3982 
3983  memb_set_merge (&mcast_header.system_from, 1,
3984  instance->my_proc_list, &instance->my_proc_list_entries);
3985  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
3986  return (0);
3987  }
3988  break;
3989 
3990  case MEMB_STATE_COMMIT:
3991  /* discard message */
3992  instance->stats.rx_msg_dropped++;
3993  break;
3994 
3995  case MEMB_STATE_RECOVERY:
3996  /* discard message */
3997  instance->stats.rx_msg_dropped++;
3998  break;
3999  }
4000  return (0);
4001  }
4002 
4004  "Received ringid(%s:%lld) seq %x",
4005  totemip_print (&mcast_header.ring_id.rep),
4006  mcast_header.ring_id.seq,
4007  mcast_header.seq);
4008 
4009  /*
4010  * Add mcast message to rtr queue if not already in rtr queue
4011  * otherwise free io vectors
4012  */
4013  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4014  sq_in_range (sort_queue, mcast_header.seq) &&
4015  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4016 
4017  /*
4018  * Allocate new multicast memory block
4019  */
4020 // TODO LEAK
4021  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4022  if (sort_queue_item.mcast == NULL) {
4023  return (-1); /* error here is corrected by the algorithm */
4024  }
4025  memcpy (sort_queue_item.mcast, msg, msg_len);
4026  sort_queue_item.msg_len = msg_len;
4027 
4028  if (sq_lt_compare (instance->my_high_seq_received,
4029  mcast_header.seq)) {
4030  instance->my_high_seq_received = mcast_header.seq;
4031  }
4032 
4033  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4034  }
4035 
4036  update_aru (instance);
4037  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4038  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4039  }
4040 
4041 /* TODO remove from retrans message queue for old ring in recovery state */
4042  return (0);
4043 }
4044 
/*
 * Receive handler for merge-detect messages.
 *
 * A message carrying our own ring id is ignored. Otherwise:
 * - OPERATIONAL: merge the sender into my_proc_list and enter GATHER;
 * - GATHER: the same, but only if the sender is not already in
 *   my_proc_list;
 * - COMMIT and RECOVERY: ignore the message.
 *
 * Always returns 0.
 *
 * NOTE(review): the following are missing from this listing:
 * - the declaration of the local memb_merge_detect;
 * - the first case label (presumably MEMB_STATE_OPERATIONAL);
 * - the first argument line of the GATHER-state memb_set_subset call.
 * msg_len is not checked against sizeof (struct memb_merge_detect)
 * before copying.
 */
4045 static int message_handler_memb_merge_detect (
4046  struct totemsrp_instance *instance,
4047  const void *msg,
4048  size_t msg_len,
4049  int endian_conversion_needed)
4050 {
4052 
4053 
4054  if (endian_conversion_needed) {
4055  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4056  } else {
4057  memcpy (&memb_merge_detect, msg,
4058  sizeof (struct memb_merge_detect));
4059  }
4060 
4061  /*
4062  * do nothing if this is a merge detect from this configuration
4063  */
4064  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4065  sizeof (struct memb_ring_id)) == 0) {
4066 
4067  return (0);
4068  }
4069 
4070  /*
4071  * Execute merge operation
4072  */
4073  switch (instance->memb_state) {
4075  memb_set_merge (&memb_merge_detect.system_from, 1,
4076  instance->my_proc_list, &instance->my_proc_list_entries);
4077  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4078  break;
4079 
4080  case MEMB_STATE_GATHER:
4081  if (!memb_set_subset (
4083  1,
4084  instance->my_proc_list,
4085  instance->my_proc_list_entries)) {
4086 
4087  memb_set_merge (&memb_merge_detect.system_from, 1,
4088  instance->my_proc_list, &instance->my_proc_list_entries);
4089  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4090  return (0);
4091  }
4092  break;
4093 
4094  case MEMB_STATE_COMMIT:
4095  /* do nothing in commit */
4096  break;
4097 
4098  case MEMB_STATE_RECOVERY:
4099  /* do nothing in recovery */
4100  break;
4101  }
4102  return (0);
4103 }
4104 
/*
 * Apply a received join message to this node's membership view.
 *
 * The join payload after the fixed header holds proc_list_entries
 * srp_addr entries, followed by failed_list_entries entries.
 *
 * 1. Sender's proc and failed sets both equal ours:
 *    - record consensus with the sender;
 *    - if consensus is reached and this node had failed to receive,
 *      shrink to a single-member proc list with no failures and commit;
 *    - otherwise, if consensus is reached and this node is lowest in
 *      the configuration, create the commit token and enter COMMIT.
 * 2. Sender's sets are subsets of ours, or the sender is on our failed
 *    list: nothing to merge.
 * 3. Otherwise merge the sender's proc list into ours, update our failed
 *    list, and re-enter GATHER.
 *
 * In every case where step 3 did not enter GATHER, a node that is still
 * OPERATIONAL enters GATHER.
 */
4105 static void memb_join_process (
4106  struct totemsrp_instance *instance,
4107  const struct memb_join *memb_join)
4108 {
4109  struct srp_addr *proc_list;
4110  struct srp_addr *failed_list;
4111  int gather_entered = 0;
4112  int fail_minus_memb_entries = 0;
4113  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4114 
4115  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4116  failed_list = proc_list + memb_join->proc_list_entries;
4117 
4118 /*
4119  memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
4120  memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
4121  memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4122  memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4123 -*/
4124 
4125  if (memb_set_equal (proc_list,
4126  memb_join->proc_list_entries,
4127  instance->my_proc_list,
4128  instance->my_proc_list_entries) &&
4129 
4130  memb_set_equal (failed_list,
4131  memb_join->failed_list_entries,
4132  instance->my_failed_list,
4133  instance->my_failed_list_entries)) {
4134 
4135  memb_consensus_set (instance, &memb_join->system_from);
4136 
4137  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4138  instance->failed_to_recv = 0;
4139  srp_addr_copy (&instance->my_proc_list[0],
4140  &instance->my_id);
4141  instance->my_proc_list_entries = 1;
4142  instance->my_failed_list_entries = 0;
4143 
4144  memb_state_commit_token_create (instance);
4145 
4146  memb_state_commit_enter (instance);
4147  return;
4148  }
4149  if (memb_consensus_agreed (instance) &&
4150  memb_lowest_in_config (instance)) {
4151 
4152  memb_state_commit_token_create (instance);
4153 
4154  memb_state_commit_enter (instance);
4155  } else {
4156  goto out;
4157  }
4158  } else
4159  if (memb_set_subset (proc_list,
4160  memb_join->proc_list_entries,
4161  instance->my_proc_list,
4162  instance->my_proc_list_entries) &&
4163 
4164  memb_set_subset (failed_list,
4165  memb_join->failed_list_entries,
4166  instance->my_failed_list,
4167  instance->my_failed_list_entries)) {
4168 
4169  goto out;
4170  } else
4171  if (memb_set_subset (&memb_join->system_from, 1,
4172  instance->my_failed_list, instance->my_failed_list_entries)) {
4173 
4174  goto out;
4175  } else {
4176  memb_set_merge (proc_list,
4177  memb_join->proc_list_entries,
4178  instance->my_proc_list, &instance->my_proc_list_entries);
4179 
 /* The sender considers us failed: treat the sender as failed too */
4180  if (memb_set_subset (
4181  &instance->my_id, 1,
4182  failed_list, memb_join->failed_list_entries)) {
4183 
4184  memb_set_merge (
4185  &memb_join->system_from, 1,
4186  instance->my_failed_list, &instance->my_failed_list_entries);
4187  } else {
 /* Adopt the sender's failed list only if the sender is a member of
  * our current ring */
4188  if (memb_set_subset (
4189  &memb_join->system_from, 1,
4190  instance->my_memb_list,
4191  instance->my_memb_entries)) {
4192 
4193  if (memb_set_subset (
4194  &memb_join->system_from, 1,
4195  instance->my_failed_list,
4196  instance->my_failed_list_entries) == 0) {
4197 
4198  memb_set_merge (failed_list,
4199  memb_join->failed_list_entries,
4200  instance->my_failed_list, &instance->my_failed_list_entries);
4201  } else {
 /* NOTE(review): this branch appears unreachable. The
  * sender-in-my_failed_list case already jumped to "out"
  * above, and nothing in between adds the sender to
  * my_failed_list - confirm. */
4202  memb_set_subtract (fail_minus_memb,
4203  &fail_minus_memb_entries,
4204  failed_list,
4205  memb_join->failed_list_entries,
4206  instance->my_memb_list,
4207  instance->my_memb_entries);
4208 
4209  memb_set_merge (fail_minus_memb,
4210  fail_minus_memb_entries,
4211  instance->my_failed_list,
4212  &instance->my_failed_list_entries);
4213  }
4214  }
4215  }
4216  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4217  gather_entered = 1;
4218  }
4219 
4220 out:
4221  if (gather_entered == 0 &&
4222  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4223 
4224  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4225  }
4226 }
4227 
/*
 * Copy a join message from in to out, converting multi-byte fields to
 * local byte order. This includes the proc list and the failed list that
 * follow the fixed header.
 *
 * out must have room for the whole message; the visible caller
 * (message_handler_memb_join) allocates msg_len bytes for it.
 *
 * NOTE(review): source lines 4237, 4240 and 4241 are missing from this
 * listing. The list offsets below read out->proc_list_entries and
 * out->failed_list_entries, so those fields are presumably converted on
 * the missing lines - confirm.
 * The entry counts come from the wire and are not checked against the
 * message length here.
 */
4228 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4229 {
4230  int i;
4231  struct srp_addr *in_proc_list;
4232  struct srp_addr *in_failed_list;
4233  struct srp_addr *out_proc_list;
4234  struct srp_addr *out_failed_list;
4235 
4236  out->header.type = in->header.type;
4238  out->header.nodeid = swab32 (in->header.nodeid);
4239  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4242  out->ring_seq = swab64 (in->ring_seq);
4243 
 /* Offsets use the already-converted entry count from out, because the
  * count in in is still in the sender's byte order */
4244  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4245  in_failed_list = in_proc_list + out->proc_list_entries;
4246  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4247  out_failed_list = out_proc_list + out->proc_list_entries;
4248 
4249  for (i = 0; i < out->proc_list_entries; i++) {
4250  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4251  }
4252  for (i = 0; i < out->failed_list_entries; i++) {
4253  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4254  }
4255 }
4256 
/*
 * Copy a commit token from in to out, converting multi-byte fields to
 * local byte order.
 *
 * The fixed token is followed by two arrays, each with addr_entries
 * elements: first the srp_addr entries, then the
 * memb_commit_token_memb_entry records. Both arrays are located using
 * the converted out->addr_entries.
 *
 * NOTE(review): source lines 4266 and 4269 are missing from this listing.
 */
4257 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4258 {
4259  int i;
4260  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4261  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4262  struct memb_commit_token_memb_entry *in_memb_list;
4263  struct memb_commit_token_memb_entry *out_memb_list;
4264 
4265  out->header.type = in->header.type;
4267  out->header.nodeid = swab32 (in->header.nodeid);
4268  out->token_seq = swab32 (in->token_seq);
4270  out->ring_id.seq = swab64 (in->ring_id.seq);
4271  out->retrans_flg = swab32 (in->retrans_flg);
4272  out->memb_index = swab32 (in->memb_index);
4273  out->addr_entries = swab32 (in->addr_entries);
4274 
4275  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4276  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4277  for (i = 0; i < out->addr_entries; i++) {
4278  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4279 
4280  /*
4281  * Only convert the memb entry if it has been set
 * (a zero/non-zero test on family is independent of byte order, so
 * it can be made before conversion).
 * NOTE(review): unset entries are left untouched in out. The visible
 * caller passes an uninitialised alloca() buffer, so those entries
 * hold indeterminate data - confirm consumers ignore them.
4282  */
4283  if (in_memb_list[i].ring_id.rep.family != 0) {
4284  totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4285  &in_memb_list[i].ring_id.rep);
4286 
4287  out_memb_list[i].ring_id.seq =
4288  swab64 (in_memb_list[i].ring_id.seq);
4289  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4290  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4291  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4292  }
4293  }
4294 }
4295 
/*
 * Convert an ORF token header and its rtr_list entries to local byte
 * order, writing the result to out.
 *
 * Only the first rtr_list_entries retransmit entries are converted. The
 * loop bound reads out->rtr_list_entries, which is presumably converted
 * on one of the source lines missing from this listing (4301, 4306,
 * 4312, 4314).
 *
 * NOTE(review): rtr_list_entries comes from the wire and is not bounded
 * here against RETRANSMIT_ENTRIES_MAX. The visible caller's output
 * buffer is a 1500-byte stack array - confirm the count is validated
 * elsewhere.
 */
4296 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4297 {
4298  int i;
4299 
4300  out->header.type = in->header.type;
4302  out->header.nodeid = swab32 (in->header.nodeid);
4303  out->seq = swab32 (in->seq);
4304  out->token_seq = swab32 (in->token_seq);
4305  out->aru = swab32 (in->aru);
4307  out->aru_addr = swab32(in->aru_addr);
4308  out->ring_id.seq = swab64 (in->ring_id.seq);
4309  out->fcc = swab32 (in->fcc);
4310  out->backlog = swab32 (in->backlog);
4311  out->retrans_flg = swab32 (in->retrans_flg);
4313  for (i = 0; i < out->rtr_list_entries; i++) {
4315  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4316  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4317  }
4318 }
4319 
/*
 * Convert the fixed struct mcast header from in to local byte order in
 * out. The message payload that follows the header is not touched.
 * NOTE(review): source lines 4323, 4325 and 4329 are missing from this
 * listing.
 */
4320 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4321 {
4322  out->header.type = in->header.type;
4324  out->header.nodeid = swab32 (in->header.nodeid);
4326 
4327  out->seq = swab32 (in->seq);
4328  out->this_seqno = swab32 (in->this_seqno);
4330  out->ring_id.seq = swab64 (in->ring_id.seq);
4331  out->node_id = swab32 (in->node_id);
4332  out->guarantee = swab32 (in->guarantee);
4333  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4334 }
4335 
/*
 * Convert a merge-detect message from in to local byte order in out.
 * NOTE(review): source lines 4341 and 4343 are missing from this listing.
 */
4336 static void memb_merge_detect_endian_convert (
4337  const struct memb_merge_detect *in,
4338  struct memb_merge_detect *out)
4339 {
4340  out->header.type = in->header.type;
4342  out->header.nodeid = swab32 (in->header.nodeid);
4344  out->ring_id.seq = swab64 (in->ring_id.seq);
4345  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4346 }
4347 
4348 static int ignore_join_under_operational (
4349  struct totemsrp_instance *instance,
4350  const struct memb_join *memb_join)
4351 {
4352  struct srp_addr *proc_list;
4353  struct srp_addr *failed_list;
4354  unsigned long long ring_seq;
4355 
4356  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4357  failed_list = proc_list + memb_join->proc_list_entries;
4358  ring_seq = memb_join->ring_seq;
4359 
4360  if (memb_set_subset (&instance->my_id, 1,
4361  failed_list, memb_join->failed_list_entries)) {
4362  return (1);
4363  }
4364 
4365  /*
4366  * In operational state, my_proc_list is exactly the same as
4367  * my_memb_list.
4368  */
4369  if ((memb_set_subset (&memb_join->system_from, 1,
4370  instance->my_memb_list, instance->my_memb_entries)) &&
4371  (ring_seq < instance->my_ring_id.seq)) {
4372  return (1);
4373  }
4374 
4375  return (0);
4376 }
4377 
/*
 * Receive handler for join messages.
 *
 * Processing order:
 * 1. Convert the message to local byte order if needed, into an
 *    alloca() buffer of msg_len bytes.
 * 2. Return early if pause_flush() reports that queued joins should be
 *    flushed.
 * 3. Raise token_ring_id_seq to the join's ring_seq.
 * 4. Dispatch on membership state:
 *    - OPERATIONAL (case label missing from this listing, presumably):
 *      process the join unless ignore_join_under_operational() says to
 *      ignore it;
 *    - GATHER: always process;
 *    - COMMIT / RECOVERY: process only joins from a member of
 *      my_new_memb_list with ring_seq >= our ring seq, then (re-)enter
 *      GATHER. RECOVERY first calls memb_recovery_state_token_loss().
 *
 * Always returns 0.
 *
 * NOTE(review): the alloca(msg_len) buffer is reserved even when no
 * conversion is needed, and its size follows the received length -
 * confirm msg_len is bounded by the caller.
 */
4378 static int message_handler_memb_join (
4379  struct totemsrp_instance *instance,
4380  const void *msg,
4381  size_t msg_len,
4382  int endian_conversion_needed)
4383 {
4384  const struct memb_join *memb_join;
4385  struct memb_join *memb_join_convert = alloca (msg_len);
4386 
4387  if (endian_conversion_needed) {
4388  memb_join = memb_join_convert;
4389  memb_join_endian_convert (msg, memb_join_convert);
4390 
4391  } else {
4392  memb_join = msg;
4393  }
4394  /*
4395  * If the process paused because it wasn't scheduled in a timely
4396  * fashion, flush the join messages because they may be queued
4397  * entries
4398  */
4399  if (pause_flush (instance)) {
4400  return (0);
4401  }
4402 
4403  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4404  instance->token_ring_id_seq = memb_join->ring_seq;
4405  }
4406  switch (instance->memb_state) {
4408  if (!ignore_join_under_operational (instance, memb_join)) {
4409  memb_join_process (instance, memb_join);
4410  }
4411  break;
4412 
4413  case MEMB_STATE_GATHER:
4414  memb_join_process (instance, memb_join);
4415  break;
4416 
4417  case MEMB_STATE_COMMIT:
4418  if (memb_set_subset (&memb_join->system_from,
4419  1,
4420  instance->my_new_memb_list,
4421  instance->my_new_memb_entries) &&
4422 
4423  memb_join->ring_seq >= instance->my_ring_id.seq) {
4424 
4425  memb_join_process (instance, memb_join);
4426  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4427  }
4428  break;
4429 
4430  case MEMB_STATE_RECOVERY:
4431  if (memb_set_subset (&memb_join->system_from,
4432  1,
4433  instance->my_new_memb_list,
4434  instance->my_new_memb_entries) &&
4435 
4436  memb_join->ring_seq >= instance->my_ring_id.seq) {
4437 
4438  memb_join_process (instance, memb_join);
4439  memb_recovery_state_token_loss (instance);
4440  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4441  }
4442  break;
4443  }
4444  return (0);
4445 }
4446 
/*
 * Handle a received commit token.
 *
 * The message is always copied into a stack buffer of msg_len bytes:
 * byte-swapped by memb_commit_token_endian_convert() when
 * endian_conversion_needed is set, otherwise copied with memcpy().  addr then
 * points at the address array that follows the fixed part of the token
 * (end_of_commit_token) and holds addr_entries entries.  Always returns 0.
 *
 * NOTE(review): listing lines 4454, 4460, 4477 and 4513 are absent from this
 * copy (apparently lost during extraction).  memb_commit_token is used below
 * without a visible declaration, so 4454 presumably declares it.  4460 and
 * 4513 presumably open the log_printf calls whose format strings remain.
 * 4477 is presumably "case MEMB_STATE_OPERATIONAL:" for the discard branch.
 * Verify against the real totemsrp.c.
 */
4447 static int message_handler_memb_commit_token (
4448  struct totemsrp_instance *instance,
4449  const void *msg,
4450  size_t msg_len,
4451  int endian_conversion_needed)
4452 {
4453  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4455  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4456  int sub_entries;
4457 
4458  struct srp_addr *addr;
4459 
4461  "got commit token");
4462 
4463  if (endian_conversion_needed) {
4464  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4465  } else {
4466  memcpy (memb_commit_token_convert, msg, msg_len);
4467  }
4468  memb_commit_token = memb_commit_token_convert;
4469  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4470 
/* Test hook: randomly drop a percentage of commit tokens when compiled in. */
4471 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4472  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4473  return (0);
4474  }
4475 #endif
4476  switch (instance->memb_state) {
4478  /* discard token */
4479  break;
4480 
4481  case MEMB_STATE_GATHER:
/*
 * Accept the token only if its member list equals our proc list minus our
 * failed list, and it is for a newer ring than ours.  It is then saved in
 * instance->commit_token and COMMIT is entered.
 */
4482  memb_set_subtract (sub, &sub_entries,
4483  instance->my_proc_list, instance->my_proc_list_entries,
4484  instance->my_failed_list, instance->my_failed_list_entries);
4485 
4486  if (memb_set_equal (addr,
4487  memb_commit_token->addr_entries,
4488  sub,
4489  sub_entries) &&
4490 
4491  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4492  memcpy (instance->commit_token, memb_commit_token, msg_len);
4493  memb_state_commit_enter (instance);
4494  }
4495  break;
4496 
4497  case MEMB_STATE_COMMIT:
4498  /*
4499  * If retransmitted commit tokens are sent on this ring
4500  * filter them out and only enter recovery once the
4501  * commit token has traversed the array. This is
4502  * determined by :
4503  * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4504  */
4505  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4506  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4507  memb_state_recovery_enter (instance, memb_commit_token);
4508  }
4509  break;
4510 
4511  case MEMB_STATE_RECOVERY:
/*
 * Only the node whose first interface address equals the ring's rep address
 * sends the initial ORF token and restarts the token timers.
 */
4512  if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4514  "Sending initial ORF token");
4515 
4516  // TODO convert instead of initiate
4517  orf_token_send_initial (instance);
4518  reset_token_timeout (instance); // REVIEWED
4519  reset_token_retransmit_timeout (instance); // REVIEWED
4520  }
4521  break;
4522  }
4523  return (0);
4524 }
4525 
4526 static int message_handler_token_hold_cancel (
4527  struct totemsrp_instance *instance,
4528  const void *msg,
4529  size_t msg_len,
4530  int endian_conversion_needed)
4531 {
4532  const struct token_hold_cancel *token_hold_cancel = msg;
4533 
4534  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4535  sizeof (struct memb_ring_id)) == 0) {
4536 
4537  instance->my_seq_unchanged = 0;
4538  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
4539  timer_function_token_retransmit_timeout (instance);
4540  }
4541  }
4542  return (0);
4543 }
4544 
4546  void *context,
4547  const void *msg,
4548  unsigned int msg_len)
4549 {
/*
 * Receive callback.  Listing line 4545, which holds the function name, is
 * missing from this copy; the cross-reference index names it
 * main_deliver_fn.
 *
 * It does three things:
 *  - drops messages shorter than a message_header;
 *  - counts each message in the per-type rx statistics;
 *  - dispatches it to totemsrp_message_handlers.handler_functions[], indexed
 *    by the header type.
 * The endian_conversion_needed flag passed to the handler is true when the
 * sender's endian_detector differs from ENDIAN_LOCAL.
 *
 * NOTE(review): listing lines 4554, 4561, 4567, 4570, 4573 and 4576 are also
 * missing.  Presumably they are the first line of the log_printf call and
 * the case labels for the remaining message types -- verify against the
 * real totemsrp.c.
 */
4550  struct totemsrp_instance *instance = context;
4551  const struct message_header *message_header = msg;
4552 
4553  if (msg_len < sizeof (struct message_header)) {
4555  "Received message is too short... ignoring %u.",
4556  (unsigned int)msg_len);
4557  return;
4558  }
4559 
/*
 * The switch doubles as validation: a type without a case label is
 * rejected here, before it is used as an index into handler_functions[].
 */
4560  switch (message_header->type) {
4562  instance->stats.orf_token_rx++;
4563  break;
4564  case MESSAGE_TYPE_MCAST:
4565  instance->stats.mcast_rx++;
4566  break;
4568  instance->stats.memb_merge_detect_rx++;
4569  break;
4571  instance->stats.memb_join_rx++;
4572  break;
4574  instance->stats.memb_commit_token_rx++;
4575  break;
4577  instance->stats.token_hold_cancel_rx++;
4578  break;
4579  default:
4580  log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
/*
 * NOTE(review): the printf below duplicates the log_printf above on stdout
 * and looks like leftover debug output.  The log_printf format string above
 * also ends in a newline, unlike the other log_printf calls here.
 */
4581 printf ("wrong message type\n");
4582  instance->stats.rx_msg_dropped++;
4583  return;
4584  }
4585  /*
4586  * Handle incoming message
4587  */
4588  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
4589  instance,
4590  msg,
4591  msg_len,
4592  message_header->endian_detector != ENDIAN_LOCAL);
4593 }
4594 
4596  void *context,
4597  const struct totem_ip_address *iface_addr,
4598  unsigned int iface_no)
4599 {
/*
 * Interface-change callback.  Listing line 4595, which holds the function
 * name, is missing from this copy; the cross-reference index names it
 * main_iface_change_fn.
 *
 * On every call:
 *  - iface_addr is stored in my_id.addr[iface_no] (asserting a non-zero
 *    nodeid) and in my_memb_list[0].addr[iface_no];
 *  - the statically configured members of interface iface_no are added via
 *    totemsrp_member_add();
 *  - if iface_changes is at least totem_config->interface_count, GATHER is
 *    entered.
 *
 * On the first call only (iface_changes was 0):
 *  - the ring id is created or loaded;
 *  - token_ring_id_seq is seeded from it;
 *  - the registered service-ready callback, if any, is invoked.
 *
 * NOTE(review): iface_no is used as an array index without a bounds check
 * here; presumably the caller guarantees it is below INTERFACE_MAX.
 */
4600  struct totemsrp_instance *instance = context;
4601  int i;
4602 
4603  totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
4604  assert (instance->my_id.addr[iface_no].nodeid);
4605 
4606  totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);
4607 
4608  if (instance->iface_changes++ == 0) {
4609  instance->memb_ring_id_create_or_load (&instance->my_ring_id,
4610  &instance->my_id.addr[0]);
4611  instance->token_ring_id_seq = instance->my_ring_id.seq;
4612  log_printf (
4613  instance->totemsrp_log_level_debug,
4614  "Created or loaded sequence id %llx.%s for this ring.",
4615  instance->my_ring_id.seq,
4616  totemip_print (&instance->my_ring_id.rep));
4617 
4618  if (instance->totemsrp_service_ready_fn) {
4619  instance->totemsrp_service_ready_fn ();
4620  }
4621 
4622  }
4623 
4624  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
4625  totemsrp_member_add (instance,
4626  &instance->totem_config->interfaces[iface_no].member_list[i],
4627  iface_no);
4628  }
4629 
4630  if (instance->iface_changes >= instance->totem_config->interface_count) {
4631  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
4632  }
4633 }
4634 
4635 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
4636  totem_config->net_mtu -= sizeof (struct mcast);
4637 }
4638 
4640  void *context,
4641  void (*totem_service_ready) (void))
4642 {
/*
 * Store the callback that main_iface_change_fn invokes, when non-NULL, the
 * first time an interface address is reported.  The name line (listing line
 * 4639) is missing from this copy; the cross-reference index names this
 * function totemsrp_service_ready_register.
 */
4643  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4644 
4645  instance->totemsrp_service_ready_fn = totem_service_ready;
4646 }
4647 
4649  void *context,
4650  const struct totem_ip_address *member,
4651  int ring_no)
4652 {
/*
 * Add member on ring ring_no by delegating to totemrrp_member_add() with
 * this instance's totemrrp context; its result is returned unchanged.  The
 * name line (listing line 4648) is missing from this copy; the
 * cross-reference index names this function totemsrp_member_add.
 */
4653  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4654  int res;
4655 
4656  res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);
4657 
4658  return (res);
4659 }
4660 
4662  void *context,
4663  const struct totem_ip_address *member,
4664  int ring_no)
4665 {
/*
 * Remove member from ring ring_no by delegating to totemrrp_member_remove()
 * with this instance's totemrrp context; its result is returned unchanged.
 * The name line (listing line 4661) is missing from this copy; the
 * cross-reference index names this function totemsrp_member_remove.
 */
4666  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4667  int res;
4668 
4669  res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);
4670 
4671  return (res);
4672 }
4673 
4674 void totemsrp_threaded_mode_enable (void *context)
4675 {
4676  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4677 
4678  instance->threaded_mode_enabled = 1;
4679 }
4680 
4681 void totemsrp_trans_ack (void *context)
4682 {
4683  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
4684 
4685  instance->waiting_trans_ack = 0;
4686  instance->totemsrp_waiting_trans_ack_cb_fn (0);
4687 }
unsigned int backlog
Definition: totemsrp.c:208
uint8_t no_addrs
Definition: totemrrp.h:59
unsigned short family
Definition: coroapi.h:97
gather_state_from
Definition: totemsrp.c:529
int totemrrp_iface_check(void *rrp_context)
Definition: totemrrp.c:2207
int log_level_security
Definition: totem.h:82
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:4595
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:101
struct srp_addr system_from
Definition: totemsrp.c:218
#define ENDIAN_LOCAL
Definition: totemsrp.c:137
uint64_t gather_entered
Definition: totem.h:260
struct memb_ring_id ring_id
Definition: totemsrp.c:196
struct list_head list
Definition: totemsrp.c:163
uint32_t waiting_trans_ack
Definition: totemsrp.c:513
struct srp_addr system_from
Definition: totemsrp.c:186
struct memb_ring_id ring_id
Definition: totemsrp.c:255
int totemsrp_log_level_debug
Definition: totemsrp.c:423
struct memb_ring_id my_ring_id
Definition: totemsrp.c:333
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
Definition: totem.h:255
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:458
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totemsrp.c:467
struct message_header header
Definition: totemsrp.c:185
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:485
unsigned int proc_list_entries
Definition: totemsrp.c:219
uint32_t value
struct totem_interface * interfaces
Definition: totem.h:114
unsigned int interface_count
Definition: totem.h:115
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1135
struct list_head * next
Definition: list.h:47
uint64_t memb_join_tx
Definition: totem.h:249
unsigned int seq
Definition: totemsrp.c:62
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totem.h:274
const char * totemip_print(const struct totem_ip_address *addr)
Definition: totemip.c:214
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:98
int totemsrp_log_level_error
Definition: totemsrp.c:417
int old_ring_state_aru
Definition: totemsrp.c:483
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:102
unsigned int seq
Definition: totemsrp.c:203
struct memb_ring_id ring_id
Definition: totemsrp.c:245
int fcc_remcast_current
Definition: totemsrp.c:297
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:410
unsigned int failed_list_entries
Definition: totemsrp.c:220
uint64_t mcast_rx
Definition: totem.h:253
unsigned long long int tv_old
Definition: totemsrp.c:3517
#define SEQNO_START_TOKEN
Definition: totemsrp.c:115
unsigned int token_hold_timeout
Definition: totem.h:133
int member_count
Definition: totem.h:70
unsigned int msg_len
Definition: totemsrp.c:270
struct memb_ring_id ring_id
Definition: totemsrp.c:207
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:71
int totemip_compare(const void *a, const void *b)
Definition: totemip.c:130
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
Definition: totemsrp.c:4648
void * token_sent_event_handle
Definition: totemsrp.c:516
struct timeval tv_old
Definition: totemsrp.c:489
int retrans_flg
Definition: totemsrp.c:210
struct srp_addr system_from
Definition: totemsrp.c:233
int my_new_memb_entries
Definition: totemsrp.c:323
totem_configuration_type
Definition: coroapi.h:110
int totemsrp_log_level_notice
Definition: totemsrp.c:421
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1124
unsigned int my_pbl
Definition: totemsrp.c:499
char rrp_mode[TOTEM_RRP_MODE_BYTES]
Definition: totem.h:161
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:4635
int totemsrp_log_level_warning
Definition: totemsrp.c:419
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1110
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemrrp.c:2310
unsigned int my_aru
Definition: totemsrp.c:377
uint64_t memb_merge_detect_rx
Definition: totem.h:248
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1052
int guarantee
Definition: totemsrp.c:66
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:366
struct message_header header
Definition: totemsrp.c:232
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:259
unsigned int seq
Definition: totemsrp.c:187
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:67
char commit_token_storage[40000]
Definition: totemsrp.c:517
unsigned int rrp_problem_count_timeout
Definition: totem.h:153
struct list_head token_callback_sent_listhead
Definition: totemsrp.c:383
Definition: sq.h:40
unsigned int set_aru
Definition: totemsrp.c:479
struct cs_queue new_message_queue
Definition: totemsrp.c:364
int my_rotation_counter
Definition: totemsrp.c:351
int earliest_token
Definition: totem.h:271
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
uint64_t orf_token_tx
Definition: totem.h:245
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3364
uint64_t gather_token_lost
Definition: totem.h:261
int totemsrp_log_level_trace
Definition: totemsrp.c:425
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:95
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
Definition: totemrrp.c:2216
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:335
memb_state
Definition: totemsrp.c:278
void * totemrrp_buffer_alloc(void *rrp_context)
Definition: totemrrp.c:2109
unsigned int downcheck_timeout
Definition: totem.h:145
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:101
uint64_t memb_commit_token_tx
Definition: totem.h:254
Definition: list.h:46
int my_deliver_memb_entries
Definition: totemsrp.c:329
unsigned int max_network_delay
Definition: totem.h:171
unsigned int heartbeat_failures_allowed
Definition: totem.h:169
#define TOTEM_TOKEN_STATS_MAX
Definition: totem.h:273
unsigned int my_last_seq
Definition: totemsrp.c:487
int my_left_memb_entries
Definition: totemsrp.c:331
#define swab64(x)
Definition: swab.h:52
struct message_item __attribute__
unsigned long long token_ring_id_seq
Definition: totemsrp.c:475
struct totem_ip_address mcast_address
Definition: totemsrp.c:443
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3329
unsigned int send_join_timeout
Definition: totem.h:139
unsigned int window_size
Definition: totem.h:173
int guarantee
Definition: totemsrp.c:191
unsigned int seq
Definition: totemsrp.c:197
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:4639
unsigned int rrp_problem_count_threshold
Definition: totem.h:155
struct mcast * mcast
Definition: totemsrp.c:274
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
uint64_t operational_entered
Definition: totem.h:258
unsigned long long ring_seq
Definition: totemsrp.c:221
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totemsrp.c:463
#define INTERFACE_MAX
Definition: coroapi.h:75
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2361
message_type
Definition: totemsrp.c:139
int latest_token
Definition: totem.h:272
uint64_t operational_token_lost
Definition: totem.h:259
unsigned int received_flg
Definition: totemsrp.c:63
uint64_t consensus_timeouts
Definition: totem.h:266
unsigned int aru_addr
Definition: totemsrp.c:206
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
Definition: totemsrp.c:379
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:671
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:406
uint64_t recovery_token_lost
Definition: totem.h:265
unsigned int backlog
Definition: totemsrp.c:66
int this_seqno
Definition: totemsrp.c:188
unsigned int token_retransmits_before_loss_const
Definition: totem.h:135
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:222
struct message_header header
Definition: totemsrp.c:239
int totemrrp_finalize(void *rrp_context)
Definition: totemrrp.c:1965
struct list_head token_callback_received_listhead
Definition: totemsrp.c:381
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
Definition: totemrrp.c:2297
struct rtr_item rtr_list[0]
Definition: totemsrp.c:70
unsigned int retrans_flg
Definition: totemsrp.c:256
int totemsrp_ring_reenable(void *srp_context)
Definition: totemsrp.c:1147
struct memb_ring_id ring_id
Definition: totemsrp.c:189
unsigned int seqno_unchanged_const
Definition: totem.h:149
uint64_t commit_token_lost
Definition: totem.h:263
unsigned int miss_count_const
Definition: totem.h:187
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
Definition: totemrrp.c:2231
uint64_t token_hold_cancel_rx
Definition: totem.h:257
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:460
unsigned int join_timeout
Definition: totem.h:137
unsigned int aru
Definition: totemsrp.c:246
unsigned int nodeid
Definition: coroapi.h:96
int totemrrp_send_flush(void *rrp_context)
Definition: totemrrp.c:2154
uint64_t pause_timestamp
Definition: totemsrp.c:503
int my_set_retrans_flg
Definition: totemsrp.c:353
struct message_header header
Definition: totemsrp.c:202
struct totem_ip_address mcast_addr
Definition: totem.h:67
char encapsulated
Definition: totemrrp.c:554
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:85
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
Definition: totemrrp.c:2284
Linked list API.
unsigned int received_flg
Definition: totemsrp.c:248
unsigned int my_cbl
Definition: totemsrp.c:501
struct totem_ip_address rep
Definition: coroapi.h:104
unsigned int last_released
Definition: totemsrp.c:477
int orf_token_retransmit_size
Definition: totemsrp.c:387
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2430
unsigned int rrp_autorecovery_check_timeout
Definition: totem.h:159
uint64_t mcast_retx
Definition: totem.h:252
unsigned int msg_len
Definition: totemsrp.c:275
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:97
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:75
unsigned int fail_to_recv_const
Definition: totem.h:147
unsigned int token_seq
Definition: totemsrp.c:204
struct mcast * mcast
Definition: totemsrp.c:269
void * token_recv_event_handle
Definition: totemsrp.c:515
struct totem_ip_address boundto
Definition: totem.h:66
unsigned int my_high_seq_received
Definition: totemsrp.c:347
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
Definition: totemrrp.c:1994
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:441
totem_event_type
Definition: totem.h:212
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:394
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:402
int old_ring_state_saved
Definition: totemsrp.c:481
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:339
uint64_t rx_msg_dropped
Definition: totem.h:267
int totemsrp_log_level_security
Definition: totemsrp.c:415
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:398
struct totem_config * totem_config
Definition: totemsrp.c:493
#define swab32(x)
Definition: swab.h:43
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:396
uint32_t continuous_gather
Definition: totem.h:268
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:4674
unsigned int aru
Definition: totemsrp.c:63
encapsulation_type
Definition: totemsrp.c:148
unsigned int net_mtu
Definition: totem.h:165
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:822
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2352
unsigned int node_id
Definition: totemsrp.c:190
int totemrrp_recv_flush(void *rrp_context)
Definition: totemrrp.c:2145
uint32_t orf_token_discard
Definition: totemsrp.c:509
int my_failed_list_entries
Definition: totemsrp.c:321
struct srp_addr my_id
Definition: totemsrp.c:303
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
uint64_t token_hold_cancel_tx
Definition: totem.h:256
unsigned int token_timeout
Definition: totem.h:129
Definition: totemsrp.c:244
unsigned int high_delivered
Definition: totemsrp.c:247
unsigned int consensus_timeout
Definition: totem.h:141
totemsrp_stats_t stats
Definition: totemsrp.c:507
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
Definition: totemsrp.c:4545
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:83
uint64_t mcast_tx
Definition: totem.h:251
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Definition: totemrrp.c:2116
void * totemrrp_context
Definition: totemsrp.c:491
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:385
struct message_header header
Definition: totemsrp.c:217
struct sq regular_sort_queue
Definition: totemsrp.c:370
int my_retrans_flg_count
Definition: totemsrp.c:355
unsigned int nodeid
Definition: totemsrp.c:63
#define SEQNO_START_MSG
Definition: totemsrp.c:114
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1028
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:429
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:96
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
unsigned short family
Definition: coroapi.h:66
struct cs_queue retrans_message_queue
Definition: totemsrp.c:368
unsigned int aru
Definition: totemsrp.c:205
const char * gather_state_from_desc[]
Definition: totemsrp.c:549
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:404
int my_trans_memb_entries
Definition: totemsrp.c:325
unsigned int my_trc
Definition: totemsrp.c:497
uint64_t memb_merge_detect_tx
Definition: totem.h:247
unsigned int high_delivered
Definition: totemsrp.c:62
struct rtr_item rtr_list[0]
Definition: totemsrp.c:212
totemsrp_stats_t * srp
Definition: totem.h:283
int consensus_list_entries
Definition: totemsrp.c:301
unsigned int rrp_problem_count_mcast_threshold
Definition: totem.h:157
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
Definition: totemrrp.c:2123
char type
Definition: totemsrp.c:60
enum memb_state memb_state
Definition: totemsrp.c:437
uint64_t memb_join_rx
Definition: totem.h:250
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2187
#define FRAME_SIZE_MAX
Definition: totem.h:50
int rtr_list_entries
Definition: totemsrp.c:211
uint32_t threaded_mode_enabled
Definition: totemsrp.c:511
enum totem_callback_token_type callback_type
Definition: totemsrp.c:165
int totemrrp_mcast_recv_empty(void *rrp_context)
Definition: totemrrp.c:2273
int my_proc_list_entries
Definition: totemsrp.c:319
#define list_entry(ptr, type, member)
Definition: list.h:84
unsigned long long ring_seq
Definition: totemsrp.c:64
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:163
unsigned short endian_detector
Definition: totemrrp.c:555
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2173
struct memb_ring_id ring_id
Definition: totemsrp.c:240
#define log_printf(level, format, args...)
Definition: totemsrp.c:683
unsigned long long seq
Definition: coroapi.h:105
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:4681
unsigned int max_messages
Definition: totem.h:175
uint64_t recovery_entered
Definition: totem.h:264
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:408
struct memb_commit_token * commit_token
Definition: totemsrp.c:505
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:299
struct srp_addr addr
Definition: totemsrp.c:157
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:522
int totemsrp_subsys_id
Definition: totemsrp.c:427
unsigned int merge_timeout
Definition: totem.h:143
unsigned int use_heartbeat
Definition: totemsrp.c:495
struct message_header header
Definition: totemsrp.c:253
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
Definition: totemsrp.c:4661
unsigned int token_retransmit_timeout
Definition: totem.h:131
int rtr_list_entries
Definition: totemsrp.c:69
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:100
unsigned int token_seq
Definition: totemsrp.c:254
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:71
unsigned int my_token_seq
Definition: totemsrp.c:389
struct memb_ring_id ring_id
Definition: totemsrp.c:64
unsigned int my_last_aru
Definition: totemsrp.c:341
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
Definition: totemrrp.c:2250
uint64_t commit_entered
Definition: totem.h:262
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:400
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:445
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:451
struct totem_ip_address addr[INTERFACE_MAX]
Definition: totemrrp.h:60
unsigned int rrp_token_expired_timeout
Definition: totem.h:151
struct memb_ring_id ring_id
Definition: totemsrp.c:234
unsigned int my_install_seq
Definition: totemsrp.c:349
uint64_t orf_token_rx
Definition: totem.h:246
unsigned int nodeid
Definition: totemsrp.c:180
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
Definition: totemrrp.c:2162
unsigned int threads
Definition: totem.h:167
struct sq recovery_sort_queue
Definition: totemsrp.c:372
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totem.h:191
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
Definition: totemrrp.c:2135
totem_callback_token_type
Definition: coroapi.h:117
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:164
unsigned int my_high_ring_delivered
Definition: totemsrp.c:357
unsigned int fcc
Definition: totemsrp.c:209
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
Definition: totem.h:195