badqiu

rapid-framework utility classes, part 1: asynchronous IO classes


In some special situations we may need asynchronous IO to improve performance substantially.

Collecting log messages is one example.

 

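The asynchronous IO classes provided by rapid-framework combine the producer/consumer multithreading pattern with the Decorator pattern. You use them just like ordinary IO: wrap the stream in one more layer, AsyncWriter or AsyncOutputStream, and ordinary IO becomes asynchronous IO.

Opening an asynchronous IO object starts a background thread that writes the data.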
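
To make the pattern concrete, here is a minimal sketch of a queue-backed Writer decorator. It is not rapid-framework's code: QueueBackedWriter, QueueBackedWriterDemo and the POISON marker are hypothetical names, and the sketch assumes an unbounded queue and Java 8 or later. The caller (producer) only enqueues copies of its data; a daemon thread (consumer) writes them to the wrapped Writer.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for illustration, not rapid-framework's AsyncWriter.
class QueueBackedWriter extends Writer {
    // End-of-stream marker, compared by identity.
    private static final char[] POISON = new char[0];

    private final BlockingQueue<char[]> queue = new LinkedBlockingQueue<char[]>();
    private final Writer target;
    private final Thread consumer;
    private volatile IOException failure; // error raised on the consumer thread, if any
    private boolean closed;               // guarded by lock

    QueueBackedWriter(Writer target) {
        this.target = target;
        this.consumer = new Thread(this::drain, "queue-backed-writer");
        this.consumer.setDaemon(true);
        this.consumer.start();
    }

    // Consumer side: write queued chunks to the target until the marker arrives.
    private void drain() {
        try {
            for (char[] chunk = queue.take(); chunk != POISON; chunk = queue.take()) {
                target.write(chunk);
            }
        } catch (IOException e) {
            failure = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Producer side: enqueue a copy, because the caller may reuse buf afterwards.
    @Override
    public void write(char[] buf, int off, int len) throws IOException {
        synchronized (lock) {
            if (closed) throw new IOException("already closed");
            queue.add(Arrays.copyOfRange(buf, off, off + len));
        }
    }

    @Override
    public void flush() {
        // No-op: the consumer thread decides when data reaches the target.
    }

    @Override
    public void close() throws IOException {
        synchronized (lock) {
            if (closed) return;
            closed = true;
        }
        queue.add(POISON);
        try {
            consumer.join(); // wait until everything queued has been written
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        target.close();
        if (failure != null) throw failure;
    }
}

class QueueBackedWriterDemo {
    static String roundTrip(String... parts) throws IOException {
        StringWriter sink = new StringWriter();
        try (Writer writer = new QueueBackedWriter(sink)) {
            for (String part : parts) writer.write(part);
        }
        return sink.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("foo", "bar")); // prints "foobar"
    }
}
```

The sketch always enqueues the marker on close, which keeps the shutdown logic short; the AsyncWriter source further down uses a bounded queue and a slightly different shutdown scheme.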

 

Using the asynchronous Writer:

 

 

BufferedWriter writer = new BufferedWriter(new AsyncWriter(new FileWriter("c:/debug.log")));
writer.write("xxxxx");

Using the asynchronous OutputStream:

 

BufferedOutputStream output = new BufferedOutputStream(new AsyncOutputStream(new FileOutputStream("c:/debug.log")));
output.write("foo".getBytes());

 

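When you are finished with output, make sure it is closed: close() forces the asynchronous thread to write all remaining data to the final targetOutput. Calling flush() on the asynchronous class, by contrast, is a no-op and writes no data.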
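
The close requirement above can be sketched with try-with-resources (Java 7 or later), which closes the outermost writer even when an exception is thrown. DeferredWriter and CloseDemo are hypothetical stand-ins, not rapid-framework classes: DeferredWriter models only the worst case of the contract, where flush() does nothing and data is guaranteed to reach the target only at close().

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

// Hypothetical stand-in: holds data back until close(), and flush() is a no-op.
class DeferredWriter extends Writer {
    private final StringBuilder pending = new StringBuilder();
    private final Writer target;

    DeferredWriter(Writer target) {
        this.target = target;
    }

    @Override
    public void write(char[] buf, int off, int len) {
        pending.append(buf, off, len);
    }

    @Override
    public void flush() {
        // Intentionally empty, like flush() on the asynchronous classes.
    }

    @Override
    public void close() throws IOException {
        target.write(pending.toString());
        pending.setLength(0);
        target.close();
    }
}

class CloseDemo {
    // Returns {chars in the sink after flush(), chars in the sink after close()}.
    static int[] run() throws IOException {
        StringWriter sink = new StringWriter();
        int afterFlush;
        try (BufferedWriter writer = new BufferedWriter(new DeferredWriter(sink))) {
            writer.write("xxxxx");
            writer.flush(); // empties BufferedWriter's buffer into the wrapper, not into the sink
            afterFlush = sink.getBuffer().length();
        } // close() runs here and pushes everything to the sink
        return new int[] { afterFlush, sink.getBuffer().length() };
    }

    public static void main(String[] args) throws IOException {
        int[] lengths = run();
        System.out.println("after flush: " + lengths[0]); // prints "after flush: 0"
        System.out.println("after close: " + lengths[1]); // prints "after close: 5"
    }
}
```

With the real asynchronous classes some data may well reach the target before close(); the point is only that nothing short of close() guarantees it.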

 

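Asynchronous IO tip (1):

Enlarge the buffer of the BufferedWriter/BufferedOutputStream to reduce the number of writes. Each write that reaches the asynchronous layer becomes one queue entry, so fewer and larger writes mean less queue traffic.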
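
As a rough illustration of tip (1), the sketch below counts how many write calls reach the wrapped writer for two buffer sizes. CountingWriter and BufferSizeDemo are hypothetical names; CountingWriter stands in for the asynchronous layer and only counts calls. The sizes 8192 (the default in common JDK implementations) and 64 * 1024 are illustrative assumptions, not recommendations from rapid-framework.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Arrays;

// Hypothetical stand-in for the asynchronous layer: it only counts write() calls.
class CountingWriter extends Writer {
    int writes;

    @Override
    public void write(char[] buf, int off, int len) {
        writes++;
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
}

class BufferSizeDemo {
    // Pushes 1000 lines of 100 chars through a BufferedWriter with the given
    // buffer size and returns how many writes reached the wrapped writer.
    static int countWrites(int bufferSize) throws IOException {
        CountingWriter counter = new CountingWriter();
        char[] line = new char[100];
        Arrays.fill(line, 'x');
        try (BufferedWriter writer = new BufferedWriter(counter, bufferSize)) {
            for (int i = 0; i < 1000; i++) {
                writer.write(line);
            }
        }
        return counter.writes;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("8 KiB buffer:  " + countWrites(8192) + " writes");
        System.out.println("64 KiB buffer: " + countWrites(64 * 1024) + " writes");
    }
}
```

The second argument of the BufferedWriter and BufferedOutputStream constructors sets the buffer size; the larger buffer should report noticeably fewer writes for the same data.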

 

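Asynchronous IO tip (2):

Consider closing the asynchronous IO object in a separate thread as well. close() waits for the background thread to finish writing everything still in the queue, so in real-world use it can be a very time-consuming operation.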
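
One possible way to follow tip (2) is to hand close() to an ExecutorService, sketched below for Java 8 or later. BackgroundCloseDemo, SlowCloseWriter and closeInBackground are hypothetical names; SlowCloseWriter just sleeps in close() to imitate an asynchronous writer draining a long queue.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BackgroundCloseDemo {
    // Hypothetical stand-in whose close() is slow.
    static class SlowCloseWriter extends StringWriter {
        volatile boolean closed;

        @Override
        public void close() throws IOException {
            try {
                Thread.sleep(200); // simulate draining queued data
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            super.close();
            closed = true;
        }
    }

    // Submits close() to the executor; the Future reports completion or failure.
    static Future<?> closeInBackground(Writer writer, ExecutorService executor) {
        return executor.submit(() -> {
            writer.close();
            return null;
        });
    }

    static boolean closeAndWait() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            SlowCloseWriter writer = new SlowCloseWriter();
            writer.write("xxxxx");
            Future<?> pending = closeInBackground(writer, executor);
            // The caller is free to do other work here while close() runs.
            pending.get(5, TimeUnit.SECONDS); // a close() failure surfaces as ExecutionException
            return writer.closed;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("closed: " + closeAndWait()); // prints "closed: true"
    }
}
```

Once close() has been handed off, the writer must not be written to again, and the application should still wait for the Future before exiting so that no data is lost.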

 

 

AsyncWriter implementation source:

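import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Log and LogFactory are assumed to come from Apache Commons Logging.
// AsyncExceptinHandler, DefaultAsyncExceptinHandler and BufferCopyUtils are
// rapid-framework classes that are not shown in this post.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AsyncWriter extends Writer {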

	private static Log log = LogFactory.getLog(AsyncWriter.class);
	
	private static final int DEFAULT_QUEUE_CAPACITY = 50000;
	// End-of-stream marker; the consumer thread compares it by identity (==), not by content.
	private static final char[] CLOSED_SIGNEL = new char[0];
	
	private Writer out;
	private DataProcessorThread dataProcessor;
	// volatile: finalize() reads this flag without holding the lock
	private volatile boolean isClosed = false;
	private BlockingQueue<char[]> queue ;
	
	private AsyncExceptinHandler asyncExceptinHandler = new DefaultAsyncExceptinHandler();
	private static long threadSeqNumber;
	private static synchronized long nextThreadID() {
		return ++threadSeqNumber;
    }
	
	private class DataProcessorThread extends Thread {
	    
		// volatile: both flags are written by one thread and read by another
		private volatile boolean enabled = true;
		private volatile boolean hasRuned = false;
		DataProcessorThread() {
			super("AsyncWriter.DataProcessorThread-"+nextThreadID());
			setDaemon(true);
		}

		public void run() {
			hasRuned = true;
			while (this.enabled || !queue.isEmpty()) {
				
				char[] buf;
				try {
					buf = queue.take();
				} catch (InterruptedException e) {
					// Interrupted while waiting for data: re-check the loop condition instead of exiting.
					continue;
				}
				
				if(buf == CLOSED_SIGNEL) {
					return;
				}
				
				try {
					out.write(buf);
				} catch (IOException e) {
					 asyncExceptinHandler.handle(e);
				}
			}
		}
	}

	public AsyncWriter(Writer out) {
		this(out,DEFAULT_QUEUE_CAPACITY,Thread.NORM_PRIORITY + 1);
	}
	
	public AsyncWriter(Writer out,int queueCapacity) {
		this(out,queueCapacity,Thread.NORM_PRIORITY + 1);
	}
	
	public AsyncWriter(Writer out,int queueCapacity,int dataProcesserThreadPriority) {
		this(out,new ArrayBlockingQueue<char[]>(queueCapacity),dataProcesserThreadPriority);
	}
	
	public AsyncWriter(Writer out,BlockingQueue<char[]> queue,int dataProcesserThreadPriority) {
		if(out == null) throw new NullPointerException("out must not be null");
		if(queue == null) throw new NullPointerException("queue must not be null");
		
		this.queue = queue;
		// Assign out before starting the thread so the consumer can never observe it as null.
		this.out = out;
		this.dataProcessor = new DataProcessorThread();
		if(dataProcesserThreadPriority != Thread.NORM_PRIORITY) {
			this.dataProcessor.setPriority(dataProcesserThreadPriority);
		}
		this.dataProcessor.start();
	}
	
	public AsyncWriter(Writer out,AsyncExceptinHandler handler) {
		this(out);
		setAsyncExceptinHandler(handler);
	}

	public void write(char[] buf, int offset, int length) throws IOException {
		synchronized (lock) {
			if(isClosed) throw new IOException("already closed");
			try {
				queue.put(BufferCopyUtils.copyBuffer(buf, offset, length));
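			} catch (InterruptedException e) {
				// Restore the interrupt flag so the caller can still observe the interruption.
				Thread.currentThread().interrupt();
				throw new IOException("AsyncWriter write interrupted",e);
			}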
		}
	}

	public void close() throws IOException {
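		synchronized (lock) {
			if(isClosed) return; // closing an already closed stream has no effect
			try {
				isClosed = true;
				dataProcessor.enabled = false;
				if(queue.isEmpty()) {
					// Wake the consumer thread if it is blocked in take() on an empty queue.
					queue.offer(CLOSED_SIGNEL);
				}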
				
				try {
					dataProcessor.join();
				} catch (InterruptedException e) {
					//ignore
				}
				
				if(!dataProcessor.hasRuned) {
					dataProcessor.run();
				}
			}finally {
				out.close();
			}
		}
	}
	
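	public void flush() throws IOException {
		// Intentionally empty: the background thread writes queued data, and only close() drains it fully.
	}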

	protected void finalize() throws Throwable {
		super.finalize();
		if(!isClosed) {
			log.warn("AsyncWriter not close:"+this);
			close();
		}
	}

	public void setAsyncExceptinHandler(AsyncExceptinHandler asyncExceptinHandler) {
		if(asyncExceptinHandler == null) throw new NullPointerException();
		this.asyncExceptinHandler = asyncExceptinHandler;
	}

}
 

 

rapid-framework website:
http://code.google.com/p/rapid-framework

 

Online javadoc:
http://www.rapid-framework.org.cn/rapid-javadoc-v2.0.x/

 

Comments
#1 wmj2003 2009-06-10
rapid-validation: how can a custom validation method be made to run after rapid-validation's own checks?
