Java中使用阻塞队列控制线程集实例
队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。
下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。
java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。
注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest
{
public static void main(String[] args)
{
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, keyword)).start();
}
}
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable
{
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
{
this.queue = queue;
this.startingDirectory = startingDirectory;
}
public void run()
{
try
{
enumerate(startingDirectory);
queue.put(DUMMY);
}
catch (InterruptedException e)
{
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException
{
File[] files = directory.listFiles();
for (File file : files)
{
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue<File> queue;
private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable
{
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for
*/
public SearchTask(BlockingQueue<File> queue, String keyword)
{
this.queue = queue;
this.keyword = keyword;
}
public void run()
{
try
{
boolean done = false;
while (!done)
{
File file = queue.take();
if (file == FileEnumerationTask.DUMMY)
{
queue.put(file);
done = true;
}
else search(file);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException
{
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine())
{
lineNumber++;
String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
private BlockingQueue<File> queue;
private String keyword;
}
相关文章
Maven编译错误:程序包com.sun.*包不存在的三种解决方案
J2SE中的类大致可以划分为以下的各个包:java.*,javax.*,org.*,sun.*,本文文章主要介绍了maven编译错误:程序包com.sun.xml.internal.ws.spi不存在的解决方案,感兴趣的可以了解一下2024-02-02更简单更高效的Mybatis Plus最新代码生成器AutoGenerator
这篇文章主要为大家介绍了更简单更高效的Mybatis Plus最新代码生成器AutoGenerator使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-02-02Java Stream中自定义Collector实现复杂数据收集的方法
Java Stream API中的Collector接口是一个强大的工具,它允许我们自定义数据收集、转换和聚合的过程,,本文介绍了Java Stream中自定义Collector实现复杂数据收集方法,需要的朋友可以参考下2024-08-08Java多线程编程之ThreadLocal线程范围内的共享变量
这篇文章主要介绍了Java多线程编程之ThreadLocal线程范围内的共享变量,本文讲解了ThreadLocal的作用和目的、ThreadLocal的应用场景、ThreadLocal的使用实例等,需要的朋友可以参考下2015-05-05
最新评论