import java.util.*; /*************************************************************** * Define exceptions used in queue operations and sub-ops. * ***************************************************************/ class CBErr extends Exception { private String cause; CBErr( String x ) { cause = x; } public String toString() { return ("Chain break:\n" + cause); } } /*************************************************************** * The cell class represents one queue cell in the memory. * ***************************************************************/ class cell { static memory M; // refers to memory object // that contains all cells public int cellindex; // index of the cell relative to // all cells in the memory public int memloc; // index of first word in memory public final static int numwords = 5; public final static int ALLOCATING= 1; // status types public final static int ACTIVE = 2; // status types public final static int numreservedcells = 3; public final static int HEADCELL = 0; // indices of public final static int NULLCELL = 1; // the reserved public final static int VOIDCELL = 2; // cells public final static int FIRSTCELL = 3; // public static int HEADCELLID; // IDs public static int VOIDCELLID; // of the public static int NULLCELLID; // reserved cells public static int NOID; // impossible ID public static int FIRSTCELLID; // first "real" ID //----- Cell Fields -------------------------------------- public word next; public word status; public word enqid; public word deqid; public word data; //----- end of Cell Fields ------------------------------- // ***** real cell creation ***** cell(memory bank, int index, int location) { M = bank; cellindex = index; // for diagnostics memloc = location; // set up special cell constants HEADCELLID = makeCellid(M.PROCESSES,HEADCELL); VOIDCELLID = makeCellid(M.PROCESSES,VOIDCELL); NULLCELLID = makeCellid(M.PROCESSES,NULLCELL); FIRSTCELLID = makeCellid(M.PROCESSES,FIRSTCELL); NOID = makeCellid(M.PROCESSES,-1); next = bank.RAM[location]; next.write(NOID); // cell has no initial chaining status = bank.RAM[location+1]; status.write(ACTIVE); // cell is initially free enqid = bank.RAM[location+2]; enqid.write(NOID); // cell has no initial id deqid = bank.RAM[location+3]; deqid.write(NOID); data = bank.RAM[location+4]; } // ***** dummy cell creation ***** // mainly just for creating snapshot cells cell() { next = new word(); status = new word(); enqid = new word(); deqid = new word(); data = null; process.sleep(); } // ***** check if a given ID is known ****** static boolean isKnown(int x) { int i; // is cell with ID x known anywhere? if (x == NOID) return false; if (published.isPublished(x)) return true; for (i=0; i 3) System.out.println("Makefree: " + M.C[x]); M.C[x].next.write(VOIDCELL); M.C[x].enqid.write(NOID); M.C[x].deqid.write(NOID); return true; } // ***** cell allocation subroutine ***** static int getFreeCell() { int s,t,i; int c = -1; // loop up to 2*n^2 times to find a free cell s = 2 * M.PROCESSES * M.PROCESSES; searchFree: for (t=0; t> 24) & 0xFF); } // ***** what is seqno of this ID ? ***** public static int seqnoOf(int id) { return (id & 0xFFFFFF); } // ***** display an ID ****** public static String showID( int id ) { int proc = processOf(id); int seqn = seqnoOf(id); if ((proc != M.PROCESSES) && (proc != 255)) return new String( "[" + processOf(id) + ":" + seqnoOf(id) + "]" ); switch (seqn) { case 0: return new String("[HEAD]"); case 1: return new String("[NULL]"); case 2: return new String("[VOID]"); case 3: return new String("[FIRST]"); default: return new String("[NOID]"); } } // ***** where is the cell for id x ? ***** public static int getCellIndex( int x ) throws CBErr { int i; if ((x == NOID) || (x == VOIDCELLID)) return -1; // scan list of cells for specified ID x for (i=0; i 0) return new String("RAM[" + memloc +"] C[" + cellindex + "]" + " next " + showID(next.read()) + " enqid " + showID(enqid.read()) + " deqid " + showID(deqid.read()) + " status " + S); return new String(" next " + showID(next.read()) + " enqid " + showID(enqid.read()) + " deqid " + showID(deqid.read()) + " status " + S); } } // end of class cell /*************************************************************** * The memory class represents an array of words in memory, * * and also maps the published, wills, cells, and freewords. * ***************************************************************/ class memory { // // RAM represents the words in memory; RAM is basically // for debugging and statistics collection over words. // // F, C, P, and W are arrays of the structures mapped on // to words of RAM. // // F is simply an array of n words, used for storage // allocation/request of cells. // // C is the array of cells -- either the three reserved // cells, the queue cells, or those that are free. // // P is the array of published cell ids, used by a process // to "lock" certain ids during an operation. // // W is the array of wills, one per process. // public word[] RAM; public word[] F; public cell[] C; public published[] P; public will[] W; // helpAlloc is an array of "help" indices // used for cell allocation public int[] helpAlloc; // Public Information: # processes, maxSIZE of queue public int PROCESSES; public int ITEMS; int SIZE; public int TRACE; public int PERCENTENQ; // Statistics: # free cells int numFreeCells; int numEnqOps; int numDeqOps; int numQFull; int numQEmpty; int numCBErrs; // ********************************************** // initialize memory to specified number of // (1) processes -- each gets a freeword, will, and // a published area in memory; // (2) queue elements --- each gets a cell, plus // we set aside three extra cells permanently memory(int numprocesses, int numqueueitems) { // save memory-building parameters PROCESSES = numprocesses; ITEMS = numqueueitems; // Allocate the RAM words: // first compute total number of words // that will be needed in shared memory SIZE = numprocesses; // one freeword SIZE += numprocesses * will.numwords; // plus a will SIZE += numprocesses * published.numwords; SIZE += (numqueueitems + cell.numreservedcells) * cell.numwords; RAM = new word[SIZE]; // allocate new array // then allocate memory words int i; for (i=0; i 3) System.out.println("Free Cell Count = " + numFreeCells); } public synchronized void decrFreeCellCount( ) { numFreeCells --; if (TRACE > 3) System.out.println("Free Cell Count = " + numFreeCells); } public synchronized void incrEnqOps( ) { numEnqOps ++; } public synchronized void incrDeqOps( ) { numDeqOps ++; } public synchronized void incrQFull( ) { numQFull ++; } public synchronized void incrQEmpty( ) { numQEmpty ++; } public synchronized void incrCBErrs( ) { numCBErrs ++; } // ***** memory Dump ***** public void dump() { int i; if (TRACE > 2) { System.out.println("\n--- All cells in order ---\n"); for (i=0; i cell -> NULL // (b) must have non-NOID deqid field int Enqid; int Deqid; cell p,q; int xid,i,yid,nid; q = M.C[cell.HEADCELL]; // q is initial cell of chain q.snapCell(oldSnap); // set up "previous" snap xid = oldSnap.next.read(); try { i = cell.getCellIndex(xid); } catch (CBErr e) { return; } // OK if cell not found if (i == -1) return; // Another OK cell not found p = M.C[i]; p.snapCell(snap); // speculative snap i = oldSnap.next.read(); M.P[id].publish(i); // speculative publication if (!q.sameAsCell(oldSnap)) return; if (!p.sameAsCell(snap)) return; // do conditions all apply ? nid = p.next.read(); if (nid == cell.NULLCELLID) return; if (nid == cell.VOIDCELLID) return; yid = p.deqid.read(); if (yid == cell.NOID) return; // cell has deqid filled in, so OK to dechain it // but first make sure it's will is done i = cell.processOf(yid); if (i < M.PROCESSES) M.W[i].markDone(cell.seqnoOf(yid)); if (q.next.compareNswap(xid,nid)) { if (M.TRACE > 3) System.out.println("Process " + id + " deChained " + cell.showID(xid)); M.incrFreeCellCount(); } // now that cell p has been deChained (by this or // by another process) and is still protected by // publication, it's OK to remove its "next" pointer // so as to free up other cells in future p.next.compareNswap(nid,cell.VOIDCELLID); } // ***** deqMark is called to finish an incomplete deqPlace ***** void deqMark( will w ) throws CBErr { int xDeqid; // deqid to be used int xDeqEnqid; // deqEnqId to be used int seqno; // seqno of xDeqid int lag; // enqid of "lag" cell in chain chase cell p,q; // working cells in chain int i; // verify and setup: publish what we try to dequeue xDeqid = w.id.read(); seqno = cell.seqnoOf(xDeqid); xDeqEnqid = w.deqEnqid.read(); if (xDeqEnqid == cell.VOIDCELLID) { w.markDone(seqno); return; } q = M.C[cell.HEADCELL]; // q is initial cell of chain q.snapCell(oldSnap); // set up "previous" snap lag = cell.VOIDCELLID; // dummy lag cell while (true) { // --- main loop: advance one in chain sleep(); i = cell.getCellIndex(oldSnap.next.read()); if (i == -1) throw new CBErr(" Process " + id + " (in deqMark) VOID encountered."); p = M.C[i]; p.snapCell(snap); // speculative snap i = oldSnap.next.read(); M.P[id].publish(i); // speculative publication if (!q.sameAsCell(oldSnap)) throw new CBErr(" Process " + id + " (in deqMark)" + "\n oldSnap: " + oldSnap + "\n q: " + q); if (!p.sameAsCell(snap)) throw new CBErr(" Process " + id + " (in deqMark)" + "\n snap: " + snap + "\n p: " + p); M.P[id].retract(lag); // done with lag ID lag = oldSnap.enqid.read(); // lag for next iteration // now that speculative advance is done, // let's take a look at the new cell we got if (w.type.read() != will.DEQ) return; if (w.isDone(seqno)) return; if (w.deqEnqid.read() != xDeqEnqid) throw new CBErr(" Process " + id + " deqEnqid mismatch."); if (snap.enqid.read() == xDeqEnqid) { // cell for deqPlace is now secure: finish job or // abort the speculative decision on cell p p.deqid.compareNswap(cell.NOID,xDeqid); if (p.deqid.read() == xDeqid) { w.markDone(seqno); return; } // ouch! p is already taken, so back out! while (w.id.read() == xDeqid && w.deqEnqid.read() == xDeqEnqid) w.deqEnqid.compareNswap(xDeqEnqid,xDeqid); throw new CBErr(" Process " + id + " cell stolen (1)"); } if (snap.next.read() == cell.NULLCELLID) { // hmmm... the xDeqEnqid is missing, so // it may have been stolen: back out! while (w.id.read() == xDeqid && (!w.isDone(seqno)) && w.deqEnqid.read() == xDeqEnqid) w.deqEnqid.compareNswap(xDeqEnqid,xDeqid); throw new CBErr(" Process " + id + " cell stolen (2)"); } // end of chain not yet found, so continue cell tcell = oldSnap; oldSnap = snap; snap = tcell; q = p; } } // ***** deqPlace for a given will ***** void deqPlace( will w ) throws CBErr { int xDeqid; // deqid to be used int seqno; // seqno of xDeqid int lag; // enqid of "lag" cell in chain chase cell p,q; // working cells in chain int i; // verify and setup: publish what we try to dequeue xDeqid = w.id.read(); seqno = cell.seqnoOf(xDeqid); M.P[id].publish(xDeqid); // it appears now that xDeqid needs to be deq-placed // so follow the current chain until // (a) the will shows the work is already done, or // (b) the chain already has cell xDeqplace, or // (c) chain breaks due to concurrency, or // (d) the "next" is NULL q = M.C[cell.HEADCELL]; // q is initial cell of chain q.snapCell(oldSnap); // set up "previous" snap lag = cell.VOIDCELLID; // dummy lag cell while (true) { // --- main loop: advance one in chain sleep(); i = cell.getCellIndex(oldSnap.next.read()); if (i == -1) throw new CBErr(" Process " + id + " (in deqPlace) VOID encountered."); p = M.C[i]; p.snapCell(snap); // speculative snap i = oldSnap.next.read(); M.P[id].publish(i); // speculative publication if (!q.sameAsCell(oldSnap)) throw new CBErr(" Process " + id + " (in deqPlace)" + "\n oldSnap: " + oldSnap + "\n q: " + q); if (!p.sameAsCell(snap)) throw new CBErr(" Process " + id + " (in deqPlace)" + "\n snap: " + snap + "\n p: " + p); M.P[id].retract(lag); // done with lag ID lag = oldSnap.enqid.read(); // lag for next iteration // now that speculative advance is done, // let's take a look at the new cell we got if (w.type.read() != will.DEQ) return; if (w.isDone(seqno)) return; if (p.deqid.read() == xDeqid) { w.markDone(seqno); return; } if (p.deqid.read() == cell.NOID) { // p appears to be eligible for a deqplace // now test if will hasn't yet been used w.deqEnqid.compareNswap(xDeqid,snap.enqid.read()); deqMark(w); // for message, compute index of w int k; for (k = 0; k < M.PROCESSES; k++) if (w == M.W[k]) break; if (M.TRACE > 2) { System.out.println("Process " + id + " deqPlaced for " + k + " with deqid " + cell.showID(xDeqid) + " and enqid " + cell.showID(p.enqid.read())); } return; } if (p.next.read() == cell.NULLCELLID) { // queue looks empty, try to mark this fact // in the will by smashing the deqEnqid field if (w.deqEnqid.compareNswap(xDeqid,cell.VOIDCELLID)) { // reservation worked --- so now just finish w.markDone(seqno); return; } else { deqMark(w); throw new CBErr(" Process " + id + " empty queue conflict."); } } // end of chain not yet found, so continue cell tcell = oldSnap; oldSnap = snap; snap = tcell; q = p; } } // ***** thread cell for a given will ***** boolean threadCell( will w ) throws CBErr { cell x; // cell to be threaded int xEnqid; // enqid of x int seqno; // seqno of enqid of x int lag; // enqid of "lag" cell in chain chase cell p,q; // working cells in chain int tind; // temp index for cell testing int i; // verify and setup: publish what we try to thread tind = w.cellindx.read(); if (tind == -1) return true; x = M.C[tind]; xEnqid = x.enqid.read(); seqno = cell.seqnoOf(xEnqid); M.P[id].publish(xEnqid); // it appears now that x needs to be threaded // so follow the current chain until // (a) the will shows the work is already done, or // (b) the chain already has cell x chained, or // (c) chain breaks due to concurrency, or // (d) the "next" is NULL q = M.C[cell.HEADCELL]; // q is initial cell of chain q.snapCell(oldSnap); // set up "previous" snap lag = cell.VOIDCELLID; // dummy lag cell while (true) { // --- main loop: advance one in chain sleep(); i = cell.getCellIndex(oldSnap.next.read()); if (i == -1) throw new CBErr(" Process " + id + " (in threadCell) VOID encountered."); p = M.C[i]; p.snapCell(snap); // speculative snap i = oldSnap.next.read(); M.P[id].publish(i); // speculative publication if (!q.sameAsCell(oldSnap)) throw new CBErr(" Process " + id + " (in threadCell)" + "\n oldSnap: " + oldSnap + "\n q: " + q); if (!p.sameAsCell(snap)) throw new CBErr(" Process " + id + " (in threadCell)" + "\n snap: " + snap + "\n p: " + p); M.P[id].retract(lag); // done with lag ID lag = oldSnap.enqid.read(); // lag for next iteration // now that speculative advance is done, // let's take a look at the new cell we got if (w.type.read() != will.ENQ) return true; if (w.isDone(seqno)) return true; if (p.enqid.read() == xEnqid) return true; tind = w.cellindx.read(); if (tind == -1) return true; if (xEnqid != M.C[tind].enqid.read()) return true; if (p.next.read() == cell.NULLCELLID) { // end of chain: make sure corresponding // will is marked done before the NULLCELLID // is replaced (to prevent any double enqueue) int tproc = cell.processOf(snap.enqid.read()); int tseqn = cell.seqnoOf(snap.enqid.read()); if (tproc < M.PROCESSES) (M.W[tproc]).markDone(tseqn); // now try enchaining x onto p if (p.next.compareNswap(cell.NULLCELLID,xEnqid)) { // will needs to be marked as "done" w.markDone(seqno); // for message, compute index of w int k; for (k = 0; k < M.PROCESSES; k++) if (w == M.W[k]) break; if (M.TRACE > 2) { System.out.println("Process " + id + " enqueued for " + k + " with enqid " + cell.showID(xEnqid)); } return true; } } // end of chain not yet found, so continue cell tcell = oldSnap; oldSnap = snap; snap = tcell; q = p; } } // ***** enqueue operation ***** boolean enqueue( int datum ) { // add to enqueue count M.incrEnqOps(); // also attempt a deChain here // since it might free up a needed cell deChain(); M.P[id].retractALL(); // first step: allocate a new cell or return false int cind = cell.allocate(id); if (cind == -1) return false; cell newcell = M.C[cind]; // second step: obtain an enqid for this new cell int opid = genCellid(); if (M.TRACE > 4) System.out.println("Process " + id + " starts enqueue " + cell.showID(opid)); M.P[id].publish(opid); // this will protect the new cell // when its status becomes active // set up cell fields newcell.enqid.write(opid); newcell.next.write(cell.NULLCELLID); newcell.data.write(datum); newcell.status.write(cell.ACTIVE); // now set up the will field M.W[id].id.write(opid); M.W[id].type.write(will.ENQ); M.W[id].cellindx.write(cind); M.W[id].deqEnqid.write(opid); M.W[id].state.write( cell.makeCellid(will.START,cell.seqnoOf(opid)) ); // at this point, the cell is prepared, // the enqid is published, and the will is // available for anyone to help // attempt to thread the new cell onto // the current queue boolean s = false; while (!s) { try { s = threadCell( M.W[id] ); } catch (CBErr e) { M.incrCBErrs(); if (M.TRACE > 4) System.out.println(e); }; M.P[id].retractALLbut(opid); // undo any published // except our own } // the enqueue has been completed, so // refresh the will, unpublish all and exit M.W[id].id.write(cell.NOID); M.W[id].type.write(will.IDLE); M.W[id].cellindx.write(-1); M.W[id].state.write(cell.makeCellid(will.DONE,0)); M.P[id].retractALL(); return true; } // ***** dequeue operation ***** boolean dequeue( ) { int i,k; // Add to number of dequeue operations M.incrDeqOps(); // obtain a deqid for this operation int opid = genCellid(); M.P[id].publish(opid); // this will protect the deq cell // now set up the will field M.W[id].id.write(opid); M.W[id].type.write(will.DEQ); M.W[id].cellindx.write(-1); M.W[id].deqEnqid.write(opid); M.W[id].state.write( cell.makeCellid(will.START,cell.seqnoOf(opid)) ); // at this point, the cell is prepared, // the deqid is published, and the will is // available for anyone to help // attempt to deqPlace the new cell in the current queue boolean s = false; while (!s) { try { deqPlace( M.W[id] ); M.W[id].state.compareNswap( cell.makeCellid(will.START,cell.seqnoOf(opid)), cell.makeCellid(will.DONE,cell.seqnoOf(opid)) ); s = true; } catch (CBErr e) { M.incrCBErrs(); if (M.TRACE > 4) System.out.println(e); } M.P[id].retractALLbut(opid); // undo any published // except our own } // the deqPlace is done, now find the deq'ed cell M.P[id].retractALLbut(opid); s = false; for (i=0; i 2) System.out.println("Process " + id + " found queue empty."); return false; } // for non-empty queue, dechain the dequeued cell for (i=0; i 4) System.out.println(e); } M.P[id].retractALL(); // undo any publications } } else if (wtype == will.DEQ) { boolean s = false; int helpdeqid = M.W[helpid].id.read(); // get Opid while (!s) { if (M.W[helpid].id.read() != helpdeqid) return; try { deqPlace( M.W[helpid] ); s = true; } catch (CBErr e) { M.incrCBErrs(); if (M.TRACE > 4) System.out.println(e); } M.P[id].retractALL(); // no need for pubs here } } } // ***** process runs this logic ****** public void run() { int i,j; for (i=0; i<16700000; i++) { // try to help some other process helpout(); j = Math.abs(genRand.nextInt()); j = (j % 101); // random num for enq/dequeue if (j > M.PERCENTENQ) { // dequeue a cell if (!dequeue()) { M.incrQEmpty(); } } else { // enqueue a cell if (enqueue(i)) { } else { M.incrQFull(); if (M.TRACE > 2) System.out.println("Process " + id + " found queue full."); } } sleep(); } } } // end of class process /*************************************************************** * The published class represents all the published ids of a * * of a process during various phases of operations. * ***************************************************************/ class published { static memory M; int publishedindex; // process number int memloc; // index of first word in memory public final static int numwords = 5; //----- Published Fields --------------------------------- public word known[]; //----- end of Published Fields -------------------------- // ***** published allocation ****** published(memory bank, int index, int location) { int i; M = bank; publishedindex = index; memloc = location; known = new word[numwords]; for (i=0; i 50)) { System.err.println("numProcesses must be [1..50]"); System.exit(0); } // Restrict number of processes to what is possible continue; } // Look for -q if ( args[i].startsWith("-q") && (i+1) 5)) { System.err.println("trace must be [0..5]"); System.exit(0); } continue; } // Look for -s if ( args[i].startsWith("-s") && (i+1) 100)) { System.err.println("percentEnq must be [0..100]"); System.exit(0); } continue; } // Look for -h if ( args[i].startsWith("-h") ) { System.err.println("qdrive syntax: " + " -p #processes -q #items -t #seconds " + "-s percentEnq -d tracelevel"); System.exit(0); } // Shouldn't arrive at this point! System.err.println("Error parsing command line: "+args[i]); System.exit(0); } } // ***** process runs this logic ****** public static void main(String[] args) { memory M; // the common memory object Runnable Processes[]; // array of processes Thread Tasks[]; // and the associated threads int i; // get command parameters loaded parseArguments(args); M = new memory(numProcesses,numItems); // set up shared memory M.TRACE = trace; M.PERCENTENQ = percentEnq; Processes = new process[numProcesses]; Tasks = new Thread[numProcesses]; for (i=0; i