/******************************************************************************/ /* */ /* X r d O s s A i o . c c */ /* */ /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */ /* All Rights Reserved */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #ifdef _POSIX_ASYNCHRONOUS_IO #ifdef __FreeBSD__ #include #endif #ifdef __APPLE__ #include #else #include #endif #endif #include "XrdOss/XrdOssApi.hh" #include "XrdOss/XrdOssTrace.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysPlatform.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdSfs/XrdSfsAio.hh" // All AIO interfaces are defined here. // Currently we disable aio support for MacOS because it is way too // buggy and incomplete. The two major problems are: // 1) No implementation of sigwaitinfo(). Though we can simulate it... // 2) Event notification returns an incomplete siginfo structure. // #ifdef __APPLE__ #undef _POSIX_ASYNCHRONOUS_IO #endif /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ extern XrdOucTrace OssTrace; //define tident aiop->TIdent extern XrdSysError OssEroute; int XrdOssFile::AioFailure = 0; #ifdef _POSIX_ASYNCHRONOUS_IO #ifdef SIGRTMAX const int OSS_AIO_READ_DONE = SIGRTMAX-1; const int OSS_AIO_WRITE_DONE = SIGRTMAX; #else #define OSS_AIO_READ_DONE SIGUSR1 #define OSS_AIO_WRITE_DONE SIGUSR2 #endif #endif /******************************************************************************/ /* F s y n c */ /******************************************************************************/ /* Function: Async fsync() a file Input: aiop - A aio request object */ int XrdOssFile::Fsync(XrdSfsAio *aiop) { #ifdef _POSIX_ASYNCHRONOUS_IO int rc; // Complete the aio request block and do the operation // if (XrdOssSys::AioAllOk) {aiop->sfsAio.aio_fildes = fd; aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE; aiop->TIdent = tident; // Start the operation // if (!(rc = aio_fsync(O_SYNC, &aiop->sfsAio))) return 0; if (errno != EAGAIN && errno != ENOSYS) return -errno; // Aio failed keep track of the problem (msg every 1024 events). Note // that the handling of the counter is sloppy because we do not lock it. // {int fcnt = AioFailure++; if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "fsync async"); } } #endif // Execute this request in a synchronous fashion // if ((aiop->Result = Fsync())) aiop->Result = -errno; // Simply call the write completion routine and return as if all went well // aiop->doneWrite(); return 0; } /******************************************************************************/ /* R e a d */ /******************************************************************************/ /* Function: Async read `blen' bytes from the associated file, placing in 'buff' Input: aiop - An aio request object Output: <0 -> Operation failed, value is negative errno value. =0 -> Operation queued >0 -> Operation not queued, system resources unavailable or asynchronous I/O is not supported. */ int XrdOssFile::Read(XrdSfsAio *aiop) { #ifdef _POSIX_ASYNCHRONOUS_IO EPNAME("AioRead"); int rc; // Complete the aio request block and do the operation // if (XrdOssSys::AioAllOk) {aiop->sfsAio.aio_fildes = fd; aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_READ_DONE; aiop->TIdent = tident; TRACE(Debug, "Read " <sfsAio.aio_nbytes <<'@' <sfsAio.aio_offset <<" started; aiocb=" <sfsAio))) return 0; if (errno != EAGAIN && errno != ENOSYS) return -errno; // Aio failed keep track of the problem (msg every 1024 events). Note // that the handling of the counter is sloppy because we do not lock it. // {int fcnt = AioFailure++; if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("aio", errno, "read async"); } } #endif // Execute this request in a synchronous fashion // aiop->Result = this->Read((void *)aiop->sfsAio.aio_buf, (off_t)aiop->sfsAio.aio_offset, (size_t)aiop->sfsAio.aio_nbytes); // Simple call the read completion routine and return as if all went well // aiop->doneRead(); return 0; } /******************************************************************************/ /* W r i t e */ /******************************************************************************/ /* Function: Async write `blen' bytes from 'buff' into the associated file Input: aiop - An aio request object. Output: <0 -> Operation failed, value is negative errno value. =0 -> Operation queued >0 -> Operation not queued, system resources unavailable or asynchronous I/O is not supported. */ int XrdOssFile::Write(XrdSfsAio *aiop) { #ifdef _POSIX_ASYNCHRONOUS_IO EPNAME("AioWrite"); int rc; // Complete the aio request block and do the operation // if (XrdOssSys::AioAllOk) {aiop->sfsAio.aio_fildes = fd; aiop->sfsAio.aio_sigevent.sigev_signo = OSS_AIO_WRITE_DONE; aiop->TIdent = tident; TRACE(Debug, "Write " <sfsAio.aio_nbytes <<'@' <sfsAio.aio_offset <<" started; aiocb=" <sfsAio))) return 0; if (errno != EAGAIN && errno != ENOSYS) return -errno; // Aio failed keep track of the problem (msg every 1024 events). Note // that the handling of the counter is sloppy because we do not lock it. // {int fcnt = AioFailure++; if ((fcnt & 0x3ff) == 1) OssEroute.Emsg("Write",errno,"write async"); } } #endif // Execute this request in a synchronous fashion // aiop->Result = this->Write((const void *)aiop->sfsAio.aio_buf, (off_t)aiop->sfsAio.aio_offset, (size_t)aiop->sfsAio.aio_nbytes); // Simply call the write completion routine and return as if all went well // aiop->doneWrite(); return 0; } /******************************************************************************/ /* X r d O s s S y s A I O M e t h o d s */ /******************************************************************************/ /******************************************************************************/ /* G l o b a l s */ /******************************************************************************/ int XrdOssSys::AioAllOk = 0; #if defined(_POSIX_ASYNCHRONOUS_IO) && !defined(HAVE_SIGWTI) // The folowing is for sigwaitinfo() emulation // siginfo_t *XrdOssAioInfoR; siginfo_t *XrdOssAioInfoW; extern "C" {extern void XrdOssAioRSH(int, siginfo_t *, void *);} extern "C" {extern void XrdOssAioWSH(int, siginfo_t *, void *);} #endif /******************************************************************************/ /* A i o I n i t */ /******************************************************************************/ /* Function: Initialize for AIO processing. Return: True if successful, false otherwise. */ int XrdOssSys::AioInit() { #if defined(_POSIX_ASYNCHRONOUS_IO) EPNAME("AioInit"); extern void *XrdOssAioWait(void *carg); pthread_t tid; int retc; #ifndef HAVE_SIGWTI // For those platforms that do not have sigwaitinfo(), we provide the // appropriate emulation using a signal handler. We actually provide for // two handlers since we separate reads from writes. To emulate synchronous // signals, we prohibit one signal hander from interrupting another one. // struct sigaction sa; sa.sa_sigaction = XrdOssAioRSH; sa.sa_flags = SA_SIGINFO; sigemptyset(&sa.sa_mask); sigaddset(&sa.sa_mask, OSS_AIO_WRITE_DONE); if (sigaction(OSS_AIO_READ_DONE, &sa, NULL) < 0) {OssEroute.Emsg("AioInit", errno, "creating AIO read signal handler; " "AIO support terminated."); return 0; } sa.sa_sigaction = XrdOssAioWSH; sa.sa_flags = SA_SIGINFO; sigemptyset(&sa.sa_mask); sigaddset(&sa.sa_mask, OSS_AIO_READ_DONE); if (sigaction(OSS_AIO_WRITE_DONE, &sa, NULL) < 0) {OssEroute.Emsg("AioInit", errno, "creating AIO write signal handler; " "AIO support terminated."); return 0; } #endif // The AIO signal handler consists of two thread (one for read and one for // write) that synhronously wait for AIO events. We assume, blithely, that // the first two real-time signals have been blocked for all threads. // if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait, (void *)(&OSS_AIO_READ_DONE))) < 0) OssEroute.Emsg("AioInit", retc, "creating AIO read signal thread; " "AIO support terminated."); #ifdef __FreeBSD__ else {DEBUG("started AIO read signal thread."); #else else {DEBUG("started AIO read signal thread; tid=" <<(unsigned int)tid); #endif if ((retc = XrdSysThread::Run(&tid, XrdOssAioWait, (void *)(&OSS_AIO_WRITE_DONE))) < 0) OssEroute.Emsg("AioInit", retc, "creating AIO write signal thread; " "AIO support terminated."); #ifdef __FreeBSD__ else {DEBUG("started AIO write signal thread."); #else else {DEBUG("started AIO write signal thread; tid=" <<(unsigned int)tid); #endif AioAllOk = 1; } } // All done // return AioAllOk; #else return 1; #endif } /******************************************************************************/ /* A i o W a i t */ /******************************************************************************/ void *XrdOssAioWait(void *mySigarg) { #ifdef _POSIX_ASYNCHRONOUS_IO EPNAME("AioWait"); int mySignum = *((int *)mySigarg); const char *sigType = (mySignum == OSS_AIO_READ_DONE ? "read" : "write"); const int isRead = (mySignum == OSS_AIO_READ_DONE); sigset_t mySigset; siginfo_t myInfo; XrdSfsAio *aiop; int rc, numsig; ssize_t retval; #ifndef HAVE_SIGWTI extern int sigwaitinfo(const sigset_t *set, siginfo_t *info); extern siginfo_t *XrdOssAioInfoR; extern siginfo_t *XrdOssAioInfoW; // We will catch one signal at a time. So, the address of siginfo_t can be // placed in a global area where the signal handler will find it. We have one // two places where this can go. // if (isRead) XrdOssAioInfoR = &myInfo; else XrdOssAioInfoW = &myInfo; // Initialize the signal we will be suspended for // sigfillset(&mySigset); sigdelset(&mySigset, mySignum); #else // Initialize the signal we will be waiting for // sigemptyset(&mySigset); sigaddset(&mySigset, mySignum); #endif // Simply wait for events and requeue the completed AIO operation // do {do {numsig = sigwaitinfo((const sigset_t *)&mySigset, &myInfo);} while (numsig < 0 && errno == EINTR); if (numsig < 0) {OssEroute.Emsg("AioWait",errno,sigType,"wait for AIO signal"); XrdOssSys::AioAllOk = 0; break; } if (numsig != mySignum || myInfo.si_code != SI_ASYNCIO) {char buff[80]; sprintf(buff, "%d %d", myInfo.si_code, numsig); OssEroute.Emsg("AioWait", "received unexpected signal", buff); continue; } #ifdef __APPLE__ aiop = (XrdSfsAio *)myInfo.si_value.sigval_ptr; #else aiop = (XrdSfsAio *)myInfo.si_value.sival_ptr; #endif while ((rc = aio_error(&aiop->sfsAio)) == EINPROGRESS) {} retval = (ssize_t)aio_return(&aiop->sfsAio); DEBUG(sigType <<" completed for " <TIdent <<"; rc=" <si_signo = info->si_signo; XrdOssAioInfoR->si_errno = info->si_errno; XrdOssAioInfoR->si_code = info->si_code; #ifdef __APPLE__ XrdOssAioInfoR->si_value.sigval_ptr = info->si_addr; #else XrdOssAioInfoR->si_value.sival_ptr = info->si_value.sival_ptr; #endif } } /******************************************************************************/ /* X r d O s s A i o W S H */ /******************************************************************************/ // XrdOssAioRSH handles AIO read signals. This handler was setup at AIO // initialization time but only when this platform does not have sigwaitinfo(). // extern "C" { void XrdOssAioWSH(int signum, siginfo_t *info, void *ucontext) { extern siginfo_t *XrdOssAioInfoW; // If we received a signal, it must have been for an AIO read and the read // signal thread enabled this signal. This means that a valid address exists // in the global read info pointer that we can now fill out. // XrdOssAioInfoW->si_signo = info->si_signo; XrdOssAioInfoW->si_errno = info->si_errno; XrdOssAioInfoW->si_code = info->si_code; #ifdef __APPLE__ XrdOssAioInfoW->si_value.sigval_ptr = info->si_addr; #else XrdOssAioInfoW->si_value.sival_ptr = info->si_value.sival_ptr; #endif } } #endif