// Common, group Communication
// Copyright 2002 Alexander Liss
// Implementation of classes
// IPAddress, IPSocketAddress, ASocketBuffer,
// ASocket, ASocketNonBlocking, ASocketBlocking,
// ASocketListener
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
//#include <sys/time.h>
#include "AWait.h"
#include "AUniTime.h"
#include "ASocket.h"
// IPAddress
// Stores z in the address member after passing it through host2net()
// (presumably a host-to-network byte-order conversion -- confirm in header).
// Always returns 0.
int IPAddress::set_address(unsigned long z)
{
    address = host2net(z);

    return 0;
}
// Writes the dotted-decimal text form of the stored address into buf.
//   buf   - destination buffer
//   limit - size of buf in bytes, including room for the terminating '\0'
// Returns 0 on success, 1 when buf is null or limit is 0.
// The result is always '\0'-terminated; it is truncated if limit is too small.
int IPAddress::get_dotted(char * buf,unsigned limit)const
{
    if(buf==0 || limit==0) return 1;
    in_addr a;
    a.s_addr=address;
    // strncpy() does not terminate the destination when the source fills it,
    // so copy at most limit-1 characters and terminate explicitly.
    strncpy(buf,::inet_ntoa(a),limit-1);
    buf[limit-1]='\0';
    return 0;
}
// Parses a dotted-decimal IPv4 string and stores the result in address
// exactly as inet_addr() returns it.
// Returns 0 on success, 1 when buf is null or cannot be parsed.
// Note: inet_addr() reports failure as INADDR_NONE, which is also the value of
// "255.255.255.255", so that one address is rejected as well.
int IPAddress::put_dotted(const char * buf)
{
    if(buf==0) return 1;
    in_addr_t z = ::inet_addr(buf);
    // in_addr_t is unsigned, so a "< 0" test can never detect the error value.
    if(z==(in_addr_t)INADDR_NONE) return 1;
    address=z;
    return 0;
}
// Extracts the IPv4 address from a generic socket address.
// Returns 0 when z is an AF_INET address (address member updated),
// 1 otherwise (address member left untouched).
int IPAddress::put(const sockaddr& z)
{
    // Copy into a properly typed structure instead of casting the reference.
    sockaddr_in in4;
    memcpy(&in4, &z, sizeof(in4));

    if(in4.sin_family == AF_INET)
    {
        address = in4.sin_addr.s_addr;
        return 0;
    }
    return 1;
}
// IPSocketAddress
// Fills the generic socket address z with an AF_INET address built from the
// stored port and address members (copied as-is, no byte-order conversion here).
// Always returns 0.
int IPSocketAddress::get(sockaddr& z)const
{
    sockaddr_in a;
    // Zero the whole structure, not only sin_zero: some platforms carry extra
    // members (e.g. sin_len) or padding that would otherwise be copied out
    // uninitialized.
    memset(&a,0,sizeof(a));
    a.sin_family=AF_INET;
    a.sin_port=port;
    a.sin_addr.s_addr=address;
    memcpy(&z,&a,sizeof(a));
    return 0;
}
// Loads port and address from a generic socket address.
// Returns 0 when z is an AF_INET address, 1 otherwise (members unchanged).
int IPSocketAddress::put(const sockaddr& z)
{
    sockaddr_in in4;
    memcpy(&in4, &z, sizeof(in4));

    if(in4.sin_family == AF_INET)
    {
        // Both values are stored exactly as found in the structure.
        port = in4.sin_port;
        address = in4.sin_addr.s_addr;
        return 0;
    }
    return 1;
}
// ASocket
// Creates an AF_INET socket of the given type (protocol 0).
// On failure handle holds the negative value returned by socket() and
// status holds the errno value; on success status stays 0.
ASocket::ASocket(int type):
status(0),
handle(-1)
{
    const int fd = ::socket(AF_INET, type, 0);
    handle = fd;
    if(fd < 0)
    {
        status = errno;
    }
}
// Shuts down and closes the socket if a valid handle is held.
//
// The previous code wrapped close() in a 20-iteration "retry on EINTR" loop,
// but compared the return value of close() with EINTR. close() returns 0 or
// -1 (the reason is in errno), so the loop never retried. The dead loop is
// removed: the descriptor is closed exactly once, as before. Retrying close()
// after EINTR is unsafe on systems that release the descriptor even when the
// call is interrupted, because the number may already be reused elsewhere.
ASocket::~ASocket()
{
    if(handle<0) return;
    shutdown();
    close(handle);
    // Mark the handle as released so no later code sees a stale descriptor.
    handle=-1;
}
// Makes the current process the owner of the socket via fcntl(F_SETOWN).
// Returns 1 if the socket is in an error state, errno if fcntl() fails,
// 0 on success.
int ASocket::set_OOB_signalling()
{
    if(status != 0) return 1;

    if(fcntl(handle, F_SETOWN, getpid()) == -1)
    {
        return errno;
    }
    return 0;
}
// Placeholder: performs no configuration.
// Returns 1 if the socket is in an error state, 0 otherwise.
int ASocket::set_asynchronous()
{
    return status ? 1 : 0;
}
// Binds the socket to the local address z.
// Returns 1 if the socket is in an error state, errno if bind() fails,
// 0 on success.
int ASocket::bind(const IPSocketAddress& z)
{
    if(status != 0) return 1;

    sockaddr local;
    z.get(local);

    if(::bind(handle, &local, sizeof(local)) != 0)
    {
        return errno;
    }
    return 0;
}
// Connects the socket to the remote address z and, on success, records the
// peer address in the peer member via get_peer_address().
// Returns 1 if the socket is in an error state, errno if connect() fails,
// otherwise the result of get_peer_address().
int ASocket::connect(const IPSocketAddress& z)
{
    if(status != 0) return 1;

    sockaddr remote;
    z.get(remote);

    if(::connect(handle, &remote, sizeof(remote)) != 0)
    {
        return errno;
    }
    return get_peer_address(peer);
}
// Connects again to the address stored in the peer member.
// Returns 1 if the socket is in an error state, otherwise the result of connect().
int ASocket::reconnect()
{
    if(status != 0)
    {
        return 1;
    }
    return connect(peer);
}
// Retrieves the local address the socket is bound to.
// Returns 1 if the socket is in an error state, errno if getsockname() fails,
// otherwise the result of z.put() (0 on success, 1 if the returned address is
// not AF_INET).
int ASocket::get_own_address(IPSocketAddress& z)
{
    if(status) return 1;
    sockaddr a;
    // getsockname() takes a socklen_t*; passing an int* is not portable.
    socklen_t size=sizeof(a);
    int g=::getsockname(handle,&a,&size);
    if(g) return errno;
    return z.put(a);
}
// Retrieves the address of the connected peer.
// Returns 1 if the socket is in an error state, errno if getpeername() fails,
// otherwise the result of z.put() (0 on success, 1 if the returned address is
// not AF_INET).
int ASocket::get_peer_address(IPSocketAddress& z)
{
    if(status) return 1;
    sockaddr a;
    // getpeername() takes a socklen_t*; passing an int* is not portable.
    socklen_t size=sizeof(a);
    int g=::getpeername(handle,&a,&size);
    if(g) return errno;
    return z.put(a);
}
// Sends up to len bytes from buf with a single send() call.
//   r     - receives the number of bytes actually sent (0 on failure)
//   flags - passed through to send()
// Returns 1 if the socket is in an error state (r untouched), errno if send()
// fails, 0 on success.
int ASocket::send(unsigned& r, const void *buf,unsigned len,int flags)
{
    if(status) return 1;
    r=0;
    // Pass len unchanged: casting it to int made lengths above INT_MAX
    // negative, which then became an enormous size_t. Keep the result in
    // ssize_t so it is not truncated.
    const ssize_t g=::send(handle,buf,len,flags);
    if(g<0) return errno;
    r=static_cast<unsigned>(g);
    return 0;
}
// Receives up to limit bytes into buf with a single recv() call.
//   r     - receives the number of bytes actually read (0 on failure)
//   flags - passed through to recv()
// Returns 1 if the socket is in an error state (r untouched), errno if recv()
// fails, 0 on success.
int ASocket::receive(unsigned& r, void *buf,unsigned limit,int flags)
{
    if(status) return 1;
    r=0;
    // Pass limit unchanged: casting it to int made sizes above INT_MAX
    // negative, which then became an enormous size_t. Keep the result in
    // ssize_t so it is not truncated.
    const ssize_t g=::recv(handle,buf,limit,flags);
    if(g<0) return errno;
    r=static_cast<unsigned>(g);
    return 0;
}
// Sends up to len bytes from buf to the address z with a single sendto() call.
//   r     - receives the number of bytes actually sent (0 on failure)
//   flags - passed through to sendto()
// Returns 1 if the socket is in an error state (r untouched), errno if
// sendto() fails, 0 on success.
int ASocket::send_to(unsigned& r,const void *buf,unsigned len,const IPSocketAddress& z,int flags)
{
    if(status) return 1;
    sockaddr a;
    z.get(a);
    r=0;
    // Pass len unchanged: casting it to int made lengths above INT_MAX
    // negative, which then became an enormous size_t.
    const ssize_t g= ::sendto(handle,buf,len,flags,&a,sizeof(a));
    if(g<0) return errno;
    r=static_cast<unsigned>(g);
    return 0;
}
// Receives up to len bytes into buf with a single recvfrom() call and stores
// the sender address in z.
//   r     - receives the number of bytes actually read (0 on failure)
//   flags - passed through to recvfrom()
// Returns 1 if the socket is in an error state (r untouched), errno if
// recvfrom() fails, 0 on success. The result of z.put() is deliberately not
// propagated: recvfrom() may return no address, in which case z stays unchanged.
int ASocket::receive_from(unsigned& r,void *buf,unsigned len, IPSocketAddress& z,int flags)
{
    if(status) return 1;
    sockaddr a;
    // Zero the address so z.put() sees a well-defined (non AF_INET) family
    // when recvfrom() does not fill it in.
    memset(&a,0,sizeof(a));
    // recvfrom() takes a socklen_t*; passing an int* is not portable.
    socklen_t size=sizeof(a);
    r=0;
    // Pass len unchanged: casting it to int made sizes above INT_MAX negative.
    const ssize_t g=::recvfrom(handle,buf,len,flags,&a,&size);
    if(g<0) return errno;
    z.put(a);
    r=static_cast<unsigned>(g);
    return 0;
}
// Scatter/gather send: forwards buffers to sendmsg().
//   r - receives the number of bytes sent (0 on failure)
// Returns 1 if the socket is in an error state (r untouched), errno if
// sendmsg() fails, 0 on success.
int ASocket::send( unsigned& r, const msghdr *const buffers,int flags)
{
    if(status != 0) return 1;

    r = 0;
    const int sent = ::sendmsg(handle, buffers, flags);
    if(sent >= 0)
    {
        r = (unsigned)sent;
        return 0;
    }
    return errno;
}
// Scatter/gather receive: forwards buffers to recvmsg().
//   r - receives the number of bytes read (0 on failure)
// Returns 1 if the socket is in an error state (r untouched), errno if
// recvmsg() fails, 0 on success.
int ASocket::receive(unsigned& r, msghdr *const buffers,int flags)
{
    if(status != 0) return 1;

    r = 0;
    const int got = ::recvmsg(handle, buffers, flags);
    if(got >= 0)
    {
        r = (unsigned)got;
        return 0;
    }
    return errno;
}
// Turns on SO_KEEPALIVE through set_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// set_option() returns.
int ASocket::set_keep_alive()
{
    if(status != 0) return 1;

    int enabled = 1;
    return set_option(SO_KEEPALIVE, &enabled, sizeof(enabled));
}
// Turns on TCP_NODELAY through set_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// set_option() returns.
// NOTE(review): TCP_NODELAY belongs to the IPPROTO_TCP option level; confirm
// that set_option() does not apply SOL_SOCKET to every option it is given.
int ASocket::set_TCP_nodelay()
{
    if(status != 0) return 1;

    int enabled = 1;
    return set_option(TCP_NODELAY, &enabled, sizeof(enabled));
}
// Configures SO_LINGER: a positive seconds value enables lingering for that
// long, zero or a negative value disables it.
// Returns 1 if the socket is in an error state, otherwise whatever
// set_option() returns.
int ASocket::linger(int seconds)
{
    if(status != 0) return 1;

    const bool enable = seconds > 0;

    struct linger opt;
    opt.l_onoff = enable ? 1 : 0;
    opt.l_linger = enable ? seconds : 0;

    return set_option(SO_LINGER, &opt, sizeof(opt));
}
// Reads SO_SNDBUF into size through get_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// get_option() returns.
int ASocket::get_send_buffer_size(int &size)
{
    if(status != 0) return 1;

    // In/out length argument for get_option(): starts as the size of an int.
    int option_len = sizeof(int);
    return get_option(SO_SNDBUF, &size, &option_len);
}
// Sets SO_SNDBUF to size through set_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// set_option() returns.
int ASocket::set_send_buffer_size(int size)
{
    if(status != 0) return 1;

    int requested = size;
    return set_option(SO_SNDBUF, &requested, sizeof(requested));
}
// Reads SO_RCVBUF into size through get_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// get_option() returns.
int ASocket::get_receive_buffer_size(int &size)
{
    if(status != 0) return 1;

    // In/out length argument for get_option(): starts as the size of an int.
    int option_len = sizeof(int);
    return get_option(SO_RCVBUF, &size, &option_len);
}
// Sets SO_RCVBUF to size through set_option().
// Returns 1 if the socket is in an error state, otherwise whatever
// set_option() returns.
int ASocket::set_receive_buffer_size(int size)
{
    if(status != 0) return 1;

    int requested = size;
    return set_option(SO_RCVBUF, &requested, sizeof(requested));
}
// Tells whether err is one of the error codes this class treats as
// recoverable: EAGAIN, EINTR, ENOBUFS or ENOMEM.
bool ASocket::can_recover(int err)
{
    return err == EAGAIN
        || err == EINTR
        || err == ENOBUFS
        || err == ENOMEM;
}
// ASocketNonBlocking
// Peeks (MSG_PEEK is added to flags) at up to size bytes without consuming them.
// done is set to true when at least size bytes could be peeked, false otherwise.
// Returns the error code from receive() (done untouched), or 0.
int ASocketNonBlocking::peek_step(bool& done,void *buf, unsigned size, int flags)
{
    unsigned peeked = 0;
    const int err = receive(peeked, buf, size, flags | MSG_PEEK);
    if(err != 0) return err;

    done = (peeked >= size);
    return 0;
}
// Performs one send() attempt for the unsent tail of buf.
//   byte - in/out offset of the next byte to send; advanced by the amount sent
//   size - total number of bytes in buf
//   done - set to true once byte has reached size, false otherwise
// Returns the error code from send() (done and byte untouched), or 0.
int ASocketNonBlocking::send_step(bool& done,unsigned& byte, const void *buf,unsigned size,int flags)
{
    // Nothing left to send.
    if(byte >= size)
    {
        done = true;
        return 0;
    }

    unsigned sent = 0;
    const char *tail = static_cast<const char*>(buf) + byte;
    const int err = send(sent, tail, size - byte, flags);
    if(err != 0) return err;

    byte += sent;
    done = (byte >= size);
    return 0;
}
// Performs one receive() attempt, appending to buf at offset byte.
//   byte - in/out offset of the next free position; advanced by the amount read
//   size - total capacity wanted in buf
//   done - set to true once byte has reached size, false otherwise
// Returns the error code from receive() (done and byte untouched), or 0.
int ASocketNonBlocking::receive_step(bool& done,unsigned& byte, void *buf,unsigned size,int flags)
{
    // Buffer already full.
    if(byte >= size)
    {
        done = true;
        return 0;
    }

    unsigned got = 0;
    char *tail = static_cast<char*>(buf) + byte;
    const int err = receive(got, tail, size - byte, flags);
    if(err != 0) return err;

    byte += got;
    done = (byte >= size);
    return 0;
}
int ASocketNonBlocking::connect_loop(unsigned timeout,unsigned sleep_slice)
{
IPSocketAddress t;
while(1)
{
int g=connect(t);
if(!g) return 0;
if(timeout==0) return 2;
unsigned left=0;
millisleep(left,sleep_slice);
if(timeout>sleep_slice) timeout-=sleep_slice;
else timeout=0;
}
return 1;
}
// Sends buf[bytes..size) by calling send_step() repeatedly, sleeping
// sleep_slice (via millisleep()) between attempts.
//   bytes   - in/out offset of the next byte to send; it is NOT reset here
//   timeout - total budget, reduced by sleep_slice after every incomplete try
// Returns 0 when everything was sent, 1 on an unrecoverable error,
// 2 when the budget is exhausted.
int ASocketNonBlocking::send_loop(unsigned& bytes,const void *buf,unsigned size,unsigned timeout,unsigned sleep_slice,int flags)
{
    bool done=false;
    while(1)
    {
        const int err=send_step(done,bytes,buf,size,flags);
        // Judge recoverability by the code send_step() returned, not by the
        // global errno: when the socket is in an error state send() returns 1
        // without setting errno, so errno may hold an unrelated stale value.
        if(err && !can_recover(err)) return 1;
        if(done) break;
        if(timeout==0) return 2;
        unsigned left=0;
        millisleep(left,sleep_slice);
        if(timeout>sleep_slice) timeout-=sleep_slice;
        else timeout=0;
    }
    return 0;
}
// Fills buf[bytes..size) by calling receive_step() repeatedly, sleeping
// sleep_slice (via millisleep()) between attempts.
//   bytes   - in/out offset of the next free position; it is NOT reset here
//   timeout - total budget, reduced by sleep_slice after every incomplete try
// Returns 0 when size bytes are present, 1 on an unrecoverable error,
// 2 when the budget is exhausted.
int ASocketNonBlocking::receive_loop(unsigned& bytes, void *buf,unsigned size,unsigned timeout,unsigned sleep_slice,int flags)
{
    bool done=false;
    while(1)
    {
        const int err=receive_step(done,bytes,buf,size,flags);
        // Judge recoverability by the code receive_step() returned, not by the
        // global errno: when the socket is in an error state receive() returns
        // 1 without setting errno, so errno may hold an unrelated stale value.
        if(err && !can_recover(err)) return 1;
        if(done) break;
        if(timeout==0) return 2;
        unsigned left=0;
        millisleep(left,sleep_slice);
        if(timeout>sleep_slice) timeout-=sleep_slice;
        else timeout=0;
    }
    return 0;
}
// Sends buf[bytes..size) using send_step(), waiting in select() for the
// socket to become writable between attempts.
//   bytes - in/out offset of the next byte to send; it is not reset here
//   tt    - overall time budget, handed to AUniTime::put_milliseconds()
// Returns 0 when all data was sent AND ALSO on timeout (the caller has to
// compare bytes with size to tell them apart), 1 on an unrecoverable send
// error or when select() flags the socket in the error set, 2 when select()
// itself fails with anything other than EINTR.
int ASocketNonBlocking::send_loop2(unsigned& bytes,const void *buf,unsigned size,unsigned tt,int flags)
{
struct timeval t;
AUniTime end, timeout, current;
fd_set e, z;
bool done=false;
// Deadline = now + tt.
timeout.put_milliseconds(tt);
end.set_current();
end+=timeout;
while(1)
{
int g=send_step(done,bytes,buf,size,flags);
// NOTE(review): recoverability is judged by the global errno, not by the
// code g returned from send_step() -- confirm this is intended.
if(g && !can_recover(errno)) return 1;
if(done) break;
// Watch the socket in both the write set (z) and the error set (e).
FD_ZERO(&e);
FD_ZERO(&z);
FD_SET(handle, &z);
e = z;
current.set_current();
// presumably sub() stores (end - current) in timeout and returns 0 when no
// time is left -- confirm against AUniTime.
g=sub(timeout,end,current);
if(!g) return 0; // timeout
timeout.get_timeval(t);
g = select( handle+1,
0, // read
&z, // write
&e, // error
&t );
if(g==0) return 0; // timeout
// NOTE(review): the error set is examined before g < 0 is checked; after a
// failed select() the contents of the sets should not be relied upon.
if ( FD_ISSET(handle, &e) ) return 1; // socket error
if(g < 0 && errno != EINTR) return 2; // select error
}
return 0;
}
// Fills buf[bytes..size) using receive_step(), waiting in select() for the
// socket to become readable between attempts.
//   bytes - in/out offset of the next free position; it is not reset here
//   tt    - overall time budget, handed to AUniTime::put_milliseconds()
// Returns 0 when size bytes are present AND ALSO on timeout (the caller has
// to compare bytes with size to tell them apart), 1 on an unrecoverable
// receive error or when select() flags the socket in the error set, 2 when
// select() itself fails with anything other than EINTR.
int ASocketNonBlocking::receive_loop2(unsigned& bytes, void *buf,unsigned size,unsigned tt,int flags)
{
struct timeval t;
AUniTime end, timeout, current;
fd_set e, z;
bool done=false;
// Deadline = now + tt.
timeout.put_milliseconds(tt);
end.set_current();
end+=timeout;
while(1)
{
int g=receive_step(done,bytes,buf,size,flags);
// NOTE(review): recoverability is judged by the global errno, not by the
// code g returned from receive_step() -- confirm this is intended.
if(g && !can_recover(errno)) return 1;
if(done) break;
// Watch the socket in both the read set (z) and the error set (e).
FD_ZERO(&e);
FD_ZERO(&z);
FD_SET(handle, &z);
e = z;
current.set_current();
// presumably sub() stores (end - current) in timeout and returns 0 when no
// time is left -- confirm against AUniTime.
g=sub(timeout,end,current);
if(!g) return 0; // timeout
timeout.get_timeval(t);
g = select( handle+1,
&z, // read
0, // write
&e, // error
&t );
if(g==0) return 0; // timeout
// NOTE(review): the error set is examined before g < 0 is checked; after a
// failed select() the contents of the sets should not be relied upon.
if ( FD_ISSET(handle, &e) ) return 1; // socket error
if(g < 0 && errno != EINTR) return 2; // select error
}
return 0;
}
// Switches the socket to non-blocking mode.
// Returns 1 if the socket is in an error state, errno if fcntl() fails,
// 0 on success.
int ASocketNonBlocking::set_non_blocking()
{
    if(status) return 1;
    // F_SETFL replaces the whole set of file status flags, so read the current
    // flags first and add O_NONBLOCK instead of discarding the others.
    int flags=fcntl(handle, F_GETFL, 0);
    if(flags==-1) return errno;
    if(fcntl(handle, F_SETFL, flags | O_NONBLOCK)==-1) return errno;
    return 0;
}
// Intentionally empty out-of-line member (presumably only present to give the
// class a definition in this translation unit -- confirm in the header).
void ASocketNonBlocking::compiler_dummy()
{
}
// ASocketBlocking
// Stores z as the recovery timeout that send_loop()/receive_loop() hand to
// their recovery callback. Always returns 0.
int ASocketBlocking::set_recovery_timeout(unsigned z)
{
    recovery_timeout = z;

    return 0;
}
int ASocketBlocking::send_loop(unsigned& len, const void *buf,unsigned data_size,SOCKET_RECOVERY recovery,int flags)
{
if(status) return 1;
if(data_size==0) return 0;
int g=0,count=0;
unsigned z=0;
unsigned timeout=recovery_timeout;
len=0;
while(len<data_size)
{
g=send(z,(char*)buf+len,data_size-len,flags);
if(g || z==0)
{
if(recovery && can_recover(errno))
{
(*recovery)(timeout,count);
len+=z;
continue;
}
else
return errno;
}
len+=z;
}
return 0;
}
// Receives exactly data_size bytes into buf, calling receive() until done.
//   len      - receives the number of bytes actually read (always set)
//   recovery - optional callback invoked as (*recovery)(timeout,count) after
//              a recoverable error (see can_recover()); timeout starts at
//              recovery_timeout
//   flags    - passed through to receive()
// Returns 0 when data_size bytes were read, 1 if the socket is in an error
// state, otherwise the error code reported by receive(); ENOTCONN when
// receive() succeeds with zero bytes (the peer closed the connection) before
// all data arrived.
// NOTE(review): the loop never checks whether the recovery callback has used
// up timeout, so a persistent recoverable error retries indefinitely --
// confirm whether a bound is wanted.
int ASocketBlocking::receive_loop(unsigned& len, void *buf,unsigned data_size,SOCKET_RECOVERY recovery,int flags)
{
    if(status) return 1;
    len=0;
    if(data_size==0) return 0;
    int count=0;
    unsigned timeout=recovery_timeout;
    char *data=static_cast<char*>(buf);
    while(len<data_size)
    {
        unsigned got=0;
        const int err=receive(got,data+len,data_size-len,flags);
        if(err==0)
        {
            // Zero bytes without an error means end of stream. errno holds
            // nothing meaningful here; previously a stale value was returned
            // (possibly 0) or used to decide on an endless retry.
            if(got==0) return ENOTCONN;
            len+=got;
            continue;
        }
        // Use the code receive() returned rather than the global errno, which
        // may be stale by now.
        if(recovery && can_recover(err))
        {
            (*recovery)(timeout,count);
            continue;
        }
        return err;
    }
    return 0;
}
// Intentionally empty out-of-line member (presumably only present to give the
// class a definition in this translation unit -- confirm in the header).
void ASocketBlocking::compiler_dummy()
{
}
// ASocketNonBlockingPtr
// Intentionally empty out-of-line member (presumably only present to give the
// class a definition in this translation unit -- confirm in the header).
void ASocketNonBlockingPtr::compiler_dummy0()
{
}
// ASocketBlockingPtr
// Intentionally empty out-of-line member (presumably only present to give the
// class a definition in this translation unit -- confirm in the header).
void ASocketBlockingPtr::compiler_dummy0()
{
}
// ASocketListener
int ASocketListener::listen(int backlog)
{
int g=::listen(socket->get_handle(),backlog);
if(g) return errno;
return 0;
}
// Accepts one pending connection and wraps it in a new ASocketNonBlocking.
//   z - receives the address of the connecting peer
// Returns a heap-allocated socket object that the caller must delete, or 0
// when no socket is attached or accept() fails.
ASocketNonBlocking *ASocketListener::new_non_blocking_connection(IPAddress& z)
{
    if(!is_set()) return 0;
    sockaddr a;
    // accept() takes a socklen_t*; passing an int* is not portable.
    socklen_t size=sizeof(a);
    int s=::accept(socket->get_handle(),&a,&size);
    if(s<0) return 0;
    z.put(a);
    ASocket::Marker m=ASocket::none;
    return new ASocketNonBlocking(s,m);
}
// Accepts one pending connection and wraps it in a new ASocketBlocking.
//   z - receives the address of the connecting peer
// Returns a heap-allocated socket object that the caller must delete, or 0
// when no socket is attached or accept() fails.
ASocketBlocking *ASocketListener::new_blocking_connection(IPAddress& z)
{
    if(!is_set()) return 0;
    sockaddr a;
    // accept() takes a socklen_t*; passing an int* is not portable.
    socklen_t size=sizeof(a);
    int s=::accept(socket->get_handle(),&a,&size);
    if(s<0) return 0;
    z.put(a);
    ASocket::Marker m=ASocket::none;
    return new ASocketBlocking(s,m);
}
// Intentionally empty out-of-line member (presumably only present to give the
// class a definition in this translation unit -- confirm in the header).
void ASocketListener::compiler_dummy1()
{
}
// IP_host
// Looks up the local host name and its i-th IPv4 address.
//   name  - buffer that receives the host name (always '\0'-terminated)
//   limit - size of name in bytes
//   a     - receives the selected address
//   i     - zero-based index into the resolver's address list
// Returns 0 on success, 1 on bad arguments or if gethostname() fails,
// 2 if gethostbyname() fails, 3 if the entry is not IPv4 or has no address
// with index i.
int IP_host(char * name,unsigned limit,IPAddress& a, int i)
{
    if(name==0 || limit==0) return 1;
    if ( gethostname(name, limit) != 0 )
        return 1;
    // gethostname() may leave the buffer unterminated when the name is truncated.
    name[limit-1]='\0';
    hostent *hp = gethostbyname(name);
    if ( !hp )
        return 2;
    // Only 4-byte IPv4 entries can be stored in an IPAddress.
    if(hp->h_addrtype!=AF_INET || hp->h_length!=static_cast<int>(sizeof(in_addr_t)))
        return 3;
    // h_addr_list is null-terminated: make sure entry i exists before use.
    if(i<0) return 3;
    for(int k=0;k<=i;++k)
        if(hp->h_addr_list[k]==0) return 3;
    // Copy into a variable of exactly the address size. Copying 4 bytes into an
    // unsigned long is wrong on big-endian systems where long is 8 bytes.
    in_addr_t raw=0;
    memcpy ( &raw, hp->h_addr_list[i], sizeof(raw));
    unsigned long z=raw;
    unsigned long z1=net2host(z);
    a.set_address(z1);
    return 0;
}
// Standard recovery step: takes 10 ms off the remaining budget (not going
// below zero) and sleeps for 10 ms through millisleep().
// Does nothing when the budget is already 0. The second argument is unused.
void ASocketStdRecoveryStep(unsigned& milliseconds,int& )
{
    if(milliseconds == 0) return;

    milliseconds = (milliseconds >= 10) ? (milliseconds - 10) : 0;

    unsigned left = 0;
    millisleep(left, 10);
}