本站首页    管理页面    写新日志    退出


«September 2025»
123456
78910111213
14151617181920
21222324252627
282930


公告
 本博客在此声明所有文章均为转摘,只做资料收集使用。

我的分类(专题)

日志更新

最新评论

留言板

链接

Blog信息
blog名称:
日志总数:1304
评论数量:2242
留言数量:5
访问次数:7615612
建立时间:2006年5月29日




[J2SE]Java Concurrent框架之阻塞队列(Blocking queue)
软件技术

lhwork 发表于 2006/9/15 10:04:37

引子:大家上过操作系统的都知道“生产者-消费者(Producer-Consumer)”模型,主要讨论的是进程(线程)间的互斥和同步问题,关键是对锁(lock)的申请、独占和释放,在这里我就不罗嗦了。原先我写的Java代码如下: 500)this.width=500'>500)this.width=500'>public class Producer extends Thread500)this.width=500'>{500)this.width=500'>  private ProductList products = ProductList.getInstance();500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public void run()500)this.width=500'>{500)this.width=500'>    int i = 0;500)this.width=500'>    500)this.width=500'>500)this.width=500'>    while(i <= 20)500)this.width=500'>{500)this.width=500'>500)this.width=500'>      synchronized(products)500)this.width=500'>{ // Get lock on product list500)this.width=500'>500)this.width=500'>    if(products.isFull())500)this.width=500'>{500)this.width=500'>      System.out.println("List is full");500)this.width=500'>      products.notify(); // Release the lock500)this.width=500'>500)this.width=500'>    } else500)this.width=500'>{500)this.width=500'>      Product product = new Product(i++); // Produce a product500)this.width=500'>      products.put(product);500)this.width=500'>      System.out.println("Produced product " + product.getId());500)this.width=500'>      products.notify(); // Release lock500)this.width=500'>    }500)this.width=500'>      } // Release the lock500)this.width=500'>    }500)this.width=500'>  }500)this.width=500'>} 500)this.width=500'>500)this.width=500'>public class Consumer extends Thread500)this.width=500'>{500)this.width=500'>  ProductList products = ProductList.getInstance();500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public void run()500)this.width=500'>{500)this.width=500'>500)this.width=500'>    while(true)500)this.width=500'>{500)this.width=500'>500)this.width=500'>      synchronized(products)500)this.width=500'>{500)this.width=500'>500)this.width=500'>    try 500)this.width=500'>{500)this.width=500'>      products.wait(); // Wait for lock500)this.width=500'>      Product product = null;500)this.width=500'>      if(!(products.isEmpty()))500)this.width=500'>        product = products.take();500)this.width=500'>      else500)this.width=500'>        System.out.println("List is empty");500)this.width=500'>      System.out.println("Consumed product " + product.getId()); // Get the lock500)this.width=500'>500)this.width=500'>    } catch (InterruptedException ex) 500)this.width=500'>{500)this.width=500'>      ex.printStackTrace();500)this.width=500'>    }500)this.width=500'>      } // Release the lock500)this.width=500'>    }500)this.width=500'>  }500)this.width=500'>}500)this.width=500'> 500)this.width=500'>import java.util.ArrayList;500)this.width=500'>import java.util.List;500)this.width=500'>500)this.width=500'>500)this.width=500'>public class ProductList 500)this.width=500'>{500)this.width=500'>  private static ProductList instance = new ProductList();500)this.width=500'>  private List<Product> products; // Adapter pattern500)this.width=500'>  public static final int SIZE = 10;500)this.width=500'>  500)this.width=500'>500)this.width=500'>  private ProductList() 500)this.width=500'>{500)this.width=500'>    products = new ArrayList<Product>(SIZE);500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public static ProductList getInstance() 500)this.width=500'>{ // Singleton pattern500)this.width=500'>    return instance;500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public boolean isFull() 500)this.width=500'>{500)this.width=500'>    return products.size() == SIZE;500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public void put(Product product) 500)this.width=500'>{500)this.width=500'>    products.add(product);500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public Product take() 500)this.width=500'>{500)this.width=500'>    return products.remove(0);500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public boolean isEmpty() 500)this.width=500'>{500)this.width=500'>    return products.isEmpty();500)this.width=500'>  }500)this.width=500'>}500)this.width=500'> 500)this.width=500'>500)this.width=500'>public class Product 500)this.width=500'>{500)this.width=500'>  private int id;500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public Product(int id) 500)this.width=500'>{500)this.width=500'>    this.id = id;500)this.width=500'>  }500)this.width=500'>  500)this.width=500'>500)this.width=500'>  public int getId() 500)this.width=500'>{500)this.width=500'>    return id;500)this.width=500'>  }500)this.width=500'>}500)this.width=500'> 500)this.width=500'>500)this.width=500'>public class Main 500)this.width=500'>{500)this.width=500'>500)this.width=500'>  public static void main(String[] args)500)this.width=500'>{500)this.width=500'>    Producer p = new Producer();500)this.width=500'>    Consumer c = new Consumer();500)this.width=500'>    500)this.width=500'>    p.start();500)this.width=500'>    c.start();500)this.width=500'>  }500)this.width=500'>}500)this.width=500'>虽 然Java对信号量及原语做了更高层次的封装(wait()、notify()、notifyAll()、synchronized{}),但看完上述代 码还是觉得有点麻烦,于是JDK 5在原先collection框架的基础上增加了java.util.concurrent包,封装了许多用于线程并发操作的数据结构和操作。其中的 BlockingQueue接口就是封装了一个阻塞队列的接口,具体地说就是实现了一个用于消费者(多个)和生产者(多个)交换产品的中介,生产者线程在 队列满时阻塞,消费者线程在队列空时阻塞,当然在没有得到锁之前两类线程均会阻塞。详细信息可以参考Java Doc。下面用BlockingQueue实现P-C模型: 500)this.width=500'>500)this.width=500'>class Producer implements Runnable 500)this.width=500'>{500)this.width=500'>   private final BlockingQueue queue;500)this.width=500'>500)this.width=500'>   Producer(BlockingQueue q) 500)this.width=500'>{ queue = q; }500)this.width=500'>500)this.width=500'>   public void run() 500)this.width=500'>{500)this.width=500'>500)this.width=500'>     try 500)this.width=500'>{500)this.width=500'>500)this.width=500'>       while(true) 500)this.width=500'>{ queue.put(produce()); }500)this.width=500'>500)this.width=500'>     } catch (InterruptedException ex) 500)this.width=500'>{ 500)this.width=500'> handle 500)this.width=500'>}500)this.width=500'>   }500)this.width=500'>500)this.width=500'>   Object produce() 500)this.width=500'>{ 500)this.width=500'> }500)this.width=500'> }500)this.width=500'>500)this.width=500'>500)this.width=500'> class Consumer implements Runnable 500)this.width=500'>{500)this.width=500'>   private final BlockingQueue queue;500)this.width=500'>500)this.width=500'>   Consumer(BlockingQueue q) 500)this.width=500'>{ queue = q; }500)this.width=500'>500)this.width=500'>   public void run() 500)this.width=500'>{500)this.width=500'>500)this.width=500'>     try 500)this.width=500'>{500)this.width=500'>500)this.width=500'>       while(true) 500)this.width=500'>{ consume(queue.take()); }500)this.width=500'>500)this.width=500'>     } catch (InterruptedException ex) 500)this.width=500'>{ 500)this.width=500'> handle 500)this.width=500'>}500)this.width=500'>   }500)this.width=500'>500)this.width=500'>   void consume(Object x) 500)this.width=500'>{ 500)this.width=500'> }500)this.width=500'> }500)this.width=500'>500)this.width=500'>500)this.width=500'> class Setup 500)this.width=500'>{500)this.width=500'>500)this.width=500'>   void main() 500)this.width=500'>{500)this.width=500'>     BlockingQueue q = new SomeQueueImplementation();500)this.width=500'>     Producer p = new Producer(q);500)this.width=500'>     Consumer c1 = new Consumer(q);500)this.width=500'>     Consumer c2 = new Consumer(q);500)this.width=500'>     new Thread(p).start();500)this.width=500'>     new Thread(c1).start();500)this.width=500'>     new Thread(c2).start();500)this.width=500'>   }500)this.width=500'> }500)this.width=500'>可以看出代码中没有出现wait()或notify()之类的原语操作,这些操作由concurrent框架负责封装。更全面的讨论可以参考《驯服 Tiger: 并发集合》(IBM)


阅读全文(2056) | 回复(0) | 编辑 | 精华
 



发表评论:
昵称:
密码:
主页:
标题:
验证码:  (不区分大小写,请仔细填写,输错需重写评论内容!)



站点首页 | 联系我们 | 博客注册 | 博客登陆

Sponsored By W3CHINA
W3CHINA Blog 0.8 Processed in 0.094 second(s), page refreshed 144800631 times.
《全国人大常委会关于维护互联网安全的决定》  《计算机信息网络国际联网安全保护管理办法》
苏ICP备05006046号