Kim Nevelsteen, 2003-07-23

I/O Completion Port model for socket handling (Delphi)



The I/O Completion Port model is a Microsoft-specific (sorry Linux freaks) mechanism for handling a massive number of sockets with only a few threads. In classic socket handling, one thread is assigned per socket. That approach, however, does not scale to huge numbers of sockets, so this technology proves essential when programming a server to handle thousands of clients. The technology has been around since Windows NT, but due to its complexity it is still not widely implemented.

Reference: Microsoft Network Programming, pg.261
Because of the complexity of its design, you should consider using the Completion Port model only if you need your application to manage hundreds or even thousands of sockets simultaneously and you want your application to scale well when more CPUs are added to the system. The most important point to remember is that the I/O Completion Port model is your best choice if you are developing a high-performance server for Windows NT or Windows 2000 that is expected to service many socket I/O requests.

I have found multiple references with example code for Completion Ports. Because the example code in each was rather different, I studied the works extensively in order to learn the nitty-gritty details behind Completion Ports.

Two references are used throughout, cited by abbreviation: NETP (Microsoft Network Programming, quoted above) and MAPP.

I started with the example code found in MAPP because it seemed more verbose than that given in NETP. However, since Completion Ports are a Microsoft specification, I also tried to understand the reasoning behind the code given by Microsoft.

Before implementing Completion Ports in Delphi, we will have to take care of the preliminaries. The first is to import Winsock 2 (Winsock 2.2 is required) and the second is to define the structures that will be passed to the Windows Completion Port API functions. I have noticed that Delphi Project-JEDI has an implementation of Winsock2 available. However, I have not tested whether it has the mandatory definitions needed to implement Completion Ports.

1. The Preliminaries - defining structures and API calls



In Winsock2.pas I have defined the WSASocket, WSAAccept, WSARecv, WSASend, and WSASendDisconnect functions directly from the Windows API. WSARecv makes use of a structure called WSABuf, which is used to receive data from the Completion Port functions. WSABuf, in turn, makes use of a dynamic array of characters plus a length denominator. To avoid Delphi compiler problems recognizing the type equality between the structure member and the argument passed to the Completion Port functions, I have defined the type ArrayOfChar.
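
To give an idea of the preliminaries, here is a minimal sketch of the relevant Winsock2.pas declarations. The WSABuf layout follows the Winsock 2 headers, and a Delphi dynamic array field has the same in-memory layout as the char pointer used there; the exact form of the import clause is my assumption.

  type
    ArrayOfChar= array of char;

    TWSABuf= record
      len : longword;      // length denominator
      buf : ArrayOfChar;   // dynamic array, layout compatible with char*
    end;
    PWSABuf= ^TWSABuf;

  function WSARecv(s : TSocket; lpBuffers : PWSABuf; dwBufferCount : longword;
    var lpNumberOfBytesRecvd, lpFlags : longword; lpOverlapped : POverlapped;
    lpCompletionRoutine : pointer) : integer; stdcall;
    external 'ws2_32.dll' name 'WSARecv';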

Now to define the structures needed to get the Completion Ports fully functional. In MAPP, the authors opt to define a context in which the socket is defined, consisting of two overlapped I/O structures and a general buffer with length denominator. On close inspection of the code in NETP, I found a difference in implementations. The people at Microsoft chose to define PerHandleData with the socket handle and a general buffer, and a separate structure called PerIoData with one overlapped structure and an I/O specific buffer. After lengthy testing with Completion Ports I found the reason for the difference to be usage dependent. The MAPP implementation is not incorrect, but it does limit the number of pending I/O operations to one input and one output operation. In the Microsoft design the user is free to post as many pending I/O operations as desired, since a separate overlapped I/O structure is created for each post. This does, however, add the burden of keeping track of each created overlapped structure posted to the Completion Port.
I have chosen the more versatile implementation provided by Microsoft.


  TOperationType= (opQuit, opInput, opOutput);

  // per-handle data: one per socket, shared by all I/O operations on it
  TPerHandleData= record
    socket : TSocket;
    acMsg : ArrayOfChar;     // general buffer holding (partially complete) data
  end;
  PPerHandleData= ^TPerHandleData;

  // per-I/O data: one unique instance per posted overlapped operation
  TPerIoData= record
    ov : TOverlapped;        // placed first for clarity (see below)
    ioType : TOperationType; // differentiates input/output/quit in the workers
    ioBuf : TWSABuf;         // I/O specific buffer
  end;
  PPerIoData= ^TPerIoData;

The same PerHandleData structure is passed by the Completion Port to each thread that handles a completed I/O operation. This essentially means that a race condition can occur between two threads trying to access this structure simultaneously. This is not as large a problem as you might expect, because the Completion Port serializes I/O operations for you so that data is sent and received in the correct order. That leaves one remaining case where such a race condition can occur, and it is of such a nature that it can be solved without the use of critical sections; I explain this fully below. For the purpose of this explanation I have an ArrayOfChar in the PerHandleData structure that will hold all read (partially complete) data.

The PerIoData structure is cleared or recreated before each new I/O operation. Please note that, by definition of the specs, it is not necessary to create the Overlapped.hEvent, but you may create the event handle (and close it) if you so desire. For a read operation the WSABuf.len shall be set to the maximum number of bytes to be read in one operation, whereas in a write operation it denotes the exact number of bytes to send through the Completion Port. OperationType is included to be able to differentiate between operations in the worker threads. I also use this to post a quit to the worker threads, but the programmer is free to implement a different trigger as desired by using the CompletionKey and/or the PerIoData structure. It might also be worth mentioning that when you pass the Overlapped structure while doing a read or write operation, only the memory equal to the size of the Overlapped structure is tainted; additional data appended to or preceding the Overlapped structure remains untouched. For clarity I have placed the Overlapped structure first in the PerIoData structure.
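
The helper routines CreatePerIoData and DestroyPerIoData are used throughout the code below; here is a minimal sketch consistent with how they are called. The default buffer size and the optional length parameter are my assumptions.

  const
    RECV_BUFFER_SIZE= 4096; // assumed maximum bytes for one read operation

  function CreatePerIoData(ioType : TOperationType;
    lwBufLen : longword= RECV_BUFFER_SIZE) : PPerIoData;
  begin
    New(result);                                  // New initializes managed fields
    FillChar(result^.ov, sizeof(TOverlapped), 0); // Overlapped structure must be zeroed
    result^.ioType:= ioType;
    result^.ioBuf.len:= lwBufLen;
    SetLength(result^.ioBuf.buf, lwBufLen);       // allocate the I/O specific buffer
  end;

  procedure DestroyPerIoData(pPIO : PPerIoData);
  begin
    Dispose(pPIO); // Dispose finalizes the dynamic array inside ioBuf
  end;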



2. Implementation - the nitty gritty



And now on to the implementation. When comparing the two implementations given in the references NETP and MAPP, I noticed different function calls were used. There were obvious similarities, but a few important differences to respect. The first was made clear by the following passage in NETP, which tipped my preference toward the Microsoft implementation.

Reference: Microsoft Network Programming, pg.271
On a final note, Winsock applications should not use the ReadFile and WriteFile Win32 functions for processing I/O on a completion port in Winsock. These functions do feature an OVERLAPPED structure and can be successfully used on a completion port; however, the WSARecv and WSASend functions are better optimized for processing I/O in Winsock 2. Using ReadFile and WriteFile involves making many more unnecessary kernel/user mode procedure call transitions, thread context switches, and parameter marshaling, resulting in a significant performance penalty.

When programming Completion Ports it is important that the mechanism behind the code is understood.

The Completion Port model is based on Overlapped I/O (also known as non-blocking or asynchronous I/O), which allows the programmer to post an I/O operation to the operating system and be notified of its completion. This frees threads for other tasks while (usually lengthy) I/O operations are taking place.



2.1 Posting Overlapped I/O - IssueRead/Write



After assigning a socket to a Completion Port, the programmer can post any number of reads or writes to that socket using Overlapped I/O, provided a unique PerIoData structure is used for each call. Once the socket connect and assignment are complete, the programmer can initiate I/O operations as desired, sending data immediately or initiating a wait for client data by posting an initial read. Reads and writes are serialized separately so that reading and writing can happen simultaneously, and critical sections can be avoided for the most part if the programmer takes care. The creation of the port and the assignment of a socket are sketched after the quote below.

Reference: MSDN: WSARecv
The lpOverlapped parameter must be valid for the duration of the overlapped operation. If multiple I/O operations are simultaneously outstanding, each must reference a separate WSAOVERLAPPED structure.
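
Neither the creation of the Completion Port nor the assignment of a socket to it is shown in the listings; here is a minimal sketch of both, assuming an fhCompletionPort field and a hypothetical CreatePerHandleData helper.

  // create the Completion Port itself (done once, before any sockets are assigned)
  fhCompletionPort:= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

  // assign an accepted socket, passing its PerHandleData as the CompletionKey
  pPH:= CreatePerHandleData(hClient); // hypothetical helper filling TPerHandleData
  if (CreateIoCompletionPort(THandle(hClient), fhCompletionPort, Cardinal(pPH), 0)=0) then
    raise Exception.Create('Assign to Completion Port failed');

  IssueRead(pPH); // post an initial read to enter a wait for client data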

Whether it be a read or a write, after, and only after, the I/O is complete should action be taken on the data contained in the WSABuf. A single thread is released from the wait state produced by the function GetQueuedCompletionStatus for each I/O that has completed, at which time it is appropriate to manipulate the data contained in the WSABuf. The arguments of GetQueuedCompletionStatus specify how many bytes were sent or received, a CompletionKey in which I pass the PerHandleData (as adopted from NETP), and the unique Overlapped I/O structure corresponding to the PerIoData structure (again as specified in NETP), which identifies exactly which I/O operation has completed. See the definition of GetQueuedCompletionStatus below for details.

*PITFALL: When I first started playing around with Completion Ports I attempted to read out the data after the WSARecv function for the initial read and subsequent reads. I tested to see if the data was available and attempted to write this to the PerHandleData specific buffer. When stress testing I noticed that the data was not always immediately available and that a race condition existed between the initial read and the read done by the first released worker thread. I even introduced critical sections to try and solve this race condition. This is the wrong approach. Critical sections can be avoided for the IssueRead if Completion Ports are programmed correctly. The Completion Port documentation states that a thread gets released when an I/O operation is complete. Therefore you can leave the data buffer as is until a worker thread is released, after which you can read the buffer without entering a race condition with the listener thread or other threads (provided you use one unique PerIoData structure per I/O operation).

The task of correctly ordering read and write posts can be entrusted to the Completion Port. A single, and possibly totally different, thread is released to handle each completed I/O operation.

The two predominant functions that will be called multiple times I have declared as IssueRead and IssueWrite. I annotate the IssueRead function here and leave out the IssueWrite function because it is analogous, with the minor difference that the write contents are copied to the WSABuf before WSASend is called (a sketch of IssueWrite follows the listing below).

*NOTE: if you do NOT want the Completion Port to handle write output, you may pass nil instead of an Overlapped structure (WSASend specs). This, however, does not allow simultaneous sends and receives using the Completion Port mechanism, but causes a thread to be blocked while doing the write.


  function IssueRead(pPH : PPerHandleData) : PPerIoData;
  var
    pPIO : PPerIoData;
    lwNumRead, lwFlags : longword;
    iR : integer;
  begin
    pPIO:= CreatePerIoData(opInput);

    lwNumRead:= 0; lwFlags:= 0;
    iR:= WSARecv(pPH^.socket,@(pPIO^.ioBuf),1,lwNumRead,lwFlags,@(pPIO^.ov),nil);

    // IF ( (iR=-1) and (WSAGetLastError=WSA_IO_PENDING) )
    //   THEN connection has been established and data is pending
    //
    // IF ( (iR=0)  and (WSAGetLastError=0) )
    //   THEN possible that connection has been made, but no data available yet

    if (iR<>0) and (WSAGetLastError<>WSA_IO_PENDING) then begin
      DestroyPerIoData(pPIO);
      raise Exception.Create('IssueRead failure: WSAError='+ FormatResult(WSAGetLastError));
    end;

    result:= pPIO;
  end;

The programmer may use the IssueRead and IssueWrite functions as often as he/she pleases. The functions return a unique PerIoData for each post and check that the return value indicates either WSA_IO_PENDING or ZERO, depending on the current state of the I/O.
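
For completeness, here is a sketch of what the analogous IssueWrite might look like. It is not part of the original listing; the string parameter and the copy into the WSABuf are my assumptions.

  function IssueWrite(pPH : PPerHandleData; const sMsg : string) : PPerIoData;
  var
    pPIO : PPerIoData;
    lwNumSent, lwFlags : longword;
    iR : integer;
  begin
    // the buffer length denotes the exact number of bytes to send
    pPIO:= CreatePerIoData(opOutput, length(sMsg));
    if (length(sMsg)>0) then
      Move(sMsg[1], pPIO^.ioBuf.buf[0], length(sMsg)); // copy write contents to WSABuf

    lwNumSent:= 0; lwFlags:= 0;
    iR:= WSASend(pPH^.socket,@(pPIO^.ioBuf),1,lwNumSent,lwFlags,@(pPIO^.ov),nil);

    if (iR<>0) and (WSAGetLastError<>WSA_IO_PENDING) then begin
      DestroyPerIoData(pPIO);
      raise Exception.Create('IssueWrite failure: WSAError='+ FormatResult(WSAGetLastError));
    end;

    result:= pPIO;
  end;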

MSDN Library: GetQueuedCompletionStatus
  BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort,       // handle to completion port
    LPDWORD lpNumberOfBytes,     // bytes transferred
    PULONG_PTR lpCompletionKey,  // file completion key
    LPOVERLAPPED *lpOverlapped,  // buffer
    DWORD dwMilliseconds         // optional timeout value
  );
Parameters
CompletionPort
[in] Handle to the completion port of interest. To create a completion port, use the CreateIoCompletionPort function.
lpNumberOfBytes
[out] Pointer to a variable that receives the number of bytes transferred during an I/O operation that has completed.
lpCompletionKey
[out] Pointer to a variable that receives the completion key value associated with the file handle whose I/O operation has completed. A completion key is a per-file key that is specified in a call to CreateIoCompletionPort.
lpOverlapped
[out] Pointer to a variable that receives the address of the OVERLAPPED structure that was specified when the completed I/O operation was started.
dwMilliseconds
[in] Specifies the number of milliseconds that the caller is willing to wait for a completion packet to appear at the completion port. If a completion packet doesn't appear within the specified time, the function times out, returns FALSE, and sets *lpOverlapped to NULL.

If dwMilliseconds is INFINITE, the function will never time out. If dwMilliseconds is zero and there is no I/O operation to dequeue, the function will time out immediately.



2.2 Worker Threads - completed I/O means a released thread



At this point it is time to move on and discuss how the worker threads will handle the arguments returned by GetQueuedCompletionStatus upon release from a wait state. There are essentially four major cases that can be returned. I will label these No Dequeue, Connection Closed, Failed IO, and Successful IO, where the last contains the sub cases Input, Output and Quit.

 
  while NOT bQuit do begin
  
    bResult:= GetQueuedCompletionStatus(
      fhCompletionPort,rNrBytesTransferred,Cardinal(pPH),POverlapped(pPIO),INFINITE);
  
    Trace('released...');
  
    if (NOT bResult) and (pPIO=nil) then begin
      ... // function does not dequeue a completion packet
    end
  
    else if (NOT bResult) and (pPIO<>nil) then begin
      ... // dequeues a completion packet for a failed I/O operation
    end
  
    else if (rNrBytesTransferred=0) then begin
      ... // EOF : connection closed
    end
  
    else begin // function dequeues a completion packet for a successful I/O operation!
  
      try
        if (pPIO^.ioType=opQuit) then bQuit:= true
                
        else if (pPIO^.ioType=opInput) then begin
            
          // sMsg= [out] variable
          if (ReadIoDataToMsg(pPH^.acMsg,pPIO^.ioBuf.buf,rNrBytesTransferred,sMsg)) then 
          begin
            IssueWrite(pPH,sMsg); // ECHO to client
          end;
            
          IssueRead(pPH);
  
        end
        else if (pPIO^.ioType=opOutput) then begin
          ... // is rNrBytesTransferred= complete message ? else resend?
        end;
          
      finally DestroyPerIoData(pPIO); end;
      
      // Inserting a sleep here causes the operating system to consider the thread locked 
      // and busy forcing different threads to handle each of the completed I/O operations. 
      Sleep(200);
    end;
   
  end;

In the case of No Dequeue, a thread is released with PerIoData being invalid (=nil). In this case either the programmer has defined a timeout in GetQueuedCompletionStatus which has elapsed (at which point GetLastError would return WAIT_TIMEOUT) or an internal error has occurred.

The course of action I take for Connection Closed and Failed IO is the same. In both cases PerIoData is valid (!=nil), so I delete the structure and then close the socket found in the PerHandleData structure. The connection was closed cleanly by the client if GetQueuedCompletionStatus returned true. Otherwise, the client has probably dropped the connection prematurely or released its socket resources without properly closing the connection. Either way the server can take it upon itself to close the connection at this time and free its own resources. The argument NrBytesTransferred can be disregarded here.
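
Filled in, the Failed IO and Connection Closed branches elided in the loop above might look like this; the notification to the master thread is an assumption based on the cleanup design described in section 3.

    // Connection Closed and Failed IO: same course of action
    DestroyPerIoData(pPIO);
    closesocket(pPH^.socket);
    NotifySocketClosed(pPH); // hypothetical: tell the master thread / HandleMgr
                             //  so the PerHandleData can be freed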

The last and most frequently used case, and therefore inherently the most complex, is a Successful IO operation. As mentioned, this can be split into the three sub cases Input, Output and Quit. The arguments of GetQueuedCompletionStatus will not reveal whether the completed I/O was input or output; it is therefore the responsibility of the programmer to include a mechanism in PerIoData to differentiate between the operations. I found it convenient to include a quit mechanism in the same fashion. Regardless of which sub case is selected, there is always a valid PerIoData returned by GetQueuedCompletionStatus, and it must therefore be freed before the thread continues on to the next completed I/O.

To explain the aspects of Input and Output I am going to take a more general approach, considering that the exact functionality is program dependent. A good understanding of this recursive flow is important.

Keep in mind that for each IssueRead/Write there will be a complementary release of a thread, resulting in the case Successful IO (except in the event of an internal error, of course). The thread being released is NOT necessarily the same as the one that issued the read or write. It is also extremely important to keep in mind that, as SOON as the IssueRead/Write is called, it is best to assume a parallel running thread is already busy handling the completed I/O of the current IssueRead/Write, even before the call returns. Failure to adhere to this thinking will result in a race condition between threads. So what does this mean practically? Common memory resources should not be accessed after the IssueRead/Write is executed, essentially including the PerHandleData structure and members thereof.



2.3 Successful I/O sub cases - a more concrete design example



In the case of the echo server (one which simply echoes the received data back to the client) I would want the server to start listening for data immediately after the connect. This means that I post an initial IssueRead somewhere after the socket accept to cause the server to enter a wait for client data. Thereafter all communication is handled by the threads released from GetQueuedCompletionStatus. If, for example, you wanted the server to send out handshake data after the socket accept, an initial write could be issued instead, with the released threads finishing off the handshake and starting communication. Both variants are sketched below.
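
Both variants amount to a single call right after the accept and the assignment to the Completion Port; everything thereafter is driven by the released worker threads. The handshake string is of course an assumption.

  // echo server: enter a wait for client data immediately after the accept
  IssueRead(pPH);

  // -OR- handshake-first protocol: send a greeting; a released worker thread
  // will see the Successful IO Output case and continue the conversation
  IssueWrite(pPH, 'HELLO 1.0'#13#10);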

To further implement the echo server we can handle a Successful IO Input by reading the data out of the PerIoData buffer and calling IssueWrite to echo the data back to the client. If the data is only partial then it can be placed in the global buffer of PerHandleData before the IssueWrite (to avoid critical sections). In the event of an exception from IssueWrite I opt to call PostQueuedCompletionStatus with the buffer size of WSABuf set to ZERO, so that the next released thread handling the post will close the connection (rNrBytesTransferred will be ZERO, selecting the proper case). Now we have a design question to answer. Since input and output can be handled simultaneously, we have the choice of issuing another IssueRead, causing another wait for client data, -OR- waiting and allowing the thread corresponding to the IssueWrite we just called to issue the IssueRead. Again, this is a design question, with implications for server performance as well.


  ...

  try IssueWrite(pPH,sMsg); // ECHO to client
  except
    on e : Exception do begin
      Trace(lcSend,e.Message);

      pPIOx:= CreatePerIoData(opOutput,0);
      PostQueuedCompletionStatus(fhCompletionPort,0,Cardinal(pPH),@(pPIOx^.ov));
      exit;
    end;
  end;

{ Be aware that another thread than the current one can process the IssueWrite
  above while the current thread is *HERE*. }
  
  ...

When a thread is released and the case Successful IO Output is reached, this is a signal from the system that some or all of our data has been sent to the client. If only part of the data was transferred, more IssueWrites can be called in an attempt to complete the transfer. As stated before, it is the option of the programmer and a design question whether to call an IssueRead in this sub case, causing a wait for client data. A sketch of the partial-send handling follows.
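
A minimal sketch of that partial-send check inside the opOutput branch; comparing against ioBuf.len works because IssueWrite sets len to the exact number of bytes posted. The SetString copy into a local string is my assumption.

  else if (pPIO^.ioType=opOutput) then begin
    // partial send: repost the unsent tail of the buffer
    if (rNrBytesTransferred < pPIO^.ioBuf.len) then begin
      SetString(sTail, PChar(@(pPIO^.ioBuf.buf[rNrBytesTransferred])),
                pPIO^.ioBuf.len - rNrBytesTransferred);
      IssueWrite(pPH, sTail);
    end;
  end;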

To implement part of the cleanup procedure that I explain below, I have defined the case Successful IO Quit. The sole purpose of this case is to cause a thread to exit. It is only used when the server is ready to shut down, or possibly when an unrecoverable error has occurred requiring thread death. During normal operation of the server, released threads loop back to the function GetQueuedCompletionStatus to enter a wait state, ready to service another completed I/O operation.



3. CleanUp - close sockets, exit threads and close handles



To finish off the implementation we must clean up resources before shutting down the server. This means freeing resources and killing threads in a specific order to ensure a clean shutdown.

Reference: Microsoft Network Programming, pg.269
The main thing to avoid is freeing an OVERLAPPED structure when an overlapped I/O operation is in progress. The best way to prevent this is to call closesocket on every socket handle -- any overlapped I/O operations pending will complete. Once all socket handles are closed, you need to terminate all worker threads ...

How you have built up your thread hierarchy determines how you must break it down. In my final implementation I have a master thread which sends Windows messages back and forth between the other threads to communicate events and processing commands. The master thread is responsible for creating all subsequent threads needed, be it listener or worker. When the master thread gets the signal for a shutdown, it is best to respect the following shutdown order.

First, closing the listener socket ensures that no new connections are pending. In the listening loop it can be detected that the listening socket has been closed. At this point the listener thread can send a message to the master thread that it is dying and exit itself. When the listener's message arrives in the master thread it can be used to trigger the next step which is killing off the worker threads.

To realize the next step I created a HandleMgr class to handle and keep track of all socket handles with their corresponding info. Using this class I do exactly what the NETP reference above states: call closesocket on all open sockets. In my implementation I have the worker threads send a message to the master thread signaling that a particular socket is closed. The master thread can then call the HandleMgr to free that socket and other resources. Because we called a close on all sockets, the next signal that we can use to continue our shutdown is when the HandleMgr's socket list of acquired resources is empty. At this point all remaining worker threads can be told to shut down. To do this elegantly, PostQueuedCompletionStatus can be called once for each running worker thread (see the sketch below). Each worker thread will enter the case Successful IO Quit and exit its processing loop.
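
A minimal sketch of posting the quit packets, assuming fWorkerCount running worker threads. Note that the worker loop in section 2.2 treats zero transferred bytes as a closed connection, so the quit packet is posted with a nonzero byte count to make sure the Successful IO case, and with it the opQuit check, is reached.

  for i:= 1 to fWorkerCount do begin
    pPIO:= CreatePerIoData(opQuit, 0);
    // nonzero byte count so the packet selects the Successful IO case
    PostQueuedCompletionStatus(fhCompletionPort, 1, 0, @(pPIO^.ov));
  end;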

If the master thread has kept track of all the threads in its possession, it should be able to detect that all child threads are dead: the listener thread and all worker threads. It is then safe for the master thread to close the Completion Port handle, free the remaining resources and finally exit gracefully. Of course, in life things are not always ideal, and therefore a timeout should be placed on each wait so that the shutdown procedure does not hang on resources that have frozen in operation.

This concludes the implementation of Completion Ports for socket handling.



4. Windows Limits - thread max and increasing socket limit



This leaves one more subject to be addressed which I have not mentioned so far. Since this is an article on how to practically implement Completion Ports, two obstacles might be bumped into while stress testing the server. While I have tested the server to handle 20000+ connections, implementing a client to produce that many simultaneously sending and receiving connections proved tricky. I found the maximum number of threads per application to be about 2000. I therefore created an application which starts 1000 threads, each opening its own connection to the server and looping through a send and receive, and started this application multiple times to test the server.

The second obstacle is one that requires a change to the operating system settings.

Reference: white paper, "Microsoft Windows 2000 TCP/IP Implementation Details"
By default, when an application requests any socket from the system to use for an outbound call, a port between the values of 1024 and 5000 is supplied. The MaxUserPort parameter can be used to set the value of the uppermost port that the administrator chooses to allow for outbound connections.

HKLM\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters
REG_DWORD MaxUserPort= 5000(default) - 65534

This means that the server's, as well as the client's (for testing), registry settings will have to be changed and the machines rebooted if more than 4000 connections are to be reached.


Success!