41 #include <sys/types.h>
42 #include <sys/socket.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
55 #include <netinet/in.h>
56 #include <arpa/inet.h>
61 #include <qb/qbipc_common.h>
71 #define MAP_ANONYMOUS MAP_ANON
78 #define GROUP_HASH_SIZE 32
149 static struct list_head downlist_messages_head;
150 static struct list_head joinlist_messages_head;
177 static unsigned int my_member_list_entries;
181 static unsigned int my_old_member_list_entries = 0;
207 static int cpg_lib_init_fn (
void *conn);
209 static int cpg_lib_exit_fn (
void *conn);
211 static void message_handler_req_exec_cpg_procjoin (
215 static void message_handler_req_exec_cpg_procleave (
219 static void message_handler_req_exec_cpg_joinlist (
223 static void message_handler_req_exec_cpg_mcast (
227 static void message_handler_req_exec_cpg_downlist_old (
231 static void message_handler_req_exec_cpg_downlist (
235 static void exec_cpg_procjoin_endian_convert (
void *msg);
237 static void exec_cpg_joinlist_endian_convert (
void *msg);
239 static void exec_cpg_mcast_endian_convert (
void *msg);
241 static void exec_cpg_downlist_endian_convert_old (
void *msg);
243 static void exec_cpg_downlist_endian_convert (
void *msg);
245 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
247 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
249 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
251 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
253 static void message_handler_req_lib_cpg_membership (
void *conn,
254 const void *message);
256 static void message_handler_req_lib_cpg_local_get (
void *conn,
257 const void *message);
259 static void message_handler_req_lib_cpg_iteration_initialize (
261 const void *message);
263 static void message_handler_req_lib_cpg_iteration_next (
265 const void *message);
267 static void message_handler_req_lib_cpg_iteration_finalize (
269 const void *message);
271 static void message_handler_req_lib_cpg_zc_alloc (
273 const void *message);
275 static void message_handler_req_lib_cpg_zc_free (
277 const void *message);
279 static void message_handler_req_lib_cpg_zc_execute (
281 const void *message);
283 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
285 static int cpg_exec_send_downlist(
void);
287 static int cpg_exec_send_joinlist(
void);
289 static void downlist_messages_delete (
void);
291 static void downlist_master_choose_and_send (
void);
293 static void joinlist_inform_clients (
void);
295 static void joinlist_messages_delete (
void);
297 static void cpg_sync_init (
298 const unsigned int *trans_list,
299 size_t trans_list_entries,
300 const unsigned int *member_list,
301 size_t member_list_entries,
304 static int cpg_sync_process (
void);
306 static void cpg_sync_activate (
void);
308 static void cpg_sync_abort (
void);
310 static void do_proc_join(
316 static void do_proc_leave(
322 static int notify_lib_totem_membership (
324 int member_list_entries,
325 const unsigned int *member_list);
327 static inline int zcb_all_free (
330 static char *cpg_print_group_name (
343 .lib_handler_fn = message_handler_req_lib_cpg_leave,
347 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
351 .lib_handler_fn = message_handler_req_lib_cpg_membership,
355 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
359 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
363 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
367 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
371 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
375 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
379 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
383 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
394 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
397 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
398 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
401 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
402 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
405 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
406 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
409 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
410 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
413 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
414 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
419 .
name =
"corosync cluster closed process group service v1.01",
422 .private_data_size =
sizeof (
struct cpg_pd),
425 .lib_init_fn = cpg_lib_init_fn,
426 .lib_exit_fn = cpg_lib_exit_fn,
427 .lib_engine = cpg_lib_engine,
429 .exec_init_fn = cpg_exec_init_fn,
430 .exec_dump_fn = NULL,
431 .exec_engine = cpg_exec_engine,
433 .sync_init = cpg_sync_init,
434 .sync_process = cpg_sync_process,
435 .sync_activate = cpg_sync_activate,
436 .sync_abort = cpg_sync_abort
441 return (&cpg_service_engine);
445 struct qb_ipc_request_header header __attribute__((aligned(8)));
452 struct qb_ipc_request_header header __attribute__((aligned(8)));
461 struct qb_ipc_request_header header __attribute__((aligned(8)));
467 struct qb_ipc_request_header header __attribute__((aligned(8)));
502 for (i = 0; i < group->length; i++) {
505 if (c >=
' ' && c < 0x7f && c !=
'\\') {
509 res[dest_pos++] =
'\\';
510 res[dest_pos++] =
'\\';
512 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
522 static void cpg_sync_init (
523 const unsigned int *trans_list,
524 size_t trans_list_entries,
525 const unsigned int *member_list,
526 size_t member_list_entries,
535 memcpy (my_member_list, member_list, member_list_entries *
536 sizeof (
unsigned int));
537 my_member_list_entries = member_list_entries;
539 last_sync_ring_id.nodeid = ring_id->
rep.
nodeid;
540 last_sync_ring_id.seq = ring_id->
seq;
548 for (i = 0; i < my_old_member_list_entries; i++) {
550 for (j = 0; j < trans_list_entries; j++) {
551 if (my_old_member_list[i] == trans_list[j]) {
557 g_req_exec_cpg_downlist.nodeids[entries++] =
558 my_old_member_list[i];
561 g_req_exec_cpg_downlist.left_nodes = entries;
564 static int cpg_sync_process (
void)
569 res = cpg_exec_send_downlist();
576 res = cpg_exec_send_joinlist();
581 static void cpg_sync_activate (
void)
583 memcpy (my_old_member_list, my_member_list,
584 my_member_list_entries *
sizeof (
unsigned int));
585 my_old_member_list_entries = my_member_list_entries;
588 downlist_master_choose_and_send ();
591 joinlist_inform_clients ();
593 downlist_messages_delete ();
595 joinlist_messages_delete ();
597 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
600 static void cpg_sync_abort (
void)
603 downlist_messages_delete ();
604 joinlist_messages_delete ();
607 static int notify_lib_totem_membership (
609 int member_list_entries,
610 const unsigned int *member_list)
624 res->member_list_entries = member_list_entries;
625 res->header.size = size;
627 res->header.error =
CS_OK;
633 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
644 static int notify_lib_joinlist(
647 int joined_list_entries,
649 int left_list_entries,
662 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
664 if (mar_name_compare (&pi->
group, group_name) == 0) {
668 for (i = 0; i < left_list_entries; i++) {
669 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
686 res->joined_list_entries = joined_list_entries;
687 res->left_list_entries = left_list_entries;
688 res->member_list_entries = count;
690 res->header.size = size;
692 res->header.error =
CS_OK;
695 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
698 if (mar_name_compare (&pi->
group, group_name) == 0) {
702 for (i = 0;i < left_list_entries; i++) {
703 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
709 retgi->nodeid = pi->
nodeid;
710 retgi->pid = pi->
pid;
716 if (left_list_entries) {
718 retgi += left_list_entries;
721 if (joined_list_entries) {
723 retgi += joined_list_entries;
729 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
731 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
732 assert (joined_list_entries <= 1);
733 if (joined_list_entries) {
734 if (joined_list[0].
pid == cpd->
pid &&
744 if (left_list_entries) {
745 if (left_list[0].
pid == cpd->
pid &&
762 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
768 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
775 static void downlist_log(
const char *msg,
struct downlist_msg* dl)
778 "%s: sender %s; members(old:%d left:%d)",
785 static struct downlist_msg* downlist_master_choose (
void)
790 uint32_t cmp_members;
791 uint32_t best_members;
795 for (iter = downlist_messages_head.
next;
796 iter != &downlist_messages_head;
800 downlist_log(
"comparing", cmp);
803 for (i = 0; i < cmp->left_nodes; i++) {
805 log_printf (LOG_DEBUG,
"Ignoring this entry because I'm in the left list\n");
821 best_members = best->old_members - best->left_nodes;
822 cmp_members = cmp->old_members - cmp->left_nodes;
824 if (cmp_members > best_members) {
826 }
else if (cmp_members == best_members) {
827 if (cmp->old_members > best->old_members) {
829 }
else if (cmp->old_members == best->old_members) {
837 assert (best != NULL);
842 static void downlist_master_choose_and_send (
void)
853 int left_list_entries;
856 qb_map_iter_t *miter;
861 stored_msg = downlist_master_choose ();
866 downlist_log(
"chosen downlist", stored_msg);
868 group_map = qb_skiplist_create();
875 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
880 for (i = 0; i < stored_msg->left_nodes; i++) {
882 if (pi->
nodeid == stored_msg->nodeids[i]) {
889 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
890 cpg_group.value[cpg_group.length] = 0;
892 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
894 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
895 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
896 qb_map_put(group_map, pcd->cpg_group.value, pcd);
898 size = pcd->left_list_entries;
899 pcd->left_list[size].nodeid = left_pi->
nodeid;
900 pcd->left_list[size].pid = left_pi->
pid;
902 pcd->left_list_entries++;
903 list_del (&left_pi->
list);
909 miter = qb_map_iter_create(group_map);
910 while (qb_map_iter_next(miter, (
void **)&pcd)) {
911 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
913 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
914 for (i=0; i<pcd->left_list_entries; i++) {
915 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
916 i, cpg_print_group_name(&group),
918 pcd->left_list[i].pid);
922 notify_lib_joinlist(&group, NULL,
924 pcd->left_list_entries,
930 qb_map_iter_free(miter);
931 qb_map_destroy(group_map);
937 static void joinlist_remove_zombie_pi_entries (
void)
945 for (pi_iter = process_info_list_head.
next; pi_iter != &process_info_list_head; ) {
947 pi_iter = pi_iter->
next;
960 for (jl_iter = joinlist_messages_head.
next;
961 jl_iter != &joinlist_messages_head;
962 jl_iter = jl_iter->
next) {
971 pi->
pid == stored_msg->
pid &&
984 static void joinlist_inform_clients (
void)
991 for (iter = joinlist_messages_head.
next;
992 iter != &joinlist_messages_head;
997 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
998 i++, cpg_print_group_name(&stored_msg->
group_name),
1011 joinlist_remove_zombie_pi_entries ();
1014 static void downlist_messages_delete (
void)
1019 for (iter = downlist_messages_head.
next;
1020 iter != &downlist_messages_head;
1023 iter_next = iter->
next;
1026 list_del (&stored_msg->
list);
1031 static void joinlist_messages_delete (
void)
1036 for (iter = joinlist_messages_head.
next;
1037 iter != &joinlist_messages_head;
1040 iter_next = iter->
next;
1043 list_del (&stored_msg->
list);
1046 list_init (&joinlist_messages_head);
1051 list_init (&downlist_messages_head);
1052 list_init (&joinlist_messages_head);
1066 iter_next = iter->
next;
1069 list_del (&pi->
list);
1073 list_del (&cpg_iteration_instance->
list);
1074 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
1077 static void cpg_pd_finalize (
struct cpg_pd *cpd)
1080 struct cpg_iteration_instance *cpii;
1087 iter_next = iter->
next;
1091 cpg_iteration_instance_finalize (cpii);
1094 list_del (&cpd->
list);
1097 static int cpg_lib_exit_fn (
void *conn)
1108 cpg_pd_finalize (cpd);
1114 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1117 struct iovec req_exec_cpg_iovec;
1136 static void exec_cpg_procjoin_endian_convert (
void *msg)
1140 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1141 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1142 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1145 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1148 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1151 swab_mar_int32_t (&res->size);
1153 while ((
const char*)jle < msg + res->size) {
1160 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1164 static void exec_cpg_downlist_endian_convert (
void *msg)
1169 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1170 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1172 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1173 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1178 static void exec_cpg_mcast_endian_convert (
void *msg)
1182 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1183 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1184 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1185 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1186 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1192 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1196 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1197 mar_name_compare (&pi->
group, group_name) == 0) {
1205 static void do_proc_join(
1208 unsigned int nodeid,
1217 if (process_info_find (name, pid, nodeid) != NULL) {
1227 memcpy(&pi->
group, name,
sizeof(*name));
1228 list_init(&pi->
list);
1233 list_to_add = &process_info_list_head;
1234 for (list = process_info_list_head.
next; list != &process_info_list_head; list = list->
next) {
1244 list_add (&pi->
list, list_to_add);
1246 notify_info.pid = pi->
pid;
1247 notify_info.nodeid =
nodeid;
1248 notify_info.reason = reason;
1250 notify_lib_joinlist(&pi->
group, NULL,
1256 static void do_proc_leave(
1259 unsigned int nodeid,
1266 notify_info.pid = pid;
1267 notify_info.nodeid =
nodeid;
1268 notify_info.reason = reason;
1270 notify_lib_joinlist(name, NULL,
1275 for (iter = process_info_list_head.
next; iter != &process_info_list_head; ) {
1279 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1280 mar_name_compare (&pi->
group, name)==0) {
1281 list_del (&pi->
list);
1287 static void message_handler_req_exec_cpg_downlist_old (
1288 const void *message,
1289 unsigned int nodeid)
1295 static void message_handler_req_exec_cpg_downlist(
1296 const void *message,
1297 unsigned int nodeid)
1299 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1307 req_exec_cpg_downlist->left_nodes, downlist_state);
1313 stored_msg->old_members = req_exec_cpg_downlist->old_members;
1314 stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
1315 memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
1316 req_exec_cpg_downlist->left_nodes * sizeof (
mar_uint32_t));
1317 list_init (&stored_msg->
list);
1318 list_add (&stored_msg->
list, &downlist_messages_head);
1320 for (i = 0; i < my_member_list_entries; i++) {
1322 for (iter = downlist_messages_head.
next;
1323 iter != &downlist_messages_head;
1324 iter = iter->
next) {
1336 downlist_master_choose_and_send ();
1340 static void message_handler_req_exec_cpg_procjoin (
1341 const void *message,
1342 unsigned int nodeid)
1349 (
unsigned int)req_exec_cpg_procjoin->pid);
1351 do_proc_join (&req_exec_cpg_procjoin->group_name,
1352 req_exec_cpg_procjoin->pid, nodeid,
1356 static void message_handler_req_exec_cpg_procleave (
1357 const void *message,
1358 unsigned int nodeid)
1365 (
unsigned int)req_exec_cpg_procjoin->pid);
1367 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1368 req_exec_cpg_procjoin->pid, nodeid,
1369 req_exec_cpg_procjoin->reason);
1374 static void message_handler_req_exec_cpg_joinlist (
1375 const void *message_v,
1376 unsigned int nodeid)
1378 const char *message = message_v;
1379 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1386 while ((
const char*)jle < message + res->size) {
1390 stored_msg->
pid = jle->
pid;
1392 list_init (&stored_msg->
list);
1393 list_add (&stored_msg->
list, &joinlist_messages_head);
1398 static void message_handler_req_exec_cpg_mcast (
1399 const void *message,
1400 unsigned int nodeid)
1404 int msglen = req_exec_cpg_mcast->msglen;
1407 struct iovec iovec[2];
1421 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1422 iovec[1].iov_len = msglen;
1424 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; ) {
1429 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1433 for (pi_iter = process_info_list_head.
next;
1434 pi_iter != &process_info_list_head; pi_iter = pi_iter->
next) {
1438 if (pi->
nodeid == nodeid &&
1439 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1457 static int cpg_exec_send_downlist(
void)
1462 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1464 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1466 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1467 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1472 static int cpg_exec_send_joinlist(
void)
1476 struct qb_ipc_response_header *res;
1479 struct iovec req_exec_cpg_iovec;
1481 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1493 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1499 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1500 res = (
struct qb_ipc_response_header *)buf;
1502 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1513 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1515 req_exec_cpg_iovec.iov_base = buf;
1516 req_exec_cpg_iovec.iov_len = res->size;
1521 static int cpg_lib_init_fn (
void *conn)
1524 memset (cpd, 0,
sizeof(
struct cpg_pd));
1526 list_add (&cpd->
list, &cpg_pd_list_head);
1537 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1546 for (iter = cpg_pd_list_head.
next; iter != &cpg_pd_list_head; iter = iter->
next) {
1549 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1550 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1562 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
1566 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1582 cpd->
pid = req_lib_cpg_join->pid;
1583 cpd->
flags = req_lib_cpg_join->flags;
1584 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1587 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1588 &req_lib_cpg_join->group_name,
1610 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1632 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1633 &req_lib_cpg_leave->group_name,
1647 static void message_handler_req_lib_cpg_finalize (
1649 const void *message)
1661 list_del (&cpd->
list);
1662 list_init (&cpd->
list);
1682 fd = open (path, O_RDWR, 0600);
1690 res = ftruncate (fd, bytes);
1692 goto error_close_unlink;
1695 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1698 if (addr == MAP_FAILED) {
1699 goto error_close_unlink;
1702 madvise(addr, bytes, MADV_NOSYNC);
1718 static inline int zcb_alloc (
1720 const char *path_to_file,
1727 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1728 if (zcb_mapped == NULL) {
1741 list_init (&zcb_mapped->
list);
1749 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1753 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1754 list_del (&zcb_mapped->
list);
1759 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1762 struct zcb_mapped *zcb_mapped;
1763 unsigned int res = 0;
1768 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1770 if (zcb_mapped->
addr == addr) {
1771 res = zcb_free (zcb_mapped);
1779 static inline int zcb_all_free (
1783 struct zcb_mapped *zcb_mapped;
1788 zcb_mapped =
list_entry (list,
struct zcb_mapped, list);
1792 zcb_free (zcb_mapped);
1802 static uint64_t void2serveraddr (
void *server_ptr)
1810 static void *serveraddr2void (uint64_t
server_addr)
1818 static void message_handler_req_lib_cpg_zc_alloc (
1820 const void *message)
1823 struct qb_ipc_response_header res_header;
1831 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1838 res_header.size =
sizeof (
struct qb_ipc_response_header);
1845 static void message_handler_req_lib_cpg_zc_free (
1847 const void *message)
1850 struct qb_ipc_response_header res_header;
1856 addr = serveraddr2void (hdr->server_address);
1858 zcb_by_addr_free (cpd, addr);
1860 res_header.size =
sizeof (
struct qb_ipc_response_header);
1868 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
1874 struct iovec req_exec_cpg_iovec[2];
1875 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1876 int msglen = req_lib_cpg_mcast->msglen;
1897 if (error ==
CS_OK) {
1898 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1901 req_exec_cpg_mcast.pid = cpd->
pid;
1902 req_exec_cpg_mcast.msglen = msglen;
1904 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1907 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1908 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1909 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1910 req_exec_cpg_iovec[1].iov_len = msglen;
1913 assert(result == 0);
1916 conn, group_name.value, cpd->
cpd_state, error);
1920 static void message_handler_req_lib_cpg_zc_execute (
1922 const void *message)
1925 struct qb_ipc_request_header *
header;
1928 struct iovec req_exec_cpg_iovec[2];
1929 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1930 struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1936 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
1937 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
1956 if (error ==
CS_OK) {
1957 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
1960 req_exec_cpg_mcast.pid = cpd->
pid;
1961 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1963 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
1966 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1967 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1968 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
1969 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
1986 static void message_handler_req_lib_cpg_membership (
void *conn,
1987 const void *message)
1990 (
struct req_lib_cpg_membership_get *)message;
1993 int member_count = 0;
2000 for (iter = process_info_list_head.
next;
2001 iter != &process_info_list_head; iter = iter->
next) {
2004 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2016 static void message_handler_req_lib_cpg_local_get (
void *conn,
2017 const void *message)
2030 static void message_handler_req_lib_cpg_iteration_initialize (
2032 const void *message)
2039 struct cpg_iteration_instance *cpg_iteration_instance;
2052 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2053 &cpg_iteration_handle);
2060 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2068 cpg_iteration_instance->
handle = cpg_iteration_handle;
2073 for (iter = process_info_list_head.
next; iter != &process_info_list_head; iter = iter->
next) {
2085 iter2 = iter2->
next) {
2088 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2104 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2117 goto error_put_destroy;
2121 list_init (&new_pi->
list);
2135 iter2 = iter2->
next) {
2138 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2143 list_add (&new_pi->
list, iter2);
2153 list_init (&cpg_iteration_instance->
list);
2159 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2161 if (error !=
CS_OK) {
2162 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2175 static void message_handler_req_lib_cpg_iteration_next (
2177 const void *message)
2181 struct cpg_iteration_instance *cpg_iteration_instance;
2188 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2189 req_lib_cpg_iterationnext->iteration_handle,
2190 (
void *)&cpg_iteration_instance);
2197 assert (cpg_iteration_instance);
2218 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2228 static void message_handler_req_lib_cpg_iteration_finalize (
2230 const void *message)
2234 struct cpg_iteration_instance *cpg_iteration_instance;
2240 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2241 req_lib_cpg_iterationfinalize->iteration_handle,
2242 (
void *)&cpg_iteration_instance);
2249 assert (cpg_iteration_instance);
2251 cpg_iteration_instance_finalize (cpg_iteration_instance);
2252 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
int initial_totem_conf_sent
mar_cpg_address_t member_list[]
mar_uint32_t sender_nodeid
#define CPG_MAX_NAME_LENGTH
void(* lib_handler_fn)(void *conn, const void *msg)
#define LOGSYS_LEVEL_TRACE
mar_uint32_t sender_nodeid
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
mar_uint32_t old_members __attribute__((aligned(8)))
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
struct message_header header
struct list_head * current_pointer
unsigned char addr[TOTEMIP_ADDRLEN]
struct qb_ipc_request_header header __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
#define log_printf(level, format, args...)
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
struct list_head iteration_instance_list_head
#define SERVICE_ID_MAKE(a, b)
#define LOGSYS_LEVEL_WARNING
void *(* ipc_private_data_get)(void *conn)
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
void(* ipc_refcnt_inc)(void *conn)
#define LOGSYS_LEVEL_ERROR
void(* ipc_refcnt_dec)(void *conn)
struct totem_ip_address rep
mar_uint32_t member_list[]
#define LOGSYS_LEVEL_DEBUG
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
mar_cpg_name_t group_name
mar_cpg_name_t group_name
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
#define PROCESSOR_COUNT_MAX
struct corosync_service_engine cpg_service_engine
struct list_head zcb_mapped_list_head
struct qb_ipc_request_header header __attribute__((aligned(8)))
#define list_entry(ptr, type, member)
mar_cpg_name_t group_name
struct list_head items_list_head
struct memb_ring_id ring_id
const char *(* totem_ifaces_print)(unsigned int nodeid)
struct qb_ipc_request_header header __attribute__((aligned(8)))
unsigned int(* totem_nodeid_get)(void)
DECLARE_LIST_INIT(cpg_pd_list_head)
Message from another node.