/*** Copyright (c), The Regents of the University of California            ***
 *** For more information please refer to files in the COPYRIGHT directory ***/
/* specColl.c - special collection (mounted and linked collections) routines */

#include "dataObjOpr.h"
#include "specColl.h"
#include "collection.h"
#include "resource.h"
#include "genQuery.h"
#include "rodsClient.h"

static int HaveFailedSpecCollPath = 0;
static char FailedSpecCollPath[MAX_NAME_LEN];

/* querySpecColl - The query can produce multiple answer and only one
 * is correct. e.g., objPath = /x/yabc can produce answers:
 * /x/y, /x/ya, /x/yabc, etc. The calling subroutine need to screen
 * /x/y, /x/ya out
 * check queueSpecCollCache () for screening.
 */
int
querySpecColl (rsComm_t *rsComm, char *objPath, genQueryOut_t **genQueryOut)
{
    genQueryInp_t genQueryInp;
    int status;
    char condStr[MAX_NAME_LEN];

    if (HaveFailedSpecCollPath && strcmp (objPath, FailedSpecCollPath) == 0)
        return CAT_NO_ROWS_FOUND;

    /* see if objPath is in the path of a spec collection */
    memset (&genQueryInp, 0, sizeof (genQueryInp));

    snprintf (condStr, MAX_NAME_LEN, "parent_of '%s'", objPath);
    addInxVal (&genQueryInp.sqlCondInp, COL_COLL_NAME, condStr);
    rstrcpy (condStr, "like '_%'", MAX_NAME_LEN);
    addInxVal (&genQueryInp.sqlCondInp, COL_COLL_TYPE, condStr);

    addInxIval (&genQueryInp.selectInp, COL_COLL_ID, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_NAME, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_OWNER_NAME, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_OWNER_ZONE, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_CREATE_TIME, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_MODIFY_TIME, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_TYPE, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_INFO1, 1);
    addInxIval (&genQueryInp.selectInp, COL_COLL_INFO2, 1);

    genQueryInp.maxRows = MAX_SQL_ROWS;

    status =  rsGenQuery (rsComm, &genQueryInp, genQueryOut);

    clearGenQueryInp (&genQueryInp);

    if (status < 0) {
        rstrcpy (FailedSpecCollPath, objPath, MAX_NAME_LEN);
        HaveFailedSpecCollPath = 1;
        return (status);
    }

    return (0);
}

/* queueSpecCollCache - queue the specColl given in genQueryOut.
 * genQueryOut may contain multiple answer and only one
 * is correct. e.g., objPath = /x/yabc can produce answers:
 * /x/y, /x/ya, /x/yabc, etc. The calling subroutine need to screen
 * /x/y, /x/ya out
 */

int
queueSpecCollCache (rsComm_t *rsComm, genQueryOut_t *genQueryOut, char *objPath)
{
    specCollCache_t *tmpSpecCollCache;
    int status;
    int i;
    sqlResult_t *dataId;
    sqlResult_t *ownerName;
    sqlResult_t *ownerZone;
    sqlResult_t *createTime;
    sqlResult_t *modifyTime;
    sqlResult_t *collType;
    sqlResult_t *collection;
    sqlResult_t *collInfo1;
    sqlResult_t *collInfo2;
    char *tmpDataId, *tmpOwnerName, *tmpOwnerZone, *tmpCreateTime,
      *tmpModifyTime, *tmpCollType, *tmpCollection, *tmpCollInfo1,
      *tmpCollInfo2;
    specColl_t *specColl;

    if ((dataId = getSqlResultByInx (genQueryOut, COL_COLL_ID)) == NULL) {
        rodsLog (LOG_ERROR,
          "queueSpecCollCache: getSqlResultByInx for COL_COLL_ID failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((ownerName = getSqlResultByInx (genQueryOut,
      COL_COLL_OWNER_NAME)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_OWNER_NAME failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((ownerZone = getSqlResultByInx (genQueryOut,
      COL_COLL_OWNER_ZONE)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_OWNER_ZONE failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((createTime = getSqlResultByInx (genQueryOut,
      COL_COLL_CREATE_TIME)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_CREATE_TIME failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((modifyTime = getSqlResultByInx (genQueryOut,
      COL_COLL_MODIFY_TIME)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_MODIFY_TIME failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((collType = getSqlResultByInx (genQueryOut,
      COL_COLL_TYPE)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_TYPE failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((collection = getSqlResultByInx (genQueryOut,
      COL_COLL_NAME)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_NAME failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((collInfo1 = getSqlResultByInx (genQueryOut,
      COL_COLL_INFO1)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_INFO1 failed");
        return (UNMATCHED_KEY_OR_INDEX);
    } else if ((collInfo2 = getSqlResultByInx (genQueryOut,
      COL_COLL_INFO2)) == NULL) {
        rodsLog (LOG_ERROR,
         "queueSpecCollCache:getSqlResultByInx for COL_COLL_INFO2 failed");
        return (UNMATCHED_KEY_OR_INDEX);
    }

    for (i = 0; i <= genQueryOut->rowCnt; i++) {
        int len;
        char *tmpPtr;

        tmpCollection = &collection->value[collection->len * i];

        len = strlen (tmpCollection);
        tmpPtr = objPath + len;

        if (*tmpPtr == '\0' || *tmpPtr == '/') {
            tmpSpecCollCache = (specCollCache_t*)malloc (sizeof (specCollCache_t));
            memset (tmpSpecCollCache, 0, sizeof (specCollCache_t));

            tmpDataId = &dataId->value[dataId->len * i];
            tmpOwnerName = &ownerName->value[ownerName->len * i];
            tmpOwnerZone = &ownerZone->value[ownerZone->len * i];
            tmpCreateTime = &createTime->value[createTime->len * i];
            tmpModifyTime = &modifyTime->value[modifyTime->len * i];
            tmpCollType = &collType->value[collType->len * i];
            tmpCollInfo1 = &collInfo1->value[collInfo1->len * i];
            tmpCollInfo2 = &collInfo2->value[collInfo2->len * i];

            specColl = &tmpSpecCollCache->specColl;
            status = resolveSpecCollType (tmpCollType, tmpCollection,
              tmpCollInfo1, tmpCollInfo2, specColl);
            if (status < 0) return status;

	    if (specColl->collClass == STRUCT_FILE_COLL && 
	      specColl->type == TAR_STRUCT_FILE_T) {
		/* tar struct file. need to get phyPath */
		status = getPhyPath (rsComm, specColl->objPath,
		  specColl->resource, specColl->phyPath);

	        if (status < 0) {
                    rodsLog (LOG_ERROR,
                      "queueSpecCollCache: getPhyPath failed for %s",
		      specColl->objPath);
                    return status;
		}
	    }
            rstrcpy (tmpSpecCollCache->collId, tmpDataId, NAME_LEN);
            rstrcpy (tmpSpecCollCache->ownerName, tmpOwnerName, NAME_LEN);
            rstrcpy (tmpSpecCollCache->ownerZone, tmpOwnerZone, NAME_LEN);
            rstrcpy (tmpSpecCollCache->createTime, tmpCreateTime, NAME_LEN);
            rstrcpy (tmpSpecCollCache->modifyTime, tmpModifyTime, NAME_LEN);
            tmpSpecCollCache->next = SpecCollCacheHead;
            SpecCollCacheHead = tmpSpecCollCache;
            return 0;
        }
    }

    return CAT_NO_ROWS_FOUND;
}

int
queueSpecCollCacheWithObjStat (rodsObjStat_t *rodsObjStatOut)
{
    specCollCache_t *tmpSpecCollCache;

    tmpSpecCollCache = (specCollCache_t*)malloc (sizeof (specCollCache_t));
    memset (tmpSpecCollCache, 0, sizeof (specCollCache_t));

    tmpSpecCollCache->specColl = *rodsObjStatOut->specColl;

    rstrcpy (tmpSpecCollCache->collId, rodsObjStatOut->dataId, NAME_LEN);
    rstrcpy (tmpSpecCollCache->ownerName, rodsObjStatOut->ownerName, NAME_LEN);
    rstrcpy (tmpSpecCollCache->ownerZone, rodsObjStatOut->ownerZone, NAME_LEN);
    rstrcpy (tmpSpecCollCache->createTime, rodsObjStatOut->createTime,
      NAME_LEN);
    rstrcpy (tmpSpecCollCache->modifyTime, rodsObjStatOut->modifyTime,
      NAME_LEN);

    tmpSpecCollCache->next = SpecCollCacheHead;
    SpecCollCacheHead = tmpSpecCollCache;

    return 0;

}

specCollCache_t *
matchSpecCollCache (char *objPath)
{
    specCollCache_t *tmpSpecCollCache = SpecCollCacheHead;

    while (tmpSpecCollCache != NULL) {
        int len = strlen (tmpSpecCollCache->specColl.collection);
        if (strncmp (tmpSpecCollCache->specColl.collection, objPath, len)
          == 0) {
            char *tmpPtr = objPath + len;

            if (*tmpPtr == '\0' || *tmpPtr == '/') {
                return (tmpSpecCollCache);
            }
        }
        tmpSpecCollCache = tmpSpecCollCache->next;
    }
    return (NULL);
}

/* getSpecCollCache - check if the path is in a special collection.
 * If it is, output the special coll in specCollCache.
 * If not, return SYS_SPEC_COLL_NOT_IN_CACHE if check for inCachOnly or
 * CAT_NO_ROWS_FOUND.
 */
int
getSpecCollCache (rsComm_t *rsComm, char *objPath,
int inCachOnly, specCollCache_t **specCollCache)
{
    int status;
    genQueryOut_t *genQueryOut = NULL;

    if ((*specCollCache = matchSpecCollCache (objPath)) != NULL) {
        return (0);
    } else if (inCachOnly > 0) {
        return (SYS_SPEC_COLL_NOT_IN_CACHE);
    }

    status = querySpecColl (rsComm, objPath, &genQueryOut);
    if (status < 0) return (status);

    status = queueSpecCollCache (rsComm, genQueryOut, objPath);
    freeGenQueryOut (&genQueryOut);

    if (status < 0) return (status);
    *specCollCache = SpecCollCacheHead;  /* queued at top */

    return (0);
}

int
modCollInfo2 (rsComm_t *rsComm, specColl_t *specColl, int clearFlag)
{
    int status;
    char collInfo2[MAX_NAME_LEN];
    collInp_t modCollInp;

    memset (&modCollInp, 0, sizeof (modCollInp));
    rstrcpy (modCollInp.collName, specColl->collection, MAX_NAME_LEN);
    /*** RAJA REMOVED AS HE WANTED TYPE NOT TO CHANGE FOR MSSO ***
    addKeyVal (&modCollInp.condInput, COLLECTION_TYPE_KW,
    TAR_STRUCT_FILE_STR);  MIKE HADFOLLOWING COMMENT BEFORE  ****/  
    /* need this or rsModColl   fail */
    if (clearFlag > 0) {
        rstrcpy (collInfo2, "NULL_SPECIAL_VALUE", MAX_NAME_LEN);
    } else {
        makeCachedStructFileStr (collInfo2, specColl);
    }
    addKeyVal (&modCollInp.condInput, COLLECTION_INFO2_KW, collInfo2);
    status = rsModColl (rsComm, &modCollInp);
    if (status < 0) {
        rodsLog (LOG_NOTICE,
         "tarSubStructFileWrite:rsModColl error for Coll %s,stat=%d",
         modCollInp.collName, status);
    }
    return status;
}

/* statPathInSpecColl - stat the path given in objPath assuming it is
 * in the path of a special collection. The inCachOnly flag asks it to
 * check the specColl in the global cache only. The output of the
 * stat is given in rodsObjStatOut.
 * The object can be in a special collection but does not exist, then
 * objType = UNKNOWN_OBJ_T.
 * The only time inCachOnly is on is called from rsObjStat to check
 * local cache first before calling iCAT to resolve the path.
 *
 */

int
statPathInSpecColl (rsComm_t *rsComm, char *objPath,
int inCachOnly, rodsObjStat_t **rodsObjStatOut)
{
    int status;
    dataObjInfo_t *dataObjInfo = NULL;
    specColl_t *specColl;
    specCollCache_t *specCollCache;

    if ((status = getSpecCollCache (rsComm, objPath, inCachOnly,
      &specCollCache)) < 0) {
        if (status != SYS_SPEC_COLL_NOT_IN_CACHE &&
          status != CAT_NO_ROWS_FOUND){
            rodsLog (LOG_ERROR,
              "statPathInSpecColl: getSpecCollCache for %s, status = %d",
              objPath, status);
        }
        return (status);
    }

    if (*rodsObjStatOut == NULL)
        *rodsObjStatOut = (rodsObjStat_t *) malloc (sizeof (rodsObjStat_t));
    memset (*rodsObjStatOut, 0, sizeof (rodsObjStat_t));
    specColl = &specCollCache->specColl;
    rstrcpy ((*rodsObjStatOut)->dataId, specCollCache->collId, NAME_LEN);
    rstrcpy ((*rodsObjStatOut)->ownerName, specCollCache->ownerName, NAME_LEN);
    rstrcpy ((*rodsObjStatOut)->ownerZone, specCollCache->ownerZone, NAME_LEN);

    status = specCollSubStat (rsComm, specColl, objPath, UNKNOWN_COLL_PERM,
      &dataObjInfo);

    if (status < 0) {
        if (dataObjInfo != NULL) {
            if (dataObjInfo->specColl != NULL) {
                (*rodsObjStatOut)->specColl = dataObjInfo->specColl;
            } else {
                replSpecColl (&specCollCache->specColl,
                  &(*rodsObjStatOut)->specColl);
            }
            if (specColl->collClass == LINKED_COLL) {
                rstrcpy ((*rodsObjStatOut)->specColl->objPath,
                  dataObjInfo->objPath, MAX_NAME_LEN);
            } else {
                (*rodsObjStatOut)->specColl->objPath[0] = '\0';
            }
            dataObjInfo->specColl = NULL;
        }
        (*rodsObjStatOut)->objType = UNKNOWN_OBJ_T;
        rstrcpy ((*rodsObjStatOut)->createTime, specCollCache->createTime,
          NAME_LEN);
        rstrcpy ((*rodsObjStatOut)->modifyTime, specCollCache->modifyTime,
          NAME_LEN);
        freeAllDataObjInfo (dataObjInfo);
        /* XXXXX 0 return is creating a problem for fuse */
        return (0);
    } else {
        (*rodsObjStatOut)->specColl = dataObjInfo->specColl;
        dataObjInfo->specColl = NULL;

        if (specColl->collClass == LINKED_COLL) {
            rstrcpy ((*rodsObjStatOut)->ownerName, dataObjInfo->dataOwnerName,
              NAME_LEN);
            rstrcpy ((*rodsObjStatOut)->ownerZone, dataObjInfo->dataOwnerZone,
              NAME_LEN);
            snprintf ((*rodsObjStatOut)->dataId, NAME_LEN, "%lld",
              dataObjInfo->dataId);
            /* save the linked path here */
            rstrcpy ((*rodsObjStatOut)->specColl->objPath,
              dataObjInfo->objPath, MAX_NAME_LEN);
        }
        (*rodsObjStatOut)->objType = (objType_t)status;
        (*rodsObjStatOut)->objSize = dataObjInfo->dataSize;
        rstrcpy ((*rodsObjStatOut)->createTime, dataObjInfo->dataCreate,
          NAME_LEN);
        rstrcpy ((*rodsObjStatOut)->modifyTime, dataObjInfo->dataModify,
          NAME_LEN);
        freeAllDataObjInfo (dataObjInfo);
    }

    return (status);
}

/* specCollSubStat - Given specColl and the object path (subPath),
 * returns a dataObjInfo and a value COLL_OBJ_T if the path 
 * is a collection or DATA_OBJ_T if the path is a data object or 
 * CAT_NO_ROWS_FOUND or -1 if the object does not exist.
 * 
 */

int
specCollSubStat (rsComm_t *rsComm, specColl_t *specColl,
char *subPath, specCollPerm_t specCollPerm, dataObjInfo_t **dataObjInfo)
{
    int status;
    int objType;
    rodsStat_t *rodsStat = NULL;
    dataObjInfo_t *myDataObjInfo = NULL;;

    if (dataObjInfo == NULL) return USER__NULL_INPUT_ERR;
    *dataObjInfo = NULL;
    if (specColl->collClass == MOUNTED_COLL) {

        /* a mount point */
        myDataObjInfo = *dataObjInfo =
          (dataObjInfo_t *) malloc (sizeof (dataObjInfo_t));
        memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));

        status = resolveResc (specColl->resource, &myDataObjInfo->rescInfo);
        if (status < 0) {
            rodsLog (LOG_ERROR,
              "specCollSubStat: resolveResc error for %s, status = %d",
              specColl->resource, status);
            freeDataObjInfo (myDataObjInfo);
            *dataObjInfo = NULL;
            return (status);
        }

        rstrcpy (myDataObjInfo->objPath, subPath, MAX_NAME_LEN);
        rstrcpy (myDataObjInfo->subPath, subPath, MAX_NAME_LEN);
        rstrcpy (myDataObjInfo->rescName, specColl->resource, NAME_LEN);
        rstrcpy (myDataObjInfo->dataType, "generic", NAME_LEN);

        status = getMountedSubPhyPath (specColl->collection,
          specColl->phyPath, subPath, myDataObjInfo->filePath);
        if (status < 0) {
            freeDataObjInfo (myDataObjInfo);
            *dataObjInfo = NULL;
            return (status);
        }
        replSpecColl (specColl, &myDataObjInfo->specColl);
    } else if (specColl->collClass == LINKED_COLL) {
        /* a link point */
        specCollCache_t *specCollCache = NULL;
        char newPath[MAX_NAME_LEN];
        specColl_t *curSpecColl;
        char *accessStr;
        dataObjInp_t myDataObjInp;
        rodsObjStat_t *rodsObjStatOut = NULL;

        *dataObjInfo = NULL;
        curSpecColl = specColl;

        status = getMountedSubPhyPath (curSpecColl->collection,
          curSpecColl->phyPath, subPath, newPath);
        if (status < 0) {
            return (status);
        }

        status = resolveLinkedPath (rsComm, newPath, &specCollCache, NULL);
        if (status < 0) return status;
        if (specCollCache != NULL &&
          specCollCache->specColl.collClass != LINKED_COLL) {
            status = specCollSubStat (rsComm, &specCollCache->specColl,
              newPath, specCollPerm, dataObjInfo);
            return status;
        }
        bzero (&myDataObjInp, sizeof (myDataObjInp));
        rstrcpy (myDataObjInp.objPath, newPath, MAX_NAME_LEN);

        status = collStat (rsComm, &myDataObjInp, &rodsObjStatOut);
        if (status >= 0) {      /* a collection */
            myDataObjInfo = *dataObjInfo =
              (dataObjInfo_t *) malloc (sizeof (dataObjInfo_t));
            memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
            replSpecColl (curSpecColl, &myDataObjInfo->specColl);
            rstrcpy (myDataObjInfo->objPath, newPath, MAX_NAME_LEN);
            myDataObjInfo->dataId = strtoll (rodsObjStatOut->dataId, 0, 0);
            rstrcpy (myDataObjInfo->dataOwnerName, rodsObjStatOut->ownerName,
              NAME_LEN);
            rstrcpy (myDataObjInfo->dataOwnerZone, rodsObjStatOut->ownerZone,
              NAME_LEN);
            rstrcpy (myDataObjInfo->dataCreate, rodsObjStatOut->createTime,
              TIME_LEN);
            rstrcpy (myDataObjInfo->dataModify, rodsObjStatOut->modifyTime,
              TIME_LEN);
            freeRodsObjStat (rodsObjStatOut);
            return COLL_OBJ_T;
        }

        /* data object */
        if (specCollPerm == READ_COLL_PERM) {
            accessStr = ACCESS_READ_OBJECT;
        } else if (specCollPerm == WRITE_COLL_PERM) {
            accessStr = ACCESS_DELETE_OBJECT;
        } else {
            accessStr = NULL;
        }

        status = getDataObjInfo (rsComm, &myDataObjInp, dataObjInfo,
          accessStr, 0);
        if (status < 0) {
            myDataObjInfo = *dataObjInfo =
              (dataObjInfo_t *) malloc (sizeof (dataObjInfo_t));
            memset (myDataObjInfo, 0, sizeof (dataObjInfo_t));
            replSpecColl (curSpecColl, &myDataObjInfo->specColl);
            rstrcpy (myDataObjInfo->objPath, newPath, MAX_NAME_LEN);
            rodsLog (LOG_DEBUG,
              "specCollSubStat: getDataObjInfo error for %s, status = %d",
              newPath, status);
            return (status);
        } else {
            replSpecColl (curSpecColl, &(*dataObjInfo)->specColl);
            return DATA_OBJ_T;
        }
    } else if (getStructFileType (specColl) >= 0) {
        /* bundle */
        dataObjInp_t myDataObjInp;
        dataObjInfo_t *tmpDataObjInfo;

        bzero (&myDataObjInp, sizeof (myDataObjInp));
        rstrcpy (myDataObjInp.objPath, specColl->objPath, MAX_NAME_LEN);
        status = getDataObjInfo (rsComm, &myDataObjInp, dataObjInfo, NULL, 1);
        if (status < 0) {
            rodsLog (LOG_ERROR,
              "specCollSubStat: getDataObjInfo error for %s, status = %d",
              myDataObjInp.objPath, status);
            *dataObjInfo = NULL;
            return (status);
        }

        /* screen out any stale copies */
        status = sortObjInfoForOpen (rsComm, dataObjInfo,
          &myDataObjInp.condInput, 0);
        if (status < 0) {
            rodsLog (LOG_ERROR,
              "specCollSubStat: sortObjInfoForOpen error for %s. status = %d",
              myDataObjInp.objPath, status);
            return status;
        }

        if (strlen (specColl->resource) > 0) {
            if (requeDataObjInfoByResc (dataObjInfo, specColl->resource,
              0, 1) >= 0) {
                if (strcmp (specColl->resource,
                  (*dataObjInfo)->rescName) != 0) {
                    rodsLog (LOG_ERROR,
                      "specCollSubStat: %s in %s does not match cache resc %s",
                      myDataObjInp.objPath, (*dataObjInfo)->rescName,
                      specColl->resource);
                    freeAllDataObjInfo (*dataObjInfo);
                    *dataObjInfo = NULL;
                    return (SYS_CACHE_STRUCT_FILE_RESC_ERR);
                }
            } else {
                rodsLog (LOG_ERROR,
                  "specCollSubStat: requeDataObjInfoByResc %s, resc %s error",
                  myDataObjInp.objPath, specColl->resource);
                freeAllDataObjInfo (*dataObjInfo);
                *dataObjInfo = NULL;
                return (SYS_CACHE_STRUCT_FILE_RESC_ERR);
            }
        }

        /* free all the other dataObjInfo */
        if ((*dataObjInfo)->next != NULL) {
            freeAllDataObjInfo ((*dataObjInfo)->next);
            (*dataObjInfo)->next = NULL;
        }

        /* fill in DataObjInfo */
        tmpDataObjInfo = *dataObjInfo;
        replSpecColl (specColl, &tmpDataObjInfo->specColl);
        rstrcpy (specColl->resource,
          tmpDataObjInfo->rescName, NAME_LEN);
        rstrcpy (specColl->phyPath,
          tmpDataObjInfo->filePath, MAX_NAME_LEN);
        rstrcpy (tmpDataObjInfo->subPath, subPath, MAX_NAME_LEN);
        specColl->replNum = tmpDataObjInfo->replNum;

        if (strcmp ((*dataObjInfo)->subPath, specColl->collection) == 0) {
            /* no need to go down */
            return (COLL_OBJ_T);
        }
    } else {
       rodsLog (LOG_ERROR,
          "specCollSubStat: Unknown specColl collClass = %d",
          specColl->collClass);
        return (SYS_UNKNOWN_SPEC_COLL_CLASS);
    }

    status = l3Stat (rsComm, *dataObjInfo, &rodsStat);
    if (status < 0) return status;

    if (rodsStat->st_ctim != 0) {
        snprintf ((*dataObjInfo)->dataCreate, NAME_LEN, "%d",
          rodsStat->st_ctim);
        snprintf ((*dataObjInfo)->dataModify, NAME_LEN, "%d",
          rodsStat->st_mtim);
    }

    if (rodsStat->st_mode & S_IFDIR) {
        objType = COLL_OBJ_T;
    } else {
        objType = DATA_OBJ_T;
        (*dataObjInfo)->dataSize = rodsStat->st_size;
    }
    free (rodsStat);

    return (objType);
}

/* resolvePathInSpecColl - given the object path in dataObjInp->objPath, see if
 * it is in the path of a special collection (mounted or structfile).
 * If it is not in a special collection, returns a -ive value.
 * Check permission if specCollPerm is not UNKNOWN_COLL_PERM.
 * If it is, returns a dataObjInfo struct with dataObjInfo->specColl != NULL.
 * Returns COLL_OBJ_T if the path is a collection or DATA_OBJ_T if the
 * path is a data object or SYS_SPEC_COLL_OBJ_NOT_EXIST if it is in
 * special coll but does not exist.
 * The only time inCachOnly is on is called from getDataObjInfoIncSpecColl to 
 * check local cache first before calling iCAT to resolve the path.
 */
int
resolvePathInSpecColl (rsComm_t *rsComm, char *objPath,
specCollPerm_t specCollPerm, int inCachOnly, dataObjInfo_t **dataObjInfo)
{
    specCollCache_t *specCollCache;
    specColl_t *cachedSpecColl;
    int status;
    char *accessStr;

    if (objPath == NULL) {
        return (SYS_INTERNAL_NULL_INPUT_ERR);
    }

    if ((status = getSpecCollCache (rsComm, objPath, inCachOnly,
      &specCollCache)) < 0) {
        return (status);
    } else {
        cachedSpecColl = &specCollCache->specColl;
    }

    if (specCollPerm != UNKNOWN_COLL_PERM) {
        if (specCollPerm == WRITE_COLL_PERM) {
            accessStr = ACCESS_DELETE_OBJECT;
        } else {
            accessStr = ACCESS_READ_OBJECT;
        }

        if (specCollCache->perm < specCollPerm) {
            status = checkCollAccessPerm (rsComm, cachedSpecColl->collection,
              accessStr);
            if (status < 0) {
                rodsLog (LOG_ERROR,
                 "resolvePathInSpecColl:checkCollAccessPerm err for %s,stat=%d",
                  cachedSpecColl->collection, status);
                return (status);
            } else {
                specCollCache->perm = specCollPerm;
            }
        }
    }

    status = specCollSubStat (rsComm, cachedSpecColl, objPath,
      specCollPerm, dataObjInfo);

    if (status < 0) {
        if (*dataObjInfo != NULL) {
           /* does not exist. return the dataObjInfo anyway */
            return (SYS_SPEC_COLL_OBJ_NOT_EXIST);
        }
        rodsLog (LOG_ERROR,
          "resolvePathInSpecColl: specCollSubStat error for %s, status = %d",
          objPath, status);
        return (status);
    } else {
        if (*dataObjInfo != NULL) {
            if (specCollPerm == WRITE_COLL_PERM)
                (*dataObjInfo)->writeFlag = 1;
        }
    }

    return (status);
}

int
resolveLinkedPath (rsComm_t *rsComm, char *objPath,
specCollCache_t **specCollCache, keyValPair_t *condInput)
{
    int linkCnt = 0;
    specColl_t *curSpecColl;
    char prevNewPath[MAX_NAME_LEN];
    specCollCache_t *oldSpecCollCache = NULL;
    int status;

        *specCollCache = NULL;

    if (getValByKey (condInput, TRANSLATED_PATH_KW) != NULL)
        return 0;

    addKeyVal (condInput, TRANSLATED_PATH_KW, "");
    while (getSpecCollCache (rsComm, objPath, 0,  specCollCache) >= 0 &&
      (*specCollCache)->specColl.collClass == LINKED_COLL) {
        oldSpecCollCache = *specCollCache;
        if (linkCnt++ >= MAX_LINK_CNT) {
            rodsLog (LOG_ERROR,
              "resolveLinkedPath: linkCnt for %s exceeds %d",
              objPath, MAX_LINK_CNT);
            return SYS_LINK_CNT_EXCEEDED_ERR;
        }

        curSpecColl = &(*specCollCache)->specColl;
        if (strcmp (curSpecColl->collection, objPath) == 0 &&
          getValByKey (condInput, NO_TRANSLATE_LINKPT_KW) != NULL) {
            /* when doing renaming, don't translate the link point */
            return 0;
        }
        rstrcpy (prevNewPath, objPath, MAX_NAME_LEN);
        status = getMountedSubPhyPath (curSpecColl->collection,
          curSpecColl->phyPath, prevNewPath, objPath);
        if (status < 0) {
            return (status);
        }
    }
    if (*specCollCache == NULL) *specCollCache = oldSpecCollCache;
    return linkCnt;
}