/* See the copyright notice (COPYRIGHT) in this directory. */

/* NAME
 *	pg_get_counter() - Gets unique key values
 *
 * SYNOPSIS
 *	dbStatus
 *	pg_get_counter (conn, tname, cname, nkeys, kvalue)
 *	dbConn	*conn;		-- (i) database connection
 *	char	*tname;		-- (i) name of table (class) that dispenses keys
 *	char	*cname;		-- (i) name of key, matched against tname.keyname
 *	int	nkeys;		-- (i) Number of unique keys to assign (>= 0);
 *				--     0 only reads the current key value
 *	int	*kvalue;	-- (o) Stores highest value assigned
 *	
 * DESCRIPTION
 *	Dispenses unique keys.
 *
 *	The class must already exist and have the following structure:
 *
 *	   create keyid (keyname=char16, keyvalue=int4, lddate=abstime)
 *
 *	It should be populated with the appropriate keynames; for example:
 *
 *	   append keyid (keyname="arid", keyvalue=1, lddate="now")
 *	   define index keyid_keyname on keyid using btree (keyname char16_ops)
 *
 * NOTES
 *	This should probably really use fast path options into Postgres
 *	internals to use the internal oid dispenser (apparently Picasso does
 *	this, so look at Picasso source code).
 *
 * AUTHOR
 * 	J. T. Anderson, SAIC Open Systems Division, jean@gso.saic.com
 */

/* SCCS identification string (note the "@(#)" marker); compiled out when
 * `lint' is defined.
 */
#ifndef	lint
static char SccsId[] = "@(#)pg_get_counter.c	16.1 8/3/93 Copyright (c) 1992-1993 Science Applications International Corporation";
#endif

#include "gdi_postgres.h"

/* Forward declarations of the file-private helpers defined further down. */
Proto (static dbStatus,   pg_qa_key, (dbConn *conn, char *object_name));
Proto (static dbStatus,   pg_key_replace, 
	(dbConn *conn, char *table_name, char *ctr_name, int n_keys));
Proto (static dbStatus,   pg_key_retrieve, 
	(dbConn *conn, char *table_name, char *ctr_name, int *key_value,
	int count));
Proto (static void,   pg_cleanup, (char *port));


/* NOTE(review): INVALID_KEY does not appear to be referenced in this file. */
#define INVALID_KEY 	0
/* Size of the DbQuery buffers in pg_key_replace() and pg_key_retrieve():
 * a fixed allowance (presumably for the literal query text) plus room for
 * five object names of PG_OBJ_NAME_SIZE characters each.
 */
#define QUERY_SIZE	(85 + (5 * PG_OBJ_NAME_SIZE))

/* Sentinel stored in *kvalue by pg_get_counter() until a key is retrieved. */
#define BAD_VALUE	-1
/* States of the `done' flag that controls pg_get_counter()'s retry loop. */
#define DONE            1
#define NOT_DONE        0
#define KRETRIES	10	/* Number of times to retry if timeout occurs */

dbStatus
pg_get_counter (conn, tname, cname, nkeys, kvalue)
dbConn	*conn;
char	*tname;
char	*cname;
int	nkeys;
int	*kvalue;
{
	char	*routine="pg_get_counter";
	int	done, i, status;
	int	max_sleep_time=28; 	/* works for 8 concurrent procs */

	int		err_code;
	dbStatus	tmp_stat;
	dbErrLev	severity;
	char		err_string [GDI_ERROR_SIZE + 1];

	*kvalue = BAD_VALUE;		/* initialize to invalid value */

	/* -----------------
	 * QA incoming data.
	 * -----------------
	 */
	if( (conn == NULL) || (conn->vendor_conn == NULL) ) 
		return( gdi_error_app(conn, GDI_NOCONNECT,
			"pg_get_counter: not connected to database"));

	if( ( tname == NULL ) || ( strlen(tname) == 0) )
		return( gdi_error_app(conn, GDI_BADDATA,
		  "pg_get_counter: class name is a required input parameter."));
	
	if( ( cname == NULL ) || ( strlen(cname) == 0) ) 
		return( gdi_error_app(conn, GDI_BADDATA,
		  "pg_get_counter: key name is a required input parameter."));

	if ( nkeys < 0 )
		return( gdi_error_app(conn, GDI_BADDATA,
		  "pg_get_counter: number of keys to get must be >= 0."));

	if( pg_qa_key(conn, tname) != GDI_SUCCESS)
		return(GDI_FAILURE);

	for (i=1, done = NOT_DONE ; (i <= KRETRIES) && (done == NOT_DONE) ; i++)
	{
		if(GDI_ERROR_DEBUG(conn))
			fprintf(stderr, "%16s: Attempt %d.\n", routine, i);

		/* -----------------
		 * Begin Transaction
		 * -----------------
		 */
		if(pg_begin_tran ( conn, GDI_NOT_USED, "") != GDI_SUCCESS)
                        return(GDI_FAILURE);

		if (nkeys > 0)
		{
			/* ---------------
			 * Update keyvalue
			 * ---------------
			 */
			status = pg_key_replace (conn, tname, cname, nkeys);
			if( status == PG_LOCK_TIMEOUT ) 
			{
				if( GDI_ERROR_DEBUG(conn) ) {
					fprintf(stderr, 
					"%16s: Error encountered; deadlock?\n",
						routine);
					fprintf(stderr,
						"Sleeping before retry.\n");
				}

				(void) pg_rollback (conn,GDI_NOT_USED);
				gdi_sleep(conn, max_sleep_time);
				continue;
			}
			else if ( status == GDI_FAILURE )
			{
				/* Save the error that caused failure. */
				gdi_error_get (conn, &err_code, err_string,
					sizeof (err_string), &tmp_stat, &severity);

				(void) pg_rollback (conn,GDI_NOT_USED);

				/* Put the actual error back. */
				(void) gdi_error_app (conn,err_code,err_string);
				return(GDI_FAILURE);
			}
		}

		/* -----------------
		 * Retrieve keyvalue
		 * -----------------
		 */
		status = pg_key_retrieve (conn, tname, cname, kvalue, i);
		if( status == PG_DUP_VERSION ) 
		{
			if(GDI_ERROR_DEBUG(conn))
				fprintf(stderr, 
				"%16s: Sleeping before retry.\n", routine);

			(void) pg_rollback (conn,GDI_NOT_USED);
			gdi_sleep(conn, max_sleep_time);
			continue;
		}
		else if ( status == GDI_FAILURE ) 
		{
			/* Save the error that caused failure. */
			gdi_error_get (conn, &err_code, err_string,
				sizeof (err_string), &tmp_stat, &severity);

			(void) pg_rollback (conn,GDI_NOT_USED);

			/* Restore the error that caused failure. */
			(void) gdi_error_app (conn, err_code, err_string);
			return(GDI_FAILURE);
		}

		/* -----------------
		 * Commit Change
		 * -----------------
		 */
		if (pg_commit (conn, GDI_NOT_USED) != GDI_SUCCESS)
                        return(GDI_FAILURE);

		done=DONE;
	}

	if( (done == NOT_DONE) || (*kvalue == BAD_VALUE) ) {
		sprintf(err_string,
		"%s: tried to get '%s' key %d times, but ran out of retries.\n",
			routine, cname, i-1);
		(void) gdi_error_app(conn, GDI_FAILURE, err_string);
		return(GDI_FAILURE);
	}

	if(GDI_ERROR_DEBUG(conn))
		fprintf(stderr, "%16s: succeeded in %d tries.\n", routine, i-1);

	return (GDI_SUCCESS);
}

/* pg_qa_key()
 *
 * Checks to see if the class exists to avoid heap_openr.
 *
 * The class is looked up with pg_get_relid().  If the lookup itself fails,
 * GDI_FAILURE is returned and no further error is posted here.  If the
 * lookup reports a relation id of -1, a GDI_NODATA error naming the class
 * is posted on the connection and GDI_FAILURE is returned.  Otherwise
 * GDI_SUCCESS is returned.
 *
 * Private
 */
static
dbStatus
pg_qa_key(conn, object_name)
dbConn	*conn;
char	*object_name;
{
	char	*routine="pg_qa_key";
	char	msg_buf [GDI_ERROR_SIZE + 1];
	int	relid = -1;	/* stays "not found" unless the lookup sets it */
	int	room;

	/* Does this class exist? */
	if ( (pg_get_relid(conn, object_name, &relid)) == GDI_FAILURE)
		return(GDI_FAILURE);

	if (relid != -1)
		return(GDI_SUCCESS);

	/* Number of characters of object_name that fit in msg_buf next to
	 * the routine name and the fixed message text.
	 */
	room = (int) sizeof(msg_buf) - 1 - (int) strlen(routine)
		- (int) strlen(": class '' does not exist");

	if (room < 0)
	{
		/* Not even the fixed text fits; post it without the buffer. */
		(void) gdi_error_app (conn, GDI_NODATA,
			"pg_qa_key: class does not exist");
		return(GDI_FAILURE);
	}

	/* The precision caps how much of the name is copied, so an
	 * over-long name is truncated in the message instead of
	 * overrunning msg_buf.
	 */
	sprintf(msg_buf, "%s: class '%.*s' does not exist",
		routine, room, object_name);
	(void) gdi_error_app (conn, GDI_NODATA, msg_buf);

	return(GDI_FAILURE);
}

static
dbStatus
pg_key_replace (conn, tname, cname, nkeys)
dbConn  *conn;
char    *tname;
char    *cname;
int     nkeys;
{
	char	*routine="pg_key_replace";
	char	DbQuery[QUERY_SIZE];
	char	*res;

	/* ---------------
	 * Update keyvalue
	 * ---------------
	 */
	sprintf(DbQuery, 
		"replace %s (keyvalue=%s.keyvalue+%d) where %s.keyname=\"%s\"",
		tname, tname, nkeys, tname, cname);

	if(GDI_ERROR_DEBUG(conn) == GDI_DEBUG_VERBOSE)
		fprintf(stderr, "%16s: Query is:\n%s\n", routine, DbQuery);

	res = (char *) PQexec(DbQuery);     
	if ( pg_error(conn, GDI_NOT_USED, res, routine) != GDI_SUCCESS )
	{
		if (*res == 'R')	/* 'R'eally lousy assumption */
			return(PG_LOCK_TIMEOUT);
		else
                	return(GDI_FAILURE);
	}

	if(GDI_ERROR_DEBUG(conn) == GDI_DEBUG_VERBOSE)
		fprintf(stderr, "%16s: Replace succeeded.\n", routine);

	return(GDI_SUCCESS);
}

static
dbStatus
pg_key_retrieve (conn, tname, cname, kvalue, count)
dbConn  *conn;
char    *tname;
char    *cname;
int	*kvalue;
int	count;
{
	char		*routine="pg_key_retrieve";
	char		msg_buf [GDI_ERROR_SIZE + 1];
	char		DbQuery[QUERY_SIZE];
	char		*res, *kname;
	int		ngroups, ntups, tupno, grpno;
	int		koid=0, dup_row=FALSE; /* multiple version check */
	PortalBuffer	*portalbuf;
	int		status, len;

	sprintf(DbQuery,
	"retrieve portal key_port (%s.oid, %s.keyvalue, %s.keyname) where %s.keyname=\"%s\"",
		tname, tname, tname, tname, cname);

	if(GDI_ERROR_DEBUG(conn) == GDI_DEBUG_VERBOSE)
		fprintf(stderr, "%16s: Query is:\n%s\n", routine, DbQuery);

	res = (char *) PQexec(DbQuery);     
	if( pg_error(conn, GDI_NOT_USED, res, routine) != GDI_SUCCESS)
		return(GDI_FAILURE);

	res = (char *) PQexec("fetch all in key_port");
	if( pg_error(conn, GDI_NOT_USED, res, routine) != GDI_SUCCESS)
	{
		pg_cleanup("key_port");
		return(GDI_FAILURE);
	}

	if (*res != 'P') {
		(void) gdi_error_app(conn, GDI_UNIXERR,
			"pg_key_retrieve: no portal!");
		pg_cleanup("key_port");
		return(GDI_FAILURE);
	}

	portalbuf = PQparray(++res);
	ntups = PQntuples (portalbuf);

	if (ntups == 0 )
	{
		sprintf(msg_buf, "%s key '%s' does not exist in class '%s'.",
			routine, cname, tname);
		(void) gdi_error_app(conn, GDI_BADDATA, msg_buf);
		pg_cleanup("key_port");
		return(GDI_FAILURE);
	}
	else if ( (ntups > 1) && (count == 1) )
	{
		sprintf(msg_buf, 
			"%s key '%s' in class '%s' has more than 1 entry (%d).",
			routine, cname, tname, ntups);
		(void) gdi_error_app(conn, GDI_BADDATA, msg_buf);
		pg_cleanup("key_port");
		return(GDI_FAILURE);
	}

	ngroups = PQngroups(portalbuf);

	dup_row=FALSE;
	for (grpno = 0; grpno < ngroups; grpno++)
	{
		ntups = PQntuplesGroup(portalbuf, grpno);
		for (tupno = 0; tupno < ntups; tupno++)
		{
			koid = atoi(PQgetvalue(portalbuf, tupno, 0));
			*kvalue = atoi(PQgetvalue(portalbuf, tupno, 1));

			if(GDI_ERROR_DEBUG(conn))
			{
			  status = pg_get_attr(portalbuf, tupno, 2, &kname, &len);
			  fprintf(stderr, 
			  "%16s: oid=%d, tupno=%d, keyname=%s, keyvalue=%d\n",
				  routine, koid, tupno, kname, *kvalue);
			  UFREE(kname);
			}
		}

			/* Retry if process sees multiple versions (bug fixed
			 * in ./RCS/tqual.c,v 1.18 1993/03/23).
			 */
		if(ntups > 1)
		{
			dup_row=TRUE;
			if(GDI_ERROR_DEBUG(conn))
			   fprintf(stderr,"%16s: found multiple %s versions.\n",
					routine, cname);
		}
	}

		
	pg_cleanup("key_port");

	if(dup_row==TRUE)
		return(PG_DUP_VERSION);

	if(GDI_ERROR_DEBUG(conn) == GDI_DEBUG_VERBOSE)
		fprintf(stderr, "%16s: Retrieve succeeded.\n", routine);

	return(GDI_SUCCESS);
}


/* pg_cleanup()
 *
 * Releases the portal named `port': PQclear() is called on the name and a
 * "close <port>" query is then sent with PQexec().  The result of the
 * close query is not examined.
 *
 * A NULL name is ignored.  If the name is too long for the local query
 * buffer, PQclear() is still called but the close query is skipped rather
 * than overrunning the buffer.
 *
 * Private
 */
static
void
pg_cleanup(port)
char	*port;
{
	char DbQuery[30];

	if (port == NULL)
		return;

	PQclear(port);		/* Free memory */

	/* "close " + port + terminating NUL must fit in DbQuery;
	 * sizeof("close ") already counts the NUL.
	 */
	if (strlen(port) > sizeof(DbQuery) - sizeof("close "))
		return;

	sprintf(DbQuery, "close %s", port);
	PQexec(DbQuery);	/* close portal */
	return;
}
