/*-------------------------------------------------------------------------
 *
 * pgexec.c--
 *    functions related to sending a query down to the backend
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    /usr/local/devel/pglite/cvs/src/libpgtcl/pgexec.c,v 1.2 1995/02/15 00:01:02 jolly Exp
 *
 *-------------------------------------------------------------------------
 */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include "tcl.h"
#include "libpgtcl.h"
#include "pgtclCmds.h"

/* The tuples array in a PGresult has to grow to accommodate the tuples */
/* returned.  Each time it fills up, addTuple() enlarges it by this many */
/* slots: */
#define TUPARR_GROW_BY 100

/* forward declarations of the file-local helpers defined below */
static PGresult* makePGresult(PGconn *conn, char *pname);
static void addTuple(PGresult *res, PGresAttValue *tup);
static PGresAttValue* getTuple(PGconn *conn, PGresult *res, int binary);
static PGresult* makeEmptyPGresult(PGconn *conn, ExecStatusType status);

/*
 * freePGresult -
 *    releases all memory owned by a PGresult: every attribute value of
 *    every tuple, each tuple, the tuple array, the attribute descriptors
 *    (including their names) and finally the structure itself.
 *
 *    res - the result to release; a NULL pointer is ignored, so the
 *          NULL that Pgtcl_pgexec() returns on failure can be passed in
 *          safely.
 *
 *    Empty results (as built by makeEmptyPGresult) carry NULL tuples and
 *    attDescs pointers; those are skipped rather than handed to ckfree().
 *
 *    Everything is released with ckfree(), so whoever fills in the
 *    tuples must have obtained the memory from ckalloc()/ckrealloc().
 */
void
freePGresult(PGresult* res)
{
    int i,j;

    if (res == NULL)
        return;

    /* free all the tuples */
    if (res->tuples != NULL) {
        for (i=0;i<res->ntups;i++) {
            for (j=0;j<res->numAttributes;j++) {
                if (res->tuples[i][j])
                    ckfree((char*)res->tuples[i][j]);
            }
            ckfree((char*)res->tuples[i]);
        }
        ckfree((char*)res->tuples);
    }

    /* free all the attributes */
    if (res->attDescs != NULL) {
        for (i=0;i<res->numAttributes;i++) {
            if (res->attDescs[i].name)
                ckfree(res->attDescs[i].name);
        }
        ckfree((char*)res->attDescs);
    }

    /* free the structure itself */
    ckfree((char*)res);
}

/*
 * makeEmptyPGresult -
 *   allocates a PGresult tied to conn that holds no tuples and no
 *   attribute descriptors, with its result status preset to status
 *
 */

static PGresult*
makeEmptyPGresult(PGconn *conn, ExecStatusType status)
{
  PGresult *res = (PGresult*)ckalloc(sizeof(PGresult));

  /* no tuples yet, and no room reserved for any */
  res->tuples = NULL;
  res->tupArrSize = 0;
  res->ntups = 0;

  /* no attribute descriptors either */
  res->attDescs = NULL;
  res->numAttributes = 0;

  res->resultStatus = status;
  res->conn = conn;

  return res;
}

/*
 * getTuple -
 *   reads the next tuple from the backend stream (conn->Pfin) and returns
 *   it as a newly allocated array of result->numAttributes values.
 *   The tuple is NOT added to the result here; the caller does that.
 *
 *   conn   - connection to read from; conn->errorMessage is filled in
 *            when the read fails
 *   result - supplies the number of attributes in each tuple
 *   binary - zero for an ASCII tuple, in which case 4 is subtracted from
 *            the length word that precedes each value; non-zero for a
 *            binary tuple, whose length word is used as is
 *
 *   Returns the tuple, or NULL on failure (nothing is leaked in that
 *   case).  The array and every value in it come from ckalloc(), which
 *   is what freePGresult() expects when it releases them with ckfree().
 *
 *   An attribute whose bit is clear in the null-values bitmap is stored
 *   as the empty string.
 */

static PGresAttValue*
getTuple(PGconn *conn, PGresult* result, int binary)
{
  char bitmap[MAX_FIELDS]; /* the backend sends us a bitmap of  */
                           /* which attributes are null */
  int bitmap_index = 0;
  int i;
  int nbytes;              /* the number of bytes in bitmap  */
  char bmap;               /* One byte of the bitmap */
  int bitcnt = 0;          /* number of bits examined in current byte */
  int vlen;                /* length of the current field value */
  FILE *Pfin = conn->Pfin;

  PGresAttValue* tup;

  int nfields = result->numAttributes;

  /* one bit per attribute, rounded up to whole bytes */
  nbytes = nfields / BYTELEN;
  if ( (nfields % BYTELEN) > 0)
    nbytes++;

  /* never let the backend make us write past the end of bitmap[] */
  if (nbytes > (int) sizeof(bitmap)) {
    sprintf(conn->errorMessage,
            "Too many attributes (%d) for the null-values bitmap\n",
            nfields);
    return NULL;
  }

  /* read the bitmap before allocating anything, so that a failure */
  /* here has nothing to clean up */
  if (pqGets(bitmap, nbytes, Pfin) == 1){
      sprintf(conn->errorMessage,
              "Error reading null-values bitmap from tuple data stream\n");
      return NULL;
    }

  tup = (PGresAttValue*) ckalloc(nfields * sizeof(PGresAttValue));

  bmap = (nbytes > 0) ? bitmap[bitmap_index] : 0;

  for (i=0;i<nfields;i++) {
    if (!(bmap & 0200)) {
        /* if the field value is absent, make it '\0' */
        /* XXX this makes it impossible to distinguish NULL
           attributes from "".  Is that OK?   */
        tup[i] = (PGresAttValue) ckalloc(1);
        tup[i][0] = '\0';
    }
    else {
      /* get the value length (the first four bytes are for length) */
      if (pqGetInt(&vlen, 4, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Error reading field length from tuple data stream\n");
        goto getTuple_error_return;
      }
      if (binary == 0)
        vlen -= 4;
      /* a negative length would corrupt the allocation below */
      if (vlen < 0) {
        sprintf(conn->errorMessage,
                "Invalid field length %d in tuple data stream\n", vlen);
        goto getTuple_error_return;
      }
      /* allocate storage for the value, add 1 for '\0'; */
      tup[i] = (PGresAttValue) ckalloc(vlen + 1);
      /* read in the value; */
      if (pqGets((char*)tup[i], vlen, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Error reading field value from tuple data stream\n");
        ckfree((char*)tup[i]);
        goto getTuple_error_return;
      }
      tup[i][vlen] = '\0';
    }
    /* get the appropriate bitmap */
    bitcnt++;
    if (bitcnt == BYTELEN) {
      bitmap_index++;
      /* after the last field there is no further bitmap byte to load */
      if (bitmap_index < nbytes)
        bmap = bitmap[bitmap_index];
      bitcnt = 0;
    } else
      bmap <<= 1;
  }

  return tup;

getTuple_error_return:
  /* values 0 .. i-1 were stored successfully; release them and the array */
  while (i > 0)
    ckfree((char*)tup[--i]);
  ckfree((char*)tup);
  return NULL;
}


/*
 * addTuple
 *    appends tup to the tuple array of res.  When the array is full it
 *  is first enlarged by TUPARR_GROW_BY slots (allocated outright the
 *  first time, reallocated afterwards).
 *
 */
static void
addTuple(PGresult* res, PGresAttValue* tup)
{
  if (res->ntups == res->tupArrSize) {
    /* out of room: the array is either still unallocated or full */
    int firstTuple = (res->ntups == 0);

    res->tupArrSize += TUPARR_GROW_BY;

    if (firstTuple) {
      res->tuples = (PGresAttValue**)
        ckalloc(res->tupArrSize * sizeof(PGresAttValue*));
    } else {
      /* the slots only hold pointers, so a shallow copy by realloc is fine */
      res->tuples = (PGresAttValue**)
        ckrealloc(res->tuples, res->tupArrSize * sizeof(PGresAttValue*));
    }
  }

  res->tuples[res->ntups++] = tup;
}

/*
 * makePGresult
 *    fill out a PGresult structure with result tuples from the backend.
 *  This is called after the query has been successfully run and the 'T'
 *  id byte has already been consumed from the stream.
 *
 *  conn  - connection whose Pfin stream is read; conn->errorMessage is
 *          set when something goes wrong
 *  pname - portal name; currently not used by this function
 *
 *  ASSUMPTION: we assume only *1* tuple group is returned from the backend
 *
 *  A PGresult is always returned; its resultStatus tells how it went:
 *    PGRES_TUPLES_OK      - the tuple stream was read through to 'C'
 *    PGRES_BAD_RESPONSE   - the stream could not be parsed
 *    PGRES_FATAL_ERROR    - the backend sent 'E' or an unexpected id
 *    PGRES_NONFATAL_ERROR - the backend sent an asynchronous portal ('A')
 *
 *  the CALLER is responsible for free'ing the new PGresult allocated here
 *
 */

static PGresult*
makePGresult(PGconn* conn, char* pname)
{
  PGresult* result;
  int id;
  int nfields;
  int i;
  int done = 0;

  PGresAttValue* newTup;

  FILE* Pfin = conn->Pfin;

  result = makeEmptyPGresult(conn, PGRES_TUPLES_OK);

  /* makePGresult() should only be called when the */
  /* id of the stream is 'T' to start with */

  /* the next two bytes are the number of fields  */
  if (pqGetInt(&nfields, 2, Pfin) == 1) {
    sprintf(conn->errorMessage,
            "could not get the number of fields from the 'T' message\n");
    goto makePGresult_badResponse_return;
  }
  result->numAttributes = nfields;

  /* allocate space for the attribute descriptors */
  if (nfields > 0) {
    result->attDescs = (PGresAttDesc*) ckalloc(nfields * sizeof(PGresAttDesc));
    /* zero them so that, if reading the type info fails part way */
    /* through, freePGresult() sees NULL names for the unread entries */
    /* instead of uninitialized pointers */
    memset(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
  }

  /* get type info */
  for (i=0;i<nfields;i++) {
    char typName[MAX_MESSAGE_LEN];
    int adtid;
    int adtsize;

    if ( pqGets(typName, MAX_MESSAGE_LEN, Pfin) ||
        pqGetInt(&adtid, 4, Pfin) ||
        pqGetInt(&adtsize, 2, Pfin)) {
      sprintf(conn->errorMessage,
              "error reading type information from the 'T' message\n");
      goto makePGresult_badResponse_return;
    }
    result->attDescs[i].name = ckalloc(strlen(typName)+1);
    strcpy(result->attDescs[i].name,typName);
    result->attDescs[i].adtid = adtid;
    result->attDescs[i].adtsize = adtsize;
  }

  id = getc(Pfin);

  /* process the data stream until we're finished */
  while(!done) {
    switch (id) {
    case 'T': /* a new tuple group */
      sprintf(conn->errorMessage,
              "makePGresult() -- is not equipped to handle multiple tuple groups.\n");
      goto makePGresult_badResponse_return;
    case 'B': /* a tuple in binary format */
    case 'D': /* a tuple in ASCII format */
      newTup = getTuple(conn, result, (id == 'B'));
      if (newTup == NULL)
        goto makePGresult_badResponse_return;
      addTuple(result,newTup);
      break;
    case 'A': /* async portal, not supported */
      sprintf(conn->errorMessage, "Asynchronous portals not supported");
      result->resultStatus = PGRES_NONFATAL_ERROR;
      return result;
    case 'C': /* end of portal tuple stream */
      {
        char command[MAX_MESSAGE_LEN];
        pqGets(command,MAX_MESSAGE_LEN, Pfin); /* read the command tag */
        done = 1;
      }
      break;
    case 'E': /* errors */
      if (pqGets(conn->errorMessage, MAX_ERRMSG_LEN, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Error return detected from backend, but error message cannot be read");
      }
      result->resultStatus = PGRES_FATAL_ERROR;
      return result;
    case 'N': /* notices from the backend */
      if (pqGets(conn->errorMessage, MAX_ERRMSG_LEN, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Error return detected from backend, but error message cannot be read");
      }
      else {
        /* XXXX send Notices to stderr for now */
        fprintf(stderr, "%s\n", conn->errorMessage);
      }
      break;
    default: /* uh-oh
                this should never happen but frequently does when the
                backend dumps core (EOF from getc() also lands here) */
      sprintf(conn->errorMessage,"FATAL:  unexpected results from the backend, it probably dumped core.");
      /* print the message as data, never as a format string */
      fputs(conn->errorMessage, stderr);
      result->resultStatus = PGRES_FATAL_ERROR;
      return result;
    }
    if (!done)
      id = getc(Pfin);
  } /* while (!done) */

  result->resultStatus = PGRES_TUPLES_OK;
  return result;

makePGresult_badResponse_return:
  result->resultStatus = PGRES_BAD_RESPONSE;
  return result;

}



/*
 * Pgtcl_pgexec
 *    send a query to the backend and package up the result in a PGresult
 *
 *  conn  - open connection; conn->Pfout is written, conn->Pfin is read,
 *          and conn->errorMessage is cleared on entry
 *  query - the query text; it must leave room in a MAX_MESSAGE_LEN
 *          buffer for the leading 'Q' and the terminating '\0'
 *
 *  if the query could not be sent, or the backend answered with an error
 * or something unintelligible, NULL is returned and conn->errorMessage is
 * set to a relevant message
 *  otherwise a new PGresult is returned: an empty one with status
 * PGRES_COMMAND_OK or PGRES_EMPTY_QUERY, or whatever makePGresult()
 * builds from a tuple stream (check its resultStatus, which may itself
 * report a failure)
 *  the user is responsible for freeing that structure when done with it
 *
 */

PGresult*
Pgtcl_pgexec(PGconn* conn, char* query)
{
  PGresult *result;
  int id;
  char buffer[MAX_MESSAGE_LEN];
  char pname[MAX_MESSAGE_LEN]; /* portal name */
  FILE *Pfin = conn->Pfin;
  FILE *Pfout = conn->Pfout;

  pname[0]='\0';

  /*clear the error string */
  conn->errorMessage[0] = '\0';

  /* check to see if the query string is too long: buffer must hold */
  /* the 'Q' prefix, the query itself and the terminating '\0' */
  if (strlen(query) + 2 > MAX_MESSAGE_LEN) {
    sprintf(conn->errorMessage, "Pgtcl_pgexec() -- query is too long.  Maximum length is %d\n", MAX_MESSAGE_LEN -2 );
    return NULL;
  }

  /* the frontend-backend protocol uses 'Q' to designate queries */
  sprintf(buffer,"Q%s",query);

  /* send the query to the backend; */
  if (pqPuts(buffer,Pfout) == 1) {
      /* NOTE(review): the whole query is copied into errorMessage here; */
      /* confirm that errorMessage is large enough for a query of up to */
      /* MAX_MESSAGE_LEN - 2 characters plus the surrounding text */
      (void) sprintf(conn->errorMessage,
                     "Pgtcl_pgexec() -- while sending query:  %s\n-- fprintf to Pfout failed: errno=%d\n%s\n",
                     query, errno,strerror(errno));
      return NULL;
    }

  /* loop forever because multiple messages, especially NOTICES,
     can come back from the backend
     NOTICES are output directly to stderr
   */

  while (1) {

    /* read the result id */
    id = getc(Pfin);
    if (id == EOF) {
      /* hmm,  no response from the backend-end, that's bad */
      (void) sprintf(conn->errorMessage,
                     "Pgtcl_pgexec() -- No response from backend\n");
      return (PGresult*)NULL;
    }

    switch (id) {
    case 'A':
      sprintf(conn->errorMessage,"Asynchronous portals are not implemented");
      return (PGresult*)NULL;
    case 'C': /* portal query command, no tuples returned */
      if (pqGets(buffer, MAX_MESSAGE_LEN, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Pgtcl_pgexec() -- query command completed, but return message from backend cannot be read");
        return (PGresult*)NULL;
      }
      /*
       * since backend may produce more than one result for some commands
       * need to poll until clear
       * send an empty query down, and keep reading out of the pipe
       * until an 'I' is received.
       */
      pqPuts("Q ",Pfout); /* send an empty query */
      for (;;) {
        /* buffer holds MAX_MESSAGE_LEN bytes, so that is the limit */
        if (pqGets(buffer, MAX_MESSAGE_LEN, Pfin) == 1)
          break;        /* the stream failed: stop instead of spinning */
        if (buffer[0] == 'I')
          break;        /* the empty query was answered: all clear */
      }
      result = makeEmptyPGresult(conn,PGRES_COMMAND_OK);
      return result;
    case 'E': /* error return */
      if (pqGets(conn->errorMessage, MAX_ERRMSG_LEN, Pfin) == 1) {
        (void) sprintf(conn->errorMessage,
                       "Pgtcl_pgexec() -- error return detected from backend, but error message cannot be read");
      }
      return (PGresult*)NULL;
    case 'I': /* empty query */
      /* read and throw away the closing '\0' */
      {
        int c;
        if ((c = getc(Pfin)) != '\0') {
          fprintf(stderr,"error!, unexpected character %c following 'I'\n", c);
        }
        result = makeEmptyPGresult(conn, PGRES_EMPTY_QUERY);
        return result;
      }
    case 'N': /* notices from the backend */
      if (pqGets(conn->errorMessage, MAX_ERRMSG_LEN, Pfin) == 1) {
        sprintf(conn->errorMessage,
                "Pgtcl_pgexec() -- error return detected from backend, but error message cannot be read");
        return (PGresult*)NULL;
      }
      else
        fprintf(stderr,"%s\n", conn->errorMessage);
      break;
    case 'P': /* synchronous (normal) portal */
      pqGets(pname,MAX_MESSAGE_LEN,Pfin);  /* read in the portal name*/
      break;
    case 'T': /* actual tuple results: */
      return makePGresult(conn, pname);
    default:
      sprintf(conn->errorMessage,
              "unknown protocol character %c read from backend\n",
              id);
      return (PGresult*)NULL;
    } /* switch */
  } /* while (1)*/

}

