Parallel put:
-------------
current implementaion:

/*
 * rsDataObjPut - entry point for a put (excerpt; local declarations such as
 * remoteFlag, status and rodsServerHost are not shown in these notes).
 *
 * getAndConnRemoteZone() decides where the request is served:
 *   - LOCAL_HOST: openFlags is set to O_RDWR and _rsDataObjPut() is called
 *     with BRANCH_MSG.
 *   - otherwise: the request is forwarded with _rcDataObjPut() over
 *     rodsServerHost->conn.
 */
int rsDataObjPut (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
  bytesBuf_t *dataObjInpBBuf, portalOprOut_t **portalOprOut)
{
    remoteFlag = getAndConnRemoteZone (rsComm, dataObjInp, &rodsServerHost,
      REMOTE_CREATE);

    if (remoteFlag == LOCAL_HOST) {
        dataObjInp->openFlags = O_RDWR;
        status = _rsDataObjPut (rsComm, dataObjInp, dataObjInpBBuf,
          portalOprOut, BRANCH_MSG);
    } else {
        int l1descInx;
        status = _rcDataObjPut (rodsServerHost->conn, dataObjInp,
          dataObjInpBBuf, portalOprOut);
        /* On failure, or when DATA_INCLUDED_KW is set in condInput, the
         * forwarded status is returned unchanged. */
        if (status < 0 ||
          getValByKey (&dataObjInp->condInput, DATA_INCLUDED_KW) != NULL) {
            return (status);
        } else {
            /* have to allocate a local l1descInx to keep track of things
             * since the file is in remote zone. It sets remoteL1descInx,
             * oprType = REMOTE_ZONE_OPR and remoteZoneHost so that
             * rsComplete knows what to do */
            l1descInx = allocAndSetL1descForZoneOpr (
              (*portalOprOut)->l1descInx, dataObjInp, rodsServerHost, NULL);
            if (l1descInx < 0) return l1descInx;
            /* the caller sees the local index instead of the one returned
             * by the forwarded call */
            (*portalOprOut)->l1descInx = l1descInx;
            return status;
        }
    }
}

/*
 * _rsDataObjPut - put handled on this host (excerpt; the parameter list and
 * local declarations are elided in these notes).
 *
 * With DATA_INCLUDED_KW set in condInput, l3DataPutSingleBuf() is called with
 * dataObjInpBBuf and the function returns. Otherwise the object is created
 * with rsDataObjCreate(), l2DataObjPut() is called, and the status together
 * with *portalOprOut is passed to sendAndRecvBranchMsg().
 *
 * The final return is retval from sendAndRecvBranchMsg() when handlerFlag has
 * INTERNAL_SVR_CALL set, SYS_NO_HANDLER_REPLY_MSG otherwise.
 */
_rsDataObjPut()
{
    if (getValByKey (&dataObjInp->condInput, DATA_INCLUDED_KW) != NULL) {
        status = l3DataPutSingleBuf (rsComm, dataObjInp, dataObjInpBBuf);
        return;
    }

    /* para IO fall through here */
    l1descInx = rsDataObjCreate (rsComm, dataObjInp);
    status = l2DataObjPut (rsComm, l1descInx, portalOprOut);
    retval = sendAndRecvBranchMsg (rsComm, rsComm->apiInx, status,
      (void *) *portalOprOut, NULL);
    if (handlerFlag & INTERNAL_SVR_CALL) {
        /* internal call. want to know the real status */
        return (retval);
    } else {
        /* already sent the client the status */
        return (SYS_NO_HANDLER_REPLY_MSG);
    }
}

/*
 * l2DataObjPut - set up a PUT_OPR dataOprInp for the given L1 descriptor and
 * pass it on (excerpt; the dataOprInp and status declarations are not shown).
 *
 * remoteDataPut() is used when L1desc[l1descInx].remoteZoneHost is not NULL,
 * rsDataPut() otherwise.
 */
int l2DataObjPut (rsComm_t *rsComm, int l1descInx,
  portalOprOut_t **portalOprOut)
{
    /* set dataOprInp->destL3descInx = L1desc[l1descInx].l3descInx; */
    initDataOprInp (&dataOprInp, l1descInx, PUT_OPR);
    if (L1desc[l1descInx].remoteZoneHost != NULL) {
        status = remoteDataPut (rsComm, &dataOprInp, portalOprOut,
          L1desc[l1descInx].remoteZoneHost);
    } else {
        status = rsDataPut (rsComm, &dataOprInp, portalOprOut);
    }
}

/*
 * dataOprInp_t - the input structure filled by initDataOprInp() and taken by
 * rsDataPut()/_rsDataPut() in the excerpts in these notes. It is also
 * embedded as the dataOprInp member of the dataCopyInp used by the copy
 * routines.
 */
typedef struct DataOprInp {
    int oprType;
    int numThreads;
    int srcL3descInx;
    int destL3descInx;
    int srcRescTypeInx;
    int destRescTypeInx;
    /* XXXXXXX offset and dataSize moved to here because of problem with
     * 64 bit SuSE Linux that condInput has pointers in it which
     * cause condInput to be aligned at 64 at the beginning and end of
     * condInput */
    rodsLong_t offset;
    rodsLong_t dataSize;
    keyValPair_t condInput;
} dataOprInp_t;

rsDataPut.c:

/*
 * rsDataPut - decide where the put is executed (excerpt; declarations and the
 * assignment of l3descInx are not shown).
 *
 * EXEC_LOCALLY_KW in condInput forces LOCAL_HOST. Otherwise the localFlag of
 * the rodsServerHost stored in FileDesc[l3descInx] is used; a NULL
 * rodsServerHost is logged and SYS_INTERNAL_NULL_INPUT_ERR is returned.
 */
int rsDataPut (rsComm_t *rsComm, dataOprInp_t *dataOprInp,
  portalOprOut_t **portalOprOut)
{
    if (getValByKey (&dataOprInp->condInput, EXEC_LOCALLY_KW) != NULL) {
        remoteFlag = LOCAL_HOST;
    } else {
        rodsServerHost = FileDesc[l3descInx].rodsServerHost;
        if (rodsServerHost == NULL) {
            rodsLog (LOG_NOTICE, "rsDataPut: NULL rodsServerHost");
            return (SYS_INTERNAL_NULL_INPUT_ERR);
        }
        remoteFlag = rodsServerHost->localFlag;
    }

    if (remoteFlag == LOCAL_HOST) {
        status = _rsDataPut (rsComm, dataOprInp, portalOprOut);
    } else {
        /* EXEC_LOCALLY_KW is added before the request is passed to
         * remoteDataPut(); condInput is cleared once the call returns */
        addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
        status = remoteDataPut (rsComm, dataOprInp, portalOprOut,
          rodsServerHost);
        clearKeyVal (&dataOprInp->condInput);
    }
}

/*
 * _rsDataPut - local side of the put (excerpt; the setup of myDataObjPutOut,
 * portalSock and proto is not shown).
 *
 * Returns 0 when myDataObjPutOut->numThreads is 0. Otherwise a server portal
 * is created and a portalOpr_t describing the pending PUT_OPR is stored in
 * rsComm->portalOpr.
 */
int _rsDataPut (rsComm_t *rsComm, dataOprInp_t *dataOprInp,
  portalOprOut_t **portalOprOut)
{
    if (myDataObjPutOut->numThreads == 0) {
        return 0;
    } else {
        portalOpr_t *myPortalOpr;
        /* setup the portal */
        portalSock = createSrvPortal (rsComm, &myDataObjPutOut->portList,
          proto);
        /* Note that myPortalOpr is saved in rsComm->portalOpr which will
         * be checked by the API handler */
        /* NOTE(review): the malloc result is used without a NULL check in
         * this excerpt */
        myPortalOpr = rsComm->portalOpr =
          (portalOpr_t *) malloc (sizeof (portalOpr_t));
        myPortalOpr->oprType = PUT_OPR;
        myPortalOpr->portList = myDataObjPutOut->portList;
        /* dataOprInp is copied by value, then the caller's condInput is
         * zeroed - presumably so that only the copy refers to the key/value
         * storage; TODO confirm */
        myPortalOpr->dataOprInp = *dataOprInp;
        memset (&dataOprInp->condInput, 0, sizeof (dataOprInp->condInput));
        myPortalOpr->dataOprInp.numThreads = myDataObjPutOut->numThreads;
    }
}

rsApiHandler.c:

/*
 * sendAndProcApiReply (excerpt) - the part relevant to parallel I/O: when
 * rsComm->portalOpr is not NULL, handlePortalOpr() is called. The rest of the
 * function is not shown in these notes.
 */
int sendAndProcApiReply ( rsComm_t *rsComm, int apiInx, int status,
  void *myOutStruct, bytesBuf_t *myOutBsBBuf)
{
    if (rsComm->portalOpr != NULL) {
        handlePortalOpr (rsComm);
    }
}

Parallel get:
-------------
current implementaion:

Most looks pretty much the same as put.

/*
 * _rsDataObjGet (excerpt; the opening brace, declarations and most of the
 * body are elided in these notes).
 *
 * PHYOPEN_BY_SIZE_KW is added to condInput before _rsDataObjOpen() is called,
 * and the returned l1descInx is also stored in status.
 */
int _rsDataObjGet (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
  portalOprOut_t **portalOprOut, bytesBuf_t *dataObjOutBBuf, int handlerFlag)

    /* PHYOPEN_BY_SIZE asks it to check whether "dataInclude" should be
     * done */
    addKeyVal (&dataObjInp->condInput, PHYOPEN_BY_SIZE_KW, "");
    l1descInx = _rsDataObjOpen (rsComm, dataObjInp);

    status = l1descInx; /* means file not included */
}

Parallel copy:
--------------

Replication:

/*
 * _rsDataObjReplS - replicate one source (excerpt; declarations not shown).
 *
 * After dataObjOpenForRepl() the transfer method is selected:
 *   - stageFlag != NO_STAGING      -> l3DataStageSync()
 *   - dataObjInp->numThreads == 0  -> l3DataCopySingleBuf()
 *   - otherwise                    -> dataObjCopy()
 */
int _rsDataObjReplS (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
  dataObjInfo_t *srcDataObjInfo, rescInfo_t *destRescInfo,
  char *rescGroupName, dataObjInfo_t *destDataObjInfo)
{
    l1descInx = dataObjOpenForRepl (rsComm, dataObjInp, srcDataObjInfo,
      destRescInfo, rescGroupName, destDataObjInfo);

    if (L1desc[l1descInx].stageFlag != NO_STAGING) {
        status = l3DataStageSync (rsComm, l1descInx);
    } else if (dataObjInp->numThreads == 0) {
        status = l3DataCopySingleBuf (rsComm, l1descInx);
    } else {
        status = dataObjCopy (rsComm, l1descInx);
    }
}

/*
 * dataObjOpenForRepl - prepare the destination and source L1 descriptors for
 * a replication (excerpt; several locals such as myDestRescInfo,
 * myDestDataObjInfo, srcDataObjInfo, srcL1descInx, destExist and replStatus
 * are used without their declarations or setup being shown).
 *
 * The physical open/create of the destination and the open of the source are
 * done here only when numThreads > 0 and the stageFlag is NO_STAGING.
 */
int dataObjOpenForRepl (rsComm_t *rsComm, dataObjInp_t *dataObjInp,
  dataObjInfo_t *inpSrcDataObjInfo, rescInfo_t *destRescInfo,
  char *rescGroupName, dataObjInfo_t *inpDestDataObjInfo)
{
    int destRescClass;
    int srcRescClass = getRescClass (inpSrcDataObjInfo->rescInfo);

    destRescClass = getRescClass (myDestRescInfo);

    destL1descInx = allocL1desc ();

    if (inpDestDataObjInfo != NULL && inpDestDataObjInfo->dataId > 0) {
        /* overwriting an existing replica */
        /* inherit the replStatus of the src */
        inpDestDataObjInfo->replStatus = srcDataObjInfo->replStatus;
        *myDestDataObjInfo = *inpDestDataObjInfo;
        destExist = 1;
        replStatus = srcDataObjInfo->replStatus | OPEN_EXISTING_COPY;
        addKeyVal (&dataObjInp->condInput, FORCE_FLAG_KW, "");
        dataObjInp->openFlags |= (O_TRUNC | O_WRONLY);
    } else {
        initDataObjInfoForRepl (myDestDataObjInfo, srcDataObjInfo,
          destRescInfo);
    }

    /* a COMPOUND_CL destination takes precedence over a COMPOUND_CL source
     * when choosing the stageFlag */
    if (destRescClass == COMPOUND_CL) {
        L1desc[destL1descInx].stageFlag = SYNC_DEST;
    } else if (srcRescClass == COMPOUND_CL) {
        L1desc[destL1descInx].stageFlag = STAGE_SRC;
    }

    if (dataObjInp->numThreads > 0 &&
      L1desc[destL1descInx].stageFlag == NO_STAGING) {
        /* open an existing destination, or resolve its path and create it */
        if (destExist > 0) {
            status = dataOpen (rsComm, destL1descInx);
        } else {
            status = getFilePathName (rsComm, myDestDataObjInfo,
              L1desc[destL1descInx].dataObjInp);
            if (status >= 0)
                status = dataCreate (rsComm, destL1descInx);
        }
    } else {
        /* no physical open here; only the path of a new destination is
         * resolved */
        if (destExist == 0) {
            status = getFilePathName (rsComm, myDestDataObjInfo,
              L1desc[destL1descInx].dataObjInp);
            if (status < 0) {
                freeL1desc (destL1descInx);
                return (status);
            }
        }
    }

    /* open the src */
    if (dataObjInp->oprType == PHYMV_OPR) {
        L1desc[srcL1descInx].oprType = PHYMV_SRC;
    } else {
        L1desc[srcL1descInx].oprType = REPLICATE_SRC;
    }

    if (dataObjInp->numThreads > 0 &&
      L1desc[destL1descInx].stageFlag == NO_STAGING) {
        dataObjInp_t dataObjCloseInp;

        dataObjInp->openFlags = O_RDONLY;
        status = dataOpen (rsComm, srcL1descInx);
        if (status < 0) {
            /* the source could not be opened: release its descriptor and
             * close the destination descriptor before returning */
            freeL1desc (srcL1descInx);
            memset (&dataObjCloseInp, 0, sizeof (dataObjCloseInp));
            dataObjCloseInp.l1descInx = destL1descInx;
            rsDataObjClose (rsComm, &dataObjCloseInp);
            return (status);
        }
    }
    /* the destination descriptor records which descriptor is its source */
    L1desc[destL1descInx].srcL1descInx = srcL1descInx;
}

/*
 * dataObjCopy - set up dataCopyInp according to where the source and the
 * destination live, then call rsDataCopy() (excerpt; the declarations and
 * the setup of srcRemoteFlag, destRemoteFlag, srcL3descInx, destL3descInx,
 * srcL1descInx, destL1descInx, dataOprInp and portalOprOut are not shown).
 *
 * Cases, in the order tested:
 *   1. neither side is REMOTE_ZONE_HOST and both L3 descriptors have the
 *      same rodsServerHost: the source L3 index and resource type index are
 *      filled in; EXEC_LOCALLY_KW is added when the source is LOCAL_HOST.
 *   2. source LOCAL_HOST, destination not: COPY_TO_REM_OPR, l2DataObjPut()
 *      on the destination, EXEC_LOCALLY_KW added.
 *   3. source not LOCAL_HOST, destination LOCAL_HOST: COPY_TO_LOCAL_OPR,
 *      l2DataObjGet() on the source, EXEC_LOCALLY_KW added.
 *   4. anything else: COPY_TO_LOCAL_OPR and l2DataObjGet() on the source,
 *      without EXEC_LOCALLY_KW.
 */
int dataObjCopy (rsComm_t *rsComm, int l1descInx)
{
    if (srcRemoteFlag != REMOTE_ZONE_HOST &&
      destRemoteFlag != REMOTE_ZONE_HOST &&
      FileDesc[srcL3descInx].rodsServerHost ==
      FileDesc[destL3descInx].rodsServerHost) {
        dataOprInp->srcL3descInx = srcL3descInx;
        dataOprInp->srcRescTypeInx = dataObjInfo->rescInfo->rescTypeInx;
        if (srcRemoteFlag == LOCAL_HOST) {
            addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
        }
    } else if (srcRemoteFlag == LOCAL_HOST && destRemoteFlag != LOCAL_HOST) {
        initDataOprInp (&dataCopyInp.dataOprInp, srcL1descInx,
          COPY_TO_REM_OPR);
        status = l2DataObjPut (rsComm, destL1descInx, &portalOprOut);
        dataCopyInp.portalOprOut = *portalOprOut;
        addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
    } else if (srcRemoteFlag != LOCAL_HOST && destRemoteFlag == LOCAL_HOST) {
        initDataOprInp (&dataCopyInp.dataOprInp, l1descInx,
          COPY_TO_LOCAL_OPR);
        status = l2DataObjGet (rsComm, srcL1descInx, &portalOprOut);
        addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
        dataCopyInp.portalOprOut = *portalOprOut;
    } else {
        initDataOprInp (&dataCopyInp.dataOprInp, l1descInx,
          COPY_TO_LOCAL_OPR);
        status = l2DataObjGet (rsComm, srcL1descInx, &portalOprOut);
        dataCopyInp.portalOprOut = *portalOprOut;
    }
    status = rsDataCopy (rsComm, &dataCopyInp);
}

/*
 * rsDataCopy - run the copy here or hand it to another server (excerpt; no
 * return statement is shown).
 *
 * With EXEC_LOCALLY_KW in condInput, _rsDataCopy() is called. Otherwise the
 * rodsServerHost of the destination L3 descriptor is looked up,
 * EXEC_LOCALLY_KW is added, remoteDataCopy() is called and condInput is
 * cleared afterwards.
 */
int rsDataCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
{
    int status;
    int l3descInx;
    rodsServerHost_t *rodsServerHost;
    dataOprInp_t *dataOprInp;

    dataOprInp = &dataCopyInp->dataOprInp;

    if (getValByKey (&dataOprInp->condInput, EXEC_LOCALLY_KW) != NULL) {
        status = _rsDataCopy (rsComm, dataCopyInp);
    } else {
        l3descInx = dataOprInp->destL3descInx;
        rodsServerHost = FileDesc[l3descInx].rodsServerHost;
        addKeyVal (&dataOprInp->condInput, EXEC_LOCALLY_KW, "");
        status = remoteDataCopy (rsComm, dataCopyInp, rodsServerHost);
        clearKeyVal (&dataOprInp->condInput);
    }
}

/*
 * remoteDataCopy - forward the copy request over rodsServerHost->conn
 * (excerpt; the status declaration and return are not shown).
 *
 * destL3descInx is passed through convL3descInx() before rcDataCopy() is
 * called.
 */
int remoteDataCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp,
  rodsServerHost_t *rodsServerHost)
{
    dataCopyInp->dataOprInp.destL3descInx =
      convL3descInx (dataCopyInp->dataOprInp.destL3descInx);
    status = rcDataCopy (rodsServerHost->conn, dataCopyInp);
}

/*
 * _rsDataCopy - dispatch on dataOprInp->oprType (excerpt; declarations and
 * the return are not shown).
 *
 * SAME_HOST_COPY_OPR goes to sameHostCopy(); COPY_TO_LOCAL_OPR and
 * COPY_TO_REM_OPR both go to remLocCopy(). No other oprType is handled in
 * this excerpt.
 */
int _rsDataCopy (rsComm_t *rsComm, dataCopyInp_t *dataCopyInp)
{
    dataOprInp = &dataCopyInp->dataOprInp;

    if (dataOprInp->oprType == SAME_HOST_COPY_OPR) {
        /* src is on the same host */
        retVal = sameHostCopy (rsComm, dataCopyInp);
    } else if (dataOprInp->oprType == COPY_TO_LOCAL_OPR ||
      dataOprInp->oprType == COPY_TO_REM_OPR) {
        retVal = remLocCopy (rsComm, dataCopyInp);
    }
}

From rsDataObjCopy:

/*
 * rsDataObjCopy - entry point for a copy (excerpt; the declarations and the
 * setup of srcDataObjInp, destDataObjInp and existFlag are not shown).
 *
 * A negative remoteFlag from getAndConnRemoteZoneForCopy() is returned as
 * is; REMOTE_HOST forwards the request with _rcDataObjCopy(). Otherwise the
 * source is opened with PHYOPEN_BY_SIZE_KW set, the destination is created
 * and _rsDataObjCopy() is called.
 */
int rsDataObjCopy (rsComm_t *rsComm, dataObjCopyInp_t *dataObjCopyInp,
  transStat_t **transStat)
{
    remoteFlag = getAndConnRemoteZoneForCopy (rsComm, dataObjCopyInp,
      &rodsServerHost);
    if (remoteFlag < 0) {
        return (remoteFlag);
    } else if (remoteFlag == REMOTE_HOST) {
        status = _rcDataObjCopy (rodsServerHost->conn, dataObjCopyInp,
          transStat);
        return status;
    }

    addKeyVal (&srcDataObjInp->condInput, PHYOPEN_BY_SIZE_KW, "");
    srcL1descInx = rsDataObjOpen (rsComm, srcDataObjInp);

    /* l3descInx <= 2 on the source marks the single-buffer case; the
     * destination is then created with NO_OPEN_FLAG_KW set */
    if (L1desc[srcL1descInx].l3descInx <= 2) {
        /* dataSingleBuf */
        addKeyVal (&destDataObjInp->condInput, NO_OPEN_FLAG_KW, "");
    }

    destL1descInx = rsDataObjCreate (rsComm, destDataObjInp);
    status = _rsDataObjCopy (rsComm, destL1descInx, existFlag);
}

/*
 * _rsDataObjCopy - copy into destL1descInx and close it (excerpt; the
 * declarations and the setup of srcL1descInx, srcDataObjInfo,
 * destDataObjInfo, destDataObjInp and dataObjCloseInp are not shown).
 *
 * When the source has l3descInx <= 2 the data is copied with
 * l3DataCopySingleBuf(), and the destination is registered with
 * svrRegDataObj() if the copy succeeded, existFlag is 0, specColl is NULL and
 * the destination has no remoteZoneHost. Otherwise numThreads is computed by
 * getNumThreads() from the source dataSize and dataObjCopy() is called.
 * rsDataObjClose() is called in both cases.
 */
int _rsDataObjCopy (rsComm_t *rsComm, int destL1descInx, int existFlag)
{
    if (L1desc[srcL1descInx].l3descInx <= 2) {
        /* no physical file was opened */
        status = l3DataCopySingleBuf (rsComm, destL1descInx);
        if (status >= 0 && existFlag == 0 &&
          destDataObjInfo->specColl == NULL &&
          L1desc[destL1descInx].remoteZoneHost == NULL) {
            status = svrRegDataObj (rsComm, destDataObjInfo);
        }
    } else {
        destDataObjInp->numThreads = getNumThreads (rsComm,
          srcDataObjInfo->dataSize, destDataObjInp->numThreads, NULL);
        status = dataObjCopy (rsComm, destL1descInx);
    }
    rsDataObjClose (rsComm, &dataObjCloseInp);
}