head     1.2;
branch   ;
access   ;
symbols  ;
locks    mao:1.2; strict;
comment  @ * @;


1.2
date     91.12.10.05.12.20;  author mao;  state Exp;
branches ;
next     1.1;

1.1
date     91.12.10.02.55.52;  author mao;  state Exp;
branches ;
next     ;


desc
@mao's ftp program for postgres using the inversion file system
@


1.2
log
@cleanup, add error handling code in connect, manage stats
@
text
@/*    
 *     pftp -- postgres file transport protocol program.
 *
 *	WARNING:  The interface supported here is unstable.
 */

#include <stdio.h>
#include <sys/types.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/errno.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <ctype.h>

#ifndef sprite
#include <sys/signal.h>
#endif /* !sprite */

#include "tmp/c.h"
#include "tmp/oid.h"
#include "tmp/libpq-fe.h"

/* just in case someone left memory debugging turned on... */
#undef palloc
#undef pfree

RcsId("$Header: /users/mao/postgres/sample/RCS/pftp.c,v 1.1 1991/12/10 02:55:52 mao Exp mao $");

extern char     *getenv();
extern char	*get_attr();

extern char     *PQhost;     /* machine on which the backend is running */
extern char     *PQport;     /* comm. port with the postgres backend. */

/* first port number opensock() tries when looking for a free port to bind */
#define STARTSOCK	1991
/* backend host used when no host is named on the command line */
#define DEFHOST		"olympus.Berkeley.EDU"
/* name of the database handed to PQsetdb() at startup */
#define FILEDB		"file_db"

/*
 *  ucmd_data -- one user command as split up by parse().
 *
 *	Both members point into the line buffer that was handed to parse();
 *	no storage is allocated for them.
 */
typedef struct _ucmd_data {
    char *cmdname;	/* command word, or NULL if the line was blank */
    char *cmdarg;	/* text following the command word, or NULL if none */
} ucmd_data;

/*
 * Port number chosen by the most recent opensock() call; pftp_readport()
 * and pftp_writeport() return it to their callers.
 */
int SocketNumber;
/* scratch buffer in which each query is formatted before PQexec() is called */
char QryBuf[512];

/*
 * Forward declarations (old style, without parameter lists) for routines
 * defined later in this file.
 */
void parse();
int waitsock();
int opensock();
int pftp_readport();
int pftp_writeport();
void pgls();
void pgget();
void pgput();
char *get_attr();

/*
 *  main -- interactive command loop for pftp.
 *
 *	Usage: pftp [host].  The optional argument names the machine on
 *	which the backend is running; DEFHOST is used when it is omitted.
 *	After selecting the FILEDB database, commands are read one per
 *	line from standard input until end of file or "quit".  The
 *	commands understood are ls, put, get and quit.
 *
 *	Exits with status 1 on a usage error and 0 otherwise.
 */
int
main(argc,argv)
     int argc;
     char **argv;
{
    char *cmd;
    char *nl;
    int c;
    int toolong;
    ucmd_data ucmd;
    char lbuf[80];

    if (argc == 1)
	PQhost = DEFHOST;
    else if (argc == 2)
	PQhost = argv[1];
    else {
        fprintf(stderr, "usage: %s [host]\n", *argv);
        exit (1);
    }

    PQsetdb(FILEDB);

    fprintf(stdout, "> ");
    fflush(stdout);

    /* fgets never writes past the end of lbuf, unlike gets */
    while ((cmd = fgets(lbuf, sizeof(lbuf), stdin)) != (char *) NULL) {

	/* fgets keeps the line terminator; find it so it can be removed */
	for (nl = cmd; *nl != '\0' && *nl != '\n'; nl++)
	    continue;

	if (*nl == '\n')
	    *nl = '\0';
	else {
	    /*
	     * No newline was stored: either the input ended without one
	     * or the line did not fit.  Drain the rest of the line so it
	     * is not mistaken for the next command, and refuse to run a
	     * command whose argument was cut short.
	     */
	    toolong = 0;
	    while ((c = getc(stdin)) != EOF && c != '\n')
		toolong = 1;

	    if (toolong) {
		fprintf(stderr, "command line too long\n");
		fprintf(stdout, "> ");
		fflush(stdout);
		continue;
	    }
	}

	parse(cmd, &ucmd);

	if (ucmd.cmdname == (char *) NULL) {
	    /* blank line: keep reading without printing a new prompt */
	    continue;
	}

	if (strcmp(ucmd.cmdname, "ls") == 0)
	    pgls(ucmd.cmdarg);
	else if (strcmp(ucmd.cmdname, "put") == 0)
	    pgput(ucmd.cmdarg);
	else if (strcmp(ucmd.cmdname, "get") == 0)
	    pgget(ucmd.cmdarg);
	else if (strcmp(ucmd.cmdname, "quit") == 0)
	    break;
	else
	    fprintf(stderr, "command '%s' unknown\ntry: ls, put, get, or quit\n", ucmd.cmdname);

	fprintf(stdout, "> ");
	fflush(stdout);
    }

    PQfinish();
    exit (0);
}

/*
 *  parse -- split a command line into a command word and its argument.
 *
 *	cmd is modified in place: the white space that follows the command
 *	word is overwritten with null bytes.  On return ucmd->cmdname points
 *	at the first non-blank character of the line (NULL if the line is
 *	empty or all white space) and ucmd->cmdarg points at the first
 *	non-blank character after the command word (NULL if there is none).
 *	The argument runs to the end of the line, so it keeps any embedded
 *	or trailing white space.
 */
void
parse(cmd, ucmd)
    char *cmd;
    ucmd_data *ucmd;
{
    ucmd->cmdname = ucmd->cmdarg = (char *) NULL;

    /*
     * The ctype routines are only defined for EOF and values that fit in
     * an unsigned char, so a plain char must be converted before the call.
     */
    while (isspace((unsigned char) *cmd))
	cmd++;

    if (*cmd == '\0')
	return;

    ucmd->cmdname = cmd;

    /* step over the command word */
    while (*cmd != '\0' && !isspace((unsigned char) *cmd))
	cmd++;

    /* terminate the command word and step over the separator */
    while (isspace((unsigned char) *cmd))
	*cmd++ = '\0';

    if (*cmd == '\0')
	return;

    ucmd->cmdarg = cmd;
}

/*
 *  pgls -- list the files recorded in the mao_dirinfo relation.
 *
 *	With a null fname every entry is retrieved; otherwise only the
 *	entries whose fname attribute equals the argument.  Each tuple must
 *	have three attributes, which are printed on one line of stdout.
 *
 *	An error reply from the backend is reported on stderr and the
 *	routine returns.  A reply that is neither an error nor a portal
 *	terminates the program.
 *
 *	NOTE(review): fname is pasted into the query text unescaped, so a
 *	name containing a double quote will produce a malformed query.
 */
void
pgls(fname)
    char *fname;
{
    PortalBuffer *portalbuf;
    char *res;
    int ngroups, ntups, grpno;
    int nflds;
    int tupno;

    if (fname == (char *) NULL)
	sprintf(QryBuf, "retrieve (mao_dirinfo.all)");
    else
	sprintf(QryBuf, "retrieve (mao_dirinfo.all) where mao_dirinfo.fname = \"%s\"::text", fname);

    res = PQexec(QryBuf);

    /* the first byte of the reply says what kind of reply it is */
    if (*res == 'E') {
	fprintf(stderr, "%s\nls failed\n", ++res);
	return;
    }

    if (*res != 'P') {
	fprintf(stderr, "pgls:  no portal?!?\n");
	exit (1);
    }

    /* count result tuples -- get portal first */
    portalbuf = PQparray(++res);
    ngroups = PQngroups(portalbuf);

    for (grpno = 0; grpno < ngroups; grpno++) {
	ntups = PQntuplesGroup(portalbuf, grpno);
	if ((nflds = PQnfieldsGroup(portalbuf, grpno)) != 3) {
	    fprintf(stderr, "pgls: expected 3 attributes, got %d\n", nflds);
	    return;
	}

	for (tupno = 0; tupno < ntups; tupno++) {
	    fprintf(stdout, "%s\t<%s>\t%s bytes\n", 
			    get_attr(portalbuf, tupno, 0),
			    get_attr(portalbuf, tupno, 1),
			    get_attr(portalbuf, tupno, 2));
	}
    }
}

/*
 *  pgput -- copy the local file fname to the backend.
 *
 *	pftp_writeport() starts a child process that will send the file
 *	contents over a socket and returns the port number to use.  The
 *	backend is then asked, through the pftp_write() query, to connect
 *	to this host on that port; the value it returns is recorded,
 *	together with the file name and its local size, by an append to
 *	mao_dirinfo.  The time taken by the pftp_write() query is reported
 *	with showtime().
 *
 *	Failures are reported on stderr and the routine simply returns.
 */
void
pgput(fname)
    char *fname;
{
    PortalBuffer *portalbuf;
    char *res;
    int port;
    oid toid;
    int len;
    struct timezone tz;
    struct timeval tstart, tend;
    char hostname[64];

    /* "put" with no argument arrives here with a null name */
    if (fname == (char *) NULL) {
	fprintf(stderr, "missing file name\n");
	return;
    }

    gethostname(hostname, 64);
    if ((port = pftp_writeport(fname)) < 0)
	return;

    sprintf(QryBuf, "retrieve (x = pftp_write(\"%s\"::text, %d))",
		 hostname, port);

    /* only the transfer query itself is timed */
    gettimeofday(&tstart, &tz);
    res = PQexec(QryBuf);
    gettimeofday(&tend, &tz);

    if (*res == 'E') {
	fprintf(stderr, "%s\nput failed\n", ++res);
	return;
    }

    if (*res != 'P') {
	fprintf(stderr, "put: no portal?!?\n");
	return;
    }

    portalbuf = PQparray(++res);
    toid = atoi(get_attr(portalbuf, 0, 0));
    len = getsize(fname);
    sprintf(QryBuf, "append mao_dirinfo (fname = \"%s\"::text, foid = \"%d\"::oid, fsize = %d)", fname, toid, len);

    res = PQexec(QryBuf);

    if (*res == 'E') {
	fprintf(stderr, "%s\nput failed\n", ++res);
	return;
    }

    showtime(&tstart, &tend, len);
}

/*
 *  pgget -- copy the file named fname from the backend to a local file.
 *
 *	The foid recorded for fname is looked up in mao_dirinfo first.
 *	pftp_readport() then starts a child process that will receive the
 *	data over a socket and write it to the local file, and returns the
 *	port number to use.  The backend is asked, through the pftp_read()
 *	query, to connect to this host on that port; the value it returns
 *	is passed to showtime() as the byte count, along with the time the
 *	query took.
 *
 *	Failures are reported on stderr and the routine returns, except
 *	that a lookup reply which is neither an error nor a portal
 *	terminates the program.
 */
void
pgget(fname)
    char *fname;
{
    PortalBuffer *portalbuf;
    char *res;
    int ngroups, ntups, grpno;
    int found;
    oid toid;
    int port;
    int len;
    struct timezone tz;
    struct timeval tstart, tend;
    char hostname[64];

    if (fname == (char *) NULL) {
	fprintf(stderr, "missing file name\n");
	return;
    }

    sprintf(QryBuf, "retrieve (mao_dirinfo.foid) where mao_dirinfo.fname = \"%s\"::text", fname);

    res = PQexec(QryBuf);

    if (*res == 'E') {
	fprintf(stderr, "%s\nget failed\n", ++res);
	return;
    }

    if (*res != 'P') {
	fprintf(stderr, "get:  no portal?!?\n");
	exit (1);
    }

    /* count result tuples -- get portal first */
    portalbuf = PQparray(++res);
    ngroups = PQngroups(portalbuf);

    found = 0;
    for (grpno = 0; grpno < ngroups; grpno++) {
	ntups = PQntuplesGroup(portalbuf, grpno);
	if (ntups == 0) {
	    fprintf(stderr, "%s not found\n", fname);
	    return;
	}

	toid = atoi(get_attr(portalbuf, 0, 0));
	found = 1;
    }

    /* a portal with no groups at all leaves nothing to fetch either */
    if (!found) {
	fprintf(stderr, "%s not found\n", fname);
	return;
    }

    gethostname(hostname, 64);
    if ((port = pftp_readport(fname)) < 0)
	return;

    sprintf(QryBuf, "retrieve (x = pftp_read(\"%s\"::text, %d, \"%d\"::oid))",
		 hostname, port, toid);

    /* only the transfer query itself is timed */
    gettimeofday(&tstart, &tz);
    res = PQexec(QryBuf);
    gettimeofday(&tend, &tz);

    if (*res == 'E') {
	fprintf(stderr, "%s\nget failed\n", ++res);
	return;
    }

    if (*res != 'P') {
	fprintf(stderr, "get: no portal?!?\n");
	return;
    }

    portalbuf = PQparray(++res);

    len = atoi(get_attr(portalbuf, 0, 0));
    showtime(&tstart, &tend, len);
}

/*
 * Size of, and storage for, the buffer through which pftp_writeport() and
 * pftp_readport() copy data between the local file and the socket.
 * NOTE(review): 8092 is not a power of two; 8192 may have been intended --
 * confirm before changing.
 */
#define FBUFSIZ	8092
char fbuf[FBUFSIZ];

/*
 *  pftp_writeport -- arrange for the local file fname to be sent over a
 *	socket.
 *
 *	Opens fname for reading, binds a listening socket with opensock()
 *	and forks.  The parent closes its copies of both descriptors and
 *	returns the port number (SocketNumber) so the caller can tell the
 *	backend where to connect.  The child waits for the connection,
 *	copies the file to it and exits; it never returns.
 *
 *	Returns -1, after reporting the problem on stderr, if the file
 *	cannot be opened, no socket can be set up, or the fork fails.
 */
int
pftp_writeport(fname)
    char *fname;
{
    int fd;
    int nbytes;
    int nsent;
    int off;
    int port;
    int pid;

    if ((fd = open(fname, O_RDONLY, 0666)) < 0) {
	perror(fname);
	return (-1);
    }

    if ((port = opensock()) < 0) {
	close (fd);
	return (-1);
    }

    if ((pid = fork()) < 0) {
	/* no child exists to serve the port, so do not advertise it */
	perror("fork");
	close (fd);
	close (port);
	return (-1);
    }

    if (pid != 0) {
	/* parent sends the query */
	close (fd);
	close (port);
	return (SocketNumber);
    }

    /* child sends the file */
    if ((port = waitsock(port)) < 0)
	exit (1);

    while ((nbytes = read(fd, fbuf, FBUFSIZ)) > 0) {
	/* write may accept fewer bytes than asked; keep going until done */
	for (off = 0; off < nbytes; off += nsent) {
	    nsent = write(port, fbuf + off, nbytes - off);
	    if (nsent <= 0) {
		perror("write");
		exit (1);
	    }
	}
    }

    if (nbytes < 0) {
	perror(fname);
	exit (1);
    }

    close(fd);
    close(port);
    exit (0);
    /* NOTREACHED */
}

/*
 *  pftp_readport -- arrange for data arriving on a socket to be stored in
 *	the local file fname.
 *
 *	Creates fname if necessary (mode 0600) and empties it, binds a
 *	listening socket with opensock() and forks.  The parent closes its
 *	copies of both descriptors and returns the port number
 *	(SocketNumber) so the caller can tell the backend where to connect.
 *	The child waits for the connection, copies everything it receives
 *	into the file and exits; it never returns.
 *
 *	Returns -1, after reporting the problem on stderr, if the file
 *	cannot be opened, no socket can be set up, or the fork fails.
 */
int
pftp_readport(fname)
    char *fname;
{
    int fd;
    int nbytes;
    int nstored;
    int off;
    int port;
    int pid;

    /*
     * O_TRUNC: without it, fetching over an existing longer file would
     * leave the old file's tail after the new contents.
     */
    if ((fd = open(fname, O_WRONLY|O_CREAT|O_TRUNC, 0600)) < 0) {
	perror(fname);
	return (-1);
    }

    if ((port = opensock()) < 0) {
	close (fd);
	return (-1);
    }

    if ((pid = fork()) < 0) {
	/* no child exists to serve the port, so do not advertise it */
	perror("fork");
	close (fd);
	close (port);
	return (-1);
    }

    if (pid != 0) {
	/* parent sends the query */
	close (fd);
	close (port);
	return (SocketNumber);
    }

    /* child receives the file */
    if ((port = waitsock(port)) < 0)
	exit (1);

    while ((nbytes = read(port, fbuf, FBUFSIZ)) > 0) {
	/* write may accept fewer bytes than asked; keep going until done */
	for (off = 0; off < nbytes; off += nstored) {
	    nstored = write(fd, fbuf + off, nbytes - off);
	    if (nstored <= 0) {
		perror(fname);
		exit (1);
	    }
	}
    }

    if (nbytes < 0) {
	perror("read");
	exit (1);
    }

    close(fd);
    close(port);
    exit (0);
    /* NOTREACHED */
}

/* highest port number that fits in the 16-bit sin_port field */
#define MAXSOCK		65535

/*
 *  opensock -- create a stream socket and bind it to a free port.
 *
 *	Ports are tried in order starting at STARTSOCK; a port that is
 *	already in use is skipped.  On success the bound (not yet
 *	listening) socket is returned and SocketNumber holds the port that
 *	was obtained.  On failure the problem is reported on stderr, the
 *	socket is closed and -1 is returned.
 */
int
opensock()
{
    static struct sockaddr_in zeroaddr;	/* static storage, so all zero */
    struct sockaddr_in server;
    int sock;

    /* get a socket on which to listen for connections */
    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
	perror("opening stream socket");
	return (-1);
    }

    /* start from a cleared address so no field is left as stack garbage */
    server = zeroaddr;
    server.sin_family = AF_INET;
    server.sin_addr.s_addr = INADDR_ANY;

    for (SocketNumber = STARTSOCK; SocketNumber <= MAXSOCK; SocketNumber++) {
	server.sin_port = htons(SocketNumber);

	/* bind the socket's name */
	if (bind(sock, (struct sockaddr *) &server, sizeof(server)) == 0)
	    return (sock);

	/* only "address in use" is worth retrying on the next port */
	if (errno != EADDRINUSE) {
	    perror("binding stream socket");
	    close (sock);
	    return (-1);
	}
    }

    fprintf(stderr, "binding stream socket: no free port\n");
    close (sock);
    return (-1);
}

/*
 *  waitsock -- wait for one connection on a bound socket.
 *
 *	Puts sock into the listening state, accepts a single connection,
 *	closes sock and returns the descriptor of the accepted connection.
 *
 *	If listen or accept fails the error is reported on stderr, sock is
 *	closed and -1 is returned.  If the peer's address cannot be mapped
 *	to a host entry the process exits with status 1.
 *
 *	NOTE(review): the host entry is looked up but not otherwise used;
 *	confirm whether refusing peers without one is intentional.
 */
int
waitsock(sock)
    int sock;
{
    int len;
    int msgsock;
    struct hostent *h;
    struct sockaddr_in from_client;

    if (listen(sock, 5) < 0) {
	perror("listen");
	close (sock);
	return (-1);
    }

    /* wait for a connection from front-end */
    len = sizeof(from_client);
    msgsock = accept(sock, (struct sockaddr *) &from_client, &len);

    /* from_client is only meaningful if the accept worked */
    if (msgsock < 0) {
	perror("accept");
	close (sock);
	return (-1);
    }

    /* figure out who's connected */
    h = gethostbyaddr((char *)&from_client.sin_addr,
		      sizeof (struct in_addr),
		      AF_INET);

    if (h == (struct hostent *) NULL) {
	    fprintf(stderr, "cannot get foreign host name\n");
	    exit (1);
    }

    /* we can use the fe's socket, so we close the original */
    close (sock);

    return (msgsock);
}

/*
 *  get_attr -- return a private copy of one attribute value from a portal.
 *
 *	portalbuf is the portal buffer pointer obtained from PQparray();
 *	tupno and attno are passed straight through to PQgetvalue().  The
 *	value is copied into storage obtained from palloc and the copy is
 *	returned; nothing in this routine releases it.
 */
char *
get_attr(portalbuf, tupno, attno)
    PortalBuffer *portalbuf;	/* callers pass a pointer, not the struct */
    int tupno;
    int attno;
{
    char *attval;
    char *result;

    attval = PQgetvalue(portalbuf, tupno, attno);
    result = (char *) palloc(strlen(attval) + 1);
    strcpy(result, attval);

    return (result);
}

/*
 *  getsize -- report the size in bytes of the file named fname.
 *
 *	Returns the st_size value that stat() reports for the file.  If
 *	the stat() call fails, the reason is printed with perror() and -1
 *	is returned.
 */
int
getsize(fname)
    char *fname;
{
    struct stat info;
    int status;

    status = stat(fname, &info);

    if (status >= 0)
	return (info.st_size);

    perror(fname);
    return (-1);
}

/*
 *  showtime -- report how long a transfer took and the resulting rate.
 *
 *	ts and te are the times at which the transfer started and ended,
 *	and len is the number of bytes moved.  One line is printed on
 *	stdout giving the byte count, the elapsed time in seconds and the
 *	rate in kilobytes per second.  When no time has elapsed the rate
 *	is printed as zero rather than dividing by zero.
 *
 *	*te is modified: if its microsecond field is smaller than that of
 *	*ts, one second is borrowed into it.
 *
 *	Always returns 0.
 */
int
showtime(ts, te, len)
    struct timeval *ts;
    struct timeval *te;
    int len;
{
    double etime;
    double rate;

    /* borrow a second so the microsecond difference is not negative */
    if (te->tv_usec < ts->tv_usec) {
	te->tv_sec--;
	te->tv_usec += 1000000;
    }
    etime = ((te->tv_sec - ts->tv_sec) * 1.0)
	    + ((te->tv_usec - ts->tv_usec) / 1000000.0);

    if (etime > 0.0)
	rate = (len / 1024.0) / etime;
    else
	rate = 0.0;

    fprintf(stdout, "%d bytes transferred in %8.2f sec (%6.2f Kb/sec)\n",
		    len, etime, rate);

    return (0);
}
@


1.1
log
@Initial revision
@
text
@d12 2
d30 1
a30 1
RcsId("$Header$");
d38 3
a40 2
#define DEFHOST	"olympus.Berkeley.EDU"
#define FILEDB	"file_db"
d47 1
a186 1
    char hostname[64];
d189 4
d201 1
d203 1
d217 2
a218 1
    sprintf(QryBuf, "append1 mao_dirinfo (fname = \"%s\"::text, foid = \"%d\"::oid, fsize = %d)", fname, toid, getsize(fname));
d226 2
d240 4
a244 1
    int port;
d251 1
a251 1
    sprintf(QryBuf, "retrieve (mao_dirinfo.foid) where mao_dirinfo.fname = \"%s\"::text");
d256 1
a256 1
	fprintf(stderr, "%s\nls failed\n", ++res);
d261 1
a261 1
	fprintf(stderr, "pgstats:  no portal?!?\n");
d276 1
a276 9
	if ((nflds = PQnfieldsGroup(portalbuf, grpno)) != 1) {
	    fprintf(stderr, "pgls: expected 1 attributes, got %d\n", nflds);
	    return;
	}

	/* XXX only the last file of this name can be retrieved for now */
	for (tupno = 0; tupno < ntups; tupno++) {
	    toid = atoi(get_attr(portalbuf, tupno, 0));
	}
d286 1
d288 1
d301 3
a303 1
    printf("%s bytes transferred\n", get_attr(portalbuf, 0, 0));
d329 1
a329 1
	return (1991);
d361 1
a361 1
	return (1991);
d386 1
a386 1
    server.sin_port = htons(1991);
d388 13
a400 4
    /* bind the socket's name */
    if (bind(sock, &server, sizeof(server)) != 0) {
	perror("binding stream socket");
	return (-1);
d464 18
@
