Hola, les traigo un code para la lectura y escritura en windows asincrona.
Se puede usar para pipes,ficheros,socket, etc
En teoria soporta una gran carga de trabajo al usar IOCP y overlapped.
Los IO completion port permiten crear una cola de solicitudes de I/O a medida que se van completando se pueden ir recuperando con GetQueuedCompletionStatus.
Esto es mas eficiente que crear un hilo por cada solicitud, a partir de un cierto numero de hilos la sobrecarga del cambio de tareas empieza a ser significativa y la memoria requerida para crear esso hilos se dispara.
Con este metodo un pequeño numero de hilos puede atender a una gran cantidad de solicitudes.
Esto es ideal para crear servidores tcp que podran atender a un gran numero de clientes
Aqui tienen el code:
iodevice.h
#ifndef IODEVICE_H
#define IODEVICE_H
#include <windows.h>
#include "basicTypes.h"
struct IODevice;
typedef void(*ioCompletionCallback)(struct IODevice* io,byte* buff,ulong BytesTransfered,ulong op,ulong status);
//Op codes for IOCP
#define OP_READ 0
#define OP_WRITE 1
#define OP_EOF 2
#define OP_CLOSE 3
#define STATUS_CANCELLED 0xC0000120
typedef struct OVERLAPPEDex{
OVERLAPPED Overlapped;
int opCode;
byte* buff;
}OVERLAPPEDex;
typedef int (*IODeviceDispatchIoCompletion)(struct IODevice* io,OVERLAPPEDex* Overlapped,ulong BytesTransfered,bool result);
typedef int (*IODeviceWrite)(struct IODevice* io,const byte* buff,ulong nBytes);
typedef int (*IODeviceRead)(struct IODevice* io,byte* buff,ulong maxBuff);
typedef void (*IODeviceCloseHandle)(struct IODevice* io);
typedef struct IODevice{
enum mode{
m_none = 0,
m_read = 1,
m_write = 2,
m_rw = 3
}mode;
IODeviceRead read;
IODeviceWrite write;
IODeviceDispatchIoCompletion dispatchIoCompletion;
ioCompletionCallback ioCompletion;
IODeviceCloseHandle closeHandle;
ulong error;
void* context;
}IODevice;
struct IODevice* createIODevice(HANDLE h,ulong mode);
HANDLE IODevice_getHandle(struct IODevice* io);
void closeIODevice(struct IODevice* io);
void freeIODevice(struct IODevice *io);
#endif // IODEVICE_H
iodevice.c
#include "iodevice.h"
#include <stdio.h>
HANDLE completionPort = null;
void CALLBACK ThreadIoCompletion(int);
void IODevice_closeHandle(struct IODevice *io);
bool inicializeIOCP(){
bool result = false;
if(completionPort == null){
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
if(completionPort != null){
int i;
for(i = 0;i<8;i++){
CreateThread(null,0,(LPTHREAD_START_ROUTINE)&ThreadIoCompletion,(LPVOID)i,0,null);
}
result = true;
}else{
printf("Error creating IOCP %d\n", (int)GetLastError());
}
}else {
result = true;
}
return result;
}
typedef struct IODevicePrivate{
struct IODevice device;
HANDLE hFile;
int mode;
OVERLAPPEDex rOverlapped;
OVERLAPPEDex wOverlapped;
}IODevicePrivate;
int IODevice_read(struct IODevice* io,byte* buff,ulong maxBuff){
ulong NumberOfBytesReated = -1;
if(io && (((IODevicePrivate*)io)->mode & m_read)){
ioCompletionCallback callback = null;
OVERLAPPEDex* Overlapped = &((IODevicePrivate*)io)->rOverlapped;
ZeroMemory(Overlapped,sizeof(*Overlapped));
Overlapped->opCode = OP_READ;
Overlapped->buff = buff;
if(!ReadFile(((IODevicePrivate*)io)->hFile
,buff,maxBuff,&NumberOfBytesReated,(OVERLAPPED*)Overlapped)){
int error = GetLastError();
switch (error) {
case ERROR_IO_PENDING:
// si el modo asincrono no esta activado, esperamos a que complete
callback = ((IODevicePrivate*)io)->device.ioCompletion;
if(!callback){
GetOverlappedResult(((IODevicePrivate*)io)->hFile,
(OVERLAPPED*)Overlapped,&NumberOfBytesReated,true)
}
break;
default:
printf("ReadFile error: %d\n",error);
break;
}
}
}
return NumberOfBytesReated;
}
int IODevice_write(struct IODevice* io,const byte* buff,ulong nBytes){
ulong NumberOfBytesWritten = -1;
if(io && (((IODevicePrivate*)io)->mode & m_write)){
ioCompletionCallback callback = null;
OVERLAPPEDex* Overlapped = &((IODevicePrivate*)io)->wOverlapped;
Overlapped->opCode = OP_WRITE;
Overlapped->buff = (byte*)buff;
if(!WriteFile(((IODevicePrivate*)io)->hFile
,buff,nBytes,&NumberOfBytesWritten,(OVERLAPPED*)Overlapped)){
int error = GetLastError();
switch (error) {
case ERROR_IO_PENDING:
// si el modo asincrono no esta activado, esperamos a que complete
callback = io->ioCompletion;
if(!callback){
GetOverlappedResult(((IODevicePrivate*)io)->hFile,
(OVERLAPPED*)Overlapped,&NumberOfBytesWritten,true)
}
break;
default:
printf("WriteFile error: %d\n",error);
break;
}
}
}
return NumberOfBytesWritten;
}
int IODevice_dispatchIoCompletion(IODevicePrivate* io,OVERLAPPEDex* Overlapped,ulong BytesTransfered,bool result){
if(io){
byte* buff = null;
int opCode,errorCode = 0;
if(Overlapped){
buff = Overlapped->buff;
errorCode = Overlapped->Overlapped.Internal;
opCode = Overlapped->opCode;
if(result && !BytesTransfered && opCode == OP_READ){
opCode = OP_EOF;
printf("OP_EOF %u\n",errorCode);
}
}else{
opCode = OP_CLOSE;
}
switch(opCode){
case OP_EOF:
case OP_READ:
case OP_WRITE:
if(io->device.ioCompletion)io->device.ioCompletion((IODevice*)io,buff,
BytesTransfered,opCode,
errorCode);
break;
case OP_CLOSE:
if(io->device.ioCompletion)io->device.ioCompletion((IODevice*)io,buff,
BytesTransfered,opCode,
errorCode);
io->device.closeHandle((IODevice*)io);
free(io);
default:
;
}
}
return 0;
}
struct IODevice* createIODevice(HANDLE h,ulong mode){
IODevicePrivate* ioDevice = null;
if(inicializeIOCP()){
ioDevice = malloc(sizeof(IODevicePrivate));
if(ioDevice){
ZeroMemory(ioDevice,sizeof(*ioDevice));
ioDevice->mode = mode;
ioDevice->hFile = h;
ioDevice->device.read = &IODevice_read;
ioDevice->device.write = &IODevice_write;
ioDevice->device.dispatchIoCompletion = (IODeviceDispatchIoCompletion)&IODevice_dispatchIoCompletion;
ioDevice->device.closeHandle = &IODevice_closeHandle;
CreateIoCompletionPort(h,completionPort,(DWORD)ioDevice,0);
}
}
return (struct IODevice*)ioDevice;
}
void CALLBACK ThreadIoCompletion(int lpParam){
int nThreadNo = lpParam;
OVERLAPPEDex *pOverlapped = NULL;
IODevicePrivate *pContext = NULL;
DWORD dwBytesTransfered = 0;
bool result = true;
while (result) {
result = GetQueuedCompletionStatus(
completionPort,
&dwBytesTransfered,
(LPDWORD)&pContext,
(LPOVERLAPPED*)&pOverlapped,
INFINITE);
if (!pContext){
//Estan pidiendo parar
break;
}
printf("thread dispatching: %d\n",nThreadNo);
pContext->device.dispatchIoCompletion((struct IODevice*)pContext,pOverlapped,dwBytesTransfered,result);
}
}
void closeIODevice(struct IODevice *io){
//Cancelamos las operaciones pendientes y enviamos un mensaje a la cosa sin
if(!CancelIo(((IODevicePrivate*)io)->hFile))
printf("CancelIo error: %u\n",(uint)GetLastError());
//Cambiamos el modo a none para que no se pueda llamar mas a read y a write
((IODevicePrivate*)io)->mode = m_none;
PostQueuedCompletionStatus(completionPort,0,(uint)io,null);
}
void IODevice_closeHandle(struct IODevice *io){
CloseHandle(((IODevicePrivate*)io)->hFile);
}
HANDLE IODevice_getHandle(struct IODevice *io){
return ((IODevicePrivate*)io)->hFile;
}
Saludos
(http://i.imgur.com/fbcMHqO.gif)
Buen código ;D Sólo faltaría un ejemplo de uso ::)
Aqui te oingo unos trozos de un codigo donde lo uso
//En esta funcion se aceptarian clientes del socket
int Socket_listen(ushort port,AcceptClientsCallback onAcceptClients){
.....
.....
while ((clientSocket = accept(serverSocket,(SOCKADDR*)&clientAddr,&clientAddrSize)) != INVALID_SOCKET) {
//Aqui se crea el IODevice a partir del socket
IODevice* ioClient = createIODevice((HANDLE)clientSocket,m_rw);
if(ioClient){
//Luego si hace falta cambiar alguna funcion como la de close, para casos especificos
ioClient->closeHandle = &Net_closeSocket;
onAcceptClients((struct sockaddr*)&clientAddr,ioClient);
}else{
printf("createIODevice error\n");
closesocket(clientSocket);
}
}
...
...
}
void httpClientDispatcher(struct sockaddr* addr,struct IODevice* io){
//stream_printf(out,"hello\n");
Http* newHttp = (Http*)malloc(sizeof(*newHttp));
ZeroMemory(newHttp,sizeof(*newHttp));
newHttp->maxRead = 1024;
newHttp->readBuff = malloc(newHttp->maxRead);
//Aqui se pone la funcion que atendera las notificaciones de datos leidos, escritos, fin de fichero y close
io->context = newHttp;
io->ioCompletion = &http_ioCompletion;
/*
*Aqui se empieza una solicitud de lectura y otra se escritura, pueden estar pendientes las 2 a la vez
* Para detectar el fin de fichero hay que intentar leer
*/
io->read(io,newHttp->readBuff,newHttp->maxRead);
io->write(io,(const byte*)"hola\n",sizeof("hola\n"));
}
void http_ioCompletion(struct IODevice* io,byte* buff,ulong BytesTransfered,ulong op,ulong status){
Http* http = (Http*)io->context;
//Aqui se manejan las diferentes notificaciones
switch (op) {
case OP_WRITE:
printf("http BytesWrited: %u, data: %s\n",(uint)BytesTransfered,buff);
break;
case OP_READ:
if(status != STATUS_CANCELLED && status != ERROR_PROCESS_ABORTED){
bool continueRead = true;
printf("http BytesReated: %s\n",buff);
ZeroMemory(http->readBuff,http->maxRead);
if(strcmp("bye",buff)==0){
closeIODevice(io);
continueRead = false;
}
if(continueRead)io->read(io,http->readBuff,http->maxRead);
}
break;
case OP_EOF:
closeIODevice(io);
break;
case OP_CLOSE:
printf("Cliente desconectado\n");
break;
default:
break;
}
}