博客
关于我
【JDK源码分析系列】SynchronousQueue源码分析
阅读量:363 次
发布时间:2019-03-04

本文共 27052 字,大约阅读时间需要 90 分钟。

【JDK源码分析系列】SynchronousQueue 源码分析

【1】SynchronousQueue 继承体系图示

【2】SynchronousQueue 中并发相关的基础方法

LockSupport 函数列表

// 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 nullstatic Object getBlocker(Thread t)// 为了线程调度,禁用当前线程,除非许可可用static void park()// 为了线程调度,在许可可用之前禁用当前线程static void park(Object blocker)// 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用static void parkNanos(long nanos)// 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间static void parkNanos(Object blocker, long nanos)// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用static void parkUntil(long deadline)// 为了线程调度,在指定的时限前禁用当前线程,除非许可可用static void parkUntil(Object blocker, long deadline)// 如果给定线程的许可尚不可用,则使其可用static void unpark(Thread thread)

Unsafe 中的方法

/**  * 比较并更新对象的某一个对象类型的域  * @param obj 被操作的对象  * @param fieldoffset 被操作的域在对象中的偏移量  * @param expect 域的期望值  * @param update 域的更新值  */  boolean compareAndSwapObject(Object obj,long Fieldoffset, Object expect, Object update);

【3】SynchronousQueue 源码分析

【3.1】SynchronousQueue 构造函数与主要属性

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 内部实现了两种数据结构 // 公平方式 : 队列/TransferQueue // 非公平方式 : 堆栈/TransferStack private transient volatile Transferer
transferer; // 队列不存储数据,所以没有大小,也无法迭代 // 插入操作的返回必须等待另一个线程完成对应数据的删除操作,反之亦然 // 队列由两种数据结构组成,分别是后入先出的堆栈和先入先出的队列,堆栈是非公平的,队列是公平的 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue
() : new TransferStack
(); }}

【3.2】SynchronousQueue 对外部提供的插入元素与获取元素的方法

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 将(元素e)放进队列,直到有另外一个线程从队列中取走 // 成功则结束,失败则中断线程 public void put(E e) throws InterruptedException { // 若e为空则抛异常 if (e == null) throw new NullPointerException(); // 内部调用transferer的transfer方法实现入队或入栈处理 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } } // 从队列头拿数据并删除数据 // 成功则返回,失败则打断线程 public E take() throws InterruptedException { // 内部调用transferer的transfer方法实现出队或出栈处理 E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }}

【3.3】SynchronousQueue 内部数据结构

【3.3.1】内部数据结构公共接口

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 堆栈和双向队列共同的接口 // 负责执行 put or take abstract static class Transferer
{ // e为空时,会直接返回特殊值,不为空会传递给消费者 // timed 为 true,说明会有超时时间 abstract E transfer(E e, boolean timed, long nanos); }}

【3.3.2】SynchronousQueue -- TransferStack

【3.3.2.1】TransferStack 属性

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { static final class TransferStack
extends Transferer
{ // 代表着执行的是 take 方法 static final int REQUEST = 0; // 代表着执行的是 put 方法 static final int DATA = 1; // 代表栈头节点正在阻塞等待其他线程进行 put 或 take 操作 static final int FULFILLING = 2; // 栈头节点 volatile SNode head; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; // 头节点的偏移量 private static final long headOffset; // 静态代码块初始化UNSAFE与headOffset static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } }}

【3.3.2.2】TransferStack SNode 节点

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { static final class TransferStack
extends Transferer
{ // 栈中元素,节点 static final class SNode { // 栈的下一个,就是被当前栈压在下面的栈元素 volatile SNode next; // 节点匹配,用来判断阻塞栈元素能被唤醒的时机 // 假设先执行 take,此时队列中没有数据,take 被阻塞 // 当有 put 操作时,会把阻塞的栈元素的 match 属性赋值 // 当阻塞的栈元素发现 match 有值时,就会停止阻塞 volatile SNode match; // 被阻塞的线程,栈中元素是无法被阻塞的,是通过线程阻塞来实现的 // waiter 为阻塞的线程 volatile Thread waiter; // 未投递的消息或者未消费的消息 Object item; // 操作的模式 int mode; // 构造方法 SNode(Object item) { this.item = item; } // CAS的方法设置next boolean casNext(SNode cmp, SNode val) { return cmp == next && // nextOffset为类中next属性的偏移量 UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 阻塞等待match非null // CAS方法设置match为s节点 boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { // waiter 属性在 awaitFulfill 方法中被赋值 // 是在需要阻塞节点的情况下被赋值 Thread w = waiter; if (w != null) { waiter = null; // 唤醒线程 LockSupport.unpark(w); } return true; } // 在match非null的情况下 // 直接返回match与s的比较结果 return match == s; } // CAS方法设置match为当前节点 void tryCancel() { UNSAFE.compareAndSwapObject(this, matchOffset, null, this); } boolean isCancelled() { // 判断match是否被取消 return match == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; // 静态代码块初始化Unsafe,match与next的偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
k = SNode.class; // 获取节点match属性字段的偏移量 matchOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("match")); // 获取节点next属性字段的偏移量 nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } }}

【3.3.2.3】TransferStack 一般方法

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { static final class TransferStack
extends Transferer
{ // 判断栈是否处于fulfilling状态 static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; } // CAS 的方法设置头节点 boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } // 创建一个节点 // s 为当前新建的节点 // next 为当前节点下一个节点 // mode 为当前节点的模式 // e 为保存于节点中的元素 static SNode snode(SNode s, Object e, SNode next, int mode) { if (s == null) s = new SNode(e); s.mode = mode; s.next = next; return s; } SNode awaitFulfill(SNode s, boolean timed, long nanos) { // deadline 截止时间, // 如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间, // 否则就是 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 获取当前线程 Thread w = Thread.currentThread(); // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次 // 比如本次操作是 take 操作,自选次数后,仍没有其他线程 put 数据进来 // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断 if (w.isInterrupted()) // 尝试取消match // 将s节点的match字段设置为s节点本身 // 那么下一轮循环便可以获取match并退出 s.tryCancel(); // 获取s节点的match字段 SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 超时则尝试取消match // 将s节点的match字段设置为s节点本身 // 那么下一轮循环便可以获取match并退出 s.tryCancel(); continue; } } // 自旋次数减少 1 // 在需要自旋的情况下不会阻塞当前线程 if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; // 自旋完毕仍然得不到匹配的节点 else if (s.waiter == null) // 设置阻塞的线程 s.waiter = w; else if (!timed) // 通过 park 进行阻塞 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 通过 park 进行定时的阻塞 LockSupport.parkNanos(this, nanos); } } // 判断是否可以自旋 boolean shouldSpin(SNode s) { // 获取头节点 SNode h = head; // 满足条件 // 1. 当前节点是首节点 // 2. 首节点为null // 3. 首节点的模式满则fulfilling要求 return (h == s || h == null || isFulfilling(h.mode)); } void clean(SNode s) { // item 为节点中待传递的消息 s.item = null; // waiter 为节点中的线程用于阻塞节点 s.waiter = null; // 指针指向s节点的下一个节点 SNode past = s.next; // 确定s节点之后连续的被取消的节点 if (past != null && past.isCancelled()) past = past.next; SNode p; // 以头节点为起点,past为终点, // 删除该区间内从头节点开始的连续的被取消的节点 while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // 继续删除已经被取消的节点 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } } }}

【3.3.2.4】TransferStack transfer 方法

图解

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { static final class TransferStack
extends Transferer
{ // 入栈和出栈的底层方法 @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; // e 为空则是 take 方法 // e 不为空则是 put 方法 int mode = (e == null) ? REQUEST : DATA; // 自旋 for (;;) { // 获取头节点情况如下 // 1. 头节点为空,说明队列中还没有数据 // 2. 头节点不为空,并且是 take 类型的,说明有线程正等着拿数据 // 3. 头节点不为空,并且是 put 类型的,说明有线程正等着放数据 SNode h = head; // 栈头为空,说明队列中还没有数据 // 栈头不为空,但栈头的类型和本次操作一致, // 比如都是 put,则将本次 put 操作放到该栈头的前面即可,使得本次 put 能够先执行 if (h == null || h.mode == mode) { // 设置了超时时间,并且 e 进栈或者出栈超时, // 则丢弃本次操作,直接返回 null 值 // 如果栈头此时被取消了,丢弃栈头,取下一个节点继续消费 if (timed && nanos <= 0) { // 栈头操作被取消 if (h != null && h.isCancelled()) // 丢弃栈头,把栈头后一个元素作为栈头 casHead(h, h.next); else // 栈头是空的,直接返回 null return null; // 没有超时,直接把 e 作为新的栈头 } else if (casHead(h, s = snode(s, e, h, mode))) { // e 等待出栈,一种是空队列 take,一种是 put SNode m = awaitFulfill(s, timed, nanos); // 返回s表示空队列一直没有数据或者put的数据一直没人要 if (m == s) { // 做一次清理操作 clean(s); // 返回null return null; } // 表明来了一次take/put请求并且take与put配对成功,此时需要弹出两个节点 if ((h = head) != null && h.next == s) casHead(h, s.next); // 对于take方法返回m的数据 // 对于put方法返回s的数据 return (E) ((mode == REQUEST) ? m.item : s.item); } // 栈头正在等待其他线程 put 或 take // // 执行到此处头节点不为null并且两次请求的模式不同 } else if (!isFulfilling(h.mode)) { // 若栈头节点已经被取消则去除该栈头节点 if (h.isCancelled()) casHead(h, h.next); // 将当前请求节点入栈 else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 自旋等待匹配或被取消 for (;;) { // m 为插入 s 之前的栈头节点 SNode m = s.next; if (m == null) { casHead(s, null); s = null; break; } SNode mn = m.next; // tryMatch 两个作用 // 1 唤醒被阻塞的栈头 m // 2 把当前节点 s 赋值给 m 的 match 属性 // // 这样栈头 m 被唤醒时,就能从 match 中得到本次操作 s // 其中 s.item 记录着本次的操作节点 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m // 对于take方法返回m的数据 // 对于put方法返回s的数据 return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 将m出栈 // 继续下一轮循环 s.casNext(m, mn); // help unlink } } } else { SNode m = h.next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) // 此时m的match属性为h // 则h,m出栈 casHead(h, mn); else // 此时m的match属性不为h // 则m出栈 h.casNext(m, mn); } } } } }}

【3.3.3】SynchronousQueue -- TransferQueue

【3.3.3.1】TransferQueue 属性

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 双向队列 先入先出 公平 static final class TransferQueue
extends Transferer
{ /** 队列头 */ transient volatile QNode head; /** 队列尾 */ transient volatile QNode tail; // 用于指示队列中待清除的节点 transient volatile QNode cleanMe; private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long cleanMeOffset; // 静态代码块初始化Unsafe以及head,tail,cleanMe的偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
k = TransferQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); cleanMeOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("cleanMe")); } catch (Exception e) { throw new Error(e); } } }}

【3.3.3.2】TransferQueue QNode 节点

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 双向队列 先入先出 公平 static final class TransferQueue
extends Transferer
{ // 队列的元素 static final class QNode { // 当前元素的下一个元素 volatile QNode next; // 当前元素的值 volatile Object item; // 可以阻塞住的当前线程 // 队列中的元素是通过该变量阻塞的 volatile Thread waiter; // true 是 put,false 是 take final boolean isData; // 队列节点构造函数 QNode(Object item, boolean isData) { this.item = item; this.isData = isData } // CAS的方法设置next boolean casNext(QNode cmp, QNode val) { return next == cmp && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // CAS的方法设置item boolean casItem(Object cmp, Object val) { return item == cmp && UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } /** * Tries to cancel by CAS'ing ref to this as item. */ // CAS方法设置item为当前节点 void tryCancel(Object cmp) { UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this); } // 判断是否已经被取消 boolean isCancelled() { return item == this; } // 判断该节点是否离队 boolean isOffList() { return next == this; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; // 静态代码块初始化Unsafe,item与next的偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
k = QNode.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } }}

【3.3.3.3】TransferQueue 一般方法

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 双向队列 先入先出 公平 static final class TransferQueue
extends Transferer
{ // 队列构造函数 TransferQueue() { QNode h = new QNode(null, false); // initialize to dummy node. head = h; tail = h; } // 设置新的头节点 void advanceHead(QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) h.next = h; // forget old next } // 设置新的尾节点 void advanceTail(QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); } // CAS 方法设置 cleanMe boolean casCleanMe(QNode cmp, QNode val) { return cleanMe == cmp && UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val); } Object awaitFulfill(QNode s, E e, boolean timed, long nanos) { // deadline 截止时间, // 如果设置了超时时间的话,死亡时间等于当前时间 + 超时时间, // 否则就是 0 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 获取当前线程 Thread w = Thread.currentThread(); // 自旋的次数,如果设置了超时时间,会自旋 32 次,否则自旋 512 次 // 比如本次操作是 take 操作,自选次数后,仍没有其他线程 put 数据进来 // 就会阻塞,有超时时间的,会阻塞固定的时间,否则一致阻塞下去 int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程有无被打断,如果过了超时时间,当前线程就会被打断 if (w.isInterrupted()) // 尝试取消 // 设置item为当前节点 // 则下一轮便可以获取item并退出 s.tryCancel(e); Object x = s.item; if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 超时尝试取消 // 设置item为当前节点 // 则下一轮便可以获取item并退出 s.tryCancel(e); continue; } } // 自旋次数减少 1 // 在需要自旋的情况下不会阻塞当前线程 if (spins > 0) --spins; // 自旋完毕仍然得不到匹配的节点 else if (s.waiter == null) // 设置阻塞的线程 s.waiter = w; else if (!timed) // 通过 park 进行阻塞 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 通过 park 进行定时的阻塞 LockSupport.parkNanos(this, nanos); } } void clean(QNode pred, QNode s) { // 清除节点对应的线程 // waiter 为节点中的线程用于阻塞节点 s.waiter = null; while (pred.next == s) { QNode h = head; QNode hn = h.next; // 从头节点开始取消的节点前移 if (hn != null && hn.isCancelled()) { advanceHead(h, hn); continue; } QNode t = tail; if (t == h) // 队列为空 return; QNode tn = t.next; if (t != tail) // 防止清理的过程中有数据插入 continue; if (tn != null) { // 防止清理的过程中有数据插入 advanceTail(t, tn); continue; } if (s != t) { // s不是尾节点 QNode sn = s.next; // sn==s表明s已经出队或者 // pred.casNext(s, sn)表明s为pred的下一个节点且出队成功 if (sn == s || pred.casNext(s, sn)) return; } QNode dp = cleanMe; if (dp != null) { QNode d = dp.next; QNode dn; if (d == null || // d 为null或者 d == dp || // d 离队或者 !d.isCancelled() || // d 没有取消或者 (d != t && // d 不是尾节点并且 (dn = d.next) != null && // d 有后继节点并且 dn != d && // d 的后记节点在队列中并且 dp.casNext(d, dn))) // d 出队 casCleanMe(dp, null); // 设置cleanMe为null if (dp == pred) return; // 设置cleanMe } else if (casCleanMe(null, pred)) return; } } }}

【3.3.3.4】TransferQueue transfer 方法

图解

public class SynchronousQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 双向队列 先入先出 公平 static final class TransferQueue
extends Transferer
{ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { QNode s = null; // true是put,false是get boolean isData = (e != null); for (;;) { // 队列头和尾的临时变量,队列是空的时候,t=h QNode t = tail; QNode h = head; // tail和head没有初始化时,无限循环 // tail和head在TransferQueue初始化的时候,已经被赋值空节点 if (t == null || h == null) continue; // 首尾节点相同,说明是空队列 // 或者尾节点的操作和当前节点操作一致 if (h == t || t.isData == isData) { QNode tn = t.next; // 当 t 不是 tail 时,说明 tail 已经被修改过了 // 因为 tail 没有被修改的情况下,t 和 tail 必然相等 // 因为前面刚刚执行赋值操作,t = tail if (t != tail) continue; // 队尾后面的值还不为空,t 还不是队尾,直接把 tn 赋值给 t,这是一步加强校验 if (tn != null) { advanceTail(t, tn); continue; } // 超时直接返回 null if (timed && nanos <= 0) return null; // 构造node节点 if (s == null) s = new QNode(e, isData); // 如果把 e 放到队尾失败,继续递归放进去 if (!t.casNext(null, s)) continue; // 将s节点入队 advanceTail(t, s); // 阻塞住自己 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { // x == s 表明s节点被取消 clean(t, s); return null; } if (!s.isOffList()) { // s仍在队列中 // 则s出队 advanceHead(t, s); if (x != null) s.item = s; s.waiter = null; } return (x != null) ? (E)x : e; // 队列不为空,并且当前操作和队尾不一致 // 也就是说当前操作与队尾是对应的操作 // 比如说对尾是因为 take 被阻塞的,那么当前操作必然是 put } else { // 如果是第一次执行,此处的 m 代表就是 tail // 也就是这行代码体现出队列的公平,每次操作时, // 从头开始按照顺序进行操作 QNode m = h.next; if (t != tail || m == null || h != head) continue; Object x = m.item; if (isData == (x != null) || // 判断当前操作类型 x == m || // 判断是否取消 // 这里把当前的操作值赋值给阻塞住的 m 的 item 属性 // 这样 m 被释放时,就可得到此次操作的值 !m.casItem(x, e)) { advanceHead(h, m); continue; } // 当前操作放到队头 advanceHead(h, m); // 释放队头阻塞节点 LockSupport.unpark(m.waiter); return (x != null) ? (E)x : e; } } } }}

致谢

本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。

【1】面试官系统精讲Java源码及大厂真题

【2】

【3】

【4】

【5】

转载地址:http://uxmr.baihongyu.com/

你可能感兴趣的文章
MySQL 存储过程参数:in、out、inout
查看>>
mysql 存储过程每隔一段时间执行一次
查看>>
mysql 存在update不存在insert
查看>>
Mysql 学习总结(86)—— Mysql 的 JSON 数据类型正确使用姿势
查看>>
Mysql 学习总结(87)—— Mysql 执行计划(Explain)再总结
查看>>
Mysql 学习总结(88)—— Mysql 官方为什么不推荐用雪花 id 和 uuid 做 MySQL 主键
查看>>
Mysql 学习总结(89)—— Mysql 库表容量统计
查看>>
mysql 实现主从复制/主从同步
查看>>
mysql 审核_审核MySQL数据库上的登录
查看>>
mysql 导入 sql 文件时 ERROR 1046 (3D000) no database selected 错误的解决
查看>>
mysql 导入导出大文件
查看>>
MySQL 导出数据
查看>>
mysql 将null转代为0
查看>>
mysql 常用
查看>>
MySQL 常用列类型
查看>>
mysql 常用命令
查看>>
Mysql 常见ALTER TABLE操作
查看>>
MySQL 常见的 9 种优化方法
查看>>
MySQL 常见的开放性问题
查看>>
Mysql 常见错误
查看>>