MP_Lite: The TCP module

The TCP module is designed for workstation/PC clusters running Unix and for individual Unix machines. All machines must trust each other enough to do an ssh/rsh without prompting for a password, and all must share a common file system.

SIGIO interrupt-driven controller

This asynchronous, interrupt-driven mode (make tcp) works best if the TCP send and receive buffer sizes are increased as described in the optimization page, but it can also function well with the default buffer sizes.
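The sketch below shows how a socket is typically switched into this asynchronous mode under POSIX signals: the process claims ownership of SIGIO for the socket and installs a handler for it. This is an illustration of the general technique, not MP_Lite's actual code, and the function names are invented for the example.

  /* Illustrative sketch, not MP_Lite's actual code: put a connected TCP
   * socket into asynchronous mode so the kernel raises SIGIO whenever
   * the socket becomes readable or writable. */
  #define _GNU_SOURCE            /* for O_ASYNC on Linux/glibc */
  #include <fcntl.h>
  #include <signal.h>
  #include <string.h>
  #include <unistd.h>

  static void sigio_handler(int sig)
  {
      /* In MP_Lite this is the controller that services all active
       * sends and receives; here it is only a placeholder. */
      (void) sig;
  }

  static void enable_sigio(int sock)
  {
      struct sigaction sa;

      memset(&sa, 0, sizeof(sa));
      sa.sa_handler = sigio_handler;       /* POSIX-style installation */
      sigemptyset(&sa.sa_mask);
      sa.sa_flags = SA_RESTART;            /* restart interrupted syscalls */
      sigaction(SIGIO, &sa, NULL);

      fcntl(sock, F_SETOWN, getpid());     /* deliver SIGIO to this process */
      fcntl(sock, F_SETFL, fcntl(sock, F_GETFL) | O_ASYNC | O_NONBLOCK);
  }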

All of the message queues are local linked lists of message headers that contain all the information about the current state of each message. An MP_ASend() creates a message header and posts it to the end of the send queue for the destination node, send_q[dest]. If the queue was empty, the internal function do_send() activates that send queue, and a writev() copies the 12-byte header and as much data as will fit into the TCP send buffer. If any message data does not fit into the TCP send buffer, the controller sigio_handler() takes care of it when a SIGIO interrupt is generated as the data drains from the buffer. This routine services all active transfers by pushing more data into the send buffers using writev() or pulling more data out of the active receive buffers using readv(), then returns control to the main program. If the send queue for the destination node was already active with other messages, the new message header is simply posted at the end of the send queue, where it will be handled in the order it was posted.
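As a rough illustration of this mechanism, the sketch below shows how such a send might be posted and activated. The structure layout, field names, and helper names are assumptions made for the example, not MP_Lite's actual definitions.

  /* Illustrative sketch only -- the layout and names are assumptions.
   * The 12-byte wire header carries the byte count, source node, and
   * message tag. */
  #include <stdlib.h>
  #include <sys/uio.h>

  #define MAX_NODES 64

  struct hdr {                  /* 12 bytes on the wire */
      int nbytes;
      int src;
      int tag;
  };

  struct msg {                  /* one entry in a send queue */
      struct hdr hdr;
      char *buf;                /* user buffer (or malloc'ed copy) */
      int   nsent;              /* bytes already pushed to the socket */
      struct msg *next;
  };

  static struct msg *send_q[MAX_NODES];      /* one list per destination */
  static struct msg *send_q_tail[MAX_NODES];
  static int sock_of[MAX_NODES];             /* TCP socket to each node */
  static int myproc;                         /* this node's rank */

  /* Append a new header to send_q[dest]; if the queue was idle, start
   * the transfer immediately with a single writev() of header + data. */
  static void post_send(void *buf, int nbytes, int dest, int tag)
  {
      struct msg *m = malloc(sizeof(*m));

      m->hdr.nbytes = nbytes;
      m->hdr.src    = myproc;
      m->hdr.tag    = tag;
      m->buf        = buf;
      m->nsent      = 0;
      m->next       = NULL;

      if (send_q[dest] == NULL) {           /* queue idle: activate it */
          send_q[dest] = send_q_tail[dest] = m;

          struct iovec iov[2];
          iov[0].iov_base = &m->hdr;        /* 12-byte header first */
          iov[0].iov_len  = sizeof(m->hdr);
          iov[1].iov_base = buf;            /* then as much data as fits */
          iov[1].iov_len  = nbytes;

          ssize_t n = writev(sock_of[dest], iov, 2);
          if (n > (ssize_t) sizeof(m->hdr))
              m->nsent = (int) (n - sizeof(m->hdr));
          /* Anything left over is pushed later by sigio_handler(). */
      } else {                              /* queue busy: just enqueue */
          send_q_tail[dest]->next = m;
          send_q_tail[dest] = m;
      }
  }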

If any data has not been sent when an MP_Wait() with the msg_id of the MP_ASend() is encountered, MP_Wait() will malloc() a send buffer and copy the remaining data into it to avoid any possibility of a lock-up condition. The controller then pulls data from the malloc'ed send buffer instead of the original user buffer, and frees the memory once the entire message has been sent. A blocking MP_Send() is then just an MP_ASend() followed by an MP_Wait() call.
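A sketch of that hand-off follows, with illustrative names and fields rather than MP_Lite's actual ones.

  /* Illustrative sketch of how an MP_Wait() on an incomplete MP_ASend()
   * detaches the unsent tail from the user's buffer. */
  #include <stdlib.h>
  #include <string.h>

  struct msg {
      char *buf;        /* currently points into the user's buffer */
      int   nbytes;     /* bytes still to be sent */
      int   nsent;      /* bytes already in the TCP send buffer */
      int   buffered;   /* nonzero once buf is a private malloc'ed copy */
  };

  /* Copy the unsent remainder into a private buffer so the user buffer
   * may be reused immediately; the controller drains the copy and
   * free()s it when the whole message has been sent. */
  static void detach_unsent(struct msg *m)
  {
      int left = m->nbytes - m->nsent;

      if (left > 0 && !m->buffered) {
          char *copy = malloc(left);
          memcpy(copy, m->buf + m->nsent, left);
          m->buf      = copy;
          m->nbytes   = left;
          m->nsent    = 0;
          m->buffered = 1;
      }
  }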

An MP_ARecv() call from a known source creates a message header and posts it to the end of the receive message queue for the source node, recv_q[source]. If the queue was empty, the internal function do_recv() must first check for a matching message in the receive message buffer for that source node, msg_q[source]. If one exists, it is copied over to user space and that receive buffer is freed.
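A simplified, standalone sketch of that check is shown below; the structures and names are assumptions for the example, not MP_Lite's actual definitions.

  /* Illustrative sketch of the buffered-message check done by do_recv(). */
  #include <stdlib.h>
  #include <string.h>

  #define MAX_NODES 64

  struct msg {
      int nbytes, tag;
      char *buf;
      struct msg *next;
  };

  static struct msg *msg_q[MAX_NODES];   /* buffered messages per source */

  /* Return 1 and copy the data to user space if a matching message was
   * already buffered from this source; otherwise return 0 so the caller
   * activates the receive queue instead. */
  static int check_msg_q(int source, void *buf, int nbytes, int tag)
  {
      struct msg **p, *m;

      for (p = &msg_q[source]; *p != NULL; p = &(*p)->next) {
          m = *p;
          if (m->nbytes == nbytes && m->tag == tag) {
              memcpy(buf, m->buf, nbytes);
              *p = m->next;              /* unlink and free the buffer */
              free(m->buf);
              free(m);
              return 1;
          }
      }
      return 0;
  }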

If there was no matching message buffered in msg_q[source], do_recv() will activate that receive queue, and a readv() will attempt to read a 12-byte header and data from the TCP receive buffer for the source node. If there is data and the header matches the number of bytes and the message tag, the data is copied to user space. If not all of the data was there, or if no header was found at all, control is returned to the main program and the controller will continue to check that TCP receive buffer every time a SIGIO interrupt is generated, until the message has been completely received.
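A minimal sketch of that first readv() is shown below; the names and return convention are invented for the example.

  /* Illustrative sketch of the receive path when the queue is first
   * activated: one readv() pulls the 12-byte header and whatever data
   * is already sitting in the TCP receive buffer. */
  #include <sys/uio.h>

  struct hdr {                  /* 12-byte wire header */
      int nbytes, src, tag;
  };

  /* Returns the number of data bytes received so far, or -1 if nothing
   * (not even a full header) was available; the SIGIO controller
   * finishes the transfer on later interrupts. */
  static int start_recv(int sock, struct hdr *h, void *buf, int max_bytes)
  {
      struct iovec iov[2];
      ssize_t n;

      iov[0].iov_base = h;
      iov[0].iov_len  = sizeof(*h);
      iov[1].iov_base = buf;
      iov[1].iov_len  = max_bytes;

      n = readv(sock, iov, 2);
      if (n < (ssize_t) sizeof(*h))
          return -1;                      /* no complete header yet */
      return (int) (n - sizeof(*h));      /* data bytes copied so far */
  }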

If a header and data are found, but the header does not match the current message, then an out-of-order message condition exists that must be handled by pushing the current message to the msg_q[source] buffer. A message header is created, a malloc() call creates a buffer for the data, the data is copied to this receive buffer, and the new message header is put at the start of the receive queue so that it gets processed first. The MP_ARecv() then must wait until the out-of-order receive has been completed before the controller will again try to match it to the next header that arrives.
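A rough sketch of how such an unexpected message might be diverted is shown below. The queue placement and field names are assumptions for the example, not MP_Lite's actual code.

  /* Illustrative sketch of the out-of-order case: the header just read
   * does not match the posted receive, so the incoming data is diverted
   * into a malloc'ed buffer. */
  #include <stdlib.h>

  #define MAX_NODES 64

  struct msg {
      int nbytes, tag, nread;
      char *buf;
      struct msg *next;
  };

  static struct msg *recv_q[MAX_NODES];  /* active receives per source */
  static struct msg *msg_q[MAX_NODES];   /* completed out-of-order messages */

  /* Create a buffered receive for the unexpected message and put it at
   * the head of recv_q[source] so the controller drains it before
   * retrying the match for the posted receive behind it. */
  static void divert_message(int source, int nbytes, int tag)
  {
      struct msg *m = malloc(sizeof(*m));

      m->nbytes = nbytes;
      m->tag    = tag;
      m->nread  = 0;
      m->buf    = malloc(nbytes);

      m->next = recv_q[source];
      recv_q[source] = m;
      /* Once the controller has read all nbytes, this entry would move
       * to msg_q[source], where a later MP_ARecv() may claim it. */
  }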

For an unknown source (-1), the msg_q[node] must be checked first for all nodes. If no matching message is found, sigio_handler() must activate all TCP receive buffers and keep messages flowing by buffering them to the message queues if they don't match. This is much less efficient than when the source is known, but can be a convenience when performance is not critical.

An MP_Wait() on the MP_ARecv() msg_id simply blocks until sigio_handler() has completed the receive. MP_Recv() is then simply an MP_ARecv() followed by an MP_Wait().

This module can use either POSIX or BSD signals. POSIX is the default, since it works best for most flavors of Unix.

Performance

Messages sent out-of-order between two nodes will reduce performance, since they get buffered on the destination node. Receives posted without specifying the source node will also degrade performance badly, since all TCP receive buffers must be activated and any data encountered that does not match must be buffered.

If the TCP buffers are larger than the messages being transferred, preposting the receives really is not necessary since the data will flow through the TCP buffers without any blocks. When the TCP buffer sizes are smaller than typical message traffic, preposting the receives allows the controller to move the data from the TCP buffer to user space as it comes in, allowing more to flow in from the source node. If no receive is preposted, data will fill the TCP receive buffer on the destination node, then the TCP send buffer on the source node, and the source node may have to block until data starts draining on the destination node.
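A typical preposted exchange looks roughly like the following. The argument order and header name shown are assumptions for this example; check the MP_Lite header for the exact prototypes.

  /* Usage sketch: prepost the receive so the controller can drain the
   * TCP buffer as data arrives, then overlap other work. */
  #include "mplite.h"      /* header name assumed */

  void exchange(double *outgoing, double *incoming, int nbytes,
                int partner, int tag)
  {
      int recv_id, send_id;

      MP_ARecv(incoming, nbytes, partner, tag, &recv_id);  /* prepost */
      MP_ASend(outgoing, nbytes, partner, tag, &send_id);

      /* ... do other work here while the controller moves data ... */

      MP_Wait(&recv_id);
      MP_Wait(&send_id);
  }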

For Gigabit Ethernet networks, most MPI implementations can now deliver the raw TCP capabilities to the application layer if the system is tuned properly. This primarily means adjusting the TCP socket buffer sizes, which may involve both raising the limits at the OS level and setting the proper environment variables for the message-passing implementation.

MP_Lite already tries to increase these buffers to 1 MB, but most systems have hard limits set. If you have control over the cluster you are running on, you can increase these limits. For Linux, the maximum TCP socket buffer sizes can be set in the /etc/sysctl.conf file.

  # Increase the maximum socket buffer sizes
  net.core.rmem_max = 524288
  net.core.wmem_max = 524288
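
At the application level, the request for larger buffers is an ordinary setsockopt() call, which the kernel silently caps at these limits. A minimal sketch (illustrative, not MP_Lite's actual code):

  /* Request larger socket buffers and report what the kernel granted. */
  #include <stdio.h>
  #include <sys/socket.h>
  #include <sys/types.h>

  static void grow_buffers(int sock, int bytes)
  {
      socklen_t len = sizeof(bytes);
      int actual;

      setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bytes, sizeof(bytes));
      setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bytes, sizeof(bytes));

      getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &actual, &len);
      printf("requested %d bytes, got %d bytes\n", bytes, actual);
  }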

Handshaking

All nodes open the common .mplite.config file generated by the mprun script. This contains information such as the number of nodes, the program name and arguments, and the host name or interface name for each node (or multiple interface names if channel bonding is being used).

In order for the nodes to set up their socket connections, they need to know the listen port numbers for each node. Each node tries to create a listen socket on the same default port number, but rotates to a higher port number if that fails for any reason. When this happens, a .changeport.hostname.port file is created to signal node 0 that there was a problem and a new port is being used.
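The port rotation itself is just a bind loop, sketched below with illustrative names rather than MP_Lite's actual code.

  /* Try to listen on base_port; if the bind fails (for example because a
   * hung process from an earlier run still holds it), step up through
   * higher ports until one works.  Returns the listening fd and writes
   * the port actually used to *port_used. */
  #include <arpa/inet.h>
  #include <netinet/in.h>
  #include <string.h>
  #include <sys/socket.h>
  #include <unistd.h>

  static int open_listen_socket(int base_port, int max_tries, int *port_used)
  {
      int port, fd;
      struct sockaddr_in addr;

      for (port = base_port; port < base_port + max_tries; port++) {
          fd = socket(AF_INET, SOCK_STREAM, 0);
          if (fd < 0)
              return -1;

          memset(&addr, 0, sizeof(addr));
          addr.sin_family      = AF_INET;
          addr.sin_addr.s_addr = htonl(INADDR_ANY);
          addr.sin_port        = htons((unsigned short) port);

          if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) == 0 &&
              listen(fd, SOMAXCONN) == 0) {
              *port_used = port;   /* if port != base_port, MP_Lite would
                                      also drop a .changeport file here */
              return fd;
          }
          close(fd);               /* port busy: rotate upward */
      }
      return -1;
  }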

Node 0 tries to contact the listen socket on the default port on each host to make initial contact. This succeeds in most cases, but if not, node 0 looks for the .changeport file, which tells it to try a different port. This is really only a backup mechanism that makes the system a little more robust against hanging processes from previous runs. However, it will eventually be used to allow multiple MP_Lite jobs to be run concurrently on the same nodes, although more work will need to be done before this is possible.

Once node 0 has accumulated all of the port numbers, it sends them to each node in turn. The nodes then start handshaking with each other, with each node accepting connections from the lesser nodes and initiating connections to the greater nodes.
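The ordering is what matters here: no two nodes ever try to connect to each other at the same time. A sketch, assuming each node already has a listening socket and knows every node's host and port (helper names and signatures are illustrative, not MP_Lite's):

  #include <netdb.h>
  #include <stdio.h>
  #include <string.h>
  #include <sys/socket.h>
  #include <sys/types.h>
  #include <unistd.h>

  static int connect_to(const char *host, int port)
  {
      char portstr[16];
      struct addrinfo hints, *res;
      int fd;

      snprintf(portstr, sizeof(portstr), "%d", port);
      memset(&hints, 0, sizeof(hints));
      hints.ai_family   = AF_INET;
      hints.ai_socktype = SOCK_STREAM;
      if (getaddrinfo(host, portstr, &hints, &res) != 0)
          return -1;
      fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
      if (fd >= 0 && connect(fd, res->ai_addr, res->ai_addrlen) < 0) {
          close(fd);
          fd = -1;
      }
      freeaddrinfo(res);
      return fd;
  }

  /* Accept from all lesser ranks, then initiate connections to all
   * greater ranks.  (In practice the accepted peer's rank would be
   * confirmed by exchanging ranks after accept(); omitted here.) */
  static void handshake(int myproc, int nprocs, int listen_fd,
                        char host[][64], int port[], int sock[])
  {
      int node;

      for (node = 0; node < myproc; node++)           /* lesser nodes call us */
          sock[node] = accept(listen_fd, NULL, NULL);

      for (node = myproc + 1; node < nprocs; node++)  /* we call greater nodes */
          sock[node] = connect_to(host[node], port[node]);
  }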

Sync ports

The MP_Sync() function uses a separate port to connect each node to its neighbor below and the one above. This allows a barrier synchronization to be done without being affected by data coming through the primary TCP buffers. The barrier synchronization is done by sending a message from node 0 to node 1, which sends it to node 2, etc., until it is cycled back to node 0. At this point, all nodes have reached the barrier and node 0 cycles another go signal around to all nodes. This double cycle is not the most efficient way of doing a synchronization, but it does provide an absolute barrier.
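The double cycle can be sketched as follows, assuming each node already has dedicated sync sockets connected to the node below and the node above (wrapping around at the ends); the names are illustrative, not MP_Lite's actual code.

  #include <unistd.h>

  static void ring_barrier(int myproc, int sync_prev, int sync_next)
  {
      char token = 0;

      if (myproc == 0) {
          /* First cycle: everyone has reached the barrier once the
           * token makes it back to node 0. */
          write(sync_next, &token, 1);
          read(sync_prev, &token, 1);
          /* Second cycle: the 'go' signal releases all nodes. */
          write(sync_next, &token, 1);
          read(sync_prev, &token, 1);
      } else {
          read(sync_prev, &token, 1);     /* wait for the token ... */
          write(sync_next, &token, 1);    /* ... and pass it on */
          read(sync_prev, &token, 1);     /* wait for the 'go' signal */
          write(sync_next, &token, 1);    /* pass the 'go' signal on */
      }
  }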