/* ----------------------------------------------------------------
 *   FILE
 *	async.c
 *	
 *   DESCRIPTION
 *	Asynchronous notification
 *
 *   INTERFACE ROUTINES
 *      void Async_Notify(char *relname);
 *      void Async_Listen(char *relname,int pid);
 *      void Async_Unlisten(char *relname, int pid);
 *
 *   NOTES
 *	
 *   IDENTIFICATION
 *	$Header: /usr/local/dev/postgres/mastertree/src/commands/RCS/async.c,v 1.4 1992/08/26 21:08:55 mer Exp $
 * ----------------------------------------------------------------
 */
/*
 * Model is:
 * 1. Multiple backends on same machine.
 *
 * 2. Query on one backend sends stuff over an asynchronous portal by 
 *    appending to a relation, and then doing an async. notification
 *    (which takes place after commit) to all listeners on this relation.
 *
 * 3. Async. notification results in all backends listening on relation 
 *    to be woken up, by a process signal kill(2), with name of relation
 *    passed in shared memory.
 *
 * 4. Each backend notifies its respective frontend over the comm
 *    channel using the out-of-band channel.
 *
 * 5. Each frontend receives this notification and processes accordingly.
 *
 * #4,#5 are changing soon with pending rewrite of portal/protocol.
 */

#include <strings.h>
#include <signal.h>
#include <errno.h>
#include "tmp/postgres.h"

RcsId("$Header: /usr/local/dev/postgres/mastertree/src/commands/RCS/async.c,v 1.4 1992/08/26 21:08:55 mer Exp $");

/* ----------------
 *	FILE INCLUDE ORDER GUIDELINES
 *
 *	1) postgres.h
 *	2) various support files ("everything else")
 *	3) node files
 *	4) catalog/ files
 *	5) execdefs.h and execmisc.h, if necessary.
 *	6) extern files come last.
 * ----------------
 */
#include "access/attnum.h"
#include "access/ftup.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/relscan.h"
#include "access/skey.h"
#include "access/tqual.h"

#include "commands/copy.h"
#include "storage/buf.h"
#include "storage/itemptr.h"
#include "tmp/miscadmin.h"
#include "tmp/portal.h"
#include "utils/excid.h"
#include "utils/log.h"
#include "utils/mcxt.h"
#include "utils/palloc.h"
#include "utils/rel.h"

#include "nodes/pg_lisp.h"
#include "tcop/dest.h"
#include "commands/command.h"

#include "catalog/catname.h"
#include "catalog/syscache.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_relation.h"
#include "catalog/pg_type.h"
#include "catalog/pg_listener.h"

#include "executor/execdefs.h"
#include "executor/execdesc.h"

#include "tmp/simplelists.h"

typedef struct {
    char relname[16];
    SLNode Node;
} PendingNotify;

static SLList pendingNotifies;
static int initialized = 0;	/* statics in this module initialized? */

/*
 *--------------------------------------------------------------
 * Async_Notify --
 *
 *      Adds the relation to the list of pending notifies.
 *      All notification happens at end of commit.
 *      
 *
 * Results:
 *      XXX
 *
 * Side effects:
 *      All tuples for relname in pg_listener are updated.
 *
 *--------------------------------------------------------------
 */
void
Async_Notify(relname)
     char *relname;
{
    PendingNotify *p;
    HeapTuple lTuple;
    struct listener *lStruct;
    Relation lRel;
    HeapScanDesc sRel;
    TupleDescriptor tdesc;
    Buffer b;
    Datum d;
    int notifypending, isnull;

    if (! initialized) {
	SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
	initialized = 1;
    }

    elog(NOTICE,"Async_Notify: %s",relname);
    lRel = heap_openr("pg_listener");
    tdesc = RelationGetTupleDescriptor(lRel);
    sRel = heap_beginscan(lRel,0,NowTimeQual,0,(ScanKey)NULL);
    while (HeapTupleIsValid(lTuple = heap_getnext(sRel,0,&b))) {
	d = (Datum) heap_getattr(lTuple,b,Anum_pg_listener_relname,tdesc,
				 &isnull);
	if (! strcmp(relname,DatumGetPointer(d))) {
	    d = (Datum) heap_getattr(lTuple,b,Anum_pg_listener_notify,tdesc,
				     &isnull);
	    notifypending = (int)DatumGetPointer(d);
	    if (! notifypending) {
		ItemPointerData oldTID;
		oldTID = lTuple->t_ctid;
		((struct listener *)GETSTRUCT(lTuple))->notification = 1;
		heap_replace(lRel,&oldTID,lTuple);
	    }
	}
	ReleaseBuffer(b);
    }
    heap_endscan(sRel);
    heap_close(lRel);
}

#if 0
static int
AsyncExistsPendingNotify(relname)
     char *relname;
{
    for (p = (PendingNotify *)SLGetHead(&pendingNotifies); p != NULL;
	 p = (PendingNotify *)SLGetSucc(&p->Node)) {
	if (!strcmp(p->relname,relname)) {
	    return 1;
	}
    }
    return 0;
}
#endif

/*
 *--------------------------------------------------------------
 * Async_NotifyAtCommit --
 *
 *      Signal all backends listening on relations pending notification.
 *
 *      This corresponds to the 'notify <relation>' command in 
 *      postquel.
 *
 *   XXX: what if we signal ourselves?
 *
 * Results:
 *      XXX
 *
 * Side effects:
 *      XXX
 *
 *--------------------------------------------------------------
 */
void Async_NotifyAtCommit()
{
    char *relname;
    Relation r;
    TupleDescriptor tdesc;
    HeapScanDesc s;
    HeapTuple htup;
    Buffer b;
    Datum d;
    char *relnamei;
    int pid;
    int isnull;
    int notifypending;
    bool didNotify;
    static int reentrant;	/* hack:
				   This is a transaction itself, so we
				   don't want to loop at commit time
				   processing */
    /*
     * XXX Turn off notify for 4.0.1.  Late discovery of implementation flaws.
     */
    return;
#if 0
    if (reentrant)
      return;
    reentrant = 1;
    CommandCounterIncrement();

    if (! initialized) {
	SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
	initialized = 1;
    }

    r = heap_openr("pg_listener");
    tdesc = RelationGetTupleDescriptor(r);
    s = heap_beginscan(r,0,NowTimeQual,0,(ScanKey)NULL);

    htup = heap_getnext(s,0,&b);
    if (HeapTupleIsValid(htup)) {
	didNotify = true;
	StartTransactionCommand();
    }
    else
	didNotify = false;

    while (HeapTupleIsValid(htup)) {
	d = (Datum) heap_getattr(htup,b,Anum_pg_listener_notify,tdesc,
				 &isnull);
	notifypending = (int)DatumGetPointer(d);
	if (notifypending) {
	    d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
	    pid = (int) DatumGetPointer(d);
	    if (kill (pid,SIGUSR2) < 0) {
		extern int errno;
		if (errno == ESRCH) { /* no such process */
		    heap_delete(r,&htup->t_ctid);
		}
	    }
	}
	ReleaseBuffer(b);
	htup = heap_getnext(s,0,&b);
    }
    heap_endscan(s);
    heap_close(r);
    if (didNotify)
	CommitTransactionCommand();
    reentrant = 0;
#endif
}

#if 0
/*
 *--------------------------------------------------------------
 * Async_NotifyAtAbort --
 *
 *      Gets rid of pending notifies.  List elements are automatically
 *      freed through memory context.
 *      
 *
 * Results:
 *      XXX
 *
 * Side effects:
 *      XXX
 *
 *--------------------------------------------------------------
 */
void
Async_NotifyAtAbort()
{
    SLNewList(&pendingNotifies,offsetof(PendingNotify,Node));
    intialized = 1;
}
#endif

/*
 *--------------------------------------------------------------
 * Async_Listen --
 *
 *      Register a backend (identified by its Unix PID) as listening
 *      on the specified relation.  
 *
 *      This corresponds to the 'listen <relation>' command in 
 *      postquel.
 *
 *      One listener per relation, pg_listener relation is keyed
 *      on (relname,pid) to provide multiple listeners in future.
 *
 * Results:
 *      pg_listeners is updated.
 *
 * Side effects:
 *      XXX
 *
 *--------------------------------------------------------------
 */
void Async_Listen(relname, pid)
     char *relname;
     int pid;
{
    Datum values[Natts_pg_listener];
    char nulls[Natts_pg_listener];
    TupleDescriptor tdesc;
    HeapScanDesc s;
    HeapTuple htup,tup;
    Relation lDesc;
    Buffer b;
    Datum d;
    int i, isnull;
    int alreadyListener = 0;
    int ourPid = getpid();
    char *relnamei;

    elog(NOTICE,"Async_Listen: %s",relname);
    for (i = 0 ; i < Natts_pg_listener; i++) {
        nulls[i] = ' ';
        values[i] = NULL;
    }
    
    i = 0;
    values[i++] = (Datum) relname;
    values[i++] = (Datum) pid;
    values[i++] = (Datum) 0;	/* no notifies pending */

    lDesc = heap_openr(Name_pg_listener);

    /* is someone already listening.  One listener per relation */
    tdesc = RelationGetTupleDescriptor(lDesc);
    s = heap_beginscan(lDesc,0,NowTimeQual,0,(ScanKey)NULL);
    while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
	d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,
				 &isnull);
	relnamei = DatumGetPointer(d);
	if (!strcmp(relnamei,relname)) {
	    d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
	    pid = (int) DatumGetPointer(d);
	    if (pid != ourPid) {
		alreadyListener = 1;
		break;
	    }
	}
	ReleaseBuffer(b);
    }
    heap_endscan(s);

    tup = heap_formtuple(Natts_pg_listener,
			 &lDesc->rd_att,
			 values,
			 nulls);
    heap_insert(lDesc,tup,(double *)NULL);
    
    pfree((Pointer)tup);
    if (alreadyListener) {
	elog(NOTICE,"Async_Listen: already one listener on %s (possibly dead)",relname);
    }
    heap_close(lDesc);
   
}

/*
 *--------------------------------------------------------------
 * Async_Unlisten --
 *
 *      Remove the backend from the list of listening backends
 *      for the specified relation.
 *    
 *      This would correspond to the 'unlisten <relation>' 
 *      command in postquel, but there isn't one yet.
 *
 * Results:
 *      pg_listeners is updated.
 *
 * Side effects:
 *      XXX
 *
 *--------------------------------------------------------------
 */
void Async_Unlisten(relname,pid)
     char *relname;
     int pid;
{
    Relation lDesc;
    HeapTuple lTuple;
    lTuple = SearchSysCacheTuple(LISTENREL,relname,pid);
    lDesc = heap_openr(Name_pg_listener);
    if (lTuple != NULL) {
	heap_delete(lDesc,&lTuple->t_ctid);
    }
    heap_close(lDesc);
}

/*
 * --------------------------------------------------------------
 * Async_NotifyFrontEnd --
 *
 *      Perform an asynchronous notification to front end over
 *      portal comm channel.  The name of the relation which contains the
 *      data is sent to the front end.
 *
 *      We remove the notification flag from the pg_listener tuple
 *      associated with our process.
 *
 * Results:
 *      XXX
 *
 * Side effects:
 *      NB: This is the signal handler for SIGUSR2.  We could have been
 *      in the middle of dumping portal data.  
 *
 *      We make use of the out-of-band channel to transmit the
 *      notification to the front end.  The actual data transfer takes
 *      place at the front end's request.
 *
 * --------------------------------------------------------------
 */
GlobalMemory notifyContext = NULL;

void Async_NotifyFrontEnd()
{
    char *relname;
    extern whereToSendOutput;
    Relation r;
    HeapScanDesc s;
    TupleDescriptor tdesc;
    Datum d;
    Buffer b;
    HeapTuple htup;
    int isnull, notifypending, pid;
    char msg[1025];
    int ourpid = getpid();

    bzero(msg,sizeof(msg));
    
    if (whereToSendOutput != Remote) {
	elog(NOTICE,"Async_NotifyPortal: no asynchronous notifcation on interactive sessions");
	return;
    }

    /* Sorry, this code is mix-n-match styles of using getstruct and
     * heap_getattr.
     */
    StartTransactionCommand();
    {
    /* debugging */
	FILE *f;
	f = fopen("/dev/ttyp6","w");
	fprintf(f,"Got signal\n",msg);

    r = heap_openr("pg_listener");
    tdesc = RelationGetTupleDescriptor(r);
    s = heap_beginscan(r,0,NowTimeQual,0,(ScanKey)NULL);
    while (HeapTupleIsValid(htup = heap_getnext(s,0,&b))) {
	d = (Datum) heap_getattr(htup,b,Anum_pg_listener_notify,tdesc,
				 &isnull);
	notifypending = (int)DatumGetPointer(d);
	if (notifypending) {
	    d = (Datum) heap_getattr(htup,b,Anum_pg_listener_pid,tdesc,&isnull);
	    pid = (int) DatumGetPointer(d);
	    if (pid == ourpid) {
		ItemPointerData oldTID;
		d = (Datum) heap_getattr(htup,b,Anum_pg_listener_relname,tdesc,&isnull);
		relname = DatumGetPointer(d);
		oldTID = htup->t_ctid;
		/* unset notify flag */
		((struct listener *)GETSTRUCT(htup))->notification = 0;
		heap_replace(r,&oldTID,htup);

		/* notify front end of presence,
		   but not any more detail */
		sprintf(msg,"A%s %d",relname,pid);
		/* debugging */
		fprintf(f,"%s\n",msg);
		if (pq_sendoob("A",1)<0) {
		    extern int errno;
		    fprintf(f,"pq_sendoob failed: errno=%d",errno);
		}
		/* call backend PQ lib -- different memory context */
		{
		    MemoryContext orig;
		    if (notifyContext == NULL) {
			notifyContext = CreateGlobalMemory("notify");
		    }
		    orig = MemoryContextSwitchTo((MemoryContext)notifyContext);
		    PQappendNotify(relname,pid);
		    (void) MemoryContextSwitchTo(orig);
		}
	    }
	}
	ReleaseBuffer(b);
    }
	fclose(f);
    }
    heap_endscan(s);
    heap_close(r);
    CommitTransactionCommand();

}
