sockimpl.h

jwatte's picture

#if !defined( etwork_sockimpl_h )
#define etwork_sockimpl_h
 
#include "etwork/etwork.h"
#include "etwork/locker.h"
#include "etwork/buffer.h"
#include "etwork/timer.h"
#include "etwork/errors.h"
#include "etwork/notify.h"
 
#if defined( WIN32 )
#include <windows.h>
#endif
 
#include <stdio.h>  //  for _snprintf
#include <math.h>
 
#include <string>
#include <map>
#include <set>
#include <deque>
#include <algorithm>
#include <new>
 
#include "eimpl.h"
 
 
namespace etwork {
  class Socket;
  class SocketManager : public ISocketManager {
    public:
      SocketManager();
      ~SocketManager();
      bool open( EtworkSettings * settings );
 
      //  ISocketManager
      virtual int poll( double seconds, ISocket ** outActive, int maxActive );
      virtual int accept( ISocket ** outAccepted, int maxAccepted );
      virtual int connect( char const * address, unsigned short port, ISocket ** outConnected );
      virtual void dispose();
 
      void debug_sock_error( ISocket * sock, int err, ErrorArea area, char const * func );
      void remove_socket( Socket * s );
      void regenerate_sets();
      SOCKET socket_id();
      bool handle_listening_read( size_t maxActive );
      bool handle_listening_write( size_t maxActive );
      bool handle_listening_except( size_t maxActive );
      void change_queuing_space();
      void timeout_sockets();
 
      EtworkSettings settings_;
      SOCKET listening_;
      typedef std::map< SOCKET, Socket * > SocketMap;
      SocketMap sockets_;
      typedef std::map< sockaddr_in, Socket * > AddressMap;
      AddressMap socketAddrs_;
      std::deque< Socket * > accepted_;
      Timer time_;
      std::set< ISocket * > active_;
      std::set< ISocket * > notify_;
 
      size_t maxNumSocks_;
      size_t numSocks_;
      size_t maxSock_;
      fd_set * allSet_;
      fd_set * readSet_;
      fd_set * writeSet_;
      fd_set * writeTempSet_;
      fd_set * exceptSet_;
      char * tmpBuffer_;
 
      int nextSocket_;
      int curQueueSpace_;
      double curTime_;
  };
 
  class Socket : public ISocket {
    public:
      Socket( SocketManager * mgr, SOCKET s, sockaddr_in const & addr ) :
        mgr_( mgr ), s_( s ), closed_( false ), accepted_( false ), addr_( addr ),
        bufIn_( mgr->settings_.maxMessageSize, mgr->settings_.queueSize, mgr->settings_.maxMessageCount ),
        bufOut_( mgr->settings_.maxMessageSize, mgr->settings_.queueSize, mgr->settings_.maxMessageCount )
      {
        if( !mgr->settings_.reliable ) {
          s_ = mgr->socket_id();
        }
        writebuf_ = new char[ mgr->settings_.queueSize ];
        writebufData_ = 0;
        data_ = 0;    //  this is the only time I touch the "data" member
        notify_ = 0;
        lastActive_ = mgr_->curTime_;
        lastKeepalive_ = 0;
      }
      ~Socket() {
        close_socket();
        delete[] writebuf_;
      }
 
      //  ISocket
      virtual sockaddr_in address();
      virtual int read( void * buffer, size_t maxSize );
      virtual int write( void const * buffer, size_t size );
      virtual bool closed();
      virtual void dispose();
 
      bool wants_to_write()
      {
        return writebufData_ > 0 || bufOut_.message_count() > 0;
      }
      bool wants_to_read()
      {
        return bufOut_.space_used() < mgr_->settings_.queueSize-mgr_->settings_.maxMessageSize
            && bufOut_.message_count() < mgr_->settings_.maxMessageCount;
      }
      bool do_read();
      bool do_write();
      bool do_except();
      void close_socket()
      {
        if( !closed_ ) {
          closed_ = true;
          mgr_->remove_socket( this );
          if( !IS_SOCKET_ERROR( s_ ) && mgr_->settings_.reliable ) {
            ::closesocket( s_ );
          }
          s_ = INVALID_SOCKET;
        }
      }
 
      SocketManager * mgr_;
      INotify * notify_;
      SOCKET s_;
      bool closed_;
      bool accepted_;
      sockaddr_in addr_;
      Buffer bufIn_;
      Buffer bufOut_;
      double lastActive_; //  for timeouts
      double lastKeepalive_;
 
      //  because of async I/O, I actually need a third buffer for writing
      char * writebuf_;
      size_t writebufData_;
  };
}
 
#endif  //  etwork_sockimpl_h