// 代表需要同步的状态及其getter/setter privatevolatileint state; protectedfinalintgetState(){ return state; } protectedfinalvoidsetState(int newState){ state = newState; } // cas的方式更新state的方法 protectedfinalbooleancompareAndSetState(int expect, int update){ // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
... }
将上述代码转化为图,就是:
以实现独占锁为例(即当前资源只能被一个线程占有),其实现原理如下:state 初始化 0,在多线程条件下,线程要执行临界区的代码,必须首先获取锁,也就是线程需要成功将 state 从 0 加到 1,其他线程再获取的话由于共享资源已被占用,所以会到 FIFO 同步队列去等待,等占有 state 的线程执行完临界区的代码释放资源( state 减 1)后,会唤醒 FIFO 中的下一个等待线程(head 中的下一个结点)去获取锁。 head 结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放。 因为state变量是volatile的,而使用AQS进行加解锁操作时都会修改state变量的值,根据volatile关键字的Happens-Before规则,以及Happens-Before的传递性,就可以保证使用AQS进行同步时的可见性。
// Releases the lock by setting state to zero protectedbooleantryRelease(int releases){ assert releases == 1; // Otherwise unused if (getState() == 0) thrownew IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); returntrue; }
// Provides a Condition Condition newCondition(){ returnnew ConditionObject(); }
// Deserializes properly privatevoidreadObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
// The sync object does all the hard work. We just forward to it. privatefinal Sync sync = new Sync();
/** waitStatus value to indicate thread has cancelled */ staticfinalint CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalint SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalint CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalint PROPAGATE = -3;
privatevoidcancelAcquire(Node node){ // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) { node.prev = pred = pred.prev; } // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
privatevoidunparkSuccessor(Node node){ /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
Determination of successor must avoid races with newly queued nodes to set the “next” fields of their predecessors. This is solved when necessary by checking backwards from the atomically updated “tail” when a node’s successor appears to be null. (Or, said differently, the next-links are an optimization so that we don’t usually need a backward scan.)
publicfinalvoidsignalAll(){ if (!isHeldExclusively()) thrownew IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
privatevoiddoSignalAll(Node first){ lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
finalbooleantransferForSignal(Node node){ // 如果该节点在调用signal方法前已经被取消了,则直接跳过这个节点 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse; // 如果该节点在条件队列中正常等待,则利用enq方法将该节点添加至同步队列队列的尾部 // 返回前驱节点 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); returntrue; }