head	1.34;
access;
symbols
	release_4_2:1.34
	aix_ok:1.33
	Version_2_1:1.13;
locks; strict;
comment	@ * @;


1.34
date	94.02.02.00.22.27;	author marc;	state Exp;
branches;
next	1.33;

1.33
date	92.07.04.04.38.27;	author mer;	state Exp;
branches;
next	1.32;

1.32
date	92.06.28.03.48.22;	author mao;	state Exp;
branches;
next	1.31;

1.31
date	92.06.18.06.04.44;	author hong;	state Exp;
branches;
next	1.30;

1.30
date	92.06.16.21.29.18;	author hong;	state Exp;
branches;
next	1.29;

1.29
date	92.03.31.23.12.23;	author mer;	state Exp;
branches;
next	1.28;

1.28
date	92.02.07.10.50.35;	author hong;	state Exp;
branches;
next	1.27;

1.27
date	92.01.21.16.43.58;	author hong;	state Exp;
branches;
next	1.26;

1.26
date	91.11.17.21.06.36;	author mer;	state Exp;
branches;
next	1.25;

1.25
date	91.11.11.22.59.33;	author hong;	state Exp;
branches;
next	1.24;

1.24
date	91.11.09.09.51.00;	author hong;	state Exp;
branches;
next	1.23;

1.23
date	91.10.09.14.53.29;	author hong;	state Exp;
branches;
next	1.22;

1.22
date	91.10.08.01.13.48;	author hong;	state Exp;
branches;
next	1.21;

1.21
date	91.08.25.13.35.03;	author hong;	state Exp;
branches;
next	1.20;

1.20
date	91.08.18.12.33.44;	author hong;	state Exp;
branches;
next	1.19;

1.19
date	91.08.08.15.39.11;	author hong;	state Exp;
branches;
next	1.18;

1.18
date	91.08.02.19.00.46;	author hong;	state Exp;
branches;
next	1.17;

1.17
date	91.07.24.16.13.40;	author hong;	state Exp;
branches;
next	1.16;

1.16
date	91.07.17.23.45.03;	author hong;	state Exp;
branches;
next	1.15;

1.15
date	91.06.18.23.28.36;	author cimarron;	state Exp;
branches;
next	1.14;

1.14
date	91.04.18.11.38.42;	author hong;	state Exp;
branches;
next	1.13;

1.13
date	91.02.27.16.04.17;	author cimarron;	state Exp;
branches;
next	1.12;

1.12
date	91.02.27.14.32.03;	author hong;	state Exp;
branches;
next	1.11;

1.11
date	90.11.12.08.16.58;	author hong;	state Exp;
branches;
next	1.10;

1.10
date	90.11.10.12.29.23;	author hong;	state Exp;
branches;
next	1.9;

1.9
date	90.11.07.12.06.50;	author hong;	state Exp;
branches;
next	1.8;

1.8
date	90.10.02.14.38.43;	author hong;	state Exp;
branches;
next	1.7;

1.7
date	90.10.01.08.44.45;	author cimarron;	state Exp;
branches;
next	1.6;

1.6
date	90.09.18.22.03.44;	author hong;	state Exp;
branches;
next	1.5;

1.5
date	90.09.10.20.43.41;	author hong;	state Exp;
branches;
next	1.4;

1.4
date	90.09.06.16.57.50;	author hong;	state Exp;
branches;
next	1.3;

1.3
date	90.09.06.14.44.04;	author hong;	state Exp;
branches;
next	1.2;

1.2
date	90.08.22.11.14.54;	author hong;	state Exp;
branches;
next	1.1;

1.1
date	90.08.17.13.28.20;	author hong;	state Exp;
branches;
next	;


desc
@routines for parallel query processing
originally in pquery.c
@


1.34
log
@Fixes to make sure we don't expect NameData variables to be NULL terminated
and that we don't write mroe than NAMEDATALEN chars into them
@
text
@/* ----------------------------------------------------------------
 *   FILE
 *	pfrag.c
 *	
 *   DESCRIPTION
 *	POSTGRES process query command code
 *
 *   INTERFACE ROUTINES
 *	ProcessQuery
 *
 *   NOTES
 *
 *   IDENTIFICATION
 *	$Header: /usr/local/devel/postgres.test/src/backend/tcop/RCS/pfrag.c,v 1.33 1992/07/04 04:38:27 mer Exp marc $
 * ----------------------------------------------------------------
 */

#include <signal.h>
#include <math.h>
#include "tmp/postgres.h"
#include "tcop/tcopdebug.h"
#include "tcop/slaves.h"
#include "nodes/pg_lisp.h"
#include "nodes/plannodes.h"
#include "nodes/plannodes.a.h"
#include "nodes/execnodes.h"
#include "nodes/execnodes.a.h"
#include "executor/execmisc.h"
#include "executor/x_execinit.h"
#include "executor/x_hash.h"
#include "tcop/dest.h"
#include "tmp/portal.h"
#include "commands/command.h"
#include "parser/parsetree.h"
#include "storage/lmgr.h"
#include "catalog/pg_relation.h"
#include "utils/lsyscache.h"
#include "utils/log.h"
#include "utils/palloc.h"
#include "nodes/relation.h"
#include "lib/copyfuncs.h"
#include "lib/catalog.h"
#include "tcop/pquery.h"

 RcsId("$Header: /usr/local/devel/postgres.test/src/backend/tcop/RCS/pfrag.c,v 1.33 1992/07/04 04:38:27 mer Exp marc $");

int AdjustParallelismEnabled = 1;
extern int NStriping;
static MasterSchedulingInfoData MasterSchedulingInfoD = {-1, NULL, -1, NULL};
extern ParallelismModes ParallelismMode;

/* ------------------------------------
 *	FingFragments
 *
 *	find all the plan fragments under plan node, mark the fragments starting
 *	with fragmentNo
 *	plan fragments are obtained by decomposing the plan tree across all
 *	blocking edges, i.e., edges out of Hash nodes and Sort nodes
 * ------------------------------------
 */
static List
FindFragments(parsetree, node, fragmentNo)
List parsetree;
Plan node;
int fragmentNo;
{
    List subFragments = LispNil;
    List newFragments;
    Fragment fragment;

    set_fragment(node, fragmentNo);
    if (get_lefttree(node) != NULL) 
      if (ExecIsHash(get_lefttree(node))||ExecIsMergeJoin(get_lefttree(node))) {
	  /* -----------------------------
	   * detected a blocking edge, fragment boundary
	   * -----------------------------
	   */
          fragment = RMakeFragment();
          set_frag_root(fragment, (Plan)get_lefttree(node));
          set_frag_parent_op(fragment, node);
          set_frag_parallel(fragment, 1);
          set_frag_subtrees(fragment, LispNil);
	  set_frag_parsetree(fragment, parsetree);
	  set_frag_is_inprocess(fragment, false);
	  set_frag_iorate(fragment, 0.0);
          subFragments = nappend1(subFragments, (LispValue)fragment);
        }
       else {
          subFragments = FindFragments(parsetree,get_lefttree(node),fragmentNo);
        }
    if (get_righttree(node) != NULL)
       if (ExecIsHash(get_righttree(node)) || ExecIsSort(get_righttree(node))) {
	  /* -----------------------------
	   * detected a blocking edge, fragment boundary
	   * -----------------------------
	   */
          fragment = RMakeFragment();
          set_frag_root(fragment, (Plan)get_righttree(node));
          set_frag_parent_op(fragment, node);
          set_frag_parallel(fragment, 1);
          set_frag_subtrees(fragment, LispNil);
	  set_frag_parsetree(fragment, parsetree);
	  set_frag_is_inprocess(fragment, false);
	  set_frag_iorate(fragment, 0.0);
          subFragments = nappend1(subFragments, (LispValue)fragment);
         }
       else {
         newFragments = FindFragments(parsetree,get_righttree(node),fragmentNo);
         subFragments = nconc(subFragments, newFragments);
        }
    
    return subFragments;
}

#define SEQPERTUPTIME	2e-3 	/* second */
#define INDPERTUPTIME	0.1
#define DEFPERTUPTIME	4e-3

static float
get_pertuptime(plan)
Plan plan;
{
    switch (NodeType(plan)) {
    case classTag(SeqScan):
	return SEQPERTUPTIME;
    case classTag(IndexScan):
	return INDPERTUPTIME;
      }
    return DEFPERTUPTIME;
}

#define AVGINDTUPS	5

static float
compute_frag_iorate(fragment)
Fragment fragment;
{
    Plan plan;
    int fragmentno;
    float pertupletime;
    Plan p;
    float iorate;
    int tupsize;

    plan = get_frag_root(fragment);
    fragmentno = get_fragment(plan);
    pertupletime = 0.0;
    for (;;) {
	pertupletime += get_pertuptime(plan);
	p = get_outerPlan(plan); /* walk down the outer path only for now */
	if (p == NULL || get_fragment(p) != fragmentno)
	    break;
	else
	    plan = p;
      }
    if (ExecIsSeqScan(plan) || ExecIsScanTemps(plan)) {
	iorate = 1.0/(pertupletime * get_plan_tupperpage(plan));
      }
    else if (ExecIsIndexScan(plan)) {
	iorate = 1.0/(pertupletime * AVGINDTUPS);
      }
    return iorate;
}

/* -------------------------------
 *	SetIoRate
 *
 *	compute and set the io rate of each fragment
 * -------------------------------
 */
static void
SetIoRate(fragment)
Fragment fragment;
{
    LispValue x;
    Fragment f;

    set_frag_iorate(fragment, compute_frag_iorate(fragment));
    foreach (x, get_frag_subtrees(fragment)) {
	f = (Fragment)CAR(x);
	SetIoRate(f);
      }
}

/* --------------------------------
 *	InitialPlanFragments
 *
 *	calls FindFragments() recursively to obtain the initial set of
 *	plan fragments -- the largest possible, further decomposition
 *	might be necessary in DecomposeFragments().
 * --------------------------------
 */
Fragment
InitialPlanFragments(parsetree, plan)
Plan plan;
List parsetree;
{
    Plan node;
    LispValue x, y;
    List fragmentlist;
    List subFragments;
    List newFragmentList;
    int fragmentNo = 0;
    Fragment rootFragment;
    Fragment fragment, frag;

    rootFragment = RMakeFragment();
    set_frag_root(rootFragment, plan);
    set_frag_parent_op(rootFragment, NULL);
    set_frag_parallel(rootFragment, 1);
    set_frag_subtrees(rootFragment, LispNil);
    set_frag_parent_frag(rootFragment, NULL);
    set_frag_parsetree(rootFragment, parsetree);
    set_frag_is_inprocess(rootFragment, false);
    set_frag_iorate(rootFragment, 0.0);

    fragmentlist = lispCons((LispValue)rootFragment, LispNil);

    while (!lispNullp(fragmentlist)) {
	newFragmentList = LispNil;
	foreach (x, fragmentlist) {
	   fragment = (Fragment)CAR(x);
	   node = get_frag_root(fragment);
	   subFragments = FindFragments(parsetree, node, fragmentNo++);
	   set_frag_subtrees(fragment, subFragments);
	   foreach (y, subFragments) {
	      frag = (Fragment)CAR(y);
	      set_frag_parent_frag(frag, (FragmentPtr)fragment);
	     }
	   newFragmentList = nconc(newFragmentList, subFragments);
	  }
	fragmentlist = newFragmentList;
      }
    SetIoRate(rootFragment);
    return rootFragment;
}

extern int NBuffers;

/* -----------------------------
 *	GetCurrentMemSize
 *
 *	get the current amount of available memory
 * -----------------------------
 */
static int
GetCurrentMemSize()
{
   return NBuffers;  /* YYY functionalities to be added later */
}

/* -----------------------------
 *	GetCurrentLoadAverage
 *
 *	get the current load average of the system
 * -----------------------------
 */
static float
GetCurrentLoadAverage()
{
    return 0.0;  /* YYY functionalities to be added later */
}

/* ------------------------------
 *	GetReadyFragments
 *
 *	get the set of fragments that are ready to fire, i.e., those that
 *	have no children
 * ------------------------------
 */
static List
GetReadyFragments(fragments)
Fragment fragments;
{
    LispValue x;
    Fragment frag;
    List readyFragments = LispNil;
    List readyFrags;

    if (lispNullp(get_frag_subtrees(fragments)) && 
	!get_frag_is_inprocess(fragments))
       return lispCons((LispValue)fragments, LispNil);
    foreach (x, get_frag_subtrees(fragments)) {
	frag = (Fragment)CAR(x);
	readyFrags = GetReadyFragments(frag);
	readyFragments = nconc(readyFragments, readyFrags);
      }
    return readyFragments;
}

/* ---------------------------------
 *	GetFitFragments
 *
 *	get the set of fragments that can fit in the current available memory
 * ---------------------------------
 */
static List
GetFitFragments(fragmentlist, memsize)
List fragmentlist;
int memsize;
{
    return fragmentlist; /* YYY functionalities to be added later */
}

/* ----------------------------------
 *	DecomposeFragments
 *
 *	decompose fragments into smaller fragments to fit in memsize amount
 *	of memory
 * ----------------------------------
 */
static List
DecomposeFragments(fragmentlist, memsize)
List fragmentlist;
int memsize;
{
    return fragmentlist;  /* YYY functionalities to be added later */
}

/* -----------------------------------
 *	ChooseToFire
 *
 *	choose among all the ready-to-fire fragments which
 *	to execute in parallel
 * -----------------------------------
 */
static List
ChooseToFire(fragmentlist, memsize)
List fragmentlist;
int memsize;
{
    return lispCons(CAR(fragmentlist), LispNil);   
    /* YYY functionalities to be added later */
}

/* ----------------------------------
 *	ChooseFragments
 *
 *	choose the fragments to execute in parallel
 * -----------------------------------
 */
static List
ChooseFragments(fragments, memsize)
Fragment fragments;
int memsize;
{
    List readyFragments;
    List fitFragments;
    List fireFragments;

    readyFragments = GetReadyFragments(fragments);
    if (lispNullp(readyFragments)) return LispNil;
    fitFragments = GetFitFragments(readyFragments, memsize);
    if (lispNullp(fitFragments)) {
	fitFragments = DecomposeFragments(fitFragments, memsize);
	if (lispNullp(fitFragments))
	   elog(WARN, "memory below hashjoin threshold.");
      }
    fireFragments = ChooseToFire(fitFragments, memsize);
    return fireFragments;
}

/* ------------------------------
 *	SetParallelDegree
 *
 *	set the degree of parallelism for fragments in fragmentlist
 * ------------------------------
 */
static void
SetParallelDegree(fragmentlist, nfreeslaves)
List fragmentlist;
int nfreeslaves;
{
    LispValue x;
    Fragment fragment;
    Plan plan;
    int fragmentno;

    /* YYY more functionalities to be added later */
    foreach (x,fragmentlist) {
       fragment = (Fragment)CAR(x);
       plan = get_frag_root(fragment);
       fragmentno = get_fragment(plan);
       if (fragmentno < 0) {
           set_frag_parallel(fragment, 1);  /* YYY */
	  }
       else {
           set_frag_parallel(fragment, nfreeslaves);  /* YYY */
	 }
     }
}

/* ---------------------------------
 *	plan_is_parallelizable
 *
 *	returns true if plan is parallelizable, false otherwise
 * ---------------------------------
 */
static bool
plan_is_parallelizable(plan)
Plan plan;
{
    if (plan == NULL)
	return true;
    if (ExecIsMergeJoin(plan))
	return false;
    if (ExecIsSort(plan))
	return false;
    if (ExecIsIndexScan(plan)) {
	List indxqual;
	LispValue x;
	NameData opname;
	Oper op;
	indxqual = get_indxqual((IndexScan)plan);
	if (length(CAR(indxqual)) < 2)
	    return false;
	foreach (x, CAR(indxqual)) {
	    op = (Oper)CAR(CAR(x));
	    opname = get_opname(get_opno(op));
	    if (namestrcmp(&opname, "=") == 0)
		return false;
	 }
      }
    if (plan_is_parallelizable(get_outerPlan(plan)))
	return true;
    return false;
}

/* ----------------------------------------
 *	nappend1iobound
 *
 *	insert an io-bound fragment into a list in 
 *	descending io rate order.
 * ----------------------------------------
 */
static List
nappend1iobound(ioboundlist, frag)
List ioboundlist;
Fragment frag;
{
    LispValue x;
    Fragment f;

    if (lispNullp(ioboundlist))
	return lispCons((LispValue)frag, LispNil);
    f = (Fragment)CAR(ioboundlist);
    if (get_frag_iorate(frag) > get_frag_iorate(f)) {
	return(nconc(lispCons((LispValue)frag, LispNil), ioboundlist));
      }
    else {
	return(nconc(lispCons((LispValue)f, LispNil), 
		     nappend1iobound(CDR(ioboundlist), frag)));
      }
}

/* ----------------------------------------
 *	nappend1cpubound
 *
 *	insert a cpu-bound fragment into a list in 
 *	ascending io rate order.
 * ----------------------------------------
 */
static List
nappend1cpubound(cpuboundlist, frag)
List cpuboundlist;
Fragment frag;
{
    LispValue x;
    Fragment f;

    if (lispNullp(cpuboundlist))
	return lispCons((LispValue)frag, LispNil);
    f = (Fragment)CAR(cpuboundlist);
    if (get_frag_iorate(frag) < get_frag_iorate(f)) {
	return(nconc(lispCons((LispValue)frag, LispNil), cpuboundlist));
      }
    else {
	return(nconc(lispCons((LispValue)f, LispNil), 
		     nappend1cpubound(CDR(cpuboundlist), frag)));
      }
}

#define DISKBANDWIDTH	60 	/* IO per second */

/* -------------------------------------
 *	ClassifyFragments
 *
 *	classify fragments into io-bound, cpu-bound, unparallelizable or
 *	parallelism-preset.
 * -------------------------------------
 */
static void
ClassifyFragments(fraglist, ioboundlist, cpuboundlist, unparallelizablelist, presetlist)
List fraglist;
List *ioboundlist, *cpuboundlist, *unparallelizablelist, *presetlist;
{
    LispValue x;
    Fragment f;
    Plan p;
    float iorate;
    float diagonal;

    *ioboundlist = LispNil;
    *cpuboundlist = LispNil;
    *unparallelizablelist = LispNil;
    *presetlist = LispNil;
    diagonal = (float)NStriping * DISKBANDWIDTH/(float)GetNumberSlaveBackends();
    foreach (x, fraglist) {
	f = (Fragment)CAR(x);
	p = get_frag_root(f);
	if (!plan_is_parallelizable(p)) {
	    *unparallelizablelist = nappend1(*unparallelizablelist, 
					     (LispValue)f);
	  }
	else if (!lispNullp(parse_parallel(get_frag_parsetree(f)))) {
	    *presetlist = nappend1(*presetlist, (LispValue)f);
	  }
	else {
	    iorate = get_frag_iorate(f);
	    if (iorate > diagonal) {
	        *ioboundlist = nappend1iobound(*ioboundlist, f);
	      }
	    else {
		*cpuboundlist = nappend1cpubound(*cpuboundlist, f);
	      }
	  }
      }
}

/* ---------------------------------
 *	ComputeIoCpuBalancePoint
 *
 *	compute io/cpu balance point of two fragments:
 *	f1, io-bound
 *	f2, cpu-bound
 * ---------------------------------
 */
static void
ComputeIoCpuBalancePoint(f1, f2, x1, x2)
Fragment f1, f2;
int *x1, *x2;
{
    float bandwidth;
    float iorate1, iorate2;
    int nfreeslaves;

    nfreeslaves = GetNumberSlaveBackends();
    bandwidth = NStriping * DISKBANDWIDTH;
    iorate1 = get_frag_iorate(f1);
    iorate2 = get_frag_iorate(f2);
    *x1=(int)floor((double)((bandwidth-iorate2*nfreeslaves)/(iorate1-iorate2)));
    *x2=(int)ceil((double)((iorate1*nfreeslaves-bandwidth)/(iorate1-iorate2)));
}

/* --------------------------------------
 *	MaxFragParallelism
 *
 *	return the maximum parallelism for a fragment
 *	within the limit of number of free processors and disk bandwidth
 * -------------------------------------
 */
static int
MaxFragParallelism(frag)
Fragment frag;
{
    float ioRate;
    int par;

    if (!plan_is_parallelizable(get_frag_root(frag)))
	return 1;
    if (!lispNullp(parse_parallel(get_frag_parsetree(frag))))
	return CInteger(parse_parallel(get_frag_parsetree(frag)));
    ioRate = (float)get_frag_iorate(frag);
    par = MIN((int)floor((double)NStriping*DISKBANDWIDTH/ioRate),
	      NumberOfFreeSlaves);
    return par;
}

/* --------------------------------------
 *	CurMaxFragParallelism
 *
 *	return the maximum parallelism for a fragment
 *	within the limit of current number of free processors and 
 *	available disk bandwidth
 * -------------------------------------
 */
static int
CurMaxFragParallelism(frag, curbandwidth, nfreeslaves)
Fragment frag;
float curbandwidth;
int nfreeslaves;
{
    float ioRate;
    int par;

    if (!plan_is_parallelizable(get_frag_root(frag)))
	return 1;
    if (!lispNullp(parse_parallel(get_frag_parsetree(frag))))
	return CInteger(parse_parallel(get_frag_parsetree(frag)));
    ioRate = (float)get_frag_iorate(frag);
    par = MIN((int)floor((double)curbandwidth/ioRate),
	      nfreeslaves);
    return par;
}

/* ------------------------
 *	AdjustParallelism
 *
 *	dynamically adjust degrees of parallelism of the fragments that
 *	are already in process to take advantage of the extra processors
 * ------------------------
 */
static void
AdjustParallelism(pardelta, groupid)
int pardelta;
int groupid;
{
    int j;
    int slave;
    int max_curpage;
    int size;
    int oldnproc;

    Assert(pardelta != 0);
    SLAVE_elog(DEBUG, "master trying to adjust degrees of parallelism");
    SLAVE1_elog(DEBUG, "master sending signal to process group %d", groupid);
    signalProcGroup(groupid, SIGPARADJ);
    max_curpage = getProcGroupMaxPage(groupid);
    SLAVE1_elog(DEBUG, "master gets maxpage = %d", max_curpage);
    oldnproc = ProcGroupInfoP[groupid].nprocess;
    if (max_curpage == NOPARADJ) {
	/* --------------------------
	 *  forget about adjustment to parallelism
	 *  in this case -- the fragment is almost finished
	 * ---------------------------
	 */
	SLAVE_elog(DEBUG, "master changes mind on adjusting parallelism");
	ProcGroupInfoP[groupid].paradjpage = NOPARADJ;
        OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
	return;
      }
    ProcGroupInfoP[groupid].paradjpage = max_curpage + 1; 
				   /* page on which to adjust par. */
    if (pardelta > 0) {
        ProcGroupInfoP[groupid].nprocess += pardelta;
        ProcGroupInfoP[groupid].scounter.count = 
					      ProcGroupInfoP[groupid].nprocess;
        ProcGroupInfoP[groupid].newparallel = ProcGroupInfoP[groupid].nprocess;
        SLAVE2_elog(DEBUG,
		    "master signals waiting slaves with adjpage=%d,newpar=%d",
	            ProcGroupInfoP[groupid].paradjpage,
		    ProcGroupInfoP[groupid].newparallel);
        OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
        set_frag_parallel(ProcGroupLocalInfoP[groupid].fragment,
		      get_frag_parallel(ProcGroupLocalInfoP[groupid].fragment)+
		      pardelta);
        ProcGroupSMBeginAlloc(groupid);
        size = sizeofTmpRelDesc(QdGetPlan(ProcGroupInfoP[groupid].queryDesc));
        for (j=0; j<pardelta; j++) {
	    if (NumberOfFreeSlaves == 0) {
		elog(WARN, 
		     "trying to adjust to too much parallelism: out of slaves");
	      }
	    slave = getFreeSlave();
	    SLAVE2_elog(DEBUG, "master adding slave %d to procgroup %d", 
			slave, groupid);
            SlaveInfoP[slave].resultTmpRelDesc = 
					(Relation)ProcGroupSMAlloc(size);
            addSlaveToProcGroup(slave, groupid, oldnproc+j);
	    V_Start(slave);
          }
        ProcGroupSMEndAlloc();
      }
    else {
        ProcGroupInfoP[groupid].newparallel = 
				 ProcGroupInfoP[groupid].nprocess + pardelta;
        SLAVE2_elog(DEBUG,
		    "master signals waiting slaves with adjpage=%d,newpar=%d",
	            ProcGroupInfoP[groupid].paradjpage,
		    ProcGroupInfoP[groupid].newparallel);
	ProcGroupInfoP[groupid].dropoutcounter.count = -pardelta;
        OneSignalM(&(ProcGroupInfoP[groupid].m1lock), oldnproc);
      }
}

/* ----------------------------------------------------------------
 *	ParallelOptimize
 *	
 *	analyzes plan fragments and determines what fragments to execute
 *	and with how much parallelism
 *	
 * ----------------------------------------------------------------
 */
static List
ParallelOptimize(fragmentlist)
List fragmentlist;
{
    LispValue y;
    Fragment fragment;
    int memAvail;
    float loadAvg;
    List readyFragmentList;
    List flist;
    List ioBoundFragList, cpuBoundFragList, unparallelizableFragList;
    List presetFraglist;
    List newIoBoundFragList, newCpuBoundFragList;
    Fragment f1, f2, f;
    int x1, x2;
    List fireFragmentList;
    int nfreeslaves;
    LispValue k, x;
    int parallel;
    Plan plan;
    bool io_running, cpu_running;
    int curpar;
    int pardelta;

    fireFragmentList = LispNil;
    nfreeslaves = NumberOfFreeSlaves;

    /* ------------------------------
     *  find those plan fragments that are ready to run, i.e.,
     *  those with all input data ready.
     * ------------------------------
     */
    readyFragmentList = LispNil;
    foreach (y, fragmentlist) {
	fragment = (Fragment)CAR(y);
	flist = GetReadyFragments(fragment);
	readyFragmentList = nconc(readyFragmentList, flist);
      }
    if (ParallelismMode == INTRA_ONLY) {
	f = (Fragment)CAR(readyFragmentList);
	fireFragmentList = lispCons((LispValue)f, LispNil);
	SetParallelDegree(fireFragmentList, MaxFragParallelism(f));
	return fireFragmentList;
      }
    /* -------------------------------
     *  classify the ready fragments into io-bound, cpu-bound and
     *  unparallelizable
     * -------------------------------
     */
    ClassifyFragments(readyFragmentList, &ioBoundFragList, &cpuBoundFragList, 
		      &unparallelizableFragList, &presetFraglist);
    fireFragmentList = LispNil;
    /* -------------------------------
     *  take care of the unparallelizable fragments first
     * -------------------------------
     */
    if (!lispNullp(unparallelizableFragList)) {
	fireFragmentList = lispCons(CAR(unparallelizableFragList), LispNil);
        SetParallelDegree(fireFragmentList, 1);
	elog(NOTICE, "nonparallelizable fragment, running sequentially\n");
	return fireFragmentList;
      }
    /* --------------------------------
     * deal with those fragments with parallelism preset
     * --------------------------------
     */
    if (!lispNullp(presetFraglist)) {
	foreach (x, presetFraglist) {
	    f = (Fragment)CAR(x);
	    k = parse_parallel(get_frag_parsetree(f));
	    parallel = CInteger(k);
	    SetParallelDegree((y=lispCons((LispValue)f, LispNil)), parallel);
	    fireFragmentList = nconc(fireFragmentList, y);
	  }
	SLAVE_elog(DEBUG, "executing fragments with preset parallelism.");
	return fireFragmentList;
      }
    /* ---------------------------------
     *  now we deal with the parallelizable plan fragments
     * ---------------------------------
     */
    if (MasterSchedulingInfoD.ioBoundFrag != NULL) {
	f1 = MasterSchedulingInfoD.ioBoundFrag;
	io_running = true;
      }
    else {
	io_running = false;
	if (lispNullp(ioBoundFragList))
	    f1 = NULL;
	else
	    f1 = (Fragment)CAR(ioBoundFragList);
      }
    if (MasterSchedulingInfoD.cpuBoundFrag != NULL) {
	f2 = MasterSchedulingInfoD.cpuBoundFrag;
	cpu_running = true;
      }
    else {
	cpu_running = false;
	if (lispNullp(cpuBoundFragList))
	    f2 = NULL;
	else
	    f2 = (Fragment)CAR(cpuBoundFragList);
      }
    if (f1 != NULL && f2 != NULL) {
	if (ParallelismMode != INTER_WO_ADJ || (!io_running && !cpu_running)) {
	    ComputeIoCpuBalancePoint(f1, f2, &x1, &x2);
	    SLAVE2_elog(DEBUG, "executing two fragments at balance point (%d, %d).",
		    x1, x2);
	  }
	if (io_running) {
	    curpar = get_frag_parallel(f1);
	    if (ParallelismMode == INTER_WO_ADJ) {
		int newpar;
		float curband;
		curband = NStriping*DISKBANDWIDTH - curpar*get_frag_iorate(f1);
		newpar = CurMaxFragParallelism(f2, curband, NumberOfFreeSlaves);
		if (newpar == 0) return LispNil;
		fireFragmentList = lispCons((LispValue)f2, LispNil);
		SetParallelDegree(fireFragmentList, newpar);
	      }
	    else {
		pardelta = x1 - curpar;
		if (pardelta != 0) {
		   SLAVE_elog(DEBUG, "adjusting parallelism of io-bound task.");
		   AdjustParallelism(pardelta,
				     MasterSchedulingInfoD.ioBoundGroupId);
		  }
		fireFragmentList = lispCons((LispValue)f2, LispNil);
		if (pardelta >= 0) {
		    SetParallelDegree(fireFragmentList, x2);
		  }
		else {
		    SetParallelDegree(fireFragmentList, NumberOfFreeSlaves);
		  }
	      }
	    MasterSchedulingInfoD.cpuBoundFrag = f2;
	  }
	else if (cpu_running) {
	    curpar = get_frag_parallel(f2);
	    if (ParallelismMode == INTER_WO_ADJ) {
		int newpar;
		float curband;
		curband = NStriping*DISKBANDWIDTH - curpar*get_frag_iorate(f2);
		newpar = CurMaxFragParallelism(f1, curband, NumberOfFreeSlaves);
		if (newpar == 0) return LispNil;
		fireFragmentList = lispCons((LispValue)f1, LispNil);
		SetParallelDegree(fireFragmentList, newpar);
	      }
	    else {
		pardelta = x2 - curpar;
		if (pardelta != 0) {
		  SLAVE_elog(DEBUG, "adjusting parallelism of cpu-bound task.");
		  AdjustParallelism(pardelta,
				    MasterSchedulingInfoD.cpuBoundGroupId);
		  }
		fireFragmentList = lispCons((LispValue)f1, LispNil);
		if (pardelta >= 0) {
		    SetParallelDegree(fireFragmentList, x1);
		  }
		else {
		    SetParallelDegree(fireFragmentList, NumberOfFreeSlaves);
		  }
	      }
	    MasterSchedulingInfoD.ioBoundFrag = f1;
	  }
	else {
	    SetParallelDegree((y=lispCons((LispValue)f1, LispNil)), x1);
	    fireFragmentList = nconc(fireFragmentList, y);
	    SetParallelDegree((y=lispCons((LispValue)f2, LispNil)), x2);
	    fireFragmentList = nconc(fireFragmentList, y);
	    MasterSchedulingInfoD.ioBoundFrag = f1;
	    MasterSchedulingInfoD.cpuBoundFrag = f2;
	  }
      }
    else if (f1 == NULL && f2 != NULL) {
	/* -----------------------------
	 *  out of io-bound fragments, use intra-operation parallelism only
	 *  for cpu-bound fragments.
	 * ------------------------------
	 */
	fireFragmentList = lispCons((LispValue)f2, LispNil);
	SetParallelDegree(fireFragmentList, nfreeslaves);
	MasterSchedulingInfoD.cpuBoundFrag = f2;
	SLAVE1_elog(DEBUG, 
            "out of io-bound tasks, running cpu-bound task with parallelism %d",
            nfreeslaves);
      }
    else if (f1 != NULL && f2 == NULL) {
	nfreeslaves = MaxFragParallelism(f1);
	fireFragmentList = lispCons((LispValue)f1, LispNil);
	SetParallelDegree(fireFragmentList, nfreeslaves);
	fireFragmentList = nconc(fireFragmentList, y);
	MasterSchedulingInfoD.ioBoundFrag = f1;
	SLAVE1_elog(DEBUG, 
           "out of cpu-bound tasks, running io-bound task with parallelism %d",
            nfreeslaves);
       }
     return fireFragmentList;
}

#define MINHASHTABLEMEMORYKEY	1000
static IpcMemoryKey nextHashTableMemoryKey = 0;

/* -------------------------
 *	getNextHashTableMemoryKey
 *
 *	get the next hash table key
 * -------------------------
 */
static IpcMemoryKey
getNextHashTableMemoryKey()
{
    extern int MasterPid;
    return (nextHashTableMemoryKey++ + MINHASHTABLEMEMORYKEY + MasterPid);
}

/* ------------------------
 *	sizeofTmpRelDesc
 *
 *	calculate the size of reldesc of the temporary relation of plan
 * ------------------------
 */
static int
sizeofTmpRelDesc(plan)
Plan plan;
{
    List targetList;
    int natts;
    int size;

    targetList = get_qptargetlist(plan);
    natts = ExecTargetListLength(targetList);
    /* ------------------
     * see CopyRelDescUsing() in lib/C/copyfuncs.c if you want to know
     * how size if derived.
     * ------------------
     */
    size = sizeof(RelationData) + (natts - 1) * sizeof(TupleDescriptorData) +
	   sizeof(RuleLock) + sizeof(RelationTupleFormD) +
	   sizeof(LockInfoData) + 
	   natts * (sizeof(AttributeTupleFormData) + sizeof(RuleLock)) +
	   48; /* some extra for possible LONGALIGN() */
    return size;
}

/* ----------------------------------------------------------------
 *	OptimizeAndExecuteFragments
 *
 *	Optimize plan fragments to explore both intra-fragment
 *	and inter-fragment parallelism and execute the "optimal"
 *	parallel plan
 *
 * ----------------------------------------------------------------
 */
void
OptimizeAndExecuteFragments(fragmentlist, destination)
List 		fragmentlist;
CommandDest	destination;
{
    LispValue		x;
    int			i;
    List		currentFragmentList;
    Fragment		fragment;
    int			nparallel;
    List		finalResultRelation;
    Plan		plan;
    List		parsetree;
    Plan		parentPlan;
    Fragment		parentFragment;
    int			groupid;
    ProcessNode		*p;
    List		subtrees;
    List		fragQueryDesc;
    HashJoinTable	hashtable;
    int			hashTableMemorySize;
    IpcMemoryKey	hashTableMemoryKey;
    IpcMemoryKey	getNextHashTableMemoryKey();
    ScanTemps		scantempNode;
    Relation		tempRelationDesc;
    List		tempRelationDescList;
    Relation		shmTempRelationDesc;
    List		fraglist;
    CommandDest		dest;
    int			size;
    
    fraglist = fragmentlist;
    while (!lispNullp(fraglist)) {
	/* ------------
	 * choose the set of fragments to execute and parallelism
	 * for each fragment.
	 * ------------
	 */
        currentFragmentList = ParallelOptimize(fraglist);
	foreach (x, currentFragmentList) {
	   fragment = (Fragment)CAR(x);
	   nparallel = get_frag_parallel(fragment);
	   plan = get_frag_root(fragment);
	   parsetree = get_frag_parsetree(fragment);
	   parentFragment = (Fragment)get_frag_parent_frag(fragment);
	   finalResultRelation = parse_tree_result_relation(parsetree);
	   dest = destination;
	   dest = None; /* WWW */
	   if (ExecIsHash(plan))  {
	      /* ------------
	       *  if it is hashjoin, create the hash table
	       *  so that the slaves can share it
	       * ------------
	       */
	      hashTableMemoryKey = getNextHashTableMemoryKey();
	      set_hashtablekey((Hash)plan, hashTableMemoryKey);
	      hashtable = ExecHashTableCreate(plan);
	      set_hashtable((Hash)plan, hashtable);
	      hashTableMemorySize = get_hashtablesize((Hash)plan);
	      parse_tree_result_relation(parsetree) = LispNil;
	      }
	   else if (get_fragment(plan) >= 0) {
	      /* ------------
	       *  this means that this an intermediate fragment, so
	       *  the result should be kept in some temporary relation
	       * ------------
	       */
	      /* WWW
	      parse_tree_result_relation(parsetree) =
		  lispCons(lispAtom("intotemp"), LispNil);
	       */
	      dest = None;
	     }
	   /* ---------------
	    * create query descriptor for the fragment
	    * ---------------
	    */
	   fragQueryDesc = CreateQueryDesc(parsetree, plan,
					   (char *) NULL,
					   (ObjectId *) NULL,
					   0, dest);

	   /* ---------------
	    * assign a process group to work on the fragment
	    * ---------------
	    */
	   groupid = getFreeProcGroup(nparallel);
	   if (fragment == MasterSchedulingInfoD.ioBoundFrag)
	       MasterSchedulingInfoD.ioBoundGroupId = groupid;
	   else if (fragment == MasterSchedulingInfoD.cpuBoundFrag)
	       MasterSchedulingInfoD.cpuBoundGroupId = groupid;
	   ProcGroupLocalInfoP[groupid].fragment = fragment;
	   ProcGroupInfoP[groupid].status = WORKING;
	   ProcGroupSMBeginAlloc(groupid);
	   ProcGroupInfoP[groupid].queryDesc = (List)
			CopyObjectUsing((Node)fragQueryDesc, ProcGroupSMAlloc);
	   size = sizeofTmpRelDesc(plan);
	   for (p = ProcGroupLocalInfoP[groupid].memberProc;
		p != NULL;
		p = p->next) {
	       SlaveInfoP[p->pid].resultTmpRelDesc = 
		 (Relation)ProcGroupSMAlloc(size);
	      }
	   ProcGroupSMEndAlloc();
	   ProcGroupInfoP[groupid].scounter.count = nparallel;
	   ProcGroupInfoP[groupid].nprocess = nparallel;
#ifdef 	   TCOP_SLAVESYNCDEBUG
	   {
	       char procs[100];
	       p = ProcGroupLocalInfoP[groupid].memberProc;
	       sprintf(procs, "%d", p->pid);
	       for (p = p->next; p != NULL; p = p->next)
		   sprintf(procs+strlen(procs), ",%d", p->pid);
	       SLAVE2_elog(DEBUG, "master to wake up procgroup %d {%s} for",
			   groupid, procs);
	       set_query_range_table(parsetree);
	       pplan(plan);
	       fprintf(stderr, "\n");
	    }
#endif
	   wakeupProcGroup(groupid);
	   set_frag_is_inprocess(fragment, true);
	   /* ---------------
	    * restore the original result relation descriptor
	    * ---------------
	    */
	   parse_tree_result_relation(parsetree) = finalResultRelation;
	 }

       /* ------------
	* if there are extra processors lying around,
	* dynamically adjust degrees of parallelism of
	* fragments that are already in process.
       if (NumberOfFreeSlaves > 0 && AdjustParallelismEnabled) {
	    AdjustParallelism(NumberOfFreeSlaves, -1);
	 }
	* ------------
	*/

       /* ----------------
	* wait for some process group to complete execution
	* ----------------
	*/
MasterWait:
       P_Finished();

       /* --------------
	* some process group has finished processing a fragment,
	* find that group
	* --------------
	*/
       groupid = getFinishedProcGroup();
       if (ProcGroupInfoP[groupid].status == PARADJPENDING) {
	   /* -----------------------------
	    * master decided earlier than process group, groupid's parallelism
	    * was going to be reduced.  now the adjustment point is reached.
	    * master is ready to collect those slaves spared from the
	    * group.
	    * -----------------------------
	    */
	   ProcessNode *p, *prev, *nextp;
	   int newparallel = ProcGroupInfoP[groupid].newparallel;
	   List tmpreldesclist = LispNil;
	   int nfreeslave;

         SLAVE1_elog(DEBUG, "master woken up by paradjpending process group %d",
		       groupid);
	   ProcGroupInfoP[groupid].status = WORKING;
	   tempRelationDescList = 
			     ProcGroupLocalInfoP[groupid].resultTmpRelDescList;
	   prev = NULL;
	   for (p = ProcGroupLocalInfoP[groupid].memberProc;
		p != NULL; p = nextp) {
	       nextp = p->next;
	       if (SlaveInfoP[p->pid].groupPid >= newparallel) {
		   /* ------------------------
		    * before freeing this slave, we have to save its
		    * resultTmpRelDesc somewhere.
		    * -------------------------
		    */
#ifndef PALLOC_DEBUG
		    tempRelationDesc = CopyRelDescUsing(
					    SlaveInfoP[p->pid].resultTmpRelDesc,
					    palloc);
#else
		    tempRelationDesc = CopyRelDescUsing(
					    SlaveInfoP[p->pid].resultTmpRelDesc,
					    palloc_debug);
#endif
		    tempRelationDescList = nappend1(tempRelationDescList,
						    (LispValue)tempRelationDesc);
		    if (prev == NULL) {
			ProcGroupLocalInfoP[groupid].memberProc = nextp;
		      }
		    else {
			prev->next = nextp;
		      }
		    freeSlave(p->pid);
		  }
	     }
	        ProcGroupLocalInfoP[groupid].resultTmpRelDescList =
							tempRelationDescList;
		/* --------------------------
		 * now we have to adjust nprocess and countdown of group groupid
		 * --------------------------
		 */
		nfreeslave = ProcGroupInfoP[groupid].nprocess - 
			     ProcGroupInfoP[groupid].newparallel;
		ProcGroupInfoP[groupid].nprocess = 
				ProcGroupInfoP[groupid].newparallel;
		ProcGroupInfoP[groupid].countdown -= nfreeslave;
		/* ---------------------------
		 * adjust parallelism with the freed slaves
		 * ---------------------------
		 */
		if (MasterSchedulingInfoD.ioBoundGroupId == groupid)
		    AdjustParallelism(nfreeslave, 
				      MasterSchedulingInfoD.cpuBoundGroupId);
		else
		    AdjustParallelism(nfreeslave,
				      MasterSchedulingInfoD.ioBoundGroupId);
		if (ProcGroupInfoP[groupid].countdown == 0) {
		    /* -----------------------
		     * this means that this group has actually finished
		     * go down to process the group
		     * -----------------------
		     */
		   }
		else {
		    /* ------------------------
		     * otherwise, go to P_Finished()
		     * ------------------------
		     */
		    goto MasterWait;
		  }
	  }
       SLAVE1_elog(DEBUG, "master woken up by finished process group %d", 
		   groupid);
       fragment = ProcGroupLocalInfoP[groupid].fragment;
       if (fragment == MasterSchedulingInfoD.ioBoundFrag) {
	   MasterSchedulingInfoD.ioBoundFrag = NULL;
	   MasterSchedulingInfoD.ioBoundGroupId = -1;
	 }
       else if (fragment == MasterSchedulingInfoD.cpuBoundFrag) {
	   MasterSchedulingInfoD.cpuBoundFrag = NULL;
	   MasterSchedulingInfoD.cpuBoundGroupId = -1;
	 }
       nparallel = get_frag_parallel(fragment);
       plan = get_frag_root(fragment);
       parentPlan = get_frag_parent_op(fragment);
       parentFragment = (Fragment)get_frag_parent_frag(fragment);
       /* ---------------
	* delete the finished fragment from the subtree list of its
	* parent fragment
	* ---------------
	*/
       if (parentFragment == NULL)
	  subtrees = LispNil;
       else {
	  subtrees = get_frag_subtrees(parentFragment);
	  set_frag_subtrees(parentFragment,
			    nLispRemove(subtrees, (LispValue)fragment));
	 }
       /* ----------------
	* let the parent fragment know where the result is
	* ----------------
	*/
       if (ExecIsHash(plan)) {
	   /* ----------------
	    *  if it is hashjoin, let the parent know where the hash table is
	    * ----------------
	    */
	   set_hashjointable((HashJoin)parentPlan, hashtable);
	   set_hashjointablekey((HashJoin)parentPlan, hashTableMemoryKey);
	   set_hashjointablesize((HashJoin)parentPlan, hashTableMemorySize);
	   set_hashdone((HashJoin)parentPlan, true);
	  }
       else {
	   List unionplans = LispNil;
	   /* ------------------
	    * if it is ScanTemps node, clean up the temporary relations
	    * they are not needed any more
	    * ------------------
	    */
	   if (ExecIsScanTemps(plan)) {
	       Relation tempreldesc;
	       List	tempRelDescs;
	       LispValue y;

	       tempRelDescs = get_temprelDescs((ScanTemps)plan);
	       foreach (y, tempRelDescs) {
		   tempreldesc = (Relation)CAR(y);
		   ReleaseTmpRelBuffers(tempreldesc);
		   if (FileNameUnlink(
			  relpath((char*)&(tempreldesc->rd_rel->relname))) < 0)
		       elog(WARN, "ExecEndScanTemp: unlink: %m");
		}
	     }
	   if (parentPlan == NULL /* WWW && nparallel == 1 */)
	      /* in this case the whole plan has been finished */
	      fraglist = nLispRemove(fraglist, (LispValue)fragment);
	   else {
	      /* -----------------
	       *  make a ScanTemps node to let the parent collect the tuples
	       *  from a set of temporary relations
	       * -----------------
	       */
	      tempRelationDescList = 
			ProcGroupLocalInfoP[groupid].resultTmpRelDescList;
	      p = ProcGroupLocalInfoP[groupid].memberProc;
	      for (p = ProcGroupLocalInfoP[groupid].memberProc;
		   p != NULL;
		   p = p->next) {
		 shmTempRelationDesc = SlaveInfoP[p->pid].resultTmpRelDesc;
#ifndef PALLOC_DEBUG		     
		 tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
						     palloc);
#else
		 tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
						     palloc_debug);
#endif PALLOC_DEBUG		     
		 tempRelationDescList = nappend1(tempRelationDescList, 
						 (LispValue)tempRelationDesc);
		 }
	      scantempNode = RMakeScanTemps();
	      set_qptargetlist((Plan)scantempNode, get_qptargetlist(plan));
	      set_temprelDescs(scantempNode, tempRelationDescList);
	      if (parentPlan == NULL) {
		 set_frag_root(fragment, (Plan)scantempNode);
		 set_frag_subtrees(fragment, LispNil);
		 set_fragment((Plan)scantempNode,-1);
					    /*means end of parallelism */
		 set_frag_is_inprocess(fragment, false);
		 set_frag_iorate(fragment, 0.0);
		}
	      else {
	      if (plan == (Plan)get_lefttree(parentPlan)) {
		 set_lefttree(parentPlan, (PlanPtr)scantempNode);
		}
	      else {
		 set_righttree(parentPlan, (PlanPtr)scantempNode);
		}
	      set_fragment((Plan)scantempNode, get_fragment(parentPlan));
	      }
	   }
         }
       /* -----------------
	*  free shared memory
	*  free the finished processed group
	* -----------------
	*/
       ProcGroupSMClean(groupid);
       freeProcGroup(groupid);
     }
}
@


1.33
log
@compute target list correctly
@
text
@d14 1
a14 1
 *	$Header: /private/mer/pg/src/tcop/RCS/pfrag.c,v 1.32 1992/06/28 03:48:22 mao Exp mer $
d45 1
a45 1
 RcsId("$Header: /private/mer/pg/src/tcop/RCS/pfrag.c,v 1.32 1992/06/28 03:48:22 mao Exp mer $");
d420 1
a420 1
	    if (strcmp(&opname, "=") == 0)
@


1.32
log
@rearrange parse, plan to support postquel function invocations
@
text
@d14 1
a14 1
 *	$Header: /private/mao/postgres/src/tcop/RCS/pfrag.c,v 1.31 1992/06/18 06:04:44 hong Exp mao $
d45 1
a45 1
 RcsId("$Header: /private/mao/postgres/src/tcop/RCS/pfrag.c,v 1.31 1992/06/18 06:04:44 hong Exp mao $");
d925 1
a925 1
    natts = length(targetList);
@


1.31
log
@take care of a compiler warning
@
text
@d14 1
a14 1
 *	$Header: /mnt/hong/postgres/src/tcop/RCS/pfrag.c,v 1.30 1992/06/16 21:29:18 hong Exp hong $
d45 1
a45 1
 RcsId("$Header: /mnt/hong/postgres/src/tcop/RCS/pfrag.c,v 1.30 1992/06/16 21:29:18 hong Exp hong $");
d1025 4
a1028 1
	   fragQueryDesc = CreateQueryDesc(parsetree, plan, dest);
@


1.30
log
@to get rid of a compiler complaint
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.29 92/03/31 23:12:23 mer Exp $
d45 1
a45 1
 RcsId("$Header: RCS/pfrag.c,v 1.29 92/03/31 23:12:23 mer Exp $");
a127 2
    default:
	return DEFPERTUPTIME;
d129 1
a129 1
    return 0.0;
@


1.29
log
@change accessor functions into macros
@
text
@d14 1
a14 1
 *	$Header: /users/mer/pg/src/tcop/RCS/pfrag.c,v 1.28 1992/02/07 10:50:35 hong Exp mer $
d45 1
a45 1
 RcsId("$Header: /users/mer/pg/src/tcop/RCS/pfrag.c,v 1.28 1992/02/07 10:50:35 hong Exp mer $");
d131 1
@


1.28
log
@fixed prototyping warnings
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.27 92/01/21 16:43:58 hong Exp $
d45 1
a45 1
 RcsId("$Header: RCS/pfrag.c,v 1.27 92/01/21 16:43:58 hong Exp $");
d79 1
a79 1
          set_frag_root(fragment, get_lefttree(node));
d98 1
a98 1
          set_frag_root(fragment, get_righttree(node));
d229 1
a229 1
	      set_frag_parent_frag(frag, fragment);
d993 1
a993 1
	   parentFragment = get_frag_parent_frag(fragment);
d1196 1
a1196 1
       parentFragment = get_frag_parent_frag(fragment);
d1282 2
a1283 2
	      if (plan == get_lefttree(parentPlan)) {
		 set_lefttree(parentPlan, (Plan)scantempNode);
d1286 1
a1286 1
		 set_righttree(parentPlan, (Plan)scantempNode);
@


1.27
log
@changes for inter-fragment parallelism
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.26 91/11/17 21:06:36 mer Exp Locker: hong $
d45 1
a45 1
 RcsId("$Header: RCS/pfrag.c,v 1.26 91/11/17 21:06:36 mer Exp Locker: hong $");
a130 1
    return 0.0;
d446 1
a446 1
	return lispCons(frag, LispNil);
d449 1
a449 1
	return(nconc(lispCons(frag, LispNil), ioboundlist));
d452 1
a452 1
	return(nconc(lispCons(f, LispNil), 
d473 1
a473 1
	return lispCons(frag, LispNil);
d476 1
a476 1
	return(nconc(lispCons(frag, LispNil), cpuboundlist));
d479 1
a479 1
	return(nconc(lispCons(f, LispNil), 
d513 2
a514 1
	    *unparallelizablelist = nappend1(*unparallelizablelist, f);
d517 1
a517 1
	    *presetlist = nappend1(*presetlist, f);
d735 1
a735 1
	fireFragmentList = lispCons(f, LispNil);
d766 1
a766 1
	    SetParallelDegree((y=lispCons(f, LispNil)), parallel);
d812 1
a812 1
		fireFragmentList = lispCons(f2, LispNil);
d822 1
a822 1
		fireFragmentList = lispCons(f2, LispNil);
d840 1
a840 1
		fireFragmentList = lispCons(f1, LispNil);
d850 1
a850 1
		fireFragmentList = lispCons(f1, LispNil);
d861 1
a861 1
	    SetParallelDegree((y=lispCons(f1, LispNil)), x1);
d863 1
a863 1
	    SetParallelDegree((y=lispCons(f2, LispNil)), x2);
d875 1
a875 1
	fireFragmentList = lispCons(f2, LispNil);
d884 1
a884 1
	fireFragmentList = lispCons(f1, LispNil);
@


1.26
log
@prototyping
@
text
@d14 1
a14 1
 *	$Header: /users/mer/postgres/src/tcop/RCS/pfrag.c,v 1.25 1991/11/11 22:59:33 hong Exp $
d19 1
d45 1
a45 1
 RcsId("$Header: /users/mer/postgres/src/tcop/RCS/pfrag.c,v 1.25 1991/11/11 22:59:33 hong Exp $");
d48 3
d85 1
d104 1
d115 72
d217 1
d236 1
d395 7
a401 1
bool
d431 6
a436 8
/* ----------------------------------------------------------------
 *	ParallelOptimize
 *	
 *	this analyzes the plan in the query descriptor and determines
 *	which fragments to execute based on available virtual
 *	memory resources...
 *	
 * ----------------------------------------------------------------
d439 3
a441 2
ParallelOptimize(fragmentlist)
List fragmentlist;
d444 1
a444 10
    Fragment fragment;
    int memAvail;
    float loadAvg;
    List fireFragments;
    List fireFragmentList;
    int nfreeslaves;
    List parsetree;
    LispValue k;
    int parallel;
    Plan plan;
d446 69
a514 8
    fireFragmentList = LispNil;
    nfreeslaves = NumberOfFreeSlaves;
    foreach (x, fragmentlist) {
	fragment = (Fragment)CAR(x);
	plan = get_frag_root(fragment);
	if (!plan_is_parallelizable(plan)) {
	    parallel = 1;
	    elog(NOTICE, "nonparallelizable fragment, running sequentially\n");
d516 3
d520 3
a522 7
	    parsetree = get_frag_parsetree(fragment);
	    k = parse_parallel(parsetree);
	    if (lispNullp(k)) {
		if (lispNullp(CDR(x)))
		    parallel = nfreeslaves;
		else
		    parallel = 1;
d525 1
a525 3
		parallel = CInteger(k);
		if (parallel > nfreeslaves || parallel == 0)
		    parallel = nfreeslaves;
a527 7
        memAvail = GetCurrentMemSize();
        loadAvg = GetCurrentLoadAverage();
        fireFragments = ChooseFragments(fragment, memAvail);
        SetParallelDegree(fireFragments, parallel);
	nfreeslaves -= parallel;
	if (parallel > 0)
	    fireFragmentList = nconc(fireFragmentList, fireFragments);
a528 1
    return fireFragmentList;
d531 16
a546 2
#define MINHASHTABLEMEMORYKEY	1000
static IpcMemoryKey nextHashTableMemoryKey = 0;
d548 10
a557 2
/* -------------------------
 *	getNextHashTableMemoryKey
d559 3
a561 2
 *	get the next hash table key
 * -------------------------
d563 3
a565 2
static IpcMemoryKey
getNextHashTableMemoryKey()
d567 11
a577 2
    extern int MasterPid;
    return (nextHashTableMemoryKey++ + MINHASHTABLEMEMORYKEY + MasterPid);
d580 2
a581 2
/* ------------------------
 *	sizeofTmpRelDesc
d583 4
a586 2
 *	calculate the size of reldesc of the temporary relation of plan
 * ------------------------
d589 4
a592 2
sizeofTmpRelDesc(plan)
Plan plan;
d594 2
a595 3
    List targetList;
    int natts;
    int size;
d597 8
a604 13
    targetList = get_qptargetlist(plan);
    natts = length(targetList);
    /* ------------------
     * see CopyRelDescUsing() in lib/C/copyfuncs.c if you want to know
     * how size if derived.
     * ------------------
     */
    size = sizeof(RelationData) + (natts - 1) * sizeof(TupleDescriptorData) +
	   sizeof(RuleLock) + sizeof(RelationTupleFormD) +
	   sizeof(LockInfoData) + 
	   natts * (sizeof(AttributeTupleFormData) + sizeof(RuleLock)) +
	   48; /* some extra for possible LONGALIGN() */
    return size;
d615 1
a615 1
AdjustParallelism(pardelta, notgroupid)
d617 1
a617 1
int notgroupid; /* do not adjust this proc group */
d619 1
a619 1
    int i, j;
d627 3
a629 19
    for (i=0; i<GetNumberSlaveBackends(); i++) {
	/* ---------------
	 * for now we only adjust the parallelism of the
	 * first fragment in process, will become
	 * more elaborate later.
	 * ---------------
	 */
	if (i != notgroupid &&
	    ProcGroupInfoP[i].status == WORKING &&
	    get_fragment(QdGetPlan(ProcGroupInfoP[i].queryDesc)) >= 0)
	    break;
      };
    if (i == GetNumberSlaveBackends()) {
	SLAVE_elog(DEBUG, "master finds no fragment to adjust parallelism to");
	return;
      }
    SLAVE1_elog(DEBUG, "master sending signal to process group %d", i);
    signalProcGroup(i, SIGPARADJ);
    max_curpage = getProcGroupMaxPage(i);
d631 1
a631 1
    oldnproc = ProcGroupInfoP[i].nprocess;
d639 2
a640 2
	ProcGroupInfoP[i].paradjpage = NOPARADJ;
        OneSignalM(&(ProcGroupInfoP[i].m1lock), oldnproc);
d643 1
a643 1
    ProcGroupInfoP[i].paradjpage = max_curpage + 1; 
d646 4
a649 3
        ProcGroupInfoP[i].nprocess += pardelta;
        ProcGroupInfoP[i].scounter.count = ProcGroupInfoP[i].nprocess;
        ProcGroupInfoP[i].newparallel = ProcGroupInfoP[i].nprocess;
d652 8
a659 7
	            ProcGroupInfoP[i].paradjpage,ProcGroupInfoP[i].newparallel);
        OneSignalM(&(ProcGroupInfoP[i].m1lock), oldnproc);
        set_frag_parallel(ProcGroupLocalInfoP[i].fragment,
		          get_frag_parallel(ProcGroupLocalInfoP[i].fragment)+
		          pardelta);
        ProcGroupSMBeginAlloc(i);
        size = sizeofTmpRelDesc(QdGetPlan(ProcGroupInfoP[i].queryDesc));
d667 1
a667 1
			slave, i);
d670 1
a670 1
            addSlaveToProcGroup(slave, i, oldnproc+j);
d676 2
a677 1
        ProcGroupInfoP[i].newparallel = ProcGroupInfoP[i].nprocess + pardelta;
d680 4
a683 3
	            ProcGroupInfoP[i].paradjpage,ProcGroupInfoP[i].newparallel);
	ProcGroupInfoP[i].dropoutcounter.count = -pardelta;
        OneSignalM(&(ProcGroupInfoP[i].m1lock), oldnproc);
d688 253
d996 1
d1016 1
d1019 1
d1033 4
a1078 2
	* ------------
	*/
d1082 2
d1113 1
d1161 6
a1166 1
		AdjustParallelism(nfreeslave, groupid);
d1185 8
d1244 1
a1244 1
	   if (parentPlan == NULL && nparallel == 1)
d1279 1
@


1.25
log
@for prototyping
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.24 91/11/09 09:51:00 hong Exp $
d44 1
a44 1
 RcsId("$Header: RCS/pfrag.c,v 1.24 91/11/09 09:51:00 hong Exp $");
d81 1
a81 1
          subFragments = nappend1(subFragments, fragment);
d99 1
a99 1
          subFragments = nappend1(subFragments, fragment);
d140 1
a140 1
    fragmentlist = lispCons(rootFragment, LispNil);
d204 1
a204 1
       return lispCons(fragments, LispNil);
d730 1
a730 1
						    tempRelationDesc);
d787 2
a788 1
	  set_frag_subtrees(parentFragment, nLispRemove(subtrees, fragment));
d827 1
a827 1
	      fraglist = nLispRemove(fraglist, fragment);
d849 1
a849 1
						 tempRelationDesc);
@


1.24
log
@added mechanism for reducing parallelism of a fragment
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.23 91/10/09 14:53:29 hong Exp $
d18 2
a19 1
#include "tcop/tcop.h"
a22 1
#include "nodes/execnodes.h"
d25 1
a26 1
#include "nodes/plannodes.a.h"
d31 2
d37 6
d44 1
a44 1
 RcsId("$Header: RCS/pfrag.c,v 1.23 91/10/09 14:53:29 hong Exp $");
a45 4
extern ScanTemps RMakeScanTemps();
extern Fragment RMakeFragment();
extern Relation CopyRelDescUsing();

d57 1
a57 1
List
d168 1
a168 1
int
d180 1
a180 1
float
d193 1
a193 1
List
d219 1
a219 1
List
d234 1
a234 1
List
d249 1
a249 1
List
d264 1
a264 1
List
d291 1
a291 1
void
d330 1
a330 1
	indxqual = get_indxqual(plan);
d354 1
a354 1
List
d414 1
a414 1
IpcMemoryKey
d457 1
a457 1
void
d530 1
a530 1
        ProcGroupSMEndAlloc(i);
d605 1
a605 1
	      set_hashtablekey(plan, hashTableMemoryKey);
d607 2
a608 2
	      set_hashtable(plan, hashtable);
	      hashTableMemorySize = get_hashtablesize(plan);
d636 1
a636 1
			CopyObjectUsing(fragQueryDesc, ProcGroupSMAlloc);
d798 4
a801 4
	   set_hashjointable(parentPlan, hashtable);
	   set_hashjointablekey(parentPlan, hashTableMemoryKey);
	   set_hashjointablesize(parentPlan, hashTableMemorySize);
	   set_hashdone(parentPlan, true);
d815 1
a815 1
	       tempRelDescs = get_temprelDescs(plan);
d820 1
a820 1
			  relpath(&(tempreldesc->rd_rel->relname))) < 0)
d851 1
a851 1
	      set_qptargetlist(scantempNode, get_qptargetlist(plan));
d854 1
a854 1
		 set_frag_root(fragment, scantempNode);
d856 2
a857 1
		 set_fragment(scantempNode,-1);/*means end of parallelism */
d861 2
a862 2
	      if (plan == (Plan)get_lefttree(parentPlan)) {
		 set_lefttree(parentPlan, scantempNode);
d865 1
a865 1
		 set_righttree(parentPlan, scantempNode);
d867 1
a867 1
	      set_fragment(scantempNode, get_fragment(parentPlan));
@


1.23
log
@fixed a bug in parallelism adjustment
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.22 91/10/08 01:13:48 hong Exp $
d36 1
a36 1
 RcsId("$Header: RCS/pfrag.c,v 1.22 91/10/08 01:13:48 hong Exp $");
d454 3
a456 1
AdjustParallelism()
a462 2
    int nfree;
    extern MasterCommunicationData *MasterDataP;
d464 1
d473 2
a474 1
	if (ProcGroupInfoP[i].status == WORKING &&
d494 2
a495 2
        MasterDataP->data[0] = NOPARADJ;
        OneSignalM(&(MasterDataP->m1lock), oldnproc);
d498 29
a526 19
    MasterDataP->data[0] = max_curpage + 1; /* page on which to adjust par. */
    ProcGroupInfoP[i].nprocess += NumberOfFreeSlaves;
    ProcGroupInfoP[i].countdown = ProcGroupInfoP[i].nprocess;
    MasterDataP->data[1] = ProcGroupInfoP[i].nprocess;
    SLAVE2_elog(DEBUG,"master signals waiting slaves with adjpage=%d,newpar=%d",
	        MasterDataP->data[0], MasterDataP->data[1]);
    OneSignalM(&(MasterDataP->m1lock), oldnproc);
    set_frag_parallel(ProcGroupLocalInfoP[i].fragment,
		      get_frag_parallel(ProcGroupLocalInfoP[i].fragment)+
		      NumberOfFreeSlaves);
    ProcGroupSMBeginAlloc(i);
    size = sizeofTmpRelDesc(QdGetPlan(ProcGroupInfoP[i].queryDesc));
    nfree = NumberOfFreeSlaves;
    for (j=0; j<nfree; j++) {
	slave = getFreeSlave();
	SLAVE2_elog(DEBUG, "master adding slave %d to procgroup %d", slave, i);
        SlaveInfoP[slave].resultTmpRelDesc = (Relation)ProcGroupSMAlloc(size);
        addSlaveToProcGroup(slave, i, oldnproc+j);
	V_Start(slave);
d528 8
a535 1
    ProcGroupSMEndAlloc(i);
d641 1
a641 1
	   ProcGroupInfoP[groupid].countdown = nparallel;
d673 1
a673 1
	    AdjustParallelism();
d680 1
d689 80
a768 1
       SLAVE1_elog(DEBUG, "master woken up by process group %d", groupid);
d829 2
a830 1
	      tempRelationDescList = LispNil;
@


1.22
log
@bug fixing for inter-query parallelism and dynamic parallelism adjustment
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.21 91/08/25 13:35:03 hong Exp Locker: hong $
d36 1
a36 1
 RcsId("$Header: RCS/pfrag.c,v 1.21 91/08/25 13:35:03 hong Exp Locker: hong $");
d484 12
a496 1
    oldnproc = ProcGroupInfoP[i].nprocess;
@


1.21
log
@added supports for dynamic parallelism adjustment
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.20 91/08/18 12:33:44 hong Exp Locker: hong $
d36 1
a36 1
 RcsId("$Header: RCS/pfrag.c,v 1.20 91/08/18 12:33:44 hong Exp Locker: hong $");
d42 1
a42 1
int AdjustParallelismEnabled = 0;
d456 1
a456 1
    int i;
d459 3
a461 1
    int m;
d464 1
d472 2
a473 1
	if (ProcGroupInfoP[i].status == WORKING)
d476 5
d483 1
d485 1
a485 1
    m = ProcGroupInfoP[i].nprocess;
d487 1
d489 3
a491 1
    OneSignalM(&(MasterDataP->m1lock), m);
d495 4
a498 1
    for (i=0; i<NumberOfFreeSlaves; i++) {
d500 3
a502 2
	SlaveInfoP[slave].isAddOnSlave = true;
        addSlaveToProcGroup(slave, i);
d505 1
a555 9
	/* ------------
	 * if there are extra processors lying around,
	 * dynamically adjust degrees of parallelism of
	 * fragments that are already in process.
	 * ------------
	 */
	if (lispNullp(currentFragmentList) && NumberOfFreeSlaves > 0 &&
	    AdjustParallelismEnabled)
	    AdjustParallelism();
d577 1
a577 1
	   else if (parentFragment != NULL || nparallel > 1) {
d613 14
d636 10
d658 1
@


1.20
log
@fixed a stupid bug
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.19 91/08/08 15:39:11 hong Exp Locker: hong $
d34 1
d36 1
a36 1
 RcsId("$Header: RCS/pfrag.c,v 1.19 91/08/08 15:39:11 hong Exp Locker: hong $");
d42 2
d65 1
a65 1
       if (ExecIsHash(get_lefttree(node)) || ExecIsSort(get_lefttree(node))) {
a280 29
/* -----------------------------
 *	set_plan_parallel
 *
 *	set the degree of parallelism for each node in plan
 * -----------------------------
 */
void
set_plan_parallel(plan, nparallel)
Plan plan;
int nparallel;
{
    if (plan == NULL)
       return;
    if (ExecIsNestLoop(plan))  {
       /* --------------------
	* inner path of nestloop join plan should have parallelism 1
	* --------------------
	*/
       set_parallel(plan, nparallel);
       set_plan_parallel(get_outerPlan(plan), nparallel);
       set_plan_parallel(get_innerPlan(plan), 1);
      }
    else  {
       set_parallel(plan, nparallel);
       set_plan_parallel(get_lefttree(plan), nparallel);
       set_plan_parallel(get_righttree(plan), nparallel);
      }
}

a303 1
           set_plan_parallel(plan, 1);
a306 1
           set_plan_parallel(plan, nfreeslaves);
d311 29
d364 1
d370 4
a373 7
	parsetree = get_frag_parsetree(fragment);
	k = parse_parallel(parsetree);
	if (lispNullp(k)) {
	    if (lispNullp(CDR(x)))
		parallel = nfreeslaves;
	    else
	        parallel = 1;
d376 13
a388 3
	    parallel = CInteger(k);
	    if (parallel > nfreeslaves || parallel == 0)
		parallel = nfreeslaves;
d442 1
a442 1
	   12; /* some extra for possible LONGALIGN() */
d446 44
d528 1
d538 9
d594 1
d599 1
a599 1
		 (Relation)ProcGroupSMAlloc(sizeofTmpRelDesc(plan));
d603 1
@


1.19
log
@more support for inter-fragment inter-query parallelism
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.18 91/08/02 19:00:46 hong Exp Locker: kemnitz $
d35 1
a35 1
 RcsId("$Header: RCS/pfrag.c,v 1.18 91/08/02 19:00:46 hong Exp Locker: kemnitz $");
d643 1
@


1.18
log
@minor bug fixes and added some more comments
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.17 91/07/24 16:13:40 hong Exp Locker: hong $
d35 1
a35 1
 RcsId("$Header: RCS/pfrag.c,v 1.17 91/07/24 16:13:40 hong Exp Locker: hong $");
d73 1
d91 1
d131 1
d195 2
a196 1
    if (lispNullp(get_frag_subtrees(fragments)))
d267 1
d359 4
d365 1
d368 13
d384 4
a387 4
        SetParallelDegree(fireFragments, NumberOfFreeSlaves);
	fireFragmentList = fireFragments;
	break;
	fireFragmentList = nconc(fireFragmentList, fireFragments);
d540 1
@


1.17
log
@changes to use the new shared memory manager for parallel executor
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.16 91/07/17 23:45:03 hong Exp Locker: hong $
d32 1
a32 1
#include "utils/lmgr.h"
d35 1
a35 1
 RcsId("$Header: RCS/pfrag.c,v 1.16 91/07/17 23:45:03 hong Exp Locker: hong $");
d41 9
d63 4
d76 1
a76 1
          subFragments = FindFragments(get_lefttree(node), fragmentNo);
d80 4
d93 1
a93 1
         newFragments = FindFragments(get_righttree(node), fragmentNo);
d100 8
d111 1
d152 6
d164 6
d176 7
d202 6
d216 7
d231 7
d247 6
d273 6
d287 4
d302 6
d362 2
d372 6
d385 6
@


1.16
log
@changes to support inter-fragment and inter-query parallelism in parallel
executor, non-parallel processing remain the same as before
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.15 91/06/18 23:28:36 cimarron Exp $
d32 2
d35 1
a35 1
 RcsId("$Header: RCS/pfrag.c,v 1.15 91/06/18 23:28:36 cimarron Exp $");
d290 23
d404 1
d406 8
a413 1
			CopyObjectUsing(fragQueryDesc, ExecSMAlloc);
d531 1
d535 1
a537 6
	    
	/* ----------------
	 *	Clean Shared Memory used during the query
	 * ----------------
	 */
	ExecSMClean();
@


1.15
log
@reorganized executor to use tuple table properly for nested dot stuff
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.14 91/04/18 11:38:42 hong Exp Locker: cimarron $
d20 1
d24 1
d33 1
a33 1
 RcsId("$Header: RCS/pfrag.c,v 1.14 91/04/18 11:38:42 hong Exp Locker: cimarron $");
a34 11
/* ----------------------------------------------------------------
 *	ExecuteFragments
 *
 *	Read the comments for ProcessQuery() below...
 *
 *	Since we do no parallism, planFragments is totally
 *	ignored for now.. -cim 9/18/89
 * ----------------------------------------------------------------
 */
extern Pointer *SlaveQueryDescsP;
extern Pointer *SlaveRetStateP;
a38 204
Fragment
ExecuteFragments(queryDesc, fragmentlist, rootFragment)
List 	queryDesc;
List 	fragmentlist;
Fragment rootFragment;
{
    int			nparallel;
    int			i;
    int			nproc;
    int			nfrag;
    LispValue		x;
    Fragment		fragment;
    Plan		plan;
    Plan		parentPlan;
    List		parseTree;
    HashJoinTable	hashtable;
    List		subtrees;
    Fragment		parentFragment;
    List		fragQueryDesc;
    ScanTemps		scantempNode;
    Relation		tempRelationDesc;
    List		tempRelationDescList;
    Relation		shmTempRelationDesc;
    List		finalResultRelation;
    int			hashTableMemorySize;
    IpcMemoryKey	hashTableMemoryKey;
    IpcMemoryKey	getNextHashTableMemoryKey();
    CommandDest		dest;
    
    /* ----------------
     *	execute the query appropriately if we are running one or
     *  several backend processes.
     * ----------------
     */
    
    if (! ParallelExecutorEnabled()) {
	/* ----------------
	 *   single-backend case. just execute the query
	 *   and return NULL.
	 * ----------------
	 */
	ProcessQueryDesc(queryDesc);
	return NULL;
    } else {
	/* ----------------
	 *   parallel-backend case.  place each plan fragment
	 *   in shared memory and signal slave-backend execution.
	 *   When slave execution is completed, form a new plan
	 *   representing the query with some of the work materialized
	 *   and return this to ProcessQuery().
	 *
	 * ----------------
	 */

	nproc = 0;
	nfrag = 0;
	foreach (x, fragmentlist) {
	   fragment = (Fragment)CAR(x);
	   nparallel = get_frag_parallel(fragment);
	   plan = get_frag_root(fragment);
	   parseTree = QdGetParseTree(queryDesc);
	   dest = QdGetDest(queryDesc);
	   finalResultRelation = parse_tree_result_relation(parseTree);
	   if (ExecIsHash(plan))  {
	      hashTableMemoryKey = getNextHashTableMemoryKey();
	      set_hashtablekey(plan, hashTableMemoryKey);
	      hashtable = ExecHashTableCreate(plan);
	      set_hashtable(plan, hashtable);
	      hashTableMemorySize = get_hashtablesize(plan);
	      parse_tree_result_relation(parseTree) = LispNil;
	      }
	   else if (fragment != rootFragment || nparallel > 1) {
	      parse_tree_result_relation(parseTree) =
		  lispCons(lispAtom("intotemp"), LispNil);
	      dest = None;
	     }
	   fragQueryDesc = CreateQueryDesc(parseTree, plan, dest);

	/* ----------------
	 *	place fragments in shared memory here.  
	 *	each fragment only put one copy of the its
	 *	querydesc in the shared memory.  each slave process
	 *	that executes this fragment will have to
	 *	make a local copy of the querydesc to work on.
	 * ----------------
	 */
	   SlaveQueryDescsP[nfrag] = (Pointer)
	   		CopyObjectUsing(fragQueryDesc, ExecSMAlloc);
	   nproc += nparallel;
	   nfrag++;
	  }

	parse_tree_result_relation(parseTree) = finalResultRelation;

	/* ----------------
	 *	signal slave execution start
	 * ----------------
	 */
	for (i=1; i<nproc; i++) {
	    SLAVE1_elog(DEBUG, "Master Backend: signaling slave %d", i);
	    V_Start(i);
	}

	fragQueryDesc = (List)CopyObject((List)SlaveQueryDescsP[0]);
	ProcessQueryDesc(fragQueryDesc);

	/* ----------------
	 *	wait for slaves to complete execution
	 * ----------------
	 */
	SLAVE_elog(DEBUG, "Master Backend: waiting for slaves...");
	P_Finished(nproc - 1);
	SLAVE_elog(DEBUG, "Master Backend: slaves execution complete!");

	nproc = 0;
	foreach (x, fragmentlist) {
	   fragment = (Fragment)CAR(x);
	   nparallel = get_frag_parallel(fragment);
	   plan = get_frag_root(fragment);
	   parentPlan = get_frag_parent_op(fragment);
	   parentFragment = get_frag_parent_frag(fragment);
	   if (parentFragment == NULL)
	      subtrees = LispNil;
	   else {
	      subtrees = get_frag_subtrees(parentFragment);
	      set_frag_subtrees(parentFragment, set_difference(subtrees,
	                                     lispCons(fragment, LispNil)));
	    }
	   if (ExecIsHash(plan)) {
	       set_hashjointable(parentPlan, hashtable);
	       set_hashjointablekey(parentPlan, hashTableMemoryKey);
	       set_hashjointablesize(parentPlan, hashTableMemorySize);
	       set_hashdone(parentPlan, true);
	      }
   	   else {
	       List unionplans = LispNil;
	       if (ExecIsScanTemps(plan)) {
		   Relation tempreldesc;
		   List	tempRelDescs;
		   LispValue y;

		   tempRelDescs = get_temprelDescs(plan);
		   foreach (y, tempRelDescs) {
		       tempreldesc = (Relation)CAR(y);
		       ReleaseTmpRelBuffers(tempreldesc);
		       if (FileNameUnlink(
			      relpath(&(tempreldesc->rd_rel->relname))) < 0)
			   elog(WARN, "ExecEndScanTemp: unlink: %m");
		    }
		 }
	       if (parentPlan == NULL && nparallel == 1)
		  rootFragment = NULL;
	       else {
		  tempRelationDescList = LispNil;
		  for (i=0; i<nparallel; i++) {
		     shmTempRelationDesc=get_resultTmpRelDesc(
				(ReturnState)SlaveRetStateP[nproc+i]);
#ifndef PALLOC_DEBUG		     
		     tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
							 palloc);
#else
		     tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
							 palloc_debug);
#endif PALLOC_DEBUG		     
		     tempRelationDescList = nappend1(tempRelationDescList, 
						     tempRelationDesc);
		     }
		  scantempNode = RMakeScanTemps();
		  set_qptargetlist(scantempNode, get_qptargetlist(plan));
		  set_temprelDescs(scantempNode, tempRelationDescList);
		  if (parentPlan == NULL) {
		     set_frag_root(rootFragment, scantempNode);
		     set_frag_subtrees(rootFragment, LispNil);
		     set_fragment(scantempNode,-1);/*means end of parallelism */
		    }
	          else {
	          if (plan == (Plan)get_lefttree(parentPlan)) {
		     set_lefttree(parentPlan, scantempNode);
		    }
	          else {
		     set_righttree(parentPlan, scantempNode);
		    }
	          set_fragment(scantempNode, get_fragment(parentPlan));
	          }
	       }
	   nproc += nparallel;
	 }
       }
	    
	/* ----------------
	 *	Clean Shared Memory used during the query
	 * ----------------
	 */
	ExecSMClean();
	
	/* ----------------
	 *	replace fragments with materialized results and
	 *	return new plan to ProcessQuery.
	 * ----------------
	 */
	return rootFragment;
    }
}

d40 2
a41 1
FindFragments(node, fragmentNo)
d57 1
d70 1
d82 2
a83 2
InitialPlanFragments(originalPlan)
Plan originalPlan;
d95 1
a95 1
    set_frag_root(rootFragment, originalPlan);
d100 1
d109 1
a109 1
	   subFragments = FindFragments(node, fragmentNo++);
d220 1
a220 1
SetParallelDegree(fragmentlist, loadavg)
d222 1
a222 1
float loadavg;
a225 1
    int nslaves;
a228 1
    nslaves = GetNumberSlaveBackends();
d239 2
a240 2
           set_frag_parallel(fragment, nslaves);  /* YYY */
           set_plan_parallel(plan, nslaves);
d256 2
a257 3
ParallelOptimize(queryDesc, planFragments)
List		queryDesc;
Fragment	planFragments;
d259 2
d264 1
d266 10
a275 5
    memAvail = GetCurrentMemSize();
    fireFragments = ChooseFragments(planFragments, memAvail);
    loadAvg = GetCurrentLoadAverage();
    SetParallelDegree(fireFragments, loadAvg);
    return fireFragments;
d284 225
a508 1
    return (nextHashTableMemoryKey++ + MINHASHTABLEMEMORYKEY);
@


1.14
log
@changes to save plan copying in communication between master backend
and slave backend
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.13 91/02/27 16:04:17 cimarron Exp Locker: hong $
d29 1
d31 1
a31 1
 RcsId("$Header: RCS/pfrag.c,v 1.13 91/02/27 16:04:17 cimarron Exp Locker: hong $");
@


1.13
log
@added a fix whem memory debugging
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.12 91/02/27 14:32:03 hong Exp Locker: cimarron $
d30 1
a30 1
 RcsId("$Header: RCS/pfrag.c,v 1.12 91/02/27 14:32:03 hong Exp Locker: cimarron $");
d42 1
d56 1
a60 1
    Plan		shmPlan;
d102 1
d127 4
d133 2
a134 4
	   for (i=0; i<nparallel; i++) {
	      SlaveQueryDescsP[nproc+i] = (Pointer)
	    		CopyObjectUsing(fragQueryDesc, ExecSMAlloc);
	     }
d136 1
d150 2
a151 1
	ProcessQueryDesc((List)SlaveQueryDescsP[0]);
a201 1
		     shmPlan = QdGetPlan((List)SlaveQueryDescsP[nproc+i]);
d203 1
a203 1
					  get_retstate(shmPlan));
@


1.12
log
@to conform with Cim's new destination stuff
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.11 90/11/12 08:16:58 hong Exp Locker: hong $
d30 1
a30 1
 RcsId("$Header: RCS/pfrag.c,v 1.11 90/11/12 08:16:58 hong Exp Locker: hong $");
d199 1
d202 4
@


1.11
log
@now hashjoin only print the result relation schema once
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.10 90/11/10 12:29:23 hong Exp $
d28 1
d30 1
a30 1
 RcsId("$Header: RCS/pfrag.c,v 1.10 90/11/10 12:29:23 hong Exp $");
d73 1
d106 1
d119 1
d121 1
a121 1
	   fragQueryDesc = CreateQueryDesc(parseTree, plan);
@


1.10
log
@changes for parallel hashjoin
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.9 90/11/07 12:06:50 hong Exp Locker: hong $
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.9 90/11/07 12:06:50 hong Exp Locker: hong $");
d111 1
@


1.9
log
@changes for parallel hashjoin.
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.8 90/10/02 14:38:43 hong Exp Locker: hong $
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.8 90/10/02 14:38:43 hong Exp Locker: hong $");
d69 3
d106 2
d110 1
d166 2
d466 8
@


1.8
log
@a fix so retrieve into will work correctly if run in parallel
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.7 90/10/02 12:26:10 hong Exp $
d27 1
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.7 90/10/02 12:26:10 hong Exp $");
d103 2
a104 5
	      int nbatch;
	      /*
	      nbatch = ExecHashPartition(plan);
	      hashtable = ExecHashTableCreate(plan, nbatch);
	      set_hashjointable(plan, hashtable);  YYY */
d159 2
a160 2
	       /* set_hashjointable(parentPlan, hashtable);  */
	       /* YYY more info. needed. */
@


1.7
log
@added comments where more comments are needed
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.6 90/09/18 22:03:44 hong Exp Locker: cimarron $
a19 3

/* XXX had to break up executor.h, otherwise symbol table full */

d28 1
a28 1
 RcsId("$Header: RCS/pfrag.c,v 1.6 90/09/18 22:03:44 hong Exp Locker: cimarron $");
d33 1
a33 2
 *	This is only executed from the Master backend.  Remember the
 *	slave backends have their own special processing loop.
d35 2
a36 6
 *	In the single backend case, this just executes the planned
 *	query.  In the multiple backend case, this divides the query
 *	plan into plan fragments and copys them into shared memory.
 *	Each backend process then goes to work on its fragment.
 *
 *	Read the comments for ProcessQuery() below...
d46 3
a48 3
    List 	queryDesc;
    List 	fragmentlist;
    Fragment	rootFragment;
d67 1
a82 1
	
d86 1
a86 3
	 *   in shared memory, signal slave-backend execution and
	 *   go to work on our fragment.
	 *
d90 1
d96 17
d114 10
a123 27
	    /* ----------------
	     *	XXX what are we doing here? (comment me)
	     * ----------------
	     */
	    fragment = 	(Fragment) CAR(x);
	    nparallel = get_frag_parallel(fragment);
	    plan = 	get_frag_root(fragment);
	    parseTree = QdGetParseTree(queryDesc);
	    
	    parse_tree_result_relation(parseTree) = LispNil;
	    
	    /* ----------------
	     *	XXX what are we doing here? (comment me)
	     *  XXX why are we treating hash nodes specially here?
	     * ----------------
	     */
	    if (ExecIsHash(plan))  {
		int nbatch;
		nbatch =    ExecHashPartition(plan);
		hashtable = (HashJoinTable) ExecHashTableCreate(plan, nbatch);
		/* set_hashjointable(plan, hashtable);  YYY */
	    }
	    else if (fragment != rootFragment || nparallel > 1) {
		parse_tree_result_relation(parseTree) =
		    lispCons(lispAtom("intotemp"), LispNil);
	    }
	    fragQueryDesc = CreateQueryDesc(parseTree, plan);
d125 1
a125 10
	    /* ----------------
	     *	place fragments in shared memory here.  
	     * ----------------
	     */
	    for (i=0; i<nparallel; i++) {
		SlaveQueryDescsP[nproc+i] = (Pointer)
		    CopyObjectUsing(fragQueryDesc, ExecSMAlloc);
	    }
	    nproc += nparallel;
       }
d128 1
a128 3
	 *  all the fragments are in shared memory now.. 
	 *  signal slave execution start, and then go to work
	 *  ourselves.
d139 1
a139 1
	 *  now that we're done, let's wait for slaves to complete execution
a145 4
	/* ----------------
	 *	XXX what are we doing here? (comment me)
	 * ----------------
	 */
d148 22
a169 14
	    fragment = 		(Fragment) CAR(x);
	    nparallel = 		get_frag_parallel(fragment);
	    plan = 		get_frag_root(fragment);
	    parentPlan = 	get_frag_parent_op(fragment);
	    parentFragment = get_frag_parent_frag(fragment);
	   
	    if (parentFragment == NULL)
		subtrees = LispNil;
	    else {
		subtrees = get_frag_subtrees(parentFragment);
		set_frag_subtrees(parentFragment,
				  set_difference(subtrees,
						 lispCons(fragment, LispNil)));
	   }
d171 5
a175 26
	    /* ----------------
	     *	XXX what are we doing here? (comment me)
	     *  XXX why are hash nodes treated special?
	     * ----------------
	     */
	    if (ExecIsHash(plan)) {
		/* set_hashjointable(parentPlan, hashtable);  */
		/* YYY more info. needed. */
	    } else {
		List unionplans = LispNil;
	       
		/* ----------------
		 *	XXX what are we doing here? (comment me)
		 *	XXX why are we treating ScanTemp nodes special?
		 * ----------------
		 */
		if (ExecIsScanTemps(plan)) {
		    Relation 	tempreldesc;
		    List	tempRelDescs;
		    LispValue 	y;

		    tempRelDescs = get_temprelDescs(plan);
		    foreach (y, tempRelDescs) {
			tempreldesc = (Relation)CAR(y);
			ReleaseTmpRelBuffers(tempreldesc);
			if (FileNameUnlink(
d177 1
a177 1
			    elog(WARN, "ExecuteFragments: unlink: %m");
d179 21
a199 19
		}
		
		/* ----------------
		 *	XXX what are we doing here? (comment me)
		 * ----------------
		 */
		if (parentPlan == NULL && nparallel == 1)
		    rootFragment = NULL;
		else {
		    tempRelationDescList = LispNil;
		    for (i=0; i<nparallel; i++) {
			shmPlan = QdGetPlan((List)SlaveQueryDescsP[nproc+i]);
			shmTempRelationDesc=
			    get_resultTmpRelDesc(get_retstate(shmPlan));

			tempRelationDesc =
			    CopyRelDescUsing(shmTempRelationDesc, palloc);
			tempRelationDescList =
			    nappend1(tempRelationDescList, tempRelationDesc);
d201 3
a203 21
		    scantempNode = RMakeScanTemps();
		    set_qptargetlist(scantempNode, get_qptargetlist(plan));
		    set_temprelDescs(scantempNode, tempRelationDescList);

		    /* ----------------
		     *	XXX what are we doing here? (comment me)
		     *  XXX why are we getting very deep in if statements?
		     * ----------------
		     */
		    if (parentPlan == NULL) {
			set_frag_root(rootFragment, scantempNode);
			set_frag_subtrees(rootFragment, LispNil);
			set_fragment(scantempNode,-1); /*means end of parallelism */
		    } else {
			if (plan == (Plan) get_lefttree(parentPlan)) {
			    set_lefttree(parentPlan, scantempNode);
			}
			else {
			    set_righttree(parentPlan, scantempNode);
			}
			set_fragment(scantempNode, get_fragment(parentPlan));
d205 9
a213 4
		}
		nproc += nparallel;
	    }
	}
a229 6
/* --------------------------------
 *	FindFragments
 *
 *	XXX comment me
 * --------------------------------
 */
d232 2
a233 2
    Plan node;
    int fragmentNo;
a238 5
    /* ----------------
     *	XXX what are we doing here? (comment me)
     *  XXX why are hash/sort nodes special
     * ----------------
     */
d240 8
a247 12
    if (get_lefttree(node) != NULL) {
	if (ExecIsHash(get_lefttree(node)) ||
	    ExecIsSort(get_lefttree(node))) {
	    
	    fragment = RMakeFragment();
	    set_frag_root(fragment, get_lefttree(node));
	    set_frag_parent_op(fragment, node);
	    set_frag_parallel(fragment, 1);
	    set_frag_subtrees(fragment, LispNil);
	    subFragments = nappend1(subFragments, fragment);
        } else {
	    subFragments = FindFragments(get_lefttree(node), fragmentNo);
d249 2
a250 22
    }
    
    /* ----------------
     *	XXX what are we doing here? (comment me)
     *  XXX why are hash/sort nodes in the right tree special?
     *  XXX why is this code practically identical to the previous code?
     * ----------------
     */
    if (get_righttree(node) != NULL) {
	if (ExecIsHash(get_righttree(node)) ||
	    ExecIsSort(get_righttree(node))) {
	    
	    fragment = RMakeFragment();
	    set_frag_root(fragment, get_righttree(node));
	    set_frag_parent_op(fragment, node);
	    set_frag_parallel(fragment, 1);
	    set_frag_subtrees(fragment, LispNil);
	    subFragments = nappend1(subFragments, fragment);
	}
	else {
	    newFragments = FindFragments(get_righttree(node), fragmentNo);
	    subFragments = nconc(subFragments, newFragments);
d252 13
a264 1
    }
a268 6
/* --------------------------------
 *	InitialPlanFragments(originalPlan)
 *
 *	XXX comment me
 * --------------------------------
 */
d271 1
a271 1
    Plan originalPlan;
d273 8
a280 8
    Plan 	node;
    List 	x, y;
    List 	fragmentlist;
    List 	subFragments;
    List 	newFragmentList;
    int 	fragmentNo = 0;
    Fragment 	rootFragment;
    Fragment 	fragment, frag;
a281 4
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */
a282 1
    
a290 4
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */
a292 1
	
d294 10
a303 12
	    fragment = 		(Fragment)CAR(x);
	    node = 		get_frag_root(fragment);
	    
	    subFragments = 	FindFragments(node, fragmentNo++);
	    
	    set_frag_subtrees(fragment, subFragments);
	    foreach (y, subFragments) {
		frag = (Fragment)CAR(y);
		set_frag_parent_frag(frag, fragment);
	    }
	    newFragmentList = nconc(newFragmentList, subFragments);
	}
d305 1
a305 2
    }
    
a308 5
/* --------------------------------
 *	GetCurrentMemSize
 * --------------------------------
 */

d314 1
a314 5
    /* ----------------
     *	XXX comment me (what will be added later???)
     * ----------------
     */
    return NBuffers;  /* YYY functionalities to be added later */
a316 4
/* --------------------------------
 *	GetCurrentLoadAverage
 * --------------------------------
 */
a319 4
    /* ----------------
     *	XXX comment me (what will be added later???)
     * ----------------
     */
a322 4
/* --------------------------------
 *	GetReadyFragments
 * --------------------------------
 */
d327 4
a330 4
    List 	x;
    Fragment 	frag;
    List 	readyFragments = LispNil;
    List 	readyFrags;
a331 4
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */
d333 1
a333 6
	return lispCons(fragments, LispNil);
    
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */
d338 1
a338 2
    }
    
a341 4
/* --------------------------------
 *	GetFitFragments(fragmentlist, memsize)
 * --------------------------------
 */
d344 2
a345 2
    List fragmentlist;
    int memsize;
d347 1
a347 7
    /* YYY functionalities to be added later */
    /* ----------------
     *	XXX comment me (what will be added later???)
     * ----------------
     */
    
    return fragmentlist;
a349 4
/* --------------------------------
 *	DecomposeFragments(fragmentlist, memsize)
 * --------------------------------
 */
d352 2
a353 2
    List fragmentlist;
    int memsize;
d355 1
a355 7
    /* YYY functionalities to be added later */
    /* ----------------
     *	XXX comment me (what will be added later???)
     * ----------------
     */
    
    return fragmentlist; 
a357 6
/* --------------------------------
 *	ChooseToFire
 *
 *	XXX comment me
 * --------------------------------
 */
d360 2
a361 2
    List fragmentlist;
    int memsize;
d363 1
a364 8
    /* ----------------
     *	XXX comment me (what will be added later???)
     * ----------------
     */
    
    return
	lispCons(CAR(fragmentlist), LispNil);   
    
a366 6
/* --------------------------------
 *	ChooseFragments
 *
 *	XXX comment me
 * --------------------------------
 */
d369 2
a370 2
    Fragment fragments;
    int memsize;
a375 4
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */
a377 6
    
    /* ----------------
     *	XXX what are we doing here? (comment me)
     *  XXX what does this have to do with hash joins?
     * ----------------
     */
d381 2
a382 3
	    elog(WARN, "memory below hashjoin threshold.");
    }
    
a383 1
    
a386 6
/* --------------------------------
 *	set_plan_parallel
 *
 *	XXX comment me
 * --------------------------------
 */
d389 2
a390 2
    Plan plan;
    int nparallel;
a393 6

    /* ----------------
     *	XXX what are we doing here? (comment me)
     *  XXX why are we treating nest loops specially
     * ----------------
     */    
d395 9
a403 8
	set_parallel(plan, nparallel);
	set_plan_parallel(get_outerPlan(plan), nparallel);
	set_plan_parallel(get_innerPlan(plan), 1);
    } else  {
	set_parallel(plan, nparallel);
	set_plan_parallel(get_lefttree(plan), nparallel);
	set_plan_parallel(get_righttree(plan), nparallel);
    }
a405 6
/* --------------------------------
 *	SetParallelDegree
 *
 *	XXX comment me
 * --------------------------------
 */
d408 2
a409 2
    List fragmentlist;
    float loadavg;
d411 5
a415 5
    LispValue 	x;
    Fragment 	fragment;
    int 	nslaves;
    Plan 	plan;
    int 	fragmentno;
a416 4
    /* ----------------
     *	XXX what are we doing here? (comment me)
     * ----------------
     */    
a418 1
    
d420 12
a431 16
	fragment = 	(Fragment)CAR(x);
	plan = 		get_frag_root(fragment);
	fragmentno = 	get_fragment(plan);
	
	/* ----------------
	 *	XXX what are we doing here? (comment me)
	 * ----------------
	 */    
	if (fragmentno < 0) {
	    set_frag_parallel(fragment, 1);  /* YYY */
	    set_plan_parallel(plan, 1);
	} else {
	    set_frag_parallel(fragment, nslaves);  /* YYY */
	    set_plan_parallel(plan, nslaves);
	}
    }
d435 1
a435 1
/* --------------------------------
d442 1
a442 1
 * --------------------------------
d446 2
a447 2
    List	queryDesc;
    Fragment	planFragments;
d449 3
a451 3
    int 	memAvail;
    float	loadAvg;
    List 	fireFragments;
d453 3
a455 4
    memAvail = 		GetCurrentMemSize();
    fireFragments = 	ChooseFragments(planFragments, memAvail);
    loadAvg = 		GetCurrentLoadAverage();
    
a456 1
    
@


1.6
log
@cleaned up file interface
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.5 90/09/10 20:43:41 hong Exp Locker: hong $
d20 1
d22 1
d31 1
a31 1
 RcsId("$Header: RCS/pfrag.c,v 1.5 90/09/10 20:43:41 hong Exp Locker: hong $");
d36 8
a44 3
 *
 *	Since we do no parallism, planFragments is totally
 *	ignored for now.. -cim 9/18/89
d54 3
a56 3
List 	queryDesc;
List 	fragmentlist;
Fragment rootFragment;
d90 1
d94 3
a96 1
	 *   in shared memory and signal slave-backend execution.
a99 1
	 *
a104 16
	   fragment = (Fragment)CAR(x);
	   nparallel = get_frag_parallel(fragment);
	   plan = get_frag_root(fragment);
	   parseTree = QdGetParseTree(queryDesc);
	   parse_tree_result_relation(parseTree) = LispNil;
	   if (ExecIsHash(plan))  {
	      int nbatch;
	      nbatch = ExecHashPartition(plan);
	      hashtable = ExecHashTableCreate(plan, nbatch);
	      /* set_hashjointable(plan, hashtable);  YYY */
	      }
	   else if (fragment != rootFragment || nparallel > 1) {
	      parse_tree_result_relation(parseTree) =
		  lispCons(lispAtom("intotemp"), LispNil);
	     }
	   fragQueryDesc = CreateQueryDesc(parseTree, plan);
d106 27
a132 10
	/* ----------------
	 *	place fragments in shared memory here.  
	 * ----------------
	 */
	   for (i=0; i<nparallel; i++) {
	      SlaveQueryDescsP[nproc+i] = (Pointer)
	    		CopyObjectUsing(fragQueryDesc, ExecSMAlloc);
	     }
	   nproc += nparallel;
	  }
d134 11
d146 3
a148 1
	 *	signal slave execution start
d159 1
a159 1
	 *	wait for slaves to complete execution
d166 4
d172 14
a185 22
	   fragment = (Fragment)CAR(x);
	   nparallel = get_frag_parallel(fragment);
	   plan = get_frag_root(fragment);
	   parentPlan = get_frag_parent_op(fragment);
	   parentFragment = get_frag_parent_frag(fragment);
	   if (parentFragment == NULL)
	      subtrees = LispNil;
	   else {
	      subtrees = get_frag_subtrees(parentFragment);
	      set_frag_subtrees(parentFragment, set_difference(subtrees,
	                                     lispCons(fragment, LispNil)));
	    }
	   if (ExecIsHash(plan)) {
	       /* set_hashjointable(parentPlan, hashtable);  */
	       /* YYY more info. needed. */
	      }
   	   else {
	       List unionplans = LispNil;
	       if (ExecIsScanTemps(plan)) {
		   Relation tempreldesc;
		   List	tempRelDescs;
		   LispValue y;
d187 26
a212 5
		   tempRelDescs = get_temprelDescs(plan);
		   foreach (y, tempRelDescs) {
		       tempreldesc = (Relation)CAR(y);
		       ReleaseTmpRelBuffers(tempreldesc);
		       if (FileNameUnlink(
d214 1
a214 1
			   elog(WARN, "ExecEndScanTemp: unlink: %m");
d216 19
a234 21
		 }
	       if (parentPlan == NULL && nparallel == 1)
		  rootFragment = NULL;
	       else {
		  tempRelationDescList = LispNil;
		  for (i=0; i<nparallel; i++) {
		     shmPlan = QdGetPlan((List)SlaveQueryDescsP[nproc+i]);
		     shmTempRelationDesc=get_resultTmpRelDesc(
					  get_retstate(shmPlan));
		     tempRelationDesc = CopyRelDescUsing(shmTempRelationDesc,
							 palloc);
		     tempRelationDescList = nappend1(tempRelationDescList, 
						     tempRelationDesc);
		     }
		  scantempNode = RMakeScanTemps();
		  set_qptargetlist(scantempNode, get_qptargetlist(plan));
		  set_temprelDescs(scantempNode, tempRelationDescList);
		  if (parentPlan == NULL) {
		     set_frag_root(rootFragment, scantempNode);
		     set_frag_subtrees(rootFragment, LispNil);
		     set_fragment(scantempNode,-1);/*means end of parallelism */
d236 21
a256 3
	          else {
	          if (plan == (Plan)get_lefttree(parentPlan)) {
		     set_lefttree(parentPlan, scantempNode);
d258 4
a261 9
	          else {
		     set_righttree(parentPlan, scantempNode);
		    }
	          set_fragment(scantempNode, get_fragment(parentPlan));
	          }
	       }
	   nproc += nparallel;
	 }
       }
d278 6
d286 2
a287 2
Plan node;
int fragmentNo;
d293 5
d299 12
a310 8
    if (get_lefttree(node) != NULL) 
       if (ExecIsHash(get_lefttree(node)) || ExecIsSort(get_lefttree(node))) {
          fragment = RMakeFragment();
          set_frag_root(fragment, get_lefttree(node));
          set_frag_parent_op(fragment, node);
          set_frag_parallel(fragment, 1);
          set_frag_subtrees(fragment, LispNil);
          subFragments = nappend1(subFragments, fragment);
d312 22
a333 2
       else {
          subFragments = FindFragments(get_lefttree(node), fragmentNo);
d335 1
a335 13
    if (get_righttree(node) != NULL)
       if (ExecIsHash(get_righttree(node)) || ExecIsSort(get_righttree(node))) {
          fragment = RMakeFragment();
          set_frag_root(fragment, get_righttree(node));
          set_frag_parent_op(fragment, node);
          set_frag_parallel(fragment, 1);
          set_frag_subtrees(fragment, LispNil);
          subFragments = nappend1(subFragments, fragment);
         }
       else {
         newFragments = FindFragments(get_righttree(node), fragmentNo);
         subFragments = nconc(subFragments, newFragments);
        }
d340 6
d348 1
a348 1
Plan originalPlan;
d350 8
a357 8
    Plan node;
    LispValue x, y;
    List fragmentlist;
    List subFragments;
    List newFragmentList;
    int fragmentNo = 0;
    Fragment rootFragment;
    Fragment fragment, frag;
d359 4
d364 1
d373 4
d379 1
d381 12
a392 10
	   fragment = (Fragment)CAR(x);
	   node = get_frag_root(fragment);
	   subFragments = FindFragments(node, fragmentNo++);
	   set_frag_subtrees(fragment, subFragments);
	   foreach (y, subFragments) {
	      frag = (Fragment)CAR(y);
	      set_frag_parent_frag(frag, fragment);
	     }
	   newFragmentList = nconc(newFragmentList, subFragments);
	  }
d394 2
a395 1
      }
d399 5
d409 5
a413 1
   return NBuffers;  /* YYY functionalities to be added later */
d416 4
d423 4
d430 4
d438 4
a441 4
    LispValue x;
    Fragment frag;
    List readyFragments = LispNil;
    List readyFrags;
d443 4
d448 6
a453 1
       return lispCons(fragments, LispNil);
d458 2
a459 1
      }
d463 4
d469 2
a470 2
List fragmentlist;
int memsize;
d472 7
a478 1
    return fragmentlist; /* YYY functionalities to be added later */
d481 4
d487 2
a488 2
List fragmentlist;
int memsize;
d490 7
a496 1
    return fragmentlist;  /* YYY functionalities to be added later */
d499 6
d507 2
a508 2
List fragmentlist;
int memsize;
a509 1
    return lispCons(CAR(fragmentlist), LispNil);   
d511 8
d521 6
d529 2
a530 2
Fragment fragments;
int memsize;
d536 4
d542 6
d551 3
a553 2
	   elog(WARN, "memory below hashjoin threshold.");
      }
d555 1
d559 6
d567 2
a568 2
Plan plan;
int nparallel;
d572 6
d579 8
a586 9
       set_parallel(plan, nparallel);
       set_plan_parallel(get_outerPlan(plan), nparallel);
       set_plan_parallel(get_innerPlan(plan), 1);
      }
    else  {
       set_parallel(plan, nparallel);
       set_plan_parallel(get_lefttree(plan), nparallel);
       set_plan_parallel(get_righttree(plan), nparallel);
      }
d589 6
d597 2
a598 2
List fragmentlist;
float loadavg;
d600 5
a604 5
    LispValue x;
    Fragment fragment;
    int nslaves;
    Plan plan;
    int fragmentno;
d606 4
d612 1
d614 16
a629 12
       fragment = (Fragment)CAR(x);
       plan = get_frag_root(fragment);
       fragmentno = get_fragment(plan);
       if (fragmentno < 0) {
           set_frag_parallel(fragment, 1);  /* YYY */
           set_plan_parallel(plan, 1);
	  }
       else {
           set_frag_parallel(fragment, nslaves);  /* YYY */
           set_plan_parallel(plan, nslaves);
	 }
     }
d633 1
a633 1
/* ----------------------------------------------------------------
d640 1
a640 1
 * ----------------------------------------------------------------
d644 2
a645 2
List		queryDesc;
Fragment	planFragments;
d647 3
a649 3
    int memAvail;
    float loadAvg;
    List fireFragments;
d651 4
a654 3
    memAvail = GetCurrentMemSize();
    fireFragments = ChooseFragments(planFragments, memAvail);
    loadAvg = GetCurrentLoadAverage();
d656 1
@


1.5
log
@let the master also do some slave work instead of sitting idle
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.4 90/09/06 16:57:50 hong Exp $
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.4 90/09/06 16:57:50 hong Exp $");
d172 2
a173 1
		       if (unlink(relpath(&(tempreldesc->rd_rel->relname))) < 0)
@


1.4
log
@turned off debugging messages
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.3 90/09/06 14:44:04 hong Exp $
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.3 90/09/06 14:44:04 hong Exp $");
d128 1
a128 1
	for (i=0; i<nproc; i++) {
d133 2
d140 1
a140 1
	P_Finished(nproc);
@


1.3
log
@more towards working
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.2 90/08/22 11:14:54 hong Exp Locker: hong $
d19 1
d29 1
a29 1
 RcsId("$Header: RCS/pfrag.c,v 1.2 90/08/22 11:14:54 hong Exp Locker: hong $");
d129 1
a129 1
	    elog(DEBUG, "Master Backend: signaling slave %d", i);
d137 1
a137 1
	elog(DEBUG, "Master Backend: waiting for slaves...");
d139 1
a139 1
	elog(DEBUG, "Master Backend: slaves execution complete!");
@


1.2
log
@temporary checking
@
text
@d14 1
a14 1
 *	$Header: RCS/pfrag.c,v 1.1 90/08/17 13:28:20 hong Exp Locker: hong $
d25 1
d28 1
a28 4
/* XXX the following #define's are copied from executor.h */
#define ExecIsNestLoop(node)    IsA(node,NestLoop)
#define ExecIsHash(node)        IsA(node,Hash)
#define ExecIsSort(node)        IsA(node,Sort)
a29 15
#define QdGetParseTree(queryDesc)    (List) CAR(CDR(queryDesc))
#define QdGetPlan(queryDesc)         (Plan) CAR(CDR(CDR(queryDesc)))

#define parse_tree_root(parse_tree) \
    CAR(parse_tree)

#define parse_tree_root_result_relation(parse_tree_root) \
    CAR(CDR(CDR(parse_tree_root)))

#define parse_tree_result_relation(parse_tree) \
    parse_tree_root_result_relation(parse_tree_root(parse_tree))


 RcsId("$Header: RCS/pfrag.c,v 1.1 90/08/17 13:28:20 hong Exp Locker: hong $");

d40 1
a40 2
extern ScanTemp RMakeScanTemp();
extern Append RMakeAppend();
d42 1
d63 1
a63 1
    ScanTemp		scantempNode;
d65 2
a66 1
    Append		appendNode;
d99 1
d137 1
a137 1
	P_Finished();
d160 13
d176 1
d179 6
a184 4
		     tempRelationDesc=get_resultTmpRelDesc(get_retstate(shmPlan));
		     scantempNode = RMakeScanTemp();
		     set_temprelDesc(scantempNode, tempRelationDesc);
		     unionplans = nappend1(unionplans, scantempNode);
d186 3
a188 2
		  appendNode = RMakeAppend();
		  set_unionplans(appendNode, unionplans);
d190 1
a190 1
		     set_frag_root(rootFragment, appendNode);
d192 1
a192 1
		     set_fragment(appendNode,-1); /* means end of parallelism */
d196 1
a196 1
		     set_lefttree(parentPlan, appendNode);
d199 1
a199 1
		     set_righttree(parentPlan, appendNode);
d201 1
a201 1
	          set_fragment(appendNode, get_fragment(parentPlan));
@


1.1
log
@Initial revision
@
text
@d14 1
a14 1
 *	$Header: RCS/pquery.c,v 1.28 90/05/25 10:40:07 cimarron Exp Locker: hong $
d45 1
a45 1
 RcsId("$Header: RCS/pquery.c,v 1.28 90/05/25 10:40:07 cimarron Exp Locker: hong $");
d162 5
a166 2
	   subtrees = get_frag_subtrees(parentFragment);
	   set_frag_subtrees(parentFragment, set_difference(subtrees,
d168 1
d175 16
a190 17
	       for (i=0; i<nparallel; i++) {
		  shmPlan = QdGetPlan((List)SlaveQueryDescsP[nproc+i]);
		  /* YYY require EState be put in shared memory */
		  tempRelationDesc=get_resultTmpRelDesc(get_retstate(shmPlan));
		  scantempNode = RMakeScanTemp();
		  set_temprelDesc(scantempNode, tempRelationDesc);
		  unionplans = nappend1(unionplans, scantempNode);
		  }
	       appendNode = RMakeAppend();
	       set_unionplans(appendNode, unionplans);
	       if (parentPlan == NULL) {
		   if (nparallel == 1)
		      rootFragment = NULL;
		   else {
		      set_frag_root(rootFragment, appendNode);
		      set_frag_subtrees(rootFragment, LispNil);
		      set_fragment(appendNode, 0);
d192 9
a200 9
		  }
	       else {
	       if (plan == (Plan)get_lefttree(parentPlan)) {
		  set_lefttree(parentPlan, appendNode);
		 }
	       else {
		  set_righttree(parentPlan, appendNode);
		 }
	       set_fragment(appendNode, get_fragment(parentPlan));
a201 1
	     }
d204 1
d406 1
d413 9
a421 2
       set_frag_parallel(fragment, 1);  /* YYY */
       set_plan_parallel(plan, nslaves);
@
