/******************************************************************************/ /* */ /* ---- NIH NMR Software System ---- */ /* Copyright 1992 and 1993 */ /* Frank Delaglio */ /* NIH Laboratory of Chemical Physics */ /* */ /* This software is not for distribution without */ /* the written permission of the author. */ /* */ /******************************************************************************/ /***/ /* nmrSocket: utilities for nmrPipe parallel processing using socket streams. /***/ #ifndef LINT_CHECK #include #include #include #include #include #include #include #include #include "dataio.h" #include "memory.h" #include "token.h" #include "prec.h" #include "nmrserver.h" #include "nmrpipe.h" #include "nmrtime.h" #include "cmndargs.h" #define USEC_TIMEOUT 10000 #ifdef SOLARIS #define BCOPY bCopy #else #define BCOPY bcopy #endif /***/ /* nmrInitServers: initialize all nmrPipe server processes. /***/ int nmrInitServers( dataInfo, serverListName, pathName ) struct ProcDataInfo *dataInfo; char *serverListName, *pathName; { int i, lineCount, serverCount, buffCapacity, error; char **serverList, serverName[NAMELEN+1]; float serverSpeed, minSpeed; char **rdText(); void freeText(); /***/ /* I/O Mode must be via file descriptors and system calls: /***/ #ifndef FB_SYS_IO (void) fprintf( stderr, "NMRSocket Compile-Time Error: Bad I/O Mode.\n" ); return( 1 ); #endif /***/ /* Initialize. /* Read list of server names. /***/ TIMER_ON( connectTime ); error = 0; serverList = (char **) NULL; minSpeed = 0.0; serverCount = 0; if (dataInfo->inCapacity < 1) dataInfo->inCapacity = 1; if (dataInfo->outCapacity < 1) dataInfo->outCapacity = 1; if (!(serverList = rdText( serverListName, &lineCount, &error ))) { (void) fprintf( stderr, "NMRSocket Client Error reading server list %s\n", serverListName ); goto shutDown; } /***/ /* Scan list for minimum and total server speeds. 
/***/ for( i = 0; i < lineCount; i++ ) { if (serverList[i][0] != '#' && 2 == cntToken( serverList[i] )) { if (2 != sscanf( serverList[i], "%s %f", serverName, &serverSpeed )) { (void) fprintf( stderr, "NMRSocket Client Error in Server List.\n" ); error = 1; goto shutDown; } if (serverSpeed > 0.0) { dataInfo->server[serverCount].name = strDup( serverName ); dataInfo->server[serverCount].speed = serverSpeed; if (!dataInfo->server[serverCount].name) { (void) fprintf( stderr, "NMRSocket Client Name Memory Error.\n" ); error = 1; goto shutDown; } if (minSpeed == 0.0) minSpeed = serverSpeed; if (minSpeed > serverSpeed) minSpeed = serverSpeed; serverCount++; dataInfo->serverCount = serverCount; } } } /***/ /* Test for valid server info: /***/ if (!serverCount) { (void) fprintf( stderr, "NMRSocket Client Error: Null Server List.\n" ); error = 1; goto shutDown; } if (minSpeed == 0.0) { (void) fprintf( stderr, "NMRSocket Client Error in Server Speeds.\n" ); error = 1; goto shutDown; } /***/ /* Activate each server: /***/ for( i = 0; i < dataInfo->serverCount; i++ ) { buffCapacity = (float)dataInfo->outCapacity*dataInfo->server[i].speed/minSpeed; buffCapacity = buffCapacity < 1 ? 1 : buffCapacity; dataInfo->server[i].capacity = buffCapacity; error = nmrStartServer( dataInfo->argc, dataInfo->argv, pathName, dataInfo->server[i].name, dataInfo->server[i].capacity, &(dataInfo->server[i].sock), &(dataInfo->server[i].pid), i+1, dataInfo->debug ); if (error) { (void) fprintf( stderr, "NMRSocket Client Error starting server.\n" ); goto shutDown; } } /***/ /* Exit point: /***/ shutDown: (void) freeText( serverList ); TIMER_OFF( connectTime ); return( error ); } /***/ /* nmrCloseServers: close server sockets. 
/***/
/*
 * dataInfo   client/server state holding the socket descriptors.
 * waitFlag   when zero, SO_LINGER is switched off on the server's message
 *            socket before it is closed.
 *
 * Closes the server's listening socket and message socket when they are
 * open (descriptor > -1), then closes every open client-side socket in
 * dataInfo->server[].  Each closed descriptor is reset to -1.
 * Always returns 0.
 */

int nmrCloseServers( dataInfo, waitFlag )

   struct ProcDataInfo *dataInfo;
   int    waitFlag;
{
    char   hostName[NAMELEN+1];
    struct linger closeWait;
    int    i;

/*
 * Server side: suppress wait on close if needed.
 * The host name is only used in messages; keep it a valid string even if
 * gethostname fails or truncates the name.
 */

    hostName[0] = '\0';
    (void) gethostname( hostName, NAMELEN );
    hostName[NAMELEN] = '\0';

    if (dataInfo->serverMode == SMODE_SERVER && dataInfo->verbose)
       {
        (void) fprintf( stderr,
                        "NMRSocket Server %s waiting to close connection.\n",
                        hostName );
       }

    if (dataInfo->serverSock > -1)
       {
        (void) dataClose( dataInfo->serverSock );
        dataInfo->serverSock = -1;
       }

    if (dataInfo->serverMsgSock > -1)
       {
        if (!waitFlag)
           {
            closeWait.l_onoff  = 0;
            closeWait.l_linger = 0;

            (void) setsockopt( dataInfo->serverMsgSock,
                               SOL_SOCKET,
                               SO_LINGER,
                               &closeWait,
                               sizeof( struct linger ));
           }

        (void) dataClose( dataInfo->serverMsgSock );
        dataInfo->serverMsgSock = -1;

        if (dataInfo->verbose)
           {
            (void) fprintf( stderr,
                            "NMRSocket Server %s closed connection.\n",
                            hostName );
           }
       }

/*
 * Clients must close sockets to server:
 */

    for( i = 0; i < dataInfo->serverCount; i++ )
       {
        if (dataInfo->server[i].sock > -1)
           {
            (void) dataClose( dataInfo->server[i].sock );
            dataInfo->server[i].sock = -1;
           }
       }

    return( 0 );
}

/***/
/* nmrKillServers: kill servers on remote machine, free server memory.
 *
 * For every server entry that has a name: if a process ID was recorded
 * (pid > -1), a background "rsh ... kill -9" command is issued through
 * sysCmnd(); the name string is then released and set to NULL, whether or
 * not a kill was issued.  A kill command that does not fit in the command
 * buffer is reported and skipped rather than run truncated.
 * Always returns 0.
 */
/***/

int nmrKillServers( dataInfo )

   struct ProcDataInfo *dataInfo;
{
    char cmnd[NAMELEN+1];
    int  i, length;

    if (!dataInfo->serverCount) return( 0 );

    for( i = 0; i < dataInfo->serverCount; i++ )
       {
        if (!dataInfo->server[i].name) continue;

        if (dataInfo->server[i].pid > -1)
           {
            length = snprintf( cmnd,
                               sizeof( cmnd ),
                               "rsh -n %s \"kill -9 %d\" &",
                               dataInfo->server[i].name,
                               dataInfo->server[i].pid );

            if (length < 0 || length >= (int) sizeof( cmnd ))
               {
                (void) fprintf( stderr,
                                "NMRSocket Kill Command too long for server %s\n",
                                dataInfo->server[i].name );
               }
            else
               {
                if (dataInfo->verbose > 2)
                   {
                    (void) fprintf( stderr, "NMRSocket Kill Command: %s\n", cmnd );
                   }

                (void) sysCmnd( cmnd );
               }
           }

        /* Release the name even when no process ID was ever recorded. */

        (void) deAlloc( "sock",
                        dataInfo->server[i].name,
                        1 + strlen( dataInfo->server[i].name ));

        dataInfo->server[i].name = (char *) NULL;
       }

    return( 0 );
}

/***/
/* nmrStartServer: create and return a connection to an nmrPipe server process. */
/***/ int nmrStartServer( argc, argv, pathName, serverName, buffCapacity, sockPtr, pidPtr, serverID, verbose ) int argc, *sockPtr, *pidPtr, buffCapacity, serverID, verbose; char **argv, *pathName, *serverName; { char cmnd[NAMELEN+1], ctemp[NAMELEN+1], portName[NAMELEN+1]; int portID, i, error; /***/ /* Build a unique file name to extract server's socket ID. /* Build remote shell command to start server: /* Avoid "-in arg" "-out arg" "-client arg" and "-buff arg", etc, switches. /***/ if (error = uniqName( pathName, "pipe", portName )) { (void) fprintf( stderr, "NMRSocket Client Error finding unique port file.\n" ); return( error ); } (void) sprintf( cmnd, "rsh -n %s \"%s -server %s -sID %d -buff %d", serverName, argv[0], portName, serverID, buffCapacity ); for( i = 1; i < argc; i++ ) { if (!strcmp( argv[i], "-in" ) || /* No getNthArg needed. */ !strcmp( argv[i], "-out" ) || /* No getNthArg needed. */ !strcmp( argv[i], "-buff" ) || /* No getNthArg needed. */ !strcmp( argv[i], "-ibuff" ) || /* No getNthArg needed. */ !strcmp( argv[i], "-obuff" ) || /* No getNthArg needed. */ !strcmp( argv[i], "-client" )) /* No getNthArg needed. */ { i++; } else { (void) sprintf( ctemp, " %s", getNthArg( argc, argv, i )); (void) strcat( cmnd, ctemp ); } } (void) strcat( cmnd, "\" &" ); /***/ /* Start server. /* Read server port ID and process ID from response file. /* Complete the socket connection. 
/***/ if (verbose) { (void) fprintf( stderr, "NMRSocket Client starting server %s.\n", serverName ); if (verbose > 2) { (void) fprintf( stderr, " Command: %s\n", cmnd ); } } if (error = sysCmnd( cmnd )) { (void) perror( "NMRSocket Client Error starting shell" ); return( error ); } if (error = getPort( portName, &portID, pidPtr )) { (void) fprintf( stderr, "NMRSocket Client Error reading port file %s\n", portName ); return( error ); } if (verbose) { (void) fprintf( stderr, "NMRSocket Client assigned port %d.\n", portID ); } if (error = nmrConnect( serverName, portID, sockPtr )) { (void) fprintf( stderr, "NMRSocket Client Error connecting to server %s\n", serverName ); } if (verbose) { (void) fprintf( stderr, "NMRSocket Client connected on port %d.\n\n", portID ); } return( 0 ); } /***/ /* nmrConnect: used by client to open a socket to a running server. /***/ int nmrConnect( serverName, portID, sockPtr ) int portID, *sockPtr; char *serverName; { struct sockaddr_in server; struct hostent *hp; struct linger closeWait; void BCOPY(); /***/ /* Create a socket. /* Find info on server for socket via server's name. /* Connect to server via server's port ID. /***/ if (0 > (*sockPtr = socket( AF_INET, SOCK_STREAM, 0 ))) { (void) perror( "NMRSocket Client Error opening socket" ); return( 1 ); } server.sin_family = AF_INET; if (!(hp = gethostbyname( serverName ))) { (void) fprintf( stderr, "NMRSocket Client Error finding host %s\n", serverName ); return( 1 ); } (void) BCOPY( (VOID *)hp->h_addr, (VOID *)&server.sin_addr, (int) hp->h_length ); server.sin_port = htons( portID ); if (0 > connect( *sockPtr, (struct sockaddr *)&server, sizeof(server) )) { (void) perror( "NMRSocket Client Error connecting to socket" ); return( 1 ); } closeWait.l_onoff = 1; closeWait.l_linger = 0; (void) setsockopt( *sockPtr, SOL_SOCKET, SO_LINGER, &closeWait, sizeof( struct linger )); return( 0 ); } /***/ /* setServerIO: set client/server I/O parameters. 
/***/ int setServerIO( sock, outLen, inLen ) int outLen, inLen; { int i; i = 1; if (0 > setsockopt( sock, SOL_SOCKET, SO_KEEPALIVE, &i, sizeof( int ))) { (void) perror( "NMRSocket Client Warning setting SO_KEEPALIVE" ); } i = sizeof(float)*outLen; if (0 > setsockopt( sock, SOL_SOCKET, SO_SNDBUF, &i, sizeof( int ))) { (void) perror( "NMRSocket Client Warning setting SO_SNDBUF" ); } i = sizeof(float)*inLen; if (0 > setsockopt( sock, SOL_SOCKET, SO_RCVBUF, &i, sizeof( int ))) { (void) perror( "NMRSocket Client Warning setting SO_RCVBUF" ); } return( 0 ); } /***/ /* nmrAcceptClient: used by server to accept an nmrPipe client connection. /***/ int nmrAcceptClient( portName, sockPtr, msgSockPtr, verbose ) int *sockPtr, *msgSockPtr, verbose; char *portName; { int i, length, error; struct sockaddr_in server; char hostName[NAMELEN+1]; struct linger closeWait; *sockPtr = -1; *msgSockPtr = -1; /***/ /* I/O Mode must be via file descriptors and system calls: /***/ #ifndef FB_SYS_IO (void) fprintf( stderr, "NMRSocket Compile-Time Error: Bad I/O Mode.\n" ); return( 1 ); #endif /***/ /* Create a socket. /* Bind it to any available port. /* Inform client of port ID via the respone file. 
/***/ (void) gethostname( hostName, NAMELEN ); if (verbose) { (void) fprintf( stderr, "NMRSocket Server %s initializing.\n", hostName ); } if (0 > (*sockPtr = socket( AF_INET, SOCK_STREAM, 0 ))) { (void) perror( "NMRSocket Server Error opening socket" ); return( 1 ); } server.sin_family = AF_INET; server.sin_addr.s_addr = INADDR_ANY; server.sin_port = 0; if (0 > bind( *sockPtr, (struct sockaddr *)&server, sizeof(server) )) { (void) perror( "NMRSocket Server Error binding to socket" ); return( 1 ); } length = sizeof(server); if (0 > getsockname( *sockPtr, (struct sockaddr *)&server, &length )) { (void) perror( "NMRSocket Server Error getting socket name" ); return( 1 ); } if (putPort( portName, (int)ntohs( server.sin_port ) )) { (void) fprintf( stderr, "NMRSocket Server Error writing port number.\n" ); return( 1 ); } /***/ /* Listen for client connection request. /* Accept the client connection. /* Set connection to transmit final data before close. /***/ (void) listen( *sockPtr, 1 ); i = 1; do { *msgSockPtr = accept( *sockPtr, (struct sockaddr *)0, (int *)0 ); if (-1 == *msgSockPtr) { (void) uSleep( USEC_TIMEOUT ); } else { if (verbose) { (void) fprintf( stderr, "NMRSocket Server %s accepted connection.\n", hostName ); } closeWait.l_onoff = 1; closeWait.l_linger = 0; error = setsockopt( *sockPtr, SOL_SOCKET, SO_LINGER, &closeWait, sizeof( struct linger )); if (error) { (void) perror( "NMRSocket Server Warning setting SO_LINGER" ); } return( 0 ); } } while( i ); return( 0 ); } /***/ /* putPort: writes port ID and process ID to a file. /***/ int putPort( outName, portID ) char *outName; int portID; { char ctemp[NAMELEN+1]; FILE *outUnit; int pid; /***/ /* File "outName" contains port ID and process ID. /* File "outName." indicates that file 1 is ready. 
/***/ (void) strcpy( ctemp, outName ); pid = getpid(); if (!(outUnit = fopen( ctemp, "w" ))) { perror( "NMRSocket Server Error opening port file 1" ); return( 1 ); } if (EOF == fprintf( outUnit, "%d %d", portID, pid )) { perror( "NMRSocket Server Error writing port file 1" ); (void) fclose( outUnit ); return( 1 ); } (void) fclose( outUnit ); (void) sprintf( ctemp, "%s.", outName ); if (!(outUnit = fopen( ctemp, "w" ))) { perror( "NMRSocket Server Error opening port file 2" ); return( 1 ); } if (EOF == fprintf( outUnit, "READY" )) { perror( "NMRSocket Server Error writing port file 2" ); (void) fclose( outUnit ); return( 1 ); } (void) fclose( outUnit ); return( 0 ); } /***/ /* getPort: read port ID from text file, delete file afterwards. /***/ int getPort( inName, portID, pid ) char *inName; int *portID, *pid; { char ctemp[NAMELEN+1]; FILE *inUnit; /***/ /* Wait till response file "inName." exists. /* Open response file "inName". /* Read port and pid from response file "inName". /* Delete both files. /***/ (void) sprintf( ctemp, "%s.", inName ); while( !fileExists( ctemp )) { (void) uSleep( USEC_TIMEOUT ); } if (!(inUnit = fopen( inName, "r" ))) { (void) perror( "NMRSocket Client Error opening port file" ); return( -1 ); } if (2 != fscanf( inUnit, "%d %d", portID, pid )) { (void) fprintf( stderr, "NMRSocket Client Error reading port ID.\n" ); (void) fclose( inUnit ); return( 1 ); } (void) fclose( inUnit ); (void) delFile( ctemp ); (void) delFile( inName ); return( 0 ); } /***/ /* uniqName: return a unique file name; file will exist on return. 
/***/ int uniqName( pathName, rootName, fullName ) char *pathName, *rootName, *fullName; { int i, id; FILE *outUnit; id = getpid(); for( i = 1; i < 1000; i++ ) { (void) sprintf( fullName, "%s/%s%d.%03d", pathName, rootName, id, i ); if (!fileExists( fullName )) { if (!(outUnit = fopen( fullName, "w" ))) { return( 1 ); } if (EOF == fprintf( outUnit, "pid %d", id )) { (void) fclose( outUnit ); return( 1 ); } (void) fclose( outUnit ); return( 0 ); } } return( 1 ); } /***/ /* nullServer: set server structure contents to null state. /***/ int nullServer( server ) Server *server; { server->name = (char *) NULL; server->speed = 0.0; server->pid = -1; server->sock = -1; server->partition = 0; server->capacity = 0; return( 0 ); } /***/ /* sysCmnd: send a command to the operating system. /***/ int sysCmnd( cmnd ) char *cmnd; { int status; TIMER_ON( systemTime ); status = system( cmnd ); TIMER_OFF( systemTime ); return( status ); } #endif /***/ /* Bottom. /***/