Kratos

A responsible theme for WordPress

Java代码模拟MapReduce中环形缓冲区的读写过程

MapReduce原理理解:

看下图图示即可明白整体流程

《Java代码模拟MapReduce中环形缓冲区的读写过程》

点击图片可看高清大图

原型:

在mapreduce中,map task调用map处理逻辑将处理后的key和value利用outputcollector.collect()放入一个环形缓冲区中,那么这个缓冲区是有一定大小的,那么如果放入的内容很多很多的时候怎么办呢?其实hadoop里面有这么个机制,在缓冲区达到某一个值或者比率的时候,比如80%,那么hadoop会利用Spiller.spill()将这个80%的数据读出来按照HashPartitioner.getPartitioner()来进行分区并按照key进行快速排序,然后kv键值对会继续向缓冲区中存放,形成一个环形存储逻辑,只要达到80%就将这块取出来,那么缓冲区就会一直可用。

模拟实现:

定义了一个容量为100的字符串数组来模拟环形缓冲区。另外还定义了一个size用来决定原型中比率,默认值给的80.start用来记录这个80内容的起始位置,一个布尔类型的参数为true时意味着缓冲区中已经超过80,需要开始进行Spiller.spill()。

程序主要是两个进程,第一个进程是不断地将源文件按行读取,存入数组中,当数据达到80的时候,将布尔类型标记置为true,并设置起始位置,不需要考虑其他情况,当达到100的时候,会继续往数组中写入数据,从0开始,不断循环,知道文件读取完毕。

第二个进程会不断去验证标记位的值,如果为true,就会获得当前起始位置,将标记位置为false,然后遍历这80的数据,存入到某个编号文件中,读取完毕后会再次去嗅探标记位。

代码实现:

package com.season.spill;
 
import .io.BufferedReader;
import .io.BufferedWriter;
import .io.File;
import .io.FileInputStream;
import .io.FileOutputStream;
import .io.InputStreamReader;
import .io.OutputStreamWriter;
 
public class FileJobSubmitter {
 
	public static final String FILE_READ ="F:\\io\\BufferInput\\season.txt";
	
	public static String[] CIRCLE_BUFFER = new String[100];
 
	public static int BUFFER_SIZE = 80;
 
	public static int BUFFER_START = 0;
 
	public static int BUFFER_COUNTS = 0;
	
	public static boolean READ_FLAG = false;
 
	public static int FILE_ID = 1000;
 
	public static void main(String[] args) {
 
		ReadFileToCircleBuffer rf = new ReadFileToCircleBuffer();
		Thread rfThread = new Thread(rf);
 
		WriteFileFromBuffer wf = new WriteFileFromBuffer();
		Thread wfThread = new Thread(wf);
 
		rfThread.start();
		wfThread.start();
	}
 
	static class ReadFileToCircleBuffer implements Runnable {
 
		@Override
		public void run() {
			try {
				BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(FILE_READ))));
				String lenTxt = null;
				int i = 0 ;
				while((lenTxt = br.readLine()) != null ){
					
					CIRCLE_BUFFER[i] = lenTxt;
					
					BUFFER_COUNTS++;
					if(BUFFER_COUNTS == 80){
						BUFFER_START = i < 79 ? 20 + i : i-79 ;
						READ_FLAG = true ;
						BUFFER_COUNTS = 0 ;
					}
					System.out.println("内容:"+lenTxt+" flag="+ READ_FLAG + " start="+ BUFFER_START+" 总数=" + BUFFER_COUNTS+" i = "+ i);
					
					i = i==99?0:i+1; 
					
					
					Thread.sleep(1000);
				}
				BUFFER_START = i < BUFFER_COUNTS -1 ? 100 - BUFFER_COUNTS  + i : i- BUFFER_COUNTS + 1 ;
				READ_FLAG = true ;
				BUFFER_SIZE = BUFFER_COUNTS;
				
				
				br.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
 
	}
 
	static class WriteFileFromBuffer implements Runnable {
 
		@Override
		public void run() {
 
			while (true) {
				try {
					if (READ_FLAG) {
						READ_FLAG = false;
						BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(new File("F:\\io\\BufferOutput\\part-" + FILE_ID + "size80.txt"))));
						// 将起点开始的80个记录存入文件中
						int start = BUFFER_START;
						for (int i = 0; i < BUFFER_SIZE; i++) {
							System.out.println( "*******存入:"+CIRCLE_BUFFER[start]);
							String temp = CIRCLE_BUFFER[start];
							
							bw.write(temp);
							bw.newLine();
							Thread.sleep(500);
							start = start==99?0:start+1;
						}
						bw.close();
						FILE_ID++;
					}
 
					Thread.sleep(1000);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
 
		}
 
	}
 
}

总结:

其中有些输出是为了查看当前的进度来验证程序的准确性。经过这次小小的练习,收获颇丰,又将一些很少用到的基本知识复习了一般,不乏有长时间不用忘记的,其实,应该多拿出一些时间来实现某些简单的小程序巩固自己的基础知识,拓实自己。

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注