[SRC] I/O Asincrono windows

Iniciado por Arkangel, Junio 20, 2013, 12:10:37 PM

Tema anterior - Siguiente tema

0 Miembros y 1 Visitante están viendo este tema.

Junio 20, 2013, 12:10:37 PM Ultima modificación: Febrero 08, 2014, 05:37:05 PM por Expermicid
Hola, les traigo un code para la lectura y escritura en windows asincrona.
Se puede usar para pipes,ficheros,socket, etc
En teoria soporta una gran carga de trabajo al usar IOCP y overlapped.
Los IO completion port permiten crear una cola de solicitudes de I/O a medida que se van completando se pueden ir recuperando con GetQueuedCompletionStatus.
Esto es mas eficiente que crear un hilo por cada solicitud, a partir de un cierto numero de hilos la sobrecarga del cambio de tareas empieza a ser significativa y la memoria requerida para crear esso hilos se dispara.
Con este metodo un pequeño numero de hilos puede atender a una gran cantidad de solicitudes.
Esto es ideal para crear servidores tcp que podran atender a un gran numero  de clientes

Aqui tienen el code:

iodevice.h
Código: c

#ifndef IODEVICE_H
#define IODEVICE_H

#include <windows.h>
#include "basicTypes.h"

struct IODevice;
typedef void(*ioCompletionCallback)(struct IODevice* io,byte* buff,ulong BytesTransfered,ulong op,ulong status);


//Op codes for IOCP
#define OP_READ     0
#define OP_WRITE    1
#define OP_EOF      2
#define OP_CLOSE    3

#define STATUS_CANCELLED 0xC0000120

typedef struct OVERLAPPEDex{
    OVERLAPPED Overlapped;
    int opCode;
    byte* buff;
}OVERLAPPEDex;

typedef int (*IODeviceDispatchIoCompletion)(struct IODevice* io,OVERLAPPEDex* Overlapped,ulong BytesTransfered,bool result);
typedef int (*IODeviceWrite)(struct IODevice* io,const byte* buff,ulong nBytes);
typedef int (*IODeviceRead)(struct IODevice* io,byte* buff,ulong maxBuff);
typedef void (*IODeviceCloseHandle)(struct IODevice* io);

typedef struct IODevice{
        enum mode{
            m_none = 0,
            m_read = 1,
            m_write = 2,
            m_rw = 3
        }mode;

        IODeviceRead read;
        IODeviceWrite write;
        IODeviceDispatchIoCompletion dispatchIoCompletion;
        ioCompletionCallback ioCompletion;
        IODeviceCloseHandle closeHandle;
        ulong error;
        void* context;
}IODevice;
struct IODevice* createIODevice(HANDLE h,ulong mode);
HANDLE IODevice_getHandle(struct IODevice* io);
void closeIODevice(struct IODevice* io);
void freeIODevice(struct IODevice *io);

#endif // IODEVICE_H


iodevice.c
Código: c

#include "iodevice.h"
#include <stdio.h>


HANDLE completionPort = null;
void CALLBACK ThreadIoCompletion(int);

void IODevice_closeHandle(struct IODevice *io);

bool inicializeIOCP(){
    bool result = false;

    if(completionPort == null){
        completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
        if(completionPort != null){
            int i;
            for(i = 0;i<8;i++){
                CreateThread(null,0,(LPTHREAD_START_ROUTINE)&ThreadIoCompletion,(LPVOID)i,0,null);
            }
            result = true;
        }else{
            printf("Error creating IOCP %d\n", (int)GetLastError());
        }
    }else {
        result = true;
    }
    return result;
}
typedef struct IODevicePrivate{
    struct IODevice device;
    HANDLE hFile;
    int mode;
    OVERLAPPEDex rOverlapped;
    OVERLAPPEDex wOverlapped;
}IODevicePrivate;

int IODevice_read(struct IODevice* io,byte* buff,ulong maxBuff){
    ulong NumberOfBytesReated = -1;
    if(io && (((IODevicePrivate*)io)->mode & m_read)){
        ioCompletionCallback callback = null;
        OVERLAPPEDex* Overlapped = &((IODevicePrivate*)io)->rOverlapped;
        ZeroMemory(Overlapped,sizeof(*Overlapped));
        Overlapped->opCode = OP_READ;
        Overlapped->buff = buff;

        if(!ReadFile(((IODevicePrivate*)io)->hFile
                           ,buff,maxBuff,&NumberOfBytesReated,(OVERLAPPED*)Overlapped)){
            int error = GetLastError();
            switch (error) {
                case ERROR_IO_PENDING:
                    // si el modo asincrono no esta activado, esperamos a que complete
                    callback = ((IODevicePrivate*)io)->device.ioCompletion;
                    if(!callback){
                        GetOverlappedResult(((IODevicePrivate*)io)->hFile,
                                               (OVERLAPPED*)Overlapped,&NumberOfBytesReated,true)
                    }
                    break;
                default:
                    printf("ReadFile error: %d\n",error);
                    break;
            }
        }
    }
    return NumberOfBytesReated;
}

int IODevice_write(struct IODevice* io,const byte* buff,ulong nBytes){
    ulong NumberOfBytesWritten = -1;
    if(io && (((IODevicePrivate*)io)->mode & m_write)){
        ioCompletionCallback callback = null;
        OVERLAPPEDex* Overlapped = &((IODevicePrivate*)io)->wOverlapped;
        Overlapped->opCode = OP_WRITE;
        Overlapped->buff = (byte*)buff;

        if(!WriteFile(((IODevicePrivate*)io)->hFile
                            ,buff,nBytes,&NumberOfBytesWritten,(OVERLAPPED*)Overlapped)){
            int error = GetLastError();
            switch (error) {
                case ERROR_IO_PENDING:
                    // si el modo asincrono no esta activado, esperamos a que complete
                    callback = io->ioCompletion;
                    if(!callback){
                        GetOverlappedResult(((IODevicePrivate*)io)->hFile,
                                               (OVERLAPPED*)Overlapped,&NumberOfBytesWritten,true)
                    }
                    break;
                default:
                    printf("WriteFile error: %d\n",error);
                    break;
            }
        }
    }
    return NumberOfBytesWritten;
}
int IODevice_dispatchIoCompletion(IODevicePrivate* io,OVERLAPPEDex* Overlapped,ulong BytesTransfered,bool result){
    if(io){
        byte* buff = null;
        int opCode,errorCode = 0;
        if(Overlapped){
            buff = Overlapped->buff;
            errorCode = Overlapped->Overlapped.Internal;
            opCode = Overlapped->opCode;

            if(result && !BytesTransfered && opCode == OP_READ){
                opCode = OP_EOF;
                printf("OP_EOF %u\n",errorCode);
            }
        }else{
            opCode = OP_CLOSE;
        }

        switch(opCode){
            case OP_EOF:
            case OP_READ:
            case OP_WRITE:
                if(io->device.ioCompletion)io->device.ioCompletion((IODevice*)io,buff,
                                                                   BytesTransfered,opCode,
                                                                   errorCode);
                break;
            case OP_CLOSE:
                if(io->device.ioCompletion)io->device.ioCompletion((IODevice*)io,buff,
                                                                   BytesTransfered,opCode,
                                                                   errorCode);
                io->device.closeHandle((IODevice*)io);
                free(io);
            default:
                ;
        }
    }
    return 0;
}
struct IODevice* createIODevice(HANDLE h,ulong mode){
    IODevicePrivate* ioDevice = null;
    if(inicializeIOCP()){
        ioDevice = malloc(sizeof(IODevicePrivate));
        if(ioDevice){
            ZeroMemory(ioDevice,sizeof(*ioDevice));

            ioDevice->mode = mode;
            ioDevice->hFile = h;
            ioDevice->device.read = &IODevice_read;
            ioDevice->device.write = &IODevice_write;
            ioDevice->device.dispatchIoCompletion = (IODeviceDispatchIoCompletion)&IODevice_dispatchIoCompletion;
            ioDevice->device.closeHandle = &IODevice_closeHandle;

            CreateIoCompletionPort(h,completionPort,(DWORD)ioDevice,0);
        }
    }

    return (struct IODevice*)ioDevice;
}
void CALLBACK ThreadIoCompletion(int lpParam){
    int nThreadNo = lpParam;

    OVERLAPPEDex *pOverlapped = NULL;
    IODevicePrivate   *pContext = NULL;
    DWORD            dwBytesTransfered = 0;

    bool result = true;
    while (result) {
        result = GetQueuedCompletionStatus(
                    completionPort,
                    &dwBytesTransfered,
                    (LPDWORD)&pContext,
                    (LPOVERLAPPED*)&pOverlapped,
                    INFINITE);
        if (!pContext){
            //Estan pidiendo parar
            break;
        }
        printf("thread dispatching: %d\n",nThreadNo);
        pContext->device.dispatchIoCompletion((struct IODevice*)pContext,pOverlapped,dwBytesTransfered,result);
    }
}


void closeIODevice(struct IODevice *io){
    //Cancelamos las operaciones pendientes y enviamos un mensaje a la cosa sin

    if(!CancelIo(((IODevicePrivate*)io)->hFile))
        printf("CancelIo error: %u\n",(uint)GetLastError());
    //Cambiamos el modo a none para que no se pueda llamar mas a read y a write
    ((IODevicePrivate*)io)->mode = m_none;
    PostQueuedCompletionStatus(completionPort,0,(uint)io,null);
}
void IODevice_closeHandle(struct IODevice *io){
    CloseHandle(((IODevicePrivate*)io)->hFile);
}

HANDLE IODevice_getHandle(struct IODevice *io){
    return ((IODevicePrivate*)io)->hFile;
}



Saludos


Buen código ;D Sólo faltaría un ejemplo de uso ::)
I code for $$$.

(PGP ID 0xCC050E77)
ASM, C, C++, VB6... skilled [malware] developer

Aqui te oingo unos trozos de un codigo donde lo uso

Código: c


//En esta funcion se aceptarian clientes del socket
int Socket_listen(ushort port,AcceptClientsCallback onAcceptClients){
.....
.....
while ((clientSocket = accept(serverSocket,(SOCKADDR*)&clientAddr,&clientAddrSize)) != INVALID_SOCKET) {
                        //Aqui se crea el IODevice a partir del socket
                        IODevice* ioClient = createIODevice((HANDLE)clientSocket,m_rw);
                        if(ioClient){
                            //Luego si hace falta cambiar alguna funcion como la de close, para casos especificos
                            ioClient->closeHandle = &Net_closeSocket;

                            onAcceptClients((struct sockaddr*)&clientAddr,ioClient);
                        }else{
                            printf("createIODevice error\n");
                            closesocket(clientSocket);
                        }
                    }
...
...
}

void httpClientDispatcher(struct sockaddr* addr,struct IODevice* io){
    //stream_printf(out,"hello\n");
    Http* newHttp = (Http*)malloc(sizeof(*newHttp));
    ZeroMemory(newHttp,sizeof(*newHttp));
    newHttp->maxRead = 1024;
    newHttp->readBuff = malloc(newHttp->maxRead);

    //Aqui se pone la funcion que atendera las notificaciones de datos leidos, escritos, fin de fichero y close
    io->context = newHttp;
    io->ioCompletion = &http_ioCompletion;

    /*
     *Aqui se empieza una solicitud de lectura y otra se escritura, pueden estar pendientes las 2 a la vez
     * Para detectar el fin de fichero hay que intentar leer
     */
    io->read(io,newHttp->readBuff,newHttp->maxRead);

    io->write(io,(const byte*)"hola\n",sizeof("hola\n"));
}

void http_ioCompletion(struct IODevice* io,byte* buff,ulong BytesTransfered,ulong op,ulong status){
    Http* http = (Http*)io->context;

    //Aqui se manejan las diferentes notificaciones
    switch (op) {
        case OP_WRITE:
            printf("http BytesWrited: %u, data: %s\n",(uint)BytesTransfered,buff);
            break;
        case OP_READ:
            if(status != STATUS_CANCELLED && status != ERROR_PROCESS_ABORTED){
                bool continueRead = true;
               
                printf("http BytesReated: %s\n",buff);
                ZeroMemory(http->readBuff,http->maxRead);

                if(strcmp("bye",buff)==0){
                        closeIODevice(io);
                        continueRead = false;
                }


                if(continueRead)io->read(io,http->readBuff,http->maxRead);

            }
            break;
        case OP_EOF:
            closeIODevice(io);
            break;
        case OP_CLOSE:
            printf("Cliente desconectado\n");
            break;
        default:
            break;
    }
}