/******************************************************************************/ /* */ /* X r d S s i S h M a m . c c */ /* */ /* (c) 2015 by the Board of Trustees of the Leland Stanford, Jr., University */ /* Produced by Andrew Hanushevsky for Stanford University under contract */ /* DE-AC02-76-SFO0515 with the Department of Energy */ /* */ /* This file is part of the XRootD software suite. */ /* */ /* XRootD is free software: you can redistribute it and/or modify it under */ /* the terms of the GNU Lesser General Public License as published by the */ /* Free Software Foundation, either version 3 of the License, or (at your */ /* option) any later version. */ /* */ /* XRootD is distributed in the hope that it will be useful, but WITHOUT */ /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */ /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */ /* License for more details. */ /* */ /* You should have received a copy of the GNU Lesser General Public License */ /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */ /* COPYING (GPL license). If not, see . */ /* */ /* The copyright holder's institutional names and contributor's names may not */ /* be used to endorse or promote products derived from this software without */ /* specific prior written permission of the institution or contributor. */ /******************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "XrdSsi/XrdSsiShMam.hh" using namespace std; /* Gentoo removed OF from their copy of zconf.h but we need it here. See https://bugs.gentoo.org/show_bug.cgi?id=383179 for the sad history. This patch modelled after https://trac.osgeo.org/gdal/changeset/24622 */ #ifndef OF #define OF(args) args #endif /******************************************************************************/ /* S h a r e d M e m o r y I n f o r m a t i o n S t r u c t u r e */ /******************************************************************************/ namespace { struct ShmInfo {int verNum; // Always 1st fout bytes int index; // Offset of index int slots; // Number of slots in index int slotsUsed; // Number of entries in use int itemCount; // Number of items in this map int typeSz; // Size of the data payload int itemSz; // Size of each item int keyPos; // Position of key in item int freeItem; // Offset to item on the free list int freeCount; // Number of items on the free list int infoSz; // Size of header also original lowFree int lowFree; // Offset to low memory that is free int highUse; // Offset to high memory that is used char reUse; // When non-zero items can be reused (r/o locking) char multW; // When non-zero multiple writers are allowed char rsvd1; char rsvd2; int maxKeys; // Maximum number of keys int maxKeySz; // Longest allowed key (not including null byte) int hashID; // The name of the hash char typeID[64]; // Name of the type stored here char myName[64]; // Name of the implementation }; #define SHMINFO(x) ((ShmInfo *)shmBase)->x #define SHMADDR(type, offs) (type *)(shmBase + offs) #define SHMOFFS(addr) (char *)addr - shmBase #define ITEM_KEY(x) (char *)x + sizeof(MemItem) + keyPos #define ITEM_VAL(x) (char *)x + sizeof(MemItem) #define ITEM_VOF(x) (char *)x + sizeof(MemItem) - shmBase int PageMask = ~(sysconf(_SC_PAGESIZE)-1); int PageSize = sysconf(_SC_PAGESIZE); } /******************************************************************************/ /* L o c a l C l a s s e s */ /******************************************************************************/ namespace { class EnumJar {public: char *buff; int fd; int iNum; EnumJar(int xfd, int bsz) : buff(new char[bsz]), fd(xfd), iNum(0) {} ~EnumJar() {if (fd >= 0) close(fd); if (buff) delete [] buff; } }; class FileHelper { public: bool autoClose; FileHelper(XrdSsiShMam *mp) : autoClose(false), shMamP(mp) {} ~FileHelper() {if (autoClose) {int rc = errno; shMamP->Detach(); errno = rc;} } private: XrdSsiShMam *shMamP; }; class MutexHelper { public: pthread_rwlock_t *mtxP; MutexHelper(pthread_rwlock_t *mtx, XrdSsiShMam::LockType isrw) : mtxP(mtx) {if (mtx) {if (isrw) pthread_rwlock_wrlock(mtx); else pthread_rwlock_rdlock(mtx); } } ~MutexHelper() {if (mtxP) pthread_rwlock_unlock(mtxP);} }; } /******************************************************************************/ /* F i l e D e s c r i p t o r H a n d l i n g */ /******************************************************************************/ namespace { #if defined(__linux__) && defined(O_CLOEXEC) inline int ShMam_Dup(int oldfd) {return fcntl(oldfd, F_DUPFD_CLOEXEC, 0);} inline int ShMam_Open(const char *path, int flags) {return open(path, flags|O_CLOEXEC);} inline int ShMam_Open(const char *path, int flags, mode_t mode) {return open(path, flags|O_CLOEXEC, mode);} #else inline int ShMam_Dup(int oldfd) {int newfd = dup(oldfd); if (newfd >= 0) fcntl(newfd, F_SETFD, FD_CLOEXEC); return newfd; } inline int ShMam_Open(const char *path, int flags) {int newfd = open(path, flags); if (newfd >= 0) fcntl(newfd, F_SETFD, FD_CLOEXEC); return newfd; } inline int ShMam_Open(const char *path, int flags, mode_t mode) {int newfd = open(path, flags, mode); if (newfd >= 0) fcntl(newfd, F_SETFD, FD_CLOEXEC); return newfd; } #endif inline bool ShMam_Flush(int fd) { #if _POSIX_SYNCHRONIZED_IO > 0 return fdatasync(fd) == 0; #else return fsync(fd) == 0; #endif } /* inline bool ShMam_Flush(void *memP, int sOpt) { if (msync((void *)((uintptr_t)memP & PageMask), PageSize, sOpt)) return true; cerr <<"ShMam: msync() failed; " < maxKLen) {errno = ENAMETOOLONG; return false;} // Check if we need to remap this memory (atomic tests is not needed here). // We need to do this prior to file locking as the requirements may change. // if (verNum != SHMINFO(verNum)) ReMap(RWLock); // Lock the file if we have multiple writers or recycling items // if (lockRW && !lockInfo.FLock()) return false; // First try to find the item // hEnt = Find(theItem, prvItem, key, hash); // If we found it then see if we can replace it. If so and we can reuse the // the item, then just update the data portion. Otherwise, we need to get a // new item and replace the existing item. // if (hEnt) {if (olddata) memcpy(olddata, ITEM_VAL(theItem), shmTypeSz); if (!replace) {errno = EEXIST; return false;} if (reUse) {memcpy(ITEM_VAL(theItem), newdata, shmTypeSz); if (syncOn) Updated(ITEM_VOF(theItem), shmTypeSz); errno = EEXIST; return true; } retEno = EEXIST; } // Get a new item // if (!(newItem = NewItem())) {errno = ENOSPC; return false;} // Construct the new item // newItem->hash = hash; memcpy(ITEM_VAL(newItem), newdata, shmTypeSz); strcpy(ITEM_KEY(newItem), key); // If we are replacing an item then We need to bridge over the item we are // replacing in a way that doesn't make the item disappear for other readers. // Otherwise, we can patch in the new item either on the last item in the chain // or directly off the table. Note that releasing the lock creates a memory // fence. To understand why this this works consider the relationship between: // hEnt prvItem The state of the table // 0 0 Not found because index table slot is zero // 0 !0 Not found in a chain of items, prvItem is the last one // !0 0 Was found and is the first or only item in the chain // !0 !0 Was found and is in the middle or end of the chain // // if (hEnt) Atomic_SET(newItem->next, theItem->next); // Atomic else {hEnt = (unsigned int)hash % shmSlots; if (hEnt == 0) hEnt = 1; SHMINFO(itemCount)++; } iOff = SHMOFFS(newItem); if (prvItem) Atomic_SET_STRICT(prvItem->next, iOff); // Atomic else {SHMINFO(slotsUsed)++; Atomic_SET_STRICT(shmIndex[hEnt],iOff); // Atomic if (syncOn) Updated(SHMOFFS(&shmIndex[hEnt])); } // Indicate which things we changed if we have syncing // if (syncOn) {Updated(0); Updated(SHMOFFS(newItem)); if (prvItem) Updated(SHMOFFS(prvItem)); } // All done, return result // errno = retEno; return true; } /******************************************************************************/ /* A t t a c h */ /******************************************************************************/ bool XrdSsiShMam::Attach(int tout, bool isrw) { FileHelper fileHelp(this); XLockHelper lockInfo(this, (isrw ? RWLock : ROLock)); struct stat Stat1, Stat2; int mMode, oMode; union {int *intP; Atomic(int) *antP;} xntP; // Compute open and mmap options // if (isrw) {oMode = O_RDWR; mMode = PROT_READ|PROT_WRITE; isRW = true; } else { oMode = O_RDONLY; mMode = PROT_READ; isRW = false; } // Attempt to open the file // timeOut = tout; if (tout < 0) tout = 0x7fffffff; while((shmFD = ShMam_Open(shmPath, oMode)) < 0 && tout >= 0) {if (errno != ENOENT) return false; if (!tout) break; Snooze(3); tout -= 3; } // Test if we timed out // if (tout <= 0) {errno = ETIMEDOUT; return false;} fileHelp.autoClose = true; // Lock this file as we don't want it changing on us for now // if (!lockInfo.FLock()) return false; // Get the stat information for this file // if (fstat(shmFD, &Stat1)) return false; // The file is open, try to memory map it // shmBase = (char *)mmap(0, Stat1.st_size, mMode, MAP_SHARED, shmFD, 0); if (shmBase == MAP_FAILED) return false; shmSize = Stat1.st_size; // Make sure we have a valid hash name // if (!shmHash) memcpy(&shmHash, "c32 ", sizeof(int)); // Verify tha the objects in this mapping are compatible with this object // if (SHMINFO(typeSz) != shmTypeSz || strcmp(shmType, SHMINFO(typeID)) || strcmp(shmImpl, SHMINFO(myName)) || shmHash != SHMINFO(hashID)) {errno = EDOM; return false;} // Copy out the information we can use locally // verNum = SHMINFO(verNum); keyPos = SHMINFO(keyPos); maxKLen = SHMINFO(maxKeySz); xntP.intP = SHMADDR(int, SHMINFO(index)); shmIndex = xntP.antP; shmSlots = SHMINFO(slots); shmItemSz = SHMINFO(itemSz); shmInfoSz = SHMINFO(infoSz); // Now, there is a loophole here as the file could have been exported while // we were trying to attach it. If this happened, the inode would change. // We test for this now. If it changed, tell the caller to try again. // if (stat(shmPath, &Stat2) || Stat1.st_dev != Stat2.st_dev || Stat1.st_ino != Stat2.st_ino) {errno = EAGAIN; return false;} accMode = Stat2.st_mode & (S_IRWXU|S_IRWXG|S_IRWXO); // Set locking based on how the table was created // SetLocking(isrw); fileHelp.autoClose = false; return true; } /******************************************************************************/ /* C r e a t e */ /******************************************************************************/ bool XrdSsiShMam::Create(XrdSsiShMat::CRZParms &parms) { static const int minInfoSz = 256; static const int okMode = S_IRWXU|S_IRWXG|S_IROTH; static const int crMode = S_IRWXU|S_IRWXG|S_IROTH; FileHelper fileHelp(this); ShmInfo theInfo; int n, maxEnts, totSz, indexSz; union {int *intP; Atomic(int) *antP;} xntP; // Validate parameter list values // if (parms.indexSz <= 0 || parms.maxKeys <= 0 || parms.maxKLen <= 0) {errno = EINVAL; return false;} if (parms.mode & ~okMode || ((parms.mode & crMode) != crMode)) {errno = EACCES; return false;} // We need the reuse and multw options later so calclulate them now // reUse = (parms.reUse <= 0 ? false : true); multW = (parms.multW <= 0 ? false : true); // Clear the memory segment information we will be constructing // memset(&theInfo, 0, sizeof(theInfo)); // Calculate the info header size (we round up to 1K) // shmInfoSz = (sizeof(ShmInfo)+minInfoSz-1)/minInfoSz*minInfoSz; theInfo.lowFree = theInfo.infoSz = shmInfoSz; // Calculate the size of each item (rounded to a doubleword) // shmItemSz = (shmTypeSz + parms.maxKLen+1 + sizeof(MemItem) + 7)/8*8; theInfo.itemSz = shmItemSz; // Calculate total amount we need for the items // maxEnts = parms.maxKeys; totSz = shmItemSz * maxEnts; totSz = (totSz+7)/8*8; // Calculate the amount we need for the index // indexSz = parms.indexSz*sizeof(int); indexSz = (indexSz+7)/8*8; // Compute total size and adjust it to be a multiple of the page size // totSz = totSz + indexSz + shmInfoSz; totSz = (totSz/PageSize+1)*PageSize; // Generate the hashID if not specified // if (!shmHash) memcpy(&shmHash, "c32 ", sizeof(int)); // Complete the shared memory segment information structure // theInfo.index = totSz-indexSz; theInfo.slots = parms.indexSz; theInfo.typeSz = shmTypeSz; theInfo.highUse = theInfo.index; theInfo.reUse = reUse; theInfo.multW = multW; theInfo.keyPos = keyPos = shmTypeSz + sizeof(MemItem); theInfo.maxKeys = maxEnts; theInfo.maxKeySz = maxKLen = parms.maxKLen; theInfo.hashID = shmHash; strncpy(theInfo.typeID, shmType, sizeof(theInfo.typeID)-1); strncpy(theInfo.myName, shmImpl, sizeof(theInfo.myName)-1); // Create the new filename of the new file we will create // n = strlen(shmPath); shmTemp = (char *)malloc(n+8); sprintf(shmTemp, "%s.new", shmPath); // Open the file creaing as necessary // if ((shmFD = ShMam_Open(shmTemp, O_RDWR|O_CREAT, parms.mode)) < 0) return false; accMode = parms.mode; fileHelp.autoClose = true; // Verify that no one else is using this file. // if (!Lock(true, true)) {errno = EADDRINUSE; return false;} // Make the file as large as need be // if (ftruncate(shmFD, 0) || ftruncate(shmFD, totSz)) return false; // Map the file as a writable shared segment // shmBase = (char *)mmap(0, totSz, PROT_READ|PROT_WRITE, MAP_SHARED, shmFD, 0); if (shmBase == MAP_FAILED) return false; shmSize = totSz; isRW = true; // Copy the segment information into the segment // memcpy(shmBase, &theInfo, sizeof(theInfo)); xntP.intP = SHMADDR(int, SHMINFO(index)); shmIndex = xntP.antP; shmSlots = parms.indexSz; // A created table has, by definition, a single writer until it is exported. // So, we simply keep the r/w lock on the file until we export the file. Other // threads won't change that and other process will not be able to use the file. // lockRO = lockRW = false; fileHelp.autoClose = false; return true; } /******************************************************************************/ /* D e l I t e m */ /******************************************************************************/ bool XrdSsiShMam::DelItem(void *data, const char *key, int hash) { XLockHelper lockInfo(this, RWLock); MemItem *theItem, *prvItem; int hEnt, iOff; // Make sure we can delete an item // if (!shmSize) {errno = ENOTCONN; return false;} if (!isRW) {errno = EROFS; return false;} // Check if we need to remap this memory (atomic tests is not needed here) // if (verNum != SHMINFO(verNum)) ReMap(RWLock); // Lock the file if we have multiple writers or recycling items // We need to do this prior to file locking as the requirements may change. // if (lockRW && !lockInfo.FLock()) return false; // First try to find the item // if (!(hEnt = Find(theItem, prvItem, key, hash))) {if (data) {errno = ENOENT; return false;} return true; } // Return the contents of the item if the caller wishes that // if (data) memcpy(data, ITEM_VAL(theItem), shmTypeSz); // Delete the item from the index. The update of the count need not be atomic. // Also fetching of the next offset need not be atomic as we are the only one. // iOff = theItem->next; SHMINFO(itemCount)--; if (prvItem) Atomic_SET_STRICT(prvItem->next, iOff); // Atomic else {if (!iOff) SHMINFO(slotsUsed)--; Atomic_SET_STRICT(shmIndex[hEnt],iOff); // Atomic } RetItem(theItem); // Indicate the things we updated if need be // if (syncOn) {Updated(0); Updated(SHMOFFS(theItem)); if (prvItem) Updated(SHMOFFS(prvItem)); else Updated(SHMOFFS(&shmIndex[hEnt])); } // All done // return true; } /******************************************************************************/ /* D e t a c h */ /******************************************************************************/ void XrdSsiShMam::Detach() { // Clean up // if (shmFD >= 0) {close(shmFD); shmFD = -1;} if (shmSize) {munmap(shmBase, shmSize); shmSize = 0;} if (shmTemp) {free(shmTemp); shmTemp = 0;} shmIndex = 0; } /******************************************************************************/ /* E n u m e r a t e */ /******************************************************************************/ bool XrdSsiShMam::Enumerate(void *&jar) { EnumJar *theJar = (EnumJar *)jar; // Close off the enumeration // if (theJar) {delete theJar; jar = 0;} return true; } /******************************************************************************/ bool XrdSsiShMam::Enumerate(void *&jar, char *&key, void *&val) { XLockHelper lockInfo(this, ROLock); EnumJar *theJar = (EnumJar *)jar; MemItem *theItem; long long iTest; int rc, newFD, fence, iOff, hash = 0; // Make sure we can get an item // if (!shmSize) {errno = ENOTCONN; return false;} // If this is the first call, initialize the jar. First check if we need to // remap the segment. We need to do this prior to file locking as the // requirements may change. Then create a jar and a shadow copy of the segment. // if (!jar) {if (verNum != SHMINFO(verNum)) ReMap(ROLock); if ((newFD = ShMam_Dup(shmFD)) < 0) return false; theJar = new EnumJar(newFD, shmItemSz); jar = theJar; } else if (theJar->iNum < 0) {Enumerate(jar); errno = ENOENT; return false; } // Lock the file if we have multiple writers or recycling items // if (lockRO && !lockInfo.FLock()) {rc = errno; Enumerate(jar); errno = rc; return false;} // Compute the next key we should start the search at but make sure it will not // generate an overflow. In the process we fetch the stopping point only once. // iTest = (static_cast(theJar->iNum) * shmItemSz) + shmInfoSz; fence = SHMINFO(lowFree); // Atomic?? if (iTest < fence) iOff = static_cast(iTest); else iOff = fence; // Now start the search. Note that pread() must do a memory fence. // theItem = (MemItem *)(theJar->buff); while(iOff < fence) {rc = pread(theJar->fd, theJar->buff, shmItemSz, iOff); if (rc < 0) return false; if (rc != shmItemSz) break; if ((hash = theItem->hash)) break; // Atomic iOff += shmItemSz; } // Check if we found a key // if (!hash) {Enumerate(jar); errno = ENOENT; return false;} // Return the key and and the associated value // key = ITEM_KEY(theItem); val = ITEM_VAL(theItem); // Compute the contents of the new jar // theJar->iNum = (iOff - shmInfoSz)/shmItemSz + 1; return true; } /******************************************************************************/ /* E x p o r t */ /******************************************************************************/ bool XrdSsiShMam::Export() { MutexHelper mtHelp(&myMutex, RWLock); // Make sure we are attached and in R/W mode and exportable // if (!shmSize) {errno = ENOTCONN; return false;} if (!shmTemp) {errno = ENOPROTOOPT; return false;} if (!isRW) {errno = EROFS; return false;} // All that is left is to export the file using the internal interface. Tell // the exporter that we don't have the original file locked. // return ExportIt(false); } /******************************************************************************/ /* Private: E x p o r t I t */ /******************************************************************************/ bool XrdSsiShMam::ExportIt(bool fLocked) { int rc, oldFD; // If data synchronization was wanted, then flush the modified pages to // disk before we make this file visible. // if (syncOn) Flush(); // Open the original file. If it exists then lock it. We will need to do this // locally as the the Lock/Unlock() functions are cognizant of threads and that // is not the case here. We are a singleton. // if ((oldFD = ShMam_Open(shmPath, O_RDWR)) < 0) {if (errno != ENOENT) return false;} else if (!fLocked) {do {rc = flock(oldFD, LOCK_EX);} while(rc < 0 && errno == EINTR); if (rc) return false; } // Rename the new file on top of the old one (the fd's remain in tact) // if (rename(shmTemp, shmPath)) {if (oldFD) close(oldFD); return false;} free(shmTemp); shmTemp = 0; // If there was an original file then we must indicate that a new vesion has // been exported so current users switch to the new version. This is a lazy // version update because we just need readers to eventually realize this. // if (oldFD >= 0) {int vnum; bool noGo = false; if (pread(oldFD, &vnum, sizeof(vnum), 0) == (ssize_t)sizeof(vnum)) {vnum++; if (pwrite(oldFD, &vnum, sizeof(vnum), 0) != (ssize_t)sizeof(vnum)) noGo = true; } else noGo = true; if (noGo) cerr <<"SsiShMam: Unable to update version for " <hash && !strcmp(key, ITEM_KEY(theItem))) return hEnt; prvItem = theItem; iOff = Atomic_GET_STRICT(theItem->next); // Atomic? } // We did not find the item // return 0; } /******************************************************************************/ /* Private: F l u s h */ /******************************************************************************/ bool XrdSsiShMam::Flush() { int rc; // Do appropriate sync // #if _POSIX_SYNCHRONIZED_IO > 0 rc = fdatasync(shmFD) == 0; #else rc = fsync(shmFD) == 0; #endif // If we failed, issue message // if (rc) {rc = errno; cerr <<"ShMam: msync() failed; " <(crc); return (hval ? hval : 1); } /******************************************************************************/ /* Private: L o c k */ /******************************************************************************/ // The caller must have obtained a mutex consistent with the argument passed. bool XrdSsiShMam::Lock(bool xrw, bool nowait) { int rc, act = (xrw ? LOCK_EX : LOCK_SH); // Make sure we have a file descriptor to lock and is not already locked // if (shmFD < 0) {errno = EBADF; return false;} // We must keep track of r/o locks as there may be many requests but we can // only lock the file once for all of them. R/W locks are easier to handle as // only one thread can ever have such a lock request. Atomics do not help // for R/O locks because they suffer from an unlock control race and also // all R/O requestors must wait if the file is locked by another process. // if (xrw) lkCount = 1; else {pthread_mutex_lock(&lkMutex); if (lkCount++) {pthread_mutex_unlock(&lkMutex); return true;} } // Check if we should not wait for the lock // if (nowait) act |= LOCK_NB; // Now obtain the lock // do {rc = flock(shmFD, act);} while(rc < 0 && errno == EINTR); // Decrement lock count if we failed (we were optimistic). Note that we still // have the mutex locked if this was a T/O request. // if (rc) {if (xrw) lkCount = 0; else lkCount--; } // Unlock the mutex if we still have it locked and return result // if (!xrw) pthread_mutex_unlock(&lkMutex); return rc == 0; } /******************************************************************************/ /* I n f o */ /******************************************************************************/ int XrdSsiShMam::Info(const char *vname, char *buff, int blen) { MutexHelper mtHelp(&myMutex, ROLock); // Make sure we can delete an item // if (!shmSize) {errno = ENOTCONN; return 0;} if (!strcmp(vname, "atomics")) {int n = strlen(Atomic_IMP); strcpy(buff, Atomic_IMP); return n; } if (!strcmp(vname, "hash")) {if (!buff || blen < (int)(sizeof(int)+1)) {errno = EMSGSIZE; return -1;} memcpy(buff, &SHMINFO(hashID), sizeof(int)); buff[sizeof(int)] = 0; return strlen(buff); } if (!strcmp(vname, "impl")) {int n = strlen(SHMINFO(myName)); if (!buff || blen < n) {errno = EMSGSIZE; return -1;} strcpy(buff, SHMINFO(myName)); return n; } if (!strcmp(vname, "flockro")) return lockRO; if (!strcmp(vname, "flockrw")) return lockRW; if (!strcmp(vname, "indexsz")) return shmSlots; if (!strcmp(vname, "indexused")) return SHMINFO(slotsUsed); if (!strcmp(vname, "keys")) return SHMINFO(itemCount); // Atomic if (!strcmp(vname, "keysfree")) return (SHMINFO(highUse) - SHMINFO(lowFree))/shmItemSz + SHMINFO(freeCount); if (!strcmp(vname, "maxkeylen")) return SHMINFO(maxKeySz); if (!strcmp(vname, "multw")) return multW; if (!strcmp(vname, "reuse")) return reUse; if (!strcmp(vname, "type")) {int n = strlen(SHMINFO(typeID)); if (!buff || blen < n) {errno = EMSGSIZE; return -1;} strcpy(buff, SHMINFO(typeID)); return n; } if (!strcmp(vname, "typesz")) return SHMINFO(typeSz); // Return variable not supported // errno = ENOSYS; return -1; } /******************************************************************************/ /* Private: N e w I t e m */ /******************************************************************************/ XrdSsiShMam::MemItem *XrdSsiShMam::NewItem() { MemItem *itemP; int iOff; // First see if we can get this from the free chain // if (reUse && SHMINFO(freeItem)) {iOff = SHMINFO(freeItem); itemP = SHMADDR(MemItem, iOff); SHMINFO(freeItem) = itemP->next; SHMINFO(freeCount)--; // Atomic? } else { int newFree = SHMINFO(lowFree) + shmItemSz; if (newFree > SHMINFO(highUse)) itemP = 0; else {iOff = SHMINFO(lowFree); itemP = SHMADDR(MemItem, iOff); SHMINFO(lowFree) = newFree; } } // Return result // return itemP; } /******************************************************************************/ /* Private: R e M a p */ /******************************************************************************/ bool XrdSsiShMam::ReMap(LockType iHave) { XrdSsiShMat::NewParms parms; // If the caller has a read mutex then we must change it to a r/w mutex as we // may be changing all sorts of variables. It will continue holding this mutex. // Fortunately, remappings do not occur very often in practice. // if (iHave == ROLock) {pthread_rwlock_unlock(&myMutex); pthread_rwlock_wrlock(&myMutex); } // Check if the version number no longer differs, then just return. This may // happen because a previous thread forced the remapping and everyone was // waiting for that to happen as we hold the r/w mutex. // if (verNum == SHMINFO(verNum)) return false; // Setup parms // parms.impl = shmImpl; parms.path = shmPath; parms.typeID = shmType; parms.typeSz = shmTypeSz; parms.hashID = shmHash; // Attach the new segment. If we fail, then just continue // XrdSsiShMam newMap(parms); if (!newMap.Attach(timeOut, isRW)) return false; // Swap the new map with our map // SwapMap(newMap); return true; } /******************************************************************************/ /* R e s i z e */ /******************************************************************************/ bool XrdSsiShMam::Resize(XrdSsiShMat::CRZParms &parms) { XLockHelper lockInfo(this, RWLock); XrdSsiShMat::NewParms newParms; MemItem *theItem; void *val; char *key; int fence, iOff, hash; // Make sure we can delete an item // if (!shmSize) {errno = ENOTCONN; return false;} if (!isRW) {errno = EROFS; return false;} // Validate parameter list values // if (parms.indexSz < 0 || parms.maxKeys < 0 || parms.maxKLen < 0) {errno = EINVAL; return false;} // A resize is not permitted on an un-exported segment // if (shmTemp) {errno = EPERM; return false;} // Check if we need to remap this memory (atomic tests is not needed here) // if (verNum != SHMINFO(verNum)) ReMap(RWLock); // Lock the source file // if (!lockInfo.FLock()) return false; // Setup parms for the segment object // newParms.impl = shmImpl; newParms.path = shmPath; newParms.typeID = shmType; newParms.typeSz = shmTypeSz; newParms.hashID = shmHash; // Create a new segment object (this cannot fail). // XrdSsiShMam newMap(newParms); // Set the values in the parameter list for those wanting the current setting. // if (!parms.indexSz) parms.indexSz = shmSlots; if (!parms.maxKeys) parms.maxKeys = SHMINFO(maxKeys); if (!parms.maxKLen) parms.maxKLen = maxKLen; if (parms.reUse < 0) parms.reUse = reUse; if (parms.multW < 0) parms.multW = multW; // Create the new target file // parms.mode = accMode; if (!newMap.Create(parms)) return false; // Compute the offset of the first item and get the offset of teh last item. // fence = SHMINFO(lowFree); // Atomic?? iOff = shmInfoSz; // For each item found in the current map add it to the new map // while(iOff < fence) {theItem = SHMADDR(MemItem, iOff); if ((hash = theItem->hash)) {key = ITEM_KEY(theItem); val = ITEM_VAL(theItem); if (!newMap.AddItem(val, 0, key, hash, true)) return false; } iOff += shmItemSz; } // We need to drop the lock on the file otherwise the export will hang // // All went well, so export this the new map using the internal interface as // we already have the source file locked and export normally tries to lock it. // if (!newMap.ExportIt(true)) return false; // All that we need to do is to swap the map with our map and we are done. // SwapMap(newMap); return true; } /******************************************************************************/ /* Private: R e t I t e m */ /******************************************************************************/ void XrdSsiShMam::RetItem(MemItem *iP) { // Zorch the hash so this item cannot be found. This is problematic for // enumerations. They may or may not include this key, but at least it will // consistent at the time the enumeration happens. // iP->hash = 0; // Atomic? // If reuse is allowed, place the item on the free list // if (reUse) {iP->next = SHMINFO(freeItem); SHMINFO(freeItem) = SHMOFFS(iP); SHMINFO(freeCount)++; //Atomic?? } } /******************************************************************************/ /* Private: S e t L o c k i n g */ /******************************************************************************/ void XrdSsiShMam::SetLocking(bool isrw) { // If we do not have atomics then file locking is mandatory // #ifdef NEED_ATOMIC_MUTEX lockRO = lockRW = true; #else // A reader must lock the file R/O if objects are being reused // lockRO = reUse = SHMINFO(reUse); // A writer must lock the file R/W if objects are being reused or the file may // have multiple writers // multW = SHMINFO(multW); lockRW = reUse || multW; #endif } /******************************************************************************/ /* S n o o z e */ /******************************************************************************/ void XrdSsiShMam::Snooze(int sec) { struct timespec naptime, waketime; // Calculate nano sleep time // naptime.tv_sec = sec; naptime.tv_nsec = 0; // Wait for a number of seconds // while(nanosleep(&naptime, &waketime) && EINTR == errno) {naptime.tv_sec = waketime.tv_sec; naptime.tv_nsec = waketime.tv_nsec; } } /******************************************************************************/ /* Private: S w a p M a p */ /******************************************************************************/ void XrdSsiShMam::SwapMap(XrdSsiShMam &newMap) { // Detach the old map // Detach(); // Swap the maps // shmFD = newMap.shmFD; newMap.shmFD = -1; shmSize = newMap.shmSize; newMap.shmSize = 0; shmBase = newMap.shmBase; newMap.shmBase = 0; shmIndex = newMap.shmIndex; newMap.shmIndex = 0; lockRO = newMap.lockRO; lockRW = newMap.lockRW; reUse = newMap.reUse; multW = newMap.multW; verNum = newMap.verNum; } /******************************************************************************/ /* S y n c */ /******************************************************************************/ bool XrdSsiShMam::Sync() { MutexHelper mtHelp(&myMutex, RWLock); // Make sure we are attached and in R/W mode // if (!shmSize) {errno = ENOTCONN; return false;} if (!isRW) {errno = EROFS; return false;} // For now do a flush as this works in Linux. We may need to generalize this // for all platforms using msync, sigh. // if (!Flush()) return false; // Reset counters // syncBase = false; syncLast = 0; syncQWR = 0; return true; } /******************************************************************************/ bool XrdSsiShMam::Sync(int syncqsz) { MutexHelper mtHelp(&myMutex, RWLock); // Make sure we are attached and in R/W mode // if (!shmSize) {errno = ENOTCONN; return false;} if (!isRW) {errno = EROFS; return false;} if (syncqsz < 0) {errno = EINVAL; return false;} // Flush out pages if sync it turned on // if (syncOn && !Flush()) return false; // Set new queue size // syncQSZ = syncqsz; return true; } /******************************************************************************/ bool XrdSsiShMam::Sync(bool dosync, bool syncdo) { MutexHelper mtHelp(&myMutex, RWLock); // Make sure we are attached and in R/W mode // if (!shmSize) {errno = ENOTCONN; return false;} if (!isRW) {errno = EROFS; return false;} // Flush out pages if sync it turned on // if (syncOn && !Flush()) return false; // Set new options // syncOn = dosync; syncOpt = (syncdo ? MS_SYNC : MS_ASYNC); return true; } /******************************************************************************/ /* Private: U n L o c k */ /******************************************************************************/ // The caller must have obtained a mutex consistent with the argument passed. void XrdSsiShMam::UnLock(bool isrw) { int rc; // Make sure we have a file descriptor to unlock // if (shmFD < 0) return; // If this is a R/W type of lock then we can immediate release it as there // could have been only one writer. Otherwise, we will need to keep track // of the number of R/O locks has dropped to zero before unlocking the file. // Atomics do not help here because of possible thread inversion. // if (isrw) lkCount = 0; else {pthread_mutex_lock(&lkMutex); lkCount--; if (lkCount) {pthread_mutex_unlock(&lkMutex); return;} } // Now release the lock // do {rc = flock(shmFD, LOCK_UN);} while(rc < 0 && errno == EINTR); // If this was a r/o unlock then we have kept the mutex and must unlock it // We kept the mutex to prevent a control race condition. // if (!isrw) pthread_mutex_unlock(&lkMutex); } /******************************************************************************/ /* Private: U p d a t e d */ /******************************************************************************/ void XrdSsiShMam::Updated(int mOff) { // Check if this refers to the info struct // if (!mOff) {if (!syncBase) {syncBase = true; syncQWR++;} } else { if (syncLast != (mOff & PageMask)) {syncLast = (mOff & PageMask); syncQWR++;} } // Check if we need to flush now // if (syncQWR >= syncQSZ) {ShMam_Flush(shmFD); syncQWR = 0;} } /******************************************************************************/ void XrdSsiShMam::Updated(int mOff, int mLen) { int memB = mOff & PageMask; int memE = mOff + mLen; // This is a range update. This is not very precise if update the same page // and the we cross the page boundary. But it should be good enough. // if (memB != syncLast) {syncQWR++; if (memB != (memE & PageMask)) syncQWR++; syncLast = memB; } // Check if we need to flush now // if (syncQWR >= syncQSZ) {ShMam_Flush(shmFD); syncQWR = 0;} }