Return-Path: postman 
Delivery-Date: Thu, 14 Oct 93 14:42:30 PDT
Return-Path: postman
Received: by postgres.Berkeley.EDU (5.61/1.29)
	id AA12348; Thu, 14 Oct 93 14:30:41 -0700
Resent-From: postman (POSTGRES mailing list)
Resent-Message-Id: <9310142130.AA12348@postgres.Berkeley.EDU>
Sender: owner-postman@postgres.Berkeley.EDU
X-Return-Path: jean@gso.SAIC.COM
Received: from gefion.gso.saic.com by postgres.Berkeley.EDU (5.61/1.29)
	id AA12340; Thu, 14 Oct 93 14:30:32 -0700
Received: from ratatosk (ratatosk.gso.saic.com) by gso.SAIC.COM (4.1/SMI-4.1)
	id AA14461; Thu, 14 Oct 93 14:34:08 PDT
Received: by ratatosk (4.1/SMI-4.1)
	id AA00944; Thu, 14 Oct 93 14:34:07 PDT
Date: Thu, 14 Oct 93 14:34:07 PDT
From: jean@gso.SAIC.COM (Jean Anderson)
Message-Id: <9310142134.AA00944@ratatosk>
To: postgres@postgres.Berkeley.EDU
Subject: Re: Concurrency and locking
Resent-To: postgres-dist
Resent-Date: Thu, 14 Oct 93 14:30:39 PDT

Paul Aoki (aoki@postgres.berkeley.edu) writes:
>                                    but without something like
> SELECT ... FOR UPDATE, this kind of application is likely to 
> exhibit similar behavior ....

The SELECT...FOR UPDATE oracle implemented behaves the same as the
sybase/postgres strategy of doing the write, then the read. The main 
difference is the oracle process blocks forever and ever amen unless 
you also slap a "nowait" flag on, which will make it time out with a 
lock wait error.

In other words, the strategy for managing concurrency is pretty much
the same for all these guys:

  o Detect deadlock or lock timeout
    This has admittedly been a problem because PQerrormsg hasn't always
    gotten set. But Paul Aoki has been doing a lot of work to clean up 
    errors (coming soon....). With PQerrormsg set, you can parse errors 
    correctly to discriminate between lock errors, heap_openr, etc.

  o Sleep a while
    How long depends on what you are doing.  You should sleep longer for 
    a 5 minute write than for a 2 second timestamp update.

  o Retry

Robert Withrow (witr@rwwa.COM) writes:
> What would help is if someone could post a test program stressing this
> area of postgres that we could all run, and post the results here...

I was cleaning up an old version of a test program that passed an 8 
concurrent user test last Spring on my SUN 4/110.  It gets shared memory
errors on my Sparc today :-( But if anybody wants to take a crack at it,
I put it down below. In the meantime, I'll try to figure out what the 
problem is on my Sparc.
 
-jean
 jean@gso.saic.com

-------------------------< snip, snip, snip >------------------------------
/*    
 * NAME
 *     pg_get_key -- exercise concurrency management.
 *
 * DESCRIPTION
 *     This exercises management of concurrent hits on the database with a
 *     sequencer ("unique key dispenser").
 *
 *     A blocked process should retry up to RETRIES times.
 *
 * CONFIGURATION
 *     This assumes existence of a 'keyid' class with keyname 'arid':
 *
 *     create keyid (keyname=char16, keyvalue=int4, lddate=abstime)
 *     append keyid (keyname="arid", keyvalue=1, lddate="now")
 *     define index keyid_keyname on keyid using btree (keyname char16_ops)
 *
 *     To keep sample code small, the following parameters are hardcoded 
 *     in main() and should be changed to your configuration:
 *
 *          database name  = "geodemo"
 *          tablename      = "keyid"
 *          keyname        = "arid"
 *
 * COMPILE
 *     cc -g -I$POSTGRESHOME/include -o pg_get_key pg_get_key.c \
 *         $POSTGRESHOME/lib/libpq.a -lm
 */

#include <stdio.h>
#include <sys/types.h>
#include <sys/file.h>

#include "tmp/libpq-fe.h"

extern char    PQerrormsg[];

extern int     getpid();
extern void    srand48();
extern long    lrand48();

int            pg_get_key();
int            pg_key_replace(); 
int            pg_key_retrieve();
void           pg_cleanup();
void           pg_sleep ();

#define SUCCESS          0
#define FAILURE          1
#define PG_LOCK_TIMEOUT  2
#define PG_DUP_VERSION   3

#define INVALID_KEY      0

#define BAD_VALUE       -1
#define DONE             1
#define NOT_DONE         0
#define RETRIES         10     /* Number of times to retry if timeout occurs */

int
main()
{
     char     c;
     int     key_value;          /* highest key id returned */
     int     errflag=0;     
     char    command_buf[256];

     /* ======== Open database ======== */

     PQsetdb("geodemo");     /* HARD CODED! */

     /* ======== Call pg_get_key ======== */

     if(pg_get_key("keyid", "arid", 1, &key_value) == FAILURE) 
     {    fprintf(stderr, "db_getkey failed\n");
          exit(-1);
     }
     else 
     {    fprintf(stderr, "Back from pg_get_key, ");
          fprintf(stderr, "key for %s is %d.\n", "arid", key_value);
     }

     /* ======== Close Connection ======== */
     PQfinish();
     exit(0);
}

int
pg_get_key (tname, cname, nkeys, kvalue)
char     *tname;
char     *cname;
int      nkeys;
int      *kvalue;
{
     char    *routine="pg_get_key";
     int     done, i, status;
     /* int     max_sleep_time=28;*/    /* works for 8 concurrent procs */
     int     max_sleep_time=60;    /* works for 8 concurrent procs */

     *kvalue = BAD_VALUE;          /* initialize to invalid value */

     for (i=1, done = NOT_DONE ; (i <= RETRIES) && (done == NOT_DONE) ; i++)
     {
          fprintf(stderr, "%16s: Attempt %d.\n", routine, i);

          /* ======== Begin Transaction ======== */
          (void) PQexec("begin");

          /* ======== Update keyvalue ======== */

          status = pg_key_replace (tname, cname, nkeys);
          if( status == PG_LOCK_TIMEOUT ) 
          {    fprintf(stderr, "%16s: deadlock encountered.  ", routine);
               fprintf(stderr, "Sleeping before retry.\n");

               (void) PQexec("abort");  /* probably don't need this */
               pg_sleep(max_sleep_time);
               continue;
          }
          else if ( status == FAILURE )
          {    (void) PQexec("abort");
               return(FAILURE);
          }

          /* ======== Retrieve keyvalue ======== */

          status = pg_key_retrieve (tname, cname, kvalue, i);
          if( status == PG_DUP_VERSION ) 
          {    fprintf(stderr, "%16s: Sleeping before retry.\n", routine);
               (void) PQexec("abort");
               pg_sleep(max_sleep_time);
               continue;
          }
          else if ( status == FAILURE ) 
          {    (void) PQexec("abort");
               return(FAILURE);
          }

          /* ======== Commit Change ======== */

          PQexec("end");
          done=DONE;
     }

     if( (done == NOT_DONE) || (*kvalue == BAD_VALUE) ) 
     {
          fprintf(stderr,
          "%s: tried to get '%s' key %d times, but ran out of retries.\n",
               routine, cname, i-1);
          return(FAILURE);
     }

     fprintf(stderr, "%16s: succeeded in %d tries.\n", routine, i-1);

     return (SUCCESS);
}

static
int
pg_key_replace (tname, cname, nkeys)
char    *tname;
char    *cname;
int     nkeys;
{
     char     *routine="pg_key_replace";
     char     DbQuery[200];
     char     *res;

     /* ---------------
      * Update keyvalue
      * ---------------
      */
     sprintf(DbQuery, 
          "replace %s (keyvalue=%s.keyvalue+%d) where %s.keyname=\"%s\"",
          tname, tname, nkeys, tname, cname);

     res = (char *) PQexec(DbQuery);     
     if ( pg_error(res, routine) != SUCCESS )
     {
          fprintf(stderr, "%16s: Replace failed.\n", routine);
          if (*res == 'R')     /* 'R'eally dubious assumption */
               return(PG_LOCK_TIMEOUT);
          else
               return(FAILURE);
     }

     return(SUCCESS);
}

static
int
pg_key_retrieve (tname, cname, kvalue, count)
char    *tname;
char    *cname;
int     *kvalue;
int     count;
{
     char         *routine="pg_key_retrieve";
     char         msg_buf [80];
     char         DbQuery[200];
     char         *res, *kname;
     int          ngroups, ntups, tupno, grpno;
     int          koid=0, dup_row=FALSE; /* multiple version check */
     PortalBuffer *portalbuf;
     int          status, len;

     sprintf(DbQuery,
     "retrieve portal key_port (%s.oid, %s.keyvalue, %s.keyname) where %s.keyname=\"%s\"",
          tname, tname, tname, tname, cname);

     res = (char *) PQexec(DbQuery);     
     if( pg_error(res, routine) != SUCCESS)
          return(FAILURE);

     res = (char *) PQexec("fetch all in key_port");
     if( pg_error(res, routine) != SUCCESS)
     {    pg_cleanup("key_port");
          return(FAILURE);
     }

     if (*res != 'P')
     {    fprintf(stderr, "pg_key_retrieve: no portal!");
          pg_cleanup("key_port");
          return(FAILURE);
     }

     portalbuf = PQparray(++res);
     ntups = PQntuples (portalbuf);

     if (ntups == 0 )
     {    fprintf(stderr, "%s key '%s' does not exist in class '%s'.",
               routine, cname, tname);
          pg_cleanup("key_port");
          return(FAILURE);
     }
     else if ( (ntups > 1) && (count == 1) )
     {    fprintf(stderr, 
               "%s key '%s' in class '%s' has more than 1 entry (%d).",
               routine, cname, tname, ntups);
          pg_cleanup("key_port");
          return(FAILURE);
     }

     ngroups = PQngroups(portalbuf);

     dup_row=FALSE;
     for (grpno = 0; grpno < ngroups; grpno++)
     {
          ntups = PQntuplesGroup(portalbuf, grpno);
          for (tupno = 0; tupno < ntups; tupno++)
          {
               koid = atoi(PQgetvalue(portalbuf, tupno, 0));
               *kvalue = atoi(PQgetvalue(portalbuf, tupno, 1));
          }

               /* Retry if process sees multiple versions (fixed
                * in tqual.c,v 1.18).
                */
          if(ntups > 1)
          {
               dup_row=TRUE;
               fprintf(stderr,"%16s: found multiple %s versions.\n",
                    routine, cname);
          }
     }

     pg_cleanup("key_port");

     if(dup_row==TRUE)
          return(PG_DUP_VERSION);

     return(SUCCESS);
}


static
void
pg_cleanup(port)
char     *port;
{
     char DbQuery[30];

     sprintf(DbQuery, "close %s", port);

     PQclear(port);       /* Free memory */
     PQexec(DbQuery);     /* close portal */
     return;
}

int
pg_error (pg_buffer, errtext)
char    *pg_buffer;    /* (i) Postgres results buffer */
char    *errtext;      /* (i) custom error text       */
{
     char    *routine = "pg_error";
     int     status;
     char    type_code;

          /* Get the status type */
     type_code = pg_buffer[0];

     switch (type_code)
     {
          case 'A':     /* asynchronous handling? I'm not sure */
          case 'B':     /* ready for 'copy' output */
          case 'C':     /* query 'C'ompleted   (tcop/dest.c)*/
          case 'D':     /* ready for 'copy' input */
          case 'I':     /* the last query finished (tcop/dest.c)*/
          case 'P':     /* Results are in the Portal buffer */
             status = SUCCESS;
             break;
          case 'E':     /* An error occurred */
          case 'R':     /* Something 'R'eally bad happened */
          default:
             status = FAILURE;
             break;
     }

     if(status == FAILURE)
          fprintf(stderr, "%s: %c: %s", errtext, type_code, PQerrormsg);

     return (status);
}


void
pg_sleep(max_sleep)
int max_sleep;
{
     int          pid, sleep_time;
     static int   first = TRUE;

     if (max_sleep <= 0)
          return;

                /* Seed with the UNIX pid the first time this is called. */
     if (first == TRUE) 
     {    pid = getpid ();
          srand48 (pid);
          first = FALSE;
     }

     sleep_time = (lrand48 () % max_sleep);

     fprintf(stderr, "gdi_sleep: sleeping %d seconds\n", sleep_time);

     sleep (sleep_time);
     return;
}
