#include "iodevice.h"
#include <stdio.h>
#include <stdlib.h>
// I/O completion port shared by every device in this file; created on first use by inicializeIOCP().
HANDLE completionPort = null;
// Worker routine started by inicializeIOCP(); its int argument is the worker index.
void CALLBACK ThreadIoCompletion(int);
// Closes the device's file handle; installed as device.closeHandle by createIODevice().
void IODevice_closeHandle(struct IODevice *io);
bool inicializeIOCP(){
bool result = false;
if(completionPort == null){
completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
if(completionPort != null){
int i;
for(i = 0;i<8;i++){
CreateThread(null,0,(LPTHREAD_START_ROUTINE)&ThreadIoCompletion,(LPVOID)i,0,null);
}
result = true;
}else{
printf("Error creating IOCP %d\n", (int)GetLastError
()); }
}else {
result = true;
}
return result;
}
// Concrete device behind the public struct IODevice: the public part plus the
// Win32 handle and one extended OVERLAPPED per transfer direction.
typedef struct IODevicePrivate{
// Public interface; it is the first member because this file casts between
// struct IODevice* and IODevicePrivate*.
struct IODevice device;
// Handle passed to ReadFile / WriteFile / CancelIo / CloseHandle.
HANDLE hFile;
// Bit mask tested against m_read / m_write; closeIODevice sets it to m_none.
int mode;
// Extended OVERLAPPED used by IODevice_read.
OVERLAPPEDex rOverlapped;
// Extended OVERLAPPED used by IODevice_write.
OVERLAPPEDex wOverlapped;
}IODevicePrivate;
// Reads up to maxBuff bytes from the device into buff using the device's read
// OVERLAPPED.
//
// When ReadFile reports ERROR_IO_PENDING and the device has no ioCompletion
// callback, the call blocks in GetOverlappedResult until the read finishes.
// With a callback installed the function returns right away and leaves the
// pending read to complete on its own.
//
// Returns the byte count stored by ReadFile / GetOverlappedResult, or -1 when
// io is null or the device mode does not include m_read.
int IODevice_read(struct IODevice* io,byte* buff,ulong maxBuff){
    IODevicePrivate* dev = (IODevicePrivate*)io;
    ulong NumberOfBytesReated = -1;

    if(!dev || !(dev->mode & m_read)){
        return NumberOfBytesReated;
    }

    OVERLAPPEDex* Overlapped = &dev->rOverlapped;
    ZeroMemory(Overlapped,sizeof(*Overlapped));
    Overlapped->opCode = OP_READ;
    Overlapped->buff = buff;

    if(!ReadFile(dev->hFile,buff,maxBuff,&NumberOfBytesReated,(OVERLAPPED*)Overlapped)){
        int error = GetLastError();
        switch (error) {
        case ERROR_IO_PENDING:
            // If asynchronous mode is not enabled (no callback), wait for the
            // operation to complete.
            if(!dev->device.ioCompletion){
                if(!GetOverlappedResult(dev->hFile,(OVERLAPPED*)Overlapped,
                                        &NumberOfBytesReated,true)){
                    printf("GetOverlappedResult error: %d\n",(int)GetLastError());
                }
            }
            break;
        default:
            printf("ReadFile error: %d\n",error);
            break;
        }
    }
    return NumberOfBytesReated;
}
// Writes nBytes bytes from buff to the device using the device's write
// OVERLAPPED.
//
// When WriteFile reports ERROR_IO_PENDING and the device has no ioCompletion
// callback, the call blocks in GetOverlappedResult until the write finishes.
// With a callback installed the function returns right away and leaves the
// pending write to complete on its own.
//
// Returns the byte count stored by WriteFile / GetOverlappedResult, or -1 when
// io is null or the device mode does not include m_write.
int IODevice_write(struct IODevice* io,const byte* buff,ulong nBytes){
    IODevicePrivate* dev = (IODevicePrivate*)io;
    ulong NumberOfBytesWritten = -1;

    if(!dev || !(dev->mode & m_write)){
        return NumberOfBytesWritten;
    }

    OVERLAPPEDex* Overlapped = &dev->wOverlapped;
    // Start every request from a clean OVERLAPPED, as IODevice_read does, so
    // status left behind by the previous write is not carried over.
    ZeroMemory(Overlapped,sizeof(*Overlapped));
    Overlapped->opCode = OP_WRITE;
    Overlapped->buff = (byte*)buff;

    if(!WriteFile(dev->hFile,buff,nBytes,&NumberOfBytesWritten,(OVERLAPPED*)Overlapped)){
        int error = GetLastError();
        switch (error) {
        case ERROR_IO_PENDING:
            // If asynchronous mode is not enabled (no callback), wait for the
            // operation to complete.
            if(!dev->device.ioCompletion){
                if(!GetOverlappedResult(dev->hFile,(OVERLAPPED*)Overlapped,
                                        &NumberOfBytesWritten,true)){
                    printf("GetOverlappedResult error: %d\n",(int)GetLastError());
                }
            }
            break;
        default:
            printf("WriteFile error: %d\n",error);
            break;
        }
    }
    return NumberOfBytesWritten;
}
// Turns one dequeued completion packet into a call to the device's
// ioCompletion callback.
//
// io             device the packet belongs to; a null device is ignored.
// Overlapped     extended OVERLAPPED of the finished operation, or null for a
//                packet posted without one, which is treated as OP_CLOSE.
// BytesTransfered byte count reported for the packet.
// result         whether the dequeue call succeeded; a successful read of
//                zero bytes is reported to the callback as OP_EOF.
//
// For OP_CLOSE the device's closeHandle hook runs after the callback.
// Always returns 0.
int IODevice_dispatchIoCompletion(IODevicePrivate* io,OVERLAPPEDex* Overlapped,ulong BytesTransfered,bool result){
    byte* data = null;
    int status = 0;
    int operation = OP_CLOSE;

    if(!io){
        return 0;
    }

    if(Overlapped){
        data = Overlapped->buff;
        status = Overlapped->Overlapped.Internal;
        operation = Overlapped->opCode;
        if(result && !BytesTransfered && operation == OP_READ){
            operation = OP_EOF;
            printf("OP_EOF %u\n",status);
        }
    }

    // Only the four known operations reach the callback; anything else is dropped.
    if(operation == OP_EOF || operation == OP_READ ||
       operation == OP_WRITE || operation == OP_CLOSE){
        if(io->device.ioCompletion){
            io->device.ioCompletion((IODevice*)io,data,BytesTransfered,operation,status);
        }
        if(operation == OP_CLOSE){
            io->device.closeHandle((IODevice*)io);
        }
    }
    return 0;
}
// Allocates a device wrapping handle h and associates the handle with the
// shared completion port, using the device pointer as the completion key.
//
// h     handle used for all later ReadFile / WriteFile calls.
// mode  bit mask later tested against m_read / m_write.
//
// Returns the new device, or null when the completion port is unavailable or
// the allocation fails. A failed association with the port is only logged and
// the device is still returned, as before.
struct IODevice* createIODevice(HANDLE h,ulong mode){
    IODevicePrivate* ioDevice = null;

    if(!inicializeIOCP()){
        return (struct IODevice*)ioDevice;
    }

    // calloc hands back zeroed storage, so every field not set below starts at 0.
    ioDevice = calloc(1,sizeof(*ioDevice));
    if(!ioDevice){
        return (struct IODevice*)ioDevice;
    }

    ioDevice->mode = mode;
    ioDevice->hFile = h;
    ioDevice->device.read = &IODevice_read;
    ioDevice->device.write = &IODevice_write;
    ioDevice->device.dispatchIoCompletion = (IODeviceDispatchIoCompletion)&IODevice_dispatchIoCompletion;
    ioDevice->device.closeHandle = &IODevice_closeHandle;

    // The completion key is pointer sized; a DWORD cast would cut the device
    // pointer in half on 64-bit builds.
    if(!CreateIoCompletionPort(h,completionPort,(ULONG_PTR)ioDevice,0)){
        printf("Error associating handle with IOCP %d\n",(int)GetLastError());
    }
    return (struct IODevice*)ioDevice;
}
// Worker loop: dequeues completion packets from the shared port and hands each
// one to the owning device's dispatchIoCompletion hook.
//
// lpParam is the worker index, used only in the trace output.
//
// The loop ends when a packet arrives with a null completion key (stop
// request) or when the dequeue itself fails without returning an OVERLAPPED.
// A failed I/O operation (dequeue returns false but an OVERLAPPED is present)
// is dispatched with result == false and the worker keeps running.
//
// NOTE(review): this routine is started through a cast to
// LPTHREAD_START_ROUTINE although its signature differs from that type; the
// signature is kept because the forward declaration and the CreateThread call
// elsewhere in the file depend on it.
void CALLBACK ThreadIoCompletion(int lpParam){
    int nThreadNo = lpParam;

    for(;;){
        // Reset the out-parameters every round so a failed dequeue can never
        // leave the previous packet's context or OVERLAPPED behind.
        OVERLAPPEDex *pOverlapped = NULL;
        IODevicePrivate *pContext = NULL;
        DWORD dwBytesTransfered = 0;
        bool result = GetQueuedCompletionStatus(
            completionPort,
            &dwBytesTransfered,
            (PULONG_PTR)&pContext,
            (LPOVERLAPPED*)&pOverlapped,
            INFINITE);

        if(!result && !pOverlapped){
            // Nothing was dequeued: the port itself failed, so stop.
            printf("GetQueuedCompletionStatus error: %d\n",(int)GetLastError());
            break;
        }
        if (!pContext){
            // A stop was requested.
            break;
        }
        printf("thread dispatching: %d\n",nThreadNo);
        pContext->device.dispatchIoCompletion((struct IODevice*)pContext,pOverlapped,dwBytesTransfered,result);
    }
}
void closeIODevice(struct IODevice *io){
//Cancelamos las operaciones pendientes y enviamos un mensaje a la cosa sin
if(!CancelIo(((IODevicePrivate*)io)->hFile))
printf("CancelIo error: %u\n",(uint
)GetLastError
()); //Cambiamos el modo a none para que no se pueda llamar mas a read y a write
((IODevicePrivate*)io)->mode = m_none;
PostQueuedCompletionStatus(completionPort,0,(uint)io,null);
}
void IODevice_closeHandle(struct IODevice *io){
CloseHandle(((IODevicePrivate*)io)->hFile);
}
HANDLE IODevice_getHandle(struct IODevice *io){
return ((IODevicePrivate*)io)->hFile;
}