Return-Path: postman 
Delivery-Date: Thu, 14 Oct 93 19:20:41 PDT
Return-Path: postman
Received: by postgres.Berkeley.EDU (5.61/1.29)
	id AA14945; Thu, 14 Oct 93 19:09:34 -0700
Resent-From: postman (POSTGRES mailing list)
Resent-Message-Id: <9310150209.AA14945@postgres.Berkeley.EDU>
Sender: owner-postman@postgres.Berkeley.EDU
X-Return-Path: jean@gso.SAIC.COM
Received: from gefion.gso.saic.com by postgres.Berkeley.EDU (5.61/1.29)
	id AA14937; Thu, 14 Oct 93 19:09:28 -0700
Received: from ratatosk (ratatosk.gso.saic.com) by gso.SAIC.COM (4.1/SMI-4.1)
	id AA19451; Thu, 14 Oct 93 19:13:05 PDT
Received: by ratatosk (4.1/SMI-4.1)
	id AA02248; Thu, 14 Oct 93 19:13:00 PDT
Date: Thu, 14 Oct 93 19:13:00 PDT
From: jean@gso.SAIC.COM (Jean Anderson)
Message-Id: <9310150213.AA02248@ratatosk>
To: postgres@postgres.Berkeley.EDU
Subject: Re: Concurrency and locking
Resent-To: postgres-dist
Resent-Date: Thu, 14 Oct 93 19:09:33 PDT

Regarding:
> I was cleaning up an old version of a test program that passed an 8
> concurrent user test last Spring on my SUN 4/110.  It gets shared memory
> errors on my Sparc today :-( But if anybody wants to take a crack at it,
> I put it down below.

Well, an 8 user test ran fine on another SUN 4.  Since my Sparc has been 
eating mice lately (I'm on mouse #5), I guess I'm not terribly surprised.

I modified a few things, and put a new copy down below.

A couple of notes:

  o I stripped the code down so it would be small--don't take it as an
    example of something to put into production (#include disclaimer).
  o The SUN I ran it on has 24 meg of memory and was configured with
    the EMOREIPCS parameters listed in the installation instructions.
  o You don't need the tqual fix Paul mentioned so long as you load
    unique id's.  Postgres version was the straight 4.1 (no mods).
  o Your mileage (# concurrent users) may vary.  On a small DEC 5000, 
    6 worked reliably, but 8 did not.

When I run it, I usually set up a bunch of windows and let them all run.
They should all end up with unique id's. In each window that retries,
you'll see something like this:

    [jean]greip% pg_get_key
          pg_get_key: Attempt 1.
    NOTICE:Oct 14 18:03:53:Timeout -- possible deadlock
    WARN:Oct 14 18:03:53:WaitOnLock: error on wakeup - Aborting this transaction
    pg_key_replace: R: 
      pg_key_replace: Replace failed.
          pg_get_key: deadlock encountered.  Sleeping before retry.
+-> NOTICE:Oct 14 18:03:53:UserAbortTransactionBlock and not inprogress state
|   NOTICE:Oct 14 18:03:53:AbortTransaction and not in in-progress state 
|   pg_sleep: sleeping 18 seconds
|         pg_get_key: Attempt 2.
|         pg_get_key: succeeded in 2 tries.
|   Back from pg_get_key, key for arid is 57.
|
+--> That "UserAbortTransactionBlock" error is because the code does an 'abort'
     after the deadlock is detected. I can't get it to work without the abort,
     so that's one quirk to live with.


Send any improvements my way and I'll be happy to incorporate and repost.

 -jean
  jean@gso.saic.com
-------------------------< snip, snip, snip >--------------------------------
/*
 * NAME
 *     pg_get_key -- exercise concurrency management.
 *
 * DESCRIPTION
 *     This exercises management of concurrent hits on the database with a
 *     sequencer ("unique key dispenser").
 *
 *     A blocked process makes up to RETRIES attempts in total.
 *
 * CONFIGURATION
 *     This assumes existence of a 'keyid' class with keyname 'arid':
 *
 *     create keyid (keyname=char16, keyvalue=int4, lddate=abstime)
 *     append keyid (keyname="arid", keyvalue=1, lddate="now")
 *     define index keyid_keyname on keyid using btree (keyname char16_ops)
 *
 *     To keep sample code small, the following parameters are hardcoded 
 *     in main() and should be changed to your configuration:
 *
 *          database name  = "geodemo"
 *          tablename      = "keyid"
 *          keyname        = "arid"
 *
 *     If processes run out of retries, increase max_sleep_time in 
 *     pg_get_key().
 *
 * COMPILE
 *     cc -g -I$POSTGRESHOME/include -o pg_get_key pg_get_key.c \
 *         $POSTGRESHOME/lib/libpq.a -lm
 *
 * @(#)pg_get_key.c	1.2 10/14/93
 */

#include <stdio.h>
#include <sys/types.h>
#include <sys/file.h>

#include "tmp/libpq-fe.h"

/* Error message buffer printed by pg_error(); defined outside this file. */
extern char    PQerrormsg[];

/* Library routines used by pg_sleep(). */
extern int     getpid();
extern void    srand48();
extern long    lrand48();

/* Forward declarations for every routine defined below main(). */
int            pg_get_key();
int            pg_key_replace(); 
int            pg_key_retrieve();
int            pg_error();
void           pg_cleanup();
void           pg_sleep ();

/* Status codes returned by the pg_* routines. */
#define SUCCESS          0
#define FAILURE          1
#define PG_LOCK_TIMEOUT  2     /* replace was blocked; caller may retry */
#define PG_DUP_VERSION   3     /* retrieve saw several versions; caller may retry */

#define INVALID_KEY      0

/* Parenthesized so the macro is safe inside any expression. */
#define BAD_VALUE       (-1)
#define DONE             1
#define NOT_DONE         0
#define RETRIES         10     /* Maximum number of attempts, first try included */

/*
 * main -- select the hard-coded "geodemo" database, ask pg_get_key() for
 * one new "arid" key from the "keyid" class, and report it on stderr.
 *
 * Exits with status 0 on success, or -1 when pg_get_key() fails.  The
 * connection is closed with PQfinish() on both paths.
 */
int
main()
{
     int     key_value;          /* key handed back by pg_get_key */

     /* ======== Open database ======== */

     PQsetdb("geodemo");     /* HARD CODED! */

     /* ======== Call pg_get_key ======== */

     if(pg_get_key("keyid", "arid", 1, &key_value) == FAILURE) 
     {    fprintf(stderr, "pg_get_key failed\n");
          PQfinish();        /* do not leave the connection open on failure */
          exit(-1);
     }

     fprintf(stderr, "Back from pg_get_key, ");
     fprintf(stderr, "key for %s is %d.\n", "arid", key_value);

     /* ======== Close Connection ======== */
     PQfinish();
     exit(0);
}

/*
 * pg_get_key -- advance a named key and hand back its new value.
 *
 * tname   (i) class holding the keys
 * cname   (i) keyname of the key to advance
 * nkeys   (i) amount added to the key value
 * kvalue  (o) receives the retrieved key value; BAD_VALUE on failure
 *
 * Each attempt runs inside its own transaction: begin, pg_key_replace(),
 * pg_key_retrieve(), end.  When either step reports a retryable status
 * (PG_LOCK_TIMEOUT or PG_DUP_VERSION) the transaction is aborted and,
 * if attempts remain, the process sleeps a random interval and tries
 * again.  At most RETRIES attempts are made.
 *
 * Returns SUCCESS, or FAILURE on a hard error or when attempts run out.
 *
 * NOTE(review): the results of the "begin" and "end" commands are not
 * inspected, so a failed commit would go unnoticed here.
 */
int
pg_get_key (tname, cname, nkeys, kvalue)
char     *tname;
char     *cname;
int      nkeys;
int      *kvalue;
{
     char    *routine="pg_get_key";
     int     done, i, status;
     int     max_sleep_time=33;    /* Increase if processes exceed RETRIES */

     *kvalue = BAD_VALUE;          /* initialize to invalid value */

     for (i=1, done = NOT_DONE ; (i <= RETRIES) && (done == NOT_DONE) ; i++)
     {
          fprintf(stderr, "%16s: Attempt %d.\n", routine, i);

          /* ======== Begin Transaction ======== */
          (void) PQexec("begin");

          /* ======== Update keyvalue ======== */

          status = pg_key_replace (tname, cname, nkeys);
          if( status == PG_LOCK_TIMEOUT ) 
          {    fprintf(stderr, "%16s: deadlock encountered.  ", routine);

               /* Doing an abort generates the error
                * "UserAbortTransactionBlock and not inprogress state".
                * But if you don't abort, you get a fatal error.
                */
               (void) PQexec("abort"); 

               /* Sleeping only makes sense when another attempt follows. */
               if (i < RETRIES)
               {    fprintf(stderr, "Sleeping before retry.\n");
                    pg_sleep(max_sleep_time);
               }
               else
                    fprintf(stderr, "No retries left.\n");
               continue;
          }
          else if ( status == FAILURE )
          {    (void) PQexec("abort");
               return(FAILURE);
          }

          /* ======== Retrieve keyvalue ======== */

          status = pg_key_retrieve (tname, cname, kvalue, i);
          if( status == PG_DUP_VERSION ) 
          {    (void) PQexec("abort");

               /* The value just read belongs to the aborted transaction;
                * do not let it leak out to the caller.
                */
               *kvalue = BAD_VALUE;

               if (i < RETRIES)
               {    fprintf(stderr, "%16s: Sleeping before retry.\n", routine);
                    pg_sleep(max_sleep_time);
               }
               else
                    fprintf(stderr, "%16s: No retries left.\n", routine);
               continue;
          }
          else if ( status == FAILURE ) 
          {    (void) PQexec("abort");
               return(FAILURE);
          }

          /* ======== Commit Change ======== */

          PQexec("end");
          done=DONE;
     }

     /* i was incremented once past the last attempt, hence i-1 below. */
     if( (done == NOT_DONE) || (*kvalue == BAD_VALUE) ) 
     {
          fprintf(stderr,
          "%s: tried to get '%s' key %d times, but ran out of retries.\n",
               routine, cname, i-1);
          return(FAILURE);
     }

     fprintf(stderr, "%16s: succeeded in %d tries.\n", routine, i-1);

     return (SUCCESS);
}

/*
 * pg_key_replace -- add nkeys to the keyvalue of key cname in class tname.
 *
 * tname  (i) class holding the keys
 * cname  (i) keyname of the key to advance
 * nkeys  (i) amount added to keyvalue
 *
 * Returns SUCCESS when the replace command is accepted, PG_LOCK_TIMEOUT
 * when the failed result starts with 'R' (taken to mean the lock wait
 * timed out), and FAILURE otherwise, including when the query text does
 * not fit in the local buffer.
 *
 * Not declared static: the forward declaration near the top of the file
 * gives this routine external linkage, and a static definition following
 * it is a linkage conflict.
 */
int
pg_key_replace (tname, cname, nkeys)
char    *tname;
char    *cname;
int     nkeys;
{
     char     *routine="pg_key_replace";
     char     DbQuery[200];
     char     *res;
     int      len;

     /* ---------------
      * Update keyvalue
      * ---------------
      */
     len = snprintf(DbQuery, sizeof DbQuery,
          "replace %s (keyvalue=%s.keyvalue+%d) where %s.keyname=\"%s\"",
          tname, tname, nkeys, tname, cname);

     /* Refuse to send a truncated query rather than overrun the buffer. */
     if (len < 0 || len >= (int) sizeof DbQuery)
     {
          fprintf(stderr, "%16s: query too long for class '%s', key '%s'.\n",
               routine, tname, cname);
          return(FAILURE);
     }

     res = (char *) PQexec(DbQuery);     
     if ( pg_error(res, routine) != SUCCESS )
     {
          fprintf(stderr, "%16s: Replace failed.\n", routine);

          /* 'R'eally dubious assumption: 'R' means the lock timed out.
           * The NULL test keeps this dereference safe on its own.
           */
          if (res != NULL && *res == 'R')
               return(PG_LOCK_TIMEOUT);
          return(FAILURE);
     }

     return(SUCCESS);
}

/*
 * pg_key_retrieve -- read back the keyvalue of key cname in class tname.
 *
 * tname   (i) class holding the keys
 * cname   (i) keyname of the key to read
 * kvalue  (o) receives the keyvalue of the last tuple examined
 * count   (i) attempt number supplied by the caller; not used here
 *
 * Opens portal "key_port" for the matching tuples, fetches them all,
 * and closes the portal through pg_cleanup() once it has been fetched.
 *
 * Returns SUCCESS, PG_DUP_VERSION when a group holds more than one
 * tuple (several versions of the key are visible), or FAILURE on any
 * error, on an empty result, or when the query text does not fit in
 * the local buffer.
 *
 * Not declared static: the forward declaration near the top of the file
 * gives this routine external linkage, and a static definition following
 * it is a linkage conflict.
 */
int
pg_key_retrieve (tname, cname, kvalue, count)
char    *tname;
char    *cname;
int     *kvalue;
int     count;
{
     char         *routine="pg_key_retrieve";
     char         DbQuery[200];
     char         *res;
     int          ngroups, ntups, tupno, grpno;
     int          dup_row=FALSE; /* multiple version check */
     PortalBuffer *portalbuf;
     int          len;

     len = snprintf(DbQuery, sizeof DbQuery,
     "retrieve portal key_port (%s.oid, %s.keyvalue, %s.keyname) where %s.keyname=\"%s\"",
          tname, tname, tname, tname, cname);

     /* Refuse to send a truncated query rather than overrun the buffer. */
     if (len < 0 || len >= (int) sizeof DbQuery)
     {    fprintf(stderr, "%16s: query too long for class '%s', key '%s'.\n",
               routine, tname, cname);
          return(FAILURE);
     }

     res = (char *) PQexec(DbQuery);     
     if( pg_error(res, routine) != SUCCESS)
          return(FAILURE);

     res = (char *) PQexec("fetch all in key_port");
     if( pg_error(res, routine) != SUCCESS)
     {    pg_cleanup("key_port");
          return(FAILURE);
     }

     if (*res != 'P')
     {    fprintf(stderr, "pg_key_retrieve: no portal!\n");
          pg_cleanup("key_port");
          return(FAILURE);
     }

     /* Skip the leading 'P' status character before the lookup. */
     portalbuf = PQparray(++res);
     ntups = PQntuples (portalbuf);

     if (ntups == 0 )
     {    fprintf(stderr, "%s key '%s' does not exist in class '%s'.\n",
               routine, cname, tname);
          pg_cleanup("key_port");
          return(FAILURE);
     }

     ngroups = PQngroups(portalbuf);

     for (grpno = 0; grpno < ngroups; grpno++)
     {
          ntups = PQntuplesGroup(portalbuf, grpno);

          /* NOTE(review): tupno restarts at 0 for every group but is
           * passed to PQgetvalue unchanged; confirm whether PQgetvalue
           * expects a portal-wide tuple index when several groups exist.
           */
          for (tupno = 0; tupno < ntups; tupno++)
               *kvalue = atoi(PQgetvalue(portalbuf, tupno, 1));

               /* Retry if process sees multiple versions (fixed
                * in tqual.c,v 1.18).
                */
          if(ntups > 1)
          {
               dup_row=TRUE;
               fprintf(stderr,"%16s: found multiple %s versions.\n",
                    routine, cname);
          }
     }

     pg_cleanup("key_port");

     if(dup_row==TRUE)
          return(PG_DUP_VERSION);

     return(SUCCESS);
}


/*
 * pg_cleanup -- free the memory held for a portal and close it.
 *
 * port  (i) name of the portal
 *
 * The "close" command is built in a small local buffer; a portal name
 * too long for that buffer is reported and the close command is not
 * sent, rather than overrunning the buffer.
 *
 * Not declared static: the forward declaration near the top of the file
 * gives this routine external linkage, and a static definition following
 * it is a linkage conflict.
 */
void
pg_cleanup(port)
char     *port;
{
     char DbQuery[30];
     int  len;

     len = snprintf(DbQuery, sizeof DbQuery, "close %s", port);

     PQclear(port);       /* Free memory */

     if (len < 0 || len >= (int) sizeof DbQuery)
     {    fprintf(stderr, "pg_cleanup: portal name '%s' is too long.\n", port);
          return;
     }

     PQexec(DbQuery);     /* close portal */
     return;
}

/*
 * pg_error -- classify a Postgres result buffer by its first character.
 *
 * pg_buffer  (i) Postgres results buffer
 * errtext    (i) custom error text, printed as a prefix on failure
 *
 * Returns SUCCESS for the status characters 'A', 'B', 'C', 'D', 'I'
 * and 'P'.  Any other character, or a NULL buffer, yields FAILURE and
 * a message on stderr that includes PQerrormsg.
 */
int
pg_error (pg_buffer, errtext)
char    *pg_buffer;    /* (i) Postgres results buffer */
char    *errtext;      /* (i) custom error text       */
{
     int     status;
     char    type_code;

          /* Without a buffer there is no status character to read. */
     if (pg_buffer == NULL)
     {
          fprintf(stderr, "%s: no result buffer: %s\n", errtext, PQerrormsg);
          return (FAILURE);
     }

          /* Get the status type */
     type_code = pg_buffer[0];

     switch (type_code)
     {
          case 'A':     /* asynchronous handling? I'm not sure */
          case 'B':     /* ready for 'copy' output */
          case 'C':     /* query 'C'ompleted   (tcop/dest.c)*/
          case 'D':     /* ready for 'copy' input */
          case 'I':     /* the last query finished (tcop/dest.c)*/
          case 'P':     /* Results are in the Portal buffer */
             status = SUCCESS;
             break;
          case 'E':     /* An error occurred */
          case 'R':     /* Something 'R'eally bad happened */
          default:
             status = FAILURE;
             break;
     }

     if(status == FAILURE)
          fprintf(stderr, "%s: %c: %s\n", errtext, type_code, PQerrormsg);

     return (status);
}


/*
 * pg_sleep -- pause for a random number of seconds below max_sleep.
 *
 * max_sleep  (i) exclusive upper bound on the pause, in seconds;
 *                nothing happens when it is zero or negative
 *
 * The random number generator is seeded with the process id on the
 * first call that gets past the bound check.  The chosen duration is
 * reported on stderr before sleeping.
 */
void
pg_sleep(max_sleep)
int max_sleep;
{
     static int   need_seed = TRUE;
     int          seed, nsecs;

     if (max_sleep > 0)
     {
          /* Seed once per process so concurrent runs pick different delays. */
          if (need_seed == TRUE) 
          {    seed = getpid ();
               srand48 (seed);
               need_seed = FALSE;
          }

          nsecs = (lrand48 () % max_sleep);
          fprintf(stderr, "pg_sleep: sleeping %d seconds\n", nsecs);
          sleep (nsecs);
     }
}
