//------------------------------------------------------------------------------ // Copyright (c) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH // Author: Paul-Niklas Kramp // Michal Simon //------------------------------------------------------------------------------ // XRootD is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // XRootD is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with XRootD. If not, see . //------------------------------------------------------------------------------ #include "XrdCl/XrdClLocalFileHandler.hh" #include "XrdCl/XrdClConstants.hh" #include "XrdCl/XrdClPostMaster.hh" #include "XrdCl/XrdClURL.hh" #include "XrdCl/XrdClMessageUtils.hh" #include "XrdCl/XrdClFileSystem.hh" #include "XProtocol/XProtocol.hh" #include #include #include #include #include #include #include #include #include #include #include namespace { class AioCtx { public: enum Opcode { None, Read, Write, Sync }; AioCtx( const XrdCl::HostList &hostList, XrdCl::ResponseHandler *handler ) : opcode( None ), hosts( new XrdCl::HostList( hostList ) ), handler( handler ) { aiocb *ptr = new aiocb(); memset( ptr, 0, sizeof( aiocb ) ); XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv(); int useSignals = XrdCl::DefaultAioSignal; env->GetInt( "AioSignal", useSignals ); if( useSignals ) { static SignalHandlerRegistrator registrator; // registers the signal handler ptr->aio_sigevent.sigev_notify = SIGEV_SIGNAL; ptr->aio_sigevent.sigev_signo = SIGUSR1; } else { ptr->aio_sigevent.sigev_notify = SIGEV_THREAD; ptr->aio_sigevent.sigev_notify_function = ThreadHandler; } ptr->aio_sigevent.sigev_value.sival_ptr = this; cb.reset( ptr ); } void SetWrite( int fd, size_t offset, size_t size, const void *buffer ) { cb->aio_fildes = fd; cb->aio_offset = offset; cb->aio_buf = const_cast( buffer ); cb->aio_nbytes = size; opcode = Opcode::Write; } void SetRead( int fd, size_t offset, size_t size, void *buffer ) { cb->aio_fildes = fd; cb->aio_offset = offset; cb->aio_buf = buffer; cb->aio_nbytes = size; opcode = Opcode::Read; } void SetFsync( int fd ) { cb->aio_fildes = fd; opcode = Opcode::Sync; } static void ThreadHandler( sigval arg ) { std::unique_ptr me( reinterpret_cast( arg.sival_ptr ) ); Handler( std::move( me ) ); } static void SignalHandler( int sig, siginfo_t *info, void *ucontext ) { std::unique_ptr me( reinterpret_cast( info->si_value.sival_ptr ) ); Handler( std::move( me ) ); } operator aiocb*() { return cb.get(); } private: struct SignalHandlerRegistrator { SignalHandlerRegistrator() { struct sigaction newact, oldact; newact.sa_sigaction = SignalHandler; sigemptyset( &newact.sa_mask ); newact.sa_flags = SA_SIGINFO; int rc = sigaction( SIGUSR1, &newact, &oldact ); if( rc < 0 ) throw std::runtime_error( strerror( errno ) ); } }; static void Handler( std::unique_ptr me ) { if( me->opcode == Opcode::None ) return; using namespace XrdCl; int rc = aio_return( me->cb.get() ); if( rc < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, GetErrMsg( me->opcode ), strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); QueueTask( error, 0, me->hosts, me->handler ); } else { AnyObject *resp = 0; if( me->opcode == Opcode::Read ) { ChunkInfo *chunk = new ChunkInfo( me->cb->aio_offset, rc, const_cast( me->cb->aio_buf ) ); resp = new AnyObject(); resp->Set( chunk ); } QueueTask( new XRootDStatus(), resp, me->hosts, me->handler ); } } static const char* GetErrMsg( Opcode opcode ) { static const char readmsg[] = "Read: failed %s"; static const char writemsg[] = "Write: failed %s"; static const char syncmsg[] = "Sync: failed %s"; switch( opcode ) { case Opcode::Read: return readmsg; case Opcode::Write: return writemsg; case Opcode::Sync: return syncmsg; default: return 0; } } static void QueueTask( XrdCl::XRootDStatus *status, XrdCl::AnyObject *resp, XrdCl::HostList *hosts, XrdCl::ResponseHandler *handler ) { using namespace XrdCl; // if it is simply the sync handler we can release the semaphore // and return there is no need to execute this in the thread-pool SyncResponseHandler *syncHandler = dynamic_cast( handler ); if( syncHandler ) { syncHandler->HandleResponse( status, resp ); } else { JobManager *jmngr = DefaultEnv::GetPostMaster()->GetJobManager(); LocalFileTask *task = new LocalFileTask( status, resp, hosts, handler ); jmngr->QueueJob( task ); } } std::unique_ptr cb; Opcode opcode; XrdCl::HostList *hosts; XrdCl::ResponseHandler *handler; }; }; namespace XrdCl { //------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------ LocalFileHandler::LocalFileHandler() : fd( -1 ) { jmngr = DefaultEnv::GetPostMaster()->GetJobManager(); } //------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------ LocalFileHandler::~LocalFileHandler() { } //------------------------------------------------------------------------ // Open //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Open( const std::string& url, uint16_t flags, uint16_t mode, ResponseHandler* handler, uint16_t timeout ) { AnyObject *resp = 0; XRootDStatus st = OpenImpl( url, flags, mode, resp ); if( !st.IsOK() && st.code != errErrorResponse ) return st; return QueueTask( new XRootDStatus( st ), resp, handler ); } XRootDStatus LocalFileHandler::Open( const URL *url, const Message *req, AnyObject *&resp ) { const ClientOpenRequest* request = reinterpret_cast( req->GetBuffer() ); uint16_t flags = ntohs( request->options ); uint16_t mode = ntohs( request->mode ); return OpenImpl( url->GetURL(), flags, mode, resp ); } //------------------------------------------------------------------------ // Close //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Close( ResponseHandler* handler, uint16_t timeout ) { if( close( fd ) == -1 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Close: file fd: %i %s", fd, strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } return QueueTask( new XRootDStatus(), 0, handler ); } //------------------------------------------------------------------------ // Stat //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Stat( ResponseHandler* handler, uint16_t timeout ) { Log *log = DefaultEnv::GetLog(); struct stat ssp; if( fstat( fd, &ssp ) == -1 ) { log->Error( FileMsg, "Stat: failed fd: %i %s", fd, strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } std::ostringstream data; data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " " << ssp.st_mtime; log->Debug( FileMsg, data.str().c_str() ); StatInfo *statInfo = new StatInfo(); if( !statInfo->ParseServerResponse( data.str().c_str() ) ) { log->Error( FileMsg, "Stat: ParseServerResponse failed." ); delete statInfo; return QueueTask( new XRootDStatus( stError, errErrorResponse, kXR_FSError ), 0, handler ); } AnyObject *resp = new AnyObject(); resp->Set( statInfo ); return QueueTask( new XRootDStatus(), resp, handler ); } //------------------------------------------------------------------------ // Read //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Read( uint64_t offset, uint32_t size, void* buffer, ResponseHandler* handler, uint16_t timeout ) { #if defined(__APPLE__) Log *log = DefaultEnv::GetLog(); int read = 0; if( ( read = pread( fd, buffer, size, offset ) ) == -1 ) { log->Error( FileMsg, "Read: failed %s", strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } ChunkInfo *chunk = new ChunkInfo( offset, read, buffer ); AnyObject *resp = new AnyObject(); resp->Set( chunk ); return QueueTask( new XRootDStatus(), resp, handler ); #else AioCtx *ctx = new AioCtx( pHostList, handler ); ctx->SetRead( fd, offset, size, buffer ); int rc = aio_read( *ctx ); if( rc < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Read: failed %s", strerror( errno ) ); return XRootDStatus( stError, errOSError, XProtocol::mapError( rc ), strerror( errno ) ); } return XRootDStatus(); #endif } //------------------------------------------------------------------------ // Write //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Write( uint64_t offset, uint32_t size, const void* buffer, ResponseHandler* handler, uint16_t timeout ) { #if defined(__APPLE__) const char *buff = reinterpret_cast( buffer ); size_t bytesWritten = 0; while( bytesWritten < size ) { ssize_t ret = pwrite( fd, buff, size, offset ); if( ret < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Write: failed %s", strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } offset += ret; buff += ret; bytesWritten += ret; } return QueueTask( new XRootDStatus(), 0, handler ); #else AioCtx *ctx = new AioCtx( pHostList, handler ); ctx->SetWrite( fd, offset, size, buffer ); int rc = aio_write( *ctx ); if( rc < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Write: failed %s", strerror( errno ) ); return XRootDStatus( stError, errOSError, XProtocol::mapError( rc ), strerror( errno ) ); } return XRootDStatus(); #endif } //------------------------------------------------------------------------ // Sync //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Sync( ResponseHandler* handler, uint16_t timeout ) { #if defined(__APPLE__) if( fsync( fd ) ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Sync: failed %s", strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errOSError, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } #else AioCtx *ctx = new AioCtx( pHostList, handler ); ctx->SetFsync( fd ); int rc = aio_fsync( O_SYNC, *ctx ); if( rc < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Sync: failed %s", strerror( errno ) ); return XRootDStatus( stError, errOSError, XProtocol::mapError( rc ), strerror( errno ) ); } return XRootDStatus(); #endif } //------------------------------------------------------------------------ // Truncate //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Truncate( uint64_t size, ResponseHandler* handler, uint16_t timeout ) { if( ftruncate( fd, size ) ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "Truncate: failed, file descriptor: %i, %s", fd, strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } return QueueTask( new XRootDStatus( stOK ), 0, handler ); } //------------------------------------------------------------------------ // VectorRead //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::VectorRead( const ChunkList& chunks, void* buffer, ResponseHandler* handler, uint16_t timeout ) { std::unique_ptr info( new VectorReadInfo() ); size_t totalSize = 0; bool useBuffer( buffer ); for( auto itr = chunks.begin(); itr != chunks.end(); ++itr ) { auto &chunk = *itr; if( !useBuffer ) buffer = chunk.buffer; ssize_t bytesRead = pread( fd, buffer, chunk.length, chunk.offset ); if( bytesRead < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "VectorRead: failed, file descriptor: %i, %s", fd, strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } totalSize += bytesRead; info->GetChunks().push_back( ChunkInfo( chunk.offset, bytesRead, buffer ) ); if( useBuffer ) buffer = reinterpret_cast( buffer ) + bytesRead; } info->SetSize( totalSize ); AnyObject *resp = new AnyObject(); resp->Set( info.release() ); return QueueTask( new XRootDStatus(), resp, handler ); } //------------------------------------------------------------------------ // VectorWrite //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::VectorWrite( const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout ) { for( auto itr = chunks.begin(); itr != chunks.end(); ++itr ) { auto &chunk = *itr; ssize_t bytesWritten = pwrite( fd, chunk.buffer, chunk.length, chunk.offset ); if( bytesWritten < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "VectorWrite: failed, file descriptor: %i, %s", fd, strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } } return QueueTask( new XRootDStatus(), 0, handler ); } //------------------------------------------------------------------------ // WriteV //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::WriteV( uint64_t offset, ChunkList *chunks, ResponseHandler *handler, uint16_t timeout ) { size_t iovcnt = chunks->size(); iovec iovcp[iovcnt]; ssize_t size = 0; for( size_t i = 0; i < iovcnt; ++i ) { iovcp[i].iov_base = (*chunks)[i].buffer; iovcp[i].iov_len = (*chunks)[i].length; size += (*chunks)[i].length; } iovec *iovptr = iovcp; ssize_t bytesWritten = 0; while( bytesWritten < size ) { #ifdef __APPLE__ ssize_t ret = lseek( fd, offset, SEEK_SET ); if( ret >= 0 ) ret = writev( fd, iovptr, iovcnt ); #else ssize_t ret = pwritev( fd, iovptr, iovcnt, offset ); #endif if( ret < 0 ) { Log *log = DefaultEnv::GetLog(); log->Error( FileMsg, "WriteV: failed %s", strerror( errno ) ); XRootDStatus *error = new XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); return QueueTask( error, 0, handler ); } bytesWritten += ret; while( ret ) { if( size_t( ret ) > iovptr[0].iov_len ) { ret -= iovptr[0].iov_len; --iovcnt; ++iovptr; } else { iovptr[0].iov_len -= ret; iovptr[0].iov_base = reinterpret_cast( iovptr[0].iov_base ) + ret; ret = 0; } } } return QueueTask( new XRootDStatus(), 0, handler ); } //------------------------------------------------------------------------ // Fcntl //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Fcntl( const Buffer &arg, ResponseHandler *handler, uint16_t timeout ) { return XRootDStatus( stError, errNotSupported ); } //------------------------------------------------------------------------ // Visa //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::Visa( ResponseHandler *handler, uint16_t timeout ) { return XRootDStatus( stError, errNotSupported ); } //------------------------------------------------------------------------ // QueueTask - queues error/success tasks for all operations. // Must always return stOK. // Is always creating the same HostList containing only localhost. //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::QueueTask( XRootDStatus *st, AnyObject *resp, ResponseHandler *handler ) { // if it is simply the sync handler we can release the semaphore // and return there is no need to execute this in the thread-pool SyncResponseHandler *syncHandler = dynamic_cast( handler ); if( syncHandler ) { syncHandler->HandleResponse( st, resp ); return XRootDStatus(); } HostList *hosts = pHostList.empty() ? 0 : new HostList( pHostList ); LocalFileTask *task = new LocalFileTask( st, resp, hosts, handler ); jmngr->QueueJob( task ); return XRootDStatus(); } //------------------------------------------------------------------------ // MkdirPath - creates the folders specified in file_path // called if kXR_mkdir flag is set //------------------------------------------------------------------------ XRootDStatus LocalFileHandler::MkdirPath( const std::string &path ) { // first find the most up-front component that exists size_t pos = path.rfind( '/' ); while( pos != std::string::npos && pos != 0 ) { std::string tmp = path.substr( 0, pos ); struct stat st; int rc = lstat( tmp.c_str(), &st ); if( rc == 0 ) break; if( errno != ENOENT ) return XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); pos = path.rfind( '/', pos - 1 ); } pos = path.find( '/', pos + 1 ); while( pos != std::string::npos && pos != 0 ) { std::string tmp = path.substr( 0, pos ); if( mkdir( tmp.c_str(), 0755 ) ) { if( errno != EEXIST ) return XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); } pos = path.find( '/', pos + 1 ); } return XRootDStatus(); } XRootDStatus LocalFileHandler::OpenImpl( const std::string &url, uint16_t flags, uint16_t mode, AnyObject *&resp) { Log *log = DefaultEnv::GetLog(); // safe the file URL for the HostList for later pUrl = url; URL fileUrl( url ); if( !fileUrl.IsValid() ) return XRootDStatus( stError, errInvalidArgs ); if( fileUrl.GetHostName() != "localhost" ) return XRootDStatus( stError, errNotSupported ); std::string path = fileUrl.GetPath(); //--------------------------------------------------------------------- // Prepare Flags //--------------------------------------------------------------------- uint16_t openflags = 0; if( flags & kXR_new ) openflags |= O_CREAT | O_EXCL; if( flags & kXR_open_wrto ) openflags |= O_WRONLY; else if( flags & kXR_open_updt ) openflags |= O_RDWR; else openflags |= O_RDONLY; if( flags & kXR_delete ) openflags |= O_CREAT | O_TRUNC; if( flags & kXR_mkdir ) { XRootDStatus st = MkdirPath( path ); if( !st.IsOK() ) { log->Error( FileMsg, "Open MkdirPath failed %s: %s", path.c_str(), strerror( st.errNo ) ); return st; } } //--------------------------------------------------------------------- // Open File //--------------------------------------------------------------------- if( mode == Access::Mode::None) mode = 0600; fd = open( path.c_str(), openflags, mode ); if( fd == -1 ) { log->Error( FileMsg, "Open: open failed: %s: %s", path.c_str(), strerror( errno ) ); return XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); } //--------------------------------------------------------------------- // Stat File and cache statInfo in openInfo //--------------------------------------------------------------------- struct stat ssp; if( fstat( fd, &ssp ) == -1 ) { log->Error( FileMsg, "Open: stat failed." ); return XRootDStatus( stError, errErrorResponse, XProtocol::mapError( errno ), strerror( errno ) ); } std::ostringstream data; data << ssp.st_dev << " " << ssp.st_size << " " << ssp.st_mode << " " << ssp.st_mtime; StatInfo *statInfo = new StatInfo(); if( !statInfo->ParseServerResponse( data.str().c_str() ) ) { log->Error( FileMsg, "Open: ParseServerResponse failed." ); delete statInfo; return XRootDStatus( stError, errErrorResponse, kXR_FSError ); } // add the URL to hosts list pHostList.push_back( HostInfo( pUrl, false ) ); //All went well uint32_t ufd = fd; OpenInfo *openInfo = new OpenInfo( (uint8_t*)&ufd, 1, statInfo ); resp = new AnyObject(); resp->Set( openInfo ); return XRootDStatus(); } XRootDStatus LocalFileHandler::ExecRequest( const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams ) { ClientRequest *req = reinterpret_cast( msg->GetBuffer() ); switch( req->header.requestid ) { case kXR_open: { XRootDStatus st = Open( url.GetURL(), req->open.options, req->open.mode, handler, sendParams.timeout ); delete msg; // in case of other operations msg is owned by the handler return st; } case kXR_close: { return Close( handler, sendParams.timeout ); } case kXR_stat: { return Stat( handler, sendParams.timeout ); } case kXR_read: { return Read( req->read.offset, req->read.rlen, sendParams.chunkList->front().buffer, handler, sendParams.timeout ); } case kXR_write: { ChunkList *chunks = sendParams.chunkList; if( chunks->size() == 1 ) { // it's an ordinary write return Write( req->write.offset, req->write.dlen, chunks->front().buffer, handler, sendParams.timeout ); } // it's WriteV call return WriteV( req->write.offset, sendParams.chunkList, handler, sendParams.timeout ); } case kXR_sync: { return Sync( handler, sendParams.timeout ); } case kXR_truncate: { return Truncate( req->truncate.offset, handler, sendParams.timeout ); } case kXR_writev: { return VectorWrite( *sendParams.chunkList, handler, sendParams.timeout ); } case kXR_readv: { return VectorRead( *sendParams.chunkList, 0, handler, sendParams.timeout ); } default: { return XRootDStatus( stError, errNotSupported ); } } } }