head 1.24; access; symbols Version_2_1:1.6 Version_2:1.3; locks; strict; comment @ * @; 1.24 date 92.07.06.05.03.08; author mao; state Exp; branches; next 1.23; 1.23 date 92.03.31.23.12.15; author mer; state Exp; branches; next 1.22; 1.22 date 92.01.21.16.44.25; author hong; state Exp; branches; next 1.21; 1.21 date 91.11.11.23.00.24; author hong; state Exp; branches; next 1.20; 1.20 date 91.11.09.09.50.34; author hong; state Exp; branches; next 1.19; 1.19 date 91.11.02.22.02.26; author hong; state Exp; branches; next 1.18; 1.18 date 91.10.25.16.22.05; author hong; state Exp; branches; next 1.17; 1.17 date 91.10.09.14.52.07; author hong; state Exp; branches; next 1.16; 1.16 date 91.10.08.01.15.07; author hong; state Exp; branches; next 1.15; 1.15 date 91.08.27.14.19.11; author hong; state Exp; branches; next 1.14; 1.14 date 91.08.25.13.35.20; author hong; state Exp; branches; next 1.13; 1.13 date 91.07.31.12.37.57; author hong; state Exp; branches; next 1.12; 1.12 date 91.07.24.16.14.49; author hong; state Exp; branches; next 1.11; 1.11 date 91.07.19.15.12.21; author hong; state Exp; branches; next 1.10; 1.10 date 91.07.17.23.37.44; author hong; state Exp; branches; next 1.9; 1.9 date 91.05.31.10.55.02; author hong; state Exp; branches; next 1.8; 1.8 date 91.04.18.11.39.44; author hong; state Exp; branches; next 1.7; 1.7 date 91.04.04.13.51.16; author hong; state Exp; branches; next 1.6; 1.6 date 90.09.10.20.45.02; author hong; state Exp; branches; next 1.5; 1.5 date 90.08.18.14.16.20; author cimarron; state Exp; branches; next 1.4; 1.4 date 90.08.14.16.20.21; author cimarron; state Exp; branches; next 1.3; 1.3 date 90.05.25.10.36.35; author cimarron; state Version_2; branches; next 1.2; 1.2 date 90.03.31.19.01.51; author cimarron; state Exp; branches; next 1.1; 1.1 date 90.03.12.16.27.18; author cimarron; state Exp; branches; next ; desc @Integration Checkin @ 1.24 log @support for 'returntype = setof typename' in function definition @ text @/* ---------------------------------------------------------------- * FILE * slaves.c * * DESCRIPTION * slave backend management routines * * INTERFACE ROUTINES * SlaveMain() * SlaveBackendsInit() * SlaveBackendsAbort() * * NOTES * * IDENTIFICATION * $Header: /private/mao/postgres/src/tcop/RCS/slaves.c,v 1.23 1992/03/31 23:12:15 mer Exp $ * ---------------------------------------------------------------- */ #include #include #include "tmp/postgres.h" RcsId("$Header: /private/mao/postgres/src/tcop/RCS/slaves.c,v 1.23 1992/03/31 23:12:15 mer Exp $"); /* ---------------- * FILE INCLUDE ORDER GUIDELINES * * 1) tcopdebug.h * 2) various support files ("everything else") * 3) node files * 4) catalog/ files * 5) execdefs.h and execmisc.h, if necessary. * 6) extern files come last. * ---------------- */ #include "tcop/tcopdebug.h" #include "nodes/plannodes.h" #include "nodes/plannodes.a.h" #include "nodes/execnodes.h" #include "executor/execdesc.h" #include "tcop/dest.h" #include "tcop/pquery.h" #include "tcop/slaves.h" #include "access/xact.h" #include "utils/log.h" #include "catalog/syscache.h" /* ---------------- * parallel state variables * ---------------- */ /* * local data structures */ extern int MyPid; /* int representing the process id, defined in execipc.c */ static ProcessNode *SlaveArray, *FreeSlaveP; int NumberOfFreeSlaves; ProcGroupLocalInfo ProcGroupLocalInfoP; /* process group local info */ static ProcGroupLocalInfo FreeProcGroupP; extern SlaveLocalInfoData SlaveLocalInfoD; /* defined in execipc.c */ extern int AdjustParallelismEnabled; FILE *StatFp; static bool RestartForParAdj = false; /* indicating that the longjmp to SlaveRestartPoint is for paradj */ static List QueryDesc; /* * shared data structures */ int *MasterProcessIdP; /* master backend process id */ int *SlaveAbortFlagP; /* flag set during a transaction abort */ extern SlaveInfo SlaveInfoP; /* slave backend info */ extern ProcGroupInfo ProcGroupInfoP; /* process group info */ /* defined in execipc.c to make postmaster happy */ TransactionState SharedTransactionState; /* current transaction info */ #define CONDITION_NORMAL 0 #define CONDITION_ABORT 1 /* -------------------------------- * SendAbortSignals * * This sends a SIGHUP to every other backend in order * to cause them to preform their abort processing when * we discover a reason to abort the current transaction. * -------------------------------- */ void SendAbortSignals() { int nslaves; /* number of slaves */ int i; /* counter */ int p; /* process id */ #ifdef TCOP_SLAVESYNCDEBUG if (IsMaster) elog(DEBUG, "Master Backend sending abort signals"); else elog(DEBUG, "Slave Backend %d sending abort signals", MyPid); #endif TCOP_SLAVESYNCDEBUG nslaves = GetNumberSlaveBackends(); for (i=0; inext; NumberOfFreeSlaves--; return p->pid; } /* ------------------------ * freeSlave * * frees a slave to FreeSlaveP queue * increments NumberOfFreeSlaves * ------------------------ */ void freeSlave(i) int i; { SlaveArray[i].next = FreeSlaveP; FreeSlaveP = SlaveArray + i; SlaveInfoP[i].groupId = -1; SlaveInfoP[i].groupPid = -1; NumberOfFreeSlaves++; } /* ------------------------ * getFreeProcGroup * * get a free process group with nproc free slave processes * ------------------------ */ int getFreeProcGroup(nproc) int nproc; { ProcGroupLocalInfo p; ProcessNode *slavep; int i; int pid; p = FreeProcGroupP; FreeProcGroupP = p->nextfree; pid = getFreeSlave(); SlaveInfoP[pid].groupId = p->id; SlaveInfoP[pid].groupPid = 0; SlaveInfoP[pid].isAddOnSlave = false; SlaveInfoP[pid].isDone = false; p->memberProc = SlaveArray + pid; slavep = p->memberProc; for (i=1; iid; SlaveInfoP[pid].groupPid = i; SlaveInfoP[pid].isAddOnSlave = false; SlaveInfoP[pid].isDone = false; slavep->next = SlaveArray + pid; slavep = slavep->next; } slavep->next = NULL; return p->id; } /* -------------------------- * addSlaveToProcGroup * * add a free slave to an existing process group * -------------------------- */ void addSlaveToProcGroup(slave, group, groupid) int slave; int group; int groupid; { SlaveInfoP[slave].groupId = group; SlaveInfoP[slave].groupPid = groupid; SlaveInfoP[slave].isAddOnSlave = true; SlaveArray[slave].next = ProcGroupLocalInfoP[group].memberProc; ProcGroupLocalInfoP[group].memberProc = SlaveArray + slave; } /* ------------------------- * freeProcGroup * * frees a process group and all the slaves in the group * ------------------------- */ void freeProcGroup(gid) int gid; { ProcessNode *p, *nextp; p=ProcGroupLocalInfoP[gid].memberProc; while (p != NULL) { nextp = p->next; freeSlave(p->pid); p = nextp; } ProcGroupInfoP[gid].status = IDLE; ProcGroupLocalInfoP[gid].fragment = NULL; ProcGroupLocalInfoP[gid].nextfree = FreeProcGroupP; ProcGroupLocalInfoP[gid].resultTmpRelDescList = LispNil; FreeProcGroupP = ProcGroupLocalInfoP + gid; } /* ---------------------------- * getFinishedProcGroup * * walks the array of processes group and find the first * process group with status = FINISHED or PARADJPENDING * ----------------------------- */ int getFinishedProcGroup() { int i; for (i=0; inext) { V_Start(p->pid); } } /* --------------------------------- * signalProcGroup * * send a signal to a process group * --------------------------------- */ void signalProcGroup(groupid, sig) int groupid; int sig; { ProcessNode *p; for (p = ProcGroupLocalInfoP[groupid].memberProc; p != NULL; p = p->next) { kill(SlaveInfoP[p->pid].unixPid, sig); } } /* ------------------------------ * the following routines are a specialized memory allocator for * the process groups. they only supposed to be called by the master * backend. no mutex is done. * ------------------------------ */ static char *CurrentSMSegmentStart; static char *CurrentSMSegmentEnd; static int CurrentSMGroupid; static char *CurrentSMSegmentPointer; /* -------------------------------- * ProcGroupSMBeginAlloc * * begins shared memory allocation for process group * -------------------------------- */ void ProcGroupSMBeginAlloc(groupid) int groupid; { MemoryHeader mp; mp = ExecGetSMSegment(); ProcGroupLocalInfoP[groupid].groupSMQueue = mp; mp->next = NULL; CurrentSMSegmentStart = mp->beginaddr; CurrentSMSegmentEnd = CurrentSMSegmentStart + mp->size; CurrentSMGroupid = groupid; CurrentSMSegmentPointer = CurrentSMSegmentStart; } /* ------------------------------- * ProcGroupSMEndAlloc * * ends shared memory allocation for process group * frees the leftover memory from current memory segment * ------------------------------- */ void ProcGroupSMEndAlloc() { int usedsize; MemoryHeader mp; usedsize = CurrentSMSegmentPointer - CurrentSMSegmentStart; mp = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue; CurrentSMSegmentStart=CurrentSMSegmentPointer=CurrentSMSegmentEnd = NULL; ExecSMSegmentFreeUnused(mp, usedsize); } /* -------------------------------- * ProcGroupSMAlloc * * allocate shared memory within a process group * if the current memory segment runs out, allocate a new segment * --------------------------------- */ char * ProcGroupSMAlloc(size) int size; { MemoryHeader mp; char *retP; while (CurrentSMSegmentPointer + size > CurrentSMSegmentEnd) { mp = ExecGetSMSegment(); if (mp == NULL) elog(WARN, "out of executor shared memory, got to die."); mp->next = ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue; ProcGroupLocalInfoP[CurrentSMGroupid].groupSMQueue = mp; CurrentSMSegmentStart = mp->beginaddr; CurrentSMSegmentEnd = CurrentSMSegmentStart + mp->size; CurrentSMSegmentPointer = CurrentSMSegmentStart; } retP = CurrentSMSegmentPointer; CurrentSMSegmentPointer = (char*)LONGALIGN(CurrentSMSegmentPointer + size); return retP; } /* ------------------------------- * ProcGroupSMClean * * frees the shared memory allocated for a process group * ------------------------------- */ void ProcGroupSMClean(groupid) int groupid; { MemoryHeader mp, nextp; mp = ProcGroupLocalInfoP[groupid].groupSMQueue; while (mp != NULL) { nextp = mp->next; ExecSMSegmentFree(mp); mp = nextp; } } /* ----------------------------- * the following routines are special functions for copying reldescs to * shared memory * ----------------------------- */ static char *SlaveTmpRelDescMemoryP; /* ------------------------------- * SlaveTmpRelDescInit * * initialize shared memory preallocated for temporary relation descriptors * ------------------------------- */ void SlaveTmpRelDescInit() { SlaveTmpRelDescMemoryP = (char*)SlaveInfoP[MyPid].resultTmpRelDesc; } /* ------------------------------- * SlaveTmpRelDescAlloc * * memory allocation for reldesc copying * Note: there is no boundary checking, so had better pre-allocate * enough memory! * ------------------------------- */ char * SlaveTmpRelDescAlloc(size) int size; { char *retP; retP = SlaveTmpRelDescMemoryP; SlaveTmpRelDescMemoryP = (char*)LONGALIGN(SlaveTmpRelDescMemoryP + size); return retP; } /* --------------------------------- * getProcGroupMaxPage * * find out the largest page number the slaves are scanning * used only after SIGPARADJ signal has been sent to the * process group. * --------------------------------- */ int getProcGroupMaxPage(groupid) int groupid; { ProcessNode *p; int maxpage = NULLPAGE; int page; for (p = ProcGroupLocalInfoP[groupid].memberProc; p != NULL; p = p->next) { #ifdef HAS_TEST_AND_SET S_LOCK(&(SlaveInfoP[p->pid].lock)); #endif page = SlaveInfoP[p->pid].curpage; if (page == NOPARADJ) maxpage = NOPARADJ; if (maxpage < page && maxpage != NOPARADJ) maxpage = page; } return maxpage; } /* --------------------------------------- * paradj_handler * * signal handler for dynamically adjusting degrees of parallelism * XXX only handle heap scan now. * --------------------------------------- */ int paradj_handler() { BlockNumber curpage; HeapTuple curtuple; ItemPointer tid; int groupid; SLAVE1_elog(DEBUG, "slave %d got SIGPARADJ", MyPid); if (SlaveInfoP[MyPid].isDone) { /* ----------------------- * this means that the whole job is almost done * no adjustment to parallelism should be made * ------------------------ */ SlaveInfoP[MyPid].curpage = NOPARADJ; curpage = NOPARADJ; } else if (!SlaveLocalInfoD.isworking || SlaveLocalInfoD.heapscandesc == NULL) { if (SlaveInfoP[MyPid].isAddOnSlave) { SlaveInfoP[MyPid].curpage = SlaveLocalInfoD.startpage; curpage = SlaveLocalInfoD.startpage; } else { SlaveInfoP[MyPid].curpage = NULLPAGE; curpage = NULLPAGE; } } else { curtuple = SlaveLocalInfoD.heapscandesc->rs_ctup; tid = &(curtuple->t_ctid); if (ItemPointerIsValid(tid)) { curpage = ItemPointerGetBlockNumber(tid); } else { curpage = NULLPAGE; } SlaveInfoP[MyPid].curpage = curpage; } #ifdef HAS_TEST_AND_SET S_UNLOCK(&(SlaveInfoP[MyPid].lock)); #endif SLAVE2_elog(DEBUG, "slave %d sending back curpage = %d", MyPid, curpage); groupid = SlaveInfoP[MyPid].groupId; MWaitOne(&(ProcGroupInfoP[groupid].m1lock)); SLAVE1_elog(DEBUG, "slave %d complete handshaking with master", MyPid); if (ProcGroupInfoP[groupid].paradjpage == NOPARADJ) { /* ---------------------- * this means that the master changed his/her mind * no adjustment to parallelism will be done * ---------------------- */ return; } SlaveLocalInfoD.paradjpending = true; SlaveLocalInfoD.paradjpage = ProcGroupInfoP[groupid].paradjpage; SlaveLocalInfoD.newparallel = ProcGroupInfoP[groupid].newparallel; return; } /* ------------------------------------ * paradj_nextpage * * check if parallelism adjustment point is reached, if so * figure out and return the next page to scan. * XXX only works for heapscan right now. * ------------------------------------- */ int paradj_nextpage(page, dir) int page; int dir; { if (SlaveLocalInfoD.paradjpending) { if (page >= SlaveLocalInfoD.paradjpage) { SLAVE2_elog(DEBUG, "slave %d adjusting page skip to %d", MyPid, SlaveLocalInfoD.newparallel); if (SlaveLocalInfoD.newparallel >= SlaveLocalInfoD.nparallel || SlaveInfoP[MyPid].groupPid < SlaveLocalInfoD.newparallel) { SlaveLocalInfoD.nparallel = SlaveLocalInfoD.newparallel; SlaveLocalInfoD.paradjpending = false; if (dir < 0) return SlaveLocalInfoD.paradjpage-SlaveInfoP[MyPid].groupPid; else return SlaveLocalInfoD.paradjpage+SlaveInfoP[MyPid].groupPid; } else { int groupid = SlaveInfoP[MyPid].groupId; Plan plan = QdGetPlan(QueryDesc); EState estate = (EState)get_state(plan); EndPlan(plan, estate); V_Finished(groupid, &(ProcGroupInfoP[groupid].dropoutcounter), PARADJPENDING); RestartForParAdj = true; SlaveRestart(); } } } else return NULLPAGE; } @ 1.23 log @change accessor functions into macros @ text @d16 1 a16 1 * $Header: /users/mer/pg/src/tcop/RCS/slaves.c,v 1.22 1992/01/21 16:44:25 hong Exp mer $ d24 1 a24 1 RcsId("$Header: /users/mer/pg/src/tcop/RCS/slaves.c,v 1.22 1992/01/21 16:44:25 hong Exp mer $"); d432 1 a432 1 SearchSysCacheTuple(ATTNAME, ""); d435 2 a436 2 SearchSysCacheTuple(LANNAME, ""); SearchSysCacheTuple(OPRNAME, ""); d438 1 a438 1 SearchSysCacheTuple(PRONAME, ""); d440 1 a440 1 SearchSysCacheTuple(RELNAME, ""); d442 1 a442 1 SearchSysCacheTuple(TYPNAME, ""); d444 2 a445 2 SearchSysCacheTuple(AMNAME, ""); SearchSysCacheTuple(CLANAME, ""); @ 1.22 log @changes for inter-fragment parallelism @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.21 91/11/11 23:00:24 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.21 91/11/11 23:00:24 hong Exp Locker: hong $"); d1038 1 a1038 1 EState estate = get_state(plan); @ 1.21 log @for prototyping @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.20 91/11/09 09:50:34 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.20 91/11/09 09:50:34 hong Exp Locker: hong $"); d978 6 a983 1 curpage = ItemPointerGetBlockNumber(tid); @ 1.20 log @added mechanism for reducing parallelism of a fragment @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.18 91/10/25 16:22:05 hong Exp $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.18 91/10/25 16:22:05 hong Exp $"); d38 6 a48 4 #include "executor/execdesc.h" #include "nodes/plannodes.h" #include "nodes/plannodes.a.h" #include "nodes/execnodes.h" a413 1 int paradj_handler(); /* intr handler for adjusting parallelism */ @ 1.19 log @default stat file to stderr @ text @d44 3 d62 4 a65 1 FILE *StatFp = stderr; a70 2 MasterCommunicationData *MasterDataP; /* communication data area for master backend */ a240 1 List queryDesc; d242 1 d256 1 a256 1 sprintf(fname, "/usr/tmp/slave%d.stat", MyPid); d266 9 a274 3 SlaveWarnings++; SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort()", MyPid); d276 5 a280 5 SlaveBackendsAbort(); SLAVE1_elog(DEBUG, "Slave Backend %d SlaveBackendsAbort() done", MyPid); } d317 1 d320 2 a321 3 MasterDataP->data[0] : 0); SlaveLocalInfoD.nparallel = ProcGroupInfoP[SlaveInfoP[MyPid].groupId].nprocess; d333 1 a333 2 queryDesc = (List)CopyObject( (List)ProcGroupInfoP[SlaveInfoP[MyPid].groupId].queryDesc); d339 2 a340 2 if (queryDesc != NULL) ProcessQueryDesc(queryDesc); d371 1 a371 1 V_Finished(SlaveInfoP[MyPid].groupId); a472 2 MasterDataP = (MasterCommunicationData*)ExecSMReserve(sizeof(MasterCommunicationData)); a497 1 InitMWaitOneLock(&(MasterDataP->m1lock)); d517 2 a518 2 S_INIT_LOCK(&(SlaveInfoP[i].comdata.lock)); S_LOCK(&(SlaveInfoP[i].comdata.lock)); d522 2 a523 1 ProcGroupInfoP[i].countdown = 0; d525 2 a526 1 S_INIT_LOCK(&(ProcGroupInfoP[i].lock)); d528 2 d575 1 d617 1 a617 1 static void d705 1 d713 1 a713 1 * process group with status = FINISHED d722 2 a723 1 if (ProcGroupInfoP[i].status == FINISHED) d920 1 a920 1 int maxpage = -1; d927 1 a927 1 S_LOCK(&(SlaveInfoP[p->pid].comdata.lock)); d929 1 a929 1 page = SlaveInfoP[p->pid].comdata.data; d951 1 d960 1 a960 1 SlaveInfoP[MyPid].comdata.data = NOPARADJ; d966 1 a966 1 SlaveInfoP[MyPid].comdata.data = SlaveLocalInfoD.startpage; d970 1 a970 1 SlaveInfoP[MyPid].comdata.data = NULLPAGE; d978 1 a978 1 SlaveInfoP[MyPid].comdata.data = curpage; d981 1 a981 1 S_UNLOCK(&(SlaveInfoP[MyPid].comdata.lock)); d984 2 a985 1 MWaitOne(&(MasterDataP->m1lock)); d987 1 a987 1 if (MasterDataP->data[0] == NOPARADJ) { d996 2 a997 2 SlaveLocalInfoD.paradjpage = MasterDataP->data[0]; SlaveLocalInfoD.newparallel = MasterDataP->data[1]; d999 44 @ 1.18 log @added process affinity to slave backends print slave stats to stat file @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.17 91/10/09 14:52:07 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.17 91/10/09 14:52:07 hong Exp Locker: hong $"); d59 1 a59 1 FILE *StatFp; @ 1.17 log @fixed a bug in parallelism adjustment @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.16 91/10/08 01:15:07 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.16 91/10/08 01:15:07 hong Exp Locker: hong $"); d59 1 d239 1 d246 9 d301 2 d333 10 a361 1 SlaveInfoP[MyPid].isDone = true; d404 1 d522 14 a535 1 d568 1 a568 1 else d570 1 @ 1.16 log @bug fixing for dynamic adjustment of parallelism @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.15 91/08/27 14:19:11 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.15 91/08/27 14:19:11 hong Exp Locker: hong $"); d339 1 d601 1 d609 1 d880 3 a882 1 if (maxpage < page) d903 10 d919 2 a920 2 SlaveInfoP[MyPid].comdata.data = -1; curpage = -1; d935 8 @ 1.15 log @stupid fixes to make postmaster link @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.14 91/08/25 13:35:20 hong Exp $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.14 91/08/25 13:35:20 hong Exp $"); d239 5 d302 3 d338 1 d477 4 d621 1 a621 1 addSlaveToProcGroup(slave, group) d624 1 d627 1 a627 1 SlaveInfoP[slave].groupPid = ++(ProcGroupInfoP[group].nprocess); d887 1 d897 17 a913 4 curtuple = SlaveLocalInfoD.heapscandesc->rs_ctup; tid = &(curtuple->t_ctid); curpage = ItemPointerGetBlockNumber(tid); SlaveInfoP[MyPid].comdata.data = curpage; d917 1 d919 1 @ 1.14 log @added supports for dynamic parallelism adjustment @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.13 91/07/31 12:37:57 hong Exp $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.13 91/07/31 12:37:57 hong Exp $"); d52 1 a52 1 int MyPid = -1; /* int representing the process id */ d57 1 a57 1 SlaveLocalInfoData SlaveLocalInfoD; d67 1 a67 1 SlaveInfo SlaveInfoP; /* slave backend info */ @ 1.13 log @minor bug fix @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.12 91/07/24 16:14:49 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.12 91/07/24 16:14:49 hong Exp Locker: hong $"); d57 2 d64 2 d285 13 d371 1 d409 7 d431 2 d458 1 a458 1 d473 4 a513 1 ProcGroupLocalInfoP[i].nmembers = 0; d536 1 a536 1 static int d586 1 d593 1 a597 1 p->nmembers = nproc; d601 18 a638 1 ProcGroupLocalInfoP[gid].nmembers = 0; d681 20 d838 56 @ 1.12 log @added new routines for shared memory management of parallel process groups @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.10 91/07/17 23:37:44 hong Exp $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.10 91/07/17 23:37:44 hong Exp $"); d52 1 a52 1 int MyPid; /* int representing the process id */ d580 1 a580 1 ProcessNode *p; d582 3 a584 1 for (p=ProcGroupLocalInfoP[gid].memberProc; p!=NULL; p=p->next) d586 2 @ 1.11 log @stupid fixes to make postmaster happy to link. @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.10 91/07/17 23:37:44 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.10 91/07/17 23:37:44 hong Exp Locker: hong $"); d64 3 a66 3 extern ProcGroupInfo ProcGroupInfoP; /* process group info */ /* defined in execipc.c to make postmaster happy */ d187 1 a187 2 I_SharedMemoryMutex(); ExecSMClean(); a331 2 SharedTransactionState = (TransactionState) ExecSMHighAlloc(nbytes); d405 4 a408 4 MasterProcessIdP = (int*)ExecSMHighAlloc(sizeof(int)); SlaveAbortFlagP = (int*)ExecSMHighAlloc(sizeof(int)); SlaveInfoP = (SlaveInfo)ExecSMHighAlloc(nslaves * sizeof(SlaveInfoData)); ProcGroupInfoP = (ProcGroupInfo)ExecSMHighAlloc(nslaves * d410 2 d421 5 d611 1 a611 1 * WakeupProcGroup d627 139 @ 1.10 log @changes to support inter-fragment and inter-query parallelism in parallel executor, non-parallel processing remain the same as before @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.9 91/05/31 10:55:02 hong Exp Locker: hong $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.9 91/05/31 10:55:02 hong Exp Locker: hong $"); d64 3 a66 1 ProcGroupInfo ProcGroupInfoP; /* process group info */ @ 1.9 log @fixed a bug, P_Finished() should be called with a parameter @ text @d16 1 a16 1 * $Header: RCS/slaves.c,v 1.8 91/04/18 11:39:44 hong Exp $ d24 1 a24 1 RcsId("$Header: RCS/slaves.c,v 1.8 91/04/18 11:39:44 hong Exp $"); d49 8 a56 1 int MyPid; /* int representing the process id */ d58 3 d62 3 a64 4 int *SlaveProcessIdsP; /* array of slave backend process id's */ Pointer *SlaveQueryDescsP; /* array of pointers to slave query descs */ Pointer *SlaveRetStateP; /* array of pointers to slave return states */ int *SlaveAbortFlagP; /* flag set during a transaction abort */ d68 2 a69 2 #define CONDITION_NORMAL 0 #define CONDITION_ABORT 1 d96 1 a96 1 p = SlaveProcessIdsP[i]; d192 1 a192 1 P_Finished(nslaves); d220 1 a220 1 V_Finished(); a232 1 int nproc; d284 2 a285 9 nproc = 0; for (i=0; i